Fixing a Flink Kafka consumer bug
  UqrkOCyfkQZc, November 2, 2023

The code originally looked like this:

Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_BROKER);
properties.setProperty(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
properties.setProperty(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, org.apache.kafka.common.serialization.StringDeserializer.class.getName());
//properties.setProperty("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
//properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "latest"); // used in production
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // used in testing
// Note: the consumer below is given a new, empty Properties, not the `properties` object configured above.
DataStreamSource<String> dataStreamSource = env.addSource(new FlinkKafkaConsumer<String>(orderInfoSourceTopic,
        new ProductBeanJSONDeSerializer(), new Properties()));

I kept going around in circles with the properties: none of the settings seemed to take effect, and the job always failed with:

org.apache.kafka.common.KafkaException: Failed to construct kafka producer
Kafka Getting error No resolvable bootstrap urls given in bootstrap servers
org.apache.kafka.common.KafkaException: Failed to construct kafka producer
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:432)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:298)
    at oracle.fs.framework.core.transports.event.kafka.KafkaFactory.createProducer(KafkaFactory.java:29)
    at oracle.fs.framework.core.transports.event.kafka.stream.KafkaStreamEventTransport.start(KafkaStreamEventTransport.java:165)
    at oracle.fs.framework.core.service.TransportLifecycleHandler.start(TransportLifecycleHandler.java:57)
    at oracle.fs.framework.core.service.AbstractService.start(AbstractService.java:400)
    at oracle.fs.foundation.bootstrap.Bootstrap.startDomain(Bootstrap.java:555)
    at oracle.fs.foundation.bootstrap.Bootstrap.startDomain(Bootstrap.java:188)
    at oracle.fs.foundation.bootstrap.Bootstrap.startDomain(Bootstrap.java:147)
    at oracle.fs.foundation.bootstrap.Bootstrap.startDomain(Bootstrap.java:102)
    at oracle.fs.service.driver.DomainServiceDriver.startService(DomainServiceDriver.java:24)
    at oracle.fs.service.driver.DomainServiceDriver.main(DomainServiceDriver.java:19)
Caused by: org.apache.kafka.common.config.ConfigException: No resolvable bootstrap urls given in bootstrap.servers
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:88)
    at org.apache.kafka.clients.ClientUtils.parseAndValidateAddresses(ClientUtils.java:47)
    at org.apache.kafka.clients.producer.KafkaProducer.<init>(KafkaProducer.java:407)
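One cheap thing to rule out with this kind of error is whether the Properties object actually handed to the Kafka client contains bootstrap.servers at all. The following is a minimal sketch using only java.util; the class name KafkaPropsCheck and the method missingKeys are hypothetical helpers (not part of Flink or Kafka), and the broker address is a placeholder.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Properties;

// Hypothetical helper for illustration; not part of Flink or Kafka.
class KafkaPropsCheck {

    /** Returns the required keys that are absent or blank in the given properties. */
    static List<String> missingKeys(Properties props, String... requiredKeys) {
        List<String> missing = new ArrayList<>();
        for (String key : requiredKeys) {
            String value = props.getProperty(key);
            if (value == null || value.trim().isEmpty()) {
                missing.add(key);
            }
        }
        return missing;
    }

    public static void main(String[] args) {
        Properties configured = new Properties();
        configured.setProperty("bootstrap.servers", "localhost:9092"); // placeholder address
        configured.setProperty("group.id", "order_wide_group");

        // The configured object has both keys, so nothing is reported missing.
        System.out.println(missingKeys(configured, "bootstrap.servers", "group.id"));

        // A freshly created object, like the one passed to the consumer above, has neither.
        System.out.println(missingKeys(new Properties(), "bootstrap.servers", "group.id"));
    }
}
```

Calling such a check on the exact object passed to the consumer constructor, rather than on the one configured a few lines earlier, shows quickly whether the settings ever reach the client.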

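I spent a long time chasing the cause around these two errors. Eventually I ruled things out one at a time: I added a System.out.println after every operator and watched the output. It turned out that the new FlinkKafkaConsumer step was the one failing. Next I searched online for how this API is used in different versions, to work out whether the error came from new SimpleStringSchema() or from new Properties(). Based on the elimination above, I narrowed the cause down to new SimpleStringSchema(). After looking at several versions online, I ended up with the corrected code below: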

String orderInfoSourceTopic = "ods_order_info";
String orderDetailSourceTopic = "ods_order_detail";
String orderWideSinkTopic = "dwd_order_wide";
String groupId = "order_wide_group";
// Note: the KafkaSource builder below is configured directly and does not read this object.
Properties properties = new Properties();
properties.setProperty(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, Constants.KAFKA_BROKER);
properties.setProperty(ConsumerConfig.GROUP_ID_CONFIG, groupId);
properties.setProperty(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG, "earliest"); // used in testing

KafkaSource<OrderInfo> orderInfoKafkaSource = KafkaSource.<OrderInfo>builder()
        .setBootstrapServers(Constants.KAFKA_BROKER)
        .setTopics(orderInfoSourceTopic)
        .setGroupId(groupId)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(KafkaRecordDeserializationSchema.of(new ProductBeanJSONDeSerializer()))
        .build();

System.out.println(orderInfoKafkaSource);
DataStreamSource<OrderInfo> dataStreamSource = env.fromSource(orderInfoKafkaSource, WatermarkStrategy.noWatermarks(), "orderInfo kafka Source");
// Custom deserializer class ProductBeanJSONDeSerializer: turns each Kafka record into an OrderInfo object
private static class ProductBeanJSONDeSerializer implements KafkaDeserializationSchema<OrderInfo> {

    private final String encoding = "UTF-8";

    // The Kafka stream is unbounded, so no element ever marks its end.
    @Override
    public boolean isEndOfStream(OrderInfo nextElement) {
        return false;
    }

    // Decode the record value as a UTF-8 JSON string and parse it into an OrderInfo.
    @Override
    public OrderInfo deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Guard against a missing record or a null value before decoding.
        if (record == null || record.value() == null) {
            return null;
        }
        String value = new String(record.value(), encoding);
        return JSON.parseObject(value, OrderInfo.class);
    }

    // Tell Flink which type this schema produces.
    @Override
    public TypeInformation<OrderInfo> getProducedType() {
        return TypeInformation.of(OrderInfo.class);
    }
}
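The byte-to-string step in the deserializer is the part that can be exercised without a running cluster. Since a Kafka record's value may be null, that step is worth guarding. The sketch below isolates it using only the JDK; the class ValueDecoder and the method decodeUtf8 are hypothetical names, and the JSON parsing into OrderInfo is left out on purpose.

```java
import java.nio.charset.StandardCharsets;

// Hypothetical helper for illustration; it mirrors only the decoding step of the deserializer.
class ValueDecoder {

    /** Decodes a record value as UTF-8, returning null when the value itself is null. */
    static String decodeUtf8(byte[] value) {
        if (value == null) {
            return null;
        }
        return new String(value, StandardCharsets.UTF_8);
    }

    public static void main(String[] args) {
        byte[] payload = "{\"id\":1}".getBytes(StandardCharsets.UTF_8);
        System.out.println(decodeUtf8(payload)); // the JSON text, ready for parsing
        System.out.println(decodeUtf8(null));    // no exception for a null value
    }
}
```

Using the StandardCharsets constant instead of a charset name string also avoids the checked UnsupportedEncodingException that the String(byte[], String) constructor declares.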

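Only then did the job finally run correctly. This bug cost me a day and a half, so I am writing it down here. The main lesson: first pin the bug down precisely, so you know exactly which part of the code is failing. Then make sure you know which object each step operates on, and what that object's data type is.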

https://blog.csdn.net/goblinintree/article/details/124268914

https://blog.csdn.net/wflh323/article/details/98055345/

https://blog.csdn.net/weixin_40163498/article/details/115655368

https://blog.csdn.net/lifetragedy/article/details/127141744

Last edited on November 8, 2023
