事件时间 -生成watermarks
- 生成Watermarks
-
- 介绍Watermark策略
- 使用Watermark策略
- 处理空置的数据源
- 编写WatermarkGenerators
-
- 编写Periodic WatermarkGenerator
- 编写一个Punctuated WatermarkGenerator
- Watermark策略和Kafka连接器
- 算子怎样处理watermarks
- 过期的AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks
生成Watermarks
本文会让你学习关于Flink提供与事件时间戳和watermarks相关API.相了解事件时间,处理时间,和摄取时间,请查看事件时间介绍.
介绍Watermark策略
为了使用事件时间,Flink需要知道 事件时间戳,意味着每条流数据需要分配事件时间戳.通常通过从记录字段访问或提取时间戳,使用TimestampAssigner.
分配时间与生成watermarks紧密相连, 它也告诉系统处理事件时间到哪里了.可以通过指定WatermarkGenerator
来配置.
Flink要求一个WatermarkStrategy实例包含TimestampAssigner 和WatermarkGenerator.在WatermarkStrategy上的静态方法上有很多开箱即用的策略方法,当开发者有需要时可以创建自己的策略.
为了讨论,下边是一个完整接口:
public interface WatermarkStrategy<T> extends TimestampAssignerSupplier<T>,WatermarkGeneratorSupplier<T>{
/*** Instantiates a {@link TimestampAssigner} for assigning timestamps according to this* strategy.*/@OverrideTimestampAssigner<T> createTimestampAssigner(TimestampAssignerSupplier.Context context);/*** Instantiates a WatermarkGenerator that generates watermarks according to this strategy.*/@OverrideWatermarkGenerator<T> createWatermarkGenerator(WatermarkGeneratorSupplier.Context context);
}
正如上边提到的,你通常不需要自己实现接口.而是使用WatermarkStrategy上的静态帮助方法来对通用的,或者提供自定义的TimestampAssigner 和WatermarkGenerator.例如,使用有限无序matermarks和lambda函数来作为你使用的时间分配器.
WatermarkStrategy.forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20)).withTimestampAssigner(new SerializableTimestampAssigner[(Long, String)] {
override def extractTimestamp(element: (Long, String), recordTimestamp: Long): Long = element._1})
指定TimestampAssigner 是一个可选操作,在大多数时候你实际不想要指定.例如,当使用kafka或kinesis你会直接从它们的记录中获得时间戳.
过后可以通过手写WatermarkGenerators来接口
注意:实时戳和watermarks规定毫秒,从Java的1970-01-01T00:00:00Z开始算
使用Watermark策略
在Flink应用中有两个地方使用WatermarkStrategy :1)直接在数据源上 2)非数据源操作后.
第1个选项是最好 的,因为在watermarking逻辑中它允许数据源利用计算机系统存储关于shards/partitions/splits信息. 数据源通常在细粒度上跟踪watermark,并且所有被数据源生成的watermark更精准. 在数据源上直接指定WatermarkStrategy, 通常意味站你不得不指定接口或指向Watermark Strategies and the Kafka Connector ,在Kafka Connector上运行,和更多关于watermark在每个分区的怎样运算.
第2个选项(在任何操作后设置WatermarkStrategy )应该仅仅在你不能直接在source设置时使用.
val env = StreamExecutionEnvironment.getExecutionEnvironmentval stream: DataStream[MyEvent] = env.readFile(myFormat, myFilePath, FileProcessingMode.PROCESS_CONTINUOUSLY, 100,FilePathFilter.createDefaultFilter())val withTimestampsAndWatermarks: DataStream[MyEvent] = stream.filter( _.severity == WARNING ).assignTimestampsAndWatermarks(<watermark strategy>)withTimestampsAndWatermarks.keyBy( _.getGroup ).window(TumblingEventTimeWindows.of(Time.seconds(10))).reduce( (a, b) => a.add(b) ).addSink(...)
使用WatermarkStrategy 这种方式需要用一个stream再生成一个新的stream是带时间戳和watermarks.如果原始stream中已经有时间戳或watermarks了,timestamp assigner会覆盖它们.
处理空置的数据源
如果一个输入的split/partitions/shards没有带事件有一会儿,就意味着WatermarkGenerator 也没有 获得任何新消息来为watermark.t我们调用空置的输入或空source.这是个问题,因为它会发生一些分区仍然在传输日志.在那样例子中,watermark会被阻止,因此它计算在所有并行不同的watermark最小的.
为处理这样问题,可以使用WatermarkStrategy 来检测空置并且标记输入为空置. WatermarkStrategy 提供一个便捷的方法.
WatermarkStrategy.forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20)).withIdleness(Duration.ofMinutes(1))
编写WatermarkGenerators
TimestampAssigner是一个简单的函数,从事件中提取字段,我们因此不需要了解里边细节. 另一方面, WatermarkGenerator来写有点复杂,我们考虑怎么做下2个部分.这里WatermarkGenerator 接口.
/*** The {@code WatermarkGenerator} generates watermarks either based on events or* periodically (in a fixed interval).** <p><b>Note:</b> This WatermarkGenerator subsumes the previous distinction between the* {@code AssignerWithPunctuatedWatermarks} and the {@code AssignerWithPeriodicWatermarks}.*/
@Public
public interface WatermarkGenerator<T> {
/*** Called for every event, allows the watermark generator to examine * and remember the event timestamps, or to emit a watermark based on* the event itself.*/void onEvent(T event, long eventTimestamp, WatermarkOutput output);/*** Called periodically, and might emit a new watermark, or not.** <p>The interval in which this method is called and Watermarks * are generated depends on {@link ExecutionConfig#getAutoWatermarkInterval()}.*/void onPeriodicEmit(WatermarkOutput output);
}
有两种方式来生成watermark: periodic 和 punctuated.
- 周期生成(periodic generato)遵守刚到的事件是通过onEvent(),并且当框架调用onPeriodicEmit()之后发送一个watermark.
- puncutated 生成将要考虑onEvent()中的事件,并且在流中等指定的marker events 或punctuations带着watermark信息.当发现这些事项时会立刻发射watermark. 通常,punctuated 生成器从不*onPeriodicEmit()*中发送watermark.
我们下边看看每种方式怎样实现
编写Periodic WatermarkGenerator
periodic 生成器监视流事件和周期生成的watermark(可能依赖流中元素或纯粹是基于处理时间).
生成watermark 间隔时间(毫秒)定义是通过ExecutionConfig.setAutoWatermarkInterval(…).生成器中 onPeriodicEmit() 每次都会被调用,并且如果返回的watermark不是空的并且比上个大,新的watermark 会被发送.
现在我们展示两个watermark生成的简单例子,使用周期生成watermark. 注意,Flink携带BoundedOutOfOrdernessWatermarks, 是一个WatermarkGenerator,工作和BoundedOutOfOrdernessGenerator 相似,如下图.了解更多可以读这里.
/*** This generator generates watermarks assuming that elements arrive out of order,* but only to a certain degree. The latest elements for a certain timestamp t will arrive* at most n milliseconds after the earliest elements for timestamp t.*/
class BoundedOutOfOrdernessGenerator extends WatermarkGenerator[MyEvent] {
val maxOutOfOrderness = 3500L // 3.5 secondsvar currentMaxTimestamp: Long = _override def onEvent(element: MyEvent, eventTimestamp: Long, output: WatermarkOutput): Unit = {
currentMaxTimestamp = max(eventTimestamp, currentMaxTimestamp)}override def onPeriodicEmit(output: WatermarkOutput): Unit = {
// emit the watermark as current highest timestamp minus the out-of-orderness boundoutput.emitWatermark(new Watermark(currentMaxTimestamp - maxOutOfOrderness - 1));}
}/*** This generator generates watermarks that are lagging behind processing * time by a fixed amount. It assumes that elements arrive in Flink after * a bounded delay.*/
class TimeLagWatermarkGenerator extends WatermarkGenerator[MyEvent] {
val maxTimeLag = 5000L // 5 secondsoverride def onEvent(element: MyEvent, eventTimestamp: Long, output: WatermarkOutput): Unit = {
// don't need to do anything because we work on processing time}override def onPeriodicEmit(output: WatermarkOutput): Unit = {
output.emitWatermark(new Watermark(System.currentTimeMillis() - maxTimeLag));}
}
编写一个Punctuated WatermarkGenerator
punctuated watermark生成器会监控流中事件并且发送一个watermark ,无效什么时候,它确保一个指定的元素带着watermark信息.
下例中你可以实现punctuated 生成器, 发送watermark ,无效什么时候事件表明它携带一个确定的marker.
class PunctuatedAssigner extends WatermarkGenerator[MyEvent] {
override def onEvent(element: MyEvent, eventTimestamp: Long): Unit = {
if (event.hasWatermarkMarker()) {
output.emitWatermark(new Watermark(event.getWatermarkTimestamp()))}}override def onPeriodicEmit(): Unit = {
// don't need to do anything because we emit in reaction to events above}
}
注意:在每个单独的事件生成watermark是可能的.然而每个watermark会引起下游的计算,过多的water会降低性能.
Watermark策略和Kafka连接器
当使用kafka作为数据源时,每个kafka的partition可能 有一个简单事件时间模式(时间戳升序或无限无序流).既然这样,当从kafka中消费数据,多个partition通常并行消费,交错从partition中取数据,并且破坏了每个分区模式(这里固有的kafka消费client工作方式).
在那个例子,你可以使用flink kafka分区发现(Kafka-partition-aware)来生成watermark. 用那个特性,watermark在内部生成,kafka consumer,每个Kafka分区,并且每个watermark分区被合并以相同的方式,在流shuffle合并watermark.
例如,如果事件时间是在每个kafka分区上严格递增,生成每个分区的watermark和一个升序的watermark生成器导致一个完美完整的watermark.注意,我们没有在例子中提供TimestampAssigner ,而是kafka 记录他们自己的时间戳.
下边展示了怎样使用kafka每个分区 watermark生成,并且在那个例子中watermark怎样通过数据流传播的.
val kafkaSource = new FlinkKafkaConsumer[MyType]("myTopic", schema, props)
kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness(Duration.ofSeconds(20)))val stream: DataStream[MyType] = env.addSource(kafkaSource)
算子怎样处理watermarks
作为一般规则,operators 需要完整的处理在watermark转发到下流之前. 例如,WindowOperator会首先评估所有windows应该被触发,仅仅生成所有输出被触发,watermark它自己才会被发送到下游. 换言之,所有产生的元素,在watermark被发射之前.
相同的规则应用于TwoInputStreamOperator. 既然这样,在这个例子中,当前算子定义的watermark,是两个输入中最小的那个.
行为的细节被定义通过实现 OneInputStreamOperator#processWatermark, TwoInputStreamOperator#processWatermark1 和TwoInputStreamOperator#processWatermark2 方法.
过期的AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks
在介绍当前抽象 WatermarkStrategy, TimestampAssigner, 和WatermarkGenerator之前,Flink使用AssignerWithPeriodicWatermarks and AssignerWithPunctuatedWatermarks.你可能仍然看到他们的API,但是推荐使用新接口,因此 它们提供更清晰的关注点分离,并且也统一 periodic 和punctuated生成watermark的方式