2017-10-18 14:52:18 gaoshui87 阅读数 1546

spark RDD join的核心过程

spark join的过程是查询过程中最核心的过程,怎么做到实现两个表的关联查询耗费资源最少。可看源码如下
join的实现在 PairRDDFunctions类当中。

def join[W](other: RDD[(K, W)], partitioner: Partitioner): RDD[(K, (V, W))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues( pair =>
  // _1 是左表,_2 是右表的值,这是一个笛卡尔积的过程,key 一样,左表和右表各一些数据
  for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, w)
)
}

可以看到上面,自身RDD和其它的RDD进行数据的关联,同时传进去partitioner对象

def cogroup[W](other: RDD[(K, W)], partitioner: Partitioner)
  : RDD[(K, (Iterable[V], Iterable[W]))] = self.withScope {
if (partitioner.isInstanceOf[HashPartitioner] && keyClass.isArray) {
  // hash分区方式不能用于key是数组的对象
  throw new SparkException("Default partitioner cannot partition array keys.")
}
// join操作中很核心的执行类
val cg = new CoGroupedRDD[K](Seq(self, other), partitioner)
cg.mapValues { case Array(vs, w1s) =>
  (vs.asInstanceOf[Iterable[V]], w1s.asInstanceOf[Iterable[W]])
}
}

然后创建 CoGroupedRDD 专门用于RDD的关联操作对象。我们现在完整分析CoGroupedRDD源码

override def getDependencies: Seq[Dependency[_]] = {
rdds.map { rdd: RDD[_] =>
  if (rdd.partitioner == Some(part)) {
    logDebug("Adding one-to-one dependency with " + rdd)
    // 该RDD和 join合并的分区partitioner一样
    new OneToOneDependency(rdd)
  } else {
    logDebug("Adding shuffle dependency with " + rdd)
    // 当partitioner不一样时,要对数据进行重新分区,就是shuff的过程
    new ShuffleDependency[K, Any, CoGroupCombiner](
      rdd.asInstanceOf[RDD[_ <: Product2[K, _]]], part, serializer)
  }
}
}

上面就是这个join的相关RDD依赖,如果part分区一样,就是OneToOneDependency依赖,不用进行hash拆分。否则
要关联的RDD和part的分区不一致时,就要对RDD进行重新hash分区,分到正确的分片上面,所以就要用ShuffleDependency 进行
hash分片数据,然后在正确的split分片处理业务进程中进行处理。

override def getPartitions: Array[Partition] = {
// 这里对数据进行分片,一个分片就在一台work进程中进行处理了
val array = new Array[Partition](part.numPartitions)
for (i <- 0 until array.length) {
  // Each CoGroupPartition will have a dependency per contributing RDD
  array(i) = new CoGroupPartition(i, rdds.zipWithIndex.map { case (rdd, j) =>
    // Assume each RDD contributed a single dependency, and get it
    dependencies(j) match {
      case s: ShuffleDependency[_, _, _] =>
        // 当这个数据要进行shuffler时
        None
      case _ =>
        // 当分区是一样时,就直接进行了
        Some(new NarrowCoGroupSplitDep(rdd, i, rdd.partitions(i)))
    }
  }.toArray)
}
// 这样就可以把关联的RDD拆成了numPartitions分了
array
}

上面就是对各个关联的数据进行hash分片了,就是有几个RDD,然后根据它们的key进行hash分片,分到正确的partition中,如果是 OneToOneDependency 就不用进行数据的再拆分片了,ShuffleDependency 就要通过传进去的part对key进行分片,把所有一样的key
分到同样的split数据分片当中。这样各个RDD一样的key就在一样的,就可以执行关联操作了。

override def compute(s: Partition, context: TaskContext): Iterator[(K, Array[Iterable[_]])] = {
// 在其中一个work进程中执行这一分区数据了
val split = s.asInstanceOf[CoGroupPartition]
// 依赖这么多RDD
val numRdds = dependencies.length

// A list of (rdd iterator, dependency number) pairs
// 拿这个分片的数据进行计算
val rddIterators = new ArrayBuffer[(Iterator[Product2[K, Any]], Int)]
for ((dep, depNum) <- dependencies.zipWithIndex) dep match {
  case oneToOneDependency: OneToOneDependency[Product2[K, Any]] @unchecked =>
    // 依赖于 depNum 那个RDD的分片数据
    val dependencyPartition = split.narrowDeps(depNum).get.split
    // Read them from the parent
    // 在这个work进程中读取这个分片数据
    val it = oneToOneDependency.rdd.iterator(dependencyPartition, context)
    rddIterators += ((it, depNum))

  case shuffleDependency: ShuffleDependency[_, _, _] =>
    // Read map outputs of shuffle
    // 说明之前对这个RDD 的数据进行分片hash过的了
    // 然后这里专门去拉取该分片对应的数据回来
    val it = SparkEnv.get.shuffleManager
      .getReader(shuffleDependency.shuffleHandle, split.index, split.index + 1, context)
      .read()
    rddIterators += ((it, depNum))
}
// 创建一个多个RDD的合并器
val map = createExternalMap(numRdds)
for ((it, depNum) <- rddIterators) {
  map.insertAll(it.map(pair => (pair._1, new CoGroupValue(pair._2, depNum))))
}
context.taskMetrics().incMemoryBytesSpilled(map.memoryBytesSpilled)
context.taskMetrics().incDiskBytesSpilled(map.diskBytesSpilled)
context.internalMetricsToAccumulators(
  InternalAccumulator.PEAK_EXECUTION_MEMORY).add(map.peakMemoryUsedBytes)
// 结果就这样排好序的了
new InterruptibleIterator(context,
  map.iterator.asInstanceOf[Iterator[(K, Array[Iterable[_]])]])
}

再来研究一下compute方法,这个方法就是对当前要计算的split进行处理的,上面已经对多个RDD进行hash分片了,然后把
相同的key都分片到这里来了,如果是oneToOneDependency就直接读取那个分片数据,否则就要启动对RDD的shuffle的过程
把一个RDD通过hash分到多个分片当中,然后该函数拉取自己需求的那一个分片数据。当该split需求的分片数据准备好后,就创建
下面的ExternalAppendOnlyMap 类进行对数据的排序关联功能了。

private def createExternalMap(numRdds: Int)
: ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner] = {
// 创建一个多个RDD的合并器,key value rdd_index
//  value._2 应该是rdd的index value._1 应该是value
// 初始化 rdd_index --> value
val createCombiner: (CoGroupValue => CoGroupCombiner) = value => {
  val newCombiner = Array.fill(numRdds)(new CoGroup)
  newCombiner(value._2) += value._1
  newCombiner
}
// 中间过程数据的合并 rdd_index --> value
val mergeValue: (CoGroupCombiner, CoGroupValue) => CoGroupCombiner =
  (combiner, value) => {
  combiner(value._2) += value._1
  combiner
}
// 最后所有 rdd数据的合并
val mergeCombiners: (CoGroupCombiner, CoGroupCombiner) => CoGroupCombiner =
  (combiner1, combiner2) => {
    var depNum = 0
    while (depNum < numRdds) {
      combiner1(depNum) ++= combiner2(depNum)
      depNum += 1
    }
    combiner1
  }
// key 在这个 对象里面自己管控,各业务方法只要缓存value和rdd_index的关系就好了
new ExternalAppendOnlyMap[K, CoGroupValue, CoGroupCombiner](
  createCombiner, mergeValue, mergeCombiners)
}

可以看到这里有一个公共的多RDD数据联合器,只要把数据往里面插入进去,就自动进行数据的关联操作了。
最后就返回排好序的InterruptibleIterator对象,实现多RDD的联合join。

下面再看下 left out join

def leftOuterJoin[W](
  other: RDD[(K, W)],
  partitioner: Partitioner): RDD[(K, (V, Option[W]))] = self.withScope {
this.cogroup(other, partitioner).flatMapValues { pair =>
  if (pair._2.isEmpty) {
    // 当是右表为空时,左表也要输出
    pair._1.iterator.map(v => (v, None))
  } else {
    for (v <- pair._1.iterator; w <- pair._2.iterator) yield (v, Some(w))
  }
}
}

可以看到,当是 left out join 时,底层也是一样关联的,只是在外面通过判断左表有值时,也进行输出。
同时,right out join也一样。

总结

  1. 传入多个RDD对象
  2. 判断该RDD对象是否和给定的part分区函数一致,如果是就直接拉取对应的分区,否则就shuffle,hash分片数据,然后拉取
  3. 把相同partition分片后的数据发到对应work进程中进行读取
  4. 然后在该work业务进程中,单独对这hash分片一致的数据进行关联操作
  5. 最后返回有序iterator对象
2015-10-17 21:48:56 u010220089 阅读数 1986
RDD key/value关联操作
val left = sc.parallelize(List(("spark",1),("hadoop",1),("storm",1)))
val left = sc.parallelize(List(("scala",1),("hadoop",1),("spark",1)))


关联2个RDD 

val joinOut = left join right 


res9: Array[(String, (Int, Int))] = Array((spark,(1,1)), (hadoop,(1,1)))


left.join(right) 


(left cogroup right).collect
2017-05-24 21:00:32 echo_ale 阅读数 1074

1.一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。
2.一个计算每个分区的函数。Spark中RDD的计算是以分片为单位的,每个RDD都会实现compute函数以达到这个目的。compute函数会对迭代器进行复合,不需要保存每次计算的结果。
3.RDD之间的依赖关系。RDD的每次转换都会生成一个新的RDD,所以RDD之间就会形成类似于流水线一样的前后依赖关系。在部分分区数据丢失时,Spark可以通过这个依赖关系重新计算丢失的分区数据,而不是对RDD的所有分区进行重新计算。
4.一个Partitioner,即RDD的分片函数。当前Spark中实现了两种类型的分片函数,一个是基于哈希的HashPartitioner,另外一个是基于范围的RangePartitioner。只有对于于key-value的RDD,才会有Partitioner,非key-value的RDD的Parititioner的值是None。Partitioner函数不但决定了RDD本身的分片数量,也决定了parent RDD Shuffle输出时的分片数量。
5.一个列表,存储存取每个Partition的优先位置(preferred location)。对于一个HDFS文件来说,这个列表保存的就是每个Partition所在的块的位置。按照“移动数据不如移动计算”的理念,Spark在进行任务调度的时候,会尽可能地将计算任务分配到其所要处理数据块的存储位置。

2018-11-02 08:01:41 wangpei1949 阅读数 1705

基本需求

将Keyed RDD[(Key,Value)]按Key保存到不同文件。

测试数据

数据格式:id,studentId,language,math,english,classId,departmentId

1,111,68,69,90,Class1,Economy
2,112,73,80,96,Class1,Economy
3,113,90,74,75,Class1,Economy
4,114,89,94,93,Class1,Economy
5,115,99,93,89,Class1,Economy
6,121,96,74,79,Class2,Economy
7,122,89,86,85,Class2,Economy
8,123,70,78,61,Class2,Economy
9,124,76,70,76,Class2,Economy
10,211,89,93,60,Class1,English
11,212,76,83,75,Class1,English
12,213,71,94,90,Class1,English
13,214,94,94,66,Class1,English
14,215,84,82,73,Class1,English
15,216,85,74,93,Class1,English
16,221,77,99,61,Class2,English
17,222,80,78,96,Class2,English
18,223,79,74,96,Class2,English
19,224,75,80,78,Class2,English
20,225,82,85,63,Class2,English

用Spark RDD实现

package com.bigData.spark

import org.apache.hadoop.mapred.lib.MultipleTextOutputFormat
import org.apache.log4j.{Level, Logger}
import org.apache.spark.HashPartitioner
import org.apache.spark.sql.SparkSession

/**
  * Author: Wang Pei
  * License: Copyright(c) Pei.Wang
  * Summary:
  *   RDD 按Key保存到不同文件
  */
object OutputToMultiFile {
  def main(args: Array[String]): Unit = {


    /**设置日志等级*/
    Logger.getLogger("org").setLevel(Level.WARN)

    /**spark环境*/
    val spark = SparkSession.builder().master("local[3]").appName(this.getClass.getSimpleName.replace("$","")).getOrCreate()

    /**Keyed RDD*/
    val data =spark.sparkContext.textFile("data/scores.csv")
        //Keyed RDD
        .map(item=>(item.split(",").takeRight(2).reverse.mkString("_"),item))
        //按Key Hash分区,4个Key分到4个Partition中
        .partitionBy(new HashPartitioner(4))


    /**按Key保存到不同文件*/
    data.saveAsHadoopFile("data/multiKeyedDir",
      classOf[String],
      classOf[String],
      classOf[PairRDDMultipleTextOutputFormat]
    )

    spark.stop()

  }

}

/**继承类重写方法*/
class PairRDDMultipleTextOutputFormat extends MultipleTextOutputFormat[Any, Any] {
  //1)文件名:根据key和value自定义输出文件名。
  override def generateFileNameForKeyValue(key: Any, value: Any, name: String): String ={
    val fileNamePrefix=key.asInstanceOf[String]
    val fileName=fileNamePrefix+"-"+name
    fileName
  }

  //2)文件内容:默认同时输出key和value。这里指定不输出key。
  override def generateActualKey(key: Any, value: Any): String = {
    null
  }
}

outputToMultiFile.png

2019-08-02 20:30:05 h1025372645 阅读数 25

RDD依赖分类

宽依赖

操作产生类似与MapReduce中shuffle的操作
– 子 RDD 的每个分区依赖于所有父 RDD 分区
– 对单个 RDD 基于 key 进行重组和 reduce ,如 groupByKey 、 reduceByKey
– 对两个 RDD 基于 key 进行 join 和重组,如 join、

窄依赖

操作不会产生类似与MapReduce中shuffle的操作
– 子 RDD 的每个分区依赖于常数个父分区(即与数据规模无关)
– 输入输出一对一的算子,且结果 RDD 的分区结构不变,主要是 map 、 flatMap
– 输入输出一对一,但结果 RDD 的分区结构发生了变化,如 union 、 coalesce
– 从输入中选择部分元素的算子,如 filter 、 distinct 、 subtract 、 sample

Spark中的RDD Transformation函数

创建的对象为懒加载
在这里插入图片描述

RDD Action函数

饿汉模式,先创建
在这里插入图片描述

Spark 框架的优势

数据结构RDD,用于存储管理数据

DAG调度

spark中每个job的调度都是DAG调度

DAG:有向无环图

(0)构建DAG图,
倒推法,配合依赖
(1)DAG图划分为多个stage,RDD直接产生了shuffle过程,就会划分stage
(2)按照顺序执行stage中task任务,每个stage中可有多个Task

Spark性能优化:RDD方法优化

对于RDD中某些函数使用注意

(1)能不使用groupByKey函数就不使用,除非不得已

可以使用reduceByKey函数
redcueByKey可以先进行本地聚合操作

(2)尽量使用XXPartition函数代替XX函数

xx函数:map/foreach/zip

def foreach(f: T => Unit): Unit
f:针对RDD中每个元素进行的操作处理的
def foreachPartition(f: Iterator[T] => Unit): Unit
f:针对RDD中每个分区的元素进行操作处理的
比如RDD中2个分区,100条数据,现将数据报道MYSQL表中

foreach
	item ->mysql
	connection ->创建100次
foreachPartition
		对每个分区中数据
		只要获取2个连接即可

(3)适当的降低或者增加RDD分区数目

RDD的分区对应一个Task处理数据
def repartition(numPartitions: Int) -产生shuffle
def coalesce(numPartitions: Int, shuffle: Boolean = false)
一开始的时候,数据量比较多,可以加到RDD分分区数,增加并行度(在集群资源充足的情况下)
当数据预处理之后(尤其过滤清洗之后)。RDD中数据量减少了很多,此时可以考虑减少分区的数目

Spark RDD之Partitioner

阅读数 7607

spark rdd 转换过程

阅读数 55

spark rdd 自动分区

阅读数 541

没有更多推荐了,返回首页