精华内容
下载资源
问答
  • 在前面的流处理随谈一文中已经简单介绍了Watermark,本文主要再结合Flink具体分析一下,作为补充。 理论 如果看完之前的文章,已经完全理解了Watermark,那可以直接跳过这部分,看实战部分。如果还不太理解,我...

    在前面的流处理随谈一文中已经简单介绍了Watermark,本文主要再结合Flink具体分析一下,作为补充。

    理论

    如果看完之前的文章,已经完全理解了Watermark,那可以直接跳过这部分,看实战部分。如果还不太理解,我通过几个问题来阐述一下,帮助你理解。要注意的是下面的描述方式和实际实现未必完全一样(有些甚至是我的个人观点),但可以帮助你更好的理解。

    What?

    Watermark是什么?从不同的维度可以有不同的理解

    1. 从Watermark的计算角度看:可以将Watermark理解为一个函数:F(P)−>EF(P)−>E,它的输入是当前的系统时间,输出是一个Event Time(一个时间戳),而且输出的这个时间戳是严格单调递增的。这样看,Watermark就是一个函数。
    2. 从Watermark的具体形式来看:可以将Watermark当成一个个时间戳,值就是1中输出的那个时间戳。
    3. 从Watermark流转的角度看:可以将Watermark理解成夹杂在正常流事件中的一个个特殊事件。

    这3种描述方式,看似不同,实则一样,只是从不同的角度去看了而已。不管怎么理解,我们必须知道:流处理系统规定,如果某个时刻Watermark的值为T1,那系统就认为凡是早于或等于T1时间的事件都已经收到了。注意,这个就是Watermark所代表的含义,实际因为现实中各种情况,未必能严格做到这样,但目标就是要达到上面规定的这样,或者无限逼近。

    Why?

    为什么需要Watermark?这个也有很多种描述方式,往大了说就是提供一种理论上解决分布式系统中消息乱序问题(这是分布式系统中一个经典难题)的方案。说小点就是在有状态的流计算中,当我们关注事件的顺序或者完整性时,需要有这么一种机制能实现这个需求。

    这里的完整性我举个例子解释一下:比如我们基于事件发生时间统计每5min的用户PV总量,那比如12:00-12:05这个5min的统计该在什么时间点计算呢?假设没有Watermark这个概念,你就永远不知道什么时候12:00-12:05区间的所有事件才全到齐。你不能假定收到12:00-12:05的数据就认为之前的数据已经全部来了,因为数据可能延迟+乱序了。而Watermark就是为了解决这个问题而提出的,当你收到Watermark的值为12:00-12:05的事件时,你就可以认为早于这个时间的数据已经都到了,数据已经完整了,可以进行12:00-12:05这个5min区间的数据计算了。至于如何保证,这个是框架要做的事情(当然一般需要开发者参与)。

    Where?

    哪里需要Watermark?这里我给一个简单粗暴的结论,当同时满足下面两个条件的时候才会需要Watermark:

    1. 计算中使用了时间相关的算子(time-based operations),其实再明确点,就是使用了Window的时候(注:Flink的Global Window除外,这个Window不是基于时间的)。
    2. 1中使用的时间相关的算子选择使用事件时间,即Event Time(注:如果是Flink的话,也包含Ingestion Time)。

    这里再解释一下2。前文我们介绍过有两种时间Event Time和Processing Time(Flink独有的Ingestion Time在Watermark这里可以归结为Event Time,后文不再另行说明),时间相关的算子选择时间时必然是二选一。并不是选择Processing Time的时候就没有Watermark了,只是这个时候Processing Time自身就是一个完美的Watermark(因为时间一去不复返,Processing Time永远是单调递增的),并不需要产生单独的Watermark了。所以在Processing Time里面,你可以认为Watermark没有意义了,所以去掉了,或者认为Processing Time自身就是Watermark都行。

    How?

    如何使用Watermark?实战部分介绍Flink中的Watermark如何使用。

    关于Watermark我给一个我个人的意见或者看法吧:Watermark其实是一个比较好理解的概念和机制,但却很容易给刚接触它的人造成困惑,我觉得一方面是它关联了时间和Window的概念,也就是如果你不了解Event Time和Processing Time的区别,你不了解Window的机制,你就很难理解Watermark,因为它就是为了解决Event Time + Window中的问题而设计的;另一方面是Watermark没有特别好的观测点和跟踪手段,导致你只能一直在外围理解,却始终无法揭开它的面纱,直面它。关于第一点,我的建议是先了解相关的Time和Window的概念,然后从宏观上看问题,想清楚Watermark的What、Why、Where三个问题,基本也就了解了这个概念的本质了。关于第二点,我会在实战部分给出一些观测的方式,虽然很有限,但也能看到一些细节。

    实战

    场景介绍

    为了方便说明,我构造了一个简单的场景,假设有一个设备产生了一组事件,如下:

    {"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
    {"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
    {"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
    {"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
    {"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
    {"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
    {"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
    {"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
    {"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}

    复制

    一共9个事件,id是事件名称,timestamp是设备端事件真实产生的时间。也就是事件真实产生顺序是:

    event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9event1, event2, event3, event4, event5, event6, event7, event8, event9

    但在传输过程中因为各种现实原因乱序了,到Flink这里的时候,事件顺序变成了:

    event1, event2, event4, event5, event7, event3, event6, event8, event9

    现在我们要做的事情就是计算每5秒中的事件个数,以此来判断事件高峰期。

    说明:

    1. 这个计算是非常有代表性的,比如电商统计每小时的pv就能知道每天用户高峰期发生在哪几个时段,这里为了方便说明问题,把问题简化了,并且为了快速出结果,把时间粒度缩短为5秒钟。
    2. 计算时,要想结果准确,就不能使用Processing Time,这样如果数据从产生到被处理延迟比较大的话,最终计算的结果也会不准确。除非这个延迟可控或者可接受,则可简单的使用Processing Time,否则就必须用Event Time进行计算。

    Flink提供的Watermark机制

    Flink提供了3种方式来生成Watermark:

    1. 在Source中生成Watermark;
    2. 通过AssignerWithPeriodicWatermarks生成Watermark;
    3. 通过AssignerWithPunctuatedWatermarks生成Watermark;

    前面介绍过了Watermark是在使用Event Time的场景下才使用的,所以给事件增加Event Time和生成Watermark是一对操作,一般都是一起使用的。方式1是直接在Flink的最源头Source那里就生成了Event Time和Watermark。方式2和方式3则是流处理中的某一步骤(可以理解为一个特殊点的算子),它的输入是流,输出还是流,只不过经过这个流之后事件就会有Event Timestamp和Watermark了,一般这一步放在Source之后,最晚也要在时间算子之前,也就是Window之前。而且他两的优先级高,如果Source中生成了Watermark,后面又使用了方式2或3,则会覆盖之前的Event Timestamp和Watermark。

    下面我们分别介绍每种方式。

    Watermark In Source

    直接上代码,为了完整性,我把所有代码写在了一个文件里面(源文件点这里):

    package com.niyanchun.watermark;
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.joda.time.DateTime;
    
    import java.text.Format;
    import java.text.SimpleDateFormat;
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * Assign timestamp and watermark at Source Function Demo.
     *
     * @author NiYanchun
     **/
    public class AssignAtSourceDemo {
    
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
    //    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
        env.addSource(new CustomSource())
            .timeWindowAll(Time.seconds(5))
            .process(new CustomProcessFunction())
            .print();
    
        env.execute();
      }
    
    
      public static class CustomSource extends RichSourceFunction<JSONObject> {
    
        @Override
        public void run(SourceContext<JSONObject> ctx) throws Exception {
          System.out.println("event in source:");
          getOutOfOrderEvents().forEach(e -> {
            System.out.println(e);
            long timestampInMills = ((DateTime) e.get("timestamp")).getMillis();
            ctx.collectWithTimestamp(e, timestampInMills);
            ctx.emitWatermark(new Watermark(timestampInMills));
          });
    
          try {
            Thread.sleep(5000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
    
        @Override
        public void cancel() {
    
        }
      }
    
    
      /**
       * generate out of order events
       *
       * @return List<JSONObject>
       */
      private static List<JSONObject> getOutOfOrderEvents() {
        // 2020-05-24 12:00:00
        JSONObject event1 = new JSONObject().fluentPut("id", "event1")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
        // 2020-05-24 12:00:01
        JSONObject event2 = new JSONObject().fluentPut("id", "event2")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
        // 2020-05-24 12:00:03
        JSONObject event3 = new JSONObject().fluentPut("id", "event3")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
        // 2020-05-24 12:00:04
        JSONObject event4 = new JSONObject().fluentPut("id", "event4")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
        // 2020-05-24 12:00:05
        JSONObject event5 = new JSONObject().fluentPut("id", "event5")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
        // 2020-05-24 12:00:06
        JSONObject event6 = new JSONObject().fluentPut("id", "event6")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
        // 2020-05-24 12:00:07
        JSONObject event7 = new JSONObject().fluentPut("id", "event7")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
        // 2020-05-24 12:00:08
        JSONObject event8 = new JSONObject().fluentPut("id", "event8")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
        // 2020-05-24 12:00:09
        JSONObject event9 = new JSONObject().fluentPut("id", "event9")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));
    
        // 这里把消息打乱,模拟实际中的消息乱序
        // 真实的消息产生顺序是(根据时间戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
        // 打乱之后的消息顺序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
        return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
      }
    
      public static class CustomProcessFunction extends ProcessAllWindowFunction<JSONObject, Object, TimeWindow> {
    
        @Override
        public void process(Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
          TimeWindow window = context.window();
          Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          System.out.println(String.format("\nwindow{%s - %s}", sdf.format(window.getStart()), sdf.format(window.getEnd())));
    
          int count = 0;
          for (JSONObject element : elements) {
            System.out.println(element.getString("id"));
            count++;
          }
          System.out.println("Total:" + count);
        }
      }
    }

    复制

    这里自定义了一个Source,然后接了一个Window(timeWindowAll),做了一个简单的处理,最终输出。这里需要注意一个点:timeWindowAll底层其实是定义了一个TumblingWindows,至于使用Processing Time(TumblingProcessingTimeWindows),还是Event Time(TumblingEventTimeWindows)则由env.setStreamTimeCharacteristic来确定的,该选项的默认值是TimeCharacteristic.ProcessingTime,即使用Processing Time。

    作为演示,修改一下上面代码,先使用Processing Time,看下结果:

    event in source:
    {"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
    {"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
    {"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
    {"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
    {"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
    {"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
    {"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
    {"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
    {"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
    
    window{2020-05-24 20:12:30 - 2020-05-24 20:12:35}
    event1
    event2
    event4
    event5
    event7
    event3
    event6
    event8
    event9
    Total:9
    
    Process finished with exit code 0

    复制

    可以看到,只有一个Window,其范围是window{2020-05-24 20:12:30 - 2020-05-24 20:12:35},即我代码运行的时间,显然这样的统计结果是没有意义的,因为它体现不出业务真正的高峰期。后面我们只讨论使用Event Time的情况。

    现在重新改为env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);,然后运行:

    event in source:
    {"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
    {"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
    {"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
    {"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
    {"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
    {"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
    {"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
    {"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
    {"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
    
    window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}
    event1
    event2
    event4
    Total:3
    
    window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}
    event5
    event7
    event6
    event8
    event9
    Total:5
    
    Process finished with exit code 0

    复制

    我们看下现在的输出,有两个Window:window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}window{2020-05-24 12:00:05 - 2020-05-24 12:00:10},可以看到就是5秒钟一个Window。然后12:00:00-12:00:05这个Window里面包含了3个事件:event1,event2,event4;12:00:05-12:00:10这个Window里面包含了5个事件:event5、event7、event6、event8、event9。

    从这个结果看event3丢了,其它数据都在,为什么呢?如果我说因为event3乱序了,排在了后边,你肯定会说event6也排到了event7后边,为什么event6却没有丢呢?要解释清楚这个问题还需要涉及到触发器以及窗口的原理和机制,为了保证行文的连贯性,这里我先直接给出结论:因为窗口默认的触发器实现机制是本该在一个窗口内的数据乱序了以后,只要在这个窗口结束(即被触发)之前来,那是不影响的,不认为是迟到数据,不会被丢掉;但如果这个窗口已经结束了才来,就会被丢掉了。比如event3本应该属于12:00:00-12:00:05这个窗口,当event5这条数据来的时候,这个窗口就就认为数据完整了,于是触发计算,接着就销毁了。等event3来的时候已经是12:00:05-12:00:10窗口了,所以它直接被丢掉了。也就是在时间窗口这里,对于“乱序”的定义不是要求每个到来事件的时间戳都严格升序,而是看属于这个窗口的事件能否在窗口时间范围内来,如果能来,就不算乱序,至于在这个时间范围内来的先后顺序无所谓。这个其实也是合理的。后面我计划单独写一篇介绍触发器和窗口的文章,在那篇文章中来从代码层面分析这个结论。

    另外还有两个细节点要注意一下:

    • 当Source是有界数据时,当所有数据发送完毕后,系统会自动发一个值为Long.MAX_VALUE的Watermark,表示数据发送完了。
    • Window是一个左闭右开区间,比如12:00:00的数据属于12:00:00-12:00:05窗口,而12:00:05的数据属于12:00:05-12:00:10窗口。

    AssignerWithPeriodicWatermarks && AssignerWithPunctuatedWatermarks

    AssignerWithPriodicWatermarksAssignerWithPunctuatedWatermarks其实非常像,哪怕是用法都非常像,他两个的主要区别是Watermark的产生机制或者时机:AssignerWithPriodicWatermarks是根据一个固定的时间周期性的产生Watermark,而AssignerWithPunctuatedWatermarks则是由事件驱动,然后代码自己控制何时以何种方式产生Watermark,比如一个event就产生一个,还是几个event产生一个,或者满足什么条件时产生Watermark等,就是用户可以灵活控制。

    看下代码(源文件点这里):

    package com.niyanchun.watermark;
    
    import com.alibaba.fastjson.JSONObject;
    import org.apache.flink.streaming.api.TimeCharacteristic;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks;
    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks;
    import org.apache.flink.streaming.api.functions.source.RichSourceFunction;
    import org.apache.flink.streaming.api.functions.windowing.ProcessAllWindowFunction;
    import org.apache.flink.streaming.api.watermark.Watermark;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow;
    import org.apache.flink.util.Collector;
    import org.joda.time.DateTime;
    
    import javax.annotation.Nullable;
    import java.text.Format;
    import java.text.SimpleDateFormat;
    import java.util.Arrays;
    import java.util.List;
    
    /**
     * Assign timestamp and watermark at Source Function Demo.
     *
     * @author NiYanchun
     **/
    public class AssignerWatermarksDemo {
    
      public static void main(String[] args) throws Exception {
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    
        env.setParallelism(1);
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
    
        env.addSource(new CustomSource())
            .assignTimestampsAndWatermarks(new CustomAssignerWithPeriodicWatermarks())
    //        .assignTimestampsAndWatermarks(new CustomAssignerWithPunctuatedWatermarks())
            .timeWindowAll(Time.seconds(5))
            .process(new CustomProcessFunction())
            .print();
    
        env.execute();
      }
    
      public static class CustomSource extends RichSourceFunction<JSONObject> {
    
        @Override
        public void run(SourceContext<JSONObject> ctx) throws Exception {
          System.out.println("event in source:");
          getOutOfOrderEvents().forEach(e -> {
            System.out.println(e);
            ctx.collect(e);
          });
    
          try {
            Thread.sleep(2000);
          } catch (InterruptedException e) {
            e.printStackTrace();
          }
        }
    
        @Override
        public void cancel() {
    
        }
      }
      
      /**
       * generate out of order events
       *
       * @return List<JSONObject>
       */
      private static List<JSONObject> getOutOfOrderEvents() {
        // 2020-05-24 12:00:00
        JSONObject event1 = new JSONObject().fluentPut("id", "event1")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 0));
        // 2020-05-24 12:00:01
        JSONObject event2 = new JSONObject().fluentPut("id", "event2")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 1));
        // 2020-05-24 12:00:03
        JSONObject event3 = new JSONObject().fluentPut("id", "event3")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 3));
        // 2020-05-24 12:00:04
        JSONObject event4 = new JSONObject().fluentPut("id", "event4")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 4));
        // 2020-05-24 12:00:05
        JSONObject event5 = new JSONObject().fluentPut("id", "event5")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 5));
        // 2020-05-24 12:00:06
        JSONObject event6 = new JSONObject().fluentPut("id", "event6")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 6));
        // 2020-05-24 12:00:07
        JSONObject event7 = new JSONObject().fluentPut("id", "event7")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 7));
        // 2020-05-24 12:00:08
        JSONObject event8 = new JSONObject().fluentPut("id", "event8")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 8));
        // 2020-05-24 12:00:09
        JSONObject event9 = new JSONObject().fluentPut("id", "event9")
            .fluentPut("timestamp", new DateTime(2020, 5, 24, 12, 0, 9));
    
        // 可以把消息打乱,模拟实际中的消息乱序。
        // 真实的消息产生顺序是(根据时间戳):event1, event2, event3, event4, event5, event6, event7, event8, event9
        // 打乱之后的消息顺序是:event1, event2, event4, event3, event5, event7, event6, event8, event9
        return Arrays.asList(event1, event2, event4, event5, event7, event3, event6, event8, event9);
      }
    
      public static class CustomProcessFunction extends ProcessAllWindowFunction<JSONObject, Object, TimeWindow> {
    
        @Override
        public void process(Context context, Iterable<JSONObject> elements, Collector<Object> out) throws Exception {
          TimeWindow window = context.window();
          Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          System.out.println(String.format("\nwindow{%s - %s}", sdf.format(window.getStart()), sdf.format(window.getEnd())));
    
          int count = 0;
          for (JSONObject element : elements) {
            System.out.println(element.getString("id"));
            count++;
          }
          System.out.println("Total:" + count);
        }
      }
    
      /**
       * AssignerWithPeriodicWatermarks demo
       */
      public static class CustomAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {
    
        private long currentTimestamp;
    
        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
          Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          System.out.println(String.format("invoke getCurrentWatermark at %s and watermark is: %s",
              System.currentTimeMillis(), sdf.format(currentTimestamp)));
          return new Watermark(currentTimestamp);
        }
    
        @Override
        public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
          long timestamp = ((DateTime) element.get("timestamp")).getMillis();
          Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          System.out.println("invoke extractTimestamp: " + sdf.format(timestamp));
          currentTimestamp = timestamp;
          return timestamp;
        }
      }
    
      /**
       * AssignerWithPunctuatedWatermarks demo.
       */
      public static class CustomAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<JSONObject> {
    
        private long currentTimestamp;
    
        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp) {
          Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          System.out.println(String.format("invoke getCurrentWatermark at %s and watermark is: %s",
              System.currentTimeMillis(), sdf.format(currentTimestamp)));
          return new Watermark(currentTimestamp);
        }
    
        @Override
        public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
          long timestamp = ((DateTime) element.get("timestamp")).getMillis();
          Format sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss");
          System.out.println("invoke extractTimestamp: " + sdf.format(timestamp));
          currentTimestamp = timestamp;
          return timestamp;
        }
      }
    }

    复制

    先分别看下AssignerWithPriodicWatermarksAssignerWithPunctuatedWatermarks部分吧:

      /**
       * AssignerWithPeriodicWatermarks demo
       */
      public static class CustomAssignerWithPeriodicWatermarks implements AssignerWithPeriodicWatermarks<JSONObject> {
    
        private long currentTimestamp;
    
        @Nullable
        @Override
        public Watermark getCurrentWatermark() {
          // 省略一些逻辑
          return new Watermark(currentTimestamp);
        }
    
        @Override
        public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
          // 省略一些逻辑
          return timestamp;
        }
      }
    
      /**
       * AssignerWithPunctuatedWatermarks demo.
       */
      public static class CustomAssignerWithPunctuatedWatermarks implements AssignerWithPunctuatedWatermarks<JSONObject> {
    
        private long currentTimestamp;
    
        @Nullable
        @Override
        public Watermark checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp) {
          // 省略一些逻辑
          return new Watermark(currentTimestamp);
        }
    
        @Override
        public long extractTimestamp(JSONObject element, long previousElementTimestamp) {
          // 省略一些逻辑
          return timestamp;
        }
      }

    复制

    为了突出重点,我删掉了具体实现。可以看到这两个类都有一个extractTimestamp方法,这个方法每个Event都会调用,作用就是给Event赋一个Event Time。另外一个方法稍微有点差异,AssignerWithPeriodicWatermarks的方法叫getCurrentWatermark(),而AssignerWithPunctuatedWatermarks的方法是checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp),它们的主要区别是方法的调用机制:

    • getCurrentWatermark()没有参数,它是框架根据用户设置的固定时间周期性的调用。这个固定的时间可以通过以下方式设置:

      StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
      
      ExecutionConfig executionConfig = env.getConfig();
      executionConfig.setAutoWatermarkInterval(500);

      复制

    上面的代码设置每500毫秒调用一次getCurrentWatermark(),即每500毫秒产生一个Watermark。不显式的设置的话,默认值是0,但实际效果是每200ms调用一次。

    • checkAndGetNextWatermark(JSONObject lastElement, long extractedTimestamp)有两个参数:一个是event,一个是extractTimestamp方法返回的时间戳。这个方法被调用的时间点是:每个事件来了先调用extractTimestamp,然后马上调用checkAndGetNextWatermark。在checkAndGetNextWatermark中你可以通过返回值控制是否产生新的Watermark,如果你不想返回新的Watermark,可以返回null或者一个小于等于上一个Watermark的时间戳,这样就相当于本次不返回Watermark或者返回的Watermark不是递增的被丢弃了,继续使用原来的Watermark。因为Watermark不能为null,且必须单调递增。

    AssignerWithPriodicWatermarksAssignerWithPunctuatedWatermarks的区别就这些,最佳实践的话我个人觉得优先考虑AssignerWithPriodicWatermarks,如果不能满足需求,再考虑AssignerWithPunctuatedWatermarks。一方面是前者简单一些,另一方面是一般没有必要每个事件就计算一个Watermark,这样会增加不是很有必要的计算量。

    然后我们把最上面完整代码里面的Watermark产生器设置为AssignerWithPriodicWatermarks(打开注释的assignTimestampsAndWatermarks(new CustomAssignerWithPeriodicWatermarks())),执行一下,看下效果:

    event in source:
    {"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:00
    {"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
    invoke getCurrentWatermark at 1590404165438 and watermark is: 2020-05-24 12:00:00
    invoke extractTimestamp: 2020-05-24 12:00:01
    {"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:04
    {"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:05
    {"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:07
    {"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:03
    {"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:06
    {"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:08
    {"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:09
    invoke getCurrentWatermark at 1590404165641 and watermark is: 2020-05-24 12:00:09
    
    window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}
    event1
    event2
    event4
    event3
    Total:4
    invoke getCurrentWatermark at 1590404165843 and watermark is: 2020-05-24 12:00:09
    invoke getCurrentWatermark at 1590404166051 and watermark is: 2020-05-24 12:00:09
    invoke getCurrentWatermark at 1590404166256 and watermark is: 2020-05-24 12:00:09
    invoke getCurrentWatermark at 1590404166459 and watermark is: 2020-05-24 12:00:09
    invoke getCurrentWatermark at 1590404166665 and watermark is: 2020-05-24 12:00:09
    invoke getCurrentWatermark at 1590404166871 and watermark is: 2020-05-24 12:00:09
    invoke getCurrentWatermark at 1590404167075 and watermark is: 2020-05-24 12:00:09
    invoke getCurrentWatermark at 1590404167279 and watermark is: 2020-05-24 12:00:09
    invoke getCurrentWatermark at 1590404167461 and watermark is: 2020-05-24 12:00:09
    
    window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}
    event5
    event7
    event6
    event8
    event9
    Total:5
    
    Process finished with exit code 0

    复制

    可以看到每个事件都会调用extractTimestamp,基本是200ms调用一次getCurrentWatermark,而且在Source的数据全部发送完之后,因为我加了Sleep,所以还在调用getCurrentWatermark,这就是上面说的它是固定周期调用的,而不是事件驱动调用。

    然后改为AssignerWithPunctuatedWatermarks(打开注释的assignTimestampsAndWatermarks(new CustomAssignerWithPunctuatedWatermarks())),运行一下,输出如下:

    event in source:
    {"id":"event1","timestamp":"2020-05-24T12:00:00.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:00
    invoke getCurrentWatermark at 1590404475754 and watermark is: 2020-05-24 12:00:00
    {"id":"event2","timestamp":"2020-05-24T12:00:01.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:01
    invoke getCurrentWatermark at 1590404475756 and watermark is: 2020-05-24 12:00:01
    {"id":"event4","timestamp":"2020-05-24T12:00:04.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:04
    invoke getCurrentWatermark at 1590404475758 and watermark is: 2020-05-24 12:00:04
    {"id":"event5","timestamp":"2020-05-24T12:00:05.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:05
    invoke getCurrentWatermark at 1590404475760 and watermark is: 2020-05-24 12:00:05
    {"id":"event7","timestamp":"2020-05-24T12:00:07.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:07
    invoke getCurrentWatermark at 1590404475763 and watermark is: 2020-05-24 12:00:07
    {"id":"event3","timestamp":"2020-05-24T12:00:03.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:03
    invoke getCurrentWatermark at 1590404475765 and watermark is: 2020-05-24 12:00:03
    {"id":"event6","timestamp":"2020-05-24T12:00:06.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:06
    invoke getCurrentWatermark at 1590404475768 and watermark is: 2020-05-24 12:00:06
    {"id":"event8","timestamp":"2020-05-24T12:00:08.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:08
    invoke getCurrentWatermark at 1590404475770 and watermark is: 2020-05-24 12:00:08
    {"id":"event9","timestamp":"2020-05-24T12:00:09.000+08:00"}
    invoke extractTimestamp: 2020-05-24 12:00:09
    invoke getCurrentWatermark at 1590404475772 and watermark is: 2020-05-24 12:00:09
    
    window{2020-05-24 12:00:00 - 2020-05-24 12:00:05}
    event1
    event2
    event4
    Total:3
    
    window{2020-05-24 12:00:05 - 2020-05-24 12:00:10}
    event5
    event7
    event6
    event8
    event9
    Total:5
    
    Process finished with exit code 0

    复制

    可以看到,getCurrentWatermark是事件驱动调用的,每个事件来了先调用extractTimestamp,然后紧接着就调用checkAndGetNextWatermark。我们这里的实现是每个Event都产生一个新的Watermark。

    迟到数据

    从上面的部分看到event3因为迟到被默默的丢掉了,现实中数据是重要资产,肯定是不能随便丢弃的。Flink提供了两种解决方案:

    1. 允许一定的延迟。这个延迟可以在两个地方设置:第一种是可以在上面的AssignerWithXXXWatermarks方法里面给计算出的时间戳减去一个时间,这个时间就是你允许延迟的时间。第二种就是在时间窗口那里可以通过allowedLateness来设置一个允许的延迟时间,之前流处理随谈一文中已经介绍过了,这里就不赘述了。但允许一定延迟的方式只能治标,不能治本。我们只能根据实际情况允许一定限度的延迟,但总归是有个限度的,原因主要有两个:1)延迟太高会丧失实时性,如果你的场景对实时性要求比较高,那就无法设置太大的延迟。2)延迟实际是延长了窗口的生命周期,所以资源消耗会增加。
    2. 在Window那里通过sideOutputLateData将迟到的数据以流的形式旁路出去。之前流处理随谈一文中也已经介绍过了。这个是治本的手段,它没有时间的限制,如果有迟到数据,就会发送到这个单独的流里面去,然后可以为这个流单独设置处理方式。

    这两种方式有利有弊,各有不同的适用场景,根据自己的业务灵活选择即可。

    最后总结一下:只要你理解Watermark的本质,其实它就是一个很简单的东西,使用起来也很简单。只是框架层面要实现会有一定难度,但这个对使用者而言是不可见的。

     本文遵循 CC BY-NC-SA 4.0版权协议 进行许可,转载请附上原文链接和本声明。

    参考:https://niyanchun.com/watermark-details.html

    展开全文
  • Flink实操 : Watermark

    2021-03-04 21:52:40
    Timestamp 分配和 Watermark 生成2.4. Watermark 传播2.5. ProcessFunction2.6. Watermark 处理2.7.多流的Watermark三 .代码实例 一 .前言 二 .概念 2.1. Flink 时间语义 在不同的应用场景中时间语义是各不相同的,...

    一 .前言

    二 .概念

    2.1. Flink 时间语义

    在不同的应用场景中时间语义是各不相同的,Flink 作为一个先进的分布式流处理引擎,它本身支持不同的时间语义。
    其核心是 Processing TimeEvent Time(Row Time)
    这两类时间主要的不同点如下表所示:

    在这里插入图片描述

    Processing Time 是来模拟我们真实世界的时间,其实就算是处理数据的节点本地时间也不一定就是完完全全的我们真实世界的时间,所以说它是用来模拟真实世界的时间。

    Event Time数据世界的时间,就是我们要处理的数据流世界里面的时间。

    • 关于他们的获取方式:
      Process Time 是通过直接去调用本地机器的时间,
      Event Time 则是根据每一条处理记录所携带的时间戳来判定。

    这两种时间在 Flink 内部的处理以及还是用户的实际使用方面,难易程度都是不同的。

    相对而言的 Processing Time 处理起来更加的简单,而 Event Time 要更麻烦一些。
    而在使用 Processing Time 的时候,我们得到的处理结果(或者说流处理应用的内部状态)是不确定的
    而因为在 Flink 内部对 Event Time 做了各种保障,使用 Event Time 的情况下,无论重放数据多少次,都能得到一个相对确定可重现的结果。

    • 因此在判断应该使用 Processing Time 还是 Event Time 的时候,可以遵循一个原则:
    1. 当你的应用遇到某些问题要从上一个 checkpoint 或者 savepoint 进行重放,是不是希望结果完全相同。
    2. 如果希望结果完全相同,就只能用 Event Time;
    3. 如果接受结果不同,则可以用 Processing Time。

    Processing Time 的一个常见的用途是,我们要根据现实时间来统计整个系统的吞吐,比如要计算现实时间一个小时处理了多少条数据,这种情况只能使用 Processing Time。

    2.2. 时间的特性

    时间的一个重要特性是:时间只能递增,不会来回穿越。

    在使用时间的时候我们要充分利用这个特性。
    假设我们有这么一些记录,然后我们来分别看一下 Processing Time 还有 Event Time 对于时间的处理。

    • 对于 Processing Time,因为我们是使用的是本地节点的时间(假设这个节点的时钟同步没有问题),我们每一次取到的 Processing Time 肯定都是递增的,递增就代表着有序,所以说我们相当于拿到的是一个有序的数据流。

    • 而在用 **Event Time 的时候因为时间是绑定在每一条的记录上的,由于网络延迟、程序内部逻辑、或者其他一些分布式系统的原因,数据的时间可能会存在一定程度的乱序。**在 Event Time 场景下,我们把每一个记录所包含的时间称作 Record Timestamp。
      如果 Record Timestamp 所得到的时间序列存在乱序,我们就需要去处理这种情况。

    如果单条数据之间是乱序,我们就考虑对于整个序列进行更大程度的离散化

    简单地讲,就是把数据按照一定的条数组成一些小批次,但这里的小批次并不是攒够多少条就要去处理,而是为了对他们进行时间上的划分。经过这种更高层次的离散化之后,我们会发现最右边方框里的时间就是一定会小于中间方框里的时间中间框里的时间也一定会小于最左边方框里的时间
    在这里插入图片描述
    这个时候我们在整个时间序列里插入一些类似于标志位的一些特殊的处理数据,这些特殊的处理数据叫做 watermark

    一个 watermark 本质上就代表了这个 watermark 所包含的 timestamp 数值,
    表示以后到来的数据已经再也没有小于或等于这个时间的了

    2.3. Timestamp 分配和 Watermark 生成

    Flink 支持两种 watermark 生成方式。

    • 第一种是在 SourceFunction 中产生,相当于把整个的 timestamp 分配和 watermark 生成的逻辑放在流处理应用的源头
      我们可以在 SourceFunction 里面通过这两个方法产生 watermark.
      通过 collectWithTimestamp 方法发送一条数据,其中第一个参数就是我们要发送的数据,第二个参数就是这个数据所对应的时间戳;也可以调用 emitWatermark 方法去产生一条 watermark,表示接下来不会再有时间戳小于等于这个数值记录。

    • 有时候我们不想在 SourceFunction 里生成 timestamp 或者 watermark,或者说使用的 SourceFunction 本身不支持,我们还可以在使用 DataStream API 的时候指定,调用的 DataStream.assignTimestampsAndWatermarks 这个方法,能够接收不同的 timestamp 和 watermark 的生成器。

    总体上而言生成器可以分为两类:第一类是定期生成器; 第二类是根据一些在流处理数据流中遇到的一些特殊记录生成的。

    在这里插入图片描述
    两者的区别主要有三个方面,

    • 首先定期生成现实时间驱动的,这里的“定期生成”主要是指 watermark(因为 timestamp 是每一条数据都需要有的),即定期会调用生成逻辑去产生一个 watermark

    • 而根据特殊记录生成是数据驱动的,即是否生成 watermark 不是由现实时间来决定,而是当看到一些特殊的记录就表示接下来可能不会有符合条件的数据再发过来了,这个时候相当于每一次分配 Timestamp 之后都会调用用户实现的 watermark 生成方法,用户需要在生成方法中去实现 watermark 的生成逻辑。

    • 大家要注意的是就是我们在分配 timestamp 和生成 watermark 的过程,虽然在 SourceFunction 和 DataStream 中都可以指定,但是还是建议生成的工作越靠近 DataSource 越好。这样会方便让程序逻辑里面更多的 operator 去判断某些数据是否乱序。

    Flink 内部提供了很好的机制去保证这些 timestamp 和 watermark 被正确地传递到下游的节点。

    2.4. Watermark 传播

    在这里插入图片描述
    具体的传播策略基本上遵循这三点。

    • 首先,watermark 会以广播的形式在算子之间进行传播。比如说上游的算子,它连接了三个下游的任务,它会把自己当前的收到的 watermark 以广播的形式传到下游

    • 第二,如果在程序里面收到了一个 Long.MAX_VALUE 这个数值的 watermark,就表示对应的那一条流的一个部分不会再有数据发过来了,它相当于就是一个终止的一个标志。

    • 第三,对于单流而言,这个策略比较好理解,而对于有多个输入的算子,watermark 的计算就有讲究了,一个原则是:单输入取其大,多输入取小。

    举个例子,假设这边蓝色的块代表一个算子的一个任务,然后它有三个输入,分别是 W1、W2、W3,这三个输入可以理解成任何的输入,这三个输入可能是属于同一个流,也可能是属于不同的流。然后在计算 watermark 的时候,对于单个输入而言是取他们的最大值,因为我们都知道 watermark 应该遵循一个单调递增的一个原则。对于多输入,它要统计整个算子任务的 watermark 时,就会取这三个计算出来的 watermark 的最小值。即一个多个输入的任务,它的 watermark 受制于最慢的那条输入流。这一点类似于木桶效应,整个木桶中装的水会就是受制于最矮的那块板。

    在这里插入图片描述

    watermark 在传播的时候有一个特点是,它的传播是幂等的。
    多次收到相同的 watermark,甚至收到之前的 watermark 都不会对最后的数值产生影响,因为对于单个输入永远是取最大的,而对于整个任务永远是取一个最小的

    同时我们可以注意到这种设计其实有一个局限,具体体现在它没有区分你这个输入是一条流多个 partition 还是来自于不同的逻辑上的流的 JOIN。

    对于同一个流的不同 partition,我们对他做这种强制的时钟同步是没有问题的,因为一开始就是把一条流拆散成不同的部分,但每一个部分之间共享相同的时钟。

    但是如果算子的任务是在做类似于 JOIN 操作,那么要求你两个输入的时钟强制同步其实没有什么道理的,因为完全有可能是把一条离现在时间很近的数据流和一个离当前时间很远的数据流进行 JOIN,这个时候对于快的那条流,因为它要等慢的那条流,所以说它可能就要在状态中去缓存非常多的数据,这对于整个集群来说是一个很大的性能开销

    2.5. ProcessFunction

    在正式介绍 watermark 的处理之前,先简单介绍 ProcessFunction,因为 watermark 在任务里的处理逻辑分为内部逻辑外部逻辑

    • 外部逻辑其实就是通过 ProcessFunction 来体现的,如果你需要使用 Flink 提供的时间相关的 API 的话就只能写在 ProcessFunction 里。

    ProcessFunction 和时间相关的功能主要有三点:

    1. 第一点就是根据你当前系统使用的时间语义不同,你可以去获取当前你正在处理这条记录的 Record Timestamp,或者当前的 Processing Time

    2. 第二点就是它可以获取当前算子的时间,可以把它理解成当前的 watermark

    3. 第三点就是为了在 ProcessFunction 中去实现一些相对复杂的功能,允许注册一些 timer(定时器)。

    比如说在 watermark 达到某一个时间点的时候就触发定时器,所有的这些回调逻辑也都是由用户来提供,涉及到如下三个方法,registerEventTimeTimer、registerProcessingTimeTimer 和 onTimer。在 onTimer 方法中就需要去实现自己的回调逻辑,当条件满足时回调逻辑就会被触发。

    一个简单的应用是,我们在做一些时间相关的处理的时候,可能需要缓存一部分数据,但这些数据不能一直去缓存下去,所以需要有一些过期的机制,我们可以通过 timer 去设定这么一个时间,指定某一些数据可能在将来的某一个时间点过期,从而把它从状态里删除掉。所有的这些和时间相关的逻辑在 Flink 内部都是由自己的 Time Service(时间服务)完成的。

    2.6. Watermark 处理

    在这里插入图片描述

    2.7.多流的Watermark

    在实际的计算中,往往会出现一个作业中会处理多个source的数据, 对source的数据进行groupBy分组, 那么来自不同source的相同的key会shuffle到同一个处理节点. 并且携带各自的Watermark . 
    Flink内部要保证Watermark的**单调递增** , 多个source的Watermark汇聚到一起是不可能单调递增的. 
    Flink内部实现每一个边上只能有一个递增的Watermark, 当出现多个流携带EventTime汇聚到一起(groupBy或者Union). **Flink会选择所有输入流中EventTime中最小的一个向下游流出**. 从而保证Watermark的单调递增和数据的完整性.
    

    在这里插入图片描述

    三 .代码实例

    Watermark 就是一个 时间戳 ,Flink可以给数据流添加Watermark,可以理解为:收到一条消息后,额外给这个消
    息添加了一个时间字段,这就是添加Watermark。

    • Watermark并不会影响原有Eventtime
    • 当数据流添加Watermark后,会按照Watermark时间来触发窗口计算
    • 一般会设置Watermark时间,比Eventtime小几秒钟
    • 当接收到的 Watermark时间 >= 窗口的endTime ,则触发计算

    在这里插入图片描述

    3.1. 直接在数据源上使用WaterMark [重点]

    
    package com.boyi.watermark
    
    import java.time.Duration
    import java.util.Properties
    
    import org.apache.flink.api.common.eventtime. WatermarkStrategy
    import org.apache.flink.api.common.serialization.SimpleStringSchema
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer
    import org.apache.kafka.clients.CommonClientConfigs
    
    
    //  直接在数据源上使用WaterMark
    
    
    
    //当使用 Apache Kafka 连接器作为数据源时,每个 Kafka 分区可能有一个简单的事件时间模式(递增的时间戳或有界无序)。
    // 然而,当使用 Kafka 数据源时,多个分区常常并行使用,
    // 因此交错来自各个分区的事件数据就会破坏每个分区的事件时间模式(这是 Kafka 消费客户端所固有的)。
    //
    // 在这种情况下,你可以使用 Flink 中可识别 Kafka 分区的 watermark 生成机制。
    // 使用此特性,将在 Kafka 消费端内部针对每个 Kafka 分区生成 watermark,
    // 并且不同分区 watermark 的合并方式与在数据流 shuffle 时的合并方式相同。
    //
    //例如,
    // 如果每个 Kafka 分区中的事件时间戳严格递增,则使用时间戳单调递增按分区生成的 watermark 将生成完美的全局 watermark。
    //
    // 注意,我们在示例中未使用 TimestampAssigner,而是使用了 Kafka 记录自身的时间戳。
    //
    object KafkaDataSourceWaterMarkDemo {
    
      def main(args : Array[String]) : Unit = {
        // 1. 创建流处理运行环境
        val env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val kafkaCluster = "192.168.101.30:9092"
        val kafkaTopic = "test"
        // 2. 创建Kafka数据流
        val props = new Properties()
        props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,kafkaCluster)
    
        val kafkaSource = new FlinkKafkaConsumer(kafkaTopic,new SimpleStringSchema(),props)
    
        // 3. 设置watermark
        kafkaSource.assignTimestampsAndWatermarks(WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(20)))
    
        // 4. 添加数据源
        val watermarkDataStream  = env.addSource(kafkaSource)
    
        // 5. 处理数据
        watermarkDataStream.flatMap(_.split(" "))
          .map(x => (x,1))
          .keyBy(_._1)
          .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
          .reduce((x1,x2) =>(x1._1, x1._2+x1._2))
          .print()
    
        // 6. 执行
        env.execute("KafkaWaterMarkDemo")
    
      }
      
    }
    
    
    

    3.2. 在非数据源的操作之后使用 WaterMark [重点]

    • 示例

    编写代码, 计算5秒内,用户的订单总额

    订单数据(订单ID——UUID、用户ID、时间戳、订单金额),要求添加水印来解决网络延迟问题。

    • 步骤
    1. 创建流处理运行环境
    2. 设置处理时间为EventTime
    3. 创建一个订单样例类Order,包含四个字段(订单ID、用户ID、订单金额、时间戳)
    4. 创建一个自定义数据源
      • 随机生成订单ID(UUID)
      • 随机生成用户ID(0-2)
      • 随机生成订单金额(0-100)
      • 时间戳为当前系统时间
      • 每隔1秒生成一个订单
    5. 添加水印
      • 允许延迟2秒
      • 在获取水印方法中,打印水印时间、事件时间和当前系统时间
    6. 按照用户进行分流
    7. 设置5秒的时间窗口
    8. 进行聚合计算
    9. 打印结果数据
    10. 启动执行流处理
    
    package com.boyi.watermark
    
    import java.time.Duration
    import java.util.concurrent.TimeUnit
    import java.util.{Date, UUID}
    
    import org.apache.flink.api.common.eventtime.{SerializableTimestampAssigner, WatermarkStrategy}
    import org.apache.flink.api.scala.createTypeInformation
    import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.assigners.TumblingProcessingTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    
    import scala.util.Random
    
    // 在非数据源的操作之后使用 WaterMark
    
    object WaterMarkDemo {
      // 创建一个订单样例类`Order`,包含四个字段(订单ID、用户ID、订单金额、时间戳)
      case class Order (orderId: String, userId: Int, money: Long, timestamp: Long)
    
      def main(args : Array[String]) : Unit = {
        // 1. 创建流处理运行环境
        val env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        // 2. 创建一个自定义数据源
        val orderDataStream = env.addSource(new RichSourceFunction[Order] {
          var isRunning = true
          override def run(sourceContext: SourceFunction.SourceContext[Order]): Unit = {
            while (isRunning){
              //   - 随机生成订单ID(UUID)
              // - 随机生成用户ID(0-2)
              // - 随机生成订单金额(0-100)
              // - 时间戳为当前系统时间
              // - 每隔1秒生成一个订单
              val order = Order(UUID.randomUUID().toString,Random.nextInt(3),Random.nextInt(101),new Date().getTime)
    
              sourceContext.collect(order)
              TimeUnit.SECONDS.sleep(2)
            }
          }
    
          override def cancel(): Unit = {
            isRunning = false
          }
        })
    
    
        // 3. 添加Watermark
        val watermarkDataStream =  orderDataStream.assignTimestampsAndWatermarks(
          WatermarkStrategy.forBoundedOutOfOrderness[Order](Duration.ofSeconds(20))
          .withTimestampAssigner(new SerializableTimestampAssigner[Order] {
            override def extractTimestamp(element: Order, recordTimestamp: Long): Long = {
              element.timestamp
            }
          })
          .withIdleness(Duration.ofMinutes(1))
        )
    
        // 6. 按照用户进行分流
        // 7. 设置5秒的时间窗口
        // 8. 进行聚合计算
        // 9. 打印结果数据
        // 10. 启动执行流处理
        watermarkDataStream.keyBy(_.userId)
          .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
          .reduce {
            (order1, order2) =>
              Order(order2.orderId, order2.userId, order1.money + order2.money, 0)
          }
          .print()
        env.execute("WarkMarkDemoJob")
    
      }
    }
    
    

    3.3. 处理空闲数据源

    
    
    /**
     *
     *
     * 如果数据源中的某一个分区/分片在一段时间内未发送事件数据,
     * 则意味着 WatermarkGenerator 也不会获得任何新数据去生成 watermark。
     *
     * 我们称这类数据源为空闲输入或空闲源。
     *
     * 在这种情况下,当某些其他分区仍然发送事件数据的时候就会出现问题。
     * 由于下游算子 watermark 的计算方式是取所有不同的上游并行数据源 watermark 的最小值,则其 watermark 将不会发生变化。
     *
     * 为了解决这个问题,
     * 你可以使用 WatermarkStrategy 来检测空闲输入并将其标记为空闲状态。
     *
     * WatermarkStrategy 为此提供了一个工具接口
     * WatermarkStrategy
     * .forBoundedOutOfOrderness[(Long, String)](Duration.ofSeconds(20))
     * .withIdleness(Duration.ofMinutes(1))
     *
     */
    object FreeDataSourceWaterMark {
    
      def main(args : Array[String]) : Unit = {
        // 1. 创建流处理运行环境
        val env : StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
    
        val kafkaCluster = "192.168.101.30:9092"
        val kafkaTopic = "test"
        // 2. 创建Kafka数据流
        val props = new Properties()
        props.setProperty(CommonClientConfigs.BOOTSTRAP_SERVERS_CONFIG,kafkaCluster)
    
        val kafkaSource = new FlinkKafkaConsumer(kafkaTopic,new SimpleStringSchema(),props)
    
        // 3. 设置watermark
        kafkaSource.assignTimestampsAndWatermarks(
          WatermarkStrategy.forBoundedOutOfOrderness[String](Duration.ofSeconds(20))
            .withIdleness(Duration.ofMinutes(1))
        )
        
        // 4. 添加数据源
        val watermarkDataStream  = env.addSource(kafkaSource)
    
        // 5. 处理数据
        watermarkDataStream.flatMap(_.split(" "))
          .map(x => (x,1))
          .keyBy(_._1)
          .window(TumblingProcessingTimeWindows.of(Time.seconds(10)))
          .reduce((x1,x2) =>(x1._1, x1._2+x1._2))
          .print()
    
        // 6. 执行
        env.execute("KafkaWaterMarkDemo")
    
      }
    }
    
    

    3.4.单调递增时间戳分配器

    周期性 watermark 生成方式的一个最简单特例就是你给定的数据源中数据的时间戳升序出现。在这种情况下,当前时间戳就可以充当 watermark,因为后续到达数据的时间戳不会比当前的小。

    注意:在 Flink 应用程序中,如果是并行数据源,则只要求并行数据源中的每个单分区数据源任务时间戳递增。例如,设置每一个并行数据源实例都只读取一个 Kafka 分区,则时间戳只需在每个 Kafka 分区内递增即可。Flink 的 watermark 合并机制会在并行数据流进行分发(shuffle)、联合(union)、连接(connect)或合并(merge)时生成正确的 watermark

    WatermarkStrategy.forMonotonousTimestamps()

    3.5. 数据之间存在最大固定延迟的时间戳分配器

    另一个周期性 watermark 生成的典型例子是,watermark 滞后于数据流中最大(事件时间)时间戳一个固定的时间量。

    该示例可以覆盖的场景是你预先知道数据流中的数据可能遇到的最大延迟,例如,在测试场景下创建了一个自定义数据源,并且这个数据源的产生的数据的时间戳在一个固定范围之内。Flink 针对上述场景提供了 boundedOutfordernessWatermarks 生成器,该生成器将 maxOutOfOrderness 作为参数,该参数代表在计算给定窗口的结果时,允许元素被忽略计算之前延迟到达的最长时间。其中延迟时长就等于 t_w - t ,其中 t 代表元素的(事件时间)时间戳,t_w 代表前一个 watermark 对应的(事件时间)时间戳。如果 lateness > 0,则认为该元素迟到了,并且在计算相应窗口的结果时默认会被忽略。有关使用延迟元素的详细内容,请参阅有关允许延迟的文档。

    WatermarkStrategy
    .forBoundedOutOfOrderness(Duration.ofSeconds(10))

    3.6. Flink1.12之前版本 [过时]

    参考代码

    
    import java.util.UUID
    import java.util.concurrent.TimeUnit
    import org.apache.commons.lang.time.FastDateFormat
    import org.apache.flink.api.scala._
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.time.Time
    
    import scala.util.Random
    
    object WaterMarkDemo {
    
      // 3. 创建一个订单样例类`Order`,包含四个字段(订单ID、用户ID、订单金额、时间戳)
      case class Order(orderId: String, userId: Int, money: Long, timestamp: Long)
    
      def main(args: Array[String]): Unit = {
        // 1. 创建流处理运行环境
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        //   2. 设置处理时间为`EventTime`
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        // 4. 创建一个自定义数据源
        val orderDataStream: DataStream[Order] = env.addSource(new RichSourceFunction[Order] {
          var isRunning = true
    
          override def run(ctx: SourceFunction.SourceContext[Order]): Unit = {
            while (isRunning) {
              //   - 随机生成订单ID(UUID)
              // - 随机生成用户ID(0-2)
              // - 随机生成订单金额(0-100)
              // - 时间戳为当前系统时间
              // - 每隔1秒生成一个订单
              val order = Order(UUID.randomUUID().toString, Random.nextInt(3), Random.nextInt(101), new java.util.Date().getTime)
              ctx.collect(order)
              TimeUnit.SECONDS.sleep(1)
            }
          }
    
          override def cancel(): Unit = isRunning = false
        })
        // 5. 添加水印
        val watermarkDataStream = orderDataStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Order] {
          var currentTimestamp = 0L
          val delayTime = 2000
    
          override def getCurrentWatermark: Watermark = {
            //   - 允许延迟2秒
            // - 在获取水印方法中,打印水印时间、当前事件时间和当前系统时间
            val watermark = new Watermark(currentTimestamp - delayTime)
            val dateFormat = FastDateFormat.getInstance("HH:mm:ss")
    
            println(s"当前水印时间:${dateFormat.format(watermark.getTimestamp)}, 当前事件时间: ${dateFormat.format(currentTimestamp)}, 当前系统时间: ${dateFormat.format(System.currentTimeMillis())}")
            watermark
          }
    
          override def extractTimestamp(element: Order, previousElementTimestamp: Long): Long = {
            val timestamp = element.timestamp
            currentTimestamp = Math.max(currentTimestamp, timestamp)
            currentTimestamp
          }
        })
        // 6. 按照用户进行分流
        // 7. 设置5秒的时间窗口
        // 8. 进行聚合计算
        // 9. 打印结果数据
        // 10. 启动执行流处理
        watermarkDataStream.keyBy(_.userId)
          .timeWindow(Time.seconds(5))
          .reduce {
            (order1, order2) =>
              Order(order2.orderId, order2.userId, order1.money + order2.money, 0)
          }
          .print()
        env.execute("WarkMarkDemoJob")
      }
    }
    

    参考:

    官方文档:
    https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/event_timestamps_watermarks.html
    https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/dev/event_time.html
    https://ververica.cn/developers/advanced-tutorial-2-time-depth-analysis/

    展开全文
  • 对于Flink来说,Watermark是个很难绕过去的概念。本文将从整体的思路上来说,运用感性直觉的思考来帮大家梳理Watermark概念。 2.问题 关于Watermark,很容易产生几个问题 Flink 流处理应用中,常见的处理需求/应对...

    本文转自:https://www.cnblogs.com/rossiXYZ/p/12286407.html

    1. 摘要

    对于Flink来说,Watermark是个很难绕过去的概念。本文将从整体的思路上来说,运用感性直觉的思考来帮大家梳理Watermark概念。

    2.问题

    关于Watermark,很容易产生几个问题

    • Flink 流处理应用中,常见的处理需求/应对方案是什么?
    • Watermark究竟应该翻译成水印还是水位线?
    • Watermark本质是什么?
    • Watermark是如何解决问题?

    下面我们就来简要解答这些问题以给大家一个大致概念,在后文中,会再深入描述。

    问题1. Flink 流处理应用中常见的需求/方案是什么
    聚合类的处理 Flink可以每来一个消息就处理一次,但是有时我们需要做一些聚合类的处理,例如:在过去的1分钟内有多少用户点击了我们的网页。所以Flink引入了窗口概念。

    窗口 窗口的作用为了周期性的获取数据。就是把传入的原始数据流切分成多个buckets,所有计算都在单一的buckets中进行。窗口(window)就是从 Streaming 到 Batch 的一个桥梁。

    带来的问题:聚合类处理带来了新的问题,比如乱序/延迟。其解决方案就是 Watermark / allowLateNess / sideOutPut 这一组合拳。

    Watermark 的作用是防止 数据乱序 / 指定时间内获取不到全部数据。

    allowLateNess 是将窗口关闭时间再延迟一段时间。

    **sideOutPut ** 是最后兜底操作,当指定窗口已经彻底关闭后,就会把所有过期延迟数据放到侧输出流,让用户决定如何处理。

    总结起来就是说

    Windows -----> Watermark -----> allowLateNess -----> sideOutPut 
        
    用Windows把流数据分块处理,用Watermark确定什么时候不再等待更早的数据/触发窗口进行计算,用allowLateNess 将窗口关闭时间再延迟一段时间。用sideOutPut 最后兜底把数据导出到其他地方。
    

    问题2. Watermark应该翻译成水位线
    我最初看的一篇文章中把Watermark翻译成“水印”。我当时比较晕。因为按说名字一定能够反应事物本质。但是我怎么也脑补不出这个”水印“的本质。

    继续看文章内容,越来越觉得这个应该翻译成“水位线”。于是查了查,确实英文有如下翻译:high-water mark 高水位线(海水或洪水所达到的最高水位)。

    后来逐渐看到其他文章中也有翻译成水位线,我才放心下来,终于不会出现第二个“套接字”这样神奇的翻译了。

    问题3. Watermark本质是什么
    Watermarks是基于已经收集的消息来估算是否还有消息未到达,本质上是一个时间戳。时间戳反映的是事件发生的时间,而不是事件处理的时间。

    这个从Flink的源码就能看出来,唯一有意义的成员变量就是 timestamp。

    public final class Watermark extends StreamElement {
      /*The watermark that signifies end-of-event-time. */
      public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
      /* The timestamp of the watermark in milliseconds. */
      private final long timestamp;
      /* Creates a new watermark with the given timestamp in milliseconds.*/
      public Watermarklong timestamp) {
        this.timestamp = timestamp;
      }
      /*Returns the timestamp associated with this {@link Watermark} in milliseconds.**/
      public long getTimestamp() {
        return timestamp;
      }
    }
    

    问题4. Watermark如何解决问题
    Watermark是一种告诉Flink一个消息延迟多少的方式。它定义了什么时候不再等待更早的数据。

    可以把Watermarks理解为一个水位线,这个Watermarks在不断的变化。Watermark实际上作为数据流的一部分随数据流流动。

    当Flink中的运算符接收到Watermarks时,它明白早于该时间的消息已经完全抵达计算引擎,即假设不会再有时间小于水位线的事件到达。

    这个假设是触发窗口计算的基础,只有水位线越过窗口对应的结束时间,窗口才会关闭和进行计算。

    3 背景概念

    流处理
    流处理,最本质的是在处理数据的时候,接受一条处理一条数据。

    批处理,则是累积数据到一定程度在处理。这是他们本质的区别。

    在设计上Flink认为数据是流式的,批处理只是流处理的特例。同时对数据分为有界数据和无界数据。

    • 有界数据对应批处理,API对应Dateset。
    • 无界数据对应流处理,API对应DataStream。

    乱序(out-of-order)
    什么是乱序呢?可以理解为数据到达的顺序和其实际产生时间的排序不一致。导致这的原因有很多,比如延迟,消息积压,重试等等。

    我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。

    比如:

    某数据源中的某些数据由于某种原因(如:网络原因,外部存储自身原因)会有5秒的延时,
    也就是在实际时间的第1秒产生的数据有可能在第5秒中产生的数据之后到来(比如到Window处理节点)。
    
    有1~10个事件。
    乱序到达的序列是:2,3,4,5,1,6,3,8,9,10,7
    

    4 Flink中的窗口概念

    窗口
    对于Flink,如果来一条消息计算一条,这样是可以的,但是这样计算是非常频繁而且消耗资源,如果想做一些统计这是不可能的。所以对于Spark和Flink都产生了窗口计算。

    比如 是因为我们想看到过去一分钟,过去半小时的访问数据,这时候我们就需要窗口。

    Window:Window是处理无界流的关键,Windows将流拆分为一个个有限大小的buckets,可以可以在每一个buckets中进行计算。

    start_time,end_time:当Window时时间窗口的时候,每个window都会有一个开始时间和结束时间(前开后闭),这个时间是系统时间。

    窗口生命周期
    简而言之,只要属于此窗口的第一个元素到达,就会创建一个窗口,当时间(事件或处理时间)超过其结束时间戳加上用户指定的允许延迟时,窗口将被完全删除。

    例如:

    使用基于事件时间的窗口策略,每5分钟创建一个不重叠(或翻滚)的窗口并允许延迟1分钟。
     
    假定目前是12:00。
    
    当具有落入该间隔的时间戳的第一个元素到达时,Flink将为12:00到12:05之间的间隔创建一个新窗口,当水位线(watermark)到12:06时间戳时将删除它。
    

    窗口有如下组件:

    Window Assigner:用来决定某个元素被分配到哪个/哪些窗口中去。
    Trigger:触发器。决定了一个窗口何时能够被计算或清除。触发策略可能类似于“当窗口中的元素数量大于4”时,或“当水位线通过窗口结束时”。
    Evictor:它可以在 触发器触发后 & 应用函数之前和/或之后 从窗口中删除元素。

    窗口还拥有函数,比如ProcessWindowFunction,ReduceFunction,AggregateFunction或FoldFunction。该函数将包含要应用于窗口内容的计算,而触发器指定窗口被认为准备好应用该函数的条件。

    Keyed vs Non-Keyed Windows
    在定义窗口之前,要指定的第一件事是流是否需要Keyed,使用keyBy(…)将无界流分成逻辑的keyed stream。 如果未调用keyBy(…),则表示流不是keyed stream。

    • 对于Keyed流,可以将传入事件的任何属性用作key。 拥有Keyed stream将允许窗口计算由多个任务并行执行,因为每个逻辑Keyed流可以独立于其余任务进行处理。 相同Key的所有元素将被发送到同一个任务。
    • 在Non-Keyed流的情况下,原始流将不会被分成多个逻辑流,并且所有窗口逻辑将由单个任务执行,即并行性为1。

    窗口分类
    窗口分类可以分成:翻滚窗口(Tumbling Window,无重叠),滚动窗口(Sliding Window,有重叠),和会话窗口,(Session Window,活动间隙)

    滚动窗口
    滚动窗口分配器将每个元素分配给固定窗口大小的窗口。滚动窗口大小固定的并且不重叠。例如,如果指定大小为5分钟的滚动窗口,则将执行当前窗口,并且每五分钟将启动一个新窗口。

    滑动窗口
    滑动窗口与滚动窗口的区别就是滑动窗口有重复的计算部分。

    滑动窗口分配器将每个元素分配给固定窗口大小的窗口。类似于滚动窗口分配器,窗口的大小由窗口大小参数配置。另外一个窗口滑动参数控制滑动窗口的启动频率(how frequently a sliding window is started)。因此,如果滑动大小小于窗口大小,滑动窗可以重叠。在这种情况下,元素被分配到多个窗口。

    例如,你可以使用窗口大小为10分钟的窗口,滑动大小为5分钟。这样,每5分钟会生成一个窗口,包含最后10分钟内到达的事件。

    会话窗口
    会话窗口分配器通过活动会话分组元素。与滚动窗口和滑动窗口相比,会话窗口不会重叠,也没有固定的开始和结束时间。相反,当会话窗口在一段时间内没有接收到元素时会关闭。

    例如,不活动的间隙时。会话窗口分配器配置会话间隙,定义所需的不活动时间长度(defines how long is the required period of inactivity)。当此时间段到期时,当前会话关闭,后续元素被分配到新的会话窗口。

    5 Flink中的时间概念

    Flink在流处理程序支持不同的时间概念。分别为Event Time/Processing Time/Ingestion Time,也就是事件时间、处理时间、提取时间。

    从时间序列角度来说,发生的先后顺序是:

    事件时间(Event Time)----> 提取时间(Ingestion Time)----> 处理时间(Processing Time)
    
    • Event Time 是事件在现实世界中发生的时间,它通常由事件中的时间戳描述。
    • Ingestion Time 是数据进入Apache Flink流处理系统的时间,也就是Flink读取数据源时间。
    • Processing Time 是数据流入到具体某个算子 (消息被计算处理) 时候相应的系统时间。也就是Flink程序处理该事件时当前系统时间。
      但是我们讲解时,会从后往前讲解,把最重要的Event Time放在最后。

    处理时间
    是数据流入到具体某个算子时候相应的系统时间。

    这个系统时间指的是执行相应操作的机器的系统时间。当一个流程序通过处理时间来运行时,所有基于时间的操作(如: 时间窗口)将使用各自操作所在的物理机的系统时间。

    ProcessingTime 有最好的性能和最低的延迟。但在分布式计算环境或者异步环境中,ProcessingTime具有不确定性,相同数据流多次运行有可能产生不同的计算结果。因为它容易受到从记录到达系统的速度(例如从消息队列)到记录在系统内的operator之间流动的速度的影响(停电,调度或其他)。

    提取时间
    IngestionTime是数据进入Apache Flink框架的时间,是在Source Operator中设置的。每个记录将源的当前时间作为时间戳,并且后续基于时间的操作(如时间窗口)引用该时间戳。

    提取时间在概念上位于事件时间和处理时间之间。与处理时间相比,它稍早一些。IngestionTime与ProcessingTime相比可以提供更可预测的结果,因为IngestionTime的时间戳比较稳定(在源处只记录一次),所以同一数据在流经不同窗口操作时将使用相同的时间戳,而对于ProcessingTime同一数据在流经不同窗口算子会有不同的处理时间戳。

    与事件时间相比,提取时间程序无法处理任何无序事件或后期数据,但程序不必指定如何生成水位线。

    在内部,提取时间与事件时间非常相似,但具有自动时间戳分配和自动水位线生成功能。

    事件时间
    事件时间就是事件在真实世界的发生时间,即每个事件在产生它的设备上发生的时间(当地时间)。比如一个点击事件的时间发生时间,是用户点击操作所在的手机或电脑的时间。

    在进入Apache Flink框架之前EventTime通常要嵌入到记录中,并且EventTime也可以从记录中提取出来。在实际的网上购物订单等业务场景中,大多会使用EventTime来进行数据计算。

    基于事件时间处理的强大之处在于即使在乱序事件,延迟事件,历史数据以及从备份或持久化日志中的重复数据也能获得正确的结果。对于事件时间,时间的进度取决于数据,而不是任何时钟。

    事件时间程序必须指定如何生成事件时间的Watermarks,这是表示事件时间进度的机制。

    现在假设我们正在创建一个排序的数据流。这意味着应用程序处理流中的乱序到达的事件,并生成同样事件但按时间戳(事件时间)排序的新数据流。

    比如:

    有1~10个事件。
    乱序到达的序列是:1,2,4,5,6,3,8,9,10,7
    经过按 事件时间 处理后的序列是:1,2,3,4,5,6,7,8,9,10
    

    为了处理事件时间,Flink需要知道事件的时间戳,这意味着流中的每条数据都需要分配其事件时间戳。这通常通过提取每条数据中的固定字段来完成时间戳的获取。

    设定时间特性
    Flink DataStream 程序的第一部分通常是设置基本时间特性。 该设置定义了数据流源的行为方式(例如:它们是否将分配时间戳),以及像 **KeyedStream.timeWindow(Time.seconds(30)) ** 这样的窗口操作应该使用上面哪种时间概念。

    比如:

    final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
    

    6 Watermark

    前文讲到了事件时间,这个真实发生的时间是我们业务在实时处理程序中非常关心的。在一个理想的情况下,事件时间处理将产生完全一致和确定的结果,无论事件何时到达或其排序。但是在现实中,消息不在是按照顺序发送,产生了乱序,这时候该怎么处理?

    Watermark是Apache Flink为了处理EventTime 窗口计算提出的一种机制,本质上也是一种时间戳。watermark是用于处理乱序事件或延迟数据的,这通常用watermark机制结合window来实现(Watermarks用来触发window窗口计算)。

    比如对于late element,我们不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。 可以把Watermark看作是一种告诉Flink一个消息延迟多少的方式。定义了什么时候不再等待更早的数据。

    1. 窗口触发条件
    上面谈到了对数据乱序问题的处理机制是watermark+window,那么window什么时候该被触发呢?
    基于Event Time的事件处理,Flink默认的事件触发条件为:

    对于out-of-order及正常的数据而言

    • watermark的时间戳 > = window endTime
    • 在 [window_start_time,window_end_time] 中有数据存在。
      对于late element太多的数据而言
    • Event Time > watermark的时间戳

    WaterMark相当于一个EndLine,一旦Watermarks大于了某个window的end_time,就意味着windows_end_time时间和WaterMark时间相同的窗口开始计算执行了。

    就是说,我们根据一定规则,计算出Watermarks,并且设置一些延迟,给迟到的数据一些机会,也就是说正常来讲,对于迟到的数据,我只等你一段时间,再不来就没有机会了。

    WaterMark时间可以用Flink系统现实时间,也可以用处理数据所携带的Event time。

    使用Flink系统现实时间,在并行和多线程中需要注意的问题较少,因为都是以现实时间为标准。

    如果使用处理数据所携带的Event time作为WaterMark时间,需要注意两点:

    • 因为数据到达并不是循序的,注意保存一个当前最大时间戳作为WaterMark时间
    • 并行同步问题

    2. WaterMark设定方法
    标点水位线(Punctuated Watermark)
    标点水位线(Punctuated Watermark)通过数据流中某些特殊标记事件来触发新水位线的生成。这种方式下窗口的触发与时间无关,而是决定于何时收到标记事件。

    在实际的生产中Punctuated方式在TPS很高的场景下会产生大量的Watermark在一定程度上对下游算子造成压力,所以只有在实时性要求非常高的场景才会选择Punctuated的方式进行Watermark的生成。

    定期水位线(Periodic Watermark)
    周期性的(允许一定时间间隔或者达到一定的记录条数)产生一个Watermark。水位线提升的时间间隔是由用户设置的,在两次水位线提升时隔内会有一部分消息流入,用户可以根据这部分数据来计算出新的水位线。

    在实际的生产中Periodic的方式必须结合时间和积累条数两个维度继续周期性产生Watermark,否则在极端情况下会有很大的延时。

    举个例子,最简单的水位线算法就是取目前为止最大的事件时间,然而这种方式比较暴力,对乱序事件的容忍程度比较低,容易出现大量迟到事件。

    3. 迟到事件
    虽说水位线表明着早于它的事件不应该再出现,但是上如上文所讲,接收到水位线以前的的消息是不可避免的,这就是所谓的迟到事件。实际上迟到事件是乱序事件的特例,和一般乱序事件不同的是它们的乱序程度超出了水位线的预计,导致窗口在它们到达之前已经关闭。

    迟到事件出现时窗口已经关闭并产出了计算结果,因此处理的方法有3种:

    • 重新激活已经关闭的窗口并重新计算以修正结果。
    • 将迟到事件收集起来另外处理。
    • 将迟到事件视为错误消息并丢弃。
      Flink 默认的处理方式是第3种直接丢弃,其他两种方式分别使用Side OutputAllowed Lateness

    Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。

    Allowed Lateness机制允许用户设置一个允许的最大迟到时长。Flink 会在窗口关闭后一直保存窗口的状态直至超过允许迟到时长,这期间的迟到事件不会被丢弃,而是默认会触发窗口重新计算。因为保存窗口状态需要额外内存,并且如果窗口计算使用了 ProcessWindowFunction API 还可能使得每个迟到事件触发一次窗口的全量计算,代价比较大,所以允许迟到时长不宜设得太长,迟到事件也不宜过多,否则应该考虑降低水位线提高的速度或者调整算法。

    这里总结机制为:

    • 窗口window 的作用是为了周期性的获取数据。
    • watermark的作用是防止数据出现乱序(经常),事件时间内获取不到指定的全部数据,而做的一种保险方法。
    • allowLateNess是将窗口关闭时间再延迟一段时间。
    • sideOutPut是最后兜底操作,所有过期延迟数据,指定窗口已经彻底关闭了,就会把数据放到侧输出流。

    4. 实例
    采用系统时间做Watermark
    我们将水位线设置为当前系统时间间-5秒。

    override def getCurrentWatermark(): Watermark = {       
    	new Watermark(System.currentTimeMillis - 5000) 
    }
    

    通常最好保持接收到的最大时间戳,并创建具有最大预期延迟的水位线,而不是从当前系统时间减去。

    采用Event Time做watermark
    例如基于Event Time的数据,自身都包含一个类型为timestamp的字段,假设叫做rowtime,例如1543903383(2018-12-04 14:03:03),定义一个基于rowtime列,策略为偏移3s的watermark,这条数据的水位线时间戳则是:

    1543903383-3000 = 1543900383(2018-12-04 14:03:00)
    

    该条数据的水位线时间含义:timestamp小于1543900383(2018-12-04 14:03:00)的数据,都已经到达了。

    class BoundedOutOfOrdernessGenerator extends AssignerWithPeriodicWatermarks[MyEvent] {
        val maxOutOfOrderness = 3000L; // 3 seconds
        var currentMaxTimestamp: Long;
        override def extractTimestamp(element: MyEvent, previousElementTimestamp: Long): Long = {
            val timestamp = element.getCreationTime()
            currentMaxTimestamp = max(timestamp, currentMaxTimestamp)
            timestamp;
        }
        override def getCurrentWatermark(): Watermark = {
            // return the watermark as current highest timestamp minus the out-of-orderness bound
            new Watermark(currentMaxTimestamp - maxOutOfOrderness);
        }
    }
    

    看看如何触发窗口
    我们明白了窗口的触发机制,这里我们添加了水位线,到底是个怎么个情况?我们来看下面

    假如我们设置10s的时间窗口(window),那么010s,1020s都是一个窗口,以0~10s为例,0为start-time,10为end-time。假如有4个数据的event-time分别是8(A),12.5(B),9©,13.5(D),我们设置Watermarks为当前所有到达数据event-time的最大值减去延迟值3.5秒
    当A到达的时候,Watermarks为max{8}-3.5=8-3.5 = 4.5 < 10,不会触发计算
    当B到达的时候,Watermarks为max(12.5,8)-3.5=12.5-3.5 = 9 < 10,不会触发计算
    当C到达的时候,Watermarks为max(12.5,8,9)-3.5=12.5-3.5 = 9 < 10,不会触发计算
    当D到达的时候,Watermarks为max(13.5,12.5,8,9)-3.5=13.5-3.5 = 10 = 10,触发计算
    触发计算的时候,会将A,C(因为他们都小于10)都计算进去,其中C是迟到的。

    max这个很关键,就是当前窗口内,所有事件的最大事件。

    这里的延迟3.5s是我们假设一个数据到达的时候,比他早3.5s的数据肯定也都到达了,这个是需要根据经验推算。假设加入D到达以后有到达了一个E,event-time=6,但是由于0~10的时间窗口已经开始计算了,所以E就丢了。

    从这里上面E的丢失说明,水位线也不是万能的,但是如果根据我们自己的生产经验+侧道输出等方案,可以做到数据不丢失。

    7 Flink源码

    数据结构定义
    在Flink DataStream中流动着不同的元素,统称为StreamElement,StreamElement可以是StreamRecord、Watermark、StreamStatus、LatencyMarker中任何一种类型。

    StreamElement
    StreamElement是一个抽象类(是Flink 承载消息的基类),其他四种类型继承StreamElement。

    public abstract class StreamElement {
      //判断是否是Watermark
      public final boolean isWatermark() {
        return getClass() == Watermark.class;
      }
      //判断是否为StreamStatus
      public final boolean isStreamStatus() {
        return getClass() == StreamStatus.class;
      }
      //判断是否为StreamRecord
      public final boolean isRecord() {
        return getClass() == StreamRecord.class;
      }
      //判断是否为LatencyMarker
      public final boolean isLatencyMarker() {
        return getClass() == LatencyMarker.class;
      }
      //转换为StreamRecord
      public final <E> StreamRecord<E> asRecord() {
        return (StreamRecord<E>) this;
      }
      //转换为Watermark
      public final Watermark asWatermark() {
        return (Watermark) this;
      }
      //转换为StreamStatus
      public final StreamStatus asStreamStatus() {
        return (StreamStatus) this;
      }
      //转换为LatencyMarker
      public final LatencyMarker asLatencyMarker() {
        return (LatencyMarker) this;
      }
    }
    

    Watermark
    Watermark 继承了StreamElement。Watermark 是和事件一个级别的抽象,其内部包含一个成员变量时间戳timestamp,标识当前数据的时间进度。Watermark实际上作为数据流的一部分随数据流流动。

    @PublicEvolving
    public final class Watermark extends StreamElement {
      /*The watermark that signifies end-of-event-time. */
      public static final Watermark MAX_WATERMARK = new Watermark(Long.MAX_VALUE);
      /* The timestamp of the watermark in milliseconds. */
      private final long timestamp;
      /* Creates a new watermark with the given timestamp in milliseconds.*/
      public Watermarklong timestamp) {
    	this.timestamp = timestamp;
      }
      /*Returns the timestamp associated with this {@link Watermark} in milliseconds.**/
      public long getTimestamp() {
        return timestamp;
      }
    }
    

    Flink如何生成&处理Watermark
    在实际使用中大多数情况下会选择周期性生成方式也就是AssignerWithPeriodicWatermarks方式.

    //指定为evenTime时间语义
    env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    //生成watermark的周期
    env.getConfig.setAutoWatermarkInterval(watermarkInterval)
    //指定方式
    dataStream.assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[Element](Time.seconds(allowDealy)) {
       override def extractTimestamp(element: Element): Long = element.dT
      })
    

    BoundedOutOfOrdernessTimestampExtractor 是Flink内置提供的允许乱序最大延时的watermark生成方式,只需要重写其extractTimestamp方法即可。

    assignTimestampsAndWatermarks 可以理解为是一个算子转换操作,等同于map/window一样理解,可以为其设置并行度、名称,也是一个transformation/operator,

    public SingleOutputStreamOperator<T> assignTimestampsAndWatermarks(
    		AssignerWithPeriodicWatermarks<T> timestampAndWatermarkAssigner) {
    
    	// match parallelism to input, otherwise dop=1 sources could lead to some strange
    	// behaviour: the watermark will creep along very slowly because the elements
    	// from the source go to each extraction operator round robin.
    	final int inputParallelism = getTransformation().getParallelism();
    	final AssignerWithPeriodicWatermarks<T> cleanedAssigner = clean(timestampAndWatermarkAssigner);
    
    	TimestampsAndPeriodicWatermarksOperator<T> operator =
    			new TimestampsAndPeriodicWatermarksOperator<>(cleanedAssigner);
    
    	return transform("Timestamps/Watermarks", getTransformation().getOutputType(), operator)
    			.setParallelism(inputParallelism);
    }
    

    其使用的StreamOperator类型TimestampsAndPeriodicWatermarksOperator,继承了AbstractUdfStreamOperator,实现了OneInputStreamOperator接口与ProcessingTimeCallback接口,

    TimestampsAndPeriodicWatermarksOperator。

    /**
     * A stream operator that extracts timestamps from stream elements and
     * generates periodic watermarks.
     *
     * @param <T> The type of the input elements
     */
    public class TimestampsAndPeriodicWatermarksOperator<T>
    		extends AbstractUdfStreamOperator<T, AssignerWithPeriodicWatermarks<T>>
    		implements OneInputStreamOperator<T, T>, ProcessingTimeCallback {
    
    	private static final long serialVersionUID = 1L;
    	private transient long watermarkInterval;
    	private transient long currentWatermark;
    
    	public TimestampsAndPeriodicWatermarksOperator(AssignerWithPeriodicWatermarks<T> assigner) {
    		super(assigner);
    		this.chainingStrategy = ChainingStrategy.ALWAYS;
    	}
    
    	@Override
    	public void open() throws Exception {
    		super.open();
            //初始化默认当前watermark
    		currentWatermark = Long.MIN_VALUE;
            //生成watermark周期时间配置
    		watermarkInterval = getExecutionConfig().getAutoWatermarkInterval();
            //注册定时其配置
    		if (watermarkInterval > 0) {
    			long now = getProcessingTimeService().getCurrentProcessingTime();
                //注册一个watermarkInterval后触发的定时器,传入回调参数是this,也就是会调用当前对象的onProcessingTime方法
    			getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    		}
    	}
    
    	@Override
    	public void processElement(StreamRecord<T> element) throws Exception {
            //提取当前的事件时间
    		final long newTimestamp = userFunction.extractTimestamp(element.getValue(),
    				element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
            //保存当前最大的事件时间。
    		output.collect(element.replace(element.getValue(), newTimestamp));
    	}
    
    	@Override
    	public void onProcessingTime(long timestamp) throws Exception {
            //此方法表示的就是定时回调的方法,将符合要求的watermark发送出去并且注册下一个定时器。
    		// register next timer
    		Watermark newWatermark = userFunction.getCurrentWatermark();
            //当新的watermark大于当前的watermark
    		if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
    			currentWatermark = newWatermark.getTimestamp();
                //将符合要求的watermark发送出去
    			// emit watermark
    			output.emitWatermark(newWatermark);
    		}
            //注册下一次触发时间
    		long now = getProcessingTimeService().getCurrentProcessingTime();
    		getProcessingTimeService().registerTimer(now + watermarkInterval, this);
    	}
    
    	/**
    	 * Override the base implementation to completely ignore watermarks propagated from
    	 * upstream (we rely only on the {@link AssignerWithPeriodicWatermarks} to emit
    	 * watermarks from here).
    	 */
    	@Override
    	public void processWatermark(Watermark mark) throws Exception {
            //用来处理上游发送过来的watermark,可以认为不做任何处理,下游的watermark只与其上游最近的生成方式相关。
    		// if we receive a Long.MAX_VALUE watermark we forward it since it is used
    		// to signal the end of input and to not block watermark progress downstream
    		if (mark.getTimestamp() == Long.MAX_VALUE && currentWatermark != Long.MAX_VALUE) {
    			currentWatermark = Long.MAX_VALUE;
    			output.emitWatermark(mark);
    		}
    	}
    
    	@Override
    	public void close() throws Exception {
    		super.close();
    
    		// emit a final watermark
    		Watermark newWatermark = userFunction.getCurrentWatermark();
    		if (newWatermark != null && newWatermark.getTimestamp() > currentWatermark) {
    			currentWatermark = newWatermark.getTimestamp();
    			// emit watermark
    			output.emitWatermark(newWatermark);
    		}
    	}
    }
    

    Flink如何处理迟到数据
    这里我们使用 Side Output机制来说明。Side Output机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。

    生成新的Watermark
    Flink会替换StreamRecord 对象中的Timestamp,如果 根据当前事件的Timestamp 生成的Watermark 大于上一次的Watermark,就发出新的Watermark。

    具体代码在 TimestampsAndPunctuatedWatermarksOperator.processElement。

    @Override
    public void processElement(StreamRecord<T> element) throws Exception {
    	final T value = element.getValue();
        // 调用 用户实现的 extractTimestamp 获取新的Timestamp
    	final long newTimestamp = userFunction.extractTimestamp(value,
    			element.hasTimestamp() ? element.getTimestamp() : Long.MIN_VALUE);
        // 用新Timestamp 替换StreamRecord中的旧Timestamp
    	output.collect(element.replace(element.getValue(), newTimestamp));
        // 调用 用户实现的 checkAndGetNextWatermark 方法获取下一个Watermark
    	final Watermark nextWatermark = userFunction.checkAndGetNextWatermark(value, newTimestamp);
        // 如果下一个Watermark 大于当前Watermark,就发出新的Watermark
    	if (nextWatermark != null && nextWatermark.getTimestamp() > currentWatermark) {
    		currentWatermark = nextWatermark.getTimestamp();
    		output.emitWatermark(nextWatermark);
    	}
    }
    

    处理迟到数据
    首先,判断是否是迟到数据。

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
    			for (W window: elementWindows) {
    				// drop if the window is already late
                    // 如果窗口已经迟到了,则处理下一条数据
    				if (isWindowLate(window)) {
    					continue;
    				}   
                }
        ......
    }
    
    /**
     Returns {@code true} if the watermark is after the end timestamp plus the allowed lateness of the given window.
     */
    protected boolean isWindowLate(W window) {
        // 当前机制是 事件时间 && 窗口元素的最大时间戳 + 允许迟到时间 <= 当前水位线 的时候为true(即当前窗口元素迟到了)
    	return (windowAssigner.isEventTime() && (cleanupTime(window) <= internalTimerService.currentWatermark()));
    }
    
    /**
     * Returns the cleanup time for a window, which is
     * {@code window.maxTimestamp + allowedLateness}. In
     * case this leads to a value greater than {@link Long#MAX_VALUE}
     * then a cleanup time of {@link Long#MAX_VALUE} is
     * returned.
     *
     * @param window the window whose cleanup time we are computing.
     */
    private long cleanupTime(W window) {
    	if (windowAssigner.isEventTime()) {
    		long cleanupTime = window.maxTimestamp() + allowedLateness;
        //返回窗口的 cleanup 时间 : 窗口元素的最大时间戳 + 允许延迟的时间
    		return cleanupTime >= window.maxTimestamp() ? cleanupTime : Long.MAX_VALUE;
    	} else {
    		return window.maxTimestamp();
    	}
    }
    

    其次,处理迟到数据的具体代码在WindowOperator.processElement 方法的最后一段。这里就是旁路输出。

    @Override
    public void processElement(StreamRecord<IN> element) throws Exception {
        
        ......
        // 其他操作
        ......
        
        // side output input event if element not handled by any window  late arriving tag has been set
        // 如果没有window处理过这条数据,isSkippedElement = true,如果上面判断为迟到数据,isSkippedElement = false
        // windowAssigner is event time and current timestamp + allowed lateness no less than element timestamp
        if (isSkippedElement && isElementLate(element)) {
          if (lateDataOutputTag != null){
              //旁路输出
              //这就是我们之前提到的,Flink 的 Side Output 机制可以将迟到事件单独放入一个数据流分支,这会作为 window 计算结果的副产品,以便用户获取并对其进行特殊处理。
            sideOutput(element);
          } else {
            this.numLateRecordsDropped.inc();
          }
        }
    }
    
    /**
     * Decide if a record is currently late, based on current watermark and allowed lateness.
     * 当前机制是 事件时间 && (元素时间戳 + 允许延迟的时间) <= 当前水位线
     * @param element The element to check
     * @return The element for which should be considered when sideoutputs
     */
    protected boolean isElementLate(StreamRecord<IN> element){
    	return (windowAssigner.isEventTime()) &&
    		(element.getTimestamp() + allowedLateness <= internalTimerService.currentWatermark());
    }
    
    /**
     * Write skipped late arriving element to SideOutput.
     * // 把数据输出到旁路,供用户决定如何处理。
     * @param element skipped late arriving element to side output
     */
    protected void sideOutput(StreamRecord<IN> element){
        output.collect(lateDataOutputTag, element);
    }
    

    7 参考

    Flink实时性、容错机制、窗口等介绍

    彻底明白Flink系统学习11:【Flink1.7】事件时间、处理时间、提取时间有什么区别

    彻底明白Flink系统学习10:【Flink1.7】窗口生命周期、Keyed和非Keyed及分配器诠释

    Flink 轻松理解Watermark

    https://ci.apache.org/projects/flink/flink-docs-release-1.4/dev/event_time.html#event-time–processing-time–ingestion-time

    http://smartsi.club/flink-stream-event-time-and-processing-time.html

    Flink Event Time和WaterMark结合优势分析

    Flink WaterMark(水位线)分布式执行理解

    初学Flink,对Watermarks的一些理解和感悟

    浅谈WaterMark

    Flink WaterMark实例

    Apache Flink 漫谈系列(03) - Watermark

    Flink 的Event Time

    Flink流计算编程–watermark(水位线)简介

    Flink Watermark 机制浅析(透彻)

    Flink Time和Watermark的理解

    【源码解析】Flink 是如何处理迟到数据

    flink之延迟数据处理watermark allowedLateness() sideOutputLateData()

    Flink中Watermark定时生成源码分析

    展开全文
  • 一篇文章搞懂 Flink 的 watermark 机制

    千次阅读 多人点赞 2021-08-15 11:02:42
    watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。 我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到...

    前言

    本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

    本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

    正文

    在这里插入图片描述

    1、watermark的作用

    watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。
    我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间的。虽然大部分情况下,流到operator的数据都是按照事件产生的时间顺序来的,但是也不排除由于网络、背压等原因,导致乱序的产生(out-of-order或者说late element)。
    但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。

    2、watermark解决迟到的数据

    实时系统中,由于各种原因造成的延时,造成某些消息发到flink的时间延时于事件产生的时间。如果基于event time构建window,但是对于late element,我们又不能无限期的等下去,必须要有个机制来保证一个特定的时间后,必须触发window去进行计算了。这个特别的机制,就是watermark。

    Watermarks(水位线)就是来处理这种问题的机制

    1. 参考google的DataFlow设计。
    2. 是event time处理进度的标志。
    3. 表示比watermark更早(更老)的事件都已经到达(没有比水位线更低的数据 )。
    4. 基于watermark来进行窗口触发计算的判断。

    有序的数据流watermark:

    在某些情况下,基于Event Time的数据流是有续的(相对event time)。在有序流中,watermark就是一个简单的周期性标记。
    在这里插入图片描述

    无序的数据流watermark:

    在更多场景下,基于Event Time的数据流是无续的(相对event time)。
    在无序流中,watermark至关重要,她告诉operator比watermark更早(更老/时间戳更小)的事件已经到达, operator可以将内部事件时间提前到watermark的时间戳(可以触发window计算啦)
    在这里插入图片描述

    并行流当中的watermark:

    通常情况下, watermark在source函数中生成,但是也可以在source后任何阶段,如果指定多次 watermark,后面指定的 watermarker会覆盖前面的值。 source的每个sub task独立生成水印。
    watermark通过operator时会推进operators处的当前event time,同时operators会为下游生成一个新的watermark。
    多输入operator(union、 keyBy、 partition)的当前event time是其输入流event time的最小值。
    注意:多并行度的情况下,watermark对齐会取所有channel最小的watermark

    在这里插入图片描述

    3、watermark如何生成

    通常,在接收到source的数据后,应该立刻生成watermark;但是,也可以在source后,应用简单的map或者filter操作,然后再生成watermark。

    生成watermark的方式主要有2大类:

    1. With Periodic Watermarks
    2. With Punctuated Watermarks

    第一种可以定义一个最大允许乱序的时间,这种情况应用较多。
    我们主要来围绕Periodic Watermarks来说明,下面是生成periodic watermark的方法:

    4、watermark处理顺序数据

    需求:定义一个窗口为10s,通过数据的event time时间结合watermark实现延迟10s的数据也能够正确统计
    我们通过数据的eventTime来向前推10s,得到数据的watermark,
    代码实现:

    package com.shockang.study.bigdata.flink.watermark
    
    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    import java.text.SimpleDateFormat
    import scala.collection.mutable.ArrayBuffer
    import scala.util.Sorting
    
    object FlinkWaterMark2 {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.api.scala._
        //设置flink的数据处理时间为eventTime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)
        val tupleStream: DataStream[(String, Long)] = env.socketTextStream("node01", 9000).map(x => {
          val strings: Array[String] = x.split(" ")
          (strings(0), strings(1).toLong)
        })
    
        //注册我们的水印
        val waterMarkStream: DataStream[(String, Long)] = tupleStream.assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[(String, Long)] {
          var currentTimemillis: Long = 0L
          var timeDiff: Long = 10000L
          val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    
          /* //获取当前数据的waterMark
           override def getNext: Watermark = {
           }*/
          override def getCurrentWatermark: Watermark = {
            val watermark = new Watermark(currentTimemillis - timeDiff)
            watermark
          }
    
          //抽取数据的eventTime
          override def extractTimestamp(element: (String, Long), l: Long): Long = {
            val enventTime = element._2
            currentTimemillis = Math.max(enventTime, currentTimemillis)
            val id = Thread.currentThread().getId
            println("currentThreadId:" + id + ",key:" + element._1 + ",eventtime:[" + element._2 + "|" + sdf.format(element._2) + "],currentMaxTimestamp:[" + currentTimemillis + "|" + sdf.format(currentTimemillis) + "],watermark:[" + this.getCurrentWatermark.getTimestamp + "|" + sdf.format(this.getCurrentWatermark.getTimestamp) + "]")
            enventTime
          }
        })
        waterMarkStream.keyBy(0)
          .window(TumblingEventTimeWindows.of(Time.seconds(10)))
          .apply(new MyWindowFunction2).print()
        env.execute()
      }
    }
    
    
    class MyWindowFunction2 extends WindowFunction[(String, Long), String, Tuple, TimeWindow] {
      override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)],
                         out: Collector[String]): Unit = {
        val keyStr = key.toString
        val arrBuf = ArrayBuffer[Long]()
        val ite = input.iterator
        while (ite.hasNext) {
          val tup2 = ite.next()
          arrBuf.append(tup2._2)
        }
        val arr = arrBuf.toArray
        Sorting.quickSort(arr) //对数据进行排序,按照eventTime进行排序
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        val result = "聚合数据的key为:" + keyStr + "," + "窗口当中数据的条数为:" + arr.length + "," + "窗口当中第一条数据为:" + sdf.format(arr.head) + "," + "窗口当中最后一条数据为:" + sdf.format(arr.last) + "," + "窗口起始时间为:" + sdf.format(window.getStart) + "," + "窗口结束时间为:" + sdf.format(window.getEnd) + "!!!!!看到这个结果,就证明窗口已经运行了"
        out.collect(result)
      }
    }
    

    输入测验数据

    注意:如果需要触发flink的窗口调用,必须满足两个条件
    1:waterMarkTime > eventTime
    2:窗口内有数据

    数据输入测验:

    按照十秒钟统计一次,我们程序会将时间划分成为以下时间间隔段
    2019-10-01 10:11:00  到  2019-10-01 10:11:10
    2019-10-01 10:11:10  到  2019-10-01 10:11:20
    2019-10-01 10:11:20  到  2019-10-01 10:11:30
    2019-10-01 10:11:30  到  2019-10-01 10:11:40
    2019-10-01 10:11:40  到  2019-10-01 10:11:50
    2019-10-01 10:11:50  到  2019-10-01 10:12:00
    	
    顺序计算:
    触发数据计算的条件依据为两个
    第一个waterMark时间大于数据的eventTime时间,第二个窗口之内有数据
    我们这里的waterMark直接使用eventTime的最大值减去10秒钟
    
    0001 1569895882000	 数据eventTime为:2019-10-01 10:11:22  数据waterMark为  2019-10-01 10:11:12
    0001 1569895885000	 数据eventTime为:2019-10-01 10:11:25  数据waterMark为  2019-10-01 10:11:15
    0001 1569895888000	 数据eventTime为:2019-10-01 10:11:28  数据waterMark为  2019-10-01 10:11:18
    
    0001 1569895890000	 数据eventTime为:2019-10-01 10:11:30  数据waterMark为  2019-10-01 10:11:20
    0001 1569895891000	 数据eventTime为:2019-10-01 10:11:31  数据waterMark为  2019-10-01 10:11:21
    0001 1569895895000	 数据eventTime为:2019-10-01 10:11:35  数据waterMark为  2019-10-01 10:11:25
    0001 1569895898000	 数据eventTime为:2019-10-01 10:11:38  数据waterMark为  2019-10-01 10:11:28
    
    0001 1569895900000	 数据eventTime为:2019-10-01 10:11:40  数据waterMark为  2019-10-01 10:11:30  触发第一条到第三条数据计算,数据包前不包后,不会计算2019-10-01 10:11:30 这条数据
    0001 1569895911000	 数据eventTime为:2019-10-01 10:11:51  数据waterMark为  2019-10-01 10:11:41  触发2019-10-01 10:11:20到2019-10-01 10:11:28时间段的额数据计算,数据包前不包后,不会触发2019-10-01 10:11:30这条数据的计算
    

    5、watermark处理乱序数据

    输入测验数据
    接着继续输入以下乱序数据,验证flink乱序数据的问题是否能够解决

    乱序数据
    0001 1569895948000	 数据eventTime为:2019-10-01 10:12:28  数据waterMark为  2019-10-01 10:12:18  
    0001 1569895945000	 数据eventTime为:2019-10-01 10:12:25  数据waterMark为  2019-10-01 10:12:18  
    0001 1569895947000	 数据eventTime为:2019-10-01 10:12:27  数据waterMark为  2019-10-01 10:12:18  
    
    0001 1569895950000	 数据eventTime为:2019-10-01 10:12:30  数据waterMark为  2019-10-01 10:12:20  
    
    0001 1569895960000	 数据eventTime为:2019-10-01 10:12:40  数据waterMark为  2019-10-01 10:12:30  触发计算 waterMark > eventTime 并且窗口内有数据,触发 2019-10-01 10:12:28到2019-10-01 10:12:27 这三条数据的计算,数据包前不包后,不会触发2019-10-01 10:12:30 这条数据的计算
    0001 1569895949000	 数据eventTime为:2019-10-01 10:12:29  数据waterMark为  2019-10-01 10:12:30  迟到太多的数据,flink直接丢弃,可以设置flink将这些迟到太多的数据保存起来,便于排查问题
    

    6、比watermark更晚的数据如何解决

    如果我们设置数据的watermark为每条数据的eventtime往后一定的时间,例如数据的eventtime为2019-08-20 15:30:30,程序的window窗口为10s,然后我们设置的watermark为2019-08-20 15:30:40,
    那么如果某一条数据eventtime为2019-08-20 15:30:32,到达flink程序的时间为2019-08-20 15:30:45 该怎么办,这条数据比窗口的watermark时间还要晚了5S钟该怎么办?对于这种比watermark还要晚的数据,flink有三种处理方式

    1、直接丢弃

    我们输入一个乱序很多的(其实只要 Event Time < watermark 时间)数据来测试下:

    输入:【输入两条内容】
    late element
    0001 1569895948000	 数据eventTime为:2019-10-01 10:12:28  数据直接丢弃 
    0001 1569895945000	 数据eventTime为:2019-10-01 10:12:25  数据直接丢弃
    

    注意:此时并没有触发 window。因为输入的数据所在的窗口已经执行过了,flink 默认对这 些迟到的数据的处理方案就是丢弃。

    2、allowedLateness 指定允许数据延迟的时间

    在某些情况下,我们希望对迟到的数据再提供一个宽容的时间。
    Flink 提供了 allowedLateness 方法可以实现对迟到的数据设置一个延迟时间,在指定延迟时间内到达的数据还是可以触发 window 执行的。

    修改代码:

    waterMarkStream
      .keyBy(0)
      .window(TumblingEventTimeWindows.of(Time.seconds(3))).allowedLateness(Time.seconds(2))//允许数据迟到2S
    //function: (K, W, Iterable[T], Collector[R]) => Unit
    .apply(new MyWindowFunction).print()
    

    验证数据迟到性:
    输入数据:

    更改代码之后重启我们的程序,然后从新输入之前的数据

    0001 1569895882000
    0001 1569895885000
    0001 1569895888000
    0001 1569895890000
    0001 1569895891000
    0001 1569895895000
    0001 1569895898000
    0001 1569895900000
    0001 1569895911000
    0001 1569895948000
    0001 1569895945000
    0001 1569895947000
    0001 1569895950000
    0001 1569895960000
    0001 1569895949000
    

    验证数据的延迟性:定义数据仅仅延迟2S的数据重新接收,重新计算

    0001 1569895948000	 数据eventTime为:2019-10-01 10:12:28  触发数据计算  数据waterMark为  2019-10-01 10:12:30
    0001 1569895945000	 数据eventTime为:2019-10-01 10:12:25  触发数据计算  数据waterMark为  2019-10-01 10:12:30
    
    0001 1569895958000	 数据eventTime为:2019-10-01 10:12:38  不会触发数据计算 数据waterMark为  2019-10-01 10:12:30  waterMarkTime  <  eventTime,所以不会触发计算
    

    将数据的waterMark调整为41秒就可以触发上面这条数据的计算了

    0001 1569895971000	 数据eventTime为:2019-10-01 10:12:51  数据waterMark为  2019-10-01 10:12:41
    

    又会继续触发0001 1569895958000 这条数据的计算了

    3、sideOutputLateData 收集迟到的数据

    通过 sideOutputLateData 可以把迟到的数据统一收集,统一存储,方便后期排查问题。

    需要先调整代码:

    package com.shockang.study.bigdata.flink.watermark
    
    import org.apache.flink.api.java.tuple.Tuple
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.AssignerWithPunctuatedWatermarks
    import org.apache.flink.streaming.api.scala.function.WindowFunction
    import org.apache.flink.streaming.api.scala.{DataStream, OutputTag, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.windows.TimeWindow
    import org.apache.flink.util.Collector
    
    import java.text.SimpleDateFormat
    import scala.collection.mutable.ArrayBuffer
    import scala.util.Sorting
    
    
    object FlinkWaterMark {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        import org.apache.flink.api.scala._
        //设置time类型为eventtime
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        //暂时定义并行度为1
        env.setParallelism(1)
        val text = env.socketTextStream("node01", 9000)
        val inputMap: DataStream[(String, Long)] = text.map(line => {
          val arr = line.split(" ")
          (arr(0), arr(1).toLong)
        })
    
        //给我们的数据注册waterMark
        val waterMarkStream: DataStream[(String, Long)] = inputMap
          .assignTimestampsAndWatermarks(new AssignerWithPunctuatedWatermarks[(String, Long)] {
            var currentMaxTimestamp = 0L
    
            //watermark基于eventTime向后推迟10秒钟,允许消息最大乱序时间为10s
            val waterMarkDiff: Long = 10000L
    
            val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
    
            //获取下一个水印
            override def checkAndGetNextWatermark(t: (String, Long), l: Long): Watermark = {
              val watermark = new Watermark(currentMaxTimestamp - waterMarkDiff)
              watermark
            }
    
            //抽取当前数据的时间作为eventTime
            override def extractTimestamp(element: (String, Long), l: Long): Long = {
              val eventTime = element._2
              currentMaxTimestamp = Math.max(eventTime, currentMaxTimestamp)
              val id = Thread.currentThread().getId
              println("currentThreadId:" + id + ",key:" + element._1 + ",eventtime:[" + element._2 + "|" + sdf.format(element._2) + "],currentMaxTimestamp:[" + currentMaxTimestamp + "|" + sdf.format(currentMaxTimestamp) + "],watermark:[" + this.checkAndGetNextWatermark(element, l).getTimestamp + "|" + sdf.format(this.checkAndGetNextWatermark(element, l).getTimestamp) + "]")
              eventTime
            }
          })
    
    
        val outputTag: OutputTag[(String, Long)] = new OutputTag[(String, Long)]("late_data")
        val outputWindow: DataStream[String] = waterMarkStream
          .keyBy(0)
          .window(TumblingEventTimeWindows.of(Time.seconds(3)))
          // .allowedLateness(Time.seconds(2))//允许数据迟到2S
          .sideOutputLateData(outputTag)
          //function: (K, W, Iterable[T], Collector[R]) => Unit
          .apply(new MyWindowFunction)
    
    
        val sideOuptut: DataStream[(String, Long)] = outputWindow.getSideOutput(outputTag)
    
        sideOuptut.print()
        outputWindow.print()
    
        //执行程序
        env.execute()
    
      }
    }
    
    class MyWindowFunction extends WindowFunction[(String, Long), String, Tuple, TimeWindow] {
      override def apply(key: Tuple, window: TimeWindow, input: Iterable[(String, Long)], out: Collector[String]): Unit = {
        val keyStr = key.toString
        val arrBuf = ArrayBuffer[Long]()
        val ite = input.iterator
        while (ite.hasNext) {
          val tup2 = ite.next()
          arrBuf.append(tup2._2)
        }
        val arr = arrBuf.toArray
        Sorting.quickSort(arr)
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");
        val result = keyStr + "," + arr.length + "," + sdf.format(arr.head) + "," + sdf.format(arr.last) + "," + sdf.format(window.getStart) + "," + sdf.format(window.getEnd)
        out.collect(result)
      }
    }
    

    我们来输入一些数据验证一下 输入

    0001 1569895882000
    0001 1569895885000
    0001 1569895888000
    0001 1569895890000
    0001 1569895891000
    0001 1569895895000
    0001 1569895898000
    0001 1569895900000
    0001 1569895911000
    0001 1569895948000
    0001 1569895945000
    0001 1569895947000
    0001 1569895950000
    0001 1569895960000
    0001 1569895949000
    输入两条迟到的数据,会被收集起来
    0001 1569895948000
    0001 1569895945000
    

    此时,针对这几条迟到的数据,都通过 sideOutputLateData 保存到了 outputTag 中。

    7、多并行度的watermark机制

    前面代码中设置了并行度为 1

    env.setParallelism(1);
    

    如果这里不设置的话,代码在运行的时候会默认读取本机 CPU 数量设置并行度。 把代码的并行度代码注释掉

    //env.setParallelism(1)
    

    然后在输出内容前面加上线程 id

    在这里插入图片描述
    会出现如下数据: 输入如下几行内容:
    在这里插入图片描述
    输出:
    在这里插入图片描述
    会发现 window 没有被触发。

    因为此时,这 7 条数据都是被不同的线程处理的。每个线程都有一个 watermark。

    因为在多并行度的情况下,watermark 对齐会取所有 channel 最小的 watermark 但是我们现在默认有 8 个并行度,这 7 条数据都被不同的线程所处理,到现在还没获取到最 小的 watermark,所以 window 无法被触发执行。
    在这里插入图片描述
    下面我们来验证一下,把代码中的并行度调整为 2.

    env.setParallelism(2)
    

    输入如下内容:

    0001 1569895890000
    0001 1569895903000
    0001 1569895908000
    

    输出:
    在这里插入图片描述
    此时会发现,当第三条数据输入完以后,[10:11:30,10:11:33)这个 window 被触发了。

    前两条数据输入之后,获取到的最小 watermark 是 10:11:20,这个时候对应的 window 中没 有数据。

    第三条数据输入之后,获取到的最小 watermark 是 10:11:33,这个时候对应的窗口就是 [10:11:30,10:11:33)。所以就触发了。

    展开全文
  • 内存水位watermark

    2021-02-01 19:14:23
    watermark struct zone { /* Read-mostly fields */ /* zone watermarks, access with *_wmark_pages(zone) macros */ unsigned long watermark[NR_WMARK]; long lowmem_reserve[MAX_NR_ZONES]; ... } 每个内存...
  • 目录 1 Fink 中的时间语义 1.1 哪种时间语义更重要 2 设置Event Time 3 水位线(Watermark) 3.1 水位线概念 3.2 watermark原理和特点 4 watermark的传递,引入和设置 4.1 watermark的传递 4.2 watermark代码中引入...
  • 7 时间语义与watermark

    2021-06-25 00:43:23
    7 时间语义语watermark Flink中的时间语义 Enent Time 是事件创建的时间。它通常由事件中的时间戳描述,例如采集的 日志数据中,每一条日志都会记录自己的生成时间,Flink 通过时间戳分配器访问事 件时间戳。 ...
  • 1.Flink中的时间语义 Event Time:事件创建时间; Ingestion Time:数据进入Flink的时间;... Processing Time:执行操作算子的...Flink 时间语义与Watermark及EventTime在window中的使用 不同的时间语义有.
  • Flink个人学习整理-WaterMark篇(六) 1、Flink中的时间语义 1.12之前时间语义分为3种 I、处理时间(默认) II、事件时间 III、数据进入时间 1.12进行了更改 时间语义合并为2种 I、处理时间 II、事件时间(默认) ...
  • WaterMark 6.3.1 Flink中的时间语义 Flink流式操作中,涉及到不同的时间概念。 (1)事件时间 event time 时间真实发生的时间。Flink1.12默认事件时间。 比如:log中,start-log中的ts字段,这个时间就是事件...
  • flink生成Watermark之WatermarkStrategy

    千次阅读 2021-11-22 22:39:48
    flink1.11版本后建议用WatermarkStrategy(Watermark生成策略)生成Watermark
  • 文章目录1 watermark简介2 watermark相关结构体3 watermark初始化3.1 managed_pages,spanned_pages,present_pages三个值对应的意义3.2 什么是min_free_kbytes3.3 Watermark的low,min和high这3档位初始化3.3.1 内存...
  • memory watermark 调节

    2021-02-02 17:48:11
    处理思路基本上是调整内存watermark使系统内存回收平滑,尽量避免触发直接内存回收。 实际调整中往往会遇到一些问题,导致效果事与愿违。如: /proc/zoneinfo 中min为何远大于min_free_kbytes; 改变min_free_...
  • Flink之watermark 处理延迟数据 详解

    千次阅读 2021-12-11 10:20:52
    watermark介绍 在Flink中,Watermark 是 Apache Flink 为了处理 EventTime 窗口计算提出的一种机制, 本质上是一种时间戳。 用来处理实时数据中的乱序问题的,通常是水位线和窗口结合使用来实现。 从设备生成实时流...
  • 理解Flink中的watermark

    2021-01-06 14:24:09
    1. Watermark 的理解 最早看到 Watermark 的概念就是在 Flink 的官方文档里面: The mechanism in Flink to measure progress in event time is watermarks. Watermarks flow as part of the data stream and carry...
  • (4)乱序流WaterMark 在更多场景下,基于Event Time的数据流是无续的 在无序流中,watermark至关重要,它告诉operator比watermark更早(更老/时间戳更小)的事件已经到达, operator可以将内部事件时间提前到...
  • 全量窗口函数 6.5、其他 API 7、时间语义与watermark 7.1、时间语义 7.2、watermark 水位线 7.2.2. Watermark原理 7.2.3. Watermark三种使用情况 7.2.4. Watermark的产生方式 7.2.5. watermark 迟到数据 8、状态...
  • MyEvent, extractedTimestamp: Long): Watermark = { if (lastElement.hasWatermarkMarker()) new Watermark(extractedTimestamp) else null } } 注意:每个单独的事件都可以产生一个watermark,然而,由于每个...
  • 注意: watermark 接口支持的最大 gif 帧数为 200,超过 200,处理结果只返回原图。图片水印接口规格注意: 接口规格不含任何空格与换行符。watermark/1/image//dissolve//gravity//dx//dy//ws//wst/参数名称必填...
  • WaterMark使用和详解

    2021-07-09 11:54:03
    WaterMark翻译为水位线,什么时候用到水位线呢? 比如说水控在顺水的时候达到紧梯就会触发,若不放水就可以发现危险的现状 在spark程序划分成窗口的时候,主要是衡量什么时候触发,这也是需要用到的水位线,其实它...
  • 文章目录六、高阶编程开窗一般在分组之后6.1 ... 灵活6.1.2.3 全窗口函数(full window functions)6.1.2.4 其它可选API6.2 时间语义与Watermark6.2.0 与spark的比较6.2.1 时间语义①Event Time (事件时间)②Ingest
  • PUT _cluster/settings { "transient": { "cluster.routing.allocation.disk.watermark.low": "85%", "cluster.routing.allocation.disk.watermark.high": "90%", "cluster.routing.allocation.disk.watermark....
  • flink 中的Watermark

    2021-03-28 23:54:29
    理解 flink中的watermark watermark 原理
  • 时间语义谈及watermark就要先从Flink支持的时间语义说起,Flink支持三种时间语义:process time:指的系统处理对应数据时的系统时间。他是最简单的一种实现,由于不需要额外的协调,因性能最好event time:是指数据中...
  • Flink Table Api 接入数据源 设置watermark String sourceDDL = "CREATE TABLE kafka_source (\n" + " msg VARCHAR,\n" + " level VARCHAR,\n" + " timeConsuming INT,\n" + " logTime TIMESTAMP(3), " + // 指
  • flink设置watermark以及事件时间字段源码分析 背景 1.1、提取时间戳字段,用于事件时间语义处理数据 1.2、设置水位线(水印)watermark TimestampAssigner 核心接口介绍 TimestampAssigner 时间分配器接口 实现类关系...
  • 周期性生成Watermark import Source.WaterSensor import org.apache.flink.streaming.api.TimeCharacteristic import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks import org.apache....
  • 一、简单水印(watermark-dom)阿里巴巴内网的不可见水印用的是什么算法?据说月饼事件截图的那位员工也被开除了?下面的只是简单的加一个很浅的水印,实现起来很容易。1、看看水印的效果随便找一个网站,比如就找掘金...
  • flink Watermark 总结

    2021-11-22 21:48:29
    1. Watermark介绍 Watermark就是设定一个延迟时间,比如水位线设置2秒,当时间缀-2小于,窗口关闭时间时,窗口不关闭,进行接收数据,当时间缀-2等于关闭时间,窗口触发关闭。 2. Watermark分类 ...
  • 【Flink】WaterMark与EventTime

    千次阅读 2021-11-27 21:02:39
    【Flink重难点辨析】WaterMark与EventTime WaterMark Flink中测量事件时间的进度的机制就是watermark(水印). watermark作为数据流的一部分在流动, 并且携带一个时间戳t。 一个Watermark(t)表示在这个流里面事件时间...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 65,587
精华内容 26,234
关键字:

watermark

友情链接: IconTool.rar