精华内容
下载资源
问答
  • 文章目录Flink Checkpoint 目录的清除策略应用代码Flink源码需要注意 Flink Checkpoint 目录的清除策略 应用代码 CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig(); checkPointConfig....

    Flink checkpoint使用Stop命令停止任务后会自动删除checkpoint 目录?

    Flink Checkpoint 目录的清除策略

    应用代码

    CheckpointConfig checkPointConfig = streamEnv.getCheckpointConfig();
    checkPointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE);
    checkPointConfig.setCheckpointTimeout(1 * 60 * 1000);
    checkPointConfig.setMinPauseBetweenCheckpoints((1 * 30 * 1000));
    checkPointConfig.setMaxConcurrentCheckpoints(1);
    checkPointConfig.setTolerableCheckpointFailureNumber(3);
    checkPointConfig
           .enableExternalizedCheckpoints(ExternalizedCheckpointCleanup.RETAIN_ON_CANCELLATION);
    

    Flink源码

    org.apache.flink.streaming.api.environment.CheckpointConfig.ExternalizedCheckpointCleanup有两种策略:

    	/**
    		 * Delete externalized checkpoints on job cancellation.
    		 *
    		 * <p>All checkpoint state will be deleted when you cancel the owning
    		 * job, both the meta data and actual program state. Therefore, you
    		 * cannot resume from externalized checkpoints after the job has been
    		 * cancelled.
    		 *
    		 * <p>Note that checkpoint state is always kept if the job terminates
    		 * with state {@link JobStatus#FAILED}.
    		 */
    		DELETE_ON_CANCELLATION(true),
    
    		/**
    		 * Retain externalized checkpoints on job cancellation.
    		 *
    		 * <p>All checkpoint state is kept when you cancel the owning job. You
    		 * have to manually delete both the checkpoint meta data and actual
    		 * program state after cancelling the job.
    		 *
    		 * <p>Note that checkpoint state is always kept if the job terminates
    		 * with state {@link JobStatus#FAILED}.
    		 */
    		RETAIN_ON_CANCELLATION(false);
    
    • DELETE_ON_CANCELLATION:仅当作业失败时,作业的 Checkpoint 才会被保留用于任务恢复。当作业取消时,Checkpoint 状态信息会被删除,因此取消任务后,不能从 Checkpoint 位置进行恢复任务。
    • RETAIN_ON_CANCELLATION:当作业手动取消时,将会保留作业的 Checkpoint 状态信息。注意,这种情况下,需要手动清除该作业保留的 Checkpoint 状态信息,否则这些状态信息将永远保留在外部的持久化存储中。

    需要注意

    即使使用了RETAIN_ON_CANCELLATION命令,当使用flink stop命令来停止任务时也会删除Checkpoint 目录,这是因为这个机制是适用于使用cancel命令取消的任务的。

    以下是网友做的测试:

    启动后等待若干检查点之后做如下操作文件系统上的检查点是否保留说明

    • WEB UI 点击 Cancel 方式取消任务 保留 合理,因为设置了 RETAIN_ON_CANCELLATION。
    • 通过命令生成保存点:flink savepoint ${jobId} ${savepointDir} 保留 OK
    • 通过命令取消任务:flink cancel ${jobId} 保留 OK
    • 通过命令取消任务并生成保存点:flink cancel -s ${savepointDir} ${jobId} 保留 OK
    • 通过命令停止任务(基于默认保存点目录):flink stop ${jobId} 不保留 注意别被特点坑
    • 通过命令停止任务并生成保存点:flink stop -p ${savepointDir} ${jobId} 不保留 注意别被特点坑

    这是因为在Flink-1.7.0 之后,savepoint也被当做是retained checkpoint了 [1],当你stop with savepoint 成功时,新的savepoint创建之后,旧的checkpoint因为默认retain checkpoint的数量为1而被subsume掉了,也就是被删掉了。
    如果你还想保留之前的一个旧的checkpoint,可以将默认retain的checkpoint数目设置为2 [2]。
    另外说一句,即使是已经deprecated的cancel with savepoint的用法,当新的savepoint创建成功后,旧的checkpoint在默认情况下也应该会被删除,除非增大retain的checkpoint数量。
    [1] https://issues.apache.org/jira/browse/FLINK-10354
    [2] https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/state/checkpointing.html#state-checkpoints-num-retained

    官网相关解释:
    https://ci.apache.org/projects/flink/flink-docs-release-1.12/zh/deployment/config.html#execution-checkpointing-externalized-checkpoint-retention

    展开全文
  • 写代码过程,IDE 的代码提示功能是程序员的最爱,但是在用 Scala 写 Flink 代码的过程,经常会有不提示的情况。 蛋疼。。。 这个就是 Scala 的引入包的问题,常用的用下面这几个,mark 下, import org....

    写代码过程中,IDE 的代码提示功能是程序员的最爱,但是在用 Scala 写 Flink 代码的过程中,经常会有不提示的情况。

    蛋疼。。。

    这个就是 Scala 的引入包的问题,常用的用下面这几个,mark 下,

    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._

    注意后面的下划线,就是整包导入的意思,类似Java里面的星号 * 。

    这样处理后,在我们用点提示的时候,对应的api方法就会出来了,可以选择自己需要的实现方法。

    附加,使用 flink sql 版本的 wordcount(写wordcount还是api好用,用sql麻烦。。)

    import org.apache.flink.streaming.api.TimeCharacteristic
    import org.apache.flink.streaming.api.scala._
    import org.apache.flink.table.api._
    import org.apache.flink.table.api.bridge.scala._
    
    /**
     * @Author: shipfei
     * @Date: 2021/3/4 13:26
     * motto: Saying and doing are two different things.
     */
    object WcBySql {
    
      val filePath = "D:\\dev\\workspace\\eclipse-workspace\\bigdata-realtime\\my-demo\\input\\wc.txt"
    
      def main(args: Array[String]): Unit = {
        // 1. create table env (创建表执行环境)
        val env = StreamExecutionEnvironment.getExecutionEnvironment
        env.setParallelism(1)
        env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime)
        val settings = EnvironmentSettings.newInstance().useOldPlanner().inStreamingMode().build()
        val tableEnv = StreamTableEnvironment.create(env, settings)
    
        // 2. inputStream => dataStream(流转化,输入流 map 转化成需要的实体类)
        val inputStream: DataStream[String] = env.readTextFile(filePath)
        val dataStream: DataStream[(String, Int)] = inputStream.flatMap(_.split(" ")).map((_, 1))
    
        // 3. dataStream => inputTable(数据流转化成inputTable表)
        val inputTable = tableEnv.fromDataStream[(String, Int)](dataStream, $"word", $"count")
        val resultTable = inputTable.groupBy($"word").select($"word", $"count".sum)
    
        // 4. sink print
        resultTable.toRetractStream[(String, Int)].print("result")
    
        // 5. 任务执行
        env.execute("WcBySql Job")
      }
    
    }

     

    展开全文
  • flink sql使用注意事项

    千次阅读 2020-07-07 20:02:00
    可以通过create 语句控制字段个数和顺序 , 决定后面insert into的select语句的字段顺序受影响,所以insert into的select只能跟create定义的顺序一样,不能改别名 例如create table mysink (id varchar ,name ...
    • 可以通过create 语句控制字段个数和顺序 , 决定后面insert into的select语句中的字段顺序受影响,所以insert into的select只能跟create定义的顺序一样,不能改别名
      例如create table mysink (id varchar ,name varchar) with(...)​
    • 不可以通过insert 或select语句控制输出个数
      • insert into mysink (id ,name ) select * from source是错误的
      • insert into mysink select id from source是错误的(select中的字段一定要跟create语句中一致--->如insert into mysink select id,name from source)
    • 可以在create语句中使用udf
      • 比如可以解决,source没有time字段,无法定义watremark,可以通过udf解析得到time字段
    GetHeartTime是udf,返回timestamp​
    ​CREATE TABLE sourceTable (
        request_uri STRING,
        ts as GetHeartTime(request_uri),
        WATERMARK FOR ts AS ts - INTERVAL '1' SECOND
    ) WITH (
        'connector.type' = 'socket',
        'connector.host' = 'localhost',
        'connector.port' = '21'
    );
    • flinksql 1.10.0想要通过sql创建sql函数时 , 如果时tablefunction,需要重写getResultTypepublic
    //flinksql 1.10.0
    public class ParseUriRow extends TableFunction<Row> {
        public void eval(String data) {
            Row row = new Row(fnames.length);
         ....
            collect(row);
        }
        @Override
        public TypeInformation<Row> getResultType() {
            return Types.ROW(
                    Types.STRING
            );
        }
    }
    //或者
    ​​public class Split2Tuple extends TableFunction<Tuple2<String, String>> {
        public void eval(String str) {
            Tuple2 tuple = new Tuple2();
            //根据业务
            String[] split = str.split(",");
            tuple.setField(split[0],0);
            tuple.setField(split[1],1);
            collect(tuple);//TableFunction可以collect多次(一行转多行)
        }
        @Override
        public TypeInformation<Tuple2<String, String>> getResultType() {
            return Types.TUPLE(Types.STRING,Types.STRING);
        }
    }
    //在1.11版本中做修改,使用注解或者重写getTypeInference方法
    public class Split2Row extends TableFunction<Row> {
        /**
         * Row形式返回不限制,但需要重写getResultType
         */
        //@FunctionHint(output = @DataTypeHint("ROW<s STRING, i STRING>"))
        public void eval(String str) {
            //根据业务
            Row row = new Row(2);
            String[] split = str.split(",");
            row.setField(0,split[0]);
            row.setField(1,split[1]);
            collect(row);//TableFunction可以collect多次(一行转多行)
        }
        @Override
        public TypeInference getTypeInference(DataTypeFactory typeFactory) {
            return TypeInference.newBuilder()
                // 指定输入参数的类型,必要时参数会被隐式转换
                .typedArguments(STRING())
                // specify a strategy for the result data type of the function
                .outputTypeStrategy(callContext -> {
                    return Optional.of(DataTypes.ROW(DataTypes.FIELD("a",DataTypes.STRING()),DataTypes.FIELD("b",DataTypes.STRING())));
                })
                .build();
        }
    }​
    
    tableEnvironment.executeSql("CREATE TABLE mysqlsink ( " +
            "     id bigint, " +
            "  game_id varchar, " +
            "  PRIMARY KEY (id) NOT ENFORCED      " +
            " )  " +
            " with ( " +
            "'connector' = 'jdbc',  " +
            " 'url' = 'jdbc:mysql://xxxxx:3306/flinksql?useSSL=false' , " +
            " 'username' = 'root' , " +
            " 'password' = 'root', " +
            " 'table-name' = 'mysqlsink' , " +
            " 'driver' = 'com.mysql.cj.jdbc.Driver' , " +
            " 'sink.buffer-flush.interval' = '2s', " +
            " 'sink.buffer-flush.max-rows' = '300' " +
            " )");
    tableEnvironment.executeSql("insert overwrite mysqlsink select a,cast(b as varchar) b from mySource");

     

    展开全文
  • flink sql通过JdbcCatalog连接postgresql踩坑记录 flink sql可以通过JdbcCatalog连接postgresql数据库,从而实现直接对postgresql表的相关操作,相关代码如下: StreamExecutionEnvironment bsEnv = ...

    flink sql通过JdbcCatalog连接postgresql踩坑记录

    flink sql可以通过JdbcCatalog连接postgresql数据库,从而实现直接对postgresql表的相关操作,相关代码如下:

            StreamExecutionEnvironment bsEnv = StreamExecutionEnvironment.getExecutionEnvironment();
            EnvironmentSettings bsSettings = EnvironmentSettings.newInstance().useBlinkPlanner().inStreamingMode().build();
            StreamTableEnvironment bsTableEnv = StreamTableEnvironment.create(bsEnv, bsSettings);
    		String catalogName = "mycatelog";
            String defaultDatabase = "postgres";
            String username = "postgres";
            String pwd = "123456";
            String baseUrl = "jdbc:postgresql://**.**.**.**:5432/";
    
            JdbcCatalog jdbcCatalog = new JdbcCatalog(catalogName, defaultDatabase, username, pwd, baseUrl);
            bsTableEnv.registerCatalog("mycatelog", jdbcCatalog);
    
            bsTableEnv.useCatalog("mycatelog");
    
            Table table = bsTableEnv.sqlQuery("SELECT * FROM my_schema.table-name" );
    		 DataStream<Row> rowDataStream = bsTableEnv.toAppendStream(table, Row.class);
            rowDataStream.print();
    
            bsEnv.execute("test");
    

    以上代码,执行后将报如下错误:

    Exception in thread "main" org.apache.flink.table.api.ValidationException: SQL validation failed. From line 1, column 15 to line 1, column 39: Object 'my_schema' not found
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:146)
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.validate(FlinkPlannerImpl.scala:108)
    	at org.apache.flink.table.planner.operations.SqlToOperationConverter.convert(SqlToOperationConverter.java:187)
    	at org.apache.flink.table.planner.delegation.ParserImpl.parse(ParserImpl.java:78)
    	at org.apache.flink.table.api.internal.TableEnvironmentImpl.sqlQuery(TableEnvironmentImpl.java:658)
    	at SqlTest.main(SqlTest.java:55)
    Caused by: org.apache.calcite.runtime.CalciteContextException: From line 1, column 15 to line 1, column 39: Object 'phm_realtime' not found
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
    	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:839)
    	at org.apache.calcite.sql.SqlUtil.newContextException(SqlUtil.java:824)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.newValidationError(SqlValidatorImpl.java:5089)
    	at org.apache.calcite.sql.validate.IdentifierNamespace.resolveImpl(IdentifierNamespace.java:179)
    	at org.apache.calcite.sql.validate.IdentifierNamespace.validateImpl(IdentifierNamespace.java:184)
    	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3256)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateFrom(SqlValidatorImpl.java:3238)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateSelect(SqlValidatorImpl.java:3510)
    	at org.apache.calcite.sql.validate.SelectNamespace.validateImpl(SelectNamespace.java:60)
    	at org.apache.calcite.sql.validate.AbstractNamespace.validate(AbstractNamespace.java:84)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateNamespace(SqlValidatorImpl.java:1110)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateQuery(SqlValidatorImpl.java:1084)
    	at org.apache.calcite.sql.SqlSelect.validate(SqlSelect.java:232)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validateScopedExpression(SqlValidatorImpl.java:1059)
    	at org.apache.calcite.sql.validate.SqlValidatorImpl.validate(SqlValidatorImpl.java:766)
    	at org.apache.flink.table.planner.calcite.FlinkPlannerImpl.org$apache$flink$table$planner$calcite$FlinkPlannerImpl$$validate(FlinkPlannerImpl.scala:141)
    	... 5 more
    Caused by: org.apache.calcite.sql.validate.SqlValidatorException: Object 'my_schema' not found
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance0(Native Method)
    	at sun.reflect.NativeConstructorAccessorImpl.newInstance(NativeConstructorAccessorImpl.java:62)
    	at sun.reflect.DelegatingConstructorAccessorImpl.newInstance(DelegatingConstructorAccessorImpl.java:45)
    	at java.lang.reflect.Constructor.newInstance(Constructor.java:423)
    	at org.apache.calcite.runtime.Resources$ExInstWithCause.ex(Resources.java:457)
    	at org.apache.calcite.runtime.Resources$ExInst.ex(Resources.java:550)
    	... 24 more
    
    

    提示找不到相应的schema。开始很费解,感觉flink sql很不讲武德,不按套路出牌,怎么一个简单的sql查询会找不到对应的表呢。通过jdbcCatalog.listTables()也查到了表记录。各种办法试一遍,没用,没办法,还是去官方文档去取经。

    • flink文档jdbcCatalog章节有以下内容:
      在这里插入图片描述
      文档主要意思就是:通过JdbcCatalog查询PostgreSQL数据库表时,你得用(catalog 名.数据库名.表名)来指定一张表,如果schema为public,那么表名前可以不加public。如果不为public,则必须加上schema名。且schema.table一定要通过反引号(`)来转义。(文档中看着反引号有点像单引号,一开始大意了。😔)。

    那么因此查询表的代码就得这样写:

    Table table = bsTableEnv.sqlQuery("SELECT * FROM `my_schema.table-name`" );
    

    至此,代码成功运行,问题解决。

    • ps:
      文档,永远滴神!!!
    展开全文
  • Flink从topic最新的数据开始消费,不管哪个消费者是否同个消费组 如果设置了保存点,则会记录自己的offect,不管其他的消费者是否消费了 setStartFromGroupOffsets 必须配置group.id参数 启动会获取最新的...
  • 我们在使用checkpoint时,会遇到这样一个参数设置: env.getCheckpointConfig().setTolerableCheckpointFailureNumber(1); tolerableCheckpointFailureNumber 限制的是最大可容忍的连续失败checkpoint计数 ...
  • Flink使用随笔

    2020-09-10 14:38:59
    2.发布任务到flink集群的时候,一定要注意使用的docker版本,必须和代码的版本对应上,flink-scala都要对应上 3.flink 集群 部署 4个文件 flink-configuration-configmap.yaml jobmanager-service.yaml jobmanager-...
  • 如何在 Flink 1.9 中使用 Hive?

    千次阅读 2019-09-06 15:29:49
    Apache Flink 从 1.9.0 版本开始增加了与 Hive 集成的功能,用户可以通过 Flink 来访问 Hive 的元数据,以及读写 Hive 的表。本文将主要从项目的设计架构、最新进展、使用说明等方面来介绍...
  • Flink 中使用 iceberg

    千次阅读 2020-11-18 09:10:05
    Apache Iceberg 支持Apache Flink的DataStream API 和 Table API 将记录写入 iceberg 的表,当前,我们只提供 iceberg 与 apache flink 1.11.x  的集成支持。 Feature support Flink 1.11.0 Notes SQL ...
  • 系列文章目录 Flink使用指南:Checkpoint机制,完全搞懂了,你...Flink允许将自定义的配置传递给ExecutionConfig的环境接口,由于执行配置科在所有的用户功能访问,因此自定义配置将在所有功能全局可用。 Conf
  • flink 读写hive注意细节

    2020-11-13 09:30:46
    Linux环境下安装FLink1.11.1并启动SQL-client读取Hive数据 首先去官网下载Flink1.11.1的tgz的包,教程如上篇文章上半部分流程一样,然后配置一下FLINK_HOME/conf/sql-client-defaults.yaml: catalogs: - name: ...
  • Flink使用指南: Flink SQL自定义函数 目录 系列文章目录 前言 一、新版本API区别 二、WaterMark 1.watermark简介 2.watermark使用 3.内置watermark生成器 3.1.单调递增时间戳分配器 3.2.固定延迟时间戳...
  • 本文根据 Apache Flink 系列直播课程整理而成,由 Apache Flink PMC,阿里巴巴高级技术专家 孙金城 分享。重点为大家介绍 Flink Python API 的现状及未来规划,主要内容包括:Apache Flink Python API 的前世今生和...
  • flink打包注意事项

    2020-06-05 13:30:46
    1.flink datastream api join hbase维度表报 在打包的时候没有排除掉hbase依赖的hadoop-common 2020-06-04 11:53:26,056 ERROR org.apache.flink.runtime.entrypoint.ClusterEntrypoint - Fatal error occurred...
  • Flink使用指南:Flink设置全局变量,并在函数获取,让你的代码更加优雅! Flink使用指南:Checkpoint机制,完全搞懂了,你就是大佬! Flink使用指南: 面试必问内存管理模型,进大厂一定要知道! Flink使用指南:...
  • Flink

    2020-07-16 16:31:34
    Flink
  • 见官网,需要写入元组格式 https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/datastream_api.html#data-sinks A3 解决 csv需要以逗号作为分隔符,而tuple(元组)的toString方法正好是以逗号...
  • 在 Apache Flink 1.9 版,我们引入了 PyFlink 模块,支持了 Python Table API。Python 用户可以完成数据转换和数据分析的作业。但是,您可能会...
  • Keyed Windows使用window算子进行窗口定义 No-Keyed Windows使用windowAll算子进行窗口定义 数量、时间的窗口 CountWindow 按照知道的数据条数生成一个 Window,与时间无关 滚动窗口(Tumbling Window)、滑动...
  • Flink中使用嵌入式ElasticSearch进行单元测试 Flink版本 1.8.0 ElasticSearch版本 5.1.2 Scala版本 2.11.12 Java版本 1.8 Github地址:https://github.com/shirukai/flink-examples-embedded-elasticsearch.git 1 ...
  • Tuple元组类型使用以及Lambda编写flink程序正确姿势(方法与避坑)
  • Flink SQL时态表

    2021-03-23 09:44:53
    本文主要将在Flink1.12中新的时态表的一些新的概念和注意事项,如何在Join中使用会在之后另一个篇文章中具体讨论。 Flink中的时态表的设计初衷 首先,大家需要明确一个概念,就是传统SQL中表一般表示的都是有界的...
  • 何时以及如何在 Apache Flink 中使用 RocksDB 状态后端Flink中的状态什么是RocksDB?Flink中的RocksDB什么时候使用RocksDBStateBackend如何使用RocksDBStateBackend集群级别作业级别最佳实践和高级配置状态在RocksDB...
  • Flink 使用大状态时的一点优化

    千次阅读 2020-03-02 09:02:00
    通过本文你能 get 到以下点:Flink使用大状态时,该如何配置。常见的负载均衡策略Flink 源码在选择 RocksDB 状态磁盘时,存在的问题。给出一些解决方案,并分析了每种...
  • Flink 使用之 MySQL CDC

    2021-08-21 16:33:03
    一、CDC 简介 CDC 即 Change Data Capture 变更数据捕获,为Flink 1.11中一个新增功能。我们可以通过 CDC 得知数据源表的更新内容(包含Insert Update 和 Delete),并将这些...在我的文章 Flink CDC 可以更详细的

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 15,367
精华内容 6,146
关键字:

flink使用中注意