Using Flink's KafkaDeserializationSchema for deserialization when consuming multiple Kafka topics

Background:

In the ODS -> DWD stage, several Kafka streams (multiple topics) carrying POJO-typed data all need essentially the same deserialization when consumed. Rather than writing one deserializer per topic, the deserialization logic is made generic first: a Class parameter is added and the target object type becomes a type parameter.
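The article refers to two POJOs, OrderInfo and OrderDetail, but does not show them. For context, here is a minimal sketch of what such a POJO might look like; the fields are hypothetical, and all fastjson really needs is a public no-arg constructor plus getters/setters:

// Hypothetical minimal POJO; the real OrderInfo fields are not shown in the article.
public class OrderInfo {
    private Long id;            // hypothetical field
    private String consignee;   // hypothetical field
    private Double totalAmount; // hypothetical field

    public OrderInfo() {}

    public Long getId() { return id; }
    public void setId(Long id) { this.id = id; }
    public String getConsignee() { return consignee; }
    public void setConsignee(String consignee) { this.consignee = consignee; }
    public Double getTotalAmount() { return totalAmount; }
    public void setTotalAmount(Double totalAmount) { this.totalAmount = totalAmount; }
}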

The original per-topic deserializers:

KafkaSource<OrderInfo> orderInfoKafkaSource = KafkaSource.<OrderInfo>builder()
                .setBootstrapServers(Constants.KAFKA_BROKER)
                .setTopics(orderInfoSourceTopic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.of(new ProductBeanJSONDeSerializer())).build();

KafkaSource<OrderDetail> orderDetailKafkaSource = KafkaSource.<OrderDetail>builder()
                .setBootstrapServers(Constants.KAFKA_BROKER)
                .setTopics(orderDetailSourceTopic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.of(new ProductOrderDetailJSONDeSerializer())).build();


private static class ProductBeanJSONDeSerializer implements KafkaDeserializationSchema<OrderInfo> {

        private final String encoding = "UTF8";


        @Override
        public boolean isEndOfStream(OrderInfo nextElement) {
            return false;
        }


        @Override
        public OrderInfo deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            if (record != null) {

                String value = new String(record.value(), encoding);
                OrderInfo orderInfo = JSON.parseObject(value, OrderInfo.class);
                return orderInfo;
            }
            return null;
        }


        @Override
        public TypeInformation<OrderInfo> getProducedType() {
            return TypeInformation.of(OrderInfo.class);
        }
    }
    private static class ProductOrderDetailJSONDeSerializer implements KafkaDeserializationSchema<OrderDetail> {

        private final String encoding = "UTF8";


        @Override
        public boolean isEndOfStream(OrderDetail nextElement) {
            return false;
        }


        @Override
        public OrderDetail deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
            if (record != null) {

                String value = new String(record.value(), encoding);
                OrderDetail orderDetail = JSON.parseObject(value, OrderDetail.class);
                return orderDetail;
            }
            return null;
        }


        @Override
        public TypeInformation<OrderDetail> getProducedType() {
            return TypeInformation.of(OrderDetail.class);
        }
    }

After factoring the common logic out into a generic deserializer, it becomes:

package com.flink.util;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ProductJSONDeSerializerUtil<T> implements KafkaDeserializationSchema<T> {


    private Class<T> tClass;

    private final String encoding = "UTF8";

    public ProductJSONDeSerializerUtil(Class<T> tClass) {
        this.tClass=tClass;

    }

    /**
     * Method to decide whether the element signals the end of the stream. If true is returned the
     * element won't be emitted.
     *
     * @param element The element to test for the end-of-stream signal.
     * @return True, if the element signals end of stream, false otherwise.
     */
    @Override
    public boolean isEndOfStream(T element) {
        return false;
    }

    /**
     * Deserializes the Kafka record.
     *
     * @param record Kafka record to be deserialized.
     * @return The deserialized message as an object (null if the message cannot be deserialized).
     */
    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Parse the JSON payload directly into the target class; skip null records and tombstones.
        if (record != null && record.value() != null) {
            String value = new String(record.value(), encoding);
            return JSON.parseObject(value, this.tClass);
        }
        return null;
    }

    /**
     * Gets the data type (as a {@link TypeInformation}) produced by this function or input format.
     *
     * @return The data type produced by this function or input format.
     */
    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(this.tClass);
    }
}
The two sources are then built with the same generic deserializer:

KafkaSource<OrderInfo> orderInfoKafkaSource = KafkaSource.<OrderInfo>builder()
                .setBootstrapServers(Constants.KAFKA_BROKER)
                .setTopics(orderInfoSourceTopic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.of(new ProductJSONDeSerializerUtil<>(OrderInfo.class))).build();
KafkaSource<OrderDetail> orderDetailKafkaSource = KafkaSource.<OrderDetail>builder()
                .setBootstrapServers(Constants.KAFKA_BROKER)
                .setTopics(orderDetailSourceTopic)
                .setGroupId(groupId)
                .setStartingOffsets(OffsetsInitializer.earliest())
                .setDeserializer(KafkaRecordDeserializationSchema.of(new ProductJSONDeSerializerUtil<>(OrderDetail.class))).build();
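
To actually consume these sources they still have to be wired into the streaming job. A minimal sketch, assuming a standard StreamExecutionEnvironment; the source names and the job name "ods-to-dwd" are arbitrary, not from the original code:

// Attach the two Kafka sources to the job; no watermarks are needed at this step.
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

DataStream<OrderInfo> orderInfoStream = env.fromSource(
        orderInfoKafkaSource, WatermarkStrategy.noWatermarks(), "order-info-source");
DataStream<OrderDetail> orderDetailStream = env.fromSource(
        orderDetailKafkaSource, WatermarkStrategy.noWatermarks(), "order-detail-source");

// ... downstream transformations and sinks ...
env.execute("ods-to-dwd");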

With this, the deserialization logic for every consumed topic lives in a single generic class instead of one deserializer per topic.
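
As a side note, when only the record value is needed (no key, headers, partition or offset), the KafkaSource builder also accepts a plain DeserializationSchema via setValueOnlyDeserializer. A minimal sketch of the same generic idea built on AbstractDeserializationSchema; the class name JsonValueDeserializer is my own, not from the original code:

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.serialization.AbstractDeserializationSchema;

import java.nio.charset.StandardCharsets;

// Value-only variant: Flink derives the produced TypeInformation from the Class passed to super().
public class JsonValueDeserializer<T> extends AbstractDeserializationSchema<T> {

    private final Class<T> tClass;

    public JsonValueDeserializer(Class<T> tClass) {
        super(tClass);
        this.tClass = tClass;
    }

    @Override
    public T deserialize(byte[] message) {
        if (message == null) {
            return null; // tombstone record
        }
        return JSON.parseObject(new String(message, StandardCharsets.UTF_8), tClass);
    }
}

// Usage:
// .setValueOnlyDeserializer(new JsonValueDeserializer<>(OrderInfo.class))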

