2017-01-30 21:29:45 lisi1129 阅读数 348
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    34968 人正在学习 去看看 张长志

上节说到job提交时候进入了taskScheduler.submitTasks(newTaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId,properties)),这节我们就一起了解一下TaskSchedule类和submitTask()方法;

第一步了解taskSchedule类的初始化,此类在sparkContext中被创建,源码见下方;createTaskScheduler函数中,TaskScheduler会根据部署方式而选择不同的SchedulerBackend来处理. 针对不同部署方式会有不同的TaskScheduler与SchedulerBackend进行组合:

  • Local模式:TaskSchedulerImpl + LocalBackend
  • Spark集群模式:TaskSchedulerImpl + SparkDepolySchedulerBackend
  • Yarn-Cluster模式:YarnClusterScheduler + CoarseGrainedSchedulerBackend
  • Yarn-Client模式:YarnClientClusterScheduler + YarnClientSchedulerBackend
TaskScheduler类负责任务调度资源的分配,SchedulerBackend负责与Master、Worker通信收集Worker上分配给该应用使用的资源情况。 


下面进入scheduler.initialize(backend),taskScheduler为接口类,TaskSchedulerImpl实现taskScheduler接口,具体初始化是在taskSchedulerImpl类中,见下面源码


taskScheduler初始化后,会调用start方法,在start方法中,backend也会被调用


故以上可以总结:

SparkContext的createTaskScheduler创建schedulerBackend和taskScheduler–>根据不同的调度方式选择具体的scheduler和backend构造器–>调用TaskSchedulerImpl的initialize方法为scheduler的成员变量backend赋值–>createTaskScheduler返回创建好的(schedulerBackend, taskScheduler)–>调用TaskScheduler.start()启动–>实际上在TaskSchedulerImpl的start方法中调用backend.start()来启动SchedulerBackend。

TaskScheduler是在Application执行过程中,为它进行任务调度的,是属于Driver侧的。对应于一个Application就会有一个TaskScheduler,TaskScheduler和Application是一一对应的。TaskScheduler对资源的控制也比较鲁棒,一个Application申请Worker的计算资源,只要Application不结束就会一直被占有。

下面进入taskSchedulerImpl的作业提交的主要方法,此方法可以概括如下

  1. 任务(tasks)会被包装成TaskSetManager
  2. TaskSetManager实例通过schedulableBuilder(分为FIFOSchedulableBuilder和FairSchedulableBuilder两种)投入调度池中等待调度
  3. 调用backend的reviveOffers函数,向backend的driverActor实例发送ReviveOffers消息,driveerActor收到ReviveOffers消息后,调用makeOffers处理函数

override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    //同步
    this.synchronized {
      //把taskSet封装为taskSetManager
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      //把taskManager递交给scheduler
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
      //判断是否接收到task
      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    //调用backend的reviveOffers函数,向backend的driverActor实例发送ReviveOffers消息
    backend.reviveOffers()
  }

reviveOffers函数代码

下面是CoarseGrainedSchedulerBackend的reviveOffers函数:

 override def reviveOffers() {
    //发送
    driverEndpoint.send(ReviveOffers)
  }
driver经过rpc接收到收到ReviveOffers消息后,调用makeOffers处理函数。



如上面源码可以了解,再分发task之前,先调用scheduler.resourceOffers()方法获取资源;资源分配的工作由resourceOffers函数处理;下面进行resourceOffers()方法:

  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    //遍历work资源,更新executor相关的映射
     for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    // 从worker当中随机选出一些来,防止任务都堆在一个机器上
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    // worker的task列表
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    // getSortedTask函数对taskset进行排序
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    // 随机遍历抽出来的worker,通过TaskSetManager的resourceOffer,把本地性最高的Task分给Worker
    // 本地性是根据当前的等待时间来确定的任务本地性的级别。
    // 它的本地性主要是包括四类:PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。
    //1. 首先依次遍历 sortedTaskSets, 并对于每个 Taskset, 遍历 TaskLocality
    //2. 越 local 越优先, 找不到(launchedTask 为 false)才会到下个 locality 级别
    //3. (封装在resourceOfferSingleTaskSet函数)在多次遍历offer list,
    //因为一次taskSet.resourceOffer只会占用一个core, 
    //而不是一次用光所有的 core, 这样有助于一个 taskset 中的 task 比较均匀的分布在workers上
    //4. 只有在该taskset, 该locality下, 对所有worker offer都找不到合适的task时, 
    //才跳到下个 locality 级别
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }
然后调用

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= maxRpcMessageSize) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
                "spark.rpc.message.maxSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK

          logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")

          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }

以上就是提交大部分过程了,下面引用张包峰的csdn博客图,此图很明显的介绍了作业提交过程:



1、Driver程序的代码运行到action操作,触发了SparkContext的runJob方法。 
2、SparkContext调用DAGScheduler的runJob函数。 
3、DAGScheduler把Job划分stage,然后把stage转化为相应的Tasks,把Tasks交给TaskScheduler。 
4、通过TaskScheduler把Tasks添加到任务队列当中,交给SchedulerBackend进行资源分配和任务调度。 
5、调度器给Task分配执行Executor,ExecutorBackend负责执行Task。





2017-05-20 18:52:47 m0_37138008 阅读数 2047
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    34968 人正在学习 去看看 张长志

错误信息:

17/05/20 18:51:39 ERROR JobScheduler: Error running job streaming job 1495277499000 ms.0
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)

问题原因:再对RDD进行操作时引用了类的成员变量而该成员变量无法被序列化所导致的

  例如如下代码:   

object Test2 extends App{
   val conf = new SparkConf().setAppName("RVM").setMaster("local")
   val sc = new SparkContext(conf)
   val matrix = new DenseMatrix(2,2,Array(1.0,2,3,4))
   new Test(sc,matrix).run()

}

class Test(scc:SparkContext,PHI:DenseMatrix) extends Serializable{
   val ts = 0.1
   def run(): Unit ={
      val rdds = scc.parallelize(0 to 3)
      val a = rdds.map(
         x =>{
            PHI.toArray.apply(x)*x
         }
      )
      a.collect.foreach(println(_))
   }
}

      这一段代码运行确实会报错,而且报错如预期一样,最开始以为是因为DenseMatrix不能序列化导致的,结果将DenseMatrix换成了其它类型如Double等基本类型同样会报错,然后发现是scc(SparkContext)不能序列化导致的错误。

      解决办法是在不能序列化的变量前添加注释@transient告诉编译器该变量不需要进行序列化。网上还有其它的一些处理方法暂时未深入研究,

   如果还是没有得到解决:

  可以试下如下方法:

  

出现“org.apache.spark.SparkException: Task not serializable"这个错误,一般是因为在map、filter等的参数使用了外部的变量,但是这个变量不能序列化。特别是当引用了某个类(经常是当前类)的成员函数或变量时,会导致这个类的所有成员(整个类)都需要支持序列化。解决这个问题最常用的方法有:

  1. 如果可以,将依赖的变量放到map、filter等的参数内部定义。这样就可以使用不支持序列化的类;
  2. 如果可以,将依赖的变量独立放到一个小的class中,让这个class支持序列化;这样做可以减少网络传输量,提高效率;
  3. 如果可以,将被依赖的类中不能序列化的部分使用transient关键字修饰,告诉编译器它不需要序列化。
  4. 将引用的类做成可序列化的。

 


2017-08-22 18:36:58 z597952645 阅读数 701
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    34968 人正在学习 去看看 张长志

1 spark中的宽依赖算子

spark的rdd基本操作包括transformation和action,rdd都是懒加载的,通过DAGGraph生成一个有向无环链来代表rdd的生成关系,只有遇到action以后才会真正的去执行。

在执行过程中会根据宽/窄依赖进行stage划分,常见的宽依赖包括groupByKey/reduceByKey/partitionBy……

以reduceByKey为例,调用reduceByKey时,会通过hashPartitioner方式去调用combineByKeyWithClassTag,实现如下:

def combineByKeyWithClassTag[C](
    createCombiner: V => C,
    mergeValue: (C, V) => C,
    mergeCombiners: (C, C) => C,
    partitioner: Partitioner,
    mapSideCombine: Boolean = true,
    serializer: Serializer = null)(implicit ct: ClassTag[C]): RDD[(K, C)] = self.withScope {
  code
  val aggregator = new Aggregator[K, V, C](
    self.context.clean(createCombiner),
    self.context.clean(mergeValue),
    self.context.clean(mergeCombiners))
  if (self.partitioner == Some(partitioner)) {
    //code
  } else {
//生成shuffledRDD
    new ShuffledRDD[K, V, C](self, partitioner)
      .setSerializer(serializer)
      .setAggregator(aggregator)
      .setMapSideCombine(mapSideCombine)
  }
}

核心在于根据传入的序列化方式/聚合器等参数生成新的ShuffledRDD.

2 宽依赖划分stage

action会触发流程真正开始执行

以count为例,

def count(): Long = sc.runJob(this, Utils.getIteratorSize _).sum

调用count的时候sparkContext会开始调用runJob方法,进一步调用dagSchedule的runJob方法

/**
 * 在rdd给定的partitions上运行func,并将函数结果传给resultHandler。 
 */
def runJob[T, U: ClassTag](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    resultHandler: (Int, U) => Unit): Unit = {
  if (stopped.get()) {
    throw new IllegalStateException("SparkContext has been shutdown")
  }
  val callSite = getCallSite
  val cleanedFunc = clean(func)
  logInfo("Starting job: " + callSite.shortForm)
  if (conf.getBoolean("spark.logLineage", false)) {
    logInfo("RDD's recursive dependencies:\n" + rdd.toDebugString)
  }
  dagScheduler.runJob(rdd, cleanedFunc, partitions, callSite, resultHandler, localProperties.get)//调用dagSchedule.runJob
  progressBar.foreach(_.finishAll())
  rdd.doCheckpoint()
}

在dagScheduler里面,调用情况如下:

def runJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): Unit = {
  val start = System.nanoTime
  //核心是submitJob  
  val waiter = submitJob(rdd, func, partitions, callSite, resultHandler, properties)
  ……
}

/**
 * 提交action给scheduler.
 * @param rdd 目标rdd
 * @param func 执行函数
 * @param partitions 目标rdd的分区集合
 * @param callSite where in the user program this job was called
 * @param resultHandler 结果接收方
 * @param properties 配置属性
 * @return a JobWaiter 用来等候job完成,获取取消job
 */
def submitJob[T, U](
    rdd: RDD[T],
    func: (TaskContext, Iterator[T]) => U,
    partitions: Seq[Int],
    callSite: CallSite,
    resultHandler: (Int, U) => Unit,
    properties: Properties): JobWaiter[U] = {
  // Check to make sure we are not launching a task on a partition that does not exist.
  val maxPartitions = rdd.partitions.length
  //判断分区数目是否合理
  partitions.find(p => p >= maxPartitions || p < 0).foreach { p =>
    throw new IllegalArgumentException(
      "Attempting to access a non-existent partition: " + p + ". " +
        "Total number of partitions: " + maxPartitions)
  }
  //生成新的jobid
  val jobId = nextJobId.getAndIncrement()
  if (partitions.size == 0) {
    // Return immediately if the job is running 0 tasks
    return new JobWaiter[U](this, jobId, 0, resultHandler)
  }
  val func2 = func.asInstanceOf[(TaskContext, Iterator[_]) => _]
  val waiter = new JobWaiter(this, jobId, partitions.size, resultHandler)
  //将新生成的JobSubmit实例添加到job队列中去
  eventProcessLoop.post(JobSubmitted(
    jobId, rdd, func2, partitions.toArray, callSite, waiter,
    SerializationUtils.clone(properties)))
  waiter
}

eventProcessLoop继承至EventLoop,里面维护有job队列

当队列里面不为空的时候,通过handleJobSubmitteed来处理这个job

private def doOnReceive(event: DAGSchedulerEvent): Unit = event match {
  case JobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties) =>
    dagScheduler.handleJobSubmitted(jobId, rdd, func, partitions, callSite, listener, properties)
  ……
}


//完成job到stage的转换,每个job都有一个finalStage:ResultStage
private[scheduler] def handleJobSubmitted(jobId: Int,
    finalRDD: RDD[_],
    func: (TaskContext, Iterator[_]) => _,
    partitions: Array[Int],
    callSite: CallSite,
    listener: JobListener,
    properties: Properties) {
  var finalStage: ResultStage = null
  try {
    // 生成结果stage,执行过程中可能会报异常
    finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)
  } catch {
    //异常处理
  }
//根据finalStage和jobId信息生成新的ActiveJob对象
  val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)
  clearCacheLocs()
  val jobSubmissionTime = clock.getTimeMillis()
  jobIdToActiveJob(jobId) = job   //job绑定id
  activeJobs += job    
  finalStage.setActiveJob(job) //再次绑定
  val stageIds = jobIdToStageIds(jobId).toArray//获取指定job下面的所有stageId
  val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))
  listenerBus.post(
    SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))
  submitStage(finalStage)//根据finalStage提交stage

  submitWaitingStages()
}

/** 提交stage,但是会先递归提交祖先stage. */
private def submitStage(stage: Stage) {
  val jobId = activeJobForStage(stage)
  if (jobId.isDefined) {
    logDebug("submitStage(" + stage + ")")
    if (!waitingStages(stage) && !runningStages(stage) && !failedStages(stage)) {
      val missing = getMissingParentStages(stage).sortBy(_.id) //获取stage的依赖关系,划分stage
      logDebug("missing: " + missing)
      if (missing.isEmpty) {
        logInfo("Submitting " + stage + " (" + stage.rdd + "), which has no missing parents")
        submitMissingTasks(stage, jobId.get) //所有祖先stage可用之后调用
      } else {
        for (parent <- missing) {
          submitStage(parent) //计算丢失的祖先stage信息,优先提交祖先stage
        }
        waitingStages += stage //计算祖先stage信息时,修改stage信息为waiting状态
      }
    }
  } else {
    abortStage(stage, "No active job for stage " + stage.id, None)
  }
}
//stage划分依据
private def getMissingParentStages(stage: Stage): List[Stage] = {
  val missing = new HashSet[Stage]
  val visited = new HashSet[RDD[_]]
  // 用栈来保存已经访问过的rdd信息
  val waitingForVisit = new Stack[RDD[_]]
  def visit(rdd: RDD[_]) {
    if (!visited(rdd)) {
      visited += rdd
      val rddHasUncachedPartitions = getCacheLocs(rdd).contains(Nil)//是否存在未缓存的partition
      if (rddHasUncachedPartitions) {
        for (dep <- rdd.dependencies) {//遍历依赖的rdd
          dep match {
            case shufDep: ShuffleDependency[_, _, _] =>
              val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)//宽依赖则生成新的ShuffleMapStage信息
              if (!mapStage.isAvailable) {
                missing += mapStage
              }
            case narrowDep: NarrowDependency[_] =>
              waitingForVisit.push(narrowDep.rdd)//窄依赖就将rdd添加到栈中
          }
        }
      }
    }
  }
  waitingForVisit.push(stage.rdd)
  while (waitingForVisit.nonEmpty) {
    visit(waitingForVisit.pop())
  }
  missing.toList
}

ps:通过getCacheLocs来获取rdd各个分区的缓存位置(未缓存的话每个分区的缓存位置是Nil)

3 task分配 DAGScheduler.submitMissingTasks

private def submitMissingTasks(stage: Stage, jobId: Int) {
  stage.pendingPartitions.clear()
  // 获取所有需要处理的partition
  val partitionsToCompute: Seq[Int] = stage.findMissingPartitions()
  val properties = jobIdToActiveJob(jobId).properties
  runningStages += stage//stage切换到running状态
  // 验证stage是否可以提交
  stage match {
    case s: ShuffleMapStage =>
      outputCommitCoordinator.stageStart(stage = s.id, maxPartitionId = s.numPartitions - 1)
    case s: ResultStage =>
      outputCommitCoordinator.stageStart(
        stage = s.id, maxPartitionId = s.rdd.partitions.length - 1)
  }
//根据partitionId获取最优位置
  val taskIdToLocations: Map[Int, Seq[TaskLocation]] = try {
    stage match {
      case s: ShuffleMapStage =>
        partitionsToCompute.map { id => (id, getPreferredLocs(stage.rdd, id))}.toMap
      case s: ResultStage =>
        val job = s.activeJob.get
        partitionsToCompute.map { id =>
          val p = s.partitions(id)
          (id, getPreferredLocs(stage.rdd, p))
        }.toMap
    }
  } catch {
    //异常处理
  }

  stage.makeNewStageAttempt(partitionsToCompute.size, taskIdToLocations.values.toSeq)
  listenerBus.post(SparkListenerStageSubmitted(stage.latestInfo, properties))

  var taskBinary: Broadcast[Array[Byte]] = null
  try {
    // task序列化
    val taskBinaryBytes: Array[Byte] = stage match {
      case stage: ShuffleMapStage =>
        closureSerializer.serialize((stage.rdd, stage.shuffleDep): AnyRef).array()
      case stage: ResultStage =>
        closureSerializer.serialize((stage.rdd, stage.func): AnyRef).array()
    }

    taskBinary = sc.broadcast(taskBinaryBytes)
  } catch {
    //异常处理
  }
//根据不同的stage类型生成不同的task,每个partition生成一个task
  val tasks: Seq[Task[_]] = try {
    stage match {
      case stage: ShuffleMapStage =>
        partitionsToCompute.map { id =>
          val locs = taskIdToLocations(id)
          val part = stage.rdd.partitions(id)
          new ShuffleMapTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, stage.internalAccumulators)
        }

      case stage: ResultStage =>
        val job = stage.activeJob.get
        partitionsToCompute.map { id =>
          val p: Int = stage.partitions(id)
          val part = stage.rdd.partitions(p)
          val locs = taskIdToLocations(id)
          new ResultTask(stage.id, stage.latestInfo.attemptId,
            taskBinary, part, locs, id, stage.internalAccumulators)
        }
    }
  } catch {
    //
  }

  if (tasks.size > 0) {
    //taskSet不为空,则提交到taskScheduler
    taskScheduler.submitTasks(new TaskSet(
      tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId, properties))
    stage.latestInfo.submissionTime = Some(clock.getTimeMillis())
  } else {
    //标记
  }
}


总结:1 spark懒加载模式,只有在count/show/reduce等action被调用的时候才会真正开始执行计算,每个action会生成一个job,每个job有一个ResultStage;

           2 在执行过程中根据生成关系构造DAGScheduler,根据宽窄依赖关系划分stage。

           3 DAGScheduler根据partition情况分配task,并转移给taskScheduler进行维护;



2019-06-01 16:49:09 OldDirverHelpMe 阅读数 143
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    34968 人正在学习 去看看 张长志

什么是Spark的Task倾斜?

假设当我们提交资源的到yarn上的时候

  • executor个数为6个
  • 每个executor的cores为4个
  • task的个数为6个

理想的情况是:每一个executor做一个task,那么6个executor都在工作,6个task同时执行,只要服务器配置一致,6个task的数据量一致的话,那么数据是很快就可以执行完毕的

我自己实际测试的情况如下:
在这里插入图片描述
从图上可以看到,我有6个executor,并且有6个task,然而这6个task都集中在两个executor上面执行。这不是我想象中的样子。这样的结果是什么,数据处理的负担就给了两个executor,其他的executor都在围观。就好像下面这个样子:
在这里插入图片描述

问题的探究

通过网络搜索,得知应该是spark的数据本地化的原因:

关于数据本地化的介绍spark的官网有描述
下面是我粘贴复制的:

数据位置

数据位置可能会对Spark作业的性能产生重大影响。如果数据和在其上运行的代码在一起,那么计算往往很快。但是如果代码
和数据是分开的,那么必须移动到另一个。通常,将序列化代码从一个地方运送到另一个地方比一块数据更快,因为代码大小比数据小得多。Spark围绕数据局部性的一般原则构建其调度。

数据位置是数据与处理它的代码的接近程度。根据数据的当前位置,有多个级别的位置。从最近到最远的顺序:

  • PROCESS_LOCAL数据与正在运行的代码位于同一JVM中。这是最好的地方
  • NODE_LOCAL数据在同一节点上。示例可能位于同一节点上的HDFS中,也可能位于同一节点上的另一个执行程序中。这比PROCESS_LOCAL因为数据必须在进程之间传输要慢一些
  • NO_PREF 可以从任何地方快速访问数据,并且没有位置偏好
  • RACK_LOCAL数据位于同一机架服务器上。数据位于同一机架上的不同服务器上,因此需要通过网络发送,通常通过单个交换机
  • ANY 数据在网络上的其他位置,而不在同一个机架中
    Spark更喜欢在最佳位置级别安排所有任务,但这并不总是可行的。在任何空闲执行程序上没有未处理数据的情况下,Spark会切换到较低的位置级别。有两个选项:a)等待忙碌的CPU释放以启动同一服务器上的数据任务,或b)立即在需要移动数据的更远的地方启动新任务。

    Spark通常会做的是等待繁忙的CPU释放的希望。一旦超时到期,它就开始将数据从远处移动到空闲CPU。每个级别之间的回退等待超时可以单独配置,也可以在一个参数中一起配置; 有关详细信息,请参阅配置页面spark.locality上的 参数。如果您的任务很长并且看到不良位置,您应该增加这些设置,但默认情况下通常效果很好。

大概的意思就是:spark在给executor分配task的时候,会按照数据所在位置的级别,分配任务。位置级别的优先级排列顺序如下:

类型 描述 解释
PROCESS_LOCAL 进程本地化 数据与代码都在JVM里保存
NODE_LOCAL 节点本地化 数据与代码在同一节点上
NO_PREF 没有偏好 可以在任何地方快速访问数据
RACK_LOCAL 机架本地化 数据在同一个机架的不同的服务器上
ANY 任何地方 数据在网络上的其他位置,而不在同一个机架中

现在对于我之前提出来的问题有一个这样的解决方案:
对于前文提到的问题来说。6个executor,6个task,想要让每一个executor都要跑task的话,每一个executor的core设置为1就可以了,那么这样一来,每一个executor就只有一个core,最多只能执行一个task,所以,剩下的task就会移交到另外的5个executor上面执行了。
但是另外一个问题又出现了
集群的资源利用不上来啊,如果这么去设置的话。

那么就从task入手吧,将task的粒度分得稍微细一点,这样一来,刚开始的时候,比如:
6个executor 4个cores 我有24个task
那么每个executor都在执行task。24个task并行执行。所有的executor都在工作。这样速度就快了

2018-04-13 22:28:41 UUfFO 阅读数 2948
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    34968 人正在学习 去看看 张长志

需求

spark应用程序中,只要task失败就发送邮件,并携带错误原因。 我的GitHub,猛戳我

背景

在spark程序中,task有失败重试机制(根据 spark.task.maxFailures 配置,默认是4次),当task执行失败时,并不会直接导致整个应用程序down掉,只有在重试了 spark.task.maxFailures 次后任然失败的情况下才会使程序down掉。另外,spark on yarn模式还会受yarn的重试机制去重启这个spark程序,根据 yarn.resourcemanager.am.max-attempts 配置(默认是2次)。

即使spark程序task失败4次后,受yarn控制重启后在第4次执行成功了,一切都好像没有发生,我们只有通过spark的监控UI去看是否有失败的task,若有还得去查找看是哪个task由于什么原因失败了。基于以上原因,我们需要做个task失败的监控,只要失败就带上错误原因通知我们,及时发现问题,促使我们的程序更加健壮。

捕获Task失败事件

顺藤摸瓜,task在Executor中执行,跟踪源码看task在失败后都干了啥?

1.在executor中task执行完不管成功与否都会向execBackend报告task的状态;

 execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

2.在CoarseGrainedExecutorBackend中会向driver发送StatusUpdate状态变更信息;

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }

3.CoarseGrainedSchedulerBackend收到消息后有调用了scheduler的方法;

override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        ......

4.由于代码繁琐,列出了关键的几行代码,嵌套调用关系,这里最后向eventProcessLoop发送了CompletionEvent事件;

taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
taskSetManager.handleFailedTask(tid, taskState, reason)
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
eventProcessLoop.post(CompletionEvent(task, reason, result, accumUpdates, taskInfo)) 

5.在DAGSchedulerEventProcessLoop处理方法中 handleTaskCompletion(event: CompletionEvent)有着最为关键的一行代码,这里listenerBus把task的状态发了出去,凡是监听了SparkListenerTaskEnd的listener都可以获取到对应的消息,而且这个是带了失败的原因(event.reason)。其实第一遍走源码并没有注意到前面提到的sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)方法,后面根据SparkUI的page页面往回追溯才发现。

 listenerBus.post(SparkListenerTaskEnd(
       stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))

自定义监听器

需要获取到SparkListenerTaskEnd事件,得继承SparkListener类并重写onTaskEnd方法,
在方法中获取task失败的reason,发送邮件给对应的负责人。这样我们就可以第一时间知道哪个task是以什么原因失败了。

import cn.i4.utils.MailUtil
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class I4SparkAppListener(conf: SparkConf) extends SparkListener with Logging {

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    val info = taskEnd.taskInfo
    // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
    // completion event is for. Let's just drop it here. This means we might have some speculation
    // tasks on the web ui that's never marked as complete.
    if (info != null && taskEnd.stageAttemptId != -1) {
      val errorMessage: Option[String] =
        taskEnd.reason match {
          case kill: TaskKilled =>
            Some(kill.toErrorString)
          case e: ExceptionFailure =>
            Some(e.toErrorString)
          case e: TaskFailedReason =>
            Some(e.toErrorString)
          case _ => None
        }
      if (errorMessage.nonEmpty) {
        if (conf.getBoolean("enableSendEmailOnTaskFail", false)) {
          val args = Array("********@qq.com", "spark任务监控", errorMessage.get)
          try {
            MailUtil.sendMail(args)
          } catch {
            case e: Exception =>
          }
        }
      }
    }
  }
}

注意这里还需要在我们的spark程序中注册好这个listener:

.config("enableSendEmailOnTaskFail", "true")
.config("spark.extraListeners", "cn.i4.monitor.streaming.I4SparkAppListener")

总结

这里只是实现了一个小demo,可以做的更完善使之更通用,比如加上应用程序的名字、host、stageid、taskid等,单独达成jia包放到classPath,并把该listener的注册放到默认配置文件中永久有效,只需控制enableSendEmailOnTaskFail控制是否启用。

我的GitHub,猛戳我

Spark-task相关

阅读数 364

Spark-task相关@(spark)[Task]

博文 来自: oblesslyy

Spark之Task的定义

阅读数 268

没有更多推荐了,返回首页