flink之timeWindow与watermark详解
  TEZNKK3IfmPf 2023年11月12日 19 0

话不多说直接上代码

object windowtest1 {
  def main(args: Array[String]): Unit = {
    val env: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    env.setParallelism(1) //必加!!!!!因为不加的话 每个分区都有自己watermark 需要大量的数据,可以自行尝试
    env.getConfig.setAutoWatermarkInterval(200L) //可不加
    val ds: DataStream[String] = env.socketTextStream("hadoop102",8888)
    val min: DataStream[SensorReading] = ds.map(data => {
      val dataArray = data.split(",")
      SensorReading(dataArray(0).trim, dataArray(1).trim.toLong, dataArray(2).trim.toDouble)
    })
      .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) {
        override def extractTimestamp(element: SensorReading) = {
          element.timestamp * 1000
        }
      })
      .keyBy(_.id)
      .timeWindow(Time.seconds(5))
      .minBy("temperature")
    ds.print("data")
    min.print("min")
    env.execute("window test1")

  }
}
case class SensorReading(id: String, timestamp: Long, temperature: Double)

flink之timeWindow与watermark详解

注意上面参数

1、timeWindow(Time.seconds(5))开窗长度5s

2、new BoundedOutOfOrdernessTimestampExtractor[SensorReading](Time.seconds(3)) 窗口乱序最大3s

3、这个是滚动开窗,根据源码可知窗口开始时间为1547718120

测试数据

sensor_1, 1547718120,20
sensor_1, 1547718121,19
sensor_1, 1547718122,18
sensor_1, 1547718123,17
sensor_1, 1547718124,16
sensor_1, 1547718125,15
sensor_1, 1547718126,14
sensor_1, 1547718127,13
sensor_1, 1547718128,12
sensor_1, 1547718129,11
sensor_1, 1547718130,10
sensor_1, 1547718131,9
sensor_1, 1547718132,8
sensor_1, 1547718133,7
sensor_1, 1547718134,6

最后数据结果

flink之timeWindow与watermark详解

注意:所有时间戳是以s为单位,但是系统是以ms为单位

为了方便解释,后续所有时间戳以后两位代替说明

说明1、黄色区域为窗口划分情况[20-24),[25-29),前闭后开行

2、红色是闭窗时输出的内容

解析

问题1、为什么会在28s的时候输出最小值?

在输入时间戳28到的时候,此时watermark=28-3(延时时间)=25,此时要关闭25的窗口即[20-25),然后会输出该窗口的最小值

就是温度为16的

问题2、滚动时间窗口,窗口中的内容和watermark有什么关系

目前来看这两者没有直接关系,你的数据在哪个窗口,只取决于你的数据中指定的时间戳,时区(一般不考虑),windowsize三个要素

比如我开5s的窗口,延迟时间为1s,第一个窗口是0-4.9999s,watermark=当前最大时间-延迟时间,那么哪些数据会放到第一个窗口呢?应该是0-5.99999s的数据,前提是watermark<5,一旦watermark=5,那么第一个窗口会立马关闭,即使是0-5.999的数据也不会进入窗口

例如以下数据

0   0.1   1    2    3    4    5 5.  5   5.9   6   这批数据除了6都会进入到第一个窗口

0   0.1   1    2    6    3    4    5 5.  5   5.9   这批数据只有在6之前的数据会进入窗口,其余3    4    5 5.  5   5.9 不会进入其他窗口

貌似会进入测输出流,需要allowedLateness()  /sideOutputLateData()参数设置

总给:

watermark=最大时间-延迟时间,只能递增

watermark决定了窗口是否关闭

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月12日 0

暂无评论

推荐阅读
  TEZNKK3IfmPf   2023年11月12日   19   0   0 flink大数据
TEZNKK3IfmPf