精华内容
下载资源
问答
  • Spark学习--RDD编码

    2017-05-09 14:07:03
    RDD:弹性分布式数据集(ResilientDistributed Dataset),是Spark对数据的核心抽象。RDD其实是分布式的元素集合。当Spark对数据操作和转换时,会自动将RDD中的数据分发到集群,并将操作并行化执行。 Spark中的RDD是一...
  • hbase-rdd:Spark RDD从HBase读取,写入和删除
  • import org.apache.spark.storage.StorageLevel import org.apache.spark.{SparkConf, SparkContext} /** * TODO * * @author 徐磊 ... * @data2020/01/07 下午 05:03 */ object SparkWordCount { ...
  • 今天小编就为大家分享一篇spark rdd转dataframe 写入mysql的实例讲解,具有很好的参考价值,希望对大家有所帮助。一起跟随小编过来看看吧
  • 上一章讲了Spark提交作业的过程,这一章我们要讲RDD。简单的讲,RDD就是Spark的input,知道input是啥吧,就是输入的数据。RDD的全名是ResilientDistributedDataset,意思是容错的分布式数据集,每一个RDD都会有5个...
  • SparkRDD函数详解.doc

    2019-07-06 16:47:47
    Spark RDD函数详细解读,基本转换,键值转换,Action操作
  • RDD

    千次阅读 多人点赞 2019-10-10 08:46:28
    RDD<1> 概述一. 什么是RDD二. spark 编程模型1. DataSource2. SparkContext3. Diver(1)SparkConf(2)SparkEnv(3)DAGScheduler(4)TaskScheduler(5)ScheduleBackend二. RDD属性RDD的五个特征包含四个...

    RDD

    <1> 概述

    一. 什么是RDD

    RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,

    1. RDD是Spark中的抽象数据结构类型,Spark中最基本的数据抽象,实现了以操作本地集合的方式来操作分布式数据集的抽象实现
    2. 它代表一个不可变、可分区、里面的元素可并行计算的集合
    3. RDD具有数据流模型的特点:自动容错、位置感知性调度和可伸缩性
    4. RDD允许用户在执行多个查询时显式地将工作集缓存在内存中,后续的查询能够重用工作集,这极大地提升了查询速度。
    5. RDD是Spark最核心的东西,RDD必须是可序列化的。RDD可以cache到内存中,省去了MapReduce大量的磁盘IO操作
    6. 任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是,RDD中的数据是分区存储的可以分布在不同的机器上,同时可以被并行处理
    7. 作用:Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。

    二. spark 编程模型

    在这里插入图片描述

    1. RDD被表示为对象;
    2. 通过对象上的方法调用来对RDD进行转换;
    3. 最后输出结果 或是 向存储系统保存数据;
    4. RDD转换算子被称为Transformation;
    5. 只有遇到Action算子,才会执行RDD的计算(懒执行)

    1. DataSource

    (1)定义:spark的数据来源
    (2)分类:

    1. DB(数据库)
    2. File System (文件系统)
    3. Socket (传输)
    4. Hdfs , HBase… …

    2. SparkContext

    (1)RDD是一个对象
    (2)是Spark的第一个类的入口,负责集群的交互
    (3)用于连接Spark集群,创建RDD,累加器,广播变量… …
    (4)Method : Transformation (转换)
    Action (动作)
    (5)Spark 的物理模型
    Driver 主要是对SparkContext进行配置、初始化以及关闭。初始化SparkContext是为了构建Spark应用程序的运行环境,在初始化SparkContext,要先导入一些Spark的类和隐式转换;在Executor部分运行完毕后,需要将SparkContext关闭。
    在这里插入图片描述
    在Executor中完成数据的处理,数据有以下几种:
    1、Scala集合数据(测试)
    2、文件系统、DB(SQL、NOSQL)的数据
    3、RDD
    4、网络
    Driver : 主节点 ,主要是对SparkContext进行配置、初始化以及关闭。初始化就是构建运行环境(导入类和隐式转换)
    RAM: 随机存取存储器(内存)
    如果Spark集群是服务器则Driver是客户端:driver发送tasks给工作节点,worker返回结果给driver

    3. Diver

    在这里插入图片描述
    在这里插入图片描述

    (1)SparkConf

    是Spark的配置类,配置项包括:master、appName、Jars、ExecutorEnv等等 (键值对形式存储)

    (2)SparkEnv

    a.利用Rpc协议 ---->心跳机制 传输数据
    b.维护Spark的执行环境,有:serializer、RpcEnv、Block Manager、内存管理等

    (3)DAGScheduler

    a.高层调度器
    b.将Job按照RDD的依赖关系划分成若干个TaskSet(任务集),也称为Stage(阶段,时期);再结合当前缓存情况及数据就近的原则,将Stage提交给TaskScheduler
    在这里插入图片描述

    (4)TaskScheduler

    负责任务调度资源的分配.

    (5)ScheduleBackend

    负责集群资源的获取和调度。

    二. RDD属性

    RDD的五个特征包含四个函数和一个属性:

    在这里插入图片描述

    1. def computer(split:Partition, context:TaskContext):Interator[T]
      // 对一个分片进行计算,得出一个可遍历的结果
    2. protected def getPartitions:Array[Partition]
      // 只计算一次
    3. protected def getDependencies:Seq[Dependency[_]] = deps
      //只计算一次,计算RDD对父RDD的依赖
    4. protected def getPreferredLocations(split:Partition):Seq[String] = Nil
      // 可选,制定优先位置,输入的参数是split分片,输出的结果是一组优点的节点位置
    5. @transient
      Val partitioner:Option[Partiotion] = None
      // 可选,分区的方法,针对分区的方法(哈希、RangePartitioner)类似MR中的接口控制key的去向(默认hashPartitioner)
      Spark 以一个弹性分布式数据集(RDD)的概念为中心,它是一个容错且可以执行并行操作的元素的集合。

    1. 一组分片(Partition)分片

    一组分片(Partition),即数据集的基本组成单位。每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目(设置的最大core数)。

    2. 一个计算每个分区的函数

    (2)一个计算每个分区的函数。RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据.Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。

    3. 依赖关系

    (3)RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系(窄依赖(有一对一),宽依赖(多对多))。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。

    4. RDD的分片函数

    (4)一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。

    5. 一个列表

    (5)一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

    6. 只读的

    (6)只读:RDD是只读的,要想改变RDD中的数据,只能创建一个新的RDD由一个RDD转换到另一个RDD,通过操作算子(map、filter、union、join、reduceByKey… …)实现,不再像MR那样只能写map和reduce了。

    三. RDD特点

    1、分区

    RDD逻辑上是分区的,每个分区的数据是抽象存在的,计算的时候会通过一个compute函数得到每个分区的数据。如果RDD是通过已有的文件系统构建,则compute函数是读取指定文件系统中的数据,如果RDD是通过其他RDD转换而来,则compute函数是执行转换逻辑将其他RDD的数据进行转换。
    在这里插入图片描述
    RDD分区

    1.1 原因:

    要处理的原始数据很大,会被分成很多个分区,分别保存在不同的节点上。

    1.2 目的:

    设置合理的并行度,提高数据处理的性能。

    1.3.原则:

    RDD分区的一个分区原则是:
    尽可能使得分区的个数,等于集群核心数目;
    尽可能使同一 RDD 不同分区内的记录的数量一致;

    1.4.如何分区?

    Spark包含两种数据分区方式:HashPartitioner(哈希分区)RangePartitioner(范围分区),数据分区方式只作用于<Key,Value>形式的数据

    1.4.1 HashPartitioner

    HashPartitioner采用哈希的方式对<Key,Value>键值对数据进行分区。其数据分区规则为 partitionId = Key.hashCode % numPartitions,其中partitionId代表该Key对应的键值对数据应当分配到的Partition标识,Key.hashCode表示该Key的哈希值,numPartitions表示包含的Partition个数。

    1.4.2 RangePartitioner

    Spark引入RangePartitioner的目的是为了解决HashPartitioner所带来的分区倾斜问题,也即分区中包含的数据量不均衡问题。HashPartitioner采用哈希的方式将同一类型的Key分配到同一个Partition中,因此当某一或某几种类型数据量较多时,就会造成若干Partition中包含的数据过大问题,而在Job执行过程中,一个Partition对应一个Task,此时就会使得某几个Task运行过慢。RangePartitioner基于抽样的思想来对数据进行分区。
    先Hash数据倾斜时在sample重新采样

    1.5 .分区器

    HashPartitioner & RangePartitioner

    1.5.1 作用

    决定了RDD中分区的个数;
    RDD中每条数据经过Shuffle过程属于哪个分区;
    reduce的个数;
    注意:

    1. 只有Key-Value类型的RDD才有分区器,非Key-Value类型的RDD分区器的值是None。.
    2. 如果是对键操作,则子RDD不再继承父RDD的分区器,但是分区数会继承.
    1.5.2 HashPartitioner (默认)

    hashCode%分区数=余数—>决定在那个区
    该分区方法保证key相同的数据出现在同一个分区中。
    易发生数据倾斜.

    1.5.3 RangePartitioner (sort之类)

    定义:
    简单的说就是将一定范围内的数映射到某一个分区内
    sample采样抽样确定边界

    1.5.4 什么操作会导致子RDD失去父RDD的分区方式?

    如果是对键操作,则子RDD不再继承父RDD的分区器,但是分区数会继承.
    使用map()算子生成的RDD,由于该转换操作理论上可能会改变元素的键(Spark并不会去判断是否真的改变了键),所以不再继承父RDD的分区器

    1.5.5 多元RDD的分区操作后,子RDD如何继承分区信息?

    对于两个或多个RDD的操作,生成的新的RDD,其分区方式,取决于父RDD的分区方式。如果两个父RDD都设置过分区方式,则会选择第一个父RDD的分区方式。

    1.6 函数操作
    1.6.1. 查看分区方式

    rdd.partitioner

    1.6.2. 查看分区个数

    rdd.getNumPartitions == rdd.partitions.size

    1.6.3. 查看分区存储规律

    getElement(rdd)

    1.6.4. 获得默认分区数

    rdd.defsultParallelism

    1.6.5. 重新定义partitioner主动使用分区

    用户可通过partitionBy主动使用分区器,通过partitions参数指定想要分区的数量。
    可重新定义partitioner主动使用分区
    val rdd1= rdd.partitionBy(new arg.apache.spark.HashPartitioner(2))

    1.6.6. 重新设置分区coalesce() 和 repartition()
    coalesce() 和 repartition()
    val rdd=sc.makeRDD(arr,9)	
    rdd.coalesce(num,false)=rdd.coalesce(num)
    注意:num的数值必须小于原分区的数量(因为是false)
    rdd.repartition(num)==rdd.coalesce(num,true)
    注意: num可大可小于原分区的数值
    

    2、只读

    RDD是只读的,要想改变RDD中的数据,只能在现有的RDD基础上创建新的RDD;由一个RDD转换到另一个RDD,可以通过丰富的操作算子(map、filter、union、join、reduceByKey… …)实现,不再像MR那样只能写map和reduce了。
    在这里插入图片描述

    3、依赖

    RDDs通过操作算子进行转换,转换得到的新RDD包含了从其他RDDs衍生所必需的信息,RDDs之间维护着这种血缘关系(lineage),也称之为依赖。依赖包括两种,一种是窄依赖,RDDs之间分区是一一对应的;另一种是宽依赖,下游RDD的每个分区与上游RDD(也称之为父RDD)的每个分区都有关,是多对多的关系。
    在这里插入图片描述

    3.1.窄依赖:

    每个父RDD的一个Partition最多被子RDD的一个Partition所使用,(1:1 n:1)

    3.2.宽依赖:

    一个父RDD的Partition会被多个或所有子RDD的Partition所使用,(1:n n:n)

    3.3.作用:

    其一用来解决数据容错;
    其二用来划分stage。

    3.4.stage的划分
    (1)划分:

    action触发job,依照RDD依赖关系切分若干TaskSet即(stage) ; task任务处理的最小单元
    划分Stage:
    1.从后向前遍历,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到Stage中;
    2.每个Stage里面的Task的数量是由该Stage中最后一个RDD的Partition数量决定的
    3.最后一个Stage里面的任务的类型是ResultTask,前面所有其他Stage里面的任务类型都是ShuffleMapTask
    4.代表当前Stage的算子一定是该Stage的最后一个计算步骤

    (2)用宽依赖划分stage原因:

    相比于宽依赖,窄依赖对优化很有利,

    1. 宽依赖对应着shuffle操作,数据传输时宽依赖对应多个节点的传输
    2. 当RDD分区丢失时(某个节点故障),spark会对数据进行重算时,宽依赖重算的效用不仅在于算的多,还在于有多少是冗余的计算
      在这里插入图片描述
      b1分区丢失,则需要重新计算a1,a2和a3,这就产生了冗余计算
      (a1,a2,a3中对应b2的数据)
    3. 窄依赖能够更有效地进行失效节点的恢复,即只需重新计算丢失RDD分区的父分区,而且不同节点之间可以并行计算;
    4. 窄依赖允许在一个集群节点上以流水线的方式(pipeline)计算所有父分区
    3.5. 查看依赖
    3.5.1. 查看依赖长度

    rdd1.dependencies.size

    3.5.2. 查看所有父RDD

    rdd1.dependencies.collect

    3.5.3. 查看第一个父RDD数据并转换格式

    rdd1.dependencies(0).rdd.collect.asInstanceOf[Array[Int]]

    4、持久化(缓存)

    可以控制存储级别(内存、磁盘等)来进行持久化。如果在应用程序中多次使用同一个RDD,可以将该RDD缓存起来,该RDD只有在第一次计算的时候会根据血缘关系得到分区的数据,在后续其他地方用到该RDD的时候,会直接从缓存处取而不用再根据血缘关系计算,这样就加速后期的重用。

    4.1. cache持久化

    cache实际上是persist的一种简化方式,是一种懒执行的,执行action类算子才会触发,cahce后返回值要赋值给一个变量,下一个job直接基于变量进行操作。

    4.2. persist:

    可以指定持久化的级别。最常用的是MEMORY_ONLY,MEMORY_AND_DISK,MEMORY_ONLY_SER,MEMORY_AND_DISK_SER。”_2”表示有副本数。

    4.3. cache和persist

    cache和persist算子后不能立即紧跟action算子。因为rdd.cache().count() 返回的不是持久化的RDD,而是一个数值了。

    4.4. checkpoint

    checkpoint将RDD持久化到磁盘,还可以切断RDD之间的依赖关系。
    checkpoint 的执行原理:
    (1) 当RDD的job执行完毕后,会从finalRDD从后往前回溯。
    (2) 当回溯到某一个RDD调用了checkpoint方法,会对当前的RDD做一个标记。
    (3) Spark框架会自动启动一个新的job,重新计算这个RDD的数据,将数据持久化到HDFS上。
    优化:
    对RDD执行checkpoint之前,最好对这个RDD先执行cache,这样新启动的job只需要将内存中的数据拷贝到HDFS上就可以,省去了重新计算这一步。

    四. 持久化

    1.定义:

    spark中控制算子也是懒执行的,需要Action算子触发才能执行,主要是为了对数据进行缓存。可以构建迭代式算法和快速交互式查询.

    2.容错机制

    当存储于内存的数据由于内存不足而被删除时,RDD的缓存的容错机制执行,丢失的数据会被重算。RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition.

    2.1.原因

    为了防止计算失败后从头开始计算造成的大量开销,RDD会checkpoint计算过程的信息,这样作业失败后从checkpoint点重新计算即可,提高效率过对RDD启动Checkpoint机制来实现容错和高可用;

    2.2.checkpoint使用场景
    1. 会被重复使用的(但是)不能太大的RDD需要persist或者cache 。
    2. 运算时间很长或运算量太大才能得到的 RDD,computing chain 过长或依赖其他 RDD 很多的 RDD。
    3. 针对整个RDD计算链条中特别需要数据持久化的环节(后面会反复使用当前环节的RDD)使用checkpoint
    2.3.checkpoint和cache,persist的区别
    1. 缓存后checkpoint会斩断RDD的依赖,cache,persist依然有依赖
    2. 磁盘或内存可能被清理,但是checkpoint的数据通常保存到hdfs上,放在了高容错文件系统。
    3. rdd.persist() 将 RDD 的 partition 持久化到磁盘,但该 partition 由 blockManager 管理。一旦 driver program 执行结束,也就是 executor 所在进程stop,blockManager 也会 stop,被 cache 到磁盘上的 RDD 也会被清空(整个 blockManager 使用的 local 文件夹被删除)。而 checkpoint 将 RDD 持久化到 HDFS 或本地文件夹,如果不被手动 remove 掉,是一直存在的。
    2.4.checkpoint操作
    1. 创建RDD
      val cktest = sc.parallelize(1 to 100000)
    2. 设置存储路径(HDFS或者本地路径)
      sc.setCheckpointDir("/tmp/checkpoint")–>hdfs的
    3. 执行checkpoint
      cktest2.checkpoint
      // checkpoint是lazy操作 检查是否缓存
      cktest2.isCheckpointed
      // 再次查看RDD的依赖关系
      cktest2.dependencies.collect
      //查看RDD所依赖的checkpoint文件
      cktest2.getCheckpointFile

    3.RDD控制算子

    (1) cache :

    每一个节点都将把计算的分片结果缓存到内存rdd.cache()==rdd.persist(MERMORY_ONLY)

    (2) persist

    ①指定参数缓存在缓存或磁盘
    ②rdd.unpersist() 把持久化的RDD从缓存中移除;

    (3) checkpoint

    缓存在hdfs和磁盘,并切断依赖
    开头要写sc.setCheckPointDir()
    在执行checkpoint()

    4.控制算子的使用

    1. 如果多个动作需要用到某个 RDD,而它的计算代价又很高,使用缓存
    2. 注意:当进行了RDD0→RDD1→RDD2的计算作业,计算结束时,RDD1就已经缓存在系统中了,如果在进行RDD0 →RDD1→RDD3的计算作业时,只须进行RDD1→RDD3的计算,RDD1已经在缓存中0->1的转换不会重复进行
    3. 使用persist()方法对一个RDD标记为持久化,并不会马上计算生成RDD并把它持久化,而是要等到遇到第一个行动操作触发真正计算以后,才会把计算结果进行持久化

    5.pesist

    MEMORY_ONLY将RDD 作为反序列化的的对象存储JVM中。如果RDD不能被内存装下,一些分区将不会被缓存,并且在需要的时候被重新计算,按照LRU(Least recently used,最近最少使用)原则替换缓存中的内容。默认的缓存级别 等于cache()
    MEMORY_AND_DISK将RDD作为反序列化的的对象存储在JVM中。如果RDD不能被与内存装下,超出的分区将被保存在硬盘上,并且在需要时被读取(优先存取内存)
    MEMORY_ONLY_SER将RDD作为序列化的的对象进行存储(每一分区占用一个字节数组)。通常来说,这比将对象反序列化的空间利用率更高,尤其当使用fast serializer,但在读取时会比较占用CPU
    MEMORY_AND_DISK_SER与MEMORY_ONLY_SER 相似,但是把超出内存的分区将存储在硬盘上而不是在每次需要的时候重新计算
    DISK_ONLY只将RDD分区存储在硬盘上
    DISK_ONLY_2 、等带2的与上述的存储级别一样,但是将每一个分区都复制到集群的两个结点上

    6.如何使用缓存

    1. 如果RDD的数据可以很好的兼容默认存储级别(MEMORY_ONLY),那么优先使用它,这是CPU工作最为高效的一种方式,可以很好地提高运行速度;
    2. 如果(1)不能满足,则尝试使用MEMORY_ONLY_SER,且选择一种快速的序列化工具,也可以达到一种不错的效果;
    3. 由于数据量大,一般不会持久化到磁盘,除非计算是非常“昂贵”的或者计算过程会过滤掉大量数据,因为重新计算一个分区数据的速度可能要高于从磁盘读取一个分区数据的速度;
    4. 备份:如果需要快速的失败恢复机制,则使用备份的存储级别,如MEMORY_ONLY_2、MEMORY_AND_DISK_2;虽然所有的存储级别都可以通过重新计算丢失的数据实现容错,但是缓存机制使得大部分情况下应用无需中断,即数据丢失情况下,直接使用缓存数据,而不需要重新计算数据的过程;
    5. 如果处于大内存或多应用的场景下,OFF_HEAP可以带来以下的好处:它允许Spark Executors可以共享Tachyon的内存数据;它很大程序上减少JVM垃圾回收带来的性能开销;Spark Executors故障不会导致数据丢失。
      OFF_HEAP与MEMORY_ONLY_SER类似,但将数据存储在 堆外内存中。这需要启用堆外内存。

    五. 创建RDD

    1. 合并行化创建 (通过scala集合创建)

    通过集合并行化方式创建RDD,适用于本地测试,做实验
    scala中的本地集合 -->Spark RDD
    spark-shell --master spark://master:7077

    1. parallelize(数据集)方法
      scala> val arr = Array(1,2,3,4,5)
      scala> val rdd = sc.parallelize(arr)
    2. makeRDD(数据集)方法
      scala> val arr = Array(1,2,3,4,5)
      scala> val rdd = sc.makeRDD(arr)
      scala> rdd.collect
      res0: Array[Int] = Array(1, 2, 3, 4, 5)

    2. 文件系统 , 比如 HDFS

    1. 读取HDFS文件系统(默认)
      val rdd2 = sc.textFile(“hdfs://master:9000/words.txt”)
      val line=sc.textFile(“words.txt”)
      这两个是相等的
    2. 读取本地文件
      val rdd2 = sc.textFile(“file:///root/words.txt”)
      scala> val rdd2 = sc.textFile(“file:root/word.txt”)
      scala> rdd2.collect
      res2: Array[String] = Array(hadoop hbase java, hbase java spark, java, hadoop hive hive, hive hbase)

    3. 从父RDD转换成新的子RDD

    调用 Transformation 类的方法,生成新的 RDD只要调用transformation类的算子,都会生成一个新的RDD。RDD中的数据类型,由传入给算子的函数的返回值类型决定.
    注意:action类的算子,不会生成新的 RDD
    scala> rdd.collect
    res3: Array[Int] = Array(1, 2, 3, 4, 5)
    scala> val rdd = sc.parallelize(arr)
    scala> val rdd2 = rdd.map(_*100)
    scala> rdd2.collect
    res4: Array[Int] = Array(100, 200, 300, 400, 500)
    使用 rdd.partitions.size 查看分区数量
    scala> rdd.partitions.size
    res7: Int = 4
    scala> rdd2.partitions.size
    res8: Int = 4

    RDD操作(惰性求值)

    在这里插入图片描述
    RDD是惰性求值的,整个转换过程只是记录了转换的轨迹,没有发生真正的计算,只有遇到行动操作时,才会发生真正的计算,开始从血缘关系源头开始,进行物理的转换操作.

    <2> RDD算子

    1. 常见算子

    这是一个全的RDD算子案例
    http://homepage.cs.latrobe.edu.au/zhe/ZhenHeSparkRDDAPIExamples.html

    动作含义
    reduce(func)通过func函数对RDD中的所有元素处理,传入函数必须满足交换律和结合律(传入两个参数输入返回一个值)
    collect()在驱动程序中,以数组的形式返回数据集的所有元素
    count()返回RDD的元素个数
    first()返回RDD的第一个元素(类似于take(1))
    take(n)返回一个由数据集的前n个元素组成的数组
    takeSample(withReplacement,num, [seed])返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子初始值
    saveAsTextFile(path)将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它封装为文件中的文本
    saveAsSequenceFile(path)将数据集中的元素以二进制的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
    countByKey()针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
    foreach(func)在数据集的每一个元素上,运行函数func进行更新。

    2. 打印元素

    一般会采用语句rdd.foreach(println)

    1. 本地模式(local)
      会打印出一个RDD中的所有元素
    2. 集群模式
      1)Driver Program中的stdout是不会显示打印语句的这些输出内容的;在worker节点上执行打印语句是输出到worker节点的stdout中,而不是输出到任务控制节点Driver Program中.需要到worker节点日志查看
      2)rdd.take(num).foreach(println);
      3)rdd.collect().foreach(println)

    3. Pair RDD

    3.1.什么是Pair RDD

    键值对(K,V),RDD的数据集,RDD的键值对操作

    3.2.创建Pair RDD

    (1)
    val list = List(“Hadoop”,“Spark”,“Hive”,“Spark”)
    val rdd = sc.parallelize(list)
    val pairRDD = rdd.map(word => (word,1))
    val pairRDD = rdd.map((,1))
    (2)
    val lines = sc.textFile(“file://+Path”)
    val pairRDD = lines.flatMap(
    .split(" ")).map((_,1))

    3.3.常见操作

    1)reduceByKey和 groupByKey 区别:

    reduceByKey:
    在这里插入图片描述

    用于对每个key对应的多个value进行merge操作,它能够在本地先进行merge操作
    每个分区先进行merge操作然后再所有分区进行汇总merge操作,reduceByKey的效率高

    groupByKey:
    在这里插入图片描述
    对每个key进行操作,但只生成一个sequence。
    所有分区进行汇总再merge操作,集群节点之间的开销很大

    2)常见算子

    常用的Transformation:
    转换含义
    map(func)返回一个新的RDD,该RDD由每一个输入元素经过func函数转换后组成
    filter(func)返回一个新的RDD,该RDD由经过func函数计算后返回值为true的输入元素组成
    flatMap(func)类似于map,但是每一个输入元素可以被映射为0或多个输出元素(所以func应该返回一个序列,而不是单一元素)
    mapPartitions(func)类似于map,但独立地在RDD的每一个分片上运行,因此在类型为T的RDD上运行时,func的函数类型必须是Iterator[T] => Iterator[U]
    mapPartitionsWithIndex(func)类似于mapPartitions,但func带有一个整数参数表示分片的索引值,因此在类型为T的RDD上运行时,func的函数类型必须是(Int, Interator[T]) => Iterator[U]sample(withReplacement, fraction, seed)根据fraction指定的比例对数据进行采样,可以选择是否使用随机数进行替换,seed用于指定随机数生成器种子
    union(otherDataset)对源RDD和参数RDD求并集后返回一个新的RDD
    intersection(otherDataset)对源RDD和参数RDD求交集后返回一个新的RDD
    distinct([numTasks]))对源RDD进行去重后返回一个新的RDD
    groupByKey([numTasks])在一个(K,V)的RDD上调用,返回一个(K, Iterator[V])的RDD
    reduceByKey(func, [numTasks])在一个(K,V)的RDD上调用,返回一个(K,V)的RDD,使用指定的reduce函数,将相同key的值聚合到一起,与groupByKey类似,reduce任务的个数可以通过第二个可选的参数来设置
    aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])先按分区聚合 再总的聚合 每次要跟初始值交流 例如:aggregateByKey(0)(+,+) 对k/y的RDD进行操作
    sortByKey([ascending], [numTasks])在一个(K,V)的RDD上调用,K必须实现Ordered接口,返回一个按照key进行排序的(K,V)的RDD
    sortBy(func,[ascending], [numTasks])与sortByKey类似,但是更灵活 第一个参数是根据什么排序 第二个是怎么排序 false倒序 第三个排序后分区数 默认与原RDD一样
    join(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调用,返回一个相同key对应的所有元素对在一起的(K,(V,W))的RDD 相当于内连接(求交集)
    cogroup(otherDataset, [numTasks])在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD
    cartesian(otherDataset)两个RDD的笛卡尔积 的成很多个K/V
    pipe(command, [envVars])调用外部程序
    coalesce(numPartitions)重新分区 第一个参数是要分多少区,第二个参数是否shuffle 默认false 少分区变多分区 true 多分区变少分区 false
    repartition(numPartitions)重新分区 必须shuffle 参数是要分多少区 少变多
    repartitionAndSortWithinPartitions(partitioner)重新分区+排序 比先分区再排序效率高 对K/V的RDD进行操作
    foldByKey(zeroValue)(seqOp)该函数用于K/V做折叠,合并处理 ,与aggregate类似 第一个括号的参数应用于每个V值 第二括号函数是聚合例如:+
    combineByKey合并相同的key的值 rdd1.combineByKey(x => x, (a: Int, b: Int) => a + b, (m: Int, n: Int) => m + n)
    partitionBy(partitioner)对RDD进行分区 partitioner是分区器 例如new HashPartition(2)
    cache、persistRDD缓存,可以避免重复计算从而减少时间,区别:cache内部调用了persist算子,cache默认就一个缓存级别MEMORY-ONLY ,而persist则可以选择缓存级别
    Subtract(rdd)返回前rdd元素不在后rdd的rdd
    leftOuterJoinleftOuterJoin类似于SQL中的左外关联left outer join,返回结果以前面的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可。
    rightOuterJoinrightOuterJoin类似于SQL中的有外关联right outer join,返回结果以参数中的RDD为主,关联不上的记录为空。只能用于两个RDD之间的关联,如果要多个RDD关联,多关联几次即可
    subtractByKeysubstractByKey和基本转换操作中的subtract类似只不过这里是针对K的,返回在主RDD中出现,并且不在otherRDD中出现的元素
    常用的Action:

    触发代码的运行,我们一段spark代码里面至少需要有一个action操作。

    动作含义
    reduce(func)通过func函数聚集RDD中的所有元素,这个功能必须是课交换且可并联的
    collect()在驱动程序中,以数组的形式返回数据集的所有元素
    count()返回RDD的元素个数
    first()返回RDD的第一个元素(类似于take(1))
    take(n)返回一个由数据集的前n个元素组成的数组
    takeSample(withReplacement,num, [seed])返回一个数组,该数组由从数据集中随机采样的num个元素组成,可以选择是否用随机数替换不足的部分,seed用于指定随机数生成器种子
    takeOrdered(n, [ordering])
    saveAsTextFile(path)将数据集的元素以textfile的形式保存到HDFS文件系统或者其他支持的文件系统,对于每个元素,Spark将会调用toString方法,将它装换为文件中的文本
    saveAsSequenceFile(path)将数据集中的元素以Hadoop sequencefile的格式保存到指定的目录下,可以使HDFS或者其他Hadoop支持的文件系统。
    saveAsObjectFile(path)
    countByKey()针对(K,V)类型的RDD,返回一个(K,Int)的map,表示每一个key对应的元素个数。
    foreach(func)在数据集的每一个元素上,运行函数func进行更新。
    aggregate先对分区进行操作,在总体操作

    4. 共享变量

    4.1. 定义

    Spark在集群的多个不同节点的多个任务上并行运行一个函数时,把一个变量设为共享变量,就可以在不同节点使用其变量的值. 注意: 共享变量只读

    4.2. 分类

    广播变量(broadcast variables)
    累加器(accumulators)

    4.3. 广播变量(broadcast variables)

    Spark的Action操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,Spark都会自动进行广播

    4.3.1 设置

    val broadcastVar = sc.broadcast(Array(1, 2, 3))

    4.3.2 获取

    broadcastVar.value

    4.4. 累加器 ( accumulators )

    sc.longAccumulator()
    sc.doubleAccumulator()

    4.案例

    1.集合的交并差

    交集: 数据集1.intersection(数据集2)
    并集: 数据集1.union(数据集2)
    差集: 数据集1.subtract(数据集2)

    2.corgroup

    cogroup(otherDataset, [numTasks])
    在类型为(K,V)和(K,W)的RDD上调用,返回一个(K,(Iterable,Iterable))类型的RDD, 也就是相同K的数据集不同为空(k,(,v2))或者(k,(v1,)),相同的如统join类似(k,(v1,v2)) ,numTasks为并发的任务数
    在这里插入图片描述

    3.最值

    // 同时找最大、最小值。缺点:不适合在大数据量情况下运行
    val rdd1 = sc.makeRDD((1 to 20).toArray)
    rdd1.map((1,_)).groupByKey()
    .map(x=>(x._2.min, x._2.max)).collect
    // 对于仅有一个元素(基本数据类型)的RDD有更简便的方法
    rdd1.stats / rdd1.max / rdd1.min / rdd1.mean

    4. 排序

    4.1.单一数组的排序
    val out=sc.textFile().filter(_.trim.size!=0)		//空行排除
    		.map(_.split(“ ”).trim.toInt)			//拆分并去除首尾空格
    		.sortBy(_,false)						//排序方式
    		.take(num)							//取前几
    		.foreach(println)
    
    4.2.分组元组排序
    val out=sc.textFile()
    		.filter(_.trim.size!=0)
    		.map(_.split(“ “).trim)
    		.map(x=>(x(0),x(1)))	//变为元组
    		.groupByKey()				//生成(k,(v1,v2..))
    		//排序
    		.map(x=>(x._1,x._2.toList.sorted.reverse.take(3)))
    		.collect	
    		
    val rdd1 = sc.textFile("").map(line => {
    		  val value = line.split(" ")
    		  (value(0), value(1).toDouble)
    		}).groupByKey()	
    		.map(x=>(x._1,x._2.toList.sorted.reverse.take(3)))
    		.collect	
    

    这两个相同只不过map拆开而已

    4.3.求平均值

    val ar=Array((“spark”,2),(“hadoop”,6),(“hadoop”,4),(“spark”,6))
    val rdd=sc.makeRDD(ar)

    1. 使用(groupByKey)
      rdd.groupBykey()
      .map(x=>(x._1,x._2.sum.toDouble/x._2.size))
      .collect
    2. 使用(reduceByKey)
      mapValues增加标志位并求和来查看相同的key的个数
      rdd.mapValues(x=>(x,1))
      .reduceByKey((x,y)=>(x._1+y._1,x._2+y._2))
      .mapValues(x=>(x._1.toDouble/x._2))
    3. 使用(foldByKey)
      和reduceByKey类似
      rdd.mapValues((_,1))
      .foldByKey((0,0))((x,y)=>(x._1+y._1,x._2+y._2))
      .mapValues(x=>(x._1.toDouble/x._2))
    4.4.求多列数据的平均值
    1. 生成一个RDD,每行包含2个数字,要计算这两列数据的平均值

       import scala.util.Random
       val random = new Random(100)
       val arr =(1 to 100).map(x=>(random.nextInt(1000), random.nextInt(1000))).toArray
       val rdd = sc.makeRDD(arr)
      
    2. 使用stat分别计算,算法简单,效率低)

       val (firstAvg,secondAvg)=(rdd.map(_._1).stats.mean,
       rdd.map(_._2).stats.mean)
       如果有一列就得计算一次效率低
      
    3. (利用了Vector,高效。需要借助外部的数据结构):
      Vector数组相加里面相同下标的元素相加(仅有它是可以的),但也要加上标志位好知道个数

       import breeze.linalg.Vector
       val (sums, count)=
       rdd.map(x=>((Vector(x._1.toDouble, x._2.toDouble), 1.0)))
       .reduce((x,y) => (x._1+y._1,x._2 + y._2))
       val means = sums/count
      
    4. (自定义方法实现:数组累加):

       不使用Vector数组的相加,使用常用数组拉链的方法,
       再map映射成一个内部元素相加和的数组
       但也要加标志位1看个数
       def arrAdd(x: Array[Int], y:Array[Int]): Array[Int] = x.zip(y).map(x=>x._1+x._2)
       val (sums, count) =
        rdd.map(x=>(Array(x._1, x._2), 1)) //加标志位
       .reduce((x,y) => (arrAdd(x._1, y._1), x._2+y._2)) 
       //数组执行方法,数量相加
       val means = sums.map(_.toDouble/count)
      
    4.5.二次排序
    1. 数据来源

       import scala.util.Random
       val random = new Random(100)
       val arr = (1 to 10000).map(x=>(random.nextInt(100), random.nextInt(100)))
      
    2. 自定义函数继承Ordered接口或实现Orderring方法

       case class MyObject(x:Int, y:Int) extends Ordered[MyObject]{
         def compare(other:MyObject):Int = {
           if (x - other.x !=0) x - other.x   else y - other.y  }}
      
    3. 调用方法排序

       val rdd = sc.makeRDD(arr)
       val rdd1 = rdd.map(pair=>(new MyObject(pair._1, pair._2),pair))
       val sorted = rdd1.sortByKey(false)
       注意:二次排序后的结果写成文件,文件内及文件间的数据是有顺序的。但是文件间的顺序是不保证的!!!
       val lines = sc.textFile(" ") 
       val rdd1  = lines.map(line=>(new MyObject(line.split(" ")(0).toInt, line.split(" ")(1).toInt),line) )  //元组((k0,k1),k)
       val sorted = rdd1.sortByKey(false)       
       val result = sorted.map(sortedLine =>sortedLine._2)  //要k1     
       result.collect().foreach (println)
      

    Java CountWord

    1. 创建SparkConf
      SparkConf conf =new SparkConf().setAppName().setMaster();
    2. 创建JavaSparkContext(这是Java)
      JavaSparkContext sc=new JavaSparkContext(conf);
    3. 创建RDD
    4. JavaRDD<类型> 变量=sc.textFile()
      JavaRDD lines=sc.textFile();
      注意:拆分行时读取的行成为字符串数组(使用Arrays.asList()转成列表操作切分)数组无法拆分
      JavaRDD words=lines.
      flatMap(line->Arrays.asList(line.split(“ ”)).iterator());
    5. 变成JavaPairRDD<KeyType,ValueType> 键值对使用
      mapToPair()方法和new Tuple2(s,1)
      JavaPairRDD<String,Integer> pairs=
      words.mapToPair(s->new Tuple2(s,1))
    6. 计算 注意:在Java中没有占位符 (_) 这个说法
      JavaPairRDD<String,Integer> count=
      pairs.reduceByKey((a,b)->a+b)
    7. 结果存储到List中foreach遍历
      List(Tuple2<String,integer>) list=count.collect();
      for(Tuple2<String,integer> tuple : list){
      System.out.println( tuple._1+””+tuple._2);
      }

    在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述在这里插入图片描述

    展开全文
  • spark实验5 rdd编程2.doc

    2021-01-10 14:35:46
    spark实验5 rdd编程2.doc
  • Spark自定义RDD从HDFS读取数据,实现和sc.textFile相同功能,代码测试通过,可以根据需求避免数据源数据倾斜
  • RDD是什么? RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在...
  • 键值对 两个Pair RDD 转化操作 val conf = new SparkConf().setMaster(local).setAppName(PairRDD) val sc = new SparkContext(conf) val lines = sc.parallelize(List((1, 2), (3, 4), (3, 6))) val lines_1 = ...
  • 包括spara rdd api,dataframe action操作、查询操作、join操作,dataframe rdd dataset 相互转换以及spark sql。
  • 该文档是本人对实时流数据分析的scala代码的详细解析,具有一定RDD了解,scala基础
  • RDD 随便推理RDD复制
  • RDDRDD因果推断-源码

    2021-02-13 07:13:59
    RDD RDD因果推论 包含汉森复制
  • spark RDD 论文 中文版

    2018-11-14 10:05:05
    spark RDD论文中文版
  • RDD-源码

    2021-03-06 10:26:28
    RDD 因果推理复制分配1
  • Spark-Kafka-RDD Spark-Kafka-RDD是一个scala库,让Kafka成为Spark平台的数据源。 请注意,Spark-Kafka-RDD 从 Kafka 主题和分区中获取给定的偏移范围作为单个 RDD ( KafkaRDD ) 返回给 Spark 驱动程序,而不是生成...
  • sparkRDD函数大全

    2019-02-28 20:20:28
    spark rdd函数大全。spark rdd操作为core操作,虽然后续版本主要以dataset来操作,但是rdd操作也是不可忽略的一部分。
  • RDD编程基础

    千次阅读 2021-12-16 20:40:10
    lines = sc.textFile("file:/l/usr/local/spark/mycode/rdd/word.txt") >>>linesWithSpark = lines.filter(lambda line: "Spark" in line) >>> linesWithSpark.foreach(print) S

    一、创建RDD

    两种方式:

    1.从文件系统中加载数据创建RDD

    Spark采用textFile()方法来从文件系统中加载数据创建RDD,该方法把文件的URI作为参数,这个URI可以是:

    • 本地文件系统的地址
    • 或者是分布式文件系统HDFS的地址
    • 或者是Amazon S3的地址等等

    2. 通过并行集合(列表)创建RDD

    可以调用SparkContext的parallelize方法,在Driver中一个已经存在的集合 (列表)上创建。

    举个栗子:

    第一种:
    lines = sc.textFile("file:///usr/local/spark/mycode/rdd/word.txt") or sc.textFile("hdfs://localhost:9000/user/hadoop/word.txt")
    >>> lines.foreach(print)
    Hadoop is good
    Spark is fast
    Spark is bette

    第二种:
    >>> array = [1,2,3,4,5]
    >>> rdd = sc.parallelize(array)
    >>> rdd.foreach(print)

    二、转换函数

    1.filter()

    .filter(func):筛选出满足函数func的元素,并返回一个新的数据集
    >>>lines = sc.textFile("file:/l/usr/local/spark/mycode/rdd/word.txt")
    >>>linesWithSpark = lines.filter(lambda line: "Spark" in line)
    >>> linesWithSpark.foreach(print)
    Spark is fast
    Spark is better

    2.map()

    .map(func):将每个元素传递到函数func中,并将结果返回为一个新的数据集,一个纯粹的转换操作

    第一个栗子:
    >>>data=[1,2,3,4,5]
    >>> rdd1=sc.parallelize(data)
    >>> rdd2=rdd1.map(lambda x:x+1)
    >>> rdd2.foreach(print)

    第二个栗子:

    >>>lines =sc.textFile("file:lllusr/local/spark/mycode/rdd/word.txt"')
    >>>words = lines.map(lambda line:line.split(" "))
    >>>words.foreach(print)
    ['Hadoop', ' is' , 'good']['Spark', 'is', ' fast]['Spark', 'is', ' better']

    3.flatMap()

    .flatMap(func):与map相似,但每个输入元素都可以映射到0或多个输出结果;先执行Map在执行flat拍扁其中的每个元素

    第一个栗子:
    >>>lines = sc.textFile("file:/llusr/localspark/mycode/rdd/word.txt")
    >>>words = lines.flatMap(lambda line:line.split(" "))

    第二个栗子
    >>>words = sc.parallelize([("Hadoop",1),("is",1).("good",1),.... ("Spark",1).("is",1),("fast",1),("Spark",1),("is",1),("better",1)])
    >>> words1 = words.groupByKey
    >>> words1.foreach(print)
    ('Hadoop',<pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
    ('better',<pyspark.resultiterable.ResultIterable object at 0x7fb210552e80>)
    ('fast',<pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
    ('good',<pyspark.resultiterable.ResultIterable object at 0x7fb210552c88>)
    ('Spark',<pyspark.resultiterable.ResultIterable object at 0x7fb210552f98>)
    ('is',<pyspark.resultiterable.ResultIterable object at Ox7fb210552e10>)

    4.reduceByKey()

    .reduceByKey(func):应用于(K,V)键值对的数据集时,返回一个新的(K,V)形式的数据集,其中每个值是将每个key传递到函数func中进行聚合后的结果
    >>>words = sc.parallelize([("Hadoop",1),("is",1),("good",1),("Spark",1), !.... ("is",1),("fast"",1),("Spark",1),("is",1),("better",1)])
    >>> words1 = words.reduceByKey(lambda a,b:a+b)
    >>> words1.foreach(print)
    ('good', 1)
    ('Hadoop', 1)('better', 1)('Spark', 2)('fast', 1)('is',3)

    tips:

    • groupByKey也是对每个key进行操作,但只生成一个sequence, groupByKey本身不能自定义函数,需要先用groupByKey生成RDD,然后才能对此RDD通过map进行自定义函数操作.
    • reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义.

    5.keys()

    .keys():返回键值
    >>> list=[("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
    >>>pairRDD= sc.parallelize(list)
    >>> pairRDD.keys().foreach(print)
    Hadoop
    Spark
    Hive
    Spark

    6.values()

    .values():返回值
    >>> list =[("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
    >>> pairRDD = sc.parallelize(list)
    >>>pairRDD.values(.foreach(print)
    1111

    7.sortByKey()

    .sortByKey():返回一个根据键排序的RDD,默认升序排序,降序sortByKey(False)

    第一个栗子:
    >>>list=[("Hadoop",1).("Spark",1),("Hive",1),(""Spark",1)]
    >>>pairRDD= sc.parallelize(list)
    >>> pairRDD.foreach(print)
    ('Hadoop', 1)
    ('Spark', 1)
    ('Hive', 1)
    ('Spark', 1)

    第二个栗子:

    >>>pairRDD.sortByKey.foreach(print)
    ('Hadoop', 1)
    ('Hive', 1)('Spark', 1)('Spark', 1)
    >>>d1.reduceByKey(lambda a,b:a+b).sortByKey(False).collect()
    [('g',21), ('f,29), ('e',17), ('d', 9), ('c',27),('b',38), ('a',42)]

    8.sortBy()

    .sortBy(func):按func自定义排序
    >>>d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x: x[0],False).collect(('g',21), ('f,29), ('e',17), ('d', 9), ('c',27), ('b',38), ('a', 42))
    >>>d1.reduceByKey(lambda a,b:a+b).sortBy(lambda x: x[1],False).collect()
    [('a',42), ('b',38), ('f,29), ('c',27),('g', 21), ('e',17), ('d', 9)]

    9.mapValues()

    .mapValues(func):对键值对RDD中的每个value都应用—个函数,key不会发生变化
    >>> list=[("Hadoop",1),("Spark",1),("Hive",1),("Spark",1)]
    >>>pairRDD= sc.parallelize(list)
    >>>pairRDD1 = pairRDD.mapValues(lambda x;x+1)
    >>>pairRDD1.foreach(print)
    ('Hadoop',2)
    ('Spark',2)('Hive',2)('Spark', 2)

    10.join()

    .join(): join就表示内连接。对于内连接,对于给定的两个输入数据集(K,V1)和(K,V2),只有在两个数据集中都存在的key才会被输出,最终得到一个:(K,(V1,V2))类型的数据集。
    >>> pairRDD1 = sc. parallelize([("spark",1),("spark",2),("hadoop",3),("hadoop",5)])
    >>>pairRDD2= sc.parallelize([("spark","fast")))
    >>>pairRDD3 = pairRDD1.join(pairRDD2)
    >>>pairRDD3.foreach(print)
    ('spark', (1, 'fast'))
    ('spark', (2, 'fast'))

    11.distinct()

    .distinct():去除重复值,一般用于在数据读入时执行该操作
    >>> RDD = sc. parallelize([("spark",1),("spark",1),("spark",2),("hadoop",3),("hadoop",5)]).map(lambda x:s.strip()).distinct()
    >>>RDD.foreach(print)
    ("spark",1)
    ("spark",2)
    ("hadoop",3)
    ("hadoop",5)


    二、常见的行动操作:

    • count(返回数据集中的元素个数
    • collect0以数组的形式返回数据集中的所有元素
    • first()返回数据集中的第一个元素
    • take(n)以数组的形式返回数据集中的前n个元素
    • reduce(func)通过函数func(输入两个参数并返回一个值)聚合数据集中的元素
    •  foreach(func)将数据集中的每个元素传递到函数func中运行

    >>>rdd = sc.parallelize([1,2,3,4,5)
    >>> rdd.countO
    5
    >>> rdd.first()
    1
    >>>rdd.take(3)
    [1,2,3]
    >>>rdd.reduce(lambda a,b:a+b)
    15
    >>> rdd.collect()
    [1,2,3,4,5]
    >>>rdd.foreach(lambda elem:print(elem))
    12345

    三、题外话:

    A.持久化

    在Spark中,RDD采用惰性求值的机制,每次遇到行动操作,都会从头开始执 行计算。每次调用行动操作,都会触发一次从头开始的计算。这对于迭代计 算而言,代价是很大的,迭代计算经常需要多次重复使用同一组数据,通过持久化(缓存)机制避免这种重复计算的开销。

    .persist():标记为持久化,在第一次行动操作时执行----->.unpersist():手动地把持久化的RDD从缓存中移除

    >>> list =["Hadoop","Spark","Hive"]
    >>> rdd = sc.parallelize(list)
    >>>rdd.cache) #会调用persist(MEMORY_ONLY),但是,语句执行到这里,并不会缓存rdd,因为这时rdd还没有被计算生成
    >>> print(rdd.count() #第一次行动操作,触发一次真正从头到尾的计算,这时上面的rdd.cache()才会被执行,把这个rdd放到缓存中
    3
    >>>print(','.join(rdd.collectO)) #第二次行动,不需要触发从头到尾的计算,只需要重复使用上面缓存中的rdd
    Hadoop,Spark,Hive

    B.分区

    RDD是弹性分布式数据集,通常RDD很大,会被分成很多 个分区,分别保存在不同的节点上。分区的作用主要是:增加并行度;减少通信开销。

    分区原则:RDD分区的一个原则是使得分区的个数尽量等于集群中的CPU核心 (core)数目

    分区个数:

    (1)创建RDD时手动指定分区个数

            sc.textFile(path, partitionNum) 其中,path参数用于指定要加载的文件的地址,partitionNum参数用于 指定分区个数。例如:

    (2)使用reparititon方法重新设置分区个数

            通过转换操作得到新 RDD 时,直接调用 repartition 方法即可。例如:

    四、如何开始编写一个RDD

    启动Hadoop,打开pycharm新建文件就不说了,部分同学不知道sc是啥,我理解的是sparkContext相当于一个指挥官负责统筹调度计算RDD之间的依赖关系构建DAG(有向无环图),再有DAGScheduler负责将DAG图分解成多个阶段,每个阶段包含多个任务,每个任务又会被TaskScheduler分发给各个WorkerNode上的Executor去执行,再逐层返回最后得到结果,基本的思想还是MapReduce只是基于内存速度更快,不像Hadoop频繁的IO读写会有很大延迟,举个栗子:

    from pyspark import SparkContext,SparkConf
    
    def fun1(x):
        arr = x.split()
        id = arr[0]
        name = arr[1:]
        return (id,name)
    
    def fun2(x):
        if x[0] == '2019110401':
            return False
        else:
            return True
    
    def fun3(x):
        key = x[0]
        value = int(int(x[1][1])/10)
        return (value,key)
    
    conf = SparkConf().setAppName('class 1').setMaster('local')
    sc = SparkContext(conf=conf)
    
    # alist=[1,2,3,4,5]
    # rdd0 = sc.parallelize(alist)#创建第一个RDD
    # print(rdd0)
    
    #path = 'hdfs://master:9000/test.txt'
    path = 'file:///home/mls/abc/test.txt'
    rdd0 = sc.textFile(path)
    print(rdd0)
    #
    rdd1 = rdd0.map(lambda x:x.strip()).distinct()#去空格后去重
    print(rdd1.collect())#此时得到的是字符串
    
    rdd2 = rdd1.map(lambda x:fun1(x))#map转换
    print(rdd2.count())
    print(rdd2.collect())
    print(rdd2.take(4))
    
    rddttest = rdd2.map(lambda x: fun3(x))
    print(rddttest.collect())
    
    rdd3 = rdd2.filter(fun2)#过滤 rdd2.filter(lambda x: fun2(x))
    print(rdd3.count())
    print(rdd3.collect())
    print(rdd3.take(4))
    print(rdd3.collect()[0][1][1])#打印第一个元素的第二维里的第二个元素
    
    # rdd4 = rdd3.map(lambda x: fun3(x))
    # print(rdd4.collect())
    rdd5 = rdd3.groupByKey()#按照学号分组
    print(rdd5.collect())
    rddttest5 = rddttest.groupByKey()#按照成绩分组
    print(rddttest5.collect())
    
    rdd6 = rdd5.mapValues(lambda x:list(x))#len(list(x))只针对值
    print(rdd6.collect())
    rddttest6 = rddttest5.mapValues(lambda x:list(x))
    print(rddttest6.collect())
    
    print(rdd4.collect())
    
    sc.stop()

    展开全文
  • spark API RDD

    2014-09-05 16:18:26
    spark API RDD pdf版的..........对初学者应该有所帮助
  • spark rdd api

    2018-04-28 14:17:47
    本文详细的描述了spark rdd的api 这些api 应该够我们日常生产使用了
  • RDD编程API

    2018-08-09 09:39:42
    简单的RDD编程,便于上手,只适合于小白用户使用,大神绕道
  • spark rdd 转换和动作

    2020-12-24 07:09:36
    2017-07-22概述本文对spark rdd的转换和动作进行总结和实际操作演示.RDD(Resilient Distributed Datasets),弹性分布式数据集, 是spark分布式内存的一个抽象概念,RDD提供了一种高度受限的共享内存模型.即RDD是只读...

    2017-07-22

    概述

    本文对spark rdd的转换和动作进行总结和实际操作演示.

    RDD(Resilient Distributed Datasets),弹性分布式数据集, 是spark分布式内存的一个抽象概念,RDD提供了一种高度受限的共享内存模型.即RDD是只读的记录分区的集合,只能通过在其他RDD执行确定的转换操作(如map、join和group by)而创建,然而这些限制使得实现容错的开销很低。

    rdd 的分布式,因为rdd支持分区, 自动把一个rdd根据partition分发到n台spark设备进行处理. 这些对用户完全透明. 用户感觉和操作本地数据一样.

    rdd通过parallelize和textFile或流来创建. 再通过转换得到新的rdd. 转换的过程不是立即执行, 而是在需要动作action时才开始. 这样方便系统进行自动优化.

    rdd操作示例

    parallelize一般用于测示创建rdd.

    scala> val square = sc.parallelize(List(1,2,3,4))

    scala> val sq = square.map(x=>x*x).collect()

    sq: Array[Int] = Array(1, 4, 9, 16)

    scala> val drink1=sc.parallelize(List("coffee","tea","coffee","panda","monkey"))

    scala> val rdd2 = sc.parallelize(List("coffee","money","kitty","猫"))

    scala> val r11= rdd1.union(rdd2).collect()

    r11: Array[String] = Array(coffee, tea, coffee, panda, monkey, coffee, money, kitty, 猫)

    scala> val r12 = rdd1.intersection(rdd2).collect()

    r12: Array[String] = Array(coffee)

    scala> val r13 = rdd1.subtract(rdd2).collect()

    r13: Array[String] = Array(tea, panda, monkey)

    scala> val users=sc.parallelize(List("user1","user2"))

    scala> val tags = sc.parallelize(List("经济","政治","文化"))

    scala> users.cartesian(tags).collect()

    res1: Array[(String, String)] = Array((user1,经济), (user1,政治), (user1,文化), (user2,经济), (user2,政治), (user2,文化))

    普通RDD操作

    转换 Transformation

    转换有如下一些种类

    map(func)

    filter(func)

    repartition(numPartitions)

    flatMap(func)

    repartitionAndSortWithinPartitions(partitioner)

    mapPartitions(func)

    join(otherDataset, [numTasks])

    mapPartitionsWithIndex(func)

    cogroup(otherDataset, [numTasks])

    sample(withReplacement, fraction, seed)

    cartesian(otherDataset)

    coalesce(numPartitions)

    基础转换

    测试用RDD 包含 {1, 2, 3, 3}

    函数

    目的

    示例

    结果

    map

    对RDD每个元素应用函数, 并返回一个RDD结果

    rdd.map(x => x + 1)

    {2, 3, 4, 4}

    flatMap

    对RDD每个元素应用函数, 并返回一个RDD结果,包含迭代器返回的内容.常用于抽取单词.

    rdd.flatMap(x => x.to(3))

    {1, 2, 3, 2, 3, 3, 3}

    filter

    返回一个RDD结果, 由通过了filter的元素组成.

    rdd.filter(x => x != 1)

    {2, 3, 3}

    distinct

    移除重复元素

    rdd.distinct()

    {1, 2, 3}

    sample(withReplacement, fraction, [seed])

    对 RDD 抽样,withReplacement是指是否有放回的抽样为true为放回,为false为不放回,fraction为抽样占总数据量的比值

    rdd.sample(false, 0.5)

    non-deterministic

    两个RDD转换

    RDD分别包含 {1, 2, 3} 和 {3, 4, 5}

    函数

    目的

    示例

    结果

    union()

    两个RDD并集.

    rdd.union(other)

    {1, 2, 3, 3, 4, 5}

    intersection()

    RDD 交集

    rdd.intersection(other) {3}

    subtract()

    从一个RDD移除全部存在于另一个RDD的元素.如移除训练数据.

    rdd.subtract(other) {1, 2}

    cartesian()

    两个RDD的笛卡尔积, 一个RDD中每个元素和另一个RDD的每个元素两两组合

    rdd.cartesian(other)

    {(1, 3), (1,4), … (3,5)}

    普通RDD 动作(Action)

    基础动作

    RDD 包含 {1, 2, 3, 3}

    函数

    目的

    示例

    结果

    collect()

    返回 RDD 全部元素

    rdd.collect()

    {1, 2, 3, 3}

    count()

    RDD 元素总计

    rdd.count()

    4

    countByValue()

    RDD 每个元素出现次数总计

    rdd.countByValue()

    {(1, 1), (2, 1), (3, 2)}

    take(num)

    返回 RDD 中num个元素

    rdd.take(2)

    {1, 2}

    top(num)

    返回 RDD 中top num个元素

    rdd.top(2)

    {3, 3}

    takeOrdered(num)(ordering)

    返回 RDD 中num个元素, 基于给定排序

    rdd.takeOrdered(2)(myOrdering)

    {3, 3}

    takeSample(withReplacement, num, [seed])

    返回 RDD 中随机num个元素, withReplacement表示抽样是否放回. 放回会有重复

    rdd.takeSample(false, 1)

    non-deterministic

    reduce(func)

    并发将RDD中的元素联合起来.如加和.

    rdd.fold((x, y) => x + y)

    9

    fold(zero)(func)

    类reduce函数, 但提供初始值设置.

    rdd.fold(0)((x, y) => x + y)

    9

    aggregate(zeroValue)(seqOp, combOp)

    类reduce函数, 但提供初始值设置. 可以返回不同类型

    rdd.aggregate(0, 0) ({case (x, y) => (y._1() + x, y._2() + 1)}, {case (x, y) => (y._1() + x._1(), y._2() + x._2()) })

    (9, 4)

    foreach(func)

    对RDD中每个元素应用函数.

    rdd.foreach(func)

    reduce

    对两个元素应用reduce中的函数, 得到一个新的元素, 再用新元素和集合中的元素进行reduce运算. 如+, 求和,计数和其他聚合运算.

    scala> val data = sc.parallelize(List(1,2,3,4))

    scala> data.collect()

    res33: Array[Int] = Array(1, 2, 3, 4)

    scala> data.reduce(_ + _)

    res35: Int = 10

    fold

    fold()和reduce类似. 但会对每个分区置初始zero值(+为0,乘为1, 连接列表为空列表).

    fold 定义:

    def fold(zeroValue: T)(op: (T, T) => T): T

    scala> val data = sc.parallelize(List(1,2,3,4))

    scala> data.collect()

    res33: Array[Int] = Array(1, 2, 3, 4)

    scala> data.fold(0)((x,y)=>x+y)

    res40: Int = 10

    scala> data.fold(1)((x,y)=>x+y)

    res41: Int = 15

    aggregate

    聚合函数可以避免使用map. 和fold差不多, 需要传入一个初始值函数, 再传入一个每个分区的累积函数. 最后传入一个分区之间的累积函数.

    下面的代码是求均值. 参数(0,0) 表示总和0,计数0.

    参数(acc, value) ,acc表示累计值, value是当前值.其中acc是元组. _1为第一个元素, 为累积值. _2为第二个元素,表示累计计数.

    第三个函数(acc1, acc2) 则是各分区的归总combine.

    val result = input.aggregate((0, 0))(

    (acc, value) => (acc._1 + value, acc._2 + 1),

    (acc1, acc2) => (acc1._1 + acc2._1, acc1._2 + acc2._2))

    val avg = result._1 / result._2.toDouble

    scala> val result = data.aggregate((0,0))( (acc,value)=>(acc._1 + value, acc._2+1),(acc1,acc2)=>(acc1._1+acc2._1, acc1._2 + acc2._2))

    result: (Int, Int) = (10,4)

    scala> val avg = result._1/result._2

    avg: Int = 2

    # pairRDD 操作

    pairRDD是特殊的RDD, 包含key->value元组键值对的RDD, 有特殊操作.

    很多变换会返回pairRDD, 也可以将普通RDD转为pairRDD. 如通过map()函数, 将行的第一个单词作为key,行作为value.

    val pairs = lines.map(x => (x.split(" ")(0), x))

    ## 单个pairRDD上的变换

    rdd={(1, 2), (3, 4), (3, 6)}

    函数

    目的

    示例

    结果

    reduceByKey(func)

    用同一个key将值联合.

    rdd.reduceByKey((x,y)=>x+y)

    {(1,2),(3,10)}

    groupByKey()

    用同一个key将值分组

    rdd.groupByKey()

    {(1,[2]),(3,[4,6])}

    combineByKey(createCombiner, mergeValue, mergeCombiners, partitioner)

    用同一个key将值联合, 但返回不同类型

    mapValues(func)

    将函数应用于每个值, 不改变key.

    rdd.mapValues(x=>x+1)

    {(1,3),(3,5),(3,7)}

    flatMapValues(func)

    将函数应用于一个返回pair RDD每个值的迭代器, 对每个返回的的值, 用原来的key生成键值对.常用于 tokenization.

    rdd.flatMapValues(x=>(x to 5)

    {(1,2),(1,3),(1,4),(1,5),(3,4),(3,5)}

    keys()

    返回 RDD 的键集合

    rdd.keys()

    {1,3,3}

    values()

    返回 RDD 的值集合

    rdd.values()

    {2,4,6}

    sortByKey()

    返回 RDD, 用键排序.

    rdd.sortByKey()

    {(1,2),(3,4),(3,6)}

    combineByKey

    对每个key求均值

    input ={(1, 2), (3, 4), (3, 6)}

    combineByKey 可以省去map的过程.

    scala> val input=sc.parallelize(List((1,2),(3,4),(3,6))

    scala> val result = input.combineByKey(

    (v) => (v, 1),

    (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),

    (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)

    ).map{ case (key, value) => (key, value._1 / value._2.toFloat) }

    scala> result.collectAsMap().map(println(_))

    (1,2.0)

    (3,5.0)

    说明

    第一个参数相当于map.

    (v) => (v, 1), 表示将key都转为计数为1的元组

    第二个参数 (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1) 表示对每一个分区的acc元组的第一个值求和, 第二个计数

    第三个参数 (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)

    ) 表示对每个分区的累积元组进行总的加和和计数.

    map操作进行求均值

    两个 pair RDD的变换

    (rdd = {(1,2),(3,4),(3,6)}, other = {(3,9)})

    函数

    目的

    示例

    结果

    subtractByKey

    将另一个RDD存在的key从本RDD移除.

    rdd.subtractByKey(other)

    {(1,2)}

    join

    两个 RDD 进行 inner join .

    rdd.join(other)

    {(3,(4,9)),(3,(6,9))}

    rightOuterJoin

    两个 RDD 进行右连接, key必须在右边的RDD存在

    rdd.rightOuterJoin(other)

    {(3,(Some(4),9)),(3,(Some(6),9))}

    leftOuterJoin

    两个 RDD 进行左连接, key必须在左边的RDD存在

    rdd.leftOuterJoin(other)

    {(1,(2,None)),(3,(4,Some(9))),(3,(6,Some(9)))}

    cogroup

    两个RDD具有同一个键, 则组合成一个组.

    rdd.cogroup(other)

    {(1,([2],[])),(3,([4,6],[9]))}

    参考

    《Learning spark》

    如非注明转载, 均为原创. 本站遵循知识共享CC协议,转载请注明来源

    展开全文
  • spark数据处理-RDD

    千次阅读 2020-08-11 01:29:46
    sparkRDD计算,共享变量,数据读写;


    学习来源:spark快速大数据分析
    [美] Holden Karau [美] Andy Konwinski
    [美] Patrick Wendell [加] Matei Zaharia 著
    王道远 译

    spark数据处理笔记

    spark核心介绍

    每个spark应用都是由一个驱动器程序(driver program)发起集群上的各种并行操作;驱动器程序包含应用的main函数,定义了集群上的分布式数据集及相关应用操作;驱动器通过一个SparkContext对象访问spark.shell启动时会自动创建一个SparkContext对象——一个命名为sr变量;

    RDD编程

    RDD介绍-弹性分布式数据集

    • 弹性分布式数据集(Resilient Distributed Dataset,简 称 RDD)。RDD 是一个不可变的分布式对象集合。在 Spark 中,对数据的所有操作不外乎创 建 RDD、转化已有 RDD 以及调用 RDD 操作进行求值。Spark 会自动将 RDD 中的数据分发到集群上,并将操作并行化执行。
    • RDD支持两种类型的操作:转化操作(transformation)和行动操作(action);转化操作会由一个RDD生成一个新的RDD,惰性lazy计算,在被调用行动操作之前不会开始计算(比如filter,map);行动操作对RDD计算出一个结果,把结果返回到驱动器程序中或储存到外部储存系统(如HDFS)中:返回其他数据类型,会触发实际计算;缓存是在第一次调用行动操作时触发缓存动作;
    • 默认情况下,spark的RDD会在每次行动操作时重新计算,如果需要复用同一个RDD,可以使用**RDD.persist()**把RDD缓存,不然,每调用一个新的行动操作时,整个RDD都会从头开始计算;
    • RDD可以通过**collection()**函数获取整个RDD中的数据,如果RDD很小,单台机器内存足以支撑,才能使用collection()方法;

    创建RDD两种方式

    • 读取一个外部数据集
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    // local:本地模式运行
    val conf = new SparkConf().setMaster('local').setAppName("appname")
    val sc = new SparkContext(conf)
    lines = sc.textFile("README.md")
    
    • 在驱动器程序里分发驱动器程序中的对象集合
      • 将程序中已有的集合传给SparkContextparallelize()
    val lines = SC.parallelize(List(1,2,3,4))
    

    函数传递

    • 传递一个对象的方法或者字段时,把包含整个对象的引用,可以把需要的字段放到一个局部变量中,避免传递包含该字段的整个对象,这里在python中也是一样的;

    • 所传递的函数或数据需要是可序列化的(实现了 Java 的 Serializable 接口);如果在 Scala 中出现了 NotSerializableException,通常问题是传递了一个不可序列 化的类中的函数或字段。传递局部可序列化变量或顶级对象中的函数始终是安全的。

    class SearchFunctions(val query: String) {   
      def isMatch(s: String): Boolean = {    
        s.contains(query)   
      } 
        
      def getMatchesFunctionReference(rdd: RDD[String]): RDD[String] = { 
        // 问题:"isMatch"表示"this.isMatch",因此我们要传递整个"this"     
        rdd.map(isMatch)   
      }   
        
      def getMatchesFieldReference(rdd: RDD[String]): RDD[String] = {
        // 问题: "query"表示"this.query",因此我们要传递整个"this"     
        rdd.map(x => x.split(query))   
      }   
        
      def getMatchesNoReference(rdd: RDD[String]): RDD[String] = {     
        // 安全:只把我们需要的字段拿出来放入局部变量中     
        val query_ = this.query     
        rdd.map(x => x.split(query_))  
      }
    }
    

    常见RDD转化操作和行动操作

    常用的转化操作:

    • map():映射
    val lines = SC.parallelize(List(1,2,3,4)) 
    val lines1 = lines.map(e => e * e)    // 所有元素取平方
    
    • filter():过滤
    val lines = SC.parallelize(List(1,2,3,4)) 
    val lines1 = lines.filter(e => e > 2)      // 筛选大于2
    
    • flatMap()
    val lines = SC.parallelize(List("hello world","scala"))
    val lines1 = lines.flatMap(e => e.split(" "))
    println(lines1.first())     // hello
    

    类集合操作

    RDD不是严格意义上的集合,但支持许多数据学集合操作,比如:

    • RDD.distinct()

      去重:操作开销比较大,需要将所有数据通过网络进行混洗(shufflle),以确保每个元素的唯一性;

    • RDD.union(RDD1)

      合并:会把两个RDD合并成一个RDD,新的RDD不会去重,sql中的union all操作会去重;

    • RDD.intersection(RDD1)

      交集:类似于sql中的inner join操作,返回两个RDD共有的元素。**intersection()**在运行时会去掉所有重复操作,该操作也需要通过网络混洗拿数据来发现共有的元素;

    • RDD.subtract(RDD1)

      差集:返回只存在于RDD不存在于RDD1中的元素,与intersection()一样,也需要进行数据混洗;

    • RDD.cartesian(RDD1)

      笛卡尔积:返回RDD元素和RDD1元素组合(a,b)的所有组合对;

    • RDD.sample(withReplacement,fraction,[seed])

      对RDD采样,以及是否替换

      rdd.sample(false,0.5)    // 抽样50%
      

    行动操作

    • reduce(func)

      接受一个函数作为参数,函数要求两个RDD的元素类型一致,并返回同样类型元素;

    val sum = rdd.reduce((x,y) = > x + y)   // 求和操作
    
    • fold()和reduce一样,都要求函数的返回值类型需要和我们所操作的RDD中的元素类型相同;有时候也会需要返回一个不同类型的值,例如求和和计数同时运行,这里可以对RDD进行map操作,把元素转化为该元素和1的二元组,然后再调用reduce操作;

      rdd = sc.parallelize(List(1,2,3,4))
      rdd.fold(0)((x,y) = > x + y)    // 10
      
    • collect()

      会将整个RDD内容返回驱动程序中,通常在单元测试中使用(RDD数据块比较小),所有数据能放入同一台机器中;不然是在各个worker节点上执行操作;

    • take(n)

      返回RDD的n个元素,并且尝试只访问尽量少的分区,该操作会得到一个不均衡的集合。操作返回元素的顺序可能会与预期不一样;

    • top(n)

      如果定义了顺序,top()从RDD获取前几个元素;使用默认顺序;

    • foreach(func)

      可以对RDD中的每个元素进行操作,而不需要把RDD返回到本地;

    • count()

      统计元素个数

    • countByValue()

      统计各元素在RDD中出现的次数;

      rdd = sc.parallelize(List(1,2,2,2))
      rdd.countByValue()     // {(1,2),(2,3)}
      
    • takeOrdered(n)(ordering)

      从RDD中按照提供的顺序返回最前面的n个元素

      rdd.takeOrdered(2)(myOrdering)
      
    • takeSample(withReplacement,num,[seed])

      从RDD中返回任意一些元素

      rdd.takeSample(false,1)    // 返回元素非确定的
      
    • aggregate(zeroValue)(seqOp,comOp)

      和reduce相似,但是通常返回不同类型的函数;

      rdd.aggregate((0,0)) 
      ((x, y) => (x._1 + y, x._2 + 1),
       (x, y) => (x._1 + y._1, x._2 + y._2))
      

    不同RDD类型间转化

    • scala将RDD转化为有特定函数的RDD是由隐式转化来自动处理的。这里需要导入模块**import or.apache.spark.SparkContext._**来使用这些隐式转化,这些隐式转换可以隐式地将一个 RDD 转为各种封装类,比如 DoubleRDDFunctions (数值数据的 RDD)和 PairRDDFunctions(键值对 RDD);
    • SparkContext 对象隐式转化的 Scala 文档
    • RDD隐式转化类的函数应用在scala-RDD文档里可能找不到对应函数,因为隐式转化可以把一个RDD类型转化为另一个类型;比如RDD[Double] -> DoubleRDDFunctions

    持久化(缓存)

    org.apache.spark.storage.StorageLevel中的持久化级别。有必要,也可以在存储级别末位加上**_2**来把持久化数据保存两份;

    级别使用的空间CPU时间是否在内存中是否在磁盘上备注
    MEMORY_ONLY
    MEMORY_ONLY_SER
    MEMORY_AND_DISK中等部分部分如果数据内存中放不下,则溢写到磁盘上;
    MEMORY_AND_DISK_SER部分部分如果数据内存中放不下,则溢写到磁盘上;在内存中存放序列化后的数据;
    DISK_ONLY
    val res = rdd.map(x => x + 1)
    res.persist(StorageLevel.MEMORY_ONLY)
    println(res.count())    // 调用行动操作
    res.reduce((x,y) => x + y)     // 再次调用行动reuce操作,已经缓存,无需再重新进行转化操作map计算;
    

    数据缓存多,spark会自动利用最近最少使用的(LRU)的缓存策略把最老的分区从内存中移除。对于仅把数据存放在内存中的缓存级别,下一次要用到已经被移除的分区时,这些分区需要重新计算;对于使用内存与磁盘的缓存级别的分区来说,被移除的分区都会写入磁盘。

    键值对RDD操作

    • 键值对RDD(pair RDD)通常用来进行聚合计算,一般需要一些初始ETL(抽取、转化、装载)操作来将数据转化为键值对形式。键值对对RDD提供了一些新的操作接口,比如不同RDD分组合并,比如**reduceByKey()**可以分别规约每个键对应的数据、**join()**把键相同的元素组合到一起合并为一个RDD;

    创建Pair RDD

    • 很多存储键值对的数据格式会在读取时直接返回由其键值对数据组成的pair RDD;另外,把一个普通的RDD转化为pari RDD也可以通过调用map()函数来实现,传递的函数需要返回键值对。
    // scala中使用第一个单词作为键创建一个pair RDD
    val pairs = lines.map(x => (x.split(" ")(0),x))
    

    转化操作

    • pair RDD中包含了二元组,需要传递的函数应当操作二元组而不是独立元素。
    val pairs = sc.parallelize(List((1,2),(3,4),(3,6)))
    

    单RDD操作

    函数名称功用示例结果
    reduceByKey(func)合并具有相同键的值rdd.reduceByKey(_ + _){(1,2), (3,10)}
    groupByKey(func)对具有相同键的值进行分组rdd.groupByKey(){(1,[2]),
    (3, [4,6])}
    combineByKey(createCombiner,
    mergeValue,
    mergeCombines,
    partitioner)
    使用不同的返回类型合并具有相同键的值
    mapValues(func)
    等同于map{case (k,v) => (k,func(v))}
    对pair RDD中的每个值应用func函数,键不改变;rdd.mapValues(_ + 1){(1,2), (1,3), (1,4), (1,5), (3,4), (3,5)}
    flatMapValues(func)对每个pair RDD中的值应用一个返回迭代器的函数,然后对返回的每个元素都生成一个对应原键的键值对记录rdd.flatMapValues(x => x to 4){(1,2), (1,3), (1,4), (3,4)}
    keys()返回一个仅包含键的RDDrdd.keys(){1, 3,3}
    values()返回一个仅包含值的RDDrdd.values(){2, 4,6}
    sortByKey()返回一个根据键排序的RDDrdd.sortByKey(){(1,2), (3,4), (3,6)}
    keyBy()返回一个新的RDD,键不变,value为原来的键+value元组rdd.keyBy(){(1,(1,2)),
    (3,(3,4)),
    (3,(3,6))}

    两个pair RDD转化操作

    val rdd = sc.parallelize(List((1,2),(3,4),(3,6)))
    val other = sc.parallelize(List((3,9)))
    
    函数名称功用示例结果
    subtractByKey删掉RDD中键与other RDD中的键相同的元素rdd.subtractByKey(other){(1, 2)}
    join对两个RDD进行内连接rdd.join(other){(3, (4, 9)),
    (3,(6, 9))}
    rightOuterJoin右连接,以右RDD键为基准进行连接,类似于sqlrdd.rightOuterJoin(other){(3,(Some(4),9)),
    (3,(Some(6),9))}
    leftOuterJoin左连接,类似于sql左连接,已左RDD键为基准进行合并rdd.leftOuterJoin(other){(1,(2,None)),
    (3,(4,Some(9))),
    (3,(6,Some(9)))}
    cogroup将两个RDD中拥有相同键的数据分组到一起rdd.cogroup(other){(1,([2],[])),
    (3,([4, 6],[9]))}
    • pair RDD也是RDD,支持RDD所支持的函数;
    // 筛选第二个元素长度小于10
    pairs.filter{case (k,v) => v.length < 10}
    
    • 并行度调优

      • 每个RDD都有固定数目的分区,分区数决定了RDD上执行操作时的并行度;在执行聚合或分组操作时,可以自定义分区数;spark会尝试根据集群的大小推断一个有意义的默认值,有时候可能也需要对并行度进行调优获得更好的性能表现;
      pairs.reduceByKey(_ + _)    // 默认并行度
      pairs.reduceByKey(_ + _, 10)     // 自定义并行度
      
      • 分组或聚合之外,可以通过repartition()函数改变RDD的分区,repartition()函数会把数据通过网络混洗,创建新的分区集合,消耗相对较大。spark中也有一个优化版的repartition()-coalesce()。scala中可以通过rdd.partitions.size()查看RDD的分区个数;

    聚合操作

    • reduceByKey()

      • 与reduce类似,传入一个函数,返回由各键和对应键规约出来的结果值组成的新的RDD
    • foldByKey(0)

      • 与fold类似,初始值为0;
    • mapValues()

      • 对值进行map映射操作,键不变;
      • 求平均值
      pairs.mapValues((_,1)).
        reduceByKey((x,y) => (x._1 + y._1,x._2 + y._2)).
        mapValues(x => x._1 / x._1.toFloat).
        collectAsMap
      
    • combineByKey()

      • combineByKey()是最为常用基于键进行聚合的函数,可以让用户返回与输入的类型不同的返回值;combineByKey在遍历元素时,如果是一个新的元素,会调用createCombiner()函数来创建该键对应的累加器的初始值,这一过程每个分区第一次出现各个键触发;如果键存在于createCombiner创建的累加器键里面,会使用mergeValue()方法将该键的累加器对应的值与新的值进行合并;此外,每个分区都是独立的,如果有两个或者多个分区有对应同一个键的累加器,这里需要提供mergeCombiners()方法将各个分区的结果进行合并;以下为求平均值示例:
      val result = input.combineByKey(   
          (v) => (v, 1),        // createCombiner初始化累加器
          (acc: (Int, Int), v) => (acc._1 + v, acc._2 + 1),     // mergeValue累加
          // mergeCombiners()合并
          (acc1: (Int, Int), acc2: (Int, Int)) => (acc1._1 + acc2._1, acc1._2 + acc2._2)   
      	// sum / count = avg
      	).map{ case (key, value) => (key, value._1 / value._2.toFloat) }   
      // 结果转化为映射打印出来
      result.collectAsMap().map(println(_))
      

    分组操作

    • groupByKey() 使用 RDD 中的键来对数据进行分组。对于一个由类型 K 的键和类型 V 的值组成的 RDD,所得到的结果 RDD 类型会是[K, Iterable[V]];groupBy()可以永固未成对的数据上,也可以根据除键相同以外的条件进行分组。可以接收一个函数,对RDD中每个元素使用该函数,将返回结果作为键再进行分组;
      • rdd.reduceByKey(func)与 rdd.groupByKey().mapValues(value => value.reduce(func)) 等价,但是前
        者更为高效,因为它避免了为每个键创建存放值的列表的步骤;
    • 除了对单个 RDD 的数据进行分组,还可以使用一个叫作 cogroup() 的函数对多个共享同一个键的 RDD 进行分组。对两个键的类型均为 K 而值的类型分别为 V 和 W 的 RDD 进行cogroup() 时,得到的结果 RDD 类型为 [(K, (Iterable[V], Iterable[W]))]。如果其中的一个 RDD 对于另一个 RDD 中存在的某个键没有对应的记录,那么对应的迭代器则为空。cogroup() 提供了为多个 RDD 进行数据分组的方法;
      • cogroup() 不仅可以用于实现连接操作,还可以用来求键的交集。除此之外,cogroup() 还能同时应用于三个及以上的 RDD
      • reduceByKey与groupByKey的区别:
        reduceByKey用于每个key对应的多个value进行merge操作,可以在本地先进行merge操作,merge操作可以通过函数自定义;
        groupByKey也是对每个key操作,不过是生成一个sequence,本身不能自定义函数,需要基于生成的RDD通过map进行函数转化操作;
        // wordcount计数:两者都可以实现,但具体内部运算过程是不一样的;
        val ls = List("python","scala","java","python","scala","scala")
        var rdd = sc.parallelize(ls)
        var pairRdd = rdd.map((_,1))
        val wordCountByReduce = pairRdd.reduceByKey(_ + _)
        // 使用groupByKey聚合map计数
        val wordCountByGroup = pairRdd.groupByKey().map(x => (x._1,x._2.sum))
        
        • reduceByKey具体实现过程
          在这里插入图片描述
        • groupByKey具体实现过程
          在这里插入图片描述
    • 计算平均值
    // 创建键值对rdd
    val rdd = sc.parallelize(Array(("python",3),("hadoop",2),("spark",5),(python,"5)))
    // 先分别进行求和和计数,再使用mapValues对value进行求平均操作
    rdd.mapValues((_,1)).reduceByKey((x,y) => (x._1 + y._1,x._2 + y._2)).mapValues(x => x._2 / x._1.toFloat).collect
    

    连接操作

    • 将有键的数据与另一组有键的数据可以通过键连接操作,pair RDD常用方法;连接方式有:内连接、左外连接、右外连接、交叉连接;具体作用类同于数据库中的连接;

    排序

    • sortByKey()函数接收一个ascending的参数,默认是true:升序;false为降序;默认是按当前键进行排序,也可以指定函数;
    • 默认是每个分区内排序,全局排序需要指定分区参数numPartitions=1
    val pairs = sc.parallelize(List((-100,1),(20,2)))
    pairs.sortByKey(ascending=false)      // 升序参数传入true,默认是true升序排序
    // 全局排序
    pairs.sortByKey(numPartitions=1)
    
    • sortBy()函数
    // 创建键值对rdd
    val rdd = sc.parallelize(Array(("a",1),("b",19),("c",9),("d",100),("e",30),("a",200)))
    // 按照key降序排序
    rdd.reduceByKey(_ + _).sortByKey(false).collect
    // res0: Array[(String, Int)] = Array((e,30), (d,100), (c,9), (b,19), (a,201))
    
    // 按照value降序排序
    rdd.reduceByKey(_ + _).sortBy(_._2,false).collect
    // rdd.reduceByKey(_ + _).sortBy(x => x._2,false).collect     下划线匿名函数
    // res1: Array[(String, Int)] = Array((a,201), (d,100), (e,30), (b,19), (c,9))
    
    // 使用sortByKey对value进行排序,通过map函数对换key与value
    rdd.map(x => (x._2,x._1)).sortByKey(false).map(x => (x._2,x._1)).foreach(println)
    

    分区

    RDD分区的一个分区原则是使得分区的个数尽量等于集群中的CPU核心数目;对于不同的Spark部署模式而言(本地模式、Standa Lone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值配置默认分区数目;
    本地模式:默认为本地机器的CPU数目,若设置了loacl[N],则默认为N;Apache Mesos默认分区数为8;Standalone或YARN:在‘集群中所有CPU核心数目总和’和‘2’二者中取较大值作为默认值;

    • 手动设置分区
    1. 创建RDD时:调用textFile和parallelize方法时手动指定分区个数;语法格式:textFile(path,partitionNum)
    2. 通过转化操作得到新的RDD时:直接调用repartition方法即可;
    // 手动设置为一个分区
    var rdd2 = rdd.repartition(1)
    rdd2.partition.size   // 查看分区个数:1
    var rdd2 = rdd.repartition(4)   // 设置分区个数为4
    
    • parallelize如果没有在方法中指定分区数,默认为spark.default.parallelism;textFile如果没有指定分区数,默认为min(defaultParallelism,2);HDFS中读取文件,分区数为文件分片数,比如128MB/片;
    • 分区案例
    import org.apache.spark.{Partitioner,SparkContext,SparkConf}
    
    // 自定义分区类,继承Partition类
    class UserPartition(numParts:Int) extends Partitioner{
      // 重写覆盖分区数
      override def numPartitions:Int = numParts
      // 覆盖分区号获取函数:分区划分规则
      override def getPartition(key:Any):Int = {
        key.toString.toInt%10        // 对10取余数
      }
    }
    
    object SetPartition {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
        val sc = new SparkContext(conf)
        // 模拟设置5个分区
        val data = sc.parallelize(1 to 10,5)
        // 根据尾号转变为10个分区,分别写到10个文件;
        data.map((_,1)).partitionBy(new UserPartition(10)).
          map(_._1).saveAsTextFile("file:///usr/local/output")
      }
    }
    

    共享变量

    • 当spark在集群的多个不同节点的多个任务上并行运行一个函数时,它会把函数中涉及到的每个变量在每个任务都生成一个副本,但有时候需要在多个任务之间共享变量,或者在任务(Task)和任务控制节点(Driver Program)之间共享变量,为了满足这种需求,spark提供了两种类型变量:广播变量(broadcast variables)和累加器(accumulators)。广播变量用来把变量在所有节点的内存之间进行共享;累加器支持在所有不同节点之间进行累加计算(比如计数和求和);
    • 广播变量允许程序开发人员在每个机器上缓存一个只读变量,而不是为机器每个任务都生成一个副本;spark的“行动”操作会跨越多个阶段(stage),对于每个阶段内的所有任务所需要的公共数据,spark都会自动进行广播;(只广播一次,减少通信开销)
      • 广播变量可以通过SparkContext.broadcast(v) 从普通变量v生成一个广播变量,通过value方法获取这个广播变量的值;
      • 广播变量被创建以后,集群中的任何函数应该使用广播变量broadcastVar的值而不是原始变量v的值,这样就不会把v重复分发到这些节点上;
      • 一旦广播变量创建后,普通变量v的值不能再发生修改,确保所有节点获得广播变量的值都是相同的;
      // 变量广播
      val broadcastVar = sc.broadcast(Array(3,4,5))
      // 访问广播变量值
      broadcastVar.value(1)    // 4
      
      • 广播变量示例
    import org.apache.spark.SparkConf
    import org.apache.spark.SparkContext
    
    object BroadCastValue {
    	def main(args:Array[String]):Unit = {
    		val conf = new SparkConf().setAppName("broadcastAppname").setMaster("local[*]")
    		// 获取SparkContext
    		val sc = SparkContext(conf)
    		// 创建广播变量:10
    		val broadcastVar = sc.broadcast(10)
    		// 创建一个测试Array
    		val intArray = Array(1,2,3,4,5,6,7)
    		// 转化为rdd(并行化)
    		val rdd = sc.parallelize(intArray)
    		// 使用map进行转化操作:所有变量乘以广播变量值:10
    		val res = rdd.map(_ * broadcastVar.value)
    		// 打印下结果
    		res.foreach(println)
    	}
    }
    
    • 累加器是仅仅被相关操作累加的变量,通常可以被用来是先计数器(counter)和求和(sum)。spark原生地支持数值型(numeric)的累加器,开发人员也可以编写对新类型的支持;
    • 一个数值型的累加器可以通过SparkContext.longAccumulator()或者SparkContext.doubleAccumulator()来创建运行在集群中的任务,就可以使用add方法来把数值累加到累加器上,但是这些任务只能做累加操作,不能读取累加器的值,只有任务控制节点(Driver Program)可以使用value方法来读取累加器的值;
    val accum = sc.longAccumulator("My Accumulator")
    // accum: org.apache.spark.util.LongAccumulator = LongAccumulator(id: 651, name: Some(My Accumulator), value: 0)
    sc.parallelize(Array(4,5,6,7,8)).foreach(x => accum.add(x))     // 进行累加操作
    accum.value      // 返回累加结果值:Long = 30
    

    数据读写

    本地文件数据读写

    本地文件读取

    读取本地文件,文件路径必须以**file:///**开头;转化操作是惰性计算,在未遇到行动操作时不会真实运行;所以假设在行动操作之前,假设文件不存在,也不会报错;

    val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/word.txt")
    // 如果是读取windows本地文件
    val textFile = sc.textFile("file:///F:/test.txt")
    

    文件写入本地

    使用saveAsTextFile+完成路径写入本地

    // 实际是生成word.txt文件夹,里面有数据分区块文件
    textFile.saveAsTextFile("file:///usr/local/spark/mycode/wordcount/wordback.txt")
    

    切换到这个目录下

    cd usr/local/spark/mycode/wordcount/wordback.txt/
    ls
    

    会出现
    part-0000:只有一个分区为part-0000,如果有第二个分区,会出现part-0001
    _SUCCES:表示成功
    加载保存的数据

    // 会读取这个目录下所有文件
    val textFile = sc.textFile("file:///usr/local/spark/mycode/wordcount/wordback.txt")
    

    HDFS文件读写

    文件读取

    // 路径localhost:9000是当时hdfs配置信息
    val textFile = sc.textFile("hdfs://loaclhost:9000/user/hadoop/word.txt")
    val textFile = sc.textFile("/user/hadoop/word.txt")
    val textFile = sc.textFile("word.txt")
    // 以上三条语句是等价的,都是读取hdfs-hadoop用户目录下的文件;这里的hadoop是用户名称
    

    把文件写入到HDFS

    val textFile = sc.textFile("word.txt")
    textFile.saveAsTextFile("writeback.txt")      // 文件写入到hdfs中,生成的也是一个目录
    

    JSON文件读写

    // spark安装目录下有这样一个样本文件
    val jsonRdd = sc.textFile("file:///usr/local/spark/examples/src/main/resources/people.json")
    

    scala自带有一个JSON库,scala.util.parsing.json.Json,可以解析JSON数据;
    JSON.parseFull(jsonString:String)函数,输入json字符串,解析成功返回Some(map:Map[String,Any]),失败返回None;

    操作示例

    连续n活跃问题

    • 取用户连续存在3日活跃;这里的日期暂时直接使用数字代替;
    // 生成rdd;用户+数据日期
    val rdd = sc.parallelize(List("123,1","234,4","123,2","123,5","344,5","123,3","234,6","007,5","007,7","007,6","234,9"))
    // 转化为键值对rdd
    val rdd1 = rdd.map(_.split(",")).map(e => (e(0) toInt,e(1)))
    // 找出用户连续活跃最大的天数
    val rdd2 = rdd1.groupByKey().mapValues(
      iter => {
        var num = 1
        var max = 1
        iter.map(x => x.toInt).toArray.sorted.reduce(    // 转化成数组升序排序
          (x,y) => {
            if(y - x == 1){
              num += 1       // 如果y,x差为1,num+1
              num
            }else{
              num=1       // 如果y,x差不为1,另num初始为1,重新累加
              num
            }
            if(max<num){max=num}    // 该变量用户记录用户最大连续值
            y     // reduce这里本身不需要进行其他计算返回,只需要比较前后两个数差值机型,返回y
          }
        )  
        max   // map返回用户最大连续值;
      }
    )
    // 取出有连续大等于3天活跃记录的用户,返回到driver节点上查看;(量级少)
    val result = rdd2.filter(_._2>=3).map(_._1).collect
    
    展开全文
  • RDD概念

    千次阅读 2019-03-15 20:37:21
    1 RDD的由来 RDD是Spark的基石,是实现Spark数据处理的核心抽象。那么RDD为什么会产生呢? Hadoop的MapReduce是一种基于数据集的工作模式,面向数据,这种工作模式一般是从存储上加载数据集,然后 操作数据集,最后...
  • 熟悉Spark的RDD基本操作及键值对操作; 熟悉使用RDD编程解决实际具体问题的方法

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 97,826
精华内容 39,130
关键字:

rdd