spark 订阅
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。 展开全文
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。
信息
基    于
MapReduce算法实现的分布式计算
最新版本
2.4.0
外文名
Spark
SPARK基本介绍
Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎 [1]  。现在形成一个高速发展应用广泛的生态系统。
收起全文
精华内容
参与话题
问答
  • Spark 入门

    万次阅读 2017-09-13 09:06:28
    Apache Spark是一个轻量级的内存集群计算平台,通过不同的组件来支撑批、流和交互式用例。 Apache Spark是个开源和兼容Hadoop的集群计算平台。由加州大学伯克利分校的AMPLabs开发,作为Berkeley Data Analytics ...

    Spark相对于hadoop所做的改进:

    Spark 速度更快;

    其次,Spark 丰富的API 带来了更强大的易用性;

    最后,Spark 不单单支持传统批处理应用,更支持交互式查询、流式计算、机器学习、图计算等
    各种应用,满足各种不同应用场景下的需求。


    Apache Spark是一个轻量级的内存集群计算平台,通过不同的组件来支撑批、流和交互式用例。

    Apache Spark是个开源和兼容Hadoop的集群计算平台。由加州大学伯克利分校的AMPLabs开发,作为Berkeley Data Analytics Stack(BDAS)的一部分,当下由大数据公司Databricks保驾护航,更是Apache旗下的顶级项目,下图显示了Apache Spark堆栈中的不同组件。


    Apache Spark的5大优势:

    1.更高的性能,因为数据被加载到集群主机的分布式内存中。数据可以被快速的转换迭代,并缓存用以后续的频繁访问需求。很多对Spark感兴趣的朋友可能也会听过这样一句话——在数据全部加载到内存的情况下,Spark可以比Hadoop快100倍,在内存不够存放所有数据的情况下快Hadoop 10倍。

    2.通过建立在Java、Scala、Python、SQL(应对交互式查询)的标准API以方便各行各业使用,同时还含有大量开箱即用的机器学习库。

    3.与现有Hadoop v1 (SIMR) 和2.x (YARN) 生态兼容,因此机构可以进行无缝迁移。


    4.方便下载和安装。方便的shell(REPL: Read-Eval-Print-Loop)可以对API进行交互式的学习。

    5.借助高等级的架构提高生产力,从而可以讲精力放到计算上。

    同时,Apache Spark由Scala实现,代码非常简洁。

    三、安装Apache Spark

    下表列出了一些重要链接和先决条件:

    Current Release 1.0.1 @ http://d3kbcqa49mib13.cloudfront.net/spark-1.0.1.tgz
    Downloads Page https://spark.apache.org/downloads.html
    JDK Version (Required) 1.6 or higher
    Scala Version (Required) 2.10 or higher
    Python (Optional) [2.6, 3.0)
    Simple Build Tool (Required) http://www.scala-sbt.org
    Development Version git clone git://github.com/apache/spark.git
    Building Instructions https://spark.apache.org/docs/latest/building-with-maven.html
    Maven 3.0 or higher

    如图6所示,Apache Spark的部署方式包括standalone、Hadoop V1 SIMR、Hadoop 2 YARN/Mesos。Apache Spark需求一定的Java、Scala或Python知识。这里,我们将专注standalone配置下的安装和运行。

    1.安装JDK 1.6+、Scala 2.10+、Python [2.6,3] 和sbt

    2.下载Apache Spark 1.0.1 Release

    3.在指定目录下Untar和Unzip spark-1.0.1.tgz 

    akuntamukkala@localhost~/Downloads$ pwd 
    /Users/akuntamukkala/Downloads akuntamukkala@localhost~/Downloads$ tar -zxvf spark- 1.0.1.tgz -C /Users/akuntamukkala/spark

    4.运行sbt建立Apache Spark

    akuntamukkala@localhost~/spark/spark-1.0.1$ pwd /Users/akuntamukkala/spark/spark-1.0.1 akuntamukkala@localhost~/spark/spark-1.0.1$ sbt/sbt assembly

    5.发布Scala的Apache Spark standalone REPL

    /Users/akuntamukkala/spark/spark-1.0.1/bin/spark-shell

    如果是Python

    /Users/akuntamukkala/spark/spark-1.0.1/bin/ pyspark

    6.查看SparkUI @ http://localhost:4040

    四、Apache Spark的工作模式

    Spark引擎提供了在集群中所有主机上进行分布式内存数据处理的能力,下图显示了一个典型Spark job的处理流程。


    下图显示了Apache Spark如何在集群中执行一个作业。


    Master控制数据如何被分割,利用了数据本地性,并在Slaves上跟踪所有分布式计算。在某个Slave不可用时,其存储的数据会分配给其他可用的Slaves。虽然当下(1.0.1版本)Master还存在单点故障,但后期必然会被修复。

    五、弹性分布式数据集(Resilient Distributed Dataset,RDD)

    弹性分布式数据集(RDD,从Spark 1.3版本开始已被DataFrame替代)是Apache Spark的核心理念。它是由数据组成的不可变分布式集合,其主要进行两个操作:transformation和action。Transformation是类似在RDD上做 filter()、map()或union() 以生成另一个RDD的操作,而action则是count()、first()、take(n)、collect() 等促发一个计算并返回值到Master或者稳定存储系统的操作。Transformations一般都是lazy的,直到action执行后才会被执行。Spark Master/Driver会保存RDD上的Transformations。这样一来,如果某个RDD丢失(也就是salves宕掉),它可以快速和便捷地转换到集群中存活的主机上。这也就是RDD的弹性所在。

    下图展示了Transformation的lazy:


    我们可以通过下面示例来理解这个概念:从文本中发现5个最常用的word。下图显示了一个可能的解决方案。


    在上面命令中,我们对文本进行读取并且建立字符串的RDD。每个条目代表了文本中的1行。

    scala> val hamlet = sc.textFile(“/Users/akuntamukkala/temp/gutenburg.txt”)
    hamlet: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
    scala> val topWordCount = hamlet.flatMap(str=>str.split(“ “)). filter(!_.isEmpty).map(word=>(word,1)).reduceByKey(_+_).map{case (word, count) => (count, word)}.sortByKey(false)
    topWordCount: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at sortByKey at <console>:14

    1. 通过上述命令我们可以发现这个操作非常简单——通过简单的Scala API来连接transformations和actions。

    2. 可能存在某些words被1个以上空格分隔的情况,导致有些words是空字符串,因此需要使用filter(!_.isEmpty)将它们过滤掉。

    3. 每个word都被映射成一个键值对:map(word=>(word,1))。

    4. 为了合计所有计数,这里需要调用一个reduce步骤——reduceByKey(_+_)。 _+_ 可以非常便捷地为每个key赋值。

    5. 我们得到了words以及各自的counts,下一步需要做的是根据counts排序。在Apache Spark,用户只能根据key排序,而不是值。因此,这里需要使用map{case (word, count) => (count, word)}将(word, count)流转到(count, word)。

    6. 需要计算最常用的5个words,因此需要使用sortByKey(false)做一个计数的递减排序。

    上述命令包含了一个.take(5) (an action operation, which triggers computation)和在 /Users/akuntamukkala/temp/gutenburg.txt文本中输出10个最常用的words。在Python shell中用户可以实现同样的功能。

    RDD lineage可以通过toDebugString(一个值得记住的操作)来跟踪。

    scala> topWordCount.take(5).foreach(x=>println(x))
    (1044,the)
    (730,and)
    (679,of)
    (648,to)
    (511,I)

    常用的Transformations:

    Transformation & Purpose Example & Result
    filter(func) Purpose: new RDD by selecting those data elements on which func returns true scala> val rdd = sc.parallelize(List(“ABC”,”BCD”,”DEF”)) scala> val filtered = rdd.filter(_.contains(“C”)) scala> filtered.collect()Result:
    Array[String] = Array(ABC, BCD)
    map(func) Purpose: return new RDD by applying func on each data element scala> val rdd=sc.parallelize(List(1,2,3,4,5)) scala> val times2 = rdd.map(_*2) scala> times2.collect()Result:
    Array[Int] = Array(2, 4, 6, 8, 10)
    flatMap(func) Purpose: Similar to map but func returns a Seq instead of a value. For example, mapping a sentence into a Seq of words scala> val rdd=sc.parallelize(List(“Spark is awesome”,”It is fun”)) scala> val fm=rdd.flatMap(str=>str.split(“ “)) scala> fm.collect()Result:
    Array[String] = Array(Spark, is, awesome, It, is, fun)
    reduceByKey(func,[numTasks]) Purpose: To aggregate values of a key using a function. “numTasks” is an optional parameter to specify number of reduce tasks scala> val word1=fm.map(word=>(word,1)) scala> val wrdCnt=word1.reduceByKey(_+_) scala> wrdCnt.collect()Result:
    Array[(String, Int)] = Array((is,2), (It,1), (awesome,1), (Spark,1), (fun,1))
    groupByKey([numTasks]) Purpose: To convert (K,V) to (K,Iterable<V>) scala> val cntWrd = wrdCnt.map{case (word, count) => (count, word)} scala> cntWrd.groupByKey().collect()Result:
    Array[(Int, Iterable[String])] = Array((1,ArrayBuffer(It, awesome, Spark, fun)), (2,ArrayBuffer(is)))
    distinct([numTasks]) Purpose: Eliminate duplicates from RDD scala> fm.distinct().collect() Result:
    Array[String] = Array(is, It, awesome, Spark, fun)

    常用的集合操作:

    Transformation and Purpose Example and Result
    union()
    Purpose: new RDD containing all elements from source RDD and argument.
    Scala> val rdd1=sc.parallelize(List(‘A’,’B’))
    scala> val rdd2=sc.parallelize(List(‘B’,’C’))
    scala> rdd1.union(rdd2).collect()
    Result:
    Array[Char] = Array(A, B, B, C)
    intersection()
    Purpose: new RDD containing only common elements from source RDD and argument.
    Scala> rdd1.intersection(rdd2).collect()
    Result:
    Array[Char] = Array(B)
    cartesian()
    Purpose: new RDD cross product of all elements from source RDD and argument
    Scala> rdd1.cartesian(rdd2).collect()
    Result:
    Array[(Char, Char)] = Array((A,B), (A,C), (B,B), (B,C))
    subtract()
    Purpose: new RDD created by removing data elements in source RDD in common with argument
    scala> rdd1.subtract(rdd2).collect() Result:
    Array[Char] = Array(A)
    join(RDD,[numTasks])
    Purpose: When invoked on (K,V) and (K,W), this operation creates a new RDD of (K, (V,W))
    scala> val personFruit = sc.parallelize(Seq((“Andy”, “Apple”), (“Bob”, “Banana”), (“Charlie”, “Cherry”), (“Andy”,”Apricot”)))
    scala> val personSE = sc.parallelize(Seq((“Andy”, “Google”), (“Bob”, “Bing”), (“Charlie”, “Yahoo”), (“Bob”,”AltaVista”)))
    scala> personFruit.join(personSE).collect()
    Result:
    Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)), (Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista)))
    cogroup(RDD,[numTasks])
    Purpose: To convert (K,V) to (K,Iterable<V>)
    scala> personFruit.cogroup(personSe).collect()
    Result:
    Array[(String, (Iterable[String], Iterable[String]))] = Array((Andy,(ArrayBuffer(Apple, Apricot),ArrayBuffer(google))), (Charlie,(ArrayBuffer(Cherry),ArrayBuffer(Yahoo))), (Bob,(ArrayBuffer(Banana),ArrayBuffer(Bing, AltaVista))))

    更多transformations信息,请查看http://spark.apache.org/docs/latest/programming-guide.html#transformations

    常用的actions

    Action & Purpose Example & Result
    count() Purpose: get the number of data elements in the RDD scala> val rdd = sc.parallelize(list(‘A’,’B’,’c’)) scala> rdd.count()Result:
    long = 3
    collect() Purpose: get all the data elements in an RDD as an array scala> val rdd = sc.parallelize(list(‘A’,’B’,’c’)) scala> rdd.collect()Result:
    Array[char] = Array(A, B, c)
    reduce(func) Purpose: Aggregate the data elements in an RDD using this function which takes two arguments and returns one scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.reduce(_+_)Result:
    Int = 10
    take (n) Purpose: : fetch first n data elements in an RDD. computed by driver program. Scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.take(2)Result:
    Array[Int] = Array(1, 2)
    foreach(func) Purpose: execute function for each data element in RDD. usually used to update an accumulator(discussed later) or interacting with external systems. Scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.foreach(x=>println(“%s*10=%s”. format(x,x*10)))Result:
    1*10=10 4*10=40 3*10=30 2*10=20
    first() Purpose: retrieves the first data element in RDD. Similar to take(1) scala> val rdd = sc.parallelize(list(1,2,3,4)) scala> rdd.first()Result:
    Int = 1
    saveAsTextFile(path) Purpose: Writes the content of RDD to a text file or a set of text files to local file system/ HDFS scala> val hamlet = sc.textFile(“/users/akuntamukkala/ temp/gutenburg.txt”) scala> hamlet.filter(_.contains(“Shakespeare”)). saveAsTextFile(“/users/akuntamukkala/temp/ filtered”)Result:
    akuntamukkala@localhost~/temp/filtered$ ls _SUCCESS part-00000 part-00001

    更多actions参见http://spark.apache.org/docs/latest/programming-guide.html#actions 

    六、RDD持久性

    Apache Spark中一个主要的能力就是在集群内存中持久化/缓存RDD。这将显著地提升交互速度。下表显示了Spark中各种选项。

    Storage Level Purpose
    MEMORY_ONLY (Default level) This option stores RDD in available cluster memory as deserialized Java objects. Some partitions may not be cached if there is not enough cluster memory. Those partitions will be recalculated on the fly as needed.
    MEMORY_AND_DISK This option stores RDD as deserialized Java objects. If RDD does not fit in cluster memory, then store those partitions on the disk and read them as needed.
    MEMORY_ONLY_SER This options stores RDD as serialized Java objects (One byte array per partition). This is more CPU intensive but saves memory as it is more space efficient. Some partitions may not be cached. Those will be recalculated on the fly as needed.
    MEMORY_ONLY_DISK_SER This option is same as above except that disk is used when memory is not sufficient.
    DISC_ONLY This option stores the RDD only on the disk
    MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as other levels but partitions are replicated on 2 slave nodes

    上面的存储等级可以通过RDD. cache()操作上的 persist()操作访问,可以方便地指定MEMORY_ONLY选项。关于持久化等级的更多信息,可以访问这里http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence。

    Spark使用Least Recently Used (LRU)算法来移除缓存中旧的、不常用的RDD,从而释放出更多可用内存。同样还提供了一个unpersist() 操作来强制移除缓存/持久化的RDD。

    七、变量共享

    Accumulators。Spark提供了一个非常便捷地途径来避免可变的计数器和计数器同步问题——Accumulators。Accumulators在一个Spark context中通过默认值初始化,这些计数器在Slaves节点上可用,但是Slaves节点不能对其进行读取。它们的作用就是来获取原子更新,并将其转发到Master。Master是唯一可以读取和计算所有更新合集的节点。举个例子:

    akuntamukkala@localhost~/temp$ cat output.log
    error
    warning
    info
    trace
    error
    info
    info
    scala> val nErrors=sc.accumulator(0.0)
    scala> val logs = sc.textFile(“/Users/akuntamukkala/temp/output.log”)
    scala> logs.filter(_.contains(“error”)).foreach(x=>nErrors+=1)
    scala> nErrors.value
    Result:Int = 2

    Broadcast Variables。实际生产中,通过指定key在RDDs上对数据进行合并的场景非常常见。在这种情况下,很可能会出现给slave nodes发送大体积数据集的情况,让其负责托管需要做join的数据。因此,这里很可能存在巨大的性能瓶颈,因为网络IO比内存访问速度慢100倍。为了解决这个问题,Spark提供了Broadcast Variables,如其名称一样,它会向slave nodes进行广播。因此,节点上的RDD操作可以快速访问Broadcast Variables值。举个例子,期望计算一个文件中所有路线项的运输成本。通过一个look-up table指定每种运输类型的成本,这个look-up table就可以作为Broadcast Variables。

    akuntamukkala@localhost~/temp$ cat packagesToShip.txt ground
    express
    media
    priority
    priority
    ground
    express
    media
    scala> val map = sc.parallelize(Seq((“ground”,1),(“med”,2), (“priority”,5),(“express”,10))).collect().toMap
    map: scala.collection.immutable.Map[String,Int] = Map(ground -> 1, media -> 2, priority -> 5, express -> 10)
    scala> val bcMailRates = sc.broadcast(map)

    上述命令中,我们建立了一个broadcast variable,基于服务类别成本的map。

    scala> val pts = sc.textFile(“/Users/akuntamukkala/temp/packagesToShip.txt”)

    在上述命令中,我们通过broadcast variable的mailing rates来计算运输成本。

    scala> pts.map(shipType=>(shipType,1)).reduceByKey(_+_). map{case (shipType,nPackages)=>(shipType,nPackages*bcMailRates. value(shipType))}.collect()

    通过上述命令,我们使用accumulator来累加所有运输的成本。详细信息可通过下面的PDF查看http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf。

    八、Spark SQL

    通过Spark Engine,Spark SQL提供了一个便捷的途径来进行交互式分析,使用一个被称为SchemaRDD类型的RDD。SchemaRDD可以通过已有RDDs建立,或者其他外部数据格式,比如Parquet files、JSON数据,或者在Hive上运行HQL。SchemaRDD非常类似于RDBMS中的表格。一旦数据被导入SchemaRDD,Spark引擎就可以对它进行批或流处理。Spark SQL提供了两种类型的Contexts——SQLContext和HiveContext,扩展了SparkContext的功能。

    SparkContext提供了到简单SQL parser的访问,而HiveContext则提供了到HiveQL parser的访问。HiveContext允许企业利用已有的Hive基础设施。

    这里看一个简单的SQLContext示例。

    下面文本中的用户数据通过“|”来分割。

    John Smith|38|M|201 East Heading Way #2203,Irving, TX,75063 Liana Dole|22|F|1023 West Feeder Rd, Plano,TX,75093 Craig Wolf|34|M|75942 Border Trail,Fort Worth,TX,75108 John Ledger|28|M|203 Galaxy Way,Paris, TX,75461 Joe Graham|40|M|5023 Silicon Rd,London,TX,76854

    定义Scala case class来表示每一行:

    case class Customer(name:String,age:Int,gender:String,address: String)

    下面的代码片段体现了如何使用SparkContext来建立SQLContext,读取输入文件,将每一行都转换成SparkContext中的一条记录,并通过简单的SQL语句来查询30岁以下的男性用户。

    val sparkConf = new SparkConf().setAppName(“Customers”)
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val r = sc.textFile(“/Users/akuntamukkala/temp/customers.txt”) val records = r.map(_.split(‘|’))
    val c = records.map(r=>Customer(r(0),r(1).trim.toInt,r(2),r(3))) c.registerAsTable(“customers”)
    sqlContext.sql(“select * from customers where gender=’M’ and age <
                30”).collect().foreach(println) Result:[John Ledger,28,M,203 Galaxy Way,Paris,
                TX,75461]

    更多使用SQL和HiveQL的示例请访问下面链接https://spark.apache.org/docs/latest/sql-programming-guide.html、https://databricks-training.s3.amazonaws.com/data-exploration-using-spark-sql.html。


    九、Spark Streaming

    Spark Streaming提供了一个可扩展、容错、高效的途径来处理流数据,同时还利用了Spark的简易编程模型。从真正意义上讲,Spark Streaming会将流数据转换成micro batches,从而将Spark批处理编程模型应用到流用例中。这种统一的编程模型让Spark可以很好地整合批量处理和交互式流分析。下图显示了Spark Streaming可以从不同数据源中读取数据进行分析。


    Spark Streaming中的核心抽象是Discretized Stream(DStream)。DStream由一组RDD组成,每个RDD都包含了规定时间(可配置)流入的数据。图12很好地展示了Spark Streaming如何通过将流入数据转换成一系列的RDDs,再转换成DStream。每个RDD都包含两秒(设定的区间长度)的数据。在Spark Streaming中,最小长度可以设置为0.5秒,因此处理延时可以达到1秒以下。

    Spark Streaming同样提供了 window operators,它有助于更有效率在一组RDD( a rolling window of time)上进行计算。同时,DStream还提供了一个API,其操作符(transformations和output operators)可以帮助用户直接操作RDD。下面不妨看向包含在Spark Streaming下载中的一个简单示例。示例是在Twitter流中找出趋势hashtags,详见下面代码。

    spark-1.0.1/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
    val sparkConf = new SparkConf().setAppName(“TwitterPopularTags”)
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters)

    上述代码用于建立Spark Streaming Context。Spark Streaming将在DStream中建立一个RDD,包含了每2秒流入的tweets。

    val hashTags = stream.flatMap(status => status.getText.split(“ “).filter(_.startsWith(“#”)))

    上述代码片段将Tweet转换成一组words,并过滤出所有以a#开头的。

    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) => (count, topic)}. transform(_.sortByKey(false))

    上述代码展示了如何整合计算60秒内一个hashtag流入的总次数。

    topCounts60.foreachRDD(rdd => {
    val topList = rdd.take(10)
    println(“\nPopular topics in last 60 seconds (%s
    total):”.format(rdd.count())) topList.foreach{case (count, tag) => println(“%s (%s
    tweets)”.format(tag, count))} })

    上面代码将找出top 10趋势tweets,然后将其打印。

    ssc.start()

    上述代码让Spark Streaming Context 开始检索tweets。一起聚焦一些常用操作,假设我们正在从一个socket中读入流文本。

    al lines = ssc.socketTextStream(“localhost”, 9999, StorageLevel.MEMORY_AND_DISK_SER)


    更多operators请访问http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations

    Spark Streaming拥有大量强大的output operators,比如上文提到的 foreachRDD(),了解更多可访问   http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations

    十、附加学习资源

    展开全文
  • SparkSpark基础教程

    万次阅读 多人点赞 2019-03-20 12:33:42
    Spark最初由美国加州伯克利大学的AMP实验室于2009年开发,是基于内存计算的大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序。 Spark特点 Spark具有如下几个主要特点: 运行速度快:Spark使用先进...

    Spark最初由美国加州伯克利大学的AMP实验室于2009年开发,是基于内存计算大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序。

    Spark特点

    Spark具有如下几个主要特点:

    • 运行速度快:Spark使用先进的DAG(Directed Acyclic Graph,有向无环图)执行引擎,以支持循环数据流与内存计算,基于内存的执行速度可比Hadoop MapReduce快上百倍,基于磁盘的执行速度也能快十倍;
    • 容易使用:Spark支持使用Scala、Java、Python和R语言进行编程,简洁的API设计有助于用户轻松构建并行程序,并且可以通过Spark Shell进行交互式编程;
    • 通用性:Spark提供了完整而强大的技术栈,包括SQL查询、流式计算、机器学习和图算法组件,这些组件可以无缝整合在同一个应用中,足以应对复杂的计算;
    • 运行模式多样:Spark可运行于独立的集群模式中,或者运行于Hadoop中,也可运行于Amazon EC2等云环境中,并且可以访问HDFS、Cassandra、HBase、Hive等多种数据源。

    Spark相对于Hadoop的优势

    Hadoop虽然已成为大数据技术的事实标准,但其本身还存在诸多缺陷,最主要的缺陷是其MapReduce计算模型延迟过高,无法胜任实时、快速计算的需求,因而只适用于离线批处理的应用场景。

    回顾Hadoop的工作流程,可以发现Hadoop存在如下一些缺点:

    • 表达能力有限。计算都必须要转化成Map和Reduce两个操作,但这并不适合所有的情况,难以描述复杂的数据处理过程;
    • 磁盘IO开销大。每次执行时都需要从磁盘读取数据,并且在计算完成后需要将中间结果写入到磁盘中,IO开销较大;
    • 延迟高。一次计算可能需要分解成一系列按顺序执行的MapReduce任务,任务之间的衔接由于涉及到IO开销,会产生较高延迟。而且,在前一个任务执行完成之前,其他任务无法开始,难以胜任复杂、多阶段的计算任务。

    Spark主要具有如下优点:

    • Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比MapReduce更灵活
    • Spark提供了内存计算,中间结果直接放到内存中,带来了更高的迭代运算效率;
    • Spark基于DAG的任务调度执行机制,要优于MapReduce的迭代执行机制。

    Spark最大的特点就是将计算数据、中间结果都存储在内存中,大大减少了IO开销

    Spark提供了多种高层次、简洁的API,通常情况下,对于实现相同功能的应用程序,Spark的代码量要比Hadoop少2-5倍。

    Spark并不能完全替代Hadoop,主要用于替代Hadoop中的MapReduce计算模型。实际上,Spark已经很好地融入了Hadoop生态圈,并成为其中的重要一员,它可以借助于YARN实现资源调度管理,借助于HDFS实现分布式存储。

    Spark生态系统

    Spark的生态系统主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX 等组件,各个组件的具体功能如下:

    • Spark Core:Spark Core包含Spark的基本功能,如内存计算、任务调度、部署模式故障恢复、存储管理等。Spark建立在统一的抽象RDD之上,使其可以以基本一致的方式应对不同的大数据处理场景;通常所说的Apache Spark,就是指Spark Core;
    • Spark SQL:Spark SQL允许开发人员直接处理RDD,同时也可查询Hive、HBase等外部数据源。Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行查询,并进行更复杂的数据分析;
    • Spark Streaming:Spark Streaming支持高吞吐量、可容错处理的实时流数据处理,其核心思路是将流式计算分解成一系列短小的批处理作业。Spark Streaming支持多种数据输入源,如Kafka、Flume和TCP套接字等;
    • MLlib(机器学习):MLlib提供了常用机器学习算法的实现,包括聚类、分类、回归、协同过滤等,降低了机器学习的门槛,开发人员只要具备一定的理论知识就能进行机器学习的工作;
    • GraphX(图计算):GraphX是Spark中用于图计算的API,可认为是Pregel在Spark上的重写及优化,Graphx性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法。

    Spark基本概念

    在具体讲解Spark运行架构之前,需要先了解几个重要的概念:

    • RDD:是弹性分布式数据集(Resilient Distributed Dataset)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型
    • DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系
    • Executor:是运行在工作节点(Worker Node)上的一个进程负责运行任务,并为应用程序存储数据;
    • 应用:用户编写的Spark应用程序;
    • 任务:运行在Executor上的工作单元
    • 作业:一个作业包含多个RDD及作用于相应RDD上的各种操作
    • 阶段:是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为“阶段”,或者也被称为“任务集”。

    Spark结构设计

    Spark运行架构包括集群资源管理器(Cluster Manager)、运行作业任务的工作节点(Worker Node)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor)。其中,集群资源管理器可以是Spark自带的资源管理器,也可以是YARN或Mesos等资源管理框架。

    Spark各种概念之间的关系

    在Spark中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。当执行一个应用时,任务控制节点会向集群管理器(Cluster Manager)申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行任务,运行结束后,执行结果会返回给任务控制节点,或者写到HDFS或者其他数据库中。
    在这里插入图片描述

    Executor的优点

    与Hadoop MapReduce计算框架相比,Spark所采用的Executor有两个优点:

    1. 利用多线程来执行具体的任务(Hadoop MapReduce采用的是进程模型),减少任务的启动开销;
    2. Executor中有一个BlockManager存储模块,会将内存和磁盘共同作为存储设备,当需要多轮迭代计算时,可以将中间结果存储到这个存储模块里,下次需要时,就可以直接读该存储模块里的数据,而不需要读写到HDFS等文件系统里,因而有效减少了IO开销;或者在交互式查询场景下,预先将表缓存到该存储系统上,从而可以提高读写IO性能。

    Spark运行基本流程

    Spark的基本运行流程如下:

    1. 当一个Spark应用被提交时,首先需要为这个应用构建起基本的运行环境,即由任务控制节点(Driver)创建一个SparkContext,由SparkContext负责和资源管理器(Cluster Manager)的通信以及进行资源的申请、任务的分配和监控等。SparkContext会向资源管理器注册并申请运行Executor的资源;
    2. 资源管理器为Executor分配资源,并启动Executor进程,Executor运行情况将随着“心跳”发送到资源管理器上;
    3. SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAG调度器(DAGScheduler)进行解析,将DAG图分解成多个“阶段”(每个阶段都是一个任务集),并且计算出各个阶段之间的依赖关系,然后把一个个“任务集”提交给底层的任务调度器(TaskScheduler)进行处理;Executor向SparkContext申请任务,任务调度器将任务分发给Executor运行,同时,SparkContext将应用程序代码发放给Executor;
    4. 任务在Executor上运行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源。
      在这里插入图片描述

    Spark运行架构的特点

    Spark运行架构具有以下特点:

    1. 每个应用都有自己专属的Executor进程,并且该进程在应用运行期间一直驻留。Executor进程以多线程的方式运行任务,减少了多进程任务频繁的启动开销,使得任务执行变得非常高效和可靠;
    2. Spark运行过程与资源管理器无关,只要能够获取Executor进程并保持通信即可;
    3. Executor上有一个BlockManager存储模块,类似于键值存储系统(把内存和磁盘共同作为存储设备),在处理迭代计算任务时,不需要把中间结果写入到HDFS等文件系统,而是直接放在这个存储系统上,后续有需要时就可以直接读取;在交互式查询场景下,也可以把表提前缓存到这个存储系统上,提高读写IO性能;
    4. 任务采用了数据本地性和推测执行等优化机制。数据本地性是尽量将计算移到数据所在的节点上进行,即“计算向数据靠拢”,因为移动计算比移动数据所占的网络资源要少得多。而且,Spark采用了延时调度机制,可以在更大的程度上实现执行过程优化。比如,拥有数据的节点当前正被其他的任务占用,那么,在这种情况下是否需要将数据移动到其他的空闲节点呢?答案是不一定。因为,如果经过预测发现当前节点结束当前任务的时间要比移动数据的时间还要少,那么,调度就会等待,直到当前节点可用。

    Spark的部署模式

    Spark支持的三种典型集群部署方式,即standalone、Spark on Mesos和Spark on YARN;然后,介绍在企业中是如何具体部署和应用Spark框架的,在企业实际应用环境中,针对不同的应用场景,可以采用不同的部署应用方式,或者采用Spark完全替代原有的Hadoop架构,或者采用Spark和Hadoop一起部署的方式。

    Spark三种部署方式

    Spark应用程序在集群上部署运行时,可以由不同的组件为其提供资源管理调度服务(资源包括CPU、内存等)。比如,可以使用自带的独立集群管理器(standalone),或者使用YARN,也可以使用Mesos。因此,Spark包括三种不同类型的集群部署方式,包括standalone、Spark on Mesos和Spark on YARN。
    1.standalone模式
    与MapReduce1.0框架类似,Spark框架本身也自带了完整的资源调度管理服务,可以独立部署到一个集群中,而不需要依赖其他系统来为其提供资源管理调度服务。在架构的设计上,Spark与MapReduce1.0完全一致,都是由一个Master和若干个Slave构成,并且以槽(slot)作为资源分配单位。不同的是,Spark中的槽不再像MapReduce1.0那样分为Map 槽和Reduce槽,而是只设计了统一的一种槽提供给各种任务来使用。
    2.Spark on Mesos模式
    Mesos是一种资源调度管理框架,可以为运行在它上面的Spark提供服务。Spark on Mesos模式中,Spark程序所需要的各种资源,都由Mesos负责调度。由于Mesos和Spark存在一定的血缘关系,因此,Spark这个框架在进行设计开发的时候,就充分考虑到了对Mesos的充分支持,因此,相对而言,Spark运行在Mesos上,要比运行在YARN上更加灵活、自然。目前,Spark官方推荐采用这种模式,所以,许多公司在实际应用中也采用该模式。
    3. Spark on YARN模式
    Spark可运行于YARN之上,与Hadoop进行统一部署,即“Spark on YARN”,其架构如图9-13所示,资源管理和调度依赖YARN,分布式存储则依赖HDFS。
    在这里插入图片描述

    Hadoop和Spark的统一部署

    一方面,由于Hadoop生态系统中的一些组件所实现的功能,目前还是无法由Spark取代的,比如,Storm可以实现毫秒级响应的流计算,但是,Spark则无法做到毫秒级响应。另一方面,企业中已经有许多现有的应用,都是基于现有的Hadoop组件开发的,完全转移到Spark上需要一定的成本。因此,在许多企业实际应用中,Hadoop和Spark的统一部署是一种比较现实合理的选择。
    由于Hadoop MapReduce、HBase、Storm和Spark等,都可以运行在资源管理框架YARN之上,因此,可以在YARN之上进行统一部署(如图9-16所示)。这些不同的计算框架统一运行在YARN中,可以带来如下好处:

    •  计算资源按需伸缩;
    •  不用负载应用混搭,集群利用率高;
    •  共享底层存储,避免数据跨集群迁移。
      在这里插入图片描述
    展开全文
  • Spark专场】SparkSQL在ETL中的应用

    千人学习 2017-06-12 14:58:01
    当今,企业”上云”节奏正在加速,特别是在以人工智能技术为代表的新一波技术浪潮推动下,企业一方面通过云技术增强了自身的数据存储连接、计算以及智能应用能力;另一方面,利用基于云计算之上的大数据、人工智能等...
  • 大数据Spark实战视频教程

    万人学习 2016-11-10 14:26:54
    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室...
  • Spark快速大数据处理

    万人学习 2019-04-24 19:32:53
    5.Spark2实时大数据处理 6.Oozie5-大数据流程引擎 课程特点: 1.最新API: Hadoop3/Spark2/Hive3/Oozie5 2.手工搭建集群环境:编译+搭建 3.配套资源:分阶段镜像+课件+安装资源,其中安装...
  • Spark介绍(一)简介

    千次阅读 2018-09-14 10:51:50
    一、Spark简介 Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab)开发的通用内存并行计算框架 Spark使用Scala语言进行实现,它是一种面向对象、函数式编程语言,能够像操作本地集合...

    一、Spark简介

    Spark是加州大学伯克利分校AMP实验室(Algorithms, Machines, and People Lab开发通用内存并行计算框架

    Spark使用Scala语言进行实现,它是一种面向对象、函数式编程语言,能够像操作本地集合对象一样轻松地操作分布式数据具有以下特点。

    1.运行速度快:Spark拥有DAG执行引擎,支持在内存中对数据进行迭代计算。官方提供的数据表明,如果数据由磁盘读取,速度是Hadoop MapReduce10倍以上,如果数据从内存中读取,速度可以高达100多倍

    2.用性好:Spark不仅支持Scala编写应用程序,而且支持JavaPython等语言进行编写,特别是Scala是一种高效、可拓展的语言,能够用简洁的代码处理较为复杂的处理工作

    3.通用性强:Spark生态圈即BDAS(伯克利数据分析栈)包含了Spark CoreSpark SQLSpark StreamingMLLibGraphX等组件,这些组件分别处理Spark Core提供内存计算框架、SparkStreaming的实时处理应用、Spark SQL的即席查询、MLlibMLbase的机器学习和GraphX的图处理。

    4.随处运行:Spark具有很强的适应性,能够读取HDFSCassandraHBaseS3Techyon为持久层读写原生数据,能够以MesosYARN和自身携带的Standalone作为资源管理器调度job,来完成Spark应用程序的计算

                 

    二、SparkHadoop差异

    Spark是在借鉴了MapReduce之上发展而来的,继承了其分布式并行计算的优点并改进了MapReduce明显的缺陷,具体如下:

    首先,Spark把中间数据放到内存中,迭代运算效率高。MapReduce中计算结果需要落地,保存到磁盘上,这样势必会影响整体速度,而Spark支持DAG图的分布式并行计算的编程框架,减少了迭代过程中数据的落地,提高了处理效率。

    其次,Spark容错性高。Spark引进了弹性分布式数据集RDD (Resilient Distributed Dataset) 的抽象,它是分布在一组节点中的只读对象集合,这些集合是弹性的,如果数据集一部分丢失,则可以根据“血统”(即充许基于数据衍生过程)对它们进行重建。另外在RDD计算时可以通过CheckPoint来实现容错,而CheckPoint有两种方式:CheckPoint Data,和Logging The Updates,用户可以控制采用哪种方式来实现容错。

    最后,Spark更加通用。不像Hadoop只提供了MapReduce两种操作,Spark提供的数据集操作类型有很多种,大致分为:TransformationsActions两大类。Transformations包括MapFilterFlatMapSampleGroupByKeyReduceByKeyUnionJoinCogroupMapValuesSortPartionBy等多种操作类型,同时还提供Count, Actions包括CollectReduceLookupSave等操作。另外各个处理节点之间的通信模型不再像Hadoop只有Shuffle一种模式,用户可以命名、物化,控制中间结果的存储、分区等

    三、Spark的适用场景

    目前大数据处理场景有以下几个类型:

    1.  复杂的批量处理(Batch Data Processing),偏重点在于处理海量数据的能力,至于处理速度可忍受,通常的时间可能是在数十分钟到数小时;

    2.  基于历史数据的交互式查询(Interactive Query),通常的时间在数十秒到数十分钟之间

    3.  基于实时数据流的数据处理(Streaming Data Processing),通常在数百毫秒到数秒之间

    目前对以上三种场景需求都有比较成熟的处理框架,第一种情况可以用HadoopMapReduce来进行批量海量数据处理,第二种情况可以Impala进行交互式查询,对于第三中情况可以用Storm分布式处理框架处理实时流式数据。以上三者都是比较独立,各自一套维护成本比较高,而Spark的出现能够一站式平台满意以上需求。

    通过以上分析,总结Spark场景有以下几个:

    1.Spark是基于内存的迭代计算框架,适用于需要多次操作特定数据集的应用场合。需要反复操作的次数越多,所需读取的数据量越大,受益越大,数据量小但是计算密集度较大的场合,受益就相对较小

    2.由于RDD的特性,Spark不适用那种异步细粒度更新状态的应用,例如web服务的存储或者是增量的web爬虫和索引。就是对于那种增量修改的应用模型不适合

    3.数据量不是特别大,但是要求实时统计分析需求

    四、Spark常用术语

    术语

    描述

    Application

    Spark的应用程序,包含一个Driver program和若干Executor

    SparkContext

    Spark应用程序的入口,负责调度各个运算资源,协调各个Worker Node上的Executor

    Driver Program

    运行Applicationmain()函数并且创建SparkContext

    Executor

    是为Application运行在Worker node上的一个进程,该进程负责运行Task,并且负责将数据存在内存或者磁盘上。

    每个Application都会申请各自的Executor来处理任务

    Cluster Manager

    在集群上获取资源的外部服务

    (例如:Standalone、Mesos、Yarn)

    Worker Node

    集群中任何可以运行Application代码的节点,运行一个或多个Executor进程

    Task

    运行在Executor上的工作单元

    Job

    SparkContext提交的具体Action操作,常和Action对应

    Stage

    每个Job会被拆分很多组task,每组任务被称为Stage,也称TaskSet

    RDD

    Resilient distributed datasets的简称,中文为弹性分布式数据集;Spark最核心的模块和类

    DAGScheduler

    根据Job构建基于StageDAG,并提交StageTaskScheduler

    TaskScheduler

    Taskset提交给Worker node集群运行并返回结果

    Transformations

    Spark API的一种类型,Transformation返回值还是一个RDD,

    所有的Transformation采用的都是懒策略,如果只是将Transformation提交是不会执行计算的

    Action

    Spark API的一种类型,Action返回值不是一个RDD,而是一个scala集合;计算只有在Action被提交的时候计算才被触发。

     

    展开全文
  • Spark学习总结(一)

    万次阅读 多人点赞 2018-05-31 14:10:25
    RDD及其特点1、RDD是Spark的核心数据模型,但是个抽象类,全称为Resillient Distributed Dataset,即弹性分布式数据集。2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在...


    RDD及其特点

    1、RDD是Spark的核心数据模型,但是个抽象类,全称为Resillient Distributed Dataset,即弹性分布式数据集。

    2、RDD在抽象上来说是一种元素集合,包含了数据。它是被分区的,分为多个分区,每个分区分布在集群中的不同节点上,从而让RDD中的数据可以被并行操作。(分布式数据集)

    3、RDD通常通过Hadoop上的文件,即HDFS文件或者Hive表,来进行创建;有时也可以通过应用程序中的集合来创建。

    4、RDD最重要的特性就是,提供了容错性,可以自动从节点失败中恢复过来。即如果某个节点上的RDDpartition,因为节点故障,导致数据丢了,那么RDD会自动通过自己的数据来源重新计算该partition。这一切对使用者是透明的。

    5、RDD的数据默认情况下存放在内存中的,但是在内存资源不足时,Spark会自动将RDD数据写入磁盘。(弹性)

    创建RDD

    进行Spark核心编程的第一步就是创建一个初始的RDD。该RDD,通常就代表和包含了Spark应用程序的输入源数据。然后通过Spark Core提供的transformation算子,对该RDD进行转换,来获取其他的RDD。

    Spark Core提供了三种创建RDD的方式:

    1.使用程序中的集合创建RDD(主要用于测试)

    List<Integer> numbers = Arrays.asList(1,2,3,4,5,6,7,8,9,10);
    JavaRDD<Integer> numbersRDD = sc.parallelize(numbers);
    

    2.使用本地文件创建RDD(主要用于临时性处理有大量数据的文件)

    SparkSession spark = SparkSession.builder().master("local").appName("WordCountLocal").getOrCreate();
    JavaRDD<String> lines = spark.read().textFile("D:\\Users\\Administrator\\Desktop\\spark.txt").javaRDD();
    

    3.使用HDFS文件创建RDD(生产环境的常用方式)

    SparkSession spark = SparkSession.builder().appName("WordCountCluster").getOrCreate();
    JavaRDD<String> lines = spark.read().textFile("hdfs://h0:9000/spark.txt").javaRDD();
    

    使用HDFS文件创建RDD对比使用本地文件创建RDD,需要修改的,只有两个地方:
    第一,将SparkSession对象的master("local")方法去掉
    第二,我们针对的不是本地文件了,修改为hadoop hdfs上的真正的存储大数据的文件

    操作RDD

    Spark支持两种RDD操作:transformation和action。

    transformation操作

    transformation操作会针对已有的RDD创建一个新的RDD。transformation具有lazy特性,即transformation不会触发spark程序的执行,它们只是记录了对RDD所做的操作,不会自发的执行。只有执行了一个action,之前的所有transformation才会执行。

    常用的transformation介绍:

    map :将RDD中的每个元素传人自定义函数,获取一个新的元素,然后用新的元素组成新的RDD。

    filter:对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除。

    flatMap:与map类似,但是对每个元素都可以返回一个或多个元素。

    groupByKey:根据key进行分组,每个key对应一个Iterable<value>。

    reduceByKey:对每个key对应的value进行reduce操作。

    sortByKey:对每个key对应的value进行排序操作。

    join:对两个包含<key,value>对的RDD进行join操作,每个keyjoin上的pair,都会传入自定义函数进行处理。

    cogroup:同join,但是每个key对应的Iterable<value>都会传入自定义函数进行处理。

    action操作

    action操作主要对RDD进行最后的操作,比如遍历,reduce,保存到文件等,并可以返回结果给Driver程序。action操作执行,会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行,这是action的特性。

    常用的action介绍:

    reduce:将RDD中的所有元素进行聚合操作。第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推。

    collect:将RDD中所有元素获取到本地客户端(一般不建议使用)。

    count:获取RDD元素总数。

    take(n):获取RDD中前n个元素。


    saveAsTextFile:将RDD元素保存到文件中,对每个元素调用toString方法。

    countByKey:对每个key对应的值进行count计数。

    foreach:遍历RDD中的每个元素。

    RDD持久化

    要持久化一个RDD,只要调用其cache()或者persist()方法即可。在该RDD第一次被计算出来时,就会直接缓存在每个节点中。但是cache()或者persist()的使用是有规则的,必须在transformation或者textFile等创建了一个RDD之后,直接连续调用cache()或persist()才可以。

    如果你先创建一个RDD,然后单独另起一行执行cache()或persist()方法,是没有用的,而且会报错,大量的文件会丢失。

    val lines = spark.read.textFile("hdfs://h0:9000/spark.txt").persist()
    

    Spark提供的多种持久化级别,主要是为了在CPU和内存消耗之间进行取舍。

    通用的持久化级别的选择建议:

    1、优先使用MEMORY_ONLY,如果可以缓存所有数据的话,那么就使用这种策略。因为纯内存速度最快,而且没有序列化,不需要消耗CPU进行反序列化操作。

    2、如果MEMORY_ONLY策略,无法存储所有数据的话,那么使用MEMORY_ONLY_SER,将数据进行序列化进行存储,纯内存操作还是非常快,只是要消耗CPU进行反序列化。

    3、如果需要进行快速的失败恢复,那么就选择带后缀为_2的策略,进行数据的备份,这样在失败时,就不需要重新计算了。

    4、能不使用DISK相关的策略,就不用使用,有的时候,从磁盘读取数据,还不如重新计算一次。

    共享变量

    Spark提供了两种共享变量:Broadcast Variable(广播变量)和Accumulator(累加变量)。

    BroadcastVariable会将使用到的变量,仅仅为每个节点拷贝一份,更大的用处是优化性能,减少网络传输以及内存消耗。广播变量是只读的。

    val factor = 3
    val broadcastVars = sc.broadcast(factor);
    val numberList = Array(1,2,3,4,5)
    val number = sc.parallelize(numberList).map( num => num * broadcastVars.value)  //广播变量读值broadcastVars.value
    

    Accumulator则可以让多个task共同操作一份变量,主要可以进行累加操作。task只能对Accumulator进行累加操作,不能读取它的值。只有Driver程序可以读取Accumulator的值。

    val numberList = Array(1,2,3,4,5)
    val numberRDD = sc.parallelize(numberList,1)
    val sum = sc.accumulator(0)
    numberRDD.foreach{m => sum += m}
    

    小案例实战1

    案例需求:

    1、对文本文件内的每个单词都统计出其出现的次数。
    2、按照每个单词出现次数的数量,降序排序。

    步骤:

    • 1.创建RDD
    • 2.将文本进行拆分 (flatMap)
    • 3.将拆分后的单词进行统计 (mapToPair,reduceByKey)
    • 4.反转键值对 (mapToPair)
    • 5.按键升序排序 (sortedByKey)
    • 6.再次反转键值对 (mapToPair)
    • 7.打印输出(foreach)

    Java版本jdk1.8以下

    public class SortWordCount {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // 创建lines RDD
            JavaRDD<String> lines = sc.textFile("D:\\Users\\Administrator\\Desktop\\spark.txt");
            // 将文本分割成单词RDD
            JavaRDD<String> words = lines.flatMap(new FlatMapFunction<String, String>() {
                @Override
                public Iterator<String> call(String s) throws Exception {
                    return Arrays.asList(s.split(" ")).iterator();
                }
            });
            //将单词RDD转换为(单词,1)键值对RDD
            JavaPairRDD<String,Integer> wordPair = words.mapToPair(new PairFunction<String, String,Integer>() {
                @Override
                public Tuple2<String,Integer> call(String s) throws Exception {
                    return new Tuple2<String,Integer>(s,1);
                }
            });
           //对wordPair 进行按键计数
            JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey(new Function2<Integer, Integer, Integer>() {
                @Override
                public Integer call(Integer integer, Integer integer2) throws Exception {
                    return integer +integer2;
                }
            });
            // 到这里为止,就得到了每个单词出现的次数
            // 我们的新需求,是要按照每个单词出现次数的顺序,降序排序
            // wordCounts RDD内的元素是这种格式:(spark, 3) (hadoop, 2)
            // 因此我们需要将RDD转换成(3, spark) (2, hadoop)的这种格式,才能根据单词出现次数进行排序
    
            // 进行key-value的反转映射
            JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(new PairFunction<Tuple2<String, Integer>, Integer, String>() {
                @Override
                public Tuple2<Integer, String> call(Tuple2<String, Integer> s) throws Exception {
                    return new Tuple2<Integer, String>(s._2,s._1);
                }
            });
            // 按照key进行排序
            JavaPairRDD<Integer, String> sortedCountWords = countWord.sortByKey(false);
            // 再次将value-key进行反转映射
            JavaPairRDD<String,Integer> sortedWordCount = sortedCountWords.mapToPair(new PairFunction<Tuple2<Integer, String>, String, Integer>() {
                @Override
                public Tuple2<String, Integer> call(Tuple2<Integer, String> s) throws Exception {
                    return new Tuple2<String, Integer>(s._2,s._1);
                }
            });
            // 到此为止,我们获得了按照单词出现次数排序后的单词计数
            // 打印出来
            sortedWordCount.foreach(new VoidFunction<Tuple2<String, Integer>>() {
                @Override
                public void call(Tuple2<String, Integer> s) throws Exception {
                    System.out.println("word \""+s._1+"\" appears "+ s._2+" times.");
                }
            });
            sc.close();
        }
    }
    

    Java版本jdk1.8

    可以使用lambda表达式,简化代码:

    public class SortWordCount {
        public static void main(String[] args) throws Exception {
            SparkConf conf = new SparkConf().setAppName("SortWordCount").setMaster("local");
            JavaSparkContext sc = new JavaSparkContext(conf);
            // 创建lines RDD
            JavaRDD<String> lines = sc.textFile("D:\\Users\\Administrator\\Desktop\\spark.txt");
            JavaRDD<String> words = lines.flatMap(line -> Arrays.asList(line.split(" ")).iterator());
            JavaPairRDD<String,Integer> wordPair = words.mapToPair(word -> new Tuple2<>(word,1));
            JavaPairRDD<String,Integer> wordCount = wordPair.reduceByKey((a,b) ->(a+b));
            JavaPairRDD<Integer,String> countWord = wordCount.mapToPair(word -> new Tuple2<>(word._2,word._1));
            JavaPairRDD<Integer,String> sortedCountWord = countWord.sortByKey(false);
            JavaPairRDD<String,Integer> sortedWordCount = sortedCountWord.mapToPair(word -> new Tuple2<>(word._2,word._1));
            sortedWordCount.foreach(s->System.out.println("word \""+s._1+"\" appears "+ s._2+" times."));
            sc.close();
        }
    }
    

    scala版本

    由于spark2 有了统一切入口SparkSession,在这里就使用了SparkSession。

    package cn.spark.study.core
    import org.apache.spark.sql.SparkSession
    object SortWordCount {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("SortWordCount").master("local").getOrCreate()
        val lines = spark.sparkContext.textFile("D:\\Users\\Administrator\\Desktop\\spark.txt")
        val words = lines.flatMap{line => line.split(" ")}
        val wordCounts = words.map{word => (word,1)}.reduceByKey(_ + _)
        val countWord = wordCounts.map{word =>(word._2,word._1)}
        val sortedCountWord = countWord.sortByKey(false)
        val sortedWordCount = sortedCountWord.map{word => (word._2, word._1)}
        sortedWordCount.foreach(s=>
        {
          println("word \""+s._1+ "\" appears "+s._2+" times.")
        })
        spark.stop()
      }
    }
    



    小案例实战2

    需求:

    1、按照文件中的第一列排序。
    2、如果第一列相同,则按照第二列排序。

    实现步骤:

    • 1、实现自定义的key,要实现Ordered接口和Serializable接口,在key中实现自己对多个列的排序算法
    • 2、将包含文本的RDD,映射成key为自定义key,value为文本的JavaPairRDD(map)
    • 3、使用sortByKey算子按照自定义的key进行排序(sortByKey)
    • 4、再次映射,剔除自定义的key,只保留文本行(map)
    • 5、打印输出(foreach)

    这里主要用scala编写

    class SecondSortKey(val first:Int,val second:Int) extends Ordered[SecondSortKey] with Serializable{
      override def compare(that: SecondSortKey): Int = {
        if(this.first - that.first !=0){
          this.first-that.first
        }else{
          this.second-that.second
        }
      }
    }
    object SecondSort {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("SecondSort").master("local").getOrCreate()
        val lines = spark.sparkContext.textFile("D:\\sort.txt")
        val pairs = lines.map{line => (
          new SecondSortKey(line.split(" ")(0).toInt,line.split(" ")(1).toInt),line
        )}
        val sortedParis = pairs.sortByKey()
        val sortedLines = sortedParis.map(pairs => pairs._2)
        sortedLines.foreach(s => println(s))
        spark.stop()
      }
    }
    



    小案例实战3

    需求:

    对每个班级内的学生成绩,取出前3名。(分组取topn)

    实现步骤:

    1.创建初始RDD

    2.对初始RDD的文本行按空格分割,映射为key-value键值对

    3.对键值对按键分组

    4.获取分组后每组前3的成绩:

    • 4.1 遍历每组,获取每组的成绩
    • 4.2 将一组成绩转换成一个数组缓冲
    • 4.3 将数组缓冲按从大到小排序
    • 4.4 对排序后的数组缓冲取其前三

    5.打印输出

    以下是使用scala实现:

    object GroupTop3 {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder().appName("GroupTop3").master("local").getOrCreate()
        //创建初始RDD
        val lines = spark.sparkContext.textFile("D:\\score.txt")
        //对初始RDD的文本行按空格分割,映射为key-value键值对
        val pairs = lines.map(line => (line.split(" ")(0), line.split(" ")(1).toInt))
        //对pairs键值对按键分组
        val groupedPairs = pairs.groupByKey()
        //获取分组后每组前3的成绩
        val top3Score = groupedPairs.map(classScores => {
          var className = classScores._1
          //获取每组的成绩,将其转换成一个数组缓冲,并按从大到小排序,取其前三
          var top3 = classScores._2.toBuffer.sortWith(_>_).take(3)
          Tuple2(className,top3)
        })
        top3Score.foreach(m => {
          println(m._1)
          for(s <- m._2) println(s)
          println("------------------")
        })
      }
    }
    




    以上三个小案例都用Scala实现了,用到了Scala中的集合的操作、高阶函数、链式调用、隐式转换等知识,自己动手实现,对Scala有个比较好的理解和掌握。


    作者:简单的happy Python爱好者社区专栏作者

    博客专栏:简单的happy

    展开全文
  • 题目概述 有一个成绩表 sc.txt Tom,DataBase,80 Tom,Algorithm,50 Tom,DataStructure,60 Jim,DataBase,90 Jim,Algrddorithm,60 Jim,DataStructure,80 John,DataStructure,80 Zhang,DataStructure,80 ...
  • 简单的Spark案例——课程学习量统计

    千次阅读 2018-10-12 00:16:40
    需求:如下图的文件中有很多访问记录,第一列表示访问站点的时间戳,第二列表示访问的站点,中间用制表符分割。这里相当于学习的不同课程,如java,ui,bigdata,android,h5等,其中每门课程又分为子课程,如h5课程...
  • 首先吐槽一下csdn,目前不支持scala语言输入!!希望后续维护更新! import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.rdd.RDD ... * Desc:学习scala语言开发实现spark版:词计数...
  • Scala 统计学生成绩 题干 学生的成绩清单格式如下所示,第一行为表头,各字段意思分别为学号、性别、课程名 1、课程名 2 等,后面每一行代表一个学生的信息,各字段之间用空白符隔开 Id gender Math English Physics...
  • Spark介绍与安装教程(Linux系统)

    万次阅读 2016-04-24 14:09:53
    简单的Spark介绍,以及相当简单的安装教程。
  • 【大数据----SparkSpark入门教程[1]

    千次阅读 2019-05-31 08:44:08
    本教程源于2016年3月出版书籍《Spark原理、机制及应用》 ,如有兴趣,请支持正版书籍。 随着互联网为代表的信息技术深度发展,其背后由于历史积累产生了TB、PB甚至EB级数据量,由于传统机器的软硬件不足以支持如此...
  • Spark入门详解

    万次阅读 2018-08-16 15:05:04
    Spark概述 1 11 什么是Spark 2 Spark特点 3 Spark的用户和用途 二 Spark集群安装 1 集群角色 2 机器准备 3 下载Spark安装包 4 配置SparkStandalone 5 配置Job History ServerStandalone 6 ...
  • 本教程源于2016年3月出版书籍《Spark原理、机制及应用》 ,如有兴趣,请支持正版书籍。随着互联网为代表的信息技术深度发展,其背后由于历史积累产生了TB、PB甚至EB级数据量,由于传统机器的软硬件不足以支持如此...
  • sparkSpark 入门到精通

    万次阅读 2019-09-29 09:29:12
    Spark 修炼之道(进阶篇)——Spark 入门到精通:第一节 Spark 1.5.0 集群搭建【点击打开】 Spark 修炼之道(进阶篇)——Spark 入门到精通:第二节 Hadoop、Spark 生成圈简介【点击打开】 Spark 修炼之道(进阶篇...
  • Spark入门实战指南——Spark SQL入门

    千次阅读 2016-09-19 15:23:40
    Spark SQL对SQL语句的处理,首先会将SQL语句进行解析(Parse),然后形成一个Tree,在后续的如绑定、优化等处理过程都是对Tree的操作,而操作的方法是采用Rule,通过模式匹配,对不同类型的节点采用不同的操作。...
  • spark入门介绍(菜鸟必看)

    万次阅读 2015-11-25 22:09:33
    什么是Spark Apache Spark是一个围绕速度、易用性和复杂分析构建的大数据处理框架。最初在2009年由加州大学伯克利分校的AMPLab开发,并于2010年成为Apache的开源项目之一。 与Hadoop和Storm等其他大数据和...
  • Spark重要概念 弹性分布式数据集(RDD) 1. Spark重要概念 本节部分内容源自官方文档:http://spark.apache.org/docs/latest/cluster-overview.html (1)Spark运行模式 目前最为常用的Spark运行模式有: - l
  • Spark 入门实战之最好的实例

    万次阅读 多人点赞 2018-08-16 16:34:28
    转载:https://www.ibm.com/developerworks/cn/opensource/os-cn-spark-practice1/ 搭建开发环境 安装 Scala IDE 搭建 Scala 语言开发环境很容易,Scala IDE 官网 下载合适的版本并解压就可以完成安装,本文使用...
  • hive on spark入门安装(hive2.0、spark1.5)

    千次阅读 2017-02-09 10:24:08
    但是由于mapreduce很多计算过程都要经过硬盘读写等劣势,和spark等计算引擎相比,无论是计算速度,还是计算灵活度上都有很多劣势,这也导致了hive on mapreduce计算速度并不是令人很满意。本篇来讲下hive on spark,...
  • 子雨大数据之Spark入门教程

    万次阅读 2017-07-31 17:53:50
    林子雨老师与其团队做的技术分享,值得去好好研究下 林子雨老师 2016年10月30日 ...Spark最初诞生于美国加州大学伯克利分校(UC Berkeley)的AMP实验室,是一个可应用于大规模数据处理的快速、通用引擎。2013年,
  • Spark入门系列

    2016-12-07 19:48:47
    读完Spark官方文档后,在研究别人的源码以及Spark的源码之前进行一番入门学习,这个系列不错。 Spark系列 除此之外,Databricks也是一个非常不错的网站,上面可以使用免费的Spark集群进行代码提交与测试,在...
  • Spark入门及Java Api

    千次阅读 2018-07-09 16:42:43
    转载自:Spark基础与Java Api介绍一、Spark简介 1、什么是Spark 发源于AMPLab实验室的分布式内存计算平台,它克服了MapReduce在迭代式计算和交互式计算方面的不足。 相比于MapReduce,Spark能充分利用内存资源...
  • Spark生态圈 1. Hadoop生态圈原文地址:http://os.51cto.com/art/201508/487936_all.htm#rd?sukey=a805c0b270074a064cd1c1c9a73c1dcc953928bfe4a56cc94d6f67793fa02b3b983df6df92dc418df5a1083411b53325 下图给出
  • spark入门实战windows本地测试程序

    千次阅读 2016-06-16 10:42:57
    在做Spark开发时,一般会在windows下进行Spark本地模式程序调试,在本地调试好了再打包运行在Spark集群上。因此需要在windows上进行Spark开发配置。本文将给出三种开发工具的配置:1、使用eclipse java api开发;2、...
  • Spark入门教程(2)---开发、编译配置

    千次阅读 2016-04-11 10:30:56
    本教程源于2016年3月出版书籍《Spark原理、机制及应用》 ,在此以知识共享为初衷公开部分内容,如有兴趣,请支持正版书籍。 Spark为使用者提供了大量的工具和脚本文件,使得其部署与开发变得十分方便快捷,本章...
  • Spark入门基础教程

    万次阅读 2016-11-25 14:37:03
    Spark入门基础教程
  • 作者:周志湖1. 获取数据本文通过将github上的Spark项目...数据获取命令如下:[root@master spark]# git log --pretty=format:'{"commit":"%H","author":"%an","author_email":"%ae","date":"%ad","message":"%f"}' >
  • 本节主要内容本节部分内容来自官方文档:...入门案例 1. Spark流式计算简介Hadoop的MapReduce及Spark SQL等只能进行离线计算,无法满足实时性要

空空如也

1 2 3 4 5 ... 20
收藏数 274,985
精华内容 109,994
关键字:

spark