FairScheduler迁移Capacity Scheduler实践
  GQ7psP7UJw7k 2023年11月13日 21 0

1. 背景

需求一

当前线上集群A还是使用fair scheduler,当调度请求量大时,经长发生pending:

Untitled.png

观察队列使用情况,发现队列并没有用满,这说明是调度性能问题。

Capacity Scheduler由于使用了细粒度锁、异步调度等特性,吞吐量能够提升5-10倍。因此希望集群能够升级调度器为Capacity Scheduler。

需求二

对于运行在AWS上的nodemanager,为了节省成本,选择了不定期回收的机器类型。在这些节点上运行的作业会失败重试,对于一些重要作业而言,这种行为不可接受。而Capacity Scheduler提供Node Label节点标签功能,可以给一些不可回收节点上的nodemanager打上标签,让重要作业运行在打过特定标签的节点上,保证作业顺利运行完成。

2.步骤

  1. 禁止fair-scheduler增删改操作。
  2. 将fair-scheduler.xml转换成为capacity-scheduler.xml。
  3. 修改yarn-site.xml配置,使用CapacityScheduler。
  4. 滚动重启resourcemanager。
  5. 未来3天持续观察yarn调度相关指标,检查是否有异常。

3. 转换fair-scheduler.xml成为capacity-scheduler.xml

FairScheduler和CapacityScheduler使用的配置文件的格式不同。对于fair-scheduler.xml,它的属性名放在了标签中:

<allocations>
    <pool name="root">
      <minSharePreemptionTimeout>600</minSharePreemptionTimeout>
      <weight>1</weight>
      <aclSubmitApps>hadoop</aclSubmitApps>
      <aclAdministerApps>hadoop,hive</aclAdministerApps>
      <maxResources>36864 mb,18 vcores</maxResources>
      <fairSharePreemptionTimeout>60</fairSharePreemptionTimeout>
      <userMaxJobsDefault>1000</userMaxJobsDefault>
      <minResources>18432 mb,9 vcores</minResources>
  </pool>
</allocations>

而capacity-scheduler.xml则使用固定的标签<name></name>指定属性名:

<configuration>
    <property>
        <name>yarn.scheduler.capacity.root.hadoop.ordering-policy</name>
        <value>fair</value>
        <final>false</final>
        <source>programmatically</source>
    </property>
    <property>
        <name>yarn.scheduler.capacity.root.maximum-capacity</name>
        <value>100</value>
        <final>false</final>
        <source>programmatically</source>
    </property>
</configuration>

因此,切换到CapacityScheduler前,需要将fair-scheduler.xml的文件内容转换成capacity-scheduler.xml能够解析的格式。

Hadoop3.3.0以上的版本中,引入了https://issues.apache.org/jira/browse/YARN-9698,它提供fs2cs 工具进行转换工作。如下所示,指定yarn-site.xml和fair-scheduler.xml,并制定输出目录,该工具就会在输出目录中生成capacity-scheduler.xml和yarn-site.xml,注意生成的yarn-site.xml中包含了CapacityScheduler相关的配置信息,将这些信息粘贴进线上的yarn-site.xml中即可。

其命令如下所示:

yarn fs2cs -y /home/hadoop/tmp/fair/yarn-site.xml -f /home/hadoop/tmp/fair/fair-scheduler.xml -o /home/hadoop/tmp/capacity/ -s

在执行工具时,查看日志,发现该工具不支持转换userMaxAppsDefault、minResources、maxResources等队列属性,需要手动调整:

Untitled 1.png

由于fs2cs工具存在缺陷,如果队列数量成百上千,那么手动调整的工作量非常大,因此选择自己编写转换工具。其思路如下:

  1. 读取fair-scheduler.xml内容到内存中。
  2. 将队列信息导出到capacity-scheduler.xml文件中。

代码片段如下:

#读取fair-scheduler.xml内容
Element root = document.getRootElement();
Element rootPool = root.element("pool");
List<CsQueue> csQueues = new ArrayList<>();
String rootPoolName = rootPool.attributeValue("name");
String rootMinResources = rootPool.elementText("minResources");
String rootMaxResources = rootPool.elementText("maxResources");
String rootAclSubmitApps = rootPool.elementText("aclSubmitApps");
String rootAclAdministerApps = rootPool.elementText("aclAdministerApps");

#转化成为capacity-scheduler.xml格式,保存到文件中
Element property = configuration.addElement("property");
Element name = property.addElement("name");
Element value = property.addElement("value");
name.setText(CAPACITY_CONFIG_PREFIX + queuePath + "." + propertyMap.getString(entry.getKey()));
value.setText(entry.getValue().toString());
File file = new File(csFile);
XMLWriter writer = new XMLWriter(new FileOutputStream(file), format);
writer.write(doc);

4. 修改yarn-site.xml配置

切换前:

<property>
        <name>yarn.resourcemanager.scheduler.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.fair.FairScheduler</value>
    </property>  
    <!-- YARN-1458 -->
    <property>
        #禁止基于任务的权重大小对应用进行调度。如果打开,权重越大的任务越先执行
        <name>yarn.scheduler.fair.sizebasedweight</name>
        <value>false</value>
    </property>
    <property>
        #允许批量调度。心跳调度时每个nodemanager资源可以分配多个container,默认进行一次调度会导致nodemanager资源没有分配完,资源利用率不高
        <name>yarn.scheduler.fair.assignmultiple</name>
        <value>true</value>
    </property>
    <property>
        #应用如果没有满足本地行,可以设置跳过调度机会,跳过机会的阈值=跳过次数/节点总数。-1表示不跳过任何调度机会,存算分离时,这个配置应该关闭。
        <name>yarn.scheduler.fair.locality.threshold.node</name>
        <value>0.05</value>
    </property>
    <property>
        #批量调度时,单次最高可以分配container的上限,默认-1表示不设限
        <name>yarn.scheduler.fair.max.assign</name>
        <value>3</value>
    </property>

切换后:

<property>
        <name>yarn.resourcemanager.scheduler.class</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.CapacityScheduler</value>
    </property>
    <property>
        #基于capacity-scheduler.xml文件保存队列配置信息
        <name>yarn.scheduler.configuration.store.class</name>
        <value>file</value>
    </property>
    <property>
        #一个应用只有一个appMaster,该配置限制所有appMaster占用资源最大能够占用集群资源的比例,用于限制并发。设置为1时,表示100%,不限制并发。
        <name>yarn.scheduler.capacity.maximum-am-resource-percent</name>
        <value>1.0</value>
    </property>
    <property>
        #最大应用数量
        <name>yarn.scheduler.capacity.maximum-applications</name>
        <value>1000000</value>
    </property>
    <property>
        #禁止自定义队列映射规则,使用默认的队列映射规则
        <name>yarn.scheduler.capacity.queue-mappings-override.enable</name>
        <value>false</value>
    </property>
    <property>
        #队列映射规则。u表示针对用户,第一个user是提交程序的用户,第二个user是对应的队列
        <name>yarn.scheduler.capacity.queue-mappings</name>
        <value>u:%user:%user</value>
    </property>
    <property>
        #基于内存和CPU对内存进行调度器资源进行比较。默认只基于内存进行比较。
        <name>yarn.scheduler.capacity.resource-calculator</name>
        <value>org.apache.hadoop.yarn.util.resource.DominantResourceCalculator</value>
    </property>
    <property>
        #启动节点标签功能
        <name>yarn.node-labels.enabled</name>
        <value>true</value>
    </property>
    <property>
        #在hdfs中保存节点标签内容
        <name>yarn.node-labels.fs-store.root-dir</name>
        <value>hdfs://集群代号/yarn/node-labels</value>
    </property>
    <property>
        #通过rm提供的命令设置每个nodemanager的标签,集中管理。
        <name>yarn.node-labels.configuration-type</name>
        <value>centralized</value>
    </property>
    <property>
      #禁止节点本地性执行
      <name>yarn.scheduler.capacity.node-locality-delay</name>
      <value>-1</value>
    </property>
    <property>
      #禁止机架本地行执行
      <name>yarn.scheduler.capacity.rack-locality-additional-delay</name>
      <value>-1</value>
    </property>

5. 指标观察即作业运行情况

集群A切换到CapacityScheduler后,作业有大量堆积,用户反馈作业执行明显变慢,AM资源等待较长时间进行分配:

Untitled 2.png

第二天观察调度耗时,container分配吞吐量等指标。对于已经切换完成的3个集群,它们的平均调度耗时对比如下:

AM平均调度耗时 container平均调度耗时
集群A 6 s 3.8 ms
集群B 155 ms 940 微秒
集群C 44 ms 676 微秒

三个集群指标差距过大,特别是刚刚切换的集群A,平均调度耗时已经达到6s:

Untitled 3.png

集群A的吞吐量已经从396下降到231:

Untitled 4.png

对CapacityScheduler调度逻辑进行排查,发现其在调度时,会根据每个队列的usedCapacity进行排序,选择usedCapacity最小的队列进行调度,这个过程耗时与队列数量有关:

队列数量 平均调度耗时
集群A 2706 3.8 ms
集群D 1700 3.7 ms
集群B 620 940 微秒
集群C 399 676 微秒

6. CapacityScheduler队列排序问题

https://blog.51cto.com/u_15327484/7894282文章中,讲解了Hadoop3.3.0之前,使用queueComparator基于队列资源使用率对队列进行排序。

在Hadoop3.0.0后,使用queueOrderingPolicy表示队列排序策略:

queueOrderingPolicy = csContext.getConfiguration().getQueueOrderingPolicy(
          getQueuePath(), parent == null ?
              null :
              ((ParentQueue) parent).getQueueOrderingPolicyConfigName());
      queueOrderingPolicy.setQueues(childQueues);

提供两种排序方法:utilization和priority-utilization。前者是指考虑资源使用率,后者还考虑了队列优先级。默认就是utilization,这和Hadoop2.6.0中的queueComparator规则一致。

utilization和priority-utilization都适用PriorityQueueComparator类进行排序,区别在于respectPriority参数,如果该参数为false,排序规则则是utilization;如果该参数是true,排序规则则是priority-utilization:

private int compare(PriorityQueueResourcesForSorting q1Sort,
        PriorityQueueResourcesForSorting q2Sort, float q1Used,
                        float q2Used, int q1Prior, int q2Prior) {

      int p1 = 0;
      int p2 = 0;
      if (respectPriority) {
        p1 = q1Prior;
        p2 = q2Prior;
      }

      int rc = PriorityUtilizationQueueOrderingPolicy.compare(q1Used, q2Used,
          p1, p2);

      // For queue with same used ratio / priority, queue with higher configured
      // capacity goes first
      if (0 == rc) {
        Resource minEffRes1 =
                q1Sort.configuredMinResource;
        Resource minEffRes2 =
                q2Sort.configuredMinResource;
        if (!minEffRes1.equals(Resources.none()) && !minEffRes2.equals(
            Resources.none())) {
          return minEffRes2.compareTo(minEffRes1);
        }

        float abs1 = q1Sort.absoluteCapacity;
        float abs2 = q2Sort.absoluteCapacity;
        return Float.compare(abs2, abs1);
      }

      return rc;
    }

每次调度时,会调用PriorityUtilizationQueueOrderingPolicy的getAssignmentIterator方法对队列进行排序:

// 对所有队列进行排序
  public Iterator<CSQueue> getAssignmentIterator(String partition) {
    // Since partitionToLookAt is a thread local variable, and every time we
    // copy and sort queues, so it's safe for multi-threading environment.
    PriorityUtilizationQueueOrderingPolicy.partitionToLookAt.set(partition);

    // Take snapshots of ParentQueue's PriorityQueueResourcesForSorting
    // 只对已有的队列使用新List进行排序,不考虑新增队列。否则会导致调度线程crash
    List<PriorityQueueResourcesForSorting> sortedQueueResources =
            queues.stream().
                    map(queue -> new PriorityQueueResourcesForSorting(queue)).
                    collect(Collectors.toList());

    // 使用PriorityQueueComparator进行排序
    Collections.sort(sortedQueueResources, new PriorityQueueComparator());
    // Return the reference queue iterator
    return sortedQueueResources.stream().
            map(sorting -> sorting.getQueue()).
            collect(Collectors.toList()).iterator();
  }

当队列数过多时,会降低调度效率。根据上述实践,当队列不超过1000时,可以直接切换到capacityScheduler中。

7. CapacityScheduler相关BUG修复

对于队列低于1000的Yarn集群,成功上线Capacity Scheduler后性能没有降低,但是会出现一些代码上的问题。

一:CapacityScheduler无法自动刷新队列

使用FairScheduler时,yarn后台线程会每10s检查一次配置文件fair-scheduler.xml有没有被修改,如果被修改则重新加载。而CapacityScheduler没有自动刷新的机制,修改capacity-scheduler.xml后需要执行 yarn rmadmin -refreshQueues 命令来刷新队列。

解决:引入https://issues.apache.org/jira/browse/YARN-10623即可。

配置队列刷新参数:

<property>
        <name>yarn.resourcemanager.scheduler.monitor.enable</name>
        <value>true</value>
    </property>
    <property>
        <name>yarn.resourcemanager.scheduler.monitor.policies</name>
        <value>org.apache.hadoop.yarn.server.resourcemanager.scheduler.capacity.QueueConfigurationAutoRefreshPolicy</value>
    </property>
    <property>
        <name>yarn.resourcemanager.queue.auto.refresh.monitoring-interval</name>
        <value>10000</value>
    </property>

查看RM日志,队列刷新完成:

Untitled 6.png

二:CapacityScheduler无法提交应用

当CapacityScheduler队列配置的是绝对资源时,叶子队列设置的maximum-applications无效,会被重新计算,计算方式如下:

队列的maximum-applications = 队列资源占比 * yarn.scheduler.capacity.maximum-applications(默认为10000)

tsinglong集群队列数量众多,平均资源占比只有0.1%,重新设置的max applications参数可能会很小

而且如果集群开启了NodeLabel功能,一个队列在所有分区的maxApplications参数会被最后加载的分区的maxApplications覆盖,而有标签分区队列默认的capacity为0,maxApplications也会被重新设置为0,导致无法提交应用。

解决方法:删除重新计算maximum-applications的逻辑:

Untitled 7.png

删除后发现maximum-applications恢复到设置的绝对值:

Untitled 8.png

三:CapacityScheduler队列名称指定错误

  1. 如果在CapacityScheduler中使用队列路径提交作业,会报错unknown queue。

Untitled 9.png

  1. 正在运行的作业如果切换到CapacityScheduler,会报错找不到队列被kill。

Untitled 10.png

原因是FairScheduler需要叶子队列路径,而CapacityScheduler需要叶子队列名称。

解决方法:引入Hadoop3.3.0中的patch:https://issues.apache.org/jira/browse/YARN-9879

四:CapacityScheduler队列名称映射错误

在FairScheduler中,队列名称中的. 需要用_dot_ 代替,提交作业时会自动映射:

Untitled 11.png

但在CapacityScheduler中不可以,会识别不到队列

Application application_1634786150614_4929 submitted by user wb.chenxiaoqi to unknown queue: wb.chenxiaoqi

解决方法:CapacityScheduler的队列名称可以直接使用.

8. 后续操作

由于集群调度性能下降,第二天直接将调度器回滚到fair-scheduler。为了优化fair-scheduler,后续的规划是:

  1. 使用yarn-federation,将yarn切分成小集群,每个集群的队列数不超过1000,这样可以解决排序性能问题。
  2. 由于yarn-federation需要调研、测试、灰度流程,上线时间会推迟。因此可以先在fair scheduler中上线并发调度功能,减轻调度压力。
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月13日 0

暂无评论

推荐阅读
  dhQTAsTc5eYm   2023年12月23日   51   0   0 HadoopHadoopapacheapache
GQ7psP7UJw7k
最新推荐 更多

2024-05-03