• Spark学习之driver

    2018-01-14 20:18:10
    driverspark中并不是一个非常重要的概念,但是在学习过程中对于他的理解感觉比其他组件都要费劲,花了几天的功夫终于把Driver弄明白了,希望这篇博客能对刚学习spark的人有点帮助 因为driver这个概念的理解与...

    driver在spark中并不是一个非常重要的概念,但是在学习过程中对于他的理解感觉比其他组件都要费劲,花了几天的功夫终于把Driver弄明白了,希望这篇博客能对刚学习spark的人有点帮助

    因为driver这个概念的理解与spark的运行模式有关,所以在讲解spark之前会先讲一下spark的四种分布式运行模式

    spark的四种分布式运行模式

    这里写图片描述
    如图所示,上方为spark的架构图,spark的组件可以分为四个部分,driver、cluster Manager、worker和executor
    根据clusterManager的不同,spark可以分成四种不同的运行模式,standalone、local cluster、spark on yarn 和spark on mesos

    1)standalone模式:

    standalone模式既独立模式,自带完整服务,可单独部署到一个集群中,无需依赖其他任何资源管理系统,只支持FIFO调度器。从一定程度上说,它是spark on yarn 和spark on mesos 的基础。在standalone模式中,没有AM和NM的概念,也没有RM的概念,用户节点直接与master打交道,由driver负责向master申请资源,并由driver进行资源的分配和调度等等。

    2)localCluster模式:

    standalone模式的单机版,master和worker分别运行在一台机器的不同进程上

    3) spark on Yarn模式:

    这是一种很有前景的部署模式。但限于YARN自身的发展,目前仅支持粗粒度模式(Coarse-grained Mode)。这是由于YARN上的Container资源是不可以动态伸缩的,一旦Container启动之后,可使用的资源不能再发生变化,不过这个已经在YARN计划中了。 spark on yarn 的支持两种模式:
    (1) yarn-cluster:适用于生产环境;
    (2) yarn-client:适用于交互、调试,希望立即看到app的输出
    yarn-cluster和yarn-client的区别在于yarn appMaster,每个yarn app实例有一个appMaster进程,是为app启动的第一个container;负责从ResourceManager请求资源,获取到资源后,告诉NodeManager为其启动container。yarn-cluster和yarn-client模式内部实现还是有很大的区别。如果你需要用于生产环境,那么请选择yarn-cluster;而如果你仅仅是Debug程序,可以选择yarn-client。

    4)spark on mesos模式:

    这是很多公司采用的模式,官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。目前在Spark On Mesos环境中,用户可选择两种调度模式之一运行自己的应用程序(可参考Andrew Xia的“Mesos Scheduling Mode on Spark”):

    1) 粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。举个例子,比如你提交应用程序时,指定使用5个executor运行你的应用程序,每个executor占用5GB内存和5个CPU,每个executor内部设置了5个slot,则Mesos需要先为executor分配资源并启动它们,之后开始调度任务。另外,在程序运行过程中,mesos的master和slave并不知道executor内部各个task的运行情况,executor直接将任务状态通过内部的通信机制汇报给Driver,从一定程度上可以认为,每个应用程序利用mesos搭建了一个虚拟集群自己使用。

    2) 细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。与粗粒度模式一样,应用程序启动时,先会启动executor,但每个executor占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos会为每个executor动态分配资源,每分配一些,便可以运行一个新任务,单个Task运行完之后可以马上释放对应的资源。每个Task会汇报状态给Mesos slave和Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于MapReduce调度模式,每个Task完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。


    关于driver

    (一)driver是什么

    这里写图片描述
    上图是官网给的解释,刚看到这句话的时候,首先不知道这个main函数指的是什么,是像C语言中的main函数一样吗,其次看到那句create context让我想到的是源码中有一段代码在某个场合创建了一个driver然后在driver里面创建的context,但看了两天的代码以后,发并没有找到。思考了很久以后才明白driver是什么意思:

    用户提交的应用程序代码在spark中运行起来就是一个driver,用户提交的程序运行起来就是一个driver,他是一个一段特殊的excutor进程,这个进程除了一般excutor都具有的运行环境外,这个进程里面运行着DAGscheduler Tasksheduler Schedulerbackedn等组件。

    下图是个求π值的spark程序
    这里写图片描述
    这段计算π值的程序代码在spark上运行起来就是一个driver,可以看到这段程序里有个main函数,它是整个应用程序的开始,也可以看到在这段代码中创建了context,这样与官网给的解释就完全对上了。

    driver组件的结构图如下,从结构图中可以看出,driver中包括一些在executor上运行进程都有的运行环境sparkenv和一些只有driver才拥有的组件比如SparkUI、SchedulerBackEnd、TaskSchduler、DagScheduler等等,整个大框就是一个sparkcontext
    这里写图片描述
    从源码中我们也能看出sparkcontext包括公共的sparkenv部分和driver特有的组件
    这里写图片描述 这里写图片描述

    Q:一个应用程序是如何与一个driver一一对应的呢,在worker.scala中我们找到了创建driver的代码,一步步往上追溯就能发现一个Driver如何与一个应用程序一一对应起来的
    这里写图片描述

    (二)driver做什么

    这里写图片描述
    上图截于stackoverflow总结了driver的主要功能,总结如下
    - 运行应用程序的main函数
    - 创建spark的上下文
    - 划分RDD并生成有向无环图(DAGScheduler)
    - 与spark中的其他组进行协调,协调资源等等(SchedulerBackend)
    - 生成并发送task到executor(taskScheduler)

    (三)driver运行在哪里(在这里只讨论yarn模式)

    官网上说:There are two deploy modes that can be used to launch Spark application on Yarn.In cluster mode,the Spark driver run inside an application master process.And in the client mode,the driver runs in the client process.

    yarn-cluster模式下

    这里写图片描述
    yarn-cluster模式下,client将用户程序提交到到spark集群中就与spark集群断开联系了,此时client将不会发挥其他任何作用,仅仅负责提交。在此模式下。AM和driver是同一个东西,但官网上给的是driver运行在AM里,可以理解为AM包括了driver的功能就像Driver运行在AM里一样,此时的AM既能够向AM申请资源并进行分配,又能完成driver划分RDD提交task等工作

    yarn-client模式下

    这里写图片描述
    yarn-client模式下,Driver运行在客户端上,先有driver再用AM,此时driver负责RDD生成、task生成和分发,向AM申请资源等 ,AM负责向RM申请资源,其他的都由driver来完成

    总结:

    用户提交的程序运行起来就是一个driver,他是一个一段特殊的excutor进程,这个进程除了一般excutor都具有的运行环境外,这个进程里面运行着DAGscheduler Tasksheduler Schedulerbackedn等组件

    yarn-Cluster模式下driver运行在AM里,这个AM既完成划分RDD生成有向无环图提交task等任务也负责管理与这个application运行有关的executor

    yarn-Client模式下由AM负责管理excutor其余的由driver完成

    展开全文
  • Spark中的Driver

    2018-04-12 19:17:08
    转自:http://www.jobplus.com.cn/article/getArticleDetail/30566spark的四种分布式运行模式如图所示,上方为spark的架构图,spark的组件可以分为四个部分,driver、cluster Manager、worker和executor根据cluster...

    转自:http://www.jobplus.com.cn/article/getArticleDetail/30566

    spark的四种分布式运行模式


    如图所示,上方为spark的架构图,spark的组件可以分为四个部分,driver、cluster Manager、worker和executor
    根据clusterManager的不同,spark可以分成四种不同的运行模式,standalone、local cluster、spark on yarn 和spark on mesos

    standalone模式:

    standalone模式既独立模式,自带完整服务,可单独部署到一个集群中,无需依赖其他任何资源管理系统,只支持FIFO调度器。从一定程度上说,它是spark on yarn 和spark on mesos 的基础。在standalone模式中,没有AM和NM的概念,也没有RM的概念,用户节点直接与master打交道,由driver负责向master申请资源,并由driver进行资源的分配和调度等等。

    localCluster模式:

    standalone模式的单机版,master和worker分别运行在一台机器的不同进程上

    spark on Yarn模式:

    这是一种很有前景的部署模式。但限于YARN自身的发展,目前仅支持粗粒度模式(Coarse-grained Mode)。这是由于YARN上的Container资源是不可以动态伸缩的,一旦Container启动之后,可使用的资源不能再发生变化,不过这个已经在YARN计划中了。 spark on yarn 的支持两种模式:

    (1) yarn-cluster:适用于生产环境;(2) yarn-client:适用于交互、调试,希望立即看到app的输出

    yarn-cluster和yarn-client的区别在于yarn appMaster,每个yarn app实例有一个appMaster进程,是为app启动的第一个container;负责从ResourceManager请求资源,获取到资源后,告诉NodeManager为其启动container。yarn-cluster和yarn-client模式内部实现还是有很大的区别。如果你需要用于生产环境,那么请选择yarn-cluster;而如果你仅仅是Debug程序,可以选择yarn-client。

    spark on mesos模式:

    这是很多公司采用的模式,官方推荐这种模式(当然,原因之一是血缘关系)。正是由于Spark开发之初就考虑到支持Mesos,因此,目前而言,Spark运行在Mesos上会比运行在YARN上更加灵活,更加自然。目前在Spark On Mesos环境中,用户可选择两种调度模式之一运行自己的应用程序(可参考Andrew Xia的“Mesos Scheduling Mode on Spark”):

    1. 粗粒度模式(Coarse-grained Mode):每个应用程序的运行环境由一个Dirver和若干个Executor组成,其中,每个Executor占用若干资源,内部可运行多个Task(对应多少个“slot”)。应用程序的各个任务正式运行之前,需要将运行环境中的资源全部申请好,且运行过程中要一直占用这些资源,即使不用,最后程序运行结束后,回收这些资源。举个例子,比如你提交应用程序时,指定使用5个executor运行你的应用程序,每个executor占用5GB内存和5个CPU,每个executor内部设置了5个slot,则Mesos需要先为executor分配资源并启动它们,之后开始调度任务。另外,在程序运行过程中,mesos的master和slave并不知道executor内部各个task的运行情况,executor直接将任务状态通过内部的通信机制汇报给Driver,从一定程度上可以认为,每个应用程序利用mesos搭建了一个虚拟集群自己使用。

    2. 细粒度模式(Fine-grained Mode):鉴于粗粒度模式会造成大量资源浪费,Spark On Mesos还提供了另外一种调度模式:细粒度模式,这种模式类似于现在的云计算,思想是按需分配。与粗粒度模式一样,应用程序启动时,先会启动executor,但每个executor占用资源仅仅是自己运行所需的资源,不需要考虑将来要运行的任务,之后,mesos会为每个executor动态分配资源,每分配一些,便可以运行一个新任务,单个Task运行完之后可以马上释放对应的资源。每个Task会汇报状态给Mesos slave和Mesos Master,便于更加细粒度管理和容错,这种调度模式类似于MapReduce调度模式,每个Task完全独立,优点是便于资源控制和隔离,但缺点也很明显,短作业运行延迟大。

    关于driver

    首先不知道这个main函数指的是什么,是像C语言中的main函数一样吗,其次看到那句create context让我想到的是源码中有一段代码在某个场合创建了一个driver然后在driver里面创建的context,但看了两天的代码以后,发并没有找到。思考了很久以后才明白driver是什么意思:

    用户提交的应用程序代码在spark中运行起来就是一个driver,用户提交的程序运行起来就是一个driver,他是一个一段特殊的excutor进程,这个进程除了一般excutor都具有的运行环境外,这个进程里面运行着DAGscheduler Tasksheduler Schedulerbackedn等组件。

    这段计算π值的程序代码在spark上运行起来就是一个driver,可以看到这段程序里有个main函数,它是整个应用程序的开始,也可以看到在这段代码中创建了context,这样与官网给的解释就完全对上了。

    Q:一个应用程序是如何与一个driver一一对应的呢,在worker.scala中我们找到了创建driver的代码,一步步往上追溯就能发现一个Driver如何与一个应用程序一一对应起来的

    (二)driver做什么

    运行应用程序的main函数

    • 创建spark的上下文
    • 划分RDD并生成有向无环图(DAGScheduler)
    • 与spark中的其他组进行协调,协调资源等等(SchedulerBackend)
    • 生成并发送task到executor(taskScheduler)

    (三)driver运行在哪里(在这里只讨论yarn模式)

    官网上说:There are two deploy modes that can be used to launch Spark application on Yarn.In cluster mode,the Spark driver run inside an application master process.And in the client mode,the driver runs in the client process.

    yarn-cluster模式下,client将用户程序提交到到spark集群中就与spark集群断开联系了,此时client将不会发挥其他任何作用,仅仅负责提交。在此模式下。AM和driver是同一个东西,但官网上给的是driver运行在AM里,可以理解为AM包括了driver的功能就像Driver运行在AM里一样,此时的AM既能够向AM申请资源并进行分配,又能完成driver划分RDD提交task等工作

    yarn-client模式下

    yarn-client模式下,Driver运行在客户端上,先有driver再用AM,此时driver负责RDD生成、task生成和分发,向AM申请资源等 ,AM负责向RM申请资源,其他的都由driver来完成

    总结

    用户提交的程序运行起来就是一个driver,他是一个一段特殊的excutor进程,这个进程除了一般excutor都具有的运行环境外,这个进程里面运行着DAGscheduler Tasksheduler Schedulerbackedn等组件

    yarn-Cluster模式下driver运行在AM里,这个AM既完成划分RDD生成有向无环图提交task等任务也负责管理与这个application运行有关的executor

    yarn-Client模式下由AM负责管理excutor其余的由driver完成

    展开全文
  • spark windows 下在idea中 作driver 调试 spark(spark本地代码集群运行问题) 18/03/15 19:35:48 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@node1:38126] has...

    spark windows 下在idea中 作driver 调试 spark(spark本地代码集群运行问题)

    18/03/15 19:35:48 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkExecutor@node1:38126] has failed, address is now gated for [5000] ms. Reason is: [org.apache.spark.TaskState$; local class incompatible: stream classdesc serialVersionUID = 746799155515967470, local class serialVersionUID = -2913614267616900700].

    18/03/15 19:34:54 WARN ReliableDeliverySupervisor: Association with remote system [akka.tcp://sparkDriver@chen:2589] has failed, address is now gated for [5000] ms. Reason is: [Disassociated].

    local class incompatible

    Reason is: [Disassociated]

    如果报错,应该仔细检查Worker Executor Driver Master的日志,看看是否有上面这样的内容,上面问题的原因是SCALA版本不一致造成的.

    如何查看集群的SCALA版本

    我是利用 spark-shell来查看的
    Welcome to
    __
    / / _ _/ /__
    \ \/ \/ _ `/ _/ ‘/
    // ./_,// //_\ version 1.3.1
    /_/

    Using.Scala version 2.10.4 (Java HotSpot(TM) 64-Bit Server VM, Java 1.8.0_91)
    Type in expressions to have them evaluated.

    idea中的版本(我是maven导入的)

           <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_**2.10**</artifactId>
                <version>**1.3.1**</version>
            </dependency>
    

    两个数字(2.10,1.3.0)都要注意
    之后就可以在idea中,启动main函数充当driver来调度了

    main中的要点

        val sparkConf = new SparkConf().setAppName("Broadcast Test").
          //集群的地址
          setMaster("spark://node1:7077").
          //先用Maven把IDEA中的项目打成jar包,然后填写这个Jar包的绝对路径,最后在idea中
          //运行就好了
          setJars(Seq("Z:\\sparkwokespace\\spark13\\target\\spark-1.0-SNAPSHOT.jar"))
          val context: SparkContext = new SparkContext(sparkConf)
    

    参考内容
    1 解决在编程方式下无法访问Spark Master问题
    2 spark 笔记1 – spark程序连接

    展开全文
  • 1 Spark运行时的架构 在分布式环境下,Spark 集群采用的是主/ 从结构。在一个Spark 集群中,有一个节点负 责中央协调,调度各个分布式工作节点。这个中央协调节点被称为驱动器(Driver)节点,与之对应的工作节点...

    1 Spark运行时的架构

    在分布式环境下,Spark 集群采用的是主/ 从结构。在一个Spark 集群中,有一个节点负责中央协调,调度各个分布式工作节点。这个中央协调节点被称为驱动器(Driver)节点,与之对应的工作节点被称为执行器(executor)节点。驱动器节点可以和大量的执行器节点进行通信,它们也都作为独立的Java 进程运行。驱动器节点和所有的执行器节点一起被称为一个Spark 应用(application)。
    在这里插入图片描述
    Spark 应用通过一个叫作集群管理器(Cluster Manager)的外部服务在集群中的机器上启动。Spark 自带的集群管理器被称为独立集群管理器。Spark 也能运行在Hadoop YARN 和Apache Mesos 这两大开源集群管理器上。

    1.1 驱动器节点Driver

    Spark 驱动器是执行你的程序中的main() 方法的进程。它执行用户编写的用来创建
    SparkContext、创建RDD,以及进行RDD 的转化操作和行动操作的代码。其实,当你启动Spark shell 时,你就启动了一个Spark 驱动器程序(Spark shell 总是会预先加载一个叫作sc的SparkContext 对象)。驱动器程序一旦终止,Spark 应用也就结束了。

    驱动器程序在Spark 应用中有下述两个职责。

    把用户程序转为任务

    Spark 驱动器程序负责把用户程序转为多个物理执行的单元,这些单元也被称为任务
    (task)。从上层来看,所有的Spark 程序都遵循同样的结构:程序从输入数据创建一系列RDD,再使用转化操作派生出新的RDD,最后使用行动操作收集或存储结果RDD
    中的数据。Spark 程序其实是隐式地创建出了一个由操作组成的逻辑上的有向无环图
    (Directed Acyclic Graph,简称DAG)。当驱动器程序运行时,它会把这个逻辑图转为物理执行计划。

    Spark 会对逻辑执行计划作一些优化,比如将连续的映射转为流水线化执行,将多个操作合并到一个步骤中等。这样Spark 就把逻辑计划转为一系列步骤(stage)。而每个步骤又由多个任务组成。这些任务会被打包并送到集群中。任务是Spark 中最小的工作单元,用户程序通常要启动成百上千的独立任务。

    为执行器节点调度任务

    有了物理执行计划之后,Spark 驱动器程序必须在各执行器进程间协调任务的调度。执行器进程启动后,会向驱动器进程注册自己。因此,驱动器进程始终对应用中所有的执行器节点有完整的记录。每个执行器节点代表一个能够处理任务和存储RDD 数据的进程。

    Spark 驱动器程序会根据当前的执行器节点集合,尝试把所有任务基于数据所在位置分配给合适的执行器进程。当任务执行时,执行器进程会把缓存数据存储起来,而驱动器进程同样会跟踪这些缓存数据的位置,并且利用这些位置信息来调度以后的任务,以尽量减少数据的网络传输。

    驱动器程序会将一些Spark 应用的运行时的信息通过网页界面呈现出来,默认在端口
    4040 上。比如,在本地模式下,访问http://localhost:4040 就可以看到这个网页了。

    1.2 执行器节点Executor

    Spark 执行器节点是一种工作进程,负责在Spark 作业中运行任务,任务间相互独立。Spark 应用启动时,执行器节点就被同时启动,并且始终伴随着整个Spark 应用的生命周期而存在。如果有执行器节点发生了异常或崩溃,Spark 应用也可以继续执行。执行器进程有两大作用:第一,它们负责运行组成Spark 应用的任务,并将结果返回给驱动器进程;第二,它们通过自身的块管理器(Block Manager)为用户程序中要求缓存的RDD 提供内存式存储。RDD 是直接缓存在执行器进程内的,因此任务可以在运行时充分利用缓存数据加速运算。

    在本地模式下,Spark 驱动器程序和各执行器程序在同一个Java 进程中运行。这是一个特例;执行器程序通常都运行在专用的进程中。

    1.3 集群管理器

    Spark 依赖于集群管理器来启动执行器节点,而在某些特殊情况下,也依赖集群管理器来启动驱动器节点。集群管理器是Spark 中的可插拔式组件。这样,除了Spark 自带的独立集群管理器,Spark 也可以运行在其他外部集群管理器上,比如YARN 和Mesos。

    Spark 文档中始终使用驱动器节点和执行器节点的概念来描述执行Spark应用的进程。而主节点(master)和工作节点(worker)的概念则被用来分别表述集群管理器中的中心化的部分和分布式的部分。这些概念很容易混淆,所以要格外小心。例如,Hadoop YARN 会启动一个叫作资源管理器(Resource Manager)的主节点守护进程,以及一系列叫作节点管理器(Node Manager)的工作节点守护进程。而在YARN 的工作节点上,Spark 不仅可以运行执行器进程,还可以运行驱动器进程。

    1.4 启动一个程序

    不论你使用的是哪一种集群管理器,你都可以使用Spark 提供的统一脚本spark-submit 将你的应用提交到那种集群管理器上。通过不同的配置选项,spark-submit 可以连接到相应的集群管理器上,并控制应用所使用的资源数量。在使用某些特定集群管理器时,sparksubmit也可以将驱动器节点运行在集群内部(比如一个YARN 的工作节点)。但对于其他的集群管理器,驱动器节点只能被运行在本地机器上。

    1.5 Spark应用执行流程概述

    (1) 用户通过spark-submit 脚本提交应用。

    (2) spark-submit 脚本启动驱动器程序,调用用户定义的main() 方法。

    (3) 驱动器程序与集群管理器通信,申请资源以启动执行器节点。

    (4) 集群管理器为驱动器程序启动执行器节点。

    (5) 驱动器进程执行用户应用中的操作。根据程序中所定义的对RDD 的转化操作和行动操作,驱动器节点把工作以任务的形式发送到执行器进程。

    (6) 任务在执行器程序中进行计算并保存结果。

    (7) 如果驱动器程序的main() 方法退出,或者调用了SparkContext.stop(),驱动器程序会终止执行器进程,并且通过集群管理器释放资源。

    2 使用spark-submit部署应用

    Spark 为各种集群管理器提供了统一的工具来提交作业,这个工具是sparksubmit。

    bin/spark-submit my_script.py
    

    如果在调用spark-submit 时除了脚本或JAR 包的名字之外没有别的参数,那么这个Spark程序只会在本地执行。当我们希望将应用提交到Spark 独立集群上的时候,可以将独立集群的地址和希望启动的每个执行器进程的大小作为附加标记提供

    bin/spark-submit --master spark://host:7077 --executor-memory 10g my_script.py
    

    –master 标记指定要连接的集群URL;在这个示例中,spark:// 表示集群使用独立模式
    在这里插入图片描述
    除了集群URL,spark-submit 还提供了各种选项,可以让你控制应用每次运行的各项细节。这些选项主要分为两类。第一类是调度信息,比如你希望为作业申请的资源量。第二类是应用的运行时依赖,比如需要部署到所有工作节点上的库和文件。

    spark-submit 的一般格式如下:

    bin/spark-submit [options] <app jar | python file> [app options]
    

    [options] 是要传给spark-submit 的标记列表。你可以运行spark-submit --help 列出所有可以接收的标记。表7-2 列出了一些常见的标记。

    <app jar | python file> 表示包含应用入口的JAR 包或Python 脚本。

    [app options] 是传给你的应用的选项。如果你的程序要处理传给main() 方法的参数,它只会得到[app options] 对应的标记,不会得到spark-submit 的标记。
    在这里插入图片描述下面展示了一些使用各种选项调用spark-submit 的例子,这些调用语句都比较长。

    # 使用独立集群模式提交Java应用
    $ ./bin/spark-submit \
    --master spark://hostname:7077 \
    --deploy-mode cluster \
    --class com.databricks.examples.SparkExample \
    --name "Example Program" \
    --jars dep1.jar,dep2.jar,dep3.jar \
    --total-executor-cores 300 \
    --executor-memory 10g \
    myApp.jar "options" "to your application" "go here"
    
    # 使用YARN客户端模式提交Python应用
    $ export HADOOP_CONF_DIR=/opt/hadoop/conf
    $ ./bin/spark-submit \
    --master yarn \
    --py-files somelib-1.2.egg,otherlib-4.4.zip,other-file.py \
    --deploy-mode client \
    --name "Example Program" \
    --queue exampleQueue \
    --num-executors 40 \
    --executor-memory 10g \
    my_script.py "options" "to your application" "go here"
    

    参考 《Spark快速大数据分析》

    展开全文
  • Spark Driver的启动

    2019-01-29 13:34:12
    Driver 是什么,看一下官方给的解释: The process running the main() function of the application and creating the SparkContext。 意思是运行应用程序的main函数并且创建SparkContext的进程。这里的应用程序...

    1,简介

    Driver 是什么,看一下官方给的解释: The process running the main() function of the application and creating the SparkContext。
    意思是运行应用程序的main函数并且创建SparkContext的进程。这里的应用程序就是我们自己编写并提交给Spark集群的程序。在这里插入图片描述

    上图是Spark程序运行的框架图,总体概括应该这样:首先启动Driver 程序,创建SparkContext程序,然后和ClusterManager通信,ClusterManager根据程序的逻辑,在相应的Worker上启动Executor,最后 Driver 和Executor通信,把任务分发到Executor进行运行。中间还有很多细节,比如任务的调度,DAGScheduler,Shuffle环节等等。后面会做相应的介绍。本篇博客只介绍Driver的启动,源码基于spark-2.4.0版本。

    2,Driver的启动流程

    在上一篇博客中,介绍了Spark Submit的任务提交,采用了传统网关的提交,创建ClientAPP,在Client的onStart方法里面,

    override def onStart(): Unit = {
      driverArgs.cmd match {
        case "launch" =>
          // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
          //       truncate filesystem paths similar to what YARN does. For now, we just require
          //       people call `addJar` assuming the jar is in the same directory.
          val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
    
          val classPathConf = "spark.driver.extraClassPath"
          val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
            cp.split(java.io.File.pathSeparator)
          }
    
          val libraryPathConf = "spark.driver.extraLibraryPath"
          val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
            cp.split(java.io.File.pathSeparator)
          }
    
          val extraJavaOptsConf = "spark.driver.extraJavaOptions"
          val extraJavaOpts = sys.props.get(extraJavaOptsConf)
            .map(Utils.splitCommandString).getOrElse(Seq.empty)
          val sparkJavaOpts = Utils.sparkJavaOpts(conf)
          val javaOpts = sparkJavaOpts ++ extraJavaOpts
          val command = new Command(mainClass,
            Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
            sys.env, classPathEntries, libraryPathEntries, javaOpts)
    
          val driverDescription = new DriverDescription(
            driverArgs.jarUrl,
            driverArgs.memory,
            driverArgs.cores,
            driverArgs.supervise,
            command)
          asyncSendToMasterAndForwardReply[SubmitDriverResponse](
            RequestSubmitDriver(driverDescription))
    
        case "kill" =>
          val driverId = driverArgs.driverId
          asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
      }
    }
    

    上面的onStart方法里,首先是创建driverDescription,然后向Master发送提交Driver的消息。也就是在我们提交程序后,创建的Client会向master发送要启动Driver这样的一个消息。下面就是Master接收到消息后进行相应的处理。下面进入到Master:

    override def receiveAndReply(context: RpcCallContext): PartialFunction[Any, Unit] = {
    //master接收到消息后进行模式匹配
      case RequestSubmitDriver(description) =>
    //首先判断master的状态是否是ALIVE,如果不是,则向
        if (state != RecoveryState.ALIVE) {
          val msg = s"${Utils.BACKUP_STANDALONE_MASTER_PREFIX}: $state. " +
            "Can only accept driver submissions in ALIVE state."
          //如果master的状态不是alive,则发送失败的消息
    context.reply(SubmitDriverResponse(self, false, None, msg))
        } else {
          logInfo("Driver submitted " + description.command.mainClass)
    //根据driverDescription的信息,创建driver
          val driver = createDriver(description)
    //把Driver的信息进行持久化
          persistenceEngine.addDriver(driver)
    //把Driver添加到等待的队列中
          waitingDrivers += driver
    //将Driver添加到Hashset中
          drivers.add(driver)
    //进行调度
          schedule()
    
          // TODO: It might be good to instead have the submission client poll the master to determine
          //       the current status of the driver. For now it's simply "fire and forget".
    
          context.reply(SubmitDriverResponse(self, true, Some(driver.id),
            s"Driver successfully submitted as ${driver.id}"))
        }
    }
    

    Master 接收到client发送的提交Driver的消息后,首先就会创建一个Driver,然后把创建的Driver加入到等待队列,等待后续的调度执行。下面看一下Driver的创建:

    private def createDriver(desc: DriverDescription): DriverInfo = {
      val now = System.currentTimeMillis()
    //创建Date
      val date = new Date(now)
    //把Driver的信息封装为一个DriverInfo的对象
      new DriverInfo(now, newDriverId(date), desc, date)
    }
    Driver创建完成后,就会把这些信息添加到队列中去。最后执行调度,下面看一下调度方法,sheduler:
    private def schedule(): Unit = {
      if (state != RecoveryState.ALIVE) {
        return
      }
      // Drivers take strict precedence over executors
    //把集群上的处于Alive状态的worker随机打乱,放到放到shuffleAliveWorkers中
      val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
      //统计有多少个Alive状态的worker
    val numWorkersAlive = shuffledAliveWorkers.size
      var curPos = 0
    //遍历Driver
      for (driver <- waitingDrivers.toList) { // iterate over a copy of waitingDrivers
        // We assign workers to each waiting driver in a round-robin fashion. For each driver, we
        // start from the last worker that was assigned a driver, and continue onwards until we have
        // explored all alive workers.
        var launched = false
    //用于统计已经访问的worker数量
        var numWorkersVisited = 0
        while (numWorkersVisited < numWorkersAlive && !launched) {
    //取出shuffledAliveWorkers中第一个worker,
          val worker = shuffledAliveWorkers(curPos)
    //访问的worker数量加1
          numWorkersVisited += 1
    //如果这个worke的资源满足Driver的需求
          if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
    //那么就在这个worker上启动Driver
            launchDriver(worker, driver)
    //Driver的等待队列中把这个启动的driver移除
            waitingDrivers -= driver
    //Driver的启动状态标记为是
            launched = true
          }
    //用于遍历下一个worker的参数
          curPos = (curPos + 1) % numWorkersAlive
        }
      }
    //在worker上启动Executor
      startExecutorsOnWorkers()
    }
    

    在执行调度时,会把集群中的worker随机打乱,放到一个数组中,然后遍历这个数组中的worker,如果在这个过程中,worker上的资源能够满足Driver的需求,就在这个worker上启动Driver。下面看一下,Driver的启动,进入launchDriver方法中:

    private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
      logInfo("Launching driver " + driver.id + " on worker " + worker.id)
    //把Driver的信息添加到worker中
      worker.addDriver(driver)
    //把worker的信息添加到Driver信息里面
      driver.worker = Some(worker)
    //向相应的worker发送LaunchDriver的信息
      worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
    //把Driver的状态标记为RUNNING
      driver.state = DriverState.RUNNING
    }
    

    把worker的信息添加到Driver后,就向相应的worker发送启动Driver的消息,worker接收到消息后,就会执行启动Driver的程序,下面看一下worker接收到消息后,是怎么进行启动Driver的,进入到Worker中,

    //worker的receive方法中根据模式匹配进入下面的代码
    case LaunchDriver(driverId, driverDesc) =>
      logInfo(s"Asked to launch driver $driverId")
    //把Driver的信息封装一个DriverRunner对象
      val driver = new DriverRunner(
        conf,
        driverId,
        workDir,
        sparkHome,
        driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
        self,
        workerUri,
        securityMgr)
    //创建DriverId
      drivers(driverId) = driver
    //启动Driver
      driver.start()
    //更新该worker上用掉的cores数
      coresUsed += driverDesc.cores
    //更新worker上用掉的内存
      memoryUsed += driverDesc.mem
    

    封装好Driver对象后,调用start方法启动Driver

    /** Starts a thread to run and manage the driver. */
    private[worker] def start() = {
    //创建一个新的线程启动Driver
      new Thread("DriverRunner for " + driverId) {
        override def run() {
          var shutdownHook: AnyRef = null
          try {
            shutdownHook = ShutdownHookManager.addShutdownHook { () =>
              logInfo(s"Worker shutting down, killing driver $driverId")
              kill()
            }
    
            // prepare driver jars and run driver
    //获取退出码,根据退出码反应Driver的状态
            val exitCode = prepareAndRunDriver()
    
            // set final state depending on if forcibly killed and process exit code
            finalState = if (exitCode == 0) {
              Some(DriverState.FINISHED)
            } else if (killed) {
              Some(DriverState.KILLED)
            } else {
              Some(DriverState.FAILED)
            }
          } catch {
            case e: Exception =>
              kill()
              finalState = Some(DriverState.ERROR)
              finalException = Some(e)
          } finally {
            if (shutdownHook != null) {
              ShutdownHookManager.removeShutdownHook(shutdownHook)
            }
          }
    
          // notify worker of final driver state, possible exception
    //向worker发送Driver的状态
          worker.send(DriverStateChanged(driverId, finalState.get, finalException))
        }
      }.start()
    }
    

    下面进入到prepareAndRunDriver的方法中:

    private[worker] def prepareAndRunDriver(): Int = {
    //创建Driver的工作目录
      val driverDir = createWorkingDirectory()
    //下载Jar包到该工作目录
      val localJarFilename = downloadUserJar(driverDir)
    //根据参数,匹配相应的模式
      def substituteVariables(argument: String): String = argument match {
        case "{{WORKER_URL}}" => workerUrl
        case "{{USER_JAR}}" => localJarFilename
        case other => other
      }
    
      // TODO: If we add ability to submit multiple jars they should also be added here
    //根据参数创建一个ProcessBuilder,启动Driver的执行命令
      val builder = CommandUtils.buildProcessBuilder(driverDesc.command, securityManager,
        driverDesc.mem, sparkHome.getAbsolutePath, substituteVariables)
    //执行命令启动Driver
      runDriver(builder, driverDir, driverDesc.supervise)
    }
    

    上面代码主要是准备Driver的运行环境,创建启动Driver的执行命令,最后调用runDriver方法,进入到这个方法:

    private def runDriver(builder: ProcessBuilder, baseDir: File, supervise: Boolean): Int = {
    //设置Driver的工作目录
      builder.directory(baseDir)
      def initialize(process: Process): Unit = {
        // Redirect stdout and stderr to files
    //创建stdout文件,把InputStream重定向到stdout文件
        val stdout = new File(baseDir, "stdout")
        CommandUtils.redirectStream(process.getInputStream, stdout)
    //创建stderr文件,为后面保存出现错误信息的日志做准备
        val stderr = new File(baseDir, "stderr")
    //格式化builder命令
        val formattedCommand = builder.command.asScala.mkString("\"", "\" \"", "\"")
        val header = "Launch Command: %s\n%s\n\n".format(formattedCommand, "=" * 40)
    //将出现的错误信息重新定向到stderr文件
        Files.append(header, stderr, StandardCharsets.UTF_8)
        CommandUtils.redirectStream(process.getErrorStream, stderr)
      }
      runCommandWithRetry(ProcessBuilderLike(builder), initialize, supervise)
    }
    

    上面的代码主要是创建一些保存日志的文件,最后调用runCommandWithRetry的方法:

    private[worker] def runCommandWithRetry(
          command: ProcessBuilderLike, initialize: Process => Unit, supervise: Boolean): Int = {
      //设置初始的退出码为-1
    var exitCode = -1
    // Time to wait between submission retries.
    //设置重试的时间间隔
    var waitSeconds = 1
    // A run of this many seconds resets the exponential back-off.
    val successfulRunDuration = 5
    //keepTrying为true
    var keepTrying = !killed
    
    while (keepTrying) {
      logInfo("Launch Command: " + command.command.mkString("\"", "\" \"", "\""))
    
      synchronized {
        //如果是fasle,返回退出码
        if (killed) { return exitCode }
        //执行命令启动,这里其实才是真正启动命令来启动Driver
          process = Some(command.start())
           initialize(process.get)
          }
    
          val processStart = clock.getTimeMillis()
          exitCode = process.get.waitFor()
    
          // check if attempting another run
          keepTrying = supervise && exitCode != 0 && !killed
          if (keepTrying) {
            if (clock.getTimeMillis() - processStart > successfulRunDuration * 1000L) {
              waitSeconds = 1
            }
            logInfo(s"Command exited with status $exitCode, re-launching after $waitSeconds s.")
            sleeper.sleep(waitSeconds)
            waitSeconds = waitSeconds * 2 // exponential back-off
          }
        }
    //返回退出码
        exitCode
      }
    }
    

    再回到start的方法中,根据退出码,返回Driver是FINISHED 或者是KILLED 还是FAILED的状态。把返回的状态发送给Worker,下面看一下Worker接收到消息后的处理:

    case driverStateChanged @ DriverStateChanged(driverId, state, exception) =>
      handleDriverStateChanged(driverStateChanged)
    

    会调用handleDriverStateChanged的方法,进入到该方法:

    private[worker] def handleDriverStateChanged(driverStateChanged: DriverStateChanged): Unit = {
    //获取Driver的ID
      val driverId = driverStateChanged.driverId
      val exception = driverStateChanged.exception
    //获取Driver的状态
      val state = driverStateChanged.state
    //根据状态,输出相应的日志信息
      state match {
        case DriverState.ERROR =>
          logWarning(s"Driver $driverId failed with unrecoverable exception: ${exception.get}")
        case DriverState.FAILED =>
          logWarning(s"Driver $driverId exited with failure")
        case DriverState.FINISHED =>
          logInfo(s"Driver $driverId exited successfully")
        case DriverState.KILLED =>
          logInfo(s"Driver $driverId was killed by user")
        case _ =>
          logDebug(s"Driver $driverId changed state to $state")
      }
    //向Master发送Driver的状态信息
      sendToMaster(driverStateChanged)
    //移除Driver
      val driver = drivers.remove(driverId).get
    //把Driver状态标记为完成
      finishedDrivers(driverId) = driver
    //如果需要则删除链表里面的处于finished状态的Driver
      trimFinishedDriversIfNecessary()
    //更新一下用掉的内存和cores数
      memoryUsed -= driver.driverDesc.mem
      coresUsed -= driver.driverDesc.cores
    }
    主要是worker向Master发送Driver状态改变的消息,master在接收到消息后进行相应的处理:
    进入到Master中:
    case DriverStateChanged(driverId, state, exception) =>
      state match {
        case DriverState.ERROR | DriverState.FINISHED | DriverState.KILLED | DriverState.FAILED =>
    //以上三种状态都会调用removeDriver的方法
          removeDriver(driverId, state, exception)
        case _ =>
          throw new Exception(s"Received unexpected state update for driver $driverId: $state")
      }
    

    Master接收到消息后,就会调用removeDriver的方法移除driver:

    private def removeDriver(
          driverId: String,
          finalState: DriverState,
          exception: Option[Exception]) {
        drivers.find(d => d.id == driverId) match {
          case Some(driver) =>
            logInfo(s"Removing driver: $driverId")
    //把Driver从队列中移除
            drivers -= driver
            if (completedDrivers.size >= RETAINED_DRIVERS) {
              val toRemove = math.max(RETAINED_DRIVERS / 10, 1)
              completedDrivers.trimStart(toRemove)
            }
    //把Driver添加到已完成completeDrivers的数组中
            completedDrivers += driver
    //持久化引擎中也把该Driver移除
            persistenceEngine.removeDriver(driver)
    //更新Driver的状态为最终的状态
            driver.state = finalState
            driver.exception = exception
    //移除worker上的Driver
            driver.worker.foreach(w => w.removeDriver(driver))
    //最后在进行调度
            schedule()
          case None =>
            logWarning(s"Asked to remove unknown driver: $driverId")
        }
      }
    }
    

    以上就是整个Driver的启动,以及完成后被移除的过程,即整个生命周期。

    展开全文
  • 上图为spark的架构图,spark的组件可以分为三个部分,driver、cluster Manager、worker(executor) standalone模式: standalone模式既独立模式,自带完整服务,可单独部署到一个集群中,无需依赖其他任何资源管理...
  • 看到这样的图,我很想知道driver program在哪里啊,鬼知道?为此我自己研究了一下,网友大多都说是对的有不同想法的请评论 2.现在我有三台电脑 分别是 192.168.10.82 –>bigdata01.hzjs.co 192.168.10.83 –>bi
  • Spark中的driver和Executor

    2019-06-21 11:11:59
    Apache Spark使用最先进的DAG调度程序,查询优化器和物理执行引擎,实现批处理和流数据的高性能。这篇文章主要是对driver和executor功能上进行一个分析。 驱动器节点(Driver) Spark的驱动器是执行开发程序中的 ...
  • spark架构 driver worker

    2018-11-21 21:22:08
    当为client模式时,本地提交时,driver程序在堡垒机上运行,所以堡垒机上能看到自己打印的一些日志;线上时,driver程序在客户端节点上执行。客户端节点的资源,决定了提交到集群的任务的并发数,一版为队列中状态的...
  • 介绍在Standalone模式下,从命令行使用spark-submit提交任务开始,到将Driver提交到Master的过程。
  • 一、驱动器节点(Driver)  Spark的驱动器是执行开发程序中的 main...如果你是用spark shell,那么当你启动 Spark shell的时候,系统后台自启了一个 Spark 驱动器程序,就是在Spark shell 中预加载的一个叫作 sc 的...
  • spark driver HA

    2017-10-24 10:18:20
    实验环境:  zookeeper-3.4.6  ...三:提交程序测试HA 一:Spark 构建高可用HA架构    Spark本身是Master和Slave,而这这里的  Master是指Spark资源调度和分配。负责整个集群的资源调度和分
  • 刚刚接触Spark的时候对这些概念没有好好思考,走马观花似的扫过去了,后面碰到master、worker、executor和driver的时候,也就没想太多,最近刚刚跑通了一个spark项目,准备好好研究一下程序的运行原理,却突然发现...
  • 看到这样的图,我很想知道driver program在哪里啊,鬼知道?为此我自己研究了一下,网友大多都说是对的有不同想法的请评论 2.现在我有三台电脑 分别是 192.168.10.82 –>bigdata01.hzjs.co 192.168.10.83 –>...
  • spark本地程序报错

    2020-01-02 11:33:31
    今天执行一个spark本地测试程序时出现以下错误: Exception in thread &quot;main&quot; java.net.BindException:Cannot assign requested address: Service 'sparkDriver' failed after 16 retries (on a...
  • Spark只有在cluster模式下启动时,才会有Driver的资源调度,如果在client模式下启动,Driver就在提交Job的机器上启动。资源调度指的是应用程序获得的计算资源,任务调度是在资源调度的基础上进行的。Master是负责...
  • spark-submit 是在spark安装目录中bin目录下的一个shell脚本文件,用于在集群中启动应用程序(如*.py脚本);对于spark支持的集群模式,spark-submit提交应用的时候有统一的接口,不用太多的设置。 使用spark-...
  • DriverDriverSpark中Application也即代码的发布程序,可以理解为我们编写spark代码的主程序,因此只有一个,负责对spark中SparkContext对象进行创建,其中SparkContext对象负责创建Spark中的RDD(Spark中的基本...
  • spark集群(standalone)提交作业,我们通常用如下命令 ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark://207.184.161.138:7077 \ --executor-memory 20G \ --total-...
1 2 3 4 5 ... 20
收藏数 17,406
精华内容 6,962