spark RDD
2016-01-20 10:18:06 sprite101 阅读数 40

RDD是Spark中对数据和计算的抽象,是Spark中最核心的概念,它表示已被分片(partition),不可变的并能够被并行操作的数据集合。

对RDD的操作分为两种transformation和action。Transformation操作是通过转换从一个或多个RDD生成新的RDD。Action操作是从RDD生成最后的计算结果。

在Spark中,提供丰富的transformation和action操作,比起MapReduce计算模型中仅有的两种操作,会大大简化程序开发的难度。

RDD的生成方式只有两种,一是从数据源读入,另一种就是从其它RDD通过transformation操作转换。一个典型的Spark程序就是通过Spark上下文环境(SparkContext)生成一个或多个RDD,在这些RDD上通过一系列的transformation操作生成最终的RDD,最后通过调用最终RDD的action方法输出结果。

每个RDD都可以用下面5个特性来表示,其中后两个为可选的:

分片列表(数据块列表)

计算每个分片的函数

对父RDD的依赖列表

对key-value类型的RDD的分片器(Partitioner)(可选)

每个数据分片的预定义地址列表(如HDFS上的数据块的地址)(可选)

虽然Spark是基于内存的计算,但RDD不光可以存储在内存中,根据useDisk、useMemory、useOffHeap, deserialized、replication五个参数的组合Spark提供了12种存储级别,

在后面介绍RDD的容错机制时,我们会进一步理解。值得注意的是当StorageLevel设置成OFF_HEAP时,RDD实际被保存到Tachyon中。Tachyon是一个基于内存的分布式文件系统,目前正在快速发展

DAG、Stage与任务的生成

Spark的计算发生在RDD的action操作,而对action之前的所有transformation,Spark只是记录下RDD生成的轨迹,而不会触发真正的计算。

Spark内核会在需要计算发生的时刻绘制一张关于计算路径的有向无环图,也就是DAG。举个例子,下图中,

 

                                                         A-->B

                                                        /   \

                                           input-->   E--> F -->output

                                                        \    /

                                                         C-->D

从输入中逻辑上生成A和C两个RDD,经过一系列transformation操作,逻辑上生成了F,注意,我们说的是逻辑上,因为这时候计算没有发生, Spark内核做的事情只是记录了RDD的生成和依赖关系。当F要进行输出时,也就是F进行了action操作,Spark会根据RDD的依赖生成DAG,并从起点开始真正的计算。

有了计算的DAG图,Spark内核下一步的任务就是根据DAG图将计算划分成任务集 ,也就是Stage,这样可以将任务提交到计算节点进行真正的计算。Spark计算的中间结果默认是保存在内存中的,Spark在划分Stage(阶段)的时候会充分考虑在分布式计算中可流水线计算(pipeline)的部分来提高计算的效率,而在这个过程中,主要的根据就是RDD的依赖类型。根据不同的transformation操作,RDD的依赖可以分为窄依赖(Narrow Dependency)和宽依赖(Wide Dependency,在代码中为ShuffleDependency)两种类型。窄依赖指的是生成的RDD中每个partition只依赖于父RDD(s) 固定的partition。宽依赖指的是生成的RDD的每一个partition都依赖于父 RDD(s) 所有partition。

窄依赖典型的操作有map, filter, union等,宽依赖典型的操作有groupByKey, sortByKey等。可以看到,宽依赖往往意味着shuffle操作,这也是Spark划分stage的主要边界。

对于窄依赖,Spark会将其尽量划分在同一个stage中,因为它们可以进行流水线计算。

Spark在运行时会把Stage包装成任务提交,有父Stage的Spark会先提交父Stage。

SparkContext拥有DAGScheduler的实例,在runJob方法中会进一步调用DAGScheduler的runJob方法。

在此时,DAGScheduler会生成DAG和Stage,将Stage提交给TaskScheduler。TaskSchduler将Stage包装成TaskSet,发送到Worker节点进行真正的计算,同时还要监测任务状态,重试失败和长时间无返回的任务。

整个过程如下图所示。

                         创建RDD,经过一系列transformations,最后actions

                                                 V

                      action触发SparkContext的runJob方法,交给DAGScheduler

                                                 V

                                  DAGScheduler将DAG生成Stage

                                                 V

                               将Stage提交给TaskScheduler

                                                 V

                 TaskSchduler将Stage包装成TaskSet,发送到Worker节点进行真正的计算,

                       同时还要监测任务状态,重试失败和长时间无返回的任务

 

上文提到,Spark的计算是从action开始触发的,如果在action操作之前逻辑上很多transformation操作,一旦中间发生计算失败,Spark会重新提交任务,这在很多场景中代价过大。

还有一些场景,如有些迭代算法,计算的中间结果会被重复使用,重复计算同样增加计算时间和造成资源浪费。因此,在提高计算效率和更好支持容错,Spark提供了基于RDDcache机制和checkpoint机制。

我们可以通过RDD的toDebugString来查看其递归的依赖信息,也就是它的Lineage.

如果发现Lineage过长或者里面有被多次重复使用的RDD,我们就可以考虑使用cache机制或checkpoint机制了。

我们可以通过在程序中直接调用RDD的cache方法将其保存在内存中,这样这个RDD就可以被多个任务共享,避免重复计算。

另外,RDD还提供了更为灵活的persist方法,可以指定存储级别。从源码中可以看到RDD.cache就是简单的调用了RDD.persist(StorageLevel.MEMORY_ONLY)。

同样,我们可以调用RDD的checkpoint方法将其保存到磁盘。我们需要在SparkContext中设置checkpoint的目录,否则调用会抛出异常。

值得注意的是,在调用checkpoint之前建议先调用cache方法将RDD放入内存,否则将RDD保存到文件的时候需要重新计算。

Cache机制和checkpoint机制的差别在于cache将RDD保存到内存,并保留Lineage,如果缓存失效RDD还可以通过Lineage重建。而checkpoint将RDD落地到磁盘并切断Lineage,由文件系统保证其重建。

Spark的集群部署分为Standalone、Mesos和Yarn三种模式,

我们以Standalone模式为例,简单介绍Spark程序的部署。

集群中的Spark程序运行时分为3种角色,driver, master和worker(slave)。

在集群启动前,首先要配置master和worker节点。启动集群后,worker节点会向master节点注册自己,master节点会维护worker节点的心跳。

Spark程序都需要先创建Spark上下文环境,也就是SparkContext。创建SparkContext的进程就成为了driver角色,上一节提到的DAGScheduler和TaskScheduler都在driver中运行。

Spark程序在提交时要指定master的地址,这样可以在程序启动时向master申请worker的计算资源。

 

2015-04-17 13:37:02 wallacegui 阅读数 1278

RDD无疑是spark框架中的核心概念之一,RDD是什么?概念太抽象,不如看看RDD有什么用。本篇主要介绍rdd的容错机制之一checkpoint,就是将RDD写入disk进行做检查点

大致浏览 论文 ,RDD上的操作分为两种:transformation和 action.

(1)    Transformation =>从一个\多个rdd生成另一个rdd

Filter,map,sample,flatmap,join,reduceBykey等

(2)    Action => Count,collect,reduce,save等

以rdd.count为例,调用栈如下:

def count(): Long= sc.runJob(this, Utils.getIteratorSize _).sum

可知count调用的是sparkContext的runJob方法,如下

def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    allowLocal: Boolean,
    resultHandler: (Int, U) => Unit) {
  if (dagScheduler == null) {
    throw new SparkException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite)
  val start = System.nanoTime
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, allowLocal,resultHandler, localProperties.get)
//dagscheduler 是spark的任务调度,里面包括stage的划分、和taskscheduler的交互等,内容丰富,需至少再开一篇。
  logInfo("Job finished: " + callSite + ", took " + (System.nanoTime - start) / 1e9 + " s")
  rdd.doCheckpoint()//调用了Docheckpoint方法。
}

doCheckpoint()方法如下:

private[spark] def doCheckpoint() {
  if (!doCheckpointCalled) {//确保每个rdd至多只执行一次checkpoint
    doCheckpointCalled = true
    if (checkpointData.isDefined) {  //checkpointdata在checkpoint中初始化的,即若对该rdd调用了checkpoint()方法
      checkpointData.get.doCheckpoint() // RDDCheckpointData.doCheckpoint()
    } else {
      dependencies.foreach(_.rdd.doCheckpoint())//否则就递归的对其dependency进行doCheckpoint()
    }
  }
}
/*This function must be called before any job has been
/* executed on this RDD. 
defcheckpoint() {
  if (context.checkpointDir.isEmpty) {
    throw new Exception("Checkpoint directory has not been set in the SparkContext")
  } else if (checkpointData.isEmpty) {
    checkpointData = Some(new RDDCheckpointData(this)) //初始化了checkpointData
    checkpointData.get.markForCheckpoint() //标记一下,
  }
}

可见docheckpoint的实现得继续往RDDCheckpointData.doCheckpoint()里面追

// Do the checkpointing of the RDD. Called after the first job using that RDD is over.
def doCheckpoint() {
  // If it is marked for checkpointing AND checkpointing is not already in progress,
  // then set it to be in progress, else return
  RDDCheckpointData.synchronized {
    if (cpState == MarkedForCheckpoint) {//在rdd.checkpoint()标记的
      cpState = CheckpointingInProgress
    } else {
      return
    }
  }

  // Create the output path for the checkpoint
  val path = new Path(rdd.context.checkpointDir.get, "rdd-" + rdd.id) //存盘的路径,rdd-rdd.id
  val fs = path.getFileSystem(rdd.context.hadoopConfiguration)
  if (!fs.mkdirs(path)) {
    throw new SparkException("Failed to create checkpoint path " + path)
  }

  // Save to file, and reload it as an RDD
  val broadcastedConf = rdd.context.broadcast(
    new SerializableWritable(rdd.context.hadoopConfiguration))
  rdd.context.runJob(rdd, CheckpointRDD.writeToFile[T](path.toString, broadcastedConf) _) //rdd写到存盘path
  val newRDD = new CheckpointRDD[T](rdd.context, path.toString) //如果要用到这个rdd,会从路径读取到rdd
  if (newRDD.partitions.size != rdd.partitions.size) {
    throw new SparkException(
      "Checkpoint RDD " + newRDD + "(" + newRDD.partitions.size + ") has different " +
        "number of partitions than original RDD " + rdd + "(" + rdd.partitions.size + ")")
  }

  // Change the dependencies and partitions of the RDD
  RDDCheckpointData.synchronized {
    cpFile = Some(path.toString)
    cpRDD = Some(newRDD)
    rdd.markCheckpointed(newRDD)   // Update the RDD's dependencies and partitions
    cpState = Checkpointed
    RDDCheckpointData.clearTaskCaches()
  }
  logInfo("Done checkpointing RDD " + rdd.id + " to " + path + ", new parent is RDD " + newRDD.id)
}

总结一下:rdd.checkpoint()->rdd.count->sparkcontext.runjob->rdd.docheckpoint->rddcheckpointdata.docheckpoint

2018-03-22 13:32:22 leedsjung 阅读数 193


RDD


弹性分布式数据集(Resilient Distributed Dataset)

每个 RDD 都被分为多个分区,这些分区运行在集群中的不同节点上。

RDD 支 持 两 种 类 型 的 操 作: 转 化 操 作(transformation) 和 行 动 操 作(action)
  • 转化操作会由一个 RDD 生成一个新的 RDD
  • 行动操作会对 RDD 计算出一个结果,并把结果返回到驱动器程序中,或把结果存储到外部存储系统(如 HDFS)中

Spark 只会惰性计算这些 RDD。它们只有第一次在一个行动操作中用到时,才会真正计算。


默认情况下,Spark 的 RDD 会在你每次对它们进行行动操作时重新计算。
如果想在多个行动操作中重用同一个 RDD,可以使用 RDD.persist() 让 Spark 把这个 RDD 缓存下来
默认,RDD 缓存到内存而不是磁盘上


在任何时候都能进行重算是我们为什么把 RDD 描述为“弹性”的原因。
当保存 RDD 数据的一台机器失败时,Spark 还可以使用这种特性来重算出丢掉的分区,这一过程对用户是完全透明的。



总的来说,每个 Spark 程序或 shell 会话都按如下方式工作。
(1) 从外部数据创建出输入 RDD。
  • 创建 RDD 最简单的方式就是把程序中一个已有的集合传给 SparkContext 的 parallelize()方法
  • 从外部存储中读取数据来创建 RDD
    • 将文本文件读入为一个存储字符串的 RDD 的方法SparkContext.textFile()
(2) 使用诸如 filter() 这样的转化操作对 RDD 进行转化,以定义新的 RDD。
(3) 告诉 Spark 对需要被重用的中间结果 RDD 执行 persist() 操作。
(4) 使用行动操作(例如 count() 和 first() 等)来触发一次并行计算,Spark 会对计算进行优化后再执行。



通过转化操作,你从已有的 RDD 中派生出新的 RDD,Spark 会使用谱系图(lineage graph)来记录这些不同 RDD 之间的依赖关系
  • Spark 需要用这些信息来按需计算每个 RDD,
  • 也可以依靠谱系图在持久化的 RDD 丢失部分数据时恢复所丢失的数据



惰性求值
  • 我们不应该把 RDD 看作存放着特定数据的数据集,而最好把每个 RDD 当作我们通过转化操作构建出来的、记录如何计算数据的指令列表
  • 把数据读取到 RDD 的操作也同样是惰性的


传递函数时需要小心的一点是,当传递的函数中包含字段引用时(例如 self.field),会把整个对象发到工作节点上,
  • 这可能比你想传递的东西大得多。
  • 有时,如果传递的对象不可序列化,也会导致你的程序失败
  • 替代的方案是,只把你所需要的字段从对象中拿出来放到一个局部变量中,然后传递这个局部变量

传递函数,涉及到的变量,必须可序列化


Spark 中大部分常见的转化操作和行动操作

特定数据类型的 RDD还支持一些附加操作,
  • 例如,数字类型的 RDD 支持统计型函数操作,
  • 而键值对形式的RDD 则支持诸如根据键聚合数据的键值对操作

任意数据类型的 RDD都支持的通用转化操作:
  • filter、map
  • flatMap
    • 有时候,我们希望对每个输入元素生成多个输出元素。实现该功能的操作叫作 flatMap()
    • 和 map() 类似,我们提供给 flatMap() 的函数被分别应用到了输入 RDD 的每个元素上。不过返回的不是一个元素,而是一个返回值序列的迭代器。
    • 输出的 RDD 倒不是由迭代器组成的。而是一个包含所有元素的 RDD(所谓flat)
  • 伪集合运算:
    • distinct、union、intersection、subtract(差集)、cartesian(笛卡尔积)
    • 尽管 RDD 本身不是严格意义上的集合,但它也支持许多数学上的集合操作,比如合并和相交操作
    • RDD 中最常缺失的集合属性是元素的唯一性,因为常常有重复的元素
    • distinct,开销较大

任意数据类型的 RDD都支持的通用行动操作:
  • 最常见的行动操作 reduce()。它接收一个函数作为参数,这个函数要操作两个 RDD 的元素类型的数据并返回一个同样类型的新元素
  • fold
    • 与reduce类似。。区别,是提供一个“初始值”来作为每个分区第一次调用时的结果。
  • aggregate操作就是将map、reduce合并提供结果
把数据返回驱动器程序中
  • 最简单、最常见的操作是 collect(),它会将整个 RDD 的内容返回
  • take(n) 返回 RDD 中的 n 个元素,
  • 如果为数据定义了顺序,就可以使用 top() 从 RDD 中获取前几个元素
  • takeSample
  • count()、countByValue()



在 Scala 中,将 RDD 转为有特定函数的 RDD(比如在 RDD[Double] 上进行数值操作)是由隐式转换来自动处理的
  • 这些隐式转换可以隐式地将一个 RDD 转为各种封装类,比如 DoubleRDDFunctions(数值数据的 RDD)和 PairRDDFunctions(键值对 RDD),这样我们就有了诸如 mean() 和variance() 之类的额外的函数


我们可以为 RDD 选择不同的持久化级别
  • 在 Scala和 Java 中,默认情况下 persist() 会把数据以序列化的形式缓存在 JVM 的堆空间中。
    • 堆外缓存是试验性功能,
  • org.apache.spark.storage.StorageLevel
  • 如有必要,可以通过在存储级别的末尾加上“_2”来把持久化数据存为两份
  • persist() 调用本身不会触发强制求值
  • 如果要缓存的数据太多,内存中放不下,Spark 会自动利用最近最少使用(LRU)的缓存策略把最老的分区从内存中移除
    • 但是对于使用内存与磁盘的缓存级别的分区来说,被移除的分区都会写入磁盘
  • 最后,RDD 还有一个方法叫作 unpersist(),调用该方法可以手动把持久化的 RDD 从缓存中移除







pair RDD 


键值对 RDD 通常用来进行聚合计算。
我们一般要先通过一些初始 ETL(抽取Extract、转化Transform、装载Load)操作来将数据转化为键值对形式。

让用户控制键值对 RDD 在各节点上分布情况的高级特性:分区
  • 使用 PageRank 算法来演示分区的作用
  • 为分布式数据集(RDD)选择正确的分区方式和为本地数据集选择合适的数据结构很相似
  • 数据的分布都会极其明显地影响程序的性能表现


从一个 RDD 中提取某些字段(例如代表事件时间、用户 ID 或者其他标识符的字段),并使用这些字段作为 pair RDD 操作中的键(类似Lists.uniqueMap)


创建pair RDD

当需要把一个普通的 RDD 转为 pair RDD 时,可以调用 map() 函数来实现,传递的函数需要返回键值对(二元组)


Pair RDD的转化操作
  • 单个集合
    • 提供 reduceByKey() 方法,可以分别归约每个键对应的数据,
    • groupByKey() 对具有相同键的值进行分组
    • mapValues(func) 对 pair RDD 中的每个值应用一个函数而不改变键
      • flatMapValues
  • 两个集合
    • subtractByKey差集

Pair RDD的转化操作
  • 聚合
    • reduceByKey、 foldByKey(带初始值的reduce)
      • reduceByKey() 与 reduce() 相当类似
      • foldByKey() 则与 fold() 相当类似
    • combineByKey
      • 大多数基于键聚合的函数都是用它实现的,为用户提供了更简单的接口
      • combineByKey() 会遍历分区中的所有元素,因此每个元素的键要么还没有遇到过,要么就和之前的某个元素的键相同
      • 如果这是一个新的元素,combineByKey() 会使用一个叫作 createCombiner() 的函数来创建那个键对应的累加器的初始值
        • 这一过程会在每个分区中第一次出现各个键时发生,而不是在整个 RDD 中第一次出现一个键时发生
      • 如果这是一个在处理当前分区之前已经遇到的键,它会使用 mergeValue() 函数将该键的累加器对应的当前值与这个新的值进行合并
      • 如果有两个或者更多的分区,就需要使用用户提供的 mergeCombiners() 函数将各个分区的结果进行合并
      • combineByKey的三个参数,分别是上述三个函数
    • 并行度调优
      • 每个 RDD 都有固定数目的分区,分区数决定了在 RDD 上执行操作时的并行度
      • 在执行聚合或分组操作时,可以要求 Spark 使用给定的分区数
      • Spark 始终尝试根据集群的大小推断出一个有意义的默认值
      • 本章讨论的大多数操作符都能接收第二个参数,这个参数用来指定分组结果或聚合结果的RDD 的分区数,
      • 有时,我们希望在除分组操作和聚合操作之外的操作中也能改变 RDD 的分区
        • Spark 提供了 repartition() 函数
        • 对数据进行重新分区是代价相对比较大的操作
        • Spark 中也有一个优化版的 repartition(),叫作 coalesce()。
        • rdd.partitions.size()查看 RDD 的分区数,确保调用 coalesce() 时将 RDD 合并到比现在的分区数更少的分区中
  • 数据分组
    • groupByKey  所得到的结果 RDD 类型会是[K, Iterable[V]]
    • groupBy() 可以用于未成对的数据上,也可以根据除键相同以外的条件进行分组
      • 可以接收一个函数,对源 RDD 中的每个元素使用该函数,将返回结果作为键再进行分组
    •  如果你发现自己写出了先使用 groupByKey() 然后再对值使用 reduce() 或者fold() 的代码,你很有可能可以通过使用一种根据键进行聚合的函数来更高效地实现同样的效果。。。后者更高效
    • 使用一个叫作 cogroup() 的函数对多个共享同一个键的 RDD 进行分组。
      • 对两个键的类型均为 K 而值的类型分别为 V 和 W 的 RDD 进行cogroup() 时,得到的结果 RDD 类型为 [(K, (Iterable[V], Iterable[W]))]
      • 如果其中的一个 RDD 对于另一个 RDD 中存在的某个键没有对应的记录,那么对应的迭代器则为空。
  • 连接
    • 连接方式多种多样:右外连接、左外连接、交叉连接以及内连接。
    • join() 方法,对两个 RDD 进行内连接,可以把两个 RDD 中键相同的元素组合到一起,合并为一个 RDD
      • cogroup将两个 RDD 中拥有相同键的数据分组到一起
      • 注意区别
    • rightOuterJoin、leftOuterJoin
      • 对两个 RDD 进行连接操作,类似sql 左右外连接(rightOuterJoin确保第一个RDD的所有元素存在,右侧没的补空)
  • 排序
    • sortByKey

Pair RDD的行动操作
  • countByKey
  • collectAsMap
  • lookup(key)


Optional 是 Google 的 Guava 库(https://github.com/google/guava)中的一部分,
  • 表示有可能缺失的值。可以调用 isPresent() 来看值是否存在,
  • 果数据存在,则可以调用 get() 来获得其中包含的对象实例

scala.Option 对象,这是 Scala 中用来存放可能存在的对象的容器类。
你可以对这个 Option 对象调用 isDefined() 来检查其中是否有值,调用 get() 来获取其中的值。


对于每种操作(转化、行动),需要传入函数,此时,注意函数的输入、输出以及注意RDD的单个元素形式。






RDD分区


控制数据分布以获得最少的网络传输可以极大地提升整体性能

只有当RDD多次在诸如连接这种基于键的操作中使用时,分区才会有帮助

系统会根据一个针对键的函数(分区函数)对元素进行分组
Spark 可以确保同一组的键出现在同一个节点上

哈希分区
  • 例如,将一个 RDD 分成了 100 个分区,
  • 此时键的哈希值对100 取模的结果相同的记录会被放在一个节点上
范围分区法


在join操作时,对大型RDD进行分区,产生的效果
  • 默认情况下,连接操作会将两个数据集中的所有键的哈希值都求出来,
  • 将该哈希值相同的记录通过网络传到同一台机器上,然后在那台机器上对所有键相同的记录进行连接操作


  • 在程序开始时,对 userData 表使用 partitionBy() 转化操作,将这张表转为哈希分区
  • 当调用 userData.join(events) 时,Spark 只会对 events 进行数据混洗操作,
  • 将 events 中特定 UserID 的记录发送到 userData 的对应分区所在的那台机器上


转化操作,返回值总是一个新的 RDD
RDD 一旦创建就无法修改。


分区数目,它会控制之后对这个 RDD 进行进一步操作(比如连接操作)时有多少任务会并行执行。


许多其他 Spark 操作会自动为结果 RDD 设定已知的分区方式信息
  • sortByKey() 和 groupByKey()会分别生成范围分区的 RDD 和哈希分区的 RDD
  • 诸如 map() 这样的操作会导致新的 RDD 失去父 RDD 的分区信息,因为这样的操作理论上可能会修改每条记录的键


RDD 的 partitioner 属性获得RDD的分区方式


分区操作之后,务必进行persist持久化。


Spark 的许多操作都引入了将数据根据键跨节点进行混洗的过程。所有这些操作都会从数据分区中获益。
  • 连接:
    • join()、leftOuterJoin()、rightOuterJoin()
  • 分组
    • cogroup()、groupWith()、groupByKey()、
  • 聚合
    • reduceByKey()、combineByKey() 
  •  lookup()


对于像 reduceByKey() 这样只作用于单个 RDD 的操作
  • 运行在未分区的 RDD 上的时候会导致每个键的所有对应值都在每台机器上进行本地计算,
  • 只需要把本地最终归约出的结果值从各工作节点传回主节点,所以原本的网络开销就不算大
对于诸如 cogroup() 和join() 这样的二元操作,
  • 预先进行数据分区会导致其中至少一个 RDD不发生数据混洗
一个 RDD 是通过 mapValues() 从另一个 RDD 中创建出来的,这两个RDD 就会拥有相同的键和分区方式
  • 那么跨节点的数据混洗就不会发生了。


Spark 内部知道各操作会如何影响分区方式,并将会对数据进行分区的操作的结果 RDD 自动设置为对应的分区器
  • 如果你调用 join() 来连接两个 RDD;由于键相同的元素会被哈希到同一台机器上,Spark 知道输出结果也是哈希分区的

转化操作的结果并不一定会按已知的分区方式分区,这时输出的 RDD 可能就会没有设置分区器。
  • 当你对一个哈希分区的键值对 RDD 调用 map() 时,由于传给 map()的函数理论上可以改变元素的键,因此结果就不会有固定的分区方式
  • mapValues() 和flatMapValues() 可以保证每个二元组的键保持不变


这里列出了所有会为生成的结果 RDD 设好分区方式的操作
  • 分组:
    • cogroup()、groupWith()、groupByKey()
  • 连接
    • join()、leftOuterJoin()、rightOuterJoin()
  • 聚合
    • reduceByKey()、combineByKey()
  • partitionBy()、sort()、
  • mapValues()(如果父 RDD 有分区方式的话)、flatMapValues()(如果父 RDD 有分区方式的话),以及 filter()(如果父 RDD 有分区方式的话)。
  • 其他所有的操作生成的结果都不会存在特定的分区方式。



对于二元操作,输出数据的分区方式取决于父 RDD 的分区方式
  • 默认情况下,结果会采用哈希分区,分区的数量和操作的并行度一样。
  • 不过,如果其中的一个父 RDD 已经设置过分区方式,那么结果就会采用那种分区方式;
  • 如果两个父 RDD 都设置过分区方式,结果 RDD 会采用第一个父 RDD 的分区方式




PageRank 是执行多次连接的一个迭代算法,因此它是 RDD 分区操作的一个很好的用例

算法会维护两个数据集
  • 一个由 (pageID, linkList) 的元素组成,包含每个页面的相邻页面的列表
  • 另一个由 (pageID, rank) 元素组成,包含每个页面的当前排序值


自定义分区方式

要实现自定义的分区器,你需要继承 org.apache.spark.Partitioner 类并实现下面三个方法
  • numPartitions: Int:返回创建出来的分区数。
  • getPartition(key: Any): Int:返回给定键的分区编号(0 到 numPartitions-1)。
  • equals():Java 判断相等性的标准方法。。Spark 需要用这个方法来检查你的分区器对象是否和其他分区器实例相同,这样 Spark 才可以判断两个RDD 的分区方式是否相同

有一个问题需要注意,当你的算法依赖于 Java 的 hashCode() 方法时,这个方法有可能会返回负数。
  • 你需要十分谨慎,确保 getPartition() 永远返回一个非负数



2018-09-13 13:18:16 u010530712 阅读数 236

RDD是弹性分布式数据集,通常RDD很大,会被分成很多分区,分别保存在不同节点上

分区原则

RDD分区是使分区的个数尽量等于集群中的CPU核心数目

默认分区数

对于不同的Spark部署模式而言(本地模式、Standalone模式、YARN模式、Mesos模式),都可以通过设置spark.default.parallelism这个参数的值来配置默认分区数。

本地模式:默认为本地机器的CPU数目,若设置了local[N],则默认为N

Mesos:默认分区数为8

Standalone/YARN:在“集群中所有CPU核心数目总和”和“2”二者中较大值作为默认值

如何手动设置分区

(1)创建RDD时:在调用textFile和parallelize方法时候手动指定分区个数,sc.textFile(path,partionNum)

对于parallelize而言,如果没有在方法中指定分区数,则默认为spark.default.parallelism

对于textFile而言,如果没有在方法中指定分区,则默认为min(defaultParallelism,2),其中defaultParallelism对应的就是spark.default.parallelism

(2)通过转换操作得到新RDD时:直接调用reparation方法

如果是从HDFS中读取文件,则分区数为文件分片数(比如,128MB/片)

 

 

 

 

 

2019-04-25 22:19:20 weixin_42490528 阅读数 1268

RDD依赖:

窄依赖:每一个父RDD的分区最多被子RDD的一个分区使用

宽依赖:每一个父RDD的分区被多个子RDD分区依赖

 

RDD依赖的作用:

血统(Lineage)会记录RDD的元数据和转换行为,

以便恢复丢失的分区。

 

stage划分:

DAG(有向无环图):RDD的一系列转换组成了DAG

划分stage的目的是生成任务,每个stage对应一种任务,

划分stage的关键因素为是否发生了宽依赖(是否shuffle(洗牌))

如下图,groupBy和join是宽依赖,分别是stage1、stage2,

再加上总的算一个stage,划分了3个stage

 

 

web界面观察stage:

启动spark集群:

启动hadoop:start-all.sh

启动spark:./start-all.sh

使用spark-shell:

spark-shell --master yarn-client

scala> sc.textFile("hdfs://master:9000/test/wordcount.txt").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).collect

res1: Array[(String, Int)] = Array((scala,2), (python,2), (hello,1), (apple,2), (java,1))

在spark页面观察进程:http://192.168.32.128:8080/

点进去有job、stage可以查看

 

 

 

checkpoint:

检查点checkpoint的作用:

对RDD做checkpoint,可以切断做checkpoint的RDD的依赖关系,

将RDD数据保存到可靠存储(如HDFS)以便数据恢复,保证数据的安全性(HDFS的备份规则)。

 

cache或者persist放到内存或者磁盘中,但是这样也不能保证数据完全不会丢失,存储的这个内存出问题了或者磁盘坏了,也会导致spark从头再根据RDD计算一遍,所以就有了checkpoint,其中checkpoint的作用就是将DAG中比较重要的中间数据做一个检查点将结果存储到一个高可用的地方(通常这个地方就是HDFS里面)

 

checkpoint步骤:

1. 建立checkpoint存储路径

scala> sc.setCheckpointDir("hdfs://master:9000/checkpoint0727")

2. rdd1.cache()

3. rdd1.checkpoint()

 

在checkpoint的时候强烈建议先进行cache,并且当你checkpoint执行成功了,

那么前面所有的RDD依赖都会被销毁

spark之RDD

阅读数 3373

Spark--》RDD

阅读数 6

Spark RDD和DAG生成

阅读数 7

没有更多推荐了,返回首页