2019-01-09 17:02:40 weixin_44233163 阅读数 60
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1715 人正在学习 去看看 李飞

微软的ASG (应用与服务集团)包含Bing,、Office,、Skype。每天产生多达5 PB以上数据,如何构建一个高扩展性的data audit服务来保证这样量级的数据完整性和实时性非常具有挑战性。本文将介绍微软ASG大数据团队如何利用Kafka、Spark以及Elasticsearch来解决这个问题。
在这里插入图片描述

案例简介

本案例介绍了微软大数据平台团队设计和部署的基于开源技术(Kafka、Spark、ElasticsSearch、Kibana)的大数据质量监控平台,这个平台具有实时、高可用、可扩展、高度可信的特性,成为微软Bing、Office365、Skype等年收入270+亿美元的业务在监控数据质量方面的可靠技术保障。

同时,基于业务需要,我们在设计和实现中达成下面一系列的目标:

监控流式数据的完整性与时延;
需要监控的数据管道(pipeline)具有多个数据生产者、多处理阶段、多数据消费者的特性;
数据质量的监控需要近实时(near real time);
数据质量发生问题的时候,需要提供相应的诊断信息来帮助工程师迅速解决问题;
监控平台的服务本身需要超级稳定和高可用, 大于99.9%在线时间;
监控与审计本身是高度可信;
平台架构可以水平扩展 (Scale out)。

背景及问题引入

为了服务微软的Bing、Office 365以及Skype业务,我们的大数据平台需要处理每天高达十几PB级别的海量大数据,所有的数据分析、报表、洞见以及A/B测试都依赖于高质量的数据,如果数据质量不高的话,依赖数据做决策的业务都会受到严重影响。

与此同时,微软业务对于实时数据处理的需求也日益增加,以前监控批处理数据(batch data)的很多解决方案已经不再适用于实时的流式数据的质量监控。

在另外一个层面,基于历史原因,各个业务集团往往使用不同的技术、工具来做数据处理,怎么整合这样异构的技术、工具以及在此之上的数据质量监控也是一个急需解决的问题。

图1是我们数据处理平台的一个概念性架构。从数据生产者这端,我们通过在客户端以及服务端使用通用的SDK,按照通用的schema来产生数据,数据通过分布在全世界的数据收集服务(collectors)来分发到相应的Kafka, 然后通过pub/sub模式由各种各样的计算以及存储框架来订阅。

这样各种团队就可以选择他们最熟悉或者一直以来使用的工具来做处理。例如,从实时处理的角度,各个业务团队可以选用比如Spark或者微软的USQL streaming处理框架,以及其他第三方的工具来做一些特定场景的分析,比如日志分析的Splunk、交互式分析的Interana等。在批处理框架上,用户可以选用开源社区的Hadoop,、Spark或者微软的Cosmos等。
在这里插入图片描述

图1: 整合各个业务集团的异构数据系统的架构
在这里插入图片描述

图2:快速增长的实时数据

如图2所示,我们在迁移大数据到图1架构的过程中,也看到实时流式数据的快速增长。每天峰值消息高达一万亿个以上,每秒处理一百三十万个消息, 每天处理3.5PB流式数据。

数据监控的场景以及工作原理

3.1数据监控场景

基于业务需求,我们总结概括了需要被监控的数据处理管道特性(如图3)

多数据生产者(multiple data producers),数据来自客户端和服务端;

多个数据消费者(multiple data consumers),这里特指各种数据处理框架;

多数据监控阶段(multiple stages),从数据产生到数据处理,数据往往流经多个数据管道的组件,我们需要通过监控确保每个阶段数据都不会发生丢失、高时延、以及异常。
在这里插入图片描述
图3: 多数据生产者、多阶段、多数据消费者的数据管道

3.2工作原理

基于图3的数据管道,我们把问题具体化为如何确保基于Kafka的数据管道上下游的数据完整性、实时性、数据异常的监测。图4是一个抽象化的监控架构以及工作原理。

蓝色组件是数据管道里数据流经的各个处理阶段;绿色组件是本文中实时数据质量监控的核心服务Audit Trail。在数据流经各个组件的同时,相应的审计(audit)数据也会同时发到Audit Trail, 这个审计数据可以看作是一种元数据(meta data),它包含关于数据流的信息,例如该消息是在哪个数据中心、哪台机器产生;该消息包含几条记录、大小、时间戳等。Audit Trail汇总了各个数据处理组件发来的元数据后,就可以实时做各种数据质量的评估,比如数据在此时刻的完整性如何、实时性如何、有无异常。
在这里插入图片描述
图4:数据流与监控流,监控流实时汇总到Audit Trail

基于图5的审计元数据,一旦发生数据质量问题,工程师可以快速定位是哪个数据中心的哪台服务器在什么时间段发生了问题,然后快速采取相应行动来解决或缓解问题,并把对下游数据处理的影响降到最低。
在这里插入图片描述
图5: 审计元数据的结构

可被监控的数据质量问题可以分为如下几类:

数据时延超出规定的SLA (service level agreement)

工程师可以通过如图6所示的时延状态图快速了解在数据质量时延这个维度是否正常,这对于对实时性要求比较严格的数据产品及应用非常重要,如果数据延迟到来,很多时候就失去了意义。

需要注意的是,图表在这里起到的只是辅助作用,在真正的生产环境中是通过系统API调用来定期检查SLA的符合情况,一旦超出时延阈值,会通过电话、短信等手段通知值班的工程师来实时解决问题。
在这里插入图片描述
图6:简单时延柱状图

数据在移动中发生丢失导致完整性不满足SLA (service level agreement)

工程师可以通过图7中所示简单图表来了解数据完整性的状态,图7所示包含两个数据处理阶段:一个数据生产者和两个数据消费者的应用案例。所以图表中实际上是三条线,绿色是生产者的实时数据量,蓝色和紫色线是两个数据消费者处理的数据量。如果在理想情况下,数据完整性没有问题,这三条线是完全重合。本例中在最后一个点出现了分叉,代表数据完整性出现问题,需要工程师进行干预。
在这里插入图片描述
图7:简单完整性图表

数据本身发生异常-通过异常检测来实时监控

数据本身发生异常,我们由相应的基于统计元数据的异常检测(如图8)来做实时监控。异常检测是一个在工业界非常普遍的问题和挑战,几乎每个互联网公司都会有做异常检测的服务或平台,但是做好很不容易,这是一个可以单独写一篇文章的大题目,这里只是单辟一个章节做简单的算法介绍。

在这里插入图片描述

图8:基于审计数据的异常检测

本例是通过对于数据量的异常检测来发现上游写log问题,或者其他数据生产的逻辑问题。

3.3异常检测

异常检测算法1
在这里插入图片描述
图 9 Holt-Winters算法

我们采用了Holt-Winters算法(图9)来训练模型和做预测,并在此之上做了很多改进来增加算法的强健性和容错能力。

强健性上的改进包括:

使用Median Absolute Deviation (MAD) 得到更好的估值;
处理数据丢点和噪声 (例如数据平滑)。

功能上的改进包括:

自动获取趋势和周期信息;
允许用户人工标记和反馈来更好的处理趋势变化。

通过比较预测值和实际值,我们采用GLR (Generalized Likelihood Ratio) 来发现异常点。在这上面我们也做了相应的改进,包括:

Floating Threshold GLR, 基于新的输入数据动态调整模型;
对于噪声比较大的数据做去除异常点。

异常检测算法2

这是一个基于Exchangeability Martingale的在线时间序列的异常检测算法,其核心就是假设数据的分布是稳定的。如果新的数据点的加入导致数据的分布(distribution)发生比较大的变化,我们就认为异常发生了。所以基于历史数据,我们需要定义一个新值异常公式(New value strangeness)。下面是这些公式的构成,对数学不感兴趣的读者可以略去。

在某个时刻t, 我们收到一个新的数据点,对于历史每个数据i:

s[i] = strangeness function of (value[i], history)
Let p[t] = (#{i: s[i] > s[t]}+ r*#{i: s[i]==s[t]})/N, where r is uniform in (0,1)
Uniform r makes sure p is uniform
Exchangeability Martingale: Mt=i=1tϵpiϵ-1
EMtp1,p2,…pt-1=Mt-1
Integrate ϵpiϵ-1 over [0,1] and pi is uniform
报警触发门槛通过Doob’s maximal inequality控制
Prob (∃ t :Mt>λ)<1λ
对于异常点,Martingale的值就会大于门槛值。

异常检测算法3

这是一个简单而非常有效的基于历史数据的指数平滑算法。

它首先基于历史数据生成动态上下界:

Threshold (width) = min(max(M1Mean, M2Standard Deviation), M3*Mean) (M1
Alert: |Value – predicated value| > Threshold
预测值 = S1+12S2+14S3+18S4+116S51+12+14+18+116

优点在于处理周期性数据的异常检测很好,并且允许用户反馈和标记来调整动态上下界。

系统设计概述

基于业务场景的需要,我们在设计和实现中需要达成一系列的目标以及处理相应的挑战:

监控流式数据的完整性与时延;
需要监控的数据管道(pipeline)具有多个数据生产者、多处理阶段、多数据消费者的特性;
数据质量的监控需要近实时(near real time);
数据发生问题的时候,提供相应的诊断信息来帮助工程师迅速解决问题;
监控平台的服务本身需要超级稳定和高可用, 99.9%以上在线时间;
监控与审计本身是高度可信;
平台架构可以水平扩展 (Scale out)。

4.1高可用可扩展的架构

如图10所示,审计元数据通过前端服务(front end web service)到达Kafka, 我们利用Kafka来实现高可用的临时存储(transient storage), 这样,我们的数据生产者和消费者在发送审计数据的同时,就不会发生阻塞进而影响更重要的数据流。

通过Spark streaming的应用,把审计数据按照时间窗口聚合,同时有相应的逻辑处理去重,晚到以及非顺序到来的数据,同时做各种容错处理保证高可用。

ElasticsSearch作为存储聚合的审计数据,通过Kibana做报表展示,进而通过Data Analysis service对外提供API来使得用户获取各种数据质量信息。

Data Analysis Service作为最终的API端,提供各种数据完整性、实时性、异常的信息。

上述组件,每个都设计成可以独立水平扩展(Scale out), 并且在设计上保证高容错已实现高可用性。
在这里插入图片描述
图10:Audit Trail数据处理架构

4.2异地双活的可靠性保障

通过双数据中心Active-Active灾备(Disaster recovery)如图11所示,来进一步保证高可用高可靠的服务。整体架构保证数据流同时通过两个同构的审计处理管道进行处理,即使一个数据中心因为各种原因下线,整体服务还是处于可用状态,进而保证全天候的数据质量审计与监控。
在这里插入图片描述
图11:双数据中心Active-Active Disaster Recovery

4.3高度可信的审计与监控服务

对于任何监控服务来说,经常被质疑的就是是否监控服务本身的结果是准确可信的。为了保证这一点,我们通过两种方式来保证服务的可信度:

用来审计自身(Audit for audit)(图12);

Synthetic probe。
在这里插入图片描述
图12:审计自身

在基于Kafka/Spark/ES的管道之外,我们还有一套独立的经由ES的审计元数据的处理管道,通过比较上述两个管道的结果,我们就能保证审计数据的可靠性。

另外,基于synthetic probe的方式,我们每分钟会发送一组synthetic数据进入前端服务(front end web service), 然后试图从Data Analysis web service 读出,通过这种方式进一步保障数据的可靠性。

4.4辅助数据质量问题的诊断

当数据质量发生问题,Audit Trail提供了原始的审计元数据来帮助工程师进一步做问题的诊断。工程师可以使用这些元数据和他们自己的trace来进一步JOIN, 来提供一种交互式的诊断,如图13。
在这里插入图片描述
图13:把Trace和审计元数据做JOIN, 可视化的交互诊断视图

效果评估与总结

通过上述系统架构的设计与部署,我们实现了一系列支持公司Bing,、Office,、Skype业务发展的数据质量监控目标:

监控流式数据的完整性与时延;
需要监控的数据管道(pipeline)具有多个数据生产者、多处理阶段、多数据消费者的特性;
数据质量的监控需要近实时(near real time);
数据发生问题的时候,需要提供相应的诊断信息来帮助工程师迅速解决问题;
监控平台的服务本身需要超级稳定和高可用, 99.9%在线时间
监控与审计本身是高度可信;
平台架构可以水平扩展 (Scale out)。

同时,我们准备开源这个平台服务,因为我们相信这个服务本身是一个足够通用化的解决方案,可以应用于很多公司的数据质量监控场景。

结语

为了帮助大家让学习变得轻松、高效,给大家免费分享一大批资料,帮助大家在成为大数据工程师,乃至架构师的路上披荆斩棘。在这里给大家推荐一个大数据学习交流圈:658558542 欢迎大家进群交流讨论,学习交流,共同进步。

当真正开始学习的时候难免不知道从哪入手,导致效率低下影响继续学习的信心。

但最重要的是不知道哪些技术需要重点掌握,学习时频繁踩坑,最终浪费大量时间,所以有有效资源还是很有必要的。

最后祝福所有遇到瓶疾且不知道怎么办的大数据程序员们,祝福大家在往后的工作与面试中一切顺利。

2018-07-30 13:39:55 qq_36807862 阅读数 1105
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1715 人正在学习 去看看 李飞

Kafka总结(一):Kafka概述

Kafka总结(二):Kafka核心组件

Kafka总结(三):Kafka核心流程分析

Kafka总结(四):Kafka命令操作

Kafka总结(五):API编程详解

Kafka总结(六):Kafka Stream详解

Kafka总结(七):数据采集应用

Kafka总结(八):KafKa与ELK整合应用

Kafka总结(九):KafKa 与Spark整合应用

当前,Flume,Kafka和Spark已经成为一个比较成熟的构建实时日志采集分析与计算平台组件,例如,通过收集相应数据统计某个应用或者网站的PV/UV信息,统计流量以及用户分布,对访问日志进行实时或者离线分析,以追踪用户行为或者进行系统风险监控等。通常在数据采集的时候会选择将Kafka作为数据采集队列,将采集的数据首先存储到Kafka中,然后用Spark对kafka中读取的数据进行处理。

1.Spark简介

Spark是一个快速,通用的计算引擎,是Apache一个顶级项目。

Spark用Scala语言开发,提供了Java、Scala、Python、R语言相关的API,运行在JVM之上,因此在运行Spark之前需要保证已经安装JDK环境。

Spark可以很方便的与大数据处理相关的框架(如Flume、Kafka、HDFS、Hbase等)、工具进行整合应用;

通常我们说的Spark,其实是指Spark核心或者Spark生态圈的统称,包括Spark的任务调度、内存管理、容错机制等基本功能。

Spark包括以下组件:

  1. Spark SQL:
  2. Spark Streaming:
  3. Mlib:
  4. GraphX:

2.Spark

所谓的智能投顾,简而言之就是通过机器学习相关的算法基于大数据进行分析处理为用户投资决策提出参考指标甚至自动帮助用户进行投资决策。

例如:在证券行业,当前比较热门的“智能选股”就属于“智能投顾”范畴的一类典型应用,金融机构或者第三方根据股票行情、技术指标、财务指标、基本面指标等多种维度和策略进行分析计算,为股民提供各类选股的方案;

应用描述

实时统计有单时间内用户搜索的关键词,并将搜索次数最高的前10个词输出

重点是使用Spark Streaming 与Kafka集成的应用,因此并不关注业务本身的完整性。

Spark官方网站关于Spark Streaming与Kafka集成给出了两个依赖版本,一个是基于Kafka0.8之后的版本:spark-streaming-kafka-0-8,一个是基于kafka 0.10之后的版本:spark-streaming-kafka-0-10;

 

 

小结

这一章中主要是对流式处理程序进行了讲解,在实际应用中,需要根据实际的业务场景,编写出满足业务场景的流式处理程序;

2018-01-26 18:11:54 weixin_40691089 阅读数 1311
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1715 人正在学习 去看看 李飞

1)启动一个console consumer消费topic的数据,判断消费是否正常。
2)如果console consumer消费正常,就检查sparkstream程序。
2.1)sparkstreming 是基于时间片消费数据的。 看看时间片是否过小,最小的时间间隔,参考在0.5~2秒钟之间。可以适当放宽时间片的大小。
2.2)spark streaming虽然是按照时间片消费数据的,但是上一个批次的数据没有处理完,下一个批次也会继续处理,如果有很多批次的数据同时在处理。会拖垮集群,服务器的资源会使用频繁,导致很慢。

2019-01-08 09:46:36 xcg132566 阅读数 263
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1715 人正在学习 去看看 李飞

kafka和spark总结

本文涉及到的技术版本号:

  • scala 2.11.8
  • kafka1.1.0
  • spark2.3.1

kafka简介

kafka是一个分布式流平台,流媒体平台有三个功能

  • 发布和订阅记录流
  • 以容错的持久化的方式存储记录流
  • 发生数据时对流进行处理

kafka通常用于两大类应用

  • 构件在系统或应用程序之间可靠获取数据的实时数据管道
  • 构件转换或响应数据流的实时流应用程序

kafka的几个概念

  • kafka运行在集群上,或一个或多个能跨越数据中心的服务器上
  • kafka集群上存储流记录的称为topic
  • kafka的topic里,每一条记录包括一个key、一个value和一个时间戳timestamp

kafka有四个核心API

  • Producer API

    生产者api允许应用程序发布一个记录流到一个或多个kafka的topic

  • Consumer API

    消费者api允许应用程序订阅一个或多个topic并且接受处理传送给消费者的数据流

  • Streams API

    流api允许应用程序作为一个流处理器,从一个或多个输入topic中消费输入流,并生产一个输出流到一个或多个输出topic中

  • Connector API

    连接器api允许构建和运行中的kafka的topic连接到现有的应用程序或数据系统中重用生产者或消费者。例如关系数据库的连接器可以捕获对表的每一个更改操作

kafka中的客户端和服务端之间是通过简单、高性能的语言无关的TCP协议完成的,该协议已经版本化并且高版本向低版本向后兼容。

topics

topic为kafka为记录流提供的核心抽象,类似于数据通道,并且topic是发布记录和订阅的核心。

kafka的topic是多用户的,一个topic可以有0个、1个或多个消费者订阅记录

对于每一个topic,kafka集群都维护了一个如下所示的分区记录:
topic

其中每一个分区都是有序的不可变的记录序列,并且新数据是不断的追加到结构化的记录中。分区中的记录每个都分配了一个offset作为ID,它唯一标识分区中的每个记录。

kafka集群默认是保存所有记录,无论是否被消费过,但是可以通过配置保留时间策略。例如如果设置数据保留策略为两天,则超过两天的数据将被丢弃释放空间。kafka的性能受数据大小影响不大,因此长时间的存储数据并不是太大的问题。

其中,kafka 的消费者唯一对topic中的每一个分区都可以设置偏移量offset,标识当前消费者从哪个分区的哪一条数据开始消费,消费者通过对偏移量的设置可以灵活的对topic进行消费。如下图
offset

消费者控制自己的偏移量就意味着kafka的消费者是轻量的,消费者之间互不影响。

topic记录中的分区有多种用途,首先它允许topic扩展到超出单台服务器适合的大小。每个分区都需要有适合托管分区的服务器,而topic可以有很多分区,因此一个topic可以处理任意数量的数据。另外这些分区作为并行的单位,效率很高,这也是相当重要的一点。

分配

记录分区分布在kafka集群服务器上,每个服务器共同处理数据并请求分区的共享。每个分区都可以在可用服务器的数量上进行复制,以此实现容错。

每一个分区都会有一个服务器作为leader,0个或多个服务器作为followers。leader处理分区的所有读取和写入请求,而follower被动的复制leader。如果leader出错,则其中一个follower会自动称为新的leader。集群中的每个服务器都充当某分区的leader和其他分区的follower,因此能在集群中达到负载均衡。

生产者

生产者将数据发布到所选择的分区,生产者在发布数据是需要选择将数据发送到哪个分区,分配分区可以通过循环方式完成也可以根据语义分区的功能实现。

消费者

消费者使用消费者组(consumer group)标记自己。发布到topic的每个记录会被发送到每个消费者组中的一个消费者实例。所以当一个消费者组中有多个消费者实例,则记录将在该消费者组中的所有消费者之间进行有效的负载均衡。

topic接受的每一条记录都会被广播发送到每个消费者组中。示意图如下:

消费者
上图有两个机器的kafka集群,某topic有四个分区p0-p3,有两个消费者组A/B订阅该topic,消费者组A有两个消费者实例,消费者组B有四个消费者实例。

kafka中实现消费的方式是通过在消费者实例上划分分区实现,保证实例在任何时间点都是公平分配的。消费者组中的成员划分分区是由kafka协议进行动态处理。如果新实例加入该组,那新加入的实例会从改组的成员中接管一些分区。如果消费者组中的某个实例死亡,则它所划分的分区分配给该消费组的其他实例。

kafka只能提供一个分区内的记录的顺序,但是不保证多个分区的记录顺序。如果用户想保证topic中的顺序,则使用一个分区的topic即可,但这样就意味着每个消费者组中只能有一个消费者实例。

kafka提供的保证

  • 同一个生产者实例发送到特定topic的特定分区的两条数据M1和M2,并且M1发送早于M2,则M1将拥有更低的偏移量,即可以保证插入顺序。
  • 消费者可以按照记录存储的顺序消费记录
  • 对于复制因子为N的topic,最多可以容忍N-1个服务器故障,而不会丢失提交到topic中的记录

kafka常用命令

  • 启动Zookeeper server

    bin/zookeeper-server-start.sh config/zookeeper.properties &
    
  • 启动Kafka server

    nohup bin/kafka-server-start.sh config/server.properties &
    
  • 停止Kafka server

    bin/kafka-server-stop.sh
    
  • 停止Zookeeper server

    bin/zookeeper-server-stop.sh
    
  • producer

    bin/kafka-console-producer.sh --broker-list 192.168.20.133:9092 --topic realtime
    
  • consumer

    bin/kafka-console-consumer.sh --zookeeper 192.168.20.133:2181 --topic realtime --from-beginning
    
  • 查看所有topic

    bin/kafka-topics.sh --list --zookeeper localhost:2181
    
  • 创建一个topic

    bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 3 --partitions 3 --topic realtime0103
    
  • 查看topic详情

    bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic realtime0103
    
  • 删除topic

    bin/kafka-topics.sh --delete --zookeeper localhost:2181 --topic realtime0103
    

java操作kafka

引入jar包

  <dependency>
    <groupId>org.apache.kafka</groupId>
    <artifactId>kafka_2.11</artifactId>
    <version>1.1.0</version>
  </dependency>

Producer

import java.util.Properties;
import java.util.concurrent.ExecutionException;

import org.apache.kafka.clients.producer.KafkaProducer;
import org.apache.kafka.clients.producer.Producer;
import org.apache.kafka.clients.producer.ProducerRecord;

public class ProducerDemo {

  public static void main(String[] args) throws InterruptedException, ExecutionException {
    Properties props = new Properties();
    props.put("bootstrap.servers", "192.168.20.133:9092,192.168.20.134:9092,192.168.20.135:9092");
    props.put("acks", "all");
    props.put("retries", 0);
    props.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    props.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
    
    String topic = "realtime0103";

    Producer<String, String> producer = new KafkaProducer<String, String>(props);
    String value = "{'name':'1','value':1}" ; 
    
    //设定分区规则,分区为0,1,2
    int partation = KafkaProducerClient.count.get() % 3;

    ProducerRecord<String, String> record = new ProducerRecord<String, String>(topic,partation, "key1",value );
      
    producer.send(record).get();
  }
}

Customer

import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;

import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.clients.consumer.OffsetAndMetadata;
import org.apache.kafka.common.TopicPartition;


public class CustomerDemo {

  private static KafkaConsumer<String, String> consumer;
  private static String inputTopic;

  @SuppressWarnings({ "unchecked", "rawtypes" })
  public static void main(String[] args) {
    String groupId = "group1";
    inputTopic = "realtime0103";
    String brokers = "192.168.20.133:9092";

    consumer = new KafkaConsumer(createConsumerConfig(brokers, groupId));
    
    //分配topic 某分区的offset
    TopicPartition part0 = new TopicPartition(inputTopic, 0);
    TopicPartition part1 = new TopicPartition(inputTopic, 1);
    TopicPartition part2 = new TopicPartition(inputTopic, 2);
    OffsetAndMetadata offset0 = new OffsetAndMetadata(1);
    OffsetAndMetadata offset1 = new OffsetAndMetadata(2);
    OffsetAndMetadata offset2 = new OffsetAndMetadata(3);
    Map<TopicPartition,OffsetAndMetadata> offsetMap = new HashMap<>();
    offsetMap.put(part0,offset0);
    offsetMap.put(part1,offset1);
    offsetMap.put(part2,offset2);
    //提交offset信息
    consumer.commitSync(offsetMap);
    
    start();

  }

  private static Properties createConsumerConfig(String brokers, String groupId) {
        Properties props = new Properties();
        props.put("bootstrap.servers", brokers);
        props.put("group.id", groupId);
        props.put("auto.commit.enable", "false");
        props.put("auto.offset.reset", "earliest");
        props.put("key.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");
        props.put("value.deserializer", "org.apache.kafka.common.serialization.StringDeserializer");

        return props;
    }

  private static void start() {
    consumer.subscribe(Collections.singletonList(inputTopic));

        System.out.println("Reading topic:" + inputTopic);

        while (true) {
            ConsumerRecords<String, String> records = consumer.poll(1000);
            for (ConsumerRecord<String, String> record: records) {
                String ip = record.key();
                String event = record.value();
                System.out.println(event);
            }
            consumer.commitSync();
        }

  }
}

spark操作kafka

IDEA配置搭建spark scala开发环境(Windows)

  • 安装jdk8并配置环境变量
  • 安装scala2.11并配置环境变量(本文安装2.11.8)
  • 安装IDEA
  • IDEA安装SBT和Scala插件
  • File->New->Project 创建新项目,选择Scala->sbt->next

新建项目

  • 选择项目名称、位置、Java版本号、sbt版本和Scala版本,Finish

选择版本

  • 打开build.sbt,添加相关依赖

    libraryDependencies += "org.apache.spark" %% "spark-core" % "2.3.1"
    libraryDependencies += "org.apache.spark" %% "spark-streaming" % "2.3.1" % "provided"
    libraryDependencies += "org.apache.spark" %% "spark-streaming-kafka-0-10" % "2.3.1"
    libraryDependencies += "org.apache.spark" %% "spark-sql" % "2.3.1"
    
  • 刷新sbt项目,下载依赖:

刷新

  • 编写业务代码,可以使用以下的使用spark Structured Streaming连接kafka处理流部分代码
  • 设置打包规则
    • File->Project Sturcture->Artifacts 点击绿色加号设置打jar包规则

      Artifacts

    • 选择Module和Main class,JAR file from libraries选择copy to output…即不将外部jar打包到jar文件中

      Artifacts

    • 导航栏 Build->Build Artifacts ,打包成jar,将jar包上传到spark集群

  • 运行程序:
    • 以上配置打出jar包包含项目jar包和多个依赖jar包,提交spark作业时,可以使用–jars逗号隔开配置引用多个外部jar

      cd $SPARK_HOME
      ./bin/spark-submit --master spark://192.168.20.133:7077 --jars /root/interface-annotations-1.4.0.jar,/root/async-1.4.1.jar --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.3.1 --class com.xuchg.app.Application /root/spark-kafka.jar
      

使用spark Structured Streaming连接kafka处理流

import org.apache.log4j.{Level, Logger}
import org.apache.spark.SparkConf
import org.apache.spark.sql._
object Main extends App {

  //spark读取kafka示例
  Logger.getLogger("org").setLevel(Level.ERROR)
  val kafkaAddress = "192.168.20.133:9092"
  val zookeeper = "192.168.20.133:2181"
  val topic = "realtime0103"
  val topicOffset = "{\""+topic+"\":{\"0\":0,\"1\":0,\"2\":0}}"
  val sparkSession = SparkSession
    .builder()
    .config(new SparkConf()
      .setMaster("local[2]")
      .set("spark.streaming.stopGracefullyOnShutdown","true")//设置spark,关掉sparkstreaming程序,并不会立即停止,而是会把当前的批处理里面的数据处理完毕后 才会停掉,此间sparkstreaming不会再消费kafka的数据,这样以来就能保证结果不丢和重复。
      .set("spark.submit.deployMode","cluster")
      .set("spark.executor.memory","4g")//worker内存
      .set("spark.driver.memory","4g")
      .set("spark.cores.max","2")//设置最大核心数
    )
    .appName(getClass.getName)
    .getOrCreate()

  def createStreamDF(spark:SparkSession):DataFrame = {
    import spark.implicits._
    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", kafkaAddress)
      .option("zookeeper.connect", zookeeper)
      .option("subscribe", topic)
      .option("startingOffsets", topicOffset)
      .option("enable.auto.commit", "false")
      .option("failOnDataLoss", false)
      .option("includeTimestamp", true)
      .load()
    df
  }

  var df = createStreamDF(sparkSession)

  val query = df.writeStream
    .format("console")
    .start()

  query.awaitTermination()
}

监控spark和kafka

此处根据实际应用情况使用两种监控方法,解决两个不同问题

  • 解决spark启动和停止处理的动作,例如监听spark停止时处理或记录完所有计算

    //定义监听类继承SparkListener,并重写相关方法
    import org.apache.spark.scheduler.{SparkListener, SparkListenerApplicationEnd, SparkListenerApplicationStart}
    class AppListener extends SparkListener{
      override def onApplicationEnd(applicationEnd: SparkListenerApplicationEnd): Unit = {
        //监控spark停止方法,可以处理spark结束的动作
        println("application 关闭")
      }
    
      override def onApplicationStart(applicationStart: SparkListenerApplicationStart): Unit = {
        println("application 启动")
      }
    }
    
    //在主类中注册监听类
    sparkSession.sparkContext.addSparkListener(new AppListener)
    
  • 监控spark的查询,例如spark读取kafka流的偏移量offset,可以监听并记录下来,下次启动spark可以直接从该偏移量offset进行消费,不会消费相同的数据

    sparkSession.streams.addListener(new StreamingQueryListener() {
      override def onQueryStarted(queryStarted: QueryStartedEvent): Unit = {
        println("Query started: " + queryStarted.id)
      }
      override def onQueryTerminated(queryTerminated: QueryTerminatedEvent): Unit = {
        //服务出现问题而停止
        println("Query terminated: " + queryTerminated.id)
      }
      override def onQueryProgress(queryProgress: QueryProgressEvent): Unit = {
        var progress = queryProgress.progress
        var sources = progress.sources
        if(sources.length>0){
          var a = 0
          for(a <- 0 to sources.length - 1){
            var offsetStr = sources.apply(a).startOffset
            if(offsetStr!=null){
              println("检测offset是否变化 -- " + offsetStr)
            }
          }
        }
      }
    })
    

    运行结果如下:可以看到对topic的每个分区的偏移量都可以获取到
    offset

管理和停止spark程序

在spark集群主节点配置并使用spark history server可以实现对spark作业进行管理和监控

  • 配置spark history server

    • 修改$SPARK_HOME/conf/spark-defaults.conf,如果不存在则从模板复制一份

      cp $SPARK_HOME/conf/spark-defaults.conf.template $SPARK_HOME/conf/spark-defaults.conf
      vi $SPARK_HOME/conf/spark-defaults.conf
      
    • 修改配置文件如下:

      spark.eventLog.enabled           true
      spark.eventLog.dir               hdfs://192.168.20.133:9000/spark-history
      spark.eventLog.compress          true
      # 注意ip地址需要根据情况更改,9000为hdfs默认端口号,如hdfs端口号不是9000则需要更改
      
    • 创建hdfs目录

      $HADOOP_HOME/bin/hdfs dfs -mkdir /spark-history
      
    • 配置$SPARK_HOME/conf/spark-env.sh文件:

      • 如果不存在则从模板复制:

        cp $SPARK_HOME/conf/spark-env.sh.template $SPARK_HOME/conf/spark-env.sh
        
      • 编辑$SPARK_HOME/conf/spark-env.sh,结尾添加:

        export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://192.168.20.133:9000/spark-history"
        # 18080为history server的访问端口号,192.168.20.133:9000为hdfs的ip和端口,根据实际情况修改
        
    • 打开防火墙18080端口

    • 执行命令打开history server服务

      $SPARK_HOME/sbin/start-history-server.sh
      
  • 代码中sparkSession创建时添加配置:

    //设置spark,关掉sparkstreaming程序,并不会立即停止,而是会把当前的批处理里面的数据处理完毕后 才会停掉,此间sparkstreaming不会再消费kafka的数据,这样以来就能保证结果不丢和重复。
    new SparkConf().set("spark.streaming.stopGracefullyOnShutdown","true")
    
  • 使用shell关掉某一个正在运行的spark作业:

    • spark作业关闭原理
      每一个spark作业都由一个appId唯一标识,而每一个作业包含多个Executors执行器,这些Executors中包含1个或几个id为driver的驱动程序,它是执行开发程序中的 main方法的进程。如果驱动程序停止,则当前spark作业就结束了。

    • spark关闭过程

      • 获取某appId的spark作业的driver节点的ip和端口,可以通过spark history server提供的页面或提供的api进行获取。此处介绍页面获取,后面会介绍api获取

        finddriver

      • 根据获取的driver的端口号对spark作业发送停止命令,当然有时ctrl+c和在监控页面上都是可以直接停止的,但此处只提用shell停止,应用场景更广泛。

        centod7:ss -tanlp |  grep 60063|awk '{print $6}'|awk  -F, '{print $2}'|awk -F= '{print $2}'|xargs kill -15
        centos6:ss -tanlp |  grep 60063|awk '{print $6}'|awk  -F, '{print $2}'|xargs kill -15
        

        注意:centos6和centos7稍有不同,而且此处使用kill -15而不是kill -9,kill -9会直接杀死进程,可能会导致丢失数据。而kill -15是发送停止命令,不会直接杀死进程。

    • 通过以上内容可以实现在spark集群主节点部署web服务接收并远程调用执行shell语句来达到远程动态启动(可传参)和停止spark作业,大体如下:

      • 远程调用接口传参启动spark作业,此时记录下spark运行的appid

      • 通过调用spark history server提供的REST Api获取当前作业driver进程的端口号:

        http://192.168.20.133:18080/api/v1/applications/{appId}/allexecutors
        

        driver

      • 通过获取到的端口号可以向spark集群主节点发送停止命令到该端口进程即可

示例项目地址:github
kafka官网
spark官网

2019-12-18 11:35:36 chenbengang 阅读数 6
  • Spark开发工程师(含项目)

    本课程为大数据金融信贷项目实战课,着重讲解企业中常用的大数据技术理论与实战,如Hadoop、Hive、HBase、Sqoop、Flume、Kafka、Spark Streaming、Spark SQL、Spark Structured Streaming等。课程包含离线项目和实时项目,从项目业务需求、技术选型、架构设计、集群安装部署、集成开发以及项目可视化进行全方位实战讲解。

    1715 人正在学习 去看看 李飞

Spark大数据-输入源之kafka

kafka相关基础

高吞吐量的分布式发布订阅消息系统,能订阅和发布消息。
kafka原理
kafka和hadoop作用
broker:kafka集群中每个节点服务器叫broker。
topic:消息扔给某个topic,订阅相关topic即可。
partition:每个topic消息非常多,所以需要分区放在多台服务器上。
生产者:把消息发给kafka broker。
消费者:向kafka broker读取消息。
group:每个消费者只属于某个消费者分组。
消费者分组
kafka架构图:
kafka架构图

测试启动kafka:

1.kafka需要借助zookeeper服务,首先启动zookeeper服务,分布式zookeeper一般安装在三个节点即可,本文安装在192.168.1.30、192.168.1.31、192.168.1.32,分别启动,启动方式如下:

cd /opt/opensoc/zookeeper-3.4.12/bin/ 
./zkServer.sh start

2.再启动kafka服务,本文分布式kafka安装在六台机器,分别30~35,分别启动,启动方式如下(daemon 为将前台命令转化为后台运行):

cd /opt/opensoc/kafka_2.12-2.0.1/bin/
./kafka-server-start.sh -daemon ../config/server.properties

3.创建topic,数据1份备份,1个分区:

cd /opt/opensoc/kafka_2.12-2.0.1/
./bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic wordsendertest
//列出所有创建的topic,验证是否创建成功
./bin/kafka-topics.sh --list --zookeeper localhost:2181

3.创建生产者:

cd /opt/opensoc/kafka_2.12-2.0.1/
bin/kafka-console-producer.sh --broker-list 192.168.1.30:9092 --topic wordsendertest

4.创建消费者:

cd /opt/opensoc/kafka_2.12-2.0.1/
bin/kafka-console-consumer.sh --bootstrap-server 192.168.1.30:9092 --topic wordsendertest --from-beginning

或者

cd /usr/local/kafka
./bin/kafka-console-consumer.sh --zookeeper localhost:2181 --topic wordsendertest --from-beginning

5.在生产者中可以输入单词回车即可。

spark+kafka

1.下载jar包导入spark-streaming-kafka-0-10_2.11-2.2.1.jar,复制到spark的jars目录下;将kafka的libs下的所有jar复制到spark的jars下,如果是其他版本的参照官网API

cd /usr/local/kafka/libs
ls
cp ./* /usr/local/spark/jars/kafka

2.启动spark-shell时加上jar目录,导入包则不会报错。
3.编写使用kafka数据源的spark streaming程序:

  • 创建代码目录
cd /usr/local/spark/mycode
mkdir kafka
cd kafka
mkdir -p src/main/scala
cd src/main/scala
vim KafkaWordProducer.scala
  • 编写生产者程序(每条消息5个0~9的随机数字,每秒钟3条消息)
// 创建生产者
import java.util.HashMap
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.kafka.clients.producer.{KafkaProducer,ProducerConfig,ProducerRecord}
import org.apache.spark.streaming.kafka010._

object KafkaWordProducer {
  def main(args: Array[String]) {
    if (args.length < 4) {
      System.err.println("Usage: KafkaWordCountProducer <metadataBrokerList> <topic> " +
        "<messagesPerSec> <wordsPerMessage>")
      System.exit(1)
    }
    val Array(brokers, topic, messagesPerSec, wordsPerMessage) = args
    // Zookeeper connection properties
    val props = new HashMap[String, Object]()
    props.put(ProducerConfig.BOOTSTRAP_SERVERS_CONFIG, brokers)
    props.put(ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    props.put(ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG,
      "org.apache.kafka.common.serialization.StringSerializer")
    val producer = new KafkaProducer[String, String](props)
   // Send some messages
    while(true) {
      (1 to messagesPerSec.toInt).foreach { messageNum =>
        val str = (1 to wordsPerMessage.toInt).map(x => scala.util.Random.nextInt(10).toString)
          .mkString(" ")
                    print(str)
                    println()
        val message = new ProducerRecord[String, String](topic, null, str)
        producer.send(message)
      }
     Thread.sleep(1000)
    }
  }
}
// 一定要使用具体地址,不要localhost,否则会出现奇奇怪怪的问题
KafkaWordProducer.main(Array("192.168.1.30:9092","wordsender","3","5"))
  • 编写消费者程序(对上述消息进行词频统计并打印)
// 创建消费者
import org.apache.spark._
import org.apache.spark.SparkConf
import org.apache.spark.streaming._
import org.apache.spark.streaming.StreamingContext._

import org.apache.kafka.clients.consumer.ConsumerRecord
import org.apache.kafka.common.serialization.StringDeserializer

import org.apache.spark.streaming.kafka010._
import org.apache.spark.streaming.kafka010.KafkaUtils
import org.apache.spark.streaming.kafka010.LocationStrategies.PreferConsistent
import org.apache.spark.streaming.kafka010.ConsumerStrategies.Subscribe

//格式化日志的单例程序
import org.apache.spark.internal.Logging
import org.apache.log4j.{Level, Logger}
/** Utility functions for Spark Streaming examples. */
object StreamingExamples extends Logging {
  /** Set reasonable logging levels for streaming if the user has not configured log4j. */
  def setStreamingLogLevels() {
    val log4jInitialized = Logger.getRootLogger.getAllAppenders.hasMoreElements
    if (!log4jInitialized) {
      // We first log something to initialize Spark's default logging, then we override the
      // logging level.
      logInfo("Setting log level to [WARN] for streaming example." +
        " To override add a custom log4j.properties to the classpath.")
      Logger.getRootLogger.setLevel(Level.WARN)
    }
  }
}

object KafkaWordCount{
    def main(args:Array[String]){
//         设置日志格式
        StreamingExamples.setStreamingLogLevels()
//         创建ssc
        val sc=new SparkConf().setAppName("KafkaWordCount").setMaster("local[2]")
        val ssc=new StreamingContext(sc,Seconds(10))
//         设置检查点
        ssc.checkpoint("home/ziyu_bigdata/quick_learn_spark/checkpoint")
//         如果为存在hdfs,则ssc.checkpoint("root/usr/checkpoint")
        
//         val zkQuorum="localhost:2181"//zookeeper服务器地址
//         val group="1"//topic所在的group,可自定义名称val group="tesr-consumer-group"
//         val topics="wordsender"
//         val numThreads=1//每个topic的分区数
//         val topicMap=topics.split(",").map((_,numThreads.toInt)).toMap
//         val lineMap=KafkaUtils.createStream(ssc,zkQuorum,group,topicMap)
//         val lines=lineMap.map(_._2)
        
        val kafkaParams = Map[String, Object](
          "bootstrap.servers" -> "192.168.1.30:9092",
          "key.deserializer" -> classOf[StringDeserializer],
          "value.deserializer" -> classOf[StringDeserializer],
          "group.id" -> "1",
          "auto.offset.reset" -> "latest",
          "enable.auto.commit" -> (false: java.lang.Boolean)
        )
        val topics = Array("wordsender")
        val lineMap = KafkaUtils.createDirectStream[String, String](
            ssc,
            PreferConsistent,
            Subscribe[String, String](topics, kafkaParams)
        )
        val lines =lineMap.map(record => record.value)
        
        val words=lines.flatMap(_.split(" "))
        val pair=words.map(x => (x,1))
//         词频统计
        val wordCounts=pair.reduceByKeyAndWindow(_ + _,_ - _,Minutes(2),Seconds(10),2)
        wordCounts.print
        ssc.start()
        ssc.awaitTermination()
    }
}

KafkaWordCount.main(Array())

至此全部搞定kafka数据源的spark streaming的流处理。

没有更多推荐了,返回首页