精华内容
参与话题
问答
  • Spark 中 RDD 的详细介绍

    万次阅读 2018-09-27 09:14:46
    RDD ---弹性分布式数据集 RDD概述 RDD论文 中文版 : http://spark.apachecn.org/paper/zh/spark-rdd.html RDD产生背景 为了解决开发人员能在大规模的集群中以一种容错的方式进行内存计算,提出了 RDD 的概念,...

    RDD ---弹性分布式数据集


    RDD概述

    RDD论文

    中文版 : http://spark.apachecn.org/paper/zh/spark-rdd.html

    RDD产生背景

    为了解决开发人员能在大规模的集群中以一种容错的方式进行内存计算,提出了 RDD 的概念,而当前的很多框架对迭代式算法场景与交互性数据挖掘场景的处理性能非常差, 这个是RDDs 的提出的动机。

    什么是 RDD

    RDD 是 Spark 的计算模型。RDD(Resilient Distributed Dataset)叫做弹性的分布式数据集合,是 Spark 中最基本的数据抽象,它代表一个不可变、只读的,被分区的数据集。操作 RDD 就像操作本地集合一样,有很多的方法可以调用,使用方便,而无需关心底层的调度细节。


    创建RDD

    1 . 集合并行化创建 (通过scala集合创建) scala中的本地集合 -->Spark RDD

    spark-shell --master spark://hadoop01:7077

    scala> val arr = Array(1,2,3,4,5)
    scala> val rdd = sc.parallelize(arr)
    scala> val rdd = sc.makeRDD(arr)
    scala> rdd.collect
    res0: Array[Int] = Array(1, 2, 3, 4, 5)

    通过集合并行化方式创建RDD,适用于本地测试,做实验

    2 外部文件系统 , 比如 HDFS

    读取HDFS文件系统

    val rdd2 = sc.textFile("hdfs://hadoop01:9000/words.txt")

    读取本地文件

    val rdd2 = sc.textFile(“file:///root/words.txt”)

    scala> val rdd2 = sc.textFile("file:root/word.txt")
    scala> rdd2.collect
    res2: Array[String] = Array(hadoop hbase java, hbase java spark, java, hadoop hive hive, hive hbase)

    3 从父RDD转换成新的子RDD

    调用 Transformation 类的方法,生成新的 RDD

    只要调用transformation类的算子,都会生成一个新的RDD。RDD中的数据类型,由传入给算子的函数的返回值类型决定

    注意:action类的算子,不会生成新的 RDD

    scala> rdd.collect
    res3: Array[Int] = Array(1, 2, 3, 4, 5)

    scala> val rdd = sc.parallelize(arr)
    scala> val rdd2 = rdd.map(_*100)
    scala> rdd2.collect
    res4: Array[Int] = Array(100, 200, 300, 400, 500)

    Spark上的所有的方法 , 有一个专有的名词 , 叫做算子


    RDD分区

    说对RDD进行操作 , 实际上是操作的RDD上的每一个分区 , 分区的数量决定了并行的数量 .

    使用 rdd.partitions.size 查看分区数量

    scala> rdd.partitions.size
    res7: Int = 4

    scala> rdd2.partitions.size
    res8: Int = 4

    如果从外部创建RDD,比如从hdfs中读取数据,正常情况下,分区的数量和我们读取的文件的block块数是一致的,但是如果只有一个block块,那么分区数量是2.也就是说最低的分区数量是2

    如果是集合并行化创建得到的RDD,分区的数量,默认的和最大可用的cores数量相等。

    (--total-executor-cores > 可用的 cores? 可用的 cores:--total-executor-cores)

    集合并行化得到的RDD的分区 :

    默认情况下,一个application使用多少个cores,就有多少个分区

    分区的数量 = 运行任务的可用的cores(默认一个cores,能处理一个任务)

    可以指定分区的数量:

    通过集合并行化创建的RDD是可以任意修改分区的数量的

    val rdd = sc.makeRDD(arr,分区的数值)

    scala> val arr = Array(List(1,3),List(4,6))
    scala> val rdd3 = sc.parallelize(arr,3)
    scala> rdd3.partitions.size
    res1: Int = 3

    这种方式,多用于测试

    读取外部文件RDD的分区

    正常情况下,读取HDFS中的文件,默认情况下,读到的文件有几个block块,得到的RDD就有几个分区。

    当读取一个文件,不足一个block块的时候,会是2个分区

    默认情况下,分区的数量  = 读取的文件的block块的数量,但是至少是2个

    scala> val rdd1 = sc.textFile("hdfs://hadoop01:9000/hbase-1.2.6-bin.tar.gz")
    scala> rdd1.partitions.size
    res2: Int = 1

    scala> val rdd2 = sc.textFile("hdfs://hadoop01:9000/hadoop-2.8.3.tar.gz")
    scala> rdd2.partitions.size
    res3: Int = 1

    scala> val rdd3 = sc.textFile("hdfs://hadoop01:9000/ideaIU-2017.2.2.exe")
    scala> rdd3.partitions.size
    res4: Int = 4

    hadoop-2.8.3文件200多M,有俩个块,按说有俩个分区,hbase不到100M,有一个块,按说应该有2个分区,结果这俩个都是一个分区,是不正常的,不知道问题在哪里,希望知道的大佬指点一下

    idea文件500多M有4个块,有四个分区,是正常的

    textFile自身提供了修改分区的API

    sc.textFile(path,分区数量)

    1 这里的分区数量,不能少于读取的数据的block块的数量

    2 当设置的分区的数量大于block的数量的时候,读取数据的API会根据我们的数据进行优化

    scala> val rdd3 = sc.textFile("hdfs://hadoop01:9000/ideaIU-2017.2.2.exe",5)
    scala> rdd3.partitions.size
    res7: Int = 5

    真正的想要改变分区的数量:用算子

    repartition,coalesce,专用于修改分区数量

    读取HDFS上的数据,写入到HDFS中的数据,使用的API都是hadoop的API

    总结:

    默认情况下,分区的数量 = 读取文件的block块的数量

    分区的数量至少是2个

    通过转换类的算子

    默认情况下,分区的数量是不变的。map  flatMap  filter

    groupByKey,reduceByKey 默认是不变的,但是可以通过参数来改变

    repartition(分区数量),coalesce(分区数量),根据指定的分区数量重新分区

    union:分区数量会增加

    scala> val rdd2 = rdd1.flatMap(_.split(" ")).map((_,1))
    rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[14] at map at <console>:26
    
    scala> val rdd3 = rdd2.reduceByKey(_+_)
    rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[15] at reduceByKey at <console>:28
    
    scala> rdd2.partitions.size
    res7: Int = 3
    
    scala> rdd3.partitions.size
    res8: Int = 3
    
    scala> val rdd3 = rdd2.reduceByKey(_+_,6)
    rdd3: org.apache.spark.rdd.RDD[(String, Int)] = ShuffledRDD[16] at reduceByKey at <console>:28
    
    scala> rdd3.partitions.size
    res9: Int = 6
    

    总结:

    集合并行化:

    val arr = Array[Int](1,4,5,6) –》  sc.makeRDD(arr)     RDD[Int]

    默认情况下, 分区数量 =  application使用的 cores

    sc.makeRDD(data,分区数量)

    读取HDFS数据:

    默认情况下, 分区数量 =  读取的数据的block块的数量

    至少是2个

    通过转换类的算子获取的RDD :

    默认情况下,分区的数量是不变的。

    简单来说,rdd分区数量就决定了任务的并行的数量。

    展开全文
  • RDD基础

    万次阅读 2018-07-28 16:06:28
    RDD介绍 RDD概念 一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可以分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在...

    RDD介绍

    RDD概念

    一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可以分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在集群中的不同节点上进行并行计算。RDD提供了一种高度受限的共享内存模型,即RDD是只读的记录分区的集合,不能直接修改,只能基于稳定的物理存储中的数据集来创建RDD,或者通过在其他RDD上执行确定的转换操作(如map、join和groupBy)而创建得到新的RDD。

    RDD提供了一组丰富的操作以支持常见的数据运算,分为“行动”(Action)和“转换”(Transformation)两种类型,前者用于执行计算并指定输出的形式,后者指定RDD之间的相互依赖关系。两类操作的主要区别是,转换操作(比如map、filter、join等)接受RDD并返回RDD,而行动操作(比如count、collect等)接受RDD但是返回非RDD(即输出一个值或结果)。

    RDD采用了惰性调用,即在RDD的执行过程中,真正的计算发生在RDD的“行动”操作,对于“行动”之前的所有“转换”操作,Spark只是记录下“转换”操作应用的一些基础数据集以及RDD生成的轨迹,即相互之间的依赖关系,而不会触发真正的计算。

    如下图从输入中逻辑上生成A和C两个RDD,经过一系列“转换”操作,逻辑上生成了F(也是一个RDD),之所以说是逻辑上,是因为这时候计算并没有发生,Spark只是记录了RDD之间的生成和依赖关系。当F要进行输出时,也就是当F进行“行动”操作的时候,Spark才会根据RDD的依赖关系生成DAG,并从起点开始真正的计算。

    eg:在pyspark的交互环境下,输入如下代码

    fileRDD = sc.textFile('/test.txt')
    def contains(line):
    ...     return 'hello world' in line
    filterRDD = fileRDD.filter(contains)
    filterRDD.cache()
    filterRDD.count()
    

    第一行: 从HDFS文件中读取数据创建RDD

    第二、三行: 定义一个过滤函数

    第四行:对fileRDD进行转换操作得到一个新的RDD,即filterRDD

    第五行:第5行代码表示对filterRDD进行持久化,把它保存在内存或磁盘中(这里采用cache接口把数据集保存在内存中),方便后续重复使用,当数据被反复访问时(比如查询一些热点数据,或者运行迭代算法)

    第六行:count()是一个行动类型的操作,触发真正的计算,开始实际执行从fileRDD到filterRDD的转换操作,并把结果持久化到内存中,最后计算出filterRDD中包含的元素个数。

    RDD的特性

    (1)高效的容错性。现有的分布式共享内存、键值存储、内存数据库等,为了实现容错,必须在集群节点之间进行数据复制或者记录日志,也就是在节点之间会发生大量的数据传输,这对于数据密集型应用而言会带来很大的开销。在RDD的设计中,数据只读,不可修改,如果需要修改数据,必须从父RDD转换到子RDD,由此在不同RDD之间建立了血缘关系。所以,RDD是一种天生具有容错机制的特殊集合,不需要通过数据冗余的方式(比如检查点)实现容错,而只需通过RDD父子依赖(血缘)关系重新计算得到丢失的分区来实现容错,无需回滚整个系统,这样就避免了数据复制的高开销,而且重算过程可以在不同节点之间并行进行,实现了高效的容错。此外,RDD提供的转换操作都是一些粗粒度的操作(比如map、filter和join),RDD依赖关系只需要记录这种粗粒度的转换操作,而不需要记录具体的数据和各种细粒度操作的日志(比如对哪个数据项进行了修改),这就大大降低了数据密集型应用中的容错开销;

    (2)中间结果持久化到内存。数据在内存中的多个RDD操作之间进行传递,不需要“落地”到磁盘上,避免了不必要的读写磁盘开销;
    (3)存放的数据可以是Java对象,避免了不必要的对象序列化和反序列化开销。

    RDD的依赖关系

    RDD中不同的操作会使得不同RDD中的分区会产生不同的依赖。RDD中的依赖关系分为窄依赖(Narrow Dependency)与宽依赖(Wide Dependency)

    窄依赖表现为一个父RDD的分区对应于一个子RDD的分区,或多个父RDD的分区对应于一个子RDD的分区;窄依赖典型的操作包括map、filter、union等

    宽依赖则表现为存在一个父RDD的一个分区对应一个子RDD的多个分区。宽依赖典型的操作包括groupByKey、sortByKey等

    对于连接(join)操作,可以分为两种情况。
    (1)对输入进行协同划分,属于窄依赖(如图9-10(a)所示)。所谓协同划分(co-partitioned)是指多个父RDD的某一分区的所有“键(key)”,落在子RDD的同一个分区内,不会产生同一个父RDD的某一分区,落在子RDD的两个分区的情况。
    (2)对输入做非协同划分,属于宽依赖,如图9-10(b)所示。
    对于窄依赖的RDD,可以以流水线的方式计算所有父分区,不会造成网络之间的数据混合。对于宽依赖的RDD,则通常伴随着Shuffle操作,即首先需要计算好所有父分区数据,然后在节点之间进行Shuffle。

    Stage的划分

    Spark通过分析各个RDD的依赖关系生成了DAG,再通过分析各个RDD中的分区之间的依赖关系来决定如何划分阶段,具体划分方法是:在DAG中进行反向解析,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的阶段中;将窄依赖尽量划分在同一个阶段中,可以实现流水线计算(具体的阶段划分算法请参见AMP实验室发表的论文《Resilient Distributed Datasets: A Fault-Tolerant Abstraction for In-Memory Cluster Computing》)。例如,如图9-11所示,假设从HDFS中读入数据生成3个不同的RDD(即A、C和E),通过一系列转换操作后再将计算结果保存回HDFS。对DAG进行解析时,在依赖图中进行反向解析,由于从RDD A到RDD B的转换以及从RDD B和F到RDD G的转换,都属于宽依赖,因此,在宽依赖处断开后可以得到三个阶段,即阶段1、阶段2和阶段3。可以看出,在阶段2中,从map到union都是窄依赖,这两步操作可以形成一个流水线操作,比如,分区7通过map操作生成的分区9,可以不用等待分区8到分区9这个转换操作的计算结束,而是继续进行union操作,转换得到分区13,这样流水线执行大大提高了计算的效率。

    一个DAG图划分成多个“阶段”以后,每个阶段都代表了一组关联的、相互之间没有Shuffle依赖关系的任务组成的任务集合。每个任务集合会被提交给任务调度器(TaskScheduler)进行处理,由任务调度器将任务分发给Executor运行。

    RDD的运行流程

    (1)创建RDD对象;
    (2)SparkContext负责计算RDD之间的依赖关系,构建DAG;
    (3)DAGScheduler负责把DAG图分解成多个阶段,每个阶段中包含了多个任务,每个任务会被任务调度器分发给各个工作节点(Worker Node)上的Executor去执行。

    RDD编程

    准备工作

    开启hdfs 开启spark

    创建RDD

    (1)从文件系统中加载数据创建rdd

    lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt")
    

    如果不加file://,那么默认是从hdfs上取文件,以下三条命令等价

    lines = sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
    lines = sc.textFile("/user/hadoop/word.txt")
    lines = sc.textFile("word.txt")
    

    注意

    • 如果使用了本地文件系统的路径,那么,必须要保证在所有的worker节点上,也都能够采用相同的路径访问到该文件,比如,可以把该文件拷贝到每个worker节点上,或者也可以使用网络挂载共享文件系统。
    • textFile()方法的输入参数,可以是文件名,也可以是目录,也可以是压缩文件等。比如,textFile(“/my/directory”), textFile(“/my/directory/.txt”), and textFile(“/my/directory/.gz”).
    • textFile()方法也可以接受第2个输入参数(可选),用来指定分区的数目。默认情况下,Spark会为HDFS的每个block创建一个分区(HDFS中每个block默认是128MB)。你也可以提供一个比block数量更大的值作为分区数目,但是,你不能提供一个小于block数量的值作为分区数目。

    (2)通过并行集合(数组)创建RDD

    nums = [1,2,3,4,5]
    rdd = sc.parallelize(nums)
    

    RDD操作

    转换操作:

    对于RDD而言,每一次转换操作都会产生不同的RDD,供给下一个“转换”使用。转换得到的RDD是惰性求值的,也就是说,整个转换过程只是记录了转换的轨迹,并不会发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作。
    下面列出一些常见的转换操作(Transformation API):

    • filter(func):筛选出满足函数func的元素,并返回一个新的数据集
    • map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集
    • flatMap(func):与map()相似,但每个输入元素都可以映射到0或多个输出结果
    • reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K, V)形式的数据集,其中的每个值是将每个key传递到函数func中进行聚合

    行动操作:

    行动操作是真正触发计算的地方。Spark程序执行到行动操作时,才会执行真正的计算,从文件中加载数据,完成一次又一次转换操作,最终,完成行动操作得到结果。
    下面列出一些常见的行动操作(Action API):

    • count() 返回数据集中的元素个数
    • collect() 以数组的形式返回数据集中的所有元素
    • first() 返回数据集中的第一个元素
    • take(n) 以数组的形式返回数据集中的前n个元素
    • reduce(func) 通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
    • foreach(func) 将数据集中的每个元素传递到函数func中运行

    惰性计算:

    eg:统计单词总数

    sc = SparkContext("local",'test')
    line  = sc.textFile("/home/tobin/Documents/hdfsLocalStore/harryport/1.txt")
    //map操作会遍历每行文本
    linelength =  line.map(lambda s:len(s))
    #知道最后一步才开始执行,之前并没有执行,惰性计算
    totallength = linelength.reduce(lambda  a,b:a+b)
    print(totallength)
    

    持久化

    from pyspark  import SparkContext
    sc = SparkContext("local",'test')
    
    list =["hadoop","spark","Hive"]
    rdd = sc.parallelize(list)
    print(rdd.count())
    print(','.join(rdd.collect()))
    

    有两个行动算子,以为着每次调用行动操作,都会出发一次从头开始的计算,为了避免重复计算,使用cache()将中间变量持久化到内存中,在上述代码的中间加入如下代码

    rdd.cache()
    

    分区

    RDD是弹性分布式数据集,通常RDD很大,会被分成很多个分区,分别保存在不同的节点上。

    对于textFile而言,如果没有在方法中指定分区数,则默认为min(defaultParallelism,2),其中,defaultParallelism对应的就是spark.default.parallelism。

    如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)

    打印元素

    rdd.foreach(print)
    or
    rdd.map(print)
    or
    //集群模式下把各节点的数据打印出来
    rdd.collect().foreach(print)
    
    

    键值对RDD

    (1)用map()函数创建键值对rdd

    from pyspark import SparkContext
    sc =SparkContext('local','test')
    lines=sc.textFile("/home/tobin/Documents/hdfsLocalStore/harryport/1.txt")
    pairrdd = lines.flatMap(lambda line:line.split(" ")).map(lambda word:(word,1))
    pairrdd.foreach(print)
    

    (2)通过并行集合(列表)创建rdd

    from pyspark import SparkContext
    sc =SparkContext('local','test')
    list =['hadoop','spark','hive','zookeeper']
    rdd = sc.parallelize(list)
    pairrdd = rdd.map(lambda word:(word,1))
    pairrdd.foreach(print
    

    键值对转换操作

    reduceByKey(func):使用func函数合并具有相同键的值

    groupByKey() ;对具有相同键的值进行分组

    keys(): 把键值对RDD中的key返回形成一个新的RDD

    values():键值对RDD中的value返回形成一个新的RDD

    sortByKey():sortByKey()的功能是返回一个根据键排序的RDD。

    mapValues(func):只想对键值对RDD的value部分进行处理

    join:类似于数据库的表连接

    eg: 求相同的字符串出现的平均值

    from pyspark import  SparkContext
    sc =SparkContext('local','test')
    rdd =sc.parallelize([("spark",2),("hadoop",6),("hadoop",4),("spark",6)])
    rdd =rdd.mapValues(lambda  x:(x,1))
    rdd=rdd.reduceByKey(lambda x,y:(x[0]+y[0],x[1]+y[1]))
    rdd =rdd.mapValues(lambda x:x[0]//x[1])
    rdd.collect()
    rdd.foreach(print)
    

    共享变量

    有时候,需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量。为了满足这种需求,Spark提供了两种类型的变量:广播变量(broadcast variables)和累加器(accumulators)

    广播变量

    广播变量用来把变量在所有节点的内存之间进行共享。可以通过调用SparkContext.broadcast(v)来从一个普通变量v中创建一个广播变量。这个广播变量就是对普通变量v的一个包装器,通过调用value方法就可以获得这个广播变量的值,具体代码如下

    broadcastVar = sc.broadcast([1, 2, 3])
    broadcastVar.value
    

    累加器

    累加器是仅仅被相关操作累加的变量,通常可以被用来实现计数器(counter)和求和(sum)。Spark原生地支持数值型(numeric)的累加器,一个数值型的累加器,可以通过调用SparkContext.accumulator()来创建

    accum = sc.accumulator(0)
    sc.parallelize([1, 2, 3, 4]).foreach(lambda x : accum.add(x))
    accum.value
    
    展开全文
  • Spark RDD是什么?

    万次阅读 多人点赞 2019-06-29 17:44:45
    Spark 的核心是建立在统一的抽象弹性分布式数据集(Resiliennt Distributed Datasets,RDD)之上的,这使得 Spark 的各个组件可以无缝地进行集成,能够在同一个应用程序中完成大数据处理。本节将对 RDD 的基本概念及...

    Spark 的核心是建立在统一的抽象弹性分布式数据集(Resiliennt Distributed Datasets,RDD)之上的,这使得 Spark 的各个组件可以无缝地进行集成,能够在同一个应用程序中完成大数据处理。本节将对 RDD 的基本概念及与 RDD 相关的概念做基本介绍。

    RDD 的基本概念

    RDD 是 Spark 提供的最重要的抽象概念,它是一种有容错机制的特殊数据集合,可以分布在集群的结点上,以函数式操作集合的方式进行各种并行操作。

    通俗点来讲,可以将 RDD 理解为一个分布式对象集合,本质上是一个只读的分区记录集合。每个 RDD 可以分成多个分区,每个分区就是一个数据集片段。一个 RDD 的不同分区可以保存到集群中的不同结点上,从而可以在集群中的不同结点上进行并行计算。

    图 1 展示了 RDD 的分区及分区与工作结点(Worker Node)的分布关系。

    RDD分区及分区与工作节点的分布关系
    图 1  RDD 分区及分区与工作节点的分布关系

    RDD 具有容错机制,并且只读不能修改,可以执行确定的转换操作创建新的 RDD。具体来讲,RDD 具有以下几个属性。

    • 只读:不能修改,只能通过转换操作生成新的 RDD。
    • 分布式:可以分布在多台机器上进行并行处理。
    • 弹性:计算过程中内存不够时它会和磁盘进行数据交换。
    • 基于内存:可以全部或部分缓存在内存中,在多次计算间重用。

    RDD 实质上是一种更为通用的迭代并行计算框架,用户可以显示控制计算的中间结果,然后将其自由运用于之后的计算。

    在大数据实际应用开发中存在许多迭代算法,如机器学习、图算法等,和交互式数据挖掘工具。这些应用场景的共同之处是在不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。

    RDD 正是为了满足这种需求而设计的。虽然 MapReduce 具有自动容错、负载平衡和可拓展性的优点,但是其最大的缺点是采用非循环式的数据流模型,使得在迭代计算时要进行大量的磁盘 I/O 操作。

    通过使用 RDD,用户不必担心底层数据的分布式特性,只需要将具体的应用逻辑表达为一系列转换处理,就可以实现管道化,从而避免了中间结果的存储,大大降低了数据复制、磁盘 I/O 和数据序列化的开销。

    RDD 基本操作

    RDD 的操作分为转化(Transformation)操作和行动(Action)操作。转化操作就是从一个 RDD 产生一个新的 RDD,而行动操作就是进行实际的计算。

    RDD 的操作是惰性的,当 RDD 执行转化操作的时候,实际计算并没有被执行,只有当 RDD 执行行动操作时才会促发计算任务提交,从而执行相应的计算操作。

    1. 构建操作

    Spark 里的计算都是通过操作 RDD 完成的,学习 RDD 的第一个问题就是如何构建 RDD,构建 RDD 的方式从数据来源角度分为以下两类。

    • 从内存里直接读取数据。
    • 从文件系统里读取数据,文件系统的种类很多,常见的就是 HDFS 及本地文件系统。

    第一类方式是从内存里构造 RDD,需要使用 makeRDD 方法,代码如下所示。

    val rdd01 = sc.makeRDD(List(l,2,3,4,5,6))

    这个语句创建了一个由“1,2,3,4,5,6”六个元素组成的 RDD。

    第二类方式是通过文件系统构造 RDD,代码如下所示。

    val rdd:RDD[String] == sc.textFile(“file:///D:/sparkdata.txt”,1)

    这里例子使用的是本地文件系统,所以文件路径协议前缀是 file://。

    2. 转换操作

    RDD 的转换操作是返回新的 RDD 的操作。转换出来的 RDD 是惰性求值的,只有在行动操作中用到这些 RDD 时才会被计算。

    许多转换操作都是针对各个元素的,也就是说,这些转换操作每次只会操作 RDD 中的一个元素,不过并不是所有的转换操作都是这样的。表 1 描述了常用的 RDD 转换操作。

    表 1 RDD转换操作(rdd1={1, 2, 3, 3},rdd2={3,4,5})
    函数名 作用 示例 结果
    map() 将函数应用于 RDD 的每个元素,返回值是新的 RDD rdd1.map(x=>x+l) {2,3,4,4}
    flatMap() 将函数应用于 RDD 的每个元素,将元素数据进行拆分,变成迭代器,返回值是新的 RDD rdd1.flatMap(x=>x.to(3)) {1,2,3,2,3,3,3}
    filter() 函数会过滤掉不符合条件的元素,返回值是新的 RDD rdd1.filter(x=>x!=1) {2,3,3}
    distinct() 将 RDD 里的元素进行去重操作 rdd1.distinct() (1,2,3)
    union() 生成包含两个 RDD 所有元素的新的 RDD rdd1.union(rdd2) {1,2,3,3,3,4,5}
    intersection() 求出两个 RDD 的共同元素 rdd1.intersection(rdd2) {3}
    subtract() 将原 RDD 里和参数 RDD 里相同的元素去掉 rdd1.subtract(rdd2) {1,2}
    cartesian() 求两个 RDD 的笛卡儿积 rdd1.cartesian(rdd2) {(1,3),(1,4)……(3,5)}

    3. 行动操作

    行动操作用于执行计算并按指定的方式输出结果。行动操作接受 RDD,但是返回非 RDD,即输出一个值或者结果。在 RDD 执行过程中,真正的计算发生在行动操作。表 2 描述了常用的 RDD 行动操作。

    表 2 RDD 行动操作(rdd={1,2,3,3})
    函数名 作用 示例 结果
    collect() 返回 RDD 的所有元素 rdd.collect() {1,2,3,3}
    count() RDD 里元素的个数 rdd.count() 4
    countByValue() 各元素在 RDD 中的出现次数 rdd.countByValue() {(1,1),(2,1),(3,2})}
    take(num) 从 RDD 中返回 num 个元素 rdd.take(2) {1,2}
    top(num) 从 RDD 中,按照默认(降序)或者指定的排序返回最前面的 num 个元素 rdd.top(2) {3,3}
    reduce() 并行整合所有 RDD 数据,如求和操作 rdd.reduce((x,y)=>x+y) 9
    fold(zero)(func) 和 reduce() 功能一样,但需要提供初始值 rdd.fold(0)((x,y)=>x+y) 9
    foreach(func) 对 RDD 的每个元素都使用特定函数 rdd1.foreach(x=>printIn(x)) 打印每一个元素
    saveAsTextFile(path) 将数据集的元素,以文本的形式保存到文件系统中 rdd1.saveAsTextFile(file://home/test)  
    saveAsSequenceFile(path) 将数据集的元素,以顺序文件格式保存到指 定的目录下 saveAsSequenceFile(hdfs://home/test)  

    aggregate() 函数的返回类型不需要和 RDD 中的元素类型一致,所以在使用时,需要提供所期待的返回类型的初始值,然后通过一个函数把 RDD 中的元素累加起来放入累加器。

    考虑到每个结点都是在本地进行累加的,所以最终还需要提供第二个函数来将累加器两两合并。

    aggregate(zero)(seqOp,combOp) 函数首先使用 seqOp 操作聚合各分区中的元素,然后再使用 combOp 操作把所有分区的聚合结果再次聚合,两个操作的初始值都是 zero。

    seqOp 的操作是遍历分区中的所有元素 T,第一个 T 跟 zero 做操作,结果再作为与第二个 T 做操作的 zero,直到遍历完整个分区。

    combOp 操作是把各分区聚合的结果再聚合。aggregate() 函数会返回一个跟 RDD 不同类型的值。因此,需要 seqOp 操作来把分区中的元素 T 合并成一个 U,以及 combOp 操作把所有 U 聚合。

    下面举一个利用 aggreated() 函数求平均数的例子。

    1. val rdd = List (1,2,3,4)
    2. val input = sc.parallelize(rdd)
    3. val result = input.aggregate((0,0))(
    4. (acc,value) => (acc._1 + value,acc._2 + 1),
    5. (acc1,acc2) => (acc1._1 + acc2._1,acc1._2 + acc2._2)
    6. )
    7. result?Int,Int) = (10,4)
    8. val avg = result._1 / result._2
    9. avg:Int = 2.5

    程序的详细过程大概如下。

    定义一个初始值 (0,0),即所期待的返回类型的初始值。代码 (acc,value) => (acc._1 + value,acc._2 + 1) 中的 value 是函数定义里面的 T,这里是 List 里面的元素。acc._1 + value,acc._2 + 1 的过程如下。

    (0+1,0+1)→(1+2,1+1)→(3+3,2+1)→(6+4,3+1),结果为(10,4)。

    实际的 Spark 执行过程是分布式计算,可能会把 List 分成多个分区,假如是两个:p1(1,2) 和 p2(3,4)。

    经过计算,各分区的结果分别为 (3,2) 和 (7,2)。这样,执行 (acc1,acc2) => (acc1._1 + acc2._2,acc1._2 + acc2._2) 的结果就是 (3+7,2+2),即 (10,4),然后可计算平均值。

    RDD 血缘关系

    RDD 的最重要的特性之一就是血缘关系(Lineage ),它描述了一个 RDD 是如何从父 RDD 计算得来的。如果某个 RDD 丢失了,则可以根据血缘关系,从父 RDD 计算得来。

    图 2 给出了一个 RDD 执行过程的实例。系统从输入中逻辑上生成了 A 和 C 两个 RDD, 经过一系列转换操作,逻辑上生成了 F 这个 RDD。

    Spark 记录了 RDD 之间的生成和依赖关系。当 F 进行行动操作时,Spark 才会根据 RDD 的依赖关系生成 DAG,并从起点开始真正的计算。

    RDD血缘关系
    图 2  RDD血缘关系

    上述一系列处理称为一个血缘关系(Lineage),即 DAG 拓扑排序的结果。在血缘关系中,下一代的 RDD 依赖于上一代的 RDD。例如,在图 2 中,B 依赖于 A,D 依赖于 C,而 E 依赖于 B 和 D。

    RDD依赖类型

    根据不同的转换操作,RDD 血缘关系的依赖分为窄依赖和宽依赖。窄依赖是指父 RDD 的每个分区都只被子 RDD 的一个分区所使用。宽依赖是指父 RDD 的每个分区都被多个子 RDD 的分区所依赖。

    map、filter、union 等操作是窄依赖,而 groupByKey、reduceByKey 等操作是宽依赖,如图 3 所示。

    join 操作有两种情况,如果 join 操作中使用的每个 Partition 仅仅和固定个 Partition 进行 join,则该 join 操作是窄依赖,其他情况下的 join 操作是宽依赖。

    所以可得出一个结论,窄依赖不仅包含一对一的窄依赖,还包含一对固定个数的窄依赖,也就是说,对父 RDD 依赖的 Partition 不会随着 RDD 数据规模的改变而改变。

    1. 窄依赖

    1)子 RDD 的每个分区依赖于常数个父分区(即与数据规模无关)。

    2)输入输出一对一的算子,且结果 RDD 的分区结构不变,如 map、flatMap。

    3)输入输出一对一的算子,但结果 RDD 的分区结构发生了变化,如 union。

    4)从输入中选择部分元素的算子,如 filter、distinct、subtract、sample。

    2. 宽依赖

    1)子 RDD 的每个分区依赖于所有父 RDD 分区。

    RDD窄依赖和宽依赖
    图 3  RDD窄依赖和宽依赖

    2)对单个 RDD 基于 Key 进行重组和 reduce,如 groupByKey、reduceByKey。

    3)对两个 RDD 基于 Key 进行 join 和重组,如 join。

    Spark 的这种依赖关系设计,使其具有了天生的容错性,大大加快了 Spark 的执行速度。RDD 通过血缘关系记住了它是如何从其他 RDD 中演变过来的。当这个 RDD 的部分分区数据丢失时,它可以通过血缘关系获取足够的信息来重新运算和恢复丢失的数据分区,从而带来性能的提升。

    相对而言,窄依赖的失败恢复更为高效,它只需要根据父 RDD 分区重新计算丢失的分区即可,而不需要重新计算父 RDD 的所有分区。而对于宽依赖来讲,单个结点失效,即使只是 RDD 的一个分区失效,也需要重新计算父 RDD 的所有分区,开销较大。

    宽依赖操作就像是将父 RDD 中所有分区的记录进行了“洗牌”,数据被打散,然后在子 RDD 中进行重组。

    阶段划分

    用户提交的计算任务是一个由 RDD 构成的 DAG,如果 RDD 的转换是宽依赖,那么这个宽依赖转换就将这个 DAG 分为了不同的阶段(Stage)。由于宽依赖会带来“洗牌”,所以不同的 Stage 是不能并行计算的,后面 Stage 的 RDD 的计算需要等待前面 Stage 的 RDD 的所有分区全部计算完毕以后才能进行。

    这点就类似于在 MapReduce 中,Reduce 阶段的计算必须等待所有 Map 任务完成后才能开始一样。

    在对 Job 中的所有操作划分 Stage 时,一般会按照倒序进行,即从 Action 开始,遇到窄依赖操作,则划分到同一个执行阶段,遇到宽依赖操作,则划分一个新的执行阶段。后面的 Stage 需要等待所有的前面的 Stage 执行完之后才可以执行,这样 Stage 之间根据依赖关系就构成了一个大粒度的 DAG。

    下面通过图 4 详细解释一下阶段划分。

    假设从 HDFS 中读入数据生成 3 个不同的 RDD(A、C 和 E),通过一系列转换操作后得到新的 RDD(G),并把结果保存到 HDFS 中。可以看到这幅 DAG 中只有 join 操作是一个宽依赖,Spark 会以此为边界将其前后划分成不同的阶段。

    同时可以注意到,在 Stage2 中,从 map 到 union 都是窄依赖,这两步操作可以形成一个流水线操作,通过 map 操作生成的分区可以不用等待整个 RDD 计算结束,而是继续进行 union 操作,这样大大提高了计算的效率。

    DAG阶级划分
    图 4  DAG阶级划分

    把一个 DAG 图划分成多个 Stage 以后,每个 Stage 都代表了一组由关联的、相互之间没有宽依赖关系的任务组成的任务集合。在运行的时候,Spark 会把每个任务集合提交给任务调度器进行处理。

    RDD缓存

    Spark RDD 是惰性求值的,而有时候希望能多次使用同一个 RDD。如果简单地对 RDD 调用行动操作,Spark 每次都会重算 RDD 及它的依赖,这样就会带来太大的消耗。为了避免多次计算同一个 RDD,可以让 Spark 对数据进行持久化。

    Spark 可以使用 persist 和 cache 方法将任意 RDD 缓存到内存、磁盘文件系统中。缓存是容错的,如果一个 RDD 分片丢失,则可以通过构建它的转换来自动重构。被缓存的 RDD 被使用时,存取速度会被大大加速。一般情况下,Executor 内存的 60% 会分配给 cache,剩下的 40% 用来执行任务。

    cache 是 persist 的特例,将该 RDD 缓存到内存中。persist 可以让用户根据需求指定一个持久化级别,如表 3 所示。

    表 3 持久化级别(StorageLevel)
    级别 使用空间 CPU 时间 是否在内存 是否在磁盘
    MEMORY_ONLY
    MEMORY_ONLY_SER
    MEMORY_AND_DISK 部分 部分
    MEMORY_AND_DISK_SER 部分 部分
    DISK_ONLY

    对于 MEMORY_AND_DISK 和 MEMORY_AND_DISK_SER 级别,系统会首先把数据保存在内存中,如果内存不够则把溢出部分写入磁盘中。

    另外,为了提高缓存的容错性,可以在持久化级别名称的后面加上“_2”来把持久化数据存为两份,如 MEMORY_ONLY_2。

    Spark 的不同 StorageLevel 的目的是为了满足内存使用和CPU效率权衡上的不同需求。可以通过以下步骤来选择合适的持久化级别。

    1)如果 RDD 可以很好地与默认的存储级别(MEMORY_ONLY)契合,就不需要做任何修改了。这已经是 CPU 使用效率最高的选项,它使得 RDD 的操作尽可能快。

    2)如果 RDD 不能与默认的存储级别很好契合,则尝试使用 MEMORY_ONLY_SER,并且选择一个快速序列化的库使得对象在有比较高的空间使用率的情况下,依然可以较快被访问。

    3)尽可能不要将数据存储到硬盘上,除非计算数据集函数的计算量特别大,或者它们过滤了大量的数据。否则,重新计算一个分区的速度与从硬盘中读取的速度基本差不多。

    4)如果想有快速故障恢复能力,则使用复制存储级别。所有的存储级别都有通过重新计算丢失数据恢复错误的容错机制,但是复制存储级别可以让任务在 RDD 上持续运行,而不需要等待丢失的分区被重新计算。

    5)在不使用 cached RDD 的时候,及时使用 unpersist 方法来释放它。

    推荐学习目录:

    展开全文
  • 什么是RDD

    千次阅读 2018-08-30 20:56:01
    什么是RDDRDD的transformation和action到底是什么

    RDD是Spark计算的时候操作的数据集,全称是Resilient Distributed Datasets(弹性分布式数据集)。


    数据是分布在多台机器上的,为了好处理,将这些分布的数据抽象成一个RDD。这个RDD就是所有数据的代理,操作RDD就相当于操作分布在每台机器上的数据。

    而RDD又是由很多分区组成的,操作RDD的时候,对RDD里面的每一个分区进行操作。而这些操作真正的会分发到每台机器上,并且拥有容错机制。一个分区针对一台机器,如果那个机器上的数据过大了,就对对应多个分区。

    对RDD的操作分为两种transformation和action:

    transformation可以将一个RDD转换为下一个RDD,也是为了方便下一步操作。

    action是可以触发任务的,当RDD达到一定的条件以后就可以调用action任务,开始真正的处理。

    一下操作就是先将文件transformtion成一个RDD,然后action,这个collect是一个action的算式。将分布在不同机器上的数据通过网络收集过来,并且显示。

    //执行该句只是告诉后续的程序,到该路径下读取数据
    scala> val rdd1 = sc.textFile("hdfs://slave1.hadoop:9000/spark")
    rdd1: org.apache.spark.rdd.RDD[String] = hdfs://slave1.hadoop:9000/spark MapPartitionsRDD[31] at textFile at <console>:24
    
    //action操作,将数据收集过来
    scala> rdd1.collect
    res8: Array[String] = Array(hello jim, hello wo, hello ni, hello jarry)         
    
    //transformaation操作
    scala> val rdd2 = rdd1.map(_.split(" "))
    rdd2: org.apache.spark.rdd.RDD[Array[String]] = MapPartitionsRDD[32] at map at <console>:26
    
    
    //transformation操作
    scala> val rdd3 = rdd1.flatMap(_.split(" "))
    rdd3: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[33] at flatMap at <console>:26
    
    //action操作
    scala> val reduce = wordAndOne.reduceByKey(_+_).sortBy(_._2,false).saveAsTextFile("hdfs://slave1.hadoop:9000/spark/output")
    reduce: Unit = ()
    

    transformation是lazy的,延迟执行,transformtion后的数据只有遇到action的时候才提交到集群上真正的运行。

    如何证明transformation是lazy的呢,首先让我们读取hdfs下一个不存在的文件

    scala> val rdd4 = sc.textFile("/sp")
    rdd4: org.apache.spark.rdd.RDD[String] = /sp MapPartitionsRDD[100] at textFile at <console>:24
    

    哎,发现时可以的,但这个文件是不存在的,为什么会读到呢。因为它就没有真正的去读取,只有执行的action的操作,才会去执行,此时就会报错了。说是文件不存在,所以说transformation是lazy的。

    scala> val rdd4 = sc.textFile("/sp").collect
    org.apache.hadoop.mapred.InvalidInputException: Input path does not exist: hdfs://ns/sp
      at org.apache.hadoop.mapred.FileInputFormat.singleThreadedListStatus(FileInputFormat.java:287)
      at org.apache.hadoop.mapred.FileInputFormat.listStatus(FileInputFormat.java:229)
      at org.apache.hadoop.mapred.FileInputFormat.getSplits(FileInputFormat.java:315)
      at org.apache.spark.rdd.HadoopRDD.getPartitions(HadoopRDD.scala:194)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
      at scala.Option.getOrElse(Option.scala:121)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
      at org.apache.spark.rdd.MapPartitionsRDD.getPartitions(MapPartitionsRDD.scala:35)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:252)
      at org.apache.spark.rdd.RDD$$anonfun$partitions$2.apply(RDD.scala:250)
      at scala.Option.getOrElse(Option.scala:121)
      at org.apache.spark.rdd.RDD.partitions(RDD.scala:250)
      at org.apache.spark.SparkContext.runJob(SparkContext.scala:2087)
      at org.apache.spark.rdd.RDD$$anonfun$collect$1.apply(RDD.scala:936)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
      at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
      at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
      at org.apache.spark.rdd.RDD.collect(RDD.scala:935)
      ... 48 elided
    
    scala> 

     

    展开全文
  • RDD

    千次阅读 2019-10-10 08:46:28
    RDD<1> 概述一. 什么是RDD二. spark 编程模型1. DataSource2. SparkContext3. Diver(1)SparkConf(2)SparkEnv(3)DAGScheduler(4)TaskScheduler(5)ScheduleBackend二. RDD属性RDD的五个特征包含四个...
  • RDDRDD之间的操作

    2019-04-12 14:00:33
    1. 作用:对源RDD和参数RDD求并集后返回一个新的RDD 要求俩个RDD是相同类型 subtract (otherDataset) 案例 1. 作用:计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来 2. 需求:创建两个RDD,...
  • RDD:五大特性

    万次阅读 2018-10-06 23:31:28
    RDD是一个由多个partition(某个节点里的某一片连续的数据)组成的的list;将数据加载为RDD时,一般会遵循数据的本地性(一般一个hdfs里的block会加载为一个partition)。 2.A function for computing each ...
  • RDD

    千次阅读 2016-10-16 11:45:25
    1. RDD基础(1) 概述RDD其实就是分布式的元素集合。在Spark中,对数据的所有操作不外乎创建RDD,转化RDD以及调用RDD操作进行求值。Spark 中的 RDD 就是一个不可变的分布式对象集合。每个 RDD 都被分为多个分区,这些...
  • RDD

    2014-10-22 19:29:41
    RDD RDD初始参数:上下文和一组依赖 abstract class RDD[T: ClassTag]( @transient private var sc: SparkContext, @transient private var deps: Seq[Dependency[_]] ) extends Serializable 以下需要...
  • RDD

    2018-05-18 22:43:46
    RDD概念 RDD源码中的描述: A Resilient Distributed Dataset (RDD), the basic abstraction in Spark. Represents an immutable,partitioned collection of elements that can be operated on in parallel. This...
  • RDD

    千次阅读 2014-07-19 13:41:41
    RDD是什么东西?在Spark中有什么作用?如何使用? 1、RDD是什么(1)为什么会产生RDD?传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代计算式...
  • Spark RDD常见的转化操作和行动操作

    万次阅读 2018-12-01 11:13:39
    Spark RDD常见的转化操作和行动操作 IDEA 创建scala spark的Mvn项目:https://blog.csdn.net/u014646662/article/details/84618032 spark快速大数据分析.pdf下载:...
  • RDD

    千次阅读 2014-01-01 22:50:56
    RDD是什么东西?在Spark中有什么作用?如何使用?  1、RDD是什么 (1)为什么会产生RDD? 传统的MapReduce虽然具有自动容错、平衡负载和可拓展性的优点,但是其最大缺点是采用非循环式的数据流模型,使得在迭代...
  • PySpark中RDD与DataFrame相互转换操作

    万次阅读 2019-04-20 11:37:06
    1. 弹性数据集RDD RDD是一个抽象的分布式数据集合,它提供了一系列转化操作(例如基本的map()、flatMap()、filter(),类集合操作union()、intersection()、subtract())和行动操作(例如collect()、count()、take...
  • Spark RDD使用详解--RDD原理

    千次阅读 2018-01-16 15:47:47
    RDD简介  在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD)。RDD是Spark的最基本抽象,是对分布式内存的抽象使用,实现了以操作本地集合的方式来操作...
  • RDD弹性分布式数据集(Resilient Distributed Dataset)每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 支 持 两 种 类 型 的 操 作: 转 化 操 作(transformation) 和 行 动 操 作(action)...
  • Spark——RDD操作详解

    千次阅读 2018-04-13 11:31:15
    转载自:...转化操作map()J接收一个函数,把这个函数用于RDD中的每一个元素,将函数的返回结果作为结果RDD中对应元素。而转化操作filter()则接收一个函数,将RDD满足该函数的元素放入新的...
  • Spark RDD使用详解1--RDD原理

    万次阅读 多人点赞 2016-06-22 19:34:07
    在集群背后,有一个非常重要的分布式数据架构,即弹性分布式数据集(Resilient Distributed Dataset,RDD)。它是一种有容错机制的特殊集合,可以分布在集群的节点上,以函数式编操作集合的方式,进行各种并行操作。...
  • Spark RDD 特征及其依赖

    2018-09-23 15:12:43
    1 RDD特征 分区(partition) 有一个数据分片列表,能够将数据进行切分,切分后的数据能够进行并行计算,是数据集的原子组成部分 函数 compute 计算每个分片,得出一个可遍历的结果,用于说明在父RDD上执行何种计算 ...
  • RDD的算子

    2018-09-27 11:20:10
    转换类的算子Transformation,会生成新的RDD,lazy执行的。 所有的transformation只有遇到action才能被执行 行动类的算子action,会立即触发任务的执行,不会生成RDD 把数据写入到相应的介质,展示结果数据(收集...

空空如也

1 2 3 4 5 ... 20
收藏数 71,403
精华内容 28,561
关键字:

rdd