Seatunnel实践及相关报错总结
  HjmbmEbaDYXJ 2023年11月02日 91 0

写在前面:本人也属于小白,这篇文章只是个人使用和总结,可能有些地方理解片面或者有误,各位大神看到的话,可以留言指正,一定认真学习。同时发出来也只是想自己能做个笔记,便于后期整理。如果能帮刚使用seatunnel的朋友避一些坑,那就最好不过了。

1、语法模块(官方文档摘抄)

1.1 source 和 sink源

仅列出目前我所需要的,基本所有数据源都可兼容,列出只是为了明确需要下载的connector的jar包

source:
hive、jdbc、maxcomputer、localfile、mysqlcdc、ossfile、rocketMQ、sftpfile、socket

sink:console、datahub、mysql、sftpfile、hive、jdbc、localfile、maxcomputer、ossfile、socket、rocketMQ
1.2 source 和 sink 相关写法
# 固定环境变量,我使用的是本地模式
env {
  execution.parallelism = 2
  job.mode = "STREAMING"
  checkpoint.interval = 2000
}
# socket
source {
    Socket {
        host = "1**.***.**.**1"
        port = 9999
    }
}

# Maxcompute
source {
    Maxcompute {
        accessId="*********************"
		accesskey="****************************"
		endpoint="http://service.cn.maxcompute.aliyun.com/api"
		project="professional_test_dev"
		table_name="test_person__info"
		partition_spec="pt=20230628"
		split_row=10000
		fields=[name,age,address]
    }
}

# oss文件
source {
    OssFile {
		path = "/seatunnel/sink/age=20/"
   	 	bucket="oss://you_bucket_name"
    	access_key="*********************"
    	access_secret="****************************"
		endpoint="oss-cn-shanghai.aliyuncs.com"
		file_format_type = "text"
		# 字段映射
		read_columns=["name","age","address","start_date"]
		# 是否按照路径解析分区,需要注意,启用的话,sink端需要多出来一个字段和这个分区字段映射
		parse_partition_from_path=true
		# 跳过头几行(去除行头的作用)
		skip_header_row_number=1
		# 什么格式的字符串需要被解析成日期
		date_format="yyyy-MM-dd"
		# 字段分隔符
		delimiter="\t"
		# 读的所有列
		schema {
			fields {
				name = string 
				age = int
				address = string
				start_date = date
			}
		}
    }
}
# socket
sink {
    Socket {
        host = "1**.***.**.**1"
        port = 8888
    }
}

# OssFile
sink { 
  OssFile {
    path="/seatunnel/sink"
    bucket="oss://you_bucket_name"
    access_key="*********************"
    access_secret="****************************"
    endpoint="oss-cn-shanghai.aliyuncs.com"
    file_format_type = "text"
    field_delimiter = "\t"
    row_delimiter = "\n"
    sink_columns = ["name","age"]
  }
}

# DataHub
sink {
    DataHub {
		source_table_name="test_split_table"
    endpoint="https://dh-cn-shanghai.aliyuncs.com"
		accessId="="*********************""
		accessKey="****************************"
		project="you_project_name"
		topic="test_seatunnel_socket"
		timeout=3000
		retryTimes=3
    }
}

# mysql
sink {
	jdbc {
		# url中得参数rewriteBatchedStatements=true  批量执行,不然mysql会一条一条执行,性能低
		url="jdbc:mysql://host/test?serverTimezone=GMT%2b8&useSSL=false&rewriteBatchedStatements=true"
		driver="com.mysql.cj.jdbc.Driver"
		user="root"
		password="123456"	
		primary_keys=["name", "age"]
		# upsert功能,性能低,唯有数据库不具upsert功能时才启用,mysql启用报错Duplicate entry '*' for key 'PRIMARY'
		# support_upsert_by_query_primary_key_exist=true
		# 可以不写query语句,根据generate_sink_sql=true参数根据下面的数据库和表名自动生成SQL语句,运行报错,未知
		database="test"
		table="clmp_user"
		# mysql的upsert写法
		query = """
			insert into clmp_user(name,age,address,date,pt) values
			(?,?,?,?,?)
			ON DUPLICATE KEY UPDATE
			name=values(name),
			age=values(age),
			address=values(address),
			date=values(date),
			pt=values(pt);
		"""
		# 连接超时时间
		connection_check_timeout_sec=100
		# 重试提交失败的次数,设置精确一次的话,这个需要设置为0,不然可能造成重复
		max_retries=0
		# 对于批处理写入,当缓冲记录的数量达到batch_size或时间达到batch_interval_ms时,数据将被刷新到数据库中
		batch_size=10000
		batch_interval_ms=60000
		# 精确一次,开启的话,必须要定义xa_data_source_class_name,mysql版本需要大于等于8.0.29
		# is_exactly_once=true
		# xa_data_source_class_name="com.mysql.cj.jdbc.MysqlXADataSource"
		# 事务提交失败的重试次数
		max_commit_attempts=10
		# 事务打开后的超时,默认值是-1(从不超时)。注意,设置超时可能会影响一次语义
		transaction_timeout_sec=-1
		# 启用自动事务提交,默认为true
		auto_commit=true
	}
}
1.3 SQL Functions

https://seatunnel.apache.org/docs/2.3.1/transform-v2/sql-functions

1.4 命令行参数

Usage: seatunnel.sh [options]

命令

作用

--async

异步运行作业,当作业提交,客户端会退出(默认值:false)

-can, --cancel-job

按JobId取消作业

--check

是否检查config(默认:false)

-cj, --close-job

关闭客户端任务也将被关闭(默认值:true)

-cn, --cluster

cluster名称

-c, --config

Config文件

--decrypt

解密配置文件,当--decrypt --encrypt两者都被指定,仅--encrypt生效(默认值:false)

-m, --master, -e, --deploy-mode

SeaTunnel job submit master, support[local, cluster] (default: cluster)

--encrypt

解密配置文件,当--decrypt --encrypt两者都被指定,仅--encrypt生效(默认值:false)

-h, --help

Show the usage message

-j, --job-id

通过JobId获取作业状态

-l, --list

列出作业状态(默认:false)

--metrics

通过JobId获取作业指标

-n, --name

SeaTunnel job name (default: SeaTunnel)

-r, --restore

通过jobId恢复保存点

-s, --savepoint

通过jobId保存点作业

-i, --variable

变量替换,例如-i City =beijing,或者-i date=20190318(默认值:[])

2、实践问题模块

2.1 快速开始案例存在问题

如果想v2.batch.config.template这个快速案例运行成功的话,需要添加下面链接的两个jar包至seatunnel安装路径/lib目录下(版本取决于你hadoop的版本,我这不使用hadoop,所以就直接和seatunnel版本一致了)下载地址

seatunnel-hadoop3-3.1.4-uber-2.3.2.jar seatunnel-hadoop3-3.1.4-uber-2.3.2-optional.jar

2.2 socket连接失败

编写文件的时候,所有的字符串指标都需要使用双引号,单引号只能用在引用变量上,如果文件的某个指标内容过长需要换多行书写,需要用三引号引住内容,且三引号内不能引用变量,如果非要引用,可以断开使用多个三引号拼接语句,中间穿插引用变量,变量在启动任务是,使用 -i 赋值(-i date=20230627);注意:引用变量在本地模式不可用,只有flink或者spark引擎是才可以

# 举例:多行书写且引用变量
var = """
			your string 1
			"""${you_var}""" your string 2"""
			
# 举例:语句内应用变量
transform {
		sql {
			query = "select * from user_view where city ='"${city}"' and dt = '"${date}"'"
		}
	}
	
# 举例:变量启动赋值
/app/clmp/seatunnel-2.3.2/bin/seatunnel.sh --config /root/test_seatunnel/ --driver-memory 4g -e local -i date=20230627
2.3 写入oss报错
# 报错信息主要是两块
1、 java.lang.RuntimeException: java.lang.ClassNotFoundException: Class org.apache.hadoop.fs.aliyun.oss.AliyunOSSFileSystem not found
2、 org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: org.apache.seatunnel.engine.server.checkpoint.CheckpointException: Aggregate commit error.
# 第一个感觉好像oss的连接jar包不存在,但排查后发现是存在的,之后在github上搜索到一个问题和我情况比较类似,最终解决方法是把hadoop-aliyun-2.7.2.jar放入seatunnel-2.3.2/lib,之后确实解决了,github上的情况是类加载器有问题
# 注意:如果是私有云,除了放jar包之外,还需要关闭cname,不然会出现一个新的报错SignatureDoesNotMatch                     怎么关闭cname方法可以咨询阿里云驻场或者提工单解决,我这里解决的方式是 ping 不加http://的oss地址,返回的结果里有一个IP,使   用这个IP代替原先的oss地址即可
2.4 使用jdbc连接mysql报错
Caused by: org.apache.seatunnel.api.common.PrepareFailException: ErrorCode:[API-01], ErrorDescription:[Configuration item validate failed] - PluginName: jdbc, PluginType: source, Message: com.mysql.cj.jdbc.exceptions.CommunicationsException: Communications link failure

The last packet sent successfully to the server was 0 milliseconds ago. The driver has not received any packets from the server.

# 这个原因比较多,我目前遇到两种情况
# 1、使用jdbc连接数据库时,您必须自己提供数据库驱动程序,复制到$SEATNUNNEL_HOME/plugins/jdbc/lib/目录以使其工作。例如,如果你使用MySQL,应该下载并复制MySQL -connector-java-xxx.jar到$SEATNUNNEL_HOME/plugins/jdbc/lib/,正常放后就可以了
# 2、SSL连接原因;MySQL在高版本需要指明是否进行SSL连接,默认是开启的。所以只需要在配置文件中的url后面加上&useSSL=false即可如:url="jdbc:mysql://local_host/test?serverTimezone=GMT%2b8&useSSL=false"
Caused by: org.apache.seatunnel.api.common.PrepareFailException: ErrorCode:[API-01], ErrorDescription:[Configuration item validate failed] - PluginName: jdbc, PluginType: source, Message: java.lang.ClassNotFoundException: com.mysql.cj.jdbc.Driver

# 首先需要注意,官网提供的驱动类名称为com.mysql.cj.jdbc.Driver,但是这个应该是8以上的驱动包。本人第一次使用的是mysql-connector-java-5.1.47.jar,他的驱动类名称就不是这个,而是com.mysql.jdbc.Driver。如果直接使用官方提供的就会报上面的错误
2.5 关于尝试连接impala的总结

seatunnel本身时没有impala做为source或者sink的,但是impala是保留了hive和jdbc的连接方式,所以我准备尝试使用这两种方式连接

2.5.1 使用jdbc连接

org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException:
ErrorCode: [JDBC-06], ErrorDescription: [No suitable dialect factory found] - Could not find any jdbc dialect factory that can handle url '' that implements "org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.JdbcDialectFactory" in the classpath
Available factories are:
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.db2.DB2DialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.dm.DmdbDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jabc.internal.dialect.gbase&a.GbaseBaDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.greenplum.GreenplunDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.mysql.My5qDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.oracle.OracleDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.phoenix.PhoenixDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.psql.PostgresDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.redshift.RedshiftDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.saphana.SapHanaDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.snowflake.SnowflakeDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlite.SqliteDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.sqlserver.SqlServerDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.tablestorel.TablestoreDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.teradata.TeradataDialectFactory 
org.apache.seatunnel.connectors.seatunnel.jdbc.internal.dialect.vertica.VerticaDialectFactory

# 结合报错和源码来看,目前jdbc的连接因为没有包含impala这个数据源类型,所以在调用jdbc这种连接器时,无法解析url、创建类加载器时失败。报错内容后面列出了目前可以使用jdbc连接的所有数据源

2.5.2 使用hive连接

2023-07-12 16:14:47,905 WARN org.apache.hadoop.util.NativeCodeLoader - Unable to load native-hadoop library for your platform...using builtin-java classes where applicable
2023-07-1216:14:47,983 WARN hive metastore - setlugi() not successful,Likely cause: new client talking to old server. Continuing without it. org.apache.thrift. TApplicationException: Invalid method name:'set ugit'
	at org.apache.thrift. TApplicationException.read (TApplicationException.java: 111) ~ [hive-exec-2.3.9.jar: 2.3.9] 
	at org.apache.thrift.TServiceClient. receiveBase (TServiceClient.java: 79) ~ [hive-exec-2.3.9.jar: 2.3.9]


2023-07-1216:14:51,089 ERROR org.apache.seatunnel.core.starter.SeaTunne1 - Exception StackTrace:org.apache.seatunnel.core.starter.exception.CommandExecuteException:SeaTunneljobexecutedfailed
	at org.apache.seatunnel.core.starter.seatunnel.command.ClientExecuteCommand.execute (ClientExecuteCommand. java: 188) 
	at org. apache.seatunnel.core.starter.SeaTunnel.run (SeaTunnel.java:40) 
	at org.apache. seatunnel.core.starter.seatunnel.SeaTunnelClient.main (SeaTunnelClient.java: 34)
Caused by: org.apache.thrift.TApplicationException: Invalid method name: 'get_table'
	at org.apache.thrift.TApplicationException.read (TApplicationException.java: 111) 
	at org.apache.thrift.TServiceClient.receiveBase (TServiceClient.java: 79)

# 第一次使用报错信息是这样,提示新客户端访问老版服务,且后面还有一个报错为get_table这个方法不存在,更能确定时版本的问题。这里主要是因为用seatunnelengine,需要把seatunnel-hadoop3-3.1.4-uber.jar和hive-exec-2.3.9.jar放在$SEATUNNEL_HOME/lib/目录下。这个hadoop和hive的版本需要和impala中的相应版本对应。

2.5.3 解决方式

按照源码该路径下(seatunnel-dev\seatunnel-dev\seatunnel-connectors-v2\connector-jdbc\src\main\java\org\apache\seatunnel\

connectors\seatunnel\jdbc\internal\dialect\mysql)的代码重写4个方法,编写一个hive的jdbc连接器。路径定义为hive-jdbc-connector

\src\main\java\org\apache\seatunnel\connectors\seatunnel\jdbc\hive\internal\dialect\hive。之后单独打包,放入seatunnel_home

/pligins/jdbc/lib目录下。之后再运行代码,可能会出现jar包冲突的问题,根据报错解决依赖的冲突后重新打包即可

# 冲突报错举例 TTransport 即为冲突的内容:
Exception in thread "main" java.1ang.LinkageError:loader constraint violation: loader (instance of sun/misc/Launcher$AppClassLoader) previously initiated loading for a different type with name "org/apache/thrift/transport/TTransport"
2.6 oss - mysql同步数据报错
Caused by: org.apache.seatunnel.engine.common.exception.SeaTunnelEngineException: java.lang.RuntimeException: java.lang.IllegalArgumentException
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:207)
at org.apache.seatunnel.engine.server.task.flow.SinkFlowLifeCycle.received(SinkFlowLifeCycle.java:59)
...
Caused by: java.lang.IllegalArgumentException
at com.google.common.base.Preconditions.checkArgument(Preconditions.java:127)
at org.apache.seatunnel.connectors.seatunnel.jdbc.internal.executor.FieldNamedPreparedStatement.
prepareStatement(FieldNamedPreparedStatement.java:639)
# 查看源码 639行代码如下,本人对于JAVA不是很熟,所以前面报错得源码部分没看懂,没发现问题在哪
checkArgument(parameterMap.size() == fieldNames.length);
# 这个说明和字段有关系,再看我写的配置文件,我使用了generate_sink_sql=true参数,可能是这个参数得原因,所以我去除后使用query参数代替,解决问题
Caused by: org.apache.seatunnel.connectors.seatunnel.jdbc.exception.JdbcConnectorException: ErrorCode:[COMMON-10], ErrorDescription:[Flush data operation that in sink connector failed] - java.sql.SQLException: Parameter index out of range (5 > number of parameters, which is 4).
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:129)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:97)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:89)
at com.mysql.cj.jdbc.exceptions.SQLError.createSQLException(SQLError.java:63)
at com.mysql.cj.jdbc.ClientPreparedStatement.checkBounds(ClientPreparedStatement.java:1373)
at com.mysql.cj.jdbc.ClientPreparedStatement.getCoreParameterIndex(ClientPreparedStatement.java:1386)
at com.mysql.cj.jdbc.ClientPreparedStatement.setString(ClientPreparedStatement.java:1753)
# 这个报错得原因很明确,就是注入SQL写得有问题。很多时候是因为使用引号,导致系统把字段识别成了字符串。所以写注入SQL时,字符类型得字段也不需要套引号。我这个报错得原因是少穿了参数,因为我OSS做为source,同时启用了parse_partition_from_path=true这个参数,这个时候是需要多一个字段来映射分区得,但是我没传,所以报错少一个字段
2.7 maven依赖缺失解决
# 正常情况maven正常下载就可以,但是有些依赖因为各种原因下载不下来,这个时候需要我们自己下载jar包并安装。jar包下载后,使用cmd进入命令行,切换至jar包所在目录下,执行命令 
mvn install:install-file -Dfile=jindo-sdk-4.6.1.jar -DgroupId=com.aliyun.jindodata -DartifactId=jindosdk -Dversion=4.6.1 -Dpackaging=jar
# 配置说明: Dfile jar包名称      DgroupId groupId        DartifactId artifactId         Dversion version  依赖下载下来直接放入本地maven仓库是不行的,java还是无法识别这个maven,必须执行命令安装
2.8 比较小的问题

2.4.1 我目前测试都是在虚拟机上编辑文件通过命令行启动本地模式执行,但是好几次因为大小写的问题出现报错,报错内容大致为某个必需指标未赋值,尤其是需要AK、SK的源,大小写很奇怪;这个问题我不是很确定,只是遇到过,但是没有时间继续在这上面深究,所以不一定正确,可以作为一个排查问题的思路

2.4.2 字段顺序需要对应,及source写的字段顺序如果为 fields=[name,age,address] 的话,sink的字段顺序也要按这个顺序写,不会自动按字段名称来对应

3、官方相关报错参考

链接:https://seatunnel.apache.org/docs/2.3.2/connector-v2/Error-Quick-Reference-Manual/

格式类似下面这样,可以帮助精确报错原因,更快定位问题

代码

描述

解决方案

API-01

配置项验证失败

当用户遇到此错误代码时,通常是由于用户配置的连接器参数有问题,请检查连接器文档并更正参数

API-02

选项项验证失败

-

API-03

目录初始化失败

当用户遇到此错误代码时,通常是因为连接器初始化目录失败,请检查连接器连接器选项是否正确

API-04

数据库不存在

当用户遇到此错误代码时,通常是因为您要访问的数据库不存在,请仔细检查数据库是否存在

API-05

表不存在

当用户遇到此错误代码时,通常是因为您要访问的表不存在,请仔细检查该表是否存在

API-06

工厂初始化失败

当用户遇到此错误代码时,通常是因为jar包依赖有问题,请检查您本地的SeaTunnel安装包是否完整

API-07

数据库已经存在

当用户遇到此错误代码时,表示您要创建的数据库已经存在,请删除数据库并重试

API-08

表已存在

当用户遇到此错误代码时,说明您要创建的表已经存在,请删除表后重试

4、目前本人正在解决的问题

1、maxcomputer to datahub 数据分字段插入,不安条数插入

2、使用本地模式 -i 传递参数一直失败,目前分区动态传入均报错

有知道的大佬也可以告知下,关于代码就是我1.1部分的代码,十分感谢!

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

HjmbmEbaDYXJ
作者其他文章 更多
最新推荐 更多

2024-05-31