精华内容
下载资源
问答
  • RDD

    千次阅读 2019-10-10 08:46:28
    RDD<1> 概述一. 什么是RDD二. spark 编程模型1. DataSource2. SparkContext3. Diver(1)SparkConf(2)SparkEnv(3)DAGScheduler(4)TaskScheduler(5)ScheduleBackend二. RDD属性RDD的五个特征包含四个...

    RDD

    <1> 概述

    一. 什么是RDD

    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,

    1. RDD是Spark中的抽象数据结构类型,Spark中最基本的数据抽象,实现了以操作本地集合的方式来操作分布式数据集的抽象实现
    2. 它代表一个不可变、可分区、里面的元素可并行计算的集合
    3. RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性
    4. RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
    5. RDD是Spark最核心的东西,RDD必须是可序列化的。RDD可以cache到内存中,省去了MapReduce大量的磁盘IO操作
    6. 任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是,RDD中的数据是分区存储的可以分布在不同的机器上,同时可以被并行处理
    7. 作用:Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。

    二. spark 编程模型

    在这里插入图片描述

    1. RDD被表示为对象;
    2. 通过对象上的方法调用来对RDD进行转换;
    3. 最后输出结果 或是 向存储系统保存数据;
    4. RDD转换算子被称为Transformation;
    5. 只有遇到Action算子,才会执行RDD的计算(懒执行)

    1. DataSource

    (1)定义:spark的数据来源
    (2)分类:

    1. DB(数据库)
    2. File System (文件系统)
    3. Socket (传输)
    4. Hdfs , HBase… …

    2. SparkContext

    (1)RDD是一个对象
    (2)是Spark的第一个类的入口,负责集群的交互
    (3)用于连接Spark集群,创建RDD,累加器,广播变量… …
    (4)Method : Transformation (转换)
    Action (动作)
    (5)Spark 的物理模型
    Driver 主要是对SparkContext进行配置、初始化以及关闭。初始化SparkContext是为了构建Spark应用程序的运行环境,在初始化SparkContext,要先导入一些Spark的类和隐式转换;在Executor部分运行完毕后,需要将SparkContext关闭。
    在这里插入图片描述
    在Executor中完成数据的处理,数据有以下几种:
    1、Scala集合数据(测试)
    2、文件系统、DB(SQL、NOSQL)的数据
    3、RDD
    4、网络
    Driver : 主节点 ,主要是对SparkContext进行配置、初始化以及关闭。初始化就是构建运行环境(导入类和隐式转换)
    RAM: 随机存取存储器(内存)
    如果Spark集群是服务器则Driver是客户端:driver发送tasks给工作节点,worker返回结果给driver

    3. Diver

    在这里插入图片描述
    在这里插入图片描述

    (1)SparkConf

    是Spark的配置类,配置项包括:master、appName、Jars、ExecutorEnv等等 (键值对形式存储)

    (2)SparkEnv

    a.利用Rpc协议 ---->心跳机制 传输数据
    b.维护Spark的执行环境,有:serializer、RpcEnv、Block Manager、内存管理等

    (3)DAGScheduler

    a.高层调度器
    b.将Job按照RDD的依赖关系划分成若干个TaskSet(任务集),也称为Stage(阶段,时期);再结合当前缓存情况及数据就近的原则,将Stage提交给TaskScheduler
    在这里插入图片描述

    (4)TaskScheduler

    负责任务调度资源的分配.

    (5)ScheduleBackend

    负责集群资源的获取和调度。

    二. RDD属性

    RDD的五个特征包含四个函数和一个属性:

    在这里插入图片描述

    1. def computer(split:Partition, context:TaskContext):Interator[T]
      // 对一个分片进行计算,得出一个可遍历的结果
    2. protected def getPartitions:Array[Partition]
      // 只计算一次
    3. protected def getDependencies:Seq[Dependency[_]] = deps
      //只计算一次,计算RDD对父RDD的依赖
    4. protected def getPreferredLocations(split:Partition):Seq[String] = Nil
      // 可选,制定优先位置,输入的参数是split分片,输出的结果是一组优点的节点位置
    5. @transient
      Val partitioner:Option[Partiotion] = None
      // 可选,分区的方法,针对分区的方法(哈希、RangePartitioner)类似MR中的接口控制key的去向(默认hashPartitioner)
      Spark 以一个弹性分布式数据集(RDD)的概念为中心,它是一个容错且可以执行并行操作的元素的集合。

    1. 一组分片(Partition)分片

    一组分片(Partition),即数据集的基本组成单位。每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目(设置的最大core数)。

    2. 一个计算每个分区的函数

    (2)一个计算每个分区的函数。RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据.Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

    3. 依赖关系

    (3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系(窄依赖(有一对一),宽依赖(多对多))。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

    4. RDD的分片函数

    (4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

    5. 一个列表

    (5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

    6. 只读的

    (6)只读:RDD是只读的,要想改变RDD中的数据,只能创建一个新的RDD由一个RDD转换到另一个RDD,通过操作算子(map、filter、union、join、reduceByKey… …)实现,不再像MR那样只能写map和reduce了。

    三. RDD特点

    1、分区

    RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。
    在这里插入图片描述
    RDD分区

    1.1 原因:

    要处理的原始数据很大,会被分成很多个分区,分别保存在不同的节点上。

    1.2 目的:

    设置合理的并行度,提高数据处理的性能。

    1.3.原则:

    RDD分区的一个分区原则是:
    尽可能使得分区的个数,等于集群核心数目;
    尽可能使同一 RDD 不同分区内的记录的数量一致;

    1.4.如何分区?

    Spark包含两种数据分区方式:HashPartitioner(哈希分区)RangePartitioner(范围分区),数据分区方式只作用于<Key,Value>形式的数据

    1.4.1 HashPartitioner

    HashPartitioner采用哈希的方式对<Key,Value>键值对数据进行分区。其数据分区规则为 partitionId = Key.hashCode % numPartitions,其中partitionId代表该Key对应的键值对数据应当分配到的Partition标识,Key.hashCode表示该Key的哈希值,numPartitions表示包含的Partition个数。

    1.4.2 RangePartitioner

    Spark引入RangePartitioner的目的是为了解决HashPartitioner所带来的分区倾斜问题,也即分区中包含的数据量不均衡问题。HashPartitioner采用哈希的方式将同一类型的Key分配到同一个Partition中,因此当某一或某几种类型数据量较多时,就会造成若干Partition中包含的数据过大问题,而在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢。RangePartitioner基于抽样的思想来对数据进行分区。
    先Hash数据倾斜时在sample重新采样

    1.5 .分区器

    HashPartitioner & RangePartitioner

    1.5.1 作用

    决定了RDD中分区的个数;
    RDD中每条数据经过Shuffle过程属于哪个分区;
    reduce的个数;
    注意:

    1. 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区器的值是None。.
    2. 如果是对键操作,则子RDD不再继承父RDD的分区器,但是分区数会继承.
    1.5.2 HashPartitioner (默认)

    hashCode%分区数=余数—>决定在那个区
    该分区方法保证key相同的数据出现在同一个分区中。
    易发生数据倾斜.

    1.5.3 RangePartitioner (sort之类)

    定义:
    简单的说就是将一定范围内的数映射到某一个分区内
    sample采样抽样确定边界

    1.5.4 什么操作会导致子RDD失去父RDD的分区方式?

    如果是对键操作,则子RDD不再继承父RDD的分区器,但是分区数会继承.
    使用map()算子生成的RDD,由于该转换操作理论上可能会改变元素的键(Spark并不会去判断是否真的改变了键),所以不再继承父RDD的分区器

    1.5.5 多元RDD的分区操作后,子RDD如何继承分区信息?

    对于两个或多个RDD的操作,生成的新的RDD,其分区方式,取决于父RDD的分区方式。如果两个父RDD都设置过分区方式,则会选择第一个父RDD的分区方式。

    1.6 函数操作
    1.6.1. 查看分区方式

    rdd.partitioner

    1.6.2. 查看分区个数

    rdd.getNumPartitions == rdd.partitions.size

    1.6.3. 查看分区存储规律

    getElement(rdd)

    1.6.4. 获得默认分区数

    rdd.defsultParallelism

    1.6.5. 重新定义partitioner主动使用分区

    用户可通过partitionBy主动使用分区器,通过partitions参数指定想要分区的数量。
    可重新定义partitioner主动使用分区
    val rdd1= rdd.partitionBy(new arg.apache.spark.HashPartitioner(2))

    1.6.6. 重新设置分区coalesce() 和 repartition()
    coalesce() 和 repartition()
    val rdd=sc.makeRDD(arr,9)	
    rdd.coalesce(num,false)=rdd.coalesce(num)
    注意:num的数值必须小于原分区的数量(因为是false)
    rdd.repartition(num)==rdd.coalesce(num,true)
    注意: num可大可小于原分区的数值
    

    2、只读

    RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD;由一个RDD转换到另一个RDD,可以通过丰富的操作算子(map、filter、union、join、reduceByKey… …)实现,不再像MR那样只能写map和reduce了。
    在这里插入图片描述

    3、依赖

    RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系(lineage),也称之为依赖。依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的;另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。
    在这里插入图片描述

    3.1.窄依赖:

    每个父RDD的一个Partition最多被子RDD的一个Partition所使用,(1:1 n:1)

    3.2.宽依赖:

    一个父RDD的Partition会被多个或所有子RDD的Partition所使用,(1:n n:n)

    3.3.作用:

    其一用来解决数据容错;
    其二用来划分stage。

    3.4.stage的划分
    (1)划分:

    action触发job,依照RDD依赖关系切分若干TaskSet即(stage) ; task任务处理的最小单元
    划分Stage:
    1.从后向前遍历,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到Stage中;
    2.每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition数量决定的
    3.最后一个Stage里面的任务的类型是ResultTask,前面所有其他Stage里面的任务类型都是ShuffleMapTask
    4.代表当前Stage的算子一定是该Stage的最后一个计算步骤

    (2)用宽依赖划分stage原因:

    相比于宽依赖,窄依赖对优化很有利,

    1. 宽依赖对应着shuffle操作,数据传输时宽依赖对应多个节点的传输
    2. 当RDD分区丢失时(某个节点故障),spark会对数据进行重算时,宽依赖重算的效用不仅在于算的多,还在于有多少是冗余的计算
      在这里插入图片描述
      b1分区丢失,则需要重新计算a1,a2和a3,这就产生了冗余计算
      (a1,a2,a3中对应b2的数据)
    3. 窄依赖能够更有效地进行失效节点的恢复,即只需重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算;
    4. 窄依赖允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区
    3.5. 查看依赖
    3.5.1. 查看依赖长度

    rdd1.dependencies.size

    3.5.2. 查看所有父RDD

    rdd1.dependencies.collect

    3.5.3. 查看第一个父RDD数据并转换格式

    rdd1.dependencies(0).rdd.collect.asInstanceOf[Array[Int]]

    4、持久化(缓存)

    可以控制存储级别(内存、磁盘等)来进行持久化。如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。

    4.1. cache持久化

    cache实际上是persist的一种简化方式,是一种懒执行的,执行action类算子才会触发,cahce后返回值要赋值给一个变量,下一个job直接基于变量进行操作。

    4.2. persist:

    可以指定持久化的级别。最常用的是MEMORY_ONLY,MEMORY_AND_DISK,MEMORY_ONLY_SER,MEMORY_AND_DISK_SER。”_2”表示有副本数。

    4.3. cache和persist

    cache和persist算子后不能立即紧跟action算子。因为rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。

    4.4. checkpoint

    checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。
    checkpoint 的执行原理:
    (1) 当RDD的job执行完毕后,会从finalRDD从后往前回溯。
    (2) 当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
    (3) Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
    优化:
    对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。

    四. 持久化

    1.定义:

    spark中控制算子也是懒执行的,需要Action算子触发才能执行,主要是为了对数据进行缓存。可以构建迭代式算法和快速交互式查询.

    2.容错机制

    当存储于内存的数据由于内存不足而被删除时,RDD的缓存的容错机制执行,丢失的数据会被重算。RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition.

    2.1.原因

    为了防止计算失败后从头开始计算造成的大量开销,RDD会checkpoint计算过程的信息,这样作业失败后从checkpoint点重新计算即可,提高效率过对RDD启动Checkpoint机制来实现容错和高可用;

    2.2.checkpoint使用场景
    1. 会被重复使用的(但是)不能太大的RDD需要persist或者cache 。
    2. 运算时间很长或运算量太大才能得到的 RDD,computing chain 过长或依赖其他 RDD 很多的 RDD。
    3. 针对整个RDD计算链条中特别需要数据持久化的环节(后面会反复使用当前环节的RDD)使用checkpoint
    2.3.checkpoint和cache,persist的区别
    1. 缓存后checkpoint会斩断RDD的依赖,cache,persist依然有依赖
    2. 磁盘或内存可能被清理,但是checkpoint的数据通常保存到hdfs上,放在了高容错文件系统。
    3. rdd.persist() 将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。一旦 driver program 执行结束,也就是 executor 所在进程stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的。
    2.4.checkpoint操作
    1. 创建RDD
      val cktest = sc.parallelize(1 to 100000)
    2. 设置存储路径(HDFS或者本地路径)
      sc.setCheckpointDir("/tmp/checkpoint")–>hdfs的
    3. 执行checkpoint
      cktest2.checkpoint
      // checkpoint是lazy操作 检查是否缓存
      cktest2.isCheckpointed
      // 再次查看RDD的依赖关系
      cktest2.dependencies.collect
      //查看RDD所依赖的checkpoint文件
      cktest2.getCheckpointFile

    3.RDD控制算子

    (1) cache :

    每一个节点都将把计算的分片结果缓存到内存rdd.cache()==rdd.persist(MERMORY_ONLY)

    (2) persist

    ①指定参数缓存在缓存或磁盘
    ②rdd.unpersist() 把持久化的RDD从缓存中移除;

    (3) checkpoint

    缓存在hdfs和磁盘,并切断依赖
    开头要写sc.setCheckPointDir()
    在执行checkpoint()

    4.控制算子的使用

    1. 如果多个动作需要用到某个 RDD,而它的计算代价又很高,使用缓存
    2. 注意:当进行了RDD0→RDD1→RDD2的计算作业,计算结束时,RDD1就已经缓存在系统中了,如果在进行RDD0 →RDD1→RDD3的计算作业时,只须进行RDD1→RDD3的计算,RDD1已经在缓存中0->1的转换不会重复进行
    3. 使用persist()方法对一个RDD标记为持久化,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化

    5.pesist

    MEMORY_ONLY 将RDD 作为反序列化的的对象存储JVM中。如果RDD不能被内存装下,一些分区将不会被缓存,并且在需要的时候被重新计算,按照LRU(Least recently used,最近最少使用)原则替换缓存中的内容。默认的缓存级别 等于cache()
    MEMORY_AND_DISK 将RDD作为反序列化的的对象存储在JVM中。如果RDD不能被与内存装下,超出的分区将被保存在硬盘上,并且在需要时被读取(优先存取内存)
    MEMORY_ONLY_SER 将RDD作为序列化的的对象进行存储(每一分区占用一个字节数组)。通常来说,这比将对象反序列化的空间利用率更高,尤其当使用fast serializer,但在读取时会比较占用CPU
    MEMORY_AND_DISK_SER 与MEMORY_ONLY_SER 相似,但是把超出内存的分区将存储在硬盘上而不是在每次需要的时候重新计算
    DISK_ONLY 只将RDD分区存储在硬盘上
    DISK_ONLY_2 、等带2的 与上述的存储级别一样,但是将每一个分区都复制到集群的两个结点上

    6.如何使用缓存

    1. 如果RDD的数据可以很好的兼容默认存储级别(MEMORY_ONLY),那么优先使用它,这是CPU工作最为高效的一种方式,可以很好地提高运行速度;
    2. 如果(1)不能满足,则尝试使用MEMORY_ONLY_SER,且选择一种快速的序列化工具,也可以达到一种不错的效果;
    3. 由于数据量大,一般不会持久化到磁盘,除非计算是非常“昂贵”的或者计算过程会过滤掉大量数据,因为重新计算一个分区数据的速度可能要高于从磁盘读取一个分区数据的速度;
    4. 备份:如果需要快速的失败恢复机制,则使用备份的存储级别,如MEMORY_ONLY_2、MEMORY_AND_DISK_2;虽然所有的存储级别都可以通过重新计算丢失的数据实现容错,但是缓存机制使得大部分情况下应用无需中断,即数据丢失情况下,直接使用缓存数据,而不需要重新计算数据的过程;
    5. 如果处于大内存或多应用的场景下,OFF_HEAP可以带来以下的好处:它允许Spark Executors可以共享Tachyon的内存数据;它很大程序上减少JVM垃圾回收带来的性能开销;Spark Executors故障不会导致数据丢失。
      OFF_HEAP与MEMORY_ONLY_SER类似,但将数据存储在 堆外内存中。这需要启用堆外内存。

    五. 创建RDD

    1. 合并行化创建 (通过scala集合创建)

    通过集合并行化方式创建RDD,适用于本地测试,做实验
    scala中的本地集合 -->Spark RDD
    spark-shell --master spark://master:7077

    1. parallelize(数据集)方法
      scala> val arr = Array(1,2,3,4,5)
      scala> val rdd = sc.parallelize(arr)
    2. makeRDD(数据集)方法
      scala> val arr = Array(1,2,3,4,5)
      scala> val rdd = sc.makeRDD(arr)
      scala> rdd.collect
      res0: Array[Int] = Array(1, 2, 3, 4, 5)

    2. 文件系统 , 比如 HDFS

    1. 读取HDFS文件系统(默认)
      val rdd2 = sc.textFile(“hdfs://master:9000/words.txt”)
      val line=sc.textFile(“words.txt”)
      这两个是相等的
    2. 读取本地文件
      val rdd2 = sc.textFile(“file:///root/words.txt”)
      scala> val rdd2 = sc.textFile(“file:root/word.txt”)
      scala> rdd2.collect
      res2: Array[String] = Array(hadoop hbase java, hbase java spark, java, hadoop hive hive, hive hbase)

    3. 从父RDD转换成新的子RDD

    调用 Transformation 类的方法,生成新的 RDD只要调用transformation类的算子,都会生成一个新的RDD。RDD中的数据类型,由传入给算子的函数的返回值类型决定.
    注意:action类的算子,不会生成新的 RDD
    scala> rdd.collect
    res3: Array[Int] = Array(1, 2, 3, 4, 5)
    scala> val rdd = sc.parallelize(arr)
    scala> val rdd2 = rdd.map(_*100)
    scala> rdd2.collect
    res4: Array[Int] = Array(100, 200, 300, 400, 500)
    使用 rdd.partitions.size 查看分区数量
    scala> rdd.partitions.size
    res7: Int = 4
    scala> rdd2.partitions.size
    res8: Int = 4

    RDD操作(惰性求值)

    在这里插入图片描述
    RDD是惰性求值的,整个转换过程只是记录了转换的轨迹,没有发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作.

    <2> RDD算子

    1. 常见算子

    这是一个全的RDD算子案例
    http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

    动作 含义
    reduce(func) 通过func函数对RDD中的所有元素处理,传入函数必须满足交换律和结合律(传入两个参数输入返回一个值)
    collect() 在驱动程序中,以数组的形式返回数据集的所有元素
    count() 返回RDD的元素个数
    first() 返回RDD的第一个元素(类似于take(1))
    take(n) 返回一个由数据集的前n个元素组成的数组
    takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子初始值
    saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它封装为文件中的文本
    saveAsSequenceFile(path) 将数据集中的元素以二进制的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
    countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
    foreach(func) 在数据集的每一个元素上,运行函数func进行更新。

    2. 打印元素

    一般会采用语句rdd.foreach(println)

    1. 本地模式(local)
      会打印出一个RDD中的所有元素
    2. 集群模式
      1)Driver Program中的stdout是不会显示打印语句的这些输出内容的;在worker节点上执行打印语句是输出到worker节点的stdout中,而不是输出到任务控制节点Driver Program中.需要到worker节点日志查看
      2)rdd.take(num).foreach(println);
      3)rdd.collect().foreach(println)

    3. Pair RDD

    3.1.什么是Pair RDD

    键值对(K,V),RDD的数据集,RDD的键值对操作

    3.2.创建Pair RDD

    (1)
    val list = List(“Hadoop”,“Spark”,“Hive”,“Spark”)
    val rdd = sc.parallelize(list)
    val pairRDD = rdd.map(word => (word,1))
    val pairRDD = rdd.map((,1))
    (2)
    val lines = sc.textFile(“file://+Path”)
    val pairRDD = lines.flatMap(
    .split(" ")).map((_,1))

    3.3.常见操作

    1)reduceByKey和 groupByKey 区别:

    reduceByKey:
    在这里插入图片描述

    用于对每个key对应的多个value进行merge操作,它能够在本地先进行merge操作
    每个分区先进行merge操作然后再所有分区进行汇总merge操作,reduceByKey的效率高

    groupByKey:
    在这里插入图片描述
    对每个key进行操作,但只生成一个sequence。
    所有分区进行汇总再merge操作,集群节点之间的开销很大

    2)常见算子

    常用的Transformation:
    转换 含义
    map(func) 返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
    filter(func) 返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
    flatMap(func) 类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
    mapPartitions(func) 类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
    mapPartitionsWithIndex(func) 类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]sample(withReplacement, fraction, seed)根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
    union(otherDataset) 对源RDD和参数RDD求并集后返回一个新的RDD
    intersection(otherDataset) 对源RDD和参数RDD求交集后返回一个新的RDD
    distinct([numTasks])) 对源RDD进行去重后返回一个新的RDD
    groupByKey([numTasks]) 在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
    reduceByKey(func, [numTasks]) 在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks]) 先按分区聚合 再总的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 对k/y的RDD进行操作
    sortByKey([ascending], [numTasks]) 在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
    sortBy(func,[ascending], [numTasks]) 与sortByKey类似,但是更灵活 第一个参数是根据什么排序 第二个是怎么排序 false倒序 第三个排序后分区数 默认与原RDD一样
    join(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD 相当于内连接(求交集)
    cogroup(otherDataset, [numTasks]) 在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
    cartesian(otherDataset) 两个RDD的笛卡尔积 的成很多个K/V
    pipe(command, [envVars]) 调用外部程序
    coalesce(numPartitions) 重新分区 第一个参数是要分多少区,第二个参数是否shuffle 默认false 少分区变多分区 true 多分区变少分区 false
    repartition(numPartitions) 重新分区 必须shuffle 参数是要分多少区 少变多
    repartitionAndSortWithinPartitions(partitioner) 重新分区+排序 比先分区再排序效率高 对K/V的RDD进行操作
    foldByKey(zeroValue)(seqOp) 该函数用于K/V做折叠,合并处理 ,与aggregate类似 第一个括号的参数应用于每个V值 第二括号函数是聚合例如:+
    combineByKey 合并相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
    partitionBy(partitioner) 对RDD进行分区 partitioner是分区器 例如new HashPartition(2)
    cache、persist RDD缓存,可以避免重复计算从而减少时间,区别:cache内部调用了persist算子,cache默认就一个缓存级别MEMORY-ONLY ,而persist则可以选择缓存级别
    Subtract(rdd) 返回前rdd元素不在后rdd的rdd
    leftOuterJoin leftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
    rightOuterJoin rightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可
    subtractByKey substractByKey和基本转换操作中的subtract类似只不过这里是针对K的,返回在主RDD中出现,并且不在otherRDD中出现的元素
    常用的Action:

    触发代码的运行,我们一段spark代码里面至少需要有一个action操作。

    动作 含义
    reduce(func) 通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
    collect() 在驱动程序中,以数组的形式返回数据集的所有元素
    count() 返回RDD的元素个数
    first() 返回RDD的第一个元素(类似于take(1))
    take(n) 返回一个由数据集的前n个元素组成的数组
    takeSample(withReplacement,num, [seed]) 返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
    takeOrdered(n, [ordering])
    saveAsTextFile(path) 将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
    saveAsSequenceFile(path) 将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
    saveAsObjectFile(path)
    countByKey() 针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
    foreach(func) 在数据集的每一个元素上,运行函数func进行更新。
    aggregate 先对分区进行操作,在总体操作

    4. 共享变量

    4.1. 定义

    Spark在集群的多个不同节点的多个任务上并行运行一个函数时,把一个变量设为共享变量,就可以在不同节点使用其变量的值. 注意: 共享变量只读

    4.2. 分类

    广播变量(broadcast variables)
    累加器(accumulators)

    4.3. 广播变量(broadcast variables)

    Spark的Action操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,Spark都会自动进行广播

    4.3.1 设置

    val broadcastVar = sc.broadcast(Array(1, 2, 3))

    4.3.2 获取

    broadcastVar.value

    4.4. 累加器 ( accumulators )

    sc.longAccumulator()
    sc.doubleAccumulator()

    4.案例

    1.集合的交并差

    交集: 数据集1.intersection(数据集2)
    并集: 数据集1.union(数据集2)
    差集: 数据集1.subtract(数据集2)

    2.corgroup

    cogroup(otherDataset, [numTasks])
    在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD, 也就是相同K的数据集不同为空(k,(,v2))或者(k,(v1,)),相同的如统join类似(k,(v1,v2)) ,numTasks为并发的任务数
    在这里插入图片描述

    3.最值

    // 同时找最大、最小值。缺点:不适合在大数据量情况下运行
    val rdd1 = sc.makeRDD((1 to 20).toArray)
    rdd1.map((1,_)).groupByKey()
    .map(x=>(x._2.min, x._2.max)).collect
    // 对于仅有一个元素(基本数据类型)的RDD有更简便的方法
    rdd1.stats / rdd1.max / rdd1.min / rdd1.mean

    4. 排序

    4.1.单一数组的排序
    val out=sc.textFile().filter(_.trim.size!=0)		//空行排除
    		.map(_.split(“ ”).trim.toInt)			//拆分并去除首尾空格
    		.sortBy(_,false)						//排序方式
    		.take(num)							//取前几
    		.foreach(println)
    
    4.2.分组元组排序
    val out=sc.textFile()
    		.filter(_.trim.size!=0)
    		.map(_.split(“ “).trim)
    		.map(x=>(x(0),x(1)))	//变为元组
    		.groupByKey()				//生成(k,(v1,v2..))
    		//排序
    		.map(x=>(x._1,x._2.toList.sorted.reverse.take(3)))
    		.collect	
    		
    val rdd1 = sc.textFile("").map(line => {
    		  val value = line.split(" ")
    		  (value(0), value(1).toDouble)
    		}).groupByKey()	
    		.map(x=>(x._1,x._2.toList.sorted.reverse.take(3)))
    		.collect	
    

    这两个相同只不过map拆开而已

    4.3.求平均值

    val ar=Array((“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6))
    val rdd=sc.makeRDD(ar)

    1. 使用(groupByKey)
      rdd.groupBykey()
      .map(x=>(x._1,x._2.sum.toDouble/x._2.size))
      .collect
    2. 使用(reduceByKey)
      mapValues增加标志位并求和来查看相同的key的个数
      rdd.mapValues(x=>(x,1))
      .reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
      .mapValues(x=>(x._1.toDouble/x._2))
    3. 使用(foldByKey)
      和reduceByKey类似
      rdd.mapValues((_,1))
      .foldByKey((0,0))((x,y)=>(x._1+y._1,x._2+y._2))
      .mapValues(x=>(x._1.toDouble/x._2))
    4.4.求多列数据的平均值
    1. 生成一个RDD,每行包含2个数字,要计算这两列数据的平均值

       import scala.util.Random
       val random = new Random(100)
       val arr =(1 to 100).map(x=>(random.nextInt(1000), random.nextInt(1000))).toArray
       val rdd = sc.makeRDD(arr)
      
    2. 使用stat分别计算,算法简单,效率低)

       val (firstAvg,secondAvg)=(rdd.map(_._1).stats.mean,
       rdd.map(_._2).stats.mean)
       如果有一列就得计算一次效率低
      
    3. (利用了Vector,高效。需要借助外部的数据结构):
      Vector数组相加里面相同下标的元素相加(仅有它是可以的),但也要加上标志位好知道个数

       import breeze.linalg.Vector
       val (sums, count)=
       rdd.map(x=>((Vector(x._1.toDouble, x._2.toDouble), 1.0)))
       .reduce((x,y) => (x._1+y._1,x._2 + y._2))
       val means = sums/count
      
    4. (自定义方法实现:数组累加):

       不使用Vector数组的相加,使用常用数组拉链的方法,
       再map映射成一个内部元素相加和的数组
       但也要加标志位1看个数
       def arrAdd(x: Array[Int], y:Array[Int]): Array[Int] = x.zip(y).map(x=>x._1+x._2)
       val (sums, count) =
        rdd.map(x=>(Array(x._1, x._2), 1)) //加标志位
       .reduce((x,y) => (arrAdd(x._1, y._1), x._2+y._2)) 
       //数组执行方法,数量相加
       val means = sums.map(_.toDouble/count)
      
    4.5.二次排序
    1. 数据来源

       import scala.util.Random
       val random = new Random(100)
       val arr = (1 to 10000).map(x=>(random.nextInt(100), random.nextInt(100)))
      
    2. 自定义函数继承Ordered接口或实现Orderring方法

       case class MyObject(x:Int, y:Int) extends Ordered[MyObject]{
         def compare(other:MyObject):Int = {
           if (x - other.x !=0) x - other.x   else y - other.y  }}
      
    3. 调用方法排序

       val rdd = sc.makeRDD(arr)
       val rdd1 = rdd.map(pair=>(new MyObject(pair._1, pair._2),pair))
       val sorted = rdd1.sortByKey(false)
       注意:二次排序后的结果写成文件,文件内及文件间的数据是有顺序的。但是文件间的顺序是不保证的!!!
       val lines = sc.textFile(" ") 
       val rdd1  = lines.map(line=>(new MyObject(line.split(" ")(0).toInt, line.split(" ")(1).toInt),line) )  //元组((k0,k1),k)
       val sorted = rdd1.sortByKey(false)       
       val result = sorted.map(sortedLine =>sortedLine._2)  //要k1     
       result.collect().foreach (println)
      

    Java CountWord

    1. 创建SparkConf
      SparkConf conf =new SparkConf().setAppName().setMaster();
    2. 创建JavaSparkContext(这是Java)
      JavaSparkContext sc=new JavaSparkContext(conf);
    3. 创建RDD
    4. JavaRDD<类型> 变量=sc.textFile()
      JavaRDD lines=sc.textFile();
      注意:拆分行时读取的行成为字符串数组(使用Arrays.asList()转成列表操作切分)数组无法拆分
      JavaRDD words=lines.
      flatMap(line->Arrays.asList(line.split(“ ”)).iterator());
    5. 变成JavaPairRDD<KeyType,ValueType> 键值对使用
      mapToPair()方法和new Tuple2(s,1)
      JavaPairRDD<String,Integer> pairs=
      words.mapToPair(s->new Tuple2(s,1))
    6. 计算 注意:在Java中没有占位符 (_) 这个说法
      JavaPairRDD<String,Integer> count=
      pairs.reduceByKey((a,b)->a+b)
    7. 结果存储到List中foreach遍历
      List(Tuple2<String,integer>) list=count.collect();
      for(Tuple2<String,integer> tuple : list){
      System.out.println( tuple._1+””+tuple._2);
      }

    在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述

    展开全文
  • Spark RDD简单操作

    万次阅读 2018-11-30 15:45:55
    Spark RDD操作 spark快速大数据分析.pdf下载:https://download.csdn.net/download/u014646662/10816588 IDEA 创建scala spark的Mvn项目:https://blog.csdn.net/u014646662/article/details/84618032 1、Spark ...

    Spark RDD操作

    spark快速大数据分析.pdf下载https://download.csdn.net/download/u014646662/10816588

    IDEA 创建scala spark的Mvn项目https://blog.csdn.net/u014646662/article/details/84618032

    1、Spark RDD转化操作

    2、Spark RDD行动操作

    3、惰性求值

    对人工智能感兴趣的同学,可以点击以下链接:

    现在人工智能非常火爆,很多朋友都想学,但是一般的教程都是为博硕生准备的,太难看懂了。最近发现了一个非常适合小白入门的教程,不仅通俗易懂而且还很风趣幽默。所以忍不住分享一下给大家。点这里可以跳转到教程。

    https://www.cbedai.net/u014646662

    RDD 支持两种操作:转化操作和行动操作。

    RDD 的转化操作是返回一个新的RDD 的操作,比如map() 和filter(),而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如count() 和first()。Spark 对待转化操作和行动操作的方式很不一样,因此理解你正在进行的操作的类型是很重要的。如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类型:转化操作返回的是RDD,而行动操作返回的是其他的数据类型。

    Spark RDD转化操作

    RDD 的转化操作是返回新RDD 的操作。转化出来的RDD 是惰性求值的,只有在行动操作中用到这些RDD 时才会被计算。许多转化操作都是针对各个元素的,也就是说,这些转化操作每次只会操作RDD 中的一个元素。不过并不是所有的转化操作都是这样的。举个例子,我们筛选默认使用bash的用户

    scala> var users = sc.textFile("passwd")
    users: org.apache.spark.rdd.RDD[String] = passwd MapPartitionsRDD[12] at textFile at <console>:27
    
    scala> var bashUser = users.filter(line => line.contains("/bin/bash"))
    bashUser: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:29

    注意,filter() 操作不会改变已有的users 中的数据。实际上,该操作会返回一个全新的RDD。users在后面的程序中还可以继续使用,比如我们还可以从中搜索别的单词。

    接下来,我们使用另一个转化操作union() 来打印出包含bash或nologin的行数。

    scala> var users = sc.textFile("passwd")
    users: org.apache.spark.rdd.RDD[String] = passwd MapPartitionsRDD[12] at textFile at <console>:27
    
    scala> var bashUser = users.filter(line => line.contains("/bin/bash"))
    bashUser: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[13] at filter at <console>:29
    
    scala> var nologins = users.filter(line => line.contains("/sbin/nologin"))
    nologins: org.apache.spark.rdd.RDD[String] = MapPartitionsRDD[14] at filter at <console>:29
    
    scala> var bash_nologin = bashUser.union(nologins)
    bash_nologin: org.apache.spark.rdd.RDD[String] = UnionRDD[15] at union at <console>:33
    

    union() 与filter() 的不同点在于它操作两个RDD 而不是一个。转化操作可以操作任意数量的输入RDD。

    最后要说的是,通过转化操作,你从已有的RDD 中派生出新的RDD,Spark 会使用谱系图(lineage graph)来记录这些不同RDD 之间的依赖关系。Spark 需要用这些信息来按需计算每个RDD,也可以依靠谱系图在持久化的RDD 丢失部分数据时恢复所丢失的数据。(对bash_nologin 做一个行动操作就会生成下面的图)

    Spark RDD行动操作

    转化操作是不会自动做计算的,只有执行力行动操作才会运算

    我们继续对上述例子操作:首先统计个数

    
    scala> bashUser.count
    res19: Long = 10
    
    scala> nologins.count
    res20: Long = 37
    
    scala> bash_nologin.count
    res21: Long = 47
    

    三个统计显示:默认使用bash的用户有10个,不可登录的用户37个,两者合并后47个

    使用take收集一些元素

    scala> bash_nologin.take(15).foreach(println)
    root:x:0:0:root:/root:/bin/bash
    hdfs:x:492:491:Hadoop HDFS:/var/lib/hadoop-hdfs:/bin/bash
    yarn:x:491:490:Hadoop Yarn:/var/lib/hadoop-yarn:/bin/bash
    impala:x:490:489:Impala:/var/lib/impala:/bin/bash
    mapred:x:489:488:Hadoop MapReduce:/var/lib/hadoop-mapreduce:/bin/bash
    mysql:x:27:27:MySQL Server:/var/lib/mysql:/bin/bash
    kms:x:482:482:Hadoop KMS:/var/lib/hadoop-kms:/bin/bash
    llama:x:500:481:Llama:/var/lib/llama:/bin/bash
    httpfs:x:481:480:Hadoop HTTPFS:/var/lib/hadoop-httpfs:/bin/bash
    cloudera:x:501:501::/home/cloudera:/bin/bash
    bin:x:1:1:bin:/bin:/sbin/nologin
    daemon:x:2:2:daemon:/sbin:/sbin/nologin
    adm:x:3:4:adm:/var/adm:/sbin/nologin
    lp:x:4:7:lp:/var/spool/lpd:/sbin/nologin
    mail:x:8:12:mail:/var/spool/mail:/sbin/nologin

    take()只能 获取了RDD 中的少量元素,而collect可以获取RRD所有元素

    scala> bash_nologin.collect().foreach(println)
    root:x:0:0:root:/root:/bin/bash
    hdfs:x:492:491:Hadoop HDFS:/var/lib/hadoop-hdfs:/bin/bash
    yarn:x:491:490:Hadoop Yarn:/var/lib/hadoop-yarn:/bin/bash
    ......

    注意:collect() 不能用在大规模数据集上。他会把数据放到单台机器的内存中

    当RDD特别大时,可以把RDD中的数据存到文件中,例如存储到文本文档saveAsTextFile()

    bash_nologin.saveAsTextFile("bash_nologin")
    

    惰性求值

    前面提过,RDD 的转化操作都是惰性求值的。这意味着在被调用行动操作之前Spark 不会开始计算。这对新用户来说可能与直觉有些相违背之处,但是对于那些使用过诸如Haskell等函数式语言或者类似LINQ 这样的数据处理框架的人来说,会有些似曾相识。

    惰性求值意味着当我们对RDD 调用转化操作(例如调用map())时,操作不会立即执行。相反,Spark 会在内部记录下所要求执行的操作的相关信息。我们不应该把RDD 看作存放着特定数据的数据集,而最好把每个RDD 当作我们通过转化操作构建出来的、记录如何计算数据的指令列表。把数据读取到RDD 的操作也同样是惰性的。因此,当我们调用sc.textFile() 时,数据并没有读取进来,而是在必要时才会读取。和转化操作一样的是,读取数据的操作也有可能会多次执行。

    虽然转化操作是惰性求值的,但还是可以随时通过运行一个行动操作来强制Spark 执行RDD 的转化操作,比如使用count()。这是一种对你所写的程序进行部分测试的简单方法。

    Spark 使用惰性求值,这样就可以把一些操作合并到一起来减少计算数据的步骤。在类似Hadoop MapReduce 的系统中,开发者常常花费大量时间考虑如何把操作组合到一起,以减少MapReduce 的周期数。而在Spark 中,写出一个非常复杂的映射并不见得能比使用很多简单的连续操作获得好很多的性能。因此,用户可以用更小的操作来组织他们的程序,这样也使这些操作更容易管理。

    展开全文
  • RDD弹性分布式数据集(Resilient Distributed Dataset)每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。RDD 支 持 两 种 类 型 的 操 作: 转 化 操 作(transformation) 和 行 动 操 作(action)...


    RDD


    弹性分布式数据集(Resilient Distributed Dataset)

    每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。

    RDD 支 持 两 种 类 型 的 操 作: 转 化 操 作(transformation) 和 行 动 操 作(action)
    • 转化操作会由一个 RDD 生成一个新的 RDD
    • 行动操作会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中

    Spark 只会惰性计算这些 RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。


    默认情况下,Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。
    如果想在多个行动操作中重用同一个 RDD,可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来
    默认,RDD 缓存到内存而不是磁盘上


    在任何时候都能进行重算是我们为什么把 RDD 描述为“弹性”的原因。
    当保存 RDD 数据的一台机器失败时,Spark 还可以使用这种特性来重算出丢掉的分区,这一过程对用户是完全透明的。



    总的来说,每个 Spark 程序或 shell 会话都按如下方式工作。
    (1) 从外部数据创建出输入 RDD。
    • 创建 RDD 最简单的方式就是把程序中一个已有的集合传给 SparkContext 的 parallelize()方法
    • 从外部存储中读取数据来创建 RDD
      • 将文本文件读入为一个存储字符串的 RDD 的方法SparkContext.textFile()
    (2) 使用诸如 filter() 这样的转化操作对 RDD 进行转化,以定义新的 RDD。
    (3) 告诉 Spark 对需要被重用的中间结果 RDD 执行 persist() 操作。
    (4) 使用行动操作(例如 count() 和 first() 等)来触发一次并行计算,Spark 会对计算进行优化后再执行。



    通过转化操作,你从已有的 RDD 中派生出新的 RDD,Spark 会使用谱系图(lineage graph)来记录这些不同 RDD 之间的依赖关系
    • Spark 需要用这些信息来按需计算每个 RDD,
    • 也可以依靠谱系图在持久化的 RDD 丢失部分数据时恢复所丢失的数据



    惰性求值
    • 我们不应该把 RDD 看作存放着特定数据的数据集,而最好把每个 RDD 当作我们通过转化操作构建出来的、记录如何计算数据的指令列表
    • 把数据读取到 RDD 的操作也同样是惰性的


    传递函数时需要小心的一点是,当传递的函数中包含字段引用时(例如 self.field),会把整个对象发到工作节点上,
    • 这可能比你想传递的东西大得多。
    • 有时,如果传递的对象不可序列化,也会导致你的程序失败
    • 替代的方案是,只把你所需要的字段从对象中拿出来放到一个局部变量中,然后传递这个局部变量

    传递函数,涉及到的变量,必须可序列化


    Spark 中大部分常见的转化操作和行动操作

    特定数据类型的 RDD还支持一些附加操作,
    • 例如,数字类型的 RDD 支持统计型函数操作,
    • 而键值对形式的RDD 则支持诸如根据键聚合数据的键值对操作

    任意数据类型的 RDD都支持的通用转化操作:
    • filter、map
    • flatMap
      • 有时候,我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫作 flatMap()
      • 和 map() 类似,我们提供给 flatMap() 的函数被分别应用到了输入 RDD 的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器。
      • 输出的 RDD 倒不是由迭代器组成的。而是一个包含所有元素的 RDD(所谓flat)
    • 伪集合运算:
      • distinct、union、intersection、subtract(差集)、cartesian(笛卡尔积)
      • 尽管 RDD 本身不是严格意义上的集合,但它也支持许多数学上的集合操作,比如合并和相交操作
      • RDD 中最常缺失的集合属性是元素的唯一性,因为常常有重复的元素
      • distinct,开销较大

    任意数据类型的 RDD都支持的通用行动操作:
    • 最常见的行动操作 reduce()。它接收一个函数作为参数,这个函数要操作两个 RDD 的元素类型的数据并返回一个同样类型的新元素
    • fold
      • 与reduce类似。。区别,是提供一个“初始值”来作为每个分区第一次调用时的结果。
    • aggregate操作就是将map、reduce合并提供结果
    把数据返回驱动器程序中
    • 最简单、最常见的操作是 collect(),它会将整个 RDD 的内容返回
    • take(n) 返回 RDD 中的 n 个元素,
    • 如果为数据定义了顺序,就可以使用 top() 从 RDD 中获取前几个元素
    • takeSample
    • count()、countByValue()



    在 Scala 中,将 RDD 转为有特定函数的 RDD(比如在 RDD[Double] 上进行数值操作)是由隐式转换来自动处理的
    • 这些隐式转换可以隐式地将一个 RDD 转为各种封装类,比如 DoubleRDDFunctions(数值数据的 RDD)和 PairRDDFunctions(键值对 RDD),这样我们就有了诸如 mean() 和variance() 之类的额外的函数


    我们可以为 RDD 选择不同的持久化级别
    • 在 Scala和 Java 中,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。
      • 堆外缓存是试验性功能,
    • org.apache.spark.storage.StorageLevel
    • 如有必要,可以通过在存储级别的末尾加上“_2”来把持久化数据存为两份
    • persist() 调用本身不会触发强制求值
    • 如果要缓存的数据太多,内存中放不下,Spark 会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除
      • 但是对于使用内存与磁盘的缓存级别的分区来说,被移除的分区都会写入磁盘
    • 最后,RDD 还有一个方法叫作 unpersist(),调用该方法可以手动把持久化的 RDD 从缓存中移除







    pair RDD 


    键值对 RDD 通常用来进行聚合计算。
    我们一般要先通过一些初始 ETL(抽取Extract、转化Transform、装载Load)操作来将数据转化为键值对形式。

    让用户控制键值对 RDD 在各节点上分布情况的高级特性:分区
    • 使用 PageRank 算法来演示分区的作用
    • 为分布式数据集(RDD)选择正确的分区方式和为本地数据集选择合适的数据结构很相似
    • 数据的分布都会极其明显地影响程序的性能表现


    从一个 RDD 中提取某些字段(例如代表事件时间、用户 ID 或者其他标识符的字段),并使用这些字段作为 pair RDD 操作中的键(类似Lists.uniqueMap)


    创建pair RDD

    当需要把一个普通的 RDD 转为 pair RDD 时,可以调用 map() 函数来实现,传递的函数需要返回键值对(二元组)


    Pair RDD的转化操作
    • 单个集合
      • 提供 reduceByKey() 方法,可以分别归约每个键对应的数据,
      • groupByKey() 对具有相同键的值进行分组
      • mapValues(func) 对 pair RDD 中的每个值应用一个函数而不改变键
        • flatMapValues
    • 两个集合
      • subtractByKey差集

    Pair RDD的转化操作
    • 聚合
      • reduceByKey、 foldByKey(带初始值的reduce)
        • reduceByKey() 与 reduce() 相当类似
        • foldByKey() 则与 fold() 相当类似
      • combineByKey
        • 大多数基于键聚合的函数都是用它实现的,为用户提供了更简单的接口
        • combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同
        • 如果这是一个新的元素,combineByKey() 会使用一个叫作 createCombiner() 的函数来创建那个键对应的累加器的初始值
          • 这一过程会在每个分区中第一次出现各个键时发生,而不是在整个 RDD 中第一次出现一个键时发生
        • 如果这是一个在处理当前分区之前已经遇到的键,它会使用 mergeValue() 函数将该键的累加器对应的当前值与这个新的值进行合并
        • 如果有两个或者更多的分区,就需要使用用户提供的 mergeCombiners() 函数将各个分区的结果进行合并
        • combineByKey的三个参数,分别是上述三个函数
      • 并行度调优
        • 每个 RDD 都有固定数目的分区,分区数决定了在 RDD 上执行操作时的并行度
        • 在执行聚合或分组操作时,可以要求 Spark 使用给定的分区数
        • Spark 始终尝试根据集群的大小推断出一个有意义的默认值
        • 本章讨论的大多数操作符都能接收第二个参数,这个参数用来指定分组结果或聚合结果的RDD 的分区数,
        • 有时,我们希望在除分组操作和聚合操作之外的操作中也能改变 RDD 的分区
          • Spark 提供了 repartition() 函数
          • 对数据进行重新分区是代价相对比较大的操作
          • Spark 中也有一个优化版的 repartition(),叫作 coalesce()。
          • rdd.partitions.size()查看 RDD 的分区数,确保调用 coalesce() 时将 RDD 合并到比现在的分区数更少的分区中
    • 数据分组
      • groupByKey  所得到的结果 RDD 类型会是[K, Iterable[V]]
      • groupBy() 可以用于未成对的数据上,也可以根据除键相同以外的条件进行分组
        • 可以接收一个函数,对源 RDD 中的每个元素使用该函数,将返回结果作为键再进行分组
      •  如果你发现自己写出了先使用 groupByKey() 然后再对值使用 reduce() 或者fold() 的代码,你很有可能可以通过使用一种根据键进行聚合的函数来更高效地实现同样的效果。。。后者更高效
      • 使用一个叫作 cogroup() 的函数对多个共享同一个键的 RDD 进行分组。
        • 对两个键的类型均为 K 而值的类型分别为 V 和 W 的 RDD 进行cogroup() 时,得到的结果 RDD 类型为 [(K, (Iterable[V], Iterable[W]))]
        • 如果其中的一个 RDD 对于另一个 RDD 中存在的某个键没有对应的记录,那么对应的迭代器则为空。
    • 连接
      • 连接方式多种多样:右外连接、左外连接、交叉连接以及内连接。
      • join() 方法,对两个 RDD 进行内连接,可以把两个 RDD 中键相同的元素组合到一起,合并为一个 RDD
        • cogroup将两个 RDD 中拥有相同键的数据分组到一起
        • 注意区别
      • rightOuterJoin、leftOuterJoin
        • 对两个 RDD 进行连接操作,类似sql 左右外连接(rightOuterJoin确保第一个RDD的所有元素存在,右侧没的补空)
    • 排序
      • sortByKey

    Pair RDD的行动操作
    • countByKey
    • collectAsMap
    • lookup(key)


    Optional 是 Google 的 Guava 库(https://github.com/google/guava)中的一部分,
    • 表示有可能缺失的值。可以调用 isPresent() 来看值是否存在,
    • 果数据存在,则可以调用 get() 来获得其中包含的对象实例

    scala.Option 对象,这是 Scala 中用来存放可能存在的对象的容器类。
    你可以对这个 Option 对象调用 isDefined() 来检查其中是否有值,调用 get() 来获取其中的值。


    对于每种操作(转化、行动),需要传入函数,此时,注意函数的输入、输出以及注意RDD的单个元素形式。






    RDD分区


    控制数据分布以获得最少的网络传输可以极大地提升整体性能

    只有当RDD多次在诸如连接这种基于键的操作中使用时,分区才会有帮助

    系统会根据一个针对键的函数(分区函数)对元素进行分组
    Spark 可以确保同一组的键出现在同一个节点上

    哈希分区
    • 例如,将一个 RDD 分成了 100 个分区,
    • 此时键的哈希值对100 取模的结果相同的记录会被放在一个节点上
    范围分区法


    在join操作时,对大型RDD进行分区,产生的效果
    • 默认情况下,连接操作会将两个数据集中的所有键的哈希值都求出来,
    • 将该哈希值相同的记录通过网络传到同一台机器上,然后在那台机器上对所有键相同的记录进行连接操作


    • 在程序开始时,对 userData 表使用 partitionBy() 转化操作,将这张表转为哈希分区
    • 当调用 userData.join(events) 时,Spark 只会对 events 进行数据混洗操作,
    • 将 events 中特定 UserID 的记录发送到 userData 的对应分区所在的那台机器上


    转化操作,返回值总是一个新的 RDD
    RDD 一旦创建就无法修改。


    分区数目,它会控制之后对这个 RDD 进行进一步操作(比如连接操作)时有多少任务会并行执行。


    许多其他 Spark 操作会自动为结果 RDD 设定已知的分区方式信息
    • sortByKey() 和 groupByKey()会分别生成范围分区的 RDD 和哈希分区的 RDD
    • 诸如 map() 这样的操作会导致新的 RDD 失去父 RDD 的分区信息,因为这样的操作理论上可能会修改每条记录的键


    RDD 的 partitioner 属性获得RDD的分区方式


    分区操作之后,务必进行persist持久化。


    Spark 的许多操作都引入了将数据根据键跨节点进行混洗的过程。所有这些操作都会从数据分区中获益。
    • 连接:
      • join()、leftOuterJoin()、rightOuterJoin()
    • 分组
      • cogroup()、groupWith()、groupByKey()、
    • 聚合
      • reduceByKey()、combineByKey() 
    •  lookup()


    对于像 reduceByKey() 这样只作用于单个 RDD 的操作
    • 运行在未分区的 RDD 上的时候会导致每个键的所有对应值都在每台机器上进行本地计算,
    • 只需要把本地最终归约出的结果值从各工作节点传回主节点,所以原本的网络开销就不算大
    对于诸如 cogroup() 和join() 这样的二元操作,
    • 预先进行数据分区会导致其中至少一个 RDD不发生数据混洗
    一个 RDD 是通过 mapValues() 从另一个 RDD 中创建出来的,这两个RDD 就会拥有相同的键和分区方式
    • 那么跨节点的数据混洗就不会发生了。


    Spark 内部知道各操作会如何影响分区方式,并将会对数据进行分区的操作的结果 RDD 自动设置为对应的分区器
    • 如果你调用 join() 来连接两个 RDD;由于键相同的元素会被哈希到同一台机器上,Spark 知道输出结果也是哈希分区的

    转化操作的结果并不一定会按已知的分区方式分区,这时输出的 RDD 可能就会没有设置分区器。
    • 当你对一个哈希分区的键值对 RDD 调用 map() 时,由于传给 map()的函数理论上可以改变元素的键,因此结果就不会有固定的分区方式
    • mapValues() 和flatMapValues() 可以保证每个二元组的键保持不变


    这里列出了所有会为生成的结果 RDD 设好分区方式的操作
    • 分组:
      • cogroup()、groupWith()、groupByKey()
    • 连接
      • join()、leftOuterJoin()、rightOuterJoin()
    • 聚合
      • reduceByKey()、combineByKey()
    • partitionBy()、sort()、
    • mapValues()(如果父 RDD 有分区方式的话)、flatMapValues()(如果父 RDD 有分区方式的话),以及 filter()(如果父 RDD 有分区方式的话)。
    • 其他所有的操作生成的结果都不会存在特定的分区方式。



    对于二元操作,输出数据的分区方式取决于父 RDD 的分区方式
    • 默认情况下,结果会采用哈希分区,分区的数量和操作的并行度一样。
    • 不过,如果其中的一个父 RDD 已经设置过分区方式,那么结果就会采用那种分区方式;
    • 如果两个父 RDD 都设置过分区方式,结果 RDD 会采用第一个父 RDD 的分区方式




    PageRank 是执行多次连接的一个迭代算法,因此它是 RDD 分区操作的一个很好的用例

    算法会维护两个数据集
    • 一个由 (pageID, linkList) 的元素组成,包含每个页面的相邻页面的列表
    • 另一个由 (pageID, rank) 元素组成,包含每个页面的当前排序值


    自定义分区方式

    要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法
    • numPartitions: Int:返回创建出来的分区数。
    • getPartition(key: Any): Int:返回给定键的分区编号(0 到 numPartitions-1)。
    • equals():Java 判断相等性的标准方法。。Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个RDD 的分区方式是否相同

    有一个问题需要注意,当你的算法依赖于 Java 的 hashCode() 方法时,这个方法有可能会返回负数。
    • 你需要十分谨慎,确保 getPartition() 永远返回一个非负数



    展开全文
  • 深入理解Spark RDD——RDD信息对象

    千次阅读 2019-08-16 11:20:53
    RDDInfo用于描述RDD的信息,RDDInfo提供的信息如下: id:RDD的id。 name:RDD的名称。 numPartitions:RDD的分区数量。 storageLevel:RDD的存储级别(即StorageLevel)。 parentIds:RDD的父亲RDD的id序列。...

    RDDInfo用于描述RDD的信息,RDDInfo提供的信息如下:

    • id:RDD的id。
    • name:RDD的名称。
    • numPartitions:RDD的分区数量。
    • storageLevel:RDD的存储级别(即StorageLevel)。
    • parentIds:RDD的父亲RDD的id序列。这说明一个RDD会有零到多个父RDD。
    • callSite:RDD的用户调用栈信息。
    • scope:RDD的操作范围。scope的类型为RDDOperationScope,每一个RDD都有一个RDDOperationScope。RDDOperationScope与Stage或Job之间并无特殊关系,一个RDDOperationScope可以存在于一个Stage内,也可以跨越多个Job。
    • numCachedPartitions:缓存的分区数量。
    • memSize:使用的内存大小。
    • diskSize:使用的磁盘大小。
    • externalBlockStoreSize:Block存储在外部的大小。

    RDDInfo还提供了以下方法:

    • isCached:是否已经缓存。isCached的实现见代码清单1

    代码清单1   isCached的实现

      def isCached: Boolean = (memSize + diskSize > 0) && numCachedPartitions > 0
    • compare:由于RDDInfo继承了Ordered,所以重写了compare方法以用于排序。compare的实现见代码清单2

    代码清单2   compare的实现

      override def compare(that: RDDInfo): Int = {
        this.id - that.id
      }
    

    此外,RDDInfo的伴生对象中定义了fromRdd方法,用于从RDD构建出对应的RDDInfo,其实现见代码清单3

    代码清单RDDInfo伴生对象的fromRdd方法

    private[spark] object RDDInfo {
      def fromRdd(rdd: RDD[_]): RDDInfo = {
        val rddName = Option(rdd.name).getOrElse(Utils.getFormattedClassName(rdd))
        val parentIds = rdd.dependencies.map(_.rdd.id)
        new RDDInfo(rdd.id, rddName, rdd.partitions.length,
          rdd.getStorageLevel, parentIds, rdd.creationSite.shortForm, rdd.scope)
      }
    }
    

    根据代码清单3fromRdd方法的执行步骤如下:

    1. 获取当前RDD的名称(即name属性)作为RDDInfo的name属性,如果RDD还没有名称,那么调用Utils工具类的getFormattedClassName方法(见附录A)生成RDDInfo的name属性。
    2. 获取当前RDD依赖的所有父RDD的身份标识作为RDDInfo的parentIds属性。
    3. 创建RDDInfo对象。
    展开全文
  • RDD基础

    万次阅读 2018-07-28 16:06:28
    RDD介绍 RDD概念 一个RDD就是一个分布式对象集合,本质上是一个只读的分区记录集合,每个RDD可以分成多个分区,每个分区就是一个数据集片段,并且一个RDD的不同分区可以被保存到集群中不同的节点上,从而可以在...
  • 在《深入理解Spark RDD——RDD依赖(构建DAG的关键)》一文,详细描述了RDD的宽窄依赖。RDD之间的依赖关系如果是Shuffle依赖,那么上游RDD该如何确定每个分区的输出将交由下游RDD的哪些分区呢?或者下游RDD的各个...
  • RDDRDD之间的操作

    2019-04-12 14:00:33
    1. 作用:对源RDD和参数RDD求并集后返回一个新的RDD 要求俩个RDD是相同类型 subtract (otherDataset) 案例 1. 作用:计算差的一种函数,去除两个RDD中相同的元素,不同的RDD将保留下来 2. 需求:创建两个RDD,...
  • RDD创建

    2019-07-12 21:22:01
    1、从文件系统中加载数据创建RDD (1)、从本地文件系统中加载数据 首先进入spark-shell交互式环境中,写入第一行代码: 执行如下的命令: 执行sc.textFile()方法后,Spark从本地文件word.txt中加载数据到...
  • spark RDD

    2018-08-02 00:10:10
    熟练使用RDD的算子完成计算 掌握RDD的原理 弹性分布式数据集RDD RDD概述 什么是RDD RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、...
  • RDD原理

    2018-04-25 15:43:53
    RDD概念 RDD的内部属性 一组分片(Partition),即数据集的基本组成单位 计算每个分片的函数 RDD之间的依赖关系 一个Partitioner,即RDD的分片函数 分区列表,存储存取每个Partition的优先位置(preferred location...
  • Spark RDD基础

    万次阅读 2018-11-30 14:17:24
    Spark RDD基础 IDEA 创建scala spark的Mvn项目:https://blog.csdn.net/u014646662/article/details/84618032 spark快速大数据分析.pdf下载:https://download.csdn.net/download/u014646662/10816588 弹性分布式...
  • RDD使用

    2018-04-25 15:44:16
    RDD操作 RDD的创建方式 RDD的两种操作算子 RDD操作 RDD的创建方式 从Hadoop文件系统(或与Hadoop兼容的其他持久化存储系统,如Hive、Cassandra、HBase)输入(例如HDFS)创建。 从父RDD转换得到新RDD...
  • RDD合并

    千次阅读 2017-10-29 12:30:44
    val rdd= rdd1.zipWithIndex().join(rdd2.zipWithIndex()).join(rdd3.zipWithIndex()).join(rdd4.zipWithIndex())
  • Spark RDD 练习

    千次阅读 2020-04-13 21:16:13
    1、创建一个1-10数组的RDD,将所有元素*2形成新的RDD scala> val rdd1 = sc.parallelize(1 to 10) rdd1: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[2] at parallelize at <console>:24 ...
  • RDD简介

    2018-03-30 23:34:13
    RDD:弹性分布式数据集(Resilient Distributed Dataset,简称 RDD)。RDD 其实就是分布式的元素集合。----Spark最根本的数据抽象。在 Spark 中,对数据的所有操作不外乎创建 RDD、转化已有 RDD 以及调用 RDD 操作...
  • RDD详解 什么是RDD 为什么要有RDD? 在许多迭代式算法(比如机器学习、图算法等)和交互式数据挖掘中,不同计算阶段之间会重用中间结果,即一个阶段的输出结果会作为下一个阶段的输入。但是,之前的MapReduce框架...
  • RDD(Resilient Distributed Datasets,弹性分布式数据集)代表可并行操作元素的不可变分区集合。对于Spark的初学者来说,这个概念会十分陌生。即便是对于一些有Spark使用经验的人,要想说清楚什么是RDD,以及为什么...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 31,042
精华内容 12,416
关键字:

rdd