Background:
In the ODS -> DWD stage we consume several Kafka streams (multiple topics) of POJO-typed data, and each stream needs essentially the same deserialization logic. So we generify the operation by adding a Class parameter, i.e., we make the target type of the deserializer generic.
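Why the Class parameter is needed: Java erases generic type arguments at runtime, so a generic deserializer cannot recover T on its own; the explicit class token is what later allows calls like JSON.parseObject(value, tClass) and TypeInformation.of(tClass). A minimal, Flink-free sketch of the idea (the Order and GenericParser names here are hypothetical stand-ins, just for illustration):

```java
import java.util.function.Function;

// Hypothetical stand-in for OrderInfo / OrderDetail.
class Order {
    String id;
    Order(String id) { this.id = id; }
}

// Sketch of the pattern: a Class<T> token lets one generic component
// produce the right type at runtime despite type erasure.
class GenericParser<T> {
    private final Class<T> tClass;              // explicit type token
    private final Function<String, T> parseFn;  // stand-in for JSON.parseObject

    GenericParser(Class<T> tClass, Function<String, T> parseFn) {
        this.tClass = tClass;
        this.parseFn = parseFn;
    }

    T parse(String raw) {
        // A real JSON library would consult tClass here to pick the target type.
        return tClass.cast(parseFn.apply(raw));
    }

    Class<T> producedType() {
        return tClass;  // what a TypeInformation.of(tClass)-style call relies on
    }
}
```

One GenericParser instance per POJO class replaces one hand-written class per POJO, which is exactly the refactoring performed below.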
The original per-topic deserialization:
KafkaSource<OrderInfo> orderInfoKafkaSource = KafkaSource.<OrderInfo>builder()
        .setBootstrapServers(Constants.KAFKA_BROKER)
        .setTopics(orderInfoSourceTopic)
        .setGroupId(groupId)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(KafkaRecordDeserializationSchema.of(new ProductBeanJSONDeSerializer()))
        .build();

KafkaSource<OrderDetail> orderDetailKafkaSource = KafkaSource.<OrderDetail>builder()
        .setBootstrapServers(Constants.KAFKA_BROKER)
        .setTopics(orderDetailSourceTopic)
        .setGroupId(groupId)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(KafkaRecordDeserializationSchema.of(new ProductOrderDetailJSONDeSerializer()))
        .build();
private static class ProductBeanJSONDeSerializer implements KafkaDeserializationSchema<OrderInfo> {

    private final String encoding = "UTF8";

    @Override
    public boolean isEndOfStream(OrderInfo nextElement) {
        return false;
    }

    @Override
    public OrderInfo deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        if (record != null) {
            String value = new String(record.value(), encoding);
            return JSON.parseObject(value, OrderInfo.class);
        }
        return null;
    }

    @Override
    public TypeInformation<OrderInfo> getProducedType() {
        return TypeInformation.of(OrderInfo.class);
    }
}
private static class ProductOrderDetailJSONDeSerializer implements KafkaDeserializationSchema<OrderDetail> {

    private final String encoding = "UTF8";

    @Override
    public boolean isEndOfStream(OrderDetail nextElement) {
        return false;
    }

    @Override
    public OrderDetail deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        if (record != null) {
            String value = new String(record.value(), encoding);
            OrderDetail orderDetail = JSON.parseObject(value, OrderDetail.class);
            return orderDetail;
        }
        return null;
    }

    @Override
    public TypeInformation<OrderDetail> getProducedType() {
        return TypeInformation.of(OrderDetail.class);
    }
}
After extracting the common logic, the generic version is:
package com.flink.util;

import com.alibaba.fastjson.JSON;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.connectors.kafka.KafkaDeserializationSchema;
import org.apache.kafka.clients.consumer.ConsumerRecord;

public class ProductJSONDeSerializerUtil<T> implements KafkaDeserializationSchema<T> {

    private final Class<T> tClass;
    private final String encoding = "UTF8";

    public ProductJSONDeSerializerUtil(Class<T> tClass) {
        this.tClass = tClass;
    }

    /**
     * Decides whether the element signals the end of the stream. If true is
     * returned, the element won't be emitted.
     *
     * @param element the element to test for the end-of-stream signal
     * @return true if the element signals end of stream, false otherwise
     */
    @Override
    public boolean isEndOfStream(T element) {
        return false;
    }

    /**
     * Deserializes the Kafka record.
     *
     * @param record Kafka record to be deserialized
     * @return the deserialized message as an object (null if the message cannot be deserialized)
     */
    @Override
    public T deserialize(ConsumerRecord<byte[], byte[]> record) throws Exception {
        // Guard against tombstone records, whose value() is null.
        if (record != null && record.value() != null) {
            String value = new String(record.value(), encoding);
            // Parse straight to the target type; no intermediate JSONObject round-trip needed.
            return JSON.parseObject(value, this.tClass);
        }
        return null;
    }

    /**
     * Gets the data type (as a {@link TypeInformation}) produced by this deserializer.
     *
     * @return the data type produced
     */
    @Override
    public TypeInformation<T> getProducedType() {
        return TypeInformation.of(this.tClass);
    }
}
KafkaSource<OrderInfo> orderInfoKafkaSource = KafkaSource.<OrderInfo>builder()
        .setBootstrapServers(Constants.KAFKA_BROKER)
        .setTopics(orderInfoSourceTopic)
        .setGroupId(groupId)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(KafkaRecordDeserializationSchema.of(new ProductJSONDeSerializerUtil<>(OrderInfo.class)))
        .build();

KafkaSource<OrderDetail> orderDetailKafkaSource = KafkaSource.<OrderDetail>builder()
        .setBootstrapServers(Constants.KAFKA_BROKER)
        .setTopics(orderDetailSourceTopic)
        .setGroupId(groupId)
        .setStartingOffsets(OffsetsInitializer.earliest())
        .setDeserializer(KafkaRecordDeserializationSchema.of(new ProductJSONDeSerializerUtil<>(OrderDetail.class)))
        .build();
This way, the deserialization for every consumed topic lives in a single reusable class.
References:
https://blog.csdn.net/xianpanjia4616/article/details/117849943
https://blog.csdn.net/xianpanjia4616/article/details/112970428?spm=1001.2014.3001.5502