How ResourceManager Handles a Job Submission Request
  GQ7psP7UJw7k  November 2, 2023

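1. Background

I previously wrote two articles about the YARN state machine.

https://blog.51cto.com/u_15327484/4940200 covers the AsyncDispatcher thread, which provides the following mechanisms:

  1. Calling its register() method registers the handler for each event type and stores it in a Map.
  2. Calling handle() on the EventHandler returned by its getEventHandler() puts the event into the BlockingQueue event queue.
  3. The internal eventHandlingThread consumes events from the BlockingQueue, looks up the Handler registered for each event, and calls that Handler's handle() method.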
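
The three mechanisms can be sketched in a few dozen lines of plain Java. This is a minimal illustration, not the Hadoop class: MiniDispatcher, its nested Event type, and MiniDispatcherDemo are hypothetical names, and the handler is modeled as a java.util.function.Consumer rather than YARN's EventHandler.

```java
import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.function.Consumer;

// Hypothetical, simplified stand-in for AsyncDispatcher (not the Hadoop class).
class MiniDispatcher {
  // An event carries an enum type plus a payload string.
  static final class Event {
    final Enum<?> type;
    final String payload;

    Event(Enum<?> type, String payload) {
      this.type = type;
      this.payload = payload;
    }
  }

  private final Map<Class<?>, Consumer<Event>> handlers = new ConcurrentHashMap<>();
  private final BlockingQueue<Event> eventQueue = new LinkedBlockingQueue<>();
  private final Thread eventHandlingThread =
      new Thread(this::consumeLoop, "mini-dispatcher");

  // Mechanism 1: map an event-type enum class to its handler.
  void register(Class<?> eventTypeClass, Consumer<Event> handler) {
    handlers.put(eventTypeClass, handler);
  }

  // Mechanism 2: producers only enqueue; they never run the handler themselves.
  void handle(Event event) {
    eventQueue.add(event);
  }

  void start() {
    eventHandlingThread.setDaemon(true);
    eventHandlingThread.start();
  }

  // Mechanism 3: one thread takes events and routes them by their enum class.
  private void consumeLoop() {
    try {
      while (true) {
        Event event = eventQueue.take();
        Consumer<Event> handler = handlers.get(event.type.getDeclaringClass());
        if (handler != null) {
          handler.accept(event);
        }
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }
}

class MiniDispatcherDemo {
  enum AppEventType { START }

  // Registers one handler, sends one event, and returns what the handler saw.
  static String run() {
    MiniDispatcher dispatcher = new MiniDispatcher();
    CountDownLatch handled = new CountDownLatch(1);
    StringBuilder seen = new StringBuilder();
    dispatcher.register(AppEventType.class, e -> {
      seen.append(e.type).append(':').append(e.payload);
      handled.countDown();
    });
    dispatcher.start();
    dispatcher.handle(new MiniDispatcher.Event(AppEventType.START, "app_1"));
    try {
      handled.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
    return seen.toString();
  }

  public static void main(String[] args) {
    System.out.println(run());
  }
}
```

Because the producer only enqueues, a slow handler delays later events but never blocks the caller of handle().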

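https://blog.51cto.com/u_15327484/5015131 covers the state machines, which work as follows:

  1. ResourceManager maintains four types of state machine: RMApp (the state of each application), RMAppAttempt (the state of each application attempt), RMContainer (the state of each container), and RMNode (the state of each NodeManager).
  2. When an EventHandler runs, it usually changes a state machine's state by calling the state machine's doTransition method.
  3. A state machine stores its transitions, which are built at initialization.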
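
A table-driven state machine of this kind can be sketched as follows. This is only an illustration under assumptions: MiniStateMachine and its enums are hypothetical, the state and event names are borrowed from RMApp for readability, and the real StateMachineFactory is generic and considerably richer.

```java
import java.util.EnumMap;
import java.util.Map;

// Hypothetical, simplified state machine; not YARN's StateMachineFactory.
class MiniStateMachine {
  enum State { NEW, NEW_SAVING, SUBMITTED }
  enum EventType { START, APP_NEW_SAVED }

  // One arc of the transition table: the target state and a hook to run.
  private static final class Arc {
    final State postState;
    final Runnable hook;

    Arc(State postState, Runnable hook) {
      this.postState = postState;
      this.hook = hook;
    }
  }

  private final Map<State, Map<EventType, Arc>> table = new EnumMap<>(State.class);
  private State current = State.NEW;

  // Transitions are registered once, while the machine is being built.
  MiniStateMachine addTransition(State pre, State post, EventType on, Runnable hook) {
    table.computeIfAbsent(pre, k -> new EnumMap<EventType, Arc>(EventType.class))
        .put(on, new Arc(post, hook));
    return this;
  }

  // Looks up (current state, event), runs the hook, then moves to the new state.
  State doTransition(EventType event) {
    Map<EventType, Arc> arcs = table.get(current);
    Arc arc = (arcs == null) ? null : arcs.get(event);
    if (arc == null) {
      throw new IllegalStateException("Invalid event " + event + " in state " + current);
    }
    arc.hook.run();
    current = arc.postState;
    return current;
  }
}

class MiniStateMachineDemo {
  static MiniStateMachine build() {
    return new MiniStateMachine()
        .addTransition(MiniStateMachine.State.NEW, MiniStateMachine.State.NEW_SAVING,
            MiniStateMachine.EventType.START,
            () -> System.out.println("saving app"))
        .addTransition(MiniStateMachine.State.NEW_SAVING, MiniStateMachine.State.SUBMITTED,
            MiniStateMachine.EventType.APP_NEW_SAVED,
            () -> System.out.println("adding app to scheduler"));
  }

  public static void main(String[] args) {
    MiniStateMachine sm = build();
    System.out.println(sm.doTransition(MiniStateMachine.EventType.START));
    System.out.println(sm.doTransition(MiniStateMachine.EventType.APP_NEW_SAVED));
  }
}
```

An event that has no arc from the current state is rejected instead of being silently ignored, which is what makes the table a useful description of the legal lifecycle.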

2. YarnClient Submits the Job

The client uses the proxy object built for ApplicationClientProtocol to send the submitApplication request to the server over RPC:

// Submit the application through ApplicationClientProtocol
protected ApplicationClientProtocol rmClient;
 
@Override
  public ApplicationId
      submitApplication(ApplicationSubmissionContext appContext)
          throws YarnException, IOException {
    ApplicationId applicationId = appContext.getApplicationId();
    if (applicationId == null) {
      throw new ApplicationIdNotProvidedException(
          "ApplicationId is not provided in ApplicationSubmissionContext");
    }
    SubmitApplicationRequest request =
        Records.newRecord(SubmitApplicationRequest.class);
    request.setApplicationSubmissionContext(appContext);
 
    // Automatically add the timeline DT into the CLC
    // Only when the security and the timeline service are both enabled
    if (isSecurityEnabled() && timelineServiceEnabled) {
      addTimelineDelegationToken(appContext.getAMContainerSpec());
    }
 
    //TODO: YARN-1763:Handle RM failovers during the submitApplication call.
    rmClient.submitApplication(request);
 
    int pollCount = 0;
    long startTime = System.currentTimeMillis();
 
    while (true) {
      try {
        YarnApplicationState state =
            getApplicationReport(applicationId).getYarnApplicationState();
        if (!state.equals(YarnApplicationState.NEW) &&
            !state.equals(YarnApplicationState.NEW_SAVING)) {
          LOG.info("Submitted application " + applicationId);
          break;
        }
        // ... (omitted)
    }
 
    return applicationId;
  }
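
The client loop above keeps polling until the application has left the NEW and NEW_SAVING states. The sketch below shows that pattern in isolation with a bounded wait. It is an assumption-laden illustration: SubmitPollSketch, AppState, the Supplier-based report source, and the interval and timeout parameters are all hypothetical and are not taken from the omitted part of the original method.

```java
import java.util.Arrays;
import java.util.Iterator;
import java.util.function.Supplier;

// Hypothetical sketch of "poll until the app has left NEW / NEW_SAVING".
class SubmitPollSketch {
  enum AppState { NEW, NEW_SAVING, SUBMITTED, ACCEPTED }

  static AppState waitUntilSubmitted(Supplier<AppState> reportState,
                                     long intervalMs, long timeoutMs) {
    long deadline = System.currentTimeMillis() + timeoutMs;
    while (true) {
      AppState state = reportState.get();
      // Any state other than NEW and NEW_SAVING means the RM has saved the app.
      if (state != AppState.NEW && state != AppState.NEW_SAVING) {
        return state;
      }
      if (System.currentTimeMillis() >= deadline) {
        throw new IllegalStateException("Timed out while app was still " + state);
      }
      try {
        Thread.sleep(intervalMs);
      } catch (InterruptedException e) {
        Thread.currentThread().interrupt();
        throw new IllegalStateException("Interrupted while polling", e);
      }
    }
  }

  // Simulates three successive application reports.
  static AppState demo() {
    Iterator<AppState> reports =
        Arrays.asList(AppState.NEW, AppState.NEW_SAVING, AppState.SUBMITTED).iterator();
    return waitUntilSubmitted(reports::next, 1, 1000);
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```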

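On the server side, the RPC layer dispatches the call to the ApplicationClientProtocol implementation. Eventually RMAppManager creates an RMAppImpl and puts an RMAppEventType.START event for it into the AsyncDispatcher: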

protected void submitApplication(
      ApplicationSubmissionContext submissionContext, long submitTime,
      String user) throws YarnException {
    ApplicationId applicationId = submissionContext.getApplicationId();
 
    // Passing start time as -1. It will be eventually set in RMAppImpl
    // constructor.
    // Create the RMAppImpl and store it in RMContext
    RMAppImpl application = createAndPopulateNewRMApp(
        submissionContext, submitTime, user, false, -1);
    try {
      if (UserGroupInformation.isSecurityEnabled()) {
        this.rmContext.getDelegationTokenRenewer()
            .addApplicationAsync(applicationId,
                BuilderUtils.parseCredentials(submissionContext),
                submissionContext.getCancelTokensWhenComplete(),
                application.getUser(),
                BuilderUtils.parseTokensConf(submissionContext));
      } else {
        // Dispatcher is not yet started at this time, so these START events
        // enqueued should be guaranteed to be first processed when dispatcher
        // gets started.
        this.rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppEvent(applicationId, RMAppEventType.START));
      }
    } catch (Exception e) {
      LOG.warn("Unable to parse credentials for " + applicationId, e);
      // Sending APP_REJECTED is fine, since we assume that the
      // RMApp is in NEW state and thus we haven't yet informed the
      // scheduler about the existence of the application
      this.rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppEvent(applicationId,
              RMAppEventType.APP_REJECTED, e.getMessage()));
      throw RPCUtil.getRemoteException(e);
    }
  }

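ResourceManager's inner class RMActiveServices registers ApplicationEventDispatcher to handle RMAppEvent. The dispatcher hands each RMAppEvent to the corresponding RMAppImpl, whose state machine defines the following transition for the START RMAppEventType: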

.addTransition(RMAppState.NEW, RMAppState.NEW_SAVING,
RMAppEventType.START, new RMAppNewlySavingTransition())

This means that when an application in RMAppState.NEW receives an RMAppEventType.START event, RMAppNewlySavingTransition.transition() runs and the application moves to RMAppState.NEW_SAVING.

Many more state transitions follow. The full sequence is listed here first; only the important ones are analyzed afterwards:

  1. RMAppEvent of type start.
  2. RMStateStoreEvent of type storeApp, which writes the app information to ZooKeeper.
  3. RMAppEvent of type newSaved.
  4. SchedulerEvent of type appAdded, handled by the Scheduler.
  5. RMAppEvent of type Accepted.
  6. RMAppEvent of type app_accepted.
  7. RMAppAttemptEvent of type start.
  8. SchedulerEvent of type appAttemptAdd.
  9. RMAppAttemptEvent of type AttemptAdded, handled through the Scheduler.
  10. RMStateStoreEvent of type storeAppAttempt, which writes the app attempt information to ZooKeeper.
  11. RMAppAttemptEvent of type AttemptNewSaved.
  12. AMLauncherEvent of type launch, which starts the ApplicationMaster on a NodeManager.

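3. The STORE_APP Event Saves App State to ZooKeeper

As shown below, StoreAppTransition handles the STORE_APP event. It calls RMStateStore.storeApplicationStateInternal, which in the ZooKeeper-backed store uses a ZooKeeper client to save the app state. Note that app attempts also have their state saved to ZooKeeper: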

//RMStateStore.java
stateMachineFactory = new StateMachineFactory<RMStateStore,
                                                    RMStateStoreState,
                                                    RMStateStoreEventType,
                                                    RMStateStoreEvent>(
      RMStateStoreState.ACTIVE)
.addTransition(RMStateStoreState.ACTIVE,
          EnumSet.of(RMStateStoreState.ACTIVE, RMStateStoreState.FENCED),
          RMStateStoreEventType.STORE_APP, new StoreAppTransition())
 
 
  private static class StoreAppTransition
      implements MultipleArcTransition<RMStateStore, RMStateStoreEvent,
          RMStateStoreState> {
    @Override
    public RMStateStoreState transition(RMStateStore store,
        RMStateStoreEvent event) {
      if (!(event instanceof RMStateStoreAppEvent)) {
        // should never happen
        LOG.error("Illegal event type: " + event.getClass());
        return RMStateStoreState.ACTIVE;
      }
      boolean isFenced = false;
      ApplicationStateData appState =
          ((RMStateStoreAppEvent) event).getAppState();
      ApplicationId appId =
          appState.getApplicationSubmissionContext().getApplicationId();
      LOG.info("Storing info for app: " + appId);
      try {
        store.storeApplicationStateInternal(appId, appState);
        store.notifyApplication(
            new RMAppEvent(appId, RMAppEventType.APP_NEW_SAVED));
      } catch (Exception e) {
        LOG.error("Error storing app: " + appId, e);
        if (e instanceof StoreLimitException) {
          store.notifyApplication(
              new RMAppEvent(appId, RMAppEventType.APP_SAVE_FAILED,
                  e.getMessage()));
        } else {
          isFenced = store.notifyStoreOperationFailedInternal(e);
        }
      }
      return finalState(isFenced);
    };
 
  }
 
/**
   * This method is called to notify the application that
   * new application is stored or updated in state store
   * @param event App event containing the app id and event type
   */
  private void notifyApplication(RMAppEvent event) {
    rmDispatcher.getEventHandler().handle(event);
  }
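
StoreAppTransition is a multiple-arc transition: the post-state is not fixed in the table but chosen by the transition itself from a declared set (ACTIVE or FENCED here). The sketch below isolates that idea. It is deliberately simplified: MultiArcSketch and its Store interface are hypothetical, and treating every write failure as fencing is an assumption made for brevity, whereas the original code decides that through notifyStoreOperationFailedInternal.

```java
import java.util.EnumSet;
import java.util.Set;

// Hypothetical sketch of a transition that picks its own post-state.
class MultiArcSketch {
  enum StoreState { ACTIVE, FENCED }

  // Stand-in for a state store whose write may fail.
  interface Store {
    void write(String appId) throws Exception;
  }

  static StoreState storeApp(Store store, String appId, Set<StoreState> allowedPostStates) {
    StoreState next;
    try {
      store.write(appId);
      next = StoreState.ACTIVE;
    } catch (Exception e) {
      // Simplification: any failure fences the store.
      next = StoreState.FENCED;
    }
    // The chosen state must be one of the arcs declared for this transition.
    if (!allowedPostStates.contains(next)) {
      throw new IllegalStateException("Undeclared post-state: " + next);
    }
    return next;
  }

  static StoreState demo(boolean writeFails) {
    Store store = appId -> {
      if (writeFails) {
        throw new Exception("simulated store failure for " + appId);
      }
    };
    return storeApp(store, "app_1", EnumSet.of(StoreState.ACTIVE, StoreState.FENCED));
  }

  public static void main(String[] args) {
    System.out.println(demo(false));
    System.out.println(demo(true));
  }
}
```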

4. APP_ADDED: Scheduling Through the Scheduler

When the RMAppEventType.APP_NEW_SAVED event arrives, an AppAddedSchedulerEvent is put into the AsyncDispatcher:

.addTransition(RMAppState.NEW_SAVING, RMAppState.SUBMITTED,
        RMAppEventType.APP_NEW_SAVED, new AddApplicationToSchedulerTransition())
 
private static final class AddApplicationToSchedulerTransition extends
      RMAppTransition {
    @Override
    public void transition(RMAppImpl app, RMAppEvent event) {
      app.handler.handle(
          new AppAddedSchedulerEvent(app.user, app.submissionContext, false,
              app.applicationPriority, app.placementContext));
      // send the ATS create Event
      app.sendATSCreateEvent();
    }
  }

// The event carries the appId, the queue, and related information
public AppAddedSchedulerEvent(ApplicationId applicationId, String queue,
      String user, boolean isAppRecovering, ReservationId reservationID,
      Priority appPriority, ApplicationPlacementContext placementContext) {
    super(SchedulerEventType.APP_ADDED);
    this.applicationId = applicationId;
    this.queue = queue;
    this.user = user;
    this.reservationID = reservationID;
    this.isAppRecovering = isAppRecovering;
    this.appPriority = appPriority;
    this.placementContext = placementContext;
  }

During initialization, ResourceManager's inner class RMActiveServices specifies that an EventDispatcher handles events of type SchedulerEvent:

protected void serviceInit(Configuration configuration) throws Exception {
      scheduler = createScheduler();
      scheduler.setRMContext(rmContext);
      addIfService(scheduler);
      rmContext.setScheduler(scheduler);
 
      schedulerDispatcher = createSchedulerEventDispatcher();
      addIfService(schedulerDispatcher);
      rmDispatcher.register(SchedulerEventType.class, schedulerDispatcher);
    
}
 
protected EventHandler<SchedulerEvent> createSchedulerEventDispatcher() {
    return new EventDispatcher(this.scheduler, "SchedulerEventDispatcher");
}

EventDispatcher works much like AsyncDispatcher: handle() puts the event into eventQueue, the eventProcessor thread consumes it, and handler processes it:

public class EventDispatcher<T extends Event> extends AbstractService implements EventHandler<T> {
  private final EventHandler<T> handler;
  private final BlockingQueue<T> eventQueue =  new LinkedBlockingDeque<>();
  private final Thread eventProcessor;
}

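Here, the handler is the ResourceScheduler implementation, so it processes every SchedulerEvent.

ResourceScheduler has three common implementations: FifoScheduler, FairScheduler, and CapacityScheduler. A scheduler's handle() method looks like this: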

@Override
  public void handle(SchedulerEvent event) {
    switch(event.getType()) {
    case APP_ADDED:
    {
      AppAddedSchedulerEvent appAddedEvent = (AppAddedSchedulerEvent) event;
      // addApplication() records the app id and the app in AbstractYarnScheduler's concurrent map
      addApplication(appAddedEvent.getApplicationId(),
        appAddedEvent.getQueue(), appAddedEvent.getUser(),
        appAddedEvent.getIsAppRecovering());
    }
    break;
 
    case APP_ATTEMPT_ADDED:
    {
      AppAttemptAddedSchedulerEvent appAttemptAddedEvent =
          (AppAttemptAddedSchedulerEvent) event;
      addApplicationAttempt(appAttemptAddedEvent.getApplicationAttemptId(),
        appAttemptAddedEvent.getTransferStateFromPreviousAttempt(),
        appAttemptAddedEvent.getIsAttemptRecovering());
    }
    break;
    // ... (other cases omitted)
 
  }

FifoScheduler adds the application directly:

public synchronized void addApplication(ApplicationId applicationId,
      String queue, String user, boolean isAppRecovering) {
    SchedulerApplication<FifoAppAttempt> application =
        new SchedulerApplication<>(DEFAULT_QUEUE, user);
    // Record the app id and the app in AbstractYarnScheduler's concurrent map
    applications.put(applicationId, application);
    metrics.submitApp(user);
    LOG.info("Accepted application " + applicationId + " from user: " + user
        + ", currently num of applications: " + applications.size());
    if (isAppRecovering) {
      if (LOG.isDebugEnabled()) {
        LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
      }
    } else {
      rmContext.getDispatcher().getEventHandler()
        .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
    }
  }

FairScheduler also assigns the application to a queue:

protected void addApplication(ApplicationId applicationId,
      String queueName, String user, boolean isAppRecovering) {
    if (queueName == null || queueName.isEmpty()) {
      String message = "Reject application " + applicationId +
              " submitted by user " + user + " with an empty queue name.";
      LOG.info(message);
      rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppRejectedEvent(applicationId, message));
      return;
    }

    if (queueName.startsWith(".") || queueName.endsWith(".")) {
      String message = "Reject application " + applicationId
          + " submitted by user " + user + " with an illegal queue name "
          + queueName + ". "
          + "The queue name cannot start/end with period.";
      LOG.info(message);
      rmContext.getDispatcher().getEventHandler()
          .handle(new RMAppRejectedEvent(applicationId, message));
      return;
    }

    try {
      writeLock.lock();
      RMApp rmApp = rmContext.getRMApps().get(applicationId);
      // Assign the queue
      FSLeafQueue queue = assignToQueue(rmApp, queueName, user);
      if (queue == null) {
        return;
      }

      // Enforce ACLs
      UserGroupInformation userUgi = UserGroupInformation.createRemoteUser(user);

      if (!queue.hasAccess(QueueACL.SUBMIT_APPLICATIONS, userUgi)
          && !queue.hasAccess(QueueACL.ADMINISTER_QUEUE, userUgi)) {
        String msg = "User " + userUgi.getUserName() +
            " cannot submit applications to queue " + queue.getName();
        LOG.info(msg);
        rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppRejectedEvent(applicationId, msg));
        return;
      }

      SchedulerApplication<FSAppAttempt> application =
          new SchedulerApplication<FSAppAttempt>(queue, user);
      applications.put(applicationId, application);
      queue.getMetrics().submitApp(user);

      LOG.info("Accepted application " + applicationId + " from user: " + user
          + ", in queue: " + queueName + ", currently num of applications: "
          + applications.size());
      if (isAppRecovering) {
        if (LOG.isDebugEnabled()) {
          LOG.debug(applicationId + " is recovering. Skip notifying APP_ACCEPTED");
        }
      } else {
        rmContext.getDispatcher().getEventHandler()
            .handle(new RMAppEvent(applicationId, RMAppEventType.APP_ACCEPTED));
      }
    } finally {
      writeLock.unlock();
    }
  }
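
The first two checks in the FairScheduler method reject an application before any queue lookup happens. Pulled out on their own they amount to a small validation function. The sketch below is a hypothetical restatement (QueueNameCheckSketch and rejectionReason are not Hadoop names) that returns the rejection reason instead of dispatching an RMAppRejectedEvent.

```java
import java.util.Optional;

// Hypothetical helper mirroring the queue-name checks shown above.
class QueueNameCheckSketch {
  // Returns a rejection reason, or Optional.empty() if the name passes both checks.
  static Optional<String> rejectionReason(String queueName) {
    if (queueName == null || queueName.isEmpty()) {
      return Optional.of("empty queue name");
    }
    if (queueName.startsWith(".") || queueName.endsWith(".")) {
      return Optional.of("queue name cannot start/end with period");
    }
    return Optional.empty();
  }

  public static void main(String[] args) {
    System.out.println(rejectionReason("root.default").isPresent());
    System.out.println(rejectionReason(".default").isPresent());
  }
}
```

Passing these checks does not mean the application is accepted: in the method above, queue assignment and the ACL check can still reject it.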

5. AMLauncherEvent Starts the ApplicationMaster

ResourceManager's inner class RMActiveServices registers ApplicationMasterLauncher with the AsyncDispatcher as the handler for AMLauncherEventType events.

protected void serviceInit(Configuration configuration) throws Exception {
    applicationMasterLauncher = createAMLauncher();
      rmDispatcher.register(AMLauncherEventType.class,
          applicationMasterLauncher);
}
 
protected ApplicationMasterLauncher createAMLauncher() {
    return new ApplicationMasterLauncher(this.rmContext);
  }

ApplicationMasterLauncher's handle method puts the event into a LinkedBlockingQueue:

// Queue of Runnables; it acts as a buffer when many launch-AM events arrive in a short time
private final BlockingQueue<Runnable> masterEvents
    = new LinkedBlockingQueue<Runnable>();
 
@Override
  public synchronized void  handle(AMLauncherEvent appEvent) {
    AMLauncherEventType event = appEvent.getType();
    RMAppAttempt application = appEvent.getAppAttempt();
    switch (event) {
    case LAUNCH:
      launch(application);
      break;
    case CLEANUP:
      cleanup(application);
      break;
    default:
      break;
    }
  }
 
private void launch(RMAppAttempt application) {
    // Create the AMLauncher
    Runnable launcher = createRunnableLauncher(application,
        AMLauncherEventType.LAUNCH);
    // Add it to the Runnable queue
    masterEvents.add(launcher);
  }
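
The queue only buffers the Runnables; something still has to take them off and run them. One common arrangement is a single consumer thread that hands each Runnable to a thread pool, sketched below. MiniLauncher is a hypothetical class and the pool size of 2 is an arbitrary assumption, not a value from the original code.

```java
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical sketch: a Runnable queue drained by one thread into a pool.
class MiniLauncher {
  private final BlockingQueue<Runnable> masterEvents = new LinkedBlockingQueue<>();
  private final ExecutorService launcherPool = Executors.newFixedThreadPool(2);
  private final Thread launcherHandlingThread = new Thread(() -> {
    try {
      while (true) {
        // Blocks until a launch request is available, then runs it on the pool.
        launcherPool.execute(masterEvents.take());
      }
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    }
  }, "mini-launcher");

  void start() {
    launcherHandlingThread.setDaemon(true);
    launcherHandlingThread.start();
  }

  // Called by the event handler: enqueue only, so the dispatcher is not blocked.
  void launch(Runnable launcher) {
    masterEvents.add(launcher);
  }

  void stop() {
    launcherHandlingThread.interrupt();
    launcherPool.shutdown();
  }

  // Submits three fake launches and returns how many actually ran.
  static int demo() {
    MiniLauncher launcher = new MiniLauncher();
    AtomicInteger launched = new AtomicInteger();
    CountDownLatch done = new CountDownLatch(3);
    launcher.start();
    try {
      for (int i = 0; i < 3; i++) {
        launcher.launch(() -> {
          launched.incrementAndGet();
          done.countDown();
        });
      }
      done.await(5, TimeUnit.SECONDS);
    } catch (InterruptedException e) {
      Thread.currentThread().interrupt();
    } finally {
      launcher.stop();
    }
    return launched.get();
  }

  public static void main(String[] args) {
    System.out.println(demo());
  }
}
```

Running the launches on a pool means one slow NodeManager RPC does not hold up the launches queued behind it.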

In the AMLauncher thread, the launch method asks the NodeManager to start the AM by sending a startContainers request:

private void launch() throws IOException, YarnException {
    connect();
    ContainerId masterContainerID = masterContainer.getId();
    ApplicationSubmissionContext applicationContext =
        application.getSubmissionContext();
    LOG.info("Setting up container " + masterContainer
        + " for AM " + application.getAppAttemptId());
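    // Create the ContainerLaunchContext, which holds the command that starts the AM
    ContainerLaunchContext launchContext =
        createAMContainerLaunchContext(applicationContext, masterContainerID);
    // Create the StartContainerRequest. The AM is the first container started on the NM side
    // and is requested by the RM; every later StartContainerRequest is sent by the AM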
    StartContainerRequest scRequest =
        StartContainerRequest.newInstance(launchContext,
          masterContainer.getContainerToken());
    List<StartContainerRequest> list = new ArrayList<StartContainerRequest>();
    list.add(scRequest);
    // The RPC message type is StartContainersRequest
    StartContainersRequest allRequests =
        StartContainersRequest.newInstance(list);
    // Send the RPC request through the proxy
    StartContainersResponse response =
        containerMgrProxy.startContainers(allRequests);
    if (response.getFailedRequests() != null
        && response.getFailedRequests().containsKey(masterContainerID)) {
      Throwable t =
          response.getFailedRequests().get(masterContainerID).deSerialize();
      parseAndThrowException(t);
    } else {
      LOG.info("Done launching container " + masterContainer + " for AM "
          + application.getAppAttemptId());
    }
  }
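
Note the shape of the call: the RPC takes a batch, the RM sends a batch of one, and failure is reported per container in the response rather than by the call itself failing. A stripped-down sketch of that pattern follows. BatchStartSketch is hypothetical, and container ids and error messages are plain strings here instead of YARN's ContainerId and SerializedException types.

```java
import java.util.Collections;
import java.util.List;
import java.util.Map;
import java.util.function.Function;

// Hypothetical sketch of "send a batch of one, then check the per-item failure map".
class BatchStartSketch {
  // startContainers maps the requested ids to a (possibly empty) map of failures.
  static void startOne(String containerId,
                       Function<List<String>, Map<String, String>> startContainers) {
    Map<String, String> failed =
        startContainers.apply(Collections.singletonList(containerId));
    if (failed != null && failed.containsKey(containerId)) {
      throw new IllegalStateException("Launch failed: " + failed.get(containerId));
    }
  }

  // Returns true if the simulated launch succeeded.
  static boolean demo(boolean nodeRejects) {
    Function<List<String>, Map<String, String>> fakeNode = ids -> {
      if (nodeRejects) {
        return Collections.singletonMap(ids.get(0), "simulated rejection");
      }
      return Collections.<String, String>emptyMap();
    };
    try {
      startOne("container_1", fakeNode);
      return true;
    } catch (IllegalStateException e) {
      return false;
    }
  }

  public static void main(String[] args) {
    System.out.println(demo(false));
    System.out.println(demo(true));
  }
}
```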

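6. Summary

  1. ResourceManager uses AsyncDispatcher to store events in a queue and starts a thread to consume and process them. ResourceManager registers Handlers for the different event types.
  2. ResourceManager represents application state with state machines. Each state machine defines its transitions, and when an AsyncDispatcher Handler processes an event it usually moves a state machine to a new state.
  3. While handling a job submission, the STORE_APP event stores the App information in ZooKeeper, and the corresponding storeAppAttempt event does the same for App Attempt information.
  4. When handling the AppAdded event, the Scheduler decides how to schedule the job, for example which queue it belongs in. These events are placed in the blocking queue inside EventDispatcher and consumed by a separate thread.
  5. When handling the LAUNCH event, the RM asks the NodeManager to start the AM. These events are placed in ApplicationMasterLauncher's blocking queue and consumed by a separate thread.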