2019-12-09 16:40:50 weixin_43548518 阅读数 8
  • 阿里沈询:高并发网站中的数据库设计视频教程

    高并发网站中的数据库设计视频教程,该课程主要分为3个部分,1、数据库的基本组成:KV存储系统、查询优化原理、单机/多机事务概述;2、分布式存储、Key-Value的多机扩展、CAP和分布式系统的一致性;3、阿里数据库的一些最佳实践。 嘉宾介绍:王晶昱(花名:沈询),阿里巴巴技术讲师 。目前主要负责阿里的分布式数据库DRDS(TDDL)和阿里的分布式消息服务ONS(RocketMQ/Notify)两个系统。

    12082 人正在学习 去看看 CSDN讲师

kv 类型RDD 在java 显示:JavaPairRDD<String, Integer>
而不是JavaRDD<Tuple2<String, Integer>>
这个很重要不要搞错

spark RDD
2017-12-26 10:56:44 qq_36864672 阅读数 603
  • 阿里沈询:高并发网站中的数据库设计视频教程

    高并发网站中的数据库设计视频教程,该课程主要分为3个部分,1、数据库的基本组成:KV存储系统、查询优化原理、单机/多机事务概述;2、分布式存储、Key-Value的多机扩展、CAP和分布式系统的一致性;3、阿里数据库的一些最佳实践。 嘉宾介绍:王晶昱(花名:沈询),阿里巴巴技术讲师 。目前主要负责阿里的分布式数据库DRDS(TDDL)和阿里的分布式消息服务ONS(RocketMQ/Notify)两个系统。

    12082 人正在学习 去看看 CSDN讲师

RDD是什么?

RDD是Spark中的抽象数据结构类型,任何数据在Spark中都被表示为RDD。从编程的角度来看,RDD可以简单看成是一个数组。和普通数组的区别是,RDD中的数据是分区存储的,这样不同分区的数据就可以分布在不同的机器上,同时可以被并行处理。因此,Spark应用程序所做的无非是把需要处理的数据转换为RDD,然后对RDD进行一系列的变换和操作从而得到结果。本文为第一部分,将介绍Spark RDD中与Map和Reduce相关的API中。

如何创建RDD?

RDD可以从普通数组创建出来,也可以从文件系统或者HDFS中的文件创建出来。

举例:从普通数组创建RDD,里面包含了1到9这9个数字,它们分别在3个分区中。

scala> val a = sc.parallelize(1 to 9, 3)
a: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[1] at parallelize at <console>:12

举例:读取文件README.md来创建RDD,文件中的每一行就是RDD中的一个元素

scala> val b = sc.textFile("README.md")
b: org.apache.spark.rdd.RDD[String] = MappedRDD[3] at textFile at <console>:12

虽然还有别的方式可以创建RDD,但在本文中我们主要使用上述两种方式来创建RDD以说明RDD的API。

map

map是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。任何原RDD中的元素在新RDD中都有且只有一个元素与之对应。

举例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> val b = a.map(x => x*2)
scala> a.collect
res10: Array[Int] = Array(1, 2, 3, 4, 5, 6, 7, 8, 9)
scala> b.collect
res11: Array[Int] = Array(2, 4, 6, 8, 10, 12, 14, 16, 18)

上述例子中把原RDD中每个元素都乘以2来产生一个新的RDD。

mapPartitions

mapPartitions是map的一个变种。map的输入函数是应用于RDD中每个元素,而mapPartitions的输入函数是应用于每个分区,也就是把每个分区中的内容作为整体来处理的。 
它的函数定义为:

def mapPartitions[U: ClassTag](f: Iterator[T] => Iterator[U], preservesPartitioning: Boolean = false): RDD[U]

f即为输入函数,它处理每个分区里面的内容。每个分区中的内容将以Iterator[T]传递给输入函数f,f的输出结果是Iterator[U]。最终的RDD由所有分区经过输入函数处理后的结果合并起来的。

举例:

scala> val a = sc.parallelize(1 to 9, 3)
scala> def myfunc[T](iter: Iterator[T]) : Iterator[(T, T)] = {
    var res = List[(T, T)]() 
    var pre = iter.next while (iter.hasNext) {
        val cur = iter.next; 
        res .::= (pre, cur) pre = cur;
    } 
    res.iterator
}
scala> a.mapPartitions(myfunc).collect
res0: Array[(Int, Int)] = Array((2,3), (1,2), (5,6), (4,5), (8,9), (7,8))

上述例子中的函数myfunc是把分区中一个元素和它的下一个元素组成一个Tuple。因为分区中最后一个元素没有下一个元素了,所以(3,4)和(6,7)不在结果中。 
mapPartitions还有些变种,比如mapPartitionsWithContext,它能把处理过程中的一些状态信息传递给用户指定的输入函数。还有mapPartitionsWithIndex,它能把分区的index传递给用户指定的输入函数。

mapValues

mapValues顾名思义就是输入函数应用于RDD中Kev-Value的Value,原RDD中的Key保持不变,与新的Value一起组成新的RDD中的元素。因此,该函数只适用于元素为KV对的RDD。

举例:

scala> val a = sc.parallelize(List("dog", "tiger", "lion", "cat", "panther", " eagle"), 2)
scala> val b = a.map(x => (x.length, x))
scala> b.mapValues("x" + _ + "x").collect
res5: Array[(Int, String)] = Array((3,xdogx), (5,xtigerx), (4,xlionx),(3,xcatx), (7,xpantherx), (5,xeaglex))

mapWith

mapWith是map的另外一个变种,map只需要一个输入函数,而mapWith有两个输入函数。它的定义如下:

def mapWith[A: ClassTag, U: ](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => U): RDD[U]
  • 第一个函数constructA是把RDD的partition index(index从0开始)作为输入,输出为新类型A;
  • 第二个函数f是把二元组(T, A)作为输入(其中T为原RDD中的元素,A为第一个函数的输出),输出类型为U。

举例:把partition index 乘以10,然后加上2作为新的RDD的元素。

val x = sc.parallelize(List(1,2,3,4,5,6,7,8,9,10), 3) 
x.mapWith(a => a * 10)((a, b) => (b + 2)).collect 
res4: Array[Int] = Array(2, 2, 2, 12, 12, 12, 22, 22, 22, 22)

flatMap

与map类似,区别是原RDD中的元素经map处理后只能生成一个元素,而原RDD中的元素经flatmap处理后可生成多个元素来构建新RDD。 
举例:对原RDD中的每个元素x产生y个元素(从1到y,y为元素x的值)

scala> val a = sc.parallelize(1 to 4, 2)
scala> val b = a.flatMap(x => 1 to x)
scala> b.collect
res12: Array[Int] = Array(1, 1, 2, 1, 2, 3, 1, 2, 3, 4)

flatMapWith

flatMapWith与mapWith很类似,都是接收两个函数,一个函数把partitionIndex作为输入,输出是一个新类型A;另外一个函数是以二元组(T,A)作为输入,输出为一个序列,这些序列里面的元素组成了新的RDD。它的定义如下:

def flatMapWith[A: ClassTag, U: ClassTag](constructA: Int => A, preservesPartitioning: Boolean = false)(f: (T, A) => Seq[U]): RDD[U]

举例:

scala> val a = sc.parallelize(List(1,2,3,4,5,6,7,8,9), 3)
scala> a.flatMapWith(x => x, true)((x, y) => List(y, x)).collect
res58: Array[Int] = Array(0, 1, 0, 2, 0, 3, 1, 4, 1, 5, 1, 6, 2, 7, 2,
8, 2, 9)

flatMapValues

flatMapValues类似于mapValues,不同的在于flatMapValues应用于元素为KV对的RDD中Value。每个一元素的Value被输入函数映射为一系列的值,然后这些值再与原RDD中的Key组成一系列新的KV对。

举例

scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> val b = a.flatMapValues(x=>x.to(5))
scala> b.collect
res3: Array[(Int, Int)] = Array((1,2), (1,3), (1,4), (1,5), (3,4), (3,5))

上述例子中原RDD中每个元素的值被转换为一个序列(从其当前值到5),比如第一个KV对(1,2), 其值2被转换为2,3,4,5。然后其再与原KV对中Key组成一系列新的KV对(1,2),(1,3),(1,4),(1,5)。

reduce

reduce将RDD中元素两两传递给输入函数,同时产生一个新的值,新产生的值与RDD中下一个元素再被传递给输入函数直到最后只有一个值为止。

举例

scala> val c = sc.parallelize(1 to 10)
scala> c.reduce((x, y) => x + y)
res4: Int = 55

上述例子对RDD中的元素求和。

reduceByKey

顾名思义,reduceByKey就是对元素为KV对的RDD中Key相同的元素的Value进行reduce,因此,Key相同的多个元素的值被reduce为一个值,然后与原RDD中的Key组成一个新的KV对。

举例:

scala> val a = sc.parallelize(List((1,2),(3,4),(3,6)))
scala> a.reduceByKey((x,y) => x + y).collect
res7: Array[(Int, Int)] = Array((1,2), (3,10))

上述例子中,对Key相同的元素的值求和,因此Key为3的两个元素被转为了(3,10)。


1.概念 
RDD(Resilient Distributed Datasets,弹性分布式数据集)是一个分区的只读记录的集合。RDD只能通过在稳定的存储器或其他RDD的数据上的确定性操作来创建。我们把这些操作称作变换以区别其他类型的操作。例如 map,filter和join。 
RDD在任何时候都不需要被”物化”(进行实际的变换并最终写入稳定的存储器上)。实际上,一个RDD有足够的信息描述着其如何从其他稳定的存储器上的数据生成。它有一个强大的特性:从本质上说,若RDD失效且不能重建,程序将不能引用该RDD。而用户可以控制RDD的其他两个方面:持久化和分区。用户可以选择重用哪个RDD,并为其制定存储策略(比如,内存存储)。也可以让RDD中的数据根据记录的key分布到集群的多个机器。 这对位置优化来说是有用的,比如可用来保证两个要jion的数据集都使用了相同的哈希分区方式。

2.spark 编程接口 
对编程人员通过对稳定存储上的数据进行变换操作(如map和filter).而得到一个或多个RDD。然后可以调用这些RDD的actions(动作)类的操作。这类操作的目是返回一个值或是将数据导入到存储系统中。动作类的操作如count(返回数据集的元素数),collect(返回元素本身的集合)和save(输出数据集到存储系统)。spark直到RDD第一次调用一个动作时才真正计算RDD。 
还可以调用RDD的persist(持久化)方法来表明该RDD在后续操作中还会用到。默认情况下,spark会将调用过persist的RDD存在内存中。但若内存不足,也可以将其写入到硬盘上。通过指定persist函数中的参数,用户也可以请求其他持久化策略(如Tachyon)并通过标记来进行persist,比如仅存储到硬盘上或是在各机器之间复制一份。最后,用户可以在每个RDD上设定一个持久化的优先级来指定内存中的哪些数据应该被优先写入到磁盘。 
PS: 
缓存有个缓存管理器,spark里被称作blockmanager。注意,这里还有一个误区是,很多初学的同学认为调用了cache或者persist的那一刻就是在缓存了,这是完全不对的,真正的缓存执行指挥在action被触发。

说了一大堆枯燥的理论,我用一个例子来解释下吧: 
现在数据存储在hdfs上,而数据格式以“;”作为每行数据的分割:

"age";"job";"marital";"education";"default";"balance";"housing";"loan"
30;"unemployed";"married";"primary";"no";1787;"no";"no"
33;"services";"married";"secondary";"no";4789;"yes";"yes"
  • 1
  • 2
  • 3

scala代码如下:

 //1.定义了以一个HDFS文件(由数行文本组成)为基础的RDD
 val lines = sc.textFile("/data/spark/bank/bank.csv")
 //2.因为首行是文件的标题,我们想把首行去掉,返回新RDD是withoutTitleLines
 val withoutTitleLines = lines.filter(!_.contains("age"))
 //3.将每行数据以;分割下,返回名字是lineOfData的新RDD
 val lineOfData = withoutTitleLines.map(_.split(";"))
 //4.将lineOfData缓存到内存到,并设置缓存名称是lineOfData
 lineOfData.setName("lineOfData")
 lineOfData.persist
 //5.获取大于30岁的数据,返回新RDD是gtThirtyYearsData
 val gtThirtyYearsData = lineOfData.filter(line => line(0).toInt > 30)
 //到此,集群上还没有工作被执行。但是,用户现在已经可以在动作(action)中使用RDD。
 //计算大于30岁的有多少人
 gtThirtyYearsData.count
 //返回结果是3027
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15

OK,我现在要解释两个概念NO.1 什么是lineage?,NO.2 transformations 和 actions是什么? 
lineage
这里写图片描述

在上面查询大于30岁人查询里,我们最开始得出去掉标题行所对应的RDD lines,即为withTitleLines,接着对withTitleLines进行map操作分割每行数据内容,之后再次进行过滤age大于30岁的人、最后进行count(统计所有记录)。Spark的调度器会对最后的那个两个变换操作流水线化,并发送一组任务给那些保存了lineOfData对应的缓存分区的节点。另外,如果lineOfData的某个分区丢失,Spark将只在该分区对应的那些行上执行原来的split操作即可恢复该分区。 
所以在spark计算时,当前RDD不可用时,可以根据父RDD重新计算当前RDD数据,但如果父RDD不可用时,可以可以父RDD的父RDD重新计算父RDD。

transformations 和 actions

transformations操作理解成一种惰性操作,它只是定义了一个新的RDD,而不是立即计算它。相反,actions操作则是立即计算,并返回结果给程序,或者将结果写入到外存储中。

下面我以示例解释下:

这里写图片描述

先简单介绍这些吧,稍后文章我会详细介绍每个方法的使用,感兴趣可以看spark官方文档

3.RDDs接口5个特性

这里写图片描述

简单概括为:一组分区,他们是数据集的最小分片;一组 依赖关系,指向其父RDD;一个函数,基于父RDD进行计算;以及划分策略和数据位置的元数据。例如:一个表现HDFS文件的RDD将文件的每个文件块表示为一个分区,并且知道每个文件块的位置信息。同时,对RDD进行map操作后具有相同的划分。当计算其元素时,将map函数应用于父RDD的数据。

4.RDDs依赖关系

  1. 在spark中如何表示RDD之间的依赖关系分为两类: 
    窄依赖:每个父RDD的分区都至多被一个子RDD的分区使用,即为OneToOneDependecies; 
    宽依赖:多个子RDD的分区依赖一个父RDD的分区,即为OneToManyDependecies。 
    例如,map操作是一种窄依赖,而join操作是一种宽依赖(除非父RDD已经基于Hash策略被划分过了)
  2. 详细介绍: 
    首先,窄依赖允许在单个集群节点上流水线式执行,这个节点可以计算所有父级分区。例如,可以逐个元素地依次执行filter操作和map操作。相反,宽依赖需要所有的父RDD数据可用并且数据已经通过类MapReduce的操作shuffle完成。 
    其次,在窄依赖中,节点失败后的恢复更加高效。因为只有丢失的父级分区需要重新计算,并且这些丢失的父级分区可以并行地在不同节点上重新计算。与此相反,在宽依赖的继承关系中,单个失败的节点可能导致一个RDD的所有先祖RDD中的一些分区丢失,导致计算的重新执行。 
    对于hdfs:HDFS文件作为输入RDD。对于这些RDD,partitions代表文件中每个文件块的分区(包含文件块在每个分区对象中的偏移量),preferredLocations表示文件块所在的节点,而iterator读取这些文件块。 
    对于map:在任何一个RDD上调用map操作将返回一个MappedRDD对象。这个对象与其父对象具有相同的分区以及首选地点(preferredLocations),但在其迭代方法(iterator)中,传递给map的函数会应用到父对象记录。 
    再一个经典的RDDs依赖图吧 
    这里写图片描述

5.作业调度

当用户对一个RDD执行action(如count 或save)操作时, 调度器会根据该RDD的lineage,来构建一个由若干阶段(stage) 组成的一个DAG(有向无环图)以执行程序,如下图所示。 
每个stage都包含尽可能多的连续的窄依赖型转换。各个阶段之间的分界则是宽依赖所需的shuffle操作,或者是DAG中一个经由该分区能更快到达父RDD的已计算分区。之后,调度器运行多个任务来计算各个阶段所缺失的分区,直到最终得出目标RDD。 
调度器向各机器的任务分配采用延时调度机制并根据数据存储位置(本地性)来确定。若一个任务需要处理的某个分区刚好存储在某个节点的内存中,则该任务会分配给那个节点。否则,如果一个任务处理的某个分区,该分区含有的RDD提供较佳的位置(例如,一个HDFS文件),我们把该任务分配到这些位置。 
“对应宽依赖类的操作 {比如 shuffle依赖),会将中间记录物理化到保存父分区的节点上。这和MapReduce物化Map的输出类似,能简化数据的故障恢复过程。 
对于执行失败的任务,只要它对应stage的父类信息仍然可用,它便会在其他节点上重新执行。如果某些stage变为不可用(例如,因为shuffle在map阶段的某个输出丢失了),则重新提交相应的任务以并行计算丢失的分区。 
若某个任务执行缓慢 (即”落后者”straggler),系统则会在其他节点上执行该任务的拷贝,这与MapReduce做法类似,并取最先得到的结果作为最终的结果。 
这里写图片描述 
实线圆角方框标识的是RDD。阴影背景的矩形是分区,若已存于内存中则用黑色背景标识。RDD G 上一个action的执行将会以宽依赖为分区来构建各个stage,对各stage内部的窄依赖则前后连接构成流水线。在本例中,stage 1 的输出已经存在RAM中,所以直接执行 stage 2 ,然后stage 3。

6.内存管理

Spark提供了三种对持久化RDD的存储策略:未序列化Java对象存于内存中、序列化后的数据存于内存及磁盘存储。第一个选项的性能表现是最优秀的,因为可以直接访问在JAVA虚拟机内存里的RDD对象。在空间有限的情况下,第二种方式可以让用户采用比JAVA对象图更有效的内存组织方式,代价是降低了性能。第三种策略适用于RDD太大难以存储在内存的情形,但每次重新计算该RDD会带来额外的资源开销。

对于有限可用内存,Spark使用以RDD为对象的LRU回收算法来进行管理。当计算得到一个新的RDD分区,但却没有足够空间来存储它时,系统会从最近最少使用的RDD中回收其一个分区的空间。除非该RDD便是新分区对应的RDD,这种情况下,Spark会将旧的分区继续保留在内存,防止同一个RDD的分区被循环调入调出。因为大部分的操作会在一个RDD的所有分区上进行,那么很有可能已经存在内存中的分区将会被再次使用。

7.检查点支持(checkpoint) 
虽然lineage可用于错误后RDD的恢复,但对于很长的lineage的RDD来说,这样的恢复耗时较长。因此,将某些RDD进行检查点操作(Checkpoint)保存到稳定存储上,是有帮助的。 
通常情况下,对于包含宽依赖的长血统的RDD设置检查点操作是非常有用的,在这种情况下,集群中某个节点的故障会使得从各个父RDD得出某些数据丢失,这时就需要完全重算。相反,对于那些窄依赖于稳定存储上数据的RDD来说,对其进行检查点操作就不是有必要的。如果一个节点发生故障,RDD在该节点中丢失的分区数据可以通过并行的方式从其他节点中重新计算出来,计算成本只是复制整个RDD的很小一部分。 
Spark当前提供了为RDD设置检查点(用一个REPLICATE标志来持久化)操作的API,让用户自行决定需要为哪些数据设置检查点操作。 
最后,由于RDD的只读特性使得比常用的共享内存更容易做checkpoint,因为不需要关心一致性的问题,RDD的写出可在后台进行,而不需要程序暂停或进行分布式快照。


概念与特性

RDD是spark最重要的抽象。spark统一建立在抽象的RDD之上。设计一个通用的编程抽象,使得spark可以应对各种场合的大数据情景。RDD模型将不同的组件融合到一起,选用其中的几个/所有,可以应付各种不同的场景。解决了mr的缺陷 
1. 弹性分布式数据集Resilient Distributed Dataset。 
2. 只读分区数据集,final修饰的 
3. 一个分布式的数据集合,是spark中的核心,spark的操作都是围绕RDD展开的。 
4. 真正的运算是在各个计算节点。 
5. 当某个RDD操作丢失的时候,可以很快恢复。

分区

  • 不同分区可能被划分到不同机器上。但是每个分区对应一个数据block
  • 分区是个逻辑概念,新旧分区可能是同一块内存。(重要的优化,节约资源。)。在函数式编程,经常使用常量,但是很费内存,rdd的这种优化非常实用。防止内存的无限性扩充。
  • 只是记录需要做的操作。只有当真正要执行的时候,才具体的执行。

计算

  • 并行计算。计算/处理都是在各分区上,并行计算。并行,提高了效率。
  • 真正的数据处理都是在各个分散的节点上。

依赖

  • 子RDD从父RDD产生,父子RDD之间的关系。 
    • 宽依赖:依赖上级所有的RDD分区。宽依赖一般非常消耗资源,结果一般要缓存下来
    • 窄依赖:依赖上级RDD的部分分区。计算的时候可能都在同一个节点上,节省资源
  • stage以依赖的区别,分成不同的stage
  • 每个父RDD的分区,只能被最多一个字RDD使用,子RDD可以使用任意个父RDD。

RDD的操作

创建

  • 从外部数据集中读取。来源于文件系统(HDFS,HBASE),textFile方法 
    这里的路径要让每个RDD的节点都能访问到
// 从外部文本中创建RDD。可以指定分片的个数
lines =  sc.textFle("路径",3)
// 一个目录下所有的文件。
lines = sc.wholeTextFile("路径")
// 从hadoop系统创建RDD
val rdd = newApiHadoopFile()
// 从hadoop数据创建RDD
val RDD = new ApiHadoopRDD()
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 从驱动程序中对一个集合进行并行化 
    在测试的时候用的多,parallelize方法。可以指定分区个数
val lines = sc.parallelize(list["name","age"])
  • 1

转化(Transform)

  • 不进行具体操作,类似scala中的惰性求值
  • 从一个RDD生成另一个RDD的过程。spark用lineage的方式表示各个RDD的依赖关系,链表的表头是textFile
  • 参考fp中的概念,这里只做逻辑运算,接受一个RDD,结果产生一个RDD,没有任何副作用
  • RDD常见的转化操作

map                RDD.map(fun)              将函数应用于每个元素,结果返回一个RDD
flatmap            RDD.flatmap(fun)          同map,返回一个包含所有处理结果的整体。生成的分片数不变,只是在逻辑上成一个整体
filter             RDD.filter(fun)           过滤掉不符合要求的数据 
distinct           RDD.distinct()            去重,需要shuffle代价大
union              RDD.union(RDD1)           两个RDD求并集
intersection       RDD. intersection(RDD1)   两个RDD求交集
substract          RDD.substract(RDD1)       从RDD中移除RDD1的内容
cartesian          RDD.cartesian(RDD1)       生成RDD与RDD1的笛卡尔积
pipe               RDD.pipe("shell命令")      利用linux中的shell语言,对数据操作。
zip                RDD.zip(RDD1)             将RDD和RDD1组成一个kv格式的新RDD
sample             RDD.sample()              随机采样,返回RDD
-----------------------------------------------------------------------------
groupBy            RDD.groubBy(key)          根据key的取值,分组
reduceByKey        RDD.reduceByKey()         根据key的取值聚合
sortByKey          RDD.sortByKey()           根据key的值排序
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11
  • 12
  • 13
  • 14
  • 15
  • 16
  • 17

行动(Action)

  • 真正的开始处理和操作,强制执行所有的RDD
  • RDD常见的行动操作
collect()           RDD.collect()             返回RDD中的所有元素。只能计算小规模的RDD,否则要shuffle代价大
count()             RDD.count()               统计RDD中元素的个数
countByVale()       RDD.countByValue()        每个元素在RDD中出现的次数。
take()              RDD.take(n)               返回RDD中的n个元素
top()               RDD.top(N)                返回RDD中的前N个元素
takeOrdered()       RDD.takeOrdered(n)        按照要求的顺序返回前n个元素
takeSample()        RDD.takeSample(n)         从RDD中任意返回n个元素
reduce()            RDD.reduce(fun)           并行整合RDD中所有的元素
fold()              RDD.fold(num)(fun)        提供初始值,的reduce
aggregate()         RDD.aggregate()            ?????????
foreach()           RDD.foreach(fun)          对RDD中的 每个元素使用给定的函数
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

RDD常见操作

过滤

val textRDD = sc.textFile(path)
val resultRDD1 = textRDD.filter(_.split(" ")(0).equals("1"))
val resultRDD2 = resultRDD1.map(s => { | val columns = s.split(",") | val pm = columns(1)  | val host = columns(2).replaceAll("a","b") pm+""+host })
  • 1
  • 2
  • 3
  • 4

去重

val RDD = sc.textFile("path")
val R1 = RDD.map(s => { | val one = s.split(",") | val two = columns(0) | val three = columms(1) | (p2,p1) | })
val R2 = R1.distinct()
  • 1
  • 2
  • 3
  • 4

共享变量

广播变量

副本拷贝的方式获得

累加器

只能进行add操作

持久化

  • 提高了数据的可重用性
  • 把RDD中的结果持久化到内存中。当后续的操作需要用到某些RDD运算结果的时候,持久化到内存可以提高效率。主要有cahce方法和persist方法。
  • cache方法只有一个默认的缓存级别(MEMORY_ONLY),persist可以有许多级别。cache实际上是调用了persist方法
  • 当要缓存的内容太多,用LRU算法淘汰。
  • 保存
RDD.saveAsTextFile("路径")
  • 1
  • 持久化级别:
级别 描述
MEMORY_ONLY 只放在内存,超过的部分丢弃,不存储(默认的级别)
MEMORY_AND_DISK 放在内存,超过的放在磁盘上
MEMORY_ONLY_SER RDD作为序列化的对象存储
MEMORY_AND_DISK_SER 超过的部分放在disk上
DISK_ONLY 只放在disk上

工作流程

  • RDD把操作记录程DAG图,记录各个DAG中的转换关系
  • 无论进行了多少次转换,只有真正遇到action的时候才真正

一、 定义

1、 RDD定义

RDD是弹性分布式数据集(Resilient Distributed Dataset)的简称,其实就是分布式元素集合。在Spark中,对数据的所有操作不外乎创建RDD、转化已有的RDD、调用RDD操作进行求值。

2、 操作类型

RDD有两种类型的操作:Transformation操作、Action操作,Transformation操作和Action操作区别在于Spark计算RDD的方式不同。

  • Transformation操作会由一个RDD生成另一个新的RDD,生成的新的RDD是惰性求值的,只有在Action操作时才会被计算。
  • Action操作会对RDD计算出一个结果,并把结果返回到驱动器程序中,或者是把结果存储到外部存储系统中。

二、 Spark RDD五大特性

RDD是弹性分布式数据集,是Spark中最关键、最基本的一个抽象,他代表的是不可变的、分区的集合,这个集合可以被并行处理。

Internally, each RDD is characterized by five main properties:

  • A list of partitions
  • A function for computing each split
  • A list of dependencies on other RDDs
  • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
  • Optionally, a list of preferred locations to compute each split on (e.g. block locations for 
    an HDFS file)

1、 RDD是分区的

一个RDD对应的数据集合是分区的,各个分区分布在各个机器当中,每个RDD是不可变的,每个分区对应一个Task。 
查看rdd的分区数量:

scala> val rdd=sc.textFile("/input.txt")
scala> rdd.getNumPartitions
16/10/18 05:16:24 INFO mapred.FileInputFormat: Total input paths to process : 1
res2: Int = 2
  • 1
  • 2
  • 3
  • 4
  • 5

2、 每个分区都可以运用函数

每个分区上都应用于一个函数,各个分区运行的计算逻辑是一样的,只是数据不同。

3、 每个RDD都有一系列其他依赖

每个RDD依赖于一系列的RDD,因为有DAG,所以能找到依赖的RDD,目的是可以进行容错,当某一个RDD操作失败了,可以找到他的依赖进行重新计算。 
查看某一个rdd的血缘关系:


scala> val rdd=sc.textFile("/input.txt")
scala> var wordcountRdd=rdd.flatMap(line => line.split(" ")).map(word =>(word,1)).reduceByKey((a,b)=>(a+b))
scala> wordcountRdd.toDebugString
res3: String = 
(2) ShuffledRDD[4] at reduceByKey at <console>:29 []
 +-(2) MapPartitionsRDD[3] at map at <console>:29 []
    |  MapPartitionsRDD[2] at flatMap at <console>:29 []
    |  /input.txt MapPartitionsRDD[1] at textFile at <console>:27 []
    |  /input.txt HadoopRDD[0] at textFile at <console>:27 []
  • 1
  • 2
  • 3
  • 4
  • 5
  • 6
  • 7
  • 8
  • 9
  • 10
  • 11

4、 键值对RDD类型可以指定分区方式

对于键值对类型的RDD,可以制定一个分区方式。

5、 数据本地化

Task所运行的机器与处理的数据在同一个机器上,就是数据本地化。

三、 RDD创建方式

1、 并行化集合

scala> val list=List(1,2,3,4,5,6,7)
scala> val rddNum=sc.parallelize(list)
scala> rddNum.count
res0: Long = 7
  • 1
  • 2
  • 3
  • 4
  • 5

2、 从外部数据集读取

可以从HDFS读取文件加载数据或者HBbase加载数据。

scala> val rdd=sc.textFile("/input.txt")
res1: Long = 5
  • 1
  • 2
  • 3

四、 RDD三大Operation

1、 transformation(转换)

从一个RDD变为另外一个RDD的操作叫做transformation操作,transformation的操作都是懒操作,即不会立即执行,只有当进行action操作时才会真正的去执行。 
如:map()、filter()、distinct()

2、 action(执行)

action操作能返回Driver程序一个值或者是导出数据到外部系统。 
比如:count()、reduce()、collect()、take()

3、 Persistence(持久化)

缓存数据到内存、磁盘、外部的存储器。 
persist():缓存数据,可以指定缓存级别。 
cache():就是调用的persist(),缓存级别为内存。

(1) 缓存级别:

  • NONE:都不存储
  • DISK_ONLY:只存储在磁盘
  • DISK_ONLY_2:只存储在磁盘,存储两个副本
  • MEMORY_ONLY:只存放内存中
  • MEMORY_ONLY_2:只存放在内存中,存储两个副本。
  • MEMORY_ONLY_SER:只存储在内存中并序列化到磁盘
  • MEMORY_ONLY_SER_2:只存储在内存中并序列化到磁盘,存储两个副本
  • MEMORY_AND_DISK:优先放入内存,内存不够就放在硬盘
  • MEMORY_AND_DISK_2:优先放入内存,内存不够就放在硬盘,存储两个副本
  • MEMORY_AND_DISK_SER:优先放入内存,内存不够就放在硬盘,并序列化
  • MEMORY_AND_DISK_SER_2:优先放入内存,内存不够就放在硬盘,并序列化,存储两个副本
  • OFF_HEAP:外部存储。

(2) 何时进行缓存

有两种情况下要进行缓存: 
- RDD之后会被使用很多次 
- 某个RDD的数据是经过非常复杂的清洗过滤得到的

五、 Spark Application调度

1、 Spark Application组成结构图

这里写图片描述

(1) 一个Application有多个Job,一个Action操作会产生一个Job。 
(2) 每个Job有多个stage 
(3) 每个stage有多个task,每个task是一个线程 
(4) 每个task业务逻辑相同,数据不同

2、 Spark Application 运行架构

这里写图片描述

(1) Spark应用的运行架构主要分三部分:Driver、Executor、Cluster Manager。 
(2) Driver Program负责提交Job,Spark在Driver Program的main函数中,创建一个SparkContext的实例,使用SparkContext来提交Job。 
(3) Cluster Manager是个外部的服务,主要起管理资源的作用,它可能是standalone manager、Mesos,YARN。 
(4) Application:只指创建在Spark上的用户程序,包括集群上的Driver Program和Executors两部分。 
(5) Application Jar:一个jar包包含了用户的Spark Application,一个jar包可以有很多 Application,Jar包里不能包含Hadoop和Spark的包,Hadoop包和Spark包会在运行时被添加 
(6) Worker节点,是集群中用来运行代码的节点,是实际来干活的机器。 
(7) Executor负责来执行任务,运行在Worker节点上,是一个进程,在Executor上运行很多Task,每一个Task是一个线程。一个Executor只属于一个Application。 
(8) Task是一个工作的单位是一个线程,运行在Executor上。 
(9) Job是一个并行的计算,包含多个task,通常RDD一个Action执行时就触发一个Job。 
(10) Stage:每一个Job会被划分为几个Stage,每个Stage之间是相互依赖的。


2019-06-15 12:12:27 nengyu 阅读数 590
  • 阿里沈询:高并发网站中的数据库设计视频教程

    高并发网站中的数据库设计视频教程,该课程主要分为3个部分,1、数据库的基本组成:KV存储系统、查询优化原理、单机/多机事务概述;2、分布式存储、Key-Value的多机扩展、CAP和分布式系统的一致性;3、阿里数据库的一些最佳实践。 嘉宾介绍:王晶昱(花名:沈询),阿里巴巴技术讲师 。目前主要负责阿里的分布式数据库DRDS(TDDL)和阿里的分布式消息服务ONS(RocketMQ/Notify)两个系统。

    12082 人正在学习 去看看 CSDN讲师

1.SparkConf
    Spark配置对象,设置各种参数,使用kv类型。
2.SparkContext
    spark主要入口点,代表到spark集群的连接,可以创建
    rdd、累加器和广播变量。

    每个JVM中只能有一个SparkContext,启动新的SparkContext必须stop的原来的。 

    val rdd1 = sc.textFile()

3.RDD
    rdd有依赖列表.
    弹性分布式数据库,是spark的基本抽象,表示为不可变的、分区化的集合,可用于并行计算。
    该类包含基本操作,map、filter、persist。
    对于kv类型的rdd,方法封装在PairRDDFunction类中。
    轻量的集合,里面没有数据。

    内部有5大属性:
    1.分区列表
    2.计算每个切片的函数
    3.到其他RDD的依赖列表
    4.(可选)针对kv类型RDD的分区类
    5.(可选)计算每个的首选的位置列表。

RDD常见操作
------------------
    rdd都是延迟计算的,只有调用action方法时,才会触发job的提交。
    1.变换
        只要返回新的RDD就是transform。
        map
        filter
        flatMap
        mapPartitons                //对每个分区进行变换处理
        sample
        union
        distinct
        intersection
        groupByKey                    //没有combine过程,可以改变v类型
        reduceByKey                    //有combine过程,不能改变v类型
        join
    2.action
        2.1)collect
        2.2)foreachPartition        //迭代每个分区,

4.Dependency
    依赖,
    指的是子RDD的每个分区和父RDD的分区之间数量的对应关系。
    Dependency
        |
        |---NarrowDependency(窄依赖)
            |----OneToOne依赖(一对一依赖)
            |----Range依赖(范围依赖)
            |----Prune依赖(修剪依赖)

        |---ShuffleDependency(宽依赖)

4.Stage
    阶段是并行任务的集合,由调度器运行DAG图根据shuffle进行划分成若干stage。
    阶段分两种类型:ShuffleMapStage和ResultStage
    1.ShuffleMapStage
        该阶段的输出是下一个阶段的输入,跟踪每个节点的输出情况。
        一个阶段会重试执行多次处于容错考虑。
        由多个ShuffleMapTask构成。

    2.ResultStage
        在某些分区上应用计算函数,有些操作例如take(n)/first()没必要在所有分区上执行的。
        结果阶段的输出结果回传给driver.
        由多个ResultTask构成。

    
5.Task
    Spark执行的最小单位,有两种类型,和Stage相对。
    1.ShuffleMapTask
        
    2.ResultTask
        执行任务,并将结果回传给driver。

6.job
    每个action是一个job。

7.Application
    一个应用有多个job,对应一个SparkContext。

2018-06-24 20:59:40 IT_NEU_Lee 阅读数 246
  • 阿里沈询:高并发网站中的数据库设计视频教程

    高并发网站中的数据库设计视频教程,该课程主要分为3个部分,1、数据库的基本组成:KV存储系统、查询优化原理、单机/多机事务概述;2、分布式存储、Key-Value的多机扩展、CAP和分布式系统的一致性;3、阿里数据库的一些最佳实践。 嘉宾介绍:王晶昱(花名:沈询),阿里巴巴技术讲师 。目前主要负责阿里的分布式数据库DRDS(TDDL)和阿里的分布式消息服务ONS(RocketMQ/Notify)两个系统。

    12082 人正在学习 去看看 CSDN讲师

spark上下文对象,是spark程序的主入口点,负责连接到spark cluster。

    一旦有了上下文,就可以创建RDD,子集群上创建累加器和广播变量

每个jvm只能激活一个sparkcontext,创建新的时候必须停止前一个

sparkcontext需要传入sparkconf   ,用来设置spark参数,参数是kv对


RDD:是不可变的,可分区的元素集合,可进行并行操作。该类包含了用于所有RDD之上的基本操作

   在RDD内部,每个RDD有5个特征:

       1有一个分区列表

       2每个split都有一个计算函数

       3存放是parent的依赖列表

       4(可选)基于kv对的分区器

       5(可选)首选的位置列表

val lines(就是RDD)=textFile()用来读取hdfs内容,或者本地文件 file:///home/ddd.txt      lines.first    lines.take(2)提取前两行


每个spark程序应由driver构成,由他启动各种并行操作。driver含有main函数和分布式数据集,并对他们应用

各种操作,如:spark-shell本身就是一个driver(包含main方法  )

Driver通过sparkContext访问spark  代表spark和集群的连接,运行程序时候,驱动程序需要管理一些叫做executor的节点


spark-shell  默认使用local模式运行spark程序,并没有用到spark集群,类似于hadoop的本地模式

     sparkshell可以带参数  --master  local[2] 在本地开启两个线程模拟spark集群     

独立应用程序:standalone application

  写好Scala文件后  先使用Scalac命令进行编译:scalac  -cp /usr/apps/spark/lib/spark-assembly-2.2.1-hadoop-2.7.1.jar: `hadoop classpath`:   . -d target wcApp.scala    然后用Scala命令运行

  或者使用maven进行编译:mvn clean && mvn compile && mvn package   然后就会生成相应的jar包

每个RDD被切分成不同的分区,每个分区在不同的节点上计算 

持久化:persist()  用来重用  不用重复计算结果  只用计算一次  如执行rdd.count  

 spark默认持久化对象到jvm heap中 没有串行化     或者串行化到磁盘或者离堆区中

               创建RDD有两种方式:1 textFile读取文件     和     2  parallelize(1 to 100) 



伪集合操作:  

             1.rdd1=tom tomas tomasLee       rdd2 = tomas  tomasLee jerry bob

             2. rdd.distinct()去重

             3.  union  联合

            4. intersection   交集 

             5.substract

笛卡尔积:

           rdd1.cartesion(rdd2)


      rdd.sample(false,0.8)     (某个元素是否可以被采样多次,采样比)


Action:

       rdd.reduce((x,y)=>x+y)   即x+y=>x   然后用新的x+下一个y    (1,2,3,4,5)变成 1+2+3+4+5

       fold()  有一个初始值  和reduce类似  如初始值10  则:10+1+3+4+5

       聚合:aggregate():    https://blog.csdn.net/huanbia/article/details/51436822

      collect()返回所有元素

      countByValue()   rdd中每个元素出现的次数

     top(n)提取末尾的n个元素

     takeOrdered()     

    mean()   平均值 


操作key-value:

      PairRDD   (1,2)

      reduceByKey()   groupByKey()    combinByKey()       mapValues(x=>x+1)

      flatMapValues(x=>{x to 5})       keys()取出key

        rdd1.subtractByKey(rdd2)   key 一样  就减去

       rdd1.join(rdd2)=        a:(1,2),(3,4),(3,6) join (3,9)   =array((3,(4,9)),(3,(6,9)))  如图:

   

    rdd1.rightOuterJoin(rdd2)

    rdd1.fileter{case(key,value)=>value<5}   找出value小于5的


aggregations:聚合

1.


  2.     rdd.countByValue() 等价于rdd.map(x=>(x,1)).reduceByKey((x,y)=>x+y)

 3      combineByKey    

  调整并发程度

             rdd.reduceByKey(f:op,numParitions:Int)   (函数,并发程度)

检测rdd分区数

     rdd.repartition(4) 用来重新制定分区数   rd.coalesce(4)也可以用来指定分区 数目

     rdd.partitions.size


隐式常量

  

PairRDD可用的action:

          countByKey         collectAsMap() 转换成map键值对,并且去掉key重复的数据     lookup(key) 找到相同key的value值


data分区;(通信昂贵,需要最小化网络流量)在所有的kv RDD对上,可以进行分区,引发系统

对元素按照key进行分组  这样让同样key的分到同一个节点上去

rdd.partitionBy(new HashPartitioner(100)).persist()     传入hash分区器100个分区然后根据键进行分区

rdd.partitioner   获取分区器   rdd.isDefined  判断分区器是否定义   rdd.get()获得分区对象

分区有益的方法:cogroup()  Join()  groupByKey() reduceByKey() groupWith()   loopup()  combineByKey  这些操作都会产生分区


读取文件:textFile()            wholeTextFiles()


2019-12-15 21:36:00 hyunbar 阅读数 6
  • 阿里沈询:高并发网站中的数据库设计视频教程

    高并发网站中的数据库设计视频教程,该课程主要分为3个部分,1、数据库的基本组成:KV存储系统、查询优化原理、单机/多机事务概述;2、分布式存储、Key-Value的多机扩展、CAP和分布式系统的一致性;3、阿里数据库的一些最佳实践。 嘉宾介绍:王晶昱(花名:沈询),阿里巴巴技术讲师 。目前主要负责阿里的分布式数据库DRDS(TDDL)和阿里的分布式消息服务ONS(RocketMQ/Notify)两个系统。

    12082 人正在学习 去看看 CSDN讲师

大多数的 Spark 操作可以用在任意类型的 RDD 上, 但是有一些比较特殊的操作只能用在key-value类型的 RDD 上.

这些特殊操作大多都涉及到 shuffle 操作, 比如: 按照 key 分组(group), 聚集(aggregate)等.

在 Spark 中, 这些操作在包含对偶类型(Tuple2)的 RDD 上自动可用(通过隐式转换).

object RDD {
  implicit def rddToPairRDDFunctions[K, V](rdd: RDD[(K, V)])
    (implicit kt: ClassTag[K], vt: ClassTag[V], ord: Ordering[K] = null): PairRDDFunctions[K, V] = {
    new PairRDDFunctions(rdd)
  }

键值对的操作是定义在PairRDDFunctions类上, 这个类是对RDD[(K, V)]的装饰.

1、partitionBy

作用: 对pairRDD 进行分区操作,如果原有的 partionRDD 的分区器和传入的分区器相同, 则返回原pairRDD,否则会生成 ShuffleRDD,即会产生 shuffle过程

def partitionBy(partitioner: Partitioner): RDD[(K, V)] = self.withScope {
  
  if (self.partitioner == Some(partitioner)) {
    self
  } else {
    new ShuffledRDD[K, V, V](self, partitioner)
  }
}
scala> val rdd1 = sc.parallelize(Array((1, "a"), (2, "b"), (3, "c"), (4, "d")))
scala> rdd1.partitions.length
res1: Int = 2

scala> rdd1.partitionBy(new org.apache.spark.HashPartitioner(3)).partitions.length
res3: Int = 3

2、reduceByKey(func,[numTasks])

作用: 在一个(K,V)的 RDD 上调用,返回一个(K,V)的 RDD,使用指定的reduce函数,将相同key的value聚合到一起,reduce任务的个数可以通过第二个可选的参数来设置。

scala> val rdd1 = sc.parallelize(List(("female",1),("male",5),("female",5)("male",2)))
scala> rdd1.reduceByKey(_ + _)

scala> res1.collect
res2: Array[(String, Int)] = Array((female,6), (male,7))

3、groupByKey()

作用: 按照key进行分组.

scala> val rdd1 = sc.parallelize(Array("hello", "world", "h", "hello", "are", "go"))

scala> val rdd2 = rdd1.map((_, 1))
rdd2: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[3] at map at <console>:26

scala> rdd2.groupByKey()
res3: org.apache.spark.rdd.RDD[(String, Iterable[Int])] = ShuffledRDD[4] at groupByKey at <console>:29

scala> res3.collect
res4: Array[(String, Iterable[Int])] = Array((are,CompactBuffer(1)), (hello,CompactBuffer(1, 1)), (go,CompactBuffer(1)), (h,CompactBuffer(1)), (world,CompactBuffer(1)))

scala> res3.map(t => (t._1, t._2.sum))
res5: org.apache.spark.rdd.RDD[(String, Int)] = MapPartitionsRDD[5] at map at <console>:31
                
scala> res5.collect
res7: Array[(String, Int)] = Array((are,1), (hello,2), (go,1), (h,1), (world,1))

注意:

(1)基于当前的实现,groupByKey必须在内存中持有所有的键值对 . 如果一个key有太多的value, 则会导致内存溢出(OutOfMemoryError)

(2)所以这操作非常耗资源, 如果分组的目的是为了在每个key上执行聚合操作(比如: sum 和 average), 则应该使用PairRDDFunctions.aggregateByKey或者PairRDDFunctions.reduceByKey, 因为他们有更好的性能(会先在分区进行预聚合)

4、reduceByKey和groupByKey的区别

(1)reduceByKey:按照Key进行聚合,在Shuffle之前有combine(预聚合)操作,返回结果是RDD[k,v]

  def reduceByKey(partitioner: Partitioner, func: (V, V) => V): RDD[(K, V)] = self.withScope {
    combineByKeyWithClassTag[V]((v: V) => v, func, func, partitioner)
  }
  def combineByKeyWithClassTag[C](
      createCombiner: V => C,
      mergeValue: (C, V) => C,
      mergeCombiners: (C, C) => C,
      partitioner: Partitioner,
      mapSideCombine: Boolean = true,
      serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
    require(mergeCombiners != null, "mergeCombiners must be defined") // required as of Spark 0.9.0
    if (keyClass.isArray) {
      if (mapSideCombine) {
        throw new SparkException("Cannot use map-side combining with array keys.")
      }
      if (partitioner.isInstanceOf[HashPartitioner]) {
        throw new SparkException("HashPartitioner cannot partition array keys.")
      }
    }
    val aggregator = new Aggregator[K, V, C](
      self.context.clean(createCombiner),
      self.context.clean(mergeValue),
      self.context.clean(mergeCombiners))
    if (self.partitioner == Some(partitioner)) {
      self.mapPartitions(iter => {
        val context = TaskContext.get()
        new InterruptibleIterator(context, aggregator.combineValuesByKey(iter, context))
      }, preservesPartitioning = true)
    } else {
      new ShuffledRDD[K, V, C](self, partitioner)
        .setSerializer(serializer)
        .setAggregator(aggregator)
        .setMapSideCombine(mapSideCombine)
    }
  }

(2)groupByKey:按照key进行分组,直接进行shuffle

  def groupByKey(partitioner: Partitioner): RDD[(K, Iterable[V])] = self.withScope {
    // groupByKey shouldn't use map side combine because map side combine does not
    // reduce the amount of data shuffled and requires all map side data be inserted
    // into a hash table, leading to more objects in the old gen.
    val createCombiner = (v: V) => CompactBuffer(v)
    val mergeValue = (buf: CompactBuffer[V], v: V) => buf += v
    val mergeCombiners = (c1: CompactBuffer[V], c2: CompactBuffer[V]) => c1 ++= c2
    val bufs = combineByKeyWithClassTag[CompactBuffer[V]](
      createCombiner, mergeValue, mergeCombiners, partitioner, mapSideCombine = false)
    bufs.asInstanceOf[RDD[(K, Iterable[V])]]
  }

5、aggregateByKey(zeroValue)(seqOp, combOp, [numTasks])

函数声明:

    /**
   * Aggregate the values of each key, using given combine functions and a neutral "zero value".
   * This function can return a different result type, U, than the type of the values in this RDD,
   * V. Thus, we need one operation for merging a V into a U and one operation for merging two U's,
   * as in scala.TraversableOnce. The former operation is used for merging values within a
   * partition, and the latter is used for merging values between partitions. To avoid memory
   * allocation, both of these functions are allowed to modify and return their first argument
   * instead of creating a new U.
   */
  def aggregateByKey[U: ClassTag](zeroValue: U)(seqOp: (U, V) => U,
      combOp: (U, U) => U): RDD[(K, U)] = self.withScope {
    aggregateByKey(zeroValue, defaultPartitioner(self))(seqOp, combOp)
  }

使用给定的 combine 函数和一个初始化的zero value, 对每个key的value进行聚合.

这个函数返回的类型U不同于源 RDD 中的V类型. U的类型是由初始化的zero value来定的. 所以, 我们需要两个操作: -

一个操作(seqOp)去把 1 个v变成 1 个U - 另外一个操作(combOp)来合并 2 个U

一个操作用于在一个分区进行合并, 第二个操作用在两个分区间进行合并.

为了避免内存分配, 这两个操作函数都允许返回第一个参数, 而不用创建一个新的U

(1) eroValue:给每一个分区中的每一个key一个初始值;

(2)seqOp:函数用于在每一个分区中用初始值逐步迭代value;

(3)combOp:函数用于合并每个分区中的结果。

创建一个 pairRDD,取出每个分区相同key对应值的最大值,然后相加

import org.apache.spark.{SparkConf, SparkContext}

/**
 * Author z
 * Date 2019-12-09 15:39:08
 */
object AggregateByKey {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("MySqlRead").setMaster("local[2]")
        
        val sc = new SparkContext(conf)
        
        val rdd = sc.parallelize(List(("a", 3), ("a", 2), ("c", 4), ("b", 3), ("c", 6), ("c", 8)), 2)
        
        /*     val rdd2 = rdd.aggregateByKey((Int.MinValue, Int.MaxValue))(
                 (x, y) => (x._1.max(y), x._2.min(y)),
                 (x, y) => (x._1 + y._1, x._2 + x._2)
             )*/
        val rdd2 = rdd.aggregateByKey((Int.MinValue, Int.MaxValue))(
            {   //分区内相同key的(最大值,最小值)
                case (kv, e) => (kv._1.max(e), kv._2.min(e))
            },
            {   //两个分区间数据的合并
                case (kv1, kv2) => (kv1._1 + kv2._1, kv1._2 + kv2._2)
            }
        )
        
        // 计算出来每个key对应的值的平均值!!
        /*  val rdd2=rdd.aggregateByKey((0, 0))(
              {     //(sum,count)即为zero value,每个key
                  case ((sum, count), e) => (sum + e, count + 1)
              },
              {
                  case ((sum1,count1),(sum2,count2)) => (sum1 + sum2, count1 + count2)
              }
          )*/
        
        //val rdd3 = rdd2.mapValues(kv => kv._1.toDouble / kv._2)
        rdd2.collect().foreach(println)
    }
}

6、foldByKey

参数: (zeroValue:V)(func: (V, V) => V): RDD[(K, V)]

作用:aggregateByKey的简化操作seqop和combop相同

object FoldLeft {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("FoldLeft").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd= sc.parallelize(Array(("c","3"), ("c","2"), ("c","4"), ("c","3"), ("c","6"), ("c","8")), 3)
        
        // foldByKey来说, 0值, 每个分区内用一次. 重点: 分区间合并的时候, 零值不参与
        val res = rdd.foldByKey("-")(_ + _)
        res.collect.foreach(println)               
        sc.stop()
        
    }
}

7、combineByKey

def combineByKey[C](
                       createCombiner: V => C,
                       mergeValue: (C, V) => C,
                       mergeCombiners: (C, C) => C): RDD[(K, C)] = self.withScope {
    combineByKeyWithClassTag(createCombiner, mergeValue, mergeCombiners,
        partitioner, mapSideCombine, serializer)(null)
}

作用: 针对每个K, 将V进行合并成C, 得到RDD[(K,C)]

参数描述:

(1)createCombiner: combineByKey会遍历分区中的每个key-value对. 如果第一次碰到这个key, 则调用createCombiner函数,传入value, 得到一个C类型的值.(如果不是第一次碰到这个 key, 则不会调用这个方法)

(2)mergeValue: 如果不是第一个遇到这个key, 则调用这个函数进行合并操作. 分区内合并

(3)mergeCombiners 跨分区合并相同的key的值(C). 跨分区合并

创建一个 pairRDD,根据 key 计算每种 key 的value的平均值。(先计算每个key出现的次数以及可以对应值的总和,再相除得到结果

object CombineByKey {
    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("MySqlRead").setMaster("local[2]")
        val sc = new SparkContext(conf)
        val rdd = sc.parallelize(Array(("a", 88), ("b", 95), ("a", 91), ("b", 93), ("a", 95), ("b", 98)), 2)
        val rdd2:RDD[(String,(Int,Int))] = rdd.combineByKey(
            (_, 1),
            {
                case ((sum: Int, count: Int), e:Int) => (sum + e, count + 1)
            },
            {
                case ((sum1: Int, count1: Int), (sum2:Int, count2:Int)) => (sum1 + sum2, count1 + count2)
            }
        )
        val rdd3 = rdd2.mapValues {
            case (sum, count) => (sum, count, sum.toDouble / count)
        }               
        rdd3.collect.foreach(println)
    }
}

8、sortByKey

作用: 在一个(K,V)的 RDD 上调用, K必须实现 Ordered[K] 接口(或者有一个隐式值: Ordering[K]), 返回一个按照key进行排序的(K,V)的 RDD

object SorkByKey {
    //1. 冥界召唤,需要样例类
   /* implicit val ord = new Ordering[User]{
        override def compare(x: User, y: User): Int = x.age - y.age
    }
    */
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SorkByKey").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
//        val rdd = sc.parallelize(Array((1, "a"), (10, "b"), (11, "c"), (4, "d"), (20, "d"), (10, "e")))
//        val res: RDD[(Int, String)] = rdd.sortByKey(ascending = false, numPartitions = 10)
        val rdd = sc.parallelize(Array(User(10, "a"), User(8, "c"), User(12, "b"))).map((_, 1))
        val res: RDD[(User, Int)] = rdd.sortByKey()
        
        res.collect.foreach(println)
        sc.stop()
        
    }
}
//
//case class User(id:Int,name:String)
//2. 继承 Ordered
case class User(age: Int, name:String) extends Ordered[User] {
    override def compare(that: User): Int = this.age - that.age
}

9、mapValues

作用: 针对(K,V)形式的类型只对V进行操作

scala> val rdd = sc.parallelize(Array((1, "a"), (10, "b"), (11, "c"), (4, "d"), (20, "d"), (10, "e")))

scala> rdd.mapValues("<" + _ + ">").collect
res29: Array[(Int, String)] = Array((1,<a>), (10,<b>), (11,<c>), (4,<d>), (20,<d>), (10,<e>))

10、join(otherDataSet,[numTasks])

内连接:在类型为(K,V)和(K,W)的 RDD 上调用,返回一个相同 key 对应的所有元素对在一起的(K,(V,W))的RDD

object Join {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("Join").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        var rdd1 = sc.parallelize(Array((1, "a"), (1, "b"), (2, "c"), (4, "d")))
        var rdd2 = sc.parallelize(Array((1, "aa"),(1, "bb"), (3, "bb"), (2, "cc")), 3)
        // 内连接
//        val res: RDD[(Int, (String, String))] = rdd1.join(rdd2)
//        var res = rdd1.leftOuterJoin(rdd2)
//        val res: RDD[(Int, (Option[String], String))] = rdd1.rightOuterJoin(rdd2)
        val res = rdd1.rightOuterJoin(rdd2)
        println(res.partitions.length)
        res.collect.foreach(println)
        sc.stop()        
    }
}

(1) 如果某一个 RDD 有重复的 Key, 则会分别与另外一个 RDD 的相同的 Key进行组合.

(2)也支持外连接: leftOuterJoin,rightOuterJoin,fullOuterJoin.

11、cogroup(otherDataSet,[numTasks])

作用:在类型为(K,V)和(K,W)的 RDD 上调用,返回一个(K,(Iterable<V>,Iterable<W>))类型的 RDD

object Cogroup {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("Cogroup").setMaster("local[2]")
        val sc: SparkContext = new SparkContext(conf)
        val rdd1 = sc.parallelize(Array((1, 10), (2, 20), (1, 100), (3, 30)), 1)
        val rdd2 = sc.parallelize(Array((1, "a"), (2, "b"), (1, "aa"), (3, "c")), 1)
        val res: RDD[(Int, (Iterable[Int], Iterable[String]))] = rdd1.cogroup(rdd2)
        res.collect.foreach(println)
        sc.stop()
    }
}
(1,(CompactBuffer(10, 100),CompactBuffer(a, aa)))
(3,(CompactBuffer(30),CompactBuffer(c)))
(2,(CompactBuffer(20),CompactBuffer(b)))

Spark SQL 访问Hbase

阅读数 576

spark中cogroup用法

阅读数 1181

没有更多推荐了,返回首页