Flink调优-资源配置
  qYWFTniLcEgF 2023年11月02日 69 0

一、资源配置优化

1、并行度

1.1 并行度设置

并行度(parallelism):为任务提供足够的并行度,但并行度也不是越大越好,太多会加重数据在多个solt/task manager之间数据传输压力,包括序列化和反序列化带来的压力。

一个任务的并行度设置可以从4个层面指定:

  • Operator Level(算子层面)
  • Execution Environment Level(执行环境层面)
  • Client Level(客户端层面)
  • System Level(系统层面)

这些并行度的优先级为Operator Level>Execution Environment Level>ClientLevel>System Level

1.2 如何设置合适的并行度

  • 数据源的并行度:数据源的并行度应该等于或大于最终算子的并行度,以充分利用系统资源。
  • 算子逻辑的复杂度:算子逻辑越复杂,相应的并行度就需要更高才能提供足够的吞吐量。因此,在确定算子并行度时,应该考虑算子逻辑的复杂度。
  • 系统资源的可用性:在确定算子并行度时,应该考虑系统可用的资源。例如,如果系统的CPU和内存资源有限,那么高并行度可能会导致任务竞争和低性能。
  • 数据倾斜:数据倾斜是指数据分布不均衡,这可能会导致某些算子的并行度成为瓶颈。在这种情况下,可以通过调整算子的并行度来解决数据倾斜问题。
  • 实验调整:最终的并行度设置可能需要通过实验来进行调整。可以通过监视和调整算子的并行度来观察和优化作业性能。

1.3 并行度设置不合理会导致哪些问题

  • 反压:Flink中某些算子的并行度设置过小可能会导致反压的情况。反压是指某个算子的输出速度超过了下游算子的处理速度,导致下游算子的任务堆积和延迟。这可能会导致整个作业的性能下降和任务失败。可以通过web ui查看作业的反压情况(具体另文说明)
  • 过高的并行度设置可能会导致资源浪费。如果某个算子的并行度设置过高,该算子可能会占用过多的系统资源,从而影响整个作业的性能。此外,过高的并行度设置还可能会导致任务之间的竞争,从而影响系统的稳定性。
  • 过高或过低的并行度设置还可能会导致数据倾斜。如果某个算子的并行度设置过高或过低,可能会导致数据倾斜,从而影响整个作业的性能。过高的并行度设置可能会导致某些子任务负载过重,从而导致数据倾斜。过低的并行度设置可能会导致某些子任务负载过轻,从而导致数据倾斜。
  • 并行度的限制:Source和Sink环节的并行度设置,要结合相应connector的特性。比如:mysql-cdc binlog的并行度通常为1;写clickhouse的并行度不能太大,太大了可能会影响clickhouse的写性能。

备注:mysql-cdc connector只能设置一个并行度,主要可能有这些原因:

  • mysql binlog本质上是一个文件,多个并行度消费需要避免重复。
  • 多个并行度消费难以保证顺序

2、Flink内存管理及优化

Flink内存介绍的官网文档地址:https://nightlies.apache.org/flink/flink-docs-release-1.17/docs/deployment/memory/mem_setup/

2.1 堆内内存(on-heap memory)和堆外内存(off-heap memory)

在介绍Flink内存之前,先简单介绍下,JVM中堆内内存(on-heap memory)和堆外内存(off-heap memory)。堆外内存和堆内内存是相对的二个概念。

2.1.1 堆内内存(jvm heap memory)

其中堆内内存是运行JAVA程序时经常用到的内存配置,在jvm参数中只要使用-Xms,-Xmx等参数就可以设置堆的大小和最大值。

在使用堆内内存(heap memory)的时候,遵守JVM虚拟机的内存管理机制,采用垃圾回收器(GC)统一进行内存管理,GC会在某些特定的时间点进行一次彻底回收,也就是Full GC,GC会对所有分配的堆内内存进行扫描,在这个过程中会对JAVA应用程序的性能造成一定影响,还可能会产生程序异常或退出。

JAM堆的计算公式:堆内内存 = 新生代+老年代+持久代

2.1.2 堆外内存(off-heap memory)

和堆内内存相对应,堆外内存就是把内存对象分配在Java虚拟机的堆以外的内存,这些内存直接受操作系统管理(而不是虚拟机),这样做的结果就是能够在一定程度上减少垃圾回收对应用程序造成的影响。

使用堆外内存的优点:

  • 减少了垃圾回收:因为垃圾回收会暂停其他的工作。
  • 加快了复制的速度:堆内在flush到远程时,会先复制到直接内存(非堆内存),然后再发送;而堆外内存相当于省略掉了这个工作。

2.2 Flink内存结构

Flink JVM 进程的进程总内存(Total Process Memory)包含了由 Flink 应用使用的内存(Flink 总内存)以及由运行 Flink 的 JVM 使用的内存。 Flink 总内存(Total Flink Memory)包括 JVM 堆内存(Heap Memory)和堆外内存(Off-Heap Memory)。,其中堆外内存包括直接内存(Direct Memory)和本地内存(Native Memory)。

2.2.1 JobManager的内存结构

Flink调优-资源配置_数据倾斜


Flink调优-资源配置_Memory_02

2.2.2 TaskManager的内存结构

Flink调优-资源配置_数据倾斜_03


Flink调优-资源配置_Memory_04

2.3 内存参数配置

2.3.1 不同运行模式的内存参数

独立部署模式下,我们通常更关注 Flink 应用本身使用的内存大小。 建议配置 Flink 总内存taskmanager.memory.flink.size 或者 jobmanager.memory.flink.size)或其组成部分

容器化部署模式(Containerized Deployment)下(Kubernetes 或 Yarn),建议配置进程总内存taskmanager.memory.process.size 或者 jobmanager.memory.process.size)。 该配置参数用于指定分配给 Flink JVM 进程的总内存,也就是需要申请的容器大小。

用户需要至少选择其中一种进行配置(本地运行除外),否则 Flink 将无法启动。 这意味着,用户需要从以下无默认值的配置参数(或参数组合)中选择一个给出明确的配置:

不建议同时设置进程总内存和 Flink 总内存。 这可能会造成内存配置冲突,从而导致部署失败。 额外配置其他内存部分时,同样需要注意可能产生的配置冲突。

2.3.2 Flink内存配置建议

正常情况下,只需要配置Flink总内存或进程总内存,就可以了。可以通过yarn web ui,查看每个Job的内存配置。

Flink调优-资源配置_数据倾斜_05

Flink调优-资源配置_数据倾斜_06

针对一些特殊情况的应用程序,需要对内存结构中的各个内存组成部分(不同颗粒度)进行更细致的配置。可以通过yarn web ui页面查看各内存组成部分的配置值及内存使用情况。

Flink调优-资源配置_Memory_07

Flink调优-资源配置_数据倾斜_08

内存模型中的各组成部分,在整个Flink程序中有不同的使用场景,具体配置方式及使用场景,可以参考官网的文档:

JobManager 内存配置:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/memory/mem_setup_jobmanager/

TaskManager 内存配置:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/memory/mem_setup_tm/

当出现内存异常时,根据日志打印的异常信息,可以下面的页面找到对应的配置方法:

Flink内存的常见问题:https://nightlies.apache.org/flink/flink-docs-release-1.17/zh/docs/deployment/memory/mem_trouble/

2.4 内存优化-应用程序侧

Flink任务需要大量的内存来存储数据和状态信息。因此,我们要尽可能地减少内存的使用量。可以通过以下几种方式来实现:

  • 使用更小的窗口大小:窗口大小越大,需要的内存就越多,使用更小的窗口来减少内存的使用量。
  • 使用更小的数据类型:Flink支持多种数据类型,使用更小的数据类型来减少内存的使用量。
  • 使用更小的并行度:并行度越大,需要的内存越多,使用更小的并行度来减少内存的使用量。


3 配置Flink运行参数

3.1 运行命令

yarn-cluster的运行模式为例进行说明。

运行命名及参数说明

flink run -m yarn-cluster -ys 2 -p 1 -yjm 1G -ytm 2G

可以用下面的命令,查看相关的参数及说明

flink run -help

3.2 常用参数

摘选出四个跟资源配置相关的参数。

-p,--parallelism <parallelism>             The parallelism with which to
                                            run the program. Optional flag
                                            to override the default value
                                            specified in the configuration.


-yjm,--yarnjobManagerMemory <arg>    Memory for JobManager Container with
                                      optional unit (default: MB)

-ytm,--yarntaskManagerMemory <arg>   Memory per TaskManager Container with
                                      optional unit (default: MB)
									  
-ys,--yarnslots <arg>                Number of slots per TaskManager

参数对应的含义是:任务并行度、JobManager内存、每个TaskManager内存、每个TaskManager的Slot数量。

TaskManager的数量不需要直接配置,是通过任务及上面四个参数,计算出来的。各资源的计算逻辑如下:

资源名称

计算值\配置值\默认值

说明

任务并行度

-p

程序运行的并行度,不指定时会取默认值

JobManager内存

-yjm

JobManager进程的总内存,参考JobManager内存模型

每个TaskManager内存

-ytm

每个TaskManager进程的总内存,参考TaskManager内存模型

每个TaskManager的Slot数量

-ys

每个TaskManager的Slot数量配置,影响并行度和TaskManager的数量。

JobManager 的数量

固定值为1

一个任务只能有一个active的JobManager

JobManager 的内存

1 * 配置值(-yjm)的大小


TaskManager 的数量

配置值(-p)/ 配置值(-ys)

并行度总数 / 每个 TaskManager 的 Slot 数量,为非整数时向下取整

TaskManager (单个)内存

配置值(-ytm)

每个 TaskManager 的内存容量

TaskManager 的内存总量

TaskManager 的数量 * 每个TaskManager 的内存


每个 Slot 的内存容量

配置值(-ytm) / 配置值(-ys)

每个 TaskManager 的内存容量 / 每一个 TaskManager 中的 Slot 数量

Slot 的总数量

配置值(-p)

最大并行度数量

Slot 所占用的总内存容量

(p / ys) x ytm

TaskManager 的内存总量

yarn vcore 总数量

p + m (不足则取 Yarn 的最小 vcore 分配数量)

Slot 的总数量 + JobManager 占用的 vcore 数量 (与 Yarn 的 minimum Allocation 有关) 

yarn container 的总数量

(p / ys) + 1

TaskManager 的数量 + JobManager 的数量

yarn 的内存总量

1 x yjm (不足则取 Yarn 的最小 Memory 分配数量) + (p / ys) x ytm

JobManager 的数量 x yjm (与 Yarn 的 minimum Allocation 有关) + TaskManager 的数量 x ytm








配置进程参数


解决数据倾斜


一、 Checkpoint 设置

1、Checkpoint 间隔不要太短

虽然理论上 Flink 支持很短的 checkpoint 间隔,但是在实际生产中,过短的间隔对于底层分布式文件系统而言,会带来很大的压力。另一方面,由于检查点的语义,所以实际上 Flink 作业处理 record 与执行 checkpoint 存在互斥锁,过于频繁的 checkpoint,可能会影响整体的性能。当然,这个建议的出发点是底层分布式文件系统的压力考虑。

2、 合理设置超时时间

默认的超时时间是 10min,如果 state 规模大,则需要合理配置。最坏情况是分布式地创建速度大于单点(job master 端)的删除速度,导致整体存储集群可用空间压力较大。建议当检查点频繁因为超时而失败时,增大超时时间。



并行度(parallelism):保证足够的并行度,并行度也不是越大越好,太多会加重数据在多个solt/task manager之间数据传输压力,包括序列化和反序列化带来的压力。


CPU:CPU资源是task manager上的solt共享的,注意监控CPU的使用。

3、内存:内存是分solt隔离使用的,注意存储大state的时候,内存要足够。

4、网络:大数据处理,flink节点之间数据传输会很多,服务器网卡尽量使用万兆网卡。

Operator Chain

Slot Sharing


Flink 作业的问题定位

六、Flink常见性能问题

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
qYWFTniLcEgF
作者其他文章 更多
最新推荐 更多

2024-05-31