精华内容
下载资源
问答
  • task-scheduler-源码

    2021-06-04 19:55:03
    处理完所有任务后,调度程序将关闭。 增强功能和潜在问题 这个调度器还有很多可以做得更好的地方,包括: 更好地验证文件(即检查重复的任务名称、循环引用) 资源的优先级 - 例如,在查看计算 2 之前填写计算 1...
  • 1.如果对应服务依赖都正常情况下,请参考下面解决方案 进入注册表(cmd–> regedit ,依次找到 HKEY_LOCAL_MACHINE\HKEY_LOCAL_MACHINE\SOFTWARE\MICROSOFT\RPC\INTERNET,删除INTERNET,重启服务器 ...

    1.如果对应服务依赖都正常情况下,请参考下面解决方案
    进入注册表(cmd–> regedit ,依次找到 HKEY_LOCAL_MACHINE\HKEY_LOCAL_MACHINE\SOFTWARE\MICROSOFT\RPC\INTERNET,删除INTERNET,重启服务器
    注:删除前请导出备份

    展开全文
  • 转载自: How can I shutdown Spring task executor/scheduler pools before all other beans in the web app are destroyed?
    展开全文
  • 微信号:519292115 ... 尊重原创,禁止转载!! ...Spark目前是大数据领域中最火的框架之一,可高效实现离线批处理,实时计算和机器学习等...Task作为Spark的最小执行单元在DAGScheduler划分好Stage之后会提交给TaskSch

     

    尊重原创,禁止转载!!

     

    Spark目前是大数据领域中最火的框架之一,可高效实现离线批处理,实时计算和机器学习等多元化操作,阅读源码有助你加深对框架的理解和认知

     

    Task作为Spark的最小执行单元在DAGScheduler划分好Stage之后会提交给TaskSchedulerImpl的实现子类 (比如像yarn模式的YarnScheduler)来部署分配每个Task
    在这个章节中,将涉及到Task的数据本地化级别,TaskSchedulerImpl的资源调度机制,Task在Executor上的分发部署,Netty通信等..

    建议看下博主的前几篇介绍Spark不同组件的文章,里面会涉及到跟之前组件的交互 ,有助于加深理解....

    从上篇文章的DAGScheduler创建Task和提交Task开始:
    在之前的文章中我已经介绍过DAGScheduler在划分Stage的时候会计算出每个Task的最佳位置和创建父Stage,最后通过封装所有task的TaskSet提交给TaskSchedulerImpl

    这里以shuffleMapTask为例:

     

    // 开始构建ShuffleMapTask对象,里面封装的主要是它的元数据和runTask方法
      // 补充:Task分为两种:一种是ShuffleMapTask,一种是ResultTask
      new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
        taskBinary, part, locs, properties, serializedTaskMetrics, Option(jobId),
        Option(sc.applicationId), sc.applicationAttemptId)
    }

    提交给TaskSchedulerImpl:

     

     

    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))

    首先,这里补充一下,这里的TaskSchedulerImpl以及后面有关的TaskSchedulerBackend都是在SparkContext初始化的时候构建生成的:

     

     

    // Create and start the scheduler
    val (sched, ts) = SparkContext.createTaskScheduler(this, master, deployMode)
    // 集群模式有mesos和yarn两种,都是继承于ExternalClusterManager
    // 这里以YarnClusterManager为例
    val scheduler = cm.createTaskScheduler(sc, masterUrl)
    val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)
    cm.initialize(scheduler, backend)
    (backend, scheduler)

    这里我们只看createTaskScheduler,至于创建SchedulerBackend我们待会再说:

     

     

    override def createTaskScheduler(sc: SparkContext, masterURL: String): TaskScheduler = {
      sc.deployMode match {
          // 创建集群模式的调度器
        case "cluster" => new YarnClusterScheduler(sc)
        case "client" => new YarnScheduler(sc)
        case _ => throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
      }
    }
    private[spark] class YarnClusterScheduler(sc: SparkContext) extends YarnScheduler(sc) {
    
      logInfo("Created YarnClusterScheduler")
    
      override def postStartHook() {
        ApplicationMaster.sparkContextInitialized(sc)
        super.postStartHook()
        logInfo("YarnClusterScheduler.postStartHook done")
      }
    
    }

    其实yarn模式的调度器实现主要还是复写了机架感知:

     

     

    private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
    
      // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
      if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
        Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
      }
    
      // By default, rack is unknown
      override def getRackForHost(hostPort: String): Option[String] = {
        val host = Utils.parseHostPort(hostPort)._1
        Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
      }
    }

    其实整个task的调度的通用核心逻辑都是在TaskSchedulerImpl中实现的,接着看提交Task:
    这里有三个地方比较重要:
    ①创建TaskSetManager :一个TaskSet对应一个TaskSetManager (一个Stage对应一个TaskSet),里面的核心方法待会讲
    ②把创建好的TaskSetManager 加入到稍后会被调度算法排序的调度池中的schedulableQueue队列里
    ③调用SchedulerBackend匹配资源

     

     

    override def submitTasks(taskSet: TaskSet) {
      // 拿到存放tasks的数组
      val tasks = taskSet.tasks
      logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
      this.synchronized {
        // 创建TaskSetManager,它会跟踪每个task,如果有失败的task就根据重试次数重新提交
        // 还包括计算数据本地化,构建TaskDescription等
        val manager = createTaskSetManager(taskSet, maxTaskFailures)
        val stage = taskSet.stageId
        // 把manager加入taskSetsByStageIdAndAttempt中,如果以前有就更新,没有就新增
        val stageTaskSets =
        // taskSetsByStageIdAndAttempt维护的是每个stage在不同尝试ID对应的TaskSetManager
          taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
        // 把刚创建的TaskSetManager替换掉以前的,没有就新增
        stageTaskSets(taskSet.stageAttemptId) = manager
        // 判断case 到的是否存在 ,返回值为Boolean
        // 当Task被完成后isZombie会被标记成true,默认为false
        val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
          ts.taskSet != taskSet && !ts.isZombie
        }
        // TaskSet有冲突情况抛错
        if (conflictingTaskSet) {
          throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
            s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
        }
        // 默认使用的是FIFO调度器(另一种是FAIR公平调度器)
        // 所以这里会使用FIFOSchedulableBuilder.addTaskSetManager把TaskSetManager加入到schedulableQueue
        // 默认后面的代码会调用FIFO调度算法对schedulableQueue使用自己的比较算法对里面的schedulableQueue排序
        schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
    
        if (!isLocal && !hasReceivedTask) {
          // 启动一个线程 定时检查提交的Task是否有被运行
          starvationTimer.scheduleAtFixedRate(new TimerTask() {
            override def run() {
              // 如果没运行说明集群的资源被占用 还没空闲出来
              if (!hasLaunchedTask) {
                logWarning("Initial job has not accepted any resources; " +
                  "check your cluster UI to ensure that workers are registered " +
                  "and have sufficient resources")
              } else {
                // 如果提交的Task运行了 就关闭这个Timer线程
                this.cancel()
              }
            }
          }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
        }
        hasReceivedTask = true
      }
      // 默认使用的SchedulerBackend是CoarseGrainedSchedulerBackend,也是在SparkContext生成
      backend.reviveOffers()
    }

    schedulableBuilder根据不同的调度模式初始化也是不同的,而初始化也是在SparkContext中构建好了TaskSchedulerImpl和TaskSchedulerBackend后生成:

     

     

    cm.initialize(scheduler, backend)
    def initialize(backend: SchedulerBackend) {
      this.backend = backend
      schedulableBuilder = {
        schedulingMode match {
          case SchedulingMode.FIFO =>
            new FIFOSchedulableBuilder(rootPool)
          case SchedulingMode.FAIR =>
            new FairSchedulableBuilder(rootPool, conf)
          case _ =>
            throw new IllegalArgumentException(s"Unsupported $SCHEDULER_MODE_PROPERTY: " +
            s"$schedulingMode")
        }
      }
      // FIFO不会做任何事情
      schedulableBuilder.buildPools()
    }

    默认匹配到的模式是FIFO:

     

     

    // default scheduler is FIFO
    // 默认的是FIFO先进先出的调度器
    private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
    val schedulingMode: SchedulingMode =
      try {
        // 拿到调度器名字,当然你可以提前配置FIAR模式
        SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
      } catch {
        case e: java.util.NoSuchElementException =>
          throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
      }

    回到添加TaskSetManager方法:这里会把它添加到schedulableQueue,后面会对这个队列做调度算法的排序

     

     

    private[spark] class FIFOSchedulableBuilder(val rootPool: Pool)
      extends SchedulableBuilder with Logging {
    
      override def buildPools() {
        // nothing
      }
    
      override def addTaskSetManager(manager: Schedulable, properties: Properties) {
        // 里面会加入调度池的schedulableQueue中
        rootPool.addSchedulable(manager)
      }
    }
    override def addSchedulable(schedulable: Schedulable) {
      // 做个断言
      require(schedulable != null)
      // 加入到schedulableQueue中,后面会对这个数据结构进行自定义的排序比较算法
      schedulableQueue.add(schedulable)
      // 存放的是调度器名字和自己的标识
      schedulableNameToSchedulable.put(schedulable.name, schedulable)
      // 标记自己的调度池
      schedulable.parent = this
    }

    最后开始匹配资源:

     

     

    // 默认使用的SchedulerBackend是CoarseGrainedSchedulerBackend,也是在SparkContext生成
    backend.reviveOffers()

    首先看下backend是哪里生成的:还是SparkContext

     

     

    // 集群模式有mesos和yarn两种,都是继承于ExternalClusterManager
    // 这里以YarnClusterManager为例
    val scheduler = cm.createTaskScheduler(sc, masterUrl)
    val backend = cm.createSchedulerBackend(sc, masterUrl, scheduler)

    这里还是以Yarn集群模式为例:

     

     

    override def createSchedulerBackend(sc: SparkContext,
        masterURL: String,
        scheduler: TaskScheduler): SchedulerBackend = {
      sc.deployMode match {
        case "cluster" =>
          new YarnClusterSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
        case "client" =>
          new YarnClientSchedulerBackend(scheduler.asInstanceOf[TaskSchedulerImpl], sc)
        case  _ =>
          throw new SparkException(s"Unknown deploy mode '${sc.deployMode}' for Yarn")
      }

    Yarn集群模式复写了启动方法和获取DriverLog方法:

     

     

    private[spark] class YarnClusterSchedulerBackend(
        scheduler: TaskSchedulerImpl,
        sc: SparkContext)
      extends YarnSchedulerBackend(scheduler, sc) {
    

     

    YarnSchedulerBackend其实就是继承于CoarseGrainedSchedulerBackend,复写了启停以及与APPMaster(Driver)交互的方法

     

    private[spark] abstract class YarnSchedulerBackend(
        scheduler: TaskSchedulerImpl,
        sc: SparkContext)
      extends CoarseGrainedSchedulerBackend(scheduler, sc.env.rpcEnv) {

    而其他核心逻辑主要实现在CoarseGrainedSchedulerBackend,比如刚刚的backend.reviveOffers():

     

     

    override def reviveOffers() {
      // 向Driver端的RpcEndpointRef发送一个异步请求,Driver端会调用receive来对应处理
      // 补充:RpcEndpointRef是Driver端的Enpoint的引用
      driverEndpoint.send(ReviveOffers)
    }

    这里会调用DriverEnpoint的引用 触发它的receive单向消息接受方法(这里的Netty通信之前章节介绍过)

     

     

    case ReviveOffers =>
      makeOffers()

    这里会做以下几点:

    ①过滤掉不可用的之前executor注册到Driver上的元数据

    ②把过滤好的每个executor元数据简单封装成WorkerOffer递交给TaskSchedulerImpl做Task划分,最后返回每个Task封装好的TaskDescription

    ③预备部署Task到每个executor上

     

    private def makeOffers() {
      // Make sure no executor is killed while some task is launching on it
      val taskDescs = CoarseGrainedSchedulerBackend.this.synchronized {
        // Filter out executors under killing
        // 注意:这里的executorDataMap维护着各个ExecutorID和自己的元数据信息(地址,端口,可用cpu等)
        // 在executor注册自己到Drvier上的时候会把自己的ExecutorData信息put进Driver的executorDataMap
        val activeExecutors = executorDataMap.filterKeys(executorIsAlive)
        // 拿到所有的executor的可用资源
        val workOffers = activeExecutors.map { case (id, executorData) =>
          // 封装的WorkerOffer代表了executor上空闲可用的资源以及它的地址信息等
          // 里面只有三个成员属性:ExecutorId,主机地址,空闲的核数
          new WorkerOffer(id, executorData.executorHost, executorData.freeCores)
        }.toIndexedSeq
        // 里面会给每个task的相关信息做标记 并且返回每个task生成的TaskDescription
        scheduler.resourceOffers(workOffers)
      }
      // 判断返回的TaskDescription数组是否为空
      if (!taskDescs.isEmpty) {
        launchTasks(taskDescs)
      }
    }

    首先我们看下executorDataMap为何物:它里里面维护着每个executor它对应的元数据(端口,host,可用核数等),每个executor在启动自己的时候 会同时把自己的元数据注册到Driver上

     

     

    val data = new ExecutorData(executorRef, executorRef.address, hostname,
      cores, cores, logUrls)
    // This must be synchronized because variables mutated
    // in this block are read when requesting executors
    CoarseGrainedSchedulerBackend.this.synchronized {
      executorDataMap.put(executorId, data)

    然后把拿到的每个executor元数据简单封装成WorkerOffer

     

     

    private[spark]
    case class WorkerOffer(executorId: String, host: String, cores: Int)

    然后提交给TaskSchedulerImpl做资源匹配:

     

    这会做以下几件事:

    ①为每个Task,Executor,Host,Rack打标记,添加到对应的集合里面

    ②把非黑名单的WorkerOffer随机断乱做成待会存放TaskDescription的数组和每个executor可用核数的数组

    ③调度池按照对应调度模式对每个TaskSetManager划分计算

    ④计算出每个task的本地级别

    ⑤拿到了以上的元数据后封装每个Task为TaskDescription

     

    // 集群给各个executor分配资源,按照循环的方式把task填充到每个节点
    // 通俗点说就是,第一轮按顺序填充task到每个节点,若还有多余的task没有填充满又重复第一轮的动作
    def resourceOffers(offers: IndexedSeq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
      // Mark each slave as alive and remember its hostname
      // Also track if new executor is added
      // 这里会标记每个executor和host的关系,executor和正在运行的Task的关系
      var newExecAvail = false
      for (o <- offers) {
        // hostToExecutors里维护着每个节点上已经激活的所有的executor
        if (!hostToExecutors.contains(o.host)) {
          // 如果不包含就把这个节点注册进hostToExecutors
          hostToExecutors(o.host) = new HashSet[String]()
        }
        // executorIdToRunningTaskIds里维护着每个executor中运行中的每个task
        if (!executorIdToRunningTaskIds.contains(o.executorId)) {
          // 把当前executorID添加到指定的host主机上作为标识
          hostToExecutors(o.host) += o.executorId
          // 这里面实现的是查看failedEpoch之前有没有标记这个executor
          // failedEpoch中维护的是之前被发现由于各种原因导致丢失报错的executor
          // 如果有就把他这个标记删除掉
          executorAdded(o.executorId, o.host)
          // 维护着每个executor对应的host
          executorIdToHost(o.executorId) = o.host
          // 之后的taskId都会加入到这里面对应的executorId中
          executorIdToRunningTaskIds(o.executorId) = HashSet[Long]()
          newExecAvail = true
        }
        // 拿到host对应的机架
        // 如果是yarn模式,则会调用YarnScheduler.getRackForHost
        for (rack <- getRackForHost(o.host)) {
          // hostsByRack维护着每个host对应的机架,get不出来就更新进去
          hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
        }
      }
    
      // Before making any offers, remove any nodes from the blacklist whose blacklist has expired. Do
      // this here to avoid a separate thread and added synchronization overhead, and also because
      // updating the blacklist is only relevant when task offers are being made.
      blacklistTrackerOpt.foreach(_.applyBlacklistTimeout())
    
      // 任何被拉入黑名单的节点都会在分配资源之前被移除掉
      // 这里会遍历黑名单中的节点和executor
      val filteredOffers = blacklistTrackerOpt.map { blacklistTracker =>
        offers.filter { offer =>
          !blacklistTracker.isNodeBlacklisted(offer.host) &&
            !blacklistTracker.isExecutorBlacklisted(offer.executorId)
        }
      }.getOrElse(offers)
    
      // 把拿到的offers随机打乱顺序 避免把tasks分配到同一个worker上
      val shuffledOffers = shuffleOffers(filteredOffers)
      // Build a list of tasks to assign to each worker.
      // 对每一个offer生成一个核数大小长度并且类型为TaskDescription的ArrayBuffer可变数组里
      // 用来存放待会分配好的每个TaskDescription
      val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
      // 返回的是每个executor中可以用的核数,最后把他们封装在一个Array数组里
      val availableCpus = shuffledOffers.map(o => o.cores).toArray
      // 这里会根据不同的调度模式对TaskSetManager匹配不同的调度算法
      // 补充:调度池会在初始化TaskSchedulerImpl的任何子类的时候根据不同的调度模式new出来
      val sortedTaskSets = rootPool.getSortedTaskSetQueue
      for (taskSet <- sortedTaskSets) {
        logDebug("parentName: %s, name: %s, runningTasks: %s".format(
          taskSet.parent.name, taskSet.name, taskSet.runningTasks))
        // 之前标记完了executor,host,task的对应关系后会设置成true
        if (newExecAvail) {
          // 里面会调用TaskSetManager本地级别的分配算法,为每个task分配计算本地级别的等级
          taskSet.executorAdded()
        }
      }
    
      // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
      // of locality levels so that it gets a chance to launch local tasks on all of them.
      // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
      // 把所有TaskSet按照本地级别的顺序开始分配到各个节点
      for (taskSet <- sortedTaskSets) {
        var launchedAnyTask = false
        var launchedTaskAtCurrentMaxLocality = false
        // 遍历当前taskSet的所有本地级别
        for (currentMaxLocality <- taskSet.myLocalityLevels) {
          do {
            // 这里面主要是在做对task相关的标记,为每个task构建TaskDescription,返回值为Boolean值
            launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
              taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
            launchedAnyTask |= launchedTaskAtCurrentMaxLocality
            // 若返回为false继续循环
          } while (launchedTaskAtCurrentMaxLocality)
        }
        // 若返回为fasel 则把这个host上的executor拉入黑名单
        if (!launchedAnyTask) {
          taskSet.abortIfCompletelyBlacklisted(hostToExecutors)
        }
      }
    
      if (tasks.size > 0) {
        hasLaunchedTask = true
      }
      // 返回封装着TaskDescription数组
      return tasks
    }

    从①阶段的时候主要是为每个host,executor和Rack做标记,而机架感知是调用的Yarn复写的机架感知:

     

     

    private[spark] class YarnScheduler(sc: SparkContext) extends TaskSchedulerImpl(sc) {
    
      // RackResolver logs an INFO message whenever it resolves a rack, which is way too often.
      if (Logger.getLogger(classOf[RackResolver]).getLevel == null) {
        Logger.getLogger(classOf[RackResolver]).setLevel(Level.WARN)
      }
    
      // By default, rack is unknown
      override def getRackForHost(hostPort: String): Option[String] = {
        val host = Utils.parseHostPort(hostPort)._1
        Option(RackResolver.resolve(sc.hadoopConfiguration, host).getNetworkLocation)
      }
    }

    ②阶段很简单 只是把非黑名单的WorkerOffer随机断乱顺序,避免都把task分配到相同的host上,并且乱做成待会存放TaskDescription的数组和每个executor可用核数的数组

     

     

    /**
     * Shuffle offers around to avoid always placing tasks on the same workers.  Exposed to allow
     * overriding in tests, so it can be deterministic.
     */
    protected def shuffleOffers(offers: IndexedSeq[WorkerOffer]): IndexedSeq[WorkerOffer] = {
      Random.shuffle(offers)
    }

    ③阶段是核心的调度算法了,这里会对TaskSetManager做对应的排序:

     

     

    // 这里会根据不同的调度模式对TaskSetManager匹配不同的调度算法
    // 补充:调度池会在初始化TaskSchedulerImpl的任何子类的时候根据不同的调度模式new出来
    val sortedTaskSets = rootPool.getSortedTaskSetQueue

    补充:调度池会在初始化TaskSchedulerImpl的时候根据不同调度模式初始化(默认是FIFO)

     

     

    // default scheduler is FIFO
    // 默认的是FIFO先进先出的调度器
    private val schedulingModeConf = conf.get(SCHEDULER_MODE_PROPERTY, SchedulingMode.FIFO.toString)
    val schedulingMode: SchedulingMode =
      try {
        // 拿到调度器名字,当然你可以提前配置FIAR模式
        SchedulingMode.withName(schedulingModeConf.toUpperCase(Locale.ROOT))
      } catch {
        case e: java.util.NoSuchElementException =>
          throw new SparkException(s"Unrecognized $SCHEDULER_MODE_PROPERTY: $schedulingModeConf")
      }
    
    val rootPool: Pool = new Pool("", schedulingMode, 0, 0)

    进入getSortedTaskSetQueue:

     

     

    override def getSortedTaskSetQueue: ArrayBuffer[TaskSetManager] = {
      // 用来存放TaskSetManager的ArrayBuffer
      var sortedTaskSetQueue = new ArrayBuffer[TaskSetManager]
      // 这里会根据不同的调度模式 对Job和stage按照对应算法进行调度
      val sortedSchedulableQueue =
        // 之前的schedulableBuilder会把taskSetMananger加入到schedulableQueue中
        // schedulableQueue结构是java.util.concurrent.ConcurrentLinkedQueue线程安全的FIFO链表
        // 转换成scala的seq序列并按照匹配到的调度模式做对应的排序规则
        // 简单的说就是对之前放入队列里的TaskSetManager做对应的调度模式的排序算法
        schedulableQueue.asScala.toSeq.sortWith(taskSetSchedulingAlgorithm.comparator)
      for (schedulable <- sortedSchedulableQueue) {
        // 把遍历出的TaskSetManager按顺序放入
        sortedTaskSetQueue ++= schedulable.getSortedTaskSetQueue
      }
      // 返回排序好的TaskSetMananger队列
      sortedTaskSetQueue
    }

    首先说下schedulableQueue,这个在上面的代码里出现过,忘记的可以看看前面的代码,在调用submitTasks的时候就会把TaskSetManager加入进去,而里面存放的是全局所有的TaskSetManager

    下面来看下taskSetSchedulingAlgorithm.comparator的实现:因为默认是FIFO调度算法 所以我们这里以此为例:先比较JobId,若是相同的Job就比较StageId,顺序是从小到大

     

    /**
     * An interface for sort algorithm
     * FIFO: FIFO algorithm between TaskSetManagers
     * FS: FS algorithm between Pools, and FIFO or FS within Pools
     */
    private[spark] trait SchedulingAlgorithm {
      def comparator(s1: Schedulable, s2: Schedulable): Boolean
    }
    
    private[spark] class FIFOSchedulingAlgorithm extends SchedulingAlgorithm {
      override def comparator(s1: Schedulable, s2: Schedulable): Boolean = {
        // 这里比较的其实是TaskSetManager的优先级,优先级是通过Taskset传进来的,而Taskset的优先级就是JobId
        // 之前DGAScheduler生成TaskSet的时候会把当前JobId传进去,之前章节介绍过,优先指数是顺序从小到大
        val priority1 = s1.priority
        val priority2 = s2.priority
        // 底层调用的是java.lang.Integer.signum
        // 意思是priority1 - priority2 =负数 返回-1 ; =0 返回0 ; = 整数 返回1
        var res = math.signum(priority1 - priority2)
        if (res == 0) {
          // 如果优先级相等的话 就比较StageID
          // 这种情况会发生在同一个job中,不同stage任务之间的调度顺序
          val stageId1 = s1.stageId
          val stageId2 = s2.stageId
          res = math.signum(stageId1 - stageId2)
        }
        // 最后返回的Boolean值决定于res是否小于0
        res < 0
      }
    }

    ④,对排序好的TaskSetManager划分本地级别算法:

     

    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      // 之前标记完了executor,host,task的对应关系后会设置成true
      if (newExecAvail) {
        // 里面会调用TaskSetManager本地级别的分配算法,为每个task分配计算本地级别的等级
        taskSet.executorAdded()
      }
    }
     def executorAdded() {
        recomputeLocality()
      }
    }
    def recomputeLocality() {
      // currentLocalityIndex为当前本地级别,默认从0开始,如果不匹配或者等待资源时间(3s)超时的话级别+1
      // 这里返回的是当前级别的level加上ANY级别的数组,所以这里拿到的是索引为0的元素->当前级别的level
      val previousLocalityLevel = myLocalityLevels(currentLocalityIndex)
      myLocalityLevels = computeValidLocalityLevels()
      // 这里拿到的对应级别的等待时间
      // 补充:比如像调用的是本地级别,在尝试获取数据的时候 发现executor的核数被占满,需要等待资源释放
      // 若在规定时间内 资源还是被占用 就把本地级别+1
      localityWaits = myLocalityLevels.map(getLocalityWait)
      // 提取到当前level的索引,并更新进currentLocalityIndex
      currentLocalityIndex = getLocalityIndex(previousLocalityLevel)
    }
    /**
     * Compute the locality levels used in this TaskSet. Assumes that all tasks have already been
     * added to queues using addPendingTask.
     *
     */
    // 计算本地级别的算法
    private def computeValidLocalityLevels(): Array[TaskLocality.TaskLocality] = {
      // 分为五种级别,下面条件满足的都会添加到levels中
      import TaskLocality.{PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY}
      // 在初始化TaskSetManager的时候 会把需要执行的所有task加入到pendingTasksForExecutor中
      val levels = new ArrayBuffer[TaskLocality.TaskLocality]
      // 里面维护的是executor和它里面的所有task
      // 需要运算的数据跟当前的task对应的分区在同一个executor中(也就是同一个JVM进程中)
      if (!pendingTasksForExecutor.isEmpty &&
          // 判断可用的executor是否在pendingTasksForExecutor中
          pendingTasksForExecutor.keySet.exists(sched.isExecutorAlive(_))) {
    
        levels += PROCESS_LOCAL
      }
      if (!pendingTasksForHost.isEmpty &&
          pendingTasksForHost.keySet.exists(sched.hasExecutorsAliveOnHost(_))) {
        levels += NODE_LOCAL
      }
      if (!pendingTasksWithNoPrefs.isEmpty) {
        levels += NO_PREF
      }
      if (!pendingTasksForRack.isEmpty &&
          pendingTasksForRack.keySet.exists(sched.hasHostAliveOnRack(_))) {
        levels += RACK_LOCAL
      }
      // 最后还会加入ANY级别进去
      levels += ANY
      logDebug("Valid locality levels for " + taskSet + ": " + levels.mkString(", "))
      levels.toArray
    }

    上面可以看得出获取本地级别 就是看不同类型的PendingTask的Key是包含在对应的Alive里:所以这里要搞清楚的就是这2个集合到底是什么:

     

    首先看看PendingTask:

     

    /** Add a task to all the pending-task lists that it should be on. */
    private def addPendingTask(index: Int) {
      // preferredLocations这个方法上个章节介绍过,其实就是在生成Task的时候调用taskIdToLocations
      // 补充下:只有之前持久化过或者之前产生过任何类型的task,preferredLocations才会返回值,不然返回值为Nil
      // 拿到每个task的最佳位置
      for (loc <- tasks(index).preferredLocations) {
        loc match {
          case e: ExecutorCacheTaskLocation =>
            // pendingTasksForExecutor里面维护的是每个executor和它对应的等待运行的所有Task
            // 如果get不出来的话就把该索引值加入进去
            pendingTasksForExecutor.getOrElseUpdate(e.executorId, new ArrayBuffer) += index
          case e: HDFSCacheTaskLocation =>
            // 拿到指定host的所有executor并放入set返回
            val exe = sched.getExecutorsAliveOnHost(loc.host)
            exe match {
              case Some(set) =>
                for (e <- set) {
                  // 走到这说明之前有持久化到HDFS上的task,依次把task的索引值放入pendingTasksForExecutor
                  pendingTasksForExecutor.getOrElseUpdate(e, new ArrayBuffer) += index
                }
                logInfo(s"Pending task $index has a cached location at ${e.host} " +
                  ", where there are executors " + set.mkString(","))
              case None => logDebug(s"Pending task $index has a cached location at ${e.host} " +
                  ", but there are no executors alive there.")
            }
          case _ =>
        }
        // pendingTasksForHost 维护着每个host中所有待执行的task
        pendingTasksForHost.getOrElseUpdate(loc.host, new ArrayBuffer) += index
        // 拿到host对应的机架,若是yarn cluster模式就调用的
        for (rack <- sched.getRackForHost(loc.host)) {
          // pendingTasksForRack维护着每个机架中所有的task,get不出来就更新进去
          pendingTasksForRack.getOrElseUpdate(rack, new ArrayBuffer) += index
        }
      }
      // 若之前没有持久化过并且也没有产生过任何类型的task 那么返回就为Nil
      if (tasks(index).preferredLocations == Nil) {
        pendingTasksWithNoPrefs += index
      }
      // 这里放入的是所有的需要执行的task
      allPendingTasks += index  // No point scanning this whole list to find the old task there
    }

     

    而调用addPendingTask是在TaskSetManager初始化的时候自动调用的:

     

    // Add all our tasks to the pending lists. We do this in reverse order
    // of task index so that tasks with low indices get launched first.
    // 这里会在初始化TaskSetManager的时候调用
    for (i <- (0 until numTasks).reverse) {
      // 这里会把TaskSet中所有的task加入到各种对应关系的数据结构中
      // 比如跟executor,host,rack等的对应关系的数据结构中
      addPendingTask(i)
    }

     

     

     

    现在我们再看下Alive方法:这里以PROCESS_LOCAL级别为例:

     

    def isExecutorAlive(execId: String): Boolean = synchronized {
      // 正常情况下在之前调用resourceOffers的时候就会把可用的executorId加入到executorIdToRunningTaskIds
      executorIdToRunningTaskIds.contains(execId)
    }

    在TaskSetManager拿到了所有本地级别后调用resourceOfferSingleTaskSet 里面会为每个task创建TaskDescription,用来后面启动task用

     

     

    // 这里面主要是在做对task相关的标记,为每个task构建TaskDescription,返回值为Boolean值
    launchedTaskAtCurrentMaxLocality = resourceOfferSingleTaskSet(
      taskSet, currentMaxLocality, shuffledOffers, availableCpus, tasks)
    private def resourceOfferSingleTaskSet(
        taskSet: TaskSetManager,
        maxLocality: TaskLocality,
        shuffledOffers: Seq[WorkerOffer],
        availableCpus: Array[Int],
        tasks: IndexedSeq[ArrayBuffer[TaskDescription]]) : Boolean = {
      var launchedTask = false
      // nodes and executors that are blacklisted for the entire application have already been
      // filtered out by this point
      // 这里其实就是遍历每个可以用的executor(一个WorkerOffer对应一个executor)
      for (i <- 0 until shuffledOffers.size) {
        // 拿到当前executor的ID和host地址
        val execId = shuffledOffers(i).executorId
        val host = shuffledOffers(i).host
        // 如果当前的executor的可用核数大于等于CPUS_PER_TASK
        // CPUS_PER_TASK为运行每个Task必须的Cpu个数,默认是1
        // 如果返回为true 说明这个executor可用来执行至少一个task
        if (availableCpus(i) >= CPUS_PER_TASK) {
          try {
            // resourceOffer主要用来对每个task做标记,最后返回每个task的TaskDescription
            for (task <- taskSet.resourceOffer(execId, host, maxLocality)) {
              // tasks用来存放所有的TaskDescription
              tasks(i) += task
              val tid = task.taskId
              // 做相关的标记
              taskIdToTaskSetManager(tid) = taskSet
              taskIdToExecutorId(tid) = execId
              executorIdToRunningTaskIds(execId).add(tid)
              availableCpus(i) -= CPUS_PER_TASK
              assert(availableCpus(i) >= 0)
              // 把返回值标记成true
              launchedTask = true
            }
          } catch {
            case e: TaskNotSerializableException =>
              logError(s"Resource offer failed, task set ${taskSet.name} was not serializable")
              // Do not offer resources for this task, but don't throw an error to allow other
              // task sets to be submitted.
              return launchedTask
          }
        }
      }
      return launchedTask
    }

    我们来看看里面最核心的方法resourceOffer:从TaskSetManager中提取出每个task 并封装成TaskDescription

     

     

    def resourceOffer(
        execId: String,
        host: String,
        maxLocality: TaskLocality.TaskLocality)
      : Option[TaskDescription] =
    {
      // 首先判断当前的executor或者host是否在黑名单中
      val offerBlacklisted = taskSetBlacklistHelperOpt.exists { blacklist =>
        blacklist.isNodeBlacklistedForTaskSet(host) ||
          blacklist.isExecutorBlacklistedForTaskSet(execId)
      }
      if (!isZombie && !offerBlacklisted) {
        // 获取当前时间
        val curTime = clock.getTimeMillis()
        // 当前最优的Task本地化级别
        var allowedLocality = maxLocality
        // 如果级别不是NO_PREF
        if (maxLocality != TaskLocality.NO_PREF) {
          // 这里会拿到这个task其他可用的本地级别
          allowedLocality = getAllowedLocalityLevel(curTime)
          if (allowedLocality > maxLocality) {
            // We're not allowed to search for farther-away tasks
            // 这里只会取最小的本地级别
            allowedLocality = maxLocality
          }
        }
        // 遍历出指定host上executor所有需要执行的task索引和它的本地化级别
        dequeueTask(execId, host, allowedLocality).map { case ((index, taskLocality, speculative)) =>
          // Found a task; do some bookkeeping and return a task description
          // 获取task
          val task = tasks(index)
          // 调用getAndIncrement生成一个taskID
          val taskId = sched.newTaskId()
          // Do various bookkeeping
          // 把这个task的索引标记在copiesRunning
          copiesRunning(index) += 1
          val attemptNum = taskAttempts(index).size
          // 生成一个TaskInfo 里面注入了这个task的所有元数据
          val info = new TaskInfo(taskId, index, attemptNum, curTime,
            execId, host, taskLocality, speculative)
          // taskInfos维护的是每个task对应的元数据信息
          taskInfos(taskId) = info
          // 加入到taskAttempts中
          taskAttempts(index) = info :: taskAttempts(index)
          // Update our locality level for delay scheduling
          // NO_PREF will not affect the variables related to delay scheduling
          if (maxLocality != TaskLocality.NO_PREF) {
            currentLocalityIndex = getLocalityIndex(taskLocality)
            lastLaunchTime = curTime
          }
          // Serialize and return the task
          // 序列化这个Task
          val serializedTask: ByteBuffer = try {
            ser.serialize(task)
          } catch {
            // If the task cannot be serialized, then there's no point to re-attempt the task,
            // as it will always fail. So just abort the whole task-set.
            case NonFatal(e) =>
              val msg = s"Failed to serialize task $taskId, not attempting to retry it."
              logError(msg, e)
              abort(s"$msg Exception during serialization: $e")
              throw new TaskNotSerializableException(e)
          }
          // 这里会判断task序列化后的大小是否达到了警告级别的阈值
          if (serializedTask.limit > TaskSetManager.TASK_SIZE_TO_WARN_KB * 1024 &&
            !emittedTaskSizeWarning) {
            emittedTaskSizeWarning = true
            logWarning(s"Stage ${task.stageId} contains a task of very large size " +
              s"(${serializedTask.limit / 1024} KB). The maximum recommended task size is " +
              s"${TaskSetManager.TASK_SIZE_TO_WARN_KB} KB.")
          }
          // 加入到调度池的runningTasks和TaskSetMananger的runningTasksSet
          // 就是标记成正在运行的task
          addRunningTask(taskId)
    
          // We used to log the time it takes to serialize the task, but task size is already
          // a good proxy to task serialization time.
          // val timeTaken = clock.getTime() - startTime
          // 格式化task的名字
          val taskName = s"task ${info.id} in stage ${taskSet.id}"
          logInfo(s"Starting $taskName (TID $taskId, $host, executor ${info.executorId}, " +
            s"partition ${task.partitionId}, $taskLocality, ${serializedTask.limit} bytes)")
          // 这里主要是告诉给ExecutorAllocationListener和JobProgressListener, Task启动了
          // 然后对这个task的相关信息做标记
          sched.dagScheduler.taskStarted(task, info)
          // 生成一个TaskDescription
          // 标记着这个task在那个host的哪个executor执行
          // 以及需要添加到executor的Classpath上的所有Jar包和File
          new TaskDescription(
            taskId,
            attemptNum,
            execId,
            taskName,
            index,
            sched.sc.addedFiles,
            sched.sc.addedJars,
            task.localProperties,
            serializedTask)
        }
      } else {
        None
      }
    }

    最后回到CoarseGrainedSchedulerBackend,它会开始为每个Executor分发TaskDescription

     

     

    if (!taskDescs.isEmpty) {
      launchTasks(taskDescs)
    }

     

     

     

     

    // Launch tasks returned by a set of resource offers
    private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      // 遍历每个executor上的所有TaskDescription
      for (task <- tasks.flatten) {
        // 首序列化每个task
        val serializedTask = TaskDescription.encode(task)
        // 判断序列化后的task大小是否超过阈值
        if (serializedTask.limit >= maxRpcMessageSize) {
          // 如果超过阈值会对每个提取出来的TaskSetManager执行终止操作
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
                "spark.rpc.message.maxSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          // 若满足阈值之内的大小,首先提取出executor对应的ExecutorData元数据
          // executorDataMap之前介绍过,在executor注册自己的Driver上的时候就会注册
          // 自己的元数据到Driver上
          val executorData = executorDataMap(task.executorId)
          // executor上总共的cpu个数减去每个task需要的cpu个数(默认1个)
          // 也就是当前的task会占用一个cpu
          executorData.freeCores -= scheduler.CPUS_PER_TASK
    
          logDebug(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")
          // executorEndpoint为当前Task所在的executor的RPCEndpoint引用(之前章节介绍过)
          // 这里会给这个executor发送一个异步执行task消息
    
          // 注意:最后真正触发task启动的是Executor而不是CoarseGrainedExecutorBackend
          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }

     

    这里Driver端会用每个executorEndpoint的引用发送启动task的单向事件消息,事件消息里封装了序列化后的TaskDescription,对应的executor会调用receive接受到并匹配,最后把TaskDescription封装成继承Java线程的Runnable 调用用线程池去run

     

    override def receive: PartialFunction[Any, Unit] = {
      case RegisteredExecutor =>
        logInfo("Successfully registered with driver")
        try {
          executor = new Executor(executorId, hostname, env, userClassPath, isLocal = false)
        } catch {
          case NonFatal(e) =>
            exitExecutor(1, "Unable to create executor due to " + e.getMessage, e)
        }
    
      case RegisterExecutorFailed(message) =>
        exitExecutor(1, "Slave registration failed: " + message)
    
      case LaunchTask(data) =>
        if (executor == null) {
          exitExecutor(1, "Received LaunchTask command but executor was null")
        } else {
          // 首先会反序列化传输过来的TaskDescription
          val taskDesc = TaskDescription.decode(data.value)
          logInfo("Got assigned task " + taskDesc.taskId)
          // 开始在executor自己进程中启动task
          // executor会在CoarseGrainedExecutorBackend触onStart时,把自己的元数据注册到Driver上之后
          // Driver发送一个send单向消息通知CoarseGrainedExecutorBackend构建自己的Executor
          executor.launchTask(this, taskDesc)
        }

    最后交由Executor的launchTask:

     

     

    def launchTask(context: ExecutorBackend, taskDescription: TaskDescription): Unit = {
      // 这里会把TaskDescription和ExecutorBackend(默认是:CoarseGrainedExecutorBackend)
      // 封装成继承Runnable的TaskRunner,待会回交给线程池调用
      val tr = new TaskRunner(context, taskDescription)
      // 放入负责维护所有正在此executor上运行的task的ConcurrentHashMap中
      runningTasks.put(taskDescription.taskId, tr)
      // threadPool线程池:java.util.concurrent.ThreadPoolExecutor
      // 执行TaskRunner
      threadPool.execute(tr)
    }

    TaskRunner中会复写run方法 并且最后会调用task.run:

     

     

    // 开始运行task
    val res = task.run(

    补充:Task分为两种 一种是ShuffleMapTask  第二种是ResultTask

    以ShuffleMapTask为例,他首先会从排序好的Task对应的RDD调用iterator来计算是否有持久化过 若没有就调用computer(每种RDD的实现都不一样),最后到shuffledRDD就会调用shuffleManager(默认是SortShuffleManager)来构建一个Reader,里面最终会调用BlockStoreShuffleReader.read来fetch远程各个Executor的分区,最后再会构建一个Writer把拿到的计算结果封装成bucket写入磁盘,至此一个shuffleStage结束;具体细节比较多,留着以后再起章节说

     

     

    展开全文
  • Advanced Task Scheduler是一款多功能的计划任务程序,允许自动执行程序和批处理文件、打开文档和Internet页面、显示提醒事件、播放音乐、关机、重新启动计算机、只关闭显示器、终止正在运行的过程、建立和关闭拨号...
  • Task Scheduler问题及解法

    千次阅读 2017-09-22 11:40:35
    621. Task Scheduler LeetCode

    问题描述:

    Given a char array representing tasks CPU need to do. It contains capital letters A to Z where different letters represent different tasks.Tasks could be done without original order. Each task could be done in one interval. For each interval, CPU could finish one task or just be idle.

    However, there is a non-negative cooling interval n that means between two same tasks, there must be at least n intervals that CPU are doing different tasks or just be idle.

    You need to return the least number of intervals the CPU will take to finish all the given tasks.

    示例:

    Input: tasks = ['A','A','A','B','B','B'], n = 2
    Output: 8
    Explanation: A -> B -> idle -> A -> B -> idle -> A -> B.

    问题分析:

    看看大神们是怎么分析的:

    • 可以把一次调度看成是一个长度为n+1的环。cycle = n + 1. 如果这cycle个坑,必须由不同的task来填。如果没有这么多种类的task,那么剩下的坑cpu就只能空转
    • 那么填坑的时候使用哪些task那,我们尽可能的使用出现次数多的task 因为task不能重复,我们需要尽量使用其他的task来隔开出现次数最多的task,否则就要用idle来隔开他们。
    • 算法的思路就是首先记录出每个task出现的次数,然后把这些次数记录到一个priority_queue中,因为我们只需要知道次数就行了,所以不用再管task的名称。priority_queue默认是大顶堆,也就是出现次数多的task会先出队列。每次遍历时,我们把pq中的task全部出队列,如果这时候cycle被填满了,那么就把task出现的次数全部减1 再添加到pq中去,总的运行时间需要cycle个cpu运转周期;如果cycle没有填满,那么说明需要补充一部分idle来填坑,运行时间同样增长cycle个cpu运转周期。
    • 这里有两点需要注意:(1)把task出现次数添加会pq的时候,需要判断这一轮使用了一个task之后,该task剩余次数是否为0,如果为0,就不能再添加回去了,说明他已经调度完了。(2)如果在这一个cycle完成之后,发现没有task被重新添加到pq中去,说明所有的task都被调度完了,这一次的调度是最后一次,那么只需要添加实际的调度时间,而不是cycle个CPU运转周期。

    过程详见代码:

    class Solution {
    public:
        int leastInterval(vector<char>& tasks, int n) {
            unordered_map<char, int> m;
    		for (int i = 0; i < tasks.size(); ++i) {
    			m[tasks[i]]++;
    		}
    		priority_queue<int> pq;
    		for (auto ite : m){
    			pq.push(ite.second);
    		}
    		int cycle = n + 1, ret = 0;
    		while (!pq.empty()){
    			vector<int> tmp;
    			int time = 0;
    			for (int i = 0; i < cycle; ++i) {
    				if (!pq.empty()){
    					tmp.push_back(pq.top());
    					pq.pop();
    					time++;
    				}
    			}
    
    			for (auto cnt : tmp){
    				int remainCnt = cnt - 1;
    				if (remainCnt > 0)pq.push(remainCnt);
    			}
    			if (pq.empty()) ret += time;//如果是最后一次调度,不在需要idle来填充
    			else ret += cycle;
    		}
    		return ret;
        }
    };


    展开全文
  • taskscheduler 引发了异常

    千次阅读 2017-09-13 21:21:35
    vs2013,win7系统, 出现taskscheduler 引发了异常的错误 卸载干净.netframework 4.6的所有安全更新,就能正常试用
  • spring 3.0版本后,自带了一个定时任务工具,而且使用简单方便,不用配置文件,可以...TaskScheduler接口 TaskScheduler是一个接口 TaskScheduler接口下定义了6个方法 schedule(Runnable task, Trigger trigger); ...
  • /** * 每x毫秒钟执行(如果时间已过立即执行一次) * @param task 任务 * @param startSeconds 执行周期(毫秒) */ public void addEveryMillisecond(Runnable task, long startSeconds) { taskScheduler....
  • 前言 先推荐阅读此篇: 【小家java】Java定时任务ScheduledThreadPoolExecutor详解以及与Timer、TimerTask的区别(执行...Spring通过使用TaskScheduler来完成这些功能。 任务调度框架设计到几个核心的接口,下面做如...
  • Task Scheduler 2.0的相关操作

    千次阅读 2017-02-26 23:05:53
    理解ITaskService对象此对象需要依赖TaskshdWindows服务,如果此服务被关闭,那么此COM对象也将会失效。
  • 通过任务管理器,得手动关闭3次,才能把服务关掉,风扇不再响,但是过了一会,它又自己启动了。然后我就想,既然是计划任务,就让它运行一次,晚上没关机。但今天早上来公司,它还在运行。 尝试在服务管理里和QQ...
  • //注意:不要切换-1和任何具体并发限制在运行时,这将导致不一致的并发数:-1有效的限制完全关闭的并发数。 public void setConcurrencyLimit(int concurrencyLimit) { this.concurrencyThrottle.setConcurrencyLimit...
  • 3、问题原因 DefaultListableBeanFactory类通过classType获取实例,刚好websocket依赖有一个NoOpScheduler交给spring托管就匹配上返回了,导致NacosWatch获取错误的TaskScheduler。 三、解决方案: 1、通过@primary...
  • 从live555的例程... live555靠函数 env-&gt;taskScheduler().doEventLoop();/执行循环方法,来执行循环方法,对套接字的读取事件和对媒体文件的延时发送操作都在这个循环中完成。 源代码如下: ...
  • 然后,在Task Scheduler的任务里面,最后的位置添加一个Action,设置如下: Program:  C:\Windows\System32\WindowsPowerShell\v1.0\powershell.exe Arguments: -command "& 'C:\SendEmailScript\SendMail.ps1...
  • TaskScheduler接口下定义了6个方法: public interface TaskScheduler { ScheduledFuture<?> schedule(Runnable task, Trigger trigger); ScheduledFuture<?> schedule(Runnable task, Date startTime); ...
  • 在spark 源码分析之二 -- SparkContext 的初始化过程中,第 14 步 和 16 步分别描述了 TaskScheduler的 初始化 和 启动过程。 话分两头,先说 TaskScheduler的初始化过程 TaskScheduler的实例化 1 val (sched, ...
  • 2021-07-12 23:50:10.381 [tid: ][skId: ] [SpringContextShutdownHook] INFO o.s.s.c.ThreadPoolTaskScheduler:218 - Shutting down ExecutorService ‘taskScheduler’ 2021-07-12 23:50:12.387
  • 最近需要大量创建闹铃和更换闹钟铃声,所以利用AutoHotKey动手写了一个在Windows中快速添加更新闹铃的小应用. ...因为利用的是Windows自带的TaskScheduler,所以就在使用本工具添加计划后,不需要运行本工具,闹...
  • 如果调度实体是task group,进程组作为一个调度实体存在,它包含若干个task,并且若干个task可能不在相同的cpu上运行,那么这个调度组就相当于一个自成一体的rb tree,跟基本的root_task_group一样,组成一个rb tree...
  • // Start the lightweight task scheduler thread CScheduler::Function serviceLoop = boost::bind(&CScheduler::serviceQueue, &scheduler); threadGroup.create_thread(boost::bind(&TraceThread, "scheduler...
  • 一、MapTask运行机制详解以及Map任务的并行度 整个Map阶段流程大体如上图所示。简单概述:inputFile通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理,数据被map...
  • 点击上方蓝字关注我们✎编 者 按 2021 年,Apache DolphinScheduler 社区又迎来了新的蓬勃发展,社区活跃度持续提高。目前,项目 GitHub Star 已达...
  • // start TaskScheduler after taskScheduler sets DAGScheduler reference in DAGScheduler's constructor _taskScheduler . start ( ) //下面开始就一些应用程序信息的初始化,比如获得 SparkAppId之类的 ...
  • 操作完成后重启计算机,若还会出现CompatTelRunner或task scheduler占用CPU过高的情况,那么可以重新打开任务计划程序库\Microsoft\Windows\customer experience improvement program,将右侧的3个选项删除即可!...
  • Spring Scheduler的使用与坑

    千次阅读 2018-07-25 10:54:33
    文章目录 1.简介 2.运行 3.Cron 4.技巧 5.... 5.1.... 5.2....它包含了用于定时任务处理的Spring Scheduler。...Spring Scheduler里有两个概念:任务(Task)和运行任务的框架(TaskExecutor/TaskSc...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 14,277
精华内容 5,710
关键字:

scheduler关闭task