精华内容
下载资源
问答
  • flink实时流处理

    2020-04-23 09:48:00
    但随着数据的不断增长,新技术的不断发展,人们逐渐意识到对实时数据处理的重要性,企业需要能够同时支持高吞吐、低延迟、高性能的流处理技术来处理日益增长的数据。 相对于传统的数据处理模式,流式数据处理则有着...

    背景:

    数据量激增传统的时代,不同的业务场景都有大量的业务数据产生,对于这些不断产生的数据应该如何进行有效地处理,成为当下大多数公司所面临的问题。 

    但随着数据的不断增长,新技术的不断发展,人们逐渐意识到对实时数据处理的重要性,企业需要能够同时支持高吞吐、低延迟、高性能的流处理技术来处理日益增长的数据。 

    相对于传统的数据处理模式,流式数据处理则有着更高的处理效率和成本控制。Apache Flink就是近年来在开源社区发展不断发展的能够支持同时支持高吞吐、低延迟、高性能分布式处理框架。 

    flink: 

    Flink是一个分布式大数据处理引擎,可对有限数据流和无限数据流进行有状态计算。可以部署在各种集群环境,对各种大小规模的数据进行快速计算。

     

    flink发展史

    Flink 诞生于2009年,起初诞生于德国柏林大学,最开始设计是为了解决批处理问题的。于2014年孵化并捐给Apache,2016年开始在阿里大规模应用,从而驶入快车道。 

    这一切都源于 MillWheel: Fault-Tolerant Stream Processing at Internet Scale,Google发表的一篇论文,讲述了如何在大规模系统中实现高吞吐/低延迟/有状态实时流处理。而Flink也是受到该论文的影响,采用了 Dataflow/Beam 编程模型 。

     

    Flink VS Spark VS Storm

    • 流计算方面 

    当前的 Flink 就是为了支持流计算而设计的。而 Spark 的流计算是特殊的批,是由连续的微批组成的流式计算。

     

    • 批计算方面 

    Flink 的批处理是特殊的流,而 Spark 本身的计算模型就是批处理模型。Storm 则不支持批处理。

     

    • 批流融合方面 

    Flink 的批处理API和流处理API是两套API,导致一个方法为了同时支持批处理和流处理需要实现两次,不过 Flink 的后续版本一直在促进批流融合。 

     

    Spark 批流结合则支持的很好,通过 DataFrame 实现的方法既可以支持批处理又可以支持流处理。

     

    • 机器学习方面 

    Spark 和 Flink 都提供了丰富的机器学习相关方法库。Spark 最近几年的重心都放在了支持机器学习上,并且 Spark 对于 Python以及R都支持得很好,让一些非Java体系的数据开发人员也能很快入手Spark。

     

    Flink 对 Python 的支持、优化也在进行中,并且有阿里主导的 Alink - 全球首个批流一体机器学习平台,也在国内外十分受欢迎。

     

    • 计算延迟方面 

    因为 Spark 是微批处理,因此处理延迟较 Flink 和 Storm 比较较大。 

     

    • 一致性语义方面 

    Flink 和 Storm 都实现了 Exactly Once 语义,而 Storm 如果不借助外部数据库仅支持 At Least Once 语义。

     

    • 状态管理方面 

    Spark 在有状态实时流处理方面支持地不够好,仅支持有限的方法,例如updateStateByKey 以及mapWithState。而 Flink 支持非常丰富的状态管理方法,这个我们后面也会说到。

    Flink适应应用场景

    我们会在什么时候使用 Flink 呢?或者说哪些场景使用 Flink 能更好地完成呢。 

    如果按照传统的方案会是什么架构呢? 

    首先业务已经由相关人员实时推送至Kafka中,我们会将数据通过ETL工具实时写入数据仓库中,通过数据仓库完成即席查询实时聚合。

    这种方案可能因大量数据入库导致一部分数据延迟,那么我们还可以通过 Spark 进行数据的预聚合后写入数仓中,这样能减少写入以及查询的成本。

    但是这个方案还存在延迟的问题: 

    1. 整个方案即使优化过还存在较大的延迟,数据写入/数据查询 ;

    2. 整个方案有太多DB交互过程 ;

     那么如果我们用 Flink 会怎么实现呢? 

    我们会将成交额当作“状态”维护在Flink中,定期更新Redis或者MC,这样我们的图表可以直接从Reids或者MC中直接读取成交额信息,这样可以减少很多DB交互过程,并且Redis或者MC的一次Set或者Get操作也是毫秒级别的。 

    Flink有哪些特有的功能呢

    流计算系统中经常需要与外部系统进行交互,我们通常的做法如向数据库发送用户a的查询请求,然后等待结果返回,在结果返回之前,我们的程序无法继续发送用户b的查询请求。这是一种同步访问方式,如下图所示。

    图中棕色的长条表示等待时间,可以发现网络交互等待时间极大地阻碍了吞吐和延迟。为了解决同步访问的问题,异步模式可以并发地处理多个请求和回复。

    也就是说,你可以连续地向数据库发送用户a、b、c等的请求,与此同时,哪个请求的回复先返回了就处理哪个回复,从而连续的请求之间不需要阻塞等待,如上图右边所示,这样能节省很多网络交互等待时间。

    CEP处理

     

    CEP全称为(Complex Event Processing),Flink CEP库允许你在流上定义一系列的模式(pattern),最终使得你可以方便的抽取自己需要的重要的事件出来。

    CEP常用于异常检测/行为分析以及实时风控,以下是一个应用简单的模式匹配实现的用户异常登陆检测的例子。

     

    匹配10秒内连续两次登陆失败的行为 。

    多样的窗口

    Flink支持多种多样的窗口来实现各种复杂的统计和计算。常用有以下几种: 

     

    1、Tumbling Windows(滚动窗口) 

    各窗口间不重合,常用于实时定时统计,例如服务每秒访问量 。

     

    2、Sliding Windows(滑动窗口) 

    各窗口间可以重合,常用于判断事件是否连续,例如每1分钟统计最近5分钟服务访问波动。

    状态管理

    状态(state)是 Flink 程序某个时刻某个 task/operator 的状态,state数据是程序运行中某一时刻数据结果,这个state数据会保存在taskmanager的内存中,Flink 提供了丰富的状态访问和容错机制支持 Flink 完成有状态的实时流处理。 

    1、多种数据类型 

    Value,List,Map,Reducing... 

    2、多种划分方式 

    Keyed State,Operator State 

    3、多种存储方式 

    MemoryStateBackend,FsStateBackend,RocksDBStateBackend

    4、高效备份和恢复 

    提供Exactly Once保证 

    为什么Flink收到青睐

     

    了解了 Fink 以及 Flink 相关特性,总结一下为什么 Flink 为什么收到大众的亲睐,被誉为 Spark 的替代品。 

     

    同时支持批处理和实时流程序处理 

    支持Java和Scala API 

    支持高吞吐/低延迟实时处理数据 

    支持在不同的时间下支持灵活的窗口 

    自动反压机制 

    Exactly Once语义保证 

    丰富的状态管理 

    图计算/机器学习/复杂时间处理 

    当今大数据引擎该有的样子

     

    1、强大的处理性能 

    性能为王,如果数据处理性能上不去,一切都是白搭。 

     

    2、批流一体化 

    即支持批处理又支持流处理,并且批处理和流处理的API能够很好的兼容。 

     

    3、强大的状态管理 

    提供丰富的状态管理方法,支持实现有状态的实时流处理,保证数据处理Exactly Once语义。 

     

    4、丰富的功能 

    提供丰富的接口API,能满足日常工作中各种各样的需求,包括但不限于机器学习、CEP、图计算等。 

    综上,对于强大的处理性能、流批一体,状态管理,丰富的API这些特性,Flink都是具备的。之所以说Flink与Spark是当今最流行的大数据计算引擎,是因为阿里巴巴、百度、腾讯、字节跳动、滴滴、美团等,几乎所有的互联网一二线大厂都在用Flink,Spark。

     内容取之于AI商学院技术分享。

    展开全文
  • 本文为Flink开发教程系列首篇文章,首发于微信公众号:码上观世界。文章目录Flink 的批和流批处理示例流处理示例开发环境准备PyFlink 安装Flink 服务部署再论流处...

         本文为Flink开发教程系列首篇文章,首发于微信公众号:码上观世界。

    文章目录

      • Flink 的批和流

        • 批处理示例

        • 流处理示例

      • 开发环境准备

        • PyFlink 安装

        • Flink 服务部署

      • 再论流处理与批处理

        • 物化视图(Materialized Views)

        • 动态表(Dynamic Tables)

        • 动态表定义

        • 连续查询(Continuous Queries)

      • 表转换为流

        • Append-only 流

        • Retract 流

        • upsert 流

      • 结果持久化

      • 常见问题

        • Queston0

        • Queston1

        • Queston2

        • Queston3

        • Queston4

        • Queston5

        • Queston6

    Flink 的批和流

    首先通过 Python 快速演示 Flink 中批处理和流处理的异同,以建立对 Flink 的初步认识。示例程序功能是从文件中读出 CSV 格式的单词数据(每行一个单词),然后做简单的分组计算,然后将结果保存到输出文件。

    批处理示例

    exec_env = ExecutionEnvironment.get_execution_environment()
    exec_env.set_parallelism(1)
    t_config = TableConfig()
    t_env = BatchTableEnvironment.create(exec_env, t_config)
    
    t_env.connect(FileSystem().path('/tmp/input.csv')) \
        .with_format(OldCsv()
                     .field('word', DataTypes.STRING())) \
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())) \
        .create_temporary_table('mySource')
    
    t_env.connect(FileSystem().path('/tmp/output/file_output')) \
        .with_format(OldCsv()
                     .field_delimiter('\t')
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) \
        .with_schema(Schema()
                     .field('word', DataTypes.STRING())
                     .field('count', DataTypes.BIGINT())) \
        .create_temporary_table('mySink')
    
    tab = t_env.from_path('mySource')
    tab.group_by(tab.word) \
       .select(tab.word, lit(1).count) \
       .execute_insert('mySink').wait()
    

    示例中通过 create_temporary_table 将文件输入数据命名为临时表 mySource,将文件输出数据命名为临时表 mySink,然后通过形如:

    insert into mySink  select word,count(1) from mySource group by mySink
    

    的 SQL 形式计算输出,最后通过 execute_insert 触发任务的提交,wait 任务执行完成。

    流处理示例

    exec_env = StreamExecutionEnvironment.get_execution_environment()
    exec_env.set_parallelism(1)
    t_config = TableConfig()
    t_env = StreamTableEnvironment.create(exec_env, t_config)
    
    my_source_ddl = """
        create table mySource (
            word VARCHAR
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = 'tmp/input.csv'
        )
    """
    
    my_sink_ddl = """
        create table mySink (
            word VARCHAR,
            `count` BIGINT
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = 'tmp/output/sql_output'
        )
    """
    
    t_env.execute_sql(my_source_ddl)
    t_env.execute_sql(my_sink_ddl)
    t_env.from_path('mySource').select('word, 1').insert_into('mySink')
    
    t_env.execute("tutorial_job")
    

    示例中通过 SQL 语法重写了批处理中的 API 语法调用,通过 execute_sql 来执行,然后通过形如 SQL 语法:

    insert into mySink select word,1 from mySource
    

    最后通过 insert_into 触发任务执行。

    从上面两个示例中,可以看到两者的相同处理逻辑:从源端加载数据,然后经过变换处理,输出到目标端。用图示可以描述如下:

    实际上,源端可以是内存数据集合(Collection)、数据库系统(包括 NOSQL)、文件系统(如 HDFS)、网络 Socket、消息队列(如 Kafka、RabbitMQ)等,目标端也可以是数据库系统、文件系统、或者消息队列等,变换除了示例中的分组统计,还可以是诸如过滤(filter)、映射(map)、聚合(如 keyBy)以及其他复杂的流的合并与分拆等。

    两个示例中 API 虽有不同,但是仅仅是形式不同,本质上底层调用是一致的,批处理也可以使用这种 SQL 语法的 API 调用,我们需要关注的是两者的主要区别。

    1. 执行环境

    批处理使用:

    BatchTableEnvironment.create(ExecutionEnvironment.get_execution_environment())
    

    流处理使用:

    StreamTableEnvironmentt.create(StreamExecutionEnvironment.get_execution_environment())
    

    两种方式都是用方法 get_execution_environmen 获取执行环境,该方法根据程序启动方式的不同,创建不同的执行环境,比如通过 IDE 启动程序,会通过 createLocalEnvironment 创建本地执行环境。示例程序通过 python flink_demo_files.py 执行任务也会创建本地环境,也就是说对于快速演示学习,可以不用部署 Flink 服务。

    2. 数据边界

    虽然两种方式都是从文件中读取数据,对于确定的时刻,比如都一次性读取一批数据,两者的数据是相同的,而且是数量有限的,但是当文件本身的数据是不断地写入的话,该文件数据就是无界的,因此从数据的处理方式来看,流处理包含了批处理,批处理是流处理的一个特例。对于无限的数据,当确定时间段或者数据数量(即 Window)之后,对于固定时间段或者数据量的数据,就转化为常规的批处理。下图展示了无界数据流和有界数据流之间的关系。

    3. 时间相关

    对于无界数据流,数据是跟时间是强相关的,比如来源于 Kafka 消息队列的数据的产生时间(Event Time)、到达 Flink 系统的时间(Injestion Time)、以及流处理的时间(Process Time)。只有知道数据的时间,才能区分该数据是属于历史数据还是即时数据,特别是对于顺序敏感的应用场景,时间就显得更为重要。

    4. 状态与容错

    由于流处理一般用于无限数据流的场景,所以一旦任务提交,程序将永远执行,如果中途因为程序异常崩溃,之前计算的结果可能就无法恢复,对于易逝性数据或者需要较大开销的场景,重新计算是无法接受的。因此流处理提供了大量状态管理的方法,比如支持多种外部系统持久化状态、定时 CheckPoint 等。而批处理是一次性提交,数据处理完,程序也就结束了。

    通过初步了解 Flink,对于初学者来说,如果觉得不过如此,那就显得过于 Too Young Too Simple 了。殊不知,对于一门新技术框架,没有大量的踩坑和填坑经历是无法掌握其精髓的。比如在流处理中,本可以像批处理那样计算(count group by),但是因为文件系统不支持更新写入,因此如果没有实践,可能是不会有深刻理解的。为了能够跟上本文节奏,建议读者安装相关的开发环境。

    开发环境准备

    软硬件要求如下。

    以下版本要求必选:

    • Java:OpenJDK 1.8

    • Python:3.8

    • apache-flink:release 1.12.0

    • PyFlink:1.12.0

    • cygwin 64 位

    以下版本要求可选:

    • Linux CentOS:7.8(16 cores 64GB)

    • Windows 10(2.30GHz 16GB)

    PyFlink 安装

    python -m pip install apache-flink --timeout 3600
    

    为保证快速下载,需要设置以下下载源。

    更改 Python 包的下载源:

    [global]
    index-url = https://pypi.tuna.tsinghua.edu.cn/simple
    [install]
    trusted-host = https://pypi.tuna.tsinghua.edu.cn
    

    在 Linux 系统和 Windows 系统,相应的文件路径分别为:

    #Linux
    ~/pip/pip.conf
    #Windows
    %APPDATA%/pip/pip.ini
    

    Flink 服务部署

    下载最新 release 版本,如 1.12.0,如果是 Windows 系统,需要通过 cygwin,将 Flink 包解压到 cygwin 相应路径下,然后执行如下命令:

    tar -xzf flink-1.12.0-bin-scala_2.11.tgz
    cd flink-1.12.0-bin-scala_2.11
    ./bin/start-cluster.sh
    

    如果部署过程一切正常,可以正常显示如下链接:

    http://localhost:8081/
    

    再论流处理与批处理

    物化视图(Materialized Views)

    通过文章第一部分对批处理和流处理的演示和比较,我们再回顾两者的几点区别,比如:

    我们主要关注第二点,批处理查询实际上是关系数据查询,针对的是全部数据,而流处理查询永远无法访问所有数据,只能访问当前的快照数据(SnapShot),这类似于高级关系数据库系统中提供的物化视图特性——缓存当前查询结果,只有当基础数据发生变化,才需要更新视图,这带来的好处是更高效快捷的查询。

    物化视图常用于结果汇总表,比如报表展示的数据直接来自物化视图,而不需要每次查询基础表重新计算。如果把这个概念推广到数据仓库的分层建模领域,数据汇总层(DWS)可以看作是数据明细层(DWD)的物化视图;同理 DWD 又可以看做 ODS 的物化视图,每层通过定期更新保证视图最新。

    应用到流处理领域,基于流数据创建视图表,可以当有新的数据到来,就更新视图。

    动态表(Dynamic Tables)

    动态表是 Flink 的核心概念,且有相关 API 和 SQL 支持,比如上面示例中 .create_temporary_table('mySource') 创建的表,该表会随着数据的输入而变化,你可以简单把这个表理解为基于流上的物化视图,只不过在 Flink 内部,没有真正的物化,它是一个逻辑概念。只要流应用不终止,基于动态表就可以产生连续的查询结果,查询结果表又形成了新的动态表。当输入的动态表发生变化,连续的查询就可以在结果动态表中反应这种改变,下图描述了流、动态表和连续查询的关系:

    从图中可以总结如下几点结论:

    • 流可以转换为动态表

    • 基于动态表的连续查询生成新的动态表

    • 结果动态表可以转回流

    下面通过服务器点击流日志数据进一步描述上述概念。

    动态表定义

    假设有这样的用户点击事件 schema(user,time,url),可以使用以下语句定义动态表:

    CREATE TABLE clicks (
      user VARCHAR,   // the name of the user
      url   VARCHAR,    // the URL that was accessed by the user
      cTime TIMESTAMP(3) // the time when the URL was accessed
    ) WITH (...);
    

    随着点击事件源源不断地被插入到源端系统(左图),动态表(右图)的数据也持续增加。

    连续查询(Continuous Queries)

    这里通过两个示例来演示基于动态表的查询,第一个是简单的分组统计,下图展示了随着 clicks 表插入新的记录,查询是如何进行的:

    如上图所示:当第一条记录(Mary,./home)进入系统,查询结果只有一条[(Mary,1)];当第二条记录(Bob,./cart)进入系统,此时查询结果变成 2 条 [(Mary,1),(bob,1)];当第三条记录(Mary,./prod?id=1)进入系统,此时更新查询结果中的 (Mary,1) 为 (Mary,2),但记录总数还是 2 条:[(Mary,2),(bob,1)];依次类推,从整体来看,结果表随着记录的进入连续实时更新。

    通过 Java 程序演示上述过程如下:

    public static void main(String[] args) throws Exception {
            final StreamExecutionEnvironment env =
                    StreamExecutionEnvironment.getExecutionEnvironment();
            StreamTableEnvironment tEnv = StreamTableEnvironment.create(env);
            DataStream<ClickEvent> clickEventDataStream = env.fromElements(
                    new ClickEvent("marry", "12:30:30", "/home.html"),
                    new ClickEvent("Bob", "12:45:30", "/cart.html"),
                    new ClickEvent("Liz", "13:15:30", "/prod.html"),
                    new ClickEvent("marry", "12:30:30", "/home.html"),
                    new ClickEvent("Bob", "12:45:30", "/cart.html"),
                    new ClickEvent("Liz", "13:15:30", "/prod.html")
            );
            DataStream<ClickEvent> mapClickEventDataStream = clickEventDataStream.map(new MapFunction<ClickEvent, ClickEvent>() {
                @Override
                public ClickEvent map(ClickEvent clickEvent) {
                    clickEvent.setTimestamp(Timestamp.valueOf("2020-12-16 " + clickEvent.getTime()));
                    return clickEvent;
                }
            });
            tEnv.createTemporaryView("clickEvents", mapClickEventDataStream, $("user"), $("url"));
            Table groupTable = tEnv.sqlQuery("select user,count(url) as cnt from clickEvents group by user");
            env.execute("stream job demo");
    }
    

    第二个是带有时间窗口的分组聚合查询,下图展示了每小时查询处理的结果(右图):

    该查询定义了 1 个小时的时间窗口,并且按照 user 和窗口分组:当第一个小时结束(12:00:00-12:59:59),查询计算出 2 个结果记录然后追加到结果表;同理对第二个小时(13:00:00-13:59:59),第三个小时(14:00:00-14:59:59)的计算结果追加到结果表。

    通过 Python 程序演示上述过程如下:

    exec_env = StreamExecutionEnvironment.get_execution_environment()
    exec_env.set_parallelism(1)
    t_config = TableConfig()
    t_env = StreamTableEnvironment.create(exec_env, t_config)
    
    my_source_ddl = """
        create table clicks (
             `user` VARCHAR,
            `ctime` TIMESTAMP,
            `url` VARCHAR
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = 'tmp/click_input'
        )
    """
    
    my_sink_ddl = """
        create table sink (
             `user` VARCHAR,
            `ctime` TIMESTAMP,
            `cnt` bigint
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = 'tmp/output/click_output'
        )
    """
    t_env.execute_sql(my_source_ddl)
    t_env.execute_sql(my_sink_ddl)
    
    t_env.execute_sql("""
    insert into sink select user,tumble_end(ctime,interval '1' hours) as end_time,count(url) as cnt
    from clicks
    group by user,tumble_end(ctime,interval '1' hours)
    """)
    t_env.execute("tutorial_job")
    

    上述代码仅作为阅览,但无法执行,因为当前版本还不支持 TUMBLE_END 这样的窗口函数,会抛出这样的异常:

    org.apache.flink.table.planner.codegen.CodeGenException: Unsupported call: TUMBLE_END(TIMESTAMP(6), INTERVAL SECOND(3) NOT NULL)
    

    除非使用 Java(Scala) API 才可以解决这个问题,但是因为涉及到窗口(Window)、水位线(WaterMark)以及自定义窗户处理函数(ProcessFunction)相关的内容,为了避免问题复杂化,本文暂不做深入讲解。有兴趣的读者请关注本文后续相关文章。

    通过上面两个查询示例的演示,我们再总结下两者的异同:

    • 虽然两者都是分组查询,但是后者是带有时间窗口的查询,其本身需要维护当前窗口内的数据记录(状态)直到触发窗口操作;

    • 简单分组查询每当有新记录进入便触发查询,而且查询结果会就地更新,会产生插入(Insert)和更新(Update)事件;

    • 窗口分组查询当达到窗口操作条件才会触发窗口查询,查询结果 Append 到结果表,会产生插入(Insert)事件。

    表转换为流

    上述示例中基于动态表上的连续查询,结果是新的结果动态表,实际上动态表还可以转为流,用于进一步操作,比如持久化等,那如何将动态表转为流呢?

    我们知道数据库中有三种数据更新方式,插入(Insert)、更新(Update)和删除(Delete),因为数据库存在主键的缘故,更新和删除可以在原有记录上进行。但是对于分布式系统,原地更新涉及到较大的系统开销,通常是采用追加方式,比如更新操作会转化为删除和插入两条记录,在必要的时候将原有记录合并丢弃。同样在 flink 中也提供了三种更新方式,连续的事件形成 3 种不同的流:

    Append-only 流

    当动态表种只有插入事件的数据,形成追加流,

    Flink 提供了表到撤销流转化的 toAppendStream,代码示例为:

           Table groupTable = tEnv.sqlQuery("select * from clickEvents");
            DataStream<Row>> appendStreamTableResult= tEnv.toAppendStream(groupTable, Row.class);
            appendStreamTableResult.print();
    

    Retract 流

    当动态表种种不仅存在插入还存在更新或者删除事件的数据时,删除事件会被编码为撤回事件,更新事件被编码为撤回(旧)和插入(新)事件,比如下图可视化事件转化效果:

    比如第一条记录(Mary,1)进入后,产生一条 Insert 事件,当第二条进入时,产生原有记录的撤回事件,和新的插入事件(Mary,2)。

    Flink 提供了表到撤销流转化的 API——toRetractStream,代码示例为:

           Table groupTable = tEnv.sqlQuery("select user,count(url) as cnt from clickEvents group by user");
            DataStream<Tuple2<Boolean, Row>> retractStreamTableResult = tEnv.toRetractStream(groupTable, Row.class);
            retractStreamTableResult.print();
    

    upsert 流

    Retract stream 流存在的问题是频繁的更新会导致过多的事件数据,而 upsert 流将其更新事件产生的 2 条记录合并为 1 条 upsert 记录,减少了数据量,提高了处理效率,但同时要求存储系统具有唯一键(Unique Key),上例中只有 Upsert 消息记录,示意图如下:

    Flink 目前只提供了表到 append 流和 retract 流的转化,暂不支持对 upsert 流的转化,但是 Flink 有 DynamicTableSink 接口,开发者可以自行开发实现。

    这里做个小结:

    • Append-only 流只适合于动态表只有插入数据的场景,而 Retract stream 流不仅适合于插入数据的场景,还适合于有更新和删除数据的场景。

    • 实际应用中,不带窗口的分组查询是常见的会更新动态结果表,因此只能转化为 Retract 流,而非分组查询以及带窗口查询的结果动态表只有插入事件,所有可以转化为 Append 流。

    • 带有更新事件的流不可用不支持更新操作的系统来存储,比如常规的操作系统的文件系统。动态表转为 upsert 流目前暂不支持,可以通过 DynamicTableSink 接口自行实现.

    结果持久化

    回到文章开头部分,明白了我们在演示流处理时候为什么不像批处理那样分组查询的原因:文件系统不支持更新操作,会报出异常:

    pyflink.util.exceptions.TableException: "AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[user, end_time],select=[user, end_time, COUNT(url) AS cnt])"
    

    如果想持久化动态结果数据,需要更换支持更新操作的存储系统,比如关系数据库系统,下面通过 MySQL 来演示。

    首先创建表:

    CREATE TABLE `mysqlSink`  (
      `word` varchar(255) CHARACTER SET utf8 COLLATE utf8_general_ci NOT NULL,
      `count` bigint(20) NULL DEFAULT NULL,
      PRIMARY KEY (`word`) USING BTREE
    ) ENGINE = InnoDB CHARACTER SET = utf8 COLLATE = utf8_general_ci 
    ROW_FORMAT = Compact;
    

    然后创建 jdbc sink:

      create table mysql_sink (
            word VARCHAR,
            `count` BIGINT,
            primary key(word) NOT ENFORCED
        ) with (...)
    

    完整的示例程序如下:

    exec_env = StreamExecutionEnvironment.get_execution_environment()
    exec_env.set_parallelism(1)
    t_config = TableConfig()
    t_env = StreamTableEnvironment.create(exec_env, t_config)
    
    my_source_ddl = """
        create table mySource (
            word VARCHAR
        ) with (
            'connector' = 'filesystem',
            'format' = 'csv',
            'path' = 'tmp/input.csv'
        )
    """
    mysql_sink_ddl = """
        create table mysql_sink (
            word VARCHAR,
            `count` BIGINT,
            primary key(word) NOT ENFORCED
        ) with (
            'connector.type' = 'jdbc',
            'connector.url' = 'jdbc:mysql://192.168.1.18:3306/hzgas_hand',
            'connector.table' = 'mysqlSink',
            'connector.driver' = 'com.mysql.jdbc.Driver',
            'connector.username' = 'hzgas',
            'connector.password' = '******'
        )
    """
    t_env.execute_sql(my_source_ddl)
    t_env.execute_sql(mysql_sink_ddl)
    t_env.from_path('mySource').group_by('word').select('word, count(1)').insert_into('mysql_sink')
    
    t_env.execute("tutorial_job")
    

    如果数据库表没有设置主键,结果数据会被追加到表,读者可以自行去验证。

    常见问题

    Queston0

    现象:在 Window 中通过 cygwin 部署 Flink,提示 JVM 参数错误。

    Improperly specified VM option 'MaxMetaspaceSize=268435456 ' Error: Could not create the Java Virtual Machine. Error: A fatal exception has occurred. Program will exit.
    

    Solution:这是 Flink 自动获取的 MaxMetaspaceSize 大小不合适造成的,需要手动在 flink-daemon.sh 文件修改该值。

    Queston1

    现象:通过 PyFlink 提交任务报错提示。

    class py4j.jvm.org....ResetClientCallBack dosnot exist in JVM!
    

    Solution:PyFlink 通过 Py4J 启动 Gateway 并跟远程服务通信,会通过 FLINK_HOME 环境变量引用 Flink 相关包,因此需要设置 FLINK_HOME,并保证 PyFlink 和 Flink 服务版本的一致。

    Queston2

    现象:PyFlink 使用 MySQL 存储时候报错。

    Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.factories.TableSinkFactory' in
    the classpath.
    

    Reason:

    No factory implements 'org.apache.flink.table.delegation.ExecutorFactory'.
    

    Solution:Flink 1.11 版本之前缺少 JDBC 驱动,需要将 Flink JDBC 部署到 Flink lib 路径下,而 1.12 版本不存在该问题。

    Queston3

    现象:Java 通过 jar 命令行运行报错。

    Caused by: org.apache.flink.table.api.NoMatchingTableFactoryException: Could not find a suitable table factory for 'org.apache.flink.table.delegation.ExecutorFactory' in the classpath.
    

    Solution:将包 flink-table-planner-blink_2.12-1.12.0.jar 放在类路径下。

    如:

    java -cp ~\.m2\org\apache\flink\flink-table-planner-blink_2.12\1.12.0\flink-table-planner-blink_2.12-1.12.0.jar main_class
    

    Queston4

    现象:控制台通过 Python 命令行运行脚本报错。

    Traceback (most recent call last):
      File "/usr/local/lib64/python3.6/site-packages/pyflink/util/exceptions.py", line 147, in deco
        return f(*a, **kw)
      File "/usr/local/lib/python3.6/site-packages/py4j/protocol.py", line 328, in get_return_value
        format(target_id, ".", name), value)
    py4j.protocol.Py4JJavaError: An error occurred while calling o23.execute.
    : org.apache.flink.table.api.TableException: BatchTableSink or OutputFormatTableSink required to emit batch Table.
    

    Solution:这种属于流和批环境的混用导致的。

    修改为:

    from pyflink.datastream  import StreamExecutionEnvironment
    from pyflink.table import TableConfig, StreamTableEnvironment
    
    exec_env = StreamExecutionEnvironment.get_execution_environment()
    exec_env.set_parallelism(1)
    t_config = TableConfig()
    t_env = StreamTableEnvironment.create(exec_env, t_config)
    

    Queston5

    现象:将结果数据写入 MySQL,报错提示找不到驱动类。

    Solution:在 MySQL 8.0 driver 名称为 com.mysql.cj.jdbc.Driver,在 MySQL 5.0 driver 名称为 com.mysql.jdbc.Driver。

    Queston6

    现象:进行分组查询写入文件系统报错。

    pyflink.util.exceptions.TableException: "AppendStreamTableSink doesn't support consuming update changes which is produced by node GroupAggregate(groupBy=[word], select=[word, COUNT($f1) AS EXPR$0])"
    

    Solution:toAppendStream 不支持消费更新和删除,使用 toRetractStream 或者更换支持更新操作的存储系统。

    参考链接

    • https://ci.apache.org/projects/flink/flink-docs-release-1.12/dev/table/streaming/dynamic_tables.html#dynamic-tables

    展开全文
  • flink实时流处理入门实战1-单词计数

    千次阅读 2019-01-31 18:07:21
    1、flink-1.7.1 编辑start-cluster.bat set "JAVA_HOME=C:\tools\JDK8" set "path=%JAVA_HOME%\bin;%PATH%"; set "CLASSPATH=%JAVA_HOME%\lib"; 2、创建flink项目 打开cmd黑屏 md ...

    1、flink-1.7.1
    编辑start-cluster.bat
    set "JAVA_HOME=C:\tools\JDK8"
    set "path=%JAVA_HOME%\bin;%PATH%";
    set "CLASSPATH=%JAVA_HOME%\lib";

    2、创建flink项目
    打开cmd黑屏
    md project
    cd project                           
    mvn archetype:generate -DarchetypeGroupId=org.apache.flink -DarchetypeArtifactId=flink-quickstart-java      -DarchetypeVersion=1.7.0          

    package cn.zsj;

    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;

    public class FileStreamingJob {
        public static void main(String[] args) throws Exception {
            Path pa=new Path("C:\\tools\\flink-1.7.1\\project\\1.txt");
     
            TextInputFormat format = new TextInputFormat(pa);
     
            BasicTypeInfo typeInfo = BasicTypeInfo.STRING_TYPE_INFO;
     
            format.setCharsetName("UTF-8");
     
            StreamExecutionEnvironment env=StreamExecutionEnvironment.getExecutionEnvironment();
        /*
            方法内有三个属性,
            分别是WatchType.ONLY_NEW_FILES:处理整个文件,
            PROCESS_ONLY_APPENDED只处理相应的增加部分,
            REPROCESS_WITH_APPENDED :当文件内容增加了之后会重新处理整个文件。
            
            在新的flink版本里面,
            只有两种模式了:PROCESS_ONCE以及PROCESS_CONTINUOUSLY.分别对应处理一次以及增量处理。

        */
            DataStream<String> st=env.readFile(format,"C:\\tools\\flink-1.7.1\\project\\1.txt", 
            FileProcessingMode.PROCESS_CONTINUOUSLY, 
            1L,(TypeInformation)typeInfo);
     
            st.print();
     
            env.execute();
     
        }
     
    }

    3、构建项目
    mvn clean package


    4、打开http://localhost:8081
    提交jar包到平台,并运行

    5、编辑C:\\tools\\flink-1.7.1\\project\\1.txt,黑屏打印如下:
    sssfs
    safasdf
    werwrwr
    sfdsdf
    a
    sd
    asd

    -----------------分割线----------------------
    代码升级,实现计数功能

    package cn.zsj;

    import org.apache.flink.api.common.functions.FlatMapFunction;
    import org.apache.flink.api.common.typeinfo.BasicTypeInfo;
    import org.apache.flink.api.common.typeinfo.TypeInformation;
    import org.apache.flink.api.java.io.TextInputFormat;
    import org.apache.flink.api.java.tuple.Tuple2;
    import org.apache.flink.core.fs.Path;
    import org.apache.flink.streaming.api.datastream.DataStream;
    import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
    import org.apache.flink.streaming.api.functions.source.FileProcessingMode;
    import org.apache.flink.streaming.api.windowing.time.Time;
    import org.apache.flink.util.Collector;

    public class FileStreamingJob {
        public static void main(String[] args) throws Exception {
            Path pa = new Path("C:\\tools\\flink-1.7.1\\project\\1.txt");

            TextInputFormat format = new TextInputFormat(pa);

            BasicTypeInfo typeInfo = BasicTypeInfo.STRING_TYPE_INFO;

            format.setCharsetName("UTF-8");

            StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
            /*
             * 方法内有三个属性, 分别是WatchType.ONLY_NEW_FILES:处理整个文件,
             * PROCESS_ONLY_APPENDED只处理相应的增加部分, REPROCESS_WITH_APPENDED
             * :当文件内容增加了之后会重新处理整个文件。 在新的flink版本里面,
             * 只有两种模式了:PROCESS_ONCE以及PROCESS_CONTINUOUSLY.分别对应处理一次以及增量处理。
             * “Filter”就类似于过滤。 “keyBy”就等效于SQL里的group by。
             * “aggregate”是一个聚合操作,如计数、求和、求平均等。 “reduce”就类似于MapReduce里的reduce。
             * “join”操作就有点类似于我们数据库里面的join。 “connect”实现把两个流连成一个流。
             * “repartition”是一个重新分区操作(还没研究)。 “project”操作就类似于SQL里面的snacks(还没研究)。
             */
            DataStream<String> st = env.readFile(format, "C:\\tools\\flink-1.7.1\\project\\1.txt",
                    FileProcessingMode.PROCESS_CONTINUOUSLY, 1L, (TypeInformation) typeInfo).flatMap(new Splitter())
                    .keyBy(0).sum(1);
            
            st.print();

            env.execute();

        }

    }

    class Splitter implements FlatMapFunction<String, Tuple2<String, Integer>> {
        private static final long serialVersionUID = 1L;

        @Override
        public void flatMap(String sentence, Collector<Tuple2<String, Integer>> out) throws Exception {
            for (String word : sentence.split(" ")) {
                System.out.println("wwww:" + word);
                out.collect(new Tuple2<String, Integer>(word, 1));
            }

        }
    }


    在1.txt输入一下内容:
    1231 嗯嗯 人人 eee fff
    asdfa asdfa


    黑屏打印如下:
    wwww:1231
    wwww:嗯嗯
    wwww:人人
    wwww:eee
    wwww:fff
    wwww:asdfa
    wwww:asdfa
    (1231,4)
    (嗯嗯,4)
    (人人,4)
    (eee,4)
    (fff,3)
    (asdfa,2)
    (asdfa,3)

    展开全文
  • flink 分布式流处理 流处理实时计算 有什么区别.pdf
  • Flink流处理

    2021-01-07 05:30:47
    在下文中,我们描述了流处理应用程序的这些构建块,并解释了Flink处理它们的方法。 流 显然,流是流处理的一个基本方面。但是,流可以具有不同的特征,这些特征会影响流的处理方式。Flink是一个多功能的处理框架,...
  • Apache Flink是一个开源的流处理框架,其核心是用Java和Scala编写的分布式流数据流 程序。 由Apache软件基金会开发。 == 分布式流处理 流处理是一种重要的大数据处理手段,其主要特点是其处理的数据是源源不断且...

    ==

    flink 

    Apache Flink是一个开源的流处理框架,其核心是用JavaScala编写的分布式流数据流 程序。

    Apache软件基金会开发。

    ==

    分布式流处理

    流处理是一种重要的大数据处理手段,其主要特点是其处理的数据是源源不断且实时到来的。

    =

    流处理和实时计算 有什么区别

     

     

    实时计算、离线计算、流式计算、批量计算 之间的关系

    • 离线和实时应该指的是:数据处理的延迟;
    • 批量和流式指的是:数据处理的方式。

    ==

    展开全文
  • Flink流处理的wordCount 对于Flink流处理模式,下面的代码块完整的说明了Flink处理wordCount的过程 import org.apache.flink.api.java.utils.ParameterTool import org.apache.flink.streaming.api.scala.{...
  • 第1章 状态化流处理概述 传统数据处理 绝大多数企业所实现的传统架构都会将数据处理分为两类: 事务型处理 分析型处理 事务型处理 企业在日常业务运营过程中会用到各类应用,例如:客户管理管理软件、基于Web的...
  • Flink常见流处理API

    千次阅读 2020-07-27 23:53:14
    Flink 流处理API的编程可以分为environment,source,transform,sink四大部分 1 Flink支持的数据类型   在Flink底层因为要对所有的数据序列化,反序列化对数据进行传输,以便通过网络传送它们,或者从状态后端、...
  • Flink流处理与批处理

    千次阅读 2019-09-10 14:13:09
    Flink流处理与批处理 Flink通过执行引擎,能够同时支持批处理与流处理任务。 在执行引擎这一层,流处理系统与批处理系统最大的不同在于节点的数据传输方式。 流处理系统 对于一个流处理系统,其节点间数据传输...
  • 基于ApacheFlink的流处理原版完整pdf java开发 大数据从业人员 流处理
  • Flink实时数据处理实践经验 文章目录Flink实时数据处理实践经验1....Flink实现结果实时展示(一些简单的分组聚合逻辑)可以直接在中分组聚合写入Redis/MySQL,而需要在线实时多维分析需要将数据存入Cl
  • Flink实时处理并将结果写入ElasticSearch实战1 需求分析2 Flink实时处理2.1 版本说明2.2 项目结构2.3 程序代码3 Elasticsearch准备...使用Flink实时数据进行实时处理,并将处理后的结果保存到Elasticsearch中,...
  • 5.流处理结构的演变 5.1 第一代:有状态的流式处理 5.2 第二代:lambda架构 5.3 第三代:Flink架构 6.Flink的主要特点 6.1事件驱动型(Event-driven) 6.2 流与批的世界观 6.3 分层API 7.Flink的其他特点 8....
  • Flink--实时流处理框架

    2020-06-02 16:31:49
    上图是flink的工作流程 首先,flink从source中...然后,获取到的数据(dataStream) 按需来进行代码运算 最后,将上一步运算的结果数据sink到落地的模块中,模块可以是数据库,也可以是服务(用户预警报警模板)。 ...
  • Flink实时处理Socket数据

    千次阅读 2020-03-10 23:36:25
    Apache Flink是一个面向分布式数据流处理和批量数据处理的开源计算平台,提供支持流处理和批处理两种类型应用的功能 Flink实时处理Socket数据 Flink Socket 源码GitHub 通过 Maven Archetype 创建项目 创建项目 mvn...
  • flink实时流处理案例添加依赖编写程序为程序添加参数host,port本机cmd开启相对应的端口(window得提前安装nc 命令)运行程序,在cmd上随便打,控制台可实时计算统计 添加依赖 <!-- ...
  • ​阿里巴巴双11大屏在峰值期间可以承担每秒超过4.72亿次的访问,这是多高的访问量……为什么学习Flink?...Flink认为有界数据集是无界数据的一种特例,所以说有界数据集也是一种数据,事件也是
  • org.apache.flink</groupId> <artifactId>flink-connector-kafka-0.11_2.11</artifactId> <version>1.10.0</version> </dependency> Kafka 版本为 2.0 以上 <dependency>...
  • 本文是作者在用 flink 开发实时流数据的时候,对 flink 的一些总结经验,其中会重点讲到 “数据倾斜” 的解决方案 + 源码 文章结构: 1:flink本地如何模拟topic的实时流 2:flink在遇到数据倾斜的时候,应该怎么...
  • Flink流处理上常见的sink Flink将数据进行sink操作到本地文件/本地集合/HDFS等和之前的批处理操作一致 sink 到 kafka (读取mysql数据,落地到kafka) 创建kafka的topic:kafka-console-consumer.sh --from-...
  • Apache Flink是一个分布式流处理引擎,提供了直观而富有表现力的api,以此来实现有状态的流处理应用程序。它以一种容错的方式有效地在大规模集群上运行这样的应用程序。2014年4月,Flink加入了Apache软件基金会作为...
  • Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. ... ... ... Caused by: java.net.ConnectException: Connection refused: connect //报错代码 env....
  • flink流处理初始demo

    2020-03-27 15:30:44
    flink流处理初始demo,可以在此基础上编写自己的flink程序。内包含flink流处理程序基本框架和批处理程序基本框架。
  • Apache Flink流处理中Window的概念

    千次阅读 2016-06-07 16:09:55
    Apache Flink流处理中Window的概念1、什么是Window?有哪些用途? 下面我们结合一个现实的例子来说明。我们先提出一个问题:统计经过某红绿灯的汽车数量之和? 假设在一个红绿灯处,我们每隔15秒统计一次通过此...
  • 但是对于流处理(DataStream),Flink同样提供了对迭代的支持,这一节我们主要来分析流处理中的迭代,我们将会看到流处理中的迭代相较于批处理有相似之处,但差异也是十分之明显。 可迭代的流处理程序允许定义“步...
  • 但随着数据的不断增长,新技术的不断发展,人们逐渐意识到对实时数据处理的重要性,企业需要能够同时支持高吞吐、低延迟、高性能的流处理技术来处理日益增长的数据。 相对于传统的数据处理模式,流式数据处理则有着...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 22,929
精华内容 9,171
关键字:

flink实时流处理