Spark四种性能调优思路(三)——shuffle调优
  ILwIY8Berufg 2023年11月13日 33 0

Spark中的性能消耗主要都是在shuffle环节,对shuffle部分进行调优是很有必要的

Spark中负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager。在0.8的版本中出现了优化之后的HashShuffleManager,同时在spark1.2的版本出现的SortShuffleManager成为了默认的shuffle处理方式,目前的版本就只有一个SortShuffleManager。但是SortShuffleManager,也有普通和排序的SortShuffleManager之分。


一、HashShuffleManager

  • 未经优化的HashShuffleManager

总结:

这种未经优化的shuffle处理过程,每一个shuffleMapTask会为下游的ReduceTask生成一个磁盘blockFile文件。 所以,如果上游有

1000个shuffleMapTask ,下游有100个ReduceTask,会生成1000*100=10w个磁盘文件,所以这种shuffle操作,会生成大量的磁盘文件,性能很差,所以spark在0.8的时候做了性能优——ShuffleGroup

  • 经过优化的HashShuffleManager 总结:

经过优化之后的hashshuffle处理过程,是有每一个cpu core上面运行的多个shufflemaptask,为下游的一个reduce task创建一个buffer缓冲区 ,一个磁盘文件,多次写入的多被合并,所以,此时如果上游有1000个shufflemaptask,cpucore 50个,reducetask是100个,生成的磁盘问价50 * 100 = 5k ,生成的磁盘文件数量要比第一种少很多。同时把在一个cpu core处理的过程,我们称之为一个shuffle-group。

二、SortShuffleManager

  • 普通的sortShuffleManager

Spark四种性能调优思路(三)——shuffle调优_数据

总结:

为了处理那些在shufle过程当中需要进行排序的操作, sortShuffleManager一开始明没有直接将数据从缓冲区送出,落地到磁盘;而是先根据下游的reduce个数,进行内存级别的分区,针对这多个分区进行排序,将排序之后的结果批量写入内存缓冲区域中,缓冲区域写满之后落地到磁盘文件,一个缓冲区对应一个磁盘文件 ,此时就和未经优化的HashufleManager没有什么两样,所以对此时(一个shuffleMapTask生成的多个磁盘文件)做了磁盘文件的合并,合并成一个磁盘文件,同时为了标识清楚合并之后的结果,最后会给哪一部分属于对应的reduceTask ,生成一个索引文件阈值对应,reduceTask便可以读取数据。这种情况下游多少个磁盘文件,此时生成的磁盘文件个数就是cpu-core的个数

  • 开启ByPass机制的sortShufflerManager

Spark四种性能调优思路(三)——shuffle调优_spark_02

总结:并不是所有的shuffle操作都需要进行排序,对于那些不需要的排序的shuffle操作,使用上一种普通的SortShuffleManager,性能反而不高,因为做了不必要的排序操作,所以spark便提供了在sortShuffleManager基础之上提供了一个ByPass机制,如果不需要进行排序,我们就可以开启ByPass基础,在shuffle过程中跳过排序。设置这个参数spark.shuffle.sort.bypassMergeThreshold= 200,改参数指定的开启bypass的最大的分区数,也就是说当spark作业的并行度或者分区数高于200 ,就会走普通的sortShuffleManager过程,低于200执行ByPassw机制,所以如果不想执行的排序操作,应该尽可能的调大这个参数。

三、shuffle调优参数

参数

默认值

描述

spark.reducer.maxSizeInFlight

48M

每一次reduce拉取的数据的最大值,默认值48m,如果网络较好、spark数据很多,为了较少拉取的次数,可以适当的将这个值调大,比如96m。合理调节该参数,性能会有1%~5%的提升。

spark.shuffle.compress

true

shuffle-write输出到磁盘的文件是否开启压缩,默认为true,开启压缩,同时配合spark.io.compression.codec(压缩方式)使用。

spark.shuffle.file.buffer

32k

shuffle过程中,一个磁盘文件对应一个缓存区域,默认大小32k,为了较少写磁盘的次数,在内存充足的条件下可以适当的调大该值,比如48k,64k都可以,进而提升性能。合理调节该参数,性能会有1%~5%的提升。

spark.shuffle.io.maxRetries

3

shuffle过程中为了避免失败,会配置一个shuffle的最大重试次数,默认为3次,如果shuffle的数据比较多,就越容易出现失败,此时可以调大这个值,比如10次,这样可以大幅度提升稳定性。

spark.shuffle.io.retryWait

5s

两次失败重试之间的间隔时间,默认5s,如果多次失败,显然问题在于网络不稳定,所以我们为了保证程序的稳定性,调大该参数的值,比如30s,60s都可以,同样可以增加程序的稳定性。

spark.shuffle.sort.bypassMergeThreshold

200

是否开启byPass机制的阈值。

spark.shuffle.memoryFraction

0.2

在executor中进行reduce拉取到的数据进行reduce聚合操作的内存默认空间大小,默认占executor的20%的内存,如果持久化操作相对较少,shuffle操作较多,就可以调大这个参数,比如0.3。

spark.shuffle.manager

sort

该参数用于设置ShuffleManager的类型。如果程序中需要排序机制,使用默认即可。如果不需要排序,可以将其设置为hash(推荐)或tungsten-sort(慎用)。

spark.shuffle.consolidateFiles

false

如果使用HashShuffleManager,该参数有效。如果设置为true,那么就会开启consolidate机制,会大幅度合并shuffle write的输出文件,对于shuffle read task数量特别多的情况下,这种方法可以极大地减少磁盘IO开销,提升性能。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月13日 0

暂无评论

推荐阅读
ILwIY8Berufg
最新推荐 更多

2024-05-31