watermark_watermark查看 - CSDN
精华内容
参与话题
  • 文章目录背景定义生成1. 生成时机2. 生成方式更新规则1. 单并行度2. 多并行度传播窗口触发时机分析1. 示例一2. 示例二3. 示例三如何设置最大乱序时间延迟数据处理1. 定义2. 触发条件3. 示例延迟数据重定向1....

    背景

    我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络延迟等原因,导致乱序的产生,特别是使用kafka的话,多个分区的数据无法保证有序。那么此时出现一个问题,一旦出现乱序,如果只根据 eventTime 决定 window 的运行,我们不能明确数据是否全部到位,又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark
    在这里插入图片描述

    图1:数据流入图

    如图中的 record 3 和 record 5 为乱序数据,record 4 为迟到数据,下文会介绍 Flink 是如何处理迟到数据的。

    定义

    watermark是一种特殊的时间戳,也是一种被插入到数据流的特殊的数据结构,用于表示eventTime小于watermark的事件已经全部落入到相应的窗口中,此时可进行窗口操作。
    在这里插入图片描述

    图2:数据流向图

    在这里插入图片描述

    图3:数据落位图

    如图是一个乱序流,窗口大小为5。
    w(5)表示eventTime < 5的所有数据均已落入相应窗口,window_end_time < =5的所有窗口都将进行计算;
    w(10)表示表示eventTime < 10的所有数据均已落入相应窗口,5 < window_end_time < =10的所有窗口都将进行计算。

    生成

    1. 生成时机

    通常,在接收到source的数据后,应该立刻生成watermark;但是,也可以在source后应用简单的map或者filter操作,再生成watermark。

    2. 生成方式

    1. With Periodic Watermarks(常用)

    周期性的生成watermark,周期默认是200ms,可通过env.getConfig().setAutoWatermarkInterval()进行修改。这种watermark生成方式需要实现AssignerWithPeriodicWatermarks接口,代码如下:

    DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
    
                Long currentMaxTimestamp = 0L;
                final Long maxOutOfOrderness = 10000L;// 最大允许的延迟时间是10s
    
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                /**
                 * 定义生成watermark的逻辑
                 * 默认200ms被调用一次
                 */
                @Nullable
                @Override
                public Watermark getCurrentWatermark() {
                    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                }
    
                //定义如何提取timestamp
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                    long timestamp = element.f1;
                    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                    return timestamp;
                }
            });
    
    1. With Punctuated Watermarks(不常用)

    在满足自定义条件时生成watermark,每一个元素都有机会判断是否生成一个watermark。 如果得到的watermark 不为null并且比之前的大就注入流中。这种watermark生成方式需要实现AssignerWithPunctuatedWatermarks接口,使用方式如下:

    DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks<Tuple2<String, Long>>(){
    
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                    return element.f1;
                }
    
                @Nullable
                @Override
                public Watermark checkAndGetNextWatermark(Tuple2<String, Long> lastElement, long extractedTimestamp) {
                    // 当时间戳为偶数则生成,为奇数不不生成
                    return lastElement.f1 % 2 == 0 ? new Watermark(extractedTimestamp) : null;
                }
            });
    

    更新规则

    1. 单并行度

    watermark单调递增,一直覆盖较小的watermark

    2. 多并行度

    每个分区都会维护和更新自己的watermark。某一时刻的watermark取所有分区中最小的那一个,详情见watermark的传播

    传播

    Tasks 内部有一个 time services,维护 timers ,当接收到 watermark 时触发。例如,一个窗口 operator 为每一个活跃窗口在 time servive 注册一个 timer,当event time大于窗口结束时间时,清除窗口状态。

    当 task 接收到 watermark 后,会执行以下操作:

    1. task 根据 watermark 的时间戳,更新内部的 event_time clock。
    2. time service 区分出所有时间戳小于更新之后的 event_time 的 timers,对超时的 timer,task 执行回调函数触发计算并发射数据。
    3. task 发射 watermark,时间戳为更新之后的 event_time。
      在这里插入图片描述
    图4:watermark传播

    窗口触发时机分析

    下面以一些实验对窗口的触发时机进行分析

    1. 示例一

    public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
    
        private final long maxOutOfOrderness = 3000; // 3.0 seconds
    
        private long currentMaxTimestamp;
    
        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            long timestamp = element.getCreationTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }
    
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            // 以迄今为止收到的最大时间戳来生成 watermark
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
    

    效果解析:
    在这里插入图片描述

    图5:示例一

    图中是一个10s大小的窗口,10000~20000为一个窗口。当 eventTime 为 23000 的数据到来,生成的 watermark 的时间戳为20000,>= window_end_time,会触发窗口计算。

    2. 示例二

    示例二相较于示例一,更换了watermark的计算方式

    public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
    
        private final long maxTimeLag = 3000; // 3 seconds
    
        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            return element.getCreationTime();
        }
    
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current time minus the maximum time lag
            return new Watermark(System.currentTimeMillis() - maxTimeLag);
        }
    }
    

    效果解析:
    在这里插入图片描述

    图6:示例二

    只是简单的用当前系统时间减去最大延迟时间生成 Watermark ,当 Watermark 为 20000时,>= 窗口的结束时间,会触发10000~20000窗口计算。再当 eventTime 为 19500 的数据到来,它本应该是属于窗口 10000~20000窗口的,但这个窗口已经触发计算了,所以此数据会被丢弃。
    在这里插入图片描述

    图7:示例二分析

    3. 示例三

    public class TumblingEventWindowExample {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
            DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
            DataStream<Tuple2<String, Long>> resultStream = socketStream
                // Time.seconds(3)有序的情况修改为0
                .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {
                    @Override
                    public long extractTimestamp(String element) {
                        long eventTime = Long.parseLong(element.split(" ")[0]);
                        System.out.println(eventTime);
                        return eventTime;
                    }
                })
                .map(new MapFunction<String, Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> map(String value) throws Exception {
                        return Tuple2.of(value.split(" ")[1], 1L);
                    }
                })
                .keyBy(0)
                .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                    @Override
                    public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                        return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                    }
                });
            resultStream.print();
            env.execute();
        }
    }
    

    运行程序之前,在本地启动命令行监听:

    nc -l 9999
    

    有序的情况下,watermark延迟时间为0

    miaowenting@miaowentingdeMacBook-Pro flink$ nc -l 9999
    10000 a
    11000 a
    12000 b
    13000 b
    14000 a
    19888 a
    13000 a
    20000 a  时间戳20000触发第一个窗口计算,实际上19999也会触发,因为左闭右开的原则,20000这个时间戳并不会在第一个窗口计算,第一个窗口是[10000-20000),第二个窗口是[20000-30000),以此类推
    11000 a
    12000 b
    21000 b
    22000 a
    29999 a  第一个窗口触发计算后,后续来的11000,12000这两条数据被抛弃,29999直接触发窗口计算,并且本身也属于第二个窗口,所以也参与计算了。
    

    在这里插入图片描述

    图8:示例三结果一
    无序的情况下,watermark延迟时间为3
    miaowenting@miaowentingdeMacBook-Pro flink$ nc -l 9999
    10000 a
    11000 a
    12000 b
    20000 a  从数据中可以验证,第一个窗口在20000的时候没有触发计算
    21000 a
    22000 b
    23000 a  在23000的时候触发计算,计算内容是第一个窗口[10000-20000),所以20000,21000,22000,23000属于第二个窗口,没有参与计算。
    24000 a
    29000 b
    30000 a
    22000 a
    23000 a
    33000 a  第二个窗口[20000-30000),它是在33000触发计算,并且,迟到的数据22000,23000也被计算在内(如果这个数据在水印33000后到达,则会被抛弃),30000和33000是第三个窗口的数据,没有计算
    

    在这里插入图片描述

    图9:示例三结果二

    在这里插入图片描述

    图10:示例三数据落位图
    由数据落位图可以看出,窗口是左闭右开的,20000和30000这两个数据分别会落到[20000, 30000)和[30000, 40000)这两个窗口;已经触发过的窗口不会被再次触发,即w(30000)不会再次触发窗口[20000, 30000)

    如何设置最大乱序时间

    我们已知的BoundedOutOfOrdernessTimestampExtractor中 watermark的计算公式为currentMaxTimestamp - maxOutOfOrdernessmaxOutOfOrderness通过构造函数传入。如何设置maxOutOfOrderness才会比较合理呢?

    如果maxOutOfOrderness设置的太小,而自身数据发送时由于网络等原因导致乱序或者late太多,那么最终的结果就是会有很多单条的数据在window中被触发,数据的正确性太差,容错性太低。对于严重乱序的数据,需要严格统计数据最大延迟时间,才能保证计算的数据准确。

    如果maxOutOfOrderness延时设置太大,则当大部分时间都已落入所属窗口时,flink迟迟不会进行窗口计算,影响数据的实时性;且由于在最大时间与watermark之间维护了很多未被触发的窗口,会加重Flink作业的负担。

    总结:这个要结合自己的业务以及数据情况去设置。不是对eventTime要求特别严格的数据,尽量不要采用eventTime方式来处理,会有丢数据的风险。

    延迟数据处理

    1. 定义

    所谓延迟数据,即窗口已经因为watermark进行了触发,则在此之后如果还有数据进入窗口,则默认情况下不会对窗口进行再次触发和聚合计算。要想在数据进入已经被触发过的窗口后,还能继续触发窗口计算,则可以使用延迟数据处理机制。

    2. 触发条件

    延迟数据对窗口进行第二次(或多次)触发的条件是 watermark < window_end_time + allowedLateness,只要满足该条件,延迟数据已进入窗口就会触发窗口计算。

    3. 示例

    我们对“窗口触发时机分析”这一章节中的示例三进行修改

    public class TumblingEventWindowExample {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
    //        env.getConfig().setAutoWatermarkInterval(100);
            DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
            DataStream<Tuple2<String, Long>> resultStream = socketStream
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {
                        @Override
                        public long extractTimestamp(String element) {
                            long eventTime = Long.parseLong(element.split(" ")[0]);
                            System.out.println(eventTime);
                            return eventTime;
                        }
                    })
                    .map(new MapFunction<String, Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> map(String value) throws Exception {
                            return Tuple2.of(value.split(" ")[1], 1L);
                        }
                    })
                    .keyBy(0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .allowedLateness(Time.seconds(2)) // 允许延迟处理2秒
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                            return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                        }
                    });
            resultStream.print();
            env.execute();
        }
    }
    
    djg@djgdeMacBook-Pro bin % nc -l 9999
    10000 a
    24000 a
    11000 a
    12000 a
    25000 a
    11000 a
    

    在这里插入图片描述

    图11:延迟数据处理结果
    当watermark为21000时,触发了[10000, 20000)窗口计算,由于设置了`allowedLateness(Time.seconds(2))`即允许两秒延迟处理,`watermark < window_end_time + lateTime`公式得到满足,因此随后10000和12000进入窗口时,依然能触发窗口计算;

    随后watermark增加到22000,watermark < window_end_time + lateTime不再满足,因此11000再次进入窗口时,窗口不再进行计算

    在这里插入图片描述

    图12:延迟数据处理分析

    延迟数据重定向

    1. 定义

    迟到的元素也以使用侧输出(side output)特性被重定向到另外的一条流中去。

    2. 示例

    流的返回值必须是SingleOutputStreamOperator,其是DataStream的子类。通过getSideOutput方法获取延迟数据。可以将延迟数据重定向到其他流或者进行输出。

    public class TumblingEventWindowExample {
    
        public static void main(String[] args) throws Exception {
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
            env.setParallelism(1);
            DataStream<String> socketStream = env.socketTextStream("localhost", 9999);
            //保存被丢弃的数据
            OutputTag<Tuple2<String, Long>> outputTag = new OutputTag<Tuple2<String, Long>>("late-data"){};
            //注意,由于getSideOutput方法是SingleOutputStreamOperator子类中的特有方法,所以这里的类型,不能使用它的父类dataStream。
            SingleOutputStreamOperator<Tuple2<String, Long>> resultStream = socketStream
                    // Time.seconds(3)有序的情况修改为0
                    .assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor<String>(Time.seconds(3)) {
                        @Override
                        public long extractTimestamp(String element) {
                            long eventTime = Long.parseLong(element.split(" ")[0]);
                            System.out.println(eventTime);
                            return eventTime;
                        }
                    })
                    .map(new MapFunction<String, Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> map(String value) throws Exception {
                            return Tuple2.of(value.split(" ")[1], 1L);
                        }
                    })
                    .keyBy(0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(10)))
                    .sideOutputLateData(outputTag) // 收集延迟大于2s的数据
                    .allowedLateness(Time.seconds(2)) //允许2s延迟
                    .reduce(new ReduceFunction<Tuple2<String, Long>>() {
                        @Override
                        public Tuple2<String, Long> reduce(Tuple2<String, Long> value1, Tuple2<String, Long> value2) throws Exception {
                            return new Tuple2<>(value1.f0, value1.f1 + value2.f1);
                        }
                    });
            resultStream.print();
            //把迟到的数据暂时打印到控制台,实际中可以保存到其他存储介质中
            DataStream<Tuple2<String, Long>> sideOutput = resultStream.getSideOutput(outputTag);
            sideOutput.print();
            env.execute();
        }
    }
    
    djg@djgdeMacBook-Pro bin % nc -l 9999
    10000 a
    25000 a
    11000 a
    

    在这里插入图片描述

    图13:延迟数据重定向

    当25000进入window时,watermark被更新到22000,触发[10000, 20000)窗口进行计算;当延迟数据11000到达窗口时,由于不满足watermark < window_end_time + lateTime,窗口无法被再次计算。但是11000会被收集,重定向到sideOutput流中,最终可以进行打印或输出到其他介质
    在这里插入图片描述

    图14:延迟数据重定向数据落位图

    参考
    https://blog.csdn.net/sghuu/article/details/103704415
    https://miaowenting.site/2019/10/19/Apache-Flink/

    展开全文
  • window与watermark理解

    2019-03-08 14:57:21
    转载自:https://blog.csdn.net/lmalds/article/details/52704170     https://blog.csdn.net/sxiaobei/article/details/81147723     ...  实时计算中,对数据时间比较敏感...

    转载自:https://blog.csdn.net/lmalds/article/details/52704170
        https://blog.csdn.net/sxiaobei/article/details/81147723
        https://blog.csdn.net/xiao_jun_0820/article/details/79786517

      实时计算中,对数据时间比较敏感,有eventTime和processTime区分,一般来说eventTime是从原始的消息中提取过来的,processTime是Flink自己提供的,Flink是可以基于eventTime计算,这个功能很有用,因为实时数据可能会经过比较长的链路,多少会有延时,并且有很大的不确定性,对于一些需要精确体现事件变化趋势的场景中,单纯使用processTime显然是不合理的。

      在Flink中基于eventTime计算,需要注意两点,首先要设置数据流的时间特征,下面的代码的意思是基于eventTime处理数据,env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
      其次,需要提取eventTime和设置WaterMark,因为数据格式不相同,设置warterMark的方式也有多种,下面具体分析一下,eventTime结合Watermark的工作方式。

    window划分

      window是flink中划分数据一个基本单位,window的划分方式是固定的,默认会根据自然时间划分window,并且划分方式是前闭后开。
      比如定义了一个TumblingEventTimeWindows.of(Time.seconds(3)),那么会生成类似如下的窗口(左闭右开):

      [2018-03-03 03:30:00,2018-03-03 03:30:03)
      [2018-03-03 03:30:03,2018-03-03 03:30:06]
       …
      [2018-03-03 03:30:57,2018-03-03 03:31:00]
    

      当一个event time=2018-03-03 03:30:01的消息到来时,就会生成[2018-03-03 03:30:00,2018-03-03 03:30:03)这个窗口(而不是[2018-03-03 03:30:01,2018-03-03 03:30:04)这样的窗口),然后将消息buffer在这个窗口中(此时还不会触发窗口的计算),
      划分了window之后,触发window的计算,就可以得到这个window中的聚合结果了,其实基于eventTime和基于processTime计算最大的不同点就是在触发window的计算实际上不相同,通常数据流基于processTime,在window的endTime等于当前时间的时候就会触发计算,而eventTime因为数据有可能是乱序的,所以需要watermark的协助,完成window计算的触发。

    Watermark

      watermark(水位线),只会不断上升,不会下降,用来保证在水位线之前的数据一定都到。
      提取WaterMark的方式有两类,一类是定时提取watermark,对应DataStream#assignTimestampsAndWatermarks(AssignerWithPeriodicWatermarks<T>),这种方式会定时提取更新wartermark,超过窗口的endTime时,才会真正触发窗口计算逻辑,然后清除掉该窗口。
      另一类伴随event的到来就提取watermark,就是每一个event到来的时候,就会提取一次Watermark,对应AssignerWithPunctuatedWatermarks,这样的方式当然设置watermark更为精准,但是当数据量大的时候,频繁的更新wartermark会比较影响性能。通常情况下采用定时提取就足够了。

      需要注意的是,watermark的提取工作在taskManager中完成,意味着这项工作是并行进行的的,而watermark是一个全局的概念。那么warkermark一般是怎么提取呢,这里引用官网的两个例子来说明。

    /**
     * This generator generates watermarks assuming that elements arrive out of order,
     * but only to a certain degree. The latest elements for a certain timestamp t will arrive
     * at most n milliseconds after the earliest elements for timestamp t.
     */
    public class BoundedOutOfOrdernessGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
     
        private final long maxOutOfOrderness = 3500; // 3.5 seconds
     
        private long currentMaxTimestamp;
     
        @Override
        public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
            long timestamp = element.getCreationTime();
            currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
            return timestamp;
        }
     
        @Override
        public Watermark getCurrentWatermark() {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
    

      这个例子中extractTimestamp方法,在每一个event到来之后就会被调用,这里其实就是为了设置watermark的值,关键代码在于Math.max(timestamp,currentMaxTimestamp),意思是在当前的水位和当前事件时间中选取一个较大值,来让watermark流动。
      为什么要选取最大值,因为理想状态下,消息的事件时间肯定是递增的,实际处理中,消息乱序是大概率事件,所以为了保证watermark递增,要取最大值。而getCurrentWatermarker会被定时调用,可以看到方法中减了一个常量,这个原因在下面阐述。就这样,不断从eventTime中提取并更新watermark。

      而下面的例子中,并没有在提取eventTime的时候更新watermark的值,而是直接取系统当前时间减去一个常量,作为新的watermark。

    /**
     * This generator generates watermarks that are lagging behind processing time by a fixed amount.
     * It assumes that elements arrive in Flink after a bounded delay.
     */
    public class TimeLagWatermarkGenerator implements AssignerWithPeriodicWatermarks<MyEvent> {
     
    	private final long maxTimeLag = 5000; // 5 seconds
     
    	@Override
    	public long extractTimestamp(MyEvent element, long previousElementTimestamp) {
    		return element.getCreationTime();
    	}
     
    	@Override
    	public Watermark getCurrentWatermark() {
    		// return the watermark as current time minus the maximum time lag
    		return new Watermark(System.currentTimeMillis() - maxTimeLag);
    	}
    }
    

      上面两种代码中,提取watermark的时候都要减去一个常量,为了理解这么做的原因,需要了解,watermark的工作方式,上文提到在基于eventTime的计算中,需要watermark的协助来触发window的计算,触发规则是watermark大于等于window的结束时间,并且这个窗口中有数据的时候,就会触发window计算。

      举个例子说明其工作方式,当前window为10s,设想理想情况下消息都没有延迟,那么eventTime等于系统当前时间,假如设置watermark等于eventTIme的时候,当watermark = 00:00:10的时候,就会触发w1的计算,这个时候因为消息都没有延迟,watermark之前的消息[00:00:00~00:00:10)都已经落入到window中,所以会计算window中全量的数据。
      那么假如有一条消息data1,eventTime是00:00:01应该属于w1,在00:00:11才到达,因为假设消息没有延迟,那么watermark等于当前时间,00:00:11这个时候w1已经计算完毕,那么这条消息就会被丢弃,没有加入计算,这样就会出现问题。这是已经可以理解,代码中为什么要减去一个常量作为watermark,假设每次提取eventTime的时后,减去2s,那么当data1在00:00:11到达的时候,watermark才是00:00:09这个时候,w1还没有触发计算,那么data1会被加入w1,这个时候计算完全没有问题,所以减去一个常量是为了对延时的消息进行容错的。
      容忍一定时间的数据延迟,是可以DataStream#assignTimestampsAndWatermarks(自定义AssignerWithPeriodicWatermarks或BoundedOutOfOrdernessTimestampExtractor)来设置。一般来说,当watermark到了end-of-window后,window触发之后该window就会删除,再有该window的数据来也不会被计算,可以WindowedStream#allowedLatenes方法来指定一个延迟删除窗口的时间,allowedLateness(Time.seconds(5))允许5秒延迟,这样当窗口触发了之后再有该window的数据来也会触发窗口计算。

    展开全文
  • watermark学习

    2020-03-06 22:02:36
    设置watermark的值为2,则直到时间戳为7s,12s的数据出现,整体数据进行向下的传递。 可以视为一个延迟机制,直到指定的值出现,才进行事件的触发。 案例实现 定义一个watermark为2s的规则来处理流数据 package ...

    基本概念

    在数据流中,由于网络,分布式的原因,会出现数据处理时间误差。
    结合windows的概念,原定5s进行一次数据的处理
    设置watermark的值为2,则直到时间戳为7s,12s的数据出现,整体数据进行向下的传递。
    可以视为一个延迟机制,直到指定的值出现,才进行事件的触发。

    案例实现

    定义一个watermark为2s的规则来处理流数据

    package watermark
    
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.time.Time
    /**
      * Created by IBM on 2020/3/6.
      */
    object TheOne {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        //设置采用event时间来处理数据
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        //注册监听端口
        val ds: DataStream[String] = env.socketTextStream("hadoop101",7776)
        //完成数据的切分,准备的数据第二个为时间戳
        val value: DataStream[(String, String, Int)] = ds.map(x => {
          val arr = x.split(" ")
          (arr(0), arr(1), 1)
        })
        //指定watermark的参数与eventtime的值
        val value1: DataStream[(String, String, Int)] = value.assignTimestampsAndWatermarks(new
          //传入的参数为watemark的值
            BoundedOutOfOrdernessTimestampExtractor[(String, String, Int)](Time.seconds(2)) {
          override def extractTimestamp(t: (String, String, Int)): Long = t._2.toLong
        })
        //简单的按第一个值计数的逻辑-打印输出
        value1.keyBy(0).sum(2).print()
        //保持开启
        env.execute()
    
      }
    }
    
    展开全文
  • Watermark 案例

    2019-11-09 15:29:30
    Watermark 案例 Created by xuwei.tech. */ public class StreamingWindowWatermark { public static void main(String[] args) throws Exception { //定义socket的端口号 int port = 9000; ...
    /**
     *
     * Watermark 案例
     *
     * Created by xuwei.tech.
     */
    
    ```java
    public class StreamingWindowWatermark {
     
        public static void main(String[] args) throws Exception {
            //定义socket的端口号
            int port = 9000;
            //获取运行环境
            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
     
            //设置使用eventtime,默认是使用processtime
            env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
     
            //设置并行度为1,默认并行度是当前机器的cpu数量
            env.setParallelism(1);
     
            //连接socket获取输入的数据
            DataStream<String> text = env.socketTextStream("hadoop100", port, "\n");
     
            //解析输入的数据
            DataStream<Tuple2<String, Long>> inputMap = text.map(new MapFunction<String, Tuple2<String, Long>>() {
                @Override
                public Tuple2<String, Long> map(String value) throws Exception {
                    String[] arr = value.split(",");
                    return new Tuple2<>(arr[0], Long.parseLong(arr[1]));
                }
            });
     
            //抽取timestamp和生成watermark
            DataStream<Tuple2<String, Long>> waterMarkStream = inputMap.assignTimestampsAndWatermarks(
    			new AssignerWithPeriodicWatermarks<Tuple2<String, Long>>() {
     
                Long currentMaxTimestamp = 0L;
                final Long maxOutOfOrderness = 10000L;// 最大允许的乱序时间是10s
     
                SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                /**
                 * 定义生成watermark的逻辑
                 * 默认100ms被调用一次
                 */
                @Nullable
                @Override
                public Watermark getCurrentWatermark() {
                    return new Watermark(currentMaxTimestamp - maxOutOfOrderness);
                }
     
                //定义如何提取timestamp
                @Override
                public long extractTimestamp(Tuple2<String, Long> element, long previousElementTimestamp) {
                    long timestamp = element.f1;
                    currentMaxTimestamp = Math.max(timestamp, currentMaxTimestamp);
                    System.out.println("key:"+element.f0+",eventtime:["+element.f1+"|"+sdf.format(element.f1)+"],currentMaxTimestamp:["+currentMaxTimestamp+"|"+
                            sdf.format(currentMaxTimestamp)+"],watermark:["+getCurrentWatermark().getTimestamp()+"|"+sdf.format(getCurrentWatermark().getTimestamp())+"]");
                    return timestamp;
                }
            });
     
            //分组,聚合
            DataStream<String> window = waterMarkStream.keyBy(0)
                    .window(TumblingEventTimeWindows.of(Time.seconds(3)))//按照消息的EventTime分配窗口,和调用TimeWindow效果一样
                    .apply(new WindowFunction<Tuple2<String, Long>, String, Tuple, TimeWindow>() {
                        /**
                         * 对window内的数据进行排序,保证数据的顺序
                         * @param tuple
                         * @param window
                         * @param input
                         * @param out
                         * @throws Exception
                         */
                        @Override
                        public void apply(Tuple tuple, TimeWindow window, Iterable<Tuple2<String, Long>> input, Collector<String> out) throws Exception {
                            String key = tuple.toString();
                            List<Long> arrarList = new ArrayList<Long>();
                            Iterator<Tuple2<String, Long>> it = input.iterator();
                            while (it.hasNext()) {
                                Tuple2<String, Long> next = it.next();
                                arrarList.add(next.f1);
                            }
                            Collections.sort(arrarList);
                            SimpleDateFormat sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
                            String result = key + "," + arrarList.size() + "," + sdf.format(arrarList.get(0)) + "," + sdf.format(arrarList.get(arrarList.size() - 1))
                                    + "," + sdf.format(window.getStart()) + "," + sdf.format(window.getEnd());
                            out.collect(result);
                        }
                    });
            //测试-把结果打印到控制台即可
            window.print();
     
            //注意:因为flink是懒加载的,所以必须调用execute方法,上面的代码才会执行
            env.execute("eventtime-watermark");
     
        }
     
    }
    
    
    
    展开全文
  • 浅谈WaterMark

    千次阅读 2018-12-11 14:44:14
    想了想还是以watermark开始,本文只是谈谈个人对待watermark的理解,如有哪里说得不恰当,欢迎讨论。起初对Flink的watermark感动一点困惑,经过时间的沉淀,源码断断续续的阅读,稍微清楚一点,下面我将从一些概念...
  • WATERMARK

    2020-07-30 23:31:20
    WATERMARK
  • Flink流计算编程--watermark(水位线)简介

    万次阅读 多人点赞 2018-06-16 20:48:10
    【本文转自Flink流计算编程--watermark(水位线)简介】1、watermark的概念watermark是一种衡量Event Time进展的机制,它是数据本身的一个隐藏属性。通常基于Event Time的数据,自身都包含一个timestamp,例如...
  • Watermark

    2019-05-31 23:52:15
    谈及Watermark之前,需要先了解一下Flink中的三种Time,分别是Event Time(事件时间)、摄入时间(Ingestion Time)和Processing Time(处理时间): (图片来自Flink官网) 如上图,可以很清晰的了解这三种时间...
  • 经过前面文章的介绍,每个内存管理区都有一个数组watermark,内核中定义了三个watermark来表示当前系统剩余的空闲内存。 WMARK_MIN, WMARK_LOW, WMARK_HIGH, watermark high 当剩余内存在high以上时,系统...
  • JavaScript给网页添加水印

    万次阅读 2016-07-19 10:19:53
    设计前景 web页面需要提供打印功能,而且需要控制打印次数。因此给原网页添加水印,并且添加按钮去控制打印功能。 设计思路 1、给原网页进行添加水印,进行控制,避免使用浏览器的打印功能。 ... 3、打印结束后,将页面...
  • 1.导入watermark.js文件,内容如下 function watermark(settings) { //默认设置 var defaultSettings={ watermark_txt:"text", watermark_x:20,//水印起始位置x轴坐标 watermark_y:100,//水印起始...
  • 配置文件增加 cluster.routing.allocation.disk.threshold_enabled: false 详情
  • html中实现添加水印的功能

    万次阅读 2017-08-22 17:05:14
    最近项目需求中需要在html中添加水印效果,试验了一下几种方法 1、使用背景图添加水印 2、使用定位添加水印 3、使用js添加可配置控制水印
  •  WaterMark (水印) 本质上是一个时间戳。当Flink中的运算符接收到水印时,它明白(假设)它不会看到比该时间戳更早的消息。因此,在“EventTime”中,水印也可以被认为是一种告诉Flink它有多远的一种方式,...
  • watermark.js是一个给B/S网站系统加一个很浅的水印插件,确保系统的保密性,安全性,降低数据泄密风险。 水印插件内容,包含1、版本,2、水印插件-使用,3、水印插件-testTool(测试工具),4、内置方法,5、支持...
  • 使用JS为网页添加文字水印

    千次阅读 2018-03-11 14:32:48
    watermark({ watermark_txt: "学信网" })//传入动态水印内容 function watermark(settings) { //默认设置 var defaultSettings={ watermark_txt:"text", watermark_...
  • web页面设置水印

    千次阅读 2018-07-06 08:41:21
    先上图watermark.jsfunction watermark_show(settings) { // 默认设置 var defaultSettings={ watermark_txt: "", watermark_x : 150,// 水印起始位置x轴坐标 watermark_y : 50,// 水印起始位置Y轴...
  • js网页添加水印

    千次阅读 2017-06-30 16:00:39
    ////////////////////////////////////////////////// //改编摘自:...//hjl 2017年6月19日19:25:43 //////////////////////////////////////////////////// function watermar
  • JS给网页添加水印

    千次阅读 2018-06-06 15:51:33
    最近需要做一个小功能,给网页添加水印,在网上搜了一下,找到一个适合自己的。原链接https://www.cnblogs.com/daixinyu/p/6715398.htmlfunction watermark(settings) { ... watermark_txt:"text", ...
  • python opencv加水印 去水印

    千次阅读 2018-05-23 15:11:16
    收到的需求是在一个图上匹配到水印 然后将原来的水印换成一个新水印 先要安装一个库 库文件代码如下: # coding=utf-8 import cv2 import numpy as np ...# 膨胀算法 Kernel _DILATE_KERNEL = np.array([[0, 0, 1,...
1 2 3 4 5 ... 20
收藏数 40,788
精华内容 16,315
热门标签
关键字:

watermark