精华内容
下载资源
问答
  • flink api

    2018-01-04 15:20:20
    DataStream对象,简称ds,作为流计算编程的核心上下文。提供多种功能 //1:接入数据源 1:ds由env和SourceFunction产生,完成接入数据源的功能 ...21:writeUsingOutputFormat,通过...详细Api参考地址

    DataStream对象,简称ds,作为流计算编程的核心上下文。提供多种功能

    //1:接入数据源

    1:ds由env和SourceFunction产生,完成接入数据源的功能

    //2:数据处理

    2:filter。实现过滤功能

    3:map,实现1->1的映射转换

    4:flatmap,实现拆解字符串等复杂的转换功能

    5:project,实现字段裁剪

    6:transform实现复杂需求的功能

    //3:join流

    7:coGroup,流合并

    8:join,实现流join

    0:union,merge两个流

    //4:分组,种类太多。也没整明白。大多只用keyBy就行。其他都有特殊场景,或者性能优化用处的

    9:keyBy,实现按指定key的value值的hash,实现数据分组

    10:shuffle,实现数据随机分组

    11:broadcast,实现数据的广播

    12:forward,实现本地task分组

    13:rebalance,使用随机函数,进行数据分组

    14:rescale,数据随机的分组到一个instance子集

    15:global,将数据发送到,下个操作的第一个instance

    15:partitionCustom,实现数据的自定义分组

    //5:窗口函数,跳动窗口,滑动窗口两种类型

    16:timeWindowAll,实现时间的滑动窗口和跳动窗口

    17:countWindowAll,实现事件个数的滑动窗口和跳动窗口

    18:windowAll,自定义窗口。时间和事件个数的窗口就是基于这个实现的。

    //6:事件时间

    19:assignTimestampsAndWatermarks,为事件指定时间,和WATERMARK类型。

    //:输出

    20:addSink,根据SinkFunction的实现,实现数据处理的结点,一般是数据落地

    21:writeUsingOutputFormat,通过OutputFormatSinkFunction,封装了addSink,根据OutputFormat实现类,实现数据落地。

    详细Api参考地址

    展开全文
  • Flink API入门

    2020-07-04 15:32:21
    Flink数据处理流程 通过前面的文章我们大概了解了实时流处理...入门Flink API 后面我们都选择使用Scala来完成Flink程序的编写,当然你也可以选择Java或者Python。Scala DataSet API的所有核心类都在包org...

    Flink数据处理流程

     

       通过前面的文章我们大概了解了实时流处理框架,这篇文章开始我们将详细来学习下Flink的使用。Flink为开发流式/批处理应用程序提供了不同级别的抽象。

    而这几个抽象的模块中DataStream API用于处理实时流处理,DataSet API用于离线批处理。

     

    入门Flink API

     

          后面我们都选择使用Scala来完成Flink程序的编写,当然你也可以选择Java或者Python。Scala DataSet API的所有核心类都在包org.apache.flink.api.scala中,DataStream API都在org.apache.flink.streaming.api.scala中。

     

    要执行一个flink程序首先要获取一个执行环境,获得一个执行环境有三种方式:

     

    getExecutionEnvironment() 

    createLocalEnvironment() 

    createRemoteEnvironment(host: String, port: Int, jarFiles: String*)

     

    DataSet我们建议这样获取:

     

    val env =
    展开全文
  • 在本篇中我们详细学习下flink api的分类及其应用场景。 Flink’s API flink提供了4个层次的api抽象: 在最下层,就是有状态流式编程,它提供了状态编程和时间编程(时间编程后面会学到)的基础能力。它通过Process...

    前文我们写了一个WordCount程序体验了一下flink程序的编写。在本篇中我们详细学习下flink api的分类及其应用场景。

    Flink’s API

    flink提供了4个层次的api抽象:
    在这里插入图片描述

    • 在最下层,就是有状态流式编程,它提供了ProcessFunctionAPI。flink在这个底层api上帮我们实现了最基础的流式处理能力,我们可以在上门进行有状态编程,并且我们可以自定义定时器,可以实现复杂的时间语义处理。(注意:ProcessFunction是嵌入到DataStreamAPI中使用的)。
    • 在core api层,flink提供了DataStreamAPI和DataSetAPI。这两个API提供了数据处理的基本操作:各种数据转化(map,flatmap,filter等等),分组,join,window,状态编程等等。我们在前文写的WordCount程序就是基于DataStreamAPI实现的。
    • Table API是基于表的声明式dsl。它与DataStream区别主要在以下几个方面:
      1. 遵循关系型数据模型,自带schema,提供了类sql操作,如select,project,join,group-by等等。
      2. 声明式api,指定数据处理逻辑,不指定代码实现逻辑
      3. 可实现udf,但是表达力不如core api
      4. 在执行前,计划器会优化执行逻辑
      5. Table可与DataStream/DataSet无缝转化。
    • flink提供的最高层api是flink-sql。它的抽象层次与Table API类似,但是允许用户直接写sql便可以执行job,而无需会写java或scala代码。

    这四层关系很好理解,简单总结一下,主要是有以下两点:

    1. 越往上层,开发人员写的代码越简洁,面向的开发人员越广。

    2. 越往上层,处理的数据越结构化,功能灵活性越低。

    展开全文
  • flink api 常用算子的操作

    千次阅读 2019-07-05 09:24:30
    flink api常用操作算子 1.flink 中算子是将一个或多个DataStream转换为新的DataStream,可以将多个转换组合成复杂的数据流拓扑 2.在flink中有多种不同的DataStream类型,他们之间是通过使用各种算子进行的 3.在...

    flink api常用操作算子

    1.flink 中算子是将一个或多个DataStream转换为新的DataStream,可以将多个转换组合成复杂的数据流拓扑
    2.在flink中有多种不同的DataStream类型,他们之间是通过使用各种算子进行的
    3.在flink中使用scala语言开发,需要引用import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._

    1.1 map操作

    map可以理解为映射,对每个元素进行一定的变换后,映射为另一个元素

    package com.kn.operator
    
    import org.apache.flink.api.common.functions.MapFunction
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.api.java.tuple.Tuple1
    
    object MapOperator {
      def main(args: Array[String]): Unit = {
        //获取环境变量
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        //准备数据,类型DataStreamSource
        val dataStreamSource = env.fromElements(Tuple1.of("flink")
                                                ,Tuple1.of("spark")
                                                ,Tuple1.of("hadoop"))
          .map(new MapFunction[Tuple1[String],String] {  //准备map操作,将元素做一定的转换,映射
            override def map(value: Tuple1[String]): String = {
              return "i like "+ value.f0
            }
          })
            .print()
        env.execute("flink map operator")
      }
    
    }
    
    运行结果:
    2> i like flink
    4> i like hadoop
    3> i like spark
    

    1.1.2 scala 环境

    package com.kn.operator
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    
    
    object MapOperator {
      def main(args: Array[String]): Unit = {
        //获取环境变量
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        //准备数据,类型DataStreamSource
        val dataStreamSource = env.fromElements(Tuple1.apply("flink")
                                                ,Tuple1.apply("spark")
                                                ,Tuple1.apply("hadoop"))
          .map("i like "+_._1)
          .print()
        env.execute("flink map operator")
      }
    
    }
    
    运行结果:
    3> i like hadoop
    2> i like spark
    1> i like flink
    
    

    1.2 flatmap

    flatmap 可以理解为将元素摊平,每个元素可以变为0个、1个、或者多个元素。

    package com.kn.operator
    
    import org.apache.flink.api.common.functions.FlatMapFunction
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.api.java.tuple.Tuple1
    import org.apache.flink.util.Collector
    
    object FlatMapOperator {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.fromElements(Tuple1.of("flink jobmanger taskmanager")
                        ,Tuple1.of("spark streaming")
                        ,Tuple1.of("hadoop hdfs"))
          //注意这里的FlatMapFuncation函数,第一个参数为input类型,第二个参数为output类型
          .flatMap(new FlatMapFunction[Tuple1[String],Tuple1[String]](){
            override def flatMap(value: Tuple1[String], out: Collector[Tuple1[String]]): Unit = {
              for(s:String <- value.f0.split(" ")){
                out.collect(Tuple1.of(s))
              }
            }
          })
          .print()
    
        env.execute("flink flatmap operator")
      }
    
    }
    运行结果:
    4> (spark)
    3> (flink)
    1> (hadoop)
    3> (jobmanger)
    4> (streaming)
    3> (taskmanager)
    1> (hdfs)
    

    1.2.2 scala环境

    package com.kn.operator
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    
    object FlatMapOperator {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.fromElements(Tuple1.apply("flink jobmanger taskmanager")
                        ,Tuple1.apply("spark streaming")
                        ,Tuple1.apply("hadoop hdfs"))
          //注意这里的FlatMapFuncation函数,第一个参数为input类型,第二个参数为output类型
          .flatMap((t1,out:Collector[Tuple2[String,Long]]) => {
            t1._1.split(" ").foreach(s => out.collect(Tuple2.apply(s,1L)))
        })
          .print()
    
        env.execute("flink flatmap operator")
      }
    
    }
    运行结果:
    2> (flink,1)
    4> (hadoop,1)
    3> (spark,1)
    4> (hdfs,1)
    2> (jobmanger,1)
    3> (streaming,1)
    2> (taskmanager,1)
    

    1.3 filter

    filter 用于过滤记录

    1.3.1

    package com.kn.operator
    
    import org.apache.flink.api.common.functions.RichFilterFunction
    import org.apache.flink.api.java.tuple.Tuple1
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    
    object FilterOperator {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.fromElements(Tuple1.of("flink")
                        ,Tuple1.of("spark")
                        ,Tuple1.of("hadoop"))
          .filter(new RichFilterFunction[Tuple1[String]] {
            override def filter(value: Tuple1[String]): Boolean = {
              return !"flink".equals(value.f0)    //过滤掉flink的记录
            }
          })
          .print()
    
        env.execute("flink filter operator")
    
      }
    }
    运行结果:
    2> (spark)
    3> (hadoop)
    

    1.3.2 scala 环境

    package com.kn.operator
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    
    object FilterOperator {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.fromElements(Tuple1.apply("flink")
                        ,Tuple1.apply("spark")
                        ,Tuple1.apply("hadoop"))
          .filter(!"flink".equals(_._1))  //过滤掉flink字符串
          .print()
    
        env.execute("flink filter operator")
      }
    }
    运行环境:
    3> (spark)
    4> (hadoop)
    

    1.4 keyby

    逻辑上将Stream根据指定的Key进行分区,是根据key的散列值进行分区的。
    注:keyed state 必须要在keyby() 之后使用

    1.4.1

    package com.kn.operator
    
    import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction}
    import org.apache.flink.api.java.tuple.{Tuple1, Tuple2}
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.util.Collector
    import org.apache.flink.api.common.state.{ValueState, ValueStateDescriptor}
    import org.apache.flink.api.common.typeinfo.{TypeHint, TypeInformation}
    import org.apache.flink.configuration.Configuration
    
    
    /*
    * 1.先按空格将字符串拆分
    * 2.计算key分组统计数量
    * */
    object KeyByOperator {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.fromElements(Tuple1.of("flink jobmanger taskmanager")
                        ,Tuple1.of("spark hadoop")
                        ,Tuple1.of("hadoop hdfs"))
    
          .flatMap(new RichFlatMapFunction[Tuple1[String],Tuple2[String,Long]] {
            override def flatMap(value: Tuple1[String], out: Collector[Tuple2[String,Long]]): Unit = {
              for(s:String <- value.f0.split(" ")){
                out.collect(Tuple2.of(s,1L))
              }
            }
          })
          .keyBy(0)
          .map(new RichMapFunction[Tuple2[String,Long],Tuple2[String,Long]] {
            private var state :ValueState[Tuple2[String,Long]] = null
            override def open(parameters: Configuration): Unit = {
              super.open(parameters)
              val descriptor = new ValueStateDescriptor[Tuple2[String,Long]]("keyby-wordCount",
                TypeInformation.of(new TypeHint[Tuple2[String,Long]](){}))
              state = getRuntimeContext.getState(descriptor)
            }
            override def map(value: Tuple2[String, Long]):Tuple2[String, Long] = {
              val oldState = state.value()
              if(oldState != null && value.f0.equals(oldState.f0)){
                state.update(Tuple2.of(value.f0,oldState.f1+1L))
                return Tuple2.of(value.f0,oldState.f1+1L)
              }else{
                state.update(Tuple2.of(value.f0,1L))
                return Tuple2.of(value.f0,1L)
              }
            }
          })
          .print()
    
        env.execute("flink keyby operator")
      }
    
    }
    运行结果:
    1> (jobmanger,1)
    1> (taskmanager,1)
    3> (hdfs,1)
    1> (spark,1)
    4> (hadoop,1)
    4> (flink,1)
    4> (hadoop,2)
    

    1.4.2 scala版本

    package com.kn.operator
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    
    /*
    * 1.先按空格将字符串拆分
    * 2.计算key分组统计数量
    * */
    object KeyByOperator {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.fromElements(Tuple1.apply("flink jobmanger taskmanager")
                        ,Tuple1.apply("spark hadoop")
                        ,Tuple1.apply("hadoop hdfs"))
          .flatMap((t1,out:Collector[Tuple2[String,Long]]) => {
            t1._1.split(" ").foreach(s => out.collect(Tuple2.apply(s,1L)))
          })
          .keyBy(0)
          .reduce((t1,t2) => Tuple2.apply(t1._1,t1._2+t2._2))
          .print()
    
        env.execute("flink keyby operator")
      }
    
    }
    
    

    1.5 reduce

    reduce是归并操作,它可以将KeyedStream 转变为 DataStream,实质是按照key做叠加计算

    1.5.1

    package com.kn.operator
    
    import org.apache.flink.api.common.functions.{RichFlatMapFunction, RichMapFunction, RichReduceFunction}
    import org.apache.flink.api.java.tuple.{Tuple1, Tuple2}
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    import org.apache.flink.util.Collector
    
    object ReduceOperator {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.fromElements(Tuple1.of("flink hadoop taskmanager")
          ,Tuple1.of("spark hadoop")
          ,Tuple1.of("hadoop hdfs"))
    
          .flatMap(new RichFlatMapFunction[Tuple1[String],Tuple2[String,Long]] {
            override def flatMap(value: Tuple1[String], out: Collector[Tuple2[String,Long]]): Unit = {
              for(s:String <- value.f0.split(" ")){
                out.collect(Tuple2.of(s,1L))
              }
            }
          })
          .keyBy(0)
          .reduce(new RichReduceFunction[Tuple2[String, Long]] {
            override def reduce(value1: Tuple2[String, Long], value2: Tuple2[String, Long]): Tuple2[String, Long] = {
              return Tuple2.of(value1.f0,value1.f1+value2.f1)
            }
          })
          .print()
    
        env.execute("flink reduce operator")
      }
    }
    运行结果:
    4> (hadoop,1)
    3> (hdfs,1)
    4> (flink,1)
    4> (hadoop,2)
    4> (hadoop,3)
    1> (spark,1)
    1> (taskmanager,1)
    

    1.5.2 scala环境

    reduce是归并操作,最后归并后结果的数据类型和input的数据类型一致,所以scala在定义参数时,不用指定返回值类型

    package com.kn.operator
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.util.Collector
    
    object ReduceOperator {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.fromElements(Tuple1.apply("flink hadoop taskmanager")
          ,Tuple1.apply("spark hadoop")
          ,Tuple1.apply("hadoop hdfs"))
          .flatMap((t1,out:Collector[Tuple2[String,Long]]) => {
            t1._1.split(" ").foreach(s => out.collect(Tuple2.apply(s,1L)))
          })
          .keyBy(0)
          .reduce((t1,t2) => Tuple2.apply(t1._1,t1._2+t2._2))
          .print()
    
        env.execute("flink reduce operator")
      }
    }
    
    运行结果:
    1> (spark,1)
    3> (hdfs,1)
    1> (taskmanager,1)
    4> (hadoop,1)
    4> (hadoop,2)
    4> (flink,1)
    4> (hadoop,3)
    

    1.6 union操作

    union可以将多个流合并到一个流中,以便对合并的流进行集中统处理。是对多个流的水平拼接
    多个数据流必须是同类型

    1.6.1

    package com.kn.operator
    
    import org.apache.flink.api.java.tuple.Tuple1
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment
    
    object UnionOperator {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val df1 = env.fromElements(Tuple1.of("flink")
          ,Tuple1.of("spark")
          ,Tuple1.of("hadoop"))
        val df2 = env.fromElements(Tuple1.of("oracle")
          ,Tuple1.of("mysql")
          ,Tuple1.of("sqlserver"))
    
        //将多个流合并到一个流,多个数据流必须同类型,使流数据集中处理
        df1.union(df2).print()
        env.execute("flink union operator")
      }
    }
    
    

    1.6.2 scala 版本

    package com.kn.operator
    
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.scala._
    
    
    object UnionOperator {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val df1 = env.fromElements(Tuple1.apply("flink")
          ,Tuple1.apply("spark")
          ,Tuple1.apply("hadoop"))
        val df2 = env.fromElements(Tuple1.apply("oracle")
          ,Tuple1.apply("mysql")
          ,Tuple1.apply("sqlserver"))
    
        //将多个流合并到一个流,多个数据流必须同类型,使流数据集中处理
        df1.union(df2).filter(!_._1.equals("hadoop")).print()
        env.execute("flink union operator")
    
      }
    
    }
    运行结果:
    3> (sqlserver)
    1> (oracle)
    2> (mysql)
    3> (spark)
    2> (flink)
    

    1.7 join 操作

    join是根据指定的key将两个流做关联

    package com.kn.operator
    
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.windowing.triggers.CountTrigger
    import org.apache.flink.streaming.api.scala.StreamExecutionEnvironment
    import org.apache.flink.streaming.api.windowing.assigners.{ProcessingTimeSessionWindows}
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector
    
    object JoinOperator {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        val df1 = env.fromElements(
          Tuple2.apply("flink",1L)
          ,Tuple2.apply("spark",2L)
          ,Tuple2.apply("hadoop",3L))
        val df2 = env.fromElements(Tuple2.apply("flink",1L)
          ,Tuple2.apply("mysql",1L)
          ,Tuple2.apply("spark",1L))
    
        df1.join(df2)
          .where(_._1)
          .equalTo(_._1)
          .window(ProcessingTimeSessionWindows.withGap(Time.seconds(10)))
          .trigger(CountTrigger.of(1))  //这里的含义为每到来一个元素,都会立刻触发计算。
          .apply((t1,t2,out:Collector[Tuple2[String,Long]]) =>{
            out.collect(Tuple2.apply(t1._1,t1._2+t2._2))
          })
          .print()
          env.execute("flink join operator")
    
      }
    }
    
    运行结果:
    4> (flink,2)
    1> (spark,3)
    

    1.8 eventTime & watermark

    (1)使用flink中的eventTime和watermark用于解决数据乱序的问题,这里要重点说明一下:只是指定了一个解决数据乱序问题的方案,但是无法彻底解决数据延迟及乱序
    (2)watermark的生成方式有2种:
    With Periodic Watermarks:周期性的触发watermark的生成和发送 (比较常用)
    with punctuated watermarks:基于某些事件触发watermark的生成和发送

    先上代码:详细执行过程请参考https://blog.csdn.net/xu470438000/article/details/83271123

    package com.kn.operator
    
    import java.text.SimpleDateFormat
    
    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.functions.AssignerWithPeriodicWatermarks
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.streaming.api.watermark.Watermark
    import org.apache.flink.streaming.api.windowing.assigners.TumblingEventTimeWindows
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.util.Collector
    
    object WaterMarkOpertor {
      def main(args: Array[String]): Unit = {
        val sdf = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS")
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        //设置eventTime 默认为process time
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        env.setParallelism(1)
    
        //add source,每一个消息体有两个元素,第一个元素为context ,第二个元素为时间戳,以逗号分隔
        env.socketTextStream("localhost", 9000)
          .map(s => Tuple2.apply(s.split(",")(0),s.split(",")(1).toLong))
          .assignTimestampsAndWatermarks(new AssignerWithPeriodicWatermarks[Tuple2[String,Long]] {
            private var currentMaxTimestamp =0L
            private val maxOutofOrderness = 10000L //最大允许乱序时间为10S
    
            //每条记录都要先执行extractTimestamp,从记录中抽取时间戳eventTime,确认currentMaxTimestamp;再确认watermark 即调用getCurrentWatermark 方法
            override def getCurrentWatermark: Watermark = new Watermark(currentMaxTimestamp - maxOutofOrderness)
    
            override def extractTimestamp(element: (String, Long), previousElementTimestamp: Long): Long = {
              currentMaxTimestamp = math.max(element._2,currentMaxTimestamp)
              println("key:"+element._1,"eventtime_format:"+sdf.format(element._2)
              ,"currentMaxTimestamp_format:"+sdf.format(currentMaxTimestamp)
              ,"currentWaterMark_format:"+sdf.format(getCurrentWatermark.getTimestamp))
              element._2
            }
          })
          .keyBy(0)
          .window(TumblingEventTimeWindows.of(Time.seconds(3))) //按照消息的EventTime分配窗口,和调用TimeWindow效果一样
          .allowedLateness(Time.seconds(2))  //允许数据延迟2秒,只有当(watermark < window_end_time + lateness) 且有新增窗口数据才会触发window计算
          //      .trigger(CountTrigger.of(1))
          .apply((t1,w1,input,out:Collector[Tuple2[String,String]]) => { //满足时间窗口条件才会触发,t1:
            val key = t1.toString
            input.foreach(t => {
              out.collect(t._1,sdf.format(t._2))  //窗口中数据集合
            })
            println("key:"+key,"window start time:"+sdf.format(w1.getStart)," window end time:"+sdf.format(w1.getEnd))
        })
          .print()
    
        env.execute("flink window watermark")
      }
    
    }
    
    

    代码解读:

    (1)env设置时间特性,默认为processing Time,我们这里设置为eventTime,即已事件时间为依据来计算时间窗口数据
    (2)数据源,直接读取socket数据,每条数据包含2个元素,第一个元素为context,第二个元素为数据的timestamp,元素中间用逗号分隔;
    (3)使用map函数,将每条信息封装为Tuple2[String,Long]
    (4)配置时间戳及watermark的生成策略:这里使用AssignerWithPeriodicWatermarks,采用周期性的生成watermark;实现两个方法(每条消息都会执行):
    extractTimestamp:抽取消息记录中的时间戳,保证系统全部最大时间戳,跟消息体的时间戳取max,这里的currentMaxTimestamp只会递增,保证了watermark的递增
    getCurrentWatermark :指定watermark的生成策略,这里使用的是,全局消息体中时间戳-最大允许乱序时间(这里设置的最大乱序允许时间为10S)
    (5)window: 这里指定的是TumblingEventTimeWindows,固定时间范围窗口,这里指定的时间窗口间隔为3s,则系统自动分配窗口为[0,3),[3,6),[6,9)…[57,60) 左闭右开 (–这里的窗口分配跟数据时间没有关系,直接属于系统按照时间间隔自动分配窗口区间)
    触发窗口计算条件:watermark >= window_end_time && window窗口内有数据
    watermark机制,是在牺牲数据时效性为代价,尽量保证数据计算的准确性;这里如果想保持实时计算,可以使用trigger 函数,每消费一条数据,可以触发一次窗口计算,直到超过时延…
    (6)针对于数据时延比较大的数据,错过了窗口计算的数据,flink默认的机制是直接丢弃(即不会再触发计算);但是flink又提供了其他2种针对时延数据的处理机制:
    #1.允许数据延迟时间(allowedLateness),注意,这里的数据时延指的是跟watermark做比较,例如我们允许时延2秒,则当watermark < window_end_time + lateness && 且窗口中有新增数据,则会重新触发窗口计算 (每增加一条数据,会触发窗口计算多次),再超过时序的数据则直接丢弃
    #2.sideOutputLateData 收集迟到的数据 : 可以把迟到的数据统一收集,统计存储(也可以外部存储),方便后期排查问题

    展开全文
  • Kudu分布式存储引擎 Kudu & Spark&Flink API操作_Kudu_文档
  • 中,介绍了Flink API使用中的一些基本概念,而更详细的信息就要到具体的信息处理界面 Streaming Guide 和 Batch Guide 去查看。 DataSet and DataStream Anatomy of a Flink Program Lazy ...
  • Flink教程(5)-Flink常用API

    千次阅读 多人点赞 2020-12-01 23:20:16
    Flink API讲解。Flink是一个同时具备流数据处理和批数据处理的分布式计算框架。Flink代码主要是由 Java 实现,部分代码由 Scala实现。Flink既可以处理有界的批量数据集、也可以处理无界的实时数据集。Flink处理的...
  • Flink API的抽象级别 1、概述 source是程序的数据源输入,你可以通过StreamExecutionEnvironment.addSource(sourceFunction)来为你的程序添加一个source。 flink提供了大量的已经实现好的source方法,你也...
  • Flink DataStream常用API

    2020-03-10 13:50:50
    Flink API的抽象级别分析2. Flink DataStream常用API 1. Flink API的抽象级别分析 Flink中提供了4种不同层次的API: 低级API:提供了对时间和状态的细粒度控制,简洁性和易用性较差,主要应用在对一些复杂事件的...
  • Flink DataStream API

    千次阅读 2021-05-26 15:13:02
    Flink DataStream API 编程指南概览前言什么是DataStreamFlink程序剖析程序样例Data SourcesDataStream Transformations算子数据流转换算子物理分区算子链和资源组Data Sinks迭代执行参数 概览 前言 Flink中的...
  • FlinkFlinkAPI 和 Table 以及 SQL API是否可以共存.pdf
  • Flink:table API

    千次阅读 2020-07-07 00:07:35
    Flink API概述 两套方案的区别 两者在编译与执行的区别 两者在优化SQL语句时的区别 如何使用Table API 入门 导入依赖 代码结构 创建TableEnvironment 创建表 从外部关系型数据库中创建表 标识符 查询表 ...
  • 1 flink api :底层 是DataStreamApi处理和DataSetApi批处理 ;高层是 table Api/Sql批处理与流处理合一 2 在flink1.12中dataStream api 实现批处理与流处理合一,推荐使用dataStream,dataSet api可能被废弃
  • flink常用api详解

    2021-02-22 11:47:23
    1.Flink提供了四种不同层级的API。低级API,核心API,Table API,SQL。 2.Flink DataStream的常用API(主要分为三块):DataSource(程序的数据源输入),Transformation(具体的操作),Sink(程序的输出) 3....
  • Flink REST API 的使用

    千次阅读 2019-07-24 17:55:22
    Flink有一个监视 API,可以用来查询正在运行的作业以及最近完成的作业的状态和统计信息。这个监视APIFlink自己的仪表板使用,但也被设计用于定制监视工具。 监视API是一个REST-ful API,它接受HTTP请求并使用...
  • 概要堆叠式技能大赏,包括Flink的运行架构、DataStream API、TableAPI、CEP、SQL 文章目录Flink API技能大赏Flink SQL技能大赏 Flink API技能大赏 看图自己回忆各个模块并散发式产生各个实际概念和应用方式以及...
  • Flink Rest API使用

    万次阅读 2018-05-29 19:28:16
    Flink Rest API使用 ---------------- ### 1. 上传jar包接口 - 请求路径 - http://hadoop3.test.yunwei.puppet.dh:40610/jars/upload - 请求方式 - POST - 请求参数 - 设置请求头 application/java-archive...
  • Flink中的API层级划分

    2020-07-21 17:47:54
    Flink 中的 API Flink 为流式/批式处理应用程序的开发提供了不同级别的抽象。 Flink API 最底层的抽象为有状态实时流处理。其抽象实现是 Process Function,并且 Process Function被 Flink 框架集成到了 ...
  • Flink DataStream API编程

    千次阅读 2018-08-06 18:26:14
    DataStream API Flink 的DataStream程序四实现数据流转换的常规程序(例如L过滤, 更新状态,定义窗口,聚合).最初从各种源(例如,消息队列,套接字流,文件)创建数据流。结果通过接收器返回,接收器可以例如将数据...
  • Flink之Table API

    2021-01-20 12:52:45
    Table API是SQL语言的超集并专门为Apache Flink设计的,Table API是Scala和Java语言集成式的API。与常规SQL语言中将查询指定为字符串不同,Table API查询是以Java或Scala中的语言嵌入样式来定义的,具有IDE支持如:...
  • Flink Table API 使用详解

    千次阅读 2019-05-17 18:10:33
    Table API是SQL语言的超集并专门为Apache Flink设计的,Table API是Scala 和Java语言集成式的API。与常规SQL语言中将查询指定为字符串不同,Table API查询是以Java或Scala中的语言嵌入样式来定义的,具有IDE支持如:...
  • Flink restful API demo

    千次阅读 2019-10-14 14:59:19
    主要解决用flink的restful API 来启动和停止yarn上的flink任务github地址:https://github.com/wenbaoup/flink-restful-demo package com.wenbao.flink.restful.flink; import com.alibaba.fastjson.JSONObject; ...
  • Flink常用监控API

    2020-12-02 16:35:35
    实际上,Flink有一套自己的监控REST APIFlink WebUI也是基于这套监控API实现的,我们可以调用官方提供的监控API,来实现我们自己想要的参数查看和状态监控功能。 下面的表格列举了常用的Flink监控REST API: ...
  • flink分层 api

    2019-06-26 23:41:00
    最底层的processFunction 功能强大,...中间层的DataSet api map reduce 。。。一些基本运算api 中上层的tableAPI 最上层 SQL 两个相似,只是写法上的不同 转载于:https://www.cnblogs.com/yszzu/p/11094583.html...
  • Flink Table API & SQL概念和通用API

    千次阅读 2020-02-10 14:01:18
    官网链接:https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/table/common.html#register-a-datastream-or-dataset-as-table ...Apache Flink具有两个关系API-Table API和SQL-用于统一...
  • Flink Table API 和 SQL之概述

    千次阅读 2018-11-23 15:05:34
    Flink针对标准的流处理和批处理提供了两种相关的API,Table API和sql。TableAPI允许用户以一种很直观的方式进行select 、filter和join操作。Flink SQL支持基于Apache Calcite实现的标准SQL。针对批处理和流处理可以...
  • at org.apache.flink.api.scala.operators.ScalaAggregateOperator.translateToDataFlow(ScalaAggregateOperator.java:220) at org.apache.flink.api.scala.operators.ScalaAggregateOperator.translateToDataFlow...
  • Flink Table Api详解(算子)

    千次阅读 2020-03-05 13:50:31
    该文章主要是对Flink官网相关内容进行翻译,原文地址:https://ci.apache.org/projects/flink/flink-docs-release-1.10/dev/table/tableApi.html#over-windows Table API是用于流和批处理的统一的关系API。Table ...
  • Flink 一共有三个层级API TABLE SQL API transfrom API Procession API 这节主要讲 Procession

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 30,365
精华内容 12,146
关键字:

flink的api