source 单并行度参考 https://blog.csdn.net/xu470438000/article/details/83271123
结论:
###窗口触发条件
1、watermark时间 >= window_end_time
2、在[window_start_time,window_end_time)中有数据存在
###对于设置了allowedLateness(Time.seconds(N))的窗口
1. 第一次触发是在watermark >=window_end_time时
2. 第二次(或多次)触发的条件是:在watermark < window_end_time + allowedLateness 期间,这个窗口有late数据到达。
多并行度的情况下,watermark对齐会取所有channel最小的watermark
这里把数据贴出来,省得小伙伴照着敲;并行度为1的watermark在上述博客中已有说明,不再赘述。
| data | 并行度为1下的watermark | 多并行度下watermark(并行度=3) |
| --- | --- | --- |
| 0001,1538359882000 | 1538359872000 | |
| 0001,1538359886000 | 1538359876000 | |
| 0001,1538359892000 | 1538359882000 | 1538359872000 |
| 0001,1538359893000 | 1538359883000 | 1538359876000 |
| 0001,1538359894000 (并行度为1下第一次触发窗口计算的时间点) | 1538359884000 | 1538359882000 |
| 0001,1538359896000 | | 1538359883000 |
| 0001,1538359897000 (3并行度下第一次触发窗口计算的时间点) | | 1538359884000 |
| 0001,1538359899000 | | |
| 0001,1538359891000 | | |
| 0001,1538359903000 | | |
##测试延迟的数据
0001,1538359890000
0001,1538359903000
0001,1538359890000
0001,1538359891000
0001,1538359892000
0001,1538359904000
0001,1538359890000
0001,1538359891000
0001,1538359892000
0001,1538359905000
0001,1538359890000
0001,1538359891000
0001,1538359892000
代码基本和单并行度source的版本一致,只改了source和watermark生成部分:
public class StreamingWindowWatermark {public static void main(String[] args) throws Exception {StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();FlinkKafkaConsumer010<String> kafkaConsumer = new FlinkKafkaConsumer010<>("zjf_topic_003", new SimpleStringSchema(), getKafkaProperties());SingleOutputStreamOperator<String> text = env.addSource(kafkaConsumer).uid("gateway-source").setParallelism(1);text.setParallelism(3);//设置使用eventtime,默认是使用processtimeenv.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);//设置并行度为1,默认并行度是当前机器的cpu数量env.setParallelism(3);//连接socket获取输入的数据
// DataStream<String> text = env.socketTextStream("172.31.120.110", 8999, "\n");//数据分割 数据+时间戳SingleOutputStreamOperator<Tuple2<String, Long>> input = text.map(new MapFunction<String, Tuple2<String, Long>>() {@Overridepublic Tuple2<String, Long> map(String value) throws Exception {String[] arr = value.split(",");return new Tuple2<>(arr[0], Long.parseLong(arr[1]));}});//生成watermarkSingleOutputStreamOperator<Tuple2<String, Long>> watermarks = input.assignTimestampsAndWatermarks(WatermarkStrategy.<Tuple2<String, Long>>forBoundedOutOfOrderness(Duration.ofSeconds(10)).withTimestampAssigner(new SerializableTimestampAssigner<Tuple2<String, Long>>() {@Overridepublic long extractTimestamp(Tuple2<String, Long> element, long recordTimestamp) {return element.f1; //指定EventTime对应的字段}}));SingleOutputStreamOperator<String> window = watermarks.keyBy(0).window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和调用TimeWindow效果一样
// .allowedLateness(Time.seconds(2)).apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {/*** 对window内的数据进行排序,保证数据的顺序* @param tuple* @param window* @param input* @param out* @throws Exception*/@Overridepublic void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {String key = tuple.toString();List<Long> arrarList = new ArrayList<Long>();Iterator<Tuple2<String, Long>> it = input.iterator();while (it.hasNext()) {Tuple2<String, Long> next = it.next();arrarList.add(next.f1);}Collections.sort(arrarList);SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");String result = key + "---------" + arrarList.size() + "-----------" + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))+ "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());out.collect(result);}});window.print();env.execute("watermark demo");}private static Properties getKafkaProperties() {Properties properties = new Properties();//2020-01-09 切换到vpc环境
// properties.setProperty("bootstrap.servers", "172.21.164.59:9092,172.21.147.215:9092,172.21.243.86:9092");properties.setProperty("bootstrap.servers", "172.31.117.101:9092");properties.setProperty("group.id", "flink_01");return properties;}
}
pom文件:
<dependencies>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-jdbc_2.11</artifactId>
        <version>1.11.2</version>
    </dependency>
    <dependency>
        <groupId>org.apache.commons</groupId>
        <artifactId>commons-dbcp2</artifactId>
        <version>2.1.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-format-changelog-json</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba.ververica</groupId>
        <artifactId>flink-connector-mysql-cdc</artifactId>
        <version>1.0.0</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-cep_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>

    <!-- Package providing the Bloom filter -->
    <dependency>
        <groupId>com.google.guava</groupId>
        <artifactId>guava</artifactId>
        <version>22.0</version>
    </dependency>

    <!-- Scala dependencies -->
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-scala_${scala.version}</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-clients_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.scala-lang</groupId>
        <artifactId>scala-library</artifactId>
        <version>2.11.8</version>
    </dependency>
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api-scala_2.11</artifactId>
        <version>11.0</version>
    </dependency>

    <!-- log4j-core -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-core</artifactId>
        <version>2.8.2</version>
    </dependency>

    <!-- log4j-api -->
    <dependency>
        <groupId>org.apache.logging.log4j</groupId>
        <artifactId>log4j-api</artifactId>
        <version>2.8.2</version>
    </dependency>

    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-java-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-api-scala-bridge_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-common</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-table-planner-blink_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-kafka-0.11_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-connector-elasticsearch7_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
    <dependency>
        <groupId>org.projectlombok</groupId>
        <artifactId>lombok</artifactId>
        <version>1.18.10</version>
    </dependency>
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka-clients</artifactId>
        <version>2.3.0</version>
    </dependency>

    <!-- https://mvnrepository.com/artifact/org.apache.kafka/kafka -->
    <!-- NOTE(review): this artifact carries the _2.12 suffix while the Flink artifacts above use _2.11;
         confirm that mixing the two Scala binary versions is intended. -->
    <dependency>
        <groupId>org.apache.kafka</groupId>
        <artifactId>kafka_2.12</artifactId>
        <version>2.3.0</version>
    </dependency>

    <!-- Dependency for loading configuration files -->
    <dependency>
        <groupId>com.typesafe</groupId>
        <artifactId>config</artifactId>
        <version>1.2.1</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>fastjson</artifactId>
        <version>1.2.47</version>
    </dependency>
    <dependency>
        <groupId>mysql</groupId>
        <artifactId>mysql-connector-java</artifactId>
        <version>5.1.38</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-api</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>org.slf4j</groupId>
        <artifactId>slf4j-simple</artifactId>
        <version>1.7.25</version>
    </dependency>
    <dependency>
        <groupId>com.alibaba</groupId>
        <artifactId>easyexcel</artifactId>
        <version>2.2.6</version>
    </dependency>

    <!-- NOTE(review): RELEASE resolves to whichever release is newest at build time, so builds are
         not reproducible; consider pinning an explicit version. -->
    <dependency>
        <groupId>junit</groupId>
        <artifactId>junit</artifactId>
        <version>RELEASE</version>
    </dependency>

    <dependency>
        <groupId>ru.ivi.opensource</groupId>
        <artifactId>flink-clickhouse-sink</artifactId>
        <version>1.1.0</version>
        <exclusions>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-java</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-core</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-hadoop-fs</artifactId>
            </exclusion>
            <exclusion>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
            </exclusion>
        </exclusions>
    </dependency>
    <dependency>
        <groupId>org.apache.flink</groupId>
        <artifactId>flink-runtime-web_2.11</artifactId>
        <version>${flink.version}</version>
    </dependency>
</dependencies>