问题1:没有重写方法toString
最终的合流无法显示,是因为orderWide没有重写方法toString
并且此合流后的数据还没有sink,仅仅是消费kafka,因此重写方法后,默认之前的kafka offset已经提交,对于同一消费组不会再重复消费,所以需要修改消费组再重新运行。
@Override
public String toString() {
return JSON.toJSONString(this, SerializerFeature.WriteMapNullValue);
}
问题2:如何将json中null字段打印输出,可以不区分字段类型,所有null都输出为"",代码如下
@Override
public String toString() {
//使用过滤器将所有json字符为null的全部输出"",不再区分字段类型
ValueFilter filter = new ValueFilter() {
@Override
public Object process(Object object, String name, Object value) {
if(value==null){
return "";
}
return value;
}
};
return JSON.toJSONString(this,filter);
//return JSON.toJSONString(this, SerializerFeature.WriteMapNullValue);//空字符串输出null
// return JSON.toJSONString(this,SerializerFeature.WriteNullStringAsEmpty);//空字符串输出""
//return JSON.toJSONString(this, SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullStringAsEmpty,SerializerFeature.WriteNullNumberAsZero);//将所有null输出,默认数字类型的输出设置0,字符string类型的输出""
// return JSON.toJSONString(this, SerializerFeature.WRITE_MAP_NULL_FEATURES,SerializerFeature.QuoteFieldNames);//与上同功能
// return JSON.toJSONString(this,SerializerFeature.);
}
问题3:
flink双流join后关联维度表时,维度表gmail.base_trademark表少了一个tm_id=6的,这个tm_id=6关联不上(在orderWide里有这个tm_id=6的数据记录,在维度表hbase里没有这条数据),所以一直超时,后面mysql维表更新数据后经flink upsert into入hbase表后,把timeout关联维表请求的超时时间改回来,任务运行正常,这个问题的总结是删数据会直接导致任务挂掉。
问题4
由于flink sink to kafka的步骤除了在flink双流join后关联维表写入kafka dwd层会用到外,在mysql 发往kafka的ods层也是同样类似的操作,所以项目中直接讲这部操作抽取出来命名KafkaUtil.class,右键refactor->extract methods即可。然后单独放到一个类中,同时因为两个步骤操作的具体对象类型不同,前一个操作dwd的对象类型是string,后一个ods层由于需要根据mysql表发往确定topic,所以需要再封装一层,因此抽象的这个类KafkaUtil还需要一个传递泛型参数的方法,逻辑如下:
但是OrderWideJob在直接调用kafkaUtil输出string类型 flinkkafkaproducer方法时报错,因为sinkTo kafka前的数据格式是JsonObject,需要在写入kafka前转化为string。因此需要再map一下。也可以将jsonObject先map成object,然后将kafkaUtil输出成Object类型。总之就是函数输出的格式跟sinktokafka的数据格式必须保持一致。
orderWideWithUserProvinceSkuSpuCategory3TrademarkDs.map(new MapFunction<OrderWide, String>() {
@Override
public String map(OrderWide orderWide) throws Exception {
// return JSON.toJSONString(orderWide.toString(),DisableCheckSpecialChar);
Object o = com.alibaba.fastjson.JSONObject.toJSON(orderWide);
String s = JSON.toJSONString(o, SerializerFeature.MapSortField);
//直接转成string会产生反斜杠'\"',需要先转成object
return s;
}
})
.addSink(KafkaUtil.getSinkFunction(orderWideSinkTopic)
);
参考文献:
https://blog.csdn.net/m0_37979201/article/details/79493187
https://www.cnblogs.com/JonaLin/p/11577804.html