" select " +
" DATE_FORMAT(TUMBLE_START(rt, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') stt, " +
" DATE_FORMAT(TUMBLE_END(rt, INTERVAL '10' SECOND), 'yyyy-MM-dd HH:mm:ss') edt, " +
" province_id, " +
" province_name, " +
" province_area_code, " +
" province_iso_code, " +
" count(distinct order_id) order_count, " +
" sum(original_total_amount) order_amount, " +
" UNIX_TIMESTAMP()*1000 ts " +
"from " +
" order_wide " +
"group by " +
" province_id ," +
" province_name, " +
" province_area_code, " +
" province_iso_code, " +
" TUMBLE(rt, INTERVAL '10' SECOND)");
这是需要通过 Flink SQL 实现的查询分析语句,现需要通过 Flink DataStream API 来实现。
由于province_id、province_name、province_area_code、province_iso_code是一一对应的,所以转化为API时只需要keyBy(province_id);如果不是一一对应,则需要引入new KeySelector&lt;OrderWide, Tuple4&lt;Long,String,String,String&gt;&gt;()。主要的问题是卡在count(distinct order_id)上,上网查看了答案五花八门,前后花了5天时间吧,才勉强搞出了一套能得出跟上述sql方式实现相同结果的代码。
主要的思路是:需要清楚每一步操作的对象,以及生成的对象的类型。这里在统计时可以抽象成select a,sum(b),count(distinct c) from table group by a;
由于count(distinct c)跟后面的分组字段a不同,所以在count时,需要分两步才能实现。为此构造了两个pojo。
/**
 * Builds the first-stage, per-record province aggregate from one OrderWide.
 * Mirrors the SQL's TUMBLE_START/TUMBLE_END columns by deriving the 10-second
 * tumbling-window bounds from the record's create_time (event time).
 *
 * @param orderWide the wide order record (assumed create_time formatted as
 *                  "yyyy-MM-dd HH:mm:ss" — confirm against the producer)
 * @throws ParseException if create_time does not match the expected pattern
 */
public ProvinceStats(OrderWide orderWide) throws ParseException {
    province_id = orderWide.getProvince_id();
    province_name = orderWide.getProvince_name();
    province_area_code = orderWide.getProvince_area_code();
    province_iso_code = orderWide.getProvince_iso_code();
    order_amount = orderWide.getOriginal_total_amount();
    // Window size and offset in MILLISECONDS; a plain tumbling window has offset 0.
    long windowsize = 10000L;
    long offset = 0L;
    SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
    long time = sdf.parse(orderWide.getCreate_time()).getTime();
    long windowStart = getWindowStartWithOffset(time, offset, windowsize);
    stt = sdf.format(new Date(windowStart));
    // BUG FIX: the window end is windowStart + windowsize (TUMBLE_END semantics),
    // NOT eventTime + windowsize — the old code shifted edt by the event's offset
    // inside the window, so edt disagreed with the SQL result for any record not
    // landing exactly on a window boundary.
    edt = sdf.format(new Date(windowStart + windowsize));
    // Count is accumulated later by the window function; start at zero here.
    order_count = 0L;
    ts = System.currentTimeMillis();
}
/**
 * Returns the start timestamp of the tumbling window that contains
 * {@code timestamp}, for windows of {@code windowSize} ms aligned at
 * {@code offset}. Same formula as Flink's TimeWindow#getWindowStartWithOffset.
 */
private static long getWindowStartWithOffset(long timestamp, long offset, long windowSize) {
    // Distance from the timestamp back to its window's left edge.
    long remainder = (timestamp - offset + windowSize) % windowSize;
    return timestamp - remainder;
}
/**
 * Builds the second-stage aggregate by copying the grouping key and window
 * bounds from a first-stage ProvinceStats record.
 *
 * @param provinceStats the first-stage (per order_id) partial aggregate
 * @throws ParseException declared for interface compatibility with callers
 */
public ProvinceStats1(ProvinceStats provinceStats) throws ParseException {
    // Window bounds computed in stage one are carried through unchanged.
    this.stt = provinceStats.getStt();
    this.edt = provinceStats.getEdt();
    // Grouping key: the four province columns are one-to-one with province_id.
    this.province_id = provinceStats.getProvince_id();
    this.province_name = provinceStats.getProvince_name();
    this.province_area_code = provinceStats.getProvince_area_code();
    this.province_iso_code = provinceStats.getProvince_iso_code();
    this.order_amount = provinceStats.getOrder_amount();
    // Deliberately zero: the distinct-order count is re-accumulated downstream.
    this.order_count = 0L;
    this.ts = System.currentTimeMillis();
}
虽然两个实体的数据类型相同,但初始化条件却有差别。
实现keyby统计代码如下:
// Assign event-time timestamps from create_time; forMonotonousTimestamps assumes
// timestamps never regress (watermark = max seen timestamp, zero lateness allowed).
SingleOutputStreamOperator<OrderWide> orderWideDs = dataStreamSource.assignTimestampsAndWatermarks(WatermarkStrategy.<OrderWide>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<OrderWide>() {
@Override
public long extractTimestamp(OrderWide element, long recordTimestamp) {
// return System.currentTimeMillis();
// NOTE(review): a new SimpleDateFormat per record is wasteful; consider a
// per-instance transient formatter (SimpleDateFormat is not thread-safe).
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
try {
return sdf.parse(element.getCreate_time()).getTime();
} catch (ParseException e) {
throw new RuntimeException(e);
}
}
}));
// Stage 1 of count(distinct order_id): key by (province_id, order_id) so each
// distinct order within a window collapses into one ProvinceStats record.
SingleOutputStreamOperator<ProvinceStats> provinceStatsDs = orderWideDs.keyBy(new KeySelector<OrderWide, Tuple2<Long, Long>>() {
@Override
public Tuple2<Long, Long> getKey(OrderWide value) throws Exception {
return Tuple2.of(value.getProvince_id(), value.getOrder_id());
}
})
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
// aggregate per (province, order) within the 10s tumbling window
.process(new ProvinceProcessWindowFunction());
// Stage 2: re-key by province_id only and count the stage-1 records, giving the
// distinct-order count per province per window.
SingleOutputStreamOperator<ProvinceStats1> process = provinceStatsDs.keyBy(ProvinceStats::getProvince_id)
.window(TumblingEventTimeWindows.of(Time.seconds(10)))
.process(new ProvinceProcessWindowFunction1());
上面是count(distinct)有累加的元素,如果不累加的话,也可以使用如下的代码:
// Body of a ProcessWindowFunction keyed by province_id: single-pass alternative
// to the two-stage approach. `iterable`, `context` and `collector` come from the
// enclosing process() signature (not shown here).
// Seed the output record from any element; amounts start at zero in the ctor.
OrderWide orderWide1 = iterable.iterator().next();
ProvinceStatsZhang provinceStats = new ProvinceStatsZhang(orderWide1);
// Distinct order ids within this window -> emulates count(distinct order_id).
Set<Long> orderId = new HashSet<>();
for (OrderWide orderWide : iterable) {
provinceStats.setOrder_amount(provinceStats.getOrder_amount().add(orderWide.getOriginal_total_amount()));
orderId.add(orderWide.getOrder_id());
}
// NOTE: order_count must be assigned OUTSIDE the loop (set size is only final
// after all elements are seen); assigning inside yields wrong results.
provinceStats.setOrder_count((long) orderId.size());
SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
// Window bounds come from the framework, matching TUMBLE_START/TUMBLE_END.
provinceStats.setStt(sdf.format(new Date(context.window().getStart())));
provinceStats.setEdt(sdf.format(new Date(context.window().getEnd())));
collector.collect(provinceStats);
//对应放入实体对象设置是:
/**
 * Builds an empty per-province accumulator keyed by the four province columns.
 * Amount and count start at zero; the window function fills them in, and sets
 * stt/edt from the window bounds afterwards.
 *
 * @param orderWide any record of the window, used only for the grouping columns
 */
public ProvinceStatsZhang(OrderWide orderWide) {
    province_id = orderWide.getProvince_id();
    province_name = orderWide.getProvince_name();
    province_area_code = orderWide.getProvince_area_code();
    province_iso_code = orderWide.getProvince_iso_code();
    order_count = 0L;
    // Idiom: reuse the shared constant instead of allocating new BigDecimal(0).
    order_amount = BigDecimal.ZERO;
    // Idiom: avoid the intermediate Date allocation of new Date().getTime().
    ts = System.currentTimeMillis();
}
对应的水位线设置:(窗口时间使用事件时间)
// Event-time watermark setup for the single-pass variant; identical in effect to
// the assignment used by the two-stage pipeline above.
SingleOutputStreamOperator<OrderWide> orderWideDataStream = dataStreamSource.assignTimestampsAndWatermarks(
WatermarkStrategy.<OrderWide>forMonotonousTimestamps()
.withTimestampAssigner(new SerializableTimestampAssigner<OrderWide>() {
@Override
public long extractTimestamp(OrderWide element, long recordTimestamp) {
try {
// NOTE(review): SimpleDateFormat is constructed per record and is not
// thread-safe; a reusable transient formatter would be cheaper.
return new SimpleDateFormat("yyyy-MM-dd HH:mm:ss").parse(element.getCreate_time()).getTime();
} catch (ParseException e) {
// Unparseable create_time fails the job fast rather than silently dropping.
throw new RuntimeException(e);
}
}
}));
两种方法的的执行计划如下:
{
"nodes" : [ {
"id" : 1,
"type" : "Source: orderWideKafkaSource",
"pact" : "Data Source",
"contents" : "Source: orderWideKafkaSource",
"parallelism" : 1
}, {
"id" : 2,
"type" : "Timestamps/Watermarks",
"pact" : "Operator",
"contents" : "Timestamps/Watermarks",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 4,
"type" : "TumblingEventTimeWindows",
"pact" : "Operator",
"contents" : "Window(TumblingEventTimeWindows(10000), EventTimeTrigger, ProvinceProcessWindowFunction)",
"parallelism" : 1,
"predecessors" : [ {
"id" : 2,
"ship_strategy" : "HASH",
"side" : "second"
} ]
}, {
"id" : 6,
"type" : "TumblingEventTimeWindows",
"pact" : "Operator",
"contents" : "Window(TumblingEventTimeWindows(10000), EventTimeTrigger, ProvinceProcessWindowFunction1)",
"parallelism" : 1,
"predecessors" : [ {
"id" : 4,
"ship_strategy" : "HASH",
"side" : "second"
} ]
}, {
"id" : 7,
"type" : "Sink: Print to Std. Out",
"pact" : "Data Sink",
"contents" : "Sink: Print to Std. Out",
"parallelism" : 1,
"predecessors" : [ {
"id" : 6,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 8,
"type" : "Sink: Unnamed",
"pact" : "Data Sink",
"contents" : "Sink: Unnamed",
"parallelism" : 1,
"predecessors" : [ {
"id" : 6,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
{
"nodes" : [ {
"id" : 1,
"type" : "Source: orderWideKafkaSource",
"pact" : "Data Source",
"contents" : "Source: orderWideKafkaSource",
"parallelism" : 1
}, {
"id" : 2,
"type" : "Timestamps/Watermarks",
"pact" : "Operator",
"contents" : "Timestamps/Watermarks",
"parallelism" : 1,
"predecessors" : [ {
"id" : 1,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
}, {
"id" : 4,
"type" : "TumblingEventTimeWindows",
"pact" : "Operator",
"contents" : "Window(TumblingEventTimeWindows(10000), EventTimeTrigger, ProvinceProcessWindowFunctionZhang)",
"parallelism" : 1,
"predecessors" : [ {
"id" : 2,
"ship_strategy" : "HASH",
"side" : "second"
} ]
}, {
"id" : 5,
"type" : "Sink: Print to Std. Out",
"pact" : "Data Sink",
"contents" : "Sink: Print to Std. Out",
"parallelism" : 1,
"predecessors" : [ {
"id" : 4,
"ship_strategy" : "FORWARD",
"side" : "second"
} ]
} ]
}
显然,第二种代码更简洁高效。
参考文献:
https://blog.csdn.net/qq_36923376/article/details/88081428
https://cloud.tencent.com/developer/ask/sof/367275
处理函数:
http://python1234.cn/archives/bgdata19392
Flink使用POJO实现分组和汇总
https://www.cnblogs.com/yoyowin/p/14800178.html
如何改进flink中数据流实现的count distinct?
https://www.saoniuhuo.com/question/detail-2031993.html
基于Flink 的实时 精准去重方法总结
https://blog.csdn.net/weixin_42993799/article/details/106436637
Flink数据统计UV、PV统计(三种写法)
Flink计算pv和uv的通用方法
https://blog.csdn.net/ddxygq/article/details/121092253
基于flink的电商用户行为数据分析【3】| 实时流量统计(系列文章值得一看)
https://blog.51cto.com/u_15105906/5849971
https://blog.51cto.com/u_15105906/4966884
flink学习总括
https://www.lmlphp.com/user/57801/article/item/1617606/