Flink 系列文章
(文章目录)
本文主要介绍Flink 的11种常用的operator(union、window join、connect、outputtag、cache、iterator、project)及以具体可运行示例进行说明. 如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。 本文除了maven依赖外,没有其他依赖。
本专题分为五篇,即: 【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter 【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations 【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等 【flink番外篇】1、flink的23种常用算子介绍及详细示例(4)- union、window join、connect、outputtag、cache、iterator、project 【flink番外篇】1、flink的23种常用算子介绍及详细示例(完整版)
一、Flink的23种算子说明及示例
本文示例中使用的maven依赖和java bean 参考本专题的第一篇中的maven和java bean。
15、Union
Union 函数将两个或多个数据流结合在一起。 这样就可以并行地组合数据流。 如果我们将一个流与自身组合,那么它会输出每个记录两次。
public static void unionFunction(ExecutionEnvironment env) throws Exception {
//Produces the union of two DataSets, which have to be of the same type. A union of more than two DataSets can be implemented with multiple union calls
List<String> info1 = new ArrayList<>();
info1.add("team A");
info1.add("team B");
List<String> info2 = new ArrayList<>();
info2.add("team C");
info2.add("team D");
List<String> info3 = new ArrayList<>();
info3.add("team E");
info3.add("team F");
List<String> info4 = new ArrayList<>();
info4.add("team G");
info4.add("team H");
DataSet<String> source1 = env.fromCollection(info1);
DataSet<String> source2 = env.fromCollection(info2);
DataSet<String> source3 = env.fromCollection(info3);
DataSet<String> source4 = env.fromCollection(info4);
source1.union(source2).union(source3).union(source4).print();
// team A
// team C
// team E
// team G
// team B
// team D
// team F
// team H
}
16、Window Join
DataStream,DataStream → DataStream 可以通过一些 key 将同一个 window 的两个数据流 join 起来。 在 5 秒的窗口中连接两个流,其中第一个流的第一个属性的连接条件等于另一个流的第二个属性
inputStream.join(inputStream1)
.where(0).equalTo(1)
.window(Time.seconds(5))
.apply (new JoinFunction () {...});
inputStream.join(otherStream)
.where(<key selector>).equalTo(<key selector>)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new JoinFunction () {...});
具体介绍参考文章: 【flink番外篇】2、flink的23种算子window join 和interval join 介绍及详细示例
17、Interval Join
KeyedStream,KeyedStream → DataStream 根据 key 相等并且满足指定的时间范围内(e1.timestamp + lowerBound <= e2.timestamp <= e1.timestamp + upperBound)的条件将分别属于两个 keyed stream 的元素 e1 和 e2 Join 在一起。
// this will join the two streams so that
// key1 == key2 && leftTs - 2 < rightTs < leftTs + 2
keyedStream.intervalJoin(otherKeyedStream)
.between(Time.milliseconds(-2), Time.milliseconds(2)) // lower and upper bound
.upperBoundExclusive(true) // optional
.lowerBoundExclusive(true) // optional
.process(new IntervalJoinFunction() {...});
具体介绍参考文章: 【flink番外篇】2、flink的18种算子window join 和interval join 介绍及详细示例
18、Window CoGroup
DataStream,DataStream → DataStream 根据指定的 key 和窗口将两个数据流组合在一起。
dataStream.coGroup(otherStream)
.where(0).equalTo(1)
.window(TumblingEventTimeWindows.of(Time.seconds(3)))
.apply (new CoGroupFunction () {...});
19、Connect
DataStream,DataStream → ConnectedStreams connect提供了和union类似的功能,用来连接两个数据流,它与union的区别在于:
- connect只能连接两个数据流,union可以连接多个数据流。
- connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。 两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.streaming.api.datastream.ConnectedStreams;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.co.CoMapFunction;
/**
* @author alanchan
* union算子可以合并多个同类型的数据流,并生成同类型的数据流,即可以将多个DataStream[T]合并为一个新的DataStream[T]。数据将按照先进先出(First
* In First Out)的模式合并,且不去重。 connect只能连接两个数据流,union可以连接多个数据流。
* connect所连接的两个数据流的数据类型可以不一致,union所连接的两个数据流的数据类型必须一致。
* 两个DataStream经过connect之后被转化为ConnectedStreams,ConnectedStreams会对两个流的数据应用不同的处理方法,且双流之间可以共享状态。
*
*/
public class TestConnectDemo {
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// source
DataStream<String> ds1 = env.fromElements("i", "am", "alanchan");
DataStream<String> ds2 = env.fromElements("i", "like", "flink");
DataStream<Long> ds3 = env.fromElements(10L, 20L, 30L);
// transformation
// 注意union能合并同类型
DataStream<String> result1 = ds1.union(ds2);
// union不可以合并不同类,直接出错
// ds1.union(ds3);
// connet可以合并同类型
ConnectedStreams<String, String> result2 = ds1.connect(ds2);
// connet可以合并不同类型
ConnectedStreams<String, Long> result3 = ds1.connect(ds3);
/*
* public interface CoMapFunction<IN1, IN2, OUT> extends Function, Serializable {
* OUT map1(IN1 value) throws Exception;
* OUT map2(IN2 value) throws Exception;
* }
*/
DataStream<String> result = result3.map(new CoMapFunction<String, Long, String>() {
private static final long serialVersionUID = 1L;
@Override
public String map1(String value) throws Exception {
return value + "String";
}
@Override
public String map2(Long value) throws Exception {
return value * 2 + "_Long";
}
});
// sink
result1.print();
// connect之后需要做其他的处理,不能直接输出
// result2.print();
// result3.print();
result.print();
// execute
env.execute();
}
}
20、CoMap, CoFlatMap
ConnectedStreams → DataStream 类似于在连接的数据流上进行 map 和 flatMap。
connectedStreams.map(new CoMapFunction<Integer, String, Boolean>() {
@Override
public Boolean map1(Integer value) {
return true;
}
@Override
public Boolean map2(String value) {
return false;
}
});
connectedStreams.flatMap(new CoFlatMapFunction<Integer, String, String>() {
@Override
public void flatMap1(Integer value, Collector<String> out) {
out.collect(value.toString());
}
@Override
public void flatMap2(String value, Collector<String> out) {
for (String word: value.split(" ")) {
out.collect(word);
}
}
});
21、Iterate
DataStream → IterativeStream → ConnectedStream 通过将一个算子的输出重定向到某个之前的算子来在流中创建“反馈”循环。这对于定义持续更新模型的算法特别有用。下面的代码从一个流开始,并不断地应用迭代自身。大于 0 的元素被发送回反馈通道,其余元素被转发到下游。
IterativeStream<Long> iteration = initialStream.iterate();
DataStream<Long> iterationBody = iteration.map (/*do something*/);
DataStream<Long> feedback = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value > 0;
}
});
iteration.closeWith(feedback);
DataStream<Long> output = iterationBody.filter(new FilterFunction<Long>(){
@Override
public boolean filter(Long value) throws Exception {
return value <= 0;
}
});
22、Cache
DataStream → CachedDataStream 把算子的结果缓存起来。目前只支持批执行模式下运行的作业。算子的结果在算子第一次执行的时候会被缓存起来,之后的 作业中会复用该算子缓存的结果。如果算子的结果丢失了,它会被原来的算子重新计算并缓存。
DataStream<Integer> dataStream = //...
CachedDataStream<Integer> cachedDataStream = dataStream.cache();
cachedDataStream.print(); // Do anything with the cachedDataStream
...
env.execute(); // Execute and create cache.
cachedDataStream.print(); // Consume cached result.
env.execute();
23、Split
此功能根据条件将流拆分为两个或多个流。 当获得混合流并且可能希望单独处理每个数据流时,可以使用此方法。新版本使用OutputTag替代。
SplitStream<Integer> split = inputStream.split(new OutputSelector<Integer>() {
@Override
public Iterable<String> select(Integer value) {
List<String> output = new ArrayList<String>();
if (value % 2 == 0) {
output.add("even");
}
else {
output.add("odd");
}
return output;
}
});
OutputTag示例如下
import org.apache.flink.api.common.RuntimeExecutionMode;
import org.apache.flink.api.common.typeinfo.TypeInformation;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.datastream.DataStreamSource;
import org.apache.flink.streaming.api.datastream.SingleOutputStreamOperator;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.api.functions.ProcessFunction;
import org.apache.flink.util.Collector;
import org.apache.flink.util.OutputTag;
/**
* @author alanchan
*
*/
public class TestOutpuTagAndProcessDemo {
public static void main(String[] args) throws Exception {
// env
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
env.setRuntimeMode(RuntimeExecutionMode.AUTOMATIC);
// Source
DataStreamSource<String> ds = env.fromElements("alanchanchn is my vx", "i like flink", "alanchanchn is my name", "i like kafka too", "alanchanchn is my true vx");
// transformation
// 对流中的数据按照alanchanchn拆分并选择
OutputTag<String> nameTag = new OutputTag<>("alanchanchn", TypeInformation.of(String.class));
OutputTag<String> frameworkTag = new OutputTag<>("framework", TypeInformation.of(String.class));
// public abstract class ProcessFunction<I, O> extends AbstractRichFunction {
//
// private static final long serialVersionUID = 1L;
//
// public abstract void processElement(I value, Context ctx, Collector<O> out) throws Exception;
//
// public void onTimer(long timestamp, OnTimerContext ctx, Collector<O> out) throws Exception {}
//
// public abstract class Context {
//
// public abstract Long timestamp();
//
// public abstract TimerService timerService();
//
// public abstract <X> void output(OutputTag<X> outputTag, X value);
// }
//
// public abstract class OnTimerContext extends Context {
// public abstract TimeDomain timeDomain();
// }
//
// }
SingleOutputStreamOperator<String> result = ds.process(new ProcessFunction<String, String>() {
@Override
public void processElement(String inValue, Context ctx, Collector<String> outValue) throws Exception {
// out收集完的还是放在一起的,ctx可以将数据放到不同的OutputTag
if (inValue.startsWith("alanchanchn")) {
ctx.output(nameTag, inValue);
} else {
ctx.output(frameworkTag, inValue);
}
}
});
DataStream<String> nameResult = result.getSideOutput(nameTag);
DataStream<String> frameworkResult = result.getSideOutput(frameworkTag);
// .sink
System.out.println(nameTag);// OutputTag(Integer, 奇数)
System.out.println(frameworkTag);// OutputTag(Integer, 偶数)
nameResult.print("name->");
frameworkResult.print("framework->");
// OutputTag(String, alanchanchn)
// OutputTag(String, framework)
// framework->> alanchanchn is my vx
// name->> alanchanchn is my name
// framework->> i like flink
// name->> alanchanchn is my true vx
// framework->> i like kafka too
// execute
env.execute();
}
}
24、Select
此功能允许您从拆分流中选择特定流。新版本使用OutputTag替代。
SplitStream<Integer> split;
DataStream<Integer> even = split.select("even");
DataStream<Integer> odd = split.select("odd");
DataStream<Integer> all = split.select("even","odd");
参考上文中spilt中的outputtag示例。
25、Project
Project 函数允许从事件流中选择属性子集,并仅将所选元素发送到下一个处理流。
DataStream<Tuple4<Integer, Double, String, String>> in = // [...]
DataStream<Tuple2<String, String>> out = in.project(3,2);
上述函数从给定记录中选择属性号 2 和 3。 以下是示例输入和输出记录:
(1,10.0,A,B)=> (B,A)
(2,20.0,C,D)=> (D,C)
- 完整示例
import java.util.Arrays;
import org.apache.flink.api.java.tuple.Tuple3;
import org.apache.flink.api.java.tuple.Tuple5;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
/**
* @author alanchan
*
*/
public class TestprojectDemo {
/**
* @param args
* @throws Exception
*/
public static void main(String[] args) throws Exception {
StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
DataStream<Tuple5<Integer, String, Integer, String,Double>> in = env.fromCollection(Arrays.asList(
Tuple5.of(1, "alan", 17, "alan.chan.chn@163.com", 20d),
Tuple5.of(2, "alanchan", 18, "alan.chan.chn@163.com", 25d),
Tuple5.of(3, "alanchanchn", 19, "alan.chan.chn@163.com", 30d),
Tuple5.of(4, "alan_chan", 18, "alan.chan.chn@163.com", 25d),
Tuple5.of(5, "alan_chan_chn", 20, "alan.chan.chn@163.com", 30d)
));
DataStream<Tuple3<String, Integer,Double>> out = in.project(1, 2,4);
out.print();
// 8> (alan,17,20.0)
// 11> (alan_chan,18,25.0)
// 12> (alan_chan_chn,20,30.0)
// 10> (alanchanchn,19,30.0)
// 9> (alanchan,18,25.0)
env.execute();
}
}
以上,本文主要介绍Flink 的11种常用的operator(union、window join、connect、outputtag、cache、iterator、project)及以具体可运行示例进行说明. 如果需要了解更多内容,可以在本人Flink 专栏中了解更新系统的内容。
本专题分为五篇,即: 【flink番外篇】1、flink的23种常用算子介绍及详细示例(1)- map、flatmap和filter 【flink番外篇】1、flink的23种常用算子介绍及详细示例(2)- keyby、reduce和Aggregations 【flink番外篇】1、flink的23种常用算子介绍及详细示例(3)-window、distinct、join等 【flink番外篇】1、flink的23种常用算子介绍及详细示例(4)- union、window join、connect、outputtag、cache、iterator、project 【flink番外篇】1、flink的23种常用算子介绍及详细示例(完整版)