Flink article series
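This article introduces window join, one of Flink's commonly used operators, with detailed examples. For more, see the author's Flink column, which covers the topic more systematically. Apart from the Maven dependencies, the examples have no other dependencies.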
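This topic is covered in four articles:
- [Flink Extras] 2. Flink's 23 operators: window join and interval join, data skew, and partitioning, with detailed examples (1) - window join
- [Flink Extras] 2. Flink's 23 operators: window join and interval join, data skew, and partitioning, with detailed examples (2) - interval join
- [Flink Extras] 2. Flink's 23 operators: window join and interval join, data skew, and partitioning, with detailed examples (3) - data skew handling and partitioning examples
- [Flink Extras] 2. Flink's 23 operators: window join and interval join, data skew, and partitioning, with detailed examples - complete version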
I. Maven dependencies and the User bean
1. Maven dependencies
All examples below use these Maven dependencies unless stated otherwise.
<properties>
<encoding>UTF-8</encoding>
<project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
<maven.compiler.source>1.8</maven.compiler.source>
<maven.compiler.target>1.8</maven.compiler.target>
<java.version>1.8</java.version>
<scala.version>2.12</scala.version>
<flink.version>1.17.0</flink.version>
</properties>
<dependencies>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-clients</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-scala_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-streaming-java</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-scala-bridge_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-api-java-bridge</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-planner_2.12</artifactId>
<version>${flink.version}</version>
</dependency>
<dependency>
<groupId>org.apache.flink</groupId>
<artifactId>flink-table-common</artifactId>
<version>${flink.version}</version>
</dependency>
<!-- Logging -->
<dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-log4j12</artifactId>
<version>1.7.7</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>log4j</groupId>
<artifactId>log4j</artifactId>
<version>1.2.17</version>
<scope>runtime</scope>
</dependency>
<dependency>
<groupId>org.projectlombok</groupId>
<artifactId>lombok</artifactId>
<version>1.18.2</version>
<scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-common</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-client</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>org.apache.hadoop</groupId>
<artifactId>hadoop-hdfs</artifactId>
<version>3.1.4</version>
</dependency>
<dependency>
<groupId>com.alibaba</groupId>
<artifactId>fastjson</artifactId>
<version>1.2.44</version>
</dependency>
</dependencies>
2、User bean
import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;
/**
* @author alanchan
*
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class User {
private int id;
private String name;
private String pwd;
private String email;
private int age;
private double balance;
}
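II. Window join
A window join joins the elements of two streams that share a common key and lie in the same window. The windows are defined by a window assigner and are evaluated on elements from both streams.
The elements from both sides are then passed to a user-defined JoinFunction or FlatJoinFunction, where the user can emit results that meet the join criteria.
The general usage can be summarized as follows: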
stream.join(otherStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(<WindowAssigner>)
.apply(<JoinFunction>);
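Some notes on semantics:
- The creation of pairwise combinations of elements of the two streams behaves like an inner join: an element of one stream is not emitted unless it has a corresponding element from the other stream to join with.
- Joined elements take as their timestamp the largest timestamp that still lies in the respective window. For example, a window with the bounds [5, 10) results in joined elements having 9 as their timestamp.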
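The timestamp rule in the second note can be checked with a little arithmetic. The following is a minimal plain-Java sketch, not Flink code: it assumes tumbling windows with no offset, and the class and method names are hypothetical.

```java
final class WindowTimestampSketch {
    /** Inclusive start of the tumbling window of the given size that contains the timestamp. */
    static long windowStart(long timestamp, long size) {
        return timestamp - Math.floorMod(timestamp, size);
    }

    /** Largest timestamp that still belongs to the window [start, start + size). */
    static long maxTimestamp(long timestamp, long size) {
        return windowStart(timestamp, size) + size - 1;
    }

    public static void main(String[] args) {
        // An element at t = 7 with a window size of 5 lands in the window [5, 10).
        System.out.println(windowStart(7, 5));  // 5
        System.out.println(maxTimestamp(7, 5)); // 9
    }
}
```

With a window size of 5, every element in [5, 10) maps to the same maximum timestamp 9, which is the timestamp the joined result would carry.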
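1. Tumbling window join - TumblingEventTimeWindows
With a tumbling window join, all elements that have the same key and share a tumbling window are joined pairwise and passed to a JoinFunction or FlatJoinFunction. Because this behaves like an inner join, an element of one stream that has no partner from the other stream in its window is not emitted. In the figure, a tumbling window of size 2 ms is defined, which yields windows of the form [0,1], [2,3], ... . The figure shows how the elements in each window are paired up; the resulting pairs are passed to the JoinFunction. Note that the tumbling window [6,7] emits nothing, because the green stream has no elements to pair with the orange elements ⑥ and ⑦.
- Example code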
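import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;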
...
DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
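The pairing behavior of a tumbling window join can be reproduced without Flink. The following plain-Java sketch is not Flink's implementation. It assumes a single key, integer millisecond timestamps that double as the element values, and sample data chosen to be consistent with the description above (orange 0 through 7, green 0, 1, 3 and 4). All class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.TreeMap;

final class TumblingJoinSketch {
    /** Inclusive start of the tumbling window that contains the timestamp. */
    static long windowStart(long ts, long size) {
        return ts - Math.floorMod(ts, size);
    }

    /** Groups the timestamps by the start of the window they fall into. */
    static Map<Long, List<Long>> byWindow(long[] timestamps, long size) {
        Map<Long, List<Long>> windows = new TreeMap<>();
        for (long ts : timestamps) {
            windows.computeIfAbsent(windowStart(ts, size), k -> new ArrayList<>()).add(ts);
        }
        return windows;
    }

    /** Emits "orange,green" for every pair of elements that share a window. */
    static List<String> join(long[] orange, long[] green, long size) {
        Map<Long, List<Long>> greenWindows = byWindow(green, size);
        List<String> out = new ArrayList<>();
        for (Map.Entry<Long, List<Long>> e : byWindow(orange, size).entrySet()) {
            List<Long> partners = greenWindows.get(e.getKey());
            if (partners == null) {
                continue; // inner join: a window with no partner emits nothing
            }
            for (long o : e.getValue()) {
                for (long g : partners) {
                    out.add(o + "," + g);
                }
            }
        }
        return out;
    }

    public static void main(String[] args) {
        long[] orange = {0, 1, 2, 3, 4, 5, 6, 7};
        long[] green = {0, 1, 3, 4};
        // Window [6,7] contributes nothing because green has no element there.
        System.out.println(join(orange, green, 2));
    }
}
```

In this sample the orange elements 6 and 7 never appear in the output, which mirrors the inner-join behavior of window [6,7].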
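2. Sliding window join - SlidingEventTimeWindows
With a sliding window join, all elements that have the same key and lie in the same sliding window are joined pairwise and passed to a JoinFunction or FlatJoinFunction. Within a given sliding window, an element of one stream that has no partner from the other stream is not emitted. Note that elements joined in one sliding window are not necessarily joined in another. This example uses sliding windows with a size of 2 ms and a slide of 1 ms, which yields the windows [-1,0], [0,1], [1,2], [2,3], ... . The elements below the x-axis are the ones joined in each sliding window and passed to the JoinFunction. The figure shows that orange ② is joined with green ③ in window [2,3], but is not joined with anything in window [1,2].
- Example code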
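import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;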
...
DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(SlidingEventTimeWindows.of(Time.milliseconds(2) /* size */, Time.milliseconds(1) /* slide */))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
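Why orange ② and green ③ meet in window [2,3] but not in [1,2] follows from how an element is assigned to several sliding windows at once. The sketch below is plain Java, not Flink code; it assumes no window offset, and the class and method names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SlidingWindowSketch {
    /** Starts of all sliding windows [start, start + size) that contain the timestamp. */
    static List<Long> windowStarts(long ts, long size, long slide) {
        List<Long> starts = new ArrayList<>();
        long lastStart = ts - Math.floorMod(ts, slide);
        for (long start = lastStart; start > ts - size; start -= slide) {
            starts.add(start);
        }
        return starts;
    }

    /** Starts of the windows that contain both timestamps, i.e. where a join can happen. */
    static List<Long> sharedWindowStarts(long a, long b, long size, long slide) {
        List<Long> shared = new ArrayList<>(windowStarts(a, size, slide));
        shared.retainAll(windowStarts(b, size, slide));
        return shared;
    }

    public static void main(String[] args) {
        // Size 2 ms, slide 1 ms: timestamp 2 belongs to [2,3] and [1,2].
        System.out.println(windowStarts(2, 2, 1));          // [2, 1]
        // Timestamps 2 and 3 only share the window starting at 2, i.e. [2,3].
        System.out.println(sharedWindowStarts(2, 3, 2, 1)); // [2]
    }
}
```

Each element belongs to size / slide windows (two in this case), so a pair is joined once for every window the two elements share.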
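3. Session window join - EventTimeSessionWindows
With a session window join, all elements that have the same key and, taken together, satisfy the session criteria are joined pairwise and passed to a JoinFunction or FlatJoinFunction. This is again an inner join, so a session window that contains elements from only one stream emits nothing. This example defines a session window with a gap of at least 1 ms. The figure contains three sessions. In the first two, both streams have elements, which are joined and passed to the JoinFunction. In the third session the green stream has no elements, so ⑧ and ⑨ are not joined.
- Example code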
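import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.windowing.assigners.EventTimeSessionWindows;
import org.apache.flink.streaming.api.windowing.time.Time;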
...
DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;
orangeStream.join(greenStream)
.where(<KeySelector>)
.equalTo(<KeySelector>)
.window(EventTimeSessionWindows.withGap(Time.milliseconds(1)))
.apply (new JoinFunction<Integer, Integer, String> (){
@Override
public String join(Integer first, Integer second) {
return first + "," + second;
}
});
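Session boundaries depend on the data rather than on a fixed grid, which a small simulation makes visible. The following plain-Java sketch is not Flink's implementation. It assumes a single key, that the elements of both streams are merged into common sessions, and that a new session starts whenever the distance to the previous element is larger than the gap. The sample data (orange 1, 2, 5, 6, 8, 9 and green 0, 1, 4, 5) and all names are hypothetical.

```java
import java.util.ArrayList;
import java.util.List;

final class SessionJoinSketch {
    /** Emits "orange,green" for every pair of elements that end up in the same session. */
    static List<String> join(long[] orange, long[] green, long gap) {
        // Tag each element with its stream (0 = orange, 1 = green) and sort by timestamp.
        List<long[]> all = new ArrayList<>();
        for (long ts : orange) all.add(new long[] {ts, 0});
        for (long ts : green) all.add(new long[] {ts, 1});
        all.sort((a, b) -> Long.compare(a[0], b[0]));

        List<String> out = new ArrayList<>();
        List<Long> o = new ArrayList<>();
        List<Long> g = new ArrayList<>();
        long last = 0;
        for (long[] e : all) {
            boolean sessionOpen = !(o.isEmpty() && g.isEmpty());
            if (sessionOpen && e[0] - last > gap) {
                emit(o, g, out); // distance larger than the gap closes the session
                o.clear();
                g.clear();
            }
            (e[1] == 0 ? o : g).add(e[0]);
            last = e[0];
        }
        emit(o, g, out);
        return out;
    }

    /** Inner join within one session: no output if either side is empty. */
    static void emit(List<Long> o, List<Long> g, List<String> out) {
        for (long a : o) {
            for (long b : g) {
                out.add(a + "," + b);
            }
        }
    }

    public static void main(String[] args) {
        long[] orange = {1, 2, 5, 6, 8, 9};
        long[] green = {0, 1, 4, 5};
        // Three sessions form; the last one holds only orange 8 and 9 and emits nothing.
        System.out.println(join(orange, green, 1));
    }
}
```

The third session contains only orange elements, so 8 and 9 are dropped, just as in the description of the figure.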
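4. TumblingEventTimeWindows example
This example generates simulated order data, joins each order with the matching goods record, and computes the order amount. It is implemented in two ways that differ only in how the WatermarkStrategy is provided: one builds the strategy inline in the job, the other implements the WatermarkStrategy interface in a separate class.
1) Data structures and beans
- Goods class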
package org.datastreamapi.operator.window.bean;
import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;
import com.alibaba.fastjson.JSON;
import lombok.Data;
/**
* @author alanchan
*
*/
// Goods (goods id, goods name, goods price)
@Data
public class Goods {
private String goodsId;
private String goodsName;
private BigDecimal goodsPrice;
public static List<Goods> GOODSLIST;
public static Random r;
static {
r = new Random();
GOODSLIST = new ArrayList<>();
GOODSLIST.add(new Goods("1", "iphone11", new BigDecimal(6000)));
GOODSLIST.add(new Goods("2", "iphone12", new BigDecimal(7000)));
GOODSLIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
GOODSLIST.add(new Goods("4", "iphone13", new BigDecimal(8000)));
GOODSLIST.add(new Goods("5", "iphone14", new BigDecimal(9000)));
GOODSLIST.add(new Goods("6", "iphone15", new BigDecimal(10000)));
}
public static Goods randomGoods() {
int rIndex = r.nextInt(GOODSLIST.size());
return GOODSLIST.get(rIndex);
}
public Goods() {
}
public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
this.goodsId = goodsId;
this.goodsName = goodsName;
this.goodsPrice = goodsPrice;
}
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
- Order class
package org.datastreamapi.operator.window.bean;
import com.alibaba.fastjson.JSON;
import lombok.Data;
/**
* @author alanchan
*
*/
// Order item (order id, goods id, quantity)
@Data
public class Order {
private String itemId;
private String goodsId;
private Integer count;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
- Joined goods and order class
package org.datastreamapi.operator.window.bean;
import java.math.BigDecimal;
import com.alibaba.fastjson.JSON;
import lombok.Data;
/**
* @author alanchan
*
*/
// Goods (goods id, goods name, goods price)
// Order item (order id, goods id, quantity)
// Join result (goods id, goods name, quantity, goods price * quantity)
@Data
public class OrderItem {
private String goodsId;
private String goodsName;
private BigDecimal count;
private BigDecimal total;
@Override
public String toString() {
return JSON.toJSONString(this);
}
}
2) Defining the goods and order sources
- Goods source
package org.datastreamapi.operator.window.source;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.datastreamapi.operator.window.bean.Goods;
/**
* @author alanchan
*
*/
public class GoodsSource extends RichSourceFunction<Goods> {
// volatile: cancel() is called from a different thread than run()
private volatile boolean isCancel;
@Override
public void open(Configuration parameters) throws Exception {
isCancel = false;
}
@Override
public void run(SourceContext<Goods> sourceContext) throws Exception {
while (!isCancel) {
Goods.GOODSLIST.stream().forEach(goods -> sourceContext.collect(goods));
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
isCancel = true;
}
}
- Order source
package org.datastreamapi.operator.window.source;
import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.datastreamapi.operator.window.bean.Goods;
import org.datastreamapi.operator.window.bean.Order;
/**
* @author alanchan
*
*/
public class OrderSource extends RichSourceFunction<Order>{
// volatile: cancel() is called from a different thread than run()
private volatile boolean isCancel;
private Random r;
@Override
public void open(Configuration parameters) throws Exception {
isCancel = false;
r = new Random();
}
@Override
public void run(SourceContext<Order> sourceContext) throws Exception {
while (!isCancel) {
Goods goods = Goods.randomGoods();
Order order = new Order();
order.setGoodsId(goods.getGoodsId());
order.setCount(r.nextInt(10) + 1);
order.setItemId(UUID.randomUUID().toString());
sourceContext.collect(order);
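// Simulate an order that contains more than one item. Goods id "10" is not in
// Goods.GOODSLIST, so this second item finds no partner and the inner join drops it.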
order.setGoodsId("10");
sourceContext.collect(order);
TimeUnit.SECONDS.sleep(1);
}
}
@Override
public void cancel() {
isCancel = true;
}
}
3) Window join, approach 1
package org.datastreamapi.operator.window;
import java.math.BigDecimal;
import java.time.Duration;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.datastreamapi.operator.window.bean.Goods;
import org.datastreamapi.operator.window.bean.Order;
import org.datastreamapi.operator.window.bean.OrderItem;
import org.datastreamapi.operator.window.source.GoodsSource;
import org.datastreamapi.operator.window.source.OrderSource;
/**
* @author alanchan
*
*/
public class TestWindowJoinDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 1.source
// Goods stream
DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource());
// Order stream
DataStreamSource<Order> orderDS = env.addSource(new OrderSource());
// Assign timestamps and watermarks (the system time is used directly as the event time here)
// Approach 1
SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS
.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))// maxOutOfOrderness: the maximum lateness / out-of-orderness tolerated
.withTimestampAssigner((element, timestamp) -> System.currentTimeMillis()));
SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Goods>forBoundedOutOfOrderness(Duration.ofSeconds(3))// maxOutOfOrderness: the maximum lateness / out-of-orderness tolerated
.withTimestampAssigner((element, timestamp) -> System.currentTimeMillis()));
// 2.transformation
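// Goods (goods id, goods name, goods price)
// Order item (order id, goods id, quantity)
// Join result (goods id, goods name, quantity, goods price * quantity)
// Example code from the official documentation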
// orangeStream.join(greenStream)
// .where(<KeySelector>)
// .equalTo(<KeySelector>)
// .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
// .apply (new JoinFunction<Integer, Integer, String> (){
// @Override
// public String join(Integer first, Integer second) {
// return first + "," + second;
// }
// });
DataStream<OrderItem> resultDS = goodsDSWithWatermark.join(orderDSWithWatermark).where(goods -> goods.getGoodsId()).equalTo(orderItem -> orderItem.getGoodsId())
// .where(Goods::getGoodsId)
// .equalTo(Order::getGoodsId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// <IN1, IN2, OUT>
.apply(new JoinFunction<Goods, Order, OrderItem>() {
@Override
public OrderItem join(Goods first, Order second) throws Exception {
OrderItem orderItem = new OrderItem();
orderItem.setGoodsId(first.getGoodsId());
orderItem.setGoodsName(first.getGoodsName());
orderItem.setCount(new BigDecimal(second.getCount()));
orderItem.setTotal(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));
return orderItem;
}
});
// 3.sink
resultDS.print();
// 4.execute
env.execute();
}
}
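Approach 1 relies on forBoundedOutOfOrderness with a bound of 3 seconds, so a 5-second window only produces output once the watermark has passed the end of that window. The sketch below illustrates this in plain Java. It is not Flink code: it assumes that the watermark trails the largest timestamp seen so far by the bound plus 1 ms, and that an event-time window [start, end) fires once the watermark reaches end - 1. The class and method names are hypothetical.

```java
final class BoundedOutOfOrdernessSketch {
    private final long maxOutOfOrdernessMillis;
    private long maxTimestamp;

    BoundedOutOfOrdernessSketch(long maxOutOfOrdernessMillis) {
        this.maxOutOfOrdernessMillis = maxOutOfOrdernessMillis;
        // Start value chosen so that the first watermark does not underflow.
        this.maxTimestamp = Long.MIN_VALUE + maxOutOfOrdernessMillis + 1;
    }

    /** Remembers the largest event timestamp seen so far. */
    void onEvent(long eventTimestamp) {
        maxTimestamp = Math.max(maxTimestamp, eventTimestamp);
    }

    /** The watermark trails the largest timestamp by the bound plus 1 ms. */
    long currentWatermark() {
        return maxTimestamp - maxOutOfOrdernessMillis - 1;
    }

    /** A window [start, end) fires once the watermark reaches its last timestamp, end - 1. */
    static boolean windowFires(long watermark, long windowEnd) {
        return watermark >= windowEnd - 1;
    }

    public static void main(String[] args) {
        BoundedOutOfOrdernessSketch gen = new BoundedOutOfOrdernessSketch(3000);
        gen.onEvent(4000);
        System.out.println(windowFires(gen.currentWatermark(), 5000)); // false
        gen.onEvent(8000);
        System.out.println(windowFires(gen.currentWatermark(), 5000)); // true
    }
}
```

Under these assumptions, the window [0, 5000) fires only after an element with timestamp 8000 or later has been seen, which is why the first results appear with a delay of roughly the window size plus the bound.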
4) Window join, approach 2
- GoodsWatermark
package org.datastreamapi.operator.window.watermark;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.datastreamapi.operator.window.bean.Goods;
/**
* @author alanchan
* Watermark strategy that uses the system time for timestamps and watermarks
*/
public class GoodsWatermark implements WatermarkStrategy<Goods> {
@Override
public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return (element, recordTimestamp) -> System.currentTimeMillis();
}
@Override
public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<Goods>() {
@Override
public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
};
}
}
- OrderWatermark
package org.datastreamapi.operator.window.watermark;
import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.datastreamapi.operator.window.bean.Order;
/**
* @author alanchan
* Watermark strategy that uses the system time for timestamps and watermarks
*/
public class OrderWatermark implements WatermarkStrategy<Order> {
@Override
public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
return (element, recordTimestamp) -> System.currentTimeMillis();
}
@Override
public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
return new WatermarkGenerator<Order>() {
@Override
public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
@Override
public void onPeriodicEmit(WatermarkOutput output) {
output.emitWatermark(new Watermark(System.currentTimeMillis()));
}
};
}
}
- Window join implementation
package org.datastreamapi.operator.window;
import java.math.BigDecimal;
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.functions.JoinFunction;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.datastreamapi.operator.window.bean.Goods;
import org.datastreamapi.operator.window.bean.Order;
import org.datastreamapi.operator.window.bean.OrderItem;
import org.datastreamapi.operator.window.source.GoodsSource;
import org.datastreamapi.operator.window.source.OrderSource;
import org.datastreamapi.operator.window.watermark.GoodsWatermark;
import org.datastreamapi.operator.window.watermark.OrderWatermark;
/**
* @author alanchan
*
*/
public class TestWindowJoinDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
// 0.env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// 1.source
// Goods stream
DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource());
// Order stream
DataStreamSource<Order> orderDS = env.addSource(new OrderSource());
// Assign timestamps and watermarks (the system time is used directly as the event time here)
// Approach 2
SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(new GoodsWatermark());
SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(new OrderWatermark());
// 2.transformation
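// Goods (goods id, goods name, goods price)
// Order item (order id, goods id, quantity)
// Join result (goods id, goods name, quantity, goods price * quantity)
// Example code from the official documentation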
// orangeStream.join(greenStream)
// .where(<KeySelector>)
// .equalTo(<KeySelector>)
// .window(TumblingEventTimeWindows.of(Time.milliseconds(2)))
// .apply (new JoinFunction<Integer, Integer, String> (){
// @Override
// public String join(Integer first, Integer second) {
// return first + "," + second;
// }
// });
DataStream<OrderItem> resultDS = goodsDSWithWatermark.join(orderDSWithWatermark).where(goods -> goods.getGoodsId()).equalTo(orderItem -> orderItem.getGoodsId())
// .where(Goods::getGoodsId)
// .equalTo(Order::getGoodsId)
.window(TumblingEventTimeWindows.of(Time.seconds(5)))
// <IN1, IN2, OUT>
.apply(new JoinFunction<Goods, Order, OrderItem>() {
@Override
public OrderItem join(Goods first, Order second) throws Exception {
OrderItem orderItem = new OrderItem();
orderItem.setGoodsId(first.getGoodsId());
orderItem.setGoodsName(first.getGoodsName());
orderItem.setCount(new BigDecimal(second.getCount()));
orderItem.setTotal(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));
return orderItem;
}
});
// 3.sink
resultDS.print();
// 4.execute
env.execute();
}
}
5) Results
The two window join implementations produce similar output. Because the data is generated randomly, the results below are for reference only.
7> {"count":2,"goodsId":"1","goodsName":"iphone11","total":12000}
7> {"count":7,"goodsId":"1","goodsName":"iphone11","total":42000}
1> {"count":9,"goodsId":"4","goodsName":"iphone13","total":72000}
5> {"count":10,"goodsId":"3","goodsName":"MacBookPro","total":150000}
1> {"count":9,"goodsId":"4","goodsName":"iphone13","total":72000}
7> {"count":9,"goodsId":"1","goodsName":"iphone11","total":54000}
1> {"count":9,"goodsId":"4","goodsName":"iphone13","total":72000}
5> {"count":8,"goodsId":"3","goodsName":"MacBookPro","total":120000}
7> {"count":7,"goodsId":"1","goodsName":"iphone11","total":42000}
5> {"count":10,"goodsId":"3","goodsName":"MacBookPro","total":150000}
1> {"count":9,"goodsId":"4","goodsName":"iphone13","total":72000}
5> {"count":8,"goodsId":"3","goodsName":"MacBookPro","total":120000}
7> {"count":9,"goodsId":"1","goodsName":"iphone11","total":54000}
5> {"count":10,"goodsId":"3","goodsName":"MacBookPro","total":150000}
1> {"count":9,"goodsId":"4","goodsName":"iphone13","total":72000}
7> {"count":7,"goodsId":"1","goodsName":"iphone11","total":42000}
5> {"count":8,"goodsId":"3","goodsName":"MacBookPro","total":120000}
7> {"count":9,"goodsId":"1","goodsName":"iphone11","total":54000}
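Each output line can be verified by hand: total is the goods price multiplied by the ordered quantity, as computed in the JoinFunction. The following plain-Java sketch repeats that arithmetic for two of the lines above; the class and method names are hypothetical.

```java
import java.math.BigDecimal;

final class OrderTotalSketch {
    /** Same arithmetic as the JoinFunction: quantity multiplied by the goods price. */
    static BigDecimal total(BigDecimal goodsPrice, int count) {
        return new BigDecimal(count).multiply(goodsPrice);
    }

    public static void main(String[] args) {
        // iphone11 costs 6000, so 2 units give a total of 12000.
        System.out.println(total(new BigDecimal(6000), 2));
        // MacBookPro costs 15000, so 10 units give a total of 150000.
        System.out.println(total(new BigDecimal(15000), 10));
    }
}
```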
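This concludes the introduction to window join, one of Flink's commonly used operators, with detailed examples. For more, see the author's Flink column, which covers the topic more systematically.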