精华内容
下载资源
问答
  • 摘要:微博机器学习研发中心数据计算负责人,高级系统工程师曹富强为大家带来 Flink 实时计算在微博的应用介绍。内容包括:微博介绍数据计算平台介绍Flink 在数据计算平台的典型应用Tip...

    摘要:微博机器学习研发中心数据计算负责人,高级系统工程师曹富强为大家带来 Flink 实时计算在微博的应用介绍。内容包括:

    1. 微博介绍

    2. 数据计算平台介绍

    3. Flink 在数据计算平台的典型应用

    Tips:点击文末阅读原文即可回顾作者原版分享视频~

     GitHub 地址 

    https://github.com/apache/flink

    欢迎大家给 Flink 点赞送 star~

    一、微博介绍


    本次给大家带来的分享是 Flink 实时计算在微博的应用。微博是中国领先的社交媒体平台,目前的日活跃用户是 2.41 亿,月活跃用户是 5.5 亿,其中移动用户占比超过了 94%。

    二、数据计算平台介绍


    1. 数据计算平台概况

    下图为数据计算平台的架构图。

    • 首先是调度,这块基于 K8s 和 Yarn 分别部署了实时数据处理的 Flink、Storm,以及用于离线处理的 SQL 服务。

    • 在集群之上,我们部署了微博的 AI 平台,通过这个平台去对作业、数据、资源、样本等进行管理。

    • 在平台之上我们构建了一些服务,通过服务化的方式去支持各个业务方。

      • 实时计算这边的服务主要包括数据同步、内容去重、多模态内容理解、实时特征生成、实时样本拼接、流式模型训练,这些是跟业务关系比较紧密的服务。另外,还支持 Flink 实时计算和 Storm 实时计算,这些是比较通用的基础计算框架。

      • 离线这部分,结合 Hive 的 SQL,SparkSQL 构建一个 SQL 计算服务,目前已经支持了微博内部绝大多数的业务方。

    • 数据的输出是采用数仓、特征工程这些数据中台的组建,对外提供数据输出。整体上来说,目前我们在线跑的实时计算的作业将近 1000 多个,离线作业超过了 5000 多个,每天处理的数据量超过了 3 PB。

    2. 数据计算

    下面两张图是数据计算,其中一个是实时计算,另外一个是离线计算。

    • 实时计算主要包括实时的特征生成,多媒体特征生成和实时样本生成,这些跟业务关系比较紧密。另外,也提供一些基础的 flink 实时计算和 storm 实时计算。

    • 离线计算主要包括 SQL 计算。主要包括 SQL 的即席查询、数据生成、数据查询和表管理。表管理主要就是数仓的管理,包括表的元数据的管理,表的使用权限,还有表的上下游的血缘关系。


    3. 实时特征

    如下图所示,我们基于 Flink 和 Storm 构建了一个实时特征生成的服务。整体上来说,它会分为作业详情、输入源特征生成、输出和资源配置。用户按照我们事先定义好的接口去开发特征生成的 UDF 就可以。其他的像输入、特征写入,都是平台自动提供的,用户只需要在页面上配置就好。另外,平台会提供输入数据源的监控、作业的异常监控、特征写入监控、特征读取监控等,这些都是自动生成的。


    4. 流批一体

    下面介绍我们基于 FlinkSQL 构建的批流一体。首先,我们会统一元数据,将实时日志跟离线日志通过元数据管理平台去统一。统一之后,用户在提交作业的时候,我们会有一个统一的调度层。调度这一块,是根据作业的类型,作业的特点,目前集群的负载的情况,将作业调度到不同的集群上去。

    目前调度层支持的计算引擎主要就是 HiveSQL,SparkSQL 跟 FlinkSQL。Hive 和 Spark 的 SQL 主要用于批量计算,FlinkSQL 是做批流混跑。整个结果会输出到数据仓库中,提供给业务方使用。批流一体这块大概有 4 个关键点:

    • 第一,批流代码统一,提高开发效率。

    • 第二,批流元数据统一。统一管理,保证元数据一致。

    • 第三,批流程序混跑,节省资源。

    • 第四,批流统一调度,提高集群利用率。


    5. 数据仓库

    • 针对离线仓库,我们把数据分成了三层,一个是原始日志,另外一个是中间层,还有一个是数据服务层。中间是元数据的统一,下边是实时数仓。

    • 针对实时数仓,我们通过 FlinkSQL 对这些原始日志做流式的一个 ETL,再通过一个流式汇总将最终的数据结果写到数据的服务层,同时也会把它存储到各种实时存储,比如 ES、Hbase、Redis、ClickHouse 中去。我们可以通过实时存储对外提供数据的查询。还提供数据进一步数据计算的能力。也就是说,建立实时数仓主要是去解决离线特征生成的周期长的问题。另外就是使用 FlinkSQL 去解决 streaming 作业开发周期比较长的问题。其中的一个关键点还是离线数仓跟实时数仓的元数据的管理。


    三、Flink 在数据计算平台的典型应用


    1. 流式机器学习

    首先介绍流式机器学习的几个特点,最大的特点就是实时化。这块分为特征的实时化和模型的实时化。

    • 特征实时化主要是为了更及时的去反馈用户行为,更细粒度的去刻画用户。

    • 模型实时化是要根据线上样本实时训练模型,及时反映对象的线上变化情况。

    ■   微博流式机器学习的特点:

    • 样本的规模大,目前的实时样本能达到百万级别的 qps。

    • 模型的规模大。模型训练参数这块,整个框架会支持千亿级别的训练规模。

    • 对作业的稳定性要求比较高。

    • 样本的实时性要求高。

    • 模型的实时性高。

    • 平台业务需求多。

    ■   流式机器学习有几个比较难的问题:

    • 一个就是全链路,端到端的链路是比较长的。比如说,一个流式机器学习的流程会从日志收集开始,到特征生成,再到样本生成,然后到模型训练,最终到服务上线,整个流程非常长。任何一个环节有问题,都会影响到最终的用户体验。所以我们针对每一个环节都部署了一套比较完善的全链路的监控系统,并且有比较丰富的监控指标。

    • 另外一个是它的数据规模大,包括海量的用户日志,样本规模和模型规模。我们调研了常用的实时计算框架,最终选择了 Flink 去解决这个问题。

    ■   流失机器学习流程:

    • 首先是离线训练,我们拿到离线日志,去离线的生成样本之后,通过Flink去读取样本,然后去做离线训练。训练完成之后把这些训练的结果参数保存在离线的参数服务器中。这个结果会作为模型服务的 Base 模型,用于实时的冷启动。

    • 然后是实时的流式机器学习的流程。我们会去拉取实时的日志,比如说微博的发布内容,互动日志等。拉取这些日志之后,使用 Flink 去生成它的样本,然后做实时的训练。训练完成之后会把训练的参数保存在一个实时的参数服务器中,然后会定期的从实时的参数服务器同步到实时的参数服务器中。

    • 最后是模型服务这一块,它会从参数服务中拉取到模型对应的那些参数,去推荐用户特征,或者说物料的特征。通过模型对用户和物料相关的特征、行为打分,然后排序服务会调取打分的结果,加上一些推荐的策略,去选出它认为最适合用户的这一条物料,并反馈给用户。用户在客户端产生一些互动行为之后,又发出新的在线请求,产生新的日志。所以整个流式学习的流程是一个闭环的流程。

    另外,

    • 离线的样本的延时和模型的更新是天级或者小时级,而流式则达到了小时级或者分钟级;

    • 离线模型训练的计算压力是比较集中的,而实时的计算压力比较分散。

    ■   样本

    这里简单介绍一下我们流式机器学习样本的发展历程。2018 年 10 月,我们上线了第一个流式样本作业,是通过 Storm 和外部存储 Redis 去做的。2019 年 5 月,我们使用新的实时计算框架 Flink,采用 union+timer 方案替代 window 计算来实现多个数据流的 join 操作。2019 年 10月,上线了一个xx样本作业,单个作业的 qps 达到了几十万。在今年 4 月份,把样本生成流程平台化。到今年 6 月份,平台化做了一个迭代,支持样本的落盘,包括样本库,还有样本的各种监控指标的完善。

    流式机器学习所谓的样本生成,其实就是多个数据流按照相同的 key 做一个拼接。比如说,我们有三个数据流,数据清洗后的结果存储为 <k , v>, k 是聚合的 key,v 是样本中需要的值。数据 union 后做 KeyBy 聚合,聚合后将数据存储在内存区域 value state 中。如下图所示:

    • 如果 k1 不存在,则注册 timer,再存到 state 中。

    • 如果 k1 存在,就从 state 中把它给拿出来,更新之后再存进去。到最后它的 timer 到期之后,就会将这条数据输出,并且从 state 中清除掉。

    ■   样本平台

    我们把整个样本拼接的过程做了一个平台化的操作,分成了 5 个模块,包括输入、数据清洗、样本拼接、样本的格式化和输出。基于平台化开发,用户只需要关心业务逻辑部分即可。需要用户开发的有:

    • 对应输入数据的数据清洗逻辑。

    • 样本输出前的数据格式化逻辑。

    其余的在UI上配置即可实现,具体有:

    • 样本拼接的时间窗口。

    • 窗口内对字段的聚合操作。

    资源由平台方审核并配置。另外,整个平台提供基础的一些监控,包括输入数据的监控、样本指标的监控、作业异常监控、样本输出量的监控。

    ■   流式机器学习项目的样本 UI

    下图为流式机器学习项目的样本。左边是样本生成的作业配置,右边是样本库。样本库主要是做样本的管理展示,包括样本的说明权限,样本的共享情况等等。

    ■   流失机器学习的应用

    最后介绍一下流式机器学习应用的效果。目前我们支持实时样本拼接,QPS 达到百万级别。支持流式模型训练,可以同时支持几百个模型训练,模型实时性支持小时级/分钟级 模型更新。流式学习全流程容灾,支持全链路自动监控。近期在做的一个事情是流式的深度学习,增加实时模型的表达能力。还有强化学习这一块,探索一些新的应用场景。


    2. 多模态内容理解

    ■   简介

    多模态就是使用机器学习的一些方法去实现或者理解多元模态信息的能力或者技术。微博的这块主要包括图片、视频、音频、文本。

    • 图片这块包括,物体识别打标签、OCR、人脸、明星、颜值、智能裁剪。

    • 视频这块包括版权检测、logo 识别。

    • 音频这块有,语音转文本、音频的标签。

    • 文本主要包括文本的分词、文本的时效性、文本的分类标签。

    举个例子,我们一开始做视频分类的时候只用到了视频抽帧后的那些帧,也就是图片。后来第二次优化的时候,加入了音频相关的东西,还有视频对应的博文相关的东西,相当于把音频、图片、文本,多模态的融合考虑,更精准的去生成这个视频的分类标签。

    ■   平台

    下图为多模态内容理解的平台架构。中间这部分是 Flink 实时计算,实时的接收图片流、视频流、发博流这些数据,然后通过模型插件调用下边的基础服务,深度学习模型服务。调用服务之后,会返回内容特征。然后我们把特征存储到特征工程,通过数据中台对外提供给各个业务方。整个作业运行过程中全链路监控报警,异常情况第一时间响应。平台自动提供日志收集,指标统计,CASE 追踪等功能。中间这一块使用 zk 做服务发现,解决实时计算和深度学习模型之间服务状态同步的问题。另外,除了状态同步,也会有一些负载均衡的策略。最下边就是使用数据-对账系统,进一步提高数据处理成功率。

    ■   UI

    多模态内容理解的 UI,主要包括作业信息、输入源信息、模型信息、输出信息、资源配置。这块通过配置化的开发,去提高开发效率。然后会自动生成模型调用的一些监控指标,包括模型调用的成功率和耗时。当作业提交之后,会自动生成一个用于指标统计的作业。


    3. 内容去重服务

    ■   背景

    在推荐场景下,如果给用户一直推重复的内容,是很影响用户体验的。基于这个考虑,结合 Flink 实时流计算平台、分布式向量检索系统和深度学习模型服务构建的一套内容去重服务平台,具有低延迟、高稳定性、高召回率的特点。目前支持多个业务方,稳定性达到 99.9+%。

    ■   架构

    下图为内容去重服务的架构图。最下边是多媒体的模型训练。这块做离线的训练。比如说我们会拿到一些样本数据,然后去做样本处理,样本处理完之后把样本存到样本库中去。当我需要做模型训练的时候,我从样本库中去拉取样本,然后做模型训练,训练好的结果会保存到模型库中去。

    内容去重这里主要用到的模型是向量生成模型。包括图片的向量、文本的向量、视频的向量。

    当我们把训练好的模型验证没有问题之后,会把这个模型保存到模型库中。模型库保存了模型的一些基础信息,包括模型的运行环境、版本。然后需要把模型部署上线,部署的过程需要从模型库中拉取模型,同时需要知道这个模型的运行的一些技术环境。

    模型部署好之后,我们会通过 Flink 实时的从物料库中读取物料,然后调用多媒体预估服务去生成这些物料对应的向量。然后会把这些向量保存在 Weiss 库中,它是微博自研的一个向量召回检索系统。存到 Weiss 库中之后会对这条物料做向量召回的过程,召回跟这条物料相似的一批物料。在精排比对这块,会从所有的召回结果中加上一定的策略,选出最相似的那一条。然后把最相似的这一条跟当前物料聚合到一起,形成一个内容 ID。最后业务去用的时候,也是通过物料对应的内容 ID 做去重。

    ■   应用

    内容去重的应用场景,主要业务场景有三个:

    • 第一,支持视频版权 - 盗版视频识别 - 稳定性 99.99%,盗版识别率 99.99%。

    • 第二,支持全站微博视频去重 - 推荐场景应用 - 稳定性 99.99%,处理延迟秒级。

    • 第三,推荐流物料去重 - 稳定性 99%,处理延迟秒级,准确率达到 90%

    ■   最后

    我们通过将 Flink 实时流计算框架跟业务场景相结合,在平台化、服务化方面做了很大的工作,在开发效率、稳定性方面也做了很多优化。我们通过模块化设计和平台化开发,提高开发效率。目前实时数据计算平台自带全链路监控,数据指标统计和 debug case 追踪(日志回看)系统。另外,基于 FlinkSQL 在批流一体这块目前也有一定的应用。这些都是 Flink  给我们带来的一些新的变化,我们会持续不断的探索 Flink 在微博中更大的应用空间。

    更多 Flink 相关技术交流,可扫码加入社区钉钉大群~


      戳我,回顾作者分享视频!

    展开全文
  • Flink 实时计算 - 维表 Join 解读

    千次阅读 2021-02-12 18:57:30
    Flink 实时计算 - 维表 Join 解读 前言 Flink 1.9 版本可以说是一个具有里程碑意义的版本,其内部合入了很多 Blink Table/SQL 方面的功能,同时也开始增强 Flink 在批处理方面的能力,真的是向批流统一的终极方向...

    Flink 实时计算 - 维表 Join 解读

    前言

    Flink 1.9 版本可以说是一个具有里程碑意义的版本,其内部合入了很多 Blink Table/SQL 方面的功能,同时也开始增强 Flink 在批处理方面的能力,真的是向批流统一的终极方向开始前进。Flink 1.9 版本在 8.22 号也终于发布了。本文主要介绍学习 Flink SQL 维表 Join,维表 Join 对于SQL 任务来说,一般是一个很正常的功能,本文给出代码层面的实现,和大家分享用户如何自定义 Flink 维表。

    1. 什么是维表

    维表作为 SQL 任务中一种常见表的类型,其本质就是关联表数据的额外数据属性,通常在 Join 语句中进行使用。比如源数据有人的身份证号,人名,你现在想要得到人的家庭地址,那么可以通过身份证号去关联人的身份证信息,就可以得到更全的数据。
    在这里插入图片描述
    维表可以是静态的数据,也可以是动态的数据(比如定时更新的数据),一般会通过特定的主键来进行关联。它可以在 Mysql 中进行存储,也可以在 Nosql 数据库中进行存储,比如 HBase等。

    2. Flink 中的维表

    Flink 1.9 中维表功能来源于新加入的Blink中的功能,如果你要使用该功能,那就需要自己引入 Blink 的 Planner,而不是引用社区的 Planner。由于新合入的 Blink 相关功能,使得 Flink 1.9 实现维表功能很简单,只要自定义实现 LookupableTableSource 接口,同时实现里面的方法就可以进行,下面来分析一下 LookupableTableSource的代码:

    public interface LookupableTableSource<T> extends TableSource<T> {
    	TableFunction<T> getLookupFunction(String[] lookupKeys);
    	AsyncTableFunction<T> getAsyncLookupFunction(String[] lookupKeys);
    	boolean isAsyncEnabled();
    }
    

    isAsyncEnabled 方法主要表示该表是否支持异步访问外部数据源获取数据,当返回 true 时,那么在注册到 TableEnvironment 后,使用时会返回异步函数进行调用,当返回 false 时,则使同步访问函数。

    可以看到 LookupableTableSource 这个接口中有三个方法

    getLookupFunction 方法返回一个同步访问外部数据系统的函数,什么意思呢,就是你通过 Key 去查询外部数据库,需要等到返回数据后才继续处理数据,这会对系统处理的吞吐率有影响。

    getAsyncLookupFunction 方法则是返回一个异步的函数,异步访问外部数据系统,获取数据,这能极大的提升系统吞吐率。具体是否要实现异步函数方法,这需要用户自己判定是否需要对异步访问的支持,如果同步方法的吞吐率已经满足要求,那可以先不用考虑异步的实现情况。

    2.2 同步访问函数getLookupFunction

    getLookupFunction 会返回同步方法,这里你需要自定义 TableFuntion 进行实现,TableFunction 本质是 UDTF,输入一条数据可能返回多条数据,也可能返回一条数据。用户自定义 TableFunction 格式如下:

    public class MyLookupFunction extends TableFunction<Row> {
    	@Override
    	public void open(FunctionContext context) throws Exception {
    	super.open(context);
    	}
    	public void eval(Object... paramas) {
    	}
    }
    

    open 方法在进行初始化算子实例的进行调用,异步外部数据源的client要在类中定义为 transient,然后在 open 方法中进行初始化,这样每个任务实例都会有一个外部数据源的 client。防止同一个 client 多个任务实例调用,出现线程不安全情况。

    eval 则是 TableFunction 最重要的方法,它用于关联外部数据。当程序有一个输入元素时,就会调用eval一次,用户可以将产生的数据使用 collect() 进行发送下游。paramas 的值为用户输入元素的值,比如在 Join 的时候,使用 A.id = B.id and A.name = b.name, B 是维表,A 是用户数据表,paramas 则代表 A.id,A.name 的值。

    2.3 异步访问函数

    getAsyncLookupFunction 会返回异步访问外部数据源的函数,如果你想使用异步函数,前提是 LookupableTableSource 的 isAsyncEnabled 方法返回 true 才能使用。

    使用异步函数访问外部数据系统,一般是外部系统有异步访问客户端,如果没有的话,可以自己使用线程池异步访问外部系统。至于为什么使用异步访问函数,无非就是为了提高程序的吞吐量,不需要每条记录访问返回数据后,才去处理下一条记录。

    异步函数格式如下:

    public class MyAsyncLookupFunction extends AsyncTableFunction<Row> {
    	@Override
    	public void open(FunctionContext context) throws Exception {
    	super.open(context);
    	}
    	public void eval(CompletableFuture<Collection<Row>> future, Object... params) {
    	}
    }
    

    维表异步访问函数总体和同步函数实现类似,这里说一下注意点:

    1. 外部数据源异步客户端初始化。如果是线程安全的(多个客户端一起使用),你可以不加 transient 关键字,初始化一次。否则,你需要加上 transient,不对其进行初始化,而在 open 方法中,为每个 Task 实例初始化一个。

    2. eval 方法中多了一个 CompletableFuture,当异步访问完成时,需要调用其方法进行处理.

    为了减少每条数据都去访问外部数据系统,提高数据的吞吐量,一般我们会在同步函数和异步函数中加入缓存,如果以前某个关键字访问过外部数据系统,我们将其值放入到缓存中,在缓存没有失效之前,如果该关键字再次进行处理时,直接先访问缓存,有就直接返回,没有再去访问外部数据系统,然后在进行缓存,进一步提升我们实时程序处理的吞吐量。

    一般缓存类型有以下几种类型:

    1. 数据全部缓存,定时更新。
    2. LRU Cache,设置一个超时时间。
    3. 用户自定义缓存。

    3. 总结

    Flink 在 1.9 版本开源出维表功能,用户可以结合自己的具体需求,自定义的去开发维表。Flink 1.9 版本在Flink SQL方面的开源出很多功能,用户可以自己选择具体 Planner进行使用,社区的Planner、Blink的 Planner。希望 Flink 在未来越来越好。

    参考:https://blog.csdn.net/u012554509/article/details/100533749

    展开全文
  • 实时统计pv、uv是再常见不过的大数据统计需求了,前面出过一篇SparkStreaming实时统计pv,uv的案例,这里用Flink实时计算pv,uv。我们需要统计不同数据类型每天的pv,...

    实时统计pv、uv是再常见不过的大数据统计需求了,前面出过一篇SparkStreaming实时统计pv,uv的案例,这里用Flink实时计算pv,uv。

    我们需要统计不同数据类型每天的pv,uv情况,并且有如下要求.

    • 每秒钟要输出最新的统计结果;

    • 程序永远跑着不会停,所以要定期清理内存里的过时数据;

    • 收到的消息里的时间字段并不是按照顺序严格递增的,所以要有一定的容错机制;

    • 访问uv并不一定每秒钟都会变化,重复输出对IO是巨大的浪费,所以要在uv变更时在一秒内输出结果,未变更时不输出;

    Flink数据流上的类型和操作

    DataStream是flink流处理最核心的数据结构,其它的各种流都可以直接或者间接通过DataStream来完成相互转换,一些常用的流直接的转换关系如图:

    可以看出,DataStream可以与KeyedStream相互转换,KeyedStream可以转换为WindowedStream,DataStream不能直接转换为WindowedStream,WindowedStream可以直接转换为DataStream。各种流之间虽然不能相互直接转换,但是都可以通过先转换为DataStream,再转换为其它流的方法来实现。

    在这个计算pv,uv的需求中就主要用到DataStream、KeyedStream以及WindowedStream这些数据结构。

    这里需要用到window和watermark,使用窗口把数据按天分割,使用watermark可以通过“水位”来定期清理窗口外的迟到数据,起到清理内存的作用。

    业务代码

    我们的数据是json类型的,含有date,version,guid这3个字段,在实时统计pv,uv这个功能中,其它字段可以直接丢掉,当然了在离线数据仓库中,所有有含义的业务字段都是要保留到hive当中的。其它相关概念就不说了,会专门介绍,这里直接上代码吧。

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
        <modelVersion>4.0.0</modelVersion>
    
        <groupId>com.ddxygq</groupId>
        <artifactId>bigdata</artifactId>
        <version>1.0-SNAPSHOT</version>
    
        <properties>
            <scala.version>2.11.8</scala.version>
            <flink.version>1.7.0</flink.version>
            <pkg.name>bigdata</pkg.name>
        </properties>
    
        <dependencies>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-scala_2.11</artifactId>
                <version>{flink.version}</version>
      </dependency>
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-scala_2.11</artifactId>
                <version>flink.version</version>
      </dependency>
      
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-streaming-java_2.11</artifactId>
                <version>{flink.version}</version>
            </dependency>
            <!-- https://mvnrepository.com/artifact/org.apache.flink/flink-connector-kafka-0.8 -->
            <dependency>
                <groupId>org.apache.flink</groupId>
                <artifactId>flink-connector-kafka-0.10_2.11</artifactId>
                <version>flink.version</version>
      </dependency>
    
        <build>
            <!--测试代码和文件-->
            <!--<testSourceDirectory>{basedir}/src/test</testSourceDirectory>-->
            <finalName>basedir/src/test</testSourceDirectory>−−><finalName>{pkg.name}</finalName>
            <sourceDirectory>src/main/java</sourceDirectory>
            <resources>
                <resource>
                    <directory>src/main/resources</directory>
                    <includes>
                        <include>*.properties</include>
                        <include>*.xml</include>
                    </includes>
                    <filtering>false</filtering>
                </resource>
            </resources>
            <plugins>
                <!-- 跳过测试插件-->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-surefire-plugin</artifactId>
                    <configuration>
                        <skip>true</skip>
                    </configuration>
                </plugin>
                <!--编译scala插件-->
                <plugin>
                    <groupId>org.scala-tools</groupId>
                    <artifactId>maven-scala-plugin</artifactId>
                    <version>2.15.2</version>
                    <executions>
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </build>
    </project>
    

    主要代码,主要使用scala开发:

    package com.ddxygq.bigdata.flink.streaming.pvuv
    
    import java.util.Properties
    
    import com.alibaba.fastjson.JSON
    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.extensions._
    import org.apache.flink.api.scala._
    
    /**
      * @ Author: keguang
      * @ Date: 2019/3/18 17:34
      * @ version: v1.0.0
      * @ description: 
      */
    object PvUvCount {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        // 容错
        env.enableCheckpointing(5000)
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
        env.setStateBackend(new FsStateBackend("file:///D:/space/IJ/bigdata/src/main/scala/com/ddxygq/bigdata/flink/checkpoint/flink/tagApp"))
    
        // kafka 配置
        val ZOOKEEPER_HOST = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
        val KAFKA_BROKERS = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
        val TRANSACTION_GROUP = "flink-count"
        val TOPIC_NAME = "flink"
        val kafkaProps = new Properties()
        kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
        kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKERS)
        kafkaProps.setProperty("group.id", TRANSACTION_GROUP)
    
        // watrmark 允许数据延迟时间
        val MaxOutOfOrderness = 86400 * 1000L
        
        // 消费kafka数据
        val streamData: DataStream[(String, String, String)] = env.addSource(
          new FlinkKafkaConsumer010[String](TOPIC_NAME, new SimpleStringSchema(), kafkaProps)
        ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(MaxOutOfOrderness)) {
          override def extractTimestamp(element: String): Long = {
            val t = JSON.parseObject(element)
            val time = JSON.parseObject(JSON.parseObject(t.getString("message")).getString("decrypted_data")).getString("time")
            time.toLong
          }
        }).map(x => {
          var date = "error"
          var guid = "error"
          var helperversion = "error"
          try {
            val messageJsonObject = JSON.parseObject(JSON.parseObject(x).getString("message"))
            val datetime = messageJsonObject.getString("time")
            date = datetime.split(" ")(0)
            // hour = datetime.split(" ")(1).substring(0, 2)
            val decrypted_data_string = messageJsonObject.getString("decrypted_data")
            if (!"".equals(decrypted_data_string)) {
              val decrypted_data = JSON.parseObject(decrypted_data_string)
              guid = decrypted_data.getString("guid").trim
              helperversion = decrypted_data.getString("helperversion")
            }
          } catch {
            case e: Exception => {
              println(e)
            }
          }
          (date, helperversion, guid)
        })
        // 这上面是设置watermark并解析json部分
        // 聚合窗口中的数据,可以研究下applyWith这个方法和OnWindowedStream这个类
        val resultStream = streamData.keyBy(x => {
          x._1 + x._2
        }).timeWindow(Time.days(1))
          .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
          .applyWith(("", List.empty[Int], Set.empty[Int], 0L, 0L))(
            foldFunction = {
              case ((_, list, set, _, 0), item) => {
                val date = item._1
                val helperversion = item._2
                val guid = item._3
                (date + "_" + helperversion, guid.hashCode +: list, set + guid.hashCode, 0L, 0L)
              }
            }
            , windowFunction = {
              case (key, window, result) => {
                result.map {
                  case (leixing, list, set, _, _) => {
                    (leixing, list.size, set.size, window.getStart, window.getEnd)
                  }
                }
              }
            }
          ).keyBy(0)
          .flatMapWithState[(String, Int, Int, Long, Long),(Int, Int)]{
          case ((key, numpv, numuv, begin, end), curr) =>
    
            curr match {
              case Some(numCurr) if numCurr == (numuv, numpv) =>
                (Seq.empty, Some((numuv, numpv))) //如果之前已经有相同的数据,则返回空结果
              case _ =>
                (Seq((key, numpv, numuv, begin, end)), Some((numuv, numpv)))
            }
        }
    
        // 最终结果
        val resultedStream = resultStream.map(x => {
          val keys = x._1.split("_")
          val date = keys(0)
          val helperversion = keys(1)
          (date, helperversion, x._2, x._3)
        })
    
        resultedStream.print()
        env.execute("PvUvCount")
    
      }
    }
    

    使用List集合的size保存pv,使用Set集合的size保存uv,从而达到实时统计pv,uv的目的。

    这里用了几个关键的函数:

    applyWith:里面需要的参数,初始状态变量,和foldFunction ,windowFunction ;

    存在的问题

    显然,当数据量很大的时候,这个List集合和Set集合会很大,并且这里的pv是否可以不用List来存储,而是通过一个状态变量,不断做累加,对应操作就是更新状态来完成。

    改进版

    使用了一个计数器来存储pv的值。

    packagecom.ddxygq.bigdata.flink.streaming.pvuv
    
    import java.util.Properties
    
    import com.alibaba.fastjson.JSON
    import org.apache.flink.api.common.accumulators.IntCounter
    import org.apache.flink.runtime.state.filesystem.FsStateBackend
    import org.apache.flink.streaming.api.CheckpointingMode
    import org.apache.flink.streaming.api.functions.timestamps.BoundedOutOfOrdernessTimestampExtractor
    import org.apache.flink.streaming.api.scala.{DataStream, StreamExecutionEnvironment}
    import org.apache.flink.streaming.api.windowing.time.Time
    import org.apache.flink.streaming.api.windowing.triggers.ContinuousProcessingTimeTrigger
    import org.apache.flink.streaming.connectors.kafka.FlinkKafkaConsumer010
    import org.apache.flink.streaming.util.serialization.SimpleStringSchema
    import org.apache.flink.streaming.api.scala.extensions._
    import org.apache.flink.api.scala._
    import org.apache.flink.core.fs.FileSystem
    
    object PvUv2 {
      def main(args: Array[String]): Unit = {
        val env = StreamExecutionEnvironment.getExecutionEnvironment
    
        // 容错
        env.enableCheckpointing(5000)
        env.getCheckpointConfig.setCheckpointingMode(CheckpointingMode.EXACTLY_ONCE)
        env.setStateBackend(new FsStateBackend("file:///D:/space/IJ/bigdata/src/main/scala/com/ddxygq/bigdata/flink/checkpoint/streaming/counter"))
    
        // kafka 配置
        val ZOOKEEPER_HOST = "hadoop01:2181,hadoop02:2181,hadoop03:2181"
        val KAFKA_BROKERS = "hadoop01:9092,hadoop02:9092,hadoop03:9092"
        val TRANSACTION_GROUP = "flink-count"
        val TOPIC_NAME = "flink"
        val kafkaProps = new Properties()
        kafkaProps.setProperty("zookeeper.connect", ZOOKEEPER_HOST)
        kafkaProps.setProperty("bootstrap.servers", KAFKA_BROKERS)
        kafkaProps.setProperty("group.id", TRANSACTION_GROUP)
    
        // watrmark 允许数据延迟时间
        val MaxOutOfOrderness = 86400 * 1000L
    
        val streamData: DataStream[(String, String, String)] = env.addSource(
          new FlinkKafkaConsumer010[String](TOPIC_NAME, new SimpleStringSchema(), kafkaProps)
        ).assignTimestampsAndWatermarks(new BoundedOutOfOrdernessTimestampExtractor[String](Time.milliseconds(MaxOutOfOrderness)) {
          override def extractTimestamp(element: String): Long = {
            val t = JSON.parseObject(element)
            val time = JSON.parseObject(JSON.parseObject(t.getString("message")).getString("decrypted_data")).getString("time")
            time.toLong
          }
        }).map(x => {
          var date = "error"
          var guid = "error"
          var helperversion = "error"
          try {
            val messageJsonObject = JSON.parseObject(JSON.parseObject(x).getString("message"))
            val datetime = messageJsonObject.getString("time")
            date = datetime.split(" ")(0)
            // hour = datetime.split(" ")(1).substring(0, 2)
            val decrypted_data_string = messageJsonObject.getString("decrypted_data")
            if (!"".equals(decrypted_data_string)) {
              val decrypted_data = JSON.parseObject(decrypted_data_string)
              guid = decrypted_data.getString("guid").trim
              helperversion = decrypted_data.getString("helperversion")
            }
          } catch {
            case e: Exception => {
              println(e)
            }
          }
          (date, helperversion, guid)
        })
    
        val resultStream = streamData.keyBy(x => {
          x._1 + x._2
        }).timeWindow(Time.days(1))
          .trigger(ContinuousProcessingTimeTrigger.of(Time.seconds(1)))
          .applyWith(("", new IntCounter(), Set.empty[Int], 0L, 0L))(
            foldFunction = {
              case ((_, cou, set, _, 0), item) => {
                val date = item._1
                val helperversion = item._2
                val guid = item._3
                cou.add(1)
                (date + "_" + helperversion, cou, set + guid.hashCode, 0L, 0L)
              }
            }
            , windowFunction = {
              case (key, window, result) => {
                result.map {
                  case (leixing, cou, set, _, _) => {
                    (leixing, cou.getLocalValue, set.size, window.getStart, window.getEnd)
                  }
                }
              }
            }
          ).keyBy(0)
          .flatMapWithState[(String, Int, Int, Long, Long),(Int, Int)]{
          case ((key, numpv, numuv, begin, end), curr) =>
    
            curr match {
              case Some(numCurr) if numCurr == (numuv, numpv) =>
                (Seq.empty, Some((numuv, numpv))) //如果之前已经有相同的数据,则返回空结果
              case _ =>
                (Seq((key, numpv, numuv, begin, end)), Some((numuv, numpv)))
            }
        }
    
        // 最终结果
        val resultedStream = resultStream.map(x => {
          val keys = x._1.split("_")
          val date = keys(0)
          val helperversion = keys(1)
          (date, helperversion, x._2, x._3)
        })
    
        val resultPath = "D:\\space\\IJ\\bigdata\\src\\main\\scala\\com\\ddxygq\\bigdata\\flink\\streaming\\pvuv\\result"
        resultedStream.writeAsText(resultPath, FileSystem.WriteMode.OVERWRITE)
        env.execute("PvUvCount")
    
      }
    }
    

    改进

    其实这里还是需要set保存uv,难免对内存有压力,如果我们的集群不大,为了节省开支,我们可以使用外部媒介,如hbase的rowkey唯一性、redis的set数据结构,都是可以达到实时、快速去重的目的。

    参考资料

    https://flink.sojb.cn/dev/event_time.htm

    lhttp://wuchong.me/blog/2016/05/20/flink-internals-streams-and-operations-on-streams

    https://segmentfault.com/a/1190000006235690

        

    猜你喜欢

     SparkStreaming实时计算pv和uv

     Flink状态管理与状态一致性(长文)

     Flink实时计算topN热榜

     数仓建模分层理论

     数仓建模方法论

     学习建议,大数据组件重点学习这几个

    展开全文
  • 本文从数据传输和数据可靠性的角度出发,对比测试了Storm与Flink在流处理上的性能,并对测试结果进行分析,给出在使用Flink时提高...对于实时计算来说,Storm与Flink的底层计算引擎是基于流的,本质上是一条一条的数...

    本文从数据传输和数据可靠性的角度出发,对比测试了Storm与Flink在流处理上的性能,并对测试结果进行分析,给出在使用Flink时提高性能的建议。

    Apache Storm、Apache Spark和Apache Flink都是开源社区中非常活跃的分布式计算平台,在很多公司可能同时使用着其中两种甚至三种。对于实时计算来说,Storm与Flink的底层计算引擎是基于流的,本质上是一条一条的数据进行处理,且处理的模式是流水线模式,即所有的处理进程同时存在,数据在这些进程之间流动处理。而Spark是基于批量数据的处理,即一小批一小批的数据进行处理,且处理的逻辑在一批数据准备好之后才会进行计算。在本文中,我们把同样基于流处理的Storm和Flink拿来做对比测试分析。

    在我们做测试之前,调研了一些已有的大数据平台性能测试报告,比如,雅虎的Streaming-benchmarks,或者Intel的HiBench等等。除此之外,还有很多的论文也从不同的角度对分布式计算平台进行了测试。虽然这些测试case各有不同的侧重点,但他们都用到了同样的两个指标,即吞吐和延迟。吞吐表示单位时间内所能处理的数据量,是可以通过增大并发来提高的。延迟代表处理一条数据所需要的时间,与吞吐量成反比关系。

    在我们设计计算逻辑时,首先考虑一下流处理的计算模型。上图是一个简单的流计算模型,在Source中将数据取出,发往下游Task,并在Task中进行处理,最后输出。对于这样的一个计算模型,延迟时间由三部分组成:数据传输时间、Task计算时间和数据排队时间。我们假设资源足够,数据不用排队。则延迟时间就只由数据传输时间和Task计算时间组成。而在Task中处理所需要的时间与用户的逻辑息息相关,所以对于一个计算平台来说,数据传输的时间才更能反映这个计算平台的能力。因此,我们在设计测试Case时,为了更好的体现出数据传输的能力,Task中没有设计任何计算逻辑。

    在确定数据源时,我们主要考虑是在进程中直接生成数据,这种方法在很多之前的测试标准中也同样有使用。这样做是因为数据的产生不会受到外界数据源系统的性能限制。但由于在我们公司内部大部分的实时计算数据都来源于kafka,所以我们增加了从kafka中读取数据的测试。

    对于数据传输方式,可以分为两种:进程间的数据传输和进程内的数据传输。

    进程间的数据传输是指这条数据会经过序列化、网络传输和反序列化三个步骤。在Flink中,2个处理逻辑分布在不同的TaskManager上,这两个处理逻辑之间的数据传输就可以叫做进程间的数据传输。Flink网络传输是采用的Netty技术。在Storm中,进程间的数据传输是worker之间的数据传输。早版本的storm网络传输使用的ZeroMQ,现在也改成了Netty。

    进程内的数据传输是指两个处理逻辑在同一个进程中。在Flink中,这两个处理逻辑被Chain在了一起,在一个线程中通过方法调用传参的形式进程数据传输。在Storm中,两个处理逻辑变成了两个线程,通过一个共享的队列进行数据传输。

     

    Storm和Flink都有各自的可靠性机制。在Storm中,使用ACK机制来保证数据的可靠性。而在Flink中是通过checkpoint机制来保证的,这是来源于chandy-lamport算法。

     

    事实上exactly-once可靠性的保证跟处理的逻辑和结果输出的设计有关。比如结果要输出到kafka中,而输出到kafka的数据无法回滚,这就无法保证exactly-once。我们在测试的时候选用的at-least-once语义的可靠性和不保证可靠性两种策略进行测试。

    上图是我们测试的环境和各个平台的版本。

    上图展示的是Flink在自产数据的情况下,不同的传输方式和可靠性的吞吐量:在进程内+不可靠、进程内+可靠、进程间+不可靠、进程间+可靠。可以看到进程内的数据传输是进程间的数据传输的3.8倍。是否开启checkpoint机制对Flink的吞吐影响并不大。因此我们在使用Flink时,进来使用进程内的传输,也就是尽可能的让算子可以Chain起来。

    那么我们来看一下为什么Chain起来的性能好这么多,要如何在写Flink代码的过程中让Flink的算子Chain起来使用进程间的数据传输。

    大家知道我们在Flink代码时一定会创建一个env,调用env的disableOperatorChainning()方法会使得所有的算子都无法chain起来。我们一般是在debug的时候回调用这个方法,方便调试问题。

    如果允许Chain的情况下,上图中Source和mapFunction就会Chain起来,放在一个Task中计算。反之,如果不允许Chain,则会放到两个Task中。

    对于没有Chain起来的两个算子,他们被放到了不同的两个Task中,那么他们之间的数据传输是这样的:SourceFunction取到数据序列化后放入内存,然后通过网络传输给MapFunction所在的进程,该进程将数据方序列化后使用。

    对于Chain起来的两个算子,他们被放到同一个Task中,那么这两个算子之间的数据传输则是:SourceFunction取到数据后,进行一次深拷贝,然后MapFunction把深拷贝出来的这个对象作为输入数据。

    虽然Flink在序列化上做了很多优化,跟不用序列化和不用网络传输的进程内数据传输对比,性能还是差很多。所以我们尽可能的把算子Chain起来。

    不是任何两个算子都可以Chain起来的,要把算子Chain起来有很多条件:第一,下游算子只能接受一种上游数据流,比如Map接受的流不能是一条union后的流;其次上下游的并发数一定要一样;第三,算子要使用同一个资源Group,默认是一致的,都是default;第四,就是之前说的env中不能调用disableOperatorChainning()方法,最后,上游发送数据的方法是Forward的,比如,开发时没有调用rebalance()方法,没有keyby(),没有boardcast等。

    对比一下自产数据时,使用进程内通信,且不保证数据可靠性的情况下,Flink与Storm的吞吐。在这种情况下,Flink的性能是Storm的15倍。Flink吞吐能达到2060万条/s。不仅如此,如果在开发时调用了env.getConfig().enableObjectReuse()方法,Flink的但并发吞吐能达到4090万条/s。

    当调用了enableObjectReuse方法后,Flink会把中间深拷贝的步骤都省略掉,SourceFunction产生的数据直接作为MapFunction的输入。但需要特别注意的是,这个方法不能随便调用,必须要确保下游Function只有一种,或者下游的Function均不会改变对象内部的值。否则可能会有线程安全的问题。

    当对比在不同可靠性策略的情况下,Flink与Storm的表现时,我们发现,保证可靠性对Flink的影响非常小,但对Storm的影响非常大。总的来说,在保证可靠的情况下,Flink单并发的吞吐是Storm的15倍,而不保证可靠的情况下,Flink的性能是Storm的66倍。会产生这样的结果,主要是因为Flink与Storm保证数据可靠性的机制不同。

    而Storm的ACK机制为了保证数据的可靠性,开销更大。

    左边的图展示的是Storm的Ack机制。Spout每发送一条数据到Bolt,就会产生一条ack的信息给acker,当Bolt处理完这条数据后也会发送ack信息给acker。当acker收到这条数据的所有ack信息时,会回复Spout一条ack信息。也就是说,对于一个只有两级(spout+bolt)的拓扑来说,每发送一条数据,就会传输3条ack信息。这3条ack信息则是为了保证可靠性所需要的开销。

    右边的图展示的是Flink的Checkpoint机制。Flink中Checkpoint信息的发起者是JobManager。它不像Storm中那样,每条信息都会有ack信息的开销,而且按时间来计算花销。用户可以设置做checkpoint的频率,比如10秒钟做一次checkpoint。每做一次checkpoint,花销只有从Source发往map的1条checkpoint信息(JobManager发出来的checkpoint信息走的是控制流,与数据流无关)。与storm相比,Flink的可靠性机制开销要低得多。这也就是为什么保证可靠性对Flink的性能影响较小,而storm的影响确很大的原因。

    最后一组自产数据的测试结果对比是Flink与Storm在进程间的数据传输的对比,可以看到进程间数据传输的情况下,Flink但并发吞吐是Storm的4.7倍。保证可靠性的情况下,是Storm的14倍。

    上图展示的是消费kafka中数据时,Storm与Flink的但并发吞吐情况。因为消费的是kafka中的数据,所以吞吐量肯定会收到kafka的影响。我们发现性能的瓶颈是在SourceFunction上,于是增加了topic的partition数和SourceFunction取数据线程的并发数,但是MapFunction的并发数仍然是1.在这种情况下,我们发现flink的瓶颈转移到上游往下游发数据的地方。而Storm的瓶颈确是在下游收数据反序列化的地方。

    之前的性能分析使我们基于数据传输和数据可靠性的角度出发,单纯的对Flink与Storm计算平台本身进行了性能分析。但实际使用时,task是肯定有计算逻辑的,这就势必更多的涉及到CPU,内存等资源问题。我们将来打算做一个智能分析平台,对用户的作业进行性能分析。通过收集到的指标信息,分析出作业的瓶颈在哪,并给出优化建议。

    展开全文
  • 注意:以下文章转自:伍 翀(WuChong),小编...本文会一步步地带领你实现一个更复杂的 Flink 应用程序:实时热门商品。在开始本文前我们建议你先实践一遍上篇文章,因为本文会沿用上文的my-flink-project项目框架。 通
  • 身为大数据工程师,你还在苦学Spark、Hadoop、Storm,却还没搞过Flink?醒醒吧!在过去的2020双11,阿里在Flink实时计算技术的驱动下全程保持了“如丝般顺滑”,基于F...
  • 为了保证应用在大促期间不出问题,需要进行实时计算程序进行压测。由于统计的数据分为两类(流量数据、订单数据),对这两类数据进行不同方式的压测。 二、压测准备 (1)压测时间选择:一般...
  • TiDB+FLINK 实时计算

    2021-03-19 18:23:30
    } //防止flink接收""报错 }); rowdata.add(map); return rowdata; // rabbitTemplate.convertAndSend(tableEventType.toUpperCase(),JSON.toJSONString(map)); } private Map setMysqlTypes(List columns,String[] ...
  • 对于一个实时数据产品人员、或者开发人员来说,...这就需要一套实时数据对数方案,本文主要从背景、实时数据计算方案、对数方案、总结四方面来介绍,说服老板或者让其他人相信自己的数据是准确的、无误的。 一、背...
  • Flink实时计算topN热榜

    2021-03-18 23:35:21
    TopN的常见应用场景,最热商品购买量,最高人气作者的阅读量等等。 1. 用到的知识点 ...通过用户访问日志,计算最近一段时间平台最活跃的几位用户topN。 创建kafka生产者,发送测试数据到kafka; 消费kafka数据,
  • 这种机制保证了实时程序运行时,即使突然遇到异常也能够进行自我恢复。Checkpoint 对于用户层面,是透明的,用户会感觉程序一直在运行。 Savepoint 你可以把它当做在某个时间点程序状态全局镜像,以后程序在进行升级...
  • 每隔5分钟输出最近一小时内点击量最多的前N个商品 这是一个很常见的需求,其实跟实时的pv,uv差不多,可能会比pv,uv复杂一点,由于Flink窗口功能的强大,也让这个需求变的相对简单了,当然用Flink SQL也可以实现. ...
  • 这一章从实际的需求TopN来学习Flink中的窗口知识。在上一章代码中使用了timeWindow,使得我们可以操作Flink流中的一个时间段内的数据,这就引出了Flink中的"窗...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,534
精华内容 1,013
关键字:

flink实时计算