1. Background
Before this, I wrote two articles covering the event flow of submitting an application to the ResourceManager, and the flow by which the ResourceManager instructs the NodeManager to launch a container.
https://blog.51cto.com/u_15327484/7790888 covered the event flow of application submission to the ResourceManager, including persisting the application information to ZooKeeper, handing the application to the scheduler, and sending the LAUNCH event that tells the NodeManager to start the container.
https://blog.51cto.com/u_15327484/7815432 then covered how the NodeManager launches a container, including starting container log aggregation, downloading the three kinds of resources a job needs (public, private, and application), and having the ContainerExecutor run the launch script.
Once the ApplicationMaster process has started, it registers with the ResourceManager and then requests resources from it to run tasks. This article walks through how an application requests resources.
2. Resource Scheduling Model
In YARN, all resources are handed out by the ResourceManager to each ApplicationMaster, which in turn assigns them to its own tasks.
The ResourceManager does not actively poll each NodeManager for its current state. Instead, every NodeManager periodically heartbeats to the ResourceManager; once the ResourceManager learns a node's resource usage, it allocates a portion of that node's resources to an ApplicationMaster, and the ApplicationMaster uses the returned node information to ask that node to run its tasks.
The resource allocation flow in YARN is:
- The NodeManager periodically reports its node status.
- The ResourceManager returns a heartbeat response, e.g. telling the node to release containers.
- On receiving the heartbeat, the ResourceManager triggers a NODE_UPDATE event. The ResourceScheduler implementation handles NODE_UPDATE and allocates resources to hand back to ApplicationMasters.
- After receiving the allocated containers, the ApplicationMaster updates its resource requests and sends requests for the containers still outstanding.
- Finally, the ApplicationMaster sends a startContainer request to the NodeManager to launch each container.
(Figure: execution flow of YARN resource allocation.)
The communication protocol between the ApplicationMaster and the ResourceManager is ApplicationMasterProtocol, which defines three methods:
- registerApplicationMaster(): the ApplicationMaster registers with the ResourceManager over RPC.
- allocate(): after registering, the ApplicationMaster requests resources from the ResourceManager over RPC.
- finishApplicationMaster(): the ApplicationMaster tells the ResourceManager over RPC that the application has finished, and exits.
The most important of these are registerApplicationMaster and allocate; a minimal sketch of the whole lifecycle follows, and the rest of the article walks through the resource request flow via Spark's ApplicationMaster startup.
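For orientation, here is a hedged, minimal sketch of the AM side of this protocol using Hadoop's AMRMClient wrapper (which speaks ApplicationMasterProtocol underneath). The host name, tracking URL, container size, and sleep interval are placeholder assumptions, not values from this article:

```java
import java.util.List;

import org.apache.hadoop.conf.Configuration;
import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.FinalApplicationStatus;
import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.client.api.AMRMClient;
import org.apache.hadoop.yarn.client.api.AMRMClient.ContainerRequest;
import org.apache.hadoop.yarn.conf.YarnConfiguration;

public class AmLifecycleSketch {
  public static void main(String[] args) throws Exception {
    Configuration conf = new YarnConfiguration();
    AMRMClient<ContainerRequest> rmClient = AMRMClient.createAMRMClient();
    rmClient.init(conf);
    rmClient.start();

    // 1. registerApplicationMaster(): tell the RM who we are and where our UI lives.
    rmClient.registerApplicationMaster("am-host.example.com", 0, ""); // placeholder host / tracking URL

    // Ask for one 1 GB / 1 vcore container anywhere in the cluster.
    Resource capability = Resource.newInstance(1024, 1);
    rmClient.addContainerRequest(
        new ContainerRequest(capability, null, null, Priority.newInstance(0)));

    // 2. allocate(): poll until the RM grants the container. Each call doubles as a
    //    heartbeat, so real AMs invoke it periodically even with nothing pending.
    List<Container> allocated;
    do {
      Thread.sleep(1000);
      allocated = rmClient.allocate(0.1f).getAllocatedContainers();
    } while (allocated.isEmpty());
    // ... launch the container via NMClient, run the task ...

    // 3. finishApplicationMaster(): report completion and exit.
    rmClient.unregisterApplicationMaster(FinalApplicationStatus.SUCCEEDED, "", "");
    rmClient.stop();
  }
}
```

A real ApplicationMaster, Spark's included, wraps exactly this register / allocate-loop / unregister sequence with its own bookkeeping.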
3. How Spark Starts the ApplicationMaster
After YARN runs the launch script through the ContainerExecutor, the specific compute framework's logic takes over. In Spark, two steps happen:
- Register the ApplicationMaster.
- Request resources.
```scala
private def runDriver(): Unit = {
  addAmIpFilter(None)
  userClassThread = startUserApplication()

  // This a bit hacky, but we need to wait until the spark.driver.port property has
  // been set by the Thread executing the user class.
  logInfo("Waiting for spark context initialization...")
  val totalWaitTime = sparkConf.get(AM_MAX_WAIT_TIME)
  try {
    val sc = ThreadUtils.awaitResult(sparkContextPromise.future,
      Duration(totalWaitTime, TimeUnit.MILLISECONDS))
    if (sc != null) {
      rpcEnv = sc.env.rpcEnv
      val userConf = sc.getConf
      val host = userConf.get("spark.driver.host")
      val port = userConf.get("spark.driver.port").toInt
      // Register the ApplicationMaster
      registerAM(host, port, userConf, sc.ui.map(_.webUrl))

      val driverRef = rpcEnv.setupEndpointRef(
        RpcAddress(host, port),
        YarnSchedulerBackend.ENDPOINT_NAME)
      // The AM requests containers
      createAllocator(driverRef, userConf)
    } else {
      // Sanity check; should never happen in normal operation, since sc should only be null
      // if the user app did not create a SparkContext.
      throw new IllegalStateException("User did not initialize spark context!")
    }
    resumeDriver()
    userClassThread.join()
  } catch {
    case e: SparkException if e.getCause().isInstanceOf[TimeoutException] =>
      logError(
        s"SparkContext did not initialize after waiting for $totalWaitTime ms. " +
          "Please check earlier log output for errors. Failing the application.")
      finish(FinalApplicationStatus.FAILED,
        ApplicationMaster.EXIT_SC_NOT_INITED,
        "Timed out waiting for SparkContext.")
  } finally {
    resumeDriver()
  }
}
```
When requesting resources, Spark calls YARN's allocate interface directly, then launches containers in launchReporterThread while also sending requests for the resources still needed:
```scala
private val client = doAsUser { new YarnRMClient() }

@volatile private var allocator: YarnAllocator = _

private def createAllocator(driverRef: RpcEndpointRef, _sparkConf: SparkConf): Unit = {
  val appId = client.getAttemptId().getApplicationId().toString()
  val driverUrl = RpcEndpointAddress(driverRef.address.host, driverRef.address.port,
    CoarseGrainedSchedulerBackend.ENDPOINT_NAME).toString

  // Before we initialize the allocator, let's log the information about how executors will
  // be run up front, to avoid printing this out for every single executor being launched.
  // Use placeholders for information that changes such as executor IDs.
  logInfo {
    val executorMemory = _sparkConf.get(EXECUTOR_MEMORY).toInt
    val executorCores = _sparkConf.get(EXECUTOR_CORES)
    val dummyRunner = new ExecutorRunnable(None, yarnConf, _sparkConf, driverUrl, "<executorId>",
      "<hostname>", executorMemory, executorCores, appId, securityMgr, localResources)
    dummyRunner.launchContextDebugInfo()
  }

  allocator = client.createAllocator(
    yarnConf,
    _sparkConf,
    driverUrl,
    driverRef,
    securityMgr,
    localResources)

  credentialRenewer.foreach(_.setDriverRef(driverRef))

  // Initialize the AM endpoint *after* the allocator has been initialized. This ensures
  // that when the driver sends an initial executor request (e.g. after an AM restart),
  // the allocator is ready to service requests.
  rpcEnv.setupEndpoint("YarnAM", new AMEndpoint(rpcEnv, driverRef))

  // Request containers
  allocator.allocateResources()
  val ms = MetricsSystem.createMetricsSystem("applicationMaster", sparkConf, securityMgr)
  val prefix = _sparkConf.get(YARN_METRICS_NAMESPACE).getOrElse(appId)
  ms.registerSource(new ApplicationMasterSource(prefix, allocator))
  ms.start()
  metricsSystem = Some(ms)
  // Launch containers and keep requesting the remaining resources
  reporterThread = launchReporterThread()
}
```
4. Registering the ApplicationMaster
Before the ApplicationMaster sends resource requests to the ResourceManager, it should first tell the ResourceManager its basic information; that is what the ApplicationMaster registration flow is for.
The registerApplicationMaster call is ultimately handled by DefaultAMSProcessor, which fires an RMAppAttemptRegistrationEvent:
```java
// Send an RMAppAttemptEventType.REGISTERED event to RMAppAttemptImpl
getRmContext().getDispatcher().getEventHandler()
    .handle(
        new RMAppAttemptRegistrationEvent(applicationAttemptId, request
            .getHost(), request.getRpcPort(), request.getTrackingUrl()));
```
RMAppAttemptImpl's state machine defines how the REGISTERED event is handled. Note that the transition quoted below is the unexpected-registration path: an AM registering while the attempt is still in the NEW state is treated as an error and routed through FinalSavingTransition into FAILED. The normal registration path is shown after it for comparison:
```java
.addTransition(RMAppAttemptState.NEW, RMAppAttemptState.FINAL_SAVING,
    RMAppAttemptEventType.REGISTERED,
    new FinalSavingTransition(
        new UnexpectedAMRegisteredTransition(), RMAppAttemptState.FAILED))
```
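For comparison, in the Hadoop source I checked (the exact form may vary across versions), a normal registration moves a launched attempt into RUNNING via AMRegisteredTransition, which records the host, RPC port, and tracking URL carried by the event:

```java
.addTransition(RMAppAttemptState.LAUNCHED, RMAppAttemptState.RUNNING,
    RMAppAttemptEventType.REGISTERED, new AMRegisteredTransition())
```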
Whenever an attempt goes through FinalSavingTransition, the attempt's information is persisted to the RM state store, which is ZooKeeper in a typical HA deployment:
```java
AggregateAppResourceUsage resUsage =
    this.attemptMetrics.getAggregateAppResourceUsage();
RMStateStore rmStore = rmContext.getStateStore();
setFinishTime(System.currentTimeMillis());

ApplicationAttemptStateData attemptState =
    ApplicationAttemptStateData.newInstance(
        applicationAttemptId, getMasterContainer(),
        rmStore.getCredentialsFromAppAttempt(this),
        startTime, stateToBeStored, finalTrackingUrl, diags,
        finalStatus, exitStatus,
        getFinishTime(), resUsage.getMemorySeconds(),
        resUsage.getVcoreSeconds());

LOG.info("Updating application attempt " + applicationAttemptId
    + " with final state: " + targetedFinalState + ", and exit status: "
    + exitStatus);
rmStore.updateApplicationAttemptState(attemptState);
```
RMAppAttemptRegistrationEvent carries the AM's access URL and related information:
```java
public RMAppAttemptRegistrationEvent(ApplicationAttemptId appAttemptId,
    String host, int rpcPort, String trackingUrl) {
  // An RMAppAttemptEvent of type REGISTERED
  super(appAttemptId, RMAppAttemptEventType.REGISTERED);
  this.host = host;
  this.rpcport = rpcPort;
  this.trackingurl = trackingUrl;
}
```
In the ResourceManager web UI, the ApplicationMaster's web UI is served through the proxy service: the proxy fetches the application's web UI and returns it to the user. (Figure: Spark's registration-related log output.)
5. The ApplicationMaster Requests Resources
Spark's YarnAllocator requests resources from the ResourceManager through ApplicationMasterProtocol#allocate. After the request returns, handleAllocatedContainers starts the granted containers (a simplified sketch of its matching logic follows the code):
```scala
/**
 * Request resources such that, if YARN gives us all we ask for, we'll have a number of containers
 * equal to maxExecutors.
 *
 * Deal with any containers YARN has granted to us by possibly launching executors in them.
 *
 * This must be synchronized because variables read in this method are mutated by other methods.
 */
def allocateResources(): Unit = synchronized {
  updateResourceRequests()

  val progressIndicator = 0.1f
  // Poll the ResourceManager. This doubles as a heartbeat if there are no pending container
  // requests.
  // Call hadoop-yarn's AMRMClient#allocate() to request containers
  val allocateResponse = amClient.allocate(progressIndicator)

  val allocatedContainers = allocateResponse.getAllocatedContainers()
  allocatorBlacklistTracker.setNumClusterNodes(allocateResponse.getNumClusterNodes)

  if (allocatedContainers.size > 0) {
    logDebug(("Allocated containers: %d. Current executor count: %d. " +
      "Launching executor count: %d. Cluster resources: %s.")
      .format(
        allocatedContainers.size,
        runningExecutors.size,
        numExecutorsStarting.get,
        allocateResponse.getAvailableResources))
    // With container resources granted, start the containers on the NM side
    handleAllocatedContainers(allocatedContainers.asScala)
  }

  val completedContainers = allocateResponse.getCompletedContainersStatuses()
  if (completedContainers.size > 0) {
    logDebug("Completed %d containers".format(completedContainers.size))
    processCompletedContainers(completedContainers.asScala)
    logDebug("Finished processing %d completed containers. Current running executor count: %d."
      .format(completedContainers.size, runningExecutors.size))
  }
}
```
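handleAllocatedContainers then decides what to do with each granted container. Roughly, and this is a simplified sketch with my own names rather than Spark's exact code, containers are matched against outstanding requests by node, then by rack, then location-agnostically, and anything that no longer matches a need is released back to the ResourceManager:

```java
import java.util.ArrayList;
import java.util.List;

import org.apache.hadoop.yarn.api.records.Container;

public class ContainerMatchingSketch {
  // Hypothetical helper standing in for Spark's matchContainerToRequest logic.
  interface Matcher {
    /** True if an outstanding request at this location matches the container's resources. */
    boolean matchAndConsume(Container c, String location);
  }

  // Hypothetical stand-in for the Hadoop RackResolver Spark uses.
  interface RackResolver {
    String rackOf(String host);
  }

  static List<Container> selectContainersToUse(List<Container> allocated,
                                               Matcher matcher,
                                               RackResolver racks) {
    List<Container> toUse = new ArrayList<>();
    for (Container c : allocated) {
      String host = c.getNodeId().getHost();
      if (matcher.matchAndConsume(c, host)                   // node-local request?
          || matcher.matchAndConsume(c, racks.rackOf(host))  // rack-local request?
          || matcher.matchAndConsume(c, "*")) {              // any-location request?
        toUse.add(c);
      }
      // else: Spark releases the container back to the ResourceManager,
      // since nothing outstanding wants it anymore.
    }
    return toUse;
  }
}
```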
Spark requests containers from the ResourceManager via AMRMClientImpl#allocate(). On the ResourceManager side, the call then flows through: ApplicationMasterProtocol.allocate() -> ApplicationMasterService.allocate() -> AMSProcessingChain.allocate() -> DefaultAMSProcessor.allocate() -> FairScheduler.allocate() or CapacityScheduler.allocate().
Taking FifoScheduler as an example (the simplest of the schedulers), it maintains the application information in YARN. On allocate, it first updates the resources being asked for, then looks up and returns what the scheduler has already allocated via application.pullNewlyAllocatedContainersAndNMTokens (a sketch of what the ask list looks like follows the code):
```java
public Allocation allocate(
    ApplicationAttemptId applicationAttemptId, List<ResourceRequest> ask,
    List<ContainerId> release, List<String> blacklistAdditions, List<String> blacklistRemovals) {
  FiCaSchedulerApp application = getApplicationAttempt(applicationAttemptId);
  if (application == null) {
    LOG.error("Calling allocate on removed " +
        "or non existant application " + applicationAttemptId);
    return EMPTY_ALLOCATION;
  }

  if (!ask.isEmpty()) {
    LOG.debug("allocate: pre-update" +
        " applicationId=" + applicationAttemptId +
        " application=" + application);
    application.showRequests();

    // Update application requests
    // i.e. update the resources being asked for
    application.updateResourceRequests(ask);

    LOG.debug("allocate: post-update" +
        " applicationId=" + applicationAttemptId +
        " application=" + application);
    application.showRequests();

    LOG.debug("allocate:" +
        " applicationId=" + applicationAttemptId +
        " #ask=" + ask.size());
  }

  if (application.isWaitingForAMContainer(application.getApplicationId())) {
    // Allocate is for AM and update AM blacklist for this
    application.updateAMBlacklist(
        blacklistAdditions, blacklistRemovals);
  } else {
    application.updateBlacklist(blacklistAdditions, blacklistRemovals);
  }

  // Pull the containers the scheduler has already allocated
  ContainersAndNMTokensAllocation allocation =
      application.pullNewlyAllocatedContainersAndNMTokens();
  // Return these resources to the ApplicationMaster
  return new Allocation(allocation.getContainerList(),
      application.getHeadroom(), null, null, null,
      allocation.getNMTokenList());
}
```
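To make the ask parameter above concrete, here is a hedged sketch of how an AM-side library builds the ResourceRequest list the scheduler receives. The priority, host, rack, and container sizes are placeholder values of my choosing:

```java
import java.util.Arrays;
import java.util.List;

import org.apache.hadoop.yarn.api.records.Priority;
import org.apache.hadoop.yarn.api.records.Resource;
import org.apache.hadoop.yarn.api.records.ResourceRequest;

public class AskListSketch {
  public static void main(String[] args) {
    Priority prio = Priority.newInstance(1);
    Resource oneExecutor = Resource.newInstance(2048, 2); // 2 GB, 2 vcores per container

    // A locality-aware request is expressed at three levels of resource name:
    // a specific host, that host's rack, and ResourceRequest.ANY ("*").
    List<ResourceRequest> ask = Arrays.asList(
        ResourceRequest.newInstance(prio, "node1.example.com", oneExecutor, 3),
        ResourceRequest.newInstance(prio, "/rack1", oneExecutor, 3),
        ResourceRequest.newInstance(prio, ResourceRequest.ANY, oneExecutor, 3));

    // AMRMClientImpl accumulates requests like these and ships them in the next
    // allocate() call; updateResourceRequests(ask) above merges them into the
    // scheduler's per-application bookkeeping.
    ask.forEach(r -> System.out.println(r));
  }
}
```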
In Spark, once the application has been granted resources, runAllocatedContainers starts a thread per container and runs the container launch logic, asking the NodeManager to start the container (a standalone NMClient sketch follows the code):
```scala
def startContainer(): java.util.Map[String, ByteBuffer] = {
  val ctx = Records.newRecord(classOf[ContainerLaunchContext])
    .asInstanceOf[ContainerLaunchContext]
  val env = prepareEnvironment().asJava

  ctx.setLocalResources(localResources.asJava)
  ctx.setEnvironment(env)

  val credentials = UserGroupInformation.getCurrentUser().getCredentials()
  val dob = new DataOutputBuffer()
  credentials.writeTokenStorageToStream(dob)
  ctx.setTokens(ByteBuffer.wrap(dob.getData()))

  val commands = prepareCommand()
  // Set the command the container will execute
  ctx.setCommands(commands.asJava)

  ctx.setApplicationACLs(
    YarnSparkHadoopUtil.getApplicationAclsForYarn(securityMgr).asJava)

  // If external shuffle service is enabled, register with the Yarn shuffle service already
  // started on the NodeManager and, if authentication is enabled, provide it with our secret
  // key for fetching shuffle files later
  if (sparkConf.get(SHUFFLE_SERVICE_ENABLED)) {
    val secretString = securityMgr.getSecretKey()
    val secretBytes =
      if (secretString != null) {
        // This conversion must match how the YarnShuffleService decodes our secret
        JavaUtils.stringToBytes(secretString)
      } else {
        // Authentication is not enabled, so just provide dummy metadata
        ByteBuffer.allocate(0)
      }
    ctx.setServiceData(Collections.singletonMap("spark_shuffle", secretBytes))
  }

  // Send the start request to the ContainerManager
  try {
    // Call NMClientImpl's startContainer() to start the container on the NM
    nmClient.startContainer(container.get, ctx)
  } catch {
    case ex: Exception =>
      throw new SparkException(s"Exception while starting container ${container.get.getId}" +
        s" on host $hostname", ex)
  }
}
```
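Outside of Spark, the same NodeManager handshake can be sketched with Hadoop's NMClient directly. This minimal example (the command string is a placeholder, and the container is assumed to come from an earlier allocate() call) shows the bare ContainerLaunchContext that startContainer needs:

```java
import java.util.Collections;

import org.apache.hadoop.yarn.api.records.Container;
import org.apache.hadoop.yarn.api.records.ContainerLaunchContext;
import org.apache.hadoop.yarn.client.api.NMClient;
import org.apache.hadoop.yarn.conf.YarnConfiguration;
import org.apache.hadoop.yarn.util.Records;

public class StartContainerSketch {
  // 'container' would come from AllocateResponse.getAllocatedContainers().
  static void launchShellCommand(Container container) throws Exception {
    NMClient nmClient = NMClient.createNMClient();
    nmClient.init(new YarnConfiguration());
    nmClient.start();

    // The ContainerLaunchContext carries the command line plus (optionally)
    // local resources, environment, tokens, and ACLs, as in the Spark code above.
    ContainerLaunchContext ctx = Records.newRecord(ContainerLaunchContext.class);
    ctx.setCommands(Collections.singletonList(
        "echo hello 1><LOG_DIR>/stdout 2><LOG_DIR>/stderr")); // placeholder command

    // NMClient talks to the NodeManager's ContainerManagementProtocol for us.
    nmClient.startContainer(container, ctx);
  }
}
```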
Note that Spark does not fetch all of its resources from the ResourceManager in one shot; each round, the ResourceManager may grant only part of what was asked. The ApplicationMaster therefore starts a timed reporter thread that keeps requesting the remainder (see the backoff sketch after the code):
```scala
private def allocationThreadImpl(): Unit = {
  // The number of failures in a row until the allocation thread gives up.
  val reporterMaxFailures = sparkConf.get(MAX_REPORTER_THREAD_FAILURES)
  var failureCount = 0
  while (!finished) {
    try {
      if (allocator.getNumExecutorsFailed >= maxNumExecutorFailures) {
        finish(FinalApplicationStatus.FAILED,
          ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
          s"Max number of executor failures ($maxNumExecutorFailures) reached")
      } else if (allocator.isAllNodeExcluded) {
        finish(FinalApplicationStatus.FAILED,
          ApplicationMaster.EXIT_MAX_EXECUTOR_FAILURES,
          "Due to executor failures all available nodes are excluded")
      } else {
        logDebug("Sending progress")
        // Request any containers still pending
        allocator.allocateResources()
      }
      failureCount = 0
    } catch {
      // Omitted: count consecutive failures and fail the application once
      // reporterMaxFailures is exceeded.
      case e: Throwable =>
    }
    try {
      val numPendingAllocate = allocator.getNumContainersPendingAllocate
      var sleepStartNs = 0L
      var sleepInterval = 200L // ms
      allocatorLock.synchronized {
        sleepInterval =
          if (numPendingAllocate > 0 || allocator.getNumPendingLossReasonRequests > 0) {
            val currentAllocationInterval =
              math.min(heartbeatInterval, nextAllocationInterval)
            nextAllocationInterval = currentAllocationInterval * 2 // avoid overflow
            currentAllocationInterval
          } else {
            nextAllocationInterval = initialAllocationInterval
            heartbeatInterval
          }
        sleepStartNs = System.nanoTime()
        allocatorLock.wait(sleepInterval)
      }
      val sleepDuration = System.nanoTime() - sleepStartNs
      if (sleepDuration < TimeUnit.MILLISECONDS.toNanos(sleepInterval)) {
        // log when sleep is interrupted
        logDebug(s"Number of pending allocations is $numPendingAllocate. " +
          s"Slept for $sleepDuration/$sleepInterval ms.")
        // if sleep was less than the minimum interval, sleep for the rest of it
        val toSleep = math.max(0, initialAllocationInterval - sleepDuration)
        if (toSleep > 0) {
          logDebug(s"Going back to sleep for $toSleep ms")
          // use Thread.sleep instead of allocatorLock.wait. there is no need to be woken up
          // by the methods that signal allocatorLock because this is just finishing the min
          // sleep interval, which should happen even if this is signalled again.
          Thread.sleep(toSleep)
        }
      } else {
        logDebug(s"Number of pending allocations is $numPendingAllocate. " +
          s"Slept for $sleepDuration/$sleepInterval.")
      }
    } catch {
      case e: InterruptedException =>
    }
  }
}
```
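The interesting part is the sleep logic: while container requests are pending, the thread polls quickly and doubles the interval on each round (capped at the regular heartbeat interval); once nothing is pending, it settles back to plain heartbeats. Here is a standalone sketch of that policy; the two interval constants are assumptions corresponding to the usual defaults of spark.yarn.scheduler.heartbeat.interval-ms and spark.yarn.scheduler.initial-allocation.interval:

```java
public class AllocationBackoffSketch {
  // Assumed defaults: heartbeat every 3000 ms, initial allocation poll every 200 ms.
  static final long HEARTBEAT_INTERVAL_MS = 3000L;
  static final long INITIAL_ALLOCATION_INTERVAL_MS = 200L;

  static long nextAllocationInterval = INITIAL_ALLOCATION_INTERVAL_MS;

  /** Returns how long to sleep before the next allocate() call. */
  static long nextSleepMs(boolean requestsPending) {
    if (requestsPending) {
      // Poll fast at first, then back off exponentially up to the heartbeat interval.
      long current = Math.min(HEARTBEAT_INTERVAL_MS, nextAllocationInterval);
      nextAllocationInterval = current * 2;
      return current;
    } else {
      // Nothing pending: reset the backoff and just heartbeat.
      nextAllocationInterval = INITIAL_ALLOCATION_INTERVAL_MS;
      return HEARTBEAT_INTERVAL_MS;
    }
  }

  public static void main(String[] args) {
    // With requests pending, the sleeps go 200, 400, 800, 1600, 3000, 3000, ...
    for (int i = 0; i < 6; i++) {
      System.out.println(nextSleepMs(true));
    }
  }
}
```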
6. Summary
- The ApplicationMaster registers with the ResourceManager; attempt information such as the AM's tracking URL is persisted to ZooKeeper through the RM state store, so that after a ResourceManager failover the proxy service can be restored quickly.
- The Spark ApplicationMaster starts a thread that periodically requests resources from the ResourceManager.
- Spark may receive only part of its resources each round; the timed thread requests the rest.
- After obtaining resources and assembling the launch command, the ApplicationMaster asks the NodeManager to start the containers.