精华内容
下载资源
问答
  • 2021-11-14 23:41:02

    [源码] Spark如何划分Stage

    大家好,我是一拳就能打爆你A柱的猛男

    太久不写博客了,今天写的博客可能有点生疏,各位将就看一下。不知道有没有人跟我一样好奇Spark如何划分Stage的,今天翻一下源码给大家看看。

    1、Stage的介绍

    这部分还是给一点前置知识,如果对Stage有不了解或者了解不清楚的同学可以看一下,如果比较熟悉可以直接跳到第二节。

    1.1 Stage是什么概念以及Stage的划分

    我们接触Spark的时候一定会接触到下面几个名词:

    • 1、Application:Application就是用户所提交的应用程序,比如写了一个wordcount打成jar提交,这就是一个Application
    • 2、Job:一个Application是由多个Job组成的,Job可以理解成一连串连续的动作(自己的理解,后续会解释)
    • 3、Stage:一个Job里会包含多个Stage,Stage以shuffle切分,其中Stage又会分为ShuffleMapStage和ResultStage两种
    • 4、Task:Task是存在于Stage内的概念,是实际执行的单元,也分为ShuffleMapTask和ResultTask两种

    这几个介绍说实话,我看了好多次了,都背下来了。但是我相信很多人还是没有真真切切的搞明白,下面再用一张图解释(也是很多博客 的做法):

    这里讲的比较啰嗦,但是应该能帮助大家更好理解 。上面这张图可以看作是一个Job,包含了三个虚线框,这就是三个Stage。每个大写字母旁边的圆角矩形就是一个RDD,RDD内部的小矩形就是一个RDD块。从RDD A到RDD B的笛卡尔积映射关系就是shuffle操作,而RDD C到RDD D这种一对一的映射就是map操作。

    Stage的划分从最后一个RDD开始,且按照深度优先的方式遍历,这一点很重要,所以我们从RDD G开始往前看。步骤如下:

    • 1、RDD G -> RDD B是一个map操作,所以会并入G对应的Stage
    • 2、RDD B -> RDD A是一个shuffle操作,所以会不会并入B的Stage,也不是G的Stage的成员
    • 3、RDD A往前就没有RDD了,所以RDD A自成一个Stage,这时StageId=1
    • 4、回到G,G -> F是一个 shuffle操作,所以不会 并入G的Stage
    • 5、F->E是一个map操作,所以并入F的Stage
    • 6、E往前没有RDD,所以停止遍历
    • 7、F->D是一个map操作,所以并入F的Stage
    • 8、D->C是一个map操作,所以并入F的Stage
    • 9、C往前没有RDD,所以停止遍历,F为起点的遍历结束,此时StageId=2
    • 10、G的遍历因为其他遍历的结束而结束,StageId=3

    得到下面的表格:

    StageIdRdds
    1A
    2C D E F
    3B G

    此时可以很清楚的看到,Stage包含了 RDD,而且因为RDD之间有血缘关系,即前一个RDD“计算”结束后才能进行下一个RDD的操作。所以 Stage的执行也有顺序,即Stage从后往前划分,从前往后执行。

    1.3 Stage的意义

    那么经过上面的操作到底有什么意义呢?为什么不直接按照用户代码给的顺序直接执行下来就完了呢?

    观察上面这个图可以发现,shuffle的操作需要所有的RDD块都到位之后才能执行,但是map操作好像并不需要。Stage2中分两台机执行的话,一台机执行要执行两次map操作然后在F处等待即可,另一台机则执行一次map操作就可以等待,全部执行完毕后就可以做shuffle了。

    大家可以想象一下一个工厂的流水线上,有的流水线只需要盖个章,而有的流水线需要将商品分发到不同的流水线上。在Spark中数据就想流水线上的商品,盖章的(map)操作哪怕再多,只要不需要跟其他机器的数据有交互,都可以一次性执行下来。而分发的(shuffle)操作,则需要等待这一批的商品(数据)都到齐后,才能分发。

    Stage就是按照shuffle划分的,而Stage划分的意义就是将所有的盖章(map)操作放在一起,不做停留的执行,从而提高效率。

    这一段比喻我也不知道恰不恰当,各位辩证看待。

    2、从SparkPi定位Stage划分代码

    我手头刚好是spark-2.2.3的工程,所以就用这套代码了,如果是用的更新的工程也没关系,因为基本上没有大改动。

    打开Spark工程找到examples模块下的SparkPi:

    object SparkPi {
      def main(args: Array[String]) {
        val spark = SparkSession
          .builder
          .master("local[*]")
          .appName("Spark Pi")
          .getOrCreate()
        val slices = if (args.length > 0) args(0).toInt else 2
        val n = math.min(100000L * slices, Int.MaxValue).toInt // avoid overflow
        val count = spark.sparkContext.parallelize(1 until n, slices).map { i =>
          val x = random * 2 - 1
          val y = random * 2 - 1
          if (x * x + y * y <= 1) 1 else 0
        }.reduce(_ + _) // action算子提交任务
        println("Pi is roughly " + 4.0 * count / (n - 1))
        spark.stop()
      }
    }
    

    可以看到只有一个reduce这一个Action算子,即负责提交Job的算子。从这里进入看到下面的代码:

    def reduce(f: (T, T) => T): T = withScope {
        ...
        // 其他都可以不看先,先看runjob
        sc.runJob(this, reducePartition, mergeResult)
        // Get the final result out of our Option, or throw an exception if the RDD was empty
        jobResult.getOrElse(throw new UnsupportedOperationException("empty collection"))
    }
    

    可以看到runJob就是提交Job,而我们知道Job中包含多个Stage,所以划分Stage的代码必然在里面,点进入看到下面代码:

    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        processPartition: Iterator[T] => U,
        resultHandler: (Int, U) => Unit)
    {
        val processFunc = (context: TaskContext, iter: Iterator[T]) => processPartition(iter)
        runJob[T, U](rdd, processFunc, 0 until rdd.partitions.length, resultHandler)
    }
    
    继续点:
    def runJob[T, U: ClassTag](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        resultHandler: (Int, U) => Unit): Unit = {
        ...
        dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)
        progressBar.foreach(_.finishAll())
        rdd.doCheckpoint()
    }
    

    这里可以看到dagScheduler.runJob,也就是说在SparkContext类里维护了一个DAGScheduler对象,提交Job的时候实际上是调用了dagScheduler.runJob函数,点进去:
    可以看到dagScheduler.runJob里做了三件事:

    • 1、记录Job执行时间
    • 2、执行Job
    • 3、对执行结果做处理
    def runJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): Unit = {
        val start = System.nanoTime
        val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
        ThreadUtils.awaitReady(waiter.completionFuture, Duration.Inf)
        waiter.completionFuture.value.get match {
            case scala.util.Success(_) =>
            logInfo("Job %d finished: %s, took %f s".format
                    (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
            case scala.util.Failure(exception) =>
            logInfo("Job %d failed: %s, took %f s".format
                    (waiter.jobId, callSite.shortForm, (System.nanoTime - start) / 1e9))
            // SPARK-8644: Include user stack trace in exceptions coming from DAGScheduler.
            val callerStackTrace = Thread.currentThread().getStackTrace.tail
            exception.setStackTrace(exception.getStackTrace ++ callerStackTrace)
            throw exception
        }
    }
    

    重点在执行Job的步骤,val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)

    跳转到submitJob函数中,这里还是没到划分的代码:

    def submitJob[T, U](
        rdd: RDD[T],
        func: (TaskContext, Iterator[T]) => U,
        partitions: Seq[Int],
        callSite: CallSite,
        resultHandler: (Int, U) => Unit,
        properties: Properties): JobWaiter[U] = {
        ...
        eventProcessLoop.post(JobSubmitted(
            jobId, rdd, func2, partitions.toArray, callSite, waiter,
            SerializationUtils.clone(properties)))
        waiter
    }
    

    这里提交给eventProcessLoop去执行JobSubmitted这个事件了。

    注:Spark的RPC通讯(Spark各节点的通讯)以及*EventLoop都是通过事件来发送命令的,也没那么神奇,这些事件就是一个case class,当然事件这个说法是我自己想的,反正就是那么个东西。

    接上面的 eventProcessLoop.post,发送了一个JobSubmitted事件,下面点击eventProcessLoop

    private[scheduler] val eventProcessLoop = new DAGSchedulerEventProcessLoop(this)
    

    接着点击DAGSchedulerEventProcessLoop

    private[scheduler] class DAGSchedulerEventProcessLoop(dagScheduler: DAGScheduler)
    extends EventLoop[DAGSchedulerEvent]("dag-scheduler-event-loop") with Logging {
        ...
        private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
            case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
            dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
    
    

    可以看到doOnReceive函数里有对事件类型的判断,第一个就是JobSubmitted,可以看到实际上调用了dagScheduler.handleJobSubmitted函数,继续点击到这个函数内:

    private[scheduler] def handleJobSubmitted(jobId: Int,
          finalRDD: RDD[_],
          func: (TaskContext, Iterator[_]) => _,
          partitions: Array[Int],
          callSite: CallSite,
          listener: JobListener,
          properties: Properties) {
        ...
        val stageIds = jobIdToStageIds(jobId).toArray
        val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
        listenerBus.post(
          SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
        submitStage(finalStage)
      }
    

    **最后一行就是提交Stage的代码了。终于找到了,蛮不容易的。**走这一遍过程其实还是有收获的,除了找到Stage划分的位置之外,也可以无意间了解到Spark有一种特别的调用机制,就是事件调用的方式。其实通过名字可以看出来*EventProcessLoop就是事件处理循环,是*EventLoop的具体实现类。这些类实际上就是一个阻塞线程等待处理相应的事件:

    private[spark] abstract class EventLoop[E](name: String) extends Logging {
        ...
        private val eventThread = new Thread(name) {
            setDaemon(true)
            override def run(): Unit = {
                try {
                    while (!stopped.get) {
                        val event = eventQueue.take()
                        try {
                            onReceive(event)
                        } catch {
                            case NonFatal(e) =>
                            try {
                                onError(e)
                            } catch {
                                case NonFatal(e) => logError("Unexpected error in " + name, e)
                            }
                        }
                    }
                } catch {
                    case ie: InterruptedException => // exit even if eventQueue is not empty
                    case NonFatal(e) => logError("Unexpected error in " + name, e)
                }
            }
        }
    

    3、Spark划分Stage代码解读

    接下来开始读Stage的划分的代码了,其实这部分蛮简单的,也可以作为刚了解Spark的同学想要读Spark源码的练手册,提升信心。

    接下来的代码我尽量逐行的注释,同时会附上完整的代码,可能代码块会长一点,各位注意。点开submitStage(finalStage)函数,进入函数内部:

    private def submitStage(stage: Stage) {
        // 找到Stage对应的JobId
        val jobId = activeJobForStage(stage)
        // 若找得到则继续划分Stage,执行提交Stage的动作
        if (jobId.isDefined) {
            logDebug("submitStage(" + stage + ")")
            // 这里有三个HashSet,分别注释一下:
            // 1、waitingStages:用来装父Stage没有执行完毕的Stage
            // 2、runningStages:用来装正在执行的Stage
            // 3、failedStages:用来装执行失败,将要重新执行的Stage
            // 下面的判断显然是针对第一次进入的Stage,假如Stage有未执行的父Stage、或者Stage正在运行或者需要重新执行,则没法进入 
            if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
                // 下面getMissingParentStages函数是找到当前Stage的所有父Stage,并按照StageId排序
                val missing = getMissingParentStages(stage).sortBy(_.id)
                logDebug("missing: " + missing)
                // 若当前Stage的父Stage为空,则说明走到头了,可以执行
                if (missing.isEmpty) {
                    logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
                    // 执行Stage的函数在这里
                    submitMissingTasks(stage, jobId.get)
                } else {
                    // 若找得到父Stage,则还需要递归的进入当前函数
                    for (parent <- missing) {
                        submitStage(parent)
                    }
                    // 并且将当前stage加入等待执行的set中
                    waitingStages += stage
                }
            }
        // 若是没找到则终止所有需要使用到当前 入参Stage的Job
        } else {
            abortStage(stage, "No active job for stage " + stage.id, None)
        }
    }
    

    经过观察上面的代码,可以看到几个细节:

    • 1、针对Stage的执行,Spark分为3类
      • 1 等待执行的Stage
      • 2 正在执行的Stage
      • 3 需要重新执行的Stage
    • 2、Stage的执行是按照递归进行的,也就是说针对当前入参Stage,Spark会往前找父Stage,一直到没有父Stage的时候才会执行Stage的操作(这点证实了Stage从前往后执行
    • 3、针对找不到JobId的Stage,Spark可能会对所有需要该Stage的Job都做终止操作(不知道理解的对不对)
    • 4、其实这里并没有划分Stage的过程,而划分过程显然放在找父Stage的函数getMissingParentStages
    • 5、具体提交Stage的代码在submitMissingTasks函数里

    点击getMissingParentStages进入函数:

    // 这里还是蛮简单的,就是用栈做一个深度优先搜索,根据RDD的血缘关系以及依赖类型划分Stage
    private def getMissingParentStages(stage: Stage): List[Stage] = {
        // 装父Stage
        val missing = new HashSet[Stage]
        // 用于DFS的set
        val visited = new HashSet[RDD[_]]
        // 用于DFS的栈
        val waitingForVisit = new Stack[RDD[_]]
        // 这里定义了一个访问函数,具体的划分代码在这里
        def visit(rdd: RDD[_]) {
            if (!visited(rdd)) {
                visited += rdd
                // 这个函数从名字来看就是确定RDD内各个分区(partition)所处的节点位置
                val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)
                if (rddHasUncachedPartitions) {
                    // 从这开始遍历RDD所有的父RDD,并且对RDD的依赖做判断
                    // 若当前RDD到父RDD的依赖是ShuffleDependency,则证明是一个Stage的划分点
                    // 若当前RDD到父RDD的依赖是NarrowDependency,则证明当前RDD和父RDD属于同一个Stage
                    for (dep <- rdd.dependencies) {
                        dep match {
                            case shufDep: ShuffleDependency[_, _, _] =>
                            // 下面这行操作还没搞清楚,不敢乱说
                            // 还是乱说一下把,这行蛮重要的,就是如果当前Stage是已经访问过的,就直接返回已经构造好的Stage
                            // 如果没访问过,也就是Stage第一次出现,则会构造一个带血缘关系的Stage,当然在提交的时候会根据血缘
                            // 重新构造父Stage
                            val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
                            if (!mapStage.isAvailable) {
                                missing += mapStage
                            }
                            case narrowDep: NarrowDependency[_] =>
                            // 若是窄依赖,则将父RDD放入栈中等待进一步的遍历
                            waitingForVisit.push(narrowDep.rdd)
                        }
                    }
                }
            }
        }
        // DFS第一步,先把Job里最尾巴的Stage的rdd放进等待访问的栈
        waitingForVisit.push(stage.rdd)
        // 接下来就是弹栈,访问,划分,压栈的操作,全在visit里
        while (waitingForVisit.nonEmpty) {
            visit(waitingForVisit.pop())
        }
        // 经过完整的血缘遍历,得到当前Stage所有的父Stage,返回
        missing.toList
    }
    

    所以具体的划分代码已经看完了,有下面几个细节可以注意一下:

    • 1、Stage通过RDD血缘关系的方式往前遍历(证实Stage从后往前遍历
    • 2、Stage的划分依据是Shuffle依赖
    • 3、划分Stage的具体方式是用深度优先遍历实现的

    在这里还是留了一点遗憾,就是getOrCreateShuffleMapStage函数的具体内容没有搞清楚,接下来有机会再仔仔细细的看一遍。

    4、总结

    其实Stage划分的代码是相对来说很简单的,只要找对位置,基本上就能够看得懂。通过去看Spark-core中Stage划分的代码,可以帮助刚接触Spark的同学更好地了解Stage的概念,也可以提高看源码的信心。

    总结一下,Stage到底是什么:

    感性的解释:

    Stage就像是工厂流水线中的一组流水线,而map操作就好像其中的一个流水线。玩过戴森球计划的同学肯定很清楚流水线设计的重要性。map操作就像给商品做一个单一的操作,不需要与其他流水线上的商品有交集,所以可以无顾忌的马力全开。而Shuffle操作就好像一套流水线走完之后的装车、分发操作,将map做好的商品分发到各个节点(流水线)进行下一组的封装。(这里可以再想想一个 工厂做手机屏幕,另一个工厂做手机中框,几百个工厂 同时火力全开做单个零件,最终装车shuffle到苹果、华为的富士康流水线上做组装)

    根据代码来的解释:

    Stage就是Job里的一个阶段,这个Stage会负责一些操作,然后等所有数据准备好后,shuffle到各节点。Stage划分从后往前划分,而执行的时候却要从前往后执行,因为Stage有前后的依赖关系。

    太久不写博客了,欢迎各位大佬指教。

    更多相关内容
  • gentoo-stage4 目录 主意 我们所有人都知道,与任何其他发行版相比,安装Gentoo会花费一些时间。 在过去的几年中,我一直通过自动安装程序脚本进行安装,该过程需要花费几分钟,但是在每次安装时都会进行一些重复...
  • Pipeline Stage View插件在Flow Project的索引页面的Stage View下,包括对管道构建历史记录的扩展可视化。 (您也可以单击“完整舞台视图”以获取全屏视图。) 要利用此视图,您需要在流程中定义阶段。 您可以按照...
  • Two-stage-TrAdaboost.R2

    2021-05-11 20:16:36
    Pardoe的论文“回归传递的提升(ICML 2010)”中的两阶段TrAdaBoost.R2算法 描述 这是Pardoe等人提出的基于Boost的回归任务转移学习算法... 第一类Stage2_TrAdaBoostR2是sklearn软件包中AdaBoostRe
  • stage-python

    2021-05-10 00:28:15
    用Python编写计算机程序! 由 , 和 于12月22日至24日组织的编程课程的Code Runestone。 这些页面可以在以下位置查看: : 指示 virtualenv . pip install -r requirements.txt cd web paver build ...
  • e-stage 6.6

    2020-11-20 17:37:33
    制作3d模型支撑时,必备工具。 3D打印机的完美搭配,Materialise Magics。 3D打印magics 21对接e-stage 6.6实现一键自动加支撑
  • 尝试同时使用babel-preset-react-native和babel-preset-stage-0时,您在React Native中遇到奇怪的错误吗? 对于使用RN上最新和最出色的Babel功能有多么困难,您是否通常感到困惑和沮丧? 该预设适合您! 安装 npm ...
  • 在2017年5月3日即将发行的 Docker 17.05.0-ce 中,Docker 官方提供了简便的多阶段构建 (multi-stage build) 方案,下面这篇文章主要给大家介绍了关于Dockerfile中multi-stage(多阶段构建)的相关资料,需要的朋友可以...
  • STAGE TEN Connect上参与实况广播时,请使用屏幕共享作为源。 在STAGE TEN中共享计算机的屏幕作为实时信号源。 显示演示文稿,游戏供稿,照片或屏幕上的任何内容。 第十阶段可让您在一个简单的Web应用程序中利用...
  • 要说明这个问题主要从感受野的角度去看,one stage的方法,对于SSD,其采取多个特征图进行分类,但由于依赖网络中比较深的层(特征层),感受野很大,因而小物体检测不准确。同样,对于YOLO,由于在方法设计中就把原...
  • Stage.java

    2021-11-27 12:12:21
    Stage.java
  • Java-stage:JAVA_STAGE

    2021-03-07 15:52:33
    Java阶段 JAVA_STAGE
  • 2019年6月思科新推出的合作伙伴认证考试,Black Belt HyperFlex Deployment - Stage1 题库
  • 国际优质教学资源:ROS下Stage仿真器以及TF详解PPT(含例程代码),通俗易懂。
  • stagefright框架2.0.pdf

    2021-11-20 09:58:53
    stagefright框架2.0.pdf
  • 对橡子的第3阶段提案支持 这是的插件-一个很小的,快速JavaScript解析器,完全...const stage3 = require ( 'acorn-stage3' ) ; Parser . extend ( stage3 ) . parse ( 'class X { #a() {} }' ) ; 执照 该插件是根据。
  • Page2Stage-开源

    2021-05-13 20:19:24
    Page 2 Stage是编剧软件,专为撰写剧本,剧本和剧本的人们而设计。 Page 2 Stage最初是商业产品。 但是,它现在已作为开源发布。
  • weblogic部署方式nostage和stage优劣对比(英文版) weblogic部署方式nostage和stage优劣对比(英文版) weblogic部署方式nostage和stage优劣对比(英文版)
  • Multi-Stage Progressive Image Restoration文献调研
  • back-stage-management:一个后台管理系统的源码
  • 对Spark 中Stage划分的分析,很详细。如果有不清楚的地方可以一起谈论
  • Stage划分算法原理剖析
  • Parallel multi-stage features fusion of deep convolutional neural networks for aerial scene classification
  • Stage :移动机器人模拟器
  • Two-Stage Fuzzy Fusion Based-Convolution Neural Network for Dynamic Emotion Recognition
  • open-stage-control, Libre和模块化 osc/midi控制器 开放阶段控制是一个自由的桌面 OSC directionnal控制 Surface 应用程序。 服务器使用 HTML 。JavaScript & CSS和作为 node/电子服务器运行,它接受任意数量的...
  • 开源项目-itwars-Docker-multi-stage-build#an-example-of-docker-multi-stage-building-with-golang.zip,An example of Docker multi-stage building with Golang
  • FCOS Fully Convolutional One-Stage Object Detection论文
  • 以前学stage3d写的,很挫,但是性能很高,可以用来学习显卡渲染的原理
  • A two-stage attention aware method for train bearing shed oil inspection based on convolutional neural networks

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 206,231
精华内容 82,492
关键字:

stage