flink之延迟数据处理watermark allowedLateness() sideOutputLateData()
  TEZNKK3IfmPf 2023年11月12日 29 0

针对eventtime处理乱序数据,如何保证在需要的窗口内获得指定的数据?

flink采用watermark allowedLateness() sideOutputLateData()三个机制来保证获取数据

先来示例

import org.apache.flink.streaming.api.TimeCharacteristic
import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
import org.apache.flink.streaming.api.scala._
import org.apache.flink.streaming.api.windowing.assigners.SlidingEventTimeWindows
import org.apache.flink.streaming.api.windowing.time.Time

object demo1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setParallelism(1)
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime )
    val inputStream: DataStream[String] = env.socketTextStream("hadoop102",7777)
    val outputTag = new OutputTag[SensorReading]("side")
    val dataStream = inputStream
      .map(data => {
        val dataArray = data.split(",")
        SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
      }).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(2)) {
      override def extractTimestamp(element: SensorReading): Long = {
        element.timestamp*1000 //我的测试时间戳是s,flink要求ms
      }
    })
    val minStream: DataStream[SensorReading] = dataStream.keyBy(_.id)
      //  .window( SlidingEventTimeWindows.of(Time.seconds(10),Time.seconds(2)))
       .timeWindow(Time.seconds(10))
      .allowedLateness(Time.seconds(4))
      .sideOutputLateData(outputTag)
      .minBy("temperature")
    dataStream.print("data")
    minStream.print("min")
    minStream.getSideOutput(outputTag).print("slide")
    env.execute("demo1")
  }
}
case class SensorReading(id: String, timestamp: Long, temperature: Double)

注意参数

1、窗口开窗为10s ,此次采用滚动窗口比较简单点

2、watermark为2s

3、允许延迟为4s

注意事项

1、如果用我的代码进行测试,不要修改测试数据第一条,因为涉及到计算窗口的start

测试数据

sensor_1, 1547718120,20
sensor_1, 1547718130,10
sensor_1, 1547718131,9
sensor_1, 1547718132,8
sensor_1, 1547718120,9
sensor_1, 1547718135,5
sensor_1, 1547718120,9
sensor_1, 1547718136,4
sensor_1, 1547718120,9

打印结果

data> SensorReading(sensor_1,1547718120,20.0)
data> SensorReading(sensor_1,1547718130,10.0)
data> SensorReading(sensor_1,1547718131,9.0)
data> SensorReading(sensor_1,1547718132,8.0)  
min> SensorReading(sensor_1,1547718120,20.0)
data> SensorReading(sensor_1,1547718120,9.0)
min> SensorReading(sensor_1,1547718120,9.0)
data> SensorReading(sensor_1,1547718135,5.0)
data> SensorReading(sensor_1,1547718120,9.0)
min> SensorReading(sensor_1,1547718120,9.0)
data> SensorReading(sensor_1,1547718136,4.0)
data> SensorReading(sensor_1,1547718120,9.0)
slide> SensorReading(sensor_1,1547718120,9.0)

说明

1、经过计算窗口的开始时间是1547718120,所以第一个窗口是【20-30),

2、第一个窗口关闭的时间是20+10+2=32,所以当输入32这条数据的时候【20-30)的窗口关闭,此时窗口内的数据只有20,所以算出温度最小值为20。

3、当第一次输入SensorReading(sensor_1,1547718120,9.0)这条数据的时候,allowlateness起作用,认为这条数据也是延迟数据,对原先算出的最小值20进行修正,最后算出min=9.0

     备注如果不加allowlateness。此时窗口【20-30)已经关闭了,对数据是没有影响的

4、此时需要计算一个最多延迟时间20+10+2+4=36,所以输入35的时候,这条数据,会进入到第二个窗口,同时第一个窗口还没有彻底关闭,所以再次输入 SensorReading(sensor_1,1547718120,9.0),仍然会进入到【20-30)的窗口,并在此计算最小值

5、输入SensorReading(sensor_1,1547718136,4.0),窗口彻底关闭,再次输入 SensorReading(sensor_1,1547718120,9.0),不再对第一个窗口min进行修正,直接把数据放到测输入流,以后所有的【20-30)的数据在输入都会全部放到侧输出流

总结

1、窗口window 的作用是为了周期性的获取数据

2、watermark的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法,

3、allowLateNess,是将窗口关闭时间再延迟一段时间,

   思考?这里的allowLateNess 感觉就好像window变大了,那么为什么不直接把window设置大一点呢?或者把watermark加大点

    业务需要,比如我业务需要统计每个小时内的数据,那么开窗一定是1h,但是数据乱序可能会达到几分钟,一般来说水印设置的都比较小(为什么呢?暂时不知道),所以提出了延迟时间这个概念

2022-09-14 更新

比如来的数据是 1 2 3 4 5  6 4 7 8 9 ....999 1000 1001 窗口假设是10s,水印是2s

设置的过大1000s,那么1-10的窗口要在数据1+1000+10=1011来的时候才会关闭,就违背了流处理的原则:实时。

设置的过小1s,上图中4s数据就会丢失,就会导致数据不能完全接收。

4、sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

上一篇: 已经是第一篇 下一篇: 已经是最后一篇
  1. 分享:
最后一次编辑于 2023年11月12日 0

暂无评论