2017-01-30 21:29:45 lisi1129 阅读数 352
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35089 人正在学习 去看看 张长志

上节说到job提交时候进入了taskScheduler.submitTasks(newTaskSet(tasks.toArray, stage.id, stage.latestInfo.attemptId, jobId,properties)),这节我们就一起了解一下TaskSchedule类和submitTask()方法;

第一步了解taskSchedule类的初始化,此类在sparkContext中被创建,源码见下方;createTaskScheduler函数中,TaskScheduler会根据部署方式而选择不同的SchedulerBackend来处理. 针对不同部署方式会有不同的TaskScheduler与SchedulerBackend进行组合:

  • Local模式:TaskSchedulerImpl + LocalBackend
  • Spark集群模式:TaskSchedulerImpl + SparkDepolySchedulerBackend
  • Yarn-Cluster模式:YarnClusterScheduler + CoarseGrainedSchedulerBackend
  • Yarn-Client模式:YarnClientClusterScheduler + YarnClientSchedulerBackend
TaskScheduler类负责任务调度资源的分配,SchedulerBackend负责与Master、Worker通信收集Worker上分配给该应用使用的资源情况。 


下面进入scheduler.initialize(backend),taskScheduler为接口类,TaskSchedulerImpl实现taskScheduler接口,具体初始化是在taskSchedulerImpl类中,见下面源码


taskScheduler初始化后,会调用start方法,在start方法中,backend也会被调用


故以上可以总结:

SparkContext的createTaskScheduler创建schedulerBackend和taskScheduler–>根据不同的调度方式选择具体的scheduler和backend构造器–>调用TaskSchedulerImpl的initialize方法为scheduler的成员变量backend赋值–>createTaskScheduler返回创建好的(schedulerBackend, taskScheduler)–>调用TaskScheduler.start()启动–>实际上在TaskSchedulerImpl的start方法中调用backend.start()来启动SchedulerBackend。

TaskScheduler是在Application执行过程中,为它进行任务调度的,是属于Driver侧的。对应于一个Application就会有一个TaskScheduler,TaskScheduler和Application是一一对应的。TaskScheduler对资源的控制也比较鲁棒,一个Application申请Worker的计算资源,只要Application不结束就会一直被占有。

下面进入taskSchedulerImpl的作业提交的主要方法,此方法可以概括如下

  1. 任务(tasks)会被包装成TaskSetManager
  2. TaskSetManager实例通过schedulableBuilder(分为FIFOSchedulableBuilder和FairSchedulableBuilder两种)投入调度池中等待调度
  3. 调用backend的reviveOffers函数,向backend的driverActor实例发送ReviveOffers消息,driveerActor收到ReviveOffers消息后,调用makeOffers处理函数

override def submitTasks(taskSet: TaskSet) {
    val tasks = taskSet.tasks
    logInfo("Adding task set " + taskSet.id + " with " + tasks.length + " tasks")
    //同步
    this.synchronized {
      //把taskSet封装为taskSetManager
      val manager = createTaskSetManager(taskSet, maxTaskFailures)
      val stage = taskSet.stageId
      val stageTaskSets =
        taskSetsByStageIdAndAttempt.getOrElseUpdate(stage, new HashMap[Int, TaskSetManager])
      stageTaskSets(taskSet.stageAttemptId) = manager
      val conflictingTaskSet = stageTaskSets.exists { case (_, ts) =>
        ts.taskSet != taskSet && !ts.isZombie
      }
      if (conflictingTaskSet) {
        throw new IllegalStateException(s"more than one active taskSet for stage $stage:" +
          s" ${stageTaskSets.toSeq.map{_._2.taskSet.id}.mkString(",")}")
      }
      //把taskManager递交给scheduler
      schedulableBuilder.addTaskSetManager(manager, manager.taskSet.properties)
      //判断是否接收到task
      if (!isLocal && !hasReceivedTask) {
        starvationTimer.scheduleAtFixedRate(new TimerTask() {
          override def run() {
            if (!hasLaunchedTask) {
              logWarning("Initial job has not accepted any resources; " +
                "check your cluster UI to ensure that workers are registered " +
                "and have sufficient resources")
            } else {
              this.cancel()
            }
          }
        }, STARVATION_TIMEOUT_MS, STARVATION_TIMEOUT_MS)
      }
      hasReceivedTask = true
    }
    //调用backend的reviveOffers函数,向backend的driverActor实例发送ReviveOffers消息
    backend.reviveOffers()
  }

reviveOffers函数代码

下面是CoarseGrainedSchedulerBackend的reviveOffers函数:

 override def reviveOffers() {
    //发送
    driverEndpoint.send(ReviveOffers)
  }
driver经过rpc接收到收到ReviveOffers消息后,调用makeOffers处理函数。



如上面源码可以了解,再分发task之前,先调用scheduler.resourceOffers()方法获取资源;资源分配的工作由resourceOffers函数处理;下面进行resourceOffers()方法:

  def resourceOffers(offers: Seq[WorkerOffer]): Seq[Seq[TaskDescription]] = synchronized {
    // Mark each slave as alive and remember its hostname
    // Also track if new executor is added
    var newExecAvail = false
    //遍历work资源,更新executor相关的映射
     for (o <- offers) {
      executorIdToHost(o.executorId) = o.host
      executorIdToTaskCount.getOrElseUpdate(o.executorId, 0)
      if (!executorsByHost.contains(o.host)) {
        executorsByHost(o.host) = new HashSet[String]()
        executorAdded(o.executorId, o.host)
        newExecAvail = true
      }
      for (rack <- getRackForHost(o.host)) {
        hostsByRack.getOrElseUpdate(rack, new HashSet[String]()) += o.host
      }
    }

    // Randomly shuffle offers to avoid always placing tasks on the same set of workers.
    // 从worker当中随机选出一些来,防止任务都堆在一个机器上
    val shuffledOffers = Random.shuffle(offers)
    // Build a list of tasks to assign to each worker.
    // worker的task列表
    val tasks = shuffledOffers.map(o => new ArrayBuffer[TaskDescription](o.cores))
    val availableCpus = shuffledOffers.map(o => o.cores).toArray
    // getSortedTask函数对taskset进行排序
    val sortedTaskSets = rootPool.getSortedTaskSetQueue
    for (taskSet <- sortedTaskSets) {
      logDebug("parentName: %s, name: %s, runningTasks: %s".format(
        taskSet.parent.name, taskSet.name, taskSet.runningTasks))
      if (newExecAvail) {
        taskSet.executorAdded()
      }
    }

    // Take each TaskSet in our scheduling order, and then offer it each node in increasing order
    // of locality levels so that it gets a chance to launch local tasks on all of them.
    // NOTE: the preferredLocality order: PROCESS_LOCAL, NODE_LOCAL, NO_PREF, RACK_LOCAL, ANY
    // 随机遍历抽出来的worker,通过TaskSetManager的resourceOffer,把本地性最高的Task分给Worker
    // 本地性是根据当前的等待时间来确定的任务本地性的级别。
    // 它的本地性主要是包括四类:PROCESS_LOCAL, NODE_LOCAL, RACK_LOCAL, ANY。
    //1. 首先依次遍历 sortedTaskSets, 并对于每个 Taskset, 遍历 TaskLocality
    //2. 越 local 越优先, 找不到(launchedTask 为 false)才会到下个 locality 级别
    //3. (封装在resourceOfferSingleTaskSet函数)在多次遍历offer list,
    //因为一次taskSet.resourceOffer只会占用一个core, 
    //而不是一次用光所有的 core, 这样有助于一个 taskset 中的 task 比较均匀的分布在workers上
    //4. 只有在该taskset, 该locality下, 对所有worker offer都找不到合适的task时, 
    //才跳到下个 locality 级别
    var launchedTask = false
    for (taskSet <- sortedTaskSets; maxLocality <- taskSet.myLocalityLevels) {
      do {
        launchedTask = resourceOfferSingleTaskSet(
            taskSet, maxLocality, shuffledOffers, availableCpus, tasks)
      } while (launchedTask)
    }

    if (tasks.size > 0) {
      hasLaunchedTask = true
    }
    return tasks
  }
然后调用

private def launchTasks(tasks: Seq[Seq[TaskDescription]]) {
      for (task <- tasks.flatten) {
        val serializedTask = ser.serialize(task)
        if (serializedTask.limit >= maxRpcMessageSize) {
          scheduler.taskIdToTaskSetManager.get(task.taskId).foreach { taskSetMgr =>
            try {
              var msg = "Serialized task %s:%d was %d bytes, which exceeds max allowed: " +
                "spark.rpc.message.maxSize (%d bytes). Consider increasing " +
                "spark.rpc.message.maxSize or using broadcast variables for large values."
              msg = msg.format(task.taskId, task.index, serializedTask.limit, maxRpcMessageSize)
              taskSetMgr.abort(msg)
            } catch {
              case e: Exception => logError("Exception in error callback", e)
            }
          }
        }
        else {
          val executorData = executorDataMap(task.executorId)
          executorData.freeCores -= scheduler.CPUS_PER_TASK

          logInfo(s"Launching task ${task.taskId} on executor id: ${task.executorId} hostname: " +
            s"${executorData.executorHost}.")

          executorData.executorEndpoint.send(LaunchTask(new SerializableBuffer(serializedTask)))
        }
      }
    }

以上就是提交大部分过程了,下面引用张包峰的csdn博客图,此图很明显的介绍了作业提交过程:



1、Driver程序的代码运行到action操作,触发了SparkContext的runJob方法。 
2、SparkContext调用DAGScheduler的runJob函数。 
3、DAGScheduler把Job划分stage,然后把stage转化为相应的Tasks,把Tasks交给TaskScheduler。 
4、通过TaskScheduler把Tasks添加到任务队列当中,交给SchedulerBackend进行资源分配和任务调度。 
5、调度器给Task分配执行Executor,ExecutorBackend负责执行Task。





2019-06-01 16:49:09 OldDirverHelpMe 阅读数 171
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35089 人正在学习 去看看 张长志

什么是Spark的Task倾斜?

假设当我们提交资源的到yarn上的时候

  • executor个数为6个
  • 每个executor的cores为4个
  • task的个数为6个

理想的情况是:每一个executor做一个task,那么6个executor都在工作,6个task同时执行,只要服务器配置一致,6个task的数据量一致的话,那么数据是很快就可以执行完毕的

我自己实际测试的情况如下:
在这里插入图片描述
从图上可以看到,我有6个executor,并且有6个task,然而这6个task都集中在两个executor上面执行。这不是我想象中的样子。这样的结果是什么,数据处理的负担就给了两个executor,其他的executor都在围观。就好像下面这个样子:
在这里插入图片描述

问题的探究

通过网络搜索,得知应该是spark的数据本地化的原因:

关于数据本地化的介绍spark的官网有描述
下面是我粘贴复制的:

数据位置

数据位置可能会对Spark作业的性能产生重大影响。如果数据和在其上运行的代码在一起,那么计算往往很快。但是如果代码
和数据是分开的,那么必须移动到另一个。通常,将序列化代码从一个地方运送到另一个地方比一块数据更快,因为代码大小比数据小得多。Spark围绕数据局部性的一般原则构建其调度。

数据位置是数据与处理它的代码的接近程度。根据数据的当前位置,有多个级别的位置。从最近到最远的顺序:

  • PROCESS_LOCAL数据与正在运行的代码位于同一JVM中。这是最好的地方
  • NODE_LOCAL数据在同一节点上。示例可能位于同一节点上的HDFS中,也可能位于同一节点上的另一个执行程序中。这比PROCESS_LOCAL因为数据必须在进程之间传输要慢一些
  • NO_PREF 可以从任何地方快速访问数据,并且没有位置偏好
  • RACK_LOCAL数据位于同一机架服务器上。数据位于同一机架上的不同服务器上,因此需要通过网络发送,通常通过单个交换机
  • ANY 数据在网络上的其他位置,而不在同一个机架中
    Spark更喜欢在最佳位置级别安排所有任务,但这并不总是可行的。在任何空闲执行程序上没有未处理数据的情况下,Spark会切换到较低的位置级别。有两个选项:a)等待忙碌的CPU释放以启动同一服务器上的数据任务,或b)立即在需要移动数据的更远的地方启动新任务。

    Spark通常会做的是等待繁忙的CPU释放的希望。一旦超时到期,它就开始将数据从远处移动到空闲CPU。每个级别之间的回退等待超时可以单独配置,也可以在一个参数中一起配置; 有关详细信息,请参阅配置页面spark.locality上的 参数。如果您的任务很长并且看到不良位置,您应该增加这些设置,但默认情况下通常效果很好。

大概的意思就是:spark在给executor分配task的时候,会按照数据所在位置的级别,分配任务。位置级别的优先级排列顺序如下:

类型 描述 解释
PROCESS_LOCAL 进程本地化 数据与代码都在JVM里保存
NODE_LOCAL 节点本地化 数据与代码在同一节点上
NO_PREF 没有偏好 可以在任何地方快速访问数据
RACK_LOCAL 机架本地化 数据在同一个机架的不同的服务器上
ANY 任何地方 数据在网络上的其他位置,而不在同一个机架中

现在对于我之前提出来的问题有一个这样的解决方案:
对于前文提到的问题来说。6个executor,6个task,想要让每一个executor都要跑task的话,每一个executor的core设置为1就可以了,那么这样一来,每一个executor就只有一个core,最多只能执行一个task,所以,剩下的task就会移交到另外的5个executor上面执行了。
但是另外一个问题又出现了
集群的资源利用不上来啊,如果这么去设置的话。

那么就从task入手吧,将task的粒度分得稍微细一点,这样一来,刚开始的时候,比如:
6个executor 4个cores 我有24个task
那么每个executor都在执行task。24个task并行执行。所有的executor都在工作。这样速度就快了

2017-12-06 21:53:51 qq_21383435 阅读数 1062
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35089 人正在学习 去看看 张长志

报错这个一般是org.apache.spark.SparkException: Task not serializable

17/12/06 14:20:10 INFO MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 28.4 KB, free 872.6 MB)
17/12/06 14:20:10 INFO BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.1.161:51006 (size: 28.4 KB, free: 873.0 MB)
17/12/06 14:20:10 INFO SparkContext: Created broadcast 0 from newAPIHadoopRDD at SparkOnHbaseSecond.java:92
Exception in thread "main" org.apache.spark.SparkException: Task not serializable
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
    at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
    at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)
    at org.apache.spark.SparkContext.clean(SparkContext.scala:2101)
    at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:370)
	at org.apache.spark.rdd.RDD$$anonfun$map$1.apply(RDD.scala:369)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:151)
    at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:112)
    at org.apache.spark.rdd.RDD.withScope(RDD.scala:362)
    at org.apache.spark.rdd.RDD.map(RDD.scala:369)
    at org.apache.spark.api.java.JavaRDDLike$class.map(JavaRDDLike.scala:93)
    at org.apache.spark.api.java.AbstractJavaRDDLike.map(JavaRDDLike.scala:45)
    at sparlsql.hbase.www.second.SparkOnHbaseSecond.main(SparkOnHbaseSecond.java:94)
Caused by: java.io.NotSerializableException: sparlsql.hbase.www.second.SparkOnHbaseSecond
Serialization stack:
    - object not serializable (class: sparlsql.hbase.www.second.SparkOnHbaseSecond, value: sparlsql.hbase.www.second.SparkOnHbaseSecond@602ae7b6)
    - field (class: sparlsql.hbase.www.second.SparkOnHbaseSecond$1, name: val$sparkOnHbase, type: class sparlsql.hbase.www.second.SparkOnHbaseSecond)
    - object (class sparlsql.hbase.www.second.SparkOnHbaseSecond$1, sparlsql.hbase.www.second.SparkOnHbaseSecond$1@37af1f93)
    - field (class: org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, name: fun$1, type: interface org.apache.spark.api.java.function.Function)
	- object (class org.apache.spark.api.java.JavaPairRDD$$anonfun$toScalaFunction$1, <function1>)
    at org.apache.spark.serializer.SerializationDebugger$.improveException(SerializationDebugger.scala:40)
    at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:46)
    at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:100)
    at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:295)
    ... 12 more
17/12/06 14:20:10 INFO SparkContext: Invoking stop() from shutdown hook

第一种是类没序列化


public class SparkOnHbaseBack {


    private String  aa = "1234"; 
    public static void main(String[] args) throws Exception {



        SparkSession spark=SparkSession.builder()  
                .appName("lcc_java_read_hbase_register_to_table")  
                .master("local[4]")
                .getOrCreate();  



        JavaRDD<Row> personsRDD = myRDD.map(new Function<Tuple2<ImmutableBytesWritable,Result>,Row>() {

            @Override
            public Row call(Tuple2<ImmutableBytesWritable, Result> tuple) throws Exception {
                // TODO Auto-generated method stub
                // System.out.println("====tuple=========="+aa);
                这里使用了main外的字段,把字段放到这个map方法里面就好了
                或者类继承实现public class SparkOnHbaseSecond implements java.io.Serializable 类似这样


        });
2018-04-13 22:28:41 UUfFO 阅读数 3118
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35089 人正在学习 去看看 张长志

需求

spark应用程序中,只要task失败就发送邮件,并携带错误原因。 我的GitHub,猛戳我

背景

在spark程序中,task有失败重试机制(根据 spark.task.maxFailures 配置,默认是4次),当task执行失败时,并不会直接导致整个应用程序down掉,只有在重试了 spark.task.maxFailures 次后任然失败的情况下才会使程序down掉。另外,spark on yarn模式还会受yarn的重试机制去重启这个spark程序,根据 yarn.resourcemanager.am.max-attempts 配置(默认是2次)。

即使spark程序task失败4次后,受yarn控制重启后在第4次执行成功了,一切都好像没有发生,我们只有通过spark的监控UI去看是否有失败的task,若有还得去查找看是哪个task由于什么原因失败了。基于以上原因,我们需要做个task失败的监控,只要失败就带上错误原因通知我们,及时发现问题,促使我们的程序更加健壮。

捕获Task失败事件

顺藤摸瓜,task在Executor中执行,跟踪源码看task在失败后都干了啥?

1.在executor中task执行完不管成功与否都会向execBackend报告task的状态;

 execBackend.statusUpdate(taskId, TaskState.FINISHED, serializedResult)

2.在CoarseGrainedExecutorBackend中会向driver发送StatusUpdate状态变更信息;

override def statusUpdate(taskId: Long, state: TaskState, data: ByteBuffer) {
    val msg = StatusUpdate(executorId, taskId, state, data)
    driver match {
      case Some(driverRef) => driverRef.send(msg)
      case None => logWarning(s"Drop $msg because has not yet connected to driver")
    }
  }

3.CoarseGrainedSchedulerBackend收到消息后有调用了scheduler的方法;

override def receive: PartialFunction[Any, Unit] = {
      case StatusUpdate(executorId, taskId, state, data) =>
        scheduler.statusUpdate(taskId, state, data.value)
        ......

4.由于代码繁琐,列出了关键的几行代码,嵌套调用关系,这里最后向eventProcessLoop发送了CompletionEvent事件;

taskResultGetter.enqueueFailedTask(taskSet, tid, state, serializedData)
scheduler.handleFailedTask(taskSetManager, tid, taskState, reason)
taskSetManager.handleFailedTask(tid, taskState, reason)
sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)
eventProcessLoop.post(CompletionEvent(task, reason, result, accumUpdates, taskInfo)) 

5.在DAGSchedulerEventProcessLoop处理方法中 handleTaskCompletion(event: CompletionEvent)有着最为关键的一行代码,这里listenerBus把task的状态发了出去,凡是监听了SparkListenerTaskEnd的listener都可以获取到对应的消息,而且这个是带了失败的原因(event.reason)。其实第一遍走源码并没有注意到前面提到的sched.dagScheduler.taskEnded(tasks(index), reason, null, accumUpdates, info)方法,后面根据SparkUI的page页面往回追溯才发现。

 listenerBus.post(SparkListenerTaskEnd(
       stageId, task.stageAttemptId, taskType, event.reason, event.taskInfo, taskMetrics))

自定义监听器

需要获取到SparkListenerTaskEnd事件,得继承SparkListener类并重写onTaskEnd方法,
在方法中获取task失败的reason,发送邮件给对应的负责人。这样我们就可以第一时间知道哪个task是以什么原因失败了。

import cn.i4.utils.MailUtil
import org.apache.spark._
import org.apache.spark.internal.Logging
import org.apache.spark.scheduler.{SparkListener, SparkListenerTaskEnd}

class I4SparkAppListener(conf: SparkConf) extends SparkListener with Logging {

  override def onTaskEnd(taskEnd: SparkListenerTaskEnd): Unit = synchronized {
    val info = taskEnd.taskInfo
    // If stage attempt id is -1, it means the DAGScheduler had no idea which attempt this task
    // completion event is for. Let's just drop it here. This means we might have some speculation
    // tasks on the web ui that's never marked as complete.
    if (info != null && taskEnd.stageAttemptId != -1) {
      val errorMessage: Option[String] =
        taskEnd.reason match {
          case kill: TaskKilled =>
            Some(kill.toErrorString)
          case e: ExceptionFailure =>
            Some(e.toErrorString)
          case e: TaskFailedReason =>
            Some(e.toErrorString)
          case _ => None
        }
      if (errorMessage.nonEmpty) {
        if (conf.getBoolean("enableSendEmailOnTaskFail", false)) {
          val args = Array("********@qq.com", "spark任务监控", errorMessage.get)
          try {
            MailUtil.sendMail(args)
          } catch {
            case e: Exception =>
          }
        }
      }
    }
  }
}

注意这里还需要在我们的spark程序中注册好这个listener:

.config("enableSendEmailOnTaskFail", "true")
.config("spark.extraListeners", "cn.i4.monitor.streaming.I4SparkAppListener")

总结

这里只是实现了一个小demo,可以做的更完善使之更通用,比如加上应用程序的名字、host、stageid、taskid等,单独达成jia包放到classPath,并把该listener的注册放到默认配置文件中永久有效,只需控制enableSendEmailOnTaskFail控制是否启用。

我的GitHub,猛戳我

2017-05-20 18:52:47 m0_37138008 阅读数 2087
  • 大数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35089 人正在学习 去看看 张长志

错误信息:

17/05/20 18:51:39 ERROR JobScheduler: Error running job streaming job 1495277499000 ms.0
org.apache.spark.SparkException: Task not serializable
	at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:298)
	at org.apache.spark.util.ClosureCleaner$.org$apache$spark$util$ClosureCleaner$$clean(ClosureCleaner.scala:288)
	at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:108)

问题原因:再对RDD进行操作时引用了类的成员变量而该成员变量无法被序列化所导致的

  例如如下代码:   

object Test2 extends App{
   val conf = new SparkConf().setAppName("RVM").setMaster("local")
   val sc = new SparkContext(conf)
   val matrix = new DenseMatrix(2,2,Array(1.0,2,3,4))
   new Test(sc,matrix).run()

}

class Test(scc:SparkContext,PHI:DenseMatrix) extends Serializable{
   val ts = 0.1
   def run(): Unit ={
      val rdds = scc.parallelize(0 to 3)
      val a = rdds.map(
         x =>{
            PHI.toArray.apply(x)*x
         }
      )
      a.collect.foreach(println(_))
   }
}

      这一段代码运行确实会报错,而且报错如预期一样,最开始以为是因为DenseMatrix不能序列化导致的,结果将DenseMatrix换成了其它类型如Double等基本类型同样会报错,然后发现是scc(SparkContext)不能序列化导致的错误。

      解决办法是在不能序列化的变量前添加注释@transient告诉编译器该变量不需要进行序列化。网上还有其它的一些处理方法暂时未深入研究,

   如果还是没有得到解决:

  可以试下如下方法:

  

出现“org.apache.spark.SparkException: Task not serializable"这个错误,一般是因为在map、filter等的参数使用了外部的变量,但是这个变量不能序列化。特别是当引用了某个类(经常是当前类)的成员函数或变量时,会导致这个类的所有成员(整个类)都需要支持序列化。解决这个问题最常用的方法有:

  1. 如果可以,将依赖的变量放到map、filter等的参数内部定义。这样就可以使用不支持序列化的类;
  2. 如果可以,将依赖的变量独立放到一个小的class中,让这个class支持序列化;这样做可以减少网络传输量,提高效率;
  3. 如果可以,将被依赖的类中不能序列化的部分使用transient关键字修饰,告诉编译器它不需要序列化。
  4. 将引用的类做成可序列化的。

 


spark stage的划分和task分配

博文 来自: z597952645

Spark-task相关

阅读数 370

Spark-task相关@(spark)[Task]

博文 来自: oblesslyy
没有更多推荐了,返回首页