文章目录
- 01 引言
- 02 源码剖析
- 2.1 作业提交脚本
- 2.2 作业提交入口类
- 2.3 local模式提交
- 2.3.1 执行SQL类型作业
- 2.3.2 执行同步类型作业
- 2.4 其它模式提交
- 03 小结&感想
01 引言
博主在上一篇文章《数据集成框架FlinkX(纯钧)入门》,大致讲解了FlinkX
的一些概念,以及举了相关FlinkX
的使用案例。
本文我们继续探索一下FlinkX
的源码以及整个执行流程。阅读前,复制上一篇文章有关FlinkX
的项目目录:
- bin # 存放执行脚本的目录
├── chunjun-docker.sh # Docker 启动脚本
├── chunjun-kubernetes-application.sh # Kubernetes 应用模式启动脚本
├── chunjun-kubernetes-session.sh # Kubernetes 会话模式启动脚本
├── chunjun-local.sh # 本地启动脚本
├── chunjun-standalone.sh # 单机模式启动脚本
├── chunjun-yarn-perjob.sh # YARN 每作业模式启动脚本
├── chunjun-yarn-session.sh # YARN 会话模式启动脚本
├── start-chunjun # 通用启动脚本
└── submit.sh # 提交任务脚本
- build # 构建脚本目录
└── build.sh # 构建脚本
- chunjun-assembly # 汇总装配模块目录
- chunjun-clients # 客户端模块目录
- chunjun-connectors # 连接器模块目录
├── (多个子目录) # 不同的数据连接器子模块
- chunjun-core # 核心模块目录
- chunjun-ddl # 数据定义语言模块目录
├── chunjun-ddl-base # DDL 基础模块
├── chunjun-ddl-mysql # MySQL DDL 模块
├── chunjun-ddl-oracle # Oracle DDL 模块
- chunjun-dev # 开发工具模块目录
├── (多个子目录) # 包含开发用的各种工具和资源
- chunjun-dirty # 脏数据处理模块目录
├── (多个子目录) # 不同的脏数据处理子模块
- chunjun-docker # Docker 相关模块目录
├── (多个子目录) # Docker 相关资源和配置
- chunjun-e2e # 端到端测试模块目录
- chunjun-examples # 示例模块目录
├── json # JSON 示例
└── sql # SQL 示例
- chunjun-local-test # 本地测试模块目录
- chunjun-metrics # 指标监控模块目录
├── (多个子目录) # 包含不同的监控模块
- chunjun-restore # 数据恢复模块目录
├── chunjun-restore-common # 通用数据恢复模块
└── chunjun-restore-mysql # MySQL 数据恢复模块
02 源码剖析
通过阅读官网,可以知道如果要跑一个FlinkX
的任务,一般的都会使用如下命令:
## local模式
sh bin/chunjun-local.sh -job chunjun-examples/json/stream/stream.json
## standalone模式
sh bin/chunjun-standalone.sh -job chunjun-examples/json/stream/stream.json
## yarn session模式
sh ./bin/chunjun-yarn-session.sh -job chunjun-examples/json/stream/stream.json -confProp {\"yarn.application.id\":\"SESSION_APPLICATION_ID\"}
## 其它模式。。。。。
命令主要有以下核心的参数:
参数 |
描述 |
mode |
任务提交的类型,非必填项,类型有:local(默认值),standalone,yarn-session, yarn-per-job,kubernetes-session,kubernetes-application,对应源码中枚举类 ClusterMode; |
jobType |
纯钧任务类型,必填项,同步任务为:sync,SQL计算任务为:sql |
job |
纯钧任务脚本地址,必填项 |
chunjunDistDir |
纯钧插件包地址 |
confProp |
纯钧任务配置参数,Flink相关配置也是在这里配置 |
flinkConfDir |
flink-conf.yaml 地址,在非local模式时,需要配置 |
那么,源码的入口就从启动flink作业的命令开始。
2.1 作业提交脚本
提交FlinkX
作业的脚本都放在了项目的bin
目录下,不同提交模式(local、standlone、session等)里面的脚本,最终执行的都是submit.sh
:
因此,我们看submit.sh命令里面的内容,注释后如下:
#!/usr/bin/env bash
set -e
# 查找 Java 可执行文件的路径
if [ -n "${JAVA_HOME}" ]; then
JAVA_RUN="${JAVA_HOME}/bin/java"
else
if [ `command -v java` ]; then
JAVA_RUN="java"
else
echo "JAVA_HOME is not set" >&2
exit 1
fi
fi
# 注释: 以上代码用于查找 Java 可执行文件的路径,首先检查 JAVA_HOME 环境变量,然后检查系统中是否已经安装了 Java。
# 设置 CHUNJUN_HOME 变量,以确定部署模式
# 1:使用装配分发包文件进行部署
# 2:使用项目包进行部署
CHUNJUN_DEPLOY_MODE=1
if [[ $CHUNJUN_HOME && -z $CHUNJUN_HOME ]];then
export CHUNJUN_HOME=$CHUNJUN_HOME
else
CHUNJUN_HOME="$(cd "`dirname "$0"`"/..; pwd)"
if [ -d "$CHUNJUN_HOME/chunjun-dist" ]; then
CHUNJUN_HOME="$CHUNJUN_HOME/chunjun-dist"
CHUNJUN_DEPLOY_MODE=2
fi
fi
# 注释: 以上代码用于设置 CHUNJUN_HOME 变量,用于确定部署模式。首先检查是否已经设置 CHUNJUN_HOME 变量,如果没有设置,则根据脚本的路径来确定 CHUNJUN_HOME 的值。
# 根据部署模式设置 JAR_DIR 变量
# 1. 在 yarn-session 情况下,无法找到 JAR_DIR
# 2. 在其他情况下,可以找到 JAR_DIR
if [ $CHUNJUN_DEPLOY_MODE -eq 1 ]; then
JAR_DIR=$CHUNJUN_HOME/lib/chunjun-clients.jar:$CHUNJUN_HOME/lib/*
else
JAR_DIR=$CHUNJUN_HOME/../lib/chunjun-clients.jar:$CHUNJUN_HOME/../lib/*
fi
# 注释: 以上代码根据不同的部署模式设置 JAR_DIR 变量,用于指定需要加载的 Java 类库路径。
# 入口类全路径
CLASS_NAME=com.dtstack.chunjun.client.Launcher
# 检查参数中是否包含 ".sql",以确定作业类型
JOBTYPE="sync"
ARGS=$@
if [[ $ARGS == *.sql* ]];
then JOBTYPE="sql"
fi;
echo "
# #
# #
#
##### ###### # # # #### #### # # # ####
# # # # # ## # # # # ## #
# # # # # # # # # # # #
# # # # ## # # # # ## # #
##### # # #### # # # # #### # # #
#
####
Reference site: https://dtstack.github.io/chunjun
chunjun is starting ...
CHUNJUN_HOME is auto set $CHUNJUN_HOME"
# 设置基本参数,用于所有作业
PARAMS="$ARGS -mode $MODE -jobType $JOBTYPE -chunjunDistDir $CHUNJUN_HOME"
# 如果 FLINK_HOME 未设置或不是目录,则忽略 flinkConfDir 参数
if [ ! -z $FLINK_HOME ] && [ -d $FLINK_HOME ];
then
echo "FLINK_HOME is $FLINK_HOME"
PARAMS="$PARAMS -flinkConfDir $FLINK_HOME/conf -flinkLibDir $FLINK_HOME/lib"
else
echo "FLINK_HOME is empty!"
fi
# 如果 HADOOP_HOME 未设置或不是目录,则忽略 hadoopConfDir 参数
if [ ! -z $HADOOP_HOME ] && [ -d $HADOOP_HOME ];
then
echo "HADOOP_HOME is $HADOOP_HOME"
PARAMS="$PARAMS -hadoopConfDir $HADOOP_HOME/etc/hadoop"
else
echo "HADOOP_HOME is empty!"
fi
# 添加一个空行,用于与日志分隔
echo ""
echo "start command: $JAVA_RUN -cp $JAR_DIR $CLASS_NAME $PARAMS"
echo ""
# 执行Java命令
$JAVA_RUN -cp $JAR_DIR $CLASS_NAME $PARAMS
总结:其实submit.sh命令主要的作用就是,执行提交作业的java程序,首先封装好入参(如:flink_home、hadoop_home、依赖jar路径等参数),然后指定程序的入口类(com.dtstack.chunjun.client.Launcher
),最后使用java命令来执行java程序。
ok,接下来看看com.dtstack.chunjun.client.Launcher
这个类做了什么事情。
2.2 作业提交入口类
作业提交入口里在项目chunjun-clients
目录下的Launcher类(com.dtstack.chunjun.client.Launcher
)。
我们来看看它的main
入口方法,这里添加了相关的注释:
/**
* 作业提交命令(sbumit.sh)执行之后,会进入该方法
*
* @author : YangLinWei
* @createTime: 2023/11/8 15:31
* @version: 1.0.0
*/
public static void main(String[] args) throws Exception {
// 1. 解析入参,并设置进Options类
OptionParser optionParser = new OptionParser(args);
Options launcherOptions = optionParser.getOptions();
// 2. 查询并设置flink_home以及hadoop_home的配置进入Options类
findDefaultConfigDir(launcherOptions);
// 3. 解析程序执行参数集合,例如该命令里面的参数: sh bin/chunjun-local.sh -job chunjun-examples/json/stream/stream.json
List<String> argList = optionParser.getProgramExeArgList();
// 将argList转化为HashMap,方便通过参数名称来获取参数值
Map<String, String> commandMap = Maps.newHashMap();
for (int i = 0; i < argList.size(); i += 2) {
commandMap.put(argList.get(i), argList.get(i + 1));
}
// 清空list,填充修改后的参数值
argList.clear();
for (int i = 0; i < commandMap.size(); i++) {
argList.add(commandMap.keySet().toArray()[i].toString());
argList.add(commandMap.values().toArray()[i].toString());
}
// 4. 根据Options类里面的设置的参数以及命令行里面的参数,初始化构建作业发布类。
JobDeployer jobDeployer = new JobDeployer(launcherOptions, argList);
// 5. 根据启动模式,创建不同的提交客户端帮助类
ClusterClientHelper<?> clusterClientHelper = createHelper(launcherOptions.getMode());
// 6. 加载自定义的jar包到当前的类加载器
URLClassLoader urlClassLoader = (URLClassLoader) Launcher.class.getClassLoader();
List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(launcherOptions.getAddjar());
ClassLoaderManager.loadExtraJar(jarUrlList, urlClassLoader);
// 7. 使用不同的提交客户端提交作业
try (ClusterClient<?> client = clusterClientHelper.submit(jobDeployer)) {
if (null != client) {
log.info(client.getClusterId() + " submit successfully.");
}
}
}
通过阅读源码,可以知道该方法主要做了如下几个事情:
- 封装执行作业相关的参数(flink_home、hadoop_home以及命令行的参数);
- 根据不同的模式,生成不同的提交客户端;
- 使用不同的客户端提交作业。
上述的第2个步骤,即根据模式生成提交客户端的代码截图如下:
第3个步骤是最为核心的一个步骤,它是根据第2步骤生成不同的客户端来提交作业的,通过代码提示,可以看到现在支持这几种提交模式:
接下来,分别讲解不同模式下的提交。
2.3 local模式提交
local模式的提交,使用到了LocalClusterClientHelper
,其代码很简单,做了注释之后如下:
/**
* 提交客户端-Local模式
*
* @author : YangLinWei
* @createTime: 2023/11/8 15:41
* @version: 1.0.0
*/
public class LocalClusterClientHelper implements ClusterClientHelper<Void> {
@Override
public ClusterClient<Void> submit(JobDeployer jobDeployer) throws Exception {
// 获取程序参数
String[] args = jobDeployer.getProgramArgs().toArray(new String[0]);
// 执行程序
Main.main(args);
return null;
}
}
继续看执行的细节,具体的方法在 “com.dtstack.chunjun.Main#main
”(友情提示:这个类也可以用作不同模式下的entrypoint的入口程序):
/**
* 执行入口
*
* @author : YangLinWei
* @createTime: 2023/11/8 15:45
* @version: 1.0.0
*/
public static void main(String[] args) throws Exception {
log.info("------------program params-------------------------");
Arrays.stream(args).forEach(arg -> log.info("{}", arg));
log.info("-------------------------------------------");
Options options = new OptionParser(args).getOptions();
String replacedJob = "";
// 获取作业配置
File file = new File(options.getJob());
if (file.isFile()) {
try {
replacedJob = FileUtils.readFileToString(file, StandardCharsets.UTF_8.name());
} catch (IOException ioe) {
log.error("Can not get the job info !!!", ioe);
throw new RuntimeException(ioe);
}
} else {
String job = URLDecoder.decode(options.getJob(), StandardCharsets.UTF_8.name());
replacedJob = JobUtil.replaceJobParameter(options.getP(), job);
}
Properties confProperties = PropertiesUtil.parseConf(options.getConfProp());
// 判断是否为SQL作业类型,如果是,则设置SQL作业的配置
if (EJobType.getByName(options.getJobType()).equals(SQL)) {
options.setSqlSetConfiguration(SqlParser.parseSqlSet(replacedJob));
}
// 初始化TableEnviorment,并注入配置
StreamExecutionEnvironment env = EnvFactory.createStreamExecutionEnvironment(options);
StreamTableEnvironment tEnv =
EnvFactory.createStreamTableEnvironment(env, confProperties, options.getJobName());
log.info(
"Register to table configuration:{}",
tEnv.getConfig().getConfiguration().toString());
// 判断不同的类型,执行作业
switch (EJobType.getByName(options.getJobType())) {
case SQL:
exeSqlJob(env, tEnv, replacedJob, options);
break;
case SYNC:
exeSyncJob(env, tEnv, replacedJob, options);
break;
default:
throw new ChunJunRuntimeException(
"unknown jobType: ["
+ options.getJobType()
+ "], jobType must in [SQL, SYNC].");
}
log.info("program {} execution success", options.getJobName());
}
可以看到,执行local
模式的入口方法主要是初始化了Flink
的TableEnviorment
,以及注册了相关的配置,并根据类型去执行SQL
作业或同步作业。
2.3.1 执行SQL类型作业
执行SQL类型的作业实际代码如下:
/**
* 执行类型为sql的作业
*
* @author : YangLinWei
* @createTime: 2023/11/8 15:50
* @version: 1.0.0
*/
private static void exeSqlJob(
StreamExecutionEnvironment env,
StreamTableEnvironment tableEnv,
String job,
Options options) {
try {
// 配置TableEnviorment环境
configStreamExecutionEnvironment(env, options, null);
List<URL> jarUrlList = ExecuteProcessHelper.getExternalJarUrls(options.getAddjar());
// 根据配置,设置为“批”还是“流”任务
String runMode = options.getRunMode();
if ("batch".equalsIgnoreCase(runMode)) env.setRuntimeMode(RuntimeExecutionMode.BATCH);
// 解析SQL语句并执行
StatementSet statementSet = SqlParser.parseSql(job, jarUrlList, tableEnv);
TableResult execute = statementSet.execute();
// 解决执行批处理作业时YARN模式不退出的问题
Properties confProperties = PropertiesUtil.parseConf(options.getConfProp());
String executionMode =
confProperties.getProperty(
"chunjun.cluster.execution-mode",
ClusterEntrypoint.ExecutionMode.DETACHED.name());
if (!ClusterEntrypoint.ExecutionMode.DETACHED.name().equalsIgnoreCase(executionMode)) {
// 等待作业结束
printSqlResult(execute);
}
// 如果是本地环境,打印作业执行结果
if (env instanceof MyLocalStreamEnvironment) {
Optional<JobClient> jobClient = execute.getJobClient();
if (jobClient.isPresent()) {
PrintUtil.printResult(
jobClient
.get()
.getJobExecutionResult()
.get()
.getAllAccumulatorResults());
}
}
} catch (Exception e) {
throw new ChunJunRuntimeException(e);
}
}
以上的代码:主要做的事情就是设置TableEnviorment的参数,解析并执行Flink SQL。
2.3.2 执行同步类型作业
执行同步类型的作业实际代码如下:
/**
* 执行类型为“同步”的作业
*
* @author : YangLinWei
* @createTime: 2023/11/8 15:58
* @version: 1.0.0
*/
private static void exeSyncJob(
StreamExecutionEnvironment env,
StreamTableEnvironment tableEnv,
String job,
Options options)
throws Exception {
// 解析同步作业配置
SyncConfig config = parseConfig(job, options);
// 配置TableEnvironment环境
configStreamExecutionEnvironment(env, options, config);
// 创建数据源工厂,用于获取输入数据流
SourceFactory sourceFactory = DataSyncFactoryUtil.discoverSource(config, env);
DataStream<RowData> dataStreamSource = sourceFactory.createSource();
// 根据配置设置输入数据流的并行度
SpeedConfig speed = config.getSpeed();
if (speed.getReaderChannel() > 0) {
dataStreamSource =
((DataStreamSource<RowData>) dataStreamSource)
.setParallelism(speed.getReaderChannel());
}
// 添加映射操作符
dataStreamSource = addMappingOperator(config, dataStreamSource);
// 处理CDC(变更数据捕获)相关配置
if (null != config.getCdcConf()
&& (null != config.getCdcConf().getDdl()
&& null != config.getCdcConf().getCache())) {
CdcConfig cdcConfig = config.getCdcConf();
DDLHandler ddlHandler = DataSyncFactoryUtil.discoverDdlHandler(cdcConfig, config);
// 获取DDL处理器和缓存处理器
CacheHandler cacheHandler = DataSyncFactoryUtil.discoverCacheHandler(cdcConfig, config);
// 应用变更数据捕获处理器
dataStreamSource =
dataStreamSource.flatMap(
new RestorationFlatMap(ddlHandler, cacheHandler, cdcConfig));
}
DataStream<RowData> dataStream;
boolean transformer =
config.getTransformer() != null
&& StringUtils.isNotBlank(config.getTransformer().getTransformSql());
if (transformer) {
// 如果存在数据转换操作,将数据流转换为表
dataStream = syncStreamToTable(tableEnv, config, dataStreamSource);
} else {
dataStream = dataStreamSource;
}
// 根据配置设置是否进行数据重平衡
if (speed.isRebalance()) {
dataStream = dataStream.rebalance();
}
// 创建目标端工厂,用于将数据流写入目标
SinkFactory sinkFactory = DataSyncFactoryUtil.discoverSink(config);
DataStreamSink<RowData> dataStreamSink = sinkFactory.createSink(dataStream);
if (speed.getWriterChannel() > 0) {
dataStreamSink.setParallelism(speed.getWriterChannel());
}
// 根据配置设置输出数据流的并行度
JobExecutionResult result = env.execute(options.getJobName());
// 如果是本地环境,打印作业执行结果
if (env instanceof MyLocalStreamEnvironment) {
PrintUtil.printResult(result.getAllAccumulatorResults());
}
}
以上代码主要做了:根据配置初始化TableEnviorment,使用工厂模式去加载源端数据源和目标端数据源,并设置源端(并行度、操作符、ddl变更等)、算子、目标端等。
博主认为这一块是FlinkX区别与其它的框架的一个核心点,里面用到了不少的技术,后续会博主也会单独拎出来讲解(如:脏数据如何处理、ddl变更是如何捕获的、checkpoint增量同步的原理等)
ok,接下来快速的看看其它的几种模式。
2.4 其它模式提交
YarnPerJob模式提交,在com.dtstack.chunjun.client.yarn.YarnPerJobClusterClientHelper#submit
,主要:就是做了配置的初始化,安全认证的处理以及使用yarn的api提交,具体代码如下:
/**
* Yarn per-job模式提交
*
* @author : YangLinWei
* @createTime: 2023/11/8 16:15
* @version: 1.0.0
*/
@Override
public ClusterClient<ApplicationId> submit(JobDeployer jobDeployer) throws Exception {
//获取配置参数
Options launcherOptions = jobDeployer.getLauncherOptions();
String confProp = launcherOptions.getConfProp();
if (StringUtils.isBlank(confProp)) {
throw new IllegalArgumentException("per-job mode must have confProp!");
}
// 获取flink lib目录路径
String libJar = launcherOptions.getFlinkLibDir();
if (StringUtils.isBlank(libJar)) {
throw new IllegalArgumentException("per-job mode must have flink lib path!");
}
// 获取有效的配置
Configuration flinkConfig = jobDeployer.getEffectiveConfiguration();
// 处理安全认证(如:kerberos、simple模式等)
SecurityUtils.install(new SecurityConfiguration(flinkConfig));
// 初始化ClusterSpecification,并使用yarn的api提交作业
ClusterSpecification clusterSpecification = createClusterSpecification(jobDeployer);
try (YarnClusterDescriptor descriptor =
createPerJobClusterDescriptor(launcherOptions, flinkConfig)) {
ClusterClientProvider<ApplicationId> provider =
descriptor.deployJobCluster(
clusterSpecification, new JobGraph("chunjun"), true);
String applicationId = provider.getClusterClient().getClusterId().toString();
String flinkJobId = clusterSpecification.getJobGraph().getJobID().toString();
log.info("deploy per_job with appId: {}}, jobId: {}", applicationId, flinkJobId);
return provider.getClusterClient();
}
}
Kubernetes模式提交,在“com.dtstack.chunjun.client.kubernetes.KubernetesApplicationClusterClientHelper#submit
”,主要的作用就是初始化作业参数配置以及使用flink自带的Kubernetes api去提交作业到Kubernetes。
/**
* 提交kubernetes Application模式的作业
*
* @author : YangLinWei
* @createTime: 2023/11/8 16:19
* @version: 1.0.0
*/
@Override
public ClusterClient<String> submit(JobDeployer jobDeployer) throws Exception {
// 获取并设置部署的配置
Options launcherOptions = jobDeployer.getLauncherOptions();
List<String> programArgs = jobDeployer.getProgramArgs();
Configuration effectiveConfiguration = jobDeployer.getEffectiveConfiguration();
setDeployerConfig(effectiveConfiguration, launcherOptions);
// 设置kubernetes的主机别名
setHostAliases(effectiveConfiguration);
replaceRemoteParams(programArgs, effectiveConfiguration);
ApplicationConfiguration applicationConfiguration =
new ApplicationConfiguration(
programArgs.toArray(new String[0]), PluginInfoUtil.getMainClass());
// 使用flink自带的kubernetes api提交作业
KubernetesClusterClientFactory kubernetesClusterClientFactory =
new KubernetesClusterClientFactory();
try (KubernetesClusterDescriptor descriptor =
kubernetesClusterClientFactory.createClusterDescriptor(effectiveConfiguration)) {
ClusterSpecification clusterSpecification =
getClusterSpecification(effectiveConfiguration);
ClusterClientProvider<String> clientProvider =
descriptor.deployApplicationCluster(
clusterSpecification, applicationConfiguration);
ClusterClient<String> clusterClient = clientProvider.getClusterClient();
log.info("Deploy Application with Cluster Id: {}", clusterClient.getClusterId());
return clusterClient;
}
}
以上大致讲了部署目标为yarn
或者Kubernetes
类型的的作业提交方式,与这两种部署目标类型相关的作业提交类型的操作方式也大同小异,此处不再详述。
03 小结&感想
本文主要从FlinkX
的一个sh
提交命令开始讲解,然后逐步深入到不同的提交类型客户端的java
代码。
可以知道,FlinkX
主要的作用还是做“同步”,相对于其他的Flink
二次开发框架,在简单了解它的官网描述之后,可以知道FlinkX在脏数据处理、增量同步 这两块可能做得是比较有特色,但是关于增量同步是否只适用于“批”类型的作业,博主有空再阅读其源码,再出相关的博文。
其实给我的感觉,“批” 这一块,给我的感觉有点像DataX
,只是底层使用的技术栈不同。
还有就是对比其它的Flink
二次开发框架,很好奇为何不直接使用Flink原生的命令行呢?这样不更好的去与其它的开源框架整合吗?举个情景例子:Flink
批处理作业作为DolphinScheduler
的一个任务节点,那么具体Flink
作业节点执行的方式是官方提供的命令行标准,这样就不会出现一件事情同时多个人(Flink Developer Or Others
)做了。