Spark源码分析之-scheduler模块
  WaYJTbj6RMqU 2023年11月02日 63 0


Background

Spark在资源管理和调度方式上采用了类似于Hadoop YARN的方式,最上层是资源调度器,它负责分配资源和调度注册到Spark中的所有应用,Spark选用Mesos或是YARN等作为其资源调度框架。在每一个应用内部,Spark又实现了任务调度器,负责任务的调度和协调,类似于MapReduce。本质上,外层的资源调度和内层的任务调度相互独立,各司其职。本文对于Spark的源码分析主要集中在内层的任务调度器上,分析Spark任务调度器的实现。

Scheduler模块整体架构

Spark源码分析之-scheduler模块_ide


TaskSchedulerListener

Spark源码分析之-scheduler模块_ide_02

Spark源码分析之-scheduler模块_Spark_03

RDD的依赖关系和Stage的分类

Spark源码分析之-scheduler模块_Dependency_04

Spark源码分析之-scheduler模块_依赖关系_05

DAGScheduler

Spark源码分析之-scheduler模块_依赖关系_06

privatevar taskScheduler:TaskScheduler={
//...
}
taskScheduler.start()
privatevar dagScheduler =newDAGScheduler(taskScheduler)
dagScheduler.start()

Spark源码分析之-scheduler模块_Spark_07

privatedef run(){
SparkEnv.set(env)
while(true){
val event= eventQueue.poll(POLL_TIMEOUT,TimeUnit.MILLISECONDS)
if(event!=null){
logDebug("Got event of type "+event.getClass.getName)
}
if(event!=null){
if(processEvent(event)){
return
}
}
val time =System.currentTimeMillis()// TODO: use a pluggable clock for testability
if(failed.size >0&& time > lastFetchFailureTime + RESUBMIT_TIMEOUT){
resubmitFailedStages()
}else{
submitWaitingStages()
}
}
}

Spark源码分析之-scheduler模块_依赖关系_08

Job的生与死


Spark源码分析之-scheduler模块_ide_09

def runJob[T, U:ClassManifest](
finalRdd: RDD[T],
func:(TaskContext,Iterator[T])=> U,
partitions:Seq[Int],
callSite:String,
allowLocal:Boolean,
resultHandler:(Int, U)=>Unit)
{
if(partitions.size ==0){
return
}
val (toSubmit, waiter)= prepareJob(
finalRdd, func, partitions, callSite, allowLocal, resultHandler)
eventQueue.put(toSubmit)
waiter.awaitResult() match {
caseJobSucceeded=>{}
caseJobFailed(exception:Exception)=>
logInfo("Failed to run "+ callSite)
throw exception
}
}

Spark源码分析之-scheduler模块_Spark_10

caseJobSubmitted(finalRDD, func, partitions, allowLocal, callSite, listener)=>
val runId = nextRunId.getAndIncrement()
val finalStage = newStage(finalRDD,None, runId)
val job =newActiveJob(runId, finalStage, func, partitions, callSite, listener)
clearCacheLocs()
if(allowLocal && finalStage.parents.size ==0&& partitions.length ==1){
runLocally(job)
}else{
activeJobs += job
resultStageToJob(finalStage)= job
submitStage(finalStage)
}

Spark源码分析之-scheduler模块_Dependency_11

privatedef submitStage(stage:Stage){
if(!waiting(stage)&&!running(stage)&&!failed(stage)){
val missing = getMissingParentStages(stage).sortBy(_.id)
if(missing ==Nil){
submitMissingTasks(stage)
running += stage
}else{
for(parent <- missing){
submitStage(parent)
}
waiting += stage
}
}
}


Spark源码分析之-scheduler模块_Dependency_12

privatedef getMissingParentStages(stage:Stage):List[Stage]={
val missing =newHashSet[Stage]
val visited =newHashSet[RDD[_]]
def visit(rdd: RDD[_]){
if(!visited(rdd)){
visited += rdd
if(getCacheLocs(rdd).contains(Nil)){
for(dep <- rdd.dependencies){
dep match {
case shufDep:ShuffleDependency[_,_]=>
val mapStage = getShuffleMapStage(shufDep, stage.priority)
if(!mapStage.isAvailable){
missing += mapStage
}
case narrowDep:NarrowDependency[_]=>
visit(narrowDep.rdd)
}
}
}
}
}
visit(stage.rdd)
missing.toList
}


Spark源码分析之-scheduler模块_ide_13

privatedef submitMissingTasks(stage:Stage){
val myPending = pendingTasks.getOrElseUpdate(stage,newHashSet)
myPending.clear()
var tasks =ArrayBuffer[Task[_]]()
if(stage.isShuffleMap){
for(p <-0until stage.numPartitions if stage.outputLocs(p)==Nil){
val locs = getPreferredLocs(stage.rdd, p)
tasks +=newShuffleMapTask(stage.id, stage.rdd, stage.shuffleDep.get, p, locs)
}
}else{
val job = resultStageToJob(stage)
for(id <-0until job.numPartitions if(!job.finished(id))){
val partition = job.partitions(id)
val locs = getPreferredLocs(stage.rdd, partition)
tasks +=newResultTask(stage.id, stage.rdd, job.func, partition, locs, id)
}
}
if(tasks.size >0){
myPending ++= tasks
taskSched.submitTasks(
newTaskSet(tasks.toArray, stage.id, stage.newAttemptId(), stage.priority))
if(!stage.submissionTime.isDefined){
stage.submissionTime =Some(System.currentTimeMillis())
}
}else{
running -= stage
}
}

Spark源码分析之-scheduler模块_ide_14

privatedef handleTaskCompletion(event:CompletionEvent){
val task =event.task
val stage = idToStage(task.stageId)
def markStageAsFinished(stage:Stage)={
val serviceTime = stage.submissionTime match {
caseSome(t)=>"%.03f".format((System.currentTimeMillis()- t)/1000.0)
case _ =>"Unkown"
}
logInfo("%s (%s) finished in %s s".format(stage, stage.origin, serviceTime))
running -= stage
}
event.reason match {
caseSuccess=>
...
task match {
case rt:ResultTask[_, _]=>
...
case smt:ShuffleMapTask=>
...
}
caseResubmitted=>
...
caseFetchFailed(bmAddress, shuffleId, mapId, reduceId)=>
...
case other =>
abortStage(idToStage(task.stageId), task +" failed: "+ other)
}
}

Spark源码分析之-scheduler模块_Spark_15

RDD的计算

Spark源码分析之-scheduler模块_Spark_16

overridedef run(attemptId:Long): U ={
val context =newTaskContext(stageId, partition, attemptId)
try{
func(context, rdd.iterator(split, context))
}finally{
context.executeOnCompleteCallbacks()
}
}

Spark源码分析之-scheduler模块_ide_17

overridedef run(attemptId:Long):MapStatus={
val numOutputSplits = dep.partitioner.numPartitions
val taskContext =newTaskContext(stageId, partition, attemptId)
try{
val buckets =Array.fill(numOutputSplits)(newArrayBuffer[(Any,Any)])
for(elem <- rdd.iterator(split, taskContext)){
val pair = elem.asInstanceOf[(Any,Any)]
val bucketId = dep.partitioner.getPartition(pair._1)
buckets(bucketId)+= pair
}
val compressedSizes =newArray[Byte](numOutputSplits)
val blockManager =SparkEnv.get.blockManager
for(i <-0until numOutputSplits){
val blockId ="shuffle_"+ dep.shuffleId +"_"+ partition +"_"+ i
val iter:Iterator[(Any,Any)]= buckets(i).iterator
val size = blockManager.put(blockId, iter,StorageLevel.DISK_ONLY,false)
compressedSizes(i)=MapOutputTracker.compressSize(size)
}
returnnewMapStatus(blockManager.blockManagerId, compressedSizes)
}finally{
taskContext.executeOnCompleteCallbacks()
}
}

Spark源码分析之-scheduler模块_ide_18


finaldef iterator(split:Partition, context:TaskContext):Iterator[T]={
if(storageLevel !=StorageLevel.NONE){
SparkEnv.get.cacheManager.getOrCompute(this, split, context, storageLevel)
}else{
computeOrReadCheckpoint(split, context)
}
}
private[spark]def computeOrReadCheckpoint(split:Partition, context:TaskContext):Iterator[T]={
if(isCheckpointed){
firstParent[T].iterator(split, context)
}else{
compute(split, context)
}
}

Spark源码分析之-scheduler模块_Spark_19

Spark源码分析之-scheduler模块_Spark_20

master match {
...
case SPARK_REGEX(sparkUrl)=>
val scheduler =newClusterScheduler(this)
val backend =newSparkDeploySchedulerBackend(scheduler,this, sparkUrl, appName)
scheduler.initialize(backend)
scheduler
case LOCAL_CLUSTER_REGEX(numSlaves, coresPerSlave, memoryPerSlave)=>
...
case _ =>
...
}
}
taskScheduler.start()

Spark源码分析之-scheduler模块_依赖关系_21

overridedef start(){
backend.start()
if(System.getProperty("spark.speculation","false")=="true"){
newThread("ClusterScheduler speculation check"){
setDaemon(true)
overridedef run(){
while(true){
try{
Thread.sleep(SPECULATION_INTERVAL)
}catch{
case e:InterruptedException=>{}
}
checkSpeculatableTasks()
}
}
}.start()
}
}

Spark源码分析之-scheduler模块_Spark_22

overridedef start(){
super.start()
val driverUrl ="akka://spark@%s:%s/user/%s".format(
System.getProperty("spark.driver.host"),System.getProperty("spark.driver.port"),
StandaloneSchedulerBackend.ACTOR_NAME)
val args =Seq(driverUrl,"","","")
val command =Command("spark.executor.StandaloneExecutorBackend", args, sc.executorEnvs)
val sparkHome = sc.getSparkHome().getOrElse(
thrownewIllegalArgumentException("must supply spark home for spark standalone"))
val appDesc =newApplicationDescription(appName, maxCores, executorMemory, command, sparkHome)
client =newClient(sc.env.actorSystem, master, appDesc,this)
client.start()
}

Spark源码分析之-scheduler模块_ide_23

overridedef start(){
val properties =newArrayBuffer[(String,String)]
val iterator =System.getProperties.entrySet.iterator
while(iterator.hasNext){
val entry = iterator.next
val (key, value)=(entry.getKey.toString, entry.getValue.toString)
if(key.startsWith("spark.")){
properties +=((key, value))
}
}
driverActor = actorSystem.actorOf(
Props(newDriverActor(properties)), name =StandaloneSchedulerBackend.ACTOR_NAME)
}

Spark源码分析之-scheduler模块_Dependency_24

overridedef submitTasks(taskSet:TaskSet){
val tasks = taskSet.tasks
logInfo("Adding task set "+ taskSet.id +" with "+ tasks.length +" tasks")
this.synchronized{
val manager =newTaskSetManager(this, taskSet)
activeTaskSets(taskSet.id)= manager
activeTaskSetsQueue += manager
taskSetTaskIds(taskSet.id)=newHashSet[Long]()
if(hasReceivedTask ==false){
starvationTimer.scheduleAtFixedRate(newTimerTask(){
overridedef run(){
if(!hasLaunchedTask){
logWarning("Initial job has not accepted any resources; "+
"check your cluster UI to ensure that workers are registered")
}else{
this.cancel()
}
}
}, STARVATION_TIMEOUT, STARVATION_TIMEOUT)
}
hasReceivedTask =true;
}
backend.reviveOffers()
}

Spark源码分析之-scheduler模块_依赖关系_25

// Make fake resource offers on just one executor
def makeOffers(executorId:String){
launchTasks(scheduler.resourceOffers(
Seq(newWorkerOffer(executorId, executorHost(executorId), freeCores(executorId)))))
}
// Launch tasks returned by a set of resource offers
def launchTasks(tasks:Seq[Seq[TaskDescription]]){
for(task <- tasks.flatten){
freeCores(task.executorId)-=1
executorActor(task.executorId)!LaunchTask(task)
}
}

Spark源码分析之-scheduler模块_Dependency_26


overridedef receive ={
caseRegisteredExecutor(sparkProperties)=>
...
caseRegisterExecutorFailed(message)=>
...
caseLaunchTask(taskDesc)=>
logInfo("Got assigned task "+ taskDesc.taskId)
executor.launchTask(this, taskDesc.taskId, taskDesc.serializedTask)
caseTerminated(_)|RemoteClientDisconnected(_, _)|RemoteClientShutdown(_, _)=>
...
}
def launchTask(context:ExecutorBackend, taskId:Long, serializedTask:ByteBuffer){
threadPool.execute(newTaskRunner(context, taskId, serializedTask))
}

Spark源码分析之-scheduler模块_Spark_27


classTaskRunner(context:ExecutorBackend, taskId:Long, serializedTask:ByteBuffer)
extendsRunnable{
overridedef run(){
SparkEnv.set(env)
Thread.currentThread.setContextClassLoader(urlClassLoader)
val ser =SparkEnv.get.closureSerializer.newInstance()
logInfo("Running task ID "+ taskId)
context.statusUpdate(taskId,TaskState.RUNNING, EMPTY_BYTE_BUFFER)
try{
SparkEnv.set(env)
Accumulators.clear()
val (taskFiles, taskJars, taskBytes)=Task.deserializeWithDependencies(serializedTask)
updateDependencies(taskFiles, taskJars)
val task = ser.deserialize[Task[Any]](taskBytes,Thread.currentThread.getContextClassLoader)
logInfo("Its generation is "+ task.generation)
env.mapOutputTracker.updateGeneration(task.generation)
val value = task.run(taskId.toInt)
val accumUpdates =Accumulators.values
val result =newTaskResult(value, accumUpdates)
val serializedResult = ser.serialize(result)
logInfo("Serialized size of result for "+ taskId +" is "+ serializedResult.limit)
context.statusUpdate(taskId,TaskState.FINISHED, serializedResult)
logInfo("Finished task ID "+ taskId)
}catch{
case ffe:FetchFailedException=>{
val reason = ffe.toTaskEndReason
context.statusUpdate(taskId,TaskState.FAILED, ser.serialize(reason))
}
case t:Throwable=>{
val reason =ExceptionFailure(t)
context.statusUpdate(taskId,TaskState.FAILED, ser.serialize(reason))
// TODO: Should we exit the whole executor here? On the one hand, the failed task may
// have left some weird state around depending on when the exception was thrown, but on
// the other hand, maybe we could detect that when future tasks fail and exit then.
logError("Exception in task ID "+ taskId, t)
//System.exit(1)
}
}
}
}

Spark源码分析之-scheduler模块_ide_28



【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  A32uB2Hhmc6N   2023年12月12日   51   0   0 MySQLMySQLideide
WaYJTbj6RMqU