php进行大数据分析

2019-02-27 10:32:20 liqiuman180688 阅读数 5489

InfoQ

内容来自极客时间《数据分析实战 45 讲

数据分析师近几年在国内互联网圈越来越火,很多开发都因为薪资和发展前景,希望转行到数据分析岗。今天,我们就来聊聊面试数据分析师的那些事。
其实,数据分析能力是每个互联网人的必备技能,哪怕你没有转行数据分析师的计划,也推荐你看看这个专题,提升你的数据能力。

数据分析的能力要求

与数据分析相关的工作有一个特质,就是对数字非常敏感,同时也要求对数据具有良好的思考能力,比如说如何用数据指导业务,如何将数据呈现在报告中。
在面试的时候,面试官通常会考察以下三个维度的能力:

1、理论知识(概率统计、概率分析等)
掌握与数据分析相关的算法是算法工程师必备的能力,如果你面试的是和算法相关的工作,那么面试官一定会问你和算法相关的问题。比如常用的数据挖掘算法都有哪些,EM 算法和 K-Means 算法的区别和相同之处有哪些等。
有些分析师的工作还需要有一定的数学基础,比如概率论与数理统计,最优化原理等。这些知识在算法优化中会用到。
除此以外,一些数据工程师的工作更偏向于前期的数据预处理,比如 ETL 工程师。这个职位考察你对数据清洗、数据集成的能力。虽然它们不是数据分析的“炼金”环节,却在数据分析过程中占了 80% 的时间。
在《数据分析实战 45 讲》里,我有详细讲到数据集成的两种架构方式:ELT 和 ETL,以及 Kettle 工具的基本操作,非常实用。
以下是我制作的数据集成的学习脑图。
在这里插入图片描述
2、具体工具(sklearn、Python、Numpy、Pandas 等)
工程师一定需要掌握工具,你通常可以从 JD 中了解一家公司采用的工具有哪些。如果你做的是和算法相关的工作,最好还是掌握一门语言,Python 语言最适合不过,还需要对 Python 的工具,比如 Numpy、Pandas、sklearn 有一定的了解。

数据 ETL 工程师还需要掌握 ETL 工具,比如 Kettle。

如果是数据可视化工作,需要掌握数据可视化工具,比如 Python 可视化,Tableau 等。

如果工作和数据采集相关,你也需要掌握数据采集工具,比如 Python 爬虫、八爪鱼。

我也有一篇专门的文章,来解析如何用八爪鱼模拟实战做数据采集,主要讲解了八爪鱼的任务建立、流程设计,还有一个实操的案例。虽然八爪鱼工具提供了各种简易采集的方式,我更建议你把它作为参考模板,可以看下别人是如何建立的,这样多做案例,你上手会更快。在这里插入图片描述
3、业务能力(数据思维)
数据分析的本质是要对业务有帮助。因此数据分析有一个很重要的知识点就是用户画像。
用户画像是企业业务中用到比较多的场景,对于数据分析来说,就是对数据进行标签化,实际上这是一种抽象能力。
用户画像建模的详细操作,主要在我的这篇文章里「用户画像:标签化就是数据的抽象能力」,从三个维度来建立用户画像:“都是谁”、“从哪来”、“要去哪”。若是用户画像建模的过程,按照数据流处理的阶段来划分,可以分为:数据层、算法层、业务层。你会发现在不同的层,都需要打上不同的标签。

7 道数据分析笔试题

作为实力检测的一部分,笔试是非常重要的一个环节。它可以直接测验你对数据分析具体理论的掌握程度和动手操作的能力。我出了几道简单的题,你不妨来看下。
问答题
1、用一种编程语言,实现 1+2+3+4+5+…+100。
这道题考察的就是语言基础,你可以用自己熟悉的语言完成这道题,比如 Python、Java、PHP、C++ 等。这里我用 Python 举例:sum = 0for number in range(1,101): sum = sum + numberprint(sum)
在这里插入图片描述
2、如何理解过拟合?

过拟合和欠拟合一样,都是数据挖掘的基本概念。过拟合指的就是数据训练得太好,在实际的测试环境中可能会产生错误,所以适当的剪枝对数据挖掘算法来说也是很重要的。

欠拟合则是指机器学习得不充分,数据样本太少,不足以让机器形成自我认知。

3、为什么说朴素贝叶斯是“朴素”的?

朴素贝叶斯是一种简单但极为强大的预测建模算法。之所以称为朴素贝叶斯,是因为它假设每个输入变量是独立的。这是一个强硬的假设,实际情况并不一定,但是这项技术对于绝大部分的复杂问题仍然非常有效。

4、SVM 最重要的思想是什么?

SVM 计算的过程就是帮我们找到超平面的过程,它有个核心的概念叫:分类间隔。SVM 的目标就是找出所有分类间隔中最大的那个值对应的超平面。在数学上,这是一个凸优化问题。同样我们根据数据是否线性可分,把 SVM 分成硬间隔 SVM、软间隔 SVM 和非线性 SVM。

5、K-Means 和 KNN 算法的区别是什么?

首先,这两个算法解决的是数据挖掘中的两类问题。K-Means 是聚类算法,KNN 是分类算法。其次,这两个算法分别是两种不同的学习方式。K-Means 是非监督学习,也就是不需要事先给出分类标签,而 KNN 是有监督学习,需要我们给出训练数据的分类标识。最后,K 值的含义不同。K-Means 中的 K 值代表 K 类。KNN 中的 K 值代表 K 个最接近的邻居。

动手题

1、我给你一组数据,如果要你做数据清洗,你会怎么做?在这里插入图片描述
实际上,这一道题中,面试官考核的是基本的数据清洗的准则,数据清洗是数据分析必不可少的重要环节。你可能看到这个数据存在 2 个问题:典韦出现了 2 次,张飞的数学成绩缺失。
针对重复行,你需要删掉其中的一行。针对数据缺失,你可以将张飞的数学成绩补足。
在「数据科学家 80% 时间都花费在了这些清洗任务上?」的文章里,结合案例,我着重讲解了数据清洗具体方法,我将数据清洗规则总结为以下 4 个关键点,统一起来叫“完全合一”,下面我来解释下。
1、整性:单条数据是否存在空值,统计的字段是否完善。
2、面性:观察某一列的全部数值,比如在 Excel 表中,我们选中一列,可以看到该列的平均值、最大值、最小值。我们可以通过常识来判断该列是否有问题,比如:数据定义、单位标识、数值本身。
3、法性:数据的类型、内容、大小的合法性。比如数据中存在非 ASCII 字符,性别存在了未知,年龄超过了 150 岁等。
4、唯性:数据是否存在重复记录,因为数据通常来自不同渠道的汇总,重复的情况是常见的。行数据、列数据都需要是唯一的,比如一个人不能重复记录多次,且一个人的体重也不能在列指标中重复记录多次。
2、豆瓣电影数据集关联规则挖掘
在数据分析领域,有一个很经典的案例,那就是“啤酒 + 尿布”的故事。它实际上体现的就是数据分析中的关联规则挖掘。不少公司会对这一算法进行不同花样的考察,但万变不离其宗。
如果让你用 Apriori 算法,分析电影数据集中的导演和演员信息,从而发现两者之间的频繁项集及关联规则,你会怎么做?
查看完整代码:https://github.com/cystanford/Apriori
以上就是有关数据分析面试笔试的一些内容,你或许可以感受到数据分析师这个岗位的特殊性。面试找工作虽说不是一朝一夕就可以完成的事情,但我希望通过专栏,能助你一臂之力。

2019-07-12 17:21:06 A_1236 阅读数 4447

在这里插入图片描述
从广义来讲,数据是反映产品和用户状态最真实的一种方式,通过数据指导运营决策、驱动业务增长。数据可分为2种情况:数据监控和数据分析。

什么是数据监控?
数据监控是及时、有效的反馈出数据异常的一种手段,通过对数据的监控去观察是否异常,进而分析数据。

什么是数据分析?
数据分析是以业务场景和业务目标为思考起点,业务决策作为终点,按照业务场景和业务目标分解为若干影响的因子和子项目,围绕子项目做基于数据现状分析,知道改善现状的方法。

数据是产品和运营人员工作中重要的一部分,运营人员常说的一句话是“数据在手,天下我有”,任何事情都要以数据来说事。作为一名运营人员,我们在做运营策略的同时,需要分析大量用户数据,去观测用户行为和用户画像。同时我们应该要关心每个数据指标的增长,防止产品出现BUG,影响到我们的最终数据指标。那么我们要怎么及时发现数据异常呢?

以现金贷为例,每天我们都有大量的用户在进行交易行为,如果是有一个小时突然没有用户的交易数据,很少人会随时发觉,往往都是用户打电话询问客服“为什么APP登录不上”“为什么我提交不了资料”等等,这时大家才会发觉APP出现故障。

所以我们要做数据监控。

对于数据监控首先我们要明确几个点

明确监控目标
监控哪些数据
监控这些数据每个背后的意义是什么
数据预警

现在让我们来讲下:

1.监控哪些方面的数据?
首先关注每个环节的基础数据指标(以现金贷的基础数据来举例)

注册用户数
填写基础资料用户数
授信获得额度
发起提现用户数和发起提现金额
成功提现用户数和成功提现金额
监控这些数据每个背后的意义是什么?
通过监控每个业务环节的基础数据,如果数据异常,可以快速定位哪个环节出了问题,进而进一步的分析。

2.数据预警
数据预警是通过各种数据维度的比对发现数据异常。

预警即通过数据采集、数据挖掘、数据分析,对已经存在的风险发出预报与警示

当数据出现问题时迅速作出反应,可第一时间通知到所有人,这样就能快速发现问题。

数据预警有5个点需要明确:

①量级指标与转化指标的确定

量级指标即每个环节的数据指标,量级指标存在的意义是可以通过我们的加工成为我们想要的数据。

转化指标即每个环节的转化,通过观测转化指标可以快速定位出哪个环节出了问题。

②每个指标正常波动范围的确定

每个指标要根据历史的数据设定一个正常浮动范围。可以从以下4种数据维度去确认正常波动范围。

同比数据(与上周同一天同时段进行对比)
环比数据(与前三天同一时段的平均值进行对比)
每个环节的转化(与前N天每个环节的转化进行对比)
每个小时增幅(与前N天每个小时增幅进行对比)

举个例子,根据数据分析得出,注册用户量环比前3天的平均值的正常浮动范围是±20%。

③触发条件的确定

数据预警的触发条件要确定,通常是低于正常浮动范围就会发出预警

④预警周期与频次的确定

预警的周期通常是一天,频次半个小时一次或一个小时一次。

⑤预警方式

一般预警方式有三种

i短信通知
ii钉钉群通知并@所有人
Iii Email通知

监控数据只能快速的发现数据出现异常的大概范围,不能精确地定位到具体的问题上。有人会说,那我把所有的环节都拆分为细小化颗粒,做好监控自动触发,

其实这个不是不行,这样的话一旦你一个细小化的环节出现异常而发出报警,关联到的其他细小化的指标都会受到影响从而也发出报警,这样的话太多指标发出报警,会造成你的干扰。

这是数据有效监控的一个流程,当有数据异常的时候,系统就会触发条件快速通知你,这时候你要做的就是看哪个环节出了问题。这个时候就要进行数据分析。

3.数据分析有4种方法
(1)单项分析

趋势洞察、渠道归因、链接标记、漏斗分析、热图分析、分群分析、A/B分析、留存分析

(2)组合分析

针对某个细分点,进行多维度组合分析。

(3)用户场景分析

时间、地点、需求。

举例:用户早上注册时间点

(4)建模分析

流失预警分析、用户激活分析、付费决策分析

举个例子,注册用户数急剧下降的原因,我们用组合分析,针对注册用户数这个细分点,进行几个维度的分析并得出原因。

APP网络是否正常
推广注册页是否有异常
获取短信验证码是否有异常
设置手势密码是否有异常
分析哪个渠道的注册用户数下降并得出原因

5.分析数据要用什么去展现呢?
有2种方法:

(1)自动图表化:可以从数据后台刷选出我们想要看的数据,并且每个版块都制成图表,便于我们快速查看。举例:把用户每个触发行为都加上埋点,按时间维度去查询我们想要的数据。
在这里插入图片描述
(2)手动图标化

最常用的有以下几种图表:

(1)柱状图
柱状图通常描述的是分类数据,用于显示一段时间内的数据变化或显示各项之间的比较情况。
在这里插入图片描述
(2)折线图
折线图可以显示随时间(根据常用比例设置)而变化的连续数据。
在这里插入图片描述
(3)饼图
饼图以二维或三维格式显示每一数值相对于总数值的大小。
在这里插入图片描述
(4)条形图
条形图显示各个项目之间的比较情况。
在这里插入图片描述
(5)散点图
散点图也叫 X-Y 图,它将所有的数据以点的形式展现在直角坐标系上,以显示变量之间的相互影响程度,点的位置由变量的数值决定。
在这里插入图片描述
(6)漏斗图
漏斗图适用于业务流程比较规范、周期长、环节多的单流程单向分析,通过漏斗各环节业务数据的比较,能够直观地发现和说明问题所在,为决策者提供一定的参考。
在这里插入图片描述
(7)面积图
面积图又叫区域图,面积图强调数量随时间而变化的程度, 它是在折线图的基础之上形成的, 它将折线图中折线与自变量坐标轴之间的区域使用颜色或者纹理填充,颜色的填充可以更好的突出趋势信息。
在这里插入图片描述
以上7种图表都是在分析数据中经常使用的,可以根据分析数据的展示去选择不同的图表。

总结
数据监控和数据分析对于运营来说是非常重要的,做好数据监控,减少产品出现bug,影响用户的体验,减少重大事故的发生。

对于监控数据笔者仍在学习阶段,以上是最近工作中的心得分享,希望能给大家带来一些思路!

本文由 @Crystal 原创

释放数据价值,人人都是数据分析师,更多精彩信息可以点击:
https://www.yonghongtech.com/webbbs/portal.php

2018-03-09 11:20:17 fastjack 阅读数 8600

问题(来自lunacyfoundme)

       我正在重建我们公司内部网,期间遇到一个与大量数据处理报告有关的前一个版本的问题。此前我曾用同步处理程序代码解决过这个问题,只是运行的很慢很慢,这导致我不得不延长最大脚本运行时间10到15分钟。有没有更好的方式来处理PHP站点里的大量数据呢?理想情况下我想在后台运行它,并且跑的越快越好。这个过程包括处理成千上万条的财务数据,我是使用Laravel来重建这个站点的。
 

最好受欢迎的回答(来自spin81):

       人们都告诉你要使用队列和诸如此类的东西,这是一个好主意,但问题好像并没有出在PHP上面。Laravel/OOP是很厉害的,但生成你所说的报告的程序似乎不应你该有问题。对于不同的看法,我想看看你得到这些数据时使用的SQL查询。正如其他人所说,如果你的表单有成千上万行那你的报告应该不会耗费10到15分钟才完成。实际上,如果你没做错事的话可能会在一分钟内就能处理成千上万条记录,完成同样的一篇报告。

1.如果你正在做成千上万条查询,看看你能不能先只做几条查询。我之前曾使用一个PHP函数把70000条查询降为十几条查询,这样它的运行时间就从几分钟降到了几分之一秒。
 
2.在你的查询上运行EXPLAIN,看看你是不是缺少什么索引。我曾经做过一个查询,通过增加了一个索引后效率提高了4个数量级,这没有任何夸张的成分。如果你正在使用MySQL,你可以学学这个,这种“黑魔法”技能会让你和你的小伙伴惊呆的。
 
3.如果你正在做SQL查询,然后获得结果,并把很多数字弄到一起,看看你能不能使用像SUM()和AVG()之类的函数调用GROUP BY语句。跟普遍的情况下,让数据库处理尽量多的计算。我能给你的一点很重要的提示是:(至少在MySQL里是这样)布尔表达式的值为0或1,如果你很有创意的话,你可以使用SUM()和它的小伙伴们做些很让人惊讶的事情。
 
4.好了,最后来一个PHP端的提示:看看你是不是把这些同样很耗费时间的数字计算了很多遍。例如,假设1000袋土豆的成本是昂贵的计算,但你并不需要把这个成本计算500次,然后才把1000袋土豆的成本存储在一个数组或其他类似的地方,所以你不必把同样的东西翻来覆去的计算。这个技术叫做记忆术,在像你这样的报告中使用往往会带来奇迹般的效果。
 
原文:http://www.reddit.com/r/PHP/comments/2pyuy0/heavy_data_processing_in_php/

译文:http://www.php100.com/html/dujia/2014/1226/8195.html


2012-12-31 10:12:12 hguisu 阅读数 76448
摘要:随着数据体积的越来越大,实时处理成为了许多机构需要面对的首要挑战。Shruthi Kumar和Siddharth Patankar在Dr.Dobb’s上结合了汽车超速监视,为我们演示了使用Storm进行实时大数据分析。CSDN在此编译、整理。

简单和明了,Storm让大数据分析变得轻松加愉快。

当今世界,公司的日常运营经常会生成TB级别的数据。数据来源囊括了互联网装置可以捕获的任何类型数据,网站、社交媒体、交易型商业数据以及其它商业环境中创建的数据。考虑到数据的生成量,实时处理成为了许多机构需要面对的首要挑战。我们经常用的一个非常有效的开源实时计算工具就是Storm —— Twitter开发,通常被比作“实时的Hadoop”。然而Storm远比Hadoop来的简单,因为用它处理大数据不会带来新老技术的交替。

Shruthi Kumar、Siddharth Patankar共同效力于Infosys,分别从事技术分析和研发工作。本文详述了Storm的使用方法,例子中的项目名称为“超速报警系统(Speeding Alert System)”。我们想实现的功能是:实时分析过往车辆的数据,一旦车辆数据超过预设的临界值 —— 便触发一个trigger并把相关的数据存入数据库。

1.  Storm是什么

     全量数据处理使用的大多是鼎鼎大名的hadoop或者hive,作为一个批处理系统,hadoop以其吞吐量大、自动容错等优点,在海量数据处理上得到了广泛的使用。

      Hadoop下的Map/Reduce框架对于数据的处理流程是

      1、 将要处理的数据上传到Hadoop的文件系统HDFS中。

      2、 Map阶段

             a)   Master对Map的预处理:对于大量的数据进行切分,划分为M个16~64M的数据分片(可通过参数自定义分片大小)

             b)   调用Mapper函数:Master为Worker分配Map任务,每个分片都对应一个Worker进行处理。各个Worker读取并调用用户定义的Mapper函数    处理数据,并将结果存入HDFS,返回存储位置给Master。

一个Worker在Map阶段完成时,在HDFS中,生成一个排好序的Key-values组成的文件。并将位置信息汇报给Master。

      3、 Reduce阶段

             a)   Master对Reduce的预处理:Master为Worker分配Reduce任务,他会将所有Mapper产生的数据进行映射,将相同key的任务分配给某个Worker。

             b)   调用Reduce函数:各个Worker将分配到的数据集进行排序(使用工具类Merg),并调用用户自定义的Reduce函数,并将结果写入HDFS。

每个Worker的Reduce任务完成后,都会在HDFS中生成一个输出文件。Hadoop并不将这些文件合并,因为这些文件往往会作为另一个Map/reduce程序的输入。

         以上的流程,粗略概括,就是从HDFS中获取数据,将其按照大小分片,进行分布式处理,最终输出结果。从流程来看,Hadoop框架进行数据处理有以下要求:

1、 数据已经存在在HDFS当中。

2、 数据间是少关联的。各个任务执行器在执行负责的数据时,无需考虑对其他数据的影响,数据之间应尽可能是无联系、不会影响的。

使用Hadoop,适合大批量的数据处理,这是他所擅长的。由于基于Map/Reduce这种单级的数据处理模型进行,因此,如果数据间的关联系较大,需要进行数据的多级交互处理(某个阶段的处理数据依赖于上一个阶段),需要进行多次map/reduce。又由于map/reduce每次执行都需要遍历整个数据集,对于数据的实时计算并不合适,于是有了storm。

      对比Hadoop的批处理,Storm是个实时的、分布式以及具备高容错的计算系统。同Hadoop一样Storm也可以处理大批量的数据,然而Storm在保证高可靠性的前提下还可以让处理进行的更加实时;也就是说,所有的信息都会被处理。Storm同样还具备容错和分布计算这些特性,这就让Storm可以扩展到不同的机器上进行大批量的数据处理。他同样还有以下的这些特性:

  • 易于扩展:对于扩展,伴随着业务的发展,我们的数据量、计算量可能会越来越大,所以希望这个系统是可扩展的。你只需要添加机器和改变对应的topology(拓扑)设置。Storm使用Hadoop Zookeeper进行集群协调,这样可以充分的保证大型集群的良好运行。
  • 每条信息的处理都可以得到保证。
  • Storm集群管理简易。
  • Storm的容错机能:一旦topology递交,Storm会一直运行它直到topology被废除或者被关闭。而在执行中出现错误时,也会由Storm重新分配任务。这是分布式系统中通用问题。一个节点挂了不能影响我的应用。
  • 低延迟。都说了是实时计算系统了,延迟是一定要低的。
  • 尽管通常使用Java,Storm中的topology可以用任何语言设计。

       在线实时流处理模型

       对于处理大批量数据的Map/reduce程序,在任务完成之后就停止了,但Storm是用于实时计算的,所以,相应的处理程序会一直执行(等待任务,有任务则执行)直至手动停止。

       对于Storm,他是实时处理模型,与hadoop的不同是,他是针对在线业务而存在的计算平台,如统计某用户的交易量、生成为某个用户的推荐列表等实时性高的需求。他是一个“流处理”框架。何谓流处理?storm将数据以Stream的方式,并按照Topology的顺序,依次处理并最终生成结果。


当然为了更好的理解文章,你首先需要安装和设置Storm。需要通过以下几个简单的步骤:

  • 从Storm官方下载Storm安装文件
  • 将bin/directory解压到你的PATH上,并保证bin/storm脚本是可执行的。
      尽管 Storm 是使用 Clojure 语言开发的,您仍然可以在 Storm 中使用几乎任何语言编写应用程序。所需的只是一个连接到 Storm 的架构的适配器。已存在针对 Scala、JRuby、Perl 和 PHP 的适配器,但是还有支持流式传输到 Storm 拓扑结构中的结构化查询语言适配器。

2.  Storm的组件

       Storm集群和Hadoop集群表面上看很类似。但是Hadoop上运行的是MapReduce jobs,而在Storm上运行的是拓扑(topology),这两者之间是非常不一样的。一个关键的区别是: 一个MapReduce job最终会结束, 而一个topology永远会运行(除非你手动kill掉)。

       Storm集群主要由一个主节点(Nimbus后台程序)和一群工作节点(worker node)Supervisor的节点组成,通过 Zookeeper进行协调。Nimbus类似Hadoop里面的JobTracker。Nimbus负责在集群里面分发代码,分配计算任务给机器, 并且监控状态。

      每一个工作节点上面运行一个叫做Supervisor的节点。Supervisor会监听分配给它那台机器的工作,根据需要启动/关闭工作进程。每一个工作进程执行一个topology的一个子集;一个运行的topology由运行在很多机器上的很多工作进程组成。



1、 Nimbus主节点:

     主节点通常运行一个后台程序 —— Nimbus,用于响应分布在集群中的节点,分配任务和监测故障。这个很类似于Hadoop中的Job Tracker。

2、Supervisor工作节点:

      工作节点同样会运行一个后台程序 —— Supervisor,用于收听工作指派并基于要求运行工作进程。每个工作节点都是topology中一个子集的实现。而Nimbus和Supervisor之间的协调则通过Zookeeper系统或者集群。

3、Zookeeper

     Zookeeper是完成Supervisor和Nimbus之间协调的服务。而应用程序实现实时的逻辑则被封装进Storm中的“topology”。topology则是一组由Spouts(数据源)和Bolts(数据操作)通过Stream Groupings进行连接的图。下面对出现的术语进行更深刻的解析。

4、Worker:

       运行具体处理组件逻辑的进程。

5、Task:

       worker中每一个spout/bolt的线程称为一个task. 在storm0.8之后,task不再与物理线程对应,同一个spout/bolt的task可能会共享一个物理线程,该线程称为executor。

6、Topology(拓扑):

       storm中运行的一个实时应用程序,因为各个组件间的消息流动形成逻辑上的一个拓扑结构。一个topology是spouts和bolts组成的图, 通过stream groupings将图中的spouts和bolts连接起来,如下图:

      

     一个topology会一直运行直到你手动kill掉,Storm自动重新分配执行失败的任务, 并且Storm可以保证你不会有数据丢失(如果开启了高可靠性的话)。如果一些机器意外停机它上面的所有任务会被转移到其他机器上。

运行一个topology很简单。首先,把你所有的代码以及所依赖的jar打进一个jar包。然后运行类似下面的这个命令:

      storm jar all-my-code.jar backtype.storm.MyTopology arg1 arg2

这个命令会运行主类: backtype.strom.MyTopology, 参数是arg1arg2。这个类的main函数定义这个topology并且把它提交给Nimbus。storm jar负责连接到Nimbus并且上传jar包。

Topology的定义是一个Thrift结构,并且Nimbus就是一个Thrift服务, 你可以提交由任何语言创建的topology。上面的方面是用JVM-based语言提交的最简单的方法。


7、Spout:

       消息源spout是Storm里面一个topology里面的消息生产者。简而言之,Spout从来源处读取数据并放入topology。Spout分成可靠和不可靠两种;当Storm接收失败时,可靠的Spout会对tuple(元组,数据项组成的列表)进行重发;而不可靠的Spout不会考虑接收成功与否只发射一次。

       消息源可以发射多条消息流stream。使用OutputFieldsDeclarer.declareStream来定义多个stream,然后使用SpoutOutputCollector来发射指定的stream。

      而Spout中最主要的方法就是nextTuple(),该方法会发射一个新的tuple到topology,如果没有新tuple发射则会简单的返回。

       要注意的是nextTuple方法不能阻塞,因为storm在同一个线程上面调用所有消息源spout的方法。

另外两个比较重要的spout方法是ack和fail。storm在检测到一个tuple被整个topology成功处理的时候调用ack,否则调用fail。storm只对可靠的spout调用ack和fail。

8、Bolt:

     Topology中所有的处理都由Bolt完成。所有的消息处理逻辑被封装在bolts里面Bolt可以完成任何事,比如:连接的过滤、聚合、访问文件/数据库、等等。

        Bolt从Spout中接收数据并进行处理,如果遇到复杂流的处理也可能将tuple发送给另一个Bolt进行处理。即需要经过很多blots。比如算出一堆图片里面被转发最多的图片就至少需要两步:第一步算出每个图片的转发数量。第二步找出转发最多的前10个图片。(如果要把这个过程做得更具有扩展性那么可能需要更多的步骤)。

        Bolts可以发射多条消息流, 使用OutputFieldsDeclarer.declareStream定义stream,使用OutputCollector.emit来选择要发射的stream。

      而Bolt中最重要的方法是execute(),以新的tuple作为参数接收。不管是Spout还是Bolt,如果将tuple发射成多个流,这些流都可以通过declareStream()来声明。

     bolts使用OutputCollector来发射tuple,bolts必须要为它处理的每一个tuple调用OutputCollector的ack方法,以通知Storm这个tuple被处理完成了,从而通知这个tuple的发射者spouts。 一般的流程是: bolts处理一个输入tuple,  发射0个或者多个tuple, 然后调用ack通知storm自己已经处理过这个tuple了。storm提供了一个IBasicBolt会自动调用ack。

9、Tuple:

       一次消息传递的基本单元。本来应该是一个key-value的map,但是由于各个组件间传递的tuple的字段名称已经事先定义好,所以tuple中只要按序填入各个value就行了,所以就是一个value list.

10、Stream:

        源源不断传递的tuple就组成了stream。消息流stream是storm里的关键抽象。一个消息流是一个没有边界的tuple序列, 而这些tuple序列会以一种分布式的方式并行地创建和处理。通过对stream中tuple序列中每个字段命名来定义stream。在默认的情况下,tuple的字段类型可以是:integer,long,short, byte,string,double,float,boolean和byte array。你也可以自定义类型(只要实现相应的序列化器)。

     每个消息流在定义的时候会被分配给一个id,因为单向消息流使用的相当普遍, OutputFieldsDeclarer定义了一些方法让你可以定义一个stream而不用指定这个id。在这种情况下这个stream会分配个值为‘default’默认的id 。

      Storm提供的最基本的处理stream的原语是spout和bolt。你可以实现spout和bolt提供的接口来处理你的业务逻辑。

      

11、Stream Groupings:

Stream Grouping定义了一个流在Bolt任务间该如何被切分。这里有Storm提供的6个Stream Grouping类型:

1). 随机分组(Shuffle grouping):随机分发tuple到Bolt的任务,保证每个任务获得相等数量的tuple。

2). 字段分组(Fields grouping):根据指定字段分割数据流,并分组。例如,根据“user-id”字段,相同“user-id”的元组总是分发到同一个任务,不同“user-id”的元组可能分发到不同的任务。

3). 全部分组(All grouping):tuple被复制到bolt的所有任务。这种类型需要谨慎使用。

4). 全局分组(Global grouping):全部流都分配到bolt的同一个任务。明确地说,是分配给ID最小的那个task。

5). 无分组(None grouping):你不需要关心流是如何分组。目前,无分组等效于随机分组。但最终,Storm将把无分组的Bolts放到Bolts或Spouts订阅它们的同一线程去执行(如果可能)。

6). 直接分组(Direct grouping):这是一个特别的分组类型。元组生产者决定tuple由哪个元组处理者任务接收。

当然还可以实现CustomStreamGroupimg接口来定制自己需要的分组。

storm 和hadoop的对比来了解storm中的基本概念。

Hadoop Storm
系统角色 JobTracker Nimbus
TaskTracker Supervisor
Child Worker
应用名称 Job Topology
组件接口 Mapper/Reducer Spout/Bolt


3.  Storm应用场景

       Storm 与其他大数据解决方案的不同之处在于它的处理方式。Hadoop 在本质上是一个批处理系统。数据被引入 Hadoop 文件系统 (HDFS) 并分发到各个节点进行处理。当处理完成时,结果数据返回到 HDFS 供始发者使用。Storm 支持创建拓扑结构来转换没有终点的数据流。不同于 Hadoop 作业,这些转换从不停止,它们会持续处理到达的数据。

Twitter列举了Storm的三大类应用:

1. 信息流处理{Stream processing}
Storm可用来实时处理新数据和更新数据库,兼具容错性和可扩展性。Storm可以用来处理源源不断流进来的消息,处理之后将结果写入到某个存储中去。

2. 连续计算{Continuous computation}
Storm可进行连续查询并把结果即时反馈给客户端。比如把Twitter上的热门话题发送到浏览器中。

3. 分布式远程程序调用{Distributed RPC}
       Storm可用来并行处理密集查询。Storm的拓扑结构是一个等待调用信息的分布函数,当它收到一条调用信息后,会对查询进行计算,并返回查询结果。举个例子Distributed RPC可以做并行搜索或者处理大集合的数据。

        通过配置drpc服务器,将storm的topology发布为drpc服务。客户端程序可以调用drpc服务将数据发送到storm集群中,并接收处理结果的反馈。这种方式需要drpc服务器进行转发,其中drpc服务器底层通过thrift实现。适合的业务场景主要是实时计算。并且扩展性良好,可以增加每个节点的工作worker数量来动态扩展。


4.  项目实施,构建Topology

      当下情况我们需要给Spout和Bolt设计一种能够处理大量数据(日志文件)的topology,当一个特定数据值超过预设的临界值时促发警报。使用Storm的topology,逐行读入日志文件并且监视输入数据。在Storm组件方面,Spout负责读入输入数据。它不仅从现有的文件中读入数据,同时还监视着新文件。文件一旦被修改Spout会读入新的版本并且覆盖之前的tuple(可以被Bolt读入的格式),将tuple发射给Bolt进行临界分析,这样就可以发现所有可能超临界的记录。

下一节将对用例进行详细介绍。

临界分析

这一节,将主要聚焦于临界值的两种分析类型:瞬间临界(instant thershold)和时间序列临界(time series threshold)。

  • 瞬间临界值监测:一个字段的值在那个瞬间超过了预设的临界值,如果条件符合的话则触发一个trigger。举个例子当车辆超越80公里每小时,则触发trigger。
  • 时间序列临界监测:字段的值在一个给定的时间段内超过了预设的临界值,如果条件符合则触发一个触发器。比如:在5分钟类,时速超过80KM两次及以上的车辆。

Listing One显示了我们将使用的一个类型日志,其中包含的车辆数据信息有:车牌号、车辆行驶的速度以及数据获取的位置。

AB 123 60 North city
BC 123 70 South city
CD 234 40 South city
DE 123 40 East  city
EF 123 90 South city
GH 123 50 West  city

这里将创建一个对应的XML文件,这将包含引入数据的模式。这个XML将用于日志文件的解析。XML的设计模式和对应的说明请见下表。


XML文件和日志文件都存放在Spout可以随时监测的目录下,用以关注文件的实时更新。而这个用例中的topology请见下图。


Figure 1:Storm中建立的topology,用以实现数据实时处理

如图所示:FilelistenerSpout接收输入日志并进行逐行的读入,接着将数据发射给ThresoldCalculatorBolt进行更深一步的临界值处理。一旦处理完成,被计算行的数据将发送给DBWriterBolt,然后由DBWriterBolt存入给数据库。下面将对这个过程的实现进行详细的解析。

Spout的实现

Spout以日志文件和XML描述文件作为接收对象。XML文件包含了与日志一致的设计模式。不妨设想一下一个示例日志文件,包含了车辆的车牌号、行驶速度、以及数据的捕获位置。(看下图)


Figure2:数据从日志文件到Spout的流程图

Listing Two显示了tuple对应的XML,其中指定了字段、将日志文件切割成字段的定界符以及字段的类型。XML文件以及数据都被保存到Spout指定的路径。

Listing Two:用以描述日志文件的XML文件。

<TUPLEINFO> 
<FIELDLIST> 
<FIELD> 
<COLUMNNAME>vehicle_number</COLUMNNAME> 
<COLUMNTYPE>string</COLUMNTYPE> 
</FIELD> 
 
<FIELD>
<COLUMNNAME>speed</COLUMNNAME> 
<COLUMNTYPE>int</COLUMNTYPE> 
</FIELD> 
 
<FIELD> 
<COLUMNNAME>location</COLUMNNAME> 
<COLUMNTYPE>string</COLUMNTYPE> 
</FIELD> 
</FIELDLIST> 
<DELIMITER>,</DELIMITER> 
</TUPLEINFO>   

通过构造函数及它的参数Directory、PathSpout和TupleInfo对象创建Spout对象。TupleInfo储存了日志文件的字段、定界符、字段的类型这些很必要的信息。这个对象通过XSTream序列化XML时建立。

Spout的实现步骤:

  • 对文件的改变进行分开的监听,并监视目录下有无新日志文件添加。
  • 在数据得到了字段的说明后,将其转换成tuple。
  • 声明Spout和Bolt之间的分组,并决定tuple发送给Bolt的途径。

Spout的具体编码在Listing Three中显示。

Listing Three:Spout中open、nextTuple和delcareOutputFields方法的逻辑。

public void open( Map conf, TopologyContext context,SpoutOutputCollector collector )   
{   
           _collector = collector;   
         try   
         {   
         fileReader  =  new BufferedReader(new FileReader(new File(file)));  
         }  
         catch (FileNotFoundException e)  
         {  
         System.exit(1);   
         }  
}                                                          
 
public void nextTuple()  
{  
         protected void ListenFile(File file)  
         {  
         Utils.sleep(2000);  
         RandomAccessFile access = null;  
         String line = null;   
            try   
            {  
                while ((line = access.readLine()) != null)  
                {  
                    if (line !=null)  
                    {   
                         String[] fields=null;  
                          if (tupleInfo.getDelimiter().equals("|"))  fields = line.split("\\"+tupleInfo.getDelimiter());   
                          else   
                          fields = line.split  (tupleInfo.getDelimiter());   
                          if (tupleInfo.getFieldList().size() == fields.length)  _collector.emit(new Values(fields));  
                    }  
               }  
            }  
            catch (IOException ex){ }  
            }  
}  
 
public void declareOutputFields(OutputFieldsDeclarer declarer)  
{  
      String[] fieldsArr = new String [tupleInfo.getFieldList().size()];  
      for(int i=0; i<tupleInfo.getFieldList().size(); i++)  
      {  
              fieldsArr[i] = tupleInfo.getFieldList().get(i).getColumnName();  
      }  
declarer.declare(new Fields(fieldsArr));  
}      

declareOutputFileds()决定了tuple发射的格式,这样的话Bolt就可以用类似的方法将tuple译码。Spout持续对日志文件的数据的变更进行监听,一旦有添加Spout就会进行读入并且发送给Bolt进行处理。

Bolt的实现

Spout的输出结果将给予Bolt进行更深一步的处理。经过对用例的思考,我们的topology中需要如Figure 3中的两个Bolt。

Figure 3:Spout到Bolt的数据流程。

ThresholdCalculatorBolt

Spout将tuple发出,由ThresholdCalculatorBolt接收并进行临界值处理。在这里,它将接收好几项输入进行检查;分别是:

临界值检查

  • 临界值栏数检查(拆分成字段的数目)
  • 临界值数据类型(拆分后字段的类型)
  • 临界值出现的频数
  • 临界值时间段检查

Listing Four中的类,定义用来保存这些值。

Listing Four:ThresholdInfo类

public class ThresholdInfo implementsSerializable  
 
{    
        private String action;   
        private String rule;   
        private Object thresholdValue;  
        private int thresholdColNumber;   
        private Integer timeWindow;   
        private int frequencyOfOccurence;   
}   
基于字段中提供的值,临界值检查将被Listing Five中的execute()方法执行。代码大部分的功能是解析和接收值的检测。

Listing Five:临界值检测代码段

public void execute(Tuple tuple, BasicOutputCollector collector)   
{  
    if(tuple!=null)   
    {  
        List<Object> inputTupleList = (List<Object>) tuple.getValues();  
        int thresholdColNum = thresholdInfo.getThresholdColNumber();   
        Object thresholdValue = thresholdInfo.getThresholdValue();   
        String thresholdDataType = tupleInfo.getFieldList().get(thresholdColNum-1).getColumnType();   
        Integer timeWindow = thresholdInfo.getTimeWindow();  
         int frequency = thresholdInfo.getFrequencyOfOccurence();  
         if(thresholdDataType.equalsIgnoreCase("string"))  
         {  
             String valueToCheck = inputTupleList.get(thresholdColNum-1).toString();  
             String frequencyChkOp = thresholdInfo.getAction();  
             if(timeWindow!=null)  
             {  
                 long curTime = System.currentTimeMillis();  
                 long diffInMinutes = (curTime-startTime)/(1000);  
                 if(diffInMinutes>=timeWindow)  
                 {  
                     if(frequencyChkOp.equals("=="))  
                     {  
                          if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
                          {  
                              count.incrementAndGet();  
                              if(count.get() > frequency)  
                                  splitAndEmit(inputTupleList,collector);  
                          }  
                     }  
                     else if(frequencyChkOp.equals("!="))  
                     {  
                         if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
                         {  
                              count.incrementAndGet();  
                              if(count.get() > frequency)  
                                  splitAndEmit(inputTupleList,collector);  
                          }  
                      }  
                      else                         System.out.println("Operator not supported");   
                  }  
              }  
              else 
              {  
                  if(frequencyChkOp.equals("=="))  
                  {  
                      if(valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
                      {  
                          count.incrementAndGet();  
                          if(count.get() > frequency)  
                              splitAndEmit(inputTupleList,collector);  
                          }  
                  }  
                  else if(frequencyChkOp.equals("!="))  
                  {  
                       if(!valueToCheck.equalsIgnoreCase(thresholdValue.toString()))  
                       {  
                           count.incrementAndGet();  
                           if(count.get() > frequency)  
                               splitAndEmit(inputTupleList,collector);  
                          }  
                   }  
               }  
            }  
            else if(thresholdDataType.equalsIgnoreCase("int") ||                     thresholdDataType.equalsIgnoreCase("double") ||                     thresholdDataType.equalsIgnoreCase("float") ||                     thresholdDataType.equalsIgnoreCase("long") ||                     thresholdDataType.equalsIgnoreCase("short"))  
            {  
                String frequencyChkOp = thresholdInfo.getAction();  
                if(timeWindow!=null)  
                {  
                     long valueToCheck =                          Long.parseLong(inputTupleList.get(thresholdColNum-1).toString());  
                     long curTime = System.currentTimeMillis();  
                     long diffInMinutes = (curTime-startTime)/(1000);  
                     System.out.println("Difference in minutes="+diffInMinutes);  
                     if(diffInMinutes>=timeWindow)  
                     {  
                          if(frequencyChkOp.equals("<"))  
                          {  
                              if(valueToCheck < Double.parseDouble(thresholdValue.toString()))  
                              {  
                                   count.incrementAndGet();  
                                   if(count.get() > frequency)  
                                       splitAndEmit(inputTupleList,collector);  
                              }  
                          }  
                          else if(frequencyChkOp.equals(">"))  
                          {  
                               if(valueToCheck > Double.parseDouble(thresholdValue.toString()))  
                                {  
                                   count.incrementAndGet();  
                                   if(count.get() > frequency)  
                                       splitAndEmit(inputTupleList,collector);  
                               }  
                           }  
                           else if(frequencyChkOp.equals("=="))  
                           {  
                              if(valueToCheck == Double.parseDouble(thresholdValue.toString()))  
                              {  
                                  count.incrementAndGet();  
                                  if(count.get() > frequency)  
                                      splitAndEmit(inputTupleList,collector);  
                               }  
                           }  
                           else if(frequencyChkOp.equals("!="))  
                           {  
    . . .  
                            }  
                       }  
             }  
      else 
          splitAndEmit(null,collector);  
      }  
      else 
     {  
           System.err.println("Emitting null in bolt");  
           splitAndEmit(null,collector);  
    }  
} 


经由Bolt发送的的tuple将会传递到下一个对应的Bolt,在我们的用例中是DBWriterBolt。

DBWriterBolt

经过处理的tuple必须被持久化以便于触发tigger或者更深层次的使用。DBWiterBolt做了这个持久化的工作并把tuple存入了数据库。表的建立由prepare()函数完成,这也将是topology调用的第一个方法。方法的编码如Listing Six所示。

Listing Six:建表编码。

public void prepare( Map StormConf, TopologyContext context )   
{         
    try   
    {  
        Class.forName(dbClass);  
    }   
    catch (ClassNotFoundException e)   
    {  
        System.out.println("Driver not found");  
        e.printStackTrace();  
    }  
   
    try   
    {  
       connection driverManager.getConnection(   
           "jdbc:mysql://"+databaseIP+":"+databasePort+"/"+databaseName, userName, pwd);  
       connection.prepareStatement("DROP TABLE IF EXISTS "+tableName).execute();  
   
       StringBuilder createQuery = new StringBuilder(  
           "CREATE TABLE IF NOT EXISTS "+tableName+"(");  
       for(Field fields : tupleInfo.getFieldList())  
       {  
           if(fields.getColumnType().equalsIgnoreCase("String"))  
               createQuery.append(fields.getColumnName()+" VARCHAR(500),");  
           else 
               createQuery.append(fields.getColumnName()+" "+fields.getColumnType()+",");  
       }  
       createQuery.append("thresholdTimeStamp timestamp)");  
       connection.prepareStatement(createQuery.toString()).execute();  
   
       // Insert Query  
       StringBuilder insertQuery = new StringBuilder("INSERT INTO "+tableName+"(");  
       String tempCreateQuery = new String();  
       for(Field fields : tupleInfo.getFieldList())  
       {  
            insertQuery.append(fields.getColumnName()+",");  
       }  
       insertQuery.append("thresholdTimeStamp").append(") values (");  
       for(Field fields : tupleInfo.getFieldList())  
       {  
           insertQuery.append("?,");  
       }  
   
       insertQuery.append("?)");  
       prepStatement = connection.prepareStatement(insertQuery.toString());  
    }  
    catch (SQLException e)   
    {         
        e.printStackTrace();  
    }         
}  


数据分批次的插入数据库。插入的逻辑由Listting Seven中的execute()方法提供。大部分的编码都是用来实现可能存在不同类型输入的解析。

Listing Seven:数据插入的代码部分。

public void execute(Tuple tuple, BasicOutputCollector collector)   
{  
    batchExecuted=false;  
    if(tuple!=null)  
    {  
       List<Object> inputTupleList = (List<Object>) tuple.getValues();  
       int dbIndex=0;  
       for(int i=0;i<tupleInfo.getFieldList().size();i++)  
       {  
           Field field = tupleInfo.getFieldList().get(i);  
           try {  
               dbIndex = i+1;  
               if(field.getColumnType().equalsIgnoreCase("String"))               
                   prepStatement.setString(dbIndex, inputTupleList.get(i).toString());  
               else if(field.getColumnType().equalsIgnoreCase("int"))  
                   prepStatement.setInt(dbIndex,  
                       Integer.parseInt(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("long"))  
                   prepStatement.setLong(dbIndex,   
                       Long.parseLong(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("float"))  
                   prepStatement.setFloat(dbIndex,   
                       Float.parseFloat(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("double"))  
                   prepStatement.setDouble(dbIndex,   
                       Double.parseDouble(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("short"))  
                   prepStatement.setShort(dbIndex,   
                       Short.parseShort(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("boolean"))  
                   prepStatement.setBoolean(dbIndex,   
                       Boolean.parseBoolean(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("byte"))  
                   prepStatement.setByte(dbIndex,   
                       Byte.parseByte(inputTupleList.get(i).toString()));  
               else if(field.getColumnType().equalsIgnoreCase("Date"))  
               {  
                  Date dateToAdd=null;  
                  if (!(inputTupleList.get(i) instanceof Date))    
                  {    
                       DateFormat df = new SimpleDateFormat("yyyy-MM-dd hh:mm:ss");  
                       try   
                       {  
                           dateToAdd = df.parse(inputTupleList.get(i).toString());  
                       }  
                       catch (ParseException e)   
                       {  
                           System.err.println("Data type not valid");  
                       }  
                   }    
                   else 
                   {  
            dateToAdd = (Date)inputTupleList.get(i);  
            java.sql.Date sqlDate = new java.sql.Date(dateToAdd.getTime());  
            prepStatement.setDate(dbIndex, sqlDate);  
            }     
            }   
        catch (SQLException e)   
        {  
             e.printStackTrace();  
        }  
    }  
    Date now = new Date();            
    try 
    {  
        prepStatement.setTimestamp(dbIndex+1, new java.sql.Timestamp(now.getTime()));  
        prepStatement.addBatch();  
        counter.incrementAndGet();  
        if (counter.get()== batchSize)   
        executeBatch();  
    }   
    catch (SQLException e1)   
    {  
        e1.printStackTrace();  
    }             
   }  
   else 
   {  
        long curTime = System.currentTimeMillis();  
       long diffInSeconds = (curTime-startTime)/(60*1000);  
       if(counter.get()<batchSize && diffInSeconds>batchTimeWindowInSeconds)  
       {  
            try {  
                executeBatch();  
                startTime = System.currentTimeMillis();  
            }  
            catch (SQLException e) {  
                 e.printStackTrace();  
            }  
       }  
   }  
}  
   
public void executeBatch() throws SQLException  
{  
    batchExecuted=true;  
    prepStatement.executeBatch();  
    counter = new AtomicInteger(0);  
} 


一旦Spout和Bolt准备就绪(等待被执行),topology生成器将会建立topology并准备执行。下面就来看一下执行步骤。

在本地集群上运行和测试topology

  • 通过TopologyBuilder建立topology。
  • 使用Storm Submitter,将topology递交给集群。以topology的名字、配置和topology的对象作为参数。
  • 提交topology。

Listing Eight:建立和执行topology。

public class StormMain  
{  
     public static void main(String[] args) throws AlreadyAliveException,   
                                                   InvalidTopologyException,   
                                                   InterruptedException   
     {  
          ParallelFileSpout parallelFileSpout = new ParallelFileSpout();  
          ThresholdBolt thresholdBolt = new ThresholdBolt();  
          DBWriterBolt dbWriterBolt = new DBWriterBolt();  
          TopologyBuilder builder = new TopologyBuilder();  
          builder.setSpout("spout", parallelFileSpout, 1);  
          builder.setBolt("thresholdBolt", thresholdBolt,1).shuffleGrouping("spout");  
          builder.setBolt("dbWriterBolt",dbWriterBolt,1).shuffleGrouping("thresholdBolt");  
          if(this.argsMain!=null && this.argsMain.length > 0)   
          {  
              conf.setNumWorkers(1);  
              StormSubmitter.submitTopology(   
                   this.argsMain[0], conf, builder.createTopology());  
          }  
          else 
          {      
              Config conf = new Config();  
              conf.setDebug(true);  
              conf.setMaxTaskParallelism(3);  
              LocalCluster cluster = new LocalCluster();  
              cluster.submitTopology(  
              "Threshold_Test", conf, builder.createTopology());  
          }  
     }  
} 

topology被建立后将被提交到本地集群。一旦topology被提交,除非被取缔或者集群关闭,它将一直保持运行不需要做任何的修改。这也是Storm的另一大特色之一。

这个简单的例子体现了当你掌握了topology、spout和bolt的概念,将可以轻松的使用Storm进行实时处理。如果你既想处理大数据又不想遍历Hadoop的话,不难发现使用Storm将是个很好的选择。

5.  storm常见问题解答

一、我有一个数据文件,或者我有一个系统里面有数据,怎么导入storm做计算?

你需要实现一个Spout,Spout负责将数据emit到storm系统里,交给bolts计算。怎么实现spout可以参考官方的kestrel spout实现:
https://github.com/nathanmarz/storm-kestrel

如果你的数据源不支持事务性消费,那么就无法得到storm提供的可靠处理的保证,也没必要实现ISpout接口中的ack和fail方法。

二、Storm为了保证tuple的可靠处理,需要保存tuple信息,这会不会导致内存OOM?

Storm为了保证tuple的可靠处理,acker会保存该节点创建的tuple id的xor值,这称为ack value,那么每ack一次,就将tuple id和ack value做异或(xor)。当所有产生的tuple都被ack的时候, ack value一定为0。这是个很简单的策略,对于每一个tuple也只要占用约20个字节的内存。对于100万tuple,也才20M左右。关于可靠处理看这个:
https://github.com/nathanmarz/storm/wiki/Guaranteeing-message-processing

三、Storm计算后的结果保存在哪里?可以保存在外部存储吗?

Storm不处理计算结果的保存,这是应用代码需要负责的事情,如果数据不大,你可以简单地保存在内存里,也可以每次都更新数据库,也可以采用NoSQL存储。storm并没有像s4那样提供一个Persist API,根据时间或者容量来做存储输出。这部分事情完全交给用户。

数据存储之后的展现,也是你需要自己处理的,storm UI只提供对topology的监控和统计。

四、Storm怎么处理重复的tuple?

因为Storm要保证tuple的可靠处理,当tuple处理失败或者超时的时候,spout会fail并重新发送该tuple,那么就会有tuple重复计算的问题。这个问题是很难解决的,storm也没有提供机制帮助你解决。一些可行的策略:
(1)不处理,这也算是种策略。因为实时计算通常并不要求很高的精确度,后续的批处理计算会更正实时计算的误差。
(2)使用第三方集中存储来过滤,比如利用mysql,memcached或者redis根据逻辑主键来去重。
(3)使用bloom filter做过滤,简单高效。

五、Storm的动态增删节点

我在storm和s4里比较里谈到的动态增删节点,是指storm可以动态地添加和减少supervisor节点。对于减少节点来说,被移除的supervisor上的worker会被nimbus重新负载均衡到其他supervisor节点上。在storm 0.6.1以前的版本,增加supervisor节点不会影响现有的topology,也就是现有的topology不会重新负载均衡到新的节点上,在扩展集群的时候很不方便,需要重新提交topology。因此我在storm的邮件列表里提了这个问题,storm的开发者nathanmarz创建了一个issue 54并在0.6.1提供了rebalance命令来让正在运行的topology重新负载均衡,具体见:
https://github.com/nathanmarz/storm/issues/54
和0.6.1的变更:
http://groups.google.com/group/storm-user/browse_thread/thread/24a8fce0b2e53246

storm并不提供机制来动态调整worker和task数目。

六、Storm UI里spout统计的complete latency的具体含义是什么?为什么emit的数目会是acked的两倍?
这个事实上是storm邮件列表里的一个问题。Storm作者marz的解答:

The complete latency is the time from the spout emitting a tuple to that
tuple being acked on the spout
. So it tracks the time for the whole tuple
tree to be processed.

If you dive into the spout component in the UI, you'll see that a lot of
the emitted/transferred is on the __ack* stream. This is the spout
communicating with the ackers which take care of tracking the tuple trees. 


简单地说,complete latency表示了tuple从emit到被acked经过的时间,可以认为是tuple以及该tuple的后续子孙(形成一棵树)整个处理时间。其次spout的emit和transfered还统计了spout和acker之间内部的通信信息,比如对于可靠处理的spout来说,会在emit的时候同时发送一个_ack_init给acker,记录tuple id到task id的映射,以便ack的时候能找到正确的acker task。


6.  其他开源的大数据解决方案

自 Google 在 2004 年推出 MapReduce 范式以来,已诞生了多个使用原始 MapReduce 范式(或拥有该范式的质量)的解决方案。Google 对 MapReduce 的最初应用是建立万维网的索引。尽管此应用程序仍然很流行,但这个简单模型解决的问题也正在增多。

表 1 提供了一个可用开源大数据解决方案的列表,包括传统的批处理和流式处理应用程序。在将 Storm 引入开源之前将近一年的时间里,Yahoo! 的 S4 分布式流计算平台已向 Apache 开源。S4 于 2010 年 10 月发布,它提供了一个高性能计算 (HPC) 平台,向应用程序开发人员隐藏了并行处理的复杂性。S4 实现了一个可扩展的、分散化的集群架构,并纳入了部分容错功能。


表 1. 开源大数据解决方案
解决方案 开发商 类型 描述
Storm Twitter 流式处理 Twitter 的新流式大数据分析解决方案
S4 Yahoo! 流式处理 来自 Yahoo! 的分布式流计算平台
Hadoop Apache 批处理 MapReduce 范式的第一个开源实现
Spark UC Berkeley AMPLab 批处理 支持内存中数据集和恢复能力的最新分析平台
Disco Nokia 批处理 Nokia 的分布式 MapReduce 框架
HPCC LexisNexis 批处理 HPC 大数据集群



csdn(编译/仲浩 王旭东/审校):http://www.csdn.net/article/2012-12-24/2813117-storm-realtime-big-data-analysis

原文链接:Easy, Real-Time Big Data Analysis Using Storm 

2019-03-25 14:45:31 Jacksun_huang 阅读数 3221

前段时间的主要工作是开发统计系统, 统计公司产品的安装量和回访量,统计数据则由客户端调用C接口写入mysql数据库,即我们只需要分析客户端写入的原始数据即可。下面是对这个项目的一个总结:

系统评估

  • 1、预估当前每天的回访量有大几百万,随着其它产品的不断推广, 要统计的数据可能越来越多。
  • 2、统计的数据有比较强的约束关系。对于一条安装数据,必须判断之前唯一安装表中是否存在该记录, 若存在则再根据版本判断升级或重装,否则为新装数据,回访数据类似逻辑。所以,如果要出按小时统计数据,则必须把前一个小时数据处理完之后才可以处理后面的数据;前一天的数据处理完之后才可以处理后一天的数据。
  • 3、团队中都擅长的是PHP。hadoop或其他大数据处理方式经验薄弱,面临学习成本和一些未知因素。

所以最终还是选择用PHP+Mysql来统计,前期应该可以撑一撑。

整体思路

接下来对每个步骤进行梳理:

  • 1、C接口直接写数据到安装表和回访表,原始数据的表采用按年分表,按天分区。原始数据量比较大,也不适合PHP写入。
  • 2、转移数据。原始表记录数比较多,为了尽可能的减少与原始表的耦合,这里做了一个转移的动作,将原始表的最新安装、回访数据转移到近期安装、回访表中。近期表只保留3天的数据,即近期回访表的数据会维持在2000w左右。这个动作是否可以用触发器来完成?这里没有采用触发器,个人始终认为触发器和业务依赖太紧,当原始数据需要调整时触发器也要相应调整,觉得不是很方便,因为修改触发器势必会影响写入操作。
  • 3、数据更新。因为需要一条一条判断数据为新装、重装或者新用户、老用户,区域等,所以有这个更新的过程,更新每一条记录的状态。这里将可能是系统瓶颈所在。
  • 4、小时报表。 数据更新完之后即可根据该数据出报表,因为统计的字段8个左右,所以累计到一定时间之后,这个表的数据也将会很多,前台不适合直接从这里取报表数据。
  • 5、其他报表。 可根据小时报表出天的报表,或者出特定字段的报表等等,这是很方便的,同时数据量也将成倍的减少。

系统实现

项目使用CI开发,实现的步骤就没太多说的,查询的时候注意下索引的先后顺序就行了,系统到目前还没出现因不中索引而引起的问题。不过程序上的一些调整可以记录下:

  • 1、 报表采用highchart实现,但最开始是直接在控制器获取到报表数据后传到视图,当一个页面有多个报表的时候需要把这些数据一次性读取出来之后页面才会显示。运行了一段时间发现打开慢,也不方便扩展,所以把报表统一改成了ajax调用。
  • 2、 菜单的调整。最开始未意识到后面有更多的产品移植过来,所以对菜单也进行过一次调整。
  • 3、 crontab的调整。最开始所有的crontab脚本都放置在一个控制器中,随着不断的增加发现越来越难控了,难以按产品区分,有些也不用了。为了理清这些脚本以及执行频率,对这里- 进行了一次调整。
  • 4、主从调整。运行一段时间后增加了个从库(32G),所有查询的操作从从库拿,调整之后前台报表表现明显。
  • 5、模型的调整。 这个还未深入调整^_^,因为有新老系统的原因, 模型太多,以及很多业务逻辑写在模型中,模型很重。

上面的每一个调整并不需要多少时间, 但对不段增长的系统是很有好处的,每当它要倾斜时,我们就把它扶正,希望它能坚持更久一点。

系统新增功能和调整

调整用户唯一ID。

IOS产品原先用uuid来判断唯一性,但7.0之后发现uuid不唯一了,所以统计系统部分产品要将唯一值由uuid替换为序列号,但一直以来都是uuid为唯一ID,统计这边也直接以uuid为唯一键了。这意味着唯一键要调整,大部分表结构都需要调整了。

原始表有的有序列号,有的没有,所以首先是原始表统一增加序列号字段,因为转移的数据只将特定的字段值写进去,所以原始表的调整对统计不会有影响。同时原始表已有2.5亿数据,直接调整表结构基本不可能。所以采取新建一张调整后的表,rename一下即可,rename的过程是很快的,rename之前的几千条未转移的数据再手动转移一下。

统计这边将在近期表新增一个唯一字段, 唯一字段不依赖固定值。因为即便调整了, 有一些产品还是以uuid为准,唯一值在转移的过程中判断即可。 统计系统调整时先停下所有的脚本,近期表直接删除重建即可,唯一表因为需要处理,边转移边处理一下即可,报表数据保留原有。所以整个过程下来调整并不算大,只是因为数据量比较大,处理觉得麻烦一点而已。

增加一个产品

系统中已经增加了好几个产品了, 这里增加产品的接口是用php实现的。即客户端调用php页面,php写数据库,回访数据大概每天100w左右。运行几天后发现php接口机器挂了, nignx进程数太多。原因就是统计系统比较忙时,数据库压力比较大,php一条一条写入很慢, 很多进程都在等待,于是爆了。。。

针对这个问题的处理方法是,php接口直接写数据到文本,然后脚本定时load数据到数据库。

历史数据处理

有个产品需要对历史数据进行重新统计,历史数据有1亿多。因为历史数据和新数据之间的字段、值等需要进行一次处理,所以采用 SELECT INTO OUTFILE的方式导出,1.6亿数据中导出1.2亿大概5分钟左右。导出之后的的文件有9G左右,直接一次LOAD mysql会超出binlog的限制。所以设置了binglog为3G,然后对原数据按每1000w行进行切割,在一个个导入。

如果导入的表已经建好索引,开始导入1000w要半个多小时,导入了4000w数据后发现奇慢无比。后来重新导,导入的表未建立索引,1000w数据大概需要9分钟左右。不过后来增加索引花了大概2个半小时。

对原始数据的处理也是一个问题,为了提升效率,比较大的数据采用多进程跑,比如开10多个进程同时跑一个小时的数据,二三十万数据3分钟就搞定。但当系统中的这些进程碰到一起时,系统就开始慢了, 所以只能用程序去控制下。

系统总结

1、 到目前位置系统运行还算正常,但随着新功能的不断增加,这也是个挑战。如果只是针对单个产品,一般的业务,用php来处理,日2000w数据问题应该不是很大。

2、系统监控。到目前位置做个几个统计系统了,前面一个是最完善的,有很多监控,可以很快发现问题。当前这个系统数据量是比较大的,但监控还比较薄弱,或者已经有很多潜在的问题被忽略,所以做好监控是有必要的。

3、 使用php运行crontab要防止脚本重复执行,限制起来也很简单,可以用php的exec函数去查看一下当前脚本是否正在执行(需要服务器未限制exec函数),如果正在执行就直接退出,给个简单的判断方法:

function get_process_num($process_name) { return exec('ps -ef | grep "'.$process_name.'" | grep -v "grep" | wc -l'); }