How the ApplicationMaster Requests Resources in Spark

1. Background

I have previously written two articles: one on the event flow of submitting an application to the ResourceManager, and one on how the ResourceManager instructs a NodeManager to launch a container.

https://blog.51cto.com/u_15327484/7790888 covered the event flow of submitting an application to the ResourceManager: storing the application information in ZooKeeper, placing the application into the scheduler, and sending a LAUNCH event to instruct the NodeManager to start the container.

https://blog.51cto.com/u_15327484/7815432 then covered how the NodeManager launches a container: starting container log aggregation, downloading the three types of resources a job needs (public, private, and application), and running the launch script through the ContainerExecutor.

After the ApplicationMaster process has started, it registers itself with the ResourceManager and then requests resources from it to run tasks. This article describes how the ApplicationMaster requests those resources.

2. Resource scheduling model

In YARN, all resources are allocated by the ResourceManager to each ApplicationMaster, which in turn hands them out to its own tasks.

The ResourceManager does not actively poll each NodeManager for its current state. Instead, every NodeManager periodically sends a heartbeat to the ResourceManager. Once the ResourceManager knows a node's resource usage, it allocates a portion of that node's resources and returns it to the ApplicationMaster, which then asks that node to run its tasks.

In YARN, the resource allocation flow is as follows:

  1. The NodeManager periodically reports its node information.
  2. The ResourceManager returns a heartbeat response, for example asking the node to release containers.
  3. On receiving the heartbeat, the ResourceManager triggers a NODE_UPDATE event. The ResourceScheduler implementation handles the NODE_UPDATE event and allocates resources to be returned to the ApplicationMaster.
  4. After receiving the allocated containers, the ApplicationMaster updates its resource requests and sends requests for the containers that still need to be allocated.
  5. Finally, the ApplicationMaster sends a startContainer request to the NodeManager to launch the containers.

The execution flow diagram is shown below:

[Figure: YARN heartbeat-driven resource allocation flow]
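To make the pull-based nature of this flow concrete, here is a deliberately simplified toy model of the loop described above, not YARN source code: the scheduler only assigns containers when a node heartbeat arrives, and the ApplicationMaster only sees those containers on its next allocate call. All names in the sketch (ToyScheduler, ToyContainer, NodeHeartbeat) are invented for illustration.

import scala.collection.mutable

// Toy model only: illustrates the heartbeat-driven allocation described above;
// it is not how the real YARN classes are structured.
case class ToyContainer(node: String, memoryMb: Int)
case class NodeHeartbeat(node: String, freeMemoryMb: Int)

class ToyScheduler {
  // Containers the AM has asked for but that have not been placed on a node yet.
  private val pendingRequests = mutable.Queue[Int]()          // requested memory per container
  // Containers assigned on a NODE_UPDATE but not yet picked up by the AM.
  private val newlyAllocated  = mutable.Buffer[ToyContainer]()

  // Steps 4/5 (AM side of allocate): record new asks, drain whatever was assigned so far.
  def allocate(asks: Seq[Int]): Seq[ToyContainer] = synchronized {
    pendingRequests ++= asks
    val granted = newlyAllocated.toList
    newlyAllocated.clear()
    granted
  }

  // Steps 1-3: a node heartbeat triggers NODE_UPDATE; assign pending requests to that node.
  def onNodeUpdate(hb: NodeHeartbeat): Unit = synchronized {
    var free = hb.freeMemoryMb
    while (pendingRequests.nonEmpty && pendingRequests.head <= free) {
      val mem = pendingRequests.dequeue()
      free -= mem
      newlyAllocated += ToyContainer(hb.node, mem)
    }
  }
}

object ToyModel extends App {
  val scheduler = new ToyScheduler
  // The AM asks for three executors of 2 GB each; nothing is granted yet.
  println(scheduler.allocate(Seq(2048, 2048, 2048)))   // List()
  // A node heartbeat arrives with 5 GB free: two containers fit on it.
  scheduler.onNodeUpdate(NodeHeartbeat("nm-1", 5120))
  // The AM's next allocate call (its heartbeat) picks them up; one request stays pending.
  println(scheduler.allocate(Nil))                      // two ToyContainer on nm-1
}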

The communication protocol between the ApplicationMaster and the ResourceManager is ApplicationMasterProtocol, which defines three methods:

  • registerApplicationMaster(): the ApplicationMaster registers itself with the ResourceManager over RPC.
  • allocate(): after registering, the ApplicationMaster requests resources from the ResourceManager over RPC.
  • finishApplicationMaster(): the ApplicationMaster tells the ResourceManager over RPC that the application has finished and is exiting.

The two most important methods are registerApplicationMaster and allocate. The rest of this article walks through the resource request flow by following the startup of the Spark ApplicationMaster.
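Before diving into the Spark code, the sketch below shows how a bare-bones ApplicationMaster would drive these three calls through hadoop-yarn-client's AMRMClient wrapper, which is also the layer Spark uses internally via YarnRMClient/YarnAllocator. This is only an illustration, not Spark code; the host name, port, tracking URL, and resource sizes are made up, and token handling, error handling, and the container launch step are omitted.

import org.apache.hadoop.yarn.client.api.AMRMClient
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest
import org.apache.hadoop.yarn.conf.YarnConfiguration
import org.apache.hadoop.yarn.api.records.{FinalApplicationStatus, Priority, Resource}
import scala.collection.JavaConverters._

object MinimalAM {
  def main(args: Array[String]): Unit = {
    val conf = new YarnConfiguration()
    val amClient = AMRMClient.createAMRMClient[ContainerRequest]()
    amClient.init(conf)
    amClient.start()

    // 1. registerApplicationMaster: tell the RM our host, RPC port, and tracking URL.
    amClient.registerApplicationMaster("am-host", 0, "http://am-host:4040")

    // 2. allocate: ask for two 2 GB / 1 vcore containers anywhere in the cluster.
    val capability = Resource.newInstance(2048, 1)
    val priority = Priority.newInstance(1)
    (1 to 2).foreach { _ =>
      amClient.addContainerRequest(new ContainerRequest(capability, null, null, priority))
    }

    var granted = 0
    while (granted < 2) {
      // allocate() doubles as the AM heartbeat; each call may return only part of what was asked for.
      val response = amClient.allocate(0.1f)
      val containers = response.getAllocatedContainers.asScala
      granted += containers.size
      // ... normally the AM would now ask the NodeManagers to start these containers ...
      Thread.sleep(1000)
    }

    // 3. finishApplicationMaster: report the final status and unregister.
    amClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "done", null)
    amClient.stop()
  }
}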

3. How Spark starts the ApplicationMaster

After YARN has run the launch script through the ContainerExecutor, the specific compute framework's own logic takes over. In Spark, ApplicationMaster.runDriver() performs two steps:

  1. Register the ApplicationMaster.
  2. Request resources.
private def runDriver(): Unit = {
    addAmIpFilter(None)
    userClassThread = startUserApplication()
 
    // This a bit hacky, but we need to wait until the spark.driver.port property has
    // been set by the Thread executing the user class.
    logInfo("Waiting for spark context initialization...")
    val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
    try {
      val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
        Duration(totalWaitTime, TimeUnit.MILLISECONDS))
      if (sc != null) {
        rpcEnv = sc.env.rpcEnv
 
        val userConf = sc.getConf
        val host = userConf.get("spark.driver.host")
        val port = userConf.get("spark.driver.port").toInt
        // Register the ApplicationMaster
        registerAM(host, port, userConf, sc.ui.map(_.webUrl))
 
        val driverRef = rpcEnv.setupEndpointRef(
          RpcAddress(host, port),
          YarnSchedulerBackend.ENDPOINT_NAME)
        // The ApplicationMaster requests containers
        createAllocator(driverRef, userConf)
      } else {
        // Sanity check; should never happen in normal operation, since sc should only be null
        // if the user app did not create a SparkContext.
        throw new IllegalStateException("User did not initialize spark context!")
      }
      resumeDriver()
      userClassThread.join()
    } catch {
      case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
        logError(
          s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
           "Please check earlier log output for errors. Failing the application.")
        finish(FinalApplicationStatus.FAILED,
          ApplicationMaster.EXIT_SC_NOT_INITED,
          "Timed out waiting for SparkContext.")
    } finally {
      resumeDriver()
    }
  }

When requesting resources, the AM calls YARN's interface directly to send the allocate request; launchReporterThread then keeps launching containers and sending requests for the resources that are still needed:

private val client = doAsUser { new YarnRMClient() }
@volatile private var allocator: YarnAllocator = _
 
private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
    val appId = client.getAttemptId().getApplicationId().toString()
    val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
      CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString
 
    // Before we initialize the allocator, let's log the information about how executors will
    // be run up front, to avoid printing this out for every single executor being launched.
    // Use placeholders for information that changes such as executor IDs.
    logInfo {
      val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
      val executorCores = _sparkConf.get(EXECUTOR_CORES)
      val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
        "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
      dummyRunner.launchContextDebugInfo()
    }
 
    allocator = client.createAllocator(
      yarnConf,
      _sparkConf,
      driverUrl,
      driverRef,
      securityMgr,
      localResources)
 
    credentialRenewer.foreach(_.setDriverRef(driverRef))
 
    // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
    // that when the driver sends an initial executor request (e.g. after an AM restart),
    // the allocator is ready to service requests.
    rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))
    // Request containers
    allocator.allocateResources()
    val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
    val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
    ms.registerSource(new ApplicationMasterSource(prefix, allocator))
    ms.start()
    metricsSystem = Some(ms)
    // Launch containers and keep requesting the remaining resources
    reporterThread = launchReporterThread()
  }

4. Registering the ApplicationMaster

Before the ApplicationMaster sends any resource request to the ResourceManager, it must first tell the ResourceManager its basic information; that is what the registration step is for.

The registerApplicationMaster call is ultimately handled by DefaultAMSProcessor, which emits an RMAppAttemptRegistrationEvent:

// Send an RMAppAttemptEventType.REGISTERED event to RMAppAttemptImpl
    getRmContext().getDispatcher().getEventHandler()
        .handle(
            new RMAppAttemptRegistrationEvent(applicationAttemptId, request
                .getHost(), request.getRpcPort(), request.getTrackingUrl()));

RMAppAttemptImpl's state machine defines how the REGISTERED event is handled; in the transition shown below (a REGISTERED event arriving while the attempt is still in the NEW state), it is processed by a FinalSavingTransition:

.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING,
          RMAppAttemptEventType.REGISTERED,
          new FinalSavingTransition(
            new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))

FinalSavingTransition then stores the ApplicationMaster attempt information into the RM state store (ZooKeeper):

AggregateAppResourceUsage resUsage =
        this.attemptMetrics.getAggregateAppResourceUsage();
    RMStateStore rmStore = rmContext.getStateStore();
    setFinishTime(System.currentTimeMillis());

    ApplicationAttemptStateData attemptState =
        ApplicationAttemptStateData.newInstance(
            applicationAttemptId,  getMasterContainer(),
            rmStore.getCredentialsFromAppAttempt(this),
            startTime, stateToBeStored, finalTrackingUrl, diags,
            finalStatus, exitStatus,
          getFinishTime(), resUsage.getMemorySeconds(),
          resUsage.getVcoreSeconds());
    LOG.info("Updating application attempt " + applicationAttemptId
        + " with final state: " + targetedFinalState + ", and exit status: "
        + exitStatus);
    rmStore.updateApplicationAttemptState(attemptState);

RMAppAttemptRegistrationEvent carries the AppMaster's access URL and related information:

public RMAppAttemptRegistrationEvent(ApplicationAttemptId appAttemptId,
      String host, int rpcPort, String trackingUrl) {
// An RMAppAttemptEventType of type REGISTERED
    super(appAttemptId, RMAppAttemptEventType.REGISTERED);
    this.host = host;
    this.rpcport = rpcPort;
    this.trackingurl = trackingUrl;
  }

In the ResourceManager web UI, access to the AppMaster's web UI is delegated to the proxy service: the proxy fetches the application's web UI and returns it to the user. Below are Spark's registration-related logs:

[Figure: Spark ApplicationMaster registration log output]
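(For reference, with the usual settings the tracking URL exposed by the ResourceManager typically looks like http://<rm-host>:8088/proxy/<application_id>/, and the proxy forwards such requests to the Spark UI address registered above; the exact host and port depend on the cluster configuration.)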

5. The ApplicationMaster requests resources

Spark requests resources from the ResourceManager through YarnAllocator.scala, which ultimately goes through the ApplicationMasterProtocol#allocate method. After the request returns, handleAllocatedContainers is executed to launch the containers:

/**
   * Request resources such that, if YARN gives us all we ask for, we'll have a number of containers
   * equal to maxExecutors.
   *
   * Deal with any containers YARN has granted to us by possibly launching executors in them.
   *
   * This must be synchronized because variables read in this method are mutated by other methods.
   */
  def allocateResources(): Unit = synchronized {
    updateResourceRequests()
 
    val progressIndicator = 0.1f
    // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
    // requests.
// Call hadoop-yarn's AMRMClient#allocate() method to request containers
    val allocateResponse = amClient.allocate(progressIndicator)
 
    val allocatedContainers = allocateResponse.getAllocatedContainers()
    allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)
 
    if (allocatedContainers.size > 0) {
      logDebug(("Allocated containers: %d. Current executor count: %d. " +
        "Launching executor count: %d. Cluster resources: %s.")
        .format(
          allocatedContainers.size,
          runningExecutors.size,
          numExecutorsStarting.get,
          allocateResponse.getAvailableResources))
// Once containers have been granted, start them on the NodeManager side
      handleAllocatedContainers(allocatedContainers.asScala)
    }
 
    val completedContainers = allocateResponse.getCompletedContainersStatuses()
    if (completedContainers.size > 0) {
      logDebug("Completed %d containers".format(completedContainers.size))
      processCompletedContainers(completedContainers.asScala)
      logDebug("Finished processing %d completed containers. Current running executor count: %d."
        .format(completedContainers.size, runningExecutors.size))
    }
  }

Spark requests containers from the ResourceManager through AMRMClientImpl#allocate(). The subsequent processing chain is: ApplicationMasterProtocol.allocate() -> ApplicationMasterService.allocate() -> AMSProcessingChain.allocate() -> DefaultAMSProcessor.allocate() -> FairScheduler.allocate() or CapacityScheduler.allocate().

Taking FifoScheduler as an example: it maintains the application information in YARN. When allocate is executed, it first updates the resources being requested, and then returns the containers the scheduler has already allocated via application.pullNewlyAllocatedContainersAndNMTokens:

public Allocation allocate(
      ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
      List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
    FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
    if (application == null) {
      LOG.error("Calling allocate on removed " +
          "or non existant application " + applicationAttemptId);
      return EMPTY_ALLOCATION;
    }

      if (!ask.isEmpty()) {
        LOG.debug("allocate: pre-update" +
            " applicationId=" + applicationAttemptId + 
            " application=" + application);
        application.showRequests();

        // Update application requests
        // Update the resources being requested
        application.updateResourceRequests(ask);

        LOG.debug("allocate: post-update" +
            " applicationId=" + applicationAttemptId + 
            " application=" + application);
        application.showRequests();

        LOG.debug("allocate:" +
            " applicationId=" + applicationAttemptId +
            " #ask=" + ask.size());
      }

      if (application.isWaitingForAMContainer(application.getApplicationId())) {
        // Allocate is for AM and update AM blacklist for this
        application.updateAMBlacklist(
            blacklistAdditions, blacklistRemovals);
      } else {
        application.updateBlacklist(blacklistAdditions, blacklistRemovals);
      }
      // Fetch the containers that have already been allocated
      ContainersAndNMTokensAllocation allocation =
          application.pullNewlyAllocatedContainersAndNMTokens();
      // Return these resources to the ApplicationMaster
      return new Allocation(allocation.getContainerList(),
        application.getHeadroom(), null, null, null,
        allocation.getNMTokenList());
    }

In Spark, after the application has been granted resources, runAllocatedContainers starts a thread for each container and builds the container launch command, then asks the NodeManager to run the container:

def startContainer(): java.util.Map[String, ByteBuffer] = {
    val ctx = Records.newRecord(classOf[ContainerLaunchContext])
      .asInstanceOf[ContainerLaunchContext]
    val env = prepareEnvironment().asJava
 
    ctx.setLocalResources(localResources.asJava)
    ctx.setEnvironment(env)
 
    val credentials = UserGroupInformation.getCurrentUser().getCredentials()
    val dob = new DataOutputBuffer()
    credentials.writeTokenStorageToStream(dob)
    ctx.setTokens(ByteBuffer.wrap(dob.getData()))
 
    val commands = prepareCommand()
    // Set the command the container will execute
    ctx.setCommands(commands.asJava)
    ctx.setApplicationACLs(
      YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)
 
    // If external shuffle service is enabled, register with the Yarn shuffle service already
    // started on the NodeManager and, if authentication is enabled, provide it with our secret
    // key for fetching shuffle files later
    if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
      val secretString = securityMgr.getSecretKey()
      val secretBytes =
        if (secretString != null) {
          // This conversion must match how the YarnShuffleService decodes our secret
          JavaUtils.stringToBytes(secretString)
        } else {
          // Authentication is not enabled, so just provide dummy metadata
          ByteBuffer.allocate(0)
        }
      ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
    }
 
    // Send the start request to the ContainerManager
    try {
      // Call NMClientImpl's startContainer() method to launch the container on the NodeManager
      nmClient.startContainer(container.get, ctx)
    } catch {
      case ex: Exception =>
        throw new SparkException(s"Exception while starting container ${container.get.getId}" +
          s" on host $hostname", ex)
    }
  }

Note that Spark does not obtain all of its resources from the ResourceManager in one shot. Each time, the ResourceManager can only allocate part of the resources to the AppMaster, so the AppMaster starts a periodic thread that keeps asking the ResourceManager for the rest:

private def allocationThreadImpl(): Unit = {
    // The number of failures in a row until the allocation thread gives up.
    val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
    var failureCount = 0
    while (!finished) {
      try {
        if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
          finish(FinalApplicationStatus.FAILED,
            ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
            s"Max number of executor failures ($maxNumExecutorFailures) reached")
        } else if (allocator.isAllNodeExcluded) {
          finish(FinalApplicationStatus.FAILED,
            ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
            "Due to executor failures all available nodes are excluded")
        } else {
          logDebug("Sending progress")
          // allocate() doubles as the AM heartbeat and requests any remaining containers
          allocator.allocateResources()
        }
        failureCount = 0
      }
        // ... (catch block omitted) ...
      try {
        val numPendingAllocate = allocator.getNumContainersPendingAllocate
        var sleepStartNs = 0L
        var sleepInterval = 200L // ms
        allocatorLock.synchronized {
          sleepInterval =
            if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
              val currentAllocationInterval =
                math.min(heartbeatInterval, nextAllocationInterval)
              nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
              currentAllocationInterval
            } else {
              nextAllocationInterval = initialAllocationInterval
              heartbeatInterval
            }
          sleepStartNs = System.nanoTime()
          allocatorLock.wait(sleepInterval)
        }
        val sleepDuration = System.nanoTime() - sleepStartNs
        if (sleepDuration < TimeUnit.MILLISECONDS.toNanos(sleepInterval)) {
          // log when sleep is interrupted
          logDebug(s"Number of pending allocations is $numPendingAllocate. " +
              s"Slept for $sleepDuration/$sleepInterval ms.")
          // if sleep was less than the minimum interval, sleep for the rest of it
          val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
          if (toSleep > 0) {
            logDebug(s"Going back to sleep for $toSleep ms")
            // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
            // by the methods that signal allocatorLock because this is just finishing the min
            // sleep interval, which should happen even if this is signalled again.
            Thread.sleep(toSleep)
          }
        } else {
          logDebug(s"Number of pending allocations is $numPendingAllocate. " +
              s"Slept for $sleepDuration/$sleepInterval.")
        }
      } catch {
        case e: InterruptedException =>
      }
    }
  }
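In the loop above, heartbeatInterval and initialAllocationInterval come from Spark's YARN settings (assuming the usual Spark on YARN configuration, these are spark.yarn.scheduler.heartbeat.interval-ms and spark.yarn.scheduler.initial-allocation.interval). While container requests are still pending, the AM polls at the shorter initial interval and doubles it on each round, capped at the heartbeat interval; once nothing is pending, it falls back to the regular heartbeat interval.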

6. Summary

  1. The ApplicationMaster registers with the ResourceManager, which stores the ApplicationMaster's UI and related information in ZooKeeper, so that after a ResourceManager failover the proxy service can be restored quickly.
  2. The Spark ApplicationMaster starts a thread that periodically requests resources from the ResourceManager.
  3. Spark only obtains part of the resources on each request; the remainder is requested by the periodic thread.
  4. After obtaining resources and composing the launch command, the ApplicationMaster asks the NodeManager to launch the containers.