利用spark进行大数据分析

2017-02-23 21:20:14 LW_GHY 阅读数 4642

系统概述



在日常业务分析中, R是非常常用的分析工具,而当数据量较大时,用R语言需要需用更多的时间来完成训练模型,spark作为大规模数据计算框架,采用内存计算,可以短时间内完成大量的数据的处理及计算模型,但缺点是不能图形展示, R语言的sparkly则提供了R语言和Spark的接⼝,实现了在数据量大的情况下,应用Spark的快速数据分析和处理能力结合R语言的图形化展示功能,方便业务分析,模型训练,同时R语言还可以与Hadoop,HDFS,Hbase,redis,MongoDB等大数据平台数据实现交互,以及作业递交与分析,本文主要介绍平台的架构方法,以及各个组件基本使用方法。

系统构建与调试

基础环境介绍:
操作系统 RHEL 6.5 / CentoOS 6.5
已安装软件 Hadoop Hbase Hive Spark Redis MongoDB Mysql 等大数据应用软件










2017-08-19 21:18:13 qq_33813365 阅读数 4094

——8.16开始整理
Spark快速大数据分析

推荐序:
一套大数据解决方案通常包含多个组件,从存储、计算和网络硬件层,到数据处理引擎,再到利用改良的统计和计算算法、数据可视化来获得商业洞见的分析层,这其中数据处理引擎起到了十分重要的作用,毫不夸张的说数据处理引擎至于大数据就相当于CPU之于计算机

spark起源:
2009年加州大学伯克利分校AMPlab 创立spark大数据处理和计算框架。不同于传统数据处理框架,spark基于内存的基本类型,为一些应用程序带来了100倍的性能提升。spark允许允许应用将数据加载到集群内存中反复查询,非擦汗那个适合于大数据处理和机器学习

spark发展:
spark已超越spark核心,发展到了spark streaming、sql、MLlib、GraphX、sparkR等模块,企业、交通、医疗、零售,推进商业洞见,加速决策;
作为MapReduce的继承者,spark主要有三大优点:1.spark非常好用,由于高级API剥离了对集群本身的关注,只关注任务实现的逻辑。2.spark很快,支持交互使用和复杂算法。3.spark是通用引擎,可以用它来完成各种各样的运算,包括SQL查询、文本处理、机器学习

第一章:spark数据分析导论
1.1 spark是什么
快速通用集群计算平台

spark扩展了mapreduce计算模型,高效的支持更多的计算模式,包括交互式查询和流处理(在处理大规模数据集时,速度非常重要,速度快就意味着我们可以进行交互式的数据操作),能够在内存中进行计算(不过就算必须在磁盘中进行复杂计算,spark依然比mapreduce更加高效)

spark适用于各种各样原来需要多种不同分布式平台的场景,包括批处理、迭代算法、交互式查询、流处理。通过在这一个统一的框架下支持这些不同类型的计算,spark是我们可以简单而低耗地把各种处理流程整合到一起。而这样的组合在实际开发中很有意义,进而减轻原来需要对各种平台分别管理的负担

spark可以运行在hadoop集群上,访问hadoop上任意数据源

1.2一个大一统的软件栈

spark的核心是一个对有许多计算任务组成的、运行在多个工作机器或者是一个计算集群上的应用进行调度、分发以及监控的计算引擎。该引擎支持多种不同应用场景专门设计的高级组件,例如 spark streaming、saprk sql、MLlib、GraphX ,这些组件关系密切并且可以相互调用,这样就可像平常软件项目中使用程序库那样,组合使用这些组件

各组间密切集成于spark平台的优点:
都可以从下层改进中受益(当spark核心引擎引入一个优化,sql和机器学习程序库都能自动获得性能提升)
运行整个软件栈的代价变小了(不需要同时运行支持多种不同应用场景所需的多个软件系统,只需在spark上调用各种库)这些代价包括系统的部署、维护、测试、支持、升级
能够无缝整合不同处理模型的应用

spark软件栈:独立调度器/YARN/Mesos——>spark core ——> spark sql/spark streaming/MLlib/GraphX

1.2.1 Spark Core
Spark Core 上实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core还包含了对弹性分布式数据集的API定义。RDD表示分布在多个节点可并并行操作的元素集合,是spark主要的编程对象。spark core提供了创建和操作这些集合的多个API

1.2.2 Spark SQL
Spark SQL 是spark用来操作结构化数据的程序包,可以使用HiveQL来查询数据。Spark SQL支持多种数据源,比如 Hive表、Parquest以及Json。saprk SQL还支持开发者将SQL和传统的RDD编程数据操作方式结合(python、java、scala皆有相应程序包)

1.2.3 Spark streaming
Spark提供的对流式数据进行流式计算的组件。比如网页服务器日志或者网络服务中用户提交的状态更新组成的消息队列,都是数据流 可操作磁盘、内存、实时数据流

1.2.4 MLlib
Spark还提供一个机器学习的程序库,MLlib,提供了许多种机器学习的算法,包括分类、回归、聚类、协同过滤以及模型评估和数据导入等功能

1.2.5 GraphX
操作图的程序库,例如 社交网络的朋友关系图,可以进行并行的图计算以及常用的图算法

1.2.6 集群管理器
底层而言,spark设计为高效的在一个计算节点到数千个计算节点伸缩计算,spark支持多种集群管理器(Hadoop YARN、Apache Mesos),以及spark自带的独立调度器

第二章:spark下载与入门
spark 可以通过Python、java或者Scala使用
spark 本身是scala编写的,运行在java虚拟机(为什么要使用java虚拟机?java语言一个非常重要特点就是与平台无关性。而使用java虚拟机是实现这这一特点的关键。一般的高级语言如果在不同的平台运行,至少需要编译成不同的目标代码。而引入java虚拟机后java语言在不同平台与性是不需要重新编译。java语言使用java虚拟机屏蔽了具体平台的不同,使得java语言编译程序只需生成在java虚拟机上运行的字节码文件,就可以在不同平台不需修改的运行),所以需要集群都安装java环境,要是使用python接口,还需要一个python解释器

ps:.tgz(.tar.gz)

spark有python shell 和 scala版的shell

第三章:RDD编程
简介:
Spark对数据的核心抽象——弹性分布式数据集(Resilient Distributed Dataset)。RDD实际上是分布式的元素集合。在Spark中,对数据的所有操作不外乎创建RDD、转化已有RDD以及调用RDD操作进行求值,而在一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操作并行执行

3.1 RDD基础
Spark中的RDD就是一个不可变的分布式对象集合。每个RDD被分成多个分区,这些分区运行在集群中不同节点
从外部数据创建出输入RDD
使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD
对需要被重用的中间结果RDD执行persist()操作
使用行动操作(count()、first())来触发一次并行操作,spark会对计算优化后执行

3.2创建RDD
创建RDD两种方式:读取外部数据、驱动器中对一个集合进行转化

3.3 RDD操作
RDD支持两种操作:转化操作、行动操作。RDD的转化操作返回的是一个新的RDD,比如map()、filter(),而行动操作向驱动器系统返回结果或者把结果写到外部系统的操作,会触发实际的计算,比如 count()、first()。

3.3.1 转化操作

通过转化操作,从已有的RDD派生出新的RDD,spark会用 谱系图 来记录这些不同RDD的依赖关系。spark需要用这些信息计算每个RDD,可以依靠系谱图在持久化RDD丢失数据时恢复所丢失数据

inputRDD——》errorsRDD/warningRDD——》badlinesRDD

3.3.2 行动操作
转化操作是惰性操作,只有行动操作需要生成实际的输出,会强制执行用的RDD执行转化操作
take()取RDD少量元素,collect()函数可以用来获取整个RDD中的元素,不需注意的是,必须确定整个RDD元素能在单台内存中装的下,因此collect()不能用在大规模数据集
需要注意的是:每当我们调用一个新的行动操作时,整个RDD都会重新开始计算,所以要将中间结果持久化

3.3.3 惰性求值
RDD的转化操作是惰性的,RDD依赖关系由spark系谱图管理记录,当执行操作必须用到指定RDD时才强制执行生成该RDD的转化操作
spark用惰性求值,这样就可以将一些操作和起来来减少计算数据的步骤

2018-03-04 20:15:36 zkf541076398 阅读数 2577
什么是Spark

Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一。

与Hadoop和Storm等其他大数据和MapReduce技术相比,Spark有如下优势。

首先,Spark为我们提供了一个全面、统一的框架用于管理各种有着不同性质(文本数据、图表数据等)的数据集和数据源(批量数据或实时的流数据)的大数据处理的需求。

 

Spark可以将Hadoop集群中的应用在内存中的运行速度提升100倍,甚至能够将应用在磁盘上的运行速度提升10倍。

Spark让开发者可以快速的用Java、Scala或Python编写程序。它本身自带了一个超过80个高阶操作符集合。而且还可以用它在shell中以交互式地查询数据。

除了Map和Reduce操作之外,它还支持SQL查询,流数据,机器学习和图表数据处理。开发者可以在一个数据管道用例中单独使用某一能力或者将这些能力结合在一起使用。

在这个Apache Spark文章系列的第一部分中,我们将了解到什么是Spark,它与典型的MapReduce解决方案的比较以及它如何为大数据处理提供了一套完整的工具。

Hadoop和Spark

Hadoop这项大数据处理技术大概已有十年历史,而且被看做是首选的大数据集合处理的解决方案。MapReduce是一路计算的优秀解决方案,不 过对于需要多路计算和算法的用例来说,并非十分高效。数据处理流程中的每一步都需要一个Map阶段和一个Reduce阶段,而且如果要利用这一解决方案, 需要将所有用例都转换成MapReduce模式。

在下一步开始之前,上一步的作业输出数据必须要存储到分布式文件系统中。因此,复制和磁盘存储会导致这种方式速度变慢。另外Hadoop解决方案中 通常会包含难以安装和管理的集群。而且为了处理不同的大数据用例,还需要集成多种不同的工具(如用于机器学习的Mahout和流数据处理的Storm)。

如果想要完成比较复杂的工作,就必须将一系列的MapReduce作业串联起来然后顺序执行这些作业。每一个作业都是高时延的,而且只有在前一个作业完成之后下一个作业才能开始启动。

而Spark则允许程序开发者使用有向无环图(DAG)开发复杂的多步数据管道。而且还支持跨有向无环图的内存数据共享,以便不同的作业可以共同处理同一个数据。

Spark运行在现有的Hadoop分布式文件系统基础之上(HDFS)提供额外的增强功能。它支持将Spark应用部署到现存的Hadoop v1集群(with SIMR – Spark-Inside-MapReduce)或Hadoop v2 YARN集群甚至是Apache Mesos之中。

我们应该将Spark看作是Hadoop MapReduce的一个替代品而不是Hadoop的替代品。其意图并非是替代Hadoop,而是为了提供一个管理不同的大数据用例和需求的全面且统一的解决方案。

Spark特性

Spark通过在数据处理过程中成本更低的洗牌(Shuffle)方式,将MapReduce提升到一个更高的层次。利用内存数据存储和接近实时的处理能力,Spark比其他的大数据处理技术的性能要快很多倍。

Spark还支持大数据查询的延迟计算,这可以帮助优化大数据处理流程中的处理步骤。Spark还提供高级的API以提升开发者的生产力,除此之外还为大数据解决方案提供一致的体系架构模型。

Spark将中间结果保存在内存中而不是将其写入磁盘,当需要多次处理同一数据集时,这一点特别实用。Spark的设计初衷就是既可以在内存中又可 以在磁盘上工作的执行引擎。当内存中的数据不适用时,Spark操作符就会执行外部操作。Spark可以用于处理大于集群内存容量总和的数据集。

Spark会尝试在内存中存储尽可能多的数据然后将其写入磁盘。它可以将某个数据集的一部分存入内存而剩余部分存入磁盘。开发者需要根据数据和用例评估对内存的需求。Spark的性能优势得益于这种内存中的数据存储。

Spark的其他特性包括:

  • 支持比Map和Reduce更多的函数。
  • 优化任意操作算子图(operator graphs)。
  • 可以帮助优化整体数据处理流程的大数据查询的延迟计算。
  • 提供简明、一致的Scala,Java和Python API。
  • 提供交互式Scala和Python Shell。目前暂不支持Java。

Spark是用Scala程序设计语言编写而成,运行于Java虚拟机(JVM)环境之上。目前支持如下程序设计语言编写Spark应用:

  • Scala
  • Java
  • Python
  • Clojure
  • R

Spark生态系统

除了Spark核心API之外,Spark生态系统中还包括其他附加库,可以在大数据分析和机器学习领域提供更多的能力。

这些库包括:

  • Spark Streaming:
    • Spark Streaming基于微批量方式的计算和处理,可以用于处理实时的流数据。它使用DStream,简单来说就是一个弹性分布式数据集(RDD)系列,处理实时数据。
  • Spark SQL:
    • Spark SQL可以通过JDBC API将Spark数据集暴露出去,而且还可以用传统的BI和可视化工具在Spark数据上执行类似SQL的查询。用户还可以用Spark SQL对不同格式的数据(如JSON,Parquet以及数据库等)执行ETL,将其转化,然后暴露给特定的查询。
  • Spark MLlib:
    • MLlib是一个可扩展的Spark机器学习库,由通用的学习算法和工具组成,包括二元分类、线性回归、聚类、协同过滤、梯度下降以及底层优化原语。
  • Spark GraphX:
    • GraphX是用于图计算和并行图计算的 新的(alpha)Spark API。通过引入弹性分布式属性图(Resilient Distributed Property Graph),一种顶点和边都带有属性的有向多重图,扩展了Spark RDD。为了支持图计算,GraphX暴露了一个基础操作符集合(如subgraph,joinVertices和aggregateMessages) 和一个经过优化的Pregel API变体。此外,GraphX还包括一个持续增长的用于简化图分析任务的图算法和构建器集合。

除了这些库以外,还有一些其他的库,如BlinkDB和Tachyon。

BlinkDB是一个近似查询引擎,用于在海量数据上执行交互式SQL查询。BlinkDB可以通过牺牲数据精度来提升查询响应时间。通过在数据样本上执行查询并展示包含有意义的错误线注解的结果,操作大数据集合。

Tachyon是一个以内存为中心的 分布式文件系统,能够提供内存级别速度的跨集群框架(如Spark和MapReduce)的可信文件共享。它将工作集文件缓存在内存中,从而避免到磁盘中 加载需要经常读取的数据集。通过这一机制,不同的作业/查询和框架可以以内存级的速度访问缓存的文件。
此外,还有一些用于与其他产品集成的适配器,如Cassandra(Spark Cassandra 连接器)和R(SparkR)。Cassandra Connector可用于访问存储在Cassandra数据库中的数据并在这些数据上执行数据分析。

下图展示了在Spark生态系统中,这些不同的库之间的相互关联。

用Apache Spark进行大数据处理——第一部分:入门介绍

图1. Spark框架中的库

我们将在这一系列文章中逐步探索这些Spark库

Spark体系架构

Spark体系架构包括如下三个主要组件:

  • 数据存储
  • API
  • 管理框架

接下来让我们详细了解一下这些组件。

数据存储:

Spark用HDFS文件系统存储数据。它可用于存储任何兼容于Hadoop的数据源,包括HDFS,HBase,Cassandra等。

API:

利用API,应用开发者可以用标准的API接口创建基于Spark的应用。Spark提供Scala,Java和Python三种程序设计语言的API。

下面是三种语言Spark API的网站链接。

资源管理:

Spark既可以部署在一个单独的服务器也可以部署在像Mesos或YARN这样的分布式计算框架之上。

下图2展示了Spark体系架构模型中的各个组件。

用Apache Spark进行大数据处理——第一部分:入门介绍

图2 Spark体系架构

弹性分布式数据集

弹性分布式数据集(基于Matei的研究论文)或RDD是Spark框架中的核心概念。可以将RDD视作数据库中的一张表。其中可以保存任何类型的数据。Spark将数据存储在不同分区上的RDD之中。

RDD可以帮助重新安排计算并优化数据处理过程。

此外,它还具有容错性,因为RDD知道如何重新创建和重新计算数据集。

RDD是不可变的。你可以用变换(Transformation)修改RDD,但是这个变换所返回的是一个全新的RDD,而原有的RDD仍然保持不变。

RDD支持两种类型的操作:

  • 变换(Transformation)
  • 行动(Action)

变换:变换的返回值是一个新的RDD集合,而不是单个值。调用一个变换方法,不会有任何求值计算,它只获取一个RDD作为参数,然后返回一个新的RDD。

变换函数包括:map,filter,flatMap,groupByKey,reduceByKey,aggregateByKey,pipe和coalesce。

行动:行动操作计算并返回一个新的值。当在一个RDD对象上调用行动函数时,会在这一时刻计算全部的数据处理查询并返回结果值。

行动操作包括:reduce,collect,count,first,take,countByKey以及foreach。

如何安装Spark

安装和使用Spark有几种不同方式。你可以在自己的电脑上将Spark作为一个独立的框架安装或者从诸如Cloudera,HortonWorks或MapR之类的供应商处获取一个Spark虚拟机镜像直接使用。或者你也可以使用在云端环境(如Databricks Cloud)安装并配置好的Spark。

在本文中,我们将把Spark作为一个独立的框架安装并在本地启动它。最近Spark刚刚发布了1.2.0版本。我们将用这一版本完成示例应用的代码展示。

如何运行Spark

当你在本地机器安装了Spark或使用了基于云端的Spark后,有几种不同的方式可以连接到Spark引擎。

下表展示了不同的Spark运行模式所需的Master URL参数。

用Apache Spark进行大数据处理——第一部分:入门介绍

如何与Spark交互

Spark启动并运行后,可以用Spark shell连接到Spark引擎进行交互式数据分析。Spark shell支持Scala和Python两种语言。Java不支持交互式的Shell,因此这一功能暂未在Java语言中实现。

可以用spark-shell.cmd和pyspark.cmd命令分别运行Scala版本和Python版本的Spark Shell。

Spark网页控制台

不论Spark运行在哪一种模式下,都可以通过访问Spark网页控制台查看Spark的作业结果和其他的统计数据,控制台的URL地址如下:

http://localhost:4040

Spark控制台如下图3所示,包括Stages,Storage,Environment和Executors四个标签页

(点击查看大图)

用Apache Spark进行大数据处理——第一部分:入门介绍

图3. Spark网页控制台

共享变量

Spark提供两种类型的共享变量可以提升集群环境中的Spark程序运行效率。分别是广播变量和累加器。

广播变量:广播变量可以在每台机器上缓存只读变量而不需要为各个任务发送该变量的拷贝。他们可以让大的输入数据集的集群拷贝中的节点更加高效。

下面的代码片段展示了如何使用广播变量。

//
// Broadcast Variables
//
val broadcastVar = sc.broadcast(Array(1, 2, 3))
broadcastVar.value

累加器:只有在使用相关操作时才会添加累加器,因此它可以很好地支持并行。累加器可用于实现计数(就像在MapReduce中那样)或求和。可以用add方法将运行在集群上的任务添加到一个累加器变量中。不过这些任务无法读取变量的值。只有驱动程序才能够读取累加器的值。

下面的代码片段展示了如何使用累加器共享变量:

//
// Accumulators
//

val accum = sc.accumulator(0, "My Accumulator")

sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

accum.value

Spark应用示例

本篇文章中所涉及的示例应用是一个简单的字数统计应用。这与学习用Hadoop进行大数据处理时的示例应用相同。我们将在一个文本文件上执行一些数 据分析查询。本示例中的文本文件和数据集都很小,不过无须修改任何代码,示例中所用到的Spark查询同样可以用到大容量数据集之上。

为了让讨论尽量简单,我们将使用Spark Scala Shell。

首先让我们看一下如何在你自己的电脑上安装Spark。

前提条件:

  • 为了让Spark能够在本机正常工作,你需要安装Java开发工具包(JDK)。这将包含在下面的第一步中。
  • 同样还需要在电脑上安装Spark软件。下面的第二步将介绍如何完成这项工作。

注:下面这些指令都是以Windows环境为例。如果你使用不同的操作系统环境,需要相应的修改系统变量和目录路径已匹配你的环境。

I. 安装JDK

1)从Oracle网站上下载JDK。推荐使用JDK 1.7版本

将JDK安装到一个没有空格的目录下。对于Windows用户,需要将JDK安装到像c:\dev这样的文件夹下,而不能安装到“c: \Program Files”文件夹下。“c:\Program Files”文件夹的名字中包含空格,如果软件安装到这个文件夹下会导致一些问题。

注:不要在“c:\Program Files”文件夹中安装JDK或(第二步中所描述的)Spark软件。

2)完成JDK安装后,切换至JDK 1.7目录下的”bin“文件夹,然后键入如下命令,验证JDK是否正确安装:

java -version

如果JDK安装正确,上述命令将显示Java版本。

II. 安装Spark软件:

Spark网站上下载最新版本 的Spark。在本文发表时,最新的Spark版本是1.2。你可以根据Hadoop的版本选择一个特定的Spark版本安装。我下载了与Hadoop 2.4或更高版本匹配的Spark,文件名是spark-1.2.0-bin-hadoop2.4.tgz。

将安装文件解压到本地文件夹中(如:c:\dev)。

为了验证Spark安装的正确性,切换至Spark文件夹然后用如下命令启动Spark Shell。这是Windows环境下的命令。如果使用Linux或Mac OS,请相应地编辑命令以便能够在相应的平台上正确运行。

c:
cd c:\dev\spark-1.2.0-bin-hadoop2.4
bin\spark-shell

如果Spark安装正确,就能够在控制台的输出中看到如下信息。

….
15/01/17 23:17:46 INFO HttpServer: Starting HTTP Server
15/01/17 23:17:46 INFO Utils: Successfully started service 'HTTP class server' on port 58132.
Welcome to
      ____              __
     / __/__  ___ _____/ /__
    _\ \/ _ \/ _ `/ __/  '_/
   /___/ .__/\_,_/_/ /_/\_\   version 1.2.0
      /_/

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)
Type in expressions to have them evaluated.
Type :help for more information.
….
15/01/17 23:17:53 INFO BlockManagerMaster: Registered BlockManager
15/01/17 23:17:53 INFO SparkILoop: Created spark context..
Spark context available as sc.

可以键入如下命令检查Spark Shell是否工作正常。

sc.version

(或)

sc.appName

完成上述步骤之后,可以键入如下命令退出Spark Shell窗口:

:quit

如果想启动Spark Python Shell,需要先在电脑上安装Python。你可以下载并安装Anaconda,这是一个免费的Python发行版本,其中包括了一些比较流行的科学、数学、工程和数据分析方面的Python包。

然后可以运行如下命令启动Spark Python Shell:

c:
cd c:\dev\spark-1.2.0-bin-hadoop2.4
bin\pyspark

Spark示例应用

完成Spark安装并启动后,就可以用Spark API执行数据分析查询了。

这些从文本文件中读取并处理数据的命令都很简单。我们将在这一系列文章的后续文章中向大家介绍更高级的Spark框架使用的用例。

首先让我们用Spark API运行流行的Word Count示例。如果还没有运行Spark Scala Shell,首先打开一个Scala Shell窗口。这个示例的相关命令如下所示:

import org.apache.spark.SparkContext
import org.apache.spark.SparkContext._
 
val txtFile = "README.md"
val txtData = sc.textFile(txtFile)
txtData.cache()

我们可以调用cache函数将上一步生成的RDD对象保存到缓存中,在此之后Spark就不需要在每次数据查询时都重新计算。需要注意的 是,cache()是一个延迟操作。在我们调用cache时,Spark并不会马上将数据存储到内存中。只有当在某个RDD上调用一个行动时,才会真正执 行这个操作。

现在,我们可以调用count函数,看一下在文本文件中有多少行数据。

txtData.count()

然后,我们可以执行如下命令进行字数统计。在文本文件中统计数据会显示在每个单词的后面。

val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

wcData.collect().foreach(println)

如果想查看更多关于如何使用Spark核心API的代码示例,请参考网站上的Spark文档

后续计划

在后续的系列文章中,我们将从Spark SQL开始,学习更多关于Spark生态系统的其他部分。之后,我们将继续了解Spark Streaming,Spark MLlib和Spark GraphX。我们也会有机会学习像Tachyon和BlinkDB等框架。

小结

在本文中,我们了解了Apache Spark框架如何通过其标准API帮助完成大数据处理和分析工作。我们还对Spark和传统的MapReduce实现(如Apache Hadoop)进行了比较。Spark与Hadoop基于相同的HDFS文件存储系统,因此如果你已经在Hadoop上进行了大量投资和基础设施建设,可 以一起使用Spark和MapReduce。

此外,也可以将Spark处理与Spark SQL、机器学习以及Spark Streaming结合在一起。关于这方面的内容我们将在后续的文章中介绍。

利用Spark的一些集成功能和适配器,我们可以将其他技术与Spark结合在一起。其中一个案例就是将Spark、Kafka和Apache Cassandra结合在一起,其中Kafka负责输入的流式数据,Spark完成计算,最后Cassandra NoSQL数据库用于保存计算结果数据。

不过需要牢记的是,Spark生态系统仍不成熟,在安全和与BI工具集成等领域仍然需要进一步的改进。

参考文献

关于作者

用Apache Spark进行大数据处理——第一部分:入门介绍Srini Penchikala目 前是一家金融服务机构的软件架构师,这个机构位于德克萨斯州的奥斯汀。他在软件系统架构、设计和开发方面有超过20年的经验。Srini目前正在撰写一本 关于NoSQL数据库模式的书。他还是曼宁出版社出版的《Spring Roo in Action》一书的合著者(http://www.manning.com/SpringRooinAction)。他还曾经出席各种会议,如 JavaOne,SEI Architecture Technology Conference(SATURN),IT Architect Conference(ITARC),No Fluff Just Stuff,NoSQL Now和Project World Conference等。Srini还在InfoQ,The ServerSide,OReilly Network(ONJava),DevX Java,java.net以及JavaWorld等网站上发表过很多关于软件系统架构、安全和风险管理以及NoSQL数据库等方面的文章。他还是InfoQ NoSQL数据库社区的责任编辑

 

查看英文原文:Big Data Processing with Apache Spark – Part 1: Introduction

来自:http://www.infoq.com/cn/articles/apache-spark-introduction
2016-11-21 15:43:38 iigeoxiaoyang 阅读数 9117

前言


本文在之前搭建的集群上,运行一个地理空间分析的示例,示例来自于《Spark高级数据分析》第八章。
Github项目地址:https://github.com/sryza/aas/tree/master/ch08-geotime
这个例子是通过分析纽约市2013年1月份的出租车数据,统计纽约市乘客下车点落在每个行政区的个数。
在开始正文之前,需要掌握以下基础知识:

  • Scala基础语法
  • Spark基础概念和原理(推荐《Spark快速大数据大分析》)

纽约出租车地理空间数据分析的主要流程:

  • 数据获取
  • 数据时间和和空间处理类库
  • 数据预处理与地理空间分析
  • 提交应用至集群,分布式计算

数据获取


本文的数据是纽约市2013年1月份乘客打车费用数据,数据大小是914.9M,解压后为2.5G。

数据下载地址

http://www.andresmh.com/nyctaxitrips/(trip_data_1.csv.zip)

数据下载方式

  • 直接在window下载,上传至linux服务器,注意我的集群是docker容器,直接传到容器master节点。
  • 在linux直接下载,命令如下
wget http://www.andresmh.com/nyctaxitrips/(trip_data_1.csv.zip

数据描述

#解压数据集
unzip trip_data_1.csv.zip
# 查看前10行数据
head -n 10 trip_data_1.csv

结果如下图


纽约出租车数据描述

数据字段描述:

vendor_id:类型 rate_code:比率 store_and_fwd_flag:是否是四驱
pickup_datatime:客人上车时间 dropoff_datatime:客人下车时间
passenger_count:载客数量 trip_time_in_secs:载客时间 trip_distance:载客距离
pickup_longitude:客人上车经度 pickup_latitude:客人上车维度
dropoff_longitude:客人下车经度 dropoff_latitude:客人下车维度

数据处理第三方类库


注意scala是可以直接调用java类库的。
时间处理类库:joda-time,nscala-time_2.11.jar(2.11对应scala版本)
本文空间关系处理库采用Esri的esri-geometry-api,当然也可以采用GeoTools等开源库。
自定义RichGeometry类封装Esri矢量空间处理接口;

package com.cloudera.datascience.geotime
import com.esri.core.geometry.{GeometryEngine, SpatialReference, Geometry}
import scala.language.implicitConversions
/**
 * A wrapper that provides convenience methods for using the spatial relations in the ESRI
 * GeometryEngine with a particular instance of the Geometry interface and an associated
 * SpatialReference.
 *
 * @param geometry the geometry object
 * @param spatialReference optional spatial reference; if not specified, uses WKID 4326 a.k.a.
 *                         WGS84, the standard coordinate frame for Earth.
 */
class RichGeometry(val geometry: Geometry,
    val spatialReference: SpatialReference = SpatialReference.create(4326)) extends Serializable {

  def area2D(): Double = geometry.calculateArea2D()

  def distance(other: Geometry): Double = {
    GeometryEngine.distance(geometry, other, spatialReference)
  }

  def contains(other: Geometry): Boolean = {
    GeometryEngine.contains(geometry, other, spatialReference)
  }

  def within(other: Geometry): Boolean = {
    GeometryEngine.within(geometry, other, spatialReference)
  }

  def overlaps(other: Geometry): Boolean = {
    GeometryEngine.overlaps(geometry, other, spatialReference)
  }

  def touches(other: Geometry): Boolean = {
    GeometryEngine.touches(geometry, other, spatialReference)
  }

  def crosses(other: Geometry): Boolean = {
    GeometryEngine.crosses(geometry, other, spatialReference)
  }

  def disjoint(other: Geometry): Boolean = {
    GeometryEngine.disjoint(geometry, other, spatialReference)
  }
}

/**
 * Helper object for implicitly creating RichGeometry wrappers
 * for a given Geometry instance.
 */
object RichGeometry extends Serializable {
  implicit def createRichGeometry(g: Geometry): RichGeometry = new RichGeometry(g)
}

数据预处理与地理空间分析


上传原始数据到HDFS集群

#在Hdfs集群下创建taxidata目录,注意必须带/
hadoop fs -mkdir /taxidata
#上传本地物理机数据至HDFS集群
hadoop fs -put trip_data_1.csv /taxidata/trip_data_1.csv

自定义safe函数处理格式不正确的数据

详细请看代码注释第三部分

地理空间分析

获取纽约行政区划数据,利用esri gerometry类库判断各行政区下车点的记录数(详细请看代码注释第四部分)。

/**
  * 打车信息类
  * **/
case class Trip(
  pickupTime: DateTime,
  dropoffTime: DateTime,
  pickupLoc: Point,
  dropoffLoc: Point)

/**
  * 出租车数据地理空间分析
  */
object RunGeoTime extends Serializable {

  val formatter = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss", Locale.ENGLISH)

  def main(args: Array[String]): Unit = {

    /*--------------1.初始化SparkContext-------------------*/
    val sc = new SparkContext(new SparkConf().setAppName("SpaceGeo"))

    /*--------------2.读取HDFS数据-------------------*/
    val taxiRaw = sc.textFile("hdfs://master:9000/taxidata")

    /*--------------3.出租车数据预处理------------------*/
    //3.1 利用自定义的safe函数处理原始数据
    val safeParse = safe(parse)
    val taxiParsed = taxiRaw.map(safeParse)
    //taxiParsed数据持久化
    taxiParsed.cache()

    //查看非法数据
   /* val taxiBad = taxiParsed.collect({
      case t if t.isRight => t.right.get
    })*/

    //collect返回到驱动器,为了单机开发和测试使用,不建议集群使用
    //taxiBad.collect().foreach(println)


    /*val taxiGood = taxiParsed.collect({
      case t if t.isLeft => t.left.get
    })
    taxiGood.cache()*/

    //3.2 剔除非法数据结果,获得正确格式的数据
    val taxiGood=taxiParsed.filter(_.isLeft).map(_.left.get)
    taxiGood.cache()

    //自定义一次打车的乘坐时间函数
    def hours(trip: Trip): Long = {
      val d = new Duration(trip.pickupTime, trip.dropoffTime)
      d.getStandardHours
    }
    //3.3 打印统计乘客上下车时间的记录,打印结果如执行分析结果图中的1
    taxiGood.values.map(hours).countByValue().toList.sorted.foreach(println)
    taxiParsed.unpersist()

    //根据上面的输出结果,统计一次乘车时间大于0小于3小时的记录
    val taxiClean = taxiGood.filter {
      case (lic, trip) => {
        val hrs = hours(trip)
        0 <= hrs && hrs < 3
      }
    }

    /*--------------4.出租车数据空间分析------------------*/
    //4.1 获取纽约行政区划数据
    val geojson = scala.io.Source.fromURL(getClass.getResource("/nyc-boroughs.geojson")).mkString
    //转换为地理要素
    val features = geojson.parseJson.convertTo[FeatureCollection]

    val areaSortedFeatures = features.sortBy(f => {
      val borough = f("boroughCode").convertTo[Int]
      (borough, -f.geometry.area2D())
    })

    val bFeatures = sc.broadcast(areaSortedFeatures)
    //4.2 判断乘客下车点落在那个行政区
    def borough(trip: Trip): Option[String] = {
      val feature: Option[Feature] = bFeatures.value.find(f => {
        f.geometry.contains(trip.dropoffLoc)
      })
      feature.map(f => {
        f("borough").convertTo[String]
      })
    }
    //4.3 第一次统计打印各行政区下车点的记录,打印结果如执行分析结果图中的2
    taxiClean.values.map(borough).countByValue().foreach(println)


    //4.4 剔除起点和终点数据缺失的数据
    def hasZero(trip: Trip): Boolean = {
      val zero = new Point(0.0, 0.0)
      (zero.equals(trip.pickupLoc) || zero.equals(trip.dropoffLoc))
    }

    val taxiDone = taxiClean.filter {
      case (lic, trip) => !hasZero(trip)
    }.cache()

    //4.5 踢出零点数据后统计打印各行政区下车点的记录,打印结果如执行分析结果图中的3
    taxiDone.values.map(borough).countByValue().foreach(println)
    taxiGood.unpersist()

    //输出地理空间分析结果到HDFS
    //taxiDone.saveAsTextFile("hdfs://master:9000/GeoResult")

  }

  //字符串转double
  def point(longitude: String, latitude: String): Point = {
    new Point(longitude.toDouble, latitude.toDouble)
  }

  //获取taxiraw RDD记录中的出租车司机驾照和Trip对象
  def parse(line: String): (String, Trip) = {
    val fields = line.split(',')
    val license = fields(1)
    // Not thread-safe:
    val formatterCopy = formatter.clone().asInstanceOf[SimpleDateFormat]
    val pickupTime = new DateTime(formatterCopy.parse(fields(5)))
    val dropoffTime = new DateTime(formatterCopy.parse(fields(6)))
    val pickupLoc = point(fields(10), fields(11))
    val dropoffLoc = point(fields(12), fields(13))

    val trip = Trip(pickupTime, dropoffTime, pickupLoc, dropoffLoc)
    (license, trip)
  }

  //非法记录数据处理函数
  def safe[S, T](f: S => T): S => Either[T, (S, Exception)] = {
    new Function[S, Either[T, (S, Exception)]] with Serializable {
      def apply(s: S): Either[T, (S, Exception)] = {
        try {
          Left(f(s))
        } catch {
          case e: Exception => Right((s, e))
        }
      }
    }
  }

}

分布式计算


打包应用

Windows下环境spark项目环境配置

在Windows上安装maven scala2.11.8(我的版本),intelij 及inteli的scala插件,导入ch08-geotime项目,如下图


ch08项目

配置pom文件

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">

  <modelVersion>4.0.0</modelVersion>

  <groupId>com.cloudera.datascience.geotime</groupId>
  <artifactId>ch08-geotime</artifactId>
  <packaging>jar</packaging>
  <name>Temporal and Geospatial Analysis</name>
  <version>2.0.0</version>

  <dependencies>
   <!--注意 scala版本对应spark集群中scala的版本,provided属性要加上 -->
    <dependency>
      <groupId>org.scala-lang</groupId>
      <artifactId>scala-library</artifactId>
      <version>2.11.8</version>
      <scope>provided</scope>
    </dependency>
    <!--注意 hadoop版本对应spark集群中hadoop的版本,provided属性要加上 -->
    <dependency>
      <groupId>org.apache.hadoop</groupId>
      <artifactId>hadoop-client</artifactId>
      <version>2.7.3</version>
      <scope>provided</scope>
    </dependency>
    <!--注意 spark版本对应spark集群中spark的版本,2.11是对应的scala版本 -->
    <dependency>
      <groupId>org.apache.spark</groupId>
      <artifactId>spark-core_2.11</artifactId>
      <version>2.0.1</version>
      <scope>provided</scope>
    </dependency>
    <!--nscala-time时间处理库,2.11是对应的scala版本 -->
    <dependency>
      <groupId>com.github.nscala-time</groupId>
      <artifactId>nscala-time_2.11</artifactId>
      <version>1.8.0</version>
    </dependency>
    <!--esri空间关系库,2.11是对应的scala版本 -->
    <dependency>
      <groupId>com.esri.geometry</groupId>
      <artifactId>esri-geometry-api</artifactId>
      <version>1.2.1</version>
    </dependency>
    <dependency>
      <groupId>io.spray</groupId>
      <artifactId>spray-json_2.11</artifactId>
      <version>1.3.2</version>
    </dependency>
    <dependency>
      <groupId>joda-time</groupId>
      <artifactId>joda-time</artifactId>
      <version>2.9.4</version>
    </dependency>
  </dependencies>

  <build>
    <plugins>
     <!--scala-maven插件必须加上,否则打包后无主程序 -->
      <plugin>
        <groupId>net.alchim31.maven</groupId>
        <artifactId>scala-maven-plugin</artifactId>
        <version>3.2.2</version>
        <configuration>
          <scalaVersion>2.11.8</scalaVersion>
          <scalaCompatVersion>2.11.8</scalaCompatVersion>
          <args>
            <arg>-unchecked</arg>
            <arg>-deprecation</arg>
            <arg>-feature</arg>
          </args>
          <javacArgs>
            <javacArg>-source</javacArg>
            <javacArg>1.8.0</javacArg>
            <javacArg>-target</javacArg>
            <javacArg>1.8.0</javacArg>
          </javacArgs>
        </configuration>
        <executions>
          <execution>
            <phase>compile</phase>
            <goals>
              <goal>compile</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
       <!--maven-assembly插件可以打包应用的依赖包 -->
      <plugin>
        <groupId>org.apache.maven.plugins</groupId>
        <artifactId>maven-assembly-plugin</artifactId>
        <version>2.6</version>
        <configuration>
          <archive>
            <manifest>
              <mainClass>com.cloudera.datascience.geotime.RunGeoTime</mainClass>
            </manifest>
          </archive>
            <descriptorRefs>
                <descriptorRef>jar-with-dependencies</descriptorRef>
            </descriptorRefs>
          <recompressZippedFiles>false</recompressZippedFiles>
        </configuration>
        <executions>
          <execution>
            <id>make-assembly</id> <!-- 用于maven继承项目的聚合 -->
            <phase>package</phase> <!-- 绑定到package阶段 -->
            <goals>
              <goal>single</goal>
            </goals>
          </execution>
        </executions>
      </plugin>
    </plugins>
  </build>

</project>

Maven打包

在ch08-geotime项目下Terminal命令行

# maven打包,打包结果输入到target目录下
名称为ch08-geotime-2.0.0-jar-with-dependencies.jar(包含依赖包)
mvn clean
mvn package

提交应用到集群

上传jar包至master节点,确保集群已启动,提交应用至集群,主要过程如下:

  1. 用户通过 spark-submit 脚本提交应用。
  2. spark-submit 脚本启动驱动器程序,调用用户定义的 main() 方法。
  3. 驱动器程序与集群管理器通信,申请资源以启动执行器节点。
  4. 集群管理器为驱动器程序启动执行器节点。
  5. 驱动器进程执行用户应用中的操作。 根据程序中所定义的对RDD的转化操作和行动操
    作,驱动器节点把工作以任务的形式发送到执行器进程。
  6. 任务在执行器程序中进行计算并保存结果。
  7. 如果驱动器程序的 main() 方法退出,或者调用了 SparkContext.stop()
    驱动器程序会终止执行器进程,并且通过集群管理器释放资源。
    ————————《Spark快速大数据分析》
  • 利用yarn集群提交应用
# --class 运行 Java 或 Scala 程序时应用的主类
# --master 表示要连接的集群管理器
# --deploy-mode 选择在本地(客户端“ client”)启动驱动器程序,还是在集群中的一台工作节点机
器(集群“ cluster”)上启动。在客户端模式下, spark-submit 会将驱动器程序运行
在 spark-submit 被调用的这台机器上。在集群模式下,驱动器程序会被传输并执行
于集群的一个工作节点上。默认是本地模式
# --name 应用的显示名,会显示在 Spark 的网页用户界面中
# 最后是应用入口的 JAR 包或 Python 脚本
spark-submit  --class com.cloudera.datascience.geotime.RunGeoTime 
--master yarn --deploy-mode cluster  
--executor-memory 2g --executor-cores 2  
--name "taxiGeoSpace"  
/home/ch08-geotime/ch08-geotime-space-2.0.0.jar 
  • 利用spark自带的管理器提交应用
# 注意集群模式地址是 spark://master:6066,客户端模式地址是spark://master:7077
spark-submit  --class com.cloudera.datascience.geotime.RunGeoTime 
--master spark://master:6066 --deploy-mode cluster  
--executor-memory 2g --executor-cores 2  --name "taxiGeoSpace1" 
 /home/ch08-geotime/ch08-geotime-space--2.0.0.jar

执行结果如下图


执行分析结果

总结


执行时间是3min,后期要了解spark集群的运行参数配置

参考文献

  1. 《Spark快速大数据分析》
  2. 《Spark高级数据分析》
  3. http://spark.apache.org/docs/latest/running-on-yarn.html Running Spark on YARN
2016-12-01 10:19:23 laowu8615 阅读数 2182

如何安装Spark

安装和使用Spark有几种不同方式。你可以在自己的电脑上将Spark作为一个独立的框架安装或者从诸如ClouderaHortonWorksMapR之类的供应商处获取一个Spark虚拟机镜像直接使用。或者你也可以使用在云端环境(如Databricks Cloud)安装并配置好的Spark

在本文中,我们将把Spark作为一个独立的框架安装并在本地启动它。最近Spark刚刚发布了1.2.0版本。我们将用这一版本完成示例应用的代码展示。

如何运行Spark

当你在本地机器安装了Spark或使用了基于云端的Spark后,有几种不同的方式可以连接到Spark引擎。

下表展示了不同的Spark运行模式所需的Master URL参数。

 

如何与Spark交互

Spark启动并运行后,可以用Spark shell连接到Spark引擎进行交互式数据分析。Spark shell支持ScalaPython两种语言。Java不支持交互式的Shell,因此这一功能暂未在Java语言中实现。

可以用spark-shell.cmdpyspark.cmd命令分别运行Scala版本和Python版本的Spark Shell

Spark网页控制台

不论Spark运行在哪一种模式下,都可以通过访问Spark网页控制台查看Spark的作业结果和其他的统计数据,控制台的URL地址如下:

http://localhost:4040

Spark控制台如下图3所示,包括StagesStorageEnvironmentExecutors四个标签页

(点击查看大图)

 

3. Spark网页控制台

共享变量

Spark提供两种类型的共享变量可以提升集群环境中的Spark程序运行效率。分别是广播变量和累加器。

广播变量:广播变量可以在每台机器上缓存只读变量而不需要为各个任务发送该变量的拷贝。他们可以让大的输入数据集的集群拷贝中的节点更加高效。

下面的代码片段展示了如何使用广播变量。

//

// Broadcast Variables

//

val broadcastVar = sc.broadcast(Array(1, 2, 3))

broadcastVar.value

累加器:只有在使用相关操作时才会添加累加器,因此它可以很好地支持并行。累加器可用于实现计数(就像在MapReduce中那样)或求和。可以用add方法将运行在集群上的任务添加到一个累加器变量中。不过这些任务无法读取变量的值。只有驱动程序才能够读取累加器的值。

下面的代码片段展示了如何使用累加器共享变量:

//

// Accumulators

//

 

val accum = sc.accumulator(0, "My Accumulator")

 

sc.parallelize(Array(1, 2, 3, 4)).foreach(x => accum += x)

 

accum.value

Spark应用示例

本篇文章中所涉及的示例应用是一个简单的字数统计应用。这与学习用Hadoop进行大数据处理时的示例应用相同。我们将在一个文本文件上执行一些数据分析查询。本示例中的文本文件和数据集都很小,不过无须修改任何代码,示例中所用到的Spark查询同样可以用到大容量数据集之上。

为了让讨论尽量简单,我们将使用Spark Scala Shell

首先让我们看一下如何在你自己的电脑上安装Spark

前提条件:

· 为了让Spark能够在本机正常工作,你需要安装Java开发工具包(JDK)。这将包含在下面的第一步中。

· 同样还需要在电脑上安装Spark软件。下面的第二步将介绍如何完成这项工作。

注:下面这些指令都是以Windows环境为例。如果你使用不同的操作系统环境,需要相应的修改系统变量和目录路径已匹配你的环境。

I. 安装JDK

1)从Oracle网站上下载JDK。推荐使用JDK 1.7版本

JDK安装到一个没有空格的目录下。对于Windows用户,需要将JDK安装到像c:\dev这样的文件夹下,而不能安装到“c:\Program Files”文件夹下。“c:\Program Files”文件夹的名字中包含空格,如果软件安装到这个文件夹下会导致一些问题。

注:不要“c:\Program Files”文件夹中安装JDK或(第二步中所描述的)Spark软件。

2)完成JDK安装后,切换至JDK 1.7目录下的”bin“文件夹,然后键入如下命令,验证JDK是否正确安装:

java -version

如果JDK安装正确,上述命令将显示Java版本。

II. 安装Spark软件:

Spark网站上下载最新版本的Spark。在本文发表时,最新的Spark版本是1.2。你可以根据Hadoop的版本选择一个特定的Spark版本安装。我下载了与Hadoop 2.4或更高版本匹配的Spark,文件名是spark-1.2.0-bin-hadoop2.4.tgz

将安装文件解压到本地文件夹中(如:c:\dev)。

为了验证Spark安装的正确性,切换至Spark文件夹然后用如下命令启动Spark Shell。这是Windows环境下的命令。如果使用LinuxMac OS,请相应地编辑命令以便能够在相应的平台上正确运行。

c:

cd c:\dev\spark-1.2.0-bin-hadoop2.4

bin\spark-shell

如果Spark安装正确,就能够在控制台的输出中看到如下信息。

….

15/01/17 23:17:46 INFO HttpServer: Starting HTTP Server

15/01/17 23:17:46 INFO Utils: Successfully started service 'HTTP class server' on port 58132.

Welcome to

      ____              __

     / __/__  ___ _____/ /__

    _\ \/ _ \/ _ `/ __/  '_/

   /___/ .__/\_,_/_/ /_/\_\   version 1.2.0

      /_/

 

Using Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.7.0_71)

Type in expressions to have them evaluated.

Type :help for more information.

….

15/01/17 23:17:53 INFO BlockManagerMaster: Registered BlockManager

15/01/17 23:17:53 INFO SparkILoop: Created spark context..

Spark context available as sc.

可以键入如下命令检查Spark Shell是否工作正常。

sc.version

(或)

sc.appName

完成上述步骤之后,可以键入如下命令退出Spark Shell窗口:

:quit

如果想启动Spark Python Shell,需要先在电脑上安装Python。你可以下载并安装Anaconda,这是一个免费的Python发行版本,其中包括了一些比较流行的科学、数学、工程和数据分析方面的Python包。

然后可以运行如下命令启动Spark Python Shell

c:

cd c:\dev\spark-1.2.0-bin-hadoop2.4

bin\pyspark

Spark示例应用

完成Spark安装并启动后,就可以用Spark API执行数据分析查询了。

这些从文本文件中读取并处理数据的命令都很简单。我们将在这一系列文章的后续文章中向大家介绍更高级的Spark框架使用的用例。

首先让我们用Spark API运行流行的Word Count示例。如果还没有运行Spark Scala Shell,首先打开一个Scala Shell窗口。这个示例的相关命令如下所示:

import org.apache.spark.SparkContext

import org.apache.spark.SparkContext._

 

val txtFile = "README.md"

val txtData = sc.textFile(txtFile)

txtData.cache()

我们可以调用cache函数将上一步生成的RDD对象保存到缓存中,在此之后Spark就不需要在每次数据查询时都重新计算。需要注意的是,cache()是一个延迟操作。在我们调用cache时,Spark并不会马上将数据存储到内存中。只有当在某个RDD上调用一个行动时,才会真正执行这个操作。

现在,我们可以调用count函数,看一下在文本文件中有多少行数据。

txtData.count()

然后,我们可以执行如下命令进行字数统计。在文本文件中统计数据会显示在每个单词的后面。

val wcData = txtData.flatMap(l => l.split(" ")).map(word => (word, 1)).reduceByKey(_ + _)

 

wcData.collect().foreach(println)

如果想查看更多关于如何使用Spark核心API的代码示例,请参考网站上的Spark文档

后续计划

在后续的系列文章中,我们将从Spark SQL开始,学习更多关于Spark生态系统的其他部分。之后,我们将继续了解Spark StreamingSpark MLlibSpark GraphX。我们也会有机会学习像TachyonBlinkDB等框架。

小结

在本文中,我们了解了Apache Spark框架如何通过其标准API帮助完成大数据处理和分析工作。我们还对Spark和传统的MapReduce实现(如Apache Hadoop)进行了比较。SparkHadoop基于相同的HDFS文件存储系统,因此如果你已经在Hadoop上进行了大量投资和基础设施建设,可以一起使用SparkMapReduce

此外,也可以将Spark处理与Spark SQL、机器学习以及Spark Streaming结合在一起。关于这方面的内容我们将在后续的文章中介绍。

利用Spark的一些集成功能和适配器,我们可以将其他技术与Spark结合在一起。其中一个案例就是将SparkKafkaApache Cassandra结合在一起,其中Kafka负责输入的流式数据,Spark完成计算,最后Cassandra NoSQL数据库用于保存计算结果数据。

不过需要牢记的是,Spark生态系统仍不成熟,在安全和与BI工具集成等领域仍然需要进一步的改进。