【flink番外篇】1、flink的23种常用算子介绍及详细示例(4)- union、window join、connect、outputtag、cache、iterator、project
  nNPyvzOmRTFq 2023年12月07日 20 0

Flink 系列文章

1、Flink 专栏等系列综合文章链接


(文章目录)


本文主要介绍Flink 的11种常用的operator(union、window join、connect、outputtag、cache、iterator、project)及以具体可运行示例进行说明. 如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。 本文除了maven依赖外,没有其他依赖。

本专题分为五篇,即: 【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter 【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations 【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等 【flink番外篇】1、flink的23种常用算子介绍及详细示例(4)- union、window join、connect、outputtag、cache、iterator、project 【flink番外篇】1、flink的23种常用算子介绍及详细示例(完整版)

一、Flink的23种算子说明及示例

本文示例中使用的maven依赖和java bean 参考本专题的第一篇中的maven和java bean。

15、Union

Union 函数将两个或多个数据流结合在一起。 这样就可以并行地组合数据流。 如果我们将一个流与自身组合,那么它会输出每个记录两次。

public static void unionFunction(ExecutionEnvironment env) throws Exception {
		//Produces the union of two DataSets, which have to be of the same type. A union of more than two DataSets can be implemented with multiple union calls
		List<String> info1 = new ArrayList<>();
        info1.add("team A");
        info1.add("team B");
        
        List<String> info2 = new ArrayList<>();
        info2.add("team C");
        info2.add("team D");
        
        List<String> info3 = new ArrayList<>();
        info3.add("team E");
        info3.add("team F");
        
        List<String> info4 = new ArrayList<>();
        info4.add("team G");
        info4.add("team H");
        
        DataSet<String> source1 = env.fromCollection(info1);
        DataSet<String> source2 = env.fromCollection(info2);
        DataSet<String> source3 = env.fromCollection(info3);
        DataSet<String> source4 = env.fromCollection(info4);
        
        source1.union(source2).union(source3).union(source4).print();
//        team A
//        team C
//        team E
//        team G
//        team B
//        team D
//        team F
//        team H
	} 

16、Window Join

DataStream,DataStream → DataStream 可以通过一些 key 将同一个 window 的两个数据流 join 起来。 在 5 秒的窗口中连接两个流,其中第一个流的第一个属性的连接条件等于另一个流的第二个属性

inputStream.join(inputStream1)
           .where(0).equalTo(1)
           .window(Time.seconds(5))     
           .apply (new JoinFunction () {...});

inputStream.join(otherStream)
    .where(<key selector>).equalTo(<key selector>)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new JoinFunction () {...});

具体介绍参考文章: 【flink番外篇】2、flink的23种算子window join 和interval join 介绍及详细示例

17、Interval Join

KeyedStream,KeyedStream → DataStream 根据 key 相等并且满足指定的时间范围内(e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound)的条件将分别属于两个 keyed stream 的元素 e1 和 e2 Join 在一起。

// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
    .between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
    .upperBoundExclusive(true) // optional
    .lowerBoundExclusive(true) // optional
    .process(new IntervalJoinFunction() {...});

具体介绍参考文章: 【flink番外篇】2、flink的18种算子window join 和interval join 介绍及详细示例

18、Window CoGroup

DataStream,DataStream → DataStream 根据指定的 key 和窗口将两个数据流组合在一起。

dataStream.coGroup(otherStream)
    .where(0).equalTo(1)
    .window(TumblingEventTimeWindows.of(Time.seconds(3)))
    .apply (new CoGroupFunction () {...});

19、Connect

DataStream,DataStream → ConnectedStreams connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:

  • connect只能连接两个数据流,union可以连接多个数据流。
  • connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。 两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;

/**
 * @author alanchan
*         union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First
*         In First Out)的模式合并,且不去重。 connect只能连接两个数据流,union可以连接多个数据流。
*         connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
*         两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。
 *
 */
public class TestConnectDemo {
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);

		// source
		DataStream<String> ds1 = env.fromElements("i", "am", "alanchan");
		DataStream<String> ds2 = env.fromElements("i", "like", "flink");
		DataStream<Long> ds3 = env.fromElements(10L, 20L, 30L);

		// transformation
		// 注意union能合并同类型
		DataStream<String> result1 = ds1.union(ds2);
		// union不可以合并不同类,直接出错
//		ds1.union(ds3);

		// connet可以合并同类型
		ConnectedStreams<String, String> result2 = ds1.connect(ds2);
		// connet可以合并不同类型
		ConnectedStreams<String, Long> result3 = ds1.connect(ds3);

		/*
		 * public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable { 
		 * 		OUT map1(IN1 value) throws Exception; 
		 *		OUT map2(IN2 value) throws Exception;
		 * }
		 */
		DataStream<String> result = result3.map(new CoMapFunction<String, Long, String>() {

			private static final long serialVersionUID = 1L;

			@Override
			public String map1(String value) throws Exception {
				return value + "String";
			}

			@Override
			public String map2(Long value) throws Exception {
				return value * 2 + "_Long";
			}
		});

		// sink
		result1.print();
		// connect之后需要做其他的处理,不能直接输出
		// result2.print();
		// result3.print();
		result.print();

		// execute
		env.execute();
	}
}

20、CoMap, CoFlatMap

ConnectedStreams → DataStream 类似于在连接的数据流上进行 map 和 flatMap。

connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
    @Override
    public Boolean map1(Integer value) {
        return true;
    }

    @Override
    public Boolean map2(String value) {
        return false;
    }
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {

   @Override
   public void flatMap1(Integer value, Collector<String> out) {
       out.collect(value.toString());
   }

   @Override
   public void flatMap2(String value, Collector<String> out) {
       for (String word: value.split(" ")) {
         out.collect(word);
       }
   }
});

21、Iterate

DataStream → IterativeStream → ConnectedStream 通过将一个算子的输出重定向到某个之前的算子来在流中创建“反馈”循环。这对于定义持续更新模型的算法特别有用。下面的代码从一个流开始,并不断地应用迭代自身。大于 0 的元素被发送回反馈通道,其余元素被转发到下游。

IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value > 0;
    }
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
    @Override
    public boolean filter(Long value) throws Exception {
        return value <= 0;
    }
});

22、Cache

DataStream → CachedDataStream 把算子的结果缓存起来。目前只支持批执行模式下运行的作业。算子的结果在算子第一次执行的时候会被缓存起来,之后的 作业中会复用该算子缓存的结果。如果算子的结果丢失了,它会被原来的算子重新计算并缓存。

DataStream<Integer> dataStream = //...
CachedDataStream<Integer> cachedDataStream = dataStream.cache();
cachedDataStream.print(); // Do anything with the cachedDataStream
...
env.execute(); // Execute and create cache.
        
cachedDataStream.print(); // Consume cached result.
env.execute();

23、Split

此功能根据条件将流拆分为两个或多个流。 当获得混合流并且可能希望单独处理每个数据流时,可以使用此方法。新版本使用OutputTag替代。

SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
    @Override
    public Iterable<String> select(Integer value) {
        List<String> output = new ArrayList<String>(); 
        if (value % 2 == 0) {
            output.add("even");
        }
        else {
            output.add("odd");
        }
        return output;
    }
});

OutputTag示例如下

import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;

/**
 * @author alanchan
 *
 */
public class TestOutpuTagAndProcessDemo {
	public static void main(String[] args) throws Exception {
		// env
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
		// Source
		DataStreamSource<String> ds = env.fromElements("alanchanchn is my vx", "i like flink", "alanchanchn is my name", "i like kafka too", "alanchanchn is my true vx");

		// transformation
		// 对流中的数据按照alanchanchn拆分并选择
		OutputTag<String> nameTag = new OutputTag<>("alanchanchn", TypeInformation.of(String.class));
		OutputTag<String> frameworkTag = new OutputTag<>("framework", TypeInformation.of(String.class));

//		public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
//
//			private static final long serialVersionUID = 1L;
//
//			public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
//
//			public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
//
//			public abstract class Context {
//
//				public abstract Long timestamp();
//
//				public abstract TimerService timerService();
//
//				public abstract <X> void output(OutputTag<X> outputTag, X value);
//			}
//
//			public abstract class OnTimerContext extends Context {
//				public abstract TimeDomain timeDomain();
//			}
//
//		}

		SingleOutputStreamOperator<String> result = ds.process(new ProcessFunction<String, String>() {

			@Override
			public void processElement(String inValue, Context ctx, Collector<String> outValue) throws Exception {
				// out收集完的还是放在一起的,ctx可以将数据放到不同的OutputTag
				if (inValue.startsWith("alanchanchn")) {
					ctx.output(nameTag, inValue);
				} else {
					ctx.output(frameworkTag, inValue);
				}

			}
		});

		DataStream<String> nameResult = result.getSideOutput(nameTag);
		DataStream<String> frameworkResult = result.getSideOutput(frameworkTag);

		// .sink
		System.out.println(nameTag);// OutputTag(Integer, 奇数)
		System.out.println(frameworkTag);// OutputTag(Integer, 偶数)
		nameResult.print("name->");
		frameworkResult.print("framework->");
//		OutputTag(String, alanchanchn)
//		OutputTag(String, framework)
//		framework->> alanchanchn is my vx
//		name->> alanchanchn is my name
//		framework->> i like flink
//		name->> alanchanchn is my true vx
//		framework->> i like kafka too
		
		
		// execute
		env.execute();

	}
}

24、Select

此功能允许您从拆分流中选择特定流。新版本使用OutputTag替代。

SplitStream<Integer> split;
DataStream<Integer> even = split.select("even"); 
DataStream<Integer> odd = split.select("odd"); 
DataStream<Integer> all = split.select("even","odd");

参考上文中spilt中的outputtag示例。

25、Project

Project 函数允许从事件流中选择属性子集,并仅将所选元素发送到下一个处理流。

DataStream<Tuple4<Integer, Double, String, String>> in = // [...] 
DataStream<Tuple2<String, String>> out = in.project(3,2);

上述函数从给定记录中选择属性号 2 和 3。 以下是示例输入和输出记录:

(1,10.0,A,B)=> (B,A)
(2,20.0,C,D)=> (D,C)
  • 完整示例
import java.util.Arrays;

import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

/**
 * @author alanchan
 *
 */
public class TestprojectDemo {

	/**
	 * @param args
	 * @throws Exception
	 */
	public static void main(String[] args) throws Exception {
		StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
		
		DataStream<Tuple5<Integer, String, Integer, String,Double>> in = env.fromCollection(Arrays.asList(
				Tuple5.of(1, "alan", 17, "alan.chan.chn@163.com", 20d),
				Tuple5.of(2, "alanchan", 18, "alan.chan.chn@163.com", 25d),
				Tuple5.of(3, "alanchanchn", 19, "alan.chan.chn@163.com", 30d),
				Tuple5.of(4, "alan_chan", 18, "alan.chan.chn@163.com", 25d),
				Tuple5.of(5, "alan_chan_chn", 20, "alan.chan.chn@163.com", 30d)
				));

		DataStream<Tuple3<String, Integer,Double>> out = in.project(1, 2,4);
		out.print();
//		8> (alan,17,20.0)
//		11> (alan_chan,18,25.0)
//		12> (alan_chan_chn,20,30.0)
//		10> (alanchanchn,19,30.0)
//		9> (alanchan,18,25.0)
		
		env.execute();

	}

}

以上,本文主要介绍Flink 的11种常用的operator(union、window join、connect、outputtag、cache、iterator、project)及以具体可运行示例进行说明. 如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。

本专题分为五篇,即: 【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter 【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations 【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等 【flink番外篇】1、flink的23种常用算子介绍及详细示例(4)- union、window join、connect、outputtag、cache、iterator、project 【flink番外篇】1、flink的23种常用算子介绍及详细示例(完整版)

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年12月07日 0

暂无评论

推荐阅读
nNPyvzOmRTFq
最新推荐 更多

2024-05-03