Yarn SLS代码分析及实践
  GQ7psP7UJw7k 2023年11月02日 35 0

1. 背景

https://blog.51cto.com/u_15327484/7894282文章中,介绍了Yarn的两种调度器。在https://blog.51cto.com/u_15327484/7920197文章中,介绍了FairScheduler迁移Capacity Scheduler的迁移实践。

在实际迁移之前,必须要确保Capacity Scheduler能够达到足够的收益,即吞吐率或调度时间等指标都有所优化,才能开启迁移操作。

而为了评估迁移收益,很难直接通过搭建1k+个NodeManager的Yarn集群来测试调度器性能,这样太浪费成本。为了解决这个问题,Hadoop官方提供了Scheduler load Simulator工具,可以用它在单机上模拟超大Yarn集群,测试调度器性能。

2. Yarn SLS概要

为了压测调度器性能,需要实现两方面的需求:

  1. 启动压测的集群,该集群能够读取作业,并根据作业信息进行调度。
  2. 提供作业信息,最好根据线上已经运行过的作业,再跑到SLS集群中测试。

对于作业信息的输入,Hadoop提供了Rumen工具,可以读取Mapreduce中historyServer保存的作业历史信息,生成作业信息文件sls-jobs.json 和节点信息文件sls-nodes.json

对于集群的模拟,启动SLS脚本时,会启动三个重要组件:

  1. 启动ResourceManager进程,用于接受资源请求。
  2. 启动NMSimulator,读取sls-nodes.json 文件,获取文件中的节点信息,通过线程池的方法将每个节点的心跳请求发送给ResourceManager。
  3. 启动AMSimulator,读取sls-jobs.json 文件,获取文件中的作业信息,通过线程池的方法将每个节点的资源请求发送给ResourceManager。

其架构图如下所示:

Untitled.png

注意,SLS的职责并不是执行任务,而是执行资源的分配。因此ResourceManager接受AM的请求后,不会向NM发送启动作业的请求;分配完资源后,作业直接结束。

3. SLS重要执行流程解析

在hadoop包的share/hadoop/tools/sls/bin目录下,有脚本slsrun.sh,它就是SLS的启动脚本。脚本中,最重要的启动命令如下:

hadoop_java_exec sls org.apache.hadoop.yarn.sls.SLSRunner ${args}

它会通过java命令执行SLSRunner类,SLSRunner启动过程如下:

  1. 启动ResourceManager服务。
  2. 启动NodeManager心跳线程。
  3. 启动AM资源请求线程。
public void start() throws IOException, ClassNotFoundException, YarnException,
      InterruptedException {

    enableDNSCaching(getConf());

    // start resource manager
    //启动ResourceManager服务。
    startRM();
    // start node managers
    //启动NodeManager心跳线程。
    startNM();
    // start application masters
    //启动AM资源请求线程。
    startAM();
    // set queue & tracked apps information
    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
        .setQueueSet(this.queueAppNumMap.keySet());
    ((SchedulerWrapper) rm.getResourceScheduler()).getTracker()
        .setTrackedAppSet(this.trackedApps);
    // print out simulation info
    printSimulationInfo();
    // blocked until all nodes RUNNING
    waitForNodesRunning();
    // starting the runner once everything is ready to go,
    runner.start();
  }

3.1 ResourceMananger启动

它调用startRM方法启动ResourceManager服务。注意,这个ResourceManager服务重写了createAMLauncher方法:

    final SLSRunner se = this;
    rm = new ResourceManager() {
      @Override
      protected ApplicationMasterLauncher createAMLauncher() {
        return new MockAMLauncher(se, this.rmContext, appIdAMSim);
      }
    };

    // Across runs of parametrized tests, the JvmMetrics objects is retained,
    // but is not registered correctly
    JvmMetrics jvmMetrics = JvmMetrics.initSingleton("ResourceManager", null);
    jvmMetrics.registerIfNeeded();

    // Init and start the actual ResourceManager
    rm.init(rmConf);
    rm.start();

返回的ApplicationMasterLauncher对象是MockAMLauncher,该类handle方法用户指定am的启动行为,这里AM不会启动容器了,如果调度到资源,就结束执行。(对于正常的ApplicationMasterLauncher,是会通知NM启动容器的)。如下所示:

         Container amContainer = event.getAppAttempt().getMasterContainer();

          setupAMRMToken(event.getAppAttempt());

          // Notify RMAppAttempt to change state
          super.context.getDispatcher().getEventHandler().handle(
              new RMAppAttemptEvent(event.getAppAttempt().getAppAttemptId(),
                  RMAppAttemptEventType.LAUNCHED));

          ams.notifyAMContainerLaunched(
              event.getAppAttempt().getMasterContainer());
          LOG.info("Notify AM launcher launched:" + amContainer.getId());

          se.getNmMap().get(amContainer.getNodeId())
              .addNewContainer(amContainer, 100000000L);

          return;

3.2 NodeManager和ApplicationMaster线程模型

在介绍NM和AM实现之前,先了解下它的线程池模型。SLSRunner的成员变量TaskRunner维护了一个线程池。NM的心跳任务和AM的资源申请任务都会提交到该线程池中,向ResourceManager发送请求。

TaskRunner包含了一个队列queue对象,通过schedule方法向queue中添加任务。queue作为线程池的参数,会直接将queue中的任务放到线程池中执行:

private static TaskRunner runner = new TaskRunner();
private DelayQueue queue;

executor = new ThreadPoolExecutor(threadPoolSize, threadPoolSize, 0,
      TimeUnit.MILLISECONDS, queue);

private void schedule(Task task, long timeNow) {
    task.timeRebase(timeNow);
    task.setQueue(queue);
    queue.add(task);
  }

NM和AM都继承了TaskRunner的内部类Task,实现的run方法。它们的任务依次执行firstStep、middleStep、lastStep。如下:

public final void run() {
      try {
        if (nextRun == startTime) {
          firstStep();
          nextRun += repeatInterval;
          if (nextRun <= endTime) {
            queue.add(this);          
          }
        } else if (nextRun < endTime) {
          middleStep();
          nextRun += repeatInterval;
          queue.add(this);
        } else {
          lastStep();
        }
      } catch (Exception e) {
        e.printStackTrace();
        Thread.getDefaultUncaughtExceptionHandler()
            .uncaughtException(Thread.currentThread(), e);
      }
    }

3.3 NodeManager心跳上报

SLS对NodeManager逻辑进行简化,阉割启动容器的能力。直接从sls-nodes.json获取所有NodeManager节点信息,解析到的每个节点,都进行心跳上报。

如下,NMSimulator通过多线程的方式讲NM心跳任务放到TaskRunner中的线程池中:


public class NMSimulator extends TaskRunner.Task {

//读取sls-nodes.json文件,获取节点信息
nodeSet = SLSUtils.parseNodesFromSLSTrace(inputTrace);
//创建线程池
ExecutorService executorService = Executors.
        newFixedThreadPool(threadPoolSize);
    for (Map.Entry<String, Resource> entry : nodeResourceMap.entrySet()) {
      executorService.submit(new Runnable() {
        @Override public void run() {
          try {
            // we randomize the heartbeat start time from zero to 1 interval
            NMSimulator nm = new NMSimulator();
            Resource nmResource = nodeManagerResource;
            String hostName = entry.getKey();
            if (entry.getValue() != null) {
              nmResource = entry.getValue();
            }
            nm.init(hostName, nmResource,
                random.nextInt(heartbeatInterval),
                heartbeatInterval, rm, resourceUtilizationRatio);
            nmMap.put(nm.getNode().getNodeID(), nm);
            //将NM包含的心跳任务放到runner中的线程池中
            runner.schedule(nm);
            rackSet.add(nm.getNode().getRackName());
          } catch (IOException | YarnException e) {
            LOG.error("Got an error while adding node", e);
          }
        }
      });
}

线程执行时,依次执行firstStep、middleStep、lastStep方法,NMSimulator只实现了middleStep方法。它负责进行心跳上报:

if (resourceUtilizationRatio > 0 && resourceUtilizationRatio <=1) {
      int pMemUsed = Math.round(node.getTotalCapability().getMemorySize()
          * resourceUtilizationRatio);
      float cpuUsed = node.getTotalCapability().getVirtualCores()
          * resourceUtilizationRatio;
      ResourceUtilization resourceUtilization = ResourceUtilization.newInstance(
          pMemUsed, pMemUsed, cpuUsed);
      ns.setContainersUtilization(resourceUtilization);
      ns.setNodeUtilization(resourceUtilization);
    }
    beatRequest.setNodeStatus(ns);
    NodeHeartbeatResponse beatResponse =
        rm.getResourceTrackerService().nodeHeartbeat(beatRequest);

3.4 AM资源请求

AMSimulator读取sls-jobs.json 文件,获取其中的任务信息:

try (Reader input = new InputStreamReader(
        new FileInputStream(inputTrace), "UTF-8")) {
      Iterator<Map> jobIter = mapper.readValues(
          jsonF.createParser(input), Map.class);

      while (jobIter.hasNext()) {
        try {
          createAMForJob(jobIter.next());
        } catch (Exception e) {
          LOG.error("Failed to create an AM: {}", e.getMessage());
        }
      }
    }

封装请求后,放倒入TaskRunner.queue中。依次执行firstStep、middleStep、lastStep方法。

firstStep中,提交任务:

rm.getClientRMService().submitApplication(subAppRequest);

middleStep中,处理响应,发送容器请求:

public void middleStep() throws Exception {
    if (isAMContainerRunning) {
      // process responses in the queue
      processResponseQueue();

      // send out request
      sendContainerRequest();

      // check whether finish
      checkStop();
    }
  }

lastStep中,发送作业完成信息:

rm.getApplicationMasterService().finishApplicationMaster(finishAMRequest);

4. Mapreduce HistoryServer日志介绍

SLS需要HistoryServer保存的作业统计信息来模拟集群和作业请求。需要线了解下HistoryServer作业历史信息相关内容。

首先,在mapred-site.xml,可以看到HistoryServer相关配置:

<property>
    #作业历史文件的保存位置,设置none表示不记录日志,如果开启intermediate-done-dir和done-dir就可以记录了
    <name>hadoop.job.history.user.location</name>
    <value>none</value>
  </property>
   <property>
    #JobHistory服务器地址,与ResourceManager、NodeManager、NameNode等组件通信
    <name>mapreduce.jobhistory.address</name>
    <value>启动historyserver服务的机器ip:10020</value>
   </property>
   <property>
     #供用户访问的地址
     <name>mapreduce.jobhistory.webapp.address</name>
     <value>启动historyserver服务的机器ip:19888</value>
   </property>
  <property>
     #存储作业执行状态信息的临时目录
     <name>mapreduce.jobhistory.intermediate-done-dir</name>	
     <value>/user/history/temp</value>
  </property>	
  <property>
     #存储作业执行完成后,将临时目录的文件转移到该目录下
     <name>mapreduce.jobhistory.done-dir</name>	
    <value>/user/history/done</value>
  </property>
  
   <!-- MapReduce Job History Server security configs -->
   <property>
     <name>mapreduce.jobhistory.keytab</name>
     <value>{% $KEYTAB_DIR %}/{% $DFS_NAMESERVICE %}.keytab</value>
   </property>
   <property>
     <name>mapreduce.jobhistory.principal</name>
     <value>hadoop/{%- $jobhisoryserver -%}@NIE.NETEASE.COM</value>
   </property>
  <property>
    #开启历史作业信息定时清理功能
    <name>mapreduce.jobhistory.cleaner.enable</name>
    <value>true</value>
  </property>
  <property>
    #每2.5小时清理一次历史作业信息
    <name>mapreduce.jobhistory.cleaner.interval-ms</name>
    <value>8640000</value>
   </property>
  <property>
    #历史作业信息最多存活存活2天
    <name>mapreduce.jobhistory.max-age-ms</name>
    <value>172800000</value>
   </property>
  <property>
    #能同时处理历史记录请求的线程数量
    <name>mapreduce.jobhistory.client.thread-count</name>
    <value>256</value>
   </property>
  <property>
    #将历史状态信息从intermediate-done-dir临时目录移动到done-dir的线程数
    <name>mapreduce.jobhistory.move.thread-count</name>
    <value>32</value>
   </property>

可以从mapreduce.jobhistory.done-dir指定路径中找到历史作业数据文件:

Untitled 1.png

job_xxx_conf.xml保存作业配置信息:

Untitled 2.png

job_xxx.jhist保存作业历史执行相关状态信息:

Untitled 3.png

5. SLS实践

首先,需要使用hadoop的rumen工具,将historyserver中的日志转为rumen格式的历史作业数据文件:job-trace.json和job-topology.json:

hadoop jar ~/hadoop/share/hadoop/tools/lib/hadoop-rumen-2.6.0-cdh5.6.0.jar \
org.apache.hadoop.tools.rumen.TraceBuilder \
file:///home/hadoop/var/rumen/job-trace.json \
file:///home/hadoop/var/rumen/job-topology.json \
hdfs:///user/history/done/2022/11/25/000358

再将job-trace.json文件转为sls-nodes.json和sls-jobs.json文件:

~/hadoop/share/hadoop/tools/sls/bin/rumen2sls.sh --rumen-file=/home/brightness/rumen/job-trace.json --output-dir=/home/hadoop/var/sls

sls-jobs.json文件如下:

image.png

sls-nodes.json文件如下: image.png SLS将sls-nodes.json和sls-jobs.json文件作为输入,执行:

cd /home/hadoop/hadoop/share/hadoop/tools/sls
/home/hadoop/hadoop/share/hadoop/tools/sls/bin/slsrun.sh \
--input-sls=/home/hadoop/var/sls/sls-jobs.json \
--nodes=/home/hadoop/var/sls/sls-nodes.json \
--output-dir=/home/hadoop/var/sls/out1 \
--print-simulation > /home/hadoop/var/sls/out1/run.log 2>&1

可以查看网页查看resourcemanager及模拟过程:

http://启动ip:8891/simulate

http://启动ip:8088/cluster/apps/RUNNING

执行结束后,生成调度耗时、心跳耗时、Container平均耗时等信息。当然还有各种指标的曲线图:

Untitled 4.png

它和日志聚合不同,日志聚合是yarn自身提供的服务。

通过上述流程,通过在yarn-site.xml中设置不同的调度器,就能够模拟不同调度器的性能结果。同时,它还可以将同一种调度在不同配置下的性能表现进行对比。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
GQ7psP7UJw7k
最新推荐 更多

2024-05-31