精华内容
下载资源
问答
  • spark推测执行
    2021-11-02 21:26:27

    基本概念

            在spark程序中,推测任务是指对于一个stage里面拖后腿的task,会在其他节点的Executor上再次启动这个task,如果其中一个task的实例运行成功者将这个最先完成的task的计算结果,同时会干掉其它Executor上运行的实例。默认情况下推测执行时关闭的。

    开启推测的优点:

    1. 解决慢task提升作业的整体执行进度
    2. 解决分布式集群环境下,负载不均衡或者资源分布不均等问题
    3. 解决因机器或者程序bug导致执行task的进程hang(暂时停止执行)住,使得job无法继续执行,需要重启任务等问题

    开启推测的弊端:

    1. 占用更多的集群资源,严重的会造成所有资源被全部占用,不能及时释放
    2. task执行非事务操作,如果中间过程有跟外界存储交互的可能会影响结果数据

    推测执行算法流程图:

    开启spark的推测执行,需要设置运行参数spark.speculation=true,两种设置方式:

    • 在程序的sparkConf对象设置 :sparkConf.set("spark.speculation","true")
    • 提交作业时设定: --conf spark.speculation=true

    开启spark的推测执行需结合其他三个参数同时使用:

    1. spark.speculation.interval 100:检测周期,单位毫秒;
    2. spark.speculation.quantile 0.75:完成task的百分比时启动推测;
    3. spark.speculation.multiplier 1.5:比其他的慢多少倍时启动推测。

    执行流程如下图:

            执行流程: 推测执行根据设置检查周期spark.speculation.interval,默认100ms定时检查执行的task是否需要对task启用推测。当task执行到100ms时,程序开始检测该spark程序job对应的stage已经执行完成的task,如果没有超过spark.speculation.quantile设定的百分比,则不启用推测。如果超过spark.speculation.quantile设定的值,计算成功task运行时间的中位数medianDuration,然后计算启用推测执行时间的界限threshold = (spark.speculation.multiplier)*medianDuration,对正在运行的task运行时间是否超过启用推测执行时间的界限threshold,如果运行时间未超过界限,则不启用推测,如果超过界限则会在另一个excecutor启动相同的task计算,如果其中一个task的实例运行成功者将这个最先完成的task的计算结果,同时会干掉其它Executor上运行的实例。如果200ms的时候,也就是spark.speculation.interval的2倍还有task未完成的话,就会进入下一次的推测执行判断周期中,判断逻辑跟周期一的一样,这是一个循环的过程。

    注意问题 :

           推测执行的检测周期不要设计得太短,不然可能会重复创建很多相同的task,如果有实时跟外部存储交互的场景慎用推测执行,因为一个task虽然没有执行完,但是一部分结果已经写入外部存储了,启动多个task就会造成数据重复,所以具体要不要开推测和参数怎么设定,一定要根据具体业务设定。

    参考 :

    Spark推测执行spark.speculation - 软件开发其他 - 红黑联盟

    Spark推测执行spark.speculation_lvbiao_62的博客-CSDN博客_spark.speculation

    更多相关内容
  • Spark 推测执行

    千次阅读 2019-03-31 13:04:37
    Spark 推测执行是一种优化技术。 在Spark中,可以通过推测执行,即Speculative Execution,来识别并在其他节点的Executor上重启某些运行缓慢的Task,并行处理同样的数据,谁先完成就用谁的结果,并将另一个未完成的...

    Spark 推测执行是一种优化技术。

    在Spark中,可以通过推测执行,即Speculative Execution,来识别并在其他节点的Executor上重启某些运行缓慢的Task,并行处理同样的数据,谁先完成就用谁的结果,并将另一个未完成的Task Kill掉,从而加快Task处理速度。适用于某些Spark任务中部分Task被hang住或运行缓慢,从而拖慢了整个任务运行速度的场景。

    注意:

    1. 不是所有运行缓慢的Spark任务,都可以用推测执行来解决。

    2. 使用推测执行时应谨慎。需要合适的场景、合适的参数,参数不合理可能会导致大量推测执行Task占用资源。

    3. 如Spark Streaming写Kafka缓慢,若启用推测执行,可能会导致数据重复。

    4. 被推测的Task不会再次被推测。

    Spark推测执行参数

    • spark.speculation :默认false。是否开启推测执行。

    • spark.speculation.interval :默认100ms。多久检查一次要推测执行的Task。

    • spark.speculation.multiplier :默认1.5。一个Stage中,运行时间比成功完成的Task的运行时间的中位数还慢1.5倍的Task才可能会被推测执行。

    • spark.speculation.quantile: 默认0.75。推测的分位数。即一个Stage中,至少要完成75%的Task才开始推测。

    Spark推测执行源码解析

    源码解析

      /**
        * TaskScheduleImpl在启动时,会判断是否启动Task的推测执行。
        */
      override def start() {
        backend.start()
        if (!isLocal && conf.getBoolean("spark.speculation", false)) {
          logInfo("Starting speculative execution thread")
          // scheduleWithFixedDelay 位于`java.util.concurrent.ScheduledExecutorService#scheduleWithFixedDelay`
          // scheduleWithFixedDelay 指的是系统启动等待`第一个SPECULATION_INTERVAL_MS 时间后`,开始执行定时任务,每隔`第二个SPECULATION_INTERVAL_MS 时间`执行一次。
          // SPECULATION_INTERVAL_MS 可通过`spark.speculation.interval`参数设置
          speculationScheduler.scheduleWithFixedDelay(new Runnable {
            override def run(): Unit = Utils.tryOrStopSparkContext(sc) {
              // 检查需要推测执行的Task
              checkSpeculatableTasks()
            }
          }, SPECULATION_INTERVAL_MS, SPECULATION_INTERVAL_MS, TimeUnit.MILLISECONDS)
        }
      }
    
    

    如果开启Spark推测执行(即设置参数spark.speculation=true),且不是Local模式运行,则TaskScheduleImpl在启动spark.speculation.interval(即上述第一个SPECULATION_INTERVAL_MS)时间后,会每隔spark.speculation.interval(即上述第二个SPECULATION_INTERVAL_MS)时间启动一个线程去检查需要推测执行的Task。

    点击checkSpeculatableTasks()方法,跳转到org.apache.spark.scheduler.checkSpeculatableTasks,如下代码:

     def checkSpeculatableTasks() {
        var shouldRevive = false
        synchronized {
          // MIN_TIME_TO_SPECULATION 在原始副本运行至少这段时间后,才会启动任务的重复副本。 
          shouldRevive = rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION)
        }
        if (shouldRevive) {
          // 如果有需要推测执行的Task,则SchedulerBackend向ApplicationMaster发送reviveOffers消息,获取集群中可用的executor列表,发起task
          backend.reviveOffers()
        }
      }
    

    可以看到,该方法内部调用的是rootPool.checkSpeculatableTasks(MIN_TIME_TO_SPECULATION),如下代码:

      override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
        var shouldRevive = false
        //schedulableQueue是ConcurrentLinkedQueue[Schedulable]类型,而Schedulable Trait有两种类型的调度实体:Pool、TaskSetManager
        for (schedulable <- schedulableQueue.asScala) {
          shouldRevive |= schedulable.checkSpeculatableTasks(minTimeToSpeculation)
        }
        shouldRevive
      }
    

    可以看到,最终调用的是schedulable.checkSpeculatableTasks(minTimeToSpeculation)方法。

    schedulableschedulableQueue中的对象,schedulableQueue是ConcurrentLinkedQueue[Schedulable]类型,而Schedulable Trait有两种类型的调度实体:Pool、TaskSetManager。

    通过查看org.apache.spark.scheduler.TaskSetManager#checkSpeculatableTasks方法可看到真正检测推测Task的逻辑。如下:

    //真正检测推测执行Task的逻辑
      override def checkSpeculatableTasks(minTimeToSpeculation: Int): Boolean = {
        // Can't speculate if we only have one task, and no need to speculate if the task set is a
        // zombie or is from a barrier stage.
        if (isZombie || isBarrier || numTasks == 1) {
          return false
        }
        var foundTasks = false
        // minFinishedForSpeculation=SPECULATION_QUANTILE * numTasks
        // SPECULATION_QUANTILE即spark.speculation.quantile
        // numTasks即某个Stage中Taskset的任务总数。
        val minFinishedForSpeculation = (SPECULATION_QUANTILE * numTasks).floor.toInt
        logDebug("Checking for speculative tasks: minFinished = " + minFinishedForSpeculation)
    
        // 1)已经成功的Task数必须要大于等于`spark.speculation.quantile * numTasks`,才开始处理这个TaskSet
        if (tasksSuccessful >= minFinishedForSpeculation && tasksSuccessful > 0) {
          val time = clock.getTimeMillis()
          // medianDuration: 已经成功的Task的运行时间的中位数
          // threshold=max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
          // SPECULATION_MULTIPLIER:即spark.speculation.multiplier
          val medianDuration = successfulTaskDurations.median
          val threshold = max(SPECULATION_MULTIPLIER * medianDuration, minTimeToSpeculation)
          // TODO: Threshold should also look at standard deviation of task durations and have a lower
          // bound based on that.
          logDebug("Task length threshold for speculation: " + threshold)
          // 2)遍历TaskSet中的每一个Task
          for (tid <- runningTasksSet) {
            val info = taskInfos(tid)
            val index = info.index
            // 3)如果还未运行成功 且 正在执行 且 运行时间已经超过threshold 且 当前不是推测运行的Task
            // 就将该Task取出放到需要推测执行的列表中
            if (!successful(index) && copiesRunning(index) == 1 && info.timeRunning(time) > threshold &&
              !speculatableTasks.contains(index)) {
              logInfo(
                "Marking task %d in stage %s (on %s) as speculatable because it ran more than %.0f ms"
                  .format(index, taskSet.id, info.host, threshold))
              speculatableTasks += index
              // 4)最终由DAGScheduler将Task提交到待执行的队列中,后台线程将对提交的任务进行处理
              sched.dagScheduler.speculativeTaskSubmitted(tasks(index))
              foundTasks = true
            }
          }
        }
        foundTasks
      }
    

    检测推测Task的大致流程

    spark_speculative.jpg

    Spark推测执行示例

    代码示例

    package com.bigdata.spark
    
    import org.apache.spark.TaskContext
    import org.apache.spark.sql.SparkSession
    import org.slf4j.LoggerFactory
    
    /**
      * Author: Wang Pei
      * License: Copyright(c) Pei.Wang
      * Summary: 
      *   Spark推测执行
      */
    object SparkSpeculative {
      def main(args: Array[String]): Unit = {
    
        @transient lazy val logger = LoggerFactory.getLogger(this.getClass)
    
        val spark=SparkSession.builder()
          //启用Spark推测执行
          .config("spark.speculation",true)
          .config("spark.speculation.interval",1000)
          .config("spark.speculation.multiplier",1.5)
          .config("spark.speculation.quantile",0.10)
          .getOrCreate()
    
    
        logger.info("开始处理.........................................")
    
         //设置5个并行度,一个Stage中,5个Task同时运行
         //为保证5个Task同时运行,Spark Submit提交任务时给5个核
         //这样,方便观察第4个Task被推测执行
        spark.sparkContext.parallelize(0 to 50,5)
          .foreach(item=>{
            if(item ==38){Thread.sleep(200000)}
            val taskContext = TaskContext.get()
            val stageId = taskContext.stageId()
            val taskAttemptId = taskContext.taskAttemptId()
            logger.info(s"当前Stage:${stageId},Task:${taskAttemptId},打印的数字..............${item}..................")
          })
    
        logger.info("处理完成.........................................")
      }
    }
    

    任务提交

    /data/apps/spark-2.4.0-bin-2.7.3.2.6.5.3-10/bin/spark-submit \
        --master yarn \
        --deploy-mode cluster \
        --driver-memory 1g \
        --executor-memory 1g \
        --executor-cores  1 \
        --num-executors  5 \
        --queue offline \
        --name SparkSpeculative \
        --class com.bigdata.spark.SparkSpeculative \
        bigdata_spark.jar
    

    Yarn日志查看

    在Yarn上可以看到如下日志:

    19/03/31 04:21:39 INFO scheduler.TaskSetManager: Finished task 2.0 in stage 0.0 (TID 2) in 318 ms on x.x.x.x (executor 4) (1/5)
    19/03/31 04:21:39 INFO scheduler.TaskSetManager: Finished task 4.0 in stage 0.0 (TID 4) in 321 ms on x.x.x.x (executor 2) (2/5)
    19/03/31 04:21:39 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 338 ms on x.x.x.x (executor 1) (3/5)
    19/03/31 04:21:39 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 327 ms on x.x.x.x (executor 3) (4/5)
    #task 3被标记为推测执行
    19/03/31 04:21:40 INFO scheduler.TaskSetManager: Marking task 3 in stage 0.0 (on x.x.x.x) as speculatable because it ran more than 486 ms
    #启动task 3的推测执行task(taskID=5)
    19/03/31 04:21:40 INFO scheduler.TaskSetManager: Starting task 3.1 in stage 0.0 (TID 5, x.x.x.x, executor 3, partition 3, PROCESS_LOCAL, 7855 bytes)
    #kill掉task 3的推测执行task(taskID=5),由于原来的task已经成功
    19/03/31 04:24:59 INFO scheduler.TaskSetManager: Killing attempt 1 for task 3.1 in stage 0.0 (TID 5) on x.x.x.x as the attempt 0 succeeded on x.x.x.x
    19/03/31 04:24:59 INFO scheduler.TaskSetManager: Finished task 3.0 in stage 0.0 (TID 3) in 200311 ms on x.x.x.x (executor 5) (5/5)
    

    Spark WebUI查看

    在Spark WebUI上可以看到如下结果:

    spark_speculative_webui.jpg

    展开全文
  • spark推测执行

    2019-02-23 16:36:40
    一. 简介 再我们run spark job 的时候 有...spark 为了这样的情况提供了了一种机制叫推测执行这种机制默认是关闭的需要手动开启。 二. 配置 设置 spark.speculation=true即可 额外设置 1. spark.speculation.inte...

    一. 简介

    再我们run spark job 的时候 有时候可能会遇到 很少一部分task 运行的时候出错了或者卡住了。又不想直接重新run 一下 这样太不方便了而且还需要手动参与。
    spark 为了这样的情况提供了了一种机制叫推测执行这种机制默认是关闭的需要手动开启。

    二. 配置

    设置 spark.speculation=true即可
    额外设置

    1. spark.speculation.interval 100:检测周期,单位毫秒;
    2. spark.speculation.quantile 0.75:当运行成功的task占总task的百分比启动推测;
    3. spark.speculation.multiplier 1.5:比其他的慢多少倍时启动推测。
    

    流程图如下:
    在这里插入图片描述

    三.问题

    1.谨慎使用,严重的会造成所有资源被全部占用,不能及时释放
    2.我们的spark任务会将计算结果写入kafka,再有logstash写入es。最近由于kafka集群写入慢,甚至写不进去,spark任务直接卡住,为防止卡住的情况发生,加了推测执行,但发现跑出来的数据存在重复的情况。同一条数据写了2次,排查发现是由于推测执行的问题,像这种讲执行结果写入kafka的场景,不适用推测执行,因为一个task虽然没有执行完,但是一部分结果已经输出了,启动多个task就会造成数据重复,所以具体的配置还是要看应用的场景来做权衡

    欢迎关注,更多惊喜等着你

    这里写图片描述

    展开全文
  • 有的Task很慢就会成为整个任务的瓶颈,此时可以触发 推测执行 (speculative) 功能,为长时间的task重新启动一个task,哪个先完成就使用哪个的结果,并Kill掉另一个task。 重点配置: # 开启speculative,默认关闭...

    在Spark中任务会以DAG图的方式并行执行,每个节点都会并行的运行在不同的executor中,但是有的任务可能执行很快,有的任务执行很慢,比如网络抖动、性能不同、数据倾斜等等。有的Task很慢就会成为整个任务的瓶颈,此时可以触发 推测执行 (speculative) 功能,为长时间的task重新启动一个task,哪个先完成就使用哪个的结果,并Kill掉另一个task。

    重点配置:

    # 开启speculative,默认关闭
    spark.speculation=true 
    
    # 检测周期,单位毫秒
    spark.speculation.interval=100
    
    # 任务完成的百分比,比如同一个stage中task的完成占比
    条件1:  spark.speculation.quantile=0.75 [比如该stage有100个task,已完成75个及以上task]
    
    # 任务延迟的比例,比如当75%的task都完成,那么取他们的中位数跟还未执行完的任务作对比。如果超过1.5倍,则开启推测执行。
    
    条件2:  spark.speculation.multiplier=1.5
    
    条件1和条件2都满足时,才会对该运行慢的任务启用speculation
    
    
    

    在实现时,spark会在TaskSchedulerImpl中启动独立的线程池,每100ms检查所有的task是否有慢任务,然后针对慢任务开启推测执行。

    转自 https://zhuanlan.zhihu.com/p/151833314

    展开全文
  • spark推测执行的坑

    千次阅读 2018-05-11 17:19:39
    1、spark推测执行开启设置 spark.speculation=true即可2、spark开启推测执行的好处推测执行是指对于一个Stage里面运行慢的Task,会在其他节点的Executor上再次启动这个task,如果其中一个Task实例运行成功则将这个...
  •  推测执行(Speculative Execution)是指在分布式集群环境下,因为程序BUG,负载不均衡或者资源分布不均等原因,造成同一个job的多个task运行速度不一致,有的task运行速度明显慢于其他task(比如:一个job的某个task...
  • spark推测执行(填坑)

    千次阅读 2018-07-17 16:13:09
    1、spark推测执行开启 设置 spark.speculation=true即可 额外设置 1. spark.speculation.interval 100:检测周期,单位毫秒; 2. spark.speculation.quantile 0.75:完成task的百分比时启动推测; 3. spark....
  • 很多情况下因为运行环境导致的task跑的过慢,或者直接卡死,让task可以重新跑起来是可以缓解这个问题的,也就是Spark中的推测执行机制(speculation)。 --conf spark.speculation=true --conf spark.speculation
  • spark推测执行

    千次阅读 2019-01-04 16:19:20
    1、什么是推测执行?   在spark作业运行中,一个stage里面的不同task的执行时间可能...而推测执行就是当出现同一个stage里面有task长时间完成不了任务,spark就会在不同的executor上再启动一个task来跑这个任务,...
  • 某个workflow原来执行完只需要1h,现在执行时间翻倍? workflow 历史执行时间 4:00-5:14 workflow2 历史执行时间 5:00-5:56 根据Start Time显示workflow在4:21开始执行,5:00被workflow2卡死, 在workflow2 释放...
  • 一speculative简介 在spark作业运行中,一个spark作业会构成一个DAG调度图,一个DAG又切分成多个stage,...而推测执行(speculative)就是当出现同一个stage里面有task长时间完成不了任务,spark就会在不同的executor..
  • 87-Spark推测执行spark.speculation

    千次阅读 2018-03-29 21:23:31
     推测执行(Speculative Execution)是指在分布式集群环境下,因为程序BUG,负载不均衡或者资源分布不均等原因,造成同一个job的多个task运行速度不一致,有的task运行速度明显慢于其他task(比如:一个job的某个task...
  • Spark推测执行解决SparkStreaming任务task卡死问题 景:测试环境运行一个SparkStreaming任务,yarn-cluster模式,duration为5分钟一个批次,每个批次平均2000w条records,并行度为60 资源配置为: ${SPARK_HOME}...
  • Spark speculation(推测执行)详解

    千次阅读 2020-03-12 21:15:11
    我们都知道,Spark job中,一个stage什么时候完成,取决于stage下最后一个task的完成时间。task的完成时间也被很多因素影响,比如partition的分配,executor的资源使用情况,host的运行状态,集群网络等等。很多情况...
  • [spark] spark推测执行

    千次阅读 2017-10-19 20:14:49
    推测任务是指对于一个Stage里面拖后腿的Task,会在其他节点的Executor上再次启动这个...spark推测执行默认是关闭的,可通过spark.speculation属性来开启。检测是否有需要推测式执行的Task在SparkContext创建了schedul
  • 推测执行机制 推测任务是指对于一个Stage里面拖后腿的Task,会在其他节点的Executor上再次启动这个task,如果其中一个Task实例运行成功则将这个最先完成的Task的计算结果作为最终结果,同时会干掉其他Executor上...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,999
精华内容 1,199
关键字:

spark推测执行

友情链接: lkguestbook.zip