
- 基 于
- MapReduce算法实现的分布式计算
- 最新版本
- 2.4.0
- 外文名
- Spark
-
Spark
2020-04-22 10:59:32什么是Spark Spark特点 Spark运行模式 Spark编写代码 SparkCore 什么是RDD RDD的主要属性 RDD的算子分为两类: Rdd数据持久化什么作用? cache和Checkpoint的区别 什么是宽窄依赖 什么是DAG DAG边界 ...目录
什么是Spark
基于内存的,用于大规模数据处理(离线计算、实时计算、快速查询(交互式查询))的统一分析引擎。
Spark特点
快:
Spark计算速度是MapReduce计算速度的10-100倍
易用:(算法多)
MR支持1种计算模型,Spsark支持更多的计算模型。
通用:
Spark 能够进行离线计算、交互式查询(快速查询)、实时计算、机器学习、图计算等
兼容性:
Spark支持大数据中的Yarn调度,支持mesos。可以处理hadoop计算的数据
Spark运行模式
1.local本地模式(单机)--开发测试使用
2.standalone独立集群模式--开发测试使用
3.standalone-HA高可用模式--生产环境使用
4.on yarn集群模式--生产环境使用
5.on cloud集群模式--中小公司未来会更多的使用云服务
Spark编写代码
- 创建一个 Sparkconf对象,设置app名称
- 创建一个SparkContext,
- 读取数据,对数据进行计算
- 保存数据
SparkCore
什么是RDD
弹性分布式数据集(数据存储在内存),一个不可变、可分区、里面的元素可并行计算的集合
RDD的主要属性
1、数据集的基本组成单位是一组分片(Partition)或一个分区(Partition)列表
每个分片都会被一个计算任务处理,分片数决定并行度。
2、一个函数会被作用在每一个分区。
3、一个RDD会依赖于其他多个RDD,RDD的每次转换都会生成一个新的RDD
RDD的算子分为两类:
1.Transformation转换操作:返回一个新的RDD
2.Action动作操作:返回值不是RDD
惰性计算,遇到Transformation不计算,遇到Action在真正计算。
Rdd数据持久化什么作用?
1、对多次使用的rdd进行缓存,缓存到内存,当后续频繁使用时直接在内存中读取缓存的数据,不需要重新计算。 (Persist、Cache)
2、将RDD结果写入硬盘(容错机制),当RDD丢失数据时,或依赖的RDD丢失数据时,可以使用持久化到硬盘的数据恢复。(MEMORY_ONLY(默认)、MEMORY_AND_DISK、DISK_ONLY)
SparkContext.setCheckpointDir("目录") //HDFS的目录
RDD.checkpoint()
cache和Checkpoint的区别
位置
Persist 和 Cache将数据保存在内存
Checkpoint将数据保存在HDFS
生命周期
Persist 和 Cache 程序结束后会被清除或手动调用unpersist方法。
Checkpoint永久存储不会被删除。
RDD依赖关系(血统Lineage)
Persist和Cache,不会丢掉RDD间的依赖链/依赖关系
Checkpoint会斩断依赖链
什么是宽窄依赖
窄依赖:父RDD的一个分区只会被子RDD的一个分区依赖
宽依赖:父RDD的一个分区会被子RDD的多个分区依赖(涉及到shuffle)
什么是DAG
DAG:指的是数据转换执行的过程,有方向,无闭环(其实就是RDD执行的流程)
DAG边界
开始:通过SparkContext创建的RDD
结束:触发Action,一旦触发Action就形成了一个完整的DAG
说明:
一个Spark应用中可以有一到多个DAG,取决于触发了多少次Action
一个DAG中会有不同的阶段/stage,划分阶段/stage的依据就是宽依赖
一个阶段/stage中可以有多个Task,一个分区对应一个Task
Spark概念
1.Application:指的是用户编写的Spark应用程序/代码,包含了Driver功能代码和分布在集群中多个节点上运行的Executor代码。
2.Driver:Spark中的Driver即运行上述Application的Main()函数并且创建SparkContext,SparkContext负责和ClusterManager通信,进行资源的申请、任务的分配和监控等
3.Cluster Manager:指的是在集群上获取资源的外部服务,Standalone模式下由Master负责,Yarn模式下ResourceManager负责;
4.Executor:是运行在工作节点Worker上的进程,负责运行任务,并为应用程序存储数据,是执行分区计算任务的进程;
5.RDD:Resilient Distributed Dataset弹性分布式数据集,是分布式内存的一个抽象概念;
6.DAG:Directed Acyclic Graph有向无环图,反映RDD之间的依赖关系和执行流程;
7.Job:作业,按照DAG执行就是一个作业;Job==DAG
8.Stage:阶段,是作业的基本调度单位,同一个Stage中的Task可以并行执行,多个Task组成TaskSet任务集
9.Task:任务,运行在Executor上的工作单元,一个Task计算一个分区,包括pipline上的一系列操作
Spark执行任务的基本流程
1.Spark应用被提交-->SparkContext向资源管理器注册并申请资源 (??) -->启动Executor
2.RDD-->构建DAG-->DAGScheduler划分Stage形成TaskSet-->TaskScheduler提交Task-->Worker上的Executor执行Task
-
【Spark】Spark基础教程
2019-03-20 12:33:42Spark最初由美国加州伯克利大学的AMP实验室于2009年开发,是基于内存计算的大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序。 Spark特点 Spark具有如下几个主要特点: 运行速度快:Spark使用先进...Spark最初由美国加州伯克利大学的AMP实验室于2009年开发,是基于内存计算的大数据并行计算框架,可用于构建大型的、低延迟的数据分析应用程序。
Spark特点
Spark具有如下几个主要特点:
- 运行速度快:Spark使用先进的DAG(Directed Acyclic Graph,有向无环图)执行引擎,以支持循环数据流与内存计算,基于内存的执行速度可比Hadoop MapReduce快上百倍,基于磁盘的执行速度也能快十倍;
- 容易使用:Spark支持使用Scala、Java、Python和R语言进行编程,简洁的API设计有助于用户轻松构建并行程序,并且可以通过Spark Shell进行交互式编程;
- 通用性:Spark提供了完整而强大的技术栈,包括SQL查询、流式计算、机器学习和图算法组件,这些组件可以无缝整合在同一个应用中,足以应对复杂的计算;
- 运行模式多样:Spark可运行于独立的集群模式中,或者运行于Hadoop中,也可运行于Amazon EC2等云环境中,并且可以访问HDFS、Cassandra、HBase、Hive等多种数据源。
Spark相对于Hadoop的优势
Hadoop虽然已成为大数据技术的事实标准,但其本身还存在诸多缺陷,最主要的缺陷是其MapReduce计算模型延迟过高,无法胜任实时、快速计算的需求,因而只适用于离线批处理的应用场景。
回顾Hadoop的工作流程,可以发现Hadoop存在如下一些缺点:
- 表达能力有限。计算都必须要转化成Map和Reduce两个操作,但这并不适合所有的情况,难以描述复杂的数据处理过程;
- 磁盘IO开销大。每次执行时都需要从磁盘读取数据,并且在计算完成后需要将中间结果写入到磁盘中,IO开销较大;
- 延迟高。一次计算可能需要分解成一系列按顺序执行的MapReduce任务,任务之间的衔接由于涉及到IO开销,会产生较高延迟。而且,在前一个任务执行完成之前,其他任务无法开始,难以胜任复杂、多阶段的计算任务。
Spark主要具有如下优点:
- Spark的计算模式也属于MapReduce,但不局限于Map和Reduce操作,还提供了多种数据集操作类型,编程模型比MapReduce更灵活;
- Spark提供了内存计算,中间结果直接放到内存中,带来了更高的迭代运算效率;
- Spark基于DAG的任务调度执行机制,要优于MapReduce的迭代执行机制。
Spark最大的特点就是将计算数据、中间结果都存储在内存中,大大减少了IO开销
Spark提供了多种高层次、简洁的API,通常情况下,对于实现相同功能的应用程序,Spark的代码量要比Hadoop少2-5倍。
但Spark并不能完全替代Hadoop,主要用于替代Hadoop中的MapReduce计算模型。实际上,Spark已经很好地融入了Hadoop生态圈,并成为其中的重要一员,它可以借助于YARN实现资源调度管理,借助于HDFS实现分布式存储。
Spark生态系统
Spark的生态系统主要包含了Spark Core、Spark SQL、Spark Streaming、MLLib和GraphX 等组件,各个组件的具体功能如下:
- Spark Core:Spark Core包含Spark的基本功能,如内存计算、任务调度、部署模式、故障恢复、存储管理等。Spark建立在统一的抽象RDD之上,使其可以以基本一致的方式应对不同的大数据处理场景;通常所说的Apache Spark,就是指Spark Core;
- Spark SQL:Spark SQL允许开发人员直接处理RDD,同时也可查询Hive、HBase等外部数据源。Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行查询,并进行更复杂的数据分析;
- Spark Streaming:Spark Streaming支持高吞吐量、可容错处理的实时流数据处理,其核心思路是将流式计算分解成一系列短小的批处理作业。Spark Streaming支持多种数据输入源,如Kafka、Flume和TCP套接字等;
- MLlib(机器学习):MLlib提供了常用机器学习算法的实现,包括聚类、分类、回归、协同过滤等,降低了机器学习的门槛,开发人员只要具备一定的理论知识就能进行机器学习的工作;
- GraphX(图计算):GraphX是Spark中用于图计算的API,可认为是Pregel在Spark上的重写及优化,Graphx性能良好,拥有丰富的功能和运算符,能在海量数据上自如地运行复杂的图算法。
Spark基本概念
在具体讲解Spark运行架构之前,需要先了解几个重要的概念:
- RDD:是弹性分布式数据集(Resilient Distributed Dataset)的简称,是分布式内存的一个抽象概念,提供了一种高度受限的共享内存模型;
- DAG:是Directed Acyclic Graph(有向无环图)的简称,反映RDD之间的依赖关系;
- Executor:是运行在工作节点(Worker Node)上的一个进程,负责运行任务,并为应用程序存储数据;
- 应用:用户编写的Spark应用程序;
- 任务:运行在Executor上的工作单元;
- 作业:一个作业包含多个RDD及作用于相应RDD上的各种操作;
- 阶段:是作业的基本调度单位,一个作业会分为多组任务,每组任务被称为“阶段”,或者也被称为“任务集”。
Spark结构设计
Spark运行架构包括集群资源管理器(Cluster Manager)、运行作业任务的工作节点(Worker Node)、每个应用的任务控制节点(Driver)和每个工作节点上负责具体任务的执行进程(Executor)。其中,集群资源管理器可以是Spark自带的资源管理器,也可以是YARN或Mesos等资源管理框架。
Spark各种概念之间的关系
在Spark中,一个应用(Application)由一个任务控制节点(Driver)和若干个作业(Job)构成,一个作业由多个阶段(Stage)构成,一个阶段由多个任务(Task)组成。当执行一个应用时,任务控制节点会向集群管理器(Cluster Manager)申请资源,启动Executor,并向Executor发送应用程序代码和文件,然后在Executor上执行任务,运行结束后,执行结果会返回给任务控制节点,或者写到HDFS或者其他数据库中。
Executor的优点
与Hadoop MapReduce计算框架相比,Spark所采用的Executor有两个优点:
- 利用多线程来执行具体的任务(Hadoop MapReduce采用的是进程模型),减少任务的启动开销;
- Executor中有一个BlockManager存储模块,会将内存和磁盘共同作为存储设备,当需要多轮迭代计算时,可以将中间结果存储到这个存储模块里,下次需要时,就可以直接读该存储模块里的数据,而不需要读写到HDFS等文件系统里,因而有效减少了IO开销;或者在交互式查询场景下,预先将表缓存到该存储系统上,从而可以提高读写IO性能。
Spark运行基本流程
Spark的基本运行流程如下:
- 当一个Spark应用被提交时,首先需要为这个应用构建起基本的运行环境,即由任务控制节点(Driver)创建一个SparkContext,由SparkContext负责和资源管理器(Cluster Manager)的通信以及进行资源的申请、任务的分配和监控等。SparkContext会向资源管理器注册并申请运行Executor的资源;
- 资源管理器为Executor分配资源,并启动Executor进程,Executor运行情况将随着“心跳”发送到资源管理器上;
- SparkContext根据RDD的依赖关系构建DAG图,DAG图提交给DAG调度器(DAGScheduler)进行解析,将DAG图分解成多个“阶段”(每个阶段都是一个任务集),并且计算出各个阶段之间的依赖关系,然后把一个个“任务集”提交给底层的任务调度器(TaskScheduler)进行处理;Executor向SparkContext申请任务,任务调度器将任务分发给Executor运行,同时,SparkContext将应用程序代码发放给Executor;
- 任务在Executor上运行,把执行结果反馈给任务调度器,然后反馈给DAG调度器,运行完毕后写入数据并释放所有资源。
Spark运行架构的特点
Spark运行架构具有以下特点:
- 每个应用都有自己专属的Executor进程,并且该进程在应用运行期间一直驻留。Executor进程以多线程的方式运行任务,减少了多进程任务频繁的启动开销,使得任务执行变得非常高效和可靠;
- Spark运行过程与资源管理器无关,只要能够获取Executor进程并保持通信即可;
- Executor上有一个BlockManager存储模块,类似于键值存储系统(把内存和磁盘共同作为存储设备),在处理迭代计算任务时,不需要把中间结果写入到HDFS等文件系统,而是直接放在这个存储系统上,后续有需要时就可以直接读取;在交互式查询场景下,也可以把表提前缓存到这个存储系统上,提高读写IO性能;
- 任务采用了数据本地性和推测执行等优化机制。数据本地性是尽量将计算移到数据所在的节点上进行,即“计算向数据靠拢”,因为移动计算比移动数据所占的网络资源要少得多。而且,Spark采用了延时调度机制,可以在更大的程度上实现执行过程优化。比如,拥有数据的节点当前正被其他的任务占用,那么,在这种情况下是否需要将数据移动到其他的空闲节点呢?答案是不一定。因为,如果经过预测发现当前节点结束当前任务的时间要比移动数据的时间还要少,那么,调度就会等待,直到当前节点可用。
Spark的部署模式
Spark支持的三种典型集群部署方式,即standalone、Spark on Mesos和Spark on YARN;然后,介绍在企业中是如何具体部署和应用Spark框架的,在企业实际应用环境中,针对不同的应用场景,可以采用不同的部署应用方式,或者采用Spark完全替代原有的Hadoop架构,或者采用Spark和Hadoop一起部署的方式。
Spark三种部署方式
Spark应用程序在集群上部署运行时,可以由不同的组件为其提供资源管理调度服务(资源包括CPU、内存等)。比如,可以使用自带的独立集群管理器(standalone),或者使用YARN,也可以使用Mesos。因此,Spark包括三种不同类型的集群部署方式,包括standalone、Spark on Mesos和Spark on YARN。
1.standalone模式
与MapReduce1.0框架类似,Spark框架本身也自带了完整的资源调度管理服务,可以独立部署到一个集群中,而不需要依赖其他系统来为其提供资源管理调度服务。在架构的设计上,Spark与MapReduce1.0完全一致,都是由一个Master和若干个Slave构成,并且以槽(slot)作为资源分配单位。不同的是,Spark中的槽不再像MapReduce1.0那样分为Map 槽和Reduce槽,而是只设计了统一的一种槽提供给各种任务来使用。
2.Spark on Mesos模式
Mesos是一种资源调度管理框架,可以为运行在它上面的Spark提供服务。Spark on Mesos模式中,Spark程序所需要的各种资源,都由Mesos负责调度。由于Mesos和Spark存在一定的血缘关系,因此,Spark这个框架在进行设计开发的时候,就充分考虑到了对Mesos的充分支持,因此,相对而言,Spark运行在Mesos上,要比运行在YARN上更加灵活、自然。目前,Spark官方推荐采用这种模式,所以,许多公司在实际应用中也采用该模式。
3. Spark on YARN模式
Spark可运行于YARN之上,与Hadoop进行统一部署,即“Spark on YARN”,其架构如图9-13所示,资源管理和调度依赖YARN,分布式存储则依赖HDFS。
Hadoop和Spark的统一部署
一方面,由于Hadoop生态系统中的一些组件所实现的功能,目前还是无法由Spark取代的,比如,Storm可以实现毫秒级响应的流计算,但是,Spark则无法做到毫秒级响应。另一方面,企业中已经有许多现有的应用,都是基于现有的Hadoop组件开发的,完全转移到Spark上需要一定的成本。因此,在许多企业实际应用中,Hadoop和Spark的统一部署是一种比较现实合理的选择。
由于Hadoop MapReduce、HBase、Storm和Spark等,都可以运行在资源管理框架YARN之上,因此,可以在YARN之上进行统一部署(如图9-16所示)。这些不同的计算框架统一运行在YARN中,可以带来如下好处:- 计算资源按需伸缩;
- 不用负载应用混搭,集群利用率高;
- 共享底层存储,避免数据跨集群迁移。
-
JAVA代码实现编程式提交Spark任务
2020-07-29 16:54:111)直接调用SparkSubmit的main方法 2)SparkLauncher类的launch方法或者startApplication方法 3)使用RestSubmissionClient的run方法 SparkSubmit提交任务 String[] param = { "--class", "org.apache....三种方法作为记录:
1)直接调用SparkSubmit的main方法
2)SparkLauncher类的launch方法或者startApplication方法
3)使用RestSubmissionClient的run方法
SparkSubmit提交任务
String[] param = { "--class", "org.apache.spark.examples.SparkPi", "--master", "local[2]", "/bigdata/spark-2.4.6-bin-hadoop2.6/examples/jars/spark-examples_2.11-2.4.6.jar", }; SparkSubmit.main(param); System.out.println("任务运行完成。。。。。。。。。");
可见,基本上就是把spark-submit脚本放到了java中执行!
但是要注意:(Rest URL端口是6066)
"--master", "spark://192.168.0.181:6066"
SparkLauncher提交任务
public static void main(String[] args) throws Exception { HashMap<String, String> envParams = new HashMap<>(); envParams.put("YARN_CONF_DIR", "/home/hadoop/cluster/hadoop-release/etc/hadoop"); envParams.put("HADOOP_CONF_DIR", "/home/hadoop/cluster/hadoop-release/etc/hadoop"); envParams.put("SPARK_HOME", "/home/hadoop/cluster/spark-new"); envParams.put("SPARK_PRINT_LAUNCH_COMMAND", "1"); SparkAppHandle spark = new SparkLauncher(envParams) .setAppResource("/home/hadoop/cluster/spark-new/examples/jars/spark-examples_2.11-2.2.1.jar") .setMainClass("org.apache.spark.examples.SparkPi") .setMaster("yarn") .startApplication(); Thread.sleep(100000); }
RestSubmissionClient的run方法提交
@Test public void submit() { String appResource = "hdfs://192.168.0.181:8020/opt/guoxiang/wordcount.jar"; String mainClass = "com.fly.spark.WordCount"; String[] args = { "hdfs://192.168.0.181:8020/opt/guoxiang/wordcount.txt", "hdfs://192.168.0.181:8020/opt/guoxiang/wordcount" }; SparkConf sparkConf = new SparkConf(); // 下面的是参考任务实时提交的Debug信息编写的 sparkConf.setMaster("spark://192.168.0.181:6066") .setAppName("carabon" + " " + System.currentTimeMillis()) .set("spark.executor.cores", "4") .set("spark.submit.deployMode", "cluster") .set("spark.jars", appResource) .set("spark.executor.memory", "1g") .set("spark.cores.max", "4") .set("spark.driver.supervise", "false"); Map<String, String> env = System.getenv(); CreateSubmissionResponse response = null; try { response = (CreateSubmissionResponse) RestSubmissionClient.run(appResource, mainClass, args, sparkConf, new HashMap<>()); } catch (Exception e) { e.printStackTrace(); } System.out.println(response.toJson()); }
-
Spark排错与优化
2015-10-15 17:08:36Master默认使用512M内存,当集群中运行的任务特别多时,就会挂掉,原因是master会读取每个task的event log日志去生成spark ui,内存不足自然会OOM,可以在master的运行日志中看到,通过HA启动的master自然也会因为这...一. 运维
1. Master挂掉,standby重启也失效
Master默认使用512M内存,当集群中运行的任务特别多时,就会挂掉,原因是master会读取每个task的event log日志去生成spark ui,内存不足自然会OOM,可以在master的运行日志中看到,通过HA启动的master自然也会因为这个原因失败。
解决
-
增加Master的内存占用,在Master节点
spark-env.sh
中设置:export SPARK_DAEMON_MEMORY 10g # 根据你的实际情况
-
减少保存在Master内存中的作业信息
spark.ui.retainedJobs 500 # 默认都是1000 spark.ui.retainedStages 500
2. worker挂掉或假死
有时候我们还会在web ui中看到worker节点消失或处于dead状态,在该节点运行的任务则会报各种
lost worker
的错误,引发原因和上述大体相同,worker内存中保存了大量的ui信息导致gc时失去和master之间的心跳。解决
-
增加Master的内存占用,在Worker节点
spark-env.sh
中设置:export SPARK_DAEMON_MEMORY 2g # 根据你的实际情况
-
减少保存在Worker内存中的Driver,Executor信息
spark.worker.ui.retainedExecutors 200 # 默认都是1000 spark.worker.ui.retainedDrivers 200
二. 运行错误
1.shuffle FetchFailedException
Spark Shuffle FetchFailedException解决方案
错误提示
-
missing output location
org.apache.spark.shuffle.MetadataFetchFailedException: Missing an output location for shuffle 0
-
shuffle fetch faild
org.apache.spark.shuffle.FetchFailedException: Failed to connect to spark047215/192.168.47.215:50268
当前的配置为每个executor使用1core,5GRAM,启动了20个executor解决
这种问题一般发生在有大量shuffle操作的时候,task不断的failed,然后又重执行,一直循环下去,直到application失败。
一般遇到这种问题提高executor内存即可,同时增加每个executor的cpu,这样不会减少task并行度。- spark.executor.memory 15G
- spark.executor.cores 3
- spark.cores.max 21
启动的execuote数量为:7个
execuoterNum = spark.cores.max/spark.executor.cores
每个executor的配置:
3core,15G RAM
消耗的内存资源为:105G RAM
15G*7=105G
可以发现使用的资源并没有提升,但是同样的任务原来的配置跑几个小时还在卡着,改了配置后几分钟就能完成。
2.Executor&Task Lost
错误提示
-
executor lost
WARN TaskSetManager: Lost task 1.0 in stage 0.0 (TID 1, aa.local): ExecutorLostFailure (executor lost)
-
task lost
WARN TaskSetManager: Lost task 69.2 in stage 7.0 (TID 1145, 192.168.47.217): java.io.IOException: Connection from /192.168.47.217:55483 closed
-
各种timeout
java.util.concurrent.TimeoutException: Futures timed out after [120 second] ERROR TransportChannelHandler: Connection to /192.168.47.212:35409 has been quiet for 120000 ms while there are outstanding requests. Assuming connection is dead; please adjust spark.network. timeout if this is wrong
解决
由网络或者gc引起,worker或executor没有接收到executor或task的心跳反馈。
提高spark.network.timeout
的值,根据情况改成300(5min)或更高。
默认为 120(120s),配置所有网络传输的延时,如果没有主动设置以下参数,默认覆盖其属性- spark.core.connection.ack.wait.timeout
- spark.akka.timeout
- spark.storage.blockManagerSlaveTimeoutMs
- spark.shuffle.io.connectionTimeout
- spark.rpc.askTimeout or spark.rpc.lookupTimeout
3.倾斜
错误提示
-
数据倾斜
-
任务倾斜
差距不大的几个task,有的运行速度特别慢。
解决
大多数任务都完成了,还有那么一两个任务怎么都跑不完或者跑的很慢,分为数据倾斜和task倾斜两种。
-
数据倾斜
数据倾斜大多数情况是由于大量的无效数据引起,比如null或者"",也有可能是一些异常数据,比如统计用户登录情况时,出现某用户登录过千万次的情况,无效数据在计算前需要过滤掉。
数据处理有一个原则,多使用filter,这样你真正需要分析的数据量就越少,处理速度就越快。sqlContext.sql("...where col is not null and col != ''")
具体可参考:
解决spark中遇到的数据倾斜问题
2. 任务倾斜
task倾斜原因比较多,网络io,cpu,mem都有可能造成这个节点上的任务执行缓慢,可以去看该节点的性能监控来分析原因。以前遇到过同事在spark的一台worker上跑R的任务导致该节点spark task运行缓慢。
或者可以开启spark的推测机制,开启推测机制后如果某一台机器的几个task特别慢,推测机制会将任务分配到其他机器执行,最后Spark会选取最快的作为最终结果。* spark.speculation true * spark.speculation.interval 100 - 检测周期,单位毫秒; * spark.speculation.quantile 0.75 - 完成task的百分比时启动推测 * spark.speculation.multiplier 1.5 - 比其他的慢多少倍时启动推测。
4.OOM
错误提示
堆内存溢出
java.lang.OutOfMemoryError: Java heap space
解决
内存不够,数据太多就会抛出OOM的Exeception,主要有driver OOM和executor OOM两种
-
driver OOM
一般是使用了collect操作将所有executor的数据聚合到driver导致。尽量不要使用collect操作即可。 -
executor OOM
可以按下面的内存优化的方法增加code使用内存空间
- 增加executor内存总量,也就是说增加
spark.executor.memory
的值 - 增加任务并行度(大任务就被分成小任务了),参考下面优化并行度的方法
5.task not serializable
错误提示
org.apache.spark.SparkException: Job aborted due to stage failure: Task not serializable: java.io.NotSerializableException: ...
解决
如果你在worker中调用了driver中定义的一些变量,Spark就会将这些变量传递给Worker,这些变量并没有被序列化,所以就会看到如上提示的错误了。
val x = new X() //在driver中定义的变量 dd.map{r => x.doSomething(r) }.collect //map中的代码在worker(executor)中执行
除了上文的map,还有filter,foreach,foreachPartition等操作,还有一个典型例子就是在foreachPartition中使用数据库创建连接方法。这些变量没有序列化导致的任务报错。
下面提供三种解决方法:
- 将所有调用到的外部变量直接放入到以上所说的这些算子中,这种情况最好使用foreachPartition减少创建变量的消耗。
- 将需要使用的外部变量包括
sparkConf
,SparkContext
,都用@transent
进行注解,表示这些变量不需要被序列化 - 将外部变量放到某个class中对类进行序列化。
6.driver.maxResultSize太小
错误提示
Caused by: org.apache.spark.SparkException: Job aborted due to stage failure: Total size of serialized results of 374 tasks (1026.0 MB) is bigger than spark.driver.maxResultSize (1024.0 MB)
解决
spark.driver.maxResultSize默认大小为1G 每个Spark action(如collect)所有分区的序列化结果的总大小限制,简而言之就是executor给driver返回的结果过大,报这个错说明需要提高这个值或者避免使用类似的方法,比如countByValue,countByKey等。
将值调大即可
spark.driver.maxResultSize 2g
7.taskSet too large
错误提示
WARN TaskSetManager: Stage 198 contains a task of very large size (5953 KB). The maximum recommended task size is 100 KB.
这个WARN可能还会导致ERROR
Caused by: java.lang.RuntimeException: Failed to commit task Caused by: org.apache.spark.executor.CommitDeniedException: attempt_201603251514_0218_m_000245_0: Not committed because the driver did not authorize commit
解决
如果你比较了解spark中的stage是如何划分的,这个问题就比较简单了。
一个Stage中包含的task过大,一般由于你的transform过程太长,因此driver给executor分发的task就会变的很大。
所以解决这个问题我们可以通过拆分stage解决。也就是在执行过程中调用cache.count
缓存一些中间数据从而切断过长的stage。8. driver did not authorize commit
driver did not authorize commit
9. 环境报错
-
driver节点内存不足
driver内存不足导致无法启动application,将driver分配到内存足够的机器上或减少driver-memoryJava HotSpot(TM) 64-Bit Server VM warning: INFO:
os::commit_memory(0x0000000680000000, 4294967296, 0) failed;
error=‘Cannot allocate memory’ (errno=12) -
hdfs空间不够
hdfs空间不足,event_log无法写入,所以ListenerBus会报错
,增加hdfs空间(删除无用数据或增加节点)Caused by: org.apache.hadoop.ipc.RemoteException(java.io.IOException): File /tmp/spark-history/app-20151228095652-0072.inprogress could only be replicated to 0 nodes instead of minReplication (=1) ERROR LiveListenerBus: Listener EventLoggingListener threw an exception java.lang.reflect.InvocationTargetException
-
spark编译包与hadoop版本不一致
下载对应hadoop版本的spark包或自己编译。java.io.InvalidClassException: org.apache.spark.rdd.RDD; local class incompatible: stream classdesc serialVersionUID
-
driver机器端口使用过多
在一台机器上没有指定端口的情况下,提交了超过15个任务。16/03/16 16:03:17 ERROR SparkUI: Failed to bind SparkUI java.net.BindException: 地址已在使用: Service 'SparkUI' failed after 16 retries!
提交任务时指定app web ui端口号解决:
--conf spark.ui.port=xxxx
-
中文乱码
使用write.csv等方法写出到hdfs的文件,中文乱码。JVM使用的字符集如果没有指定,默认会使用系统的字符集,因为各个节点系统字符集并不都是UTF8导致,所以会出现这个问题。直接给JVM指定字符集即可。
spark-defaults.conf
spark.executor.extraJavaOptions -Dfile.encoding=UTF-8
三. pyspark 错误
1. rdd.isEmpty 一直为 true
python 调用函数一定要加括号,否则返回的是函数体本身,进行判断时始终为 True。
四. 优化
1. 部分Executor不执行任务
有时候会发现部分executor并没有在执行任务,为什么呢?
(1) 任务partition数过少,
要知道每个partition只会在一个task上执行任务。改变分区数,可以通过repartition
方法,即使这样,在repartition
前还是要从数据源读取数据,此时(读入数据时)的并发度根据不同的数据源受到不同限制,常用的大概有以下几种:hdfs - block数就是partition数 mysql - 按读入时的分区规则分partition es - 分区数即为 es 的 分片数(shard)
(2) 数据本地性的副作用
taskSetManager在分发任务之前会先计算数据本地性,优先级依次是:
process(同一个executor) -> node_local(同一个节点) -> rack_local(同一个机架) -> any(任何节点)
Spark会优先执行高优先级的任务,任务完成的速度很快(小于设置的spark.locality.wait时间),则数据本地性下一级别的任务则一直不会启动,这就是Spark的延时调度机制。
举个极端例子:运行一个count任务,如果数据全都堆积在某一台节点上,那将只会有这台机器在长期计算任务,集群中的其他机器则会处于等待状态(等待本地性降级)而不执行任务,造成了大量的资源浪费。
判断的公式为:
curTime – lastLaunchTime >= localityWaits(currentLocalityIndex)
其中
curTime
为系统当前时间,lastLaunchTime
为在某优先级下最后一次启动task的时间如果满足这个条件则会进入下一个优先级的时间判断,直到
any
,不满足则分配当前优先级的任务。数据本地性任务分配的源码在
taskSetManager.scala
。如果存在大量executor处于等待状态,可以降低以下参数的值(也可以设置为0),默认都是3s。
spark.locality.wait spark.locality.wait.process spark.locality.wait.node spark.locality.wait.rack
当你数据本地性很差,可适当提高上述值,当然也可以直接在集群中对数据进行balance。
2. spark task 连续重试失败
有可能哪台worker节点出现了故障,task执行失败后会在该
executor
上不断重试,达到最大重试次数后会导致整个application
执行失败,我们可以设置失败黑名单(task在该节点运行失败后会换节点重试),可以看到在源码中默认设置的是0
,private val EXECUTOR_TASK_BLACKLIST_TIMEOUT = conf.getLong("spark.scheduler.executorTaskBlacklistTime", 0L)
在
spark-default.sh
中设置spark.scheduler.executorTaskBlacklistTime 30000
当
task
在该executor
运行失败后会在其它executor
中启动,同时此executor
会进入黑名单30s(不会分发任务到该executor)。3. 内存
如果你的任务shuffle量特别大,同时rdd缓存比较少可以更改下面的参数进一步提高任务运行速度。
spark.storage.memoryFraction
- 分配给rdd缓存的比例,默认为0.6(60%),如果缓存的数据较少可以降低该值。spark.shuffle.memoryFraction
- 分配给shuffle数据的内存比例,默认为0.2(20%)
剩下的20%内存空间则是分配给代码生成对象等。如果任务运行缓慢,jvm进行频繁gc或者内存空间不足,或者可以降低上述的两个值。
"spark.rdd.compress","true"
- 默认为false,压缩序列化的RDD分区,消耗一些cpu减少空间的使用4. 并发
mysql读取并发度优化
spark 读取 hdfs 数据分区规则spark.default.parallelism
发生shuffle时的并行度,在standalone模式下的数量默认为core的个数,也可手动调整,数量设置太大会造成很多小任务,增加启动任务的开销,太小,运行大数据量的任务时速度缓慢。spark.sql.shuffle.partitions
sql聚合操作(发生shuffle)时的并行度,默认为200,如果该值太小会导致OOM,executor丢失,任务执行时间过长的问题相同的两个任务:
spark.sql.shuffle.partitions=300:spark.sql.shuffle.partitions=500:
速度变快主要是大量的减少了gc的时间。
但是设置过大会造成性能恶化,过多的碎片task会造成大量无谓的启动关闭task开销,还有可能导致某些task hang住无法执行。
修改map阶段并行度主要是在代码中使用rdd.repartition(partitionNum)
来操作。5. shuffle
spark-sql join优化
map-side-join 关联优化
spark range join 优化6. 磁盘
7.序列化
8.数据本地性
Spark不同Cluster Manager下的数据本地性表现
spark读取hdfs数据本地性异常9.代码
编写Spark程序的几个优化点
Spark WithColumn 陷阱
Spark累加器(Accumulator)陷阱及解决办法10.PySpark
PySpark Pandas UDF
在spark dataFrame 中使用 pandas dataframe
pypy on PySpark -
-
Spark——Spark概述
2020-09-02 00:49:31一、Spark是什么 二、Spark and Hadoop 在之前的学习中,Hadoop的MapReduce是大家广为熟知的计算框架,那为什么咱们还要学习新的计算框架Spark呢,这里就不得不提到Spark和Hadoop的关系。 首先从时间节点上... -
大数据Spark实战视频教程
2016-11-10 14:26:54大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室... -
【spark】Spark 入门到精通
2019-09-29 09:29:12Spark 修炼之道(进阶篇)——Spark 入门到精通:第一节 Spark 1.5.0 集群搭建【点击打开】 Spark 修炼之道(进阶篇)——Spark 入门到精通:第二节 Hadoop、Spark 生成圈简介【点击打开】 Spark 修炼之道(进阶篇... -
大数据Spark系列之Spark单机环境搭建
2020-04-03 17:23:331. 下载spark与scala Spark下载地址 http://mirrors.hust.edu.cn/apache/spark/spark-2.4.5/spark-2.4.5-bin-hadoop2.7.tgz Scala下载地址 http://www.scala-lang.org/files/archive/scala-2.10.4.tgz 2. 解压... -
HDP2.6.5更换spark版本为2.4.5 与carbondata2.0.1集成
2020-06-29 15:56:32文章目录一更换spark版本第一种方式第二种方式 一更换spark版本 因为要使用的carbondata对spark版本有要求,项目中使用的carbondata版本为2.0.1,spark版本要求为2.4.5 第一种方式 1)、找到/usr/hdp/2.6.5.0-292/... -
Spark概述、Spark特点
2017-07-05 16:38:41一、 Spark概述1. 什么是Spark(官网:http://spark.apache.org) Spark是一种快速、通用、可扩展的大数据分析引擎,2009年诞生于加州大学伯克利分校AMPLab,2010年开源,2013年6月成为Apache孵化项目,2014年2月... -
Spark RDD基础
2018-11-30 14:17:24Spark RDD基础 IDEA 创建scala spark的Mvn项目:https://blog.csdn.net/u014646662/article/details/84618032 spark快速大数据分析.pdf下载:https://download.csdn.net/download/u014646662/10816588 弹性分布式... -
1 Spark机器学习 spark MLlib 入门
2018-09-17 10:59:14开始学习spark ml了,都知道spark是继hadoop后的大数据利器,很多人都在使用spark的分布式并行来处理大数据。spark中也提供了机器学习的包,就是MLlib。 MLlib中也包含了大部分常用的算法,分类、回归、聚类等等,... -
Spark介绍
2020-04-08 14:11:00Spark 1、什么是Spark 是基于内存的用于大规模数据处理(离线计算、实时计算、快速查询)的统一分析引擎。 也是一个生态系统。 2、官网 http://spark.apache.org http://spark.apachecn.org Spark特点 ● 速度之快 与... -
Spark Launcher Java API提交Spark算法
2020-06-07 12:36:06在介绍之前,我先附上spark 官方文档地址: http://spark.apache.org/docs/latest/api/java/org/apache/spark/launcher/package-summary.html 源码github地址: ... -
spark core、spark sql、spark streaming 联系与区别
2019-03-07 21:43:39sparkcore是做离线批处理 sparksql 是做sql高级查询 sparkshell 是做交互式查询 sparkstreaming是做流式处理 区别: Spark Core : Spark的基础,底层的最小数据单位是:RDD ; 主要是处理一些离线(可以... -
Spark之——Spark Submit提交应用程序
2018-06-19 21:44:36本部分来源,也可以到spark官网查看英文版。 spark-submit 是在spark安装目录中bin目录下的一个shell脚本文件,用于在集群中启动应用程序(如*.py脚本);对于spark支持的集群模式,spark-submit提交应用的时候有... -
[Spark]Spark 应用程序部署工具spark-submit
2017-02-16 20:08:531. 简介Spark的bin目录中的spark-submit脚本用于启动集群上的应用程序。 可以通过统一的接口使用Spark所有支持的集群管理器,因此不必为每个集群管理器专门配置你的应用程序(It can use all of Spark’s supported ... -
Spark四大组件包括Spark Streaming、Spark SQL、Spark MLlib和Spark GraphX。
2016-08-31 14:33:57Spark四大组件包括Spark Streaming、Spark SQL、Spark MLlib和Spark GraphX。它们的主要应用场景是: Spark Streaming: Spark Streaming基于微批量方式的计算和处理,可以用于处理实时的流数据。它使用DStream... -
spark踩坑记——windows环境下spark安装和运行
2018-08-10 17:08:10本文主要记录windows系统上安装spark,scala,和intelj IDEA,并实现本地spark运行。同时介绍了利用maven构建工具对spark工程构建的方法。本地运行需要本地安装scala,spark,hadoop。而如果利用maven构建工具则只... -
Spark2.1.0之剖析spark-shell
2018-04-20 09:30:10通过在spark-shell中执行word count的过程,让读者了解到可以使用spark-shell提交Spark作业。现在读者应该很想知道spark-shell究竟做了什么呢?脚本分析 在Spark安装目录的bin文件夹下可以找到spark-shell,其中有... -
spark 参数调优11-Spark Streaming
2018-09-05 17:50:32spark参数调优系列 目录地址: https://blog.csdn.net/zyzzxycj/article/details/81011540 11 Spark Streaming spark.streaming.backpressure.enabled 反压,默认false,详细了解请移步... -
spark 参数调优4-Spark UI
2018-08-31 14:42:47spark参数调优系列 目录地址: https://blog.csdn.net/zyzzxycj/article/details/81011540 ④ Spark UI 这一块配置,是有关于spark日志的。日志开关,日志输出路径,是否压缩。 还有一些可视化界面、端口的... -
[老汤]Spark 2.x实战应用系列一之怎样学习Spark
2017-11-07 13:31:42系统的讲解了我们为什么需要去认识spark、spark有什么内容以及我们该怎么去学习spark。在学习spark过程中遵循的几个原则。内容如下: 1 大数据是什么 2 需要什么知识(除了scala,java和python都行) 3 spark可以做... -
[Spark进阶]-- spark-client和spark-cluster详解
2017-03-28 15:38:13在Spark中,有Yarn-Client和Yarn-Cluster两种模式可以运行在Yarn上,通常Yarn-cluster适用于生产环境,而Yarn-Client更适用于交互,调试模式,以下是它们的区别 Spark插拨式资源管理 Spark支持Yarn,Mesos,... -
Spark调优 | Spark SQL参数调优
2019-07-26 09:45:29Spark SQL里面有很多的参数,而且这些参数在Spark官网中没有明确的解释,可能是太多了吧,可以通过在spark-sql中使用set -v 命令显示当前spark-sql版本支持的参数。 本文讲解最近关于在参与hive往spark迁移过程中... -
Hadoop与Spark等大数据框架介绍
2018-08-09 17:06:40海量数据的存储问题很早就已经出现了,一些行业或者部门因为历史的积累,数据量也达到了一定的级别。很早以前,当一台电脑无法存储这么庞大的数据时,采用的解决方案是使用NFS(网络文件系统)将数据分开存储。... -
spark-spark是什么
2018-09-07 13:59:01Spark 是一个用来实现快速而通用的集群计算的平台。 在速度方面,Spark 扩展了广泛使用的 MapReduce 计算模型,而且高效地支持更多计算模式,包括交互式查询和流处理。在处理大规模数据集时,速度是非常重要的。... -
Spark学习笔记:Spark基础
2018-09-03 23:39:57Spark基础以及WordCount实现
-
成功逆袭:外包—苏宁—阿里 论我是怎么快速晋升的?
-
C++ 判断输入的是否为整数
-
三顾茅庐,七面阿里,18k*15offer,还原我的大厂面经
-
内部管理系统Spring boot/Spring MVC/Mybati
-
MOGRT动态图标模板 爱情元素手绘爱心特效pr视频模板
-
Selenium3分布式与虚拟化
-
2021年西式面点师(中级)答案解析及西式面点师(中级)证考试
-
ARS510前向雷达参数和性能说明书.pdf
-
字符设备驱动之poll机制
-
Kotlin协程极简入门与解密
-
MOGRT模板 电影级金属闪电特效动态全屏标题mogrt模板
-
2021年建筑电工(建筑特殊工种)考试报名及建筑电工(建筑特殊工种)找答案
-
QT数据库操作实例
-
Excel高级图表技巧
-
第3章 入门程序、常量、变量
-
基于前散射的超低烟尘浓度检测仪研究
-
图片选择
-
【数据分析-随到随学】Hive详解
-
气溶胶粒子微观特性对后向散射回波的影响
-
Soliton and bound-state soliton mode-locked fiber laser based on a MoS2/fluorine mica Langmuir–Blodgett film saturable absorber