spark 订阅
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。 展开全文
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。
信息
基    于
MapReduce算法实现的分布式计算
最新版本
2.4.0
外文名
Spark
SPARK基本介绍
Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎 [1]  。现在形成一个高速发展应用广泛的生态系统。
收起全文
精华内容
参与话题
问答
  • Spark

    万次阅读 2017-11-14 13:44:49
    应用场景 搭建部署了hadoop环境后,使用MapReduce来进行计算,速度非常慢,因为MapReduce只是分布式批量计算,用于跑批的场景,并不追求速率,因为它需要频繁读写...因为Spark是内存计算,它把计算的中间结果存到了内

    一、运行模式

    spark是基于内存计算的计算框架,性能很强悍,但是它支持单机模式,同时也支持集群模式,它的运行模式有好多种,为了不混淆方便区分,这里进行一些总结。网上总结了,多数为三种,四种,其实真实要细分,spark有六种运行模式,这里给出区分。

    1. local模式【单机】

    Local模式又称为本地模式,运行该模式非常简单,只需要把Spark的安装包解压后,改一些常用的配置即可使用,而不用启动Spark的Master、Worker守护进程( 只有集群的Standalone方式时,才需要这两个角色),也不用启动Hadoop的各服务(除非你要用到HDFS),这是和其他模式的区别。

    运行实例

     # ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master local lib/spark-examples-1.0.0-hadoop2.2.0.jar 
    
     # 注:看到 --master local 就可以确定是单机的local模式了!
    

    这个SparkSubmit进程又当爹、又当妈,既是客户提交任务的Client进程、又是Spark的driver程序、还充当着Spark执行Task的Executor角色。

    2. 本地伪集群运行模式(单机模拟集群)

    这种运行模式,和Local[N]很像,不同的是,它会在单机启动多个进程来模拟集群下的分布式场景,而不像Local[N]这种多个线程只能在一个进程下委屈求全的共享资源。通常也是用来验证开发出来的应用程序逻辑上有没有问题,或者想使用Spark的计算框架而没有太多资源。

    用法是:提交应用程序时使用local-cluster[x,y,z]参数:x代表要生成的executor数,y和z分别代表每个executor所拥有的core和memory数。

     # spark-submit --master local-cluster[2, 3, 1024]
    
     # 上面这条命令代表会使用2个executor进程,每个进程分配3个core和1G的内存,来运行应用程序。
    

    SparkSubmit依然充当全能角色,又是Client进程,又是driver程序,还有点资源管理的作用。生成的两个CoarseGrainedExecutorBackend

    运行该模式依然非常简单,只需要把Spark的安装包解压后,改一些常用的配置即可使用。而不用启动Spark的Master、Worker守护进程( 只有集群的standalone方式时,才需要这两个角色 ),也不用启动Hadoop的各服务(除非你要用到HDFS),这是和其他模式的区别。

    3. standalone模式【集群】

    和单机运行的模式不同,这里必须在执行应用程序前,先启动Spark的Master和Worker守护进程。不用启动Hadoop服务,除非你用到了HDFS的内容。

    运行实例

     # ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://192.168.123.101:7077 lib/spark-examples-1.0.0-hadoop2.2.0.jar 
    
     # 注:看到 --master spark://IP:7077 就可以确定是standalone模式了!
    

    Master进程做为cluster manager,用来对应用程序申请的资源进行管理;SparkSubmit 做为Client端和运行driver程序;CoarseGrainedExecutorBackend 用来并发执行应用程序;

    Standalone模式是Spark实现的资源调度框架,其主要的节点有Client节点、Master节点和Worker节点。其中Driver既可以运行在Master节点上中,也可以运行在本地Client端。当用spark-shell交互式工具提交Spark的Job时,Driver在Master节点上运行;当使用spark-submit工具提交Job或者在Eclips、IDEA等开发平台上使用”new SparkConf.setManager(“spark://master:7077”)”方式运行Spark任务时,Driver是运行在本地Client端上的。

    运行流程如下:
    1.SparkContext连接到Master,向Master注册并申请资源(CPU Core 和Memory);
    2.Master根据SparkContext的资源申请要求和Worker心跳周期内报告的信息决定在哪个Worker上分配资源,然后在该Worker上获取资源,然后启动StandaloneExecutorBackend;
    3.StandaloneExecutorBackend向SparkContext注册;
    4.SparkContext将Applicaiton代码发送给StandaloneExecutorBackend;并且SparkContext解析Applicaiton代码,构建DAG图,并提交给DAG Scheduler分解成Stage(当碰到Action操作时,就会催生Job;每个Job中含有1个或多个Stage,Stage一般在获取外部数据和shuffle之前产生),然后以Stage(或者称为TaskSet)提交给Task Scheduler,Task Scheduler负责将Task分配到相应的Worker,最后提交给StandaloneExecutorBackend执行;
    5.StandaloneExecutorBackend会建立Executor线程池,开始执行Task,并向SparkContext报告,直至Task完成。
    6.所有Task完成后,SparkContext向Master注销,释放资源。

    4. on yarn client模式【集群】

    现在越来越多的场景,都是Spark跑在Hadoop集群中,所以为了做到资源能够均衡调度,会使用YARN来做为Spark的Cluster Manager,来为Spark的应用程序分配资源。

    运行实例

     # ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client lib/spark-examples-1.0.0-hadoop2.2.0.jar 
    
     # 注:这里执行方式是--master yarn-client
    

    在执行Spark应用程序前,要启动Hadoop的各种服务。由于已经有了资源管理器,所以不需要启动Spark的Master、Worker守护进程。也就是不需要在spark的sbin目录下执行start-all.sh了

    运行流程如下:
    (1).Spark Yarn Client向YARN的ResourceManager申请启动Application Master。同时在SparkContent初始化中将创建DAGScheduler和TASKScheduler等,由于我们选择的是Yarn-Client模式,程序会选择YarnClientClusterScheduler和YarnClientSchedulerBackend;
    (2).ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,与YARN-Cluster区别的是在该ApplicationMaster不运行SparkContext,只与SparkContext进行联系进行资源的分派;
    (3).Client中的SparkContext初始化完毕后,与ApplicationMaster建立通讯,向ResourceManager注册,根据任务信息向ResourceManager申请资源(Container);
    (4).一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向Client中的SparkContext注册并申请Task;
    (5).Client中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向Driver汇报运行的状态和进度,以让Client随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
    (6).应用程序运行完成后,Client的SparkContext向ResourceManager申请注销并关闭自己

    5. on yarn cluster(on-yarn-standalone)模式【集群】

    运行实例

     # ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster lib/spark-examples-1.0.0-hadoop2.2.0.jar 
    
     # 注:这里的执行方式是 --master yarn-cluster
    
    运行模式:
    (1). Spark Yarn Client向YARN中提交应用程序,包括ApplicationMaster程序、启动ApplicationMaster的命令、需要在Executor中运行的程序等;
    (2). ResourceManager收到请求后,在集群中选择一个NodeManager,为该应用程序分配第一个Container,要求它在这个Container中启动应用程序的ApplicationMaster,其中ApplicationMaster进行SparkContext等的初始化;
    (3). ApplicationMaster向ResourceManager注册,这样用户可以直接通过ResourceManage查看应用程序的运行状态,然后它将采用轮询的方式通过RPC协议为各个任务申请资源,并监控它们的运行状态直到运行结束;
    (4). 一旦ApplicationMaster申请到资源(也就是Container)后,便与对应的NodeManager通信,要求它在获得的Container中启动启动CoarseGrainedExecutorBackend,CoarseGrainedExecutorBackend启动后会向ApplicationMaster中的SparkContext注册并申请Task。这一点和Standalone模式一样,只不过SparkContext在Spark Application中初始化时,使用CoarseGrainedSchedulerBackend配合YarnClusterScheduler进行任务的调度,其中YarnClusterScheduler只是对TaskSchedulerImpl的一个简单包装,增加了对Executor的等待逻辑等;
    (5). ApplicationMaster中的SparkContext分配Task给CoarseGrainedExecutorBackend执行,CoarseGrainedExecutorBackend运行Task并向ApplicationMaster汇报运行的状态和进度,以让ApplicationMaster随时掌握各个任务的运行状态,从而可以在任务失败时重新启动任务;
    (6). 应用程序运行完成后,ApplicationMaster向ResourceManager申请注销并关闭自己。

    6. mesos模式【集群】

    上面4、5两种,是基于hadoop的yarn来进行资源管理的,这里是采用mesos来进行资源管理,Mesos是Apache下的开源分布式资源管理框架,它被称为是分布式系统的内核。Mesos最初是由加州大学伯克利分校的AMPLab开发的,后在Twitter得到广泛使用。Apache Mesos是一个通用的集群管理器,起源于 Google 的数据中心资源管理系统Borg。

    Twitter从Google的Borg系统中得到启发,然后就开发一个类似的资源管理系统来帮助他们摆脱可怕的“失败之鲸”。后来他们注意到加州大学伯克利分校AMPLab正在开发的名为Mesos的项目,这个项目的负责人是Ben Hindman,Ben是加州大学伯克利分校的博士研究生。后来Ben Hindman加入了Twitter,负责开发和部署Mesos。现在Mesos管理着Twitter超过30,0000台服务器上的应用部署,“失败之鲸”已成往事。其他公司纷至沓来,也部署了Mesos,比如Airbnb(空中食宿网)、eBay(电子港湾)和Netflix。

    这块接触不多,一般不太采用!

    附件

    Spark Client 和 Spark Cluster的区别

    理解YARN-Client和YARN-Cluster深层次的区别之前先清楚一个概念:Application Master。在YARN中,每个Application实例都有一个ApplicationMaster进程,它是Application启动的第一个容器。它负责和ResourceManager打交道并请求资源,获取资源之后告诉NodeManager为其启动Container。从深层次的含义讲YARN-Cluster和YARN-Client模式的区别其实就是ApplicationMaster进程的区别。
    YARN-Cluster模式下,Driver运行在AM(Application Master)中,它负责向YARN申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉Client,作业会继续在YARN上运行,因而YARN-Cluster模式不适合运行交互类型的作业;
    YARN-Client模式下,Application Master仅仅向YARN请求Executor,Client会和请求的Container通信来调度他们工作,也就是说Client不能离开。

    二、伪分布式

    搭建部署了hadoop环境后,使用MapReduce来进行计算,速度非常慢,因为MapReduce只是分布式批量计算,用于跑批的场景,并不追求速率,因为它需要频繁读写HDFS,并不能实时反馈结果,这种跑批的场景用的还是比较少的。一般客户最想看到的是输入后立马有结果反馈。那此时我们就需要在Hadoop伪分布式集群上部署Spark环境了!因为Spark是内存计算,它把计算的中间结果存到了内存中,不用频繁读取HDFS,做了极大的优化,当然Spark也是今后的潮流,慢慢将取代Hadoop的很多组件,Spark还有一个优势就是,它是天然与Hadoop完美结合的!

    操作步骤

    1. 下载Scala和Spark

    SCALA2.10.4下载地址
    spark1.6.1下载地址

    2. 解压并配置环境变量

    下载解压scala,添加配置环境变量:

     export SCALA_HOME=/opt/scala-2.10.4
     export PATH=$JAVA_HOME/bin$HADOOP_HOME/bin:$HIVE_HOME/bin:$SCALA_HOME/bin:$PATH
    

    下载解压spark,添加配置环境变量:

     export SPARK_HOME=/opt/spark-1.6.1 
     export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin:$HIVE_HOME/bin:$PATH
    

    3. 修改spark-env.sh

    进入Spark的配置文件路径,
     # cd $SPARK_HOME/conf  
    在spark-env.sh文件中添加如下配置: 
     export JAVA_HOME=/opt/jdk1.7.0_79
     export SCALA_HOME=/opt/scala-2.10.4
     export HADOOP_CONF_DIR=/opt/hadoop-2.6.0/etc/hadoop
    

    4. 启动spark:

     # cd /opt/spark-1.6.1
     # ./sbin/start-all.sh
    

    5. 验证

    启动完毕,命令行输入jps,如果有master,worker那么就是启动成功

    浏览器访问:http://192.168.208.110:8080


    # ./bin/spark-shell
    浏览器访问:http://192.168.208.110:4040
    访问spark-shell页面


    # ./bin/spark-sql
    通过spark-sql连接hive,访问hive中的数据


    # ./sbin/start-thriftserver.sh
    # ./bin/beeline
    重要,启动后,可以直接使用hive的程序,即HQL执行时默认用spark来进行内存计算

    三、分布式

    当我们安装好Hadoop分布式集群后,默认底层计算是采用MapReduce,速度比较慢,适用于跑批场景,而Spark可以和hadoop完美的融合,Spark提供了更强劲的计算能力,它基于内存计算,速度快,效率高。虽然Spark也支持单机安装,但是这样就不涉及分布式计算,以及分布式存储,如果我们要用Spark集群,那么就需要分布式的hadoop环境,调用hadoop的分布式文件系统,本篇博文来学习分布式Spark的安装部署!

    操作步骤

    1. Scala2.11.6配置

    1.1 下载Scala2.11.6

    Scala2.11.6下载地址,下载scala2.11.6压缩包,上传到主节点的opt目录下

    1.2 解压缩并更换目录

     # cd /opt/
     # tar -xzvf scala-2.11.6.tgz
     # mv scala-2.11.6 scala2.11.6
    

    1.3 配置环境变量

     # vim /etc/profile
    
    export JAVA_HOME=/opt/jdk1.8
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export PATH=$PATH:$JAVA_HOME/bin
    
    export HADOOP_HOME=/opt/hadoop2.6.0
    export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
    
    export HIVE_HOME=/opt/hive2.1.1
    export HIVE_CONF_DIR=$HIVE_HOME/conf
    export CLASSPATH=.:$HIVE_HOME/lib:$CLASSPATH
    export PATH=$PATH:$HIVE_HOME/bin
    
    export SQOOP_HOME=/opt/sqoop1.4.6
    export PATH=$PATH:$SQOOP_HOME/bin
    
    export ZK_HOME=/opt/zookeeper3.4.10
    export PATH=$PATH:$ZK_HOME/bin
    
    export HBASE_HOME=/opt/hbase1.2.6
    export PATH=$PATH:$HBASE_HOME/bin
    
    export SCALA_HOME=/opt/scala2.11.6
    export PATH=$PATH:$SCALA_HOME/bin
    
    #加上最后两行,关于scala的环境变量配置
    
     # source /etc/profile       #使环境变量配置生效
    

    1.4 验证scala配置

     # scala -version
    

    这里写图片描述

    2. Spark1.6.1配置

    2.1 下载Spark1.6.1

    spark1.6.1下载地址,下载spark1.6.1压缩包,上传到主节点的opt目录下

    2.2 解压缩并更换目录

     # cd /opt
     # tar -xzvf spark-1.6.1-bin-hadoop2.6.tgz
     # mv spark-1.6.1-bin-hadoop2.6 spark1.6.1
    

    2.3 配置环境变量

     # vim /etc/profile
    
    export JAVA_HOME=/opt/jdk1.8
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export PATH=$PATH:$JAVA_HOME/bin
    
    export HADOOP_HOME=/opt/hadoop2.6.0
    export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
    
    export HIVE_HOME=/opt/hive2.1.1
    export HIVE_CONF_DIR=$HIVE_HOME/conf
    export CLASSPATH=.:$HIVE_HOME/lib:$CLASSPATH
    export PATH=$PATH:$HIVE_HOME/bin
    
    export SQOOP_HOME=/opt/sqoop1.4.6
    export PATH=$PATH:$SQOOP_HOME/bin
    
    export ZK_HOME=/opt/zookeeper3.4.10
    export PATH=$PATH:$ZK_HOME/bin
    
    export HBASE_HOME=/opt/hbase1.2.6
    export PATH=$PATH:$HBASE_HOME/bin
    
    export SCALA_HOME=/opt/scala2.11.6
    export PATH=$PATH:$SCALA_HOME/bin
    
    export SPARK_HOME=/opt/spark1.6.1
    export PATH=$PATH:$SPARK_HOME/bin
    
    #加上最后两行,关于spark的环境变量配置
    #切记,不要把SPARK_HOME/sbin也配置到PATH中,因为sbin下的命令和hadoop中的sbin下的命令很多相似的,避免冲突,所以执行spark的sbin中的命令,要切换到该目录下再执行
    
     # source /etc/profile       #使环境变量配置生效
    

    3. 修改Spark-env.sh配置文件

     # cd /opt/spark1.6.1/conf/
     # cp spark-env.sh.template   spark-env.sh
     # vim spark-env.sh
     
    export SCALA_HOME=/opt/scala2.11.6
    export JAVA_HOME=/opt/jdk1.8
    export HADOOP_HOME=/opt/hadoop2.6.0
    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
    export SPARK_HOME=/opt/spark1.6.1
    export SPARK_MASTER_IP=hadoop0
    export SPARK_EXECUTOR_MEMORY=4G                 #在末尾添加上述配置
    

    4. 修改slaves配置文件

     # cd /opt/spark1.6.1/conf/
     # cp slaves.template slaves
     # vim slaves
     
    hadoop1
    hadoop2              #删除localhost,添加从节点的两个主机名
    

    5. 将主节点的scala2.11.6,spark1.6.1搬到两个从节点上

     # cd /opt
     
     # scp -r scala2.11.6 root@hadoop1:/opt/
     # scp -r scala2.11.6 root@hadoop2:/opt/
     # scp -r spark1.6.1 root@hadoop1:/opt/
     # scp -r spark1.6.1 root@hadoop2:/opt/
    

    并且修改从节点的环境变量!而且使环境变量生效!

    6. 启动并且验证spark

    注:在运行spark之前,必须确保hadoop在运行中,因为spark集群是依托于hadoop的。

      # cd /opt/spark1.6.1/sbin
      # ./start-all.sh
    

    这里写图片描述

    这里写图片描述

    这里写图片描述

    浏览器访问http://192.168.210.70:8080

    这里写图片描述

    四、注意

    安装部署完完全分布式的spark后,发现yarn-cluster模式可以运行不报错,但是yarn-client报错,无法进行计算PI的值,导致spark并不能使用,报错信息如下所示,只需要修改yarn的配置即可!

    操作方案

     # ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client lib/spark-examples-1.6.1-hadoop2.6.0.jar
    

    1. 报错信息:

    [root@hadoop0 spark1.6.1]# ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client lib/spark-examples-1.6.1-hadoop2.6.0.jar 
    17/11/16 16:04:59 INFO spark.SparkContext: Running Spark version 1.6.1
    17/11/16 16:05:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    17/11/16 16:05:00 INFO spark.SecurityManager: Changing view acls to: root
    17/11/16 16:05:00 INFO spark.SecurityManager: Changing modify acls to: root
    17/11/16 16:05:00 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    17/11/16 16:05:00 INFO util.Utils: Successfully started service 'sparkDriver' on port 56204.
    17/11/16 16:05:00 INFO slf4j.Slf4jLogger: Slf4jLogger started
    17/11/16 16:05:01 INFO Remoting: Starting remoting
    17/11/16 16:05:01 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.210.70:56916]
    17/11/16 16:05:01 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 56916.
    17/11/16 16:05:01 INFO spark.SparkEnv: Registering MapOutputTracker
    17/11/16 16:05:01 INFO spark.SparkEnv: Registering BlockManagerMaster
    17/11/16 16:05:01 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-9e904d0f-0d09-4c9a-b523-86dc52613223
    17/11/16 16:05:01 INFO storage.MemoryStore: MemoryStore started with capacity 511.1 MB
    17/11/16 16:05:01 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    17/11/16 16:05:01 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/11/16 16:05:01 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
    17/11/16 16:05:01 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
    17/11/16 16:05:01 INFO ui.SparkUI: Started SparkUI at http://192.168.210.70:4040
    17/11/16 16:05:01 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-a3aae429-abe4-4bcb-b73e-8fe359aa92d9/httpd-742c21cf-89df-4af2-8b81-431523fe7bfd
    17/11/16 16:05:01 INFO spark.HttpServer: Starting HTTP Server
    17/11/16 16:05:01 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/11/16 16:05:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:37440
    17/11/16 16:05:01 INFO util.Utils: Successfully started service 'HTTP file server' on port 37440.
    17/11/16 16:05:01 INFO spark.SparkContext: Added JAR file:/opt/spark1.6.1/lib/spark-examples-1.6.1-hadoop2.6.0.jar at http://192.168.210.70:37440/jars/spark-examples-1.6.1-hadoop2.6.0.jar with timestamp 1510819501618
    17/11/16 16:05:01 INFO client.RMProxy: Connecting to ResourceManager at hadoop0/192.168.210.70:8032
    17/11/16 16:05:01 INFO yarn.Client: Requesting a new application from cluster with 3 NodeManagers
    17/11/16 16:05:01 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (12288 MB per container)
    17/11/16 16:05:01 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
    17/11/16 16:05:01 INFO yarn.Client: Setting up container launch context for our AM
    17/11/16 16:05:01 INFO yarn.Client: Setting up the launch environment for our AM container
    17/11/16 16:05:01 INFO yarn.Client: Preparing resources for our AM container
    17/11/16 16:05:02 INFO yarn.Client: Uploading resource file:/opt/spark1.6.1/lib/spark-assembly-1.6.1-hadoop2.6.0.jar -> hdfs://hadoop0:9000/user/root/.sparkStaging/application_1510653707211_0005/spark-assembly-1.6.1-hadoop2.6.0.jar
    17/11/16 16:05:04 INFO yarn.Client: Uploading resource file:/tmp/spark-a3aae429-abe4-4bcb-b73e-8fe359aa92d9/__spark_conf__7623958375810260855.zip -> hdfs://hadoop0:9000/user/root/.sparkStaging/application_1510653707211_0005/__spark_conf__7623958375810260855.zip
    17/11/16 16:05:04 INFO spark.SecurityManager: Changing view acls to: root
    17/11/16 16:05:04 INFO spark.SecurityManager: Changing modify acls to: root
    17/11/16 16:05:04 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    17/11/16 16:05:04 INFO yarn.Client: Submitting application 5 to ResourceManager
    17/11/16 16:05:04 INFO impl.YarnClientImpl: Submitted application application_1510653707211_0005
    17/11/16 16:05:06 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:06 INFO yarn.Client: 
    	 client token: N/A
    	 diagnostics: N/A
    	 ApplicationMaster host: N/A
    	 ApplicationMaster RPC port: -1
    	 queue: default
    	 start time: 1510819504598
    	 final status: UNDEFINED
    	 tracking URL: http://hadoop0:8088/proxy/application_1510653707211_0005/
    	 user: root
    17/11/16 16:05:07 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:08 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:09 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:10 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:11 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:12 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:13 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:14 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:15 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=89567902
    java.nio.channels.ClosedChannelException
    17/11/16 16:05:15 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:16 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:17 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:18 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:18 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as 
    17/11/16 16:05:18 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.07211_0005
    17/11/16 16:05:18 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFi
    17/11/16 16:05:19 INFO yarn.Client: Application report for application_1510653707211_0005 (state: RUNNING)
    17/11/16 16:05:19 INFO yarn.Client: 
    	 client token: N/A
    	 diagnostics: N/A
    	 ApplicationMaster host: 192.168.210.71
    	 ApplicationMaster RPC port: 0
    	 queue: default
    	 start time: 1510819504598
    	 final status: UNDEFINED
    	 tracking URL: http://hadoop0:8088/proxy/application_1510653707211_0005/
    	 user: root
    17/11/16 16:05:19 INFO cluster.YarnClientSchedulerBackend: Application application_1510653707211_0005 has s
    17/11/16 16:05:19 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockT
    17/11/16 16:05:19 INFO netty.NettyBlockTransferService: Server created on 60932
    17/11/16 16:05:19 INFO storage.BlockManagerMaster: Trying to register BlockManager
    17/11/16 16:05:19 INFO storage.BlockManagerMasterEndpoint: Registering block manager 192.168.210.70:60932 w
    17/11/16 16:05:19 INFO storage.BlockManagerMaster: Registered BlockManager
    17/11/16 16:05:22 INFO cluster.YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (h
    17/11/16 16:05:22 INFO storage.BlockManagerMasterEndpoint: Registering block manager hadoop1:35613 with 2.7
    17/11/16 16:05:22 ERROR cluster.YarnClientSchedulerBackend: Yarn application has already exited with state 
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,nul
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,nul
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
    17/11/16 16:05:22 INFO cluster.YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (h
    17/11/16 16:05:22 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.210.70:4040
    17/11/16 16:05:22 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
    17/11/16 16:05:22 INFO cluster.YarnClientSchedulerBackend: Asking each executor to shut down
    17/11/16 16:05:22 INFO cluster.YarnClientSchedulerBackend: Stopped
    17/11/16 16:05:22 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    17/11/16 16:05:22 INFO storage.BlockManagerMasterEndpoint: Registering block manager hadoop2:51640 with 2.7
    17/11/16 16:05:22 INFO storage.MemoryStore: MemoryStore cleared
    17/11/16 16:05:22 INFO storage.BlockManager: BlockManager stopped
    17/11/16 16:05:22 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event Spa
    17/11/16 16:05:22 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    17/11/16 16:05:22 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoord
    17/11/16 16:05:22 INFO spark.SparkContext: Successfully stopped SparkContext
    17/11/16 16:05:22 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
    17/11/16 16:05:22 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceedin
    17/11/16 16:05:22 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginni
    17/11/16 16:05:22 ERROR spark.SparkContext: Error initializing SparkContext.
    java.lang.NullPointerException
    	at org.apache.spark.SparkContext.<init>(SparkContext.scala:584)
    	at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:29)
    	at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.sc
    	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    17/11/16 16:05:22 INFO spark.SparkContext: SparkContext already stopped.
    Exception in thread "main" java.lang.NullPointerException
    	at org.apache.spark.SparkContext.<init>(SparkContext.scala:584)
    	at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:29)
    	at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.sc
    	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    17/11/16 16:05:22 INFO util.ShutdownHookManager: Shutdown hook called
    17/11/16 16:05:22 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a3aae429-abe4-4bcb-b73e-8fe3
    17/11/16 16:05:22 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
    17/11/16 16:05:22 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a3aae429-abe4-4bcb-b73e-8fe
    

    2. 修改yarn-site.xml配置添加最后两个配置:

     # vim /opt/hadoop2.6.0/etc/hadoop/yarn-site.xml 
     
    <property>
            <name>yarn.resourcemanager.hostname</name>
            <value>hadoop0</value>
       <property>
            <description>The http address of the RM web application.</description>
            <name>yarn.resourcemanager.webapp.address</name>
            <value>${yarn.resourcemanager.hostname}:8088</value>
            <name>yarn.resourcemanager.resource-tracker.address</name>
            <value>${yarn.resourcemanager.hostname}:8031</value>
       </property>
       <property>
            <description>The address of the RM admin interface.</description>
            <name>yarn.resourcemanager.admin.address</name>
            <value>${yarn.resourcemanager.hostname}:8033</value>
       </property>
       <property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle</value>
       </property>
       <property>
            <name>yarn.scheduler.maximum-allocation-mb</name>
            <value>8182</value>
            <discription>每个节点可用内存,单位MB,默认8182MB</discription>
       </property>
       <property>
                <name>yarn.nodemanager.pmem-check-enabled</name>
                <value>false</value>
        </property>
        <property>
            <name>yarn.nodemanager.vmem-check-enabled</name>
                <value>false</value>
        </property>
    
     # 添加最后两个false的属性即可!
    

    3. 运行成功后的展示:

    [root@hadoop0 spark1.6.1]# ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client lib/spark-examples-1.6.1-hadoop2.6.0.jar 
    17/11/20 11:23:30 INFO spark.SparkContext: Running Spark version 1.6.1
    17/11/20 11:23:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    17/11/20 11:23:30 INFO spark.SecurityManager: Changing view acls to: root
    17/11/20 11:23:30 INFO spark.SecurityManager: Changing modify acls to: root
    17/11/20 11:23:30 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    17/11/20 11:23:30 INFO util.Utils: Successfully started service 'sparkDriver' on port 50890.
    17/11/20 11:23:31 INFO slf4j.Slf4jLogger: Slf4jLogger started
    17/11/20 11:23:31 INFO Remoting: Starting remoting
    17/11/20 11:23:31 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.210.70:43819]
    17/11/20 11:23:31 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 43819.
    17/11/20 11:23:31 INFO spark.SparkEnv: Registering MapOutputTracker
    17/11/20 11:23:31 INFO spark.SparkEnv: Registering BlockManagerMaster
    17/11/20 11:23:31 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-c0631ca3-48c6-45ed-b1bd-c785e7ed4e52
    17/11/20 11:23:31 INFO storage.MemoryStore: MemoryStore started with capacity 511.1 MB
    17/11/20 11:23:31 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    17/11/20 11:23:31 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/11/20 11:23:31 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
    17/11/20 11:23:31 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
    17/11/20 11:23:31 INFO ui.SparkUI: Started SparkUI at http://192.168.210.70:4040
    17/11/20 11:23:31 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-603bd57a-5f73-41dc-84d0-e732cbd37788/httpd-8b6ce293-389a-4564-bedb-8560a3a924d5
    17/11/20 11:23:31 INFO spark.HttpServer: Starting HTTP Server
    17/11/20 11:23:31 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/11/20 11:23:31 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:44183
    17/11/20 11:23:31 INFO util.Utils: Successfully started service 'HTTP file server' on port 44183.
    17/11/20 11:23:31 INFO spark.SparkContext: Added JAR file:/opt/spark1.6.1/lib/spark-examples-1.6.1-hadoop2.6.0.jar at http://192.168.210.70:44183/jars/spark-examples-1.6.1-hadoop2.6.0.jar with timestamp 1511148211815
    17/11/20 11:23:31 INFO client.RMProxy: Connecting to ResourceManager at hadoop0/192.168.210.70:8032
    17/11/20 11:23:32 INFO yarn.Client: Requesting a new application from cluster with 3 NodeManagers
    17/11/20 11:23:32 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
    17/11/20 11:23:32 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
    17/11/20 11:23:32 INFO yarn.Client: Setting up container launch context for our AM
    17/11/20 11:23:32 INFO yarn.Client: Setting up the launch environment for our AM container
    17/11/20 11:23:32 INFO yarn.Client: Preparing resources for our AM container
    17/11/20 11:23:32 INFO yarn.Client: Uploading resource file:/opt/spark1.6.1/lib/spark-assembly-1.6.1-hadoop2.6.0.jar -> hdfs://hadoop0:9000/user/root/.sparkStaging/application_1511146953298_0003/spark-assembly-1.6.1-hadoop2.6.0.jar
    17/11/20 11:23:33 INFO yarn.Client: Uploading resource file:/tmp/spark-603bd57a-5f73-41dc-84d0-e732cbd37788/__spark_conf__5627219911217194032.zip -> hdfs://hadoop0:9000/user/root/.sparkStaging/application_1511146953298_0003/__spark_conf__5627219911217194032.zip
    17/11/20 11:23:33 INFO spark.SecurityManager: Changing view acls to: root
    17/11/20 11:23:33 INFO spark.SecurityManager: Changing modify acls to: root
    17/11/20 11:23:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    17/11/20 11:23:33 INFO yarn.Client: Submitting application 3 to ResourceManager
    17/11/20 11:23:33 INFO impl.YarnClientImpl: Submitted application application_1511146953298_0003
    17/11/20 11:23:34 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:34 INFO yarn.Client: 
    	 client token: N/A
    	 diagnostics: N/A
    	 ApplicationMaster host: N/A
    	 ApplicationMaster RPC port: -1
    	 queue: default
    	 start time: 1511148213962
    	 final status: UNDEFINED
    	 tracking URL: http://hadoop0:8088/proxy/application_1511146953298_0003/
    	 user: root
    17/11/20 11:23:36 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:37 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:38 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:39 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:41 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:42 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:43 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:47 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:56 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:57 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:58 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:59 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:24:12 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:24:13 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:24:14 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(null)
    17/11/20 11:24:14 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> hadoop0, PROXY_URI_BASES -> http://hadoop0:8088/proxy/application_1511146953298_0003), /proxy/application_1511146953298_0003
    17/11/20 11:24:14 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
    17/11/20 11:24:14 INFO yarn.Client: Application report for application_1511146953298_0003 (state: RUNNING)
    17/11/20 11:24:14 INFO yarn.Client: 
    	 client token: N/A
    	 diagnostics: N/A
    	 ApplicationMaster host: 192.168.210.70
    	 ApplicationMaster RPC port: 0
    	 queue: default
    	 start time: 1511148213962
    	 final status: UNDEFINED
    	 tracking URL: http://hadoop0:8088/proxy/application_1511146953298_0003/
    	 user: root
    17/11/20 11:24:14 INFO cluster.YarnClientSchedulerBackend: Application application_1511146953298_0003 has started running.
    17/11/20 11:24:14 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33528.
    17/11/20 11:24:14 INFO netty.NettyBlockTransferService: Server created on 33528
    17/11/20 11:24:14 INFO storage.BlockManagerMaster: Trying to register BlockManager
    17/11/20 11:24:14 INFO storage.BlockManagerMasterEndpoint: Registering block manager 192.168.210.70:33528 with 511.1 MB RAM, BlockManagerId(driver, 192.168.210.70, 33528)
    17/11/20 11:24:14 INFO storage.BlockManagerMaster: Registered BlockManager
    17/11/20 11:24:15 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
    17/11/20 11:24:15 INFO spark.SparkContext: Starting job: reduce at SparkPi.scala:36
    17/11/20 11:24:15 INFO scheduler.DAGScheduler: Got job 0 (reduce at SparkPi.scala:36) with 2 output partitions
    17/11/20 11:24:15 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:36)
    17/11/20 11:24:15 INFO scheduler.DAGScheduler: Parents of final stage: List()
    17/11/20 11:24:15 INFO scheduler.DAGScheduler: Missing parents: List()
    17/11/20 11:24:15 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:32), which has no missing parents
    17/11/20 11:24:15 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1904.0 B, free 1904.0 B)
    17/11/20 11:24:15 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1218.0 B, free 3.0 KB)
    17/11/20 11:24:15 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.210.70:33528 (size: 1218.0 B, free: 511.1 MB)
    17/11/20 11:24:15 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
    17/11/20 11:24:15 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:32)
    17/11/20 11:24:15 INFO cluster.YarnScheduler: Adding task set 0.0 with 2 tasks
    17/11/20 11:24:26 INFO cluster.YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (hadoop1:41774) with ID 2
    17/11/20 11:24:26 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, hadoop1, partition 0,PROCESS_LOCAL, 2157 bytes)
    17/11/20 11:24:26 INFO storage.BlockManagerMasterEndpoint: Registering block manager hadoop1:40640 with 1247.3 MB RAM, BlockManagerId(2, hadoop1, 40640)
    17/11/20 11:24:40 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on hadoop1:40640 (size: 1218.0 B, free: 1247.2 MB)
    17/11/20 11:24:40 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, hadoop1, partition 1,PROCESS_LOCAL, 2157 bytes)
    17/11/20 11:24:40 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 14708 ms on hadoop1 (1/2)
    17/11/20 11:24:40 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 34 ms on hadoop1 (2/2)
    17/11/20 11:24:40 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:36) finished in 24.994 s
    17/11/20 11:24:40 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    17/11/20 11:24:40 INFO scheduler.DAGScheduler: Job 0 finished: reduce at SparkPi.scala:36, took 25.166251 s
    Pi is roughly 3.14648
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
    17/11/20 11:24:40 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.210.70:4040
    17/11/20 11:24:40 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
    17/11/20 11:24:40 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
    17/11/20 11:24:40 INFO cluster.YarnClientSchedulerBackend: Asking each executor to shut down
    17/11/20 11:24:41 INFO cluster.YarnClientSchedulerBackend: Stopped
    17/11/20 11:24:41 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    17/11/20 11:24:41 INFO storage.MemoryStore: MemoryStore cleared
    17/11/20 11:24:41 INFO storage.BlockManager: BlockManager stopped
    17/11/20 11:24:41 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    17/11/20 11:24:41 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    17/11/20 11:24:41 INFO spark.SparkContext: Successfully stopped SparkContext
    17/11/20 11:24:41 INFO util.ShutdownHookManager: Shutdown hook called
    17/11/20 11:24:41 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-603bd57a-5f73-41dc-84d0-e732cbd37788
    17/11/20 11:24:41 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
    17/11/20 11:24:41 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
    17/11/20 11:24:41 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-603bd57a-5f73-41dc-84d0-e732cbd37788/httpd-8b6ce293-389a-4564-bedb-8560a3a924d5
    

    4. 以下关于RPC的错误也可以通过上述方案,解决:

    17/11/20 10:43:02 INFO spark.SparkContext: Running Spark version 1.6.1
    17/11/20 10:43:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    17/11/20 10:43:02 INFO spark.SecurityManager: Changing view acls to: root
    17/11/20 10:43:02 INFO spark.SecurityManager: Changing modify acls to: root
    17/11/20 10:43:02 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    17/11/20 10:43:03 INFO util.Utils: Successfully started service 'sparkDriver' on port 43103.
    17/11/20 10:43:03 INFO slf4j.Slf4jLogger: Slf4jLogger started
    17/11/20 10:43:03 INFO Remoting: Starting remoting
    17/11/20 10:43:03 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.210.70:54479]
    17/11/20 10:43:03 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 54479.
    17/11/20 10:43:03 INFO spark.SparkEnv: Registering MapOutputTracker
    17/11/20 10:43:03 INFO spark.SparkEnv: Registering BlockManagerMaster
    17/11/20 10:43:03 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-115d1d9d-efd2-4b77-a275-071bb880d596
    17/11/20 10:43:03 INFO storage.MemoryStore: MemoryStore started with capacity 511.1 MB
    17/11/20 10:43:03 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    17/11/20 10:43:04 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/11/20 10:43:04 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
    17/11/20 10:43:04 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
    17/11/20 10:43:04 INFO ui.SparkUI: Started SparkUI at http://192.168.210.70:4040
    17/11/20 10:43:04 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-1a7d680b-c815-4c0d-b706-9751f5f1b57a/httpd-e8d7be01-495d-403b-a7a4-1332d9ae2411
    17/11/20 10:43:04 INFO spark.HttpServer: Starting HTTP Server
    17/11/20 10:43:04 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/11/20 10:43:04 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:36186
    17/11/20 10:43:04 INFO util.Utils: Successfully started service 'HTTP file server' on port 36186.
    17/11/20 10:43:05 INFO spark.SparkContext: Added JAR file:/opt/spark1.6.1/lib/spark-examples-1.6.1-hadoop2.6.0.jar at http://192.168.210.70:36186/jars/spark-examples-1.6.1-hadoop2.6.0.jar with timestamp 1511145785204
    17/11/20 10:43:05 INFO client.RMProxy: Connecting to ResourceManager at hadoop0/192.168.210.70:8032
    17/11/20 10:43:05 INFO yarn.Client: Requesting a new application from cluster with 3 NodeManagers
    17/11/20 10:43:05 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (12288 MB per container)
    17/11/20 10:43:05 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
    17/11/20 10:43:05 INFO yarn.Client: Setting up container launch context for our AM
    17/11/20 10:43:05 INFO yarn.Client: Setting up the launch environment for our AM container
    17/11/20 10:43:05 INFO yarn.Client: Preparing resources for our AM container
    17/11/20 10:43:06 INFO yarn.Client: Uploading resource file:/opt/spark1.6.1/lib/spark-assembly-1.6.1-hadoop2.6.0.jar -> hdfs://hadoop0:9000/user/root/.sparkStaging/application_1510653707211_0009/spark-assembly-1.6.1-hadoop2.6.0.jar
    17/11/20 10:43:07 INFO yarn.Client: Uploading resource file:/tmp/spark-1a7d680b-c815-4c0d-b706-9751f5f1b57a/__spark_conf__910020831153605384.zip -> hdfs://hadoop0:9000/user/root/.sparkStaging/application_1510653707211_0009/__spark_conf__910020831153605384.zip
    17/11/20 10:43:07 INFO spark.SecurityManager: Changing view acls to: root
    17/11/20 10:43:07 INFO spark.SecurityManager: Changing modify acls to: root
    17/11/20 10:43:07 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    17/11/20 10:43:07 INFO yarn.Client: Submitting application 9 to ResourceManager
    17/11/20 10:43:08 INFO impl.YarnClientImpl: Submitted application application_1510653707211_0009
    17/11/20 10:43:10 INFO yarn.Client: Application report for application_1510653707211_0009 (state: ACCEPTED)
    17/11/20 10:43:10 INFO yarn.Client: 
    	 client token: N/A
    	 diagnostics: N/A
    	 ApplicationMaster host: N/A
    	 ApplicationMaster RPC port: -1
    	 queue: default
    	 start time: 1511145788138
    	 final status: UNDEFINED
    	 tracking URL: http://hadoop0:8088/proxy/application_1510653707211_0009/
    	 user: root
    17/11/20 10:43:13 INFO yarn.Client: Application report for application_1510653707211_0009 (state: ACCEPTED)
    17/11/20 10:43:15 INFO yarn.Client: Application report for application_1510653707211_0009 (state: ACCEPTED)
    17/11/20 10:43:16 INFO yarn.Client: Application report for application_1510653707211_0009 (state: ACCEPTED)
    17/11/20 10:43:17 INFO yarn.Client: Application report for application_1510653707211_0009 (state: ACCEPTED)
    17/11/20 10:43:18 INFO yarn.Client: Application report for application_1510653707211_0009 (state: ACCEPTED)
    17/11/20 10:43:18 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(null)
    17/11/20 10:43:18 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> hadoop0, PROXY_URI_BASES -> http://hadoop0:8088/proxy/application_1510653707211_0009), /proxy/application_1510653707211_0009
    17/11/20 10:43:18 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
    17/11/20 10:43:19 INFO yarn.Client: Application report for application_1510653707211_0009 (state: RUNNING)
    17/11/20 10:43:19 INFO yarn.Client: 
    	 client token: N/A
    	 diagnostics: N/A
    	 ApplicationMaster host: 192.168.210.70
    	 ApplicationMaster RPC port: 0
    	 queue: default
    	 start time: 1511145788138
    	 final status: UNDEFINED
    	 tracking URL: http://hadoop0:8088/proxy/application_1510653707211_0009/
    	 user: root
    17/11/20 10:43:19 INFO cluster.YarnClientSchedulerBackend: Application application_1510653707211_0009 has started running.
    17/11/20 10:43:19 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44148.
    17/11/20 10:43:19 INFO netty.NettyBlockTransferService: Server created on 44148
    17/11/20 10:43:19 INFO storage.BlockManagerMaster: Trying to register BlockManager
    17/11/20 10:43:19 INFO storage.BlockManagerMasterEndpoint: Registering block manager 192.168.210.70:44148 with 511.1 MB RAM, BlockManagerId(driver, 192.168.210.70, 44148)
    17/11/20 10:43:19 INFO storage.BlockManagerMaster: Registered BlockManager
    17/11/20 10:43:22 INFO cluster.YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (hadoop1:45801) with ID 2
    17/11/20 10:43:22 INFO cluster.YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (hadoop2:32964) with ID 1
    17/11/20 10:43:22 INFO storage.BlockManagerMasterEndpoint: Registering block manager hadoop1:52352 with 2.7 GB RAM, BlockManagerId(2, hadoop1, 52352)
    17/11/20 10:43:22 INFO storage.BlockManagerMasterEndpoint: Registering block manager hadoop2:45228 with 2.7 GB RAM, BlockManagerId(1, hadoop2, 45228)
    17/11/20 10:43:22 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
    17/11/20 10:43:23 INFO spark.SparkContext: Starting job: reduce at SparkPi.scala:36
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Got job 0 (reduce at SparkPi.scala:36) with 2 output partitions
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:36)
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Parents of final stage: List()
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Missing parents: List()
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:32), which has no missing parents
    17/11/20 10:43:23 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1904.0 B, free 1904.0 B)
    17/11/20 10:43:23 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1218.0 B, free 3.0 KB)
    17/11/20 10:43:23 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.210.70:44148 (size: 1218.0 B, free: 511.1 MB)
    17/11/20 10:43:23 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:32)
    17/11/20 10:43:23 INFO cluster.YarnScheduler: Adding task set 0.0 with 2 tasks
    17/11/20 10:43:23 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, hadoop2, partition 0,PROCESS_LOCAL, 2157 bytes)
    17/11/20 10:43:23 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, hadoop1, partition 1,PROCESS_LOCAL, 2157 bytes)
    17/11/20 10:43:23 INFO cluster.YarnClientSchedulerBackend: Disabling executor 1.
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Executor lost: 1 (epoch 0)
    17/11/20 10:43:23 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster.
    17/11/20 10:43:23 ERROR client.TransportClient: Failed to send RPC 6494801080030835916 to hadoop0/192.168.210.70:55463: java.nio.channels.ClosedChannelException
    java.nio.channels.ClosedChannelException
    17/11/20 10:43:23 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, hadoop2, 45228)
    17/11/20 10:43:23 INFO storage.BlockManagerMaster: Removed 1 successfully in removeExecutor
    17/11/20 10:43:23 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to get executor loss reason for executor id 1 at RPC address hadoop2:32964, but got no response. Marking as slave lost.
    java.io.IOException: Failed to send RPC 6494801080030835916 to hadoop0/192.168.210.70:55463: java.nio.channels.ClosedChannelException
    	at org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
    	at org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
    	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
    	at io.netty.util.concurrent.DefaultPromise$LateListeners.run(DefaultPromise.java:845)
    	at io.netty.util.concurrent.DefaultPromise$LateListenerNotifier.run(DefaultPromise.java:873)
    	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: java.nio.channels.ClosedChannelException
    17/11/20 10:43:23 ERROR cluster.YarnScheduler: Lost executor 1 on hadoop2: Slave lost
    17/11/20 10:43:23 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hadoop2): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Slave lost
    17/11/20 10:43:24 INFO cluster.YarnClientSchedulerBackend: Disabling executor 2.
    17/11/20 10:43:24 INFO scheduler.DAGScheduler: Executor lost: 2 (epoch 1)
    17/11/20 10:43:24 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
    17/11/20 10:43:24 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, hadoop1, 52352)
    17/11/20 10:43:24 INFO storage.BlockManagerMaster: Removed 2 successfully in removeExecutor
    17/11/20 10:43:24 ERROR client.TransportClient: Failed to send RPC 6980255577157578925 to hadoop0/192.168.210.70:55463: java.nio.channels.ClosedChannelException
    java.nio.channels.ClosedChannelException
    17/11/20 10:43:24 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to get executor loss reason for executor id 2 at RPC address hadoop1:45801, but got no response. Marking as slave lost.
    java.io.IOException: Failed to send RPC 6980255577157578925 to hadoop0/192.168.210.70:55463: java.nio.channels.ClosedChannelException
    	at org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
    	at org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
    	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
    	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
    	at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
    	at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801)
    	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699)
    	at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
    	at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
    	at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
    	at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
    	at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893)
    	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: java.nio.channels.ClosedChannelException
    17/11/20 10:43:24 ERROR cluster.YarnScheduler: Lost executor 2 on hadoop1: Slave lost
    17/11/20 10:43:24 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, hadoop1): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Slave lost
    17/11/20 10:43:25 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(null)
    17/11/20 10:43:25 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> hadoop0, PROXY_URI_BASES -> http://hadoop0:8088/proxy/application_1510653707211_0009), /proxy/application_1510653707211_0009
    17/11/20 10:43:25 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
    17/11/20 10:43:29 ERROR cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
    17/11/20 10:43:29 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.210.70:4040
    17/11/20 10:43:29 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:36) failed in 5.680 s
    17/11/20 10:43:29 INFO scheduler.DAGScheduler: Job 0 failed: reduce at SparkPi.scala:36, took 5.884625 s
    17/11/20 10:43:29 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@29bb1d25)
    Exception in thread "main" org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
    	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
    	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804)
    	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804)
    	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658)
    	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581)
    	at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1740)
    	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
    	at org.apache.spark.SparkContext.stop(SparkContext.scala:1739)
    	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:147)
    	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
    	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
    	at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:36)
    	at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    17/11/20 10:43:29 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(0,1511145809112,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down))
    17/11/20 10:43:29 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
    17/11/20 10:43:29 INFO cluster.YarnClientSchedulerBackend: Asking each executor to shut down
    17/11/20 10:43:29 INFO storage.DiskBlockManager: Shutdown hook called
    17/11/20 10:43:29 INFO cluster.YarnClientSchedulerBackend: Stopped
    17/11/20 10:43:29 INFO util.ShutdownHookManager: Shutdown hook called
    17/11/20 10:43:29 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-1a7d680b-c815-4c0d-b706-9751f5f1b57a/httpd-e8d7be01-495d-403b-a7a4-1332d9ae2411
    17/11/20 10:43:29 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    17/11/20 10:43:29 INFO storage.MemoryStore: MemoryStore cleared
    17/11/20 10:43:29 INFO storage.BlockManager: BlockManager stopped
    17/11/20 10:43:29 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    17/11/20 10:43:29 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    17/11/20 10:43:29 INFO spark.SparkContext: Successfully stopped SparkContext
    17/11/20 10:43:29 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-1a7d680b-c815-4c0d-b706-9751f5f1b57a/userFiles-e796cf1a-3942-44d1-a8cc-68295e623b03
    17/11/20 10:43:29 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-1a7d680b-c815-4c0d-b706-9751f5f1b57a
    
    展开全文
  • Spark学习笔记:Spark基础

    千次阅读 2018-09-03 23:39:57
    Spark基础以及WordCount实现

    目录

     

    Spark基础

    1.Spark基础入门

    (1)什么是Spark

    (2)Spark生态圈

    (3)Spark的特点与MapReduce对比

    2.Spark体系结构与安装部署

    (1)Spark体系结构

    (2)Spark的安装与部署

    (3)Spark HA的实现

    3.执行Spark Demo

    (1)Spark-submit

    (2)Spark-shell

    (3)Spark实现WordCount

    (4)Spark WordCount的Java版本

    (5)Spark WordCount的Scala版本

    4.Spark运行机制及原理分析


    Spark基础

    1.Spark基础入门

    (1)什么是Spark

    Spark是用于大规模数据处理的统一分析引擎

    (2)Spark生态圈

               Spark Core:内核
               Spark SQL:用于处理结构化数据的组件,类似Hive
               Spark Streaming:用于处理流式数据的组件,类似Storm
               Spark MLLib:机器学习
               Spark Graphx: 图计算

    (3)Spark的特点与MapReduce对比

    Spark的特点
    1.基于内存,所以速度快,但同时也是缺点,因为Spark没有对内存进行管理,容易OOM(out of memory内存溢出),可以用Java Heap Dump对内存溢出问题进行分析
    2.可以使用Scala、Java、Python、R等语言进行开发
    3.兼容Hadoop

    Spark与MapReuce对比
    1.MapReduce最大的缺点,Shuffle过程中会有很多I/O开销,可以看到这里有6个地方会产生IO,而Spark只会在1和6的地方产生I/O,其他的过程都在内存中进行

    2.Spark是MapReduce的替代方案,兼容Hive、HDFS、融入到Hadoop

    2.Spark体系结构与安装部署

    (1)Spark体系结构

    1.主从架构:存在单点故障的问题,因此需要实现HA
    2.Spark体系结构图

    Driver Program可以理解为是客户端,而右边的可以理解为服务器端。 Cluster Manager是主节点,主节点并不负责真正任务的执行,任务的执行由Worker Node完成。
    这是一张更详细的架构图

    如果要搭建全分布模式,至少需要两个worker
    要实现HA的话,则必须借助ZooKeeper

    (2)Spark的安装与部署

    Spark伪分布模式的部署

    解压 tar -zxvf spark-2.2.0-bin-hadoop2.6.tgz -C ~/training/
    注意:由于Hadoop和Spark的脚本有冲突,设置环境变量的时候,只能设置一个
    核心配置文件:  conf/spark-env.sh
             export JAVA_HOME=/root/training/jdk1.8.0_144
             export SPARK_MASTER_HOST=bigdata111
             export SPARK_MASTER_PORT=7077      
    从节点的地址:slaves文件中填入主机名即可,注意hosts文件里要有对ip的解析
    启动Spark集群  sbin/start-all.sh,这里我个人是给这个文件做了一个软链接start-spark.sh,因为hadoop下的启动脚本也是start-all.sh,会有冲突
    Web界面:主机名:8080

    Spark全分布模式的部署

    全分布式的部署与伪分布式类似,在每个节点上都解压压缩包,修改conf/spark-env.sh
    在主节点上的slaves文件中填入从节点的主机名
    然后在每个节点上启动集群即可

    (3)Spark HA的实现

    1.基于文件系统的单点恢复

    适用于开发和测试环境                                                                                                                                                      
    恢复目录:保存集群的运行信息                                                                                                                                               
    在spark-env.sh中 增加                                                                                                                                          

    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/training/spark-2.2.0-bin-hadoop2.6/recovery"

    如果有运行的任务,任务信息就会被写入到恢复目录下
    当节点宕掉重启之后, Spark就可以从恢复目录中的文件获取之前的状态并进行恢复

    2.基于zookeeper实现Standby Master

    zookeeper的功能:数据同步、选举的功能、分布式锁(秒杀)
    启动zookeeper,运行zkServer.sh,然后会选举出zookeeper集群的leader和follower,节点状态可以通过zkServer.sh status查看

    zookeeper数据同步功能
    启动zookeeper后,在随意一个节点的zkCli.sh(即zk shell)中输入create /node001 helloworld
    在其他节点的shell中get /node001都可以看得见这个虚数据helloworld

    zookeeper选举功能
    每个zookeeper集群都会有一个leader,其他都是follower,当leader节点宕机了,其他的follower会选举出leader

    zookeeper实现Standby Master
    在spark-env.sh中增加

    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata112:2181,bigdata113:2181,bigdata114:2181 -Dspark.deploy.zookeeper.dir=/mysparkHA"

    其中Dspark.deploy.zookeeper.url参数是zookeeper集群每个节点的地址,之前有提到zookeeper需要有三个节点以上

    注释下面的两行
    #export SPARK_MASTER_HOST=bigdata112
    #export SPARK_MASTER_PORT=7077
    配置好后在两台机器上启动spark-master,在web界面上就会发现一个是ALIVE,一个是StandBy

    3.执行Spark Demo

    (1)Spark-submit

    Spark-submit可以提交任务到Spark集群执行,也可以提交到hadoop的yarn集群执行

    这里运行了一个蒙特卡罗求圆周率的Demo,运行1000次

    spark-submit --master spark://centos:7077 --class org.apache.spark.examples.SparkPi /opt/software/spark-2.2.0-bin-hadoop2.6/examples/jars/spark-examples_2.11-2.2.0.jar 1000

    --master后面跟spark的地址,然后用--class指定类和jar包,以及运行次数

    (2)Spark-shell

    Spark-shell是Spark自带的交互式程序,方便用户进行交互式变成,用户可以在该命令行下用scala编写spark程序。
    Spark-shell有两种运行模式:本地模式和集群模式
    本地模式:不连接到集群,在本地直接执行Spark任务(local模式)
    直接运行spark-shell

    集群模式:连接到集群,在集群执行任务
    集群模式下的shell将作为一个独立的Application链接到Master上

    运行spark-shell --master spark://centos:7077

    Spark的Web上可以看见

    (3)Spark实现WordCount

    Spark可以集成到HDFS,读取HDFS里的文件

    先做一个测试文件data.txt,上传到HDFS上

    执行WordCount

    进行单步分析

    可以看到一个String类型的RDD,用来存储文本信息,但这个时候并不会真正的执行

    执行rdd1.collect之后,才会真正的执行,获取文本文件里的字符串,放进RDD里

    flatmap_是表示rdd1里的每个元素,然后使用split方法,间隔符是空格,同样的,要执行collect才算真正执行

    map((_,1))是把元素里的每个元素都映射成了(word,1)的kv对,这个语法糖等价于下面这条语句

    reduceByKey方法是使用一个相关的函数来合并每个key的value的值的一个算子,前面的下划线可以理解为sum,用来迭代计算和,后面的下划线是每个kv对的value

    总结:RDD就是一个集合,存在依赖关系,RDD有些方法不会触发计算,有些会触发计算

    (4)Spark WordCount的Java版本

    新建Java Project,名为ZSparkDemo,然后在project下新建folder,名为lib,然后把/opt/software/spark-2.2.0-bin-hadoop2.6/jars下的jar包复制到lib文件夹里

    代码与注释如下

    package demo;
    
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    /*
     * 打包成jar包上传到集群环境后,使用spark-submit提交任务
     * spark-submit --master spark://centos:7077 --class demo.JavaWordCount /WordCount.jar hdfs://10.1.130.233:9000/input/data.txt
     */
    public class JavaWordCount {
    
    	public static void main(String[] args) {
    
    		// 配置参数,setAppName方法指定app的名字
    		// setMaster方法用以设定Master的URL,设为local就会在本地以单线程运行
    		// local[4]就会在本地以4核运行,设为"spark://master:7077就会在独立集群上运行,或者不写就会默认在集群运行
    		//SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
    		SparkConf conf = new SparkConf().setAppName("JavaWordCount");
    
    		// 创建一个SparkContext对象
    		JavaSparkContext sc = new JavaSparkContext(conf);
    
    		// 指定路径,读入数据,路径可以是本地路径,也可以是HDFS上的
    		// 这个方法返回的是一个Java的RDD,类型是String
    		//JavaRDD<String> datas = sc.textFile("hdfs://10.1.130.233:9000/input/data.txt");
    		//这里可以不把路径写死,而是将args传入的第一个参数作为路径
    		JavaRDD<String> datas = sc.textFile(args[0]);
    
    		// 分词
    		// 这里需要实现FlatMapFunction接口,表示要对每个传入的文本所要执行的操作FlatMapFunction<String, U>
    		// 把U改成String,第一个String代表输入的文本,第二个String表示分词后的每个单词
    		JavaRDD<String> words = datas.flatMap(new FlatMapFunction<String, String>() {
    
    			@Override
    			// line表示每一行传入的数据
    			public Iterator<String> call(String line) throws Exception {
    				// 因为split完之后,返回的是一个String类型的数组,所以要用Arrays的asList方法转换成是一个List,然后才能用iterator
    				return Arrays.asList(line.split(" ")).iterator();
    			}
    
    		});
    
    		// 每个单词记一次数map((单词,1)
    		// 这里需要实现PairFunction接口,PairFunction<String, K2, V2>
    		// String代表传入的参数,K2,V2相当于MapReduce里Map的输出(Beijing,1),所以Key是String类型,V是Integer类型
    		JavaPairRDD<String, Integer> wordOne = words.mapToPair(new PairFunction<String, String, Integer>() {
    
    			@Override
    			public Tuple2<String, Integer> call(String word) throws Exception {
    				// Beijing --->(Beijing,1)
    				return new Tuple2<String, Integer>(word, 1);
    			}
    
    		});
    
    		// 执行Reduce的操作,把相同单词的value做求和
    		// Function2<Integer, Integer,
    		// Integer>,前面两个Integer表示:两个key相同的value,最后一个Integer表示运算的结果
    		JavaPairRDD<String, Integer> count = wordOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
    
    			@Override
    			public Integer call(Integer a, Integer b) throws Exception {
    				// TODO Auto-generated method stub
    				return a + b;
    			}
    		});
    
    		// 触发计算
    		List<Tuple2<String, Integer>> result = count.collect();
    
    		// 输出到Console
    		for (Tuple2<String, Integer> r : result) {
    			System.out.println(r._1 + ":" + r._2);
    		}
    
    		// 停止SparkContext对象
    		sc.stop();
    	}
    
    }
    

    打包成jar包后上传到集群环境, 通过spark-submit提交到集群运行

    spark-submit --master spark://centos:7077 --class demo.JavaWordCount /WordCount.jar hdfs://10.1.130.233:9000/input/data.txt

    在集群上运行结果如下 


    在Spark WebUI

    (5)Spark WordCount的Scala版本

    package demo
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]): Unit = {
        
        //获取Spark配置,setAppName方法用来设置app的名字,setMaster设为local则为在本地运行不提交到集群
        //val conf = new SparkConf().setAppName("WordCount").setMaster("local")
        val conf=new SparkConf().setAppName("WordCount")
        
        //获取SparkContext
        val sc = new SparkContext(conf)
    
        //textFile指定路径,然后做分词,转换成kv对,再reduceByKey做统计处理
        //val count=sc.textFile("hdfs://centos:9000/input/data.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        val count=sc.textFile(args(0)).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
        
        //将结果保存到HDFS目录下,repartition方法设定只返回一个RDD,saveAsTextFile设定结果保存的地址
        count.repartition(1).saveAsTextFile(args(1))
        
        //触发计算
        val result=count.collect()
        
        //输出结果
        result.foreach(println)
        
        //停止SparkContext对象
        sc.stop()
      }
    }

     执行语句

    spark-submit --master spark://centos:7077 --class demo.WordCount /WordCount.jar hdfs://centos:9000/input/data.txt hdfs://centos:9000/output/result

    运行结果
     在HDFS里也可以看到生成的文件

    WordCount流程分析图

    4.Spark运行机制及原理分析

    展开全文
  • Scala基础入门教程,解析Scala集合操作,通过本课程的学习,能够应用Scala进行Spark应用程序开发、掌握Spark的基本运行原理及编程模型,能够熟悉运用Spark SQL进行大数据仓库的开发,掌握Spark流式计算、Spark机器...
  • Spark Core

    万次阅读 2018-01-17 08:33:11
    由于 Spark 基于内存设计,使得它拥有比 Hadoop 更高的性能(极端情况下可以达到 100x),并且对多语言(Scala、Java、Python)提供支持。其一栈式的设计特点使得我们的学习和维护成本大大地减少,而且其提供了很好...

    一、Spark Core

    Apache Spark 是加州大学伯克利分校的 AMP Labs 开发的开源分布式轻量级通用计算框架。由于 Spark 基于内存设计,使得它拥有比 Hadoop 更高的性能(极端情况下可以达到 100x),并且对多语言(Scala、Java、Python)提供支持。其一栈式的设计特点使得我们的学习和维护成本大大地减少,而且其提供了很好的容错解决方案。

    操作步骤

    1. 主要功能

    Spark Core提供Spark最基础与最核心的功能,主要包括以下功能:
    (1)、SparkContext:通常而言,Driver Application的执行与输出都是通过SparkContext来完成的。在正式提交Application之前,首先需要初始化SparkContext。SparkContext隐藏了网络通信、分布式部署、消息通信、存储能力、计算能力、缓存、测量系统、文件服务、Web服务等内容,应用程序开发者只需要使用SparkContext提供的API完成功能开发。SparkContext内置的DAGScheduler负责创建Job,将DAG中的RDD划分到不同的Stage,提交Stage等功能。内置的TaskScheduler负责资源的申请,任务的提交及请求集群对任务的调度等工作。
    (2)、存储体系:Spark优先考虑使用各节点的内存作为存储,当内存不足时才会考虑使用磁盘,这极大地减少了磁盘IO,提升了任务执行的效率,使得Spark适用于实时计算、流式计算等场景。此外,Spark还提供了以内存为中心的高容错的分布式文件系统Tachyon供用户进行选择。Tachyon能够为Spark提供可靠的内存级的文件共享服务。
    (3)、计算引擎:计算引擎由SparkContext中的DAGScheduler、RDD以及具体节点上的Executor负责执行的Map和Reduce任务组成。DAGScheduler和RDD虽然位于SparkContext内部,但是在任务正式提交与执行之前会将Job中的RDD组织成有向无环图(DAG),并对Stage进行划分,决定了任务执行阶段任务的数量、迭代计算、shuffle等过程。
    (4)、部署模式:由于单节点不足以提供足够的存储和计算能力,所以作为大数据处理的Spark在SparkContext的TaskScheduler组件中提供了对Standalone部署模式的实现和Yarn、Mesos等分布式资源管理系统的支持。通过使用Standalone、Yarn、Mesos等部署模式为Task分配计算资源,提高任务的并发执行效率。

    2. Spark Core子框架

    Spark的几大子框架包括:
    (1)、Spark SQL:首先使用SQL语句解析器(SqlParser)将SQL转换为语法树(Tree),并且使用规则执行器(RuleExecutor)将一系列规则(Rule)应用到语法树,最终生成物理执行计划并执行。其中,规则执行器包括语法分析器(Analyzer)和优化器(Optimizer)。
    (2)、Spark Streaming:用于流式计算。Spark Streaming支持Kafka、Flume、Twitter、MQTT、ZeroMQ、Kinesis和简单的TCP套接字等多种数据输入源。输入流接收器(Receiver)负责接入数据,是接入数据流的接口规范。Dstream是Spark Streaming中所有数据流的抽象,Dstream可以被组织为Dstream Graph。Dstream本质上由一系列连续的RDD组成。
    (3)、GraphX:Spark提供的分布式图计算框架。GraphX主要遵循整体同步并行(bulk Synchronous parallel,BSP)计算模式下的Pregel模型实现。GraphX提供了对图的抽象Graph,Graph由顶点(Vertex),边(Edge)及继承了Edge的EdgeTriplet三种结构组成。GraphX目前已经封装了最短路径,网页排名,连接组件,三角关系统计等算法的实现,用户可以选择使用。
    (4)、MLlib:Spark提供的机器学习框架。机器学习是一门设计概率论、统计学、逼近论、凸分析、算法复杂度理论等多领域的交叉学科。MLlib目前已经提供了基础统计、分析、回归、决策树、随机森林、朴素贝叶斯、保序回归、协同过滤、聚类、维数缩减、特征提取与转型、频繁模式挖掘、预言模型标记语言、管道等多种数理统计、概率论、数据挖掘方面的数学算法。

    3. Spark架构

    Spark采用了分布式计算中的Master-Slave模型。Master作为整个集群的控制器,负责整个集群的正常运行;Worker是计算节点,接受主节点命令以及进行状态汇报;Executor负责任务(Tast)的调度和执行;Client作为用户的客户端负责提交应用;Driver负责控制一个应用的执行。

    这里写图片描述

    Spark集群启动时,需要从主节点和从节点分别启动Master进程和Worker进程,对整个集群进行控制。在一个Spark应用的执行过程中,Driver是应用的逻辑执行起点,运行Application的main函数并创建SparkContext,DAGScheduler把对Job中的RDD有向无环图根据依赖关系划分为多个Stage,每一个Stage是一个TaskSet, TaskScheduler把Task分发给Worker中的Executor;Worker启动Executor,Executor启动线程池用于执行Task。

    这里写图片描述

    4. Spark计算模型

    RDD:弹性分布式数据集,是一种内存抽象,可以理解为一个大数组,数组的元素是RDD的分区Partition,分布在集群上;在物理数据存储上,RDD的每一个Partition对应的就是一个数据块Block,Block可以存储在内存中,当内存不够时可以存储在磁盘上。

    这里写图片描述
    RDD逻辑物理结构

    Hadoop将Mapreduce计算的结果写入磁盘,在机器学习、图计算、PageRank等迭代计算下,重用中间结果导致的反复I/O耗时过长,成为了计算性能的瓶颈。为了提高迭代计算的性能和分布式并行计算下共享数据的容错性,伯克利的设计者依据两个特性而设计了RDD:
    1、数据集分区存储在节点的内存中,减少迭代过程(如机器学习算法)反复的I/O操作从而提高性能。
    2、数据集不可变,并记录其转换过程,从而实现无共享数据读写同步问题、以及出错的可重算性。

    Operations:算子

    算子是RDD中定义的函数,可以对RDD中的数据进行转换和操作。如下图,Spark从外部空间(HDFS)读取数据形成RDD_0,Tranformation算子对数据进行操作(如fliter)并转化为新的RDD_1、RDD_2,通过Action算子(如collect/count)触发Spark提交作业。

    如上的分析过程可以看出,Tranformation算子并不会触发Spark提交作业,直至Action算子才提交作业,这是一个延迟计算的设计技巧,可以避免内存过快被中间计算占满,从而提高内存的利用率。

    这里写图片描述

    下图是算子的列表,分三大类:Value数据类型的Tranformation算子;Key-Value数据类型的Tranformation算子;Action算子。

    这里写图片描述

    Lineage Graph:血统关系图

    下图的第一阶段生成RDD的有向无环图,即是血统关系图,记录了RDD的更新过程,当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。DAGScheduler依据RDD的依赖关系将有向无环图划分为多个Stage,一个Stage对应着一系列的Task,由TashScheduler分发给Worker计算。

    这里写图片描述

    二、组件

    1. 介绍

    spark生态系统中,Spark Core,包括各种Spark的各种核心组件,它们能够对内存和硬盘进行操作,或者调用CPU进行计算。
    spark core定义了RDD、DataFrame和DataSet

    这里写图片描述

    spark最初只有RDD,DataFrame在Spark 1.3中被首次发布,DataSet在Spark1.6版本中被加入。

    2. RDD

    RDD:Spark的核心概念是RDD (resilientdistributed dataset),指的是一个只读的,可分区的分布式数据集,这个数据集的全部或部分可以缓存在内存中,在多次计算间重用。

    优点:
    编译时类型安全
    编译时就能检查出类型错误
    面向对象的编程风格
    直接通过类名点的方式来操作数据
    缺点:
    序列化和反序列化的性能开销
    无论是集群间的通信, 还是IO操作都需要对对象的结构和数据进行序列化和反序列化.
    GC的性能开销
    频繁的创建和销毁对象, 势必会增加GC
    import org.apache.spark.sql.SQLContext
    import org.apache.spark.{SparkConf, SparkContext}
     
    object Run {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("test").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
        val sqlContext = new SQLContext(sc)
     
        /**
          * id      age
          * 1       30
          * 2       29
          * 3       21
          */
        case class Person(id: Int, age: Int)
        val idAgeRDDPerson = sc.parallelize(Array(Person(1, 30), Person(2, 29), Person(3, 21)))
     
        // 优点1
        // idAge.filter(_.age > "") // 编译时报错, int不能跟String比
     
        // 优点2
        idAgeRDDPerson.filter(_.age > 25) // 直接操作一个个的person对象
      }
    }
    

    3. DataFrame

    在Spark中,DataFrame是一种以RDD为基础的分布式数据集,类似于传统数据库中的二维表格。DataFrame与RDD的主要区别在于,前者带有schema元信息,即DataFrame所表示的二维表数据集的每一列都带有名称和类型。这使得Spark SQL得以洞察更多的结构信息,从而对藏于DataFrame背后的数据源以及作用于DataFrame之上的变换进行了针对性的优化,最终达到大幅提升运行时效率的目标。反观RDD,由于无从得知所存数据元素的具体内部结构,Spark Core只能在stage层面进行简单、通用的流水线优化。

    这里写图片描述

    DataFrame引入了schema和off-heap

    schema : RDD每一行的数据, 结构都是一样的.
    这个结构就存储在schema中。 Spark通过schame就能够读懂数据, 因此在通信和IO时就只需要序列化和反序列化数据,而结构的部分就可以省略了。 off-heap : 意味着JVM堆以外的内存,这些内存直接受操作系统管理(而不是JVM)。Spark能够以二进制的形式序列化数据(不包括结构)到off-heap中,当要操作数据时,就直接操作off-heap内存。由于Spark理解schema,所以知道该如何操作。

    off-heap就像地盘,schema就像地图, Spark有地图又有自己地盘了, 就可以自己说了算了, 不再受JVM的限制,也就不再收GC的困扰了。通过schema和off-heap,DataFrame解决了RDD的缺点,但是却丢了RDD的优点。 DataFrame不是类型安全的, API也不是面向对象风格的。

    import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.{SparkConf, SparkContext}
     
    object Run {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("test").setMaster("local")
        val sc = new SparkContext(conf)
        sc.setLogLevel("WARN")
        val sqlContext = new SQLContext(sc)
        /**
          * id      age
          * 1       30
          * 2       29
          * 3       21
          */
        val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))
     
        val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))
     
        val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)
        // API不是面向对象的
        idAgeDF.filter(idAgeDF.col("age") > 25) 
        // 不会报错, DataFrame不是编译时类型安全的
        idAgeDF.filter(idAgeDF.col("age") > "") 
      }
    }
    

    4. DataSet

    Dataset是一个强类型的特定领域的对象,这种对象可以函数式或者关系操作并行地转换。每个Dataset也有一个被称为一个DataFrame的类型化视图,这种DataFrame是Row类型的Dataset,即Dataset[Row]

    Dataset是“懒惰”的,只在执行行动操作时触发计算。本质上,数据集表示一个逻辑计划,该计划描述了产生数据所需的计算。当执行行动操作时,Spark的查询优化程序优化逻辑计划,并生成一个高效的并行和分布式物理计划。

    DataSet结合了RDD和DataFrame的优点,,并带来的一个新的概念Encoder 当序列化数据时,Encoder产生字节码与off-heap进行交互,能够达到按需访问数据的效果, 而不用反序列化整个对象。 Spark还没有提供自定义Encoder的API,但是未来会加入。

    下面看DataFrame和DataSet在2.0.0-preview中的实现

    下面这段代码, 在1.6.x中创建的是DataFrame
    // 上文DataFrame示例中提取出来的
    val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))
     
    val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))
     
    val idAgeDF = sqlContext.createDataFrame(idAgeRDDRow, schema)
    
    但是同样的代码在2.0.0-preview中, 创建的虽然还叫DataFrame
    
    // sqlContext.createDataFrame(idAgeRDDRow, schema) 方法的实现, 返回值依然是DataFrame
    def createDataFrame(rowRDD: RDD[Row], schema: StructType): DataFrame = {
    sparkSession.createDataFrame(rowRDD, schema)
    }
    
    但是其实却是DataSet, 因为DataFrame被声明为Dataset[Row]
    
    package object sql {
      // ...省略了不相关的代码
     
      type DataFrame = Dataset[Row]
    }
    
    因此当我们从1.6.x迁移到2.0.0的时候, 无需任何修改就直接用上了DataSet.
    
    下面是一段DataSet的示例代码
    
    import org.apache.spark.sql.types.{DataTypes, StructField, StructType}
    import org.apache.spark.sql.{Row, SQLContext}
    import org.apache.spark.{SparkConf, SparkContext}
     
    object Test {
      def main(args: Array[String]) {
        val conf = new SparkConf().setAppName("test").setMaster("local") // 调试的时候一定不要用local[*]
        val sc = new SparkContext(conf)
        val sqlContext = new SQLContext(sc)
        import sqlContext.implicits._
     
        val idAgeRDDRow = sc.parallelize(Array(Row(1, 30), Row(2, 29), Row(4, 21)))
     
        val schema = StructType(Array(StructField("id", DataTypes.IntegerType), StructField("age", DataTypes.IntegerType)))
     
        // 在2.0.0-preview中这行代码创建出的DataFrame, 其实是DataSet[Row]
        val idAgeDS = sqlContext.createDataFrame(idAgeRDDRow, schema)
     
        // 在2.0.0-preview中, 还不支持自定的Encoder, Row类型不行, 自定义的bean也不行
        // 官方文档也有写通过bean创建Dataset的例子,但是我运行时并不能成功
        // 所以目前需要用创建DataFrame的方法, 来创建DataSet[Row]
        // sqlContext.createDataset(idAgeRDDRow)
     
        // 目前支持String, Integer, Long等类型直接创建Dataset
        Seq(1, 2, 3).toDS().show()
        sqlContext.createDataset(sc.parallelize(Array(1, 2, 3))).show()
      }
    }
    

    5. RDD和DataFrame比较

    这里写图片描述

    DataFrame与RDD相同之处,都是不可变分布式弹性数据集。不同之处在于,DataFrame的数据集都是按指定列存储,即结构化数据。类似于传统数据库中的表。
    DataFrame的设计是为了让大数据处理起来更容易。DataFrame允许开发者把结构化数据集导入DataFrame,并做了higher-level的抽象; DataFrame提供特定领域的语言(DSL)API来操作你的数据集。

    上图直观地体现了DataFrame和RDD的区别。左侧的RDD[Person]虽然以Person为类型参数,但Spark框架本身不了解Person类的内部结构。而右侧的DataFrame却提供了详细的结构信息,使得Spark SQL可以清楚地知道该数据集中包含哪些列,每列的名称和类型各是什么。DataFrame多了数据的结构信息,即schema。RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化,比如filter下推、裁剪等。

    6. RDD和DataSet比较

    DataSet以Catalyst逻辑执行计划表示,并且数据以编码的二进制形式被存储,不需要反序列化就可以执行sorting、shuffle等操作。

    DataSet创立需要一个显式的Encoder,把对象序列化为二进制,可以把对象的scheme映射为Spark SQl类型,然而RDD依赖于运行时反射机制。

    通过上面两点,DataSet的性能比RDD的要好很多

    7. DataFrame和DataSet比较

    Dataset可以认为是DataFrame的一个特例,主要区别是Dataset每一个record存储的是一个强类型值而不是一个Row。因此具有如下三个特点:
    1.DataSet可以在编译时检查类型
    2.是面向对象的编程接口。用wordcount举例:
    3.后面版本DataFrame会继承DataSet,DataFrame是面向Spark SQL的接口。
    DataFrame和DataSet可以相互转化,df.as[ElementType]这样可以把DataFrame转化为DataSet,ds.toDF()这样可以把DataSet转化为DataFrame。
    //DataFrame
     
    // Load a text file and interpret each line as a java.lang.String
    val ds = sqlContext.read.text("/home/spark/1.6/lines").as[String]
    val result = ds
      .flatMap(_.split(" "))               // Split on whitespace
      .filter(_ != "")                     // Filter empty words
      .toDF()                              // Convert to DataFrame to perform aggregation / sorting
      .groupBy($"value")                   // Count number of occurences of each word
      .agg(count("*") as "numOccurances")
      .orderBy($"numOccurances" desc)      // Show most common words first
    
    //DataSet,完全使用scala编程,不要切换到DataFrame
     
    val wordCount =
      ds.flatMap(_.split(" "))
        .filter(_ != "")
        .groupBy(_.toLowerCase()) // Instead of grouping on a column expression (i.e. $"value") we pass a lambda function
        .count()
    
    

    8. 应用场景

    什么时候用RDD?使用RDD的一般场景:
    你需要使用low-level的transformation和action来控制你的数据集;
    你得数据集非结构化,比如,流媒体或者文本流;
    你想使用函数式编程来操作你得数据,而不是用特定领域语言(DSL)表达;
    你不在乎schema,比如,当通过名字或者列处理(或访问)数据属性不在意列式存储格式;
    你放弃使用DataFrame和Dataset来优化结构化和半结构化数据集
    RDD在Apache Spark 2.0中惨遭抛弃?
    答案当然是 NO !
    通过后面的描述你会得知:Spark用户可以在RDD,DataFrame和Dataset三种数据集之间无缝转换,而是只需使用超级简单的API方法。
    什么时候使用DataFrame或者Dataset?
    你想使用丰富的语义,high-level抽象,和特定领域语言API,那你可DataFrame或者Dataset;
    你处理的半结构化数据集需要high-level表达, filter,map,aggregation,average,sum ,SQL 查询,列式访问和使用lambda函数,那你可DataFrame或者Dataset;
    你想利用编译时高度的type-safety,Catalyst优化和Tungsten的code生成,那你可DataFrame或者Dataset;
    你想统一和简化API使用跨Spark的Library,那你可DataFrame或者Dataset;
    如果你是一个R使用者,那你可DataFrame或者Dataset;
    如果你是一个Python使用者,那你可DataFrame或者Dataset;
    你可以无缝的把DataFrame或者Dataset转化成一个RDD,只需简单的调用 .rdd:
    
    // select specific fields from the Dataset, apply a predicate
    // using the where() method, convert to an RDD, and show first 10
    // RDD rows
     
    val deviceEventsDS = ds.select($"device_name", $"cca3", $"c02_level").where($"c02_level" > 1300)
    // convert to RDDs and take the first 10 rows
     
    val eventsRDD = deviceEventsDS.rdd.take(10)
    
    展开全文
  • Spark SQL

    万次阅读 2018-01-17 10:44:58
    Spark 1.0版本开始,推出了Spark SQL。 其实最早使用的,都是Hadoop自己的Hive查询引擎;但是后来Spark提供了Shark;再后来Shark被淘汰,推出了Spark SQL。Shark的性能比Hive就要高出一个数量级,而Spark ...

    1. 背景

    Spark 1.0版本开始,推出了Spark SQL。

    其实最早使用的,都是Hadoop自己的Hive查询引擎;但是后来Spark提供了Shark;再后来Shark被淘汰,推出了Spark
    SQL。Shark的性能比Hive就要高出一个数量级,而Spark SQL的性能又比Shark高出一个数量级。

    最早来说,Hive的诞生,主要是因为要让那些不熟悉Java,无法深入进行MapReduce编程的数据分析师,能够使用他们熟悉的关系型数据库的SQL模型,来操作HDFS上的数据。因此推出了Hive。Hive底层基于MapReduce实现SQL功能,能够让数据分析人员,以及数据开发人员,方便的使用Hive进行数据仓库的建模和建设,然后使用SQL模型针对数据仓库中的数据进行统计和分析。但是Hive有个致命的缺陷,就是它的底层基于MapReduce,而MapReduce的shuffle又是基于磁盘的,因此导致Hive的性能异常低下。进场出现复杂的SQL
    ETL,要运行数个小时,甚至数十个小时的情况。

    后来,Spark推出了Shark,Shark与Hive实际上还是紧密关联的,Shark底层很多东西还是依赖于Hive,但是修改了内存管理、物理计划、执行三个模块,底层使用Spark的基于内存的计算模型,从而让性能比Hive提升了数倍到上百倍。

    然而,Shark还是它的问题所在,Shark底层依赖了Hive的语法解析器、查询优化器等组件,因此对于其性能的提升还是造成了制约。所以后来Spark团队决定,完全抛弃Shark,推出了全新的Spark
    SQL项目。Spark SQL就不只是针对Hive中的数据了,而且可以支持其他很多数据源的查询。

    2. Spark SQL特点

    Spark SQL的特点
    1、支持多种数据源:Hive、RDD、Parquet、JSON、JDBC等。
    2、多种性能优化技术:in-memory columnar storage、byte-code generation、cost model动态评估等。
    3、组件扩展性:对于SQL的语法解析器、分析器以及优化器,用户都可以自己重新开发,并且动态扩展。

    在2014年6月1日的时候,Spark宣布了不再开发Shark,全面转向Spark SQL的开发。

    Spark SQL的性能比Shark来说,又有了数倍的提升。

    3. 性能优化技术

    Spark SQL的性能优化技术简介
    1、内存列存储(in-memory columnar storage)
    内存列存储意味着,Spark SQL的数据,不是使用Java对象的方式来进行存储,而是使用面向列的内存存储的方式来进行存储。也就是说,每一列,作为一个数据存储的单位。从而大大优化了内存使用的效率。采用了内存列存储之后,减少了对内存的消耗,也就避免了gc大量数据的性能开销。
    2、字节码生成技术(byte-code generation)
    Spark SQL在其catalyst模块的expressions中增加了codegen模块,对于SQL语句中的计算表达式,比如select num + num from t这种的sql,就可以使用动态字节码生成技术来优化其性能。
    3、Scala代码编写的优化
    对于Scala代码编写中,可能会造成较大性能开销的地方,自己重写,使用更加复杂的方式,来获取更好的性能。比如Option样例类、for循环、map/filter/foreach等高阶函数,以及不可变对象,都改成了用null、while循环等来实现,并且重用可变的对象。

    4. Spark SQL与Hive on Spark

    Hive on Spark是由Cloudera发起,由Intel、MapR等公司共同参与的开源项目,其目的是把Spark作为Hive的一个计算引擎,将Hive的查询作为Spark的任务提交到Spark集群上进行计算。通过该项目,可以提高Hive查询的性能,同时为已经部署了Hive或者Spark的用户提供了更加灵活的选择,从而进一步提高Hive和Spark的普及率。

    Hive on Spark是从Hive on MapReduce演进而来,Hive的整体解决方案很不错,但是从查询提交到结果返回需要相当长的时间,查询耗时太长,这个主要原因就是由于Hive原生是基于MapReduce的,那么如果我们不生成MapReduce Job,而是生成Spark Job,就可以充分利用Spark的快速执行能力来缩短HiveQL的响应时间。

    Hive on Spark现在是Hive组件(从Hive1.1 release之后)的一部分。

    SparkSQL和Hive On Spark都是在Spark上实现SQL的解决方案。Spark早先有Shark项目用来实现SQL层,不过后来推翻重做了,就变成了SparkSQL。这是Spark官方Databricks的项目,Spark项目本身主推的SQL实现。Hive On Spark比SparkSQL稍晚。Hive原本是没有很好支持MapReduce之外的引擎的,而Hive On Tez项目让Hive得以支持和Spark近似的Planning结构(非MapReduce的DAG)。所以在此基础上,Cloudera主导启动了Hive On Spark。这个项目得到了IBM,Intel和MapR的支持(但是没有Databricks)。

    5. 代码示例

    SparkSQL实例代码:

    import org.apache.spark._
    
    object Hello {
    
        // 创建一个表示用户的自定义类
        case class Person(name: String, age: Int)
    
        def main(args: Array[String]) {
    
            val conf = new SparkConf().setAppName("SparkSQL Demo")
            val sc = new SparkContext(conf)
    
            // 首先用已有的Spark Context对象创建SQLContext对象
            val sqlContext = new org.apache.spark.sql.SQLContext(sc)
    
            // 导入语句,可以隐式地将RDD转化成DataFrame
            import sqlContext.implicits._
    
            // 用数据集文本文件创建一个Person对象的DataFrame
            val people = sc.textFile("/Users/urey/data/input2.txt").map(_.split(",")).map(p => Person(p(0), p(1).trim.toInt)).toDF()
    
            // 将DataFrame注册为一个表
            people.registerTempTable("people")
    
            // SQL查询
            val teenagers = sqlContext.sql("SELECT name, age FROM people WHERE age >= 13 AND age <= 19")
    
            // 输出查询结果,按照顺序访问结果行的各个列。
            teenagers.map(t => "Name: " + t(0)).collect().foreach(println)
    
            sc.stop()
            }
    }
    

    Hive on Spark实例代码:

    val hiveContext = new HiveContext(sc)
    
    import hiveContext._
    
    hql("CREATE TABLE IF NOT EXIST src(key INT, value STRING)")
    
    hql("LOAD DATA LOCAL PATH '/Users/urey/data/input2.txt' INTO TABLE src")
    
    hql("FROM src SELECT key, value").collect().foreach(println)
    
    展开全文
  • spark直接写入hive表>

    千次阅读 2017-09-15 14:35:47
    import org.apache.spark.rdd.RDD import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import org.apache.spark.{SparkConf, SparkContext}object Main ...
  • 系统的讲解了我们为什么需要去认识sparkspark有什么内容以及我们该怎么去学习spark。在学习spark过程中遵循的几个原则。内容如下: 1 大数据是什么 2 需要什么知识(除了scala,java和python都行) 3 spark可以做...
  • 大数据Spark实战视频教程

    万人学习 2016-11-10 14:26:54
    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室...
  • SparkSpark基础教程

    万次阅读 多人点赞 2019-03-20 12:33:42
    Spark最初由美国加州伯克利大学的AMP实验室于2009年开发,是基于内存计算的大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序。 Spark特点 Spark具有如下几个主要特点: 运行速度快:Spark使用先进...
  • Hadoop与Spark等大数据框架介绍

    万次阅读 多人点赞 2018-08-09 17:06:40
    海量数据的存储问题很早就已经出现了,一些行业或者部门因为历史的积累,数据量也达到了一定的级别。很早以前,当一台电脑无法存储这么庞大的数据时,采用的解决方案是使用NFS(网络文件系统)将数据分开存储。...
  • Spark排错与优化

    万次阅读 多人点赞 2015-10-15 17:08:36
    Master默认使用512M内存,当集群中运行的任务特别多时,就会挂掉,原因是master会读取每个task的event log日志去生成spark ui,内存不足自然会OOM,可以在master的运行日志中看到,通过HA启动的master自然也会因为这...
  • Spark基础与编程模型

    千人学习 2015-11-02 13:20:03
    Spark基础与编程模型视频教程,该课程主要讲解Spark基础入门知识与编程模型,全面了解Spark各个知识点,为深入学习大数据相关知识打下基础。 讲师介绍,陈超:七牛云技术总监,专注于分布式计算与机器学习相关领域...
  • Spark快速大数据处理

    万人学习 2019-04-24 19:32:53
    5.Spark2实时大数据处理 6.Oozie5-大数据流程引擎 课程特点: 1.最新API: Hadoop3/Spark2/Hive3/Oozie5 2.手工搭建集群环境:编译+搭建 3.配套资源:分阶段镜像+课件+安装资源,其中安装...
  • at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:738) at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:187) at org.apache....
  • .config("spark.mongodb.input.uri", "mongodb://127.0.0.1/Spark-Test.Numbers") \ .config("spark.mongodb.output.uri", "mongodb://127.0.0.1/Spark-Test.Numbers") \ .getOrCreate() df = spark.read.format...
  • 深入浅出Spark

    万人学习 2014-11-29 11:05:21
    Spark是下一代In Memory MR计算框架,性能上有数量级提升,同时支持Interactive Query、流计算、图计算等。本次视频将为大家分享Spark的核心原理,并在此基础上探讨几个Spark性能的优化点!
  • Spark概述、Spark特点

    千次阅读 2017-07-05 16:38:41
    一、 Spark概述1. 什么是Spark(官网:http://spark.apache.org) Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月...
  • 1. 简介Spark的bin目录中的spark-submit脚本用于启动集群上的应用程序。 可以通过统一的接口使用Spark所有支持的集群管理器,因此不必为每个集群管理器专门配置你的应用程序(It can use all of Spark’s supported ...
  • JAVA代码实现编程式提交Spark任务

    万次阅读 2020-07-29 16:54:11
    1)直接调用SparkSubmit的main方法 2)SparkLauncher类的launch方法或者startApplication方法 3)使用RestSubmissionClient的run方法 SparkSubmit提交任务 String[] param = { "--class", "org.apache....
  • nohup bin/spark-submit --master spark://sousou:7077 --executor-memory 1g --total-executor-cores 2 --class AnalyzeInfo /spark/jar/v2_AnalyzeInfo.jar & nohup bin/spark-submit --master spark://...
  • Spark中,有Yarn-Client和Yarn-Cluster两种模式可以运行在Yarn上,通常Yarn-cluster适用于生产环境,而Yarn-Client更适用于交互,调试模式,以下是它们的区别   Spark插拨式资源管理 Spark支持Yarn,Mesos,...
  • 1 Spark机器学习 spark MLlib 入门

    万次阅读 2018-09-17 10:59:14
    开始学习spark ml了,都知道spark是继hadoop后的大数据利器,很多人都在使用spark的分布式并行来处理大数据。spark中也提供了机器学习的包,就是MLlib。 MLlib中也包含了大部分常用的算法,分类、回归、聚类等等,...
  • Spark2.1.0——SparkUI的实现

    千次阅读 多人点赞 2018-11-20 09:53:05
    任何系统都需要提供监控功能,否则在运行期间发生一些异常时,我们将会束手无策。也许有人说,可以增加日志来解决这个问题。日志只能解决你的程序逻辑在运行期的监控,进而发现Bug,以及提供对业务有帮助的调试信息...
  • spark Standalone

    千次阅读 2014-11-08 23:37:48
    Spark任务调度方式 Spark运行模式 Spark的运行模式取决于传递给SparkContext的deployMode和master环境变量的值,个别模式还需要辅助的程序接口配合使用,目前master有local,yarn-client,yarn-cluster,spark...
  • Spark之——Spark Submit提交应用程序

    万次阅读 2018-06-19 21:44:36
    本部分来源,也可以到spark官网查看英文版。 spark-submit 是在spark安装目录中bin目录下的一个shell脚本文件,用于在集群中启动应用程序(如*.py脚本);对于spark支持的集群模式,spark-submit提交应用的时候有...
  • Spark——Spark概述

    千次阅读 2020-09-02 00:49:31
    一、Spark是什么 二、Spark and Hadoop 在之前的学习中,Hadoop的MapReduce是大家广为熟知的计算框架,那为什么咱们还要学习新的计算框架Spark呢,这里就不得不提到Spark和Hadoop的关系。 首先从时间节点上...
  • spark 参数调优4-Spark UI

    千次阅读 2018-08-31 14:42:47
    spark参数调优系列 目录地址: https://blog.csdn.net/zyzzxycj/article/details/81011540   ④ Spark UI 这一块配置,是有关于spark日志的。日志开关,日志输出路径,是否压缩。 还有一些可视化界面、端口的...
  • spark 参数调优11-Spark Streaming

    千次阅读 2018-09-05 17:50:32
    spark参数调优系列 目录地址: https://blog.csdn.net/zyzzxycj/article/details/81011540   11 Spark Streaming spark.streaming.backpressure.enabled 反压,默认false,详细了解请移步...

空空如也

1 2 3 4 5 ... 20
收藏数 162,323
精华内容 64,929
关键字:

spark