I had assumed that some special parameter was needed to make a single Flink sink route records to different Kafka topics, so I compared the APIs across versions: Flink 1.13 still uses the FlinkKafkaProducer interface, while Flink 1.16 deprecates it (it still works) and introduces the new KafkaSink interface. In the end the problem was not the sink API at all but, of all things, the MySQL source. The database had been initialized with a mysql.init script, so I assumed every table contained data and never checked. Only when I finally changed tack and tested whether the table order mattered (it did not) did I start suspecting the tables themselves and inspect their data. Two days wasted. A quick row-count check like the sketch below would have caught it immediately.
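The lesson is to verify the source before blaming the pipeline. Here is a minimal sanity-check sketch (not part of the original job) that counts the rows in each captured table up front. It reuses the connection parameters from the MySqlSource config below and assumes FactTableConfig.TABLE_LIST is the same comma-separated "database.table" list the CDC source consumes.

import java.sql.Connection;
import java.sql.DriverManager;
import java.sql.ResultSet;
import java.sql.Statement;

public class SourceTableCheck {
    public static void main(String[] args) throws Exception {
        // Same connection parameters as the MySqlSource below (an assumption)
        String url = "jdbc:mysql://192.168.56.131:3306/gmail";
        try (Connection conn = DriverManager.getConnection(url, "root", "root123456");
             Statement stmt = conn.createStatement()) {
            // TABLE_LIST is assumed to hold comma-separated "db.table" entries
            for (String table : FactTableConfig.TABLE_LIST.split(",")) {
                try (ResultSet rs = stmt.executeQuery("SELECT COUNT(*) FROM " + table.trim())) {
                    rs.next();
                    System.out.println(table.trim() + " -> " + rs.getLong(1) + " rows");
                }
            }
        }
    }
}

Had this run first, the empty tables would have surfaced in minutes instead of two days.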
MySqlSource<String> mySqlSource = MySqlSource.<String>builder()
        .hostname("192.168.56.131")
        .port(3306)
        .serverTimeZone("GMT-4")
        .databaseList("gmail") // set captured database
        .tableList(FactTableConfig.TABLE_LIST) // set captured tables; a comma-separated list syncs multiple tables
        /*.chunkKeyColumn("id") // the chunk key should be an indexed column in MySQL, otherwise the full snapshot read fails
        .splitSize(2)*/
        .username("root")
        .password("root123456")
        //.startupOptions(StartupOptions.latest()) // fact tables only need incremental changes in production
        .startupOptions(StartupOptions.initial()) // for testing: full snapshot first, then incremental
        .deserializer(new JsonDebeziumDeserializationSchema()) // converts SourceRecord to JSON String
        //.deserializer(new StringDebeziumDeserializationSchema())
        .build();
DataStreamSource<String> streamSource = env.fromSource(mySqlSource, WatermarkStrategy.noWatermarks(), "MySQL Source")
.setParallelism(2);
Properties producerProp = new Properties();
producerProp.setProperty(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_BROKER);
// For EXACTLY_ONCE the producer-side transaction.timeout.ms must not exceed the broker's
// transaction.max.timeout.ms (15 minutes by default), or the producer fails at startup.
producerProp.setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000");
producerProp.setProperty(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
producerProp.setProperty(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG, "org.apache.kafka.common.serialization.ByteArraySerializer");
streamSource.print("source >>>");
streamSource.addSink(new FlinkKafkaProducer<String>(
        "", // default topic; unused because serialize() always assigns a topic per record
        new KafkaSerializationSchema<String>() {
            @Override
            public ProducerRecord<byte[], byte[]> serialize(String element, @Nullable Long timestamp) {
                // The stock JsonDebeziumDeserializationSchema emits a Debezium envelope like
                // {"before":null,"after":{...},"source":{"db":"gmail","table":"orders",...},"op":"c",...},
                // so the table name lives under source.table, not in a top-level "tableName" field.
                JSONObject value = JSON.parseObject(element);
                String tableName = value.getJSONObject("source").getString("table");
                String topicName = "ods_" + tableName;
                String after = value.getString("after"); // null for delete events
                byte[] payload = (after == null ? "" : after).getBytes(StandardCharsets.UTF_8);
                return new ProducerRecord<>(topicName, payload);
            }
        },
        producerProp,
        FlinkKafkaProducer.Semantic.EXACTLY_ONCE));
// execute the job
env.execute("FactOdsJob");
}
}
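For Flink 1.16, where FlinkKafkaProducer is deprecated, the same per-record topic routing can be written against the newer KafkaSink mentioned above. This is a minimal sketch under the same assumptions (Debezium-style JSON input, ods_ topic prefix); the transactional id prefix "fact-ods" is an arbitrary name I chose, but EXACTLY_ONCE does require setting one:

KafkaSink<String> kafkaSink = KafkaSink.<String>builder()
        .setBootstrapServers(Constants.KAFKA_BROKER)
        .setProperty(ProducerConfig.TRANSACTION_TIMEOUT_CONFIG, "60000")
        .setRecordSerializer((KafkaRecordSerializationSchema<String>) (element, context, timestamp) -> {
            // Same routing logic as the FlinkKafkaProducer version above
            JSONObject value = JSON.parseObject(element);
            String topicName = "ods_" + value.getJSONObject("source").getString("table");
            String after = value.getString("after"); // null for delete events
            return new ProducerRecord<>(topicName, (after == null ? "" : after).getBytes(StandardCharsets.UTF_8));
        })
        .setDeliveryGuarantee(DeliveryGuarantee.EXACTLY_ONCE)
        .setTransactionalIdPrefix("fact-ods") // mandatory for EXACTLY_ONCE
        .build();
streamSource.sinkTo(kafkaSink);

sinkTo replaces addSink and DeliveryGuarantee replaces the Semantic enum; the rest of the job is unchanged. The imports come from org.apache.flink.connector.kafka.sink (KafkaSink, KafkaRecordSerializationSchema) and org.apache.flink.connector.base (DeliveryGuarantee).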
References:
https://nightlies.apache.org/flink/flink-docs-release-1.16/zh/docs/connectors/datastream/kafka/#topic--partition-%E8%AE%A2%E9%98%85
https://blog.51cto.com/u_15277063/5237922
https://blog.csdn.net/Smille_to_life/article/details/130188158