Spark
Apache Spark is a fast, general-purpose computing engine designed for large-scale data processing. Spark is a Hadoop-MapReduce-like general-purpose parallel framework open-sourced by the UC Berkeley AMP Lab. It has the advantages of Hadoop MapReduce, but unlike MapReduce, intermediate job output can be kept in memory, so jobs no longer need to read and write HDFS between steps. This makes Spark better suited to algorithms that require iteration, such as those used in data mining and machine learning. Spark is an open-source cluster computing environment similar to Hadoop, but with some useful differences that make it superior for certain workloads: Spark enables in-memory distributed datasets and, besides offering interactive queries, also optimizes iterative workloads. Spark is implemented in Scala, which it uses as its application framework. Unlike Hadoop, Spark and Scala are tightly integrated, and Scala can manipulate distributed datasets as easily as local collection objects. Although Spark was created to support iterative jobs on distributed datasets, it is actually a complement to Hadoop and can run in parallel on the Hadoop file system, a behavior enabled through the third-party cluster framework Mesos. Spark was developed by the AMP Lab (Algorithms, Machines, and People Lab) at UC Berkeley and can be used to build large-scale, low-latency data analytics applications.
Info
Based on
Distributed computing based on the MapReduce model
Latest version
2.4.0
English name
Spark
Basic introduction to Spark
Apache Spark is a fast, general-purpose computing engine designed for large-scale data processing [1]. It has since developed into a fast-growing, widely used ecosystem.
  • Spark introduction, part 1: Spark components


    1. The components of Spark

          

         1: Each application has its own executor processes, which are isolated from one another, and each executor can run multiple task threads. This isolates applications well: different Spark applications cannot share data unless the data is written to an external system.

          2: The SparkContext object can be seen as the entry point of a Spark application; the main program is called the driver program. SparkContext communicates with different kinds of cluster managers (Cluster Manager), such as Hadoop YARN or Mesos, to obtain the resources the program needs to run. Once the resources have been acquired, SparkContext gets executors on the worker nodes (each Spark application has its own executors, which are independent processes that provide distributed computation and data storage for that application). SparkContext then ships the application code to the executors and finally assigns tasks (Task) to the executors for execution.
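    As a rough illustration of this entry-point role, here is a minimal Scala sketch of a driver program that creates a SparkConf and a SparkContext and lets the executors do the work. The master URL, application name, and file path are placeholders chosen for the example, not values from the text above.

    import org.apache.spark.{SparkConf, SparkContext}

    object EntryPointSketch {
      def main(args: Array[String]): Unit = {
        // The driver program starts here; SparkContext negotiates with the cluster
        // manager (standalone master, YARN, Mesos, ...) to obtain executors.
        val conf = new SparkConf()
          .setAppName("EntryPointSketch")
          .setMaster("spark://master:7077") // placeholder master URL

        val sc = new SparkContext(conf)

        // The work described by this RDD is shipped to the executors as tasks.
        val counts = sc.textFile("hdfs:///tmp/input.txt") // placeholder path
          .flatMap(_.split(" "))
          .map((_, 1))
          .reduceByKey(_ + _)

        counts.take(10).foreach(println)
        sc.stop()
      }
    }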

     2. Spark concepts

              Application: a user program running on the cluster, consisting of a driver program and multiple executors on the cluster;

              Driver program: the process that runs the application's main method and creates the SparkContext;

              Cluster manager: the cluster resource manager;

              Deploy mode: distinguishes how the driver program runs: in cluster mode the driver is launched inside the cluster; in client mode the driver process is launched outside the cluster;

              Worker node: a node that runs the application's code;

              Executor: a process on a worker node that runs tasks and keeps data for the application; each application has its own executors;

              Task: a unit of work that runs inside an executor; a Spark application is ultimately split into an optimized collection of tasks;

              Job: a parallel computation built from multiple transformations and triggered by an action in Spark; one action corresponds to one job

      3. Resilient Distributed Datasets (RDDs)

               The core concept in Spark is the resilient distributed dataset (RDD), a fault-tolerant collection of elements that can be operated on in parallel. There are two ways to create an RDD. The first is the parallelize method: parallelizing a collection held in the driver program, as in the code below.
    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data)
    The parallelize method also accepts a partition-count parameter, and each partition corresponds to one task, as in the code below.
    val data = Array(1, 2, 3, 4, 5)
    val distData = sc.parallelize(data,10)
             

         An RDD can be thought of abstractly as an array that is distributed across the cluster; an RDD is divided logically into pieces, each called a partition. While a Spark application runs, an RDD passes through a chain of transformation operators and is finally triggered by an action operator. RDDs are lazily evaluated: the earlier transformations do not perform any actual work, they are only recorded in the DAG, and the real computation happens only when an action is triggered. Logically, every transformation turns an RDD into a new RDD, and RDDs are connected by lineage, a relationship that plays a crucial role in fault tolerance.
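    To make the lazy-evaluation behaviour concrete, here is a small spark-shell style sketch; it assumes an existing SparkContext named sc, and the numbers are arbitrary.

    val nums = sc.parallelize(1 to 10)        // nothing is computed yet
    val doubled = nums.map(_ * 2)             // transformation: only recorded in the DAG
    val evens = doubled.filter(_ % 4 == 0)    // still nothing has run

    // Only the action below triggers the actual computation of the lineage above.
    val result = evens.collect()
    println(result.mkString(", "))            // 4, 8, 12, 16, 20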

    The RDD source code documents five properties:

    1. A list of partitions, the basic units of the dataset
    2. A function for computing each partition
    3. A list of dependencies on parent RDDs, which describes the lineage between RDDs
    4. Optionally, a Partitioner for key-value RDDs
    5. Optionally, a list of preferred locations for computing each partition; for an HDFS file, this is where the blocks of each partition are stored.
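    Most of these properties can be inspected directly from the spark-shell. The sketch below assumes an existing SparkContext named sc; the exact output depends on the Spark version.

    import org.apache.spark.HashPartitioner

    val pairs = sc.parallelize(Seq(("a", 1), ("b", 2), ("a", 3)), 4)
                  .partitionBy(new HashPartitioner(4))

    println(pairs.partitions.length)  // property 1: the list of partitions
    println(pairs.dependencies)       // property 3: dependencies on the parent RDD (lineage)
    println(pairs.partitioner)        // property 4: Some(HashPartitioner) for this key-value RDD
    println(pairs.preferredLocations(pairs.partitions(0))) // property 5: often empty for parallelize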

         4. RDD dependencies
              Every transformation produces a new RDD, and the new RDD keeps a relationship with the old one: a dependency. Depending on its shape, a dependency is either narrow or wide;


           A narrow dependency means that each partition of the parent RDD is used by at most one partition of the child RDD, for example map and filter.
           A wide dependency means that a partition of the parent RDD is used by multiple partitions of the child RDD, for example groupByKey and reduceByKey; a sketch contrasting the two follows below.
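           A quick way to see the difference is toDebugString, which prints the lineage and shows where a shuffle (a wide dependency) is introduced. A minimal sketch, assuming an existing SparkContext named sc:

    val words = sc.parallelize(Seq("a", "b", "a", "c"))

    // map and filter create narrow dependencies: each child partition reads one parent partition
    val narrow = words.map((_, 1)).filter(_._1 != "c")

    // reduceByKey creates a wide (shuffle) dependency: child partitions read many parent partitions
    val wide = narrow.reduceByKey(_ + _)

    println(narrow.toDebugString) // a single stage, no ShuffledRDD
    println(wide.toDebugString)   // shows a ShuffledRDD, i.e. a stage boundary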
           
       5. RDD persistence
              Persistence is an important Spark feature. When you persist an RDD, each node stores the RDD's partitions in memory, and later actions that use this RDD can reuse them directly instead of recomputing them from scratch. You can persist an RDD with the persist or cache method. Spark's cache is fault tolerant: if any partition of the RDD is lost, it is automatically recomputed and recreated from the original transformations. In addition, each RDD can be persisted at a different storage level:
         MEMORY_ONLY: store the RDD in the JVM as deserialized Java objects. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed when they are needed;
         MEMORY_AND_DISK: store the RDD in the JVM as deserialized Java objects. Partitions that do not fit in memory are stored on disk;
         MEMORY_ONLY_SER: store the RDD in the JVM as serialized Java objects. If the RDD does not fit in memory, some partitions will not be cached and will be recomputed when they are needed;
         DISK_ONLY: store the RDD partitions only on disk.
      Spark also automatically persists some intermediate data in shuffle operations (such as reduceByKey), even if the user never calls persist. The benefit is that if a failure occurs during the shuffle, the whole input does not have to be recomputed.
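      A minimal sketch of choosing a storage level, assuming an existing SparkContext named sc; the file path is a placeholder.

    import org.apache.spark.storage.StorageLevel

    val logs = sc.textFile("hdfs:///tmp/app.log")   // placeholder path
    val errors = logs.filter(_.contains("ERROR"))

    errors.cache()                                  // same as persist(StorageLevel.MEMORY_ONLY)
    // or pick an explicit level instead, e.g. spill to disk when memory is short:
    // errors.persist(StorageLevel.MEMORY_AND_DISK)

    println(errors.count())   // the first action computes and caches the partitions
    println(errors.count())   // the second action reuses the cached partitions

    errors.unpersist()        // release the cached partitions when no longer needed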

         

                 


              

  • Spark study notes: Spark basics

    Spark basics and a WordCount implementation

    Contents

    Spark basics

    1. Getting started with Spark

    (1) What is Spark

    (2) The Spark ecosystem

    (3) Spark's characteristics compared with MapReduce

    2. Spark architecture, installation, and deployment

    (1) Spark architecture

    (2) Installing and deploying Spark

    (3) Implementing Spark HA

    3. Running Spark demos

    (1) spark-submit

    (2) spark-shell

    (3) WordCount in Spark

    (4) The Java version of Spark WordCount

    (5) The Scala version of Spark WordCount

    4. Spark execution model and internals


    Spark basics

    1. Getting started with Spark

    (1) What is Spark

    Spark is a unified analytics engine for large-scale data processing.

    (2) The Spark ecosystem

               Spark Core: the core engine
               Spark SQL: a component for processing structured data, similar to Hive
               Spark Streaming: a component for processing streaming data, similar to Storm
               Spark MLlib: machine learning
               Spark GraphX: graph computation

    (3) Spark's characteristics compared with MapReduce

    Spark's characteristics
    1. It is memory-based, which makes it fast; but this is also a weakness, because Spark does not manage memory for you and is therefore prone to OOM (out-of-memory) errors. A Java heap dump can be used to analyse such memory problems.
    2. Applications can be written in Scala, Java, Python, R, and other languages.
    3. It is compatible with Hadoop.

    Spark compared with MapReduce
    1. MapReduce's biggest drawback is the large amount of I/O incurred during shuffle: of the six points in a MapReduce job where I/O can occur, Spark performs I/O only at the first and the last, and keeps everything in between in memory.

    2. Spark is a replacement for MapReduce; it is compatible with Hive and HDFS and integrates into Hadoop.

    2. Spark architecture, installation, and deployment

    (1) Spark architecture

    1. Master/slave architecture: it has a single point of failure, so HA is needed.
    2. Spark architecture diagram (figure omitted)

    The Driver Program can be thought of as the client, and the rest as the server side. The Cluster Manager is the master node; it does not execute tasks itself, task execution is done by the Worker Nodes.
    (A more detailed architecture diagram appeared here.)

    A fully distributed setup needs at least two workers.
    Implementing HA requires ZooKeeper.

    (2) Installing and deploying Spark

    Deploying Spark in pseudo-distributed mode

    Unpack: tar -zxvf spark-2.2.0-bin-hadoop2.6.tgz -C ~/training/
    Note: the Hadoop and Spark scripts conflict, so only one of them can be put on the PATH when setting environment variables.
    Core configuration file:  conf/spark-env.sh
             export JAVA_HOME=/root/training/jdk1.8.0_144
             export SPARK_MASTER_HOST=bigdata111
             export SPARK_MASTER_PORT=7077
    Slave node addresses: put the hostnames into the slaves file; make sure the hosts file resolves them to IP addresses.
    Start the Spark cluster with sbin/start-all.sh. I personally created a symlink called start-spark.sh for this script, because the Hadoop start script is also named start-all.sh and the two clash.
    Web UI: <hostname>:8080

    Deploying Spark in fully distributed mode

    The fully distributed deployment is similar to the pseudo-distributed one: unpack the archive on every node and edit conf/spark-env.sh.
    Put the slave hostnames into the slaves file on the master node.
    Then start the cluster.

    (3) Implementing Spark HA

    1. Single-node recovery based on the file system

    Suitable for development and test environments
    Recovery directory: stores the cluster's runtime state
    Add the following to spark-env.sh:

    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/training/spark-2.2.0-bin-hadoop2.6/recovery"

    If there are running applications, their information is written into the recovery directory.
    When the node comes back after going down, Spark can read the files in the recovery directory to restore the previous state.

    2. A standby master based on ZooKeeper

    ZooKeeper provides data synchronization, leader election, and distributed locks (e.g. for flash sales).
    Start ZooKeeper by running zkServer.sh; the ZooKeeper cluster then elects a leader and followers. Node status can be checked with zkServer.sh status.

    ZooKeeper data synchronization
    After starting ZooKeeper, run create /node001 helloworld in zkCli.sh (the zk shell) on any node.
    Running get /node001 in the shell of any other node shows the test value helloworld.

    ZooKeeper leader election
    Every ZooKeeper ensemble has one leader and the rest are followers; when the leader node goes down, the followers elect a new leader.

    Standby master with ZooKeeper
    Add the following to spark-env.sh:

    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata112:2181,bigdata113:2181,bigdata114:2181 -Dspark.deploy.zookeeper.dir=/mysparkHA"

    The spark.deploy.zookeeper.url parameter lists the address of every node in the ZooKeeper ensemble; as mentioned earlier, ZooKeeper needs at least three nodes.

    Comment out the following two lines:
    #export SPARK_MASTER_HOST=bigdata112
    #export SPARK_MASTER_PORT=7077
    After this configuration, start spark-master on both machines; the web UI will show one master as ALIVE and the other as STANDBY.

    3. Running Spark demos

    (1) spark-submit

    spark-submit can submit a job to a Spark cluster, or to a Hadoop YARN cluster.

    Here we run the Monte Carlo Pi-estimation demo with 1000 iterations:

    spark-submit --master spark://centos:7077 --class org.apache.spark.examples.SparkPi /opt/software/spark-2.2.0-bin-hadoop2.6/examples/jars/spark-examples_2.11-2.2.0.jar 1000

    --master is followed by the Spark master URL, --class specifies the main class, followed by the jar and the number of iterations.

    (2) spark-shell

    spark-shell is the interactive shell shipped with Spark; it makes interactive programming easy, letting the user write Spark programs in Scala at the command line.
    spark-shell has two run modes: local mode and cluster mode.
    Local mode: do not connect to a cluster; Spark tasks execute directly on the local machine (local mode).
    Simply run spark-shell.

    Cluster mode: connect to a cluster and execute tasks there.
    In cluster mode the shell attaches to the master as a standalone application.

    Run spark-shell --master spark://centos:7077

    The application can then be seen in the Spark web UI.

    (3) WordCount in Spark

    Spark integrates with HDFS and can read the files stored there.

    First create a test file data.txt and upload it to HDFS.

    Run WordCount, then step through it (a spark-shell sketch of these steps follows below).

    textFile returns an RDD of String holding the text, but at this point nothing is actually executed.

    Only when rdd1.collect is run does execution really happen: the strings are read from the text file and placed into the RDD.

    flatMap(_.split(" ")) takes each element of rdd1 and splits it on spaces; again, collect must be called before anything actually runs.

    map((_,1)) maps every element to a (word, 1) key-value pair; this syntactic sugar is equivalent to writing the lambda out explicitly.

    reduceByKey is an operator that merges the values of each key with the supplied function: the first underscore can be read as the running sum and the second as the value of each key-value pair.

    Summary: an RDD is a collection with dependency relationships; some RDD methods do not trigger computation, and some do.
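    Since the original step-by-step commands were screenshots, here is a hedged reconstruction of the spark-shell session described above; the HDFS path matches the later examples and may need adjusting.

    // read the file from HDFS; nothing executes yet
    val rdd1 = sc.textFile("hdfs://centos:9000/input/data.txt")
    rdd1.collect()                       // action: the lines are actually read now

    // split each line on spaces
    val rdd2 = rdd1.flatMap(_.split(" "))
    rdd2.collect()

    // map every word to a (word, 1) pair; (_, 1) is sugar for word => (word, 1)
    val rdd3 = rdd2.map((_, 1))

    // merge the values of each key; _ + _ adds the running sum and the next value
    val rdd4 = rdd3.reduceByKey(_ + _)
    rdd4.collect().foreach(println)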

    (4) The Java version of Spark WordCount

    Create a Java project named ZSparkDemo, create a folder named lib under the project, and copy the jars from /opt/software/spark-2.2.0-bin-hadoop2.6/jars into the lib folder.

    The code and comments are below.

    package demo;
    
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    /*
     * After packaging this as a jar and uploading it to the cluster, submit the job with spark-submit
     * spark-submit --master spark://centos:7077 --class demo.JavaWordCount /WordCount.jar hdfs://10.1.130.233:9000/input/data.txt
     */
    public class JavaWordCount {
    
    	public static void main(String[] args) {
    
    		// Configuration: setAppName sets the application name
    		// setMaster sets the master URL: "local" runs locally in a single thread,
    		// "local[4]" runs locally with 4 cores, "spark://master:7077" runs on a standalone cluster; if it is omitted, the master given to spark-submit is used
    		//SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
    		SparkConf conf = new SparkConf().setAppName("JavaWordCount");
    
    		// Create a SparkContext (JavaSparkContext) object
    		JavaSparkContext sc = new JavaSparkContext(conf);
    
    		// Read the data from the given path; the path can be local or on HDFS
    		// This method returns a Java RDD of type String
    		//JavaRDD<String> datas = sc.textFile("hdfs://10.1.130.233:9000/input/data.txt");
    		// Instead of hard-coding the path, the first command-line argument is used as the path
    		JavaRDD<String> datas = sc.textFile(args[0]);
    
    		// Split into words
    		// This implements the FlatMapFunction interface, which describes what to do with each input line: FlatMapFunction<String, U>
    		// Replace U with String: the first String is the input text, the second String is each word after splitting
    		JavaRDD<String> words = datas.flatMap(new FlatMapFunction<String, String>() {
    
    			@Override
    			// line is one input line of data
    			public Iterator<String> call(String line) throws Exception {
    				// split returns a String array, so Arrays.asList converts it into a List before calling iterator
    				return Arrays.asList(line.split(" ")).iterator();
    			}
    
    		});
    
    		// Count each word once: map to (word, 1)
    		// This implements the PairFunction interface, PairFunction<String, K2, V2>
    		// String is the input; K2 and V2 correspond to the Map output in MapReduce, e.g. (Beijing,1), so the key is a String and the value is an Integer
    		JavaPairRDD<String, Integer> wordOne = words.mapToPair(new PairFunction<String, String, Integer>() {
    
    			@Override
    			public Tuple2<String, Integer> call(String word) throws Exception {
    				// Beijing --->(Beijing,1)
    				return new Tuple2<String, Integer>(word, 1);
    			}
    
    		});
    
    		// Perform the reduce step: sum the values of identical words
    		// Function2<Integer, Integer, Integer>: the first two Integers are two values
    		// with the same key, and the last Integer is the result of the operation
    		JavaPairRDD<String, Integer> count = wordOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
    
    			@Override
    			public Integer call(Integer a, Integer b) throws Exception {
    				// TODO Auto-generated method stub
    				return a + b;
    			}
    		});
    
    		// Trigger the computation
    		List<Tuple2<String, Integer>> result = count.collect();
    
    		// Print the result to the console
    		for (Tuple2<String, Integer> r : result) {
    			System.out.println(r._1 + ":" + r._2);
    		}
    
    		// Stop the SparkContext
    		sc.stop();
    	}
    
    }
    

    Package it as a jar, upload it to the cluster, and submit it with spark-submit:

    spark-submit --master spark://centos:7077 --class demo.JavaWordCount /WordCount.jar hdfs://10.1.130.233:9000/input/data.txt

    (Screenshots of the result on the cluster and of the job in the Spark web UI appeared here.)

    (5) The Scala version of Spark WordCount

    package demo
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]): Unit = {
        
        //Get the Spark configuration; setAppName sets the app name; setting setMaster to local runs locally without submitting to the cluster
        //val conf = new SparkConf().setAppName("WordCount").setMaster("local")
        val conf=new SparkConf().setAppName("WordCount")
        
        //Get the SparkContext
        val sc = new SparkContext(conf)
    
        //textFile reads the given path, then split into words, convert to kv pairs, and count with reduceByKey
        //val count=sc.textFile("hdfs://centos:9000/input/data.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        val count=sc.textFile(args(0)).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
        
        //Save the result under an HDFS directory; repartition(1) merges the result into a single partition, and saveAsTextFile sets the output location
        count.repartition(1).saveAsTextFile(args(1))
        
        //Trigger the computation
        val result=count.collect()
        
        //Print the result
        result.foreach(println)
        
        //Stop the SparkContext
        sc.stop()
      }
    }

     Run it with

    spark-submit --master spark://centos:7077 --class demo.WordCount /WordCount.jar hdfs://centos:9000/input/data.txt hdfs://centos:9000/output/result

    (Screenshots of the run result, the generated output file in HDFS, and a WordCount flow diagram appeared here.)
    4. Spark execution model and internals

  • Getting started with Spark

    Apache Spark is a lightweight in-memory cluster computing platform that supports batch, streaming, and interactive use cases through different components. Apache Spark is an open-source, Hadoop-compatible cluster computing platform developed by AMPLab at UC Berkeley as part of the Berkeley Data Analytics ...

    Spark's improvements over Hadoop:

    Spark is faster;

    Spark's rich API makes it far easier to use;

    and finally, Spark supports not only traditional batch applications but also interactive queries, stream processing, machine learning, graph computation, and other
    kinds of applications, meeting the needs of many different scenarios.


    Apache Spark is a lightweight in-memory cluster computing platform that supports batch, streaming, and interactive use cases through different components.

    Apache Spark is an open-source, Hadoop-compatible cluster computing platform. It was developed by AMPLab at UC Berkeley as part of the Berkeley Data Analytics Stack (BDAS), is now backed by the big-data company Databricks, and is a top-level Apache project. The figure below showed the different components of the Apache Spark stack.


    The five big advantages of Apache Spark:

    1. Higher performance, because data is loaded into the distributed memory of the cluster hosts. Data can be transformed and iterated over quickly, and cached for subsequent frequent access. You may have heard the claim that with all data in memory Spark can be up to 100x faster than Hadoop, and about 10x faster when memory cannot hold all the data.

    2. Standard APIs in Java, Scala, Python, and SQL (for interactive queries) make it easy to use across industries, and it comes with a large number of out-of-the-box machine learning libraries.

    3. It is compatible with the existing Hadoop v1 (SIMR) and 2.x (YARN) ecosystems, so organizations can migrate seamlessly.


    4. It is easy to download and install, and its convenient shell (REPL: Read-Eval-Print Loop) allows interactive learning of the API.

    5. Its high-level architecture improves productivity, so you can put your energy into the computation itself.

    At the same time, Apache Spark is implemented in Scala, and the code is very concise.

    III. Installing Apache Spark

    The table below lists some important links and prerequisites:

    Current Release 1.0.1 @ http://d3kbcqa49mib13.cloudfront.net/spark-1.0.1.tgz
    Downloads Page https://spark.apache.org/downloads.html
    JDK Version (Required) 1.6 or higher
    Scala Version (Required) 2.10 or higher
    Python (Optional) [2.6, 3.0)
    Simple Build Tool (Required) http://www.scala-sbt.org
    Development Version git clone git://github.com/apache/spark.git
    Building Instructions https://spark.apache.org/docs/latest/building-with-maven.html
    Maven 3.0 or higher

    As shown in figure 6, Apache Spark can be deployed standalone, on Hadoop V1 SIMR, or on Hadoop 2 YARN/Mesos. Apache Spark requires a certain amount of Java, Scala, or Python knowledge. Here we focus on installing and running it in the standalone configuration.

    1. Install JDK 1.6+, Scala 2.10+, Python [2.6, 3.0) and sbt

    2. Download the Apache Spark 1.0.1 release

    3. Untar and unzip spark-1.0.1.tgz in the chosen directory

    akuntamukkala@localhost~/Downloads$ pwd 
    /Users/akuntamukkala/Downloads akuntamukkala@localhost~/Downloads$ tar -zxvf spark- 1.0.1.tgz -C /Users/akuntamukkala/spark

    4. Run sbt to build Apache Spark

    akuntamukkala@localhost~/spark/spark-1.0.1$ pwd /Users/akuntamukkala/spark/spark-1.0.1 akuntamukkala@localhost~/spark/spark-1.0.1$ sbt/sbt assembly

    5. Launch the Scala Apache Spark standalone REPL

    /Users/akuntamukkala/spark/spark-1.0.1/bin/spark-shell

    For Python:

    /Users/akuntamukkala/spark/spark-1.0.1/bin/ pyspark

    6. Check the Spark UI at http://localhost:4040

    IV. How Apache Spark works

    The Spark engine provides distributed in-memory data processing on all hosts in the cluster; the figure below showed the flow of a typical Spark job.


    The next figure showed how Apache Spark executes a job on a cluster.


    The Master controls how data is partitioned, exploits data locality, and tracks all distributed computation on the Slaves. When a Slave becomes unavailable, the data it stored is assigned to other available Slaves. Although the Master is currently (version 1.0.1) a single point of failure, this will be fixed later.

    V. Resilient Distributed Dataset (RDD)

    The Resilient Distributed Dataset (RDD; since Spark 1.3 increasingly superseded by DataFrame) is the core concept of Apache Spark. It is an immutable distributed collection of data that supports two main kinds of operations: transformations and actions. A transformation, such as filter(), map(), or union() on an RDD, produces another RDD, while an action such as count(), first(), take(n), or collect() triggers a computation and returns a value to the Master or to a stable storage system. Transformations are generally lazy and only run once an action executes. The Spark Master/Driver remembers the transformations applied to an RDD, so if an RDD is lost (that is, a slave goes down), it can be reconstructed quickly and conveniently on the surviving hosts of the cluster. That is the "resilience" of the RDD.

    The figure below illustrated the laziness of transformations:


    The concept can be understood with the following example: find the 5 most frequently used words in a text. The figure showed one possible solution.


    In the commands below, we read the text and build an RDD of strings, where each entry represents one line of the text.

    scala> val hamlet = sc.textFile("/Users/akuntamukkala/temp/gutenburg.txt")
    hamlet: org.apache.spark.rdd.RDD[String] = MappedRDD[1] at textFile at <console>:12
    scala> val topWordCount = hamlet.flatMap(str=>str.split(" ")).filter(!_.isEmpty).map(word=>(word,1)).reduceByKey(_+_).map{case (word, count) => (count, word)}.sortByKey(false)
    topWordCount: org.apache.spark.rdd.RDD[(Int, String)] = MapPartitionsRDD[10] at sortByKey at <console>:14

    1. The commands above show how simple this is: transformations and actions are chained together through the plain Scala API.

    2. Some words may be separated by more than one space, which produces empty strings as words, so filter(!_.isEmpty) is used to filter them out.

    3. Each word is mapped to a key-value pair: map(word=>(word,1)).

    4. To total up all the counts, a reduce step is needed: reduceByKey(_+_). The _+_ conveniently sums the values for each key.

    5. We now have the words and their counts; the next step is to sort by count. In Apache Spark you can only sort by key, not by value, so map{case (word, count) => (count, word)} turns the (word, count) pairs into (count, word).

    6. We want the 5 most used words, so sortByKey(false) sorts the counts in descending order.

    The command above, together with .take(5) (an action operation, which triggers computation), prints the 5 most frequently used words in the text /Users/akuntamukkala/temp/gutenburg.txt. The same can be done in the Python shell.

    RDD lineage can be tracked with toDebugString (an operation worth remembering).

    scala> topWordCount.take(5).foreach(x=>println(x))
    (1044,the)
    (730,and)
    (679,of)
    (648,to)
    (511,I)

    Commonly used transformations:

    Transformation & Purpose, Example & Result

    filter(func)
    Purpose: new RDD by selecting those data elements on which func returns true
    scala> val rdd = sc.parallelize(List("ABC","BCD","DEF"))
    scala> val filtered = rdd.filter(_.contains("C"))
    scala> filtered.collect()
    Result: Array[String] = Array(ABC, BCD)

    map(func)
    Purpose: return new RDD by applying func on each data element
    scala> val rdd = sc.parallelize(List(1,2,3,4,5))
    scala> val times2 = rdd.map(_*2)
    scala> times2.collect()
    Result: Array[Int] = Array(2, 4, 6, 8, 10)

    flatMap(func)
    Purpose: similar to map but func returns a Seq instead of a value; for example, mapping a sentence into a Seq of words
    scala> val rdd = sc.parallelize(List("Spark is awesome","It is fun"))
    scala> val fm = rdd.flatMap(str=>str.split(" "))
    scala> fm.collect()
    Result: Array[String] = Array(Spark, is, awesome, It, is, fun)

    reduceByKey(func,[numTasks])
    Purpose: aggregate the values of a key using a function; "numTasks" is an optional parameter to specify the number of reduce tasks
    scala> val word1 = fm.map(word=>(word,1))
    scala> val wrdCnt = word1.reduceByKey(_+_)
    scala> wrdCnt.collect()
    Result: Array[(String, Int)] = Array((is,2), (It,1), (awesome,1), (Spark,1), (fun,1))

    groupByKey([numTasks])
    Purpose: convert (K,V) to (K,Iterable<V>)
    scala> val cntWrd = wrdCnt.map{case (word, count) => (count, word)}
    scala> cntWrd.groupByKey().collect()
    Result: Array[(Int, Iterable[String])] = Array((1,ArrayBuffer(It, awesome, Spark, fun)), (2,ArrayBuffer(is)))

    distinct([numTasks])
    Purpose: eliminate duplicates from the RDD
    scala> fm.distinct().collect()
    Result: Array[String] = Array(is, It, awesome, Spark, fun)

    Commonly used set operations:

    Transformation and Purpose Example and Result
    union()
    Purpose: new RDD containing all elements from source RDD and argument.
    Scala> val rdd1=sc.parallelize(List(‘A’,’B’))
    scala> val rdd2=sc.parallelize(List(‘B’,’C’))
    scala> rdd1.union(rdd2).collect()
    Result:
    Array[Char] = Array(A, B, B, C)
    intersection()
    Purpose: new RDD containing only common elements from source RDD and argument.
    Scala> rdd1.intersection(rdd2).collect()
    Result:
    Array[Char] = Array(B)
    cartesian()
    Purpose: new RDD cross product of all elements from source RDD and argument
    Scala> rdd1.cartesian(rdd2).collect()
    Result:
    Array[(Char, Char)] = Array((A,B), (A,C), (B,B), (B,C))
    subtract()
    Purpose: new RDD created by removing data elements in source RDD in common with argument
    scala> rdd1.subtract(rdd2).collect() Result:
    Array[Char] = Array(A)
    join(RDD,[numTasks])
    Purpose: When invoked on (K,V) and (K,W), this operation creates a new RDD of (K, (V,W))
    scala> val personFruit = sc.parallelize(Seq((“Andy”, “Apple”), (“Bob”, “Banana”), (“Charlie”, “Cherry”), (“Andy”,”Apricot”)))
    scala> val personSE = sc.parallelize(Seq((“Andy”, “Google”), (“Bob”, “Bing”), (“Charlie”, “Yahoo”), (“Bob”,”AltaVista”)))
    scala> personFruit.join(personSE).collect()
    Result:
    Array[(String, (String, String))] = Array((Andy,(Apple,Google)), (Andy,(Apricot,Google)), (Charlie,(Cherry,Yahoo)), (Bob,(Banana,Bing)), (Bob,(Banana,AltaVista)))
    cogroup(RDD,[numTasks])
    Purpose: To convert (K,V) to (K,Iterable<V>)
    scala> personFruit.cogroup(personSE).collect()
    Result:
    Array[(String, (Iterable[String], Iterable[String]))] = Array((Andy,(ArrayBuffer(Apple, Apricot),ArrayBuffer(google))), (Charlie,(ArrayBuffer(Cherry),ArrayBuffer(Yahoo))), (Bob,(ArrayBuffer(Banana),ArrayBuffer(Bing, AltaVista))))

    For more on transformations, see http://spark.apache.org/docs/latest/programming-guide.html#transformations

    Commonly used actions:

    Action & Purpose, Example & Result

    count()
    Purpose: get the number of data elements in the RDD
    scala> val rdd = sc.parallelize(List('A','B','c'))
    scala> rdd.count()
    Result: Long = 3

    collect()
    Purpose: get all the data elements in an RDD as an array
    scala> val rdd = sc.parallelize(List('A','B','c'))
    scala> rdd.collect()
    Result: Array[Char] = Array(A, B, c)

    reduce(func)
    Purpose: aggregate the data elements in an RDD using a function that takes two arguments and returns one
    scala> val rdd = sc.parallelize(List(1,2,3,4))
    scala> rdd.reduce(_+_)
    Result: Int = 10

    take(n)
    Purpose: fetch the first n data elements in an RDD; computed by the driver program
    scala> val rdd = sc.parallelize(List(1,2,3,4))
    scala> rdd.take(2)
    Result: Array[Int] = Array(1, 2)

    foreach(func)
    Purpose: execute a function for each data element in the RDD; usually used to update an accumulator (discussed later) or to interact with external systems
    scala> val rdd = sc.parallelize(List(1,2,3,4))
    scala> rdd.foreach(x=>println("%s*10=%s".format(x,x*10)))
    Result: 1*10=10 4*10=40 3*10=30 2*10=20

    first()
    Purpose: retrieve the first data element in the RDD; similar to take(1)
    scala> val rdd = sc.parallelize(List(1,2,3,4))
    scala> rdd.first()
    Result: Int = 1

    saveAsTextFile(path)
    Purpose: write the content of the RDD to a text file or a set of text files on the local file system / HDFS
    scala> val hamlet = sc.textFile("/users/akuntamukkala/temp/gutenburg.txt")
    scala> hamlet.filter(_.contains("Shakespeare")).saveAsTextFile("/users/akuntamukkala/temp/filtered")
    Result: akuntamukkala@localhost~/temp/filtered$ ls _SUCCESS part-00000 part-00001

    For more actions, see http://spark.apache.org/docs/latest/programming-guide.html#actions

    VI. RDD persistence

    One of Apache Spark's key capabilities is persisting/caching RDDs in cluster memory, which greatly speeds up interactive work. The table below shows the options available in Spark.

    Storage Level Purpose
    MEMORY_ONLY (Default level) This option stores RDD in available cluster memory as deserialized Java objects. Some partitions may not be cached if there is not enough cluster memory. Those partitions will be recalculated on the fly as needed.
    MEMORY_AND_DISK This option stores RDD as deserialized Java objects. If RDD does not fit in cluster memory, then store those partitions on the disk and read them as needed.
    MEMORY_ONLY_SER This options stores RDD as serialized Java objects (One byte array per partition). This is more CPU intensive but saves memory as it is more space efficient. Some partitions may not be cached. Those will be recalculated on the fly as needed.
    MEMORY_AND_DISK_SER This option is the same as above except that disk is used when memory is not sufficient.
    DISK_ONLY This option stores the RDD only on the disk
    MEMORY_ONLY_2, MEMORY_AND_DISK_2, etc. Same as other levels but partitions are replicated on 2 slave nodes

    The storage levels above can be requested through the persist() operation; RDD.cache() is a convenient way to specify the MEMORY_ONLY option. For more information on persistence levels, see http://spark.apache.org/docs/latest/programming-guide.html#rdd-persistence.

    Spark uses a Least Recently Used (LRU) algorithm to evict old, rarely used RDDs from the cache and free up memory. An unpersist() operation is also provided to forcibly remove a cached/persisted RDD.

    VII. Shared variables

    Accumulators. Spark provides a very convenient way to avoid mutable counters and counter-synchronization problems: accumulators. An accumulator is initialized with a default value in a Spark context. The accumulator is available on the slave nodes, but the slaves cannot read it; they can only apply atomic updates, which are forwarded to the Master. The Master is the only node that can read and compute the aggregate of all updates. For example:

    akuntamukkala@localhost~/temp$ cat output.log
    error
    warning
    info
    trace
    error
    info
    info
    scala> val nErrors=sc.accumulator(0.0)
    scala> val logs = sc.textFile("/Users/akuntamukkala/temp/output.log")
    scala> logs.filter(_.contains("error")).foreach(x=>nErrors+=1)
    scala> nErrors.value
    Result:Int = 2

    Broadcast variables. In real production it is very common to join data on RDDs by a key. In such cases a large dataset may have to be sent to the slave nodes that host the data to be joined, which can become a huge performance bottleneck, since network I/O is about 100 times slower than memory access. To address this, Spark provides broadcast variables which, as the name suggests, are broadcast to the slave nodes, so the RDD operations on those nodes can access the broadcast value quickly. For example, suppose we want to compute the shipping cost of all items listed in a file. A look-up table specifies the cost of each shipping type, and this look-up table can be a broadcast variable.

    akuntamukkala@localhost~/temp$ cat packagesToShip.txt ground
    express
    media
    priority
    priority
    ground
    express
    media
    scala> val map = sc.parallelize(Seq(("ground",1),("media",2), ("priority",5),("express",10))).collect().toMap
    map: scala.collection.immutable.Map[String,Int] = Map(ground -> 1, media -> 2, priority -> 5, express -> 10)
    scala> val bcMailRates = sc.broadcast(map)

    With the command above we build a broadcast variable based on the map of cost per service type.

    scala> val pts = sc.textFile("/Users/akuntamukkala/temp/packagesToShip.txt")

    The command above reads the list of packages to ship.

    scala> pts.map(shipType=>(shipType,1)).reduceByKey(_+_).map{case (shipType,nPackages)=>(shipType,nPackages*bcMailRates.value(shipType))}.collect()

    With the command above we use the broadcast mailing rates to compute the total shipping cost per type. Details can be found in this PDF: http://ampcamp.berkeley.edu/wp-content/uploads/2012/06/matei-zaharia-amp-camp-2012-advanced-spark.pdf.

    VIII. Spark SQL

    Through the Spark engine, Spark SQL provides a convenient way to do interactive analysis, using a type of RDD called SchemaRDD. A SchemaRDD can be created from an existing RDD, from external data formats such as Parquet files or JSON data, or by running HQL against Hive. A SchemaRDD is very similar to a table in an RDBMS. Once data has been loaded into a SchemaRDD, the Spark engine can process it in batch or streaming fashion. Spark SQL provides two kinds of contexts, SQLContext and HiveContext, which extend the functionality of SparkContext.

    SQLContext gives access to a simple SQL parser, while HiveContext gives access to the HiveQL parser. HiveContext allows enterprises to leverage their existing Hive infrastructure.

    Here is a simple SQLContext example.

    The user data in the text below is delimited by "|".

    John Smith|38|M|201 East Heading Way #2203,Irving, TX,75063 Liana Dole|22|F|1023 West Feeder Rd, Plano,TX,75093 Craig Wolf|34|M|75942 Border Trail,Fort Worth,TX,75108 John Ledger|28|M|203 Galaxy Way,Paris, TX,75461 Joe Graham|40|M|5023 Silicon Rd,London,TX,76854

    Define a Scala case class to represent each row:

    case class Customer(name:String,age:Int,gender:String,address: String)

    The code snippet below shows how to use the SparkContext to create a SQLContext, read the input file, turn each line into a record, and query with a simple SQL statement for male customers under 30.

    val sparkConf = new SparkConf().setAppName("Customers")
    val sc = new SparkContext(sparkConf)
    val sqlContext = new SQLContext(sc)
    val r = sc.textFile("/Users/akuntamukkala/temp/customers.txt")
    val records = r.map(_.split('|'))
    val c = records.map(r=>Customer(r(0),r(1).trim.toInt,r(2),r(3)))
    c.registerAsTable("customers")
    sqlContext.sql("select * from customers where gender='M' and age < 30").collect().foreach(println)
    Result: [John Ledger,28,M,203 Galaxy Way,Paris, TX,75461]

    For more examples of using SQL and HiveQL, see https://spark.apache.org/docs/latest/sql-programming-guide.html and https://databricks-training.s3.amazonaws.com/data-exploration-using-spark-sql.html.


    IX. Spark Streaming

    Spark Streaming provides a scalable, fault-tolerant, efficient way to process streaming data while reusing Spark's simple programming model. In essence, Spark Streaming turns streaming data into micro batches, which lets the Spark batch programming model be applied to streaming use cases. This unified programming model lets Spark combine batch processing with interactive streaming analysis. The figure below showed that Spark Streaming can read data from many different data sources.


    The core abstraction in Spark Streaming is the Discretized Stream (DStream). A DStream is a sequence of RDDs, each containing the data that arrived during a configurable time interval. Figure 12 showed how Spark Streaming turns the incoming data into a series of RDDs which make up a DStream, with each RDD holding two seconds (the configured interval) of data. In Spark Streaming the minimum interval can be as low as 0.5 seconds, so processing latency can be under 1 second.

    Spark Streaming also provides window operators, which help compute more efficiently over a group of RDDs (a rolling window of time). DStream also exposes an API whose operators (transformations and output operators) let users work with the underlying RDDs directly. Let us look at a simple example included in the Spark Streaming download: finding trending hashtags in the Twitter stream; see the code below.

    spark-1.0.1/examples/src/main/scala/org/apache/spark/examples/streaming/TwitterPopularTags.scala
    val sparkConf = new SparkConf().setAppName("TwitterPopularTags")
    val ssc = new StreamingContext(sparkConf, Seconds(2))
    val stream = TwitterUtils.createStream(ssc, None, filters)

    The code above creates the Spark Streaming context. Spark Streaming creates an RDD in the DStream containing the tweets received every 2 seconds.

    val hashTags = stream.flatMap(status => status.getText.split(" ").filter(_.startsWith("#")))

    The snippet above turns each tweet into a set of words and keeps only those starting with #.

    val topCounts60 = hashTags.map((_, 1)).reduceByKeyAndWindow(_ + _, Seconds(60)).map{case (topic, count) => (count, topic)}.transform(_.sortByKey(false))

    The code above shows how to compute the total number of times each hashtag arrived during the last 60 seconds.

    topCounts60.foreachRDD(rdd => {
      val topList = rdd.take(10)
      println("\nPopular topics in last 60 seconds (%s total):".format(rdd.count()))
      topList.foreach{case (count, tag) => println("%s (%s tweets)".format(tag, count))}
    })

    The code above finds the top 10 trending hashtags and prints them.

    ssc.start()

    The code above tells the Spark Streaming Context to start retrieving tweets. Let us also look at a few common operations; suppose we are reading streaming text from a socket.

    val lines = ssc.socketTextStream("localhost", 9999, StorageLevel.MEMORY_AND_DISK_SER)


    For more operators, see http://spark.apache.org/docs/latest/streaming-programming-guide.html#transformations

    Spark Streaming has many powerful output operators, such as the foreachRDD() mentioned above; for more, see http://spark.apache.org/docs/latest/streaming-programming-guide.html#output-operations

    X. Additional learning resources

  • Spark

    Use case: after setting up a Hadoop environment, computing with MapReduce is very slow, because MapReduce is only distributed batch computing, meant for batch jobs rather than speed, since it has to read and write frequently... Because Spark computes in memory, it keeps intermediate results in mem ...

    I. Run modes

    Spark is an in-memory computing framework with very strong performance. It supports single-machine mode as well as cluster mode, and it has quite a few run modes. To avoid confusion and make them easy to tell apart, here is a summary. Most write-ups online list three or four modes, but if you really subdivide them, Spark has six run modes, distinguished below.

    1. local mode [single machine]

    Local mode runs on the local machine. Running this mode is very simple: just unpack the Spark distribution and change a few common settings. You do not need to start Spark's Master and Worker daemons (those two roles are only needed for the cluster standalone mode), nor any of the Hadoop services (unless you want to use HDFS). This is what distinguishes it from the other modes.

    Example

     # ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master local lib/spark-examples-1.0.0-hadoop2.2.0.jar 
    
     # Note: seeing --master local tells you this is single-machine local mode!
    

    In this mode the single SparkSubmit process wears every hat: it is the client process that submits the job, the Spark driver program, and also the executor that runs the Spark tasks.

    2. Local pseudo-cluster mode (simulating a cluster on one machine)

    This mode is very similar to Local[N], except that it starts multiple processes on a single machine to simulate a distributed cluster, instead of having multiple threads share resources inside one process as Local[N] does. It is usually used to check that an application's logic is correct, or to use the Spark computing framework when not many resources are available.

    Usage: submit the application with the local-cluster[x,y,z] master parameter: x is the number of executors to start, and y and z are the number of cores and the amount of memory each executor gets.

     # spark-submit --master local-cluster[2, 3, 1024]
    
     # The command above uses 2 executor processes, each with 3 cores and 1 GB of memory, to run the application.
    

    SparkSubmit still plays the all-round role: it is the client process and the driver program, and it even does a bit of resource management. Two CoarseGrainedExecutorBackend processes are created to run the tasks.

    This mode is also very simple to run: just unpack the Spark distribution and change a few common settings. You do not need to start Spark's Master and Worker daemons (those two roles are only needed for the cluster standalone mode), nor any of the Hadoop services (unless you want to use HDFS). This is what distinguishes it from the other modes.

    3. standalone mode [cluster]

    Unlike the single-machine modes, here you must start Spark's Master and Worker daemons before running an application. The Hadoop services are not needed unless you use content on HDFS.

    Example

     # ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master spark://192.168.123.101:7077 lib/spark-examples-1.0.0-hadoop2.2.0.jar 
    
     # Note: seeing --master spark://IP:7077 tells you this is standalone mode!
    

    The Master process acts as the cluster manager and manages the resources requested by applications; SparkSubmit acts as the client and runs the driver program; CoarseGrainedExecutorBackend processes execute the application concurrently.

    Standalone mode is the resource scheduling framework implemented by Spark itself; its main nodes are the Client node, the Master node, and the Worker nodes. The Driver can run either on the Master node or on the local Client. When you submit a Spark job with the interactive spark-shell tool, the Driver runs on the Master node; when you submit a job with spark-submit, or run a Spark job from a development platform such as Eclipse or IDEA using "new SparkConf().setMaster("spark://master:7077")", the Driver runs on the local Client.

    The execution flow is as follows:
    1. SparkContext connects to the Master, registers with it, and requests resources (CPU cores and memory).
    2. Based on SparkContext's resource request and the information reported in the Workers' heartbeats, the Master decides on which Worker to allocate resources, takes the resources on that Worker, and starts a StandaloneExecutorBackend there.
    3. The StandaloneExecutorBackend registers with SparkContext.
    4. SparkContext sends the application code to the StandaloneExecutorBackend, parses the application code, builds the DAG, and submits it to the DAG Scheduler, which breaks it into stages (an action produces a job; each job contains one or more stages; stages are generally created before fetching external data and before a shuffle). The stages (also called TaskSets) are then handed to the Task Scheduler, which assigns tasks to the appropriate Workers and finally submits them to the StandaloneExecutorBackend for execution.
    5. The StandaloneExecutorBackend builds an executor thread pool, starts executing tasks, and reports to SparkContext until all tasks are finished.
    6. When all tasks are finished, SparkContext deregisters from the Master and releases the resources.

    4. yarn-client mode [cluster]

    More and more often Spark runs inside a Hadoop cluster, so to get balanced resource scheduling, YARN is used as Spark's cluster manager to allocate resources to Spark applications.

    Example

     # ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client lib/spark-examples-1.0.0-hadoop2.2.0.jar 
    
     # Note: here the job is run with --master yarn-client
    

    Before running a Spark application in this mode, the Hadoop services must be started. Since there is already a resource manager, Spark's Master and Worker daemons are not needed; in other words, there is no need to run start-all.sh in Spark's sbin directory.

    The execution flow is as follows:
    (1) The Spark YARN client asks YARN's ResourceManager to start the Application Master. During SparkContext initialization, the DAGScheduler, TaskScheduler, and so on are created; because yarn-client mode is chosen, the program uses YarnClientClusterScheduler and YarnClientSchedulerBackend.
    (2) After receiving the request, the ResourceManager picks a NodeManager in the cluster, allocates the first Container for the application, and has the NodeManager start the application's ApplicationMaster in that Container. The difference from YARN-Cluster is that this ApplicationMaster does not run the SparkContext; it only liaises with the SparkContext for resource allocation.
    (3) Once the SparkContext in the Client has initialized, it establishes communication with the ApplicationMaster, registers with the ResourceManager, and requests resources (Containers) according to the task information.
    (4) As soon as the ApplicationMaster obtains resources (Containers), it communicates with the corresponding NodeManagers and has them start CoarseGrainedExecutorBackend in the obtained Containers; after starting, each CoarseGrainedExecutorBackend registers with the SparkContext in the Client and requests tasks.
    (5) The SparkContext in the Client assigns tasks to the CoarseGrainedExecutorBackends, which run them and report their status and progress, so the Client can track each task's progress and restart tasks when they fail.
    (6) When the application has finished, the Client's SparkContext asks the ResourceManager to deregister it and shuts itself down.

    5. yarn-cluster (on-yarn-standalone) mode [cluster]

    Example

     # ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-cluster lib/spark-examples-1.0.0-hadoop2.2.0.jar 
    
     # Note: here the job is run with --master yarn-cluster
    
    The execution flow is as follows:
    (1) The Spark YARN client submits the application to YARN, including the ApplicationMaster program, the command to start the ApplicationMaster, and the program to run in the Executors.
    (2) After receiving the request, the ResourceManager picks a NodeManager in the cluster, allocates the first Container for the application, and has the NodeManager start the application's ApplicationMaster in that Container; the ApplicationMaster then performs the SparkContext initialization.
    (3) The ApplicationMaster registers with the ResourceManager, so the user can watch the application's status directly through the ResourceManager; it then requests resources for the tasks by polling over the RPC protocol and monitors them until the run finishes.
    (4) As soon as the ApplicationMaster obtains resources (Containers), it communicates with the corresponding NodeManagers and has them start CoarseGrainedExecutorBackend in the obtained Containers; after starting, each CoarseGrainedExecutorBackend registers with the SparkContext in the ApplicationMaster and requests tasks. This is the same as in standalone mode, except that when the SparkContext is initialized in the Spark application, CoarseGrainedSchedulerBackend works together with YarnClusterScheduler to schedule tasks, where YarnClusterScheduler is just a thin wrapper around TaskSchedulerImpl that adds waiting logic for Executors and so on.
    (5) The SparkContext in the ApplicationMaster assigns tasks to the CoarseGrainedExecutorBackends, which run them and report their status and progress to the ApplicationMaster, so it can track each task's progress and restart failed tasks.
    (6) When the application has finished, the ApplicationMaster asks the ResourceManager to deregister it and shuts itself down.

    6. mesos mode [cluster]

    Modes 4 and 5 above use Hadoop's YARN for resource management; this one uses Mesos instead. Mesos is an open-source distributed resource management framework under Apache, sometimes called the kernel of distributed systems. Mesos was originally developed by AMPLab at UC Berkeley and later saw wide use at Twitter. Apache Mesos is a general-purpose cluster manager whose origins trace back to Borg, Google's datacenter resource management system.

    Twitter, inspired by Google's Borg system, wanted a similar resource management system to escape its infamous "fail whale". They then noticed the Mesos project being developed at UC Berkeley's AMPLab, led by Ben Hindman, at the time a PhD student at Berkeley. Ben Hindman later joined Twitter to develop and deploy Mesos. Mesos now manages application deployment on more than 300,000 Twitter servers, and the fail whale is a thing of the past. Other companies followed and deployed Mesos too, such as Airbnb, eBay, and Netflix.

    I have not worked with this mode much, and it is rarely used here.

    Appendix

    The difference between Spark client and cluster deployment

    Before understanding the deeper difference between YARN-Client and YARN-Cluster, it helps to be clear about one concept: the Application Master. In YARN, every application instance has an ApplicationMaster process, which is the first container started for the application. It is responsible for dealing with the ResourceManager to request resources and, after obtaining them, for telling the NodeManagers to start Containers for it. At a deeper level, the difference between YARN-Cluster and YARN-Client is the difference in the ApplicationMaster process.
    In YARN-Cluster mode, the Driver runs inside the AM (Application Master), which requests resources from YARN and supervises the job. After the user submits the job, the Client can be closed and the job keeps running on YARN, so YARN-Cluster mode is not suitable for interactive jobs.
    In YARN-Client mode, the Application Master only asks YARN for Executors, and the Client communicates with the requested Containers to schedule their work, which means the Client cannot go away.

    II. Pseudo-distributed

    After setting up a Hadoop environment, computing with MapReduce is very slow, because MapReduce is only distributed batch computing, designed for batch jobs rather than speed: it reads and writes HDFS frequently and cannot return results in real time, and such pure batch scenarios are relatively rare. What customers usually want most is to see results right after the input. So we now deploy a Spark environment on top of the pseudo-distributed Hadoop cluster. Spark computes in memory and keeps intermediate results there instead of repeatedly reading HDFS, which is a huge optimization. Spark is also the trend going forward and is gradually replacing many Hadoop components, and another advantage is that it integrates naturally and seamlessly with Hadoop.

    Steps

    1. Download Scala and Spark

    Scala 2.10.4 download link
    Spark 1.6.1 download link

    2. Unpack and configure environment variables

    Download and unpack Scala, then add the environment variables:

     export SCALA_HOME=/opt/scala-2.10.4
     export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$HIVE_HOME/bin:$SCALA_HOME/bin:$PATH
    

    Download and unpack Spark, then add the environment variables:

     export SPARK_HOME=/opt/spark-1.6.1 
     export PATH=$JAVA_HOME/bin:$HADOOP_HOME/bin:$SCALA_HOME/bin:$SPARK_HOME/bin:$HIVE_HOME/bin:$PATH
    

    3. Edit spark-env.sh

    Go to Spark's configuration directory:
     # cd $SPARK_HOME/conf  
    Add the following to the spark-env.sh file: 
     export JAVA_HOME=/opt/jdk1.7.0_79
     export SCALA_HOME=/opt/scala-2.10.4
     export HADOOP_CONF_DIR=/opt/hadoop-2.6.0/etc/hadoop
    

    4. Start Spark:

     # cd /opt/spark-1.6.1
     # ./sbin/start-all.sh
    

    5. Verify

    After start-up, run jps at the command line; if Master and Worker processes are present, the start-up succeeded.

    Open http://192.168.208.110:8080 in a browser


    # ./bin/spark-shell
    Open http://192.168.208.110:4040 in a browser
    to reach the spark-shell page


    # ./bin/spark-sql
    Connect to Hive through spark-sql and query the data stored in Hive


    # ./sbin/start-thriftserver.sh
    # ./bin/beeline
    Important: once these are started, Hive programs can be used directly, and HQL is then executed with Spark's in-memory computation by default

    III. Fully distributed

    Once a distributed Hadoop cluster is installed, the default computation layer is MapReduce, which is slow and suited to batch jobs, while Spark blends in perfectly with Hadoop and provides much stronger computing power: it is memory-based, fast, and efficient. Spark also supports single-machine installation, but that involves no distributed computation or distributed storage; to use a Spark cluster we need a distributed Hadoop environment and its distributed file system. This section covers installing and deploying distributed Spark.

    Steps

    1. Configuring Scala 2.11.6

    1.1 Download Scala 2.11.6

    Download the Scala 2.11.6 archive (Scala 2.11.6 download link) and upload it to the opt directory on the master node

    1.2 Unpack and rename the directory

     # cd /opt/
     # tar -xzvf scala-2.11.6.tgz
     # mv scala-2.11.6 scala2.11.6
    

    1.3 Configure environment variables

     # vim /etc/profile
    
    export JAVA_HOME=/opt/jdk1.8
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export PATH=$PATH:$JAVA_HOME/bin
    
    export HADOOP_HOME=/opt/hadoop2.6.0
    export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
    
    export HIVE_HOME=/opt/hive2.1.1
    export HIVE_CONF_DIR=$HIVE_HOME/conf
    export CLASSPATH=.:$HIVE_HOME/lib:$CLASSPATH
    export PATH=$PATH:$HIVE_HOME/bin
    
    export SQOOP_HOME=/opt/sqoop1.4.6
    export PATH=$PATH:$SQOOP_HOME/bin
    
    export ZK_HOME=/opt/zookeeper3.4.10
    export PATH=$PATH:$ZK_HOME/bin
    
    export HBASE_HOME=/opt/hbase1.2.6
    export PATH=$PATH:$HBASE_HOME/bin
    
    export SCALA_HOME=/opt/scala2.11.6
    export PATH=$PATH:$SCALA_HOME/bin
    
    #add the last two lines, the Scala environment variable configuration
    
     # source /etc/profile       # make the environment variables take effect
    

    1.4 Verify the Scala configuration

     # scala -version
    


    2. Configuring Spark 1.6.1

    2.1 Download Spark 1.6.1

    Download the Spark 1.6.1 archive (Spark 1.6.1 download link) and upload it to the opt directory on the master node

    2.2 Unpack and rename the directory

     # cd /opt
     # tar -xzvf spark-1.6.1-bin-hadoop2.6.tgz
     # mv spark-1.6.1-bin-hadoop2.6 spark1.6.1
    

    2.3 Configure environment variables

     # vim /etc/profile
    
    export JAVA_HOME=/opt/jdk1.8
    export CLASSPATH=.:$JAVA_HOME/lib/dt.jar:$JAVA_HOME/lib/tools.jar
    export PATH=$PATH:$JAVA_HOME/bin
    
    export HADOOP_HOME=/opt/hadoop2.6.0
    export PATH=$HADOOP_HOME/bin:$HADOOP_HOME/sbin:$PATH
    
    export HIVE_HOME=/opt/hive2.1.1
    export HIVE_CONF_DIR=$HIVE_HOME/conf
    export CLASSPATH=.:$HIVE_HOME/lib:$CLASSPATH
    export PATH=$PATH:$HIVE_HOME/bin
    
    export SQOOP_HOME=/opt/sqoop1.4.6
    export PATH=$PATH:$SQOOP_HOME/bin
    
    export ZK_HOME=/opt/zookeeper3.4.10
    export PATH=$PATH:$ZK_HOME/bin
    
    export HBASE_HOME=/opt/hbase1.2.6
    export PATH=$PATH:$HBASE_HOME/bin
    
    export SCALA_HOME=/opt/scala2.11.6
    export PATH=$PATH:$SCALA_HOME/bin
    
    export SPARK_HOME=/opt/spark1.6.1
    export PATH=$PATH:$SPARK_HOME/bin
    
    #add the last two lines, the Spark environment variable configuration
    #Important: do NOT add SPARK_HOME/sbin to PATH, because many commands in Spark's sbin directory have the same names as those in Hadoop's sbin; to avoid conflicts, run Spark's sbin commands from inside that directory
    
     # source /etc/profile       # make the environment variables take effect
    

    3. Edit the spark-env.sh configuration file

     # cd /opt/spark1.6.1/conf/
     # cp spark-env.sh.template   spark-env.sh
     # vim spark-env.sh
     
    export SCALA_HOME=/opt/scala2.11.6
    export JAVA_HOME=/opt/jdk1.8
    export HADOOP_HOME=/opt/hadoop2.6.0
    export HADOOP_CONF_DIR=$HADOOP_HOME/etc/hadoop
    export SPARK_HOME=/opt/spark1.6.1
    export SPARK_MASTER_IP=hadoop0
    export SPARK_EXECUTOR_MEMORY=4G                 #append the settings above at the end of the file
    

    4. Edit the slaves configuration file

     # cd /opt/spark1.6.1/conf/
     # cp slaves.template slaves
     # vim slaves
     
    hadoop1
    hadoop2              #remove localhost and add the hostnames of the two slave nodes
    

    5. Copy scala2.11.6 and spark1.6.1 from the master node to the two slave nodes

     # cd /opt
     
     # scp -r scala2.11.6 root@hadoop1:/opt/
     # scp -r scala2.11.6 root@hadoop2:/opt/
     # scp -r spark1.6.1 root@hadoop1:/opt/
     # scp -r spark1.6.1 root@hadoop2:/opt/
    

    Also update the environment variables on the slave nodes and make them take effect!

    6. Start and verify Spark

    Note: Hadoop must be running before Spark is started, because the Spark cluster depends on Hadoop.

      # cd /opt/spark1.6.1/sbin
      # ./start-all.sh
    


    Open http://192.168.210.70:8080 in a browser


    IV. Notes

    After installing and deploying fully distributed Spark, I found that yarn-cluster mode ran without errors, but yarn-client mode failed and could not compute the value of Pi, leaving Spark unusable. The error output is shown below; fixing it only requires a change to the YARN configuration.

    Fix

     # ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client lib/spark-examples-1.6.1-hadoop2.6.0.jar
    

    1. Error output:

    [root@hadoop0 spark1.6.1]# ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client lib/spark-examples-1.6.1-hadoop2.6.0.jar 
    17/11/16 16:04:59 INFO spark.SparkContext: Running Spark version 1.6.1
    17/11/16 16:05:00 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    17/11/16 16:05:00 INFO spark.SecurityManager: Changing view acls to: root
    17/11/16 16:05:00 INFO spark.SecurityManager: Changing modify acls to: root
    17/11/16 16:05:00 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    17/11/16 16:05:00 INFO util.Utils: Successfully started service 'sparkDriver' on port 56204.
    17/11/16 16:05:00 INFO slf4j.Slf4jLogger: Slf4jLogger started
    17/11/16 16:05:01 INFO Remoting: Starting remoting
    17/11/16 16:05:01 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.210.70:56916]
    17/11/16 16:05:01 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 56916.
    17/11/16 16:05:01 INFO spark.SparkEnv: Registering MapOutputTracker
    17/11/16 16:05:01 INFO spark.SparkEnv: Registering BlockManagerMaster
    17/11/16 16:05:01 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-9e904d0f-0d09-4c9a-b523-86dc52613223
    17/11/16 16:05:01 INFO storage.MemoryStore: MemoryStore started with capacity 511.1 MB
    17/11/16 16:05:01 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    17/11/16 16:05:01 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/11/16 16:05:01 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
    17/11/16 16:05:01 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
    17/11/16 16:05:01 INFO ui.SparkUI: Started SparkUI at http://192.168.210.70:4040
    17/11/16 16:05:01 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-a3aae429-abe4-4bcb-b73e-8fe359aa92d9/httpd-742c21cf-89df-4af2-8b81-431523fe7bfd
    17/11/16 16:05:01 INFO spark.HttpServer: Starting HTTP Server
    17/11/16 16:05:01 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/11/16 16:05:01 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:37440
    17/11/16 16:05:01 INFO util.Utils: Successfully started service 'HTTP file server' on port 37440.
    17/11/16 16:05:01 INFO spark.SparkContext: Added JAR file:/opt/spark1.6.1/lib/spark-examples-1.6.1-hadoop2.6.0.jar at http://192.168.210.70:37440/jars/spark-examples-1.6.1-hadoop2.6.0.jar with timestamp 1510819501618
    17/11/16 16:05:01 INFO client.RMProxy: Connecting to ResourceManager at hadoop0/192.168.210.70:8032
    17/11/16 16:05:01 INFO yarn.Client: Requesting a new application from cluster with 3 NodeManagers
    17/11/16 16:05:01 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (12288 MB per container)
    17/11/16 16:05:01 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
    17/11/16 16:05:01 INFO yarn.Client: Setting up container launch context for our AM
    17/11/16 16:05:01 INFO yarn.Client: Setting up the launch environment for our AM container
    17/11/16 16:05:01 INFO yarn.Client: Preparing resources for our AM container
    17/11/16 16:05:02 INFO yarn.Client: Uploading resource file:/opt/spark1.6.1/lib/spark-assembly-1.6.1-hadoop2.6.0.jar -> hdfs://hadoop0:9000/user/root/.sparkStaging/application_1510653707211_0005/spark-assembly-1.6.1-hadoop2.6.0.jar
    17/11/16 16:05:04 INFO yarn.Client: Uploading resource file:/tmp/spark-a3aae429-abe4-4bcb-b73e-8fe359aa92d9/__spark_conf__7623958375810260855.zip -> hdfs://hadoop0:9000/user/root/.sparkStaging/application_1510653707211_0005/__spark_conf__7623958375810260855.zip
    17/11/16 16:05:04 INFO spark.SecurityManager: Changing view acls to: root
    17/11/16 16:05:04 INFO spark.SecurityManager: Changing modify acls to: root
    17/11/16 16:05:04 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    17/11/16 16:05:04 INFO yarn.Client: Submitting application 5 to ResourceManager
    17/11/16 16:05:04 INFO impl.YarnClientImpl: Submitted application application_1510653707211_0005
    17/11/16 16:05:06 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:06 INFO yarn.Client: 
    	 client token: N/A
    	 diagnostics: N/A
    	 ApplicationMaster host: N/A
    	 ApplicationMaster RPC port: -1
    	 queue: default
    	 start time: 1510819504598
    	 final status: UNDEFINED
    	 tracking URL: http://hadoop0:8088/proxy/application_1510653707211_0005/
    	 user: root
    17/11/16 16:05:07 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:08 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:09 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:10 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:11 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:12 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:13 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:14 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:15 ERROR server.TransportRequestHandler: Error sending result RpcResponse{requestId=89567902
    java.nio.channels.ClosedChannelException
    17/11/16 16:05:15 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:16 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:17 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:18 INFO yarn.Client: Application report for application_1510653707211_0005 (state: ACCEPTED)
    17/11/16 16:05:18 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as 
    17/11/16 16:05:18 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.07211_0005
    17/11/16 16:05:18 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFi
    17/11/16 16:05:19 INFO yarn.Client: Application report for application_1510653707211_0005 (state: RUNNING)
    17/11/16 16:05:19 INFO yarn.Client: 
    	 client token: N/A
    	 diagnostics: N/A
    	 ApplicationMaster host: 192.168.210.71
    	 ApplicationMaster RPC port: 0
    	 queue: default
    	 start time: 1510819504598
    	 final status: UNDEFINED
    	 tracking URL: http://hadoop0:8088/proxy/application_1510653707211_0005/
    	 user: root
    17/11/16 16:05:19 INFO cluster.YarnClientSchedulerBackend: Application application_1510653707211_0005 has s
    17/11/16 16:05:19 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockT
    17/11/16 16:05:19 INFO netty.NettyBlockTransferService: Server created on 60932
    17/11/16 16:05:19 INFO storage.BlockManagerMaster: Trying to register BlockManager
    17/11/16 16:05:19 INFO storage.BlockManagerMasterEndpoint: Registering block manager 192.168.210.70:60932 w
    17/11/16 16:05:19 INFO storage.BlockManagerMaster: Registered BlockManager
    17/11/16 16:05:22 INFO cluster.YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (h
    17/11/16 16:05:22 INFO storage.BlockManagerMasterEndpoint: Registering block manager hadoop1:35613 with 2.7
    17/11/16 16:05:22 ERROR cluster.YarnClientSchedulerBackend: Yarn application has already exited with state 
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,nul
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,nul
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
    17/11/16 16:05:22 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
    17/11/16 16:05:22 INFO cluster.YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (h
    17/11/16 16:05:22 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.210.70:4040
    17/11/16 16:05:22 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
    17/11/16 16:05:22 INFO cluster.YarnClientSchedulerBackend: Asking each executor to shut down
    17/11/16 16:05:22 INFO cluster.YarnClientSchedulerBackend: Stopped
    17/11/16 16:05:22 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    17/11/16 16:05:22 INFO storage.BlockManagerMasterEndpoint: Registering block manager hadoop2:51640 with 2.7
    17/11/16 16:05:22 INFO storage.MemoryStore: MemoryStore cleared
    17/11/16 16:05:22 INFO storage.BlockManager: BlockManager stopped
    17/11/16 16:05:22 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event Spa
    17/11/16 16:05:22 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    17/11/16 16:05:22 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoord
    17/11/16 16:05:22 INFO spark.SparkContext: Successfully stopped SparkContext
    17/11/16 16:05:22 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
    17/11/16 16:05:22 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceedin
    17/11/16 16:05:22 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginni
    17/11/16 16:05:22 ERROR spark.SparkContext: Error initializing SparkContext.
    java.lang.NullPointerException
    	at org.apache.spark.SparkContext.<init>(SparkContext.scala:584)
    	at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:29)
    	at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.sc
    	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    17/11/16 16:05:22 INFO spark.SparkContext: SparkContext already stopped.
    Exception in thread "main" java.lang.NullPointerException
    	at org.apache.spark.SparkContext.<init>(SparkContext.scala:584)
    	at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:29)
    	at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.sc
    	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    17/11/16 16:05:22 INFO util.ShutdownHookManager: Shutdown hook called
    17/11/16 16:05:22 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a3aae429-abe4-4bcb-b73e-8fe3
    17/11/16 16:05:22 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remoting shut down.
    17/11/16 16:05:22 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-a3aae429-abe4-4bcb-b73e-8fe
    

    2. Modify the yarn-site.xml configuration, adding the last two properties:

     # vim /opt/hadoop2.6.0/etc/hadoop/yarn-site.xml 
     
    <property>
            <name>yarn.resourcemanager.hostname</name>
            <value>hadoop0</value>
       </property>
       <property>
            <description>The http address of the RM web application.</description>
            <name>yarn.resourcemanager.webapp.address</name>
            <value>${yarn.resourcemanager.hostname}:8088</value>
       </property>
       <property>
            <name>yarn.resourcemanager.resource-tracker.address</name>
            <value>${yarn.resourcemanager.hostname}:8031</value>
       </property>
       <property>
            <description>The address of the RM admin interface.</description>
            <name>yarn.resourcemanager.admin.address</name>
            <value>${yarn.resourcemanager.hostname}:8033</value>
       </property>
       <property>
            <name>yarn.nodemanager.aux-services</name>
            <value>mapreduce_shuffle</value>
       </property>
       <property>
            <name>yarn.scheduler.maximum-allocation-mb</name>
            <value>8182</value>
            <description>Maximum memory, in MB, that YARN may allocate to a single container (default 8192 MB).</description>
       </property>
       <property>
            <name>yarn.nodemanager.pmem-check-enabled</name>
            <value>false</value>
        </property>
        <property>
            <name>yarn.nodemanager.vmem-check-enabled</name>
            <value>false</value>
        </property>
    
     # Adding the last two properties (both set to false) is all that is needed!
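
    These two false flags stop the NodeManager from killing executor containers whose physical or virtual memory usage exceeds YARN's default limits, which is what caused the executors (and then the SparkContext) to fail in the log above. As an optional application-side alternative, a minimal sketch follows; it is not part of the original fix, and the object name and concrete memory values are assumptions. In Spark 1.6 the application can reserve extra per-container memory overhead so executors stay inside the limits:

    import org.apache.spark.{SparkConf, SparkContext}

    // Sketch only: keep each executor container inside YARN's memory limits by
    // reserving extra overhead, instead of (or in addition to) disabling the checks.
    // The object name and the concrete memory values below are assumptions.
    object PiWithOverhead {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf()
          .setAppName("SparkPi-with-overhead")
          .setMaster("yarn-client")                          // same deploy mode as the runs shown here
          .set("spark.executor.memory", "1g")                // executor JVM heap (assumed value)
          .set("spark.yarn.executor.memoryOverhead", "512")  // extra off-heap MB per container (assumed value)
        val sc = new SparkContext(conf)
        println("executor memory = " + sc.getConf.get("spark.executor.memory"))
        sc.stop()
      }
    }

    Disabling the checks is the quicker fix on a test cluster; raising the overhead is the gentler option on a shared one.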
    

    3. Output of a successful run:

    [root@hadoop0 spark1.6.1]# ./bin/spark-submit --class org.apache.spark.examples.SparkPi --master yarn-client lib/spark-examples-1.6.1-hadoop2.6.0.jar 
    17/11/20 11:23:30 INFO spark.SparkContext: Running Spark version 1.6.1
    17/11/20 11:23:30 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    17/11/20 11:23:30 INFO spark.SecurityManager: Changing view acls to: root
    17/11/20 11:23:30 INFO spark.SecurityManager: Changing modify acls to: root
    17/11/20 11:23:30 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    17/11/20 11:23:30 INFO util.Utils: Successfully started service 'sparkDriver' on port 50890.
    17/11/20 11:23:31 INFO slf4j.Slf4jLogger: Slf4jLogger started
    17/11/20 11:23:31 INFO Remoting: Starting remoting
    17/11/20 11:23:31 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.210.70:43819]
    17/11/20 11:23:31 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 43819.
    17/11/20 11:23:31 INFO spark.SparkEnv: Registering MapOutputTracker
    17/11/20 11:23:31 INFO spark.SparkEnv: Registering BlockManagerMaster
    17/11/20 11:23:31 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-c0631ca3-48c6-45ed-b1bd-c785e7ed4e52
    17/11/20 11:23:31 INFO storage.MemoryStore: MemoryStore started with capacity 511.1 MB
    17/11/20 11:23:31 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    17/11/20 11:23:31 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/11/20 11:23:31 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
    17/11/20 11:23:31 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
    17/11/20 11:23:31 INFO ui.SparkUI: Started SparkUI at http://192.168.210.70:4040
    17/11/20 11:23:31 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-603bd57a-5f73-41dc-84d0-e732cbd37788/httpd-8b6ce293-389a-4564-bedb-8560a3a924d5
    17/11/20 11:23:31 INFO spark.HttpServer: Starting HTTP Server
    17/11/20 11:23:31 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/11/20 11:23:31 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:44183
    17/11/20 11:23:31 INFO util.Utils: Successfully started service 'HTTP file server' on port 44183.
    17/11/20 11:23:31 INFO spark.SparkContext: Added JAR file:/opt/spark1.6.1/lib/spark-examples-1.6.1-hadoop2.6.0.jar at http://192.168.210.70:44183/jars/spark-examples-1.6.1-hadoop2.6.0.jar with timestamp 1511148211815
    17/11/20 11:23:31 INFO client.RMProxy: Connecting to ResourceManager at hadoop0/192.168.210.70:8032
    17/11/20 11:23:32 INFO yarn.Client: Requesting a new application from cluster with 3 NodeManagers
    17/11/20 11:23:32 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (8192 MB per container)
    17/11/20 11:23:32 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
    17/11/20 11:23:32 INFO yarn.Client: Setting up container launch context for our AM
    17/11/20 11:23:32 INFO yarn.Client: Setting up the launch environment for our AM container
    17/11/20 11:23:32 INFO yarn.Client: Preparing resources for our AM container
    17/11/20 11:23:32 INFO yarn.Client: Uploading resource file:/opt/spark1.6.1/lib/spark-assembly-1.6.1-hadoop2.6.0.jar -> hdfs://hadoop0:9000/user/root/.sparkStaging/application_1511146953298_0003/spark-assembly-1.6.1-hadoop2.6.0.jar
    17/11/20 11:23:33 INFO yarn.Client: Uploading resource file:/tmp/spark-603bd57a-5f73-41dc-84d0-e732cbd37788/__spark_conf__5627219911217194032.zip -> hdfs://hadoop0:9000/user/root/.sparkStaging/application_1511146953298_0003/__spark_conf__5627219911217194032.zip
    17/11/20 11:23:33 INFO spark.SecurityManager: Changing view acls to: root
    17/11/20 11:23:33 INFO spark.SecurityManager: Changing modify acls to: root
    17/11/20 11:23:33 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    17/11/20 11:23:33 INFO yarn.Client: Submitting application 3 to ResourceManager
    17/11/20 11:23:33 INFO impl.YarnClientImpl: Submitted application application_1511146953298_0003
    17/11/20 11:23:34 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:34 INFO yarn.Client: 
    	 client token: N/A
    	 diagnostics: N/A
    	 ApplicationMaster host: N/A
    	 ApplicationMaster RPC port: -1
    	 queue: default
    	 start time: 1511148213962
    	 final status: UNDEFINED
    	 tracking URL: http://hadoop0:8088/proxy/application_1511146953298_0003/
    	 user: root
    17/11/20 11:23:36 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:37 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:38 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:39 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:41 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:42 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:43 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:47 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:56 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:57 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:58 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:23:59 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:24:12 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:24:13 INFO yarn.Client: Application report for application_1511146953298_0003 (state: ACCEPTED)
    17/11/20 11:24:14 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(null)
    17/11/20 11:24:14 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> hadoop0, PROXY_URI_BASES -> http://hadoop0:8088/proxy/application_1511146953298_0003), /proxy/application_1511146953298_0003
    17/11/20 11:24:14 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
    17/11/20 11:24:14 INFO yarn.Client: Application report for application_1511146953298_0003 (state: RUNNING)
    17/11/20 11:24:14 INFO yarn.Client: 
    	 client token: N/A
    	 diagnostics: N/A
    	 ApplicationMaster host: 192.168.210.70
    	 ApplicationMaster RPC port: 0
    	 queue: default
    	 start time: 1511148213962
    	 final status: UNDEFINED
    	 tracking URL: http://hadoop0:8088/proxy/application_1511146953298_0003/
    	 user: root
    17/11/20 11:24:14 INFO cluster.YarnClientSchedulerBackend: Application application_1511146953298_0003 has started running.
    17/11/20 11:24:14 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 33528.
    17/11/20 11:24:14 INFO netty.NettyBlockTransferService: Server created on 33528
    17/11/20 11:24:14 INFO storage.BlockManagerMaster: Trying to register BlockManager
    17/11/20 11:24:14 INFO storage.BlockManagerMasterEndpoint: Registering block manager 192.168.210.70:33528 with 511.1 MB RAM, BlockManagerId(driver, 192.168.210.70, 33528)
    17/11/20 11:24:14 INFO storage.BlockManagerMaster: Registered BlockManager
    17/11/20 11:24:15 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after waiting maxRegisteredResourcesWaitingTime: 30000(ms)
    17/11/20 11:24:15 INFO spark.SparkContext: Starting job: reduce at SparkPi.scala:36
    17/11/20 11:24:15 INFO scheduler.DAGScheduler: Got job 0 (reduce at SparkPi.scala:36) with 2 output partitions
    17/11/20 11:24:15 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:36)
    17/11/20 11:24:15 INFO scheduler.DAGScheduler: Parents of final stage: List()
    17/11/20 11:24:15 INFO scheduler.DAGScheduler: Missing parents: List()
    17/11/20 11:24:15 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:32), which has no missing parents
    17/11/20 11:24:15 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1904.0 B, free 1904.0 B)
    17/11/20 11:24:15 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1218.0 B, free 3.0 KB)
    17/11/20 11:24:15 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.210.70:33528 (size: 1218.0 B, free: 511.1 MB)
    17/11/20 11:24:15 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
    17/11/20 11:24:15 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:32)
    17/11/20 11:24:15 INFO cluster.YarnScheduler: Adding task set 0.0 with 2 tasks
    17/11/20 11:24:26 INFO cluster.YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (hadoop1:41774) with ID 2
    17/11/20 11:24:26 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, hadoop1, partition 0,PROCESS_LOCAL, 2157 bytes)
    17/11/20 11:24:26 INFO storage.BlockManagerMasterEndpoint: Registering block manager hadoop1:40640 with 1247.3 MB RAM, BlockManagerId(2, hadoop1, 40640)
    17/11/20 11:24:40 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on hadoop1:40640 (size: 1218.0 B, free: 1247.2 MB)
    17/11/20 11:24:40 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, hadoop1, partition 1,PROCESS_LOCAL, 2157 bytes)
    17/11/20 11:24:40 INFO scheduler.TaskSetManager: Finished task 0.0 in stage 0.0 (TID 0) in 14708 ms on hadoop1 (1/2)
    17/11/20 11:24:40 INFO scheduler.TaskSetManager: Finished task 1.0 in stage 0.0 (TID 1) in 34 ms on hadoop1 (2/2)
    17/11/20 11:24:40 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:36) finished in 24.994 s
    17/11/20 11:24:40 INFO cluster.YarnScheduler: Removed TaskSet 0.0, whose tasks have all completed, from pool 
    17/11/20 11:24:40 INFO scheduler.DAGScheduler: Job 0 finished: reduce at SparkPi.scala:36, took 25.166251 s
    Pi is roughly 3.14648
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
    17/11/20 11:24:40 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
    17/11/20 11:24:40 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.210.70:4040
    17/11/20 11:24:40 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
    17/11/20 11:24:40 INFO cluster.YarnClientSchedulerBackend: Interrupting monitor thread
    17/11/20 11:24:40 INFO cluster.YarnClientSchedulerBackend: Asking each executor to shut down
    17/11/20 11:24:41 INFO cluster.YarnClientSchedulerBackend: Stopped
    17/11/20 11:24:41 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    17/11/20 11:24:41 INFO storage.MemoryStore: MemoryStore cleared
    17/11/20 11:24:41 INFO storage.BlockManager: BlockManager stopped
    17/11/20 11:24:41 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    17/11/20 11:24:41 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    17/11/20 11:24:41 INFO spark.SparkContext: Successfully stopped SparkContext
    17/11/20 11:24:41 INFO util.ShutdownHookManager: Shutdown hook called
    17/11/20 11:24:41 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-603bd57a-5f73-41dc-84d0-e732cbd37788
    17/11/20 11:24:41 INFO remote.RemoteActorRefProvider$RemotingTerminator: Shutting down remote daemon.
    17/11/20 11:24:41 INFO remote.RemoteActorRefProvider$RemotingTerminator: Remote daemon shut down; proceeding with flushing remote transports.
    17/11/20 11:24:41 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-603bd57a-5f73-41dc-84d0-e732cbd37788/httpd-8b6ce293-389a-4564-bedb-8560a3a924d5
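
    For reference, the "map at SparkPi.scala:32" and "reduce at SparkPi.scala:36" entries in the log correspond to a Monte Carlo estimate of pi. Below is a condensed sketch of what the bundled example computes, not the exact shipped source; the object name and sample count are assumptions, and the slice count matches the "2 output partitions" reported above:

    import scala.math.random
    import org.apache.spark.{SparkConf, SparkContext}

    // Condensed sketch of the SparkPi computation driven by the spark-submit command above.
    object PiSketch {
      def main(args: Array[String]): Unit = {
        val sc = new SparkContext(new SparkConf().setAppName("PiSketch"))
        val slices = 2                      // the log shows "2 output partitions"
        val n = 100000 * slices             // number of random points (assumed)
        val count = sc.parallelize(1 to n, slices).map { _ =>
          val x = random * 2 - 1            // random point in the [-1, 1] x [-1, 1] square
          val y = random * 2 - 1
          if (x * x + y * y < 1) 1 else 0   // 1 if the point lands inside the unit circle
        }.reduce(_ + _)                     // the reduce that yields the final count
        println("Pi is roughly " + 4.0 * count / n)
        sc.stop()
      }
    }

    The fraction of points that land inside the circle approaches pi/4, which is why the count is multiplied by 4 to produce the "Pi is roughly 3.14648" line above.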
    

    4. The following RPC errors can also be resolved with the fix described above:

    17/11/20 10:43:02 INFO spark.SparkContext: Running Spark version 1.6.1
    17/11/20 10:43:02 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
    17/11/20 10:43:02 INFO spark.SecurityManager: Changing view acls to: root
    17/11/20 10:43:02 INFO spark.SecurityManager: Changing modify acls to: root
    17/11/20 10:43:02 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    17/11/20 10:43:03 INFO util.Utils: Successfully started service 'sparkDriver' on port 43103.
    17/11/20 10:43:03 INFO slf4j.Slf4jLogger: Slf4jLogger started
    17/11/20 10:43:03 INFO Remoting: Starting remoting
    17/11/20 10:43:03 INFO Remoting: Remoting started; listening on addresses :[akka.tcp://sparkDriverActorSystem@192.168.210.70:54479]
    17/11/20 10:43:03 INFO util.Utils: Successfully started service 'sparkDriverActorSystem' on port 54479.
    17/11/20 10:43:03 INFO spark.SparkEnv: Registering MapOutputTracker
    17/11/20 10:43:03 INFO spark.SparkEnv: Registering BlockManagerMaster
    17/11/20 10:43:03 INFO storage.DiskBlockManager: Created local directory at /tmp/blockmgr-115d1d9d-efd2-4b77-a275-071bb880d596
    17/11/20 10:43:03 INFO storage.MemoryStore: MemoryStore started with capacity 511.1 MB
    17/11/20 10:43:03 INFO spark.SparkEnv: Registering OutputCommitCoordinator
    17/11/20 10:43:04 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/11/20 10:43:04 INFO server.AbstractConnector: Started SelectChannelConnector@0.0.0.0:4040
    17/11/20 10:43:04 INFO util.Utils: Successfully started service 'SparkUI' on port 4040.
    17/11/20 10:43:04 INFO ui.SparkUI: Started SparkUI at http://192.168.210.70:4040
    17/11/20 10:43:04 INFO spark.HttpFileServer: HTTP File server directory is /tmp/spark-1a7d680b-c815-4c0d-b706-9751f5f1b57a/httpd-e8d7be01-495d-403b-a7a4-1332d9ae2411
    17/11/20 10:43:04 INFO spark.HttpServer: Starting HTTP Server
    17/11/20 10:43:04 INFO server.Server: jetty-8.y.z-SNAPSHOT
    17/11/20 10:43:04 INFO server.AbstractConnector: Started SocketConnector@0.0.0.0:36186
    17/11/20 10:43:04 INFO util.Utils: Successfully started service 'HTTP file server' on port 36186.
    17/11/20 10:43:05 INFO spark.SparkContext: Added JAR file:/opt/spark1.6.1/lib/spark-examples-1.6.1-hadoop2.6.0.jar at http://192.168.210.70:36186/jars/spark-examples-1.6.1-hadoop2.6.0.jar with timestamp 1511145785204
    17/11/20 10:43:05 INFO client.RMProxy: Connecting to ResourceManager at hadoop0/192.168.210.70:8032
    17/11/20 10:43:05 INFO yarn.Client: Requesting a new application from cluster with 3 NodeManagers
    17/11/20 10:43:05 INFO yarn.Client: Verifying our application has not requested more than the maximum memory capability of the cluster (12288 MB per container)
    17/11/20 10:43:05 INFO yarn.Client: Will allocate AM container, with 896 MB memory including 384 MB overhead
    17/11/20 10:43:05 INFO yarn.Client: Setting up container launch context for our AM
    17/11/20 10:43:05 INFO yarn.Client: Setting up the launch environment for our AM container
    17/11/20 10:43:05 INFO yarn.Client: Preparing resources for our AM container
    17/11/20 10:43:06 INFO yarn.Client: Uploading resource file:/opt/spark1.6.1/lib/spark-assembly-1.6.1-hadoop2.6.0.jar -> hdfs://hadoop0:9000/user/root/.sparkStaging/application_1510653707211_0009/spark-assembly-1.6.1-hadoop2.6.0.jar
    17/11/20 10:43:07 INFO yarn.Client: Uploading resource file:/tmp/spark-1a7d680b-c815-4c0d-b706-9751f5f1b57a/__spark_conf__910020831153605384.zip -> hdfs://hadoop0:9000/user/root/.sparkStaging/application_1510653707211_0009/__spark_conf__910020831153605384.zip
    17/11/20 10:43:07 INFO spark.SecurityManager: Changing view acls to: root
    17/11/20 10:43:07 INFO spark.SecurityManager: Changing modify acls to: root
    17/11/20 10:43:07 INFO spark.SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); users with modify permissions: Set(root)
    17/11/20 10:43:07 INFO yarn.Client: Submitting application 9 to ResourceManager
    17/11/20 10:43:08 INFO impl.YarnClientImpl: Submitted application application_1510653707211_0009
    17/11/20 10:43:10 INFO yarn.Client: Application report for application_1510653707211_0009 (state: ACCEPTED)
    17/11/20 10:43:10 INFO yarn.Client: 
    	 client token: N/A
    	 diagnostics: N/A
    	 ApplicationMaster host: N/A
    	 ApplicationMaster RPC port: -1
    	 queue: default
    	 start time: 1511145788138
    	 final status: UNDEFINED
    	 tracking URL: http://hadoop0:8088/proxy/application_1510653707211_0009/
    	 user: root
    17/11/20 10:43:13 INFO yarn.Client: Application report for application_1510653707211_0009 (state: ACCEPTED)
    17/11/20 10:43:15 INFO yarn.Client: Application report for application_1510653707211_0009 (state: ACCEPTED)
    17/11/20 10:43:16 INFO yarn.Client: Application report for application_1510653707211_0009 (state: ACCEPTED)
    17/11/20 10:43:17 INFO yarn.Client: Application report for application_1510653707211_0009 (state: ACCEPTED)
    17/11/20 10:43:18 INFO yarn.Client: Application report for application_1510653707211_0009 (state: ACCEPTED)
    17/11/20 10:43:18 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(null)
    17/11/20 10:43:18 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> hadoop0, PROXY_URI_BASES -> http://hadoop0:8088/proxy/application_1510653707211_0009), /proxy/application_1510653707211_0009
    17/11/20 10:43:18 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
    17/11/20 10:43:19 INFO yarn.Client: Application report for application_1510653707211_0009 (state: RUNNING)
    17/11/20 10:43:19 INFO yarn.Client: 
    	 client token: N/A
    	 diagnostics: N/A
    	 ApplicationMaster host: 192.168.210.70
    	 ApplicationMaster RPC port: 0
    	 queue: default
    	 start time: 1511145788138
    	 final status: UNDEFINED
    	 tracking URL: http://hadoop0:8088/proxy/application_1510653707211_0009/
    	 user: root
    17/11/20 10:43:19 INFO cluster.YarnClientSchedulerBackend: Application application_1510653707211_0009 has started running.
    17/11/20 10:43:19 INFO util.Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 44148.
    17/11/20 10:43:19 INFO netty.NettyBlockTransferService: Server created on 44148
    17/11/20 10:43:19 INFO storage.BlockManagerMaster: Trying to register BlockManager
    17/11/20 10:43:19 INFO storage.BlockManagerMasterEndpoint: Registering block manager 192.168.210.70:44148 with 511.1 MB RAM, BlockManagerId(driver, 192.168.210.70, 44148)
    17/11/20 10:43:19 INFO storage.BlockManagerMaster: Registered BlockManager
    17/11/20 10:43:22 INFO cluster.YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (hadoop1:45801) with ID 2
    17/11/20 10:43:22 INFO cluster.YarnClientSchedulerBackend: Registered executor NettyRpcEndpointRef(null) (hadoop2:32964) with ID 1
    17/11/20 10:43:22 INFO storage.BlockManagerMasterEndpoint: Registering block manager hadoop1:52352 with 2.7 GB RAM, BlockManagerId(2, hadoop1, 52352)
    17/11/20 10:43:22 INFO storage.BlockManagerMasterEndpoint: Registering block manager hadoop2:45228 with 2.7 GB RAM, BlockManagerId(1, hadoop2, 45228)
    17/11/20 10:43:22 INFO cluster.YarnClientSchedulerBackend: SchedulerBackend is ready for scheduling beginning after reached minRegisteredResourcesRatio: 0.8
    17/11/20 10:43:23 INFO spark.SparkContext: Starting job: reduce at SparkPi.scala:36
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Got job 0 (reduce at SparkPi.scala:36) with 2 output partitions
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Final stage: ResultStage 0 (reduce at SparkPi.scala:36)
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Parents of final stage: List()
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Missing parents: List()
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Submitting ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:32), which has no missing parents
    17/11/20 10:43:23 INFO storage.MemoryStore: Block broadcast_0 stored as values in memory (estimated size 1904.0 B, free 1904.0 B)
    17/11/20 10:43:23 INFO storage.MemoryStore: Block broadcast_0_piece0 stored as bytes in memory (estimated size 1218.0 B, free 3.0 KB)
    17/11/20 10:43:23 INFO storage.BlockManagerInfo: Added broadcast_0_piece0 in memory on 192.168.210.70:44148 (size: 1218.0 B, free: 511.1 MB)
    17/11/20 10:43:23 INFO spark.SparkContext: Created broadcast 0 from broadcast at DAGScheduler.scala:1006
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Submitting 2 missing tasks from ResultStage 0 (MapPartitionsRDD[1] at map at SparkPi.scala:32)
    17/11/20 10:43:23 INFO cluster.YarnScheduler: Adding task set 0.0 with 2 tasks
    17/11/20 10:43:23 INFO scheduler.TaskSetManager: Starting task 0.0 in stage 0.0 (TID 0, hadoop2, partition 0,PROCESS_LOCAL, 2157 bytes)
    17/11/20 10:43:23 INFO scheduler.TaskSetManager: Starting task 1.0 in stage 0.0 (TID 1, hadoop1, partition 1,PROCESS_LOCAL, 2157 bytes)
    17/11/20 10:43:23 INFO cluster.YarnClientSchedulerBackend: Disabling executor 1.
    17/11/20 10:43:23 INFO scheduler.DAGScheduler: Executor lost: 1 (epoch 0)
    17/11/20 10:43:23 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 1 from BlockManagerMaster.
    17/11/20 10:43:23 ERROR client.TransportClient: Failed to send RPC 6494801080030835916 to hadoop0/192.168.210.70:55463: java.nio.channels.ClosedChannelException
    java.nio.channels.ClosedChannelException
    17/11/20 10:43:23 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(1, hadoop2, 45228)
    17/11/20 10:43:23 INFO storage.BlockManagerMaster: Removed 1 successfully in removeExecutor
    17/11/20 10:43:23 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to get executor loss reason for executor id 1 at RPC address hadoop2:32964, but got no response. Marking as slave lost.
    java.io.IOException: Failed to send RPC 6494801080030835916 to hadoop0/192.168.210.70:55463: java.nio.channels.ClosedChannelException
    	at org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
    	at org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
    	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
    	at io.netty.util.concurrent.DefaultPromise$LateListeners.run(DefaultPromise.java:845)
    	at io.netty.util.concurrent.DefaultPromise$LateListenerNotifier.run(DefaultPromise.java:873)
    	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: java.nio.channels.ClosedChannelException
    17/11/20 10:43:23 ERROR cluster.YarnScheduler: Lost executor 1 on hadoop2: Slave lost
    17/11/20 10:43:23 WARN scheduler.TaskSetManager: Lost task 0.0 in stage 0.0 (TID 0, hadoop2): ExecutorLostFailure (executor 1 exited caused by one of the running tasks) Reason: Slave lost
    17/11/20 10:43:24 INFO cluster.YarnClientSchedulerBackend: Disabling executor 2.
    17/11/20 10:43:24 INFO scheduler.DAGScheduler: Executor lost: 2 (epoch 1)
    17/11/20 10:43:24 INFO storage.BlockManagerMasterEndpoint: Trying to remove executor 2 from BlockManagerMaster.
    17/11/20 10:43:24 INFO storage.BlockManagerMasterEndpoint: Removing block manager BlockManagerId(2, hadoop1, 52352)
    17/11/20 10:43:24 INFO storage.BlockManagerMaster: Removed 2 successfully in removeExecutor
    17/11/20 10:43:24 ERROR client.TransportClient: Failed to send RPC 6980255577157578925 to hadoop0/192.168.210.70:55463: java.nio.channels.ClosedChannelException
    java.nio.channels.ClosedChannelException
    17/11/20 10:43:24 WARN cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: Attempted to get executor loss reason for executor id 2 at RPC address hadoop1:45801, but got no response. Marking as slave lost.
    java.io.IOException: Failed to send RPC 6980255577157578925 to hadoop0/192.168.210.70:55463: java.nio.channels.ClosedChannelException
    	at org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:239)
    	at org.apache.spark.network.client.TransportClient$3.operationComplete(TransportClient.java:226)
    	at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:680)
    	at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:567)
    	at io.netty.util.concurrent.DefaultPromise.tryFailure(DefaultPromise.java:424)
    	at io.netty.channel.AbstractChannel$AbstractUnsafe.safeSetFailure(AbstractChannel.java:801)
    	at io.netty.channel.AbstractChannel$AbstractUnsafe.write(AbstractChannel.java:699)
    	at io.netty.channel.DefaultChannelPipeline$HeadContext.write(DefaultChannelPipeline.java:1122)
    	at io.netty.channel.AbstractChannelHandlerContext.invokeWrite(AbstractChannelHandlerContext.java:633)
    	at io.netty.channel.AbstractChannelHandlerContext.access$1900(AbstractChannelHandlerContext.java:32)
    	at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.write(AbstractChannelHandlerContext.java:908)
    	at io.netty.channel.AbstractChannelHandlerContext$WriteAndFlushTask.write(AbstractChannelHandlerContext.java:960)
    	at io.netty.channel.AbstractChannelHandlerContext$AbstractWriteTask.run(AbstractChannelHandlerContext.java:893)
    	at io.netty.util.concurrent.SingleThreadEventExecutor.runAllTasks(SingleThreadEventExecutor.java:357)
    	at io.netty.channel.nio.NioEventLoop.run(NioEventLoop.java:357)
    	at io.netty.util.concurrent.SingleThreadEventExecutor$2.run(SingleThreadEventExecutor.java:111)
    	at java.lang.Thread.run(Thread.java:748)
    Caused by: java.nio.channels.ClosedChannelException
    17/11/20 10:43:24 ERROR cluster.YarnScheduler: Lost executor 2 on hadoop1: Slave lost
    17/11/20 10:43:24 WARN scheduler.TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, hadoop1): ExecutorLostFailure (executor 2 exited caused by one of the running tasks) Reason: Slave lost
    17/11/20 10:43:25 INFO cluster.YarnSchedulerBackend$YarnSchedulerEndpoint: ApplicationMaster registered as NettyRpcEndpointRef(null)
    17/11/20 10:43:25 INFO cluster.YarnClientSchedulerBackend: Add WebUI Filter. org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter, Map(PROXY_HOSTS -> hadoop0, PROXY_URI_BASES -> http://hadoop0:8088/proxy/application_1510653707211_0009), /proxy/application_1510653707211_0009
    17/11/20 10:43:25 INFO ui.JettyUtils: Adding filter: org.apache.hadoop.yarn.server.webproxy.amfilter.AmIpFilter
    17/11/20 10:43:29 ERROR cluster.YarnClientSchedulerBackend: Yarn application has already exited with state FINISHED!
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/metrics/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/kill,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/api,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/static,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/threadDump,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/executors,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/environment,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/rdd,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/storage,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/pool,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/stage,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/stages,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/job,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs/json,null}
    17/11/20 10:43:29 INFO handler.ContextHandler: stopped o.s.j.s.ServletContextHandler{/jobs,null}
    17/11/20 10:43:29 INFO ui.SparkUI: Stopped Spark web UI at http://192.168.210.70:4040
    17/11/20 10:43:29 INFO scheduler.DAGScheduler: ResultStage 0 (reduce at SparkPi.scala:36) failed in 5.680 s
    17/11/20 10:43:29 INFO scheduler.DAGScheduler: Job 0 failed: reduce at SparkPi.scala:36, took 5.884625 s
    17/11/20 10:43:29 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerStageCompleted(org.apache.spark.scheduler.StageInfo@29bb1d25)
    Exception in thread "main" org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down
    	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:806)
    	at org.apache.spark.scheduler.DAGScheduler$$anonfun$cleanUpAfterSchedulerStop$1.apply(DAGScheduler.scala:804)
    	at scala.collection.mutable.HashSet.foreach(HashSet.scala:79)
    	at org.apache.spark.scheduler.DAGScheduler.cleanUpAfterSchedulerStop(DAGScheduler.scala:804)
    	at org.apache.spark.scheduler.DAGSchedulerEventProcessLoop.onStop(DAGScheduler.scala:1658)
    	at org.apache.spark.util.EventLoop.stop(EventLoop.scala:84)
    	at org.apache.spark.scheduler.DAGScheduler.stop(DAGScheduler.scala:1581)
    	at org.apache.spark.SparkContext$$anonfun$stop$9.apply$mcV$sp(SparkContext.scala:1740)
    	at org.apache.spark.util.Utils$.tryLogNonFatalError(Utils.scala:1229)
    	at org.apache.spark.SparkContext.stop(SparkContext.scala:1739)
    	at org.apache.spark.scheduler.cluster.YarnClientSchedulerBackend$MonitorThread.run(YarnClientSchedulerBackend.scala:147)
    	at org.apache.spark.scheduler.DAGScheduler.runJob(DAGScheduler.scala:620)
    	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1832)
    	at org.apache.spark.SparkContext.runJob(SparkContext.scala:1952)
    	at org.apache.spark.rdd.RDD$$anonfun$reduce$1.apply(RDD.scala:1025)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:150)
    	at org.apache.spark.rdd.RDDOperationScope$.withScope(RDDOperationScope.scala:111)
    	at org.apache.spark.rdd.RDD.withScope(RDD.scala:316)
    	at org.apache.spark.rdd.RDD.reduce(RDD.scala:1007)
    	at org.apache.spark.examples.SparkPi$.main(SparkPi.scala:36)
    	at org.apache.spark.examples.SparkPi.main(SparkPi.scala)
    	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
    	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
    	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
    	at java.lang.reflect.Method.invoke(Method.java:498)
    	at org.apache.spark.deploy.SparkSubmit$.org$apache$spark$deploy$SparkSubmit$$runMain(SparkSubmit.scala:731)
    	at org.apache.spark.deploy.SparkSubmit$.doRunMain$1(SparkSubmit.scala:181)
    	at org.apache.spark.deploy.SparkSubmit$.submit(SparkSubmit.scala:206)
    	at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:121)
    	at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala)
    17/11/20 10:43:29 ERROR scheduler.LiveListenerBus: SparkListenerBus has already stopped! Dropping event SparkListenerJobEnd(0,1511145809112,JobFailed(org.apache.spark.SparkException: Job 0 cancelled because SparkContext was shut down))
    17/11/20 10:43:29 INFO cluster.YarnClientSchedulerBackend: Shutting down all executors
    17/11/20 10:43:29 INFO cluster.YarnClientSchedulerBackend: Asking each executor to shut down
    17/11/20 10:43:29 INFO storage.DiskBlockManager: Shutdown hook called
    17/11/20 10:43:29 INFO cluster.YarnClientSchedulerBackend: Stopped
    17/11/20 10:43:29 INFO util.ShutdownHookManager: Shutdown hook called
    17/11/20 10:43:29 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-1a7d680b-c815-4c0d-b706-9751f5f1b57a/httpd-e8d7be01-495d-403b-a7a4-1332d9ae2411
    17/11/20 10:43:29 INFO spark.MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
    17/11/20 10:43:29 INFO storage.MemoryStore: MemoryStore cleared
    17/11/20 10:43:29 INFO storage.BlockManager: BlockManager stopped
    17/11/20 10:43:29 INFO storage.BlockManagerMaster: BlockManagerMaster stopped
    17/11/20 10:43:29 INFO scheduler.OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
    17/11/20 10:43:29 INFO spark.SparkContext: Successfully stopped SparkContext
    17/11/20 10:43:29 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-1a7d680b-c815-4c0d-b706-9751f5f1b57a/userFiles-e796cf1a-3942-44d1-a8cc-68295e623b03
    17/11/20 10:43:29 INFO util.ShutdownHookManager: Deleting directory /tmp/spark-1a7d680b-c815-4c0d-b706-9751f5f1b57a
    
  • Writing Spark output directly into a Hive table

    1,000+ reads, 2017-09-15 14:35:47
    import org.apache.spark.rdd.RDD import org.apache.spark.sql.hive.HiveContext import org.apache.spark.sql.{DataFrame, SQLContext, SaveMode} import org.apache.spark.{SparkConf, SparkContext} object Main ...
  • spark: Spark from getting started to mastery

    10,000+ reads, 2019-09-29 09:29:12
    The Way of Spark (advanced series), Spark from getting started to mastery: Part 1, building a Spark 1.5.0 cluster [click to open]; Part 2, an overview of the Hadoop and Spark ecosystems [click to open]; The Way of Spark (advanced...
  • 1 Spark machine learning: getting started with Spark MLlib

    10,000+ reads, 2018-09-17 10:59:14
    Starting to learn Spark ML: Spark is the big-data workhorse that followed Hadoop, and many people use its distributed parallelism to process large datasets. Spark also provides a machine-learning package, MLlib, which contains most of the commonly used algorithms: classification, regression, clustering, and so on, ...
  • A hands-on Spark guide: getting started with Spark SQL

    1,000+ reads, 2016-09-19 15:23:40
    When Spark SQL processes a SQL statement it first parses it into a tree; later steps such as binding and optimization all operate on that tree by applying rules, using pattern matching to handle different node types with different operations. ...
  • Spark primer: Spark and Spark SQL

    1,000+ reads, 2016-02-17 20:40:22
    Spark primer, part 1, overview: Spark is a general-purpose, fast big-data processing engine, a parallel engine similar to Hadoop MapReduce. Its data sources can be HDFS, Cassandra, HBase, and others, and besides the regular programming model it also supports SQL. Spark...
  • Getting started with Spark: what is Spark

    1,000+ reads, 2016-07-16 21:13:40
    Understanding Spark: Spark is implemented in Scala, an object-oriented, functional language that makes operating on distributed datasets as easy as operating on local collections. The Spark website describes it as fast, easy to use, general-purpose, and able to run anywhere. ...
  • Spark SQL basics

    1,000+ reads, 2018-06-27 18:05:34
    Introduction to Spark SQL, starting from Shark: Shark was Hive on Spark; to stay compatible with Hive it reused Hive's HiveQL parsing, logical execution plan, and plan-optimization logic, and can roughly be seen as swapping the physical execution plan from MapReduce jobs...
  • A detailed introduction to Spark

    10,000+ reads, 2018-08-16 15:05:04
    Spark overview: 1.1 What is Spark; 2. Spark features; 3. Spark users and use cases. Part 2, installing a Spark cluster: 1. cluster roles; 2. machine preparation; 3. downloading the Spark package; 4. configuring Spark Standalone; 5. configuring the Job History Server (Standalone); 6. ...
  • Hands-on Spark series, part 4: the Spark runtime architecture

    10,000+ reads, 2016-01-25 14:20:18
    1. Spark runtime architecture; 1.1 terminology. Application: similar to the concept in Hadoop MapReduce, it refers to a user-written Spark program that contains driver code plus Executors running on multiple nodes of the cluster...
  • Getting started with Spark: the Spark driver web UI

    1,000+ reads, 2018-01-24 11:26:09
    *Note: this article is my translation of parts of the book Apache Spark 2.x for Java developers, combined with material found online, for personal study only; reposting is allowed with attribution. 1. Overview: this section presents some important aspects of the Spark driver's UI. We will...
  • Spark quick start guide (Quick Start Spark)

    1,000+ reads, 2014-06-20 10:23:22
    Author: 过往记忆 | Sina Weibo: 左手牵右手TEL | Reposting is allowed, but the original source, author information, and copyright notice must be given as a hyperlink. Blog: http://www.iteblog.com/ Title: "Spark quick start guide (Quick Start Spark)". This article...
  • An introductory Spark video course, mainly covering RDD dependencies, the RDD lifecycle, and RDD transformation operations, with detailed explanations of how to use the transformation functions.
  • What is Spark Streaming: Spark Streaming is similar to Apache Storm and is used for processing streaming data. According to its official documentation it offers high throughput and strong fault tolerance, and it supports many input sources, for example Kafka, ...
  • 1. Introduction; 1.1 About Spark: ...Spark entered Apache as an incubator project in June 2013 and became a top-level Apache project eight months later, a remarkably fast rise. With its forward-looking design it quickly became one of the community's hottest projects, and around it appeared Spark SQL, S
  • SparkLearning: notes from getting started with Spark

    1,000+ reads, 2016-05-17 22:34:00
    To learn Spark better, and to record the problems met along the way for later reference, I wrote these companion posts and published the code and data; almost all of the code can be run locally. Table of contents: the SparkLearning blog: ...
  • Getting started with Spark: commonly used Spark monitoring tabs

    1,000+ reads, 2016-11-16 22:57:24
    Commonly used Spark monitoring tabs: while running Spark jobs recently I kept reconfiguring the cluster, checking the effect of each change, and monitoring job progress, which meant frequently checking several tabs of the web UI. Each tab serves a different purpose and shows Spark's performance parameters and runtime...
  • 1. Building Spark: Spark can be built with either SBT or Maven, after which a deployment package is produced with the make-distribution.sh script. The SBT build requires git and the Maven build requires Maven; both need network access. Comparing the two, the SBT build speed...
  • Getting started with Spark - 3: testing the Spark cluster

    1,000+ reads, 2016-07-26 16:36:59
    This is the third and final part of my Spark getting-started series. "Getting started" really just means notes taken while working through two Spark books together with online material and implementing everything by hand. The notes proved very handy when I rebuilt Hadoop 2.6.4 and looked back at the first two parts...
  • Spark: the Spark programming model and a quick start

    1,000+ reads, 2016-11-25 10:25:08
    http://blog.csdn.net/pipisorry/article/details/52366356 The Spark programming model: the SparkContext and SparkConf classes can be used as follows ... val sc = new SparkContext("local[4]", "Test Spark App"): this code creates a...
