How ResourceManager high availability works

1. Background

A year ago I wrote an article, https://blog.51cto.com/u_15327484/5046768, covering the ResourceManager startup flow. That article also described the ResourceManager election process, but its structure was rather muddled. Building on it, this post explains the ResourceManager high-availability mechanism more clearly, aiming to be understandable even for readers new to the topic.

2. ZooKeeper background

2.1 Watcher notifications

Use case: a client registers a watcher; when the state of a watched ZooKeeper path changes, the server sends a watcher event to the client, and the client handles the notification in Watcher#process.

How to register a watcher

Option one: pass the watcher when constructing the ZooKeeper client; it becomes the default watcher for watch requests on the paths being monitored:

public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException {
    this(connectString, sessionTimeout, watcher, false);
}

Option two: set the watch flag (or pass a watcher) on individual client API calls:

public void getData(String path, boolean watch, DataCallback cb, Object ctx)
public Stat exists(String path, boolean watch)
public List<String> getChildren(String path, boolean watch)
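
As a concrete example, here is a small, self-contained sketch of both registration styles. It is only an illustration, not code from ZooKeeper or Hadoop; the connect string and the /demo paths are placeholders:

import java.io.IOException;
import java.util.List;

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class WatcherRegistrationDemo {
  public static void main(String[] args)
      throws IOException, KeeperException, InterruptedException {
    // Option one: the watcher passed to the constructor becomes the default watcher.
    Watcher defaultWatcher = event ->
        System.out.println("default watcher: " + event.getType() + " in state " + event.getState());
    // "localhost:2181" and the paths below are placeholders.
    ZooKeeper zk = new ZooKeeper("localhost:2181", 15000, defaultWatcher);

    // Option two (a): watch=true arms the default watcher on this path.
    zk.exists("/demo/lock", true);

    // Option two (b): pass a dedicated watcher for this call only.
    List<String> children = zk.getChildren("/demo", event ->
        System.out.println("children changed under " + event.getPath()));
    System.out.println("current children: " + children);

    zk.close();
  }
}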

ZooKeeper client threading model:

While the client-side ZooKeeper object is being constructed, the connection-related threads are created and started:

cnxn = createConnection(
    connectStringParser.getChrootPath(),
    hostProvider,
    sessionTimeout,
    this,
    watchManager,
    getClientCnxnSocket(),
    canBeReadOnly);

cnxn.start();

Specifically, a thread for sending requests (SendThread) and a thread for handling events (EventThread) are created:

sendThread = new SendThread(clientCnxnSocket);
eventThread = new EventThread();

The ZooKeeper client's main thread wraps each request in a Packet and hands it to the SendThread:

public Packet queuePacket(...){
    Packet packet = null;
    packet = new Packet(h, r, request, response, watchRegistration);
    ...
    packet.watchDeregistration = watchDeregistration;
    ...     
    outgoingQueue.add(packet);
    ...
    sendThread.getClientCnxnSocket().packetAdded();
}

When the ZooKeeper server fires an event and returns it to the client, the SendThread receives the response and hands it over to the EventThread:

public void run() {
	try {
	  isRunning = true;
       
      // loop forever
	  while (true) {
         // take the next event from the queue
	     Object event = waitingEvents.take();
	     if (event == eventOfDeath) {
	        wasKilled = true;
	     } else {
            // handle the event
	        processEvent(event);
	     }
	     if (wasKilled)
	        synchronized (waitingEvents) {
	           if (waitingEvents.isEmpty()) {
	              isRunning = false;
	              break;
	           }
	        }
	  }
     ...
}

The EventThread then invokes the corresponding watcher's process method to handle the notification:

private void processEvent(Object event) {
  ...
  if (event instanceof WatcherSetEventPair) {
      
      WatcherSetEventPair pair = (WatcherSetEventPair) event;
      for (Watcher watcher : pair.watchers) {
          try {
              // invoke the registered watcher to handle the event
              watcher.process(pair.event);
          } catch (Throwable t) {
              LOG.error("Error while calling watcher ", t);
          }
      }
  }
}

Note that once the client has received a watcher event, that watcher is spent: a watch fires only once for the event it was registered for and will not fire again. If you need to keep watching, re-register the watcher inside Watcher#process by calling the ZooKeeper API again. (Figure: watcher handling flow on the client and server side.)
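
A minimal sketch of this re-registration pattern (the class name is made up for illustration; zk and path are supplied by the caller):

import org.apache.zookeeper.KeeperException;
import org.apache.zookeeper.WatchedEvent;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

// A watch fires only once, so process() re-registers it to keep receiving notifications.
public class ReRegisteringWatcher implements Watcher {
  private final ZooKeeper zk;
  private final String path;

  public ReRegisteringWatcher(ZooKeeper zk, String path) {
    this.zk = zk;
    this.path = path;
  }

  @Override
  public void process(WatchedEvent event) {
    System.out.println("event " + event.getType() + " on " + event.getPath());
    try {
      // Re-arm the watch: exists() with this watcher registers it for the next change.
      zk.exists(path, this);
    } catch (KeeperException | InterruptedException e) {
      // Real code would distinguish retryable errors and restore the interrupt flag.
      e.printStackTrace();
    }
  }
}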

2.2 Callbacks

Use case: the client handles an API call's result asynchronously, in a processResult callback it implements. Callbacks are the asynchronous counterpart to the synchronous API: when the client does not want to block waiting for the server's response, it supplies a callback and the response is processed on a separate thread.

Using a callback: when calling the server, the ZooKeeper client can be given a callback that processes the response asynchronously. For comparison, the synchronous method returns a byte[]:

byte[] getData(String path, boolean watch, Stat stat)

The asynchronous variant returns void, because the actual response handling happens in the logic supplied through the AsyncCallback.DataCallback parameter:

void getData(String path, Watcher watcher, AsyncCallback.DataCallback cb, Object ctx)
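
A short sketch of the asynchronous style (illustrative only; the path and the context object are placeholders): getData returns immediately, and the DataCallback runs later on the client's EventThread once the response arrives:

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.ZooKeeper;
import org.apache.zookeeper.data.Stat;

public class AsyncGetDataDemo {
  // zk is assumed to be an already-connected client; "/demo/lock" is a placeholder path.
  static void readDataAsync(ZooKeeper zk) {
    AsyncCallback.DataCallback cb = (int rc, String path, Object ctx, byte[] data, Stat stat) -> {
      // Runs on the client's EventThread once the server responds.
      if (Code.get(rc) == Code.OK) {
        System.out.println("data of " + path + ": " + new String(data));
      } else {
        System.out.println("getData on " + path + " failed: " + Code.get(rc));
      }
    };
    // Returns immediately; no watcher is registered here (watch = false).
    zk.getData("/demo/lock", false, cb, "my-context");
  }
}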

2.3 Watchers vs. callbacks

Although watchers and callbacks look similar in function, in practice they complement each other. Take the ResourceManager as an example:

  1. When several ResourceManager instances start, each calls zkClient.create to create an EPHEMERAL node at the same ZooKeeper path; only one of them can succeed.
  2. After calling zkClient.create, each instance learns from the server's return code whether it created the EPHEMERAL node: the one that succeeded becomes the active node, and the ones that failed switch to standby. A callback is all that is needed here; there is no point in having a watcher monitor this event.
  3. Later, a standby ResourceManager may become active, and an active ResourceManager may fall back to standby, for example after an OOM. We cannot know in advance when these events will happen, so a callback is of no use; the client must register a watcher and handle the event when it fires.

In short: a callback can only handle the response to a client API call, whereas a watcher lets the client handle events that the server raises at unpredictable times.
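
To make the complementary relationship concrete, here is a minimal toy elector. It is not ResourceManager code; the class, path, and log messages are made up for illustration. The StringCallback decides the initial role from the result of the create call, and a watcher registered through exists() reacts later when the ephemeral lock node disappears:

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.CreateMode;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooDefs;
import org.apache.zookeeper.ZooKeeper;

public class TinyElector {
  private static final String LOCK_PATH = "/tiny-elector/lock"; // placeholder path
  private final ZooKeeper zk;

  public TinyElector(ZooKeeper zk) {
    this.zk = zk;
  }

  public void joinElection() {
    // Callback: handles the result of this one create call.
    AsyncCallback.StringCallback createCb = (rc, path, ctx, name) -> {
      if (Code.get(rc) == Code.OK) {
        System.out.println("created the lock node: become active");
      } else if (Code.get(rc) == Code.NODEEXISTS) {
        System.out.println("lock node already held: become standby");
      }
      watchLockNode(); // in either case, keep watching the lock node
    };
    zk.create(LOCK_PATH, new byte[0], ZooDefs.Ids.OPEN_ACL_UNSAFE,
        CreateMode.EPHEMERAL, createCb, null);
  }

  private void watchLockNode() {
    // Watcher: handles events the server raises at an unpredictable time.
    Watcher watcher = event -> {
      if (event.getType() == Watcher.Event.EventType.NodeDeleted) {
        joinElection(); // the active side went away: re-run the election
      }
    };
    AsyncCallback.StatCallback statCb = (rc, path, ctx, stat) -> {
      // the Stat result is ignored in this sketch
    };
    // exists() both registers the watcher and asynchronously returns the node's Stat.
    zk.exists(LOCK_PATH, watcher, statCb, null);
  }
}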

3. ResourceManager callback handling

When ResourceManager starts, it creates and starts the EmbeddedElector election service. By default, the implementation is ActiveStandbyElectorBasedElectorService.

If yarn.resourcemanager.ha.curator-leader-elector.enabled is set to true in yarn-site.xml, the implementation becomes CuratorBasedElectorService, which builds on the Curator framework and wraps the ZooKeeper API into something simpler to use.

Our production clusters currently use ActiveStandbyElectorBasedElectorService, which is based on the native ZooKeeper API, so this article walks through the ResourceManager HA flow using that implementation:

protected EmbeddedElector createEmbeddedElector() throws IOException {
    EmbeddedElector elector;
    curatorEnabled =
        conf.getBoolean(YarnConfiguration.CURATOR_LEADER_ELECTOR,
            YarnConfiguration.DEFAULT_CURATOR_LEADER_ELECTOR_ENABLED);
    if (curatorEnabled) {
      this.zkManager = createAndStartZKManager(conf);
      elector = new CuratorBasedElectorService(this);
    } else {
      elector = new ActiveStandbyElectorBasedElectorService(this);
    }
    return elector;
  }

When ActiveStandbyElectorBasedElectorService starts, the call chain is as follows:

ActiveStandbyElectorBasedElectorService#serviceStart -> ActiveStandbyElector#joinElection -> ActiveStandbyElector#joinElectionInternal -> ActiveStandbyElector#createLockNodeAsync, which calls zkClient.create to create the EPHEMERAL lock path:

public class ActiveStandbyElector implements StatCallback, StringCallback {
  private void createLockNodeAsync() {
    zkClient.create(zkLockFilePath, appData, zkAcl, CreateMode.EPHEMERAL, this, zkClient);
  }
}

The callback is implemented by ActiveStandbyElector#processResult. Note that createLockNodeAsync passes the elector itself (this) as the StringCallback, so the result of the create call is delivered to this method. It handles three cases, as the code below shows:

  1. If the current ResourceManager created the EPHEMERAL node, it transitions straight to active.
  2. If the EPHEMERAL node was created by another instance, the ResourceManager transitions to standby.
  3. If the EPHEMERAL node has not been created yet (the create failed for a retryable reason), the ResourceManager retries the create.
public class ActiveStandbyElector implements StatCallback, StringCallback {
  public synchronized void processResult(int rc, String path, Object ctx,
      String name) {
    // omitted
    if (isSuccess(code)) {
      // we successfully created the znode. we are the leader. start monitoring
      // try to transition to active
      if (becomeActive()) {
        // verify by monitoring the lock node
        monitorActiveStatus();
      } else {
        // otherwise rejoin the election and retry creating the znode to become active
        reJoinElectionAfterFailureToBecomeActive();
      }
      return;
    }
    // the create failed because the node already exists: become standby
    if (isNodeExists(code)) {
      if (createRetryCount == 0) {
        becomeStandby();
      }
      monitorActiveStatus();
      return;
    }

    // the create failed and the node does not exist yet: retry the create
    if (shouldRetry(code)) {
      if (createRetryCount < maxRetryNum) {
        ++createRetryCount;
        createLockNodeAsync();
        return;
      }
    // omitted
    }
  }
}
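
After processResult has decided the role, monitorActiveStatus keeps an eye on the lock node. Conceptually it comes down to an asynchronous exists() call on the lock path, which re-arms the watcher and lets a StatCallback check who owns the node. The following is only a rough sketch of that pattern with illustrative names, not the exact Hadoop code:

import org.apache.zookeeper.AsyncCallback;
import org.apache.zookeeper.KeeperException.Code;
import org.apache.zookeeper.Watcher;
import org.apache.zookeeper.ZooKeeper;

public class LockNodeMonitorSketch {
  // zk, lockPath and watcher stand in for the elector's zkClient, zkLockFilePath and
  // WatcherWithClientRef fields; the printed messages are placeholders for the become* steps.
  static void monitorLockNode(ZooKeeper zk, String lockPath, Watcher watcher) {
    AsyncCallback.StatCallback cb = (rc, path, ctx, stat) -> {
      if (Code.get(rc) == Code.OK && stat.getEphemeralOwner() == zk.getSessionId()) {
        System.out.println("this session owns the lock node: stay active");
      } else {
        System.out.println("lock node missing or owned by another session: standby");
      }
    };
    // exists() re-arms the watcher on the lock path and delivers the node's Stat to the callback.
    zk.exists(lockPath, watcher, cb, null);
  }
}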

4. ResourceManager watcher handling

With the callback handled, every ResourceManager has switched to its initial state. Later on, however, a standby ResourceManager may become active, and the active one may fall back to standby, for example after an OOM. These transitions rely on watching for and handling ZooKeeper events.

When the ZooKeeper client is created, a default watcher is supplied:

protected synchronized ZooKeeper connectToZooKeeper() throws IOException, KeeperException {
    watcher = new WatcherWithClientRef();
    // the watcher is registered with the ZooKeeper client
    ZooKeeper zk = createZooKeeper();
    watcher.setZooKeeperRef(zk);
    // omitted
    watcher.waitForZKConnectionEvent(zkSessionTimeout);
    // omitted
    return zk;
  }
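
The createZooKeeper call above is essentially the constructor usage from section 2.1, with the WatcherWithClientRef instance as the default watcher, so every notification for this session ends up in its process() method. A minimal sketch of that step (zkHostPort and zkSessionTimeout stand in for the elector's configuration fields):

// Sketch only: constructs the client with the elector's watcher as the default watcher.
protected ZooKeeper createZooKeeper() throws IOException {
  return new ZooKeeper(zkHostPort, zkSessionTimeout, watcher);
}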

The watcher's process implementation is shown below; it hands the event to ActiveStandbyElector for processing:

private final class WatcherWithClientRef implements Watcher {
    private ZooKeeper zk;
    
    // other events are processed only after the connection event from the server has been received
    private CountDownLatch hasReceivedEvent = new CountDownLatch(1);
    // events are handled only after the ZooKeeper reference has been set on this watcher
    private CountDownLatch hasSetZooKeeper = new CountDownLatch(1);
    // ordinary helper methods omitted

    // process() is where the watcher handles ZooKeeper events
    @Override
    public void process(WatchedEvent event) {
      // omitted
      ActiveStandbyElector.this.processWatchEvent(zk, event);
      // omitted
    }
  }

First, it helps to know which events the ZooKeeper server delivers in each connection state, as shown in the figure below. For example:

  1. While the client and server are connected (SyncConnected), the server delivers an event of type None to the client once the session has been successfully established.
  2. While the client and server are connected (SyncConnected), the server delivers a NodeDataChanged event when the data of a watched node changes.

Note that establishing a session, session expiry, and disconnection all result in a None event being delivered to the client:

(Figure: table of ZooKeeper notification events by connection state (KeeperState) and event type (EventType).)

ActiveStandbyElector#processWatchEvent defines how the ResourceManager reacts to each event. The handling falls roughly into two categories, implemented in the code below:

  1. When the session is established, the connection is lost, or the session expires, the ResourceManager enters neutral mode, which in practice means standby, and registers a watcher to wait for a new election.
  2. When the EPHEMERAL node is deleted, a new election is started.
synchronized void processWatchEvent(ZooKeeper zk, WatchedEvent event) {
    Event.EventType eventType = event.getType();
    if (isStaleClient(zk)) return;
    if (LOG.isDebugEnabled()) {
      LOG.debug("Watcher event type: " + eventType + " with state:"
          + event.getState() + " for path:" + event.getPath()
          + " connectionState: " + zkConnectionState
          + " for " + this);
    }
    
    // None events: connection state changes
    if (eventType == Event.EventType.None) {
      // the connection state has changed
      switch (event.getState()) {
      // connection (re)established: update the connection state; if we were previously disconnected and still want to be in the election, re-check the active status
      case SyncConnected:
        LOG.info("Session connected.");
        // if the listener was asked to move to safe state then it needs to
        // be undone
        ConnectionState prevConnectionState = zkConnectionState;
        zkConnectionState = ConnectionState.CONNECTED;
        if (prevConnectionState == ConnectionState.DISCONNECTED &&
            wantToBeInElection) {
          monitorActiveStatus();
        }
        break;
      // disconnected from the server: enter neutral mode, i.e. become standby
      case Disconnected:
        LOG.info("Session disconnected. Entering neutral mode...");

        // ask the app to move to safe state because zookeeper connection
        // is not active and we dont know our state
        zkConnectionState = ConnectionState.DISCONNECTED;
        enterNeutralMode();
        break;
      // session expired: enter neutral mode (standby) and rejoin the election
      case Expired:
        // the connection got terminated because of session timeout
        // call listener to reconnect
        LOG.info("Session expired. Entering neutral mode and rejoining...");
        enterNeutralMode();
        reJoinElection(0);
        break;
      case SaslAuthenticated:
        LOG.info("Successfully authenticated to ZooKeeper using SASL.");
        break;
      default:
        fatalError("Unexpected Zookeeper watch event state: "
            + event.getState());
        break;
      }

      return;
    }

    // a watch on lock path in zookeeper has fired. so something has changed on
    // the lock. ideally we should check that the path is the same as the lock
    // path but trusting zookeeper for now
    String path = event.getPath();
    if (path != null) {
      switch (eventType) {
      // the lock node was deleted: re-run the election
      case NodeDeleted:
        if (state == State.ACTIVE) {
          enterNeutralMode();
        }
        joinElectionInternal();
        break;
      // the node data changed: re-check the active status
      case NodeDataChanged:
        monitorActiveStatus();
        break;
      default:
        if (LOG.isDebugEnabled()) {
          LOG.debug("Unexpected node event: " + eventType + " for path: " + path);
        }
        monitorActiveStatus();
      }

      return;
    }

    // some unexpected error has occurred
    fatalError("Unexpected watch error from Zookeeper");
  }

The most important case above is the NodeDeleted event, which runs joinElectionInternal; that method calls createLockNodeAsync, i.e. zkClient.create, to try to create the EPHEMERAL node again:

private void joinElectionInternal() {
    Preconditions.checkState(appData != null,
        "trying to join election without any app data");
    if (zkClient == null) {
      if (!reEstablishSession()) {
        fatalError("Failed to reEstablish connection with ZooKeeper");
        return;
      }
    }

    createRetryCount = 0;
    wantToBeInElection = true;
    createLockNodeAsync();
  }

5. Related configuration

<property>
    <name>yarn.resourcemanager.recovery.enabled</name>
    <value>true</value>
    <description>Enable RM to recover state after starting. If true, then yarn.resourcemanager.store.class must be specified</description>
</property>
<property>
    <name>yarn.resourcemanager.store.class</name>
    <value>org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore</value>
    <description>The class to use as the persistent store.</description>
</property>
<property>
    <name>yarn.resourcemanager.zk-address</name>
    <value>hadoop34:2181,hadoop39:2181,hadoop40:2181</value>
    <description>Comma separated list of Host:Port pairs. Each corresponds to a ZooKeeper server (e.g. "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002") to be used by the RM for storing RM state. This must be supplied when using org.apache.hadoop.yarn.server.resourcemanager.recovery.ZKRMStateStore as the value for yarn.resourcemanager.store.class</description>
</property>

6. ResourceManager split-brain

On the ZooKeeper server side, an EPHEMERAL node lives exactly as long as the session that created it; once the session ends, the node is removed. So if the ZooKeeper client goes through a long full GC pause, or its machine is heavily loaded, the session may time out and expire, and the EPHEMERAL node is deleted. ZooKeeper then notifies the standby machines, which elect a new leader. Meanwhile, the ResourceManager that was stuck in the full GC still believes it is the leader. At that point both ResourceManagers consider themselves active: this is the split-brain scenario.

There is no dedicated fix for split-brain in the official implementation yet. The official explanation is that only one ResourceManager can modify the state stored in ZooKeeper, so even if split-brain occurs, the data already stored in ZooKeeper is not corrupted; the remaining risk is that newly submitted requests may not be processed:

The ZooKeeper-based state store (ZKRMStateStore) allows only a single ResourceManager to make changes to the stored state, implicitly fencing the other ResourceManager. This is accomplished by the ResourceManager claiming exclusive create-delete permissions on the root znode. The ACLs on the root znode are automatically created based on the ACLs configured for the store; in case of secure clusters, Cloudera recommends that you set ACLs for the root host such that both ResourceManagers share read-write-admin access, but have exclusive create-delete access. The fencing is implicit and does not require explicit configuration (as fencing in HDFS does). You can plug in a custom "Fencer" if you choose to – for example, to use a different implementation of the state store.

7. Summary

  1. When each ResourceManager instance starts, it calls zkClient.create to try to create an EPHEMERAL node on the ZooKeeper server.
  2. zkClient.create handles its result asynchronously; the result ultimately arrives at ActiveStandbyElector#processResult. The ResourceManager that successfully created the EPHEMERAL node becomes active; the others become standby.
  3. The zkClient calls also register watchers. When the active ResourceManager goes away, its EPHEMERAL node is deleted and ZooKeeper fires a NodeDeleted event for the standby ResourceManagers to handle. ActiveStandbyElector#processWatchEvent dispatches on the event type, and for NodeDeleted it again calls zkClient.create to try to create the EPHEMERAL node.