spark 订阅
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。 展开全文
Apache Spark 是专为大规模数据处理而设计的快速通用的计算引擎。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是——Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。Spark 是一种与 Hadoop 相似的开源集群计算环境,但是两者之间还存在一些不同之处,这些有用的不同之处使 Spark 在某些工作负载方面表现得更加优越,换句话说,Spark 启用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。Spark 是在 Scala 语言中实现的,它将 Scala 用作其应用程序框架。与 Hadoop 不同,Spark 和 Scala 能够紧密集成,其中的 Scala 可以像操作本地集合对象一样轻松地操作分布式数据集。尽管创建 Spark 是为了支持分布式数据集上的迭代作业,但是实际上它是对 Hadoop 的补充,可以在 Hadoop 文件系统中并行运行。通过名为 Mesos 的第三方集群框架可以支持此行为。Spark 由加州大学伯克利分校 AMP 实验室 (Algorithms, Machines, and People Lab) 开发,可用来构建大型的、低延迟的数据分析应用程序。
信息
基    于
MapReduce算法实现的分布式计算
最新版本
2.4.0
外文名
Spark
SPARK基本介绍
Apache Spark是专为大规模数据处理而设计的快速通用的计算引擎 [1]  。现在形成一个高速发展应用广泛的生态系统。
收起全文
精华内容
下载资源
问答
  • Spark

    千次阅读 2019-05-30 23:09:22
    Spark core为核心,提供了Spark SQL、Spark Streaming、MLlib几大功能组件 中文文档:https://spark.apachecn.org/#/ github地址:https://github.com/apache/spark Spark Core Spark提供了多种资源调度框架,基于...

    简介

    Spark是使用Scala语言编写、基于内存运算的大数据计算框架。

    以Spark core为核心,提供了Spark SQL、Spark Streaming、MLlib几大功能组件

    中文文档:https://spark.apachecn.org/#/

    github地址:https://github.com/apache/spark

    Spark Core

    Spark提供了多种资源调度框架,基于内存计算、提供了DAG的执行流程管理以及RDD的血缘关系来保证计算的快速和高容错性。RDD是Spark的核心概念

    Spark SQL

    SparkSQL基于Spark Core来优化sql查询,将sql的查询转为对应的RDD(DateFrame),并进行优化,简化了开发,提高了数据清洗的效率

    Spark Streaming

    SparkStreaming是基于SparkCore实现的流处理框架,通过微批的概念实现了流处理(DStream),可以将数据的延迟保证为最少500ms,是一个高吞吐高容错的流式处理框架。


    从今天开始写一些关于Spark的文章,让自己每天都有所收获。

    不只要作为使用者,更要去了解它。

    展开全文
  • Spark学习笔记:Spark基础

    千次阅读 多人点赞 2018-09-03 23:39:57
    Spark基础以及WordCount实现

    目录

     

    Spark基础

    1.Spark基础入门

    (1)什么是Spark

    (2)Spark生态圈

    (3)Spark的特点与MapReduce对比

    2.Spark体系结构与安装部署

    (1)Spark体系结构

    (2)Spark的安装与部署

    (3)Spark HA的实现

    3.执行Spark Demo

    (1)Spark-submit

    (2)Spark-shell

    (3)Spark实现WordCount

    (4)Spark WordCount的Java版本

    (5)Spark WordCount的Scala版本

    4.Spark运行机制及原理分析


    Spark基础

    1.Spark基础入门

    (1)什么是Spark

    Spark是用于大规模数据处理的统一分析引擎

    (2)Spark生态圈

               Spark Core:内核
               Spark SQL:用于处理结构化数据的组件,类似Hive
               Spark Streaming:用于处理流式数据的组件,类似Storm
               Spark MLLib:机器学习
               Spark Graphx: 图计算

    (3)Spark的特点与MapReduce对比

    Spark的特点
    1.基于内存,所以速度快,但同时也是缺点,因为Spark没有对内存进行管理,容易OOM(out of memory内存溢出),可以用Java Heap Dump对内存溢出问题进行分析
    2.可以使用Scala、Java、Python、R等语言进行开发
    3.兼容Hadoop

    Spark与MapReuce对比
    1.MapReduce最大的缺点,Shuffle过程中会有很多I/O开销,可以看到这里有6个地方会产生IO,而Spark只会在1和6的地方产生I/O,其他的过程都在内存中进行

    2.Spark是MapReduce的替代方案,兼容Hive、HDFS、融入到Hadoop

    2.Spark体系结构与安装部署

    (1)Spark体系结构

    1.主从架构:存在单点故障的问题,因此需要实现HA
    2.Spark体系结构图

    Driver Program可以理解为是客户端,而右边的可以理解为服务器端。 Cluster Manager是主节点,主节点并不负责真正任务的执行,任务的执行由Worker Node完成。
    这是一张更详细的架构图

    如果要搭建全分布模式,至少需要两个worker
    要实现HA的话,则必须借助ZooKeeper

    (2)Spark的安装与部署

    Spark伪分布模式的部署

    解压 tar -zxvf spark-2.2.0-bin-hadoop2.6.tgz -C ~/training/
    注意:由于Hadoop和Spark的脚本有冲突,设置环境变量的时候,只能设置一个
    核心配置文件:  conf/spark-env.sh
             export JAVA_HOME=/root/training/jdk1.8.0_144
             export SPARK_MASTER_HOST=bigdata111
             export SPARK_MASTER_PORT=7077      
    从节点的地址:slaves文件中填入主机名即可,注意hosts文件里要有对ip的解析
    启动Spark集群  sbin/start-all.sh,这里我个人是给这个文件做了一个软链接start-spark.sh,因为hadoop下的启动脚本也是start-all.sh,会有冲突
    Web界面:主机名:8080

    Spark全分布模式的部署

    全分布式的部署与伪分布式类似,在每个节点上都解压压缩包,修改conf/spark-env.sh
    在主节点上的slaves文件中填入从节点的主机名
    然后在每个节点上启动集群即可

    (3)Spark HA的实现

    1.基于文件系统的单点恢复

    适用于开发和测试环境                                                                                                                                                      
    恢复目录:保存集群的运行信息                                                                                                                                               
    在spark-env.sh中 增加                                                                                                                                          

    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=FILESYSTEM -Dspark.deploy.recoveryDirectory=/root/training/spark-2.2.0-bin-hadoop2.6/recovery"

    如果有运行的任务,任务信息就会被写入到恢复目录下
    当节点宕掉重启之后, Spark就可以从恢复目录中的文件获取之前的状态并进行恢复

    2.基于zookeeper实现Standby Master

    zookeeper的功能:数据同步、选举的功能、分布式锁(秒杀)
    启动zookeeper,运行zkServer.sh,然后会选举出zookeeper集群的leader和follower,节点状态可以通过zkServer.sh status查看

    zookeeper数据同步功能
    启动zookeeper后,在随意一个节点的zkCli.sh(即zk shell)中输入create /node001 helloworld
    在其他节点的shell中get /node001都可以看得见这个虚数据helloworld

    zookeeper选举功能
    每个zookeeper集群都会有一个leader,其他都是follower,当leader节点宕机了,其他的follower会选举出leader

    zookeeper实现Standby Master
    在spark-env.sh中增加

    export SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=bigdata112:2181,bigdata113:2181,bigdata114:2181 -Dspark.deploy.zookeeper.dir=/mysparkHA"

    其中Dspark.deploy.zookeeper.url参数是zookeeper集群每个节点的地址,之前有提到zookeeper需要有三个节点以上

    注释下面的两行
    #export SPARK_MASTER_HOST=bigdata112
    #export SPARK_MASTER_PORT=7077
    配置好后在两台机器上启动spark-master,在web界面上就会发现一个是ALIVE,一个是StandBy

    3.执行Spark Demo

    (1)Spark-submit

    Spark-submit可以提交任务到Spark集群执行,也可以提交到hadoop的yarn集群执行

    这里运行了一个蒙特卡罗求圆周率的Demo,运行1000次

    spark-submit --master spark://centos:7077 --class org.apache.spark.examples.SparkPi /opt/software/spark-2.2.0-bin-hadoop2.6/examples/jars/spark-examples_2.11-2.2.0.jar 1000

    --master后面跟spark的地址,然后用--class指定类和jar包,以及运行次数

    (2)Spark-shell

    Spark-shell是Spark自带的交互式程序,方便用户进行交互式变成,用户可以在该命令行下用scala编写spark程序。
    Spark-shell有两种运行模式:本地模式和集群模式
    本地模式:不连接到集群,在本地直接执行Spark任务(local模式)
    直接运行spark-shell

    集群模式:连接到集群,在集群执行任务
    集群模式下的shell将作为一个独立的Application链接到Master上

    运行spark-shell --master spark://centos:7077

    Spark的Web上可以看见

    (3)Spark实现WordCount

    Spark可以集成到HDFS,读取HDFS里的文件

    先做一个测试文件data.txt,上传到HDFS上

    执行WordCount

    进行单步分析

    可以看到一个String类型的RDD,用来存储文本信息,但这个时候并不会真正的执行

    执行rdd1.collect之后,才会真正的执行,获取文本文件里的字符串,放进RDD里

    flatmap_是表示rdd1里的每个元素,然后使用split方法,间隔符是空格,同样的,要执行collect才算真正执行

    map((_,1))是把元素里的每个元素都映射成了(word,1)的kv对,这个语法糖等价于下面这条语句

    reduceByKey方法是使用一个相关的函数来合并每个key的value的值的一个算子,前面的下划线可以理解为sum,用来迭代计算和,后面的下划线是每个kv对的value

    总结:RDD就是一个集合,存在依赖关系,RDD有些方法不会触发计算,有些会触发计算

    (4)Spark WordCount的Java版本

    新建Java Project,名为ZSparkDemo,然后在project下新建folder,名为lib,然后把/opt/software/spark-2.2.0-bin-hadoop2.6/jars下的jar包复制到lib文件夹里

    代码与注释如下

    package demo;
    
    import java.util.Arrays;
    import java.util.Iterator;
    import java.util.List;
    
    import org.apache.spark.SparkConf;
    import org.apache.spark.api.java.JavaPairRDD;
    import org.apache.spark.api.java.JavaRDD;
    import org.apache.spark.api.java.JavaSparkContext;
    import org.apache.spark.api.java.function.FlatMapFunction;
    import org.apache.spark.api.java.function.Function2;
    import org.apache.spark.api.java.function.PairFunction;
    
    import scala.Tuple2;
    
    /*
     * 打包成jar包上传到集群环境后,使用spark-submit提交任务
     * spark-submit --master spark://centos:7077 --class demo.JavaWordCount /WordCount.jar hdfs://10.1.130.233:9000/input/data.txt
     */
    public class JavaWordCount {
    
    	public static void main(String[] args) {
    
    		// 配置参数,setAppName方法指定app的名字
    		// setMaster方法用以设定Master的URL,设为local就会在本地以单线程运行
    		// local[4]就会在本地以4核运行,设为"spark://master:7077就会在独立集群上运行,或者不写就会默认在集群运行
    		//SparkConf conf = new SparkConf().setAppName("JavaWordCount").setMaster("local");
    		SparkConf conf = new SparkConf().setAppName("JavaWordCount");
    
    		// 创建一个SparkContext对象
    		JavaSparkContext sc = new JavaSparkContext(conf);
    
    		// 指定路径,读入数据,路径可以是本地路径,也可以是HDFS上的
    		// 这个方法返回的是一个Java的RDD,类型是String
    		//JavaRDD<String> datas = sc.textFile("hdfs://10.1.130.233:9000/input/data.txt");
    		//这里可以不把路径写死,而是将args传入的第一个参数作为路径
    		JavaRDD<String> datas = sc.textFile(args[0]);
    
    		// 分词
    		// 这里需要实现FlatMapFunction接口,表示要对每个传入的文本所要执行的操作FlatMapFunction<String, U>
    		// 把U改成String,第一个String代表输入的文本,第二个String表示分词后的每个单词
    		JavaRDD<String> words = datas.flatMap(new FlatMapFunction<String, String>() {
    
    			@Override
    			// line表示每一行传入的数据
    			public Iterator<String> call(String line) throws Exception {
    				// 因为split完之后,返回的是一个String类型的数组,所以要用Arrays的asList方法转换成是一个List,然后才能用iterator
    				return Arrays.asList(line.split(" ")).iterator();
    			}
    
    		});
    
    		// 每个单词记一次数map((单词,1)
    		// 这里需要实现PairFunction接口,PairFunction<String, K2, V2>
    		// String代表传入的参数,K2,V2相当于MapReduce里Map的输出(Beijing,1),所以Key是String类型,V是Integer类型
    		JavaPairRDD<String, Integer> wordOne = words.mapToPair(new PairFunction<String, String, Integer>() {
    
    			@Override
    			public Tuple2<String, Integer> call(String word) throws Exception {
    				// Beijing --->(Beijing,1)
    				return new Tuple2<String, Integer>(word, 1);
    			}
    
    		});
    
    		// 执行Reduce的操作,把相同单词的value做求和
    		// Function2<Integer, Integer,
    		// Integer>,前面两个Integer表示:两个key相同的value,最后一个Integer表示运算的结果
    		JavaPairRDD<String, Integer> count = wordOne.reduceByKey(new Function2<Integer, Integer, Integer>() {
    
    			@Override
    			public Integer call(Integer a, Integer b) throws Exception {
    				// TODO Auto-generated method stub
    				return a + b;
    			}
    		});
    
    		// 触发计算
    		List<Tuple2<String, Integer>> result = count.collect();
    
    		// 输出到Console
    		for (Tuple2<String, Integer> r : result) {
    			System.out.println(r._1 + ":" + r._2);
    		}
    
    		// 停止SparkContext对象
    		sc.stop();
    	}
    
    }
    

    打包成jar包后上传到集群环境, 通过spark-submit提交到集群运行

    spark-submit --master spark://centos:7077 --class demo.JavaWordCount /WordCount.jar hdfs://10.1.130.233:9000/input/data.txt

    在集群上运行结果如下 


    在Spark WebUI

    (5)Spark WordCount的Scala版本

    package demo
    
    import org.apache.spark.SparkContext
    import org.apache.spark.SparkConf
    
    object WordCount {
      def main(args: Array[String]): Unit = {
        
        //获取Spark配置,setAppName方法用来设置app的名字,setMaster设为local则为在本地运行不提交到集群
        //val conf = new SparkConf().setAppName("WordCount").setMaster("local")
        val conf=new SparkConf().setAppName("WordCount")
        
        //获取SparkContext
        val sc = new SparkContext(conf)
    
        //textFile指定路径,然后做分词,转换成kv对,再reduceByKey做统计处理
        //val count=sc.textFile("hdfs://centos:9000/input/data.txt").flatMap(_.split(" ")).map((_, 1)).reduceByKey(_ + _)
        val count=sc.textFile(args(0)).flatMap(_.split(" ")).map((_,1)).reduceByKey(_+_)
        
        //将结果保存到HDFS目录下,repartition方法设定只返回一个RDD,saveAsTextFile设定结果保存的地址
        count.repartition(1).saveAsTextFile(args(1))
        
        //触发计算
        val result=count.collect()
        
        //输出结果
        result.foreach(println)
        
        //停止SparkContext对象
        sc.stop()
      }
    }

     执行语句

    spark-submit --master spark://centos:7077 --class demo.WordCount /WordCount.jar hdfs://centos:9000/input/data.txt hdfs://centos:9000/output/result

    运行结果
     在HDFS里也可以看到生成的文件

    WordCount流程分析图

    4.Spark运行机制及原理分析

    展开全文
  • spark-core_2.11-1.5.2.logging.jar

    热门讨论 2016-11-29 22:51:44
    spark-core_2.11-2.0.0.jar比spark-core_2.11-1.5.2.jar少了org.apache.spark.Logging.class,故此把缺少的class放到spark-core_2.11-1.5.2.logging.jar里面
  • Spark Launcher Java API提交Spark算法

    千次阅读 2020-06-07 12:36:06
    在介绍之前,我先附上spark 官方文档地址: http://spark.apache.org/docs/latest/api/java/org/apache/spark/launcher/package-summary.html 源码github地址: ...

    在介绍之前,我先附上spark 官方文档地址:

    http://spark.apache.org/docs/latest/api/java/org/apache/spark/launcher/package-summary.html

    个人源码github地址:

    https://github.com/yyijun/framework/tree/master/framework-spark

    1.主要提交参数说明

     spark-submit \ 
        --master yarn \ 
        --deploy-mode cluster \ 
        --driver-memory 4g \
        --driver-cores 4 \
        --num-executors 20 \
        --executor-cores 4 \
        --executor-memory  10g \
        --class com.yyj.train.spark.launcher.TestSparkLauncher \ 
        --conf spark.yarn.jars=hdfs://hadoop01.xxx.xxx.com:8020/trainsparklauncher/jars/*.jar \ 
        --jars $(ls lib/*.jar| tr '\n' ',') \ 
        lib/ train-spark-1.0.0.jar

    --conf spark.yarn.jars:提交算法到yarn集群时算法依赖spark安装包lib目录下的jar包,如果不指定,则每次启动任务都会先上传相关依赖包,耗时严重;

    --jars:算法依赖的相关包,spark standalone模式、yarn模式都有用,多个依赖包用逗号”,”分隔;

    2.Idea提交算法到yarn集群

    2.1.入口参数配置

        val spark = SparkSession
          .builder
          .appName("TestSparkLauncher")
          .master("yarn")
          .config("deploy.mode", "cluster")
          .config("spark.yarn.jars", "hdfs://hadoop01.xxx.xxx.com:8020/trainsparklauncher/jars/*.jar")
          .config("spark.sql.warehouse.dir", "/user/hive/warehouse")
          .enableHiveSupport()
          .getOrCreate()

    2.2.pom.xml配置

    <dependency>
          <groupId>org.apache.spark</groupId>
          <artifactId>spark-yarn_2.11</artifactId>
          <version>2.1.0</version>
    </dependency>

    3.提交准备

    1、从大数据平台下载hadoop相关的xml配置文件:
        core-site.xml:必须;
        hdfs-site.xml:必须;
        hive-site.xml:提交的算法里面用到spark on hive时需要此文件;
        yarn-site.xml:提交算法到yarn时必须要此文件;
    
    2、准备自己的算法包,这里对应替换为自己的算法包:
        train-spark-1.0.0.jar和train-common-1.0.0.jar
    
    3、上传spark安装目录下jars目录下相关的jar包到hdfs:hadoop fs –put –f /opt/cloudera/parcels/SPARK2/lib/spark2/jars /hdfs目录

    测试提交算法

    package com.yyj.framework.spark.launcher;
    
    import java.io.File;
    import java.util.HashMap;
    import java.util.Map;
    
    /**
     * Created by yangyijun on 2019/5/20.
     * 提交spark算法入口类
     */
    public class SparkLauncherMain {
    
        public static void main(String[] args) {
            System.out.println("starting...");
            String confPath = "/Users/yyj/workspace/alg/src/main/resources";
            System.out.println("confPath=" + confPath);
    
            //开始构建提交spark时依赖的jars
            String rootPath = "/Users/yyj/workspace/alg/lib/";
            File file = new File(rootPath);
            StringBuilder sb = new StringBuilder();
            String[] files = file.list();
            for (String s : files) {
                if (s.endsWith(".jar")) {
                    sb.append("hdfs://hadoop01.xxx.xxx.com:8020/user/alg/jars/");
                    sb.append(s);
                    sb.append(",");
                }
            }
            String jars = sb.toString();
            jars = jars.substring(0, jars.length() - 1);
    
            Map<String, String> conf = new HashMap<>();
            conf.put(SparkConfig.DEBUG, "false");
            conf.put(SparkConfig.APP_RESOURCE, "hdfs://hadoop01.xxx.xxx.com:8020/user/alg/jars/alg-gs-offline-1.0.0.jar");
            conf.put(SparkConfig.MAIN_CLASS, "com.yyj.alg.gs.offline.StartGraphSearchTest");
            conf.put(SparkConfig.MASTER, "yarn");
            //如果是提交到spark的standalone集群则采用下面的master
            //conf.put(SparkConfig.MASTER, "spark://hadoop01.xxx.xxx.com:7077");
            conf.put(SparkConfig.APP_NAME, "offline-graph-search");
            conf.put(SparkConfig.DEPLOY_MODE, "client");
            conf.put(SparkConfig.JARS, jars);
            conf.put(SparkConfig.HADOOP_CONF_DIR, confPath);
            conf.put(SparkConfig.YARN_CONF_DIR, confPath);
            conf.put(SparkConfig.SPARK_HOME, "/Users/yyj/spark2");
            conf.put(SparkConfig.DRIVER_MEMORY, "2g");
            conf.put(SparkConfig.EXECUTOR_CORES, "2");
            conf.put(SparkConfig.EXECUTOR_MEMORY, "2g");
            conf.put(SparkConfig.SPARK_YARN_JARS, "hdfs://hadoop01.xxx.xxx.com:8020/user/alg/jars/*.jar");
            conf.put(SparkConfig.APP_ARGS, "params");
            SparkActionLauncher launcher = new SparkActionLauncher(conf);
            boolean result = launcher.waitForCompletion();
            System.out.println("============result=" + result);
        }
    }

    构造SparkLauncher对象,配置Spark提交算法相关参数及说明

     private SparkLauncher createSparkLauncher() {
            logger.info("actionConfig:\n" + JSON.toJSONString(conf, true));
            this.debug = Boolean.parseBoolean(conf.get(SparkConfig.DEBUG));
            Map<String, String> env = new HashMap<>();
            //配置hadoop的xml文件本地路径
            env.put(SparkConfig.HADOOP_CONF_DIR, conf.get(SparkConfig.HADOOP_CONF_DIR));
            //配置yarn的xml文件本地路径
            env.put(SparkConfig.YARN_CONF_DIR, conf.get(SparkConfig.HADOOP_CONF_DIR));
            SparkLauncher launcher = new SparkLauncher(env);
            //设置算法入口类所在的jar包本地路径
            launcher.setAppResource(conf.get(SparkConfig.APP_RESOURCE));
            //设置算法入口类保证包名称及类名,例:com.yyj.train.spark.launcher.TestSparkLauncher
            launcher.setMainClass(conf.get(SparkConfig.MAIN_CLASS));
            //设置集群的master地址:yarn/spark standalone的master地址,例:spark://hadoop01.xxx.xxx.com:7077
            launcher.setMaster(conf.get(SparkConfig.MASTER));
            //设置部署模式:cluster(集群模式)/client(客户端模式)
            launcher.setDeployMode(conf.get(SparkConfig.DEPLOY_MODE));
            //设置算法依赖的包的本地路径,多个jar包用逗号","隔开,如果是spark on yarn只需要把核心算法包放这里即可,
            // spark相关的依赖包可以预先上传到hdfs并通过 spark.yarn.jars参数指定;
            // 如果是spark standalone则需要把所有依赖的jar全部放在这里
            launcher.addJar(conf.get(SparkConfig.JARS));
            //设置应用的名称
            launcher.setAppName(conf.get(SparkConfig.APP_NAME));
            //设置spark客户端安装包的home目录,提交算法时需要借助bin目录下的spark-submit脚本
            launcher.setSparkHome(conf.get(SparkConfig.SPARK_HOME));
            //driver的内存设置
            launcher.addSparkArg(SparkConfig.DRIVER_MEMORY, conf.getOrDefault(SparkConfig.DRIVER_MEMORY, "4g"));
            //driver的CPU核数设置
            launcher.addSparkArg(SparkConfig.DRIVER_CORES, conf.getOrDefault(SparkConfig.DRIVER_CORES, "2"));
            //启动executor个数
            launcher.addSparkArg(SparkConfig.NUM_EXECUTOR, conf.getOrDefault(SparkConfig.NUM_EXECUTOR, "30"));
            //每个executor的CPU核数
            launcher.addSparkArg(SparkConfig.EXECUTOR_CORES, conf.getOrDefault(SparkConfig.EXECUTOR_CORES, "4"));
            //每个executor的内存大小
            launcher.addSparkArg(SparkConfig.EXECUTOR_MEMORY, conf.getOrDefault(SparkConfig.EXECUTOR_MEMORY, "4g"));
            String sparkYarnJars = conf.get(SparkConfig.SPARK_YARN_JARS);
            if (StringUtils.isNotBlank(sparkYarnJars)) {
                //如果是yarn的cluster模式需要通过此参数指定算法所有依赖包在hdfs上的路径
                launcher.setConf(SparkConfig.SPARK_YARN_JARS, conf.get(SparkConfig.SPARK_YARN_JARS));
            }
            //设置算法入口参数
            launcher.addAppArgs(new String[]{conf.get(SparkConfig.APP_ARGS)});
            return launcher;
        }

    准spark安装包,用于提交spark算法的客户端,因为提交算法的时候需要用到Spark的home目录下的bin/spark-submit脚本

    重命名conf目录下的spark-env.sh脚本,否则会包如下的错误。原因是spark-env.sh里面配置了大数据平台上的路径,而在提交算法的客户端机器没有对应路径

    debug模式提交或者非debug模式

     /**
         * Submit spark application to hadoop cluster and wait for completion.
         *
         * @return
         */
        public boolean waitForCompletion() {
            boolean success = false;
            try {
                SparkLauncher launcher = this.createSparkLauncher();
                if (debug) {
                    Process process = launcher.launch();
                    // Get Spark driver log
                    new Thread(new ISRRunnable(process.getErrorStream())).start();
                    new Thread(new ISRRunnable(process.getInputStream())).start();
                    int exitCode = process.waitFor();
                    System.out.println(exitCode);
                    success = exitCode == 0 ? true : false;
                } else {
                    appMonitor = launcher.setVerbose(true).startApplication();
                    success = applicationMonitor();
                }
            } catch (Exception e) {
                logger.error(e);
            }
            return success;
        }

    非debug模式提交时,控制台获取处理结果信息

        ///
        // private functions
        ///
        private boolean applicationMonitor() {
            appMonitor.addListener(new SparkAppHandle.Listener() {
                @Override
                public void stateChanged(SparkAppHandle handle) {
                    logger.info("****************************");
                    logger.info("State Changed [state={0}]", handle.getState());
                    logger.info("AppId={0}", handle.getAppId());
                }
    
                @Override
                public void infoChanged(SparkAppHandle handle) {
                }
            });
            while (!isCompleted(appMonitor.getState())) {
                try {
                    Thread.sleep(3000L);
                } catch (InterruptedException e) {
                    e.printStackTrace();
                }
            }
            boolean success = appMonitor.getState() == SparkAppHandle.State.FINISHED;
            return success;
        }
    
        private boolean isCompleted(SparkAppHandle.State state) {
            switch (state) {
                case FINISHED:
                    return true;
                case FAILED:
                    return true;
                case KILLED:
                    return true;
                case LOST:
                    return true;
            }
            return false;
        }

    可以从处理结果中获取到app ID,用于杀掉yarn任务时使用

    4.任务详情

    //访问URL:
    http://<rm http address:port>/ws/v1/cluster/apps/{appID}
    
    //例子
    http://localhost:8088/ws/v1/cluster/apps/application15617064805542301
    

    访问详情地址,返回数据格式如下:

     

    "id": "application15617064805542301",--任务ID
    
    "user": "haizhi",--提交任务的用户名称
    
    "name": "TestSparkLauncher",--应用名称
    
    "queue": "root.users.haizhi",--提交队列
    
    "state": "FINISHED",--任务状态
    
    "finalStatus": "SUCCEEDED",--最终状态
    
    "progress": 100,--任务进度
    
    "trackingUI": "History",
    
    "trackingUrl": "http://hadoop01.xx.xxx.com:18088/proxy/application15617064805542301/A",
    
    "diagnostics":"",--任务出错时的主要错误信息
    
    "clusterId": 1561706480554,
    
    "applicationType": "SPARK",--任务类型
    
    "startedTime":  1562808570464,--任务开始时间,单位毫秒
    
    "finishedTime": 1562808621348,--任务结束时间,单位毫秒
    
    "elapsedTime": 50884,--任务耗时,毫秒
    
    "amContainerLogs": "http://hadoop01.xx.xxx.com:8042/node/containerlogs/container15617064805542301_01_000001/haizhi",--任务详细日志
    
    "amHostHttpAddress": "hadoop01.xx.xxx.com:8042",
    
    "memorySeconds": 198648,--任务分配到的内存数,单位MB
    
    "vcoreSeconds": 145,--任务分配到的CPU核数
    
    "logAggregationStatus": "SUCCEEDED"

     

    5.rest API杀掉任务请求格式:

    • 请求URL:http://<rm http address:port>/ws/v1/cluster/apps/{appid}/state

    • 请求方式:put

    • 请求参数: { "state": "KILLED" }

    例:

    请求URL:http://192.168.1.3:18088/ws/v1/cluster/apps/application15617064805542302/state
    请求方式:put
    请求参数: { "state": "KILLED" }

     

     

     

    展开全文
  • Spark是什么 Spark (全称 Apache Spark™) 是一个专门处理大数据量分析任务的通用数据分析引擎。 spark官网 Spark核心代码是用scala语言开发的,不过支持使用多种语言进行开发调用比如scala,java,python。 spark...

    Spark是什么

    Spark (全称 Apache Spark™) 是一个专门处理大数据量分析任务的通用数据分析引擎。

    spark官网

    Spark核心代码是用scala语言开发的,不过支持使用多种语言进行开发调用比如scala,java,python。

    spark github

    Spark文档2.4.4

    Spark目前有比较完整的数据处理生态组件,可以部署在多种系统环境中,同时支持处理多种数据源。

    在这里插入图片描述

    Spark发展历史

    2009年,Spark诞生于伯克利大学AMPLab,属于伯克利大学的研究性项目;

    2010年,通过BSD 许可协议正式对外开源发布;

    2012年,Spark第一篇论文发布,第一个正式版(Spark 0.6.0)发布;

    2013年,成为了Aparch基金项目,进入高速发展期。第三方开发者贡献了大量的代码,活跃度非常高;发布Spark Streaming、Spark Mllib(机器学习)、Shark(Spark on Hadoop);

    2014 年,Spark 成为 Apache 的顶级项目; 5 月底 Spark1.0.0 发布;发布 Spark Graphx(图计算)、Spark SQL代替Shark;

    2015年,推出DataFrame(大数据分析);2015年至今,Spark在国内IT行业变得愈发火爆,大量的公司开始重点部署或者使用Spark来替代MapReduce、Hive、Storm等传统的大数据计算框架;

    2016年,Spark 2.0.0版本发布,推出dataset(更强的数据分析手段);

    2017年,structured streaming 发布;

    2018年,Spark2.4.0发布,成为全球最大的开源项目。

    截至 2020年1月15号 目前最稳定的最后发布版本为 Spark 2.4.4。

    还有一个 新值得期待的 预发布版本 Spark 3.0 主要 是增加了 与k8s等云结合使用的特性。

    特点

    1、速度快,适合实时分析场景

    Spark基于内存进行计算(当然也有部分计算基于磁盘,比如shuffle),在运算方面是hadoop运算速度的一百多倍。

    2、容易上手开发

    Spark的基于RDD的计算模型,比Hadoop的基于Map-Reduce的计算模型要更加易于理解,更加易于上手开发,实现各种复杂功能,比如二次排序、topN等复杂操作时,更加便捷。

    3、支持多种语言

    Spark提供Java,Scala,Python和R中的高级API .Spark代码可以用任何这些语言编写。 它在Scala和Python中提供了一个shell。 可以通过./bin/spark-shell和Python shell通过./bin/pyspark从已安装的目录访问Scala shell。

    4、支持多种格式的数据来源

    Spark支持多种数据源,如Parquet,JSON,HDFS、Hbase、Hive和Cassandra,Alluxio,CSV和RDBMS表,还包括通常的格式,如文本文件、CSV和RDBMS表,甚至一些云存储比如S3等。 Data Source API提供了一种可插拔的机制,用于通过Spark SQL获取结构化数据。

    5、超强的通用性

    Spark提供了Spark RDD、Spark SQL、Spark Streaming、Spark MLlib、Spark GraphX等技术组件,可以一站式地完成大数据领域的离线批处理、交互式查询、流式计算、机器学习、图计算等常见的任务。

    6、集成Hadoop

    Spark并不是要成为一个大数据领域的“独裁者”,一个人霸占大数据领域所有的“地盘”,而是与Hadoop进行了高度的集成,两者可以完美的配合使用。Hadoop的HDFS、Hive、HBase负责存储,YARN负责资源调度;Spark负责大数据计算。实际上,Hadoop+Spark的组合,是一种“double win”的组合。

    7、可以在任何环境下搭建

    spark框架可以运行在各种操作系统上。

    最初Spark作为hadoop的一个计算框架组件而发布,现在慢慢长大,可以独立运行了。意味着 我们不搭建Hadoop集群也能 独立的安装运行Spark。

    除了运行在Hadoop集群中,

    目前Spark支持

    (一)local本地模式

    只需要一台机器,运行该模式非常简单,只需要把Spark的安装包解压后,默认也不需修改任何配置文件,取默认值。不用启动Spark的Master、Worker守护进程( 只有集群的Standalone方式时,才需要这两个角色),也不用启动Hadoop的各服务(除非你要用到HDFS)。

    运行客户端程序(可以是spark自带的命令行程序,如spark-shell,也可以是程序员利用spark api编写的程序),就可以完成相应的运行。相当于这一个客户端进程,充当了所有的角色。

    这种模式,只适合开发阶段使用,我们可以在该模式下开发和测试代码,使的代码的逻辑没问题,后面再提交到集群上去运行和测试。

    如果是学习或者做测试,为了搭建环境的简化,可以搭建本地模式。

    在实际生产环境,spark会采用集群模式来运行,即分布式式运行,spark可以使用多种集群资源管理器来管理自己的集群。

    (二)独立的Spark集群standalone模式

    Standalone模式,即独立模式,自带完整的服务,使用spark自带的集群资源管理功能。可单独部署到一个集群中,无需依赖任何其他资源管理系统。即每台机器上只需部署下载的Spark版本即可。

    这种模式需要提前启动spark的master和Worker守护进程,才能运行spark客户端程序。

    因为Standalone模式不需要依赖任何第三方组件,如果数据量比较小,且不需要hadoop(如不需要访问hdfs服务),则使用Standalone模式是一种可选的简单方便的方案。

    (三)在aws的ec2中安装

    这种模式类似于Standalone模式,不过部署的集群是aws的ec2服务器,需要有一些 权限方面的配置,在GitHub中有专门针对 ec2中部署spark的脚本项目, 可以直接根据其中的步骤进行部署。

    (四)使用yarn进行管理

    该模式,使用hadoop的YARN作为集群资源管理器。这种模式下因为使用yarn的服务进行资源管理,所以不需要启动Spark的Master、Worker守护进程。

    如果你的应用不仅使用spark,还用到hadoop生态圈的其它服务,从兼容性上考虑,使用Yarn作为统一的资源管理是更好的选择,这样选择这种模式就比较适合。

    目前spark on yarn的部署方式 最为常用。

    (五)使用mesos进行管理

    该模式,使用Mesos作为集群资源管理器。如果你的应用还使用了docker,则选择此模式更加通用。

    (六)使用k8s进行管理

    Spark本身的设计更偏向使用静态的资源管理,虽然Spark也支持了类似Yarn等动态的资源管理器,但是这些资源管理并不是面向动态的云基础设施而设计的,在速度、成本、效率等领域缺乏解决方案。

    随着Kubernetes的快速发展,数据科学家们开始考虑是否可以用Kubernetes的弹性与面向云原生等特点与Spark进行结合。

    在Spark 2.3中,Resource Manager中添加了Kubernetes原生的支持。

    意味着 我们可以使用k8s对Spark进行管理了,而且能运用云的特性,很好的进行集群伸缩,降低我们的成本以及当运算资源不足时快速增加节点。

    (七) 伪分布集群模式

    即在一台机器上模拟集群下的分布式场景,会启动多个进程。上述的集群模式都可以启动伪分布式集群模式,当然要求机器的配置满足要求。

    这种模式主要是开发阶段和学习使用。

    8、极高的社区活跃度

    Spark目前是Apache基金会的顶级项目,全世界有大量的优秀工程师是Spark的committer。并且世界上很多顶级的IT公司都在大规模地使用Spark。

    spark的使用场景

    物联网领域: 通过物联网的设备收集到海量的数据,比如环境监控,海洋监控,地震预测等,需要及时的处理反馈。

    大健康领域: 用户健康生活与遗传信息基因等数据的分析,反馈健康方面的信息给用户

    医疗保健:医疗保健领域使用实时分析来持续检查关键患者的医疗状况。寻找血液和器官移植的医院需要在紧急情况下保持实时联系。及时就医是患者生死攸关的问题。

    政府:政府机构主要在国家安全领域进行实时分析。各国需要不断跟踪警察和安全机构对于威胁的更新。

    电信:以电话,视频聊天和流媒体实时分析等形式围绕服务的公司,以减少客户流失并保持领先竞争优势。他们还提取移动网络的测量结果。

    银行业务:银行业务几乎涉及全球所有资金。确保整个系统的容错事务变得非常重要。通过银行业务的实时分析,可以实现欺诈检测。

    股票市场:股票经纪人使用实时分析来预测股票投资组合的变动。公司通过使用实时分析来推销其品牌的市场需求,从而重新思考其业务模式。

    使用spark的公司和项目也非常多,可以参考官网列表

    Project and Product names using

    hadoop和spark的关系与区别

    Spark作为Hadoop生态中重要的一员,其发展速度堪称恐怖,不过其作为一个完整的技术栈,在技术和环境的双重刺激下,得到如此多的关注也是有依据的。

    Spark核心在于内存计算模型代替Hadoop生态的MapReduce离线计算模型,用更加丰富Transformation和Action算子来替代map,reduce两种算子。

    计算流程的区别

    Hadoop这项大数据处理技术大概已有十年历史,而且被看做是首选的大数据集合处理的解决方案。

    MapReduce是单流程的优秀解决方案,不过对于需要多流程计算和算法的用例来说,并非十分高效。

    数据处理流程中的每一步都需要一个Map阶段和一个Reduce阶段,而且如果要利用这一解决方案,需要将所有用例都转换成MapReduce模式。

    在下一步开始之前,上一步的作业输出数据必须要存储到分布式文件系统中。因此,复制和磁盘存储会导致这种方式速度变慢。

    另外Hadoop解决方案中通常会包含难以安装和管理的集群。而且为了处理不同的大数据用例,还需要集成多种不同的工具(如用于机器学习的Mahout和流数据处理的Storm)。

    如果想要完成比较复杂的工作,就必须将一系列的MapReduce作业串联起来然后顺序执行这些作业。每一个作业都是高时延的,而且只有在前一个作业完成之后下一个作业才能开始启动。

    而Spark则允许程序开发者使用有向无环图(DAG)开发复杂的多步数据管道。而且还支持跨有向无环图的内存数据共享,以便不同的作业可以共同处理同一个数据。

    Spark运行在现有的Hadoop分布式文件系统基础之上(HDFS)提供额外的增强功能。

    它支持将Spark应用部署到现存的Hadoop v1集群(with SIMR – Spark-Inside-MapReduce)或Hadoop v2 YARN集群甚至是Apache Mesos之中。

    我们应该将Spark看作是Hadoop MapReduce的一个替代品而不是Hadoop的替代品。其意图并非是替代Hadoop,而是为了提供一个管理不同的大数据用例和需求的全面且统一的解决方案。

    在这里插入图片描述

    关键区别

    hadoop是批处理工具,更擅长处理离线数据,而spark在内存中处理数据,可以是实时处理。

    在这里插入图片描述

    Hadoop基于大数据的批处理。 这意味着数据会在一段时间内先存储下来,然后使用Hadoop进行处理。

    在Spark中,处理可以实时进行。

    Spark中的这种实时处理能力帮助我们解决实时分析问题。

    除此之外,Spark能够比Hadoop MapReduce( Hadoop处理框架)快100倍地进行批处理。

    因此,目前Apache Spark是业界大数据处理的首选工具。

    hadoop和spark发展的历史故事参考

    https://www.zhihu.com/question/23036370?sort=created

    组件框架的区别

    针对核心关键的功能 ,Hadoop和Spark都发展出了相应的组件

    HadoopSpark
    处理引擎MapreduceSpark RDD(Spark Core)
    交互式查询HiveSpark SQL
    实时流计算StormSpark Streaming
    机器学习MahoutMLlib
    图计算Hama或者 GiraphGraphX

    Spark相关概念

    Spark Shell

    Spark的shell提供了一种学习API的简单方法,以及一种以交互方式分析数据的强大工具。

    Spark Session

    在早期版本的Spark中,Spark Context是Spark的入口点。 对于每个其他API,我们需要使用不同的上下文。 对于流式传输,我们需要StreamingContext,SQL sqlContext和hive HiveContext。 为了解决这个问题,SparkSession应运而生。 它本质上是SQLContext,HiveContext和StreamingContext的组合。

    数据源

    Data Source API提供了一种可插拔的机制,用于通过Spark SQL访问结构化数据。 Data Source API用于将结构化和半结构化数据读取并存储到Spark SQL中。 数据源不仅仅是简单的管道,可以转换数据并将其拉入Spark。

    RDD

    弹性分布式数据集(RDD)是Spark的基本数据结构。 它是一个不可变的分布式对象集合。 RDD中的每个数据集被划分为逻辑分区,其可以在集群的不同节点上计算。 RDD可以包含任何类型的Python,Java或Scala对象,包括用户定义的类。

    RDD可被分发到集群各个节点上,进行并行操作。RDDs 可以通过 Hadoop InputFormats 创建(如 HDFS),或者从其他 RDDs 转化而来。

    获得RDD的三种方式:

    Parallelize:将一个存在的集合,变成一个RDD,这种方式试用于学习spark和做一些spark的测试

     >>>sc.parallelize(['cat','apple','bat’])
    

    MakeRDD:只有scala版本才有此函数,用法与parallelize类似

    textFile:从外部存储中读取数据来创建 RDD

    >>>sc.textFile(“file\\\usr\local\spark\README.md”)
    

    RDD的两个特性:不可变;分布式。

    RDD支持两种操作;

    Transformation(转化操作:返回值还是RDD)如map(),filter()等。这种操作是lazy(惰性)的,即从一个RDD转换生成另一个RDD的操作不是马上执行,只是记录下来,只有等到有Action操作是才会真正启动计算,将生成的新RDD写到内存或hdfs里,不会对原有的RDD的值进行改变;

    Action(行动操作:返回值不是RDD)会实际触发Spark计算,对RDD计算出一个结果,并把结果返回到内存或hdfs中,如count(),first()等。

    RDD的缓存策略

    Spark最为强大的功能之一便是能够把数据缓存在集群的内存里。这通过调用RDD的cache函数来实现:rddFromTextFile.cache,

    调用一个RDD的cache函数将会告诉Spark将这个RDD缓存在内存中。在RDD首次调用一个执行操作时,这个操作对应的计算会立即执行,数据会从数据源里读出并保存到内存。因此,首次调用cache函数所需要的时间会部分取决于Spark从输入源读取数据所需要的时间。但是,当下一次访问该数据集的时候,数据可以直接从内存中读出从而减少低效的I/O操作,加快计算。多数情况下,这会取得数倍的速度提升。

    广播变量

    广播变量(broadcast variable)为只读变量,它由运行SparkContext的驱动程序创建后发送给会参与计算的节点。对那些需要让各工作节点高效地访问相同数据的应用场景,比如机器学习,这非常有用。Spark下创建广播变量只需在SparkContext上调用一个方法即可:

    >>> broadcastAList = sc.broadcast(list(["a", "b", "c", "d", "e"]))
    
    

    累加器Accumulator

    在Spark中如果想在Task计算的时候统计某些事件的数量,使用filter/reduce也可以,但是使用累加器是一种更方便的方式,累加器一个比较经典的应用场景是用来在Spark Streaming应用中记录某些事件的数量。

    使用累加器时需要注意只有Driver能够取到累加器的值,Task端进行的是累加操作。

    创建的Accumulator变量的值能够在Spark Web UI上看到,在创建时应该尽量为其命名

    Spark内置了三种类型的Accumulator,分别是LongAccumulator用来累加整数型,DoubleAccumulator用来累加浮点型,CollectionAccumulator用来累加集合元素。

    后续我们会记录累加器的用法。

    Dataset

    Dataset是分布式数据集合。 数据集可以从JVM对象构造,然后使用功能转换(map,flatMap,filter等)进行操作。 数据集API在Scala和Java中可用。

    DataFrames

    DataFrame是命名列组织成数据集。 它在概念上等同于关系数据库中的表或R / Python中的数据框,但在引擎盖下具有更丰富的优化。 DataFrame可以从多种来源构建,例如:结构化数据文件,Hive中的表,外部数据库或现有RDD。

    RDD、Dataframe、DataSet区别

    在这里插入图片描述

    spark中 RDD、DataFrame、Dataset的关系及区别 以及相互转换

    Spark 组件

    Spark组件使Apache Spark快速可靠。 构建了很多这些Spark组件来解决使用Hadoop MapReduce时出现的问题。 Apache Spark具有以下组件:

    Spark Core
    Spark Streaming
    Spark SQL
    GraphX
    MLlib (Machine Learning)

    在这里插入图片描述
    用户使用的SQL、Streaming、MLib、GraphX接口最终都会转换成Spark Core分布式运行。

    Spark Core

    Spark Core是大规模并行和分布式数据处理的基础引擎。 核心是分布式执行引擎,Java,Scala和Python API为分布式ETL应用程序开发提供了一个平台。 此外,在核心上构建的其他库允许用于流式传输,SQL和机器学习的各种工作负载。 它负责:

    内存管理和故障恢复
    在群集上调度,分发和监视作业
    与存储系统交互

    Spark Streaming

    Spark Streaming是Spark的组件,用于处理实时流数据。 因此,它是核心Spark API的补充。 它支持实时数据流的高吞吐量和容错流处理。 基本流单元是DStream,它基本上是一系列用于处理实时数据的RDD(弹性分布式数据集)。

    Spark Streaming是spark中一个非常重要的扩展库,它是Spark核心API的一个扩展,可以实现高吞吐量的、具备容错机制的实时流数据的处理。支持从多种数据源获取数据,包括Kafk、Flume、以及TCP socket等,从数据源获取数据之后,可以使用诸如map、reduce和window等高级函数进行复杂算法的处理。最后还可以将处理结果存储到文件系统和数据库等。

    但从Spark2.0开始,提出了新的实时流框架 Structured Streaming (2.0和2.1是实验版本,从Spark2.2开始为稳定版本)来替代Spark streaming,这时Spark streaming就进入维护模式。相比Spark Streaming,Structured Streaming的Api更加好用,功能强大。

    Spark SQL

    Spark SQL是Spark中的一个新模块,它使用Spark编程API实现集成关系处理。 它支持通过SQL或Hive查询查询数据。 对于那些熟悉RDBMS的人来说,Spark SQL将很容易从之前的工具过渡到可以扩展传统关系数据处理的边界。

    Spark SQL通过函数编程API集成关系处理。 此外,它为各种数据源提供支持,并且使用代码转换编织SQL查询,从而产生一个非常强大的工具。

    以下是Spark SQL的四个库。

    Data Source API
    DataFrame API
    Interpreter & Optimizer
    SQL Service

    Spark SQL是Spark用来操作结构化数据的组件。通过Spark SQL,用户可以使用SQL或者Apache Hive版本的SQL方言(HQL)来查询数据。Spark SQL支持多种数据源类型,例如Hive表、Parquet以及JSON等。Spark SQL不仅为Spark提供了一个SQL接口,还支持开发者将SQL语句融入到Spark应用程序开发过程中,无论是使用Python、Java还是Scala,用户可以在单个的应用中同时进行SQL查询和复杂的数据分析。

    GraphX

    GraphX是用于图形和图形并行计算的Spark API。 因此,它使用弹性分布式属性图扩展了Spark RDD。

    属性图是一个有向多图,它可以有多个平行边。 每个边和顶点都有与之关联的用户定义属性。 这里,平行边缘允许相同顶点之间的多个关系。 在高层次上,GraphX通过引入弹性分布式属性图来扩展Spark RDD抽象:一个定向多图,其属性附加到每个顶点和边。

    为了支持图形计算,GraphX公开了一组基本运算符(例如,subgraph,joinVertices和mapReduceTriplets)以及Pregel API的优化变体。 此外,GraphX包含越来越多的图算法和构建器,以简化图形分析任务。

    GraphX是Spark面向图计算提供的框架与算法库。GraphX中提出了弹性分布式属性图的概念,并在此基础上实现了图视图与表视图的有机结合与统一;同时针对图数据处理提供了丰富的操作,例如取子图操作subgraph、顶点属性操作mapVertices、边属性操作mapEdges等。GraphX还实现了与Pregel的结合,可以直接使用一些常用图算法,如PageRank、三角形计数等。

    MlLib (Machine Learning)

    MLlib代表机器学习库。 Spark MLlib用于在Apache Spark中执行机器学习。

    MLlib是Spark提供的一个机器学习算法库,其中包含了多种经典、常见的机器学习算法,主要有分类、回归、聚类、协同过滤等。MLlib不仅提供了模型评估、数据导入等额外的功能,还提供了一些更底层的机器学习原语,包括一个通用的梯度下降优化基础算法。所有这些方法都被设计为可以在集群上轻松伸缩的架构。

    如何运行Spark程序

    在实际编程中,我们不需关心以上调度细节.只需使用 Spark 提供的指定语言的编程接口调用相应的 API 即可.
      在 Spark API 中, 一个 应用(Application) 对应一个 SparkContext 的实例。一个 应用 可以用于单个 Job,或者分开的多个 Job 的 session,或者响应请求的长时间生存的服务器。与 MapReduce 不同的是,一个 应用 的进程(我们称之为 Executor),会一直在集群上运行,即使当时没有 Job 在上面运行。
      而调用一个Spark内部的 Action 会产生一个 Spark job 来完成它。 为了确定这些job实际的内容,Spark 检查 RDD 的DAG再计算出执行 plan 。这个 plan 以最远端的 RDD 为起点(最远端指的是对外没有依赖的 RDD 或者 数据已经缓存下来的 RDD),产生结果 RDD 的 Action 为结束 。并根据是否发生 shuffle 划分 DAG 的 stage.

    Spark原生架构和运行原理

    架构和粗流程描述

    一个完整的Spark应用程序,在提交集群运行时,它的处理流程涉及到如下图所示的架构:

    在这里插入图片描述

    每个Spark应用都由一个驱动器程序(drive program)来发起集群上的各种并行操作。

    驱动器程序包含应用的main函数。

    驱动器负责创建SparkContext。

    SparkContext可以与不同种类的集群资源管理器(Cluster Manager),例如Hadoop YARN,Mesos进行通信。

    获取到集群进行所需的资源后,SparkContext将得到集群中工作节点(Worker Node)上对应的Executor。

    不同的Spark程序有不同的Executor,他们之间是相互独立的进程,Executor为应用程序提供分布式计算以及数据存储功能。

    之后SparkContext将应用程序代码发送到各Executor,将任务(Task)分配给executors执行。

    ClusterManager

    在Standalone模式中即为Master节点(主节点),控制整个集群,监控Worker.在YARN中为ResourceManager

    Worker

    从节点,负责控制计算节点,启动Executor或Driver。在YARN模式中为NodeManager,负责计算节点的控制。

    Driver

    运行Application的main()函数并创建SparkContect。

    Executor

    执行器,在worker node上执行任务的组件、用于启动线程池运行任务。每个Application拥有独立的一组Executor。

    SparkContext

    整个应用的上下文,控制应用的生命周期。

    RDD

    Spark的计算单元,一组RDD可形成执行的有向无环图RDD Graph。

    DAG Scheduler

    根据作业(Job)构建基于Stage的DAG,并提交Stage给TaskScheduler。

    TaskScheduler

    将任务(Task)分发给Executor。

    SparkEnv

    线程级别的上下文,存储运行时的重要组件的引用。

    SparkEnv内构建并包含如下一些重要组件的引用。

    1)MapOutPutTracker:负责Shuffle元信息的存储。
    2)BroadcastManager:负责广播变量的控制与元信息的存储。
    3)BlockManager:负责存储管理、创建和查找快。
    4)MetricsSystem:监控运行时性能指标信息。
    5)SparkConf:负责存储配置信息。

    详细流程描述

    使用spark-submit提交一个Spark作业之后,这个作业就会启动一个对应的Driver进程。

    根据你使用的部署模式(deploy-mode)不同,Driver进程可能在本地启动,也可能在集群中某个工作节点上启动。

    而Driver进程要做的第一件事情,就是向集群管理器(可以是Spark Standalone集群,也可以是其他的资源管理集群,比如使用YARN作为资源管理集群)申请运行Spark作业需要使用的资源,这里的资源指的就是Executor进程。

    YARN集群管理器会根据我们为Spark作业设置的资源参数,在各个工作节点上,启动一定数量的Executor进程,每个Executor进程都占有一定数量的内存和CPU core。

    在申请到了作业执行所需的资源之后,Driver进程就会开始调度和执行我们编写的作业代码了。

    Driver进程会将我们编写的Spark作业代码分拆为多个stage,每个stage执行一部分代码片段,并为每个stage创建一批Task,然后将这些Task分配到各个Executor进程中执行。

    Task是最小的计算单元,负责执行一模一样的计算逻辑(也就是我们自己编写的某个代码片段),只是每个Task处理的数据不同而已。

    一个stage的所有Task都执行完毕之后,会在各个节点本地的磁盘文件中写入计算中间结果,然后Driver就会调度运行下一个stage。

    下一个stage的Task的输入数据就是上一个stage输出的中间结果。

    如此循环往复,直到将我们自己编写的代码逻辑全部执行完,并且计算完所有的数据,得到我们想要的结果为止。

    Spark是根据shuffle类算子来进行stage的划分。

    如果我们的代码中执行了某个shuffle类算子(比如reduceByKey、join等),那么就会在该算子处,划分出一个stage界限来。

    可以大致理解为,shuffle算子执行之前的代码会被划分为一个stage,shuffle算子执行以及之后的代码会被划分为下一个stage。

    因此一个stage刚开始执行的时候,它的每个Task可能都会从上一个stage的Task所在的节点,去通过网络传输拉取需要自己处理的所有key,然后对拉取到的所有相同的key使用我们自己编写的算子函数执行聚合操作(比如reduceByKey()算子接收的函数)。这个过程就是shuffle。

    当我们在代码中执行了cache/persist等持久化操作时,根据我们选择的持久化级别的不同,每个Task计算出来的数据也会保存到Executor进程的内存或者所在节点的磁盘文件中。

    因此Executor的内存主要分为三块:

    第一块是让Task执行我们自己编写的代码时使用,默认是占Executor总内存的20%;

    第二块是让Task通过shuffle过程拉取了上一个stage的Task的输出后,进行聚合等操作时使用,默认也是占Executor总内存的20%;

    第三块是让RDD持久化时使用,默认占Executor总内存的60%。

    Task的执行速度是跟每个Executor进程的CPU core数量有直接关系的。

    一个CPU core同一时间只能执行一个线程。而每个Executor进程上分配到的多个Task,都是以每个Task一条线程的方式,多线程并发运行的。

    如果CPU core数量比较充足,而且分配到的Task数量比较合理,那么通常来说,可以比较快速和高效地执行完这些Task线程。

    以上就是Spark作业的基本运行原理的说明.

    shuffle 和 stage

    shuffle 是划分 DAG 中 stage 的标识,同时影响 Spark 执行速度的关键步骤.

    RDD 的 Transformation 函数中,又分为窄依赖(narrow dependency)和宽依赖(wide dependency)的操作.

    窄依赖跟宽依赖的区别在于 是否发生 shuffle(洗牌) 操作.

    宽依赖会发生 shuffle 操作. 窄依赖是子 RDD的各个分片(partition)不依赖于其他分片,能够独立计算得到结果,宽依赖指子 RDD 的各个分片会依赖于父RDD 的多个分片,所以会造成父 RDD 的各个分片在集群中重新分片, 看如下两个示例:

    // Map: "cat" -> c, cat
    val rdd1 = rdd.Map(x => (x.charAt(0), x))
    // groupby same key and count
    val rdd2 = rdd1.groupBy(x => x._1).
                    Map(x => (x._1, x._2.toList.length))
    

    第一个 Map 操作将 RDD 里的各个元素进行映射, RDD 的各个数据元素之间不存在依赖,可以在集群的各个内存中独立计算,也就是并行化

    第二个 groupby 之后的 Map 操作,为了计算相同 key 下的元素个数,需要把相同 key 的元素聚集到同一个 partition 下,所以造成了数据在内存中的重新分布,即 shuffle 操作.

    shuffle 操作是 spark 中最耗时的操作,应尽量避免不必要的 shuffle.

    宽依赖主要有两个过程: shuffle write 和 shuffle fetch.

    类似 Hadoop 的 Map 和 Reduce 阶段.
    shuffle write 将 ShuffleMapTask 任务产生的中间结果缓存到内存中, shuffle fetch 获得 ShuffleMapTask 缓存的中间结果进行 ShuffleReduceTask 计算,这个过程容易造成OutOfMemory.

    shuffle 过程内存分配使用 ShuffleMemoryManager 类管理,会针对每个 Task 分配内存,Task 任务完成后通过 Executor 释放空间.

    这里可以把 Task 理解成不同 key 的数据对应一个 Task.

    早期的内存分配机制使用公平分配,即不同 Task 分配的内存是一样的,但是这样容易造成内存需求过多的 Task 的 OutOfMemory, 从而造成多余的 磁盘 IO 过程,影响整体的效率.

    (例:某一个 key 下的数据明显偏多,但因为大家内存都一样,这一个 key 的数据就容易 OutOfMemory).

    在这里插入图片描述

    1.5版以后 Task 共用一个内存池,内存池的大小默认为 JVM 最大运行时内存容量的16%

    分配机制如下:

    假如有 N 个 Task,ShuffleMemoryManager 保证每个 Task 溢出之前至少可以申请到1/2N 内存,且至多申请到1/N

    N 为当前活动的 shuffle Task 数
    因为N 是一直变化的,所以 manager 会一直追踪 Task 数的变化,重新计算队列中的1/N 和1/2N.

    但是这样仍然容易造成内存需要多的 Task 任务溢出,所以最近有很多相关的研究是针对 shuffle 过程内存优化的.

    如下 DAG 流程图中,分别读取数据,经过处理后 join 2个 RDD 得到结果

    在这里插入图片描述
    在这个图中,根据是否发生 shuffle 操作能够将其分成如下的 stage 类型:

    在这里插入图片描述

    (join 需要针对同一个 key 合并,所以需要 shuffle)

    运行到每个 stage 的边界时,数据在父 stage 中按照 Task 写到磁盘上,而在子 stage 中通过网络按照 Task 去读取数据。这些操作会导致很重的网络以及磁盘的I/O,所以 stage 的边界是非常占资源的,在编写 Spark 程序的时候需要尽量避免的 。父 stage 中 partition 个数与子 stage 的 partition 个数可能不同,所以那些产生 stage 边界的 Transformation 常常需要接受一个 numPartition 的参数来觉得子 stage 中的数据将被切分为多少个 partition[^demoa]。

    PS:shuffle 操作的时候可以用 combiner 压缩数据,减少 IO 的消耗

    Spark原生框架处理数据流程

    1、Client提交应用。
    2、Master找到一个Worker启动Driver
    3、Driver向Master或者资源管理器申请资源,之后将应用转化为RDD Graph
    4、再由DAGSchedule将RDD Graph转化为Stage的有向无环图提交给TaskSchedule。
    5、再由TaskSchedule提交任务给Executor执行。
    6、其它组件协同工作,确保整个应用顺利执行。

    在这里插入图片描述

    Executor执行任务原理

    Executor完成一个任务需要做两部分工具,一部分就是加载数据源,也就是Spark的基础数据单元RDD。

    RDD的数据来源可以是多种多样的,我们这里以HDFS为例。

    Spark支持两种RDD操作:transformation和action。

    transformation操作

    transformation操作会针对已有的RDD创建一个新的RDD。

    transformation具有lazy特性,即transformation不会触发spark程序的执行,它们只是记录了对RDD所做的操作,不会自发的执行。

    只有执行了一个action,之前的所有transformation才会执行。

    常用的transformation介绍:

    map :将RDD中的每个元素传人自定义函数,获取一个新的元素,然后用新的元素组成新的RDD。

    filter:对RDD中每个元素进行判断,如果返回true则保留,返回false则剔除。

    flatMap:与map类似,但是对每个元素都可以返回一个或多个元素。

    groupByKey:根据key进行分组,每个key对应一个Iterable。

    reduceByKey:对每个key对应的value进行reduce操作。

    sortByKey:对每个key对应的value进行排序操作。

    join:对两个包含<key,value>对的RDD进行join操作,每个keyjoin上的pair,都会传入自定义函数进行处理。

    cogroup:同join,但是每个key对应的Iterable都会传入自定义函数进行处理。

    action操作

    action操作主要对RDD进行最后的操作,比如遍历,reduce,保存到文件等,并可以返回结果给Driver程序。

    action操作执行,会触发一个spark job的运行,从而触发这个action之前所有的transformation的执行,这是action的特性。

    常用的action介绍:

    reduce:将RDD中的所有元素进行聚合操作。第一个和第二个元素聚合,值与第三个元素聚合,值与第四个元素聚合,以此类推。

    collect:将RDD中所有元素获取到本地客户端(一般不建议使用)。

    count:获取RDD元素总数。

    take(n):获取RDD中前n个元素。

    saveAsTextFile:将RDD元素保存到文件中,对每个元素调用toString方法。

    countByKey:对每个key对应的值进行count计数。

    foreach:遍历RDD中的每个元素。

    在这里插入图片描述

    Spark on yarn 框架处理数据流程

    1、基于YARN的Spark作业首先由客户端生成作业信息,提交给ResourceManager。
    2、ResourceManager在某一NodeManager汇报时把AppMaster分配给NodeManager。
    3、NodeManager启动SparkAppMaster。
    4、SparkAppMastere启动后初始化然后向ResourceManager申请资源。
    5、申请到资源后,SparkAppMaster通过RPC让NodeManager启动相应的SparkExecutor。
    6、SparkExecutor向SparkAppMaster汇报并完成相应的任务。
    7、SparkClient会通过AppMaster获取作业运行状态。

    在这里插入图片描述

    如何运行Spark程序

    在实际编程中,我们不需要关心调度细节.

    只需使用 Spark 提供的指定语言的编程接口调用相应的 API 即可.

    在 Spark API 中, 一个 应用(Application) 对应一个 SparkContext 的实例。

    一个 应用 可以用于单个 Job,或者分开的多个 Job 的 session,或者响应请求的长时间生存的服务器。

    与 MapReduce 不同的是,一个 应用 的进程(我们称之为 Executor),会一直在集群上运行,即使当时没有 Job 在上面运行。

    而调用一个Spark内部的 Action 会产生一个 Spark job 来完成它。

    为了确定这些job实际的内容,Spark 检查 RDD 的DAG再计算出执行 plan 。

    这个 plan 以最远端的 RDD 为起点(最远端指的是对外没有依赖的 RDD 或者 数据已经缓存下来的 RDD),产生结果 RDD 的 Action 为结束 。

    并根据是否发生 shuffle 划分 DAG 的 stage.

    参考链接:

    https://www.aboutyun.com/forum.php?mod=viewthread&tid=24883

    https://www.cnblogs.com/cxxjohnson/p/8909578.html

    展开全文
  • spark官方文档中文版

    千次下载 热门讨论 2015-02-06 22:39:35
    本文翻译自Spark官方文档 spark官方文档中文版
  • Spark Shell 的使用

    万次阅读 2019-03-30 15:32:56
    前一章中我们介绍了Spark的Standalone模式的安装. 本章我们介绍下Spark Shell操作窗口的基本的安装. 基本启动与使用 基本启动与使用 本地启动 进入./bin目录, 使用spark-shell即可启动. localhost:bin Sean$ ...
  • spark 官网首页

    千次阅读 2020-09-13 19:20:48
    简单的spark概述: 原文: Apache Spark is a fast and general-purpose cluster computing system. It provides high-level APIs in Java, Scala, Python and R, and an optimized engine that supports general ...
  • [Spark版本更新]--Spark-2.4.0 发布说明

    千次阅读 2018-11-09 09:33:26
    2018-11-02 Apache Spark 官方发布了 2.4.0版本,以下是 Release Notes,供参考:  Sub-task [ SPARK-6236 ] - 支持大于2G的缓存块 [ SPARK-6237 ] - 支持上传块&gt; 2GB作为流 [ SPARK-10884 ] - ...
  • Spark之——Spark Submit提交应用程序

    万次阅读 2018-06-19 21:44:36
    本部分来源,也可以到spark官网查看英文版。 spark-submit 是在spark安装目录中bin目录下的一个shell脚本文件,用于在集群中启动应用程序(如*.py脚本);对于spark支持的集群模式,spark-submit提交应用的时候有...
  • Spark调优 | Spark SQL参数调优

    万次阅读 2019-07-26 09:45:29
    Spark SQL里面有很多的参数,而且这些参数在Spark官网中没有明确的解释,可能是太多了吧,可以通过在spark-sql中使用set -v 命令显示当前spark-sql版本支持的参数。 本文讲解最近关于在参与hive往spark迁移过程中...
  • SparkSpark基础教程

    万次阅读 多人点赞 2019-03-20 12:33:42
    Spark最初由美国加州伯克利大学的AMP实验室于2009年开发,是基于内存计算的大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序。 Spark特点 Spark具有如下几个主要特点: 运行速度快:Spark使用先进...
  • If so, this book will be your companion as you create data-intensive app using Spark as a processing engine, Python visualization libraries, and web frameworks such as Flask. To begin with, you will...
  • Spark、Python spark、Hadoop简介

    千次阅读 2018-09-05 18:10:21
    Spark、Python spark、Hadoop简介 Spark简介 1、Spark简介及功能模块 Spark是一个弹性的分布式运算框架,作为一个用途广泛的大数据运算平台,Spark允许用户将数据加载到cluster集群的内存中储存,并多次重复...
  • Explore the integration of Apache Spark with third party applications such as H20, Databricks and Titan Evaluate how Cassandra and Hbase can be used for storage An advanced guide with a combination of...
  • Spark Streaming 实时写入Hive

    千次阅读 2021-02-01 17:06:08
    所以使用Spark Streaming替代Flume实现入库Hive功能。 二、流程图 Created with Raphaël 2.2.0kafkaSpark StreamingETLhive 三、代码实现 pom文件 <?xml version="1.0" encoding="UTF-8"?> <project ...
  • 我们在之前的文章中 已经了解了 spark支持的模式,其中一种就是 使用k8s进行管理。 hadoop组件—spark----全面了解spark以及与hadoop的区别 是时候考虑让你的 Spark 跑在K8s 上了 spark on k8s的优势–为什么要把...
  • spark3.0-新特性

    千次阅读 2020-06-27 20:05:35
    spark 3.0 终于出了!!! Apache Spark 3.0.0是3.x系列的第一个发行版。投票于2020年6月10日获得通过。此版本基于git标签v3.0.0,其中包括截至6月10日的所有提交。Apache Spark 3.0建立在Spark 2.x的许多创新基础之...
  • sparkSpark环境搭建(运行模式)

    千次阅读 2020-04-13 20:03:14
    一、local本地模式 ...mv spark-2.2.0-bin-2.6.0-cdh5.14.0 spark 如果有权限问题,可以修改为root,方便学习时操作,实际中使用运维分配的用户和权限即可 chown -R root /export/servers/s...
  • 基于spark的电影推荐系统数据集

    热门讨论 2015-05-27 19:39:10
    基于spark的电影推荐系统数据集
  • spark2.4.4源码编译】windows环境编译spark2.4.4源码

    万次阅读 多人点赞 2019-11-09 14:17:14
    windows环境下如何编译spark2.4.4源码环境要求环境安装源码下载源码编译注意事项后记 环境要求 操作系统环境:Windows 10(Windows7、Windows8亦可) Java版本: jdk1.8 Scala版本:2.11.0 Maven版本:3.5.4 Git版本...
  • spark 安装部署与介绍

    千次阅读 2019-10-09 07:56:54
    spark spark 概述一. spark和hadoop二. 应用常景和解决生态系统组件应用场景Spark执行任务流程图三. Spark安装四. Spark部署模式1、单机本地模式(Spark所有进程都运行在一台机器的JVM中)2、伪分布式模式 (在一台...
  • Spark2.1.0之剖析spark-shell

    千次阅读 2018-04-20 09:30:10
    通过在spark-shell中执行word count的过程,让读者了解到可以使用spark-shell提交Spark作业。现在读者应该很想知道spark-shell究竟做了什么呢?脚本分析 在Spark安装目录的bin文件夹下可以找到spark-shell,其中有...
  • Spark入门详解

    万次阅读 多人点赞 2018-08-16 15:05:04
    Spark概述 1 11 什么是Spark 2 Spark特点 3 Spark的用户和用途 二 Spark集群安装 1 集群角色 2 机器准备 3 下载Spark安装包 4 配置SparkStandalone 5 配置Job History ServerStandalone 6 ...
  • Cloudera高级架构师Phil Tian(田凤占)在OpenCloud 2015大会Spark专场的演讲PPT:Spark驱动智能大数据分析应用,谈到了在与Hadoop社区良好集成的同时,Spark当下已经得到更广泛社区和提供商的支持,还通过多个公司...
  • Spark基础知识详解

    万次阅读 多人点赞 2018-12-12 17:45:38
    它还支持一组丰富的高级工具,包括用于SQL和结构化数据处理的Spark SQL,用于机器学习的MLlib,用于图形处理的GraphX和Spark Streaming。 Spark优点: 减少磁盘I/O:随着实时大数据应用越来越多...
  • 大数据基础:Spark工作原理及基础概念

    千次阅读 多人点赞 2020-11-13 19:01:00
    导语 |Apache Spark 是专为大规模数据处理而设计的快速通用计算引擎,在数据挖掘和机器学习领域有着广泛的应用,现在也已形成一个高速发展、应用广泛的生态系统。本文将为大家详细介...
  • Spark命令详解

    千次阅读 多人点赞 2020-02-21 09:20:00
            本篇博客,Alice为大家带来关于Spark命令的详解。 ...之前我们使用提交任务都是使用spark-shell提交,spark-shell是Spark自带的交互式Shell程...
  • 使用方法: 请使用eclipse的maven导入方式导入,代码在http://blog.csdn.net/q79969786/article/details/42793487有介绍

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 354,236
精华内容 141,694
关键字:

spark