NameNode HA Principles
GQ7psP7UJw7k · 2023-11-05

1. Background

Before Hadoop 2.0, the NameNode was a single point of failure, which hurt service availability. Hadoop 2.0 introduced an HA mechanism in which the active node is chosen through ZooKeeper-based leader election.

A previous post, https://blog.51cto.com/u_15327484/7850359, covered the ResourceManager HA process. NameNode HA uses the same election flow as the ResourceManager; however, to reduce complexity and to keep the election process from affecting NameNode service stability, HDFS moves the NameNode election logic into a separate zkfc process. The NameNode HA architecture is shown below:

[Figure: NameNode HA architecture]
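For context, automatic failover of this kind is typically enabled with configuration along the following lines; this is a minimal, illustrative sketch, and the nameservice name mycluster and the NameNode IDs are placeholders:

<!-- hdfs-site.xml: illustrative HA settings -->
<property>
  <name>dfs.nameservices</name>
  <value>mycluster</value>
</property>
<property>
  <name>dfs.ha.namenodes.mycluster</name>
  <value>nn1,nn2</value>
</property>
<property>
  <name>dfs.ha.automatic-failover.enabled</name>
  <value>true</value>
</property>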

2. NameNode State at Startup

When a NameNode starts, it creates its initial HA state. As shown below, the default is STANDBY_STATE, and the NameNode then enters that state:


// during NameNode initialization: pick the initial HA state from the startup options
HAState state = createHAState(getStartupOption(conf));

protected HAState createHAState(StartupOption startOpt) {
    if (!haEnabled || startOpt == StartupOption.UPGRADE
        || startOpt == StartupOption.UPGRADEONLY) {
      return ACTIVE_STATE;
    } else if (startOpt == StartupOption.OBSERVER) {
      return OBSERVER_STATE;
    } else {
      return STANDBY_STATE;
    }
  }

// enter the chosen state (STANDBY_STATE by default)
state.enterState(haContext);

Finally, NameNode.startStandbyServices is called to start the service threads for the Standby state, such as the checkpoint-related ones. The previous post explains this logic in detail: https://blog.51cto.com/u_15327484/8122340

public void startStandbyServices() throws IOException {
      try {
        namesystem.startStandbyServices(getConf(),
            state == NameNode.OBSERVER_STATE);
      } catch (Throwable t) {
        doImmediateShutdown(t);
      }
    }

3. ZKFC Initialization

After both NameNodes have started, they are both in the Standby state. A zkfc process is then started on each NameNode host:

hadoop/sbin/hadoop-daemon.sh start zkfc

Looking inside the ~/hadoop/bin/hdfs script, we can see that it launches the DFSZKFailoverController class:

    zkfc)
      HADOOP_SUBCMD_SUPPORTDAEMONIZATION="true"
      HADOOP_CLASSNAME='org.apache.hadoop.hdfs.tools.DFSZKFailoverController'
    ;;

DFSZKFailoverController.main creates a DFSZKFailoverController object and invokes its run method:

DFSZKFailoverController zkfc = DFSZKFailoverController.create(
          parser.getConfiguration());
      System.exit(zkfc.run(parser.getRemainingArgs()));

Execution eventually reaches ZKFailoverController.doRun (whose mainLoop keeps running), and this is where the NameNode's active/standby state is actually controlled:

private int doRun(String[] args)
      throws Exception {
    try {
      initZK();
    } catch (KeeperException ke) {
      LOG.error("Unable to start failover controller. Unable to connect "
          + "to ZooKeeper quorum at " + zkQuorum + ". Please check the "
          + "configured value for " + ZK_QUORUM_KEY + " and ensure that "
          + "ZooKeeper is running.", ke);
      return ERR_CODE_NO_ZK;
    } // ... omitted

    try {
      // initialize the RPC server
      initRPC();
      // start the HealthMonitor thread
      initHM();
      startRPC();
      mainLoop();
    } catch (Exception e) {
      LOG.error("The failover controller encounters runtime error: ", e);
      throw e;
    } finally {
      rpcServer.stopAndJoin();
      
      elector.quitElection(true);
      healthMonitor.shutdown();
      healthMonitor.join();
    }
    return 0;
  }

initZK reads the ZooKeeper address setting ha.zookeeper.quorum from core-site.xml and uses it to create an ActiveStandbyElector object:

    // read ha.zookeeper.quorum
    zkQuorum = conf.get(ZK_QUORUM_KEY);
    int zkTimeout = conf.getInt(ZK_SESSION_TIMEOUT_KEY,
        ZK_SESSION_TIMEOUT_DEFAULT);
    // Parse ACLs from configuration.
    String zkAclConf = conf.get(ZK_ACL_KEY, ZK_ACL_DEFAULT);
    zkAclConf = ZKUtil.resolveConfIndirection(zkAclConf);
    List<ACL> zkAcls = ZKUtil.parseACLs(zkAclConf);
    if (zkAcls.isEmpty()) {
      zkAcls = Ids.CREATOR_ALL_ACL;
    }
    
    // create the ActiveStandbyElector
    elector = new ActiveStandbyElector(zkQuorum,
        zkTimeout, getParentZnode(), zkAcls, zkAuths,
        new ElectorCallbacks(), maxRetryNum);
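For reference, ha.zookeeper.quorum is set in core-site.xml roughly as follows (the hostnames here are illustrative):

<!-- core-site.xml: illustrative ZooKeeper quorum -->
<property>
  <name>ha.zookeeper.quorum</name>
  <value>zk1.example.com:2181,zk2.example.com:2181,zk3.example.com:2181</value>
</property>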

ActiveStandbyElector serves the same role here as it does in the ResourceManager; see: https://blog.51cto.com/u_15327484/7850359

When the ActiveStandbyElector is created, it establishes the ZooKeeper connection:

// establish the ZK Connection for future API calls
    if (failFast) {
      createConnection();
    } else {
      reEstablishSession();
    }

It eventually calls connectToZooKeeper, which registers a WatcherWithClientRef watcher with the ZooKeeper client:

protected synchronized ZooKeeper connectToZooKeeper() throws IOException,
      KeeperException {
    
    
    watcher = new WatcherWithClientRef();
    ZooKeeper zk = createZooKeeper();
    watcher.setZooKeeperRef(zk);

    // Wait for the asynchronous success/failure. This may throw an exception
    // if we don't connect within the session timeout.
    watcher.waitForZKConnectionEvent(zkSessionTimeout);
    
    for (ZKAuthInfo auth : zkAuthInfo) {
      zk.addAuthInfo(auth.getScheme(), auth.getAuth());
    }
    return zk;
  }

WatcherWithClientRef.processWatchEvent reacts to the connection state reported by ZooKeeper, which ultimately determines whether the node ends up Active or Standby:

if (eventType == Event.EventType.None) {
      // the connection state has changed
      switch (event.getState()) {
      case SyncConnected:
        LOG.info("Session connected.");
        // if the listener was asked to move to safe state then it needs to
        // be undone
        ConnectionState prevConnectionState = zkConnectionState;
        zkConnectionState = ConnectionState.CONNECTED;
        if (prevConnectionState == ConnectionState.DISCONNECTED &&
            wantToBeInElection) {
          monitorActiveStatus();
        }
        break;
      case Disconnected:
        LOG.info("Session disconnected. Entering neutral mode...");

        // ask the app to move to safe state because zookeeper connection
        // is not active and we dont know our state
        zkConnectionState = ConnectionState.DISCONNECTED;
        enterNeutralMode();
        break;
      case Expired:
        // the connection got terminated because of session timeout
        // call listener to reconnect
        LOG.info("Session expired. Entering neutral mode and rejoining...");
        enterNeutralMode();
        reJoinElection(0);
        break;
      case SaslAuthenticated:
        LOG.info("Successfully authenticated to ZooKeeper using SASL.");
        break;
      default:
        fatalError("Unexpected Zookeeper watch event state: "
            + event.getState());
        break;
      }

After ZKFailoverController finishes initializing the ZooKeeper client, it calls initHM to start the HealthMonitor thread:

private void initHM() {
  healthMonitor = new HealthMonitor(conf, localTarget);
  healthMonitor.addCallback(new HealthCallbacks());
  healthMonitor.addServiceStateCallback(new ServiceStateCallBacks());
  healthMonitor.start();
}

4. ZKFC Monitors NameNode Health and Switches NameNode State

HealthMonitor starts a MonitorDaemon daemon thread, which first connects to the NameNode and then checks the NameNode's state via RPC requests:

public void run() {
      while (shouldRun) {
        try { 
          // connect to the NameNode
          loopUntilConnected();
          // then run the health checks
          doHealthChecks();
        } catch (InterruptedException ie) {
          Preconditions.checkState(!shouldRun,
              "Interrupted but still supposed to run");
        }
      }
    }
  }

Whether the check result is healthy or unhealthy, enterState is called. enterState compares the new state with the old one and, if they differ, invokes the registered callbacks. For example, if the state was previously healthy and the NameNode then dies, the state becomes unhealthy, and the corresponding ZKFailoverController callback triggers an active/standby election. We will not dig into every detail of that handling here:

private void doHealthChecks() throws InterruptedException {
    while (shouldRun) {
      HAServiceStatus status = null;
      boolean healthy = false;
      try {
        status = proxy.getServiceStatus();
        
        // Call monitorHealth on HAServiceProtocol, the protocol used to talk to the NameNode.
        // This sends an RPC to check the NameNode's health; the server-side implementation
        // is the method of the same name in NameNodeRpcServer.
        
        proxy.monitorHealth();
        healthy = true;
      } catch (Throwable t) {
        if (isHealthCheckFailedException(t)) {
          // the service is running but is in an unhealthy state
          LOG.warn("Service health check failed for " + targetToMonitor
              + ": " + t.getMessage());
          enterState(State.SERVICE_UNHEALTHY);
        } else {
          // no response at the transport level
          LOG.warn("Transport-level exception trying to monitor health of " +
              targetToMonitor + ": " + t.getCause() + " " + t.getLocalizedMessage());
          RPC.stopProxy(proxy);
          proxy = null;
          enterState(State.SERVICE_NOT_RESPONDING);
          Thread.sleep(sleepAfterDisconnectMillis);
          return;
        }
      }
      
      if (status != null) {
        setLastServiceStatus(status);
      }
      // healthy
      if (healthy) {
        enterState(State.SERVICE_HEALTHY);
      }

      Thread.sleep(checkIntervalMillis);
    }
  }
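The enterState logic referred to above is roughly the following; this is a simplified, self-contained sketch rather than the exact Hadoop source. On a state change it records the new state and notifies the registered callbacks, which is how ZKFailoverController learns that it should join or quit the election.

import java.util.ArrayList;
import java.util.List;

// Minimal, illustrative sketch of HealthMonitor's state-change dispatch
// (not the exact Hadoop source).
class HealthStateSketch {
  enum State { SERVICE_HEALTHY, SERVICE_UNHEALTHY, SERVICE_NOT_RESPONDING }

  interface Callback { void enteredState(State newState); }

  private State state = State.SERVICE_HEALTHY;
  private final List<Callback> callbacks = new ArrayList<>();

  void addCallback(Callback cb) { callbacks.add(cb); }

  // Fire the callbacks only when the state actually changes; ZKFC's
  // HealthCallbacks reacts by joining or quitting the ZooKeeper election.
  synchronized void enterState(State newState) {
    if (newState != state) {
      state = newState;
      for (Callback cb : callbacks) {
        cb.enteredState(newState);
      }
    }
  }
}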

The active/standby election itself boils down to calling zkClient.create to create an ephemeral znode; ActiveStandbyElector.processResult then handles the asynchronous result.
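As a rough illustration of that create call (a simplified sketch modeled on ActiveStandbyElector, not the exact Hadoop source; the lock path and payload shown are placeholders), the elector issues an asynchronous ephemeral create and registers itself as the ZooKeeper callback:

import java.util.List;
import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.ACL;

// Illustrative sketch of the lock-node creation, not the exact Hadoop source.
class LockNodeSketch implements AsyncCallback.StringCallback {
  private final ZooKeeper zkClient;
  private final String zkLockFilePath; // e.g. /hadoop-ha/<nameservice>/ActiveStandbyElectorLock
  private final byte[] appData;        // identifies this NameNode / ZKFC
  private final List<ACL> zkAcl;

  LockNodeSketch(ZooKeeper zk, String lockPath, byte[] data, List<ACL> acl) {
    this.zkClient = zk;
    this.zkLockFilePath = lockPath;
    this.appData = data;
    this.zkAcl = acl;
  }

  // Try to grab the ephemeral lock znode; ZooKeeper later invokes
  // processResult() with the result code (OK, NODEEXISTS, ...).
  void createLockNodeAsync() {
    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL,
        this, zkClient);
  }

  @Override
  public void processResult(int rc, String path, Object ctx, String name) {
    // In the real elector, this is where the becomeActive()/becomeStandby()
    // decision is made based on rc.
  }
}

The actual ActiveStandbyElector.processResult looks like this: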

public synchronized void processResult(int rc, String path, Object ctx,
      String name) {
    if (isStaleClient(ctx)) return;
    if (LOG.isDebugEnabled()) {
      LOG.debug("CreateNode result: " + rc + " for path: " + path
          + " connectionState: " + zkConnectionState +
          "  for " + this);
    }
    // if the ephemeral node was created successfully, become active
    Code code = Code.get(rc);
    if (isSuccess(code)) {
      // we successfully created the znode. we are the leader. start monitoring
      if (becomeActive()) {
        monitorActiveStatus();
      } else {
        reJoinElectionAfterFailureToBecomeActive();
      }
      return;
    }
    // if the ephemeral node already exists, become (or remain) standby
    if (isNodeExists(code)) {
      if (createRetryCount == 0) {
        // znode exists and we did not retry the operation. so a different
        // instance has created it. become standby and monitor lock.
        becomeStandby();
      }
      // if we had retried then the znode could have been created by our first
      // attempt to the server (that we lost) and this node exists response is
      // for the second attempt. verify this case via ephemeral node owner. this
      // will happen on the callback for monitoring the lock.
      monitorActiveStatus();
      return;
    }

    // ... omitted
  }

The becomeActive() and becomeStandby() transitions here are the key part:

private boolean becomeActive() {
    assert wantToBeInElection;
    if (state == State.ACTIVE) {
      // already active
      return true;
    }
    try {
      // fence the old active, then write the BreadCrumb node to ZooKeeper
      Stat oldBreadcrumbStat = fenceOldActive();
      writeBreadCrumbNode(oldBreadcrumbStat);

      LOG.debug("Becoming active for {}", this);
      // transition to the active state
      appClient.becomeActive();
      state = State.ACTIVE;
      return true;
    } catch (Exception e) {
      LOG.warn("Exception handling the winning of election", e);
      // Caller will handle quitting and rejoining the election.
      return false;
    }
  }

The appClient of type ActiveStandbyElectorCallback has two implementations: in ResourceManager HA it is ActiveStandbyElectorBasedElectorService, while in ZKFC it is ElectorCallbacks:

[Figure: implementations of ActiveStandbyElectorCallback]

ElectorCallbacks ultimately calls HAServiceProtocol.transitionToActive to send an RPC that tells the NameNode to become active; the Standby case is handled the same way:

public static void transitionToActive(HAServiceProtocol svc,
      StateChangeRequestInfo reqInfo)
      throws IOException {
    try {
      svc.transitionToActive(reqInfo);
    } catch (RemoteException e) {
      throw e.unwrapRemoteException(ServiceFailedException.class);
    }
  }

5. Preventing NameNode Split-Brain

5.1 Split-Brain Handling Logic

When the network flaps, the ZKFC may lose contact with the active NameNode and conclude that it has died, so the standby NameNode is switched to active. Meanwhile, the old active NameNode, also cut off by the network problem, never receives the command to step down. Both NameNodes are now active at the same time; this is split-brain.

In NameNode HA, if zkClient.create successfully creates the ephemeral node /hadoop-ha/${dfs.nameservices}/ActiveStandbyElectorLock, then during the transition to Active:

  1. The ZKFC of the newly promoted NameNode performs fencing, for example killing the old active NameNode process over SSH.
  2. The ZKFC additionally creates the persistent node /hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb:
private boolean becomeActive() {
    assert wantToBeInElection;
    if (state == State.ACTIVE) {
      // already active
      return true;
    }
    try {
      Stat oldBreadcrumbStat = fenceOldActive();
      writeBreadCrumbNode(oldBreadcrumbStat);
    // ... omitted
}

By default, when the active NameNode exits cleanly, the /hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb node is deleted. If it was not deleted, the data in /hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb does not match the current active NameNode, so fenceOldActive is executed to fence the old active NameNode:

if (Arrays.equals(data, appData)) {
      LOG.info("But old node has our own data, so don't need to fence it.");
    } else {
      appClient.fenceOldActive(data);
    }

In the ResourceManager, fenceOldActive is a no-op, i.e., split-brain is not handled there:

public void fenceOldActive(byte[] oldActiveData) {
  if (LOG.isDebugEnabled()) {
    LOG.debug("Request to fence old active being ignored, " +
        "as embedded leader election doesn't support fencing");
  }
}

The NameNode's ZKFC, on the other hand, does handle it:

public void fenceOldActive(byte[] data) {
      ZKFailoverController.this.fenceOldActive(data);
    }

ZKFailoverController.doFence works as follows:

  1. First try, via an RPC request, to transition the old active to standby, and verify the result.
  2. If that fails, fence the split-brain node, for example over SSH:
private void doFence(HAServiceTarget target) {
    LOG.info("Should fence: " + target);
    // try to gracefully transition the old active to standby via RPC
    boolean gracefulWorked = new FailoverController(conf,
        RequestSource.REQUEST_BY_ZKFC).tryGracefulFence(target);
    if (gracefulWorked) {
      // It's possible that it's in standby but just about to go into active,
      // no? Is there some race here?
      LOG.info("Successfully transitioned " + target + " to standby " +
          "state without fencing");
      return;
    }
    
    try {
      // verify that a fencing method is configured
      target.checkFencingConfigured();
    } catch (BadFencingConfigurationException e) {
      LOG.error("Couldn't fence old active " + target, e);
      recordActiveAttempt(new ActiveAttemptRecord(false, "Unable to fence old active"));
      throw new RuntimeException(e);
    }
    // finally run the fencer, e.g. kill the old active process over SSH
    if (!target.getFencer().fence(target)) {
      throw new RuntimeException("Unable to fence " + target);
    }
  }

Fencing can be configured either with a custom script or with an SSH port setting:

  1. With a script, you can implement your own fencing logic, for example stopping the NameNode and then restarting it.
  2. With the SSH form, the old active NameNode process is simply killed over SSH, which is the simplest approach:
<property>
      <name>dfs.ha.fencing.methods</name>
      <value>sshfence
             shell(/data/hadoop/scripts/fence.sh)</value>
  </property>

<property>
    <name>dfs.ha.fencing.methods</name>
    <value>sshfence(hadoop:32200)</value>
  </property>

After the old active NameNode has been fenced, the new active writes the latest data into the persistent node /hadoop-ha/${dfs.nameservices}/ActiveBreadCrumb.

5.2 The Last Line of Defense Against Split-Brain

Whenever a NameNode talks to the JournalNodes, it must present an epoch number (epoch numbers are unique and only ever increase). Each JournalNode keeps a local promised epoch. The NameNode with the larger epoch number causes the JournalNodes to raise their promised epoch and thus wins the majority, while the NameNode with the smaller epoch number becomes the minority (the same idea as in the Paxos protocol).

Therefore the NameNode with the larger epoch number is the real active NameNode and holds the right to write to the JournalNodes. The old active NameNode's epoch number is smaller, so the JournalNodes refuse its edits, which prevents it from polluting the existing edit log and fsimage. The sketch below illustrates the idea:
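A minimal sketch of the epoch check each JournalNode applies (simplified for illustration; not the actual Hadoop implementation):

import java.io.IOException;

// Illustrative sketch of JournalNode-side epoch fencing.
class JournalEpochGuard {
  private long promisedEpoch = 0;   // highest epoch this JournalNode has promised

  // A NameNode proposes a new, larger epoch when it becomes the writer.
  synchronized void newEpoch(long proposedEpoch) throws IOException {
    if (proposedEpoch <= promisedEpoch) {
      throw new IOException("Proposed epoch " + proposedEpoch
          + " is not greater than the promised epoch " + promisedEpoch);
    }
    promisedEpoch = proposedEpoch;  // promise: never accept a smaller epoch again
  }

  // Every edit-log write carries the writer's epoch and is checked against it.
  synchronized void checkWriteRequest(long writerEpoch) throws IOException {
    if (writerEpoch < promisedEpoch) {
      // A stale (old active) NameNode is rejected, so its edits cannot
      // pollute the shared edit log.
      throw new IOException("Epoch " + writerEpoch
          + " is less than the promised epoch " + promisedEpoch);
    }
  }
}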
