-
2018-03-14 19:55:32
sparkDag源码分析–生成
Dag生成
dag主要是通过rdd的各种转换生成,如下面rdd的map方法,会生成一个新的Rdd
//返回一个新的rdd,通过应用一个函数到Rdd的所有元素 def map[U: ClassTag](f: T => U): RDD[U] = { val cleanF = sc.clean(f) new MapPartitionsRDD[U, T](this, (context, pid, iter) => iter.map(cleanF)) }
多个这样的方法(算子)就生成了一个有向无环图Dag(Directed acyclic graph),如下图最左边的部分
下面来看一下几个相关的类RDD
abstract class RDD[T: ClassTag]( @transient private var _sc: SparkContext, @transient private var deps: Seq[Dependency[_]] //依赖 ) extends Serializable with Logging { //这个构造方法被下面的MapPartitionsRDD使用,map算子就只依赖前面一个RDD def this(@transient oneParent: RDD[_]) = this(oneParent.context , List(new OneToOneDependency(oneParent))) //这个方法默认为deps,但是不同的子类实现是不同的,注意,这里 //返回的是一个Seq,表示一个rdd的依赖可能有多个,比如 c = a join b //那么 c的依赖就有 a 和 b 了,这种rdd下面会讲到的 protected def getDependencies: Seq[Dependency[_]] = deps }
MapPartitionsRDD OneToOneDependency
// map算子会生成MapPartitionsRDD 是Rdd的一个子类,它实现的就是默认的getDependencies //方法 private[spark] class MapPartitionsRDD[U: ClassTag, T: ClassTag]( prev: RDD[T], f: (TaskContext, Int, Iterator[T]) => Iterator[U], // (TaskContext, partition index, iterator) preservesPartitioning: Boolean = false) extends RDD[U](prev) {}
ShuffledRDD ShuffleDependency
// groupByKey算子会生成ShuffledRDD //构造方法传了一个Nil,进去了,难道shuffleDependency没有依赖? //它重写了getDependencies,里面new ShuffleDependency,使用的是prev变量, class ShuffledRDD[K, V, C]( @transient var prev: RDD[_ <: Product2[K, V]], part: Partitioner) extends RDD[(K, C)](prev.context, Nil) { override def getDependencies: Seq[Dependency[_]] = { List(new ShuffleDependency(prev, part, serializer, keyOrdering, aggregator, mapSideCombine)) } }
CoGroupedRDD OneToOneDependency/ShuffleDependency
//join算子会生成CoGroupedRDD //CoGroupedRDD,它的getDependencies,会生成多个Dependency,join里面 //可能有OneToOneDependency/ShuffleDependency class CoGroupedRDD[K](@transient var rdds: Seq[RDD[_ <: Product2[K, _]]], part: Partitioner) extends RDD[(K, Array[Iterable[_]])](rdds.head.context, Nil) { override def getDependencies: Seq[Dependency[_]] = { rdds.map { rdd: RDD[_ <: Product2[K, _]] => if (rdd.partitioner == Some(part)) { logDebug("Adding one-to-one dependency with " + rdd) new OneToOneDependency(rdd) } else { logDebug("Adding shuffle dependency with " + rdd) new ShuffleDependency[K, Any, CoGroupCombiner](rdd, part, serializer) } } } }
通过上面的代码,我们知道了再Rdd之间的转换,是通过不断生成新的Rdd,来构成一幅Dag
图的,Rdd与Rdd之间有OneToOneDependency/ShuffleDependency(切分的依据)更多相关内容 -
Spark的DAG图
2019-05-29 21:21:30DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于...DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就就形成了DAG,根据RDD之间的依赖关系的不同将DAG划分成不同的Stage,对于窄依赖,partition的转换处理在Stage中完成计算。对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。
从WordCount角度解释DAG
sc.textFile(“xx").flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_).saveAsTextFile(“xx")
RDD任务切分
RDD任务运行规划图
WordCount的DAG图
推荐:
-
Spark中的DAG介绍
2022-04-25 18:50:45DAG:全称为 Directed Acyclic Graph 中文为:有向无环图 在spark中,使用DAG来描述我们的计算逻辑。 2 Spark中的DAG DAG是一组顶点与边的组合,顶点代表RDD,边代表对RDD的一系列操作。 DAG Sheduler根据RDD的...目录
1 DAG的介绍
DAG:全称为 Directed Acyclic Graph 中文为:有向无环图
在spark中,使用DAG来描述我们的计算逻辑。
2 Spark中的DAG
DAG是一组顶点与边的组合,顶点代表RDD,边代表对RDD的一系列操作。
DAG Sheduler根据RDD的不同transformation操作,讲DAG分为不同的stage,每个stage中又分为多个task。
3 DAG解决的问题
DAG的出现主要是为了解决MapReduce框架的局限性。MapReduce框架的局限性主要有以下两点:
(1)每个MapReduce操作都是独立的,hadoop框架也不知道接下来会有哪些MapReduce。
(2)每一步的输出结果,都会持久化到硬盘或者HDFS上。
当以上两个特点结合之后,我们就可以想象,如果在某些迭代的场景下,MapReduce 框架会对硬盘和 HDFS 的读写造成大量浪费。
而且每一步都是都是堵塞在上一步中,所以当我们处理复杂计算时,会需要很长的时间,但是数据量却不大。
所以Spark中引入了DAG,它可以优化计算计划,比如减少shuffle的计算。
4 DAG是怎么工作
4.1 工作流程
- 解释器是第一层。Spark 通过使用Scala解释器,来解释代码,并会对代码做一些修改。
- 在Spark控制台中输入代码时,Spark会创建一个 operator graph, 来记录各个操作。
- 当一个 RDD 的 Action 动作被调用时, Spark 就会把这个 operator graph 提交到 DAG scheduler 上。
- DAG Scheduler 会把 operator graph 分为各个 stage。 一个 stage 包含基于输入数据分区的task。DAG scheduler 会把各个操作连接在一起。
- 这些 Stage 将传递给 Task Scheduler。Task Scheduler 通过 cluster manager 启动任务。Stage 任务的依赖关系, task scheduler 是不知道的。
- 在 slave 机器上的 Worker 们执行 task。
4.2 注意点
RDD 的 transformation 分为两种:窄依赖(如map、filter),宽依赖(如reduceByKey)。 窄依赖不需要对分区数据进行 shuffle ,而宽依赖需要。所以窄依赖都会在一个 stage 中, 而宽依赖会作为 stage 的交界处。
每个 RDD 都维护一个 metadata 来指向一个或多个父节点的指针以及记录有关它与父节点的关系类型。
-
Spark的DAG的生成过程详解
2021-01-25 15:27:14DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就形成了DAG,根据RDD之间依赖关系的不同将DAG划分成不同的Stage(调度阶段)。 对于窄依赖,partition的转换处理在一个Stage中完成计算。 对于...1. DAG详解
- DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就形成了DAG,根据RDD之间依赖关系的不同将DAG划分成不同的Stage(调度阶段)。
- 对于窄依赖,partition的转换处理在一个Stage中完成计算。
- 对于宽依赖,由于有Shuffle的存在,只能在parent RDD处理完成后,才能开始接下来的计算,因此宽依赖是划分Stage的依据。
DAG的边界:- 开始:通过SparkContext创建的RDD
- 触发Action,一旦触发Action就形成了一个完整的DAG
小结:
- 一个Spark的Application应用中一个或者多个DAG(也就是一个Job),取决于触发了多少次Action
- 一个DAG中会有不同的阶段/stage,划分阶段/stage的依据就是宽依赖
- 一个阶段/stage中可以有多个Task,一个分区对应一个Task
2.DAG划分Stage
- Spark的计算逻辑关系
- 一个Application有一个或者多个job,一个job对应一个DAG
- 一个job分为不同的stage
- 一个stage下面有一个或者多个TaskSet
- 一个TaskSet有很多Task(一个Task就是所需的cpucores)
- 一个TaskSet就对应一个RDD,很多RDD称为TaskSets
-
为什么要划分Stage? –并行计算
-
如何划分DAG的stage
-
总结:
Spark会根据shuffle/宽依赖使用回溯算法来对DAG进行Stage划分,从后往前,遇到宽依赖就断开,遇到窄依赖就把当前的RDD加入到当前的stage/阶段中
-
Spark DAG概述
2019-09-16 12:52:24DAG,有向无环图,Directed Acyclic Graph的缩写,常用于建模。Spark中使用DAG对RDD的关系进行建模,描述了RDD的依赖关系,这种关系也被称之为lineage,RDD的依赖关系使用Dependency维护。参考Spark RDD之Dependency... -
Spark DAG有向无环图任务划分
2021-01-21 17:38:181.DAG有向无环图 DAG(Directed Acyclic Graph)有向无环图是由点和线组成的拓扑图形,该图形具有方向,不会闭环。例如,DAG记录了RDD的转换过程和任务的阶段。 2)RDD任务切分中间分为:Application、Job、Stage和... -
Spark DAG优化的解读
2018-07-15 16:15:08一,Spark专业术语的解析 1,Application基于Spark的用户程序,包含了driver程序和集群上的executor 2,Driver Program运行main函数并且新建SparkContext的程序 3,Cluster Manager在集群上获取资源的外部服务... -
spark-5 DAG图和stage
2020-08-05 23:26:38DAG图和stage的划分DAGstage为什么要划分stage如何划分stagetaskSetstage和stage之间的联系 DAG DAG(Directed Acyclic Graph)叫做有向无环图(有方向,无闭环),在spark中用不表示数据的流向。原始的RDD经过一系列... -
spark中dag的介绍
2019-06-18 17:06:24目录 什么是DAG ...在 Spark 中, 使用 DAG 来描述我们的计算逻辑。 什么是DAG DAG 是一组顶点和边的组合。顶点代表了 RDD, 边代表了对 RDD 的一系列操作。 DAG Scheduler 会根据 RDD 的 t... -
Spark DAG和Stage
2021-11-22 23:58:51受制于某些任务必须比另一些任务较早执行的限制,必须对任务进行排队,形成一个队列的任务集合,这个队列的任务集合就是DAG图,每一个定点就是一个任务,每一条边代表一种限制约束(Spark中的依赖关系) 二、DAG如何... -
spark的Web UI查看DAG的两种方式
2020-05-05 13:55:06提交spark任务后,master:8088->ApplicationMaster 然后会跳转到spark的WEB UI界面. 第一种查看DAG的方式是: Jobs->Description中任选一个 第二种查看DAG的方式是: Stages->Description中任选... -
Spark基础 DAG
2018-12-03 22:19:45为什么使用spark的原因是早期的编程模式MapReduce缺乏对数据共享...spark处理数据时,会将计算转化为一个有向无环图(DAG)的任务集,RDD能够有效的恢复DAG中故障和慢节点执行的任务,并且RDD提供一种基于粗粒度变换的... -
spark六 DAG和task
2019-04-02 23:49:43目录 任务划分的几个概念 DAG图解 task的划分 任务划分的几个概念 RDD任务切分中间分为:Application、Job、Stage和Task 1)Application:初始化一个SparkContext即生成一个Application 2)Job:一个Action算子就会... -
Spark有向无环图DAG图解与演示
2018-11-12 19:21:44在图论中,如果一个有向图无法从某个顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG图)。 因为有向图中一个点经过两种路线到达另一个点未必形成环,因此有向无环图未必能转化成树,但任何有向树..... -
SparkCore中DAG介绍
2020-12-12 16:57:04DAG,全称 Directed Acyclic Graph, 中文为:有向无环图。 在 Spark 中, 使用 DAG 来描述我们的计算逻辑。 什么是DAG? DAG 是一组顶点和边的组合。顶点代表了 RDD, 边代表了对 RDD 的一系列操作。 DAG ... -
什么是SparkDAG
2018-02-15 18:16:36什么是SparkDAG原创 2016年06月23日 00:39:124049对于DAG这个概念现在很多上面都有所应用概念在spark里每一个操作生成一个RDD,RDD之间连一条边,最后这些RDD和他们之间的边组成一个有向无环图,这个就是DAG。... -
Apache Spark DAG
2020-04-07 15:31:27DAG(Directed Acyclic Graph有向无环图)指的是数据转换执行的过程,有方向,无闭环(其实就是RDD执行的流程) 原始的RDD通过一系列的转换操作就形成了DAG有向无环图,任务执行时,可以按照DAG的描述,执行真正的计算... -
spark DAG 笔记
2019-11-02 08:48:54DAG,有向无环图,Directed Acyclic Graph的缩写,常用于建模。 Spark中使用DAG对RDD的关系进行建模,描述了RDD的依赖关系,这种关系也被称之为lineage,RDD的依赖关系使用Dependency维护,参考Spark RDD之... -
Spark—— DAG 如何划分stage?
2020-07-14 11:37:36DAG介绍 DAG是什么 DAG(Directed Acyclic Graph有向无环图) 指的是数据转换执行的过程,有方向,无闭环(其实就是RDD执行的流程) 原始的RDD通过一系列的...一个Spark应用中可以有一到多个DAG,取决于触发了多少次A -
spark--DAG的生成和Stage的划分-★★★
2021-02-21 20:54:07DAG的生成和Stage的划分DAG生成什么是DAGDAG图的开始和结束代码中有多少个DAG有向无环图呢?Stage划分Stage划分划分依据Stage划分效果 DAG生成 什么是DAG DAG(Directed Acyclic Graph)有向无环图, 指的是RDD直接的... -
Spark内核原理之DAG和Stage
2021-04-21 08:23:19有一个有向图无法从任意顶点出发经过若干条边回到该点,则这个图是一个有向无环图(DAG图)。 Spark的DAG:就是spark任务/程序执行的流程图 DAG的开始:从创建RDD开始 DAG的结束:到Action结束 注:一个Spark程序中有... -
简单了解spark、flink执行任务过程中的DAG有向无环图
2021-05-09 20:11:48DAG:Directed Acyclic Graph,中文意为「有向无环图」。 DAG原本是计算机领域一种常用数据结构,因为独特的拓扑结构所带来的优异特性,经常被用于处理动态规划、导航中寻求最短路径、数据压缩等多种算法场景。我们... -
Spark DAG之划分Stage
2017-04-22 13:55:56到这里,Stage就划分完成了,最后贴张spark webUI的图片 总结 介绍了Stage的结构和实现类,举了一个例子,从物理结构上介绍了Stage的划分,以及划分后的Stage保存了哪些重要信息,了解这些是后续分析根据... -
spark ui job和stage的dag图查看过去运行的任务,查不到,分析源码解决问题
2021-04-05 13:48:47sparkUI 显示dag信息缺失问题: 使用用2.x跑任务,查看spark 性能问题,从spark ui选择出最耗时的任务,进去查看,有一个任务有100多个job, 有几百个stage,程序跑完看前面 job和stage的dag图,提示没有了 ... -
Spark DAG的生成
2018-03-06 17:25:09Spark DAG的生成DAG概念 DAG(Directed Acyclic Graph)叫做有向无环图,原始的RDD通过一系列的转换就形成了DAG,根据RDD之间依赖关系的不同将DAG划分成不同的Stage(调度阶段)。对于窄依赖,partition的转换处理在一... -
spark --DAG的生成和划分Stage
2020-04-08 14:46:33原始的RDD通过一系列的转换操作就形成了DAG有向无环图,任务执行时,可以按照DAG的描述,执行真正的计算(数据被操作的一个过程) ●DAG的边界 开始:通过SparkContext创建的RDD 结束:触发Action,一旦触... -
spark DAG 切分 stage
2019-09-14 22:28:371、DAG的整体图 简要说明(个人理解): ①当所有的RDD触发action的时候,会生成一个DAG ②stage是由DAG进行切分的,可以理解stage是一个taskset ③DAG是根据宽依赖进行切分stage的,stage是由Driver进行提交的 ... -
Spark的RDD概要&DAG概述
2021-01-16 09:45:16DAG概述1.RDD: 弹性分布式数据集(相当于集合)2.RDD对应HDFS关系图3.DAG概要 1.RDD: 弹性分布式数据集(相当于集合) 弹性:RDD的数据默认是存放在内存中,但是内存资源存在不足的情况,spark会将RDD数据写入磁盘...