Implementing grouped metric statistics with Flink
UqrkOCyfkQZc · 2023-11-02
" select " +
                "    DATE_FORMAT(TUMBLE_START(rt, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') stt, " +
                "    DATE_FORMAT(TUMBLE_END(rt, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') edt, " +
                "    province_id, " +
                "    province_name, " +
                "    province_area_code, " +
                "    province_iso_code, " +
                "    count(distinct order_id) order_count, " +
                "    sum(original_total_amount) order_amount, " +
                "    UNIX_TIMESTAMP()*1000 ts " +
                "from " +
                "    order_wide " +
                "group by " +
                "    province_id ," +
                "    province_name, " +
                "    province_area_code, " +
                "    province_iso_code, " +
                "    TUMBLE(rt, INTERVAL '10' SECOND)");

This is the analytical query as written in Flink SQL; the goal is to reproduce the same result with the Flink DataStream API.

Since province_id, province_name, province_area_code, and province_iso_code correspond one-to-one, the DataStream version only needs keyBy(province_id). If they did not, a KeySelector<OrderWide, Tuple4<Long, String, String, String>> would be needed instead. The real sticking point is count(distinct order_id): the answers found online vary wildly, and it took about five days to arrive at code that produces the same result as the SQL above.
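For reference, a minimal sketch of that composite-key variant (hypothetical here, since the one-to-one mapping makes it unnecessary):

// Hypothetical composite key, only needed if the four province attributes
// were not in one-to-one correspondence. Uses org.apache.flink.api.java.tuple.Tuple4
// and org.apache.flink.api.java.functions.KeySelector.
KeySelector<OrderWide, Tuple4<Long, String, String, String>> provinceKey =
        new KeySelector<OrderWide, Tuple4<Long, String, String, String>>() {
            @Override
            public Tuple4<Long, String, String, String> getKey(OrderWide value) {
                return Tuple4.of(value.getProvince_id(), value.getProvince_name(),
                        value.getProvince_area_code(), value.getProvince_iso_code());
            }
        };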

The key idea is to be clear, at every step, about which object is being operated on and what type the step produces. The statistics here can be abstracted as: select a, sum(b), count(distinct c) from table group by a;

Because the distinct column c differs from the grouping column a, the count has to be computed in two steps: first group by (a, c) to collapse duplicates of c, then group by a to combine the per-c results. Two POJOs were constructed for this, shown below.
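For example, rows (province 1, order 1001), (province 1, order 1001), (province 1, order 1002) first collapse into one record each for keys (1, 1001) and (1, 1002); combining those two records under province 1 then yields order_count = 2.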

public ProvinceStats(OrderWide orderWide) throws ParseException {
        province_id = orderWide.getProvince_id();
        province_name = orderWide.getProvince_name();
        province_area_code = orderWide.getProvince_area_code();
        province_iso_code = orderWide.getProvince_iso_code();
        order_amount = orderWide.getOriginal_total_amount();
        // Window size in milliseconds.
        long windowSize = 10000L;
        // Offset in milliseconds; 0 for an unshifted tumbling window.
        long offset = 0L;
        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
        long time = sdf.parse(orderWide.getCreate_time()).getTime();
        long windowStart = getWindowStartWithOffset(time, offset, windowSize);
        stt = sdf.format(new Date(windowStart));
        // The window end is windowStart + windowSize, not the raw event time plus
        // windowSize (which would drift whenever the event is not on a boundary).
        edt = sdf.format(new Date(windowStart + windowSize));

        order_count = 0L;
        ts = new Date().getTime();
    }

// Compute the start of the tumbling window that contains `timestamp`;
// this is the same formula Flink uses in TimeWindow.getWindowStartWithOffset.
 private static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
        return timestamp - (timestamp - offset + windowSize) % windowSize;
    }
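For example, with offset = 0 and windowSize = 10000, an event at 12:00:07 maps to timestamp - (timestamp + 10000) % 10000 = 12:00:00, the start of its 10-second window; the corresponding window end is 12:00:10.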
public ProvinceStats1(ProvinceStats provinceStats) {
        province_id = provinceStats.getProvince_id();
        province_name = provinceStats.getProvince_name();
        province_area_code = provinceStats.getProvince_area_code();
        province_iso_code = provinceStats.getProvince_iso_code();
        order_amount = provinceStats.getOrder_amount();
        order_count = 0L;
        // stt/edt are carried over from the first-stage result.
        stt = provinceStats.getStt();
        edt = provinceStats.getEdt();
        ts = new Date().getTime();
    }

Although the two POJOs have identical field types, they are initialized differently.
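For reference, the fields shared by the POJOs, as implied by the constructors above (the exact declarations are not shown in the original, so the types here are inferred assumptions):

// Fields implied by the constructors above; types are inferred assumptions.
private Long province_id;
private String province_name;
private String province_area_code;
private String province_iso_code;
private BigDecimal order_amount; // matches new BigDecimal(0) in ProvinceStatsZhang
private Long order_count;
private String stt;
private String edt;
private Long ts;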

The keyBy aggregation is implemented as follows:

SingleOutputStreamOperator<OrderWide> orderWideDs = dataStreamSource.assignTimestampsAndWatermarks(
        WatermarkStrategy.<OrderWide>forMonotonousTimestamps()
                .withTimestampAssigner(new SerializableTimestampAssigner<OrderWide>() {
                    @Override
                    public long extractTimestamp(OrderWide element, long recordTimestamp) {
                        // Event time is taken from the order's create_time field.
                        SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
                        try {
                            return sdf.parse(element.getCreate_time()).getTime();
                        } catch (ParseException e) {
                            throw new RuntimeException(e);
                        }
                    }
                }));
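Note that forMonotonousTimestamps assumes create_time never decreases across records; for out-of-order input, WatermarkStrategy.forBoundedOutOfOrderness with a suitable delay would be the safer choice.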




// Stage 1: key by (province_id, order_id) so each window pane holds the rows
// of exactly one distinct order.
SingleOutputStreamOperator<ProvinceStats> provinceStatsDs = orderWideDs
        .keyBy(new KeySelector<OrderWide, Tuple2<Long, Long>>() {
            @Override
            public Tuple2<Long, Long> getKey(OrderWide value) throws Exception {
                return Tuple2.of(value.getProvince_id(), value.getOrder_id());
            }
        })
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        // Pre-aggregate per (province, order).
        .process(new ProvinceProcessWindowFunction());

// Stage 2: key by province_id and combine the per-order pre-aggregates.
SingleOutputStreamOperator<ProvinceStats1> process = provinceStatsDs
        .keyBy(ProvinceStats::getProvince_id)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .process(new ProvinceProcessWindowFunction1());
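The two window functions themselves are not shown above. A minimal sketch of what they might look like, assuming the getters/setters implied by the POJO constructors (the actual implementations may differ):

import java.math.BigDecimal;
import org.apache.flink.api.java.tuple.Tuple2;
import org.apache.flink.streaming.api.functions.windowing.ProcessWindowFunction;
import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
import org.apache.flink.util.Collector;

// Stage 1: every pane holds the rows of one (province_id, order_id) pair,
// so each emitted ProvinceStats represents exactly one distinct order.
public class ProvinceProcessWindowFunction
        extends ProcessWindowFunction<OrderWide, ProvinceStats, Tuple2<Long, Long>, TimeWindow> {
    @Override
    public void process(Tuple2<Long, Long> key, Context ctx,
                        Iterable<OrderWide> elements, Collector<ProvinceStats> out) throws Exception {
        ProvinceStats stats = new ProvinceStats(elements.iterator().next());
        BigDecimal amount = BigDecimal.ZERO;
        for (OrderWide ow : elements) {
            amount = amount.add(ow.getOriginal_total_amount());
        }
        stats.setOrder_amount(amount);
        stats.setOrder_count(1L); // exactly one distinct order per key
        out.collect(stats);
    }
}

// Stage 2: per province, sum the per-order pre-aggregates; the sum of the
// order_count fields equals count(distinct order_id).
public class ProvinceProcessWindowFunction1
        extends ProcessWindowFunction<ProvinceStats, ProvinceStats1, Long, TimeWindow> {
    @Override
    public void process(Long provinceId, Context ctx,
                        Iterable<ProvinceStats> elements, Collector<ProvinceStats1> out) throws Exception {
        ProvinceStats1 result = new ProvinceStats1(elements.iterator().next());
        long orderCount = 0L;
        BigDecimal amount = BigDecimal.ZERO;
        for (ProvinceStats ps : elements) {
            orderCount += ps.getOrder_count();
            amount = amount.add(ps.getOrder_amount());
        }
        result.setOrder_count(orderCount);
        result.setOrder_amount(amount);
        out.collect(result);
    }
}

Because each stage-1 pane contains exactly one distinct order, emitting order_count = 1 and summing those values in stage 2 reproduces count(distinct order_id), while the amounts sum correctly across both stages.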

The version above accumulates the count(distinct) across two stages. If the pre-aggregation is not needed, the deduplication can also be done in a single window function with a HashSet:

// Assumed signature: this body sits in ProvinceProcessWindowFunctionZhang, a
// ProcessWindowFunction applied after keyBy(OrderWide::getProvince_id).
@Override
public void process(Long provinceId, Context context,
                    Iterable<OrderWide> iterable, Collector<ProvinceStatsZhang> collector) throws Exception {
    OrderWide orderWide1 = iterable.iterator().next();
    ProvinceStatsZhang provinceStats = new ProvinceStatsZhang(orderWide1);
    Set<Long> orderId = new HashSet<>();
    for (OrderWide orderWide : iterable) {
        provinceStats.setOrder_amount(provinceStats.getOrder_amount().add(orderWide.getOriginal_total_amount()));
        orderId.add(orderWide.getOrder_id());
    }
    // Note: assign order_count after the loop, once the set holds every
    // distinct order id of the window.
    provinceStats.setOrder_count((long) orderId.size());
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    provinceStats.setStt(sdf.format(new Date(context.window().getStart())));
    provinceStats.setEdt(sdf.format(new Date(context.window().getEnd())));
    collector.collect(provinceStats);
}

// The corresponding POJO constructor:
 public ProvinceStatsZhang(OrderWide orderWide) {
        province_id = orderWide.getProvince_id();
        province_name = orderWide.getProvince_name();
        province_area_code = orderWide.getProvince_area_code();
        province_iso_code = orderWide.getProvince_iso_code();

        order_count = 0L;
        order_amount = new BigDecimal(0);
        ts = new Date().getTime();
    }
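Unlike ProvinceStats, this constructor does not compute stt/edt itself; they are filled in from context.window().getStart() and getEnd() inside the window function, which avoids re-deriving the window bounds from the event time.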

The corresponding watermark setup (the windows use event time):
SingleOutputStreamOperator<OrderWide> orderWideDataStream = dataStreamSource.assignTimestampsAndWatermarks(
                WatermarkStrategy.<OrderWide>forMonotonousTimestamps()
                        .withTimestampAssigner(new SerializableTimestampAssigner<OrderWide>() {
                            @Override
                            public long extractTimestamp(OrderWide element, long recordTimestamp) {
                                try {
                                    return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(element.getCreate_time()).getTime();
                                } catch (ParseException e) {
                                    throw new RuntimeException(e);
                                }
                            }
                        }));
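The wiring of this single-stage pipeline is not shown in the original; presumably it looks something like the following, with the operator name taken from the execution plan below (the print sink is illustrative):

// Hypothetical wiring for the single-stage variant.
SingleOutputStreamOperator<ProvinceStatsZhang> provinceStatsZhangDs = orderWideDataStream
        .keyBy(OrderWide::getProvince_id)
        .window(TumblingEventTimeWindows.of(Time.seconds(10)))
        .process(new ProvinceProcessWindowFunctionZhang());
provinceStatsZhangDs.print();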

The execution plans of the two approaches are shown below (first the two-stage version, then the single-stage one):

{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: orderWideKafkaSource",
    "pact" : "Data Source",
    "contents" : "Source: orderWideKafkaSource",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Timestamps/Watermarks",
    "pact" : "Operator",
    "contents" : "Timestamps/Watermarks",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "TumblingEventTimeWindows",
    "pact" : "Operator",
    "contents" : "Window(TumblingEventTimeWindows(10000), EventTimeTrigger, ProvinceProcessWindowFunction)",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 6,
    "type" : "TumblingEventTimeWindows",
    "pact" : "Operator",
    "contents" : "Window(TumblingEventTimeWindows(10000), EventTimeTrigger, ProvinceProcessWindowFunction1)",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 7,
    "type" : "Sink: Print to Std. Out",
    "pact" : "Data Sink",
    "contents" : "Sink: Print to Std. Out",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 6,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 8,
    "type" : "Sink: Unnamed",
    "pact" : "Data Sink",
    "contents" : "Sink: Unnamed",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 6,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}
{
  "nodes" : [ {
    "id" : 1,
    "type" : "Source: orderWideKafkaSource",
    "pact" : "Data Source",
    "contents" : "Source: orderWideKafkaSource",
    "parallelism" : 1
  }, {
    "id" : 2,
    "type" : "Timestamps/Watermarks",
    "pact" : "Operator",
    "contents" : "Timestamps/Watermarks",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 1,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  }, {
    "id" : 4,
    "type" : "TumblingEventTimeWindows",
    "pact" : "Operator",
    "contents" : "Window(TumblingEventTimeWindows(10000), EventTimeTrigger, ProvinceProcessWindowFunctionZhang)",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 2,
      "ship_strategy" : "HASH",
      "side" : "second"
    } ]
  }, {
    "id" : 5,
    "type" : "Sink: Print to Std. Out",
    "pact" : "Data Sink",
    "contents" : "Sink: Print to Std. Out",
    "parallelism" : 1,
    "predecessors" : [ {
      "id" : 4,
      "ship_strategy" : "FORWARD",
      "side" : "second"
    } ]
  } ]
}

Clearly, the second version is simpler and more efficient: its plan needs only one window operator and one hash shuffle, versus two of each for the two-stage version.

References:

https://blog.csdn.net/qq_36923376/article/details/88081428

https://blog.csdn.net/kamputer/article/details/121612169

https://cloud.tencent.com/developer/ask/sof/367275

Process functions:
http://python1234.cn/archives/bgdata19392

Grouping and aggregation with POJOs in Flink:
https://www.cnblogs.com/yoyowin/p/14800178.html

How to improve a DataStream implementation of count distinct in Flink?
https://www.saoniuhuo.com/question/detail-2031993.html

A summary of exact real-time deduplication methods in Flink:
https://blog.csdn.net/weixin_42993799/article/details/106436637

UV and PV statistics in Flink (three approaches):
https://blog.csdn.net/baifanwudi/article/details/105428152

General methods for computing PV and UV in Flink:
https://blog.csdn.net/ddxygq/article/details/121092253

E-commerce user behavior analysis with Flink [3] | real-time traffic statistics (a series worth reading):
https://blog.51cto.com/u_15105906/5849971
https://blog.51cto.com/u_15105906/4966884

A Flink learning overview:
https://www.lmlphp.com/user/57801/article/item/1617606/






