[Flink Extras] 2. Flink's 23 operators: window join and interval join, data skew, and partitioning, with detailed examples (2) - interval join

Flink series articles

1. Links to the Flink column and related series articles




This article introduces Flink's commonly used interval join operator with detailed examples. For more content, see the author's Flink column, which is updated systematically. Apart from the Maven dependencies, this article has no other prerequisites; for the Maven dependencies, refer to part (1) of this series (window join).

This topic is covered in four articles:
[Flink Extras] 2. Flink's 23 operators: window join and interval join, data skew, and partitioning, with detailed examples (1) - window join
[Flink Extras] 2. Flink's 23 operators: window join and interval join, data skew, and partitioning, with detailed examples (2) - interval join
[Flink Extras] 2. Flink's 23 operators: window join and interval join, data skew, and partitioning, with detailed examples (3) - data skew handling and partitioning examples
[Flink Extras] 2. Flink's 23 operators: window join and interval join, data skew, and partitioning, with detailed examples - complete edition

3. Interval join

An interval join combines elements when the two streams (call them A and B) share the same key and the timestamp of the element from B lies within a certain range of the timestamp of the element from A.

More formally, the condition is b.timestamp ∈ [a.timestamp + lowerBound; a.timestamp + upperBound], i.e. a.timestamp + lowerBound <= b.timestamp <= a.timestamp + upperBound

Here a and b are elements of A and B that share the same key. Both the lower and the upper bound may be negative or positive, as long as the lower bound is always less than or equal to the upper bound. Interval join currently performs inner joins only.

When a pair of elements is passed to the ProcessJoinFunction, the pair's timestamp is the larger of the two elements' timestamps (the timestamp can be accessed through ProcessJoinFunction.Context).
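A minimal sketch (not from the original post; the class name is illustrative) of reading these timestamps inside a ProcessJoinFunction, via the Context accessors getLeftTimestamp(), getRightTimestamp(), and getTimestamp():

import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.util.Collector;

// Emits each joined pair together with its timestamps.
public class TimestampAwareJoinFunction extends ProcessJoinFunction<Integer, Integer, String> {

	@Override
	public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
		long leftTs = ctx.getLeftTimestamp();   // timestamp of the left element
		long rightTs = ctx.getRightTimestamp(); // timestamp of the right element
		long joinedTs = ctx.getTimestamp();     // the larger of the two, assigned to the joined pair
		out.collect(left + "," + right + " @ " + joinedTs + " (left=" + leftTs + ", right=" + rightTs + ")");
	}
}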

As of version 1.17, interval join only supports event time.

(Figure: interval join of the orange and green streams.) In this example we join the orange and green streams with a lower bound of -2 milliseconds and an upper bound of +1 millisecond. By default both bounds are inclusive, but .lowerBoundExclusive() and .upperBoundExclusive() can exclude them (a hedged sketch follows the formal expression below).

The condition represented by the triangles in the figure can also be written more formally as:

orangeElem.ts + lowerBound <= greenElem.ts <= orangeElem.ts + upperBound
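As referenced above, here is a hedged, self-contained sketch of the exclusive-bound variants (the class name, stream contents, and key selector are illustrative, not from the original post); the element values double as event timestamps purely for illustration:

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

public class ExclusiveBoundsIntervalJoinDemo {

	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		// Use the element value itself as the event timestamp (illustration only).
		WatermarkStrategy<Integer> wm = WatermarkStrategy.<Integer>forBoundedOutOfOrderness(Duration.ofMillis(10))
				.withTimestampAssigner((value, ts) -> value);

		DataStream<Integer> orangeStream = env.fromElements(0, 2, 3, 4, 5, 7).assignTimestampsAndWatermarks(wm);
		DataStream<Integer> greenStream = env.fromElements(0, 1, 6, 7).assignTimestampsAndWatermarks(wm);

		orangeStream
				.keyBy(v -> 1) // single key, so every pair is a join candidate
				.intervalJoin(greenStream.keyBy(v -> 1))
				.between(Time.milliseconds(-2), Time.milliseconds(1))
				.lowerBoundExclusive() // now: orange.ts - 2 <  green.ts
				.upperBoundExclusive() // and: green.ts      <  orange.ts + 1
				.process(new ProcessJoinFunction<Integer, Integer, String>() {

					@Override
					public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
						out.collect(left + "," + right);
					}
				})
				.print();

		env.execute("ExclusiveBoundsIntervalJoinDemo");
	}
}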

  • Sample code

import org.apache.flink.api.java.functions.KeySelector;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;

...
DataStream<Integer> orangeStream = ...;
DataStream<Integer> greenStream = ...;

orangeStream
    .keyBy(<KeySelector>)
    .intervalJoin(greenStream.keyBy(<KeySelector>))
    .between(Time.milliseconds(-2), Time.milliseconds(1))
    .process(new ProcessJoinFunction<Integer, Integer, String>(){

        @Override
        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
            out.collect(left + "," + right);
        }
    });

1. Example

This example constructs the same data as the window join example above; the only difference is how the data is joined, i.e., the functional difference between window join and interval join. Everything else is identical. The example simulates generated order data, joins each order with its goods information, and computes the order amount. There are two implementations, differing only in how the WatermarkStrategy is built: one builds it inline with the built-in strategy and a lambda, the other implements the WatermarkStrategy interface (a short sketch contrasting the two follows).
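A hedged sketch (the WatermarkStyles class name is illustrative, not part of the original code) contrasting the two WatermarkStrategy styles used in the two implementations below; both simply assign System.currentTimeMillis() as the event time. Order and OrderWatermark are defined later in this post.

package org.datastreamapi.operator.window;

import java.time.Duration;

import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.datastreamapi.operator.window.bean.Order;
import org.datastreamapi.operator.window.watermark.OrderWatermark;

public class WatermarkStyles {

	// Style 1 (implementation 1): built-in bounded-out-of-orderness strategy
	// with a lambda timestamp assigner that uses the system time.
	static WatermarkStrategy<Order> style1() {
		return WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))
				.withTimestampAssigner((order, ts) -> System.currentTimeMillis());
	}

	// Style 2 (implementation 2): a dedicated class implementing WatermarkStrategy.
	static WatermarkStrategy<Order> style2() {
		return new OrderWatermark();
	}
}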

1) Data structures and beans

  • Goods class
package org.datastreamapi.operator.window.bean;

import java.math.BigDecimal;
import java.util.ArrayList;
import java.util.List;
import java.util.Random;

import com.alibaba.fastjson.JSON;

import lombok.Data;

/**
 * @author alanchan
 *
 */
// Goods class (goods id, goods name, goods price)
@Data
public class Goods {
	private String goodsId;
	private String goodsName;
	private BigDecimal goodsPrice;
	public static List<Goods> GOODSLIST;
	public static Random r;

	static {
		r = new Random();
		GOODSLIST = new ArrayList<>();
		GOODSLIST.add(new Goods("1", "iphone11", new BigDecimal(6000)));
		GOODSLIST.add(new Goods("2", "iphone12", new BigDecimal(7000)));
		GOODSLIST.add(new Goods("3", "MacBookPro", new BigDecimal(15000)));
		GOODSLIST.add(new Goods("4", "iphone13", new BigDecimal(8000)));
		GOODSLIST.add(new Goods("5", "iphone14", new BigDecimal(9000)));
		GOODSLIST.add(new Goods("6", "iphone15", new BigDecimal(10000)));
	}

	public static Goods randomGoods() {
		int rIndex = r.nextInt(GOODSLIST.size());
		return GOODSLIST.get(rIndex);
	}

	public Goods() {
	}

	public Goods(String goodsId, String goodsName, BigDecimal goodsPrice) {
		this.goodsId = goodsId;
		this.goodsName = goodsName;
		this.goodsPrice = goodsPrice;
	}

	@Override
	public String toString() {
		return JSON.toJSONString(this);
	}
}

  • Order class
package org.datastreamapi.operator.window.bean;

import com.alibaba.fastjson.JSON;

import lombok.Data;

/**
 * @author alanchan
 *
 */
// Order detail class (order item id, goods id, quantity)
@Data
public class Order {
	private String itemId;
	private String goodsId;
	private Integer count;

	@Override
	public String toString() {
		return JSON.toJSONString(this);
	}
}

  • Goods-order join result class
package org.datastreamapi.operator.window.bean;

import java.math.BigDecimal;

import com.alibaba.fastjson.JSON;

import lombok.Data;

/**
 * @author alanchan
 *
 */
// Goods class (goods id, goods name, goods price)
// Order detail class (order item id, goods id, quantity)
// Join result (goods id, goods name, quantity, goods price * quantity)
@Data
public class OrderItem {
	private String goodsId;
	private String goodsName;
	private BigDecimal count;
	private BigDecimal total;

	@Override
	public String toString() {
		return JSON.toJSONString(this);
	}
}

2) Goods and order data sources

  • Goods source
package org.datastreamapi.operator.window.source;

import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.datastreamapi.operator.window.bean.Goods;

/**
 * @author alanchan
 *
 */
public class GoodsSource extends RichSourceFunction<Goods> {
	private Boolean isCancel;

	@Override
	public void open(Configuration parameters) throws Exception {
		isCancel = false;
	}

	@Override
	public void run(SourceContext<Goods> sourceContext) throws Exception {
		while (!isCancel) {
			Goods.GOODSLIST.stream().forEach(goods -> sourceContext.collect(goods));
			TimeUnit.SECONDS.sleep(1);
		}
	}

	@Override
	public void cancel() {
		isCancel = true;
	}
}

  • Order source
package org.datastreamapi.operator.window.source;

import java.util.Random;
import java.util.UUID;
import java.util.concurrent.TimeUnit;

import org.apache.flink.configuration.Configuration;
import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
import org.datastreamapi.operator.window.bean.Goods;
import org.datastreamapi.operator.window.bean.Order;

/**
 * @author alanchan
 *
 */
public class OrderSource extends RichSourceFunction<Order> {
	private Boolean isCancel;
	private Random r;

	@Override
	public void open(Configuration parameters) throws Exception {
		isCancel = false;
		r = new Random();
	}

	@Override
	public void run(SourceContext<Order> sourceContext) throws Exception {
		while (!isCancel) {
			Goods goods = Goods.randomGoods();
			Order order = new Order();
			order.setGoodsId(goods.getGoodsId());
			order.setCount(r.nextInt(10) + 1);
			order.setItemId(UUID.randomUUID().toString());
			sourceContext.collect(order);

			// Simulate an order containing multiple goods (goodsId "10" has no match in GOODSLIST, so these records never join)
			order.setGoodsId("10");
			sourceContext.collect(order);
			TimeUnit.SECONDS.sleep(1);
		}
	}

	@Override
	public void cancel() {
		isCancel = true;
	}
}

3) Interval join, implementation 1

/**
 * @author alanchan
 */
package org.datastreamapi.operator.window;

import java.math.BigDecimal;
import java.time.Duration;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.datastreamapi.operator.window.bean.Goods;
import org.datastreamapi.operator.window.bean.Order;
import org.datastreamapi.operator.window.bean.OrderItem;
import org.datastreamapi.operator.window.source.GoodsSource;
import org.datastreamapi.operator.window.source.OrderSource;

/**
 * @author alanchan
 *
 */
public class TestIntervalJoinDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// 0.env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// 1.source
		// goods stream
		DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource());
		// order stream
		DataStreamSource<Order> orderDS = env.addSource(new OrderSource());

		// Assign watermarks (use the system time directly as the event time)
		// Style 1: built-in strategy with a lambda timestamp assigner
		SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Order>forBoundedOutOfOrderness(Duration.ofSeconds(3))// maxOutOfOrderness: the maximum allowed out-of-orderness / lateness
				.withTimestampAssigner((element, timestamp) -> System.currentTimeMillis()));

		SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(WatermarkStrategy.<Goods>forBoundedOutOfOrderness(Duration.ofSeconds(3))// maxOutOfOrderness: the maximum allowed out-of-orderness / lateness
				.withTimestampAssigner((element, timestamp) -> System.currentTimeMillis()));

		// 2.transformation
		// Goods class (goods id, goods name, goods price)
		// Order detail class (order item id, goods id, quantity)
		// Join result (goods id, goods name, quantity, goods price * quantity)
		// Reference snippet (the documentation example shown above)
//				orangeStream
//			    .keyBy(<KeySelector>)
//			    .intervalJoin(greenStream.keyBy(<KeySelector>))
//			    .between(Time.milliseconds(-2), Time.milliseconds(1))
//			    .process(new ProcessJoinFunction<Integer, Integer, String>(){
//
//			        @Override
//			        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
//			            out.collect(left + "," + right);
//			        }
//			    });
		DataStream<OrderItem> resultDS = goodsDSWithWatermark.keyBy(goods -> goods.getGoodsId())
				// Join conditions:
				// 1. The goods ids must be equal
				// 2. Goods timestamp - 2 <= Order timestamp <= Goods timestamp + 1
				.intervalJoin(orderDSWithWatermark.keyBy(orderItem -> orderItem.getGoodsId())).between(Time.seconds(-2), Time.seconds(1))
				.process(new ProcessJoinFunction<Goods, Order, OrderItem>() {

					@Override
					public void processElement(Goods first, Order second, Context ctx, Collector<OrderItem> out) throws Exception {
						OrderItem orderItem = new OrderItem();
						orderItem.setGoodsId(first.getGoodsId());
						orderItem.setGoodsName(first.getGoodsName());
						orderItem.setCount(new BigDecimal(second.getCount()));
						orderItem.setTotal(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));
						out.collect(orderItem);
					}
				});
		// 3.sink
		resultDS.print();

		// 4.execute
		env.execute();
	}

}

4) Interval join, implementation 2

  • GoodsWatermark
package org.datastreamapi.operator.window.watermark;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.datastreamapi.operator.window.bean.Goods;

/**
 * @author alanchan
 * Watermark strategy built on the system time
 */
public class GoodsWatermark implements WatermarkStrategy<Goods> {
	@Override
	public TimestampAssigner<Goods> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
		return (element, recordTimestamp) -> System.currentTimeMillis();
	}

	@Override
	public WatermarkGenerator<Goods> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
		return new WatermarkGenerator<Goods>() {
			@Override
			public void onEvent(Goods event, long eventTimestamp, WatermarkOutput output) {
				output.emitWatermark(new Watermark(System.currentTimeMillis()));
			}

			@Override
			public void onPeriodicEmit(WatermarkOutput output) {
				output.emitWatermark(new Watermark(System.currentTimeMillis()));
			}
		};
	}

}

  • OrderWatermark
package org.datastreamapi.operator.window.watermark;

import org.apache.flink.api.common.eventtime.TimestampAssigner;
import org.apache.flink.api.common.eventtime.TimestampAssignerSupplier;
import org.apache.flink.api.common.eventtime.Watermark;
import org.apache.flink.api.common.eventtime.WatermarkGenerator;
import org.apache.flink.api.common.eventtime.WatermarkGeneratorSupplier;
import org.apache.flink.api.common.eventtime.WatermarkOutput;
import org.apache.flink.api.common.eventtime.WatermarkStrategy;
import org.datastreamapi.operator.window.bean.Order;

/**
 * @author alanchan
 * Watermark strategy built on the system time
 */
public class OrderWatermark implements WatermarkStrategy<Order> {
	@Override
	public TimestampAssigner<Order> createTimestampAssigner(TimestampAssignerSupplier.Context context) {
		return (element, recordTimestamp) -> System.currentTimeMillis();
	}

	@Override
	public WatermarkGenerator<Order> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context) {
		return new WatermarkGenerator<Order>() {

			@Override
			public void onEvent(Order event, long eventTimestamp, WatermarkOutput output) {
				output.emitWatermark(new Watermark(System.currentTimeMillis()));
			}

			@Override
			public void onPeriodicEmit(WatermarkOutput output) {
				output.emitWatermark(new Watermark(System.currentTimeMillis()));
			}
		};
	}
}

  • Interval join implementation
package org.datastreamapi.operator.window;

import java.math.BigDecimal;

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.ProcessJoinFunction;
import org.apache.flink.streaming.api.windowing.time.Time;
import org.apache.flink.util.Collector;
import org.datastreamapi.operator.window.bean.Goods;
import org.datastreamapi.operator.window.bean.Order;
import org.datastreamapi.operator.window.bean.OrderItem;
import org.datastreamapi.operator.window.source.GoodsSource;
import org.datastreamapi.operator.window.source.OrderSource;
import org.datastreamapi.operator.window.watermark.GoodsWatermark;
import org.datastreamapi.operator.window.watermark.OrderWatermark;

/**
 * @author alanchan
 *
 */
public class TestIntervalJoinDemo2 {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		// 0.env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// 1.source
		// goods stream
		DataStreamSource<Goods> goodsDS = env.addSource(new GoodsSource());
		// order stream
		DataStreamSource<Order> orderDS = env.addSource(new OrderSource());

		// Assign watermarks (use the system time directly as the event time)
		// Style 2: dedicated WatermarkStrategy implementations
		SingleOutputStreamOperator<Goods> goodsDSWithWatermark = goodsDS.assignTimestampsAndWatermarks(new GoodsWatermark());
		SingleOutputStreamOperator<Order> orderDSWithWatermark = orderDS.assignTimestampsAndWatermarks(new OrderWatermark());

		// 2.transformation
		// Goods class (goods id, goods name, goods price)
		// Order detail class (order item id, goods id, quantity)
		// Join result (goods id, goods name, quantity, goods price * quantity)
		// Reference snippet (the documentation example shown above)
//				orangeStream
//			    .keyBy(<KeySelector>)
//			    .intervalJoin(greenStream.keyBy(<KeySelector>))
//			    .between(Time.milliseconds(-2), Time.milliseconds(1))
//			    .process(new ProcessJoinFunction<Integer, Integer, String>(){
//
//			        @Override
//			        public void processElement(Integer left, Integer right, Context ctx, Collector<String> out) {
//			            out.collect(left + "," + right);
//			        }
//			    });
		DataStream<OrderItem> resultDS = goodsDSWithWatermark.keyBy(goods -> goods.getGoodsId())
				// Join conditions:
				// 1. The goods ids must be equal
				// 2. Goods timestamp - 2 <= Order timestamp <= Goods timestamp + 1
				.intervalJoin(orderDSWithWatermark.keyBy(orderItem -> orderItem.getGoodsId())).between(Time.seconds(-2), Time.seconds(1))
				.process(new ProcessJoinFunction<Goods, Order, OrderItem>() {

					@Override
					public void processElement(Goods first, Order second, Context ctx, Collector<OrderItem> out) throws Exception {
						OrderItem orderItem = new OrderItem();
						orderItem.setGoodsId(first.getGoodsId());
						orderItem.setGoodsName(first.getGoodsName());
						orderItem.setCount(new BigDecimal(second.getCount()));
						orderItem.setTotal(new BigDecimal(second.getCount()).multiply(first.getGoodsPrice()));
						out.collect(orderItem);
					}
				});
		// 3.sink
		resultDS.print();

		// 4.execute
		env.execute();
	}

}

5) Run results

There are two interval join implementations, but their output is similar; since the data is randomly generated, the results below are for reference only. The number before ">" is the parallel subtask index added by print().

5> {"count":3,"goodsId":"3","goodsName":"MacBookPro","total":45000}
1> {"count":6,"goodsId":"4","goodsName":"iphone13","total":48000}
5> {"count":3,"goodsId":"3","goodsName":"MacBookPro","total":45000}
1> {"count":6,"goodsId":"4","goodsName":"iphone13","total":48000}
7> {"count":6,"goodsId":"1","goodsName":"iphone11","total":36000}
7> {"count":6,"goodsId":"1","goodsName":"iphone11","total":36000}
5> {"count":3,"goodsId":"3","goodsName":"MacBookPro","total":45000}
1> {"count":3,"goodsId":"4","goodsName":"iphone13","total":24000}
5> {"count":3,"goodsId":"3","goodsName":"MacBookPro","total":45000}
7> {"count":2,"goodsId":"1","goodsName":"iphone11","total":12000}
1> {"count":3,"goodsId":"4","goodsName":"iphone13","total":24000}
7> {"count":2,"goodsId":"1","goodsName":"iphone11","total":12000}
7> {"count":2,"goodsId":"1","goodsName":"iphone11","total":12000}
7> {"count":2,"goodsId":"1","goodsName":"iphone11","total":12000}
7> {"count":8,"goodsId":"1","goodsName":"iphone11","total":48000}
4> {"count":10,"goodsId":"2","goodsName":"iphone12","total":70000}
7> {"count":8,"goodsId":"1","goodsName":"iphone11","total":48000}

This article introduced Flink's commonly used interval join operator with detailed examples. For more content, see the author's Flink column, which is updated systematically.

This topic is covered in four articles:
[Flink Extras] 2. Flink's 23 operators: window join and interval join, data skew, and partitioning, with detailed examples (1) - window join
[Flink Extras] 2. Flink's 23 operators: window join and interval join, data skew, and partitioning, with detailed examples (2) - interval join
[Flink Extras] 2. Flink's 23 operators: window join and interval join, data skew, and partitioning, with detailed examples (3) - data skew handling and partitioning examples
[Flink Extras] 2. Flink's 23 operators: window join and interval join, data skew, and partitioning, with detailed examples - complete edition
