Flink之状态管理
  GLcPk8bYyq2p 2023年11月19日 35 0

状态

概述

在流处理任务中,数据会以连续的流的形式输入到Flink中,而状态计算允许我们跟踪和处理这些输入数据的状态信息。状态可以是任何需要记录和使用的数据,例如聚合计数、累积结果、窗口中的中间状态等。

Flink中的状态管理是指在流处理任务中对数据的状态进行有效管理和维护的过程。状态管理是非常重要的,因为它允许我们在流式处理中维护和操作数据的状态信息,以实现复杂的计算逻辑和应用需求。

图片来源于网络,如有侵权,联系删除

状态分类

在Flink中,Flink状态有两种:系统状态Managed State和原始状态Raw State。通常使用系统状态,而原始状态则需要自定义实现。

系统状态根据数据集是否按照某一个Key进行分区,将状态分为算子状态Operator State和按键分区状态Keyed State。

1.系统状态

由Flink管理的全局状态,可以在整个应用程序中共享。系统状态与算子或键无关,可以被整个应用程序中的所有算子访问和更新。

2.原始状态

原始状态是一种低级别的状态表示形式,它提供了一种灵活的方式来定义和管理状态。它允许开发人员自定义状态的存储和访问方式,以满足特定的需求。

3.算子状态

用于在算子之间维护中间结果、聚合状态等。它与具体的算子实例绑定,与其他算子实例的状态相互独立。算子状态是分布式的,可以在故障恢复时进行检查点和状态恢复。

一个算子任务会按照并行度分为多个并行子任务执行,而不同的子任务会占据不同的任务槽task slot。由于不同的slot在计算资源上是物理隔离的,所以Flink能管理的状态在并行任务间是无法共享的,每个状态只能针对当前子任务的实例有效。

图片来源于网络,如有侵权,联系删除。 4.按键分区状态

与流的键相关联的状态,用于存储和管理与每个键相关的数据信息。按键分区状态能在Keyed Stream或Keyed ProcessFunction中使用。它会根据键将数据进行分区,保证相同键的数据会被同一个状态管理。

很多有状态的操作,如聚合、窗口都是要先做keyBy进行按键分区,之后任务所进行的所有计算都应该只针对当前key有效,所以状态也应该按照key彼此隔离。

(图片来源于网络,如有侵权,联系删除。)

键控、按键分区状态

概述

按键分区状态Keyed State是任务按照键key来访问和维护的状态。它就是以key为作用范围进行隔离。

注意:

使用按键分区状态必须基于Keyed Stream。没有进行keyBy分区的Data Stream,即使转换算子实现了对应的富函数类,也不能通过运行时上下文访问按键分区状态。

Keyed State在Flink中分为不同类型,具体支持的状态类型如下所示:

ValueState<T>:存储和访问单个值的状态,通常是一个单一的状态值。它可以用于存储中间结果、累加器等

ListState<T>:存储和访问元素列表的状态,通常用于按键分区的列表操作

MapState<UK,UV>:存储和访问键值对的状态,通常用于需要以键-值对形式存储和检索数据的情况

AggregatingState<IN,OUT>:使用用户定义的聚合函数来逐个聚合元素的状态,通常用于对数据进行聚合操作,如计算平均值

ReducingState<T>:使用用户定义的reduce函数来逐个聚合元素的状态,通常用于聚合操作,如求和

值状态 ValueState

值状态(ValueState)是Flink中的一种状态类型,用于存储和访问单个值。它可以用于在状态中保存和维护一个单一的值。

值状态通常用于在状态中存储一些需要随时间更新的值,例如计数器、累加器、最大/最小值等。

接口如下:

// T是泛型,表示状态的数据内容可以是任何具体的数据类型
public interface ValueState<T> extends State {

	// 获取当前状态的值
    T value() throws IOException;

	// 对状态进行更新,传入的参数value就是要覆写的状态值
    void update(T var1) throws IOException;
}

创建一个状态描述器StateDescriptor来提供状态的基本信息,状态描述器构造方法如下

public class ValueStateDescriptor<T> extends StateDescriptor<ValueState<T>, T> {
	// 需要传入状态的名称和类型
    public ValueStateDescriptor(String name, TypeInformation<T> typeInfo) {
        super(name, typeInfo, (Object)null);
    }	 
}

当前输入数据与上一条数据差值比较:

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从socket接收数据流
        SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)
                // 将输入数据转换为Tuple2
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return Tuple2.of(split[0], Integer.valueOf(split[1]));
                    }
                })
                // 指定 watermark策略
                .assignTimestampsAndWatermarks(
                        // 定义Watermark策略
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );


        // keyBy分区
        KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);

        keyByStream.process(new MyKeyedProcessFunction())
                .print();

        env.execute();
    }

    public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
        // 定义状态
        ValueState<Integer> lastState;

        /**
         * 在open方法中,初始化状态
         *
         * @param configuration
         * @throws Exception
         */
        @Override
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            // 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称和存储类型
            lastState = getRuntimeContext().getState(new ValueStateDescriptor<Integer>("lastState", Types.INT));

        }

        @Override
        public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
            // 取出上一条数据的水位值,注意Integer默认值是null
            int lastValue = lastState.value() == null ? 0 : lastState.value();
            // 求水位线差值的绝对值>5的数据
            Integer currentValue = value.f1;
            if (Math.abs(currentValue - lastValue) > 5) {
                out.collect("窗口:" + value.f0 + " 数据:" + value + " 当前值" + currentValue + " 上一条数据值:" + lastValue + " 差值>5");
            }
            // 更新状态里的水位值
            lastState.update(currentValue);
        }
    }

输入测试数据:

>nc -lk 8086
key1,5
key1,7
key1,13
key1,20
key1,10

控制台输出结果:

窗口:key1 数据:(key1,13) 当前值13 上一条数据值:7 差值>5
窗口:key1 数据:(key1,20) 当前值20 上一条数据值:13 差值>5
窗口:key1 数据:(key1,10) 当前值10 上一条数据值:20 差值>5

列表状态 ListState

列表状态(ListState)是Flink中的一种状态类型,用于存储和访问元素列表。它可以用于在状态中保存和维护一组元素,并对列表中的元素进行添加、删除和访问操作。

列表状态通常用于需要在状态中保存多个元素的场景,例如累积计算、聚合操作或缓冲区管理等。

在ListState<T>接口中同样有一个类型参数T,表示列表中数据的类型。

public interface ListState<T> extends MergingState<T, Iterable<T>> {
    void update(List<T> var1) throws Exception;

    void addAll(List<T> var1) throws Exception;
}

ListState也提供了一系列的方法来操作状态,使用方式与一般的List非常相似。

Iterable<T> get():获取当前的列表状态,返回的是一个可迭代类型Iterable<T>

update(List<T> values):传入一个列表values,直接对状态进行覆盖

add(T value):在状态列表中添加一个元素value

addAll(List<T> values):向列表中添加多个元素,以列表values形式传入

void clear(): 清空List状态 本组数据

ListState的状态描述器就叫作ListStateDescriptor,用法跟ValueStateDescriptor完全一致。

定义一个描述列表状态的描述符。描述符指定状态的名称和类型,状态描述器构造方法如下

public class ListStateDescriptor<T> extends StateDescriptor<ListState<T>, List<T>> {

    public ListStateDescriptor(String name, TypeInformation<T> elementTypeInfo) {
        super(name, new ListTypeInfo(elementTypeInfo), (Object)null);
    }
}    

取流中3个最大值,且排序

public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从socket接收数据流
        SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)
                // 将输入数据转换为Tuple2
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return Tuple2.of(split[0], Integer.valueOf(split[1]));
                    }
                })
                // 指定 watermark策略
                .assignTimestampsAndWatermarks(
                        // 定义Watermark策略
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );


        // keyBy分区
        KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);

        keyByStream.process(new MyKeyedProcessFunction())
                .print();
        env.execute();
    }

    public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
        // 定义状态
        ListState<Integer> listState;

        /**
         * 在open方法中,初始化状态
         *
         * @param configuration
         * @throws Exception
         */
        @Override
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            // 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称和存储类型
            listState = getRuntimeContext().getListState(new ListStateDescriptor<Integer>("listState", Types.INT));

        }

        @Override
        public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
            // 来一条数据则存list状态里
            listState.add(value.f1);

            // 从list状态拿出来,得到一个Iterable
            Iterable<Integer> iterableList = listState.get();
            // 拷贝到List中
            List<Integer> list = new ArrayList<>();
            for (Integer val : iterableList) {
                list.add(val);
            }
            // 对List进行降序排序
            list.sort((o1, o2) -> o2 - o1);
            // list中的个数是连续变大的,一但超过3个就立即清理
            if (list.size() > 3) {
                // 元素清除,清除第4个
                list.remove(3);
            }

            out.collect("keyBy:" + value.f0 + " 当前数据:" + value + " 最大3个水位值:" + list.toString());

            // 更新list状态
            listState.update(list);
        }
    }
key1,1
key1,5
key1,7
key1,8
key1,9
keyBy:key1 当前数据:(key1,1) 最大3个水位值:[1]
keyBy:key1 当前数据:(key1,5) 最大3个水位值:[5, 1]
keyBy:key1 当前数据:(key1,7) 最大3个水位值:[7, 5, 1]
keyBy:key1 当前数据:(key1,8) 最大3个水位值:[8, 7, 5]
keyBy:key1 当前数据:(key1,9) 最大3个水位值:[9, 8, 7]

Map状态 MapState

Map 状态(MapState)是 Flink 中的一种状态类型,用于存储和访问键值对。它可以用于在状态中保存和维护一组键值对。

Map 状态通常用于需要根据键进行查找和更新的场景,例如缓存、索引、关联操作等。对应的是MapState<UK, UV>接口,有UK、UV两个泛型,分别表示保存的key和value的类型。

MapState提供了操作映射状态的方法,与Map的使用非常类似。另外,MapState也提供了获取整个映射相关信息的方法

public interface MapState<UK, UV> extends State {
	// 传入一个key作为参数,查询对应的value值
    UV get(UK var1) throws Exception;
	// 传入一个键值对,更新key对应的value值
    void put(UK var1, UV var2) throws Exception;
	// 将传入的映射map中所有的键值对,全部添加到映射状态中
    void putAll(Map<UK, UV> var1) throws Exception;
	// 将指定key对应的键值对删除
    void remove(UK var1) throws Exception;
	// 判断是否存在指定的key,返回一个boolean值
    boolean contains(UK var1) throws Exception;
	// 获取映射状态中所有的键值对
    Iterable<Map.Entry<UK, UV>> entries() throws Exception;
	// 获取映射状态中所有的键(key),返回一个可迭代Iterable类型
    Iterable<UK> keys() throws Exception;
	// 获取映射状态中所有的值(value),返回一个可迭代Iterable类型
    Iterable<UV> values() throws Exception;
	// 获取迭代器
    Iterator<Map.Entry<UK, UV>> iterator() throws Exception;
	// 判断映射是否为空,返回一个boolean值
    boolean isEmpty() throws Exception;
}

模拟统计 数字 出现频率计数

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        // 从socket接收数据流
        SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)
                // 将输入数据转换为Tuple2
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return Tuple2.of(split[0], Integer.valueOf(split[1]));
                    }
                })
                // 指定 watermark策略
                .assignTimestampsAndWatermarks(
                        // 定义Watermark策略
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );


        // keyBy分区
        KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);

        keyByStream.process(new MyKeyedProcessFunction())
                .print();
        env.execute();

    }

    public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
        // 定义状态
        MapState<Integer, Integer> mapState;

        /**
         * 在open方法中,初始化状态
         *
         * @param configuration
         * @throws Exception
         */
        @Override
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            // 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称和存储类型
            mapState = getRuntimeContext().getMapState(new MapStateDescriptor<Integer, Integer>("mapState", Types.INT, Types.INT));
        }

        /**
         * 模拟统计 数字 出现频率计数
         */
        @Override
        public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
            // 判断是否存在对应的key
            Integer number = value.f1;
            if (mapState.contains(number)) {
                // 包含key,直接对value+1
                Integer count = mapState.get(number);
                mapState.put(number, ++count);
            } else {
                // 不包含key,初始化
                mapState.put(number, 1);
            }

            out.collect("keyBy:" + value.f0 + " 数字:" + number + " 出现次数:" + mapState.get(number));
        }
    }
nc -lk 8086
key1,1
key1,1
key1,2
key1,3
key1,2
key1,1
keyBy:key1 数字:1 出现次数:1
keyBy:key1 数字:1 出现次数:2
keyBy:key1 数字:2 出现次数:1
keyBy:key1 数字:3 出现次数:1
keyBy:key1 数字:2 出现次数:2
keyBy:key1 数字:1 出现次数:3

归约状态 ReducingState

归约状态(Reducing State)是 Flink 中一种特殊类型的状态,用于对输入流进行归约操作。归约操作将输入流中的元素逐个进行聚合,生成一个汇总的结果值。不同于普通的 Map、List 或 Value 状态,归约状态可以在接收到新的元素时,对当前的状态值进行相应的归约操作。

归约状态ReducingState类似于值状态,不过需要对添加进来的所有数据进行归约,将归约聚合之后的值作为状态保存下来。

使用接口ReducingState<T>,调用的方法类似于ListState,只不过它保存的只是一个聚合值,调用.add()方法时,不是在状态列表里添加元素,而是直接把新数据和之前的状态进行归约,并用得到的结果更新状态。

// 对Reducing状态,获取结果
OUT get() throws Exception;
// 对Reducing状态,添加数据
void add(IN var1) throws Exception;
// 对Reducing状态,清空数据
void clear();

创建一个状态描述器StateDescriptor来提供状态的基本信息,状态描述器构造方法如下

public class ReducingStateDescriptor<T> extends StateDescriptor<ReducingState<T>, T> {
    private final ReduceFunction<T> reduceFunction;

    public ReducingStateDescriptor(String name, ReduceFunction<T> reduceFunction, TypeInformation<T> typeInfo) {
        super(name, typeInfo, (Object)null);
        this.reduceFunction = (ReduceFunction)Preconditions.checkNotNull(reduceFunction);
    }
}

使用归约状态来计算输入流中的累计和

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        // 从socket接收数据流
        SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)
                // 将输入数据转换为Tuple2
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return Tuple2.of(split[0], Integer.valueOf(split[1]));
                    }
                })
                // 指定 watermark策略
                .assignTimestampsAndWatermarks(
                        // 定义Watermark策略
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );

        // keyBy分区
        KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);
        keyByStream.process(new MyKeyedProcessFunction())
                .print();
        env.execute();
    }

    public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
        // 定义状态
        ReducingState<Integer> reducingState;

        /**
         * 在open方法中,初始化状态
         *
         * @param configuration
         * @throws Exception
         */
        @Override
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            // 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称、reduceFunction、存储类型
            reducingState = getRuntimeContext().getReducingState(
                    new ReducingStateDescriptor<Integer>("reducingState", new ReduceFunction<Integer>() {
                        @Override
                        public Integer reduce(Integer value1, Integer value2) throws Exception {
                            return value1 + value2;
                        }
                    }, Types.INT)
            );

        }

        @Override
        public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
            // 来一条数据则添加到reducing状态里
            reducingState.add(value.f1);
            // 对本组的Reducing状态,获取结果
            Integer sum = reducingState.get();
            out.collect("keyBy:" + value.f0 + " 当前数据:" + value + " 水位值合计:" + sum);
        }
    }
key1,1
key1,2
key1,3
keyBy:key1 当前数据:(key1,1) 水位值合计:1
keyBy:key1 当前数据:(key1,2) 水位值合计:3
keyBy:key1 当前数据:(key1,3) 水位值合计:6

聚合状态 Aggregating State

聚合状态是Flink 中一种特殊类型的状态,用于对输入流进行聚合操作。聚合操作将输入流中的元素逐个进行聚合,并生成一个汇总的结果值。与归约状态不同,聚合状态可以在接收到新的元素时,根据自定义的聚合逻辑对当前的状态值进行增量聚合。

AggregatingState接口相关方法

// 对Reducing状态,获取结果
OUT get() throws Exception;
// 对Reducing状态,添加数据
void add(IN var1) throws Exception;
// 对Reducing状态,清空数据
void clear();

与归约状态不同的是,它的聚合逻辑是由在描述器中传入一个更加一般化的聚合函数AggregateFunction来定义的 。里面通过一个累加器Accumulator来表示状态,所以聚合的状态类型可以跟添加进来的数据类型完全不同,使用更加灵活。

public class AggregatingStateDescriptor<IN, ACC, OUT> extends StateDescriptor<AggregatingState<IN, OUT>, ACC> {

    private final AggregateFunction<IN, ACC, OUT> aggFunction;

    public AggregatingStateDescriptor(String name, AggregateFunction<IN, ACC, OUT> aggFunction, TypeInformation<ACC> stateType) {
        super(name, stateType, (Object)null);
        this.aggFunction = (AggregateFunction)Preconditions.checkNotNull(aggFunction);
    }
}

模拟统计 数字 出现频率计数

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);

        // 从socket接收数据流
        SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)
                // 将输入数据转换为Tuple2
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return Tuple2.of(split[0], Integer.valueOf(split[1]));
                    }
                })
                // 指定 watermark策略
                .assignTimestampsAndWatermarks(
                        // 定义Watermark策略
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );

        // keyBy分区
        KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);

        keyByStream.process(new MyKeyedProcessFunction())
                .print();
        env.execute();
    }

    /**
     * param1 键的类型
     * param2 输入类型
     * param3 输出元素的类型
     */
    public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {


        // 定义状态
        AggregatingState<Integer, HashMap<Integer, Integer>> aggregatingState;

        /**
         * 在open方法中,初始化状态
         *
         * @param configuration
         * @throws Exception
         */
        @Override
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);
            // 创建一个状态描述器StateDescriptor来提供状态的基本信息 传入状态的名称、AggregateFunction、累加器类型
            aggregatingState = getRuntimeContext().getAggregatingState(
                    new AggregatingStateDescriptor<Integer, HashMap<Integer, Integer>, HashMap<Integer, Integer>>(
                            "aggregatingState",
                            new MyAggregateFunction(),
                            TypeInformation.of(new TypeHint<HashMap<Integer, Integer>>() {
                            }))
            );

        }


        @Override
        public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
            // 将水位值添加到聚合状态中
            aggregatingState.add(value.f1);
            // 从聚合状态中获取结果
            HashMap<Integer, Integer> res = aggregatingState.get();
            out.collect("keyBy:" + value.f0 + " 数字:" + value.f1 + " 出现次数:" + res.get(value.f1));
        }

    }

    /**
     * param1 聚合的值的类型 (输入值)
     * param2 累加器的类型 (中间聚合状态)
     * param3 聚合结果的类型
     */
    public static class MyAggregateFunction implements AggregateFunction<Integer, HashMap<Integer, Integer>, HashMap<Integer, Integer>> {
        // 创建累加器,类型HashMap<Integer, Integer>
        @Override
        public HashMap<Integer, Integer> createAccumulator() {
            HashMap<Integer, Integer> map = new HashMap<>();
            return map;
        }

        @Override
        public HashMap<Integer, Integer> add(Integer value, HashMap<Integer, Integer> accumulator) {
            if (accumulator.containsKey(value)) {
                Integer sum = accumulator.get(value) + 1;
                accumulator.put(value, sum);
            } else {
                accumulator.put(value, 1);
            }
            return accumulator;
        }

        @Override
        public HashMap<Integer, Integer> getResult(HashMap<Integer, Integer> accumulator) {
            return accumulator;
        }

        @Override
        public HashMap<Integer, Integer> merge(HashMap<Integer, Integer> a, HashMap<Integer, Integer> b) {
            return null;
        }
    }
key1,1
key1,2
key1,3
key1,2
key1,3
key1,2
keyBy:key1 数字:1 出现次数:1
keyBy:key1 数字:2 出现次数:1
keyBy:key1 数字:3 出现次数:1
keyBy:key1 数字:2 出现次数:2
keyBy:key1 数字:3 出现次数:2
keyBy:key1 数字:2 出现次数:3

算子状态

概述

算子状态(Operator State)是 Flink 中一种用于保存和管理算子(Operator)状态的机制。算子状态通常用于在算子之间保持一些中间结果,或者用于保存全局信息。

算子状态是一个算子并行实例上定义的状态,作用范围被限定为当前算子任务。算子状态跟数据的key无关,所以不同key的数据只要被分发到同一个并行子任务,就会访问到同一个算子状态。

算子状态一般用在Source或Sink等与外部系统连接的算子上,或者完全没有key定义的场景。

当算子的并行度发生变化时,算子状态也支持在并行的算子任务实例之间做重组分配。根据状态的类型不同,重组分配的方案也会不同。

在Flink中,算子任务可以分为无状态和有状态两种情况。

无状态算子

无状态的算子任务只需要观察每个独立事件,根据当前输入的数据直接转换输出结果。例如:基本转换算子map、filter、flatMap等计算时不依赖其他数据,就都属于无状态的算子。

有状态算子

有状态的算子任务,除当前数据之外,还需要一些其他数据来得到计算结果。其他数据就是所谓的状态。例如:聚合算子、窗口算子都属于有状态的算子。

算子状态有以下几个特点:

算子状态是与算子实例绑定的,每个算子实例都会维护自己的状态。这意味着在并行计算中,每个并行实例都会有独立的状态

算子状态可以是一种类型,也可以是多种类型的组合。常见的算子状态类型包括 ValueState、ListState、MapState 等

算子状态可以在算子实例之间进行快速的备份和恢复,以保证程序的容错性

算子状态可以存储在内存中,也可以通过配置选择将其存储在外部持久化存储系统中,如 RocksDB

算子状态也支持不同的结构类型,主要有三种:ListState、UnionListState和BroadcastState。

列表状态 ListState

在算子状态的上下文中,不会按键分别处理状态,每一个并行子任务上会保留一个列表

当算子并行度进行缩放调整时,算子的列表状态中的所有元素项会被统一收集起来,相当于把多个分区的列表合并成了一个大列表,然后再均匀地分配给所有并行任务。

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 从socket接收数据流
        DataStreamSource<String> source = env.socketTextStream("IP", 8086);

        source.map(new MyMapFunction()).print();
        env.execute();
    }


    // 实现CheckpointedFunction接口
    public static class MyMapFunction implements MapFunction<String, Integer>, CheckpointedFunction {
        // 本地变量
        private Integer count = 0;
        // 定义状态
        private ListState<Integer> state;


        @Override
        public Integer map(String value) throws Exception {
            return ++count;
        }

        /**
         * 本地变量持久化:将本地变量拷贝到算子状态中,开启checkpoint时才会调用
         */
        @Override
        public void snapshotState(FunctionSnapshotContext context) throws Exception {
            System.out.println("snapshotState...");
            // 清空算子状态
            state.clear();
            // 将 本地变量 添加到 算子状态 中
            state.add(count);
        }

        /**
         * 初始化本地变量:程序启动和恢复时, 从状态中把数据添加到本地变量,每个子任务调用一次
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("initializeState...");
            // 从上下文初始化 子状态
            state = context
                    .getOperatorStateStore()
                    .getListState(new ListStateDescriptor<Integer>("state", Types.INT));

            // 从算子状态中把数据拷贝到本地变量
            if (context.isRestored()) {
                for (Integer val : state.get()) {
                    count += val;
                }
            }
        }
    }

初始化本地变量方法与并行度设置有关

initializeState...
initializeState...

输入测试数据

1
2
3
4
1> 1
2> 1
1> 2
2> 2

联合列表状态 UnionListState

它与常规列表状态的区别在于,算子并行度进行缩放调整时对于状态的分配方式不同。

在并行度调整时,常规列表状态是轮询分配状态项,而联合列表状态的算子则会直接广播状态的完整列表。

并行度缩放之后的并行子任务就获取到了联合后完整的大列表,可以自行选择要使用的状态项和要丢弃的状态项。

        /**
         * 初始化本地变量:程序启动和恢复时, 从状态中把数据添加到本地变量,每个子任务调用一次
         */
        @Override
        public void initializeState(FunctionInitializationContext context) throws Exception {
            System.out.println("initializeState...");
            // 从上下文初始化 子状态
            state = context
                    .getOperatorStateStore()
                    .getUnionListState(new ListStateDescriptor<Integer>("union-state", Types.INT));

            // 从算子状态中把数据拷贝到本地变量
            if (context.isRestored()) {
                for (Integer val : state.get()) {
                    count += val;
                }
            }
        }

广播状态 Broadcast State

广播状态是 Flink 中一种特殊的算子状态类型,可用于在流处理任务中将数据广播到所有并行任务中共享和访问。它适用于将少量的全局信息广播到算子的每个实例,以便进行更灵活的计算。

因为广播状态在每个并行子任务上的实例都一样,所以在并行度调整的时候就比较简单,只要复制一份到新的并行任务就可以实现扩展。而对于并行度缩小的情况,可以将多余的并行子任务连同状态直接删除,因为状态都是复制出来的,并不会丢失

广播状态具有以下特点:

广播状态只需要占用少量的内存,因为它通常用于存储比较小的全局数据或配置信息

广播状态在整个任务中共享,使得每个算子实例都可以访问广播状态中的数据,而无需进行网络通信

广播状态在任务开始时被广播并分发到每个算子实例,保持数据的一致性

更改广播状态示例

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(2);

        // 从socket接收数据流
        SingleOutputStreamOperator<Tuple2<String, Integer>> sourceMap = env.socketTextStream("IP", 8086)
                // 将输入数据转换为Tuple2
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return Tuple2.of(split[0], Integer.valueOf(split[1]));
                    }
                });


        // 广播配置流
        DataStreamSource<String> dataStreamSource = env.socketTextStream("IP", 8087);
        // 使用给定的名称和给定的类型信息新建一个MapStateDescriptor
        MapStateDescriptor<String, Integer> broadcastMapState = new MapStateDescriptor<>("broadcast-state", Types.STRING, Types.INT);
        // 得到广播流
        BroadcastStream<String> broadcastStream = dataStreamSource.broadcast(broadcastMapState);

        // 数据流和广播配置流进行关联
        BroadcastConnectedStream<Tuple2<String, Integer>, String> broadcastConnectedStream = sourceMap.connect(broadcastStream);

        // 调用 process
        broadcastConnectedStream.process(
                        new BroadcastProcessFunction<Tuple2<String, Integer>, String, String>() {
                            /**
                             * 数据流的处理
                             * 数据流只能读取广播状态,不能修改
                             * @param value 非广播侧的输入类型
                             * @param ctx 广播端的输入类型
                             * @param out 运算符的输出类型
                             */
                            @Override
                            public void processElement(Tuple2<String, Integer> value, ReadOnlyContext ctx, Collector<String> out) throws Exception {
                                // 通过上下文获取广播状态,取出值
                                ReadOnlyBroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
                                Integer configValue = broadcastState.get("myConfig");
                                // 注意:刚启动时,可能是数据流的第一条数据先来
                                configValue = (configValue == null ? 0 : configValue);
                                if (value.f1 > configValue) {
                                    out.collect("输入数字:" + value.f1 + " > 广播状态值:" + configValue);
                                } else {
                                    out.collect("输入数字:" + value.f1 + " <= 广播状态值:" + configValue);
                                }

                            }

                            /**
                             * 广播后配置流的处理
                             * 只有广播流才能修改广播状态

                             */
                            @Override
                            public void processBroadcastElement(String value, Context ctx, Collector<String> out) throws Exception {
                                // 通过上下文获取广播状态,往里面写数据
                                BroadcastState<String, Integer> broadcastState = ctx.getBroadcastState(broadcastMapState);
                                broadcastState.put("myConfig", Integer.valueOf(value));
                            }
                        }
                )
                .print();
        env.execute();
    }

输入测试数据

nc -lk 8086
key1,1
key1,2
key1,3

输出:

1> 输入数字:1 > 广播状态值:0
2> 输入数字:2 > 广播状态值:0
1> 输入数字:3 > 广播状态值:0

更改广播状态

nc -lk 8087
5

输入测试数据

nc -lk 8086
key1,6
key1,8

输出:

2> 输入数字:6 > 广播状态值:5
1> 输入数字:8 > 广播状态值:5

状态有效期 (TTL)

概述

状态效期、生存时间(State TTL,Time-to-Live)是 Flink 中的一个功能,用于为状态设置过期时间。通过设置状态生存时间,可以自动清理过期的状态数据,避免无限增长的状态。

任何类型的keyed state都可以有 有效期 (TTL)。如果配置了TTL且状态值已过期,则会尽最大可能清除对应的值

所有状态类型都支持单元素的TTL。 这意味着列表元素和映射元素将独立到期。

StateTtlConfig 配置对象

配置状态的TTL时,需要创建一个StateTtlConfig配置对象,然后调用状态描述器的enableTimeToLive()方法启动TTL功能。

创建一个StateTtlConfig配置对象

        StateTtlConfig stateTtlConfig = StateTtlConfig
                .newBuilder(Time.seconds(10))
                .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite)
                .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired)
                .build();

启动TTL功能

        ValueStateDescriptor<String> valueStateDescriptor = new ValueStateDescriptor<>("MyState", String.class);
        valueStateDescriptor.enableTimeToLive(stateTtlConfig);

参数说明

newBuilder()

状态TTL配置的构造器方法,必须调用,返回一个Builder之后再调用.build()方法就可以得到StateTtlConfig。方法需要传入一个Time作为参数,这就是设定的状态生存时间。

在这里插入图片描述

setUpdateType()

设置更新类型。更新类型指定了什么时候更新状态失效时间

在这里插入图片描述

Disabled:TTL 已禁用。这意味着状态不会过期,它将一直保持有效,直到显式删除或状态存储由于其他原因而被清理

OnCreateAndWrite:表示只有创建状态和更改状态(写操作)时更新失效时间。配置默认为OnCreateAndWrite

OnReadAndWrite:表示无论读写操作都会更新失效时间,也就是只要对状态进行了访问,就表明它是活跃的,从而延长生存时间

setStateVisibility()

设置状态的可见性。状态可见性是指因为清除操作并不是实时的,当状态过期之后还可能继续存在,如果对它进行访问,能否正常读取到是一个问题

在这里插入图片描述

NeverReturnExpired:默认行为,表示从不返回过期值,也就是只要过期就认为它已经被清除,不能继续读取
	
ReturnExpireDefNotCleanedUp:如果过期状态还存在,就返回它的值

清理

过期数据的清理

默认情况下,过期数据会在读取的时候被删除,同时会有后台线程定期清理(StateBackend支持)。可以通过StateTtlConfig配置关闭后台清理:

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .disableCleanupInBackground()
    .build();

全量快照时进行清理

可以启用全量快照时进行清理的策略,这可以减少整个快照的大小。当前实现中不会清理本地的状态,但从上次快照恢复时,不会恢复那些已经删除的过期数据。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupFullSnapshot()
    .build();

增量数据清理

在状态访问或处理时进行,会在存储后端保留一个所有状态的惰性全局迭代器。 每次触发增量清理时,从迭代器中选择已经过期的数进行清理。

 StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    /**
     * @cleanupSize 每次清理时检查状态的条目数,在每个状态访问时触发
     * @runCleanupForEveryRecord  表示是否在处理每条记录时触发清理
     */
    .cleanupIncrementally(10, true)
    .build();

在RocksDB压缩时清理

如果使用RocksDBstatebackend,则会启用Flink为RocksDB定制的压缩过滤器。RocksDB会周期性的对数据进行合并压缩从而减少存储空间。Flink提供的RocksDB压缩过滤器会在压缩时过滤掉已经过期的状态数据。

StateTtlConfig ttlConfig = StateTtlConfig
    .newBuilder(Time.seconds(1))
    .cleanupInRocksdbCompactFilter(1000)
    .build();

使用示例

    public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
        env.setParallelism(1);


        // 从socket接收数据流
        SingleOutputStreamOperator<Tuple2<String, Integer>> streamSource = env.socketTextStream("IP", 8086)
                // 将输入数据转换为Tuple2
                .map(new MapFunction<String, Tuple2<String, Integer>>() {
                    @Override
                    public Tuple2<String, Integer> map(String value) throws Exception {
                        String[] split = value.split(",");
                        return Tuple2.of(split[0], Integer.valueOf(split[1]));
                    }
                })
                // 指定 watermark策略
                .assignTimestampsAndWatermarks(
                        // 定义Watermark策略
                        WatermarkStrategy
                                .<Tuple2<String, Integer>>forBoundedOutOfOrderness(Duration.ofSeconds(3))
                                .withTimestampAssigner((value, ts) -> value.f1 * 1000L)
                );

        // keyBy分区
        KeyedStream<Tuple2<String, Integer>, String> keyByStream = streamSource.keyBy(a -> a.f0);
        keyByStream.process(new MyKeyedProcessFunction())
                .print();
        env.execute();
    }

    public static class MyKeyedProcessFunction extends KeyedProcessFunction<String, Tuple2<String, Integer>, String> {
        // 定义状态
        ValueState<Integer> lastState;

        /**
         * 在open方法中,初始化状态
         *
         * @param configuration
         * @throws Exception
         */
        @Override
        public void open(Configuration configuration) throws Exception {
            super.open(configuration);

            // 创建StateTtlConfig
            StateTtlConfig stateTtlConfig = StateTtlConfig
                    .newBuilder(Time.seconds(5)) // 过期时间5s
//                                        .setUpdateType(StateTtlConfig.UpdateType.OnCreateAndWrite) // 状态 创建和写入更新 时更新过期时间
                    .setUpdateType(StateTtlConfig.UpdateType.OnReadAndWrite) // 状态 读取、创建和写入更新 时更新 过期时间
                    .setStateVisibility(StateTtlConfig.StateVisibility.NeverReturnExpired) // 不返回过期的状态值
                    .build();

            // 状态描述器 启用TTL
            ValueStateDescriptor<Integer> stateDescriptor = new ValueStateDescriptor<>("lastState", Types.INT);
            stateDescriptor.enableTimeToLive(stateTtlConfig);

            this.lastState = getRuntimeContext().getState(stateDescriptor);
        }

        @Override
        public void processElement(Tuple2<String, Integer> value, Context ctx, Collector<String> out) throws Exception {
            // 获取状态值
            Integer lastValue = lastState.value();
            out.collect("keyBy:" + value.f0 + " 状态值: " + lastValue);
            // 更新状态
            lastState.update(value.f1);
        }
    }

快速输入测试数据

key1,1
key1,2
key1,4
keyBy:key1 状态值: null
keyBy:key1 状态值: 1
keyBy:key1 状态值: 2

等待超过5秒输入测试数据

key1,6
keyBy:key1 状态值: null

状态后端 State Backend

概述

状态后端是 Flink 中用于管理和持久化状态数据的机制。状态后端负责将算子状态和键控状态Keyed State存储在可靠且可恢复的存储系统中,并提供对状态数据的读取和写入操作。

状态后端主要负责管理本地状态的存储方式和位置

可用状态后端

Flink内置了以下这些开箱即用的state backends :

如果不设置,默认使用HashMapStateBackend。两种状态后端最大的区别,就在于本地状态存放在哪里

HashMapStateBackend: 哈希表状态后端

EmbeddedRocksDBStateBackend:内嵌RocksDB状态后端

1.HashMapStateBackend

在HashMapStateBackend内部,数据以Java对象的形式存储在堆中。Key/value形式的状态和窗口算子会持有一个hash table,其中存储着状态值、触发器。

具有以下特点:

高性能:由于状态存储在内存中,哈希表状态后端提供极快的数据读取和写入性能

低延迟:状态的访问速度非常快,因为无需进行磁盘或网络访问

低容错性:哈希表状态后端不提供持久化能力,即在故障发生时可能会丢失状态数据。适用于开发和调试环境,或对数据一致性要求较低的场景

2.EmbeddedRocksDBStateBackend

将状态数据存储在硬盘上的RocksDB数据库中,RocksDB是一种内嵌的key-value存储介质,可以把数据持久化到本地硬盘。配置开启后,会将处理中的数据全部放入RocksDB数据库中,RocksDB默认存储在TaskManager的本地数据目录里。

RocksDB的状态数据被存储为序列化的字节数组,读写操作需要序列化/反序列化,因此状态的访问性能要差一些。另外,因为做了序列化,key的比较也会按照字节进行,而不是直接调用.hashCode()和.equals()方法。

不同于HashMapStateBackend中的java对象,状态数据被以序列化字节数组的方式存储,需要序列化、反序列化,因此key之间的比较是以字节序的形式进行而不是使用Java的调用.hashCode()和.equals()方法。

执行是异步快照,不会因为保存检查点而阻塞数据的处理,并且还提供了增量式保存检查点的机制,在很多情况下可以大大提升保存效率。

具有以下特点:

持久化和可恢复性:内嵌RocksDB状态后端可将状态数据持久化到磁盘,并在故障发生时能够恢复状态数据

高容量:由于状态存储在磁盘上,内嵌RocksDB状态后端可以处理大规模的状态数据

中等性能:相较于哈希表状态后端,内嵌RocksDB状态后端的读写性能略低。但由于RocksDB是一个高效的键值存储引擎,它仍然提供了相对较好的读写性能

状态后端的配置

默认状态后端是由集群配置文件flink-conf.yaml指定的,配置的键名称为state.backend

默认配置对集群上运行的所有作业都有效,可以通过更改配置值来改变默认的状态后端。还可以在代码中为当前作业单独配置状态后端,这个配置会覆盖掉集群配置文件的默认值。

1.配置默认全局的状态后端

flink-conf.yaml中,可以使用state.backend来配置默认状态后端。

# 默认状态后端,哈希表状态后端
state.backend.type: hashmap

# 内嵌RocksDB状态后端
state.backend.type: rocksdb

# 定义检查点和元数据写入的目录
state.checkpoints.dir: hdfs://node01:8020/flink/checkpoints

2.设置每个Job的状态后端

使用hashmap状态后端

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
HashMapStateBackend hashMapStateBackend = new HashMapStateBackend();
        env.setStateBackend(hashMapStateBackend);

使用rocksdb状态后端

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
EmbeddedRocksDBStateBackend embeddedRocksDBStateBackend = new EmbeddedRocksDBStateBackend();
        env.setStateBackend(embeddedRocksDBStateBackend);

注意:Flink发行版中默认包含了RocksDB(解压的Flink安装包),在IDE中使用rocksdb状态后端,需要为Flink项目添加依赖

<dependency>
    <groupId>org.apache.flink</groupId>
    <artifactId>flink-statebackend-rocksdb</artifactId>
    <version>${flink.version}</version>
</dependency>

3.提交参数指定

flink run-application -t yarn-application-p 2 -Dstate.backend.type=rocksdb -c 全类名 jar包
【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月19日 0

暂无评论

推荐阅读
  KRe60ogUm4le   2024年04月26日   24   0   0 java算法
  KRe60ogUm4le   2024年05月03日   50   0   0 javascala
GLcPk8bYyq2p
最新推荐 更多

2024-05-31