flink 双流join问题集合
  UqrkOCyfkQZc 2023年11月02日 63 0

 问题1:没有重写方法toString

flink 双流join问题集合_ide

最终的合流无法显示,是因为orderWide没有重写方法toString

并且此合流后的数据还没有sink,仅仅是消费kafka,因此重写方法后,默认之前的kafka offset已经提交,对于同一消费组不会再重复消费,所以需要修改消费组再重新运行。

@Override
    public String toString() {
        return JSON.toJSONString(this, SerializerFeature.WriteMapNullValue);
    }

问题2:如何将json中null字段打印输出,可以不区分字段类型,所有null都输出为"",代码如下

@Override
    public String toString() {
        //使用过滤器将所有json字符为null的全部输出"",不再区分字段类型
       ValueFilter filter = new ValueFilter() {


           @Override
             public Object process(Object object, String name, Object value) {
                 if(value==null){
                     return "";
                 }
                 return value;
             }
         };
       return JSON.toJSONString(this,filter);
        //return JSON.toJSONString(this, SerializerFeature.WriteMapNullValue);//空字符串输出null
       // return JSON.toJSONString(this,SerializerFeature.WriteNullStringAsEmpty);//空字符串输出""
      //return JSON.toJSONString(this,  SerializerFeature.WriteMapNullValue, SerializerFeature.WriteNullStringAsEmpty,SerializerFeature.WriteNullNumberAsZero);//将所有null输出,默认数字类型的输出设置0,字符string类型的输出""
     // return JSON.toJSONString(this, SerializerFeature.WRITE_MAP_NULL_FEATURES,SerializerFeature.QuoteFieldNames);//与上同功能
       // return JSON.toJSONString(this,SerializerFeature.);

    }

问题3:

flink双流join后关联维度表时,维度表gmail.base_trademark表少了一个tm_id=6的,这个tm_id=6关联不上(在orderWide里有这个tm_id=6的数据记录,在维度表hbase里没有这条数据),所以一直超时,后面mysql维表更新数据后经flink upsert into入hbase表后,把timeout关联维表请求的超时时间改回来,任务运行正常,这个问题的总结是删数据会直接导致任务挂掉。


flink 双流join问题集合_kafka_02

问题4

由于flink sink to kafka的步骤除了在flink双流join后关联维表写入kafka dwd层会用到外,在mysql 发往kafka的ods层也是同样类似的操作,所以项目中直接讲这部操作抽取出来命名KafkaUtil.class,右键refactor->extract methods即可。然后单独放到一个类中,同时因为两个步骤操作的具体对象类型不同,前一个操作dwd的对象类型是string,后一个ods层由于需要根据mysql表发往确定topic,所以需要再封装一层,因此抽象的这个类KafkaUtil还需要一个传递泛型参数的方法,逻辑如下:

flink 双流join问题集合_ide_03

但是OrderWideJob在直接调用kafkaUtil输出string类型 flinkkafkaproducer方法时报错,因为sinkTo kafka前的数据格式是JsonObject,需要在写入kafka前转化为string。因此需要再map一下。也可以将jsonObject先map成object,然后将kafkaUtil输出成Object类型。总之就是函数输出的格式跟sinktokafka的数据格式必须保持一致。

 orderWideWithUserProvinceSkuSpuCategory3TrademarkDs.map(new MapFunction<OrderWide, String>() {

             @Override
             public String map(OrderWide orderWide) throws Exception {

                // return JSON.toJSONString(orderWide.toString(),DisableCheckSpecialChar);
                  Object o = com.alibaba.fastjson.JSONObject.toJSON(orderWide);
                  String s = JSON.toJSONString(o, SerializerFeature.MapSortField);
                  //直接转成string会产生反斜杠'\"',需要先转成object

                  return s;

             }
         })
        .addSink(KafkaUtil.getSinkFunction(orderWideSinkTopic)
                );

参考文献:

https://blog.csdn.net/m0_37979201/article/details/79493187

https://blog.csdn.net/sinat_34524528/article/details/90703726?spm=1001.2101.3001.6650.1&utm_medium=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-1-90703726-blog-105807075.235%5Ev32%5Epc_relevant_increate_t0_download_v2&depth_1-utm_source=distribute.pc_relevant.none-task-blog-2%7Edefault%7ECTRLIST%7ERate-1-90703726-blog-105807075.235%5Ev32%5Epc_relevant_increate_t0_download_v2&utm_relevant_index=2

https://www.cnblogs.com/JonaLin/p/11577804.html

https://www.cnblogs.com/JonaLin/p/11577804.html

https://www.shuzhiduo.com/A/6pdDqwlKzw/

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
  F36IaJwrKLcw   2023年12月23日   26   0   0 idesparkidesparkDataData
  FoZN5OJ14wRT   2023年11月28日   21   0   0 Luaideideluasedsed
UqrkOCyfkQZc
最新推荐 更多

2024-05-03