The code originally looked like this:
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_BROKER);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
//properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // used in production
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // used in testing
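// Note: the last argument is a fresh, empty Properties object, not the `properties` configured above.
DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer<String>(orderInfoSourceTopic
, new ProductBeanJSONDeSerializer(), new Properties()));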
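In short, I kept going round in circles inside the properties. None of the settings had any effect, and the same errors kept coming back: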
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
Kafka Getting error No resolvable bootstrap urls given in bootstrap servers
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
at oracle.fs.framework.core.transports.event.kafka.KafkaFactory.createProducer(KafkaFactory.java:29)
at oracle.fs.framework.core.transports.event.kafka.stream.KafkaStreamEventTransport.start(KafkaStreamEventTransport.java:165)
at oracle.fs.framework.core.service.TransportLifecycleHandler.start(TransportLifecycleHandler.java:57)
at oracle.fs.framework.core.service.AbstractService.start(AbstractService.java:400)
at oracle.fs.foundation.bootstrap.Bootstrap.startDomain(Bootstrap.java:555)
at oracle.fs.foundation.bootstrap.Bootstrap.startDomain(Bootstrap.java:188)
at oracle.fs.foundation.bootstrap.Bootstrap.startDomain(Bootstrap.java:147)
at oracle.fs.foundation.bootstrap.Bootstrap.startDomain(Bootstrap.java:102)
at oracle.fs.service.driver.DomainServiceDriver.startService(DomainServiceDriver.java:24)
at oracle.fs.service.driver.DomainServiceDriver.main(DomainServiceDriver.java:19)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:407)
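The ConfigException at the bottom of the trace is raised from ClientUtils.parseAndValidateAddresses, so it can help to check the bootstrap.servers value on its own before starting the job. Below is a minimal sketch using only the JDK. BootstrapServersCheck and unusableEntries are hypothetical names, and this is not Kafka's own validation logic. It flags entries that are empty, lack a host or a numeric port, or whose host name does not resolve on the current machine.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;
import java.util.ArrayList;
import java.util.List;

// Hypothetical helper, not part of Kafka or Flink.
final class BootstrapServersCheck {

    /** Returns the entries of a comma-separated host:port list that look unusable. */
    static List<String> unusableEntries(String bootstrapServers) {
        List<String> bad = new ArrayList<>();
        if (bootstrapServers == null || bootstrapServers.trim().isEmpty()) {
            bad.add("<empty>");
            return bad;
        }
        for (String entry : bootstrapServers.split(",")) {
            String hostPort = entry.trim();
            int colon = hostPort.lastIndexOf(':');
            if (colon <= 0 || colon == hostPort.length() - 1) {
                bad.add(hostPort); // host or port is missing
                continue;
            }
            String host = hostPort.substring(0, colon);
            try {
                Integer.parseInt(hostPort.substring(colon + 1)); // port must be numeric
                InetAddress.getByName(host); // throws if the host name cannot be resolved
            } catch (NumberFormatException | UnknownHostException e) {
                bad.add(hostPort);
            }
        }
        return bad;
    }

    public static void main(String[] args) {
        // "localhost:9092" is only a placeholder; pass the real broker list as the first argument.
        String servers = args.length > 0 ? args[0] : "localhost:9092";
        System.out.println("Unusable entries: " + unusableEntries(servers));
    }
}
```

Running this on the same machine as the job, with the value of Constants.KAFKA_BROKER as the argument, shows whether the broker list is the problem or whether the value never reaches the client at all.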
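I kept searching for the cause around these two errors. Eventually I started eliminating suspects one by one: I added a System.out.println after every operator and looked at its output. That showed the failure was in the new FlinkKafkaConsumer step. Next I searched online for how different versions of this API are used, to work out whether the error came from new SimpleStringSchema() or from new Properties(). Based on the elimination above, I pinned the cause on new SimpleStringSchema(). After looking at several versions online and making corrections, the code became: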
String orderInfoSourceTopic = "ods_order_info";
String orderDetailSourceTopic = "ods_order_detail";
String orderWideSinkTopic = "dwd_order_wide";
String groupId = "order_wide_group";
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_BROKER);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // used in testing
KafkaSource<OrderInfo> orderInfoKafkaSource = KafkaSource.<OrderInfo>builder()
.setBootstrapServers(Constants.KAFKA_BROKER)
.setTopics(orderInfoSourceTopic)
.setGroupId(groupId)
.setStartingOffsets(OffsetsInitializer.earliest())
.setDeserializer(KafkaRecordDeserializationSchema.of(new ProductBeanJSONDeSerializer())).build();
System.out.println(orderInfoKafkaSource);
DataStreamSource<OrderInfo> dataStreamSource = env.fromSource(orderInfoKafkaSource, WatermarkStrategy.noWatermarks(), "orderInfo kafka Source");
// ProductBeanJSONDeSerializer: custom class that deserializes records into OrderInfo objects
private static class ProductBeanJSONDeSerializer implements KafkaDeserializationSchema<OrderInfo> {
private final String encoding = "UTF8";
@Override
public boolean isEndOfStream(OrderInfo nextElement) {
return false;
}
@Override
public OrderInfo deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
if (record != null && record.value() != null) { // a record may carry a null value
String value = new String(record.value(), encoding);
OrderInfo orderInfo = JSON.parseObject(value, OrderInfo.class);
return orderInfo;
}
return null;
}
@Override
public TypeInformation<OrderInfo> getProducedType() {
return TypeInformation.of(OrderInfo.class);
}
}
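The decoding step inside deserialize can be tried without a Kafka cluster, which makes it easier to see what type each step produces. Below is a minimal sketch under assumptions: ValueDecoder and decode are hypothetical names, and a generic parser function stands in for JSON.parseObject so that the example needs only the JDK.

```java
import java.nio.charset.StandardCharsets;
import java.util.function.Function;

// Hypothetical helper that mirrors the byte[] -> String -> object steps of the deserializer.
final class ValueDecoder {

    /** Decodes the raw record value as UTF-8 text and hands it to the parser; null stays null. */
    static <T> T decode(byte[] value, Function<String, T> parser) {
        if (value == null) {
            return null; // nothing to decode
        }
        String text = new String(value, StandardCharsets.UTF_8);
        return parser.apply(text);
    }

    public static void main(String[] args) {
        byte[] payload = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        // The identity parser simply returns the decoded text; a JSON parser would go here instead.
        String text = decode(payload, s -> s);
        System.out.println(text);
    }
}
```

In the real deserializer the parser would be the call to JSON.parseObject(value, OrderInfo.class) shown above.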
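After switching to KafkaSource with this deserializer, the job finally ran correctly. This bug cost me a day and a half, so I am writing it down here. The main lesson: first pinpoint the bug precisely and find out which part of the code is failing, then make sure you know which object each step operates on and what its data type is.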
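The point about knowing which object each step operates on applies to configuration as well: in the original snippet the consumer is constructed with new Properties(), not with the properties object that was filled in. Below is a minimal sketch of a check that could be run on whatever object is actually handed to the connector. ConfigCheck and missingKeys are hypothetical names. "bootstrap.servers" and "group.id" are the standard Kafka client keys behind ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG and ConsumerConfig.GROUP_ID_CONFIG, and the broker address is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper, not part of Kafka or Flink.
final class ConfigCheck {

    /** Returns the required keys that are absent or blank in the given Properties. */
    static List<String> missingKeys(Properties props, String... requiredKeys) {
        List<String> missing = new ArrayList<>();
        for (String key : requiredKeys) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties configured = new Properties();
        configured.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        configured.setProperty("group.id", "order_wide_group");

        // The configured object has everything it needs.
        System.out.println(missingKeys(configured, "bootstrap.servers", "group.id")); // prints []
        // A freshly created object has none of the settings.
        System.out.println(missingKeys(new Properties(), "bootstrap.servers", "group.id")); // prints [bootstrap.servers, group.id]
    }
}
```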
https://blog.csdn.net/goblinintree/article/details/124268914
https://blog.csdn.net/wflh323/article/details/98055345/
https://blog.csdn.net/weixin_40163498/article/details/115655368