精华内容
参与话题
问答
  • Spark提交任务的方式

    千次阅读 2018-09-19 11:15:42
    提交任务的两个命令 spark-submit 程序执行之后,application就会退出。 spark-shell 会一直占有一个application,手动退出。 ctrl + c  spark-shell  是一个交互式的命令行,主要用于测试。 spark-shell脚本,...

    提交任务的两个命令

    spark-submit 程序执行之后,application就会退出。
    spark-shell 会一直占有一个application,手动退出。 ctrl + c 


    spark-shell 

    是一个交互式的命令行,主要用于测试。

    spark-shell脚本,实际上调用的是spark-submit脚本:

    spark-shell --master spark://hadoop01:7077

    在spark-shell中,已经为我们初始化好了一个SparkContext,名称是sc。

    在写spark程序之前,必须创建SparkContext实例对象。


    spark-submit

    local  

    本地模式  开箱即用,不指定master 
    或者  
    --master  local  local[N]  local[*]
    local: 只是用一个cores
    local[N]  : 使用N个cores
    local[*] : 使用当前机器的所有的可以用的cores

    注 : local 模式运行程序时 , 不能在 Spark 监控页面中进行查看

    以求圆周率为例 : 

    spark-submit --class org.apache.spark.examples.SparkPi /usr/local/spark-2.2.1-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.1.jar 1000


    standalone

    spark本身提供的集群模式
    --master spark://host:port 

    以求圆周率为例

    --master spark://hadoop01:7077 指定提交到集群

    spark-submit --master spark://hadoop01:7077 --class org.apache.spark.examples.SparkPi /usr/local/spark-2.2.1-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.1.jar 1000

    提交到集群中是可以在Spark监控页面中查看到的

    运行结果 :

    运行完以后 , 在监控页面中可以看到Completed中有显示任务完成


    Yarn

    统一的资源调度平台
    --master yarn 


    mesos

    类似于yarn,资源调度平台
    --master  mesos://host:port

    展开全文
  • Spark之——Spark Submit提交应用程序

    万次阅读 2018-06-19 21:44:36
    对于spark支持的集群模式,spark-submit提交应用的时候有统一的接口,不用太多的设置。 使用spark-submit时,应用程序的jar包以及通过—jars选项包含的任意jar文件都会被自动传到集群中。spa...
    本部分来源,也可以到spark官网查看英文版。 
    spark-submit 是在spark安装目录中bin目录下的一个shell脚本文件,用于在集群中启动应用程序(如*.py脚本);对于spark支持的集群模式,spark-submit提交应用的时候有统一的接口,不用太多的设置。

    使用spark-submit时,应用程序的jar包以及通过—jars选项包含的任意jar文件都会被自动传到集群中。

    spark-submit --class   --master  --jars 

    1、绑定应用程序依赖

    如果代码依赖于其它项目,为了将代码分发到Spark集群,就需要将这些依赖一起打包到应用程序中去。sbt和Maven都有装配插件,只要在创建集成的jar时列出Spark和Hadoop需要的依赖,而不需要将这些依赖和应用打包,因为在程序运行的时候集群的master知道如何调用和提供这些依赖;但是一旦有集成好的jar包,在执行bin/spark-submit脚本时就坐传递这些jar包了。

    对于Python语言来讲,可以使用spark-submit的–py-files参数添加.py,.zip,.egg文件和应用程序一起进行分发,如果应用程序依赖于多个Python文件,建议将它们打包成.zip或.egg文件。

    2、用spark-submit启动应用程序

    如果打包了应用程序,就可以使用bin/spark-submit脚本启动应用程序了,这个脚本可以设置Spark类路径(classpath)和应用程序依赖包,并且可以设置不同的Spark所支持的集群管理和部署模式。提交任务后,无论是Standalone模式还是Spark on Yarn模式,都可以通过Web地址http://:4040来查看当前运行状态(具体访问地址需要查看spark搭建时的配置文件)。
    spark-submit提交应用的大致格式如下:

    ./bin/spark-submit \
    --class <main-class> \
    --master <master-url> \
    --deploy-mode <deploy-mode> \
    --conf <key>=<value> \
    ... # other options
    <application-jar> \
    [application-arguments]
    用得较多的参数是:
    --class:应用程序的入口点(例如,org.apache.spark.examples.SparkPi)
    --master:集群的master URL(例如,spark://localhost:7077)
    --deploy-mode:将driver部署到worker节点(cluster模式)或者作为外部客户端部署到本地(client模式),默认情况下是client模式
    --conf:用key=value格式强制指定Spark配置属性,用引号括起来
    --application-jar:包含应用程序和所有依赖的jar包的路径,路径必须是在集群中是全局可见的,例如,hdfs://路径或者file://路径
    --application-arguments:传递给主类中main函数的参数

    一般的部署策略是在一个网关机器上提交应用程序,这个机器和Worker机器部署在一个网络中(例如,Standalone模式的EC2集群中的Master节点)。在此部署策略中,client模式更为合适,client模式中的driver直接跟spark-submit进程一起启动,spark-submit进程在此扮演集群中一个client的角色。应用程序的输入输出依赖于控制台,如此一来,这种模式就特别适合关于REPL(例如,Spark shell)的应用程序。
    另一种部署策略是,应用程序通过一台远离Worker节点的机器提交(例如,本地或者便携设备),这种情况下,一般使用cluster模式最小化drivers和executors之间的网络延时。注意,cluster模式暂时不支持于Mesos集群或Python应用程序。

    Python应用程序中,简单地在application-jar处传递一个.py文件而不是JAR文件,然后用–py-files添加Python.zip,.egg或者.py文件到搜索路径。

    还有一些集群管理器正在使用的可选项。例如,对于Spark Standalone的cluster部署模式,也可以使用–supervise以确定driver在遇到非0(non-zero)退出码的错误时进行自动重启。
    通过运行spark-submit上–help列出所有的可选项。以下是一些常用选项的例子:

    # Run application locally on 8 cores
    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master local[8] \
      /path/to/examples.jar \
      100
    
    # Run on a Spark standalone cluster in client deploy mode
    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master spark://207.184.161.138:7077 \
      --executor-memory 20G \
      --total-executor-cores 100 \
      /path/to/examples.jar \
      1000
    # Run on a Spark standalone cluster in cluster deploy mode with supervise
    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master spark://207.184.161.138:7077 \
      --deploy-mode cluster \
      --supervise \
      --executor-memory 20G \
      --total-executor-cores 100 \
      /path/to/examples.jar \
      1000
    
    # Run on a YARN cluster
    export HADOOP_CONF_DIR=XXX
    ./bin/spark-submit \
    --class org.apache.spark.examples.SparkPi \
    --master yarn-cluster \  # can also be`yarn-client` for client mode
    --executor -memory 20G \
    --num-executors 50 \
    /path/to/examples.jar \
    1000
    
    # Run a Python application on a Spark standalone cluster
    ./bin/spark-submit \
      --master spark://207.184.161.138:7077 \
      examples/src/main/python/pi.py \
      1000
    
    # Run on a Mesos cluster in cluster deploy mode with supervise
    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master mesos://207.184.161.138:7077 \
      --deploy-mode cluster \
      --supervise \
      --executor-memory 20G \
      --total-executor-cores 100 \
      http://path/to/examples.jar \
      1000

    3、Master URLs

    传递给Spark的master url可以是以下任意格式之一:

    master URL 意义
    local 使用1个worker线程本地运行Spark(即完全没有并行化)
    local[K] 使用K个worker线程本地运行Spark(最好将K设置为机器的CPU核数)
    local[*] 根据机器的CPU逻辑核数,尽可能多地使用Worker线程
    spark://HOST:PORT 连接到给定的Spark Standalone集群的Master,此端口必须是Master配置的端口,默认为7077
    mesos://HOST:PORT 连接到给定的Mesos集群的Master,此端口必须是Master配置的端口,默认为5050。若Mesos集群使用ZooKeeper,则master URL使用mesos://zk://……
    yarn-client 以client模式连接到YARN集群,集群位置将通过HADOOP_CONF_DIR环境变量获得
    yarn-cluster 以cluster模式连接到YARN集群,集群位置将通过HADOOP_CONF_DIR环境变量获得

    4、从文件中加载配置

    spark-submit脚本可以通过属性文件加载默认的Spark配置值并将其传递给应用程序。默认情况下会读取Spark目录中conf/spark-default.conf文件中的各配置项,详细信息参考“加载默认配置”。

    加载默认配置可以取消spark-submit命令的某些参数选项。例如,如果设置了spark.master属性,那么可以直接省略 –master选项。一般情况下,直接使用SparkConf设置的属性值具有最高的优先级,然后是spark-submit命令中传递的选项,最后才是默认配置文件中的值。如果你不清楚配置选项是来自于哪里,可以运行spark-submit –verbose打印处更细粒度的调试信息。

    5、高级依赖管理

    当使用spark-submit时,应用程序的jar包以及由–jars选向给出的jar会自动上传到集群,由–jars给出的jar路径之间必做用逗号分隔,如果路径是个目录的话,–jars的设置无法起作用,必须详细到abc.jar。
    Spark使用了下面的URL格式允许不同的jar包分发策略。
    1、文件file方式:
    绝对路径且file:/URIs是作为driver的HTTP文件服务器,且每个executor会从driver的HTTP服务器拉取文件;
    2、hdfs方式:
    http:,https:,ftp:,从这些给定的URI中拉取文件和JAR包;
    3、本地local方式:
    以local:/开始的URI应该是每个worker节点的本地文件,这意味着没有网络IO开销,并且推送或通过NFS/GlusterFS等共享到每个worker大文件/JAR文件或能很好的工作。

    注意:SparkContext的JAR包和文件都会被复制到每个executor节点的工作目录下,这将用掉大量的空间,所以需要清理干净。
    在YARN下,会自动清理。
    在Spark Standalone下,可以通过配置spark.worker.cleanup.appDataTtl属性做到自动清理。
    用户可以用--packages选项提供一个以逗号分隔的maven清单来包含任意其他依赖。
    其它的库(或SBT中的resolvers)可以用--repositories选项添加(同样用逗号分隔),这些命令都可以用在pyspark,spark-shell和spark-submit中来包含一些Spark包。

    对Python而言,–py-files选项可以用来向executors分发.egg,.zip和.py库。

    6、在YARN集群上运行Spark应用

    详细请参考官网
    在Spark独立集群模式,–master 所提供的地址是由Spark自身给出,在yarn模式下,Spark资源管理器的地址其实也就是hadoop的master地址,因而–master 的地址应该由yarn来提供。
    格式如下:

    $ ./bin/spark-submit --class path.to.your.Class --master yarn --deploy-mode cluster [options] <app jar> [app options]
    下面是一个更详细的例子:
    $ ./bin/spark-submit --class org.apache.spark.examples.SparkPi \
        --master yarn \
        --deploy-mode cluster \
        --driver-memory 4g \
        --executor-memory 2g \
        --executor-cores 1 \
        --queue thequeue \
        lib/spark-examples*.jar \
        10

    执行的过程大致如下:
    上面的应用提交后,会启动一个yarn客户端程序,这个程序接着启动master上的应用程序,SparkPi 再启动master应用的不同线程,yarn客户端程序会周期性地检查master应用的状态,并在控制台打印这些状态,程序结束后,yarn客户端会结束即出。我们可以通过查看驱动器和执行器的日志文件来调试应用程序。

    先放在这里,有时间再来补上

    7、运行 Spark Python 应用

    主要是要解决如何将依赖库一起提交的问题。
    方法一:将库以python文件的方式提交。
    首先要理解的,我们在安装python外部库的时候,该库的功能主要还是靠abc.py这样的类文件来实现,当我们用类似from spark_learning.utils.default_utils import setDefaultEncoding的时候,是从这个类文件中引入了某个函数。所以当我们要将某个库提交给集群的时候,可以找到相关的类文件,然后将这个类文件提交上去。
    可参考下面的格式(可能有错):

    #python代码中的import
    from spark_learning.utils.default_utils import setDefaultEncoding,initSparkContext,ensureOffset
    #submit命令:
    bin/spark-submit --jars /home/jabo/software/spark-1.5.2-bin-hadoop2.6/lib/spark-streaming-kafka-assembly_2.10-1.5.2.jar \
    --py-files /home/jabo/spark-by-python/spark_learning/utils/default_utils.py \
    /home/jabo/spark-by-python/spark_learning/third_day/streaming_kafka_avg.py

    下面是比较正规的做法:
    来源
    用 Java 和 Scala 访问 Spark 提供了许多优点 : 因为 Spark 它自己运行在 JVM 中,运行在 JVM 内部是平台无关的,独立的代码包和它打入到 JAR 文件中的依赖,以及更高的性能。如果您使用 Spark Python API 将会失去这些优势。

    管理依赖并让它们可用于群集上的 Python Job 是很难的。为了确定哪些依赖在群集上是需要的,您必须了解运行在分布式 群集 中的 Spark executor 进程中的 Spark 应用程序的代码。如果您定义的 Python 转换使用的任何的第三方库,比如 NumPy 或者 nltk,当它们运行在远程的 executor 上时 Spark executor 需要访问这些库。

    7.1、独立的依赖关系

    常见的情况中,一个自定义的 Python 包包含了你想要应用到每个 RDD 元素的 功能。下面展示了一个简单的例子 :

    def import_my_special_package(x):
      import my.special.package
      return x
    
    int_rdd = sc.parallelize([1, 2, 3, 4])
    int_rdd.map(lambda x: import_my_special_package(x))
    int_rdd.collect()

    您创建了一个有 4 个元素名为 int_rdd 的简单的 RDD。然后您应用函数 import_my_special_package 到每个 int_rdd 的元素。这个函数导入了 my.sepcial.package 并且返回了传入的原始参数。这样和使用类或者定义的函数有相同的作用,因为 Spark 需要每个 Spark executor 在需要时导入 my.special.package。

    如果你只需要 my.special.package 内部一个简单的文件,您可以通过在您的 spark-submit 命令中使用 –py-files 选项并指定文件的路径来直接让 Spark 的所有 executor 都可以获取。您也可以以编程的方式通过使用 sc.addPyFiles() 函数来指定。如果您使用的功能来自跨越多个文件的 package,为 package 制作一个 egg,因为 –py-files 标记也接受一个 egg 文件的路径。
    如果您有一个独立的依赖关系,您可以使用两种方式让需要的 Python 依赖是可用的。

    • 如果您只依赖一个单独的文件,您可以使用 –py-files 命令行选项,或者以编程的方式用 sc.addPyFiles(path) 兵指定本地 Python 文件的路径添加这些文件到 SparkContext。
    • 如果您有一个独立模块上的依赖(一个没有其它依赖的模块),您可以创建一个这些模块的 egg 或者 zip 文件,使用 –py-files 命令行选项或者以编程的方式用 sc.addPyFiles(path) 兵指定本地 Python 文件的路径添加这些文件到 SparkContext。

    7.2、复杂的依赖关系

    一些操作依赖有许多依赖关系的复杂的 package。例如,下列的代码片段导入了 Python pandas 数据分析库 :

    def import_pandas(x):
     import pandas
     return x
    
    int_rdd = sc.parallelize([1, 2, 3, 4])
    int_rdd.map(lambda x: import_pandas(x))
    int_rdd.collect()

    pandas 依赖 NumPy,SciPi,和许多其它的 package。尽管 pandas 作为一个 *.py 文件来分发是很复杂的,您可以为它和它的依赖建一个 egg 并发送它们到 executor。

    7.3、分发 Egg 文件的限制

    在这两个独立的,复杂的依赖关系的情况中,发送 egg 文件是有问题的,因为带有原代码包必须在其上运行的特定主机进行编译。当行业业标准的硬件做分布式计算,你必须假设的是,硬件是多样化的。然而,由于所需的 C 编译,内置客户端主机上一个 Python egg 是特定的客户端的 CPU 架构。因此,分配复杂的 egg,像编译 NumPy,SciPy 的 package 和 pandas 经常出现故障。相反,分发 egg 文件,你应该在群集的每个主机上安装所需的 Python 包,并指定 Python 的二进制文件的路径为 worker 主机使用。
    安装以及保持 Python 环境
    安装和维护的Python环境可能比较复杂,但可以让你使用完整的Python包生态系统。系统管理员在用您需要的依赖在群集的每台主机上安装 Anaconda distribution 或者设置一个 虚拟环境。
    如果您使用 Cloudera Manager,您可以使用方法将 Anaconda distribution 作为一个 Parcel 来部署。

    最低需要的角色 : 群集管理员(或者管理员)

    • 添加下面的 URL https://repo.continuum.io/pkgs/misc/parcels/ 到远程的 Parcel 仓库 URL,像 Parcel 配置设置 中描述的一样。
    • 像 管理 Parcels 中描述的一样下载,分发,并且激活 Parcel。

    Anaconda 被安装在 parcel 目录/Anaconda 中,parcel 目录默认是 /opt/cloudera/parcels,但是可以在 parcel 配置设置中更改。Anaconda parcel 支持 Continuum Analytics。

    如果您没有使用 Cloudera Manager,您可以在您的群集中安装一个虚拟环境通过在每台主机上运行命令 Cluster SSH,Parallel SSH,或者 Fabric。假设每台主机已经安装了 Python 和 pip,在一个 RHEL 6 兼容的系统中上的虚拟环境中使用下面的命令来安装标准的数据栈(NumPy,SciPy,scikit-learn,和 pandas)。

    # Install python-devel:
    yum install python-devel
    
    # Install non-Python dependencies required by SciPy that are not installed by default:
    yum install atlas atlas-devel lapack-devel blas-devel
    
    # install virtualenv:
    pip install virtualenv
    
    # create a new virtualenv:
    virtualenv mynewenv
    
    # activate the virtualenv:
    source mynewenv/bin/activate
    
    # install packages in mynewenv:
    pip install numpy
    pip install scipy
    pip install scikit-learn
    pip install pandas

    7.4、设置 Python 路径

    之后您想要使用的 Python 包在您群集中一致的位置,如下所示设置相应的环境变量路径到您的 Python 可执行的文件 :

    • Client 模式 - 用 PYSPARK_PYTHON 设置 executor 路径,用 PYSPARK_DRIVER_PYTHON 设置 driver 路径。
    • Cluster 模式 - 用 spark.yarn.appMasterEnv.PYSPARK_PYTHON 设置 executor 路径,用 spark.yarn.appMasterEnv.PYSPARK_DRIVER_PYTHON 设置 driver 路径。

    为了使这些变量一致,添加相应的 export 语句 :

    • Anaconda parcel - export 变量 /opt/cloudera/parcels/Anaconda/bin/python
    • Virtual environment - export 变量 /path/to/mynewenv/bin/python

    到 sparl-env.sh,检查其他用户没有用条件测试的变量,比如 :

    if [ -z "${PYSPARK_PYTHON}" ]; then
    export PYSPARK_PYTHON=
    fi

    在 Cloudera Manager 中,如下所示在 spark-env.sh 中设置环境变量 :

    最低需求角色 : 配置员(也可以用群集管理员,管理员)

    • 转到 Spark 服务。
    • 点击 配置 标签。
    • 搜索 spark-conf/spark-env.sh 的 Spark 服务高级配置代码段(安全阀)。
    • 添加变量到属性中。
    • 点击 保存更改 以提交更改。
    • 重启服务。
    • 部署客户端配置。

    在命令行中,在 /etc/spark/conf/spark-env.sh 中设置环境变量。

    8、最终提交spark作业

    更详细可参考文档

    8.1、用yarn-client模式提交spark作业

    在/usr/local/spark目录下创建文件夹

    vi spark_pi.sh
    shell文件的内容如下:
    $SPARK_HOME/bin/spark-submit \
    --class org.apache.spark.examples.JavaSparkPi \
    --master yarn-client \
    --num-executors 1 \
    --driver-memory 1g \
    --executor-memory 1g \
    --executor-cores 1 \
    
    $SPARK_HOME/lib/spark-examples-1.6.1-hadoop2.6.0.jar \
    或者
    [spark@master ~]$  $SPARK_HOME/bin/spark-submit  \
    > --class org.apache.spark.examples.JavaSparkPi \
    > --master yarn-cluster \
    > --num-executors 1 \
    > --driver-memory 1g \
    > --executor-memory 1g \
    > --executor-cores 1 \
    >  $SPARK_HOME/lib/spark-examples-1.6.1-hadoop2.6.0.jar
    修改该shell文档权限
    chmod 777 spark_pi.sh
    然后运行以下代码就可启动应用程序
    ./spark_pi.sh

    当然也可以在linux下设置定时器来定时运行该应用程序。

    8.2、用yarn-cluster模式提交spark作业

    在/usr/local/spark目录下创建文件夹

    vi spark_pi.sh
    shell文件的内容如下:
    $SPARK_HOME/bin/spark-submit \
    --class org.apache.spark.examples.JavaSparkPi \
    --master yarn-client \
    --num-executors 1 \
    --driver-memory 1g \
    --executor-memory 1g \
    --executor-cores 1 \
    
    $SPARK_HOME/lib/spark-examples-1.6.1-hadoop2.6.0.jar \
    或者
    [spark@master ~]$  $SPARK_HOME/bin/spark-submit  \
    > --class org.apache.spark.examples.JavaSparkPi \
    > --master yarn-cluster \
    > --num-executors 1 \
    > --driver-memory 1g \
    > --executor-memory 1g \
    > --executor-cores 1 \
    >  $SPARK_HOME/lib/spark-examples-1.6.1-hadoop2.6.0.jar
    修改该shell文档权限
    chmod 777 spark_pi.sh
    然后运行以下代码就可启动应用程序
    ./spark_pi.sh
    当然也可以在linux下设置定时器来定时运行该应用程序


    展开全文
  • Spark-submit提交任务到集群

    万次阅读 2018-05-28 15:50:33
    1 IDEA 打包示例代码参考AMPCamp2015之SparkSQL,开发环境使用idea。首先需要将程序打包成jar选择project structure --》artifacts ,由于集群已经有了运行依赖的jar包,打包时可以将这些jar包排除出去,以减小打包...

    1 IDEA 打包

    示例代码参考AMPCamp2015之SparkSQL,开发环境使用idea。首先需要将程序打包成jar


    选择project structure --》artifacts ,由于集群已经有了运行依赖的jar包,打包时可以将这些jar包排除出去,以减小打包后的jar包的大小。


    点击ok,然后选择build --》build artifacts,SparkSQLTest --》build,然后在工程目录的子目录下会生成j对应的jar文件:


    2 提交任务

    首先启动集群,然后客户端来到spark-submit目录:/app/hadoop/spark131/bin


    spark-submit通过命令行的方式提交任务,具体参数信息如下参考


    提交示例如下:


    在命令行输入如下命令:
    ./spark-submit  --class SparkSQLTest --master spark:hadoop1:7077 --executor-memory 2g --num-executors 3  /home/kaiseu/MyProject/IdeaProjects/SparkProject/Saprk131/out/artifacts/SparkSQLTest_jar/SparkSQLTest.jar



    在web监控界面可以看到:


    3 运行结果

    结果如下:


    展开全文
  • Spark Submit任务提交流程

    千次阅读 2019-01-25 16:49:14
    1,简介 在上一篇博客中,我们详细介绍了...本篇博客就主要介绍Spark Submit提交任务的流程。 2,Spark 任务的提交 我们可以从spark 的官网看到,spark-submit的提交格式如下: ./bin/spark-submit –class –ma...

    1,简介

    在上一篇博客中,我们详细介绍了Spark Standalone模式下集群的启动流程。在Spark 集群启动后,我们要想在集群上运行我们自己编写的程序,该如何做呢?本篇博客就主要介绍Spark Submit提交任务的流程。

    2,Spark 任务的提交

    我们可以从spark 的官网看到,spark-submit的提交格式如下:
    ./bin/spark-submit
    –class
    –master
    –deploy-mode
    –conf =
    … # other options

    [application-arguments]
    • --class: The entry point for your application (e.g. org.apache.spark.examples.SparkPi) 应用程序的入口
    • --master: The master URL for the cluster (e.g. spark://23.195.26.187:7077) master 的URL
    • --deploy-mode: Whether to deploy your driver on the worker nodes (cluster) or locally as an external client (client) (default: client) † 集群的部署模式
    • --conf: Arbitrary Spark configuration property in key=value format. For values that contain spaces wrap “key=value” in quotes (as shown). Spark的配置文件
    • application-jar: Path to a bundled jar including your application and all dependencies. The URL must be globally visible inside of your cluster, for instance, an hdfs:// path or a file:// path that is present on all nodes. 自己编写的jar包的路径
    • application-arguments: Arguments passed to the main method of your main class, if any 需要传入的参数
    一个具体的实例如下:
    #Run on a Spark standalone cluster in cluster deploy mode with supervise
    ./bin/spark-submit
    –class org.apache.spark.examples.SparkPi
    –master spark://207.184.161.138:7077
    –deploy-mode cluster
    –supervise
    –executor-memory 20G
    –total-executor-cores 100
    /path/to/examples.jar
    1000
    提交任务要使用$SPARK_HOME下bin目录里面的spark-submit脚本,我们来分析一下这个脚本:

    //判断SPARK_HOME的目录是否存在
    
    if [ -z "${SPARK_HOME}" ]; then
      source "$(dirname "$0")"/find-spark-home
    fi
    # disable randomized hash for string in Python 3.3+
    export PYTHONHASHSEED=0
    //调用bin目录下的spark-class脚本
    exec "${SPARK_HOME}"/bin/spark-class org.apache.spark.deploy.SparkSubmit "$@"
    

    我们再次进入spark-class的脚本:

    //判断SPARK_HOME的目录是否存在
    if [ -z "${SPARK_HOME}" ]; then
      source "$(dirname "$0")"/find-spark-home
    fi
    //加载spark-env.sh 文件
    . "${SPARK_HOME}"/bin/load-spark-env.sh
    # Find the java binary
    //检测java的路径
    if [ -n "${JAVA_HOME}" ]; then
      RUNNER="${JAVA_HOME}/bin/java"
    else
      if [ "$(command -v java)" ]; then
        RUNNER="java"
      else
        echo "JAVA_HOME is not set" >&2
        exit 1
      fi
    fi
    
    # Find Spark jars.
    //检测jars是否存在
    if [ -d "${SPARK_HOME}/jars" ]; then
      SPARK_JARS_DIR="${SPARK_HOME}/jars"
    else
      SPARK_JARS_DIR="${SPARK_HOME}/assembly/target/scala-$SPARK_SCALA_VERSION/jars"
    fi
    
    if [ ! -d "$SPARK_JARS_DIR" ] && [ -z "$SPARK_TESTING$SPARK_SQL_TESTING" ]; then
      echo "Failed to find Spark jars directory ($SPARK_JARS_DIR)." 1>&2
      echo "You need to build Spark with the target \"package\" before running this program." 1>&2
      exit 1
    else
      LAUNCH_CLASSPATH="$SPARK_JARS_DIR/*"
    fi
    
    # Add the launcher build dir to the classpath if requested.
    if [ -n "$SPARK_PREPEND_CLASSES" ]; then
      LAUNCH_CLASSPATH="${SPARK_HOME}/launcher/target/scala-$SPARK_SCALA_VERSION/classes:$LAUNCH_CLASSPATH"
    fi
    
    # For tests
    if [[ -n "$SPARK_TESTING" ]]; then
      unset YARN_CONF_DIR
      unset HADOOP_CONF_DIR
    fi
    
    # The launcher library will print arguments separated by a NULL character, to allow arguments with
    # characters that would be otherwise interpreted by the shell. Read that in a while loop, populating
    # an array that will be used to exec the final command.
    #
    # The exit code of the launcher is appended to the output, so the parent shell removes it from the
    # command array and checks the value to see if the launcher succeeded.
    build_command() {
    //执行org.apache.spark.launcher.Main的main函数,解析参数
      "$RUNNER" -Xmx128m -cp "$LAUNCH_CLASSPATH" org.apache.spark.launcher.Main "$@"
      printf "%d\0" $?
    }
    
    # Turn off posix mode since it does not allow process substitution
    set +o posix
    CMD=()
    while IFS= read -d '' -r ARG; do
    把命令添加到CMD中
      CMD+=("$ARG")
    //调用方法创建执行命令
    done < <(build_command "$@")
    
    COUNT=${#CMD[@]}
    LAST=$((COUNT - 1))
    LAUNCHER_EXIT_CODE=${CMD[$LAST]}
    
    # Certain JVM failures result in errors being printed to stdout (instead of stderr), which causes
    # the code that parses the output of the launcher to get confused. In those cases, check if the
    # exit code is an integer, and if it's not, handle it as a special error case.
    if ! [[ $LAUNCHER_EXIT_CODE =~ ^[0-9]+$ ]]; then
      echo "${CMD[@]}" | head -n-1 1>&2
      exit 1
    fi
    
    if [ $LAUNCHER_EXIT_CODE != 0 ]; then
      exit $LAUNCHER_EXIT_CODE
    fi
    
    CMD=("${CMD[@]:0:$LAST}")
    //执行命令
    exec "${CMD[@]}"
    

    spark-class的脚本,最主要的就是解析参数,以及创建执行命令,把命令交给spark-class的CMD进行执行。我们进入org.apache.spark.launcher.Main这个类里面,然后看一main函数:

    public static void main(String[] argsArray) throws Exception {
    //检查参数
      checkArgument(argsArray.length > 0, "Not enough arguments: missing class name.");
    //创建一个链表存放参数
      List<String> args = new ArrayList<>(Arrays.asList(argsArray));
    //我们提交任务时,第一个参数就是要进入org.apache.spark.deploy.SparkSubmit,它也是第一个参数,把这个参数移除,剩下的参数组成args
      String className = args.remove(0);
    
      boolean printLaunchCommand = !isEmpty(System.getenv("SPARK_PRINT_LAUNCH_COMMAND"));
      Map<String, String> env = new HashMap<>();
      List<String> cmd;
    //className
      if (className.equals("org.apache.spark.deploy.SparkSubmit")) {
        try {
    //创建一个命令解析器,创建spark-class中exec执行的command
          AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(args);
          cmd = buildCommand(builder, env, printLaunchCommand);
        } catch (IllegalArgumentException e) {
          printLaunchCommand = false;
          System.err.println("Error: " + e.getMessage());
          System.err.println();
    
          MainClassOptionParser parser = new MainClassOptionParser();
          try {
    //解析参数
            parser.parse(args);
          } catch (Exception ignored) {
            // Ignore parsing exceptions.
          }
    
          List<String> help = new ArrayList<>();
          if (parser.className != null) {
    //把CLASS和classname加入到help链表中
            help.add(parser.CLASS);
            help.add(parser.className);
          }
          help.add(parser.USAGE_ERROR);
          AbstractCommandBuilder builder = new SparkSubmitCommandBuilder(help);
          cmd = buildCommand(builder, env, printLaunchCommand);
        }
      } else {
        AbstractCommandBuilder builder = new SparkClassCommandBuilder(className, args);
        cmd = buildCommand(builder, env, printLaunchCommand);
      }
    //如果是window环境,那么就创建windows环境下的命令
      if (isWindows()) {
        System.out.println(prepareWindowsCommand(cmd, env));
      } else {
        // In bash, use NULL as the arg separator since it cannot be used in an argument.
        List<String> bashCmd = prepareBashCommand(cmd, env);
        for (String c : bashCmd) {
          System.out.print(c);
          System.out.print('\0');
        }
      }
    }
    

    上面最重要的是完成了一些参数的解析,参数解析正确后会把需要执行的命令加进CMD的数组中,在spark-class的脚本中进行执行,然后进入到org.apache.spark.deploy.SparkSubmit这个类中,看一下main函数:

    override def main(args: Array[String]): Unit = {
    //创建一个SparkSubmit的对象,
      val submit = new SparkSubmit() {
        self =>
    
        override protected def parseArguments(args: Array[String]): SparkSubmitArguments = {
    //创建Spark的提交参数
          new SparkSubmitArguments(args) {
            override protected def logInfo(msg: => String): Unit = self.logInfo(msg)
    
            override protected def logWarning(msg: => String): Unit = self.logWarning(msg)
          }
        }
    
        override protected def logInfo(msg: => String): Unit = printMessage(msg)
    
        override protected def logWarning(msg: => String): Unit = printMessage(s"Warning: $msg")
    
        override def doSubmit(args: Array[String]): Unit = {
          try {
    //调用父类的doSubmit方法
            super.doSubmit(args)
          } catch {
            case e: SparkUserAppException =>
              exitFn(e.exitCode)
          }
        }
    
      }
    
      submit.doSubmit(args)
    }
    

    再进入到doSubmit的方法中

    def doSubmit(args: Array[String]): Unit = {
      // Initialize logging if it hasn't been done yet. Keep track of whether logging needs to
      // be reset before the application starts.
      val uninitLog = initializeLogIfNecessary(true, silent = true)
    //获取解析的参数
      val appArgs = parseArguments(args)
      if (appArgs.verbose) {
        logInfo(appArgs.toString)
      }
    //根据SparkSubmitAction的动作进行模式匹配,我们这里是SUBMIT,所以要进入submit的方法
      appArgs.action match {
        case SparkSubmitAction.SUBMIT => submit(appArgs, uninitLog)
        case SparkSubmitAction.KILL => kill(appArgs)
        case SparkSubmitAction.REQUEST_STATUS => requestStatus(appArgs)
        case SparkSubmitAction.PRINT_VERSION => printVersion()
      }
    }
    根据SparkSubmitAction的动作进行模式匹配,进入submit的方法:
    private def submit(args: SparkSubmitArguments, uninitLog: Boolean): Unit = {
    //调用prepareSubmitEnvironment方法,根据传入的解析参数,获取以下四个变量
      val (childArgs, childClasspath, sparkConf, childMainClass) = prepareSubmitEnvironment(args)
      def doRunMain(): Unit = {
        if (args.proxyUser != null) {
          val proxyUser = UserGroupInformation.createProxyUser(args.proxyUser,
            UserGroupInformation.getCurrentUser())
          try {
            proxyUser.doAs(new PrivilegedExceptionAction[Unit]() {
              override def run(): Unit = {
                runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
              }
            })
          } catch {
            case e: Exception =>
              // Hadoop's AuthorizationException suppresses the exception's stack trace, which
              // makes the message printed to the output by the JVM not very helpful. Instead,
              // detect exceptions with empty stack traces here, and treat them differently.
              if (e.getStackTrace().length == 0) {
                error(s"ERROR: ${e.getClass().getName()}: ${e.getMessage()}")
              } else {
                throw e
              }
          }
        } else {
          runMain(childArgs, childClasspath, sparkConf, childMainClass, args.verbose)
        }
      }
    
      // Let the main class re-initialize the logging system once it starts.
      if (uninitLog) {
        Logging.uninitialize()
      }
    
      // In standalone cluster mode, there are two submission gateways:
      //   (1) The traditional RPC gateway using o.a.s.deploy.Client as a wrapper
      //   (2) The new REST-based gateway introduced in Spark 1.3
      // The latter is the default behavior as of Spark 1.3, but Spark submit will fail over
      // to use the legacy gateway if the master endpoint turns out to be not a REST server.
    //如果是StandaloneCluster模式,并且使用REST网关,进入下面这个分支
      if (args.isStandaloneCluster && args.useRest) {
        try {
          logInfo("Running Spark using the REST application submission protocol.")
          doRunMain()
        } catch {
          // Fail over to use the legacy submission gateway
          case e: SubmitRestConnectionException =>
            logWarning(s"Master endpoint ${args.master} was not a REST server. " +
              "Falling back to legacy submission gateway instead.")
            args.useRest = false
            submit(args, false)
        }
      // In all other modes, just run the main class as prepared
      } else {
        doRunMain()
      }
    }
    

    上面实际上是首先准备spark的环境,即调用prepareSubmitEnvironment的方法,进入到这个方法里面:

    private[deploy] def prepareSubmitEnvironment(
        args: SparkSubmitArguments,
        conf: Option[HadoopConfiguration] = None)
        : (Seq[String], Seq[String], SparkConf, String) = {
      // Return values
    //定义了以下这四个需要返回的变量
      val childArgs = new ArrayBuffer[String]()
      val childClasspath = new ArrayBuffer[String]()
      val sparkConf = new SparkConf()
      var childMainClass = ""
    
      // Set the cluster manager
    //设置集群的资源管理器
      val clusterManager: Int = args.master match {
        case "yarn" => YARN
        case "yarn-client" | "yarn-cluster" =>
          logWarning(s"Master ${args.master} is deprecated since 2.0." +
            " Please use master \"yarn\" with specified deploy mode instead.")
          YARN
        case m if m.startsWith("spark") => STANDALONE
        case m if m.startsWith("mesos") => MESOS
        case m if m.startsWith("k8s") => KUBERNETES
        case m if m.startsWith("local") => LOCAL
        case _ =>
          error("Master must either be yarn or start with spark, mesos, k8s, or local")
          -1
      }
    

    prepareSubmitEnvironment这个方法里面,只要做的事情是根据解析的参数,获取集群的部署模式,返回这四个参数:childArgs, childClasspath, sparkConf, childMainClass供后面程序的使用。
    本篇博客是按照Standalone的集群部署模式进行介绍,因此,进入以下代码:

    if (args.isStandaloneCluster) {
      if (args.useRest) {
    //如果使用REST网关,则采用RestSubmissionClientApp的方式提交
        childMainClass = REST_CLUSTER_SUBMIT_CLASS
        childArgs += (args.primaryResource, args.mainClass)
      } else {
    //如果不使用REST的网关,则用ClientApp的方式进行提交。
        // In legacy standalone cluster mode, use Client as a wrapper around the user class
        childMainClass = STANDALONE_CLUSTER_SUBMIT_CLASS
        if (args.supervise) { childArgs += "--supervise" }
        Option(args.driverMemory).foreach { m => childArgs += ("--memory", m) }
        Option(args.driverCores).foreach { c => childArgs += ("--cores", c) }
        childArgs += "launch"
        childArgs += (args.master, args.primaryResource, args.mainClass)
      }
      if (args.childArgs != null) {
        childArgs ++= args.childArgs
      }
    }
    

    上面的代码主要完成,根据是否可以使用REST网关的条件,来匹配不同的提交方式,讨论ClientApp的方式进行提交,这里的childMainClass就是我们自己编写的程序的主函数。回到doRunmain的方法:

    private def runMain(
          childArgs: Seq[String],
          childClasspath: Seq[String],
          sparkConf: SparkConf,
          childMainClass: String,
          verbose: Boolean): Unit = {
        if (verbose) {
          logInfo(s"Main class:\n$childMainClass")
          logInfo(s"Arguments:\n${childArgs.mkString("\n")}")
          // sysProps may contain sensitive information, so redact before printing
          logInfo(s"Spark config:\n${Utils.redact(sparkConf.getAll.toMap).mkString("\n")}")
          logInfo(s"Classpath elements:\n${childClasspath.mkString("\n")}")
          logInfo("\n")
        }
    //创建加载器,用于加载jar包
        val loader =
          if (sparkConf.get(DRIVER_USER_CLASS_PATH_FIRST)) {
            new ChildFirstURLClassLoader(new Array[URL](0),
              Thread.currentThread.getContextClassLoader)
          } else {
            new MutableURLClassLoader(new Array[URL](0),
              Thread.currentThread.getContextClassLoader)
          }
        Thread.currentThread.setContextClassLoader(loader)
    //根据指定的路径加载jar包
        for (jar <- childClasspath) {
          addJarToClasspath(jar, loader)
        }
    
        var mainClass: Class[_] = null
    
        try {
    //获取mainClass
          mainClass = Utils.classForName(childMainClass)
        } catch {
          case e: ClassNotFoundException =>
            logWarning(s"Failed to load $childMainClass.", e)
            if (childMainClass.contains("thriftserver")) {
              logInfo(s"Failed to load main class $childMainClass.")
              logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
            }
            throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
          case e: NoClassDefFoundError =>
            logWarning(s"Failed to load $childMainClass: ${e.getMessage()}")
            if (e.getMessage.contains("org/apache/hadoop/hive")) {
              logInfo(s"Failed to load hive class.")
              logInfo("You need to build Spark with -Phive and -Phive-thriftserver.")
            }
            throw new SparkUserAppException(CLASS_NOT_FOUND_EXIT_STATUS)
        }
    
        val app: SparkApplication = if (classOf[SparkApplication].isAssignableFrom(mainClass)) {
          mainClass.newInstance().asInstanceOf[SparkApplication]
        } else {
          // SPARK-4170
          if (classOf[scala.App].isAssignableFrom(mainClass)) {
            logWarning("Subclasses of scala.App may not work correctly. Use a main() method instead.")
          }
    //创建一个
          new JavaMainApplication(mainClass)
        }
    }
    

    进入JavaMainApplication的方法中

    private[deploy] class JavaMainApplication(klass: Class[_]) extends SparkApplication {
    
      override def start(args: Array[String], conf: SparkConf): Unit = {
    //获取main 方法
        val mainMethod = klass.getMethod("main", new Array[String](0).getClass)
        if (!Modifier.isStatic(mainMethod.getModifiers)) {
          throw new IllegalStateException("The main method in the given main class must be static")
        }
    
        val sysProps = conf.getAll.toMap
        sysProps.foreach { case (k, v) =>
          sys.props(k) = v
        }
    //通过反射机制调用该方法
        mainMethod.invoke(null, args)
      }
    

    通过反射机制调用用户编写的main 函数。
    假如我们采用的是ClientAPP的方式提交,进入到org.apache.spark.deploy.Client:
    进入main函数

    object Client {
      def main(args: Array[String]) {
        // scalastyle:off println
        if (!sys.props.contains("SPARK_SUBMIT")) {
          println("WARNING: This client is deprecated and will be removed in a future version of Spark")
          println("Use ./bin/spark-submit with \"--master spark://host:port\"")
        }
        // scalastyle:on println
    //创建一个ClientAPP
        new ClientApp().start(args, new SparkConf())
      }
    }
    
    private[spark] class ClientApp extends SparkApplication {
    
      override def start(args: Array[String], conf: SparkConf): Unit = {
        val driverArgs = new ClientArguments(args)
    
        if (!conf.contains("spark.rpc.askTimeout")) {
          conf.set("spark.rpc.askTimeout", "10s")
        }
        Logger.getRootLogger.setLevel(driverArgs.logLevel)
    //创建rpcEnv的通信环境
        val rpcEnv =
          RpcEnv.create("driverClient", Utils.localHostName(), 0, conf, new SecurityManager(conf))
    //创建masterEndpoints和ClientEndpoint
        val masterEndpoints = driverArgs.masters.map(RpcAddress.fromSparkURL).
          map(rpcEnv.setupEndpointRef(_, Master.ENDPOINT_NAME))
        rpcEnv.setupEndpoint("client", new ClientEndpoint(rpcEnv, driverArgs, masterEndpoints, conf))
    
        rpcEnv.awaitTermination()
      }
    

    }
    在onStart的方法中:

    override def onStart(): Unit = {
      driverArgs.cmd match {
        case "launch" =>
          // TODO: We could add an env variable here and intercept it in `sc.addJar` that would
          //       truncate filesystem paths similar to what YARN does. For now, we just require
          //       people call `addJar` assuming the jar is in the same directory.
          val mainClass = "org.apache.spark.deploy.worker.DriverWrapper"
    
          val classPathConf = "spark.driver.extraClassPath"
          val classPathEntries = sys.props.get(classPathConf).toSeq.flatMap { cp =>
            cp.split(java.io.File.pathSeparator)
          }
    
          val libraryPathConf = "spark.driver.extraLibraryPath"
          val libraryPathEntries = sys.props.get(libraryPathConf).toSeq.flatMap { cp =>
            cp.split(java.io.File.pathSeparator)
          }
    
          val extraJavaOptsConf = "spark.driver.extraJavaOptions"
          val extraJavaOpts = sys.props.get(extraJavaOptsConf)
            .map(Utils.splitCommandString).getOrElse(Seq.empty)
          val sparkJavaOpts = Utils.sparkJavaOpts(conf)
          val javaOpts = sparkJavaOpts ++ extraJavaOpts
          val command = new Command(mainClass,
            Seq("{{WORKER_URL}}", "{{USER_JAR}}", driverArgs.mainClass) ++ driverArgs.driverOptions,
            sys.env, classPathEntries, libraryPathEntries, javaOpts)
    //封装Driver的信息
          val driverDescription = new DriverDescription(
            driverArgs.jarUrl,
            driverArgs.memory,
            driverArgs.cores,
            driverArgs.supervise,
            command)
    //向Master发送启动Driver的请求
          asyncSendToMasterAndForwardReply[SubmitDriverResponse](
            RequestSubmitDriver(driverDescription))
    
        case "kill" =>
          val driverId = driverArgs.driverId
          asyncSendToMasterAndForwardReply[KillDriverResponse](RequestKillDriver(driverId))
      }
    }
    

    在master接收到Client发送来的启动Driver的信息后,整个作业的提交就完成了。接下来,就是Driver的注册。

    展开全文
  • spark集群的任务提交执行流程

    万次阅读 2018-03-07 20:41:30
    本文转自:https://www.linuxidc.com/Linux/2018-02/150886.htm一、Spark on Standalone1.spark集群启动后,Worker向Master注册信息2.spark-submit命令提交程序后,driver和application也会向Master注册信息3....
  • Spark spark-submit 提交的几种模式

    万次阅读 2018-11-20 09:34:42
    Spark spark-submit 提交的几种模式 包括 local ,yarn-client,yarn-cluster,standlone
  • 关于spark任务提交的几种方式

    千次阅读 2019-04-08 13:25:27
    1.Spark当前支持三种集群管理方式 Standalone—Spark自带的一种集群管理方式,易于构建集群。 Apache Mesos—通用的集群管理,可以在其上运行Hadoop MapReduce和一些服务应用。 Hadoop YARN—Hadoop2中的资源管理器...
  • 1.Driver端启动SparkSubmit进程,启动后开始向Master进行通信,此时创建了一个对象(SparkContext),接着向Master发送任务消息 2.Master接收到任务信息后,开始资源调度,此时会和所有的Worker进行通信,找到空闲的...
  • spark提交

    2016-08-03 10:24:38
    这次主要讲一下spark提交具体操作和流程。原来一直用,也没怎么深入查看,那么这次就来仔细看一下提交的学问。跟我们以前一样,我们以官网下手。这里我不在把英文贴上,直接进行。 在spark的bin文件夹下的spark-...
  • spark提交应用的全流程分析

    千次阅读 2017-08-05 20:08:14
    spark提交应用的全流程分析@(SPARK)[spark]本文分析一下spark的应用通过spark-submit后,如何提交到集群中并开始运行。先介绍一下spark从提交到运行的全流程,下面再详细分析。 1、用户通过spark-submit脚本提交应用...
  • spark提交scala代码

    千次阅读 2019-08-11 22:08:04
    目的:通过spark-submit提交scala代码 scala代码需要先编译才能在spark上跑 工具:sbt 下载地址sbt 正式开始 假设现在的地址是 /home/sparknode/scalacode,则需要先 mkdir -p src/main/scala (路径必须...
  • spark提交命令

    2019-01-30 10:14:07
    Standalone提交命令 ./spark-submit --master spark://node1:7077 --class org.apache.spark.examples.SparkPi ../lib/spark-examples-1.6.0-hadoop2.6.0.jar 10000   YARN提交命令: ./spark-submit --...
  • spark提交作业命令

    2018-07-15 12:15:40
    spark提交作业命令 ./spark-submit --class c01_RDD --master spark://ubuntu:7077 --executor-memory 512M --total-executor-cores 2 /root/app/wc.jar hdfs://localhost:9000/test hdfs://localhost:9000/output....
  • 先看官网提供的两个提交例子(只看集群模式) # Run on a Spark standalone cluster in cluster deploy mode with supervise ./bin/spark-submit \ --class org.apache.spark.examples.SparkPi \ --master spark:...
  • spark提交任务采用yarn集群提交方法

    千次阅读 2018-08-17 16:49:07
    spark提交任务采用yarn集群提交方法 先启动 hadoop集群 再启动yarn 再启动spark集群 spark提交任务命令 spark-submit --class org.apache.spark.examples.SparkPi --master yarn --deploy-mode cluster /home...
  • Spark提交任务的命令

    千次阅读 2018-08-28 19:47:29
    bin/spark-submit --master spark://master.hadoop:7077,slave1.hadoop:7077 --executor-memory 512mb --total-executor-cores 4 --class nuc.sw.test.ScalaWordCount /root/spark-1.0.jar hdfs://master.hadoop:90...
  • Spark提交命令和参数调优

    千次阅读 2019-05-08 10:52:13
    参数意义和参考值: 1.num-executors 线程数:一般设置在50-100之间,必须设置,不然默认启动的executor非常少,不能充分利用集群资源,运行速度慢 2.executor-memory 线程内存:参考值4g-8g,num-executor乘以...
  • spark提交命令详解

    千次阅读 2015-07-16 16:05:58
    本片文章主要结合官网的提交说明进行,详情请看http://spark.apache.org/docs/latest/submitting-applications.html ./bin/spark-submit \ --class --master \ --deploy-mode \ --conf = \ ... # other ...
  • Spark提交代码的两种方式

    千次阅读 2018-08-26 22:18:08
    基于spark1.6测试(虽然很多公司都已经在用2.X了,但是1.6我认为是最经典的版本,CDH最新版本至今默认的spark版本依然是1.6,不过2.X提交方式是基本没有变的) Standalone (1)standalone-client提交任务方式 ...
  • spark 提交参数设置

    千次阅读 2017-06-16 11:23:19
    2.executor-memory 4G~8G num-executors乘以executor-memory,就代表了你的Spark作业申请到的总内存量,这个量是不能超过队列的最大内存量的 3.executor-cores 2~4 4.spark.default.parallelism 用于设置每个stage...

空空如也

1 2 3 4 5 ... 20
收藏数 41,345
精华内容 16,538
关键字:

spark 提交