精华内容
下载资源
问答
  • 我们知道 Flink 作业的配置一般都是通过在作业启动的时候通过参数传递的,或者通过读取配置文件的参数,在作业启动后初始化了之后如果再想更新作业的配置一般有两种解决方法: (1)改变启动参数或者改变配置文件,...

    我们知道 Flink 作业的配置一般都是通过在作业启动的时候通过参数传递的,或者通过读取配置文件的参数,在作业启动后初始化了之后如果再想更新作业的配置一般有两种解决方法:

    (1)改变启动参数或者改变配置文件,重启作业,让作业能够读取到修改后的配置()

    (2)通过读取配置流(需要自定义 Source 读取配置),然后流和流连接起来

    (3)读取配置信息,从mysql或者redis

    今天介绍的是一种用配置中心。携程 apollo、spring cloud config、nacos 等

    Flink  open 方法

    invoke 方法:

     日志打印结果:

     之前在京东的时候有了解过这种分布式配置中,DUCC。感兴趣的可以看看相关文档,是如何实现的。

    展开全文
  • 实现MySQL_Source配置信息动态定时更新; 实现MySQL_Source广播,此处使用最常用的keyby广播KeyedBroadcastProcessFunction; 摘要 关键字 MySQL_Source、Flink广播; 设计 MyJdbcSource 日常创建一个...

    背景

    • 适用于配置化操作流,无需终止流式程序实现配置,并且以广播流的形式在流式程序中使用;
    • 实现MySQL_Source配置信息动态定时更新;
    • 实现MySQL_Source广播流,此处使用最常用的keyby广播流KeyedBroadcastProcessFunction;

    摘要

    关键字

    • MySQL_Source、Flink广播流;

    设计

    • MyJdbcSource

      1. 日常创建一个继承源富函数的类;
      2. 初始化单连接;
      3. 配置更新时间设置自己可以编写方法应用;
    • 配置化Flink广播流;

      1. 获取配置Source随即进行broadcast;

      2. 接入数据流进行数据操作;

      3. 结果数据流与广播流连接connect,与多数据流connect底层原理类似,最底层每个core对应一个广播流进行高效匹配;

        在这里插入图片描述

      4. process,按照类重写方法分别进行处理数据流和广播流;

    理解

    • eg:网路搬来的图片,自己就不花时间画啦;

      1. 广播流

        img

      2. 普通数据流

        img

    实现

    说明

    此处的处理没有写成项目中使用的比较复杂的可配置化的形式,也就是只针对单表测试表的操作;

    依赖

    <scala.main.version>2.11</scala.main.version>
    <flink.version>1.10.1</flink.version>
    <!--flink-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_${scala.main.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_${scala.main.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka_${scala.main.version}</artifactId>
                <version>1.10.1</version>
            </dependency>
    
            <!--flink table & sql-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-api-scala-bridge_${scala.main.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-table-planner_${scala.main.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!--导入flink连接redis的文件-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-redis_${scala.main.version}</artifactId>
                <version>${flink.redis.version}</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-elasticsearch6_${scala.main.version}</artifactId>
                <version>${flink.version}</version>
            </dependency>
    
            <!--rocksdb 与flink 进行整合依赖-->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-statebackend-rocksdb_2.11</artifactId>
                <version>1.9.1</version>
            </dependency>
    
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-cep-scala_2.11</artifactId>
                <version>${flink.version}</version>
            </dependency>
    

    main

    import java.util
    
    import com.xx.beans.FlinkMergeDataResultBean
    import com.xx.utils.flink.soruce.{MyJdbcSource, MyKafkaSource}
    import com.xx.utils.flink.states.ValueStateDemo.MyMapperTableData
    import org.apache.flink.api.common.state.{BroadcastState, MapStateDescriptor, ReadOnlyBroadcastState}
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo
    import org.apache.flink.api.java.typeutils.MapTypeInfo
    import org.apache.flink.streaming.api.datastream.BroadcastStream
    import org.apache.flink.streaming.api.{CheckpointingMode, TimeCharacteristic}
    import org.apache.flink.streaming.api.functions.co.{KeyedBroadcastProcessFunction, KeyedCoProcessFunction}
    import org.apache.flink.streaming.api.scala.{BroadcastConnectedStream, DataStream, StreamExecutionEnvironment}
    import org.apache.flink.util.Collector
    
    import scala.collection.mutable
    
    /**
     * @Author KevinLu
     * @Description Flink实现动态MySQL数据配置广播流
     * @Copyright 代码类版权的最终解释权归属KevinLu本人所有;
     **/
    object BroadcastStateStreamDemo {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
    
        /**
         * 重启策略配置
         */
    
    
        /**
         * import
         */
        import org.apache.flink.api.scala._
    
        /**
         * 获取配置MySQL流
         */
        val propertiesStream: DataStream[mutable.HashMap[String, String]] = env.addSource(MyJdbcSource)
    
        /**
         * * @param name {@code MapStateDescriptor}的名称。
         * * @param keyTypeInfo状态下键的类型信息。
         * * @param valueTypeInfo状态值的类型信息。
         */
        val mapStateDescriptor: MapStateDescriptor[String, util.Map[String, String]] = new MapStateDescriptor("broadCastConfig", BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo(classOf[String], classOf[String]))
        //拿到配置广播流
        val broadcastStream: BroadcastStream[mutable.HashMap[String, String]] = propertiesStream.setParallelism(1).broadcast(mapStateDescriptor)
        //数据流
        val dStream1: DataStream[String] = MyKafkaSource.myKafkaSource(
          env,
          "xxx:9092,xxx:9092,xxx:9092",
          List("xxx")
        )
    
        val keydStream = dStream1
          .map(new MyMapperTableData)
          .keyBy(_.key)
    
        val broadcastConnectedStream: BroadcastConnectedStream[FlinkMergeDataResultBean, mutable.HashMap[String, String]] = keydStream.connect(broadcastStream)
    
        /**
         * * @param <KS>输入的关键字流的关键类型。KeyedStream 中 key 的类型
         * * @param <IN1>键控(非广播)端的输入类型。
         * * @param <IN2>广播端输入类型。
         * * @param <OUT>操作符的输出类型。
         */
        broadcastConnectedStream.process(
          new KeyedBroadcastProcessFunction[String, FlinkMergeDataResultBean, mutable.HashMap[String, String], String] {
            val mapStateDescriptor : MapStateDescriptor[String, util.Map[String, String]] = new MapStateDescriptor("broadCastConfig", BasicTypeInfo.STRING_TYPE_INFO, new MapTypeInfo(classOf[String], classOf[String]))
            /**
             * 初始化
             */
            private var hashMap: mutable.HashMap[String, String] = mutable.HashMap[String, String]()
            /**
             * 这个函数处理数据流的数据,这里之只能获取到 ReadOnlyBroadcastState,因为 Flink 不允许在这里修改 BroadcastState 的状态。
             * value 是数据流中的一个元素;ctx 是上下文,可以提供计时器服务、当前 key和只读的 BroadcastState;out 是输出流收集器。
             *
             * @param value
             * @param ctx
             * @param out
             */
            override def processElement(value: FlinkMergeDataResultBean, ctx: KeyedBroadcastProcessFunction[String, FlinkMergeDataResultBean, mutable.HashMap[String, String], String]#ReadOnlyContext, out: Collector[String]): Unit = {
    //          val broadcastValue: HeapBroadcastState[String, util.Map[String, String]] = ctx.getBroadcastState(mapStateDescriptor).asInstanceOf[HeapBroadcastState[String, util.Map[String, String]]]
              println("hashMap",hashMap)
              println("value",value)
              println("hashMap.get(value.key)",hashMap.get(value.key))
              out.collect(hashMap.get(value.key).getOrElse("NU"))
            }
    
            /**
             * 这里处理广播流的数据,将广播流数据保存到 BroadcastState 中。
             * @param value value 是广播流中的一个元素;
             * @param ctx ctx 是上下文,提供 BroadcastState 和修改方法;
             * @param out out 是输出流收集器。
             */
            override def processBroadcastElement(value: mutable.HashMap[String, String], ctx: KeyedBroadcastProcessFunction[String, FlinkMergeDataResultBean, mutable.HashMap[String, String], String]#Context, out: Collector[String]): Unit = {
    //          val broadcastState: BroadcastState[String, util.Map[String, String]] = ctx.getBroadcastState(mapStateDescriptor)
              hashMap = value
            }
          }
        ).print("输出结果:")
    
        env.execute()
      }
    }
    
    

    Bean

    case class FlinkMergeDataResultBean(
                                       key : String ,
                                       data : JSONObject
                                       )
    

    MySQL_Source

    import java.sql.Connection
    
    import com.xx.contant.ContantCommon
    import com.xx.utils.jdbc.Jdbc
    import org.apache.flink.configuration.Configuration
    import org.apache.flink.streaming.api.functions.source.{RichSourceFunction, SourceFunction}
    
    import scala.collection.mutable
    
    /**
     * Description:xxxx<br/>
     * Copyright (c) ,20xx , KevinLu <br/>
     * This program is protected by copyright laws. <br/>
     *
     * @author KevinLu
     * @version : 1.0
     */
    object MyJdbcSource extends RichSourceFunction[mutable.HashMap[String, String]] {
      private var jdbcConn: Connection = null
      val url = "jdbc:mysql://xx:3306/xx?user=xx&password=xx&characterEncoding=UTF-8&useSSL=false"
      var isRunning = true
    
      /**
       * 初始化source
       * @param parameters
       */
      override def open(parameters: Configuration): Unit = {
        println(s"查询MySQL已经建立连接!")
        //所有Jdbc单连接方法都可,根据获取情况判定是否使用连接池
        jdbcConn = Jdbc.getConnect(url)
      }
    
      override def run(ctx: SourceFunction.SourceContext[mutable.HashMap[String, String]]): Unit = {
        val sql = s"select name , properties from properties"
        /**
         * 收到同一个source_table下的多个task_name任务
         */
        var mapResult = mutable.HashMap[String, String]()
        while(isRunning){
          /**
           * 收到同一个source_table下的多个task_name任务
           */
          val map = mutable.HashMap[String, String]()
          try {
            val stmt = jdbcConn.createStatement()
            val pstmt = stmt.executeQuery(sql)
            while (pstmt.next) {
              /**
               * 双MAP做唯一判断
               */
              map.put(pstmt.getString(1), pstmt.getString(2))
            }
          } catch {
            case e: Exception => println(s"${this.getClass.getSimpleName}-查询源数据配置信息异常 → ${e}", e.printStackTrace())
          }
          mapResult = map
          ctx.collect(mapResult)
          Thread.sleep(10000)
        }
      }
    
      /**
       * 取消一个job时
       */
      //  override def cancel(): Unit = {
      //    if (jdbcConn != null) {
      //      jdbcConn.close()
      //    } else {
      //      println("JDBC连接为空,请注意检查!")
      //    }
      //  }
    
      override def close(): Unit = {
        jdbcConn.close()
        println("mysql连接已经关闭;")
      }
    
      override def cancel(): Unit = {
        isRunning = false
      }
    }
    

    Kafka_Source

    import java.util.Properties
    
    import com.xxxx.contant.Contant
    import org.apache.flink.api.common.serialization.{DeserializationSchema, SimpleStringSchema}
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.connectors.kafka.{FlinkKafkaConsumer}
    
    /**
     * @Author KevinLu(鹿)
     * @Description ******
     * @Date xxx
     * @Copyright 代码类版权的最终解释权归属KevinLu本人所有;
     **/
    object MyKafkaSource {
      def myKafkaSource(
                         env : StreamExecutionEnvironment ,
                         bootstrapServers : String ,
                         topics : List[String]
                       ): DataStream[String] ={
        /**
         * 获取基础参数
         */
        import org.apache.flink.api.scala._
        import scala.collection.JavaConversions._
        /**
         * 定义kafka-source得到DataStream
         */
        //将kafka中数据反序列化,
        val valueDeserializer: DeserializationSchema[String] = new SimpleStringSchema()
    
        val properties = new Properties()
        properties.put("bootstrap.servers", bootstrapServers)
    
        val kafkaSinkDStream: DataStream[String] = env.addSource(new FlinkKafkaConsumer[String](topics, valueDeserializer, properties))
        kafkaSinkDStream
      }
    }
    

    Map_Function

      /**
       * 自定义mapper处理相关数据
       * 判断如果是相关宽表中的数据那么分为同一个by中
       */
      class MyMapperTableData() extends RichMapFunction[String , FlinkMergeDataResultBean] {
    
        override def map(in: String): FlinkMergeDataResultBean = {
          println(s"source data:$in")
          val json = JSON.parseObject(in)
          FlinkMergeDataResultBean(json.getString("table") , json.getJSONObject("data"))
        }
      }
    

    部署

    #!/bin/bash
    flink run -m yarn-cluster \
    -c xxx.xxx \
    -p 8 \
    /Linux根目录
    

    注意事项

    1. 开发者根据自己需要拆解使用,包含知识点不单一,MySQL_Source、广播流、底层connect过程等;
    2. 单使用广播流可以使用外部文件、Kafka、Redis等进行测试,两个Kafka流进行测试相对容易上手;
    3. 需要花费时间深入的点就是广播流的运转模式,不同场景如何切换开发思路;
    展开全文
  • 在上一篇讲解 Flink 与 Nacos 整合的视频 中,讲过了常见的几种更新配置的方法,最常使用的可能就是通过广播的方式,相信看完上个视频的,估计对整合 Nacos 做动态更新配置应...

    在上一篇讲解 Flink 与 Nacos 整合的视频 中,讲过了常见的几种更新配置的方法,最常使用的可能就是通过广播流的方式,相信看完上个视频的,估计对整合 Nacos 做动态更新配置应该问题不大,zhisheng 我也觉得稍微简单,尤其 Nacos 搭建安装也比较简单。不知道大家公司有没有使用 Nacos 呢?我知道有的公司使用 Apollo 居多,所以后面就有读者问我能不能出个整合 Apollo 的视频,所以我趁着周末大晚上的时间就开始折腾了一番,本篇文章将给大家讲解与 Apollo 整合,动态的更新 Flink 配置。

    Apollo(阿波罗)是携程框架部门研发的分布式配置中心,能够集中化管理应用不同环境、不同集群的配置,配置修改后能够实时推送到应用端,并且具备规范的权限、流程治理等特性,适用于微服务配置管理场景。

    因为它的自身架构原因,导致安装可能会比较复杂,需要安装好多个组件,个人觉得比 Nacos 复杂,幸好的是官方的文档比较详细,跟着安装步骤来说还是没有问题的。zhisheng 我是只在自己 Mac 电脑上面安装了一个单机版的,仅为测试使用。

    快速上手的请参考该链接 https://github.com/nobodyiam/apollo-build-scripts,这样你就能够在几分钟内在本地环境部署、启动 Apollo 配置中心。另外还提供了 Quick Start 的 Docker 版本,如果你对 Docker 比较熟悉的话,那更方便了。

    主要演示流程(安装 Apollo 和整合 Flink),本人录了个视频(上传到 B 站了,地址链接是 https://www.bilibili.com/video/av91742999/,因为视频超过 30 分支,上传到微信公众号有点复杂,可以点击文章末尾的阅读原文查看高清视频),更方便大家去实战操作,欢迎观看。

    代码地址:https://github.com/zhisheng17/flink-learning/tree/master/flink-learning-configration-center/flink-learning-configration-center-apollo

    注意引入 Apollo 的依赖:

    <dependency>
        <groupId>com.ctrip.framework.apollo</groupId>
        <artifactId>apollo-client</artifactId>
        <version>1.5.1</version>
    </dependency>
    

    END
    
    关注我
    公众号(zhisheng)里回复 面经、ES、Flink、 Spring、Java、Kafka、监控 等关键字可以查看更多关键字对应的文章。
    

    你点的每个赞,我都认真当成了喜欢

    展开全文
  • Apache Flink:数据编程模型 | 从入门到精通 - 第 2 期(每天一期新知识) Flink是最热门的实时计算引擎之一。在动手部署和编程之前,学习Flink的数据编程模型,可以建立起核心概念的全局架构。方便局部概念深入...

    Apache Flink:数据流编程模型

    | 从入门到精通 - 第 2 期(每天一期新知识)

    Flink是最热门的实时计算引擎之一。在动手部署和编程之前,学习Flink的数据流编程模型,可以建立起核心概念的全局架构。方便局部概念深入学习。

    Apache Flink:数据流编程模型

    点击播放视频教程
    在这里插入图片描述
    https://www.bilibili.com/video/av66869896/

    Flink的数据流编程模型(基于最新版flink1.9),共包含的概念有:抽象层级,程序和数据流,并行数据流,窗口,时间概念,有状态计算,容错检查点。

    | 抽象层级

    Flink提供不同级别的抽象来开发流/批处理应用程序。
    在这里插入图片描述

    • 最低级抽象只提供有状态流。它通过Process Function嵌入到DataStream API中。它允许用户自由处理来自一个或多个流的事件,并使用一致的容错状态。此外,用户可以注册事件时间和处理时间回调,允许程序实现复杂的计算。

    • 在实践中,很多应用程序不需要上述的低级抽象,而是针对Core APIs编程,如DataStream API(有界/无界流)和DataSet API(有界数据集)。这些流畅的API提供了用于数据处理的通用构建块,例如各种形式的用户指定的转换,连接,聚合,窗口,状态等。在这些API中处理的数据类型在相应的编程语言中表示为类。

      低级Process Function与DataStream API集成在一起,因此只能对某些操作进行低级抽象。DataSet API在有界数据集上提供了额外的基元,如循环/迭代。

    • Table API是以表为中心的声明性DSL,可以是动态更改表(表示流时)。Table API遵循(扩展)关系模型:表附加了一个模式(类似于关系数据库中的表),API提供了可比较的操作,例如select,project,join,group-by,aggregate等。表API程序以声明方式定义应该执行的逻辑操作,而不是准确指定操作代码的外观。尽管Table API可以通过各种类型的用户定义函数进行扩展,但它的表现力不如Core API,但使用起来更简洁(编写的代码更少)。此外,Table API程序还会通过优化程序,在执行之前应用优化规则。

      可以在表和DataStream/DataSet之间无缝转换,允许程序混合Table API以及DataStream和DataSet API。

    • Flink提供的最高级抽象是SQL。这种抽象在语义和表达方面类似于Table API,但是将程序表示为SQL查询表达式。SQL抽象与Table API紧密交互,SQL查询可以在Table API中定义的表上执行。

    | 程序和数据流

    Flink程序的基本构建块是流和转换。(请注意,Flink的DataSet API中使用的DataSet也是内部流 - 稍后会详细介绍。)从概念上讲,流是(可能永无止境的)数据记录流,而转换的操作是将一个或多个流作为输入,并产生一个或多个输出流作为结果。

    执行时,Flink程序映射到流式数据流,由流和转换算子组成。每个数据流都以一个或多个源开始,并以一个或多个接收器结束。数据流类似于任意有向无环图(DAG) 。尽管通过迭代结构允许特殊形式的循环,但为了简单起见,我们将在大多数情况下对其进行掩盖。

    在这里插入图片描述

    通常,程序中的转换与数据流中的算子之间存在一对一的对应关系。但是,有时一个转换可能包含多个转换算子。
    源和接收器记录在流连接器和批处理连接器文档中。转换在DataStream operators算子和DataSet转换文档中。

    | 并行数据流

    Flink中的程序本质上是并行和分布式的。在执行期间,流具有一个或多个流分区,并且每个算子具有一个或多个算子子任务。算子子任务彼此独立,并且可以在不同的线程中执行,并且可能在不同的机器或容器上执行。
    算子子任务的数量是该特定算子的并行度。流的并行度始终是其生成算子的并行度。同一程序的不同算子可能具有不同的并行级别。

    在这里插入图片描述

    流可以在一对一(或转发)模式或在重新分发模式的两个算子之间传输数据:

    • 一对一 流(例如,在上图中的Source和map()算子之间)保留元素的分区和排序。这意味着source算子的子任务[1]生成的元素的顺序,将在map()算子的子任务[1]看到的是一样的。

    • 重新分配流(在上面的map()和keyBy/window之间,以及keyBy/window和Sink之间)重新分配流的分区。每个算子子任务将数据发送到不同的目标子任务,具体取决于所选的转换。例如keyBy()(通过散列键重新分区),broadcast()或rebalance()(随机重新分区)。在重新分配交换中,元素之间的排序仅保留在每对发送和接收子任务中(例如,map()的子任务[1]和keyBy/window的子任务[2]。因此,在此示例中,保留了每个键内的排序,但并行性确实引入了关于不同键的聚合结果到达接收器的顺序的非确定性。

    | 窗口

    聚合事件(例如,计数,总和)在流上的工作方式与批处理方式不同。例如,不可能计算流中的所有元素,因为流通常是无限的(无界)。相反,流上的聚合(计数,总和等)由窗口限定,例如“在最后5分钟内计数”或“最后100个元素的总和” 。

    窗口可以是时间驱动的(例如:每30秒)或数据驱动(例如:每100个元素)。人们通常区分不同类型的窗口,例如翻滚窗口(没有重叠),滑动窗口(具有重叠)和会话窗口(由不活动间隙打断)。

    在这里插入图片描述

    | 时间概念

    当在流程序中引用时间(例如定义窗口)时,可以参考不同的时间概念:

    • 事件时间(Event Time)是事件的创建时间。它通常由事件中的时间戳来描述,例如由生产传感器或生产服务来附加。Flink通过时间戳分配器访问事件时间戳。
    • 接入时间(Ingestion time)是事件在源操作员处输入Flink数据流的时间。
    • 处理时间(Processing Time)是执行基于时间的操作的每个操作员的本地时间。

    在这里插入图片描述

    | 有状态计算

    虽然数据流中的许多计算只是一次查看一个单独的事件(例如事件解析器),但某些操作会记住多个事件(例如窗口操作符)的信息。这些操作称为有状态

    状态计算的状态保持在可以被认为是嵌入式键/值存储的状态中。状态被严格地分区和分布在有状态计算读取的流中。因此,只有在keyBy()函数之后才能在有键的流上访问键/值状态,并且限制为与当前事件的键相关联的值。对齐流和状态的键可确保所有状态更新都是本地操作,从而保证一致性而无需事务开销。此对齐还允许Flink重新分配状态并透明地调整流分区。

    在这里插入图片描述

    | 容错检查点

    Flink使用流重放检查点(checkpointing)的组合实现容错。检查点与每个输入流中的特定点以及每个操作符的对应状态相关。通过恢复算子的状态并从检查点重放事件,可以从检查点恢复流数据流,同时保持一致性(恰好一次处理语义) 。
    检查点间隔是在执行期间用恢复时间(需要重放的事件的数量)来折中容错开销的手段。
    容错内部的描述提供了有关Flink如何管理检查点和相关主题的更多信息。

    | 流地批处理

    Flink流程序上执行批处理,其中流是有界的(有限数量的元素)。DataSet在内部被视为数据流。因此,上述概念以相同的方式应用于批处理程序,并且除了少数例外它们适用于流程序:

    • 批处理程序的容错不使用检查点(checkpointing)。通过完全重放流来进行恢复。因为输入有限所以是可行的。这会使成本更多地用于恢复,但使常规处理更代价更低,因为它避免了检查点。
    • DataSet API中的有状态操作使用简化的内存/核外数据结构,而不是键/值索引。
    • DataSet API引入了特殊的同步(超级步骤)迭代,这些迭代只能在有界流上进行。

    | 上期回顾

    初识Apache Flink - 数据流上的有状态计算

    【扫码关注】
    最新大数据系列教程

    公众号:从入门到精通

    (从入门到精通 - 每天一期新知识)

    展开全文
  • Flink-Cep实现规则动态更新

    千次阅读 2020-11-30 21:47:42
    Flink-Cep实现规则动态更新 规则引擎通常对我们的理解就是用来做模式匹配的,在数据里面检测满足规则要求的数据。有人会问为什么需要规则动态变更呢?直接修改了规则把服务重启一下不就可以了吗,这个当然是不行...
  • flink 动态广播状态

    千次阅读 2019-07-05 09:27:06
    flink 动态广播状态笔记 1.概要 往往我们在做flink任务计算的时候,需要动态的匹配规则,但是我们又不能每次修改都去重新部署服务,所以我们这里需要借助于flink动态广播状态机制,来实时的处理我们的规则变化 2...
  • Flink 表和

    2021-07-24 10:04:30
    通过将INSERT 操作编码为 add message、将 DELETE 操作编码为 retract message、将 UPDATE 操作编码为更新(先前)行的 retract message 和更新(新)行的 add message,将动态表转换为 retract 。下
  • FlinkSQL创建表后,将动态表转换为数据的时候,我们可以选择两种方式:toAppendStream和toRetractStream。 官网对此也有解释,编者在此给你找出来了:Flink官网动态表(表转)说明部分,此处应有掌声。 表转...
  • Apache Flink(以下简称flink) 是一个旨在提供‘一站式’ 的分布式开源数据处理框架。是不是听起来很像spark?没错,两者都希望提供一个统一功能的计算平台给用户。虽然目标非常类似,但是flink在实现上和spark存在着...
  • 前言: 在使用Flink流式计算时,代码功能实现重要的同时,后期的可维护性一样很重要。...为了解决这样问题,小编发现了一个Flink的一个功能很强大: 广播状态机制。 BroadCast State Stream: ...
  • Flink CEP 实现动态更新规则

    千次阅读 2020-12-14 22:29:00
    规则引擎通常对我们的理解就是用来做模式匹配的,在数据里面检测满足规则要求的数据。...本篇基于Flink-Cep 来实现规则动态变更加载,同时参考了Flink中文社区刘博老师的分享(https://developer.aliyun.com/articl
  • Flink动态表 (Dynamic Table)

    千次阅读 2020-02-12 00:22:16
    本文主要是想说一下Flink动态表的思路。主要是可以类比传统数据库的物化视图。 翻译于(官网)原地址:https://ci.apache.org/projects/flink/flink-docs-release-1.9/zh/dev/table/streaming/dynamic_tab...
  • FlinkSQL | 处理中的特殊概念

    多人点赞 热门讨论 2021-01-20 17:49:38
    一、前言         上一篇文章,为大家介绍了关于 FlinkSQL 的背景,常见使用...二、处理中的特殊概念         Table API和SQL,
  • 本着开源的精神将学习成果分享,转载请注明出处。 问题复现 场景 对每条数据进行关键字检测,对符合条件的消息进行拦截。例如关键字是 java,则消息 java是世界上...繁琐且不友好,有没有什么可以动态的修改算子.
  • Flink流动态表详解

    千次阅读 2019-01-18 08:16:17
    问题导读1.动态表有什么特点?2.处理与批处理转换为表后有什么相同之处?3.动态表和连续查询是什么关系?4.连续查询本文列举了什么例子?5.Flink的Table AP...
  • 动态表 是 Flink 的支持数据的 Table API 和 SQL 的核心概念。与表示批处理数据的静态表不同,动态表是随时间变化的。可以像查询静态批处理表一样查询它们。查询动态表将生成一个 连续查询 。一个连续查询永远不会...
  • 本周起,Flink 社区微信公众号将同步 Apache Flink 中文邮件列表中的 Flink Weekly 周报,本期的主要内容由 Hequn Cheng 整理,包括:发布 Flin...
  • 一种动态更新flink任务配置的方法

    千次阅读 2020-04-25 14:47:20
    通过添加控制的方式实现flink任务配置的动态更新
  • 点击上方“zhisheng”,选择“设为星标”回复”666“获取独家整理的学习资料我们知道 Flink 作业的配置一般都是通过在作业启动的时候通过参数传递的,或者通过读取配置文件的参数,...
  • Flink及主流框架比较

    千次阅读 2018-05-12 16:12:19
    Apache Flink(以下简称flink) 是一个旨在提供‘一站式’ 的分布式开源数据处理框架。是不是听起来很像spark?没错,两者都希望提供一个统一功能的计算平台给用户。虽然目标非常类似,但是flink在实现上和spark存在着...
  • 本期的主要内容由 Hequn Cheng 整理,包括:发布 Flink 1.10 和 Flink 1.9.2 的更新,关于将 Flink Docker image 发布集成到 Flink 发布过程中的讨论,PyFlink 后期新功能的讨论以及一些博客文章。 作者:程鹤群...
  • Flink-数据编程模型

    万次阅读 2018-09-27 12:45:16
    目录: 1、抽象等级 2、程序和数据 3、并行数据 4、窗口 5、时间 6、状态操作 7、容错检查点 8、批处理 1、抽象等级 &amp;amp;amp;amp;...Flink提供了不同级别的抽象来开发/
  • 通过flink的broadcast机制,可以将这一动态变化广播到业务,并进行相应的逻辑处理,最终实现配置的动态更新。 下面写一个简单的demo,仅供平时学习积累使用。 CDC CDC全称Change Data Capture,变动数据捕获。...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,578
精华内容 1,431
关键字:

flink动态更新流