flink1.16 mysql sinkto kafka
  UqrkOCyfkQZc 2023年11月02日 82 0

之前一直以为需要配置特别的参数来让flink按照sink事务发往不同kafka,参照了不同版本的写法,flink1.13还是使用flinkKafkaproducer接口,flink1.16已经过时,但是还能使用,并且引进新的接口kafkasink.而本次的接口并非是接口的问题,而是令人意想不到的数据源mysql,mysql数据源之前是通过mysql.init脚本初始化,所以以为所有的表都有数据,没有去检查,直到最新想换个思路,看是否跟表的顺序有关,结果发现没有关系,怀疑是表的问题。才去检查表的数据。浪费了2天时间。哎!!!

MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
                .hostname("192.168.56.131")
                .port(3306)
                .serverTimeZone("GMT-4")
                .databaseList("gmail")  //set captured database
                .tableList(FactTableConfig.TABLE_LIST)   //set captured table,逗号可以同步多个表
                /*.chunkKeyColumn("id")  // 这里的分片最好是mysql数据库中的索引字段,否则全量读取会报错
                .splitSize(2)*/
                .username("root")
                .password("root123456")
                //.startupOptions(StartupOptions.latest())//事实表只需要实时获取增量数据
                .startupOptions(StartupOptions.initial()) //为了测试使用,需要先全量后增量
                .deserializer(new JsonDebeziumDeserializationSchema())  //converts Source
                //.deserializer(new StringDebeziumDeserializationSchema())
                .build();
     DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
                .setParallelism(2);
        Properties producerProp = new Properties();
        producerProp.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_BROKER);
        producerProp.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");
        producerProp.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        producerProp.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
        streamSource.print("source >>>");
       streamSource.addSink(new FlinkKafkaProducer<String>("", new KafkaSerializationSchema<String>() {

            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                JSONObject value = JSON.parseObject(element);
                String tableName = value.getString("tableName");
                String topicName = "ods_" + tableName;
                String after = value.getString("after");
                return new ProducerRecord<>(topicName, after.getBytes(StandardCharsets.UTF_8));
            }
        }, producerProp, FlinkKafkaProducer.Semantic.EXACTLY_ONCE
        ));


        // 执行
        env.execute("FactOdsJob");
    }
}


参考文献:

https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/connectors/datastream/kafka/#topic--partition-%E8%AE%A2%E9%98%85

https://blog.51cto.com/u_15277063/5237922

https://blog.csdn.net/Smille_to_life/article/details/130188158

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  KRe60ogUm4le   2024年05月31日   101   0   0 flink大数据
  KRe60ogUm4le   2024年05月31日   31   0   0 flink大数据
UqrkOCyfkQZc
最新推荐 更多

2024-05-31