精华内容
下载资源
问答
  • Flink1.12 文档
    2021-01-25 11:31:54

    API

    移除掉 ExecutionConfig 中过期的方法

    移除掉了 ExecutionConfig#isLatencyTrackingEnabled 方法, 你可以使用 ExecutionConfig#getLatencyTrackingInterval 方法代替.

    移除掉了 ExecutionConfig#enable/disableSysoutLoggingExecutionConfig#set/isFailTaskOnCheckpointError 过期的方法。

    移除掉了 -q CLI 参数。

    移除掉过期的 RuntimeContext#getAllAccumulators 方法

    过期的 RuntimeContext#getAllAccumulators 方法被移除掉了,请使用 RuntimeContext#getAccumulator 方法作为代替。

    由于数据丢失的风险把 CheckpointConfig#setPreferCheckpointForRecovery 方法标为过期

    CheckpointConfig#setPreferCheckpointForRecovery 方法标记为过期了, 因为作业在进行恢复时,如果使用较旧的 Checkpoint 状态而不使用新的 Save point 状态数据,可能会导致数据丢失。

    FLIP-134: DataStream API 的批处理执行

    • 允许在 KeyedStream.intervalJoin() 的配置时间属性,在 Flink 1.12 之前 KeyedStream.intervalJoin() 算子的时间属性依赖于全局设置的时间属性。在 Flink 1.12 中我们可以在 IntervalJoin 方法后加上 inProcessingTime() 或 inEventTime() ,这样 Join 就不再依赖于全局的时间属性。

    • 在 Flink 1.12 中将 DataStream API 的 timeWindow() 方法标记为过期,请使用 window(WindowAssigner)TumblingEventTimeWindows、 SlidingEventTimeWindowsTumblingProcessingTimeWindows 或者 SlidingProcessingTimeWindows

    • 将 StreamExecutionEnvironment.setStreamTimeCharacteristic() 和 TimeCharacteristic 方法标记为过期。在 Flink 1.12 中,默认的时间属性改变成 EventTime 了,于是你不再需要该方法去开启 EventTime 了。在 EventTime 时间属性下,你使用 processing-time 的 windows 和 timers 也都依旧会生效。如果你想禁用水印,请使用 ExecutionConfig.setAutoWatermarkInterval(long) 方法。如果你想使用 IngestionTime,请手动设置适当的 WatermarkStrategy。如果你使用的是基于时间属性更改行为的通用 'time window' 算子(eg: KeyedStream.timeWindow()),请使用等效操作明确的指定处理时间和事件时间。

    • 允许在 CEP PatternStream 上显式配置时间属性在 Flink 1.12 之前,CEP 算子里面的时间依赖于全局配置的时间属性,在 1.12 之后可以在 PatternStream 上使用 inProcessingTime() 或 inEventTime() 方法。

    API 清理

    • 移除了 UdfAnalyzer 配置,移除了 ExecutionConfig#get/setCodeAnalysisMode 方法和 SkipCodeAnalysis 类。

    • 移除了过期的 DataStream#split 方法,该方法从很早的版本中已经标记成为过期的了,你可以使用 Side Output 来代替。

    • 移除了过期的 DataStream#fold() 方法和其相关的类,你可以使用更加高性能的 DataStream#reduce

    扩展 CompositeTypeSerializerSnapshot 以允许复合序列化器根据外部配置迁移

    不再推荐使用 CompositeTypeSerializerSnapshot 中的 isOuterSnapshotCompatible(TypeSerializer) 方法,推荐使用 OuterSchemaCompatibility#resolveOuterSchemaCompatibility(TypeSerializer) 方法。

    将 Scala Macros 版本升级到 2.1.1

    Flink 现在依赖 Scala Macros 2.1.1,意味着不再支持 Scala 版本小于 2.11.11。

    SQL

    对 aggregate 函数的 SQL DDL 使用新类型推断

    aggregate 函数的 CREATE FUNCTION DDL 现在使用新类型推断,可能有必要将现有实现更新为新的反射类型提取逻辑,将 StreamTableEnvironment.registerFunction 标为过期。

    更新解析器模块 FLIP-107

    现在 METADATA 属于保留关键字,记得使用反引号转义。

    将内部 aggregate 函数更新为新类型

    使用 COLLECT 函数的 SQL 查询可能需要更新为新类型的系统。

    Connectors 和 Formats

    移除 Kafka 0.10.x 和 0.11.x Connector

    在 Flink 1.12 中,移除掉了 Kafka 0.10.x 和 0.11.x Connector,请使用统一的 Kafka Connector(适用于 0.10.2.x 版本之后的任何 Kafka 集群),你可以参考 Kafka Connector 页面的文档升级到新的 Flink Kafka Connector 版本。

    CSV 序列化 Schema 包含行分隔符

    csv.line-delimiter 配置已经从 CSV 格式中移除了,因为行分隔符应该由 Connector 定义而不是由 format 定义。如果用户在以前的 Flink 版本中一直使用了该配置,则升级到 Flink 1.12 时,应该删除该配置。

    升级 Kafka Schema Registry Client 到 5.5.0 版本

    flink-avro-confluent-schema-registry 模块不再在 fat-jar 中提供,你需要显式的在你自己的作业中添加该依赖,SQL-Client 用户可以使用flink-sql-avro-confluent-schema-registry fat jar。

    将 Avro 版本从 1.8.2 升级到 1.10.0 版本

    flink-avro 模块中的 Avro 版本升级到了 1.10,如果出于某种原因要使用较旧的版本,请在项目中明确降级 Avro 版本。

    注意:我们观察到,与 1.8.2 相比,Avro 1.10 版本的性能有所下降,如果你担心性能,并且可以使用较旧版本的 Avro,那么请降级 Avro 版本。

    为 SQL Client 打包 flink-avro 模块时会创建一个 uber jar

    SQL Client jar 会被重命名为 flink-sql-avro-1.12.jar,以前是 flink-avro-1.12-sql-jar.jar,而且不再需要手动添加 Avro 依赖。

    Deployment(部署)

    默认 Log4j 配置了日志大小超过 100MB 滚动

    默认的 log4j 配置现在做了变更:除了在 Flink 启动时现有的日志文件滚动外,它们在达到 100MB 大小时也会滚动。Flink 总共保留 10 个日志文件,从而有效地将日志目录的总大小限制为 1GB(每个 Flink 服务记录到该目录)。

    默认在 Flink Docker 镜像中使用 jemalloc

    在 Flink 的 Docker 镜像中,jemalloc 被用作默认的内存分配器,以减少内存碎片问题。用户可以通过将 disable-jemalloc 标志传递给 docker-entrypoint.sh 脚本来回滚使用 glibc。有关更多详细信息,请参阅 Docker 文档上的 Flink。

    升级 Mesos 版本到 1.7

    将 Mesos 依赖版本从 1.0.1 版本升级到 1.7.0 版本。

    如果 Flink 进程在超时后仍未停止,则发送 SIGKILL

    在 Flink 1.12 中,如果 SIGTERM 无法成功关闭 Flink 进程,我们更改了独立脚本的行为以发出 SIGKILL。

    介绍非阻塞作业提交

    提交工作的语义略有变化,提交调用几乎立即返回,并且作业处于新的 INITIALIZING 状态,当作业处于该状态时,对作业做 Savepoint 或者检索作业详情信息等操作将不可用。

    一旦创建了该作业的 JobManager,该作业就处于 CREATED 状态,并且所有的调用均可用。

    Runtime

    FLIP-141: Intra-Slot Managed Memory 共享

    python.fn-execution.buffer.memory.size 和 python.fn-execution.framework.memory.size 的配置已删除,因此不再生效。除此之外,python.fn-execution.memory.managed 默认的值更改为 true, 因此默认情况下 Python workers 将使用托管内存。

    FLIP-119 Pipelined Region Scheduling

    从 Flink 1.12 开始,将以 pipelined region 为单位进行调度。pipelined region 是一组流水线连接的任务。这意味着,对于包含多个 region 的流作业,在开始部署任务之前,它不再等待所有任务获取 slot。取而代之的是,一旦任何 region 获得了足够的任务 slot 就可以部署它。对于批处理作业,将不会为任务分配 slot,也不会单独部署任务。取而代之的是,一旦某个 region 获得了足够的 slot,则该任务将与所有其他任务一起部署在同一区域中。

    可以使用 jobmanager.scheduler.scheduling-strategy:legacy 启用旧的调度程序。

    RocksDB optimizeForPointLookup 导致丢失时间窗口

    默认情况下,我们会将 RocksDB 的 ReadOptions 的 setTotalOrderSeek 设置为true,以防止用户忘记使用 optimizeForPointLookup。同时,我们支持通过RocksDBOptionsFactory 自定义 ReadOptions。如果观察到任何性能下降,请将 setTotalOrderSeek 设置为 false(根据我们的测试,这是不可能的)。

    自定义 OptionsFactory 设置似乎对 RocksDB 没有影响

    过期的 OptionsFactory 和 ConfigurableOptionsFactory 类已移除,请改用 RocksDBOptionsFactory 和 ConfigurableRocksDBOptionsFactory。如果有任何扩展 DefaultConfigurableOptionsFactory 的类,也请重新编译你的应用程序代码。

    更多相关内容
  • flink 1.12及以上 兼容cdh6所需要的flink-shaded-hadoop jar包
  • 共课程包含9个章节:Flink安装部署与快速入门、Flink批处理API、Flink流处理API、Flink高级API、Flink-Table与SQL、Flink-Action综合练习、Flink-高级特性和新特性、Flink多语言开发、Flink性能调优 课程目录: ...
  • CDH6.3.2集成的ES7.9、flink1.12.2编译好的parcel及csd,可以直接用,不懂私聊 flink启动失败的参考这篇:https://blog.csdn.net/spark9527/article/details/115767011?spm=1001.2014.3001.5501 ES启动失败了执行这...
  • flink1.12_20210510.rar

    2021-05-10 15:59:00
    centos7.5+cdh6.3.2+scala2.12亲测可用,源码编译制作的parcel包,内附文章教程地址,欢迎大家交流学习。centos7.5+cdh6.3.2+scala2.12亲测可用,源码编译制作的parcel包,内附文章教程地址,欢迎大家交流学习。...
  • Flink1.12新特性 SQL Connectors 中的 Metadata 处理 Flink 1.12 中,元数据列是 SQL 标准的扩展,参数中connector和format配置提供的metadata字段。元数据列由METADATA关键字指示。例如,元数据列可用于在 Kafka ...

    Flink1.12新特性

    SQL Connectors 中的 Metadata 处理

    Flink 1.12 中,元数据列是 SQL 标准的扩展,参数中connector和format配置提供的metadata字段。元数据列由METADATA关键字指示。例如,元数据列可用于在 Kafka 记录中读取和写入时间戳,以进行基于时间的操作。连接器和格式文档列出了每个组件的可用元数据字段。但是,在表的架构中声明元数据列是可选的。

    //sql CREATE TABLE source( field1 varchar, field2 int, ts TIMESTAMP(3) METADATA FROM'timestamp', partition BIGINT METADATA, offset BIGINT METADATA, proc_time AS PROCTIME() )WITH( 'properties.bootstrap.servers'='110.42.146.82:9092', 'connector'='kafka-x', 'scan.parallelism'='1', 'format'='json', 'topic'='planet_MetaData', 'scan.startup.mode'='latest-offset' ); CREATE TABLE sink_kafka( field1 varchar, field2 int, ts timestamp, partition bigint, offset bigint )WITH( 'properties.bootstrap.servers'='110.42.146.82:9092', 'connector'='kafka-x', 'format'='json', 'topic'='planetTest', 'sink.parallelism'='1' ); create table sink( field1 varchar, field2 int, ts timestamp, partition bigint, offset bigint )with( 'connector'='stream-x', 'print'='true' ); insert intosink select field1, field2, ts, partition, offset from source; insert into sink_kafka select field1, field2, ts, partition, offset from source;

    ~

    上述案例中 Kafka connector available metadata

    image-20220314172256555

    时态表

    Flink1.12中,一张随时间变化的表即可被称为时态表,时态表是一个概念解释,并没有实体。

    时态表可以被分为两个实体概念,根据有无ChangeLog来区分定义。

    版本表: 如果时态表中的记录可以追踪和并访问它的历史版本,这种表我们称之为版本表,来自数据库的 changelog 可以定义成版本表。

    普通表: 如果时态表中的记录仅仅可以追踪并和它的最新版本,这种表我们称之为普通表,来自数据库 或 HBase 的表可以定义成普通表。

    声明版本表

    在 Flink1.12 中,定义了主键约束(1)和事件时间属性(2)的表就是版本表。

    -- 定义一张版本表
    CREATE TABLE product_changelog (
      product_id STRING,
      product_name STRING,
      product_price DECIMAL(10, 4),
      update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL,
      PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) 定义主键约束
      WATERMARK FOR update_time AS update_time   -- (2) 通过 watermark 定义事件时间              
    ) WITH (
      'connector' = 'kafka-x',
      'topic' = 'products',
      'scan.startup.mode' = 'earliest-offset',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'debezium-json'    -- 时态表限定debezium-json
    );

    声明版本视图

    Flink1.12还支持讲Append-Only流转成changlog流。 先声明一个Append-Only表

    -- 定义一张 append-only 表
    CREATE TABLE RatesHistory (
        currency_time TIMESTAMP(3),
        currency STRING,
        rate DECIMAL(38, 10),
        WATERMARK FOR currency_time AS currency_time   -- 定义事件时间
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'rates',
      'scan.startup.mode' = 'earliest-offset',
      'properties.bootstrap.servers' = 'localhost:9092',
      'format' = 'json'                                -- 普通的 append-only 流
    )

    然后通过去重查询定义版本视图

    CREATE VIEW versioned_rates AS              
    SELECT currency, rate, currency_time            -- (1) `currency_time` 保留了事件时间
      FROM (
          SELECT *,
          ROW_NUMBER() OVER (PARTITION BY currency  -- (2) `currency` 是去重 query 的 unique key,可以作为主键
             ORDER BY currency_time DESC) AS rowNum 
          FROM RatesHistory )
    WHERE rowNum = 1; 

    时间区间 Join

    如果一个 Join 限定输入时间属性必须在一定的时间限制中(即时间窗口),那么就称之为时间区间 Join。

    SELECT *
    FROM
      Orders o,
      Shipments s
    WHERE o.id = s.orderId AND
          o.ordertime BETWEEN s.shiptime - INTERVAL '4' HOUR AND s.shiptime

    与常规 Join 操作相比,时间区间 Join 只支持带有时间属性的递增表。由于时间属性是单调递增的,Flink 可以从状态中移除过期的数据,而不会影响结果的正确性。

    时态表Join

    时态表 Join 意味着对任意表(左输入/探针侧)去关联一个时态表(右输入/构建侧)的版本,时态表可以是一张跟踪所有变更记录的表(例如数据库表的 changelog,包含多个表快照),也可以是物化所有变更之后的表(例如数据库表,只有最新表快照)。

    Flink 使用了 SQL:2011 标准引入的时态表 Join 语法,时态表 Join 的语法如下:

    SELECT [column_list]
    FROM table1 [AS <alias1>]
    [LEFT] JOIN table2 FOR SYSTEM_TIME AS OF table1.{ proctime | rowtime } [AS <alias2>]
    ON table1.column-name1 = table2.column-name1

    基于事件时间的时态 Join

    基于事件时间的时态表 join 使用(左侧输入/探针侧) 的 事件时间 去关联(右侧输入/构建侧) 版本表 对应的版本。 基于事件时间的时态表 join 仅支持关版本表或版本视图,版本表或版本视图只能是一个 changelog 流。 但是,Flink 支持将 append-only 流转换成 changelog 流,因此版本表也可以来自一个 append-only 流。 查看声明版本视图 获取更多的信息关于如何声明一张来自 append-only 流的版本表。

    将事件时间作为时间属性时,可将 过去 时间属性与时态表一起使用。这允许对两个表中在相同时间点的记录执行 Join 操作。 与基于处理时间的时态 Join 相比,时态表不仅将构建侧记录的最新版本(是否最新由所定义的主键所决定)保存在 state 中,同时也会存储自上一个 watermarks 以来的所有版本(按时间区分)。

    例如,在探针侧表新插入一条事件时间时间为 12:30:00 的记录,它将和构建侧表时间点为 12:30:00 的版本根据时态表的概念进行 Join 运算。 因此,新插入的记录仅与时间戳小于等于 12:30:00 的记录进行 Join 计算(由主键决定哪些时间点的数据将参与计算)。

    通过定义事件时间,watermarks 允许 Join 运算不断向前滚动,丢弃不再需要的构建侧快照。因为不再需要时间戳更低或相等的记录。

    下面的例子展示了订单流关联产品表这个场景举例,orders 表包含了来自 Kafka 的实时订单流,product_changelog 表来自数据库表 products 的 changelog , 产品的价格在数据库表 products 中是随时间实时变化的。

    SELECT * FROM product_changelog;
    
    (changelog kind)  update_time product_name price
    ================= =========== ============ ===== 
    +(INSERT)         00:01:00    scooter      11.11
    +(INSERT)         00:02:00    basketball   23.11
    -(UPDATE_BEFORE)  12:00:00    scooter      11.11
    +(UPDATE_AFTER)   12:00:00    scooter      12.99  <= 产品 `scooter` 在 `12:00:00` 时涨价到了 `12.99`
    -(UPDATE_BEFORE)  12:00:00    basketball   23.11 
    +(UPDATE_AFTER)   12:00:00    basketball   19.99  <= 产品 `basketball` 在 `12:00:00` 时降价到了 `19.99`
    -(DELETE)         18:00:00    scooter      12.99  <= 产品 `scooter` 在 `18:00:00` 从数据库表中删除

    如果我们想输出 product_changelog 表在 10:00:00 对应的版本,表的内容如下所示:

    update_time  product_id product_name price
    ===========  ========== ============ ===== 
    00:01:00     p_001      scooter      11.11
    00:02:00     p_002      basketball   23.11

    如果我们想输出 product_changelog 表在 13:00:00 对应的版本,表的内容如下所示:

    update_time  product_id product_name price
    ===========  ========== ============ ===== 
    12:00:00     p_001      scooter      12.99
    12:00:00     p_002      basketball   19.99

    通过基于事件时间的时态表 join, 我们可以 join 上版本表中的不同版本:

    CREATE TABLE orders (
      order_id STRING,
      product_id STRING,
      order_time TIMESTAMP(3),
      WATERMARK FOR order_time AS order_time  -- defines the necessary event time
    ) WITH (
    ...
    );
    
    -- 设置会话的时间区间, changelog 里的数据库操作时间是以 epoch 开始的毫秒数存储的,
    -- 在从毫秒转化为时间戳时,Flink SQL 会使用会话的时间区间
    -- 因此,请根据 changelog 中的数据库操作时间设置合适的时间区间
    SET table.local-time-zone=UTC;
    
    -- 声明一张版本表
    CREATE TABLE product_changelog (
      product_id STRING,sq
      product_name STRING,
      product_price DECIMAL(10, 4),
      update_time TIMESTAMP(3) METADATA FROM 'value.source.timestamp' VIRTUAL, -- 注意:自动从毫秒数转为时间戳
      PRIMARY KEY(product_id) NOT ENFORCED,      -- (1) defines the primary key constraint
      WATERMARK FOR update_time AS update_time   -- (2) defines the event time by watermark                               
    ) WITH (
      'connector' = 'kafka',
      'topic' = 'products',
      'scan.startup.mode' = 'earliest-offset',
      'properties.bootstrap.servers' = 'localhost:9092',
      'value.format' = 'debezium-json'
    );
    
    -- 基于事件时间的时态表 Join
    SELECT
      order_id,
      order_time,
      product_name,
      product_time,
      price
    FROM orders AS O
    LEFT JOIN product_changelog FOR SYSTEM_TIME AS OF O.order_time AS P
    ON O.product_id = P.product_id;
    
    order_id order_time product_name product_time price
    ======== ========== ============ ============ =====
    o_001    00:01:00   scooter      00:01:00     11.11
    o_002    00:03:00   basketball   00:02:00     23.11
    o_003    12:00:00   scooter      12:00:00     12.99
    o_004    12:00:00   basketball   12:00:00     19.99
    o_005    18:00:00   NULL         NULL         NULL

    基于事件时间的时态表 Join 通常用在通过 changelog 丰富流上数据的场景。

    注意: 基于事件时间的时态表 Join 是通过左右两侧的 watermark 触发,请确保为 join 两侧的表设置了合适的 watermark。

    注意: 基于事件时间的时态表 Join 的 join key 必须包含时态表的主键,例如:表 product_changelog 的主键 P.product_id 必须包含在 join 条件 O.product_id = P.product_id 中。

    基于处理时间的时态 Join

    基于处理时间的时态表 join 使用任意表 (左侧输入/探针侧) 的 处理时间 去关联 (右侧输入/构建侧) 普通表的最新版本. 基于处理时间的时态表 join 当前只支持关联普通表或普通视图,且支持普通表或普通视图当前只能是 append-only 流。

    如果将处理时间作为时间属性,过去 时间属性将无法与时态表一起使用。根据定义,处理时间总会是当前时间戳。 因此,关联时态表的调用将始终返回底层表的最新已知版本,并且底层表中的任何更新也将立即覆盖当前值。

    可以将处理时间的时态 Join 视作简单的 HashMap <K,V>,HashMap 中存储来自构建侧的所有记录。 当来自构建侧的新插入的记录与旧值具有相同的 Key 时,旧值会被覆盖。 探针侧的每条记录将总会根据 HashMap 的最新/当前状态来计算。

    接下来的示例展示了订单流 Orders 该如何与实时变化的汇率表 Lates 进行基于处理时间的时态 Join 操作,LatestRates 总是表示 HBase 表 Rates 的最新内容。

    LastestRates 中的数据在时间点 10:1510:30 时是相等的。欧元汇率在时间点 10:52 从 114 变化至 116 。

    Orders 包含了金额字段 amount 和货币字段 currency 的支付记录数据。例如在 10:15 有一笔金额为 2 欧元的订单记录。

    SELECT * FROM Orders;
    
    amount currency
    ====== =========
         2 Euro             <== arrived at time 10:15
         1 US Dollar        <== arrived at time 10:30
         2 Euro             <== arrived at time 10:52

    基于以上,我们想要计算所有 Orders 表的订单金额总和,并同时转换为对应成日元的金额。

    例如,我们想要以表 LatestRates 中的汇率将以下订单转换,则结果将为:

    amount currency     rate   amout*rate
    ====== ========= ======= ============
         2 Euro          114          228    <== arrived at time 10:15
         1 US Dollar     102          102    <== arrived at time 10:30
         2 Euro          116          232    <== arrived at time 10:52

    通过时态表 Join,我们可以将上述操作表示为以下 SQL 查询:

    SELECT
      o.amout, o.currency, r.rate, o.amount * r.rate
    FROM
      Orders AS o
      JOIN LatestRates FOR SYSTEM_TIME AS OF o.proctime AS r
      ON r.currency = o.currency

    探针侧表中的每个记录都将与构建侧表的当前版本所关联。 在此示例中,查询使用处理时间作为处理时间,因而新增订单将始终与表 LatestRates 的最新汇率执行 Join 操作。注意,结果对于处理时间来说不是确定的。 基于处理时间的时态表 Join 通常用在通过外部表(例如维度表)丰富流上数据的场景。

    常规 Join 相比,尽管构建侧表的数据发生了变化,但时态表 Join 的变化前结果不会随之变化。

    • 对于基于事件时间的时态 Join, join 算子保留 Join 两侧流的状态并通过 watermark 清理。
    • 对于基于处理时间的时态 Join, join 算子保留仅保留右侧(构建侧)的状态,且构建侧的状态只包含数据的最新版本,右侧的状态是轻量级的; 对于在运行时有能力查询外部系统的时态表,join 算子还可以优化成不保留任何状态,此时算子是非常轻量级的。

    时间区间 Join 相比,时态表 Join 没有定义决定构建侧记录所属的将被 Join 时间窗口。 探针侧的记录将总是与构建侧在对应处理时间的最新数据执行 Join,因而构建侧的数据可能是任意旧的。

    Flink1.12与Flink1.10的差异

    Flink1.10维表关联方式

    create table sink(
        flag varchar,
        field1 varchar,
        field2 varchar,
        field3 varchar
     )with(
        type='console'
     );
    insert into sink
    select
        'flink110' as flag
        ,s1.field1 as field1
        ,s1.field2 as field2
        ,s2.field2 as field3  
    from source s1  
    left join dim as s2 
    on s1.field1 = s2.field1;

    Flink1.12维表关联方式

    create table sink(
        flag varchar,
        field1 varchar,
        field2 varchar,
        field3 varchar
     )with(
        'connector'='stream-x',
        'print'='true'
     );
    insert into sink
    select
        'flink112' as flag
        ,s1.field1 as field1
        ,s1.field2 as field2
        ,s2.field2 as field3  
    from source s1  
    left join dim for SYSTEM_TIME AS OF s1.proc_time as s2 
    on s1.field1 = s2.field1;

    Flink1.10嵌套Json

    CREATE TABLE source(
        column.field1 string as field1,
        column.field2 int as field2
     )WITH(
        type ='kafka',
        bootstrapServers ='110.42.146.82:9092',
        offsetReset ='latest',
        topic ='planet',
        charsetName ='utf-8',
        timezone='Asia/Shanghai',
        updateMode ='append',
        enableKeyPartitions ='false',
        topicIsPattern ='false',
        parallelism ='1'
     );
    
    
    insert into sink
    select
        *     
    from(
        select
            '110_demo1' as flag
            ,field1 as field1
            ,field2 as field2     
        FROM source );

    Flink1.12嵌套Json

    CREATE TABLE source(
        `column` ROW<field1 string,
        field2 int>,
        proc_time AS PROCTIME() 
     )WITH(
        'properties.bootstrap.servers'='110.42.146.82:9092',
        'connector'='kafka-x',
        'scan.parallelism'='1',
        'format'='json',
        'topic'='planet',
        'scan.startup.mode'='latest-offset'
     );
    
    insert into sink
    select
        *     
        ,ROW(field1,field2) as columns 
    from(
        select
            '112_demo1' as flag
            ,`column`.field1 as field1
            ,`column`.field2 as field2     
        FROM source );

    本文由博客群发一文多发等运营工具平台 OpenWrite 发布

    展开全文
  • Flink 1.12 版本在 20 年 12 月已经正式 Release,目前我们的 Flink SQL 作业的 Flink 引擎版本还是 1.10,本文主要用以评估 Flink 1.10 升级到 1.12 整体所能带来的预期收益,同时结合所需投入的成本,决定是否需要...

    前言

    Flink 1.12 版本在 20 年 12 月已经正式 Release,目前我们的 Flink SQL 作业的 Flink 引擎版本还是 1.10,本文主要用以评估 Flink 1.10 升级到 1.12 整体所能带来的预期收益,同时结合所需投入的成本,决定是否需要升级 Flink SQL 引擎版本到 1.12。本次升级所评估的收益包含 1.11 和 1.12 版本所带来的收益,如有理解错误,欢迎指出,一起交流。

    一、Flink SQL 作业语法更加简洁,提升实时作业开发效率

    收益:

    FLIP-122 提出了新的 Connector 属性 key, 具体参考 FLIP-122: New Connector Property Keys for New Factory 。 FLIP-122 在 Flink 1.11 Released,Flink 1.11 SQL 语法会更加简洁,这能够提升实时用户开发作业的效率。

    新的代码结构(Kafka Source 举例):

    CREATE TABLE kafka_table (
     ...
    ) WITH (
     'connector' = 'kafka-0.10',
     'topic' = 'test-topic',
     'properties.bootstrap.servers' = 'localhost:9092',
     'properties.group.id' = 'hello_world',
     'format' = 'json',
     'json.fail-on-missing-field' = 'false'
    );
    

    可以看到,新的 Flink SQL 语法,整体对于用户来说,更为简洁和直观,用户开发时,也会更为的方便。

    二、Flink SQL 支持 Kafka Upsert Connector

    2.1 背景

    FLIP-149 云邪提出了 upsert-kafka Connector,具体链接:https://cwiki.apache.org/confluence/display/FLINK/FLIP-149%3A+Introduce+the+upsert-kafka+Connector。首先要理解 upsert 的含义:一条记录(有 主键),如果不存在,则插入,有则更新,全称:insert / update。Upsert-kafka connector 产生一个changelog 流,changelog 流中的数据记录可以理解为 UPSERT 流,也就是INSERT/UPDATE,因为具有相同键的任何现有行都会被覆盖。同样,空值可以用一种特殊的方式理解:带有空值的记录表示“删除”。

    Upsert-kafka Connector 对于我们来说,解决最常用的场景是: 从 Kafka Topic 按某类 Key 取最新数据,然后下游聚合,最后写入到外部存储。这种通用的实时开发流程一般是:上游为 mysql binglog -> Kafka 的数据同步任务,然后下游需要按照某类key 取最新数据进行聚合等等。

    下面是 Flink 1.10 按照 a 取最新值,然后下游进行聚合的 SQL 代码,主要使用到了 last_value:

    create table hello_world
    (
        a                     varchar
      , b                 bigint
      , c                   bigint
      , d                     bigint
    ) with (
         xxx
           );
    
    create view temp_hello as
    select
        a
      , last_value( b                 )  as  b
      , last_value( c                   )  as  c
      , last_value( d              )  as  d
    from
        hello_world
    group by
        a;
    create view temp_world as 
    select 
    		sum(b) as sum_b
    		,sum(c) as sum_c
    		,sum(d) as sum_d
    from temp_hello;
    

    在 Flink 1.10 中,当前这类任务开发对于用户来说,还是不够友好,需要很多代码,同时也会造成 Flink SQL 冗长。Flink 1.12 SQL Connector 支持 Kafka Upsert Connector,这也是我们公司内部业务方对实时平台提出的需求。

    2.2 收益

    Flink 1.12 支持了 Flink SQL Kafka upsert connector ,下面是使用 Flink 1.12 代码改写上述逻辑:

    CREATE TABLE temp_hello (
        a                     varchar
      , b                  bigint
      , c                   bigint
      , d                     bigint
      PRIMARY KEY (a) NOT ENFORCED
    ) WITH (
      xx
    );
    
    create view temp_world as 
    select 
    		sum(b) as sum_b
    		,sum(c) as sum_c
    		,sum(d) as sum_d
    from temp_hello;
    

    收益:便利用户有这种需要从 kafka 取最新记录操作的实时任务开发,比如这种 binlog -> kafka,然后用户聚合操作,这种场景还是非常多的,这能提升实时作业开发效率,同时 1.12 做了优化,性能会比单纯的 last_value 性能要好

    三、Flink Yarn 作业 On k8s 的生产级别能力

    背景:

    之前我们内部 Flink Jar 作业已经全部 K8s 化,Flink SQL 作业由于是推广初期,还是在 Yarn 上面进行运行,为了将实时计算 Flink 全部 K8s 化(去 Yarn),所以我们 Flink SQL 作业也需要迁移到 K8s,目前 Flink 1.12 已经满足生产级别的 Flink k8s 功能,所以 Flink SQL K8s 化,打算直接使用社区的 On k8s 能力。

    风险:

    虽然和社区的人沟通,Flink 1.12 on k8s 没有什么问题,但是具体功能还是需要先 POC 验证一下,同时可能社区 Flink on k8s 的能力,可能会限制我们这边一些 k8s 功能使用,比如 hostpath volome 以及 Ingress 的使用,这里可能需要改底层源码来进行快速支持(社区有相关 JIRA 要做)。

    收益:

    **(Flink 去 Yarn)**Flink 1.12 on k8s 对于我们最主要的两个点:

    1. JobManager 的高可用,能够依赖 Zookeeper 或者 k8s ConfigMap
    2. Flink 云原生能力,ResourceManager 能够和 K8s API Server 交互,自动申请所需资源。
    3. 社区 Flink on K8s 方面的一些优化

    最终实时集群 Flink 作业去 Yarn 化,同时为离线提供更多能够弹性扩缩资源,更好的降低成本。

    四、Flink On Hive 能力(生产级别)

    背景:

    目前在有赞已经开始有部分实时业务方希望 Flink 能够支持 Hive,比如 Flink-Hive 近实时的数仓中间层【小时表可更快产出】,以及 Flink 实时任务和离线数据对比功能。而在 Flink 1.12 中,已经支持生产级别 Flink On Hive 任务运行(社区 Commitor 说),所以基于这次 Flink SQL 引擎版本升级,能够支持 Flink on hive 生产功能。

    收益:

    解决部分实时业务方, Flink On Hive 的业务需求,下面是 Flink 1.12 具体 Hive 相关功能:

    1. 支持 Sort-Merge Shuffle (FLIP-148)

    2. 在 FileSystem/Hive connector 的流式写入中支持小文件合并 (FLINK-19345),在 Flink 1.12 中,File Sink 增加了小文件合并功能,从而使得即使作业 checkpoint 间隔比较小时,也不会产生大量的文件。要开启小文件合并,可以按照文档[11]中的说明在 FileSystem connector 中设置 auto-compaction = true 属性。

    五、Flink 基于 Savepoint 跨集群迁移能力

    背景:

    当前我们使用 Flink 版本是 1.10,而在Flink 1.11 以下,在任务 Savepoint Meta 文件里面,存储的这次 Savepoint 引用的状态文件路径都是 HDFS 全路径,所以在跨集群迁移时,Savepoint 是不能够进行复用的,所以一旦有集群迁移,Flink SQL 作业状态会丢失,如果有状态强相关的实时作业,可能会有故障风险。该问题已经在 1.11 已经修复,具体可以参考:FLINK-5763:Make savepoints self-contained and relocatable

    收益:

    稳定性:Flink 作业有基于 Savepoint 跨集群不丢状态的恢复和迁移能力。

    六、其他对我们有用收益

    6.1 Flink Web UI

    1. [FLIP-103] 改善 Web UI 上 JM/TM 日志的展示
    2. [FLIP-99] 允许展示更多的历史 Failover 异常
    3. [Flink-14816] 允许用户直接在页面上进行 Thread Dump

    6.2 Flink Connector/ Source Sinks

    1. Kafka Connector 支持 Watermark 下推 (FLINK-20041)
    2. Flink 1.11 introduces new table source and sink interfaces with changelog mode (see New TableSource and TableSink Interfaces) and support for the Debezium and Canal formats (FLIP-105).
    3. 利用 Multi-input 算子进行 Join 优化 (FLINK-19621),Shuffling 是一个 Flink 作业中最耗时的操作之一。为了消除不必要的序列化反序列化开销、数据 spilling 开销,提升 Table API / SQL 上批作业和流作业的性能, planner 当前会利用上一个版本中已经引入的N元算子(FLIP-92),将由 forward 边所连接的多个算子合并到一个 Task 里执行。

    其他一些 Flink Bug Fix。

    展开全文
  • 摘要:本文由社区志愿者陈政羽整理,Apache Flink Committer、阿里巴巴技术专家宋辛童,Apache Flink Contributor、阿里巴巴高级开发工程师郭旸泽在 F...

    摘要:本文由社区志愿者陈政羽整理,Apache Flink Committer、阿里巴巴技术专家宋辛童,Apache Flink Contributor、阿里巴巴高级开发工程师郭旸泽在 Flink Forward Asia 2020 分享的议题《Flink 1.12 资源管理新特性》。内容包括:

    1. 内存管理

    2. 资源调度

    3. 扩展资源框架

    4. 未来规划

    5. 总结

    Tips:点击文末「阅读原文」即可查看原文视频~

     GitHub 地址 

    欢迎大家给 Flink 点赞送 star~

    一、内存管理


    首先回顾 Flink 的内存模型变迁。下图左边分别为 Flink 1.10、Flink 1.11 引入的新的内存模型。尽管涉及的模块较多,但 80% - 90% 的用户仅需关注真正用于任务执行的 Task Heap Memory、Task Off-Heap Memory、Network Memory、Managed Memory 四部分。

    其它模块大部分是 Flink 的框架内存,正常不需要调整,即使遇到问题也可以通过社区文档来解决。除此之外,“一个作业究竟需要多少内存才能满足实际生产需求” 也是大家不得不面临的问题,比如其他指标的功能使用、作业是否因为内存不足影响了性能、是否存在资源浪费等。

    针对上述内容,社区在 Flink 1.12 版本提供了一个全新的, 关于 Task manager 和 Job manager 的 Web UI。

    在新的 Web UI 中,可以直接将每一项监控指标配置值、实际使用情况对应到内存模型中进行直观的展示。在此基础上,可以更清楚的了解到作业的运行情况、该如何调整、用哪些配置参数调整等 (社区也有相应的文档提供支持)。通过新的 Web UI,大家能更好的了解作业的使用情况,内存管理也更方便。

    1. 本地内存(Managed Memory)

    Flink 托管内存实际上是 Flink 特有的一种本地内存,不受 JVM 和 GC 的管理,而是由 Flink 自行进行管理。

    本地内存的特点主要体现在两方面:

    • 一方面是 slot 级别的预算规划,它可以保证作业运行过程中不会因为内存不足,造成某些算子或者任务无法运行;也不会因为预留了过多的内存没有使用造成资源浪费。同时 Flink 能保证当任务运行结束时准确将内存释放,确保 Task Manager 执行新任务时有足够的内存可用。

    • 另一方面,资源适应性也是托管内存很重要的特性之一,指算子对于内存的需求是动态可调整的。具备了适应性,算子就不会因为给予任务过多的内存造成资源使用上的浪费,也不会因为提供的内存相对较少导致整个作业无法运行,使内存的运用保持在一定的合理范围内。

      当然,在内存分配相对比较少情况下,作业会受到一定限制,例如需要通过频繁的落盘保证作业的运行,这样可能会影响性能。

    当前,针对托管内存,Flink 的使用场景如下:

    • RocksDB 状态后端:在流计算的场景中,每个 Slot 会使用 State 的 Operator,从而共享同一底层 的 RocksDB 缓存;

    • Flink 内置算子:包含批处理、Table SQL、DataSet API 等算子,每个算子有独立的资源预算,不会相互共享;

    • Python 进程:用户使用 PyFlink,使用 Python 语言定义 UDF 时需要启动 Python 的虚拟机进程。

    2. Job Graph 编译阶段

    Flink 对于 management memory 的管理主要分为两个阶段。

    ■ 2.1 作业的 Job Graph 编译阶段

    在这个阶段需要注意三个问题:

    • 第一个问题是:slot 当中到底有哪些算子或者任务会同时执行。这个问题关系到在一个查询作业中如何对内存进行规划,是否还有其他的任务需要使用 management memory,从而把相应的内存留出来。在流式的作业中,这个问题是比较简单的,因为我们需要所有的算子同时执行,才能保证上游产出的数据能被下游及时的消费掉,这个数据才能够在整个 job grep 当中流动起来。但是如果我们是在批处理的一些场景当中,实际上我们会存在两种数据 shuffle 的模式。

      • 一种是 pipeline 的模式,这种模式跟流式是一样的,也就是我们前面说到的 bounded stream 处理方式,同样需要上游和下游的算子同时运行,上游随时产出,下游随时消费。


      • 另外一种是所谓的 batch 的 blocking的方式,它要求上游把数据全部产出,并且落盘结束之后,下游才能开始读数据。

    • 这两种模式会影响到哪些任务可以同时执行。目前在 Flink 当中,根据作业拓扑图中的一个边的类型 (如图上)。我们划分出了定义的一个概念叫做 pipelined region,也就是全部都由 pipeline 的边锁连通起来的一个子图,我们把这个子图识别出来,用来判断哪些 task 会同时执行。

    • 第二个问题是:slot 当中到底有哪些使用场景?我们刚才介绍了三种 manage memory 的使用场景。在这个阶段,对于流式作业,可能会出现 Python UDF 以及 Stateful Operator。这个阶段当中我们需要注意的是,这里并不能肯定 State Operator 一定会用到 management memory,因为这跟它的状态类型是相关的。

      • 如果它使用了 RocksDB State Operator,是需要使用 manage memory 的;

      • 但是如果它使用的是 Heap State Backend,则并不需要。

      然而,作业在编译的阶段,其实并不知道状态的类型,这里是需要去注意的地方。

    • 第三个问题:对于 batch 的作业,我们除了需要清楚有哪些使用场景,还需要清楚一件事情,就是前面提到过 batch 的 operator。它使用 management memory 是以一种算子独享的方式,而不是以 slot 为单位去进行共享。我们需要知道不同的算子应该分别分配多少内存,这个事情目前是由 Flink 的计划作业来自动进行设置的。

    ■ 2.2 执行阶段

    第一个步骤是根据 State Backend 的类型去判断是否有 RocksDB。如上图所示,比如一个 slot,有 ABC 三个算子,B 跟 C 都用到了 Python,C 还用到了 Stateful 的 Operator。这种情况下,如果是在 heap 的情况下,我们走上面的分支,整个 slot 当中只有一种在使用,就是Python。之后会存在两种使用方式:

    • 其中一个是 RocksDB State Backend,有了第一步的判断之后,第二步我们会根据用户的配置,去决定不同使用方式之间怎么样去共享 slot 的 management memory。

      在这个 Steaming 的例子当中,我们定义的 Python 的权重是 30%,State Backend 的权重是 70%。在这样的情况下,如果只有 Python,Python 的部分自然是使用 100% 的内存(Streaming 的 Heap State Backend 分支);

    • 而对于第二种情况(Streaming 的 RocksDB State Backend 分支),B、C 的这两个 Operator 共用 30% 的内存用于 Python 的 UDF,另外 C 再独享 70% 的内存用于 RocksDB State Backend。最后 Flink 会根据 Task manager 的资源配置,一个 slot 当中有多少 manager memory 来决定每个 operator 实际可以用的内存的数量。

    批处理的情况跟流的情况有两个不同的地方,首先它不需要去判断 State Backend 的类型,这是一个简化;其次对于 batch 的算子,上文提到每一个算子有自己独享的资源的预算,这种情况下我们会去根据使用率算出不同的使用场景需要多少的 Shared 之后,还要把比例进一步的细分到每个 Operator。

    3. 参数配置


    配置参数默认值备注
    大小taskmanager.memory.managed.size/绝对大小
    权重taskmanager.memory.managed.fraction0.4相对大小(占用Flink)总内存比例

    taskmanager.memory.managed.consumer-weightDATAPROC:70,PYTHON:30多种用途并存时候分配权重

    上方图表展示了我们需要的是 manager,memory 大小有两种配置方式:

    • 一种是绝对值的配置方式;

    • 还有一种是作为 Task Manager 总内存的一个相对值的配置方式。


    taskmanager.memory.managed.consumer-weight 是一个新加的配置项,它的数据类型是 map 的类型,也就是说我们在这里面实际上是给了一个 key 冒号 value,然后逗号再加上下一组 key 冒号 value 的这样的一个数据的结构。这里面我们目前支持两种 consumer 的 key:

    • 一个是 DATAPROC, DATAPROC 既包含了流处理当中的状态后端 State Backend 的内存,也包含了批处理当中的 Batch Operator;

    • 另外一种是 Python。

    二、 资源调度


    部分资源调度相关的 Feature 是其他版本或者邮件列表里面大家询问较多的,这里我们也做对应的介绍。

    1. 最大 Slot 数

    Flink 在 1.12 支持了最大 slot 数的一个限制(slotmanager.number-of-slots.max),在之前我们也有提到过对于流式作业我们要求所有的 operator 同时执行起来,才能够保证数据的顺畅的运行。在这种情况下,作业的并发度决定了我们的任务需要多少个 slot 和资源去执行作业。

    然而对于批处理其实并不是这样的,批处理作业往往可以有一个很大的并发度,但实际并不需要这么多的资源,批处理用很少的资源,跑完前面的任务腾出 Slot 给后续的任务使用。通过这种串行的方式去执行任务能避免 YARN/K8s 集群的资源过多的占用。目前这个参数支持在 yarn/mesos/native k8 使用。

    2. TaskManager 容错

    在我们实际生产中有可能会有程序的错误、网络的抖动、硬件的故障等问题造成 TaskManager 无法连接,甚至直接挂掉。我们在日志中常见的就是 TaskManagerLost 这样的报错。对于这种情况需要进行作业重启,在重启的过程中需要重新申请资源和重启 TaskManager 进程,这种性能消耗代价是非常高昂的。

    对于稳定性要求相对比较高的作业,Flink1.12 提供了一个新的 feature,能够支持在 Flink 集群当中始终持有少量的冗余的 TaskManager,这些冗余的 TaskManager 可以用于在单点故障的时候快速的去恢复,而不需要等待一个重新的资源申请的过程。

    通过配置 slotmanager.redundant-taskmanager-num 可以实现冗余 TaskManager。这里所谓的冗余 TaskManager 并不是完完全全有两个 TaskManager 是空负载运行的,而是说相比于我所需要的总共的资源数量,会多出两个 TaskManager。

    任务可能是相对比较均匀的分布在上面,在能够在利用空闲 TaskManager 的同时,也能够达到一个相对比较好的负载。一旦发生故障的时候,可以去先把任务快速的调度到现有的还存活的 TaskManager 当中,然后再去进行新一轮的资源申请。目前这个参数支持在 yarn/mesos/native k8 使用。

    3. 任务平铺分布

    任务平铺问题主要出现在 Flink Standalone 模式下或者是比较旧版本的 k8s 模式部署下的。在这种模式下因为事先定义好了有多少个 TaskManager,每个 TaskManager 上有多少 slot,这样会导致经常出现调度不均的问题,可能部分 manager 放的任务很满,有的则放的比较松散。

    在 1.11 的版本当中引入了参数 cluster.evenly-spread-out-slots,这样的参数能够控制它,去进行一个相对比较均衡的调度。

    注意:

    • 第一,这个参数我们只针对 Standalone 模式,因为在 yarn 跟 k8s 的模式下,实际上是根据你作业的需求来决定起多少 task manager 的,所以是先有了需求再有 TaskManager,而不是先有 task manager,再有 slot 的调度需求。

      在每次调度任务的时候,实际上只能看到当前注册上来的那一个 TaskManager,Flink 没办法全局的知道后面还有多少 TaskManager 会注册上来,这也是很多人在问的一个问题,就是为什么特性打开了之后好像并没有起到一个很好的效果。

    • 第二个需要注意的点是,这里面我们只能决定每一个 TaskManager 上有多少空闲 slot,然而并不能够决定每个 operator 有不同的并发数,Flink 并不能决定说每个 operator 是否在 TaskManager 上是一个均匀的分布,因为在 flink 的资源调度逻辑当中,在整个 slot 的 allocation 这一层是完全看不到 task 的。

    三、扩展资源框架


    1. 背景

    近年来,随着人工智能领域的不断发展,深度学习模型已经被应用到了各种各样的生产需求中,比较典型的场景如推荐系统,广告推送,智能风险控制。这些也是 Flink 一直以来被广泛使用的场景,因此,支持人工智能一直以来都是 Flink 社区的长远目标之一。针对这个目标,目前已经有了很多第三方的开源扩展工作。由阿里巴巴开源的工作主要有两个:

    • 一个是 Flink AI Extended 的项目,是基于 Flink 的深度学习扩展框架,目前支持 TensorFlow、PyTorch 等框架的集成,它使用户可以将 TensorFlow 当做一个算子,放在 Flink 任务中。

    • 另一个是 Alink,它是一个基于 Flink 的通用算法平台,里面也内置了很多常用的机器学习算法。

    以上的两个工作都是从功能性上对 Flink 进行扩展,然而从算力的角度上讲,深度学习模型亦或机器学习算法,通常都是整个任务的计算瓶颈所在。GPU 则是这个领域被广泛使用用来加速训练或者预测的资源。因此,支持 GPU 资源来加速计算是 Flink 在 AI 领域的发展过程中必不可少的功能。

    2. 使用扩展资源

    目前 Flink 支持用户配置的资源维度只有 CPU 与内存,而在实际使用中,不仅是 GPU,我们还会遇到其他资源需求,如 SSD 或 RDMA 等网络加速设备。因此,我们希望提供一个通用的扩展资源框架,任何扩展资源都可以以插件的形式来加入这个框架,GPU 只是其中的一种扩展资源。

    对于扩展资源的使用,可以抽象出两个通用需求:

    • 需要支持该类扩展资源的配置与调度。用户可以在配置中指明对这类扩展资源的需求,如每个 TaskManager 上需要有一块 GPU 卡,并且当 Flink 被部署在 Kubernetes/Yarn 这类资源底座上时,需要将用户对扩展资源的需求进行转发,以保证申请到的 Container/Pod 中存在对应的扩展资源。

    • 需要向算子提供运行时的扩展资源信息。用户在自定义算子中可能需要一些运行时的信息才能使用扩展资源,以 GPU 为例,算子需要知道它内部的模型可以部署在那一块 GPU 卡上,因此,需要向算子提供这些信息。

    3. 扩展资源框架使用方法

    使用资源框架我们可以分为以下这 3 个步骤:

    • 首先为该扩展资源设置相关配置;

    • 然后为所需的扩展资源准备扩展资源框架中的插件;

    • 最后在算子中,从 RuntimeContext 来获取扩展资源的信息并使用这些资源。

    ■ 3.1 配置参数

    # 定义扩展资源名称,“gpu”external-resources: gpu# 定义每个 TaskManager 所需的 GPU 数量external-resource.gpu.amount: 1 # 定义Yarn或Kubernetes中扩展资源的配置键external-resource.gpu.yarn.config-key: yarn.io/gpuexternal-resource.gpu.kubernetes.config-key: nvidia.com/gpu# 定义插件 GPUDriver 的工厂类。external-resource.gpu.driver-factory.class: org.apache.flink.externalresource.gpu.GPUDriverFactory
    
    
    

    以上是使用 GPU 资源的配置示例:

    • 对于任何扩展资源,用户首先需要将它的名称加入 "external-resources" 中,这个名称也会被用作该扩展资源其他相关配置的前缀来使用。示例中,我们定义了一种名为 "gpu" 的资源。

    • 在调度层,目前支持用户在 TaskManager 的粒度来配置扩展资源需求。示例中,我们定义每个 TaskManager 上的 GPU 设备数为 1。

    • 将 Flink 部署在 Kubernetes 或是 Yarn 上时,我们需要配置扩展资源在对应的资源底座上的配置键,以便 Flink 对资源需求进行转发。示例中展示了 GPU 对应的配置。

    • 如果提供了插件,则需要将插件的工厂类名放入配置中。

    ■ 3.2 前置准备

    在实际使用扩展资源前,还需要做一些前置准备工作,以 GPU 为例:

    • 在 Standalone 模式下,集群管理员需要保证 GPU 资源对 TaskManager 进程可见;

    • 在 Kubernetes 模式下,需要集群支持 Device Plugin[6],对应的 Kubernetes 版本为 1.10,并且在集群中安装了 GPU 对应的插件;

    • 在 Yarn 模式下,GPU 调度需要集群 Hadoop 版本在 2.10 或 3.1 以上,并正确配置了 resource-types.xml 等文件。

    ■ 3.3 扩展资源框架插件

    完成了对扩展资源的调度后,用户自定义算子可能还需要运行时扩展资源的信息才能使用它。扩展资源框架中的插件负责完成该信息的获取,它的接口如下:

    public interface ExternalResourceDriverFactory {  /**  * 根据提供的设置创建扩展资源的Driver  */  ExternalResourceDriver createExternalResourceDriver(Configuration config) throws Exception;}
    public interface ExternalResourceDriver {  /**  * 获取所需数量的扩展资源信息  */  Set<? extends ExternalResourceInfo> retrieveResourceInfo(long amount) throws Exception;}
    
    
    

    ExternalResourceDriver 会在各个 TaskManager 上启动,扩展资源框架会调用各个 Driver 的 retrieveResourceInfo 接口来获得 TaskManager 上的扩展资源信息,并将得到的信息传到算子的 RuntimeContext。ExternalResourceDriverFactory 则为插件的工厂类。

    4. GPU 插件

    Flink 目前内置了针对 GPU 资源的插件,其内部通过执行名为 Discovery Script 的脚本来获取当前环境可用的 GPU 信息,目前信息中包含了 GPU 设备的 Index。

    Flink 提供了一个默认脚本,位于项目的 "plugins/external-resource-gpu/" 目录,用户也可以实现自定义的 Discovery Script 并通过配置来指定使用自定义脚本。该脚本与 GPU 插件的协议为:

    • 当调用脚本时,所需要的 GPU 数量将作为第一个参数输入,之后为用户自定义参数列表;

    • 若脚本执行正常,则输出 GPU Index 列表,以逗号分隔;

    • 若脚本出错或执行结果不符合预期,则脚本以非零值退出,这会导致 TaskManager 初始化失败,并在日志中打印脚本的错误信息。

    Flink 提供的默认脚本是通过 "nvidia-smi" 工具来获取当前的机器中可用的 GPU 数量以及 index,并根据所需要的 GPU 数量返回对应数量的 GPU Index 列表。当无法获取到所需数量的 GPU 时,脚本将以非零值退出。

    GPU 设备的资源分为两个维度,流处理器与显存,其显存资源只支持独占使用。因此,当多个 TaskManager 运行在同一台机器上时,若一块 GPU 被多个进程使用,可能导致其显存 OOM。因此,Standalone 模式下,需要 TaskManager 级别的资源隔离机制。

    默认脚本提供了 Coordination Mode 来支持单机中多个 TaskManager 进程之间的 GPU 资源隔离。该模式通过使用文件锁来实现多进程间 GPU 使用信息同步,协调同一台机器上多个 TaskManager 进程对 GPU 资源的使用。

    5. 在算子中获取扩展资源信息

    在用户自定义算子中,可使用在 "external-resources" 中定义的资源名称来调用 RuntimeContext 的 getExternalResourceInfos 接口获取对应扩展资源的信息。以 GPU 为例,得到的每个 ExternalResourceInfo 代表一块 GPU 卡,而其中包含名为 "index" 的字段代表该 GPU 卡的设备 Index。

    public class ExternalResourceMapFunction extends RichMapFunction<String, String> {
      private static finalRESOURCE_NAME="gpu";
      @Override
      public String map(String value) {
        Set<ExternalResourceInfo> gpuInfos = getRuntimeContext().getExternalResourceInfos(RESOURCE_NAME);
        List<String> indexes = gpuInfos.stream()
              .map(gpuInfo -> gpuInfo.getProperty("index").get()).collect(Collectors.toList());
        // Map function with GPU// ...    
      }
    }
    


    6. MNIST Demo

    下图以 MNIST 数据集的识别任务来演示使用 GPU 加速 Flink 作业。

    MNIST 如上图所示,为手写数字图片数据集,每个图片可表示为为 28*28 的矩阵。在该任务中,我们使用预训练好的 DNN 模型,图片输入经过一层全连接网络得到一个 10 维向量,该向量最大元素的下标即为识别结果。

    我们在一台拥有两块 GPU 卡的 ECS 上启动一个有两个 TaskManager 进程的 Standalone 集群。借助默认脚本提供的 Coordination Mode 功能,我们可以保证每个 TaskManager 各使用其中一块 GPU 卡。

    该任务的核心算子为图像识别函数 MNISTClassifier,核心实现如下所示:

    class MNISTClassifier extends RichMapFunction<List<Float>, Integer> {
      @Override  public void open(Configuration parameters) {    //获取GPU信息并且选择第一块GPU    Set<ExternalResourceInfo> externalResourceInfos =   getRuntimeContext().getExternalResourceInfos(resourceName);    final Optional<String> firstIndexOptional = externalResourceInfos.iterator().next().getProperty("index");    // 使用第一块GPU的index初始化JCUDA组件    JCuda.cudaSetDevice(Integer.parseInt(firstIndexOptional.get()));    JCublas.cublasInit();  }}
    
    
    

    在 Open 方法中,从 RuntimeContext 获取当前 TaskManager 可用的 GPU,并选择第一块来初始化 JCuda 以及 JCublas 库。

    class MNISTClassifier extends RichMapFunction<List<Float>, Integer> {    @Override    public Integer map(List<Float> value) {        // 使用Jucblas做矩阵算法        JCublas.cublasSgemv('n', DIMENSIONS.f1, DIMENSIONS.f0, 1.0f,                matrixPointer, DIMENSIONS.f1, inputPointer, 1, 0.0f, outputPointer, 1);
            // 获得乘法结果并得出该图所表示的数字        JCublas.cublasGetVector(DIMENSIONS.f1, Sizeof.FLOAT, outputPointer, 1, Pointer.to(output), 1);
            JCublas.cublasFree(inputPointer);        JCublas.cublasFree(outputPointer);
            int result = 0;        for (int i = 0; i < DIMENSIONS.f1; ++i) {            result = output[i] > output[result] ? i : result;        }        return result;    }}
    
    
    

    在 Map 方法中,将预先训练好的模型参数与输入矩阵放入 GPU 显存,使用 JCublas 进行 GPU 中的矩阵乘法运算,最后将结果向量从 GPU 显存中取出并得到识别结果数字。

    具体案例演示流程可以前往观看视频或者参考 Github 上面的链接动手尝试。

    四、未来计划


    除了上文介绍的这些已经发布的特性外,Apache Flink 社区也正在积极准备更多资源管理方面的优化特性,在未来的版本中将陆续和大家见面。

    • 被动资源调度模式:托管内存使得 Flink 任务可以灵活地适配不同的 TaskManager/Slot 资源,充分利用可用资源,为计算任务提供给定资源限制下的最佳算力。但用户仍需指定计算任务的并行度,Flink 需要申请到满足该并行度数量的 TaskManager/Slot 才能顺利执行。被动资源调度将使 Flink 能够根据可用资源动态改变并行度,在资源不足时能够 best effort 进行数据处理,同时在资源充足时恢复到指定的并行度保障处理性能。

    • 细粒度资源管理:Flink 目前基于 Slot 的资源管理与调度机制,认为所有的 Slot 都具有相同的规格。对于一些复杂的规模化生产任务,往往需要将计算任务拆分成多个子图,每个子图单独使用一个 Slot 执行。当子图间的资源需求差异较大时,使用相同规格的 Slot 往往难以满足资源效率方面的需求,特别是对于 GPU 这类成本较高的扩展资源。细粒度资源管理允许用户为作业的子图指定资源需求,Flink 会根据资源需求使用不同规格的 TaskManager/Slot 执行计算任务,从而优化资源效率。

    五、总结

    通过文章的介绍,相信大家对 Flink 内存管理有了更加清晰的认知。

    • 首先从本地内存、Job Graph 编译阶段、执行阶段来解答每个流程的内存管理以及内存分配细节,通过新的参数配置控制 TaskManager的内存分配;

    • 然后从大家平时遇到资源调度相关问题,包括最大 Slot 数使用,如何进行 TaskManager 进行容错,任务如何通过任务平铺均摊任务资源;

    • 最后在机器学习和深度学习领域常常用到 GPU 进行加速计算,通过解释 Flink 在 1.12 版本如何使用扩展资源框架和演示 Demo, 给我们展示了资源扩展的使用。再针对资源利用率方面提出 2 个社区未来正在做的计划,包括被动资源模式和细粒度的资源管理。

    六、附录

    [1] Accelerating your workload with GPU and other external resources

    [2] 扩展资源框架文档

    [3] FLIP-108: Add GPU support in Flink

    [4] flink-mnist 项目

    另外~《Apache Flink-实时计算正当时》电子书重磅发布,本书将助您轻松 Get Apache Flink 1.13 版本最新特征,同时还包含知名厂商多场景 Flink 实战经验,学用一体,干货多多!快扫描下方二维码获取吧~

    更多 Flink 相关技术问题,可扫码加入社区钉钉交流群~


    ▼ 关注「Flink 中文社区」,获取更多技术干货 ▼

       戳我,查看更多技术干货~

    展开全文
  • Flink 1.12 内存和提交参数

    千次阅读 2021-12-29 11:35:43
    flink1.12 内存和提交参数 在使用yarn cluster模式提交flink的任务时候,往往会涉及到很多内存参数的配置 例如下面的提交命令: flink run -d -m yarn-cluster -yjm 512 -ytm 5028 -yD jobmanager.memory.off-heap....
  • flink 1.12 批处理读写hive基础教程

    千次阅读 2021-12-01 10:20:01
    flink 1.12批量读写hive基础教程
  • Flink 1.12 Release 文档

    2020-12-11 23:19:39
    本文的 Release 文档描述了在 Flink 1.11 和 Flink 1.12 之间更改的重要方面,例如配置,行为或依赖项。如果您打算将 Flink 版本升级到 1.12,请仔细阅读这些说明。 API 移除掉 ExecutionConfig 中过期的方法 移除掉...
  • Flink1.13与Flink1.12状态后端Flink1.12的状态后端(旧版)Flink1.13的状态后端(新版) Flink1.12的状态后端(旧版) MemoryStateBackend 内部数据作为对象在java堆空间中存储。checkpoint时,状态快照发送给...
  • Flink 1.12-SQL

    2021-04-23 15:39:42
    Flink 1.12 SQL 应用 1.mysql实时数据与kafka更新的维度数据进行关联,补全维度后输出到dwd层kafka import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream;...
  • flink1.12 stream windows-top-n

    千次阅读 2021-11-08 15:10:50
    窗口 top-N flink 1.12 package com.cn.stream; import org.apache.flink.api.common.eventtime.WatermarkStrategy; import org.apache.flink.api.common.functions.AggregateFunction; import org.apache.flink....
  • flink 1.12 SQL Demo

    千次阅读 2021-12-24 11:14:20
    Flink 版本 1.12.3 source是kafka 维表是MySQL source left join 维表 public class FlinkTableDemo { public static void main(String[] args) throws Exception { StreamExecutionEnvironment env = ...
  • flink 1.12 时间语义与WaterMark

    千次阅读 2021-03-24 09:15:02
    一. 在新版本中 flink 默认处理的是事件时间而不是 以前默认的处理时间 在学习的... @deprecated In Flink 1.12 the default stream time characteristic has been changed to * [[TimeCharacteristic.EventTime]] ,
  • 时间、窗口和Watermark是Flink的很重要的概念,学习它们是掌握运用Flink的重中之重。 二、时间特性 在DataStream API中,你可以用时间特性告知Flink在创建窗口时如何定义时间。 时间特性是...
  • Flink1.12整合Hadoop3.x报错

    千次阅读 2021-11-19 06:12:55
    Flink1.12整合Hadoop3.x报错 由于项目需要,需要搞一段时间的规则预警,于是先在项目的阿里云服务器上安装了flink集群和hadoop,但是在跑官网示例的时候,直接报错了,具体信息如下: Caused by: java.lang....
  • 文章目录 前言 一、默认时间语义的变化 二、水印策略和时间戳方法变化 前言 Flink 1.12 版本之后,窗口聚合操作的变化 提示:以下是本篇文章正文内容,下面案例可供参考 一、默认时间语义的变化 Flink 1.12 将默认的...
  • flink1.12版本–滚动窗口水位线 watermark是用于处理乱序事件的,而正确的处理乱序事件,通常用watermark机制结合window来实现。 我们知道,流处理从事件产生,到流经source,再到operator,中间是有一个过程和时间...
  • 1. Flink概述 1.1 Flink官方介绍 flink官网地址 1.2 Flink组件栈 一个计算框架要有长远的发展,必须打造一个完整的 Stack。只有上层有了具体的应用,并能很好的发挥计算框架本身的优势,那么这个计算框架才能吸引...
  • 我想用flink 直接写入HDFS,但是总是报错。代码如下行 //然后建将官网写好的代码复制到这个平台 String path = "hdfs:///hdfs-01:9000/tmp/flink"; //必须要设置,检查点10秒钟 env.enableCheckpointing(10000); //...
  • Flink1.12 流批一体Hello-world

    千次阅读 2021-03-31 22:57:24
    flink: 1.12.2 编译器:IDEA MAVEN项目 要开发flink程序,首先,我们需要引入依赖,必要依赖POM.xml文件如下 (1)核心依赖 <?xml version="1.0" encoding="UTF-8"?> <project xmlns=...
  • Flink1.12 cdh版本编译

    2021-08-31 17:59:19
    注意:从Flink 1.11开始,Flink项目不再正式支持使用Flink -shade -hadoop-2-uber版本。建议用户通过HADOOP_CLASSPATH提供Hadoop依赖项。 Flink 在启动Flink组件(如Client、JobManager或TaskManager)前,将环境变量...
  • flink1.12 单机安装

    千次阅读 2021-03-07 22:44:30
    Flink程序由JobClient进行提交 JobClient将作业提交给JobManager JobManager负责协调资源分配和作业执行。资源分配完成后,任务将提交给相应的TaskManager TaskManager启动一个线程以开始执行。TaskManager会向...
  • 使用Hive构建数据仓库已经成为了比较...值得注意的是,不同版本的Flink对于Hive的集成有所差异,本文将以最新的Flink1.12版本为例,阐述Flink集成Hive的简单步骤,以下是全文,希望对你有所帮助。 公众号『大数据技术
  • import org.apache.flink.api.java.tuple.Tuple2; import org.apache.flink.streaming.api.datastream.DataStream; import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment; import org....
  • 一、代码gitee地址 ... //使用Flink原生的代码创建TableEnvironment //先初始化流计算的上下文 val streamEnv: StreamExecutionEnvironment = StreamExecutionEnvironment.getExecutionEnvironment val setti
  • flink 1.12是不支持jdbc连接clickhouse的,需要修改flink的源码,我把我2天的研究跑通记录下来供大家参考. 查遍了网上所有资料,没有一次能成功的,网上的资料千篇一律.我总结一下 踩的坑有这么几点: flink 源码下载 ...
  • Flink1.12官方文档: https://ci.apache.org/projects/flink/flink-docs-release-1.12/ (1)基于IDEA配置开发环境 基本环境: IntelliJ IDEA 2021 apache-maven-3.5.4 jdk1.8.0_271 scala-2.11.12 配置pom.x

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,482
精华内容 992
关键字:

flink1.12