flinkcdc+hudi0.10+hive(自动同步分区+压缩)
  TEZNKK3IfmPf 2023年11月13日 40 0
版本情况说明:

flink1.13.1+scala2.11+CDH6.3.0 Hadoop3.0.0+Hive2.1.1+hudi0.10


mysql cdc source DDL:

-- MySQL source table for the CDC pipeline.
-- FIX(review): downstream, `ts` is declared as Hudi's precombine field
-- ('write.precombine.field' = 'ts'), so it must advance on every row change.
-- The original DEFAULT-only definition left `ts` frozen at insert time,
-- which breaks version ordering for UPDATEs; add ON UPDATE CURRENT_TIMESTAMP.
-- NOTE(review): charset utf8 is MySQL's 3-byte subset; consider utf8mb4 if
-- names may contain supplementary characters (left unchanged here).
CREATE TABLE `users` (
`id` bigint(20) NOT NULL AUTO_INCREMENT,
`name` varchar(20) DEFAULT NULL,
`birthday` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP,
`ts` timestamp NOT NULL DEFAULT CURRENT_TIMESTAMP ON UPDATE CURRENT_TIMESTAMP,
PRIMARY KEY (`id`)
) ENGINE=InnoDB AUTO_INCREMENT=64 DEFAULT CHARSET=utf8;

mysql写入数据 生成binlog数据:

-- Seed rows to generate binlog events for the CDC source.
-- ids are auto-assigned from AUTO_INCREMENT (next value 64 per the DDL above),
-- except the row with explicit id=4.
insert into users (name) values ('hello2');
insert into users (name) values ('world2');
insert into users (name) values ('flink2');
insert into users (id,name) values (4,'spark');
insert into users (name) values ('hudi');


-- Inspect current rows, then emit UPDATE/DELETE binlog events.
select * from users;
-- NOTE(review): ids 60/61 are not produced by the inserts shown here; these
-- statements assume pre-existing rows — confirm against the actual table state.
update users set name = 'luo flinkbj' where id = 60;
delete from users where id = 61;


flink sql :

mysql cdc flink sql DDL:
-- Flink CDC source table over MySQL `luo`.`users`.
-- FIX(review): removed the "Flink SQL>"/">" REPL prompt prefixes so the DDL is
-- directly copy-pasteable into the Flink SQL client.
CREATE TABLE mysql_users (
  id BIGINT PRIMARY KEY NOT ENFORCED,   -- CDC upsert key; NOT ENFORCED is required by Flink SQL
  name STRING,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3)
) WITH (
  'connector' = 'mysql-cdc',
  'hostname' = '127.0.0.1',
  'port' = '3306',
  'username' = 'root',
  'password' = 'root',                    -- NOTE(review): plaintext credentials; externalize for production
  'server-time-zone' = 'Asia/Shanghai',   -- must match the MySQL server's time zone for correct TIMESTAMP decoding
  'debezium.snapshot.mode' = 'initial',   -- full snapshot first, then switch to binlog streaming
  'database-name' = 'luo',
  'table-name' = 'users'
);

设置ckp:

-- Hudi on Flink commits on checkpoint: a 30s interval means writes become
-- visible (and the compaction delta-commit counter advances) every 30 seconds.
Flink SQL> set execution.checkpointing.interval=30sec;

增加分区字段,创建一个视图缓存:

-- Derive the partition value (yyyyMMdd of birthday) in a view so the Hudi sink
-- can select it; `partition` must stay backtick-quoted (reserved keyword).
Flink SQL> create view my_v AS SELECT *, DATE_FORMAT(birthday, 'yyyyMMdd') as `partition` FROM mysql_users;

-- hudi MOR模式在线压缩测试:
'compaction.async.enabled' = 'true'   -- 注意:Hudi 0.10 的 Flink 选项名为 compaction.async.enabled(末尾带 d),写成 ...enable 会被忽略

---- MOR表压缩:
hive 默认生成两张表(_ro 为读优化视图,_rt 为实时视图):
luo_sync_hive03_ro
luo_sync_hive03_rt


flinksql hudi hive 分区表 DDL:

-- Hudi MERGE_ON_READ sink table with automatic Hive partition sync.
-- FIX(review): Hudi 0.10's Flink option keys end in "enabled":
--   compaction.async.enabled / changelog.enabled / read.streaming.enabled.
-- The original "...enable" spellings are not recognized by FlinkOptions and are
-- silently ignored, so async compaction, changelog mode and streaming read
-- stayed at their defaults. ('hive_sync.enable' is the 0.10 key as written —
-- verify against your exact Hudi build.)
CREATE TABLE luo_sync_hive03(
  id bigint,
  name string,
  birthday TIMESTAMP(3),
  ts TIMESTAMP(3),
  `partition` VARCHAR(20),
  primary key(id) not enforced            -- a record key is mandatory for Hudi
)
PARTITIONED BY (`partition`)
with(
  'connector' = 'hudi',
  'path' = 'hdfs://nameservice1/tmp/luo/hudi/luo_sync_hive03',
  -- write path
  'hoodie.datasource.write.recordkey.field' = 'id',
  'write.precombine.field' = 'ts',        -- highest ts wins when keys collide
  'write.tasks' = '1',
  'write.rate.limit' = '2000',
  'table.type' = 'MERGE_ON_READ',
  -- online (async) compaction: triggers every 5 delta commits,
  -- i.e. every 5 checkpoints with the 30s interval set above
  'compaction.async.enabled' = 'true',
  'compaction.tasks' = '1',
  'compaction.trigger.strategy' = 'num_commits',
  'compaction.delta_commits' = '5',
  -- retain changelog (+U/-D) records for streaming readers
  'changelog.enabled' = 'true',
  'read.streaming.enabled' = 'true',
  'read.streaming.check-interval' = '4',
  -- Hive sync: auto-creates/updates luo.luo_sync_hive03_ro / _rt
  'hive_sync.enable' = 'true',
  'hive_sync.metastore.uris' = 'thrift://hadoop:9083',
  'hive_sync.jdbc_url' = 'jdbc:hive2://hadoop:10000',
  'hive_sync.table' = 'luo_sync_hive03',
  'hive_sync.db' = 'luo',
  'hive_sync.username' = '',
  'hive_sync.password' = '',
  'hive_sync.support_timestamp' = 'true'  -- sync Hudi timestamps as Hive TIMESTAMP
);

-- flinksql 数据写入 hudi,并自动同步创建hive分区表+自动同步数据:

-- Continuous streaming insert: CDC view -> Hudi table; `partition` drives the
-- Hive partition that gets auto-created/synced.
insert into luo_sync_hive03 select id,name,birthday,ts,`partition` from my_v;

由 'compaction.delta_commits' = '5' 控制:每 5 次 checkpoint 提交(delta commit)触发一次 compaction

mysql 端 多次写入数据:

-- Additional writes to accumulate delta commits and trigger async compaction
-- (one commit per checkpoint; compaction fires after 5).
insert into users (name) values ('hello25');

insert into users (name) values ('world26');

insert into users (name) values ('flink27');

insert into users (name) values ('flink32');


-- Exercise the UPDATE/DELETE paths through the changelog as well.
-- NOTE(review): assumes rows with ids 60/61 exist — confirm table state.
update users set name = 'luo flinkbj' where id = 60;
delete from users where id = 61;


--------- hive shell: query the synced Hive data:

-- Partitions were created automatically by Hudi's Hive sync (_rt view).
hive> show partitions luo_sync_hive03_rt;


-- Handling the select-count exception (stack trace below):
-- counting the _ro table fails until the Hudi MR bundle jar is added — see the
-- "add jar" / "set hive.input.format" fix after the trace.
hive> select count(1) from luo_sync_hive03_ro;

Diagnostic Messages for this Task:
Error: java.io.IOException: Split class org.apache.hudi.hadoop.hive.HoodieCombineRealtimeHiveSplit not found
at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:369)
at org.apache.hadoop.mapred.MapTask.runOldMapper(MapTask.java:438)
at org.apache.hadoop.mapred.MapTask.run(MapTask.java:349)
at org.apache.hadoop.mapred.YarnChild$2.run(YarnChild.java:174)
at java.security.AccessController.doPrivileged(Native Method)
at javax.security.auth.Subject.doAs(Subject.java:422)
at org.apache.hadoop.security.UserGroupInformation.doAs(UserGroupInformation.java:1875)
at org.apache.hadoop.mapred.YarnChild.main(YarnChild.java:168)
Caused by: java.lang.ClassNotFoundException: Class org.apache.hudi.hadoop.hive.HoodieCombineRealtimeHiveSplit not found
at org.apache.hadoop.conf.Configuration.getClassByName(Configuration.java:2409)
at org.apache.hadoop.mapred.MapTask.getSplitDetails(MapTask.java:367)
... 7 more


-- Fix for the ClassNotFoundException above: ship the Hudi Hadoop MR bundle to
-- Hive tasks so HoodieCombineRealtimeHiveSplit resolves, then point Hive's
-- input format at Hudi's combine implementation for split handling.
hive> add jar hdfs://nameservice1/tmp/luo/hudi-hadoop-mr-bundle-0.10.0-SNAPSHOT.jar;

hive> set hive.input.format = org.apache.hudi.hadoop.hive.HoodieCombineHiveInputFormat;
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月13日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2023年11月15日   52   0   0 apachehadoopjava
  TEZNKK3IfmPf   2023年11月14日   20   0   0 mysqlHive
  TEZNKK3IfmPf   2023年11月15日   27   0   0 apachehadoop
  TEZNKK3IfmPf   2023年11月14日   22   0   0 hadoop
  TEZNKK3IfmPf   2024年04月26日   63   0   0 hadoopHive
TEZNKK3IfmPf