精华内容
下载资源
问答
  • Spark面试题

    2019-09-27 14:25:19
    Spark面试题为什么考察Spark?精选考题导入 为什么考察Spark? Spark作为大数据组件中的执行引擎,具备以下优势特性。 高效性。内存计算下,Spark 比 MapReduce 快100倍。Spark使用最先进的DAG调度程序、查询优化...

    为什么考察Spark?

    Spark作为大数据组件中的执行引擎,具备以下优势特性。

    1. 高效性。内存计算下,Spark 比 MapReduce 快100倍。Spark使用最先进的DAG调度程序、查询优化程序和物理执行引擎,实现批量和流式数据的高性能。
    2. 易用性。Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建多样的应用。
    3. 通用性。Spark提供了统一的解决方案。Spark可以用于批处理交互式查询(Spark SQL)实时流处理(Spark Streaming)机器学习(Spark MLlib)图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。这对于企业应用来说,就可使用一个平台来进行不同的工程实现,减少了人力开发和平台部署成本。
    4. 兼容性。Spark能够跟很多开源工程兼容使用。如Spark可以使用***Hadoop的YARN和Apache Mesos***作为它的资源管理和调度器,并且Spark可以读取多种数据源,如HDFS、HBase、MySQL等。对于任何一家已经部署好Hadoop基础集群的企业来说,在不需要进行任何数据迁移和处理的情况下,就可以快速使用上Spark强大的数据处理和计算能力。

    精选考题

    • Spark支持的编程语言有哪几种?

      答:Spark 同时支持Scala、Python、Java 、R四种应用程序API编程接口和编程方式, 考虑到大数据处理的特性,一般会优先使用Scala进行编程。

    • Spark有什么特点,处理大数据有什么优势?

      答:Spark为我们提供了一个全面、统一的框架,能够适用于各种各样原先需要多种不同的分布式平台的场景,包括批处理、迭代算法、交互式查询和流处理。
      Spark相比于MapReduce的运行速度提升几十到几百倍。
      Spark提供了丰富的开箱即用算子工具,让开发者可以快速的用Java、Scala或Python编写程序。它本身自带了一个超过80个的高阶操作符集合。

    • Spark中Worker的主要工作是什么?
      答:主要功能:管理当前节点内存和CPU的使用状况,接收master分配过来的资源指令,通过ExecutorRunner启动程序分配任务,worker就类似于包工头,管理分配新进程,做计算的服务,相当于process服务。

      需要注意的是:

      1)worker不会汇报当前信息给master,worker心跳给master只有workid,它不会发送资源信息给mater。

      2)worker不会运行代码,具体运行的是Executor,worker可以运行具体appliaction写的业务逻辑代码,操作代码的节点,它不会运行程序的代码的。

    • Spark Driver的功能是什么?
      答:1)一个Spark作业运行时包括一个Driver进程,也是作业的主进程,具有main函数,并且持有SparkContext的实例,是程序的人口点;2)功能:负责向集群申请资源,向master注册信息,负责作业的调度,负责作业的解析,生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler。

    • Spark是如何容错的?
      答: 一般来说,分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。

      面向大规模数据分析,数据检查点操作成本非常高,需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低得多,同时还需要消耗很多其它的存储资源。因此,Spark选择记录更新的方式。可是,假设更新粒度太细太多,那么记录更新成本也不低。故RDD仅仅支持粗粒度转换,即仅仅记录单个块上运行的单个操作,然后将创建RDD的一系列变换序列(每一个RDD都包括了他是怎样由其它RDD变换过来的以及怎样重建某一块数据的信息。因此RDD的容错机制又称“血统(Lineage)”容错)记录下来,以便恢复丢失的分区。

      Lineage本质上非常相似于数据库中的重做日志(Redo Log),只是这个重做日志粒度非常大,是对全局数据做相同的重做进而恢复数据。RDD的Lineage记录的是粗颗粒度的特定数据Transformation操作(如filter、map、join等)行为。当这个RDD的部分分区数据丢失时,它可以通过Lineage获取足够的信息来重新运算和恢复丢失的数据分区。
      在容错机制中,如果一个节点死机了,而且运算窄依赖,则只要把丢失的父RDD分区重算即可,不依赖于其他节点。而宽依赖需要父RDD的所有分区都存在,重算就很昂贵了。可以这样理解开销的经济与否:在窄依赖中,在子RDD的分区丢失、重算父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,并不存在冗余计算在宽依赖情况下,丢失一个子RDD分区重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区用的,会有一部分数据相当于对应的是未丢失的子RDD分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。

    • 说说SparkContext和SparkSession有什么区别和联系?
      SparkContext是使用Spark功能的入口点。SparkSession是Spark2.x后引入的概念。在2.x之前,对于不同的功能,需要使用不同的Context,比如

      创建和操作RDD时,使用SparkContext

      使用Streaming时,使用StreamingContext

      使用SQL时,使用SQLContext

      使用Hive时,使用HiveContext

      在2.x中,为了统一上述的Context,引入SparkSession,实质上是SQLContext、HiveContext、SparkContext的组合。

    • hadoop和spark的都是并行计算,那么他们有什么相同和区别?(优势在哪里,只写区别)
      两者都是用mr模型来进行并行计算,但机制不同。hadoop的一个作业称为job,job里面分为map task和reduce task,每个task都是在自己的进程中运行的,当task结束时,进程也会结束。

      Spark用户提交的任务称为application,一个application中存在多个job,每触发一次action操作就会产生一个job。这些job可以并行或串行执行,每个job中有多个stage,stage是shuffle过程中DAGSchaduler通过RDD之间的依赖关系划分job而来的,每个stage里面有多个task,组成taskset,由TaskSchaduler分发到各个executor中执行,executor的生命周期是和application一样的,即使没有job运行也是存在的,所以task可以快速启动读取内存中的数据并进行计算;

      hadoop的job只有map和reduce操作,表达能力比较欠缺而且在mr过程中会重复的读写hdfs,造成大量的io操作,多个job需要自己管理关系;而spark则提供了丰富的算子,可以实现常用的各种数据处理操作。

      spark的迭代计算都是在内存中进行的,API中提供了大量的RDD操作如join,groupby等,而且通过DAG图可以实现良好的容错。

    • Spark有哪些组件,每个组件有什么功能?对应到什么场景?
      1)Spark core:是其它组件的基础,spark的内核,主要包含:有向循环图、RDD、Lingage、Cache、broadcast等,并封装了底层通讯框架,是Spark的基础。

      2)SparkStreaming:是一个对实时数据流进行高通量、容错处理的流式处理系统,可以对多种数据源(如Kafka、Flume、Twitter、Zero和TCP 套接字)进行类似Map、Reduce和Join等复杂操作,将流式计算分解成一系列短小的批处理作业。

      3)Spark sql:Shark是SparkSQL的前身,Spark SQL的一个重要特点是其能够统一处理关系表和RDD,使得开发人员可以轻松地使用SQL命令进行外部查询,同时进行更复杂的数据分析。

      4)SparkR:是一个R语言包,它提供了轻量级的方式使得可以在R语言中使用Apache Spark。在Spark 1.4中,SparkR实现了分布式的dataframe,支持类似查询、过滤以及聚合的操作,但是这个可以操作大规模的数据集。

      5)MLBase是Spark生态圈的一部分专注于机器学习,让机器学习的门槛更低,让一些可能并不了解机器学习的用户也能方便地使用MLbase。MLBase分为四部分:MLlib、MLI、ML Optimizer和MLRuntime。

      6)GraphX用于图和图并行计算。

    • Spark有几种部署模式,每种模式特点?
      local(本地模式):常用于本地开发测试,本地还分为local单线程和local-cluster多线程;

      standalone(集群模式):典型的Master/Slave模式,Spark支持ZooKeeper来实现Master HA;

      on yarn(集群模式):运行在 yarn 资源管理器框架之上,由 yarn 负责资源管理,Spark 负责任务调度和计算;

      on mesos(集群模式):运行在 mesos 资源管理器框架之上,由 mesos 负责资源管理,Spark 负责任务调度和计算;

      on cloud(集群模式):比如 AWS 的 EC2,使用这个模式能很方便的访问 Amazon的 S3,Spark 支持多种分布式存储系统:HDFS 和 S3等。

    • spark有哪些存储级别?
      1)MEMORY_ONLY:数据保存在内存中,如果内存不够,数据可能就不会持久化;

      2)MEMORY_AND_DISK:数据优先保存在内存中,如果内存不够则会存到磁盘中;

      3)MEMORY_ONLY_SER:和MEMORY_ONLY类似,区别是会将RDD中的数据进行序列化,这种方式更加节省内存;

      4)MEMORY_AND_DISK_SER:和MEMORY_AND_DISK类似,区别是会将RDD中的数据进行序列化,这种方式更加节省内存;

      5)DISK_ONLY:将数据全部写入磁盘文件中;

      6)MEMORY_ONLY_2, MEMORY_AND_DISK_2, 等等:这种有后缀_2的,代表的是将每个持久化的数据,都复制一份副本,并将副本保存到其他节点上。这种基于副本的持久化机制主要用于进行容错。

    • RDD的优势是什么?
      1)高效容错机制

      RDD没有checkpoint的开销,想还原一个RDD只需要根据血缘关系就可以,而且基本不涉及分区的重计算,除非分区的数据丢失了,重算过程在不同节点并行进行,不需要将整个系统回滚。

      2)数据本地性

      任务能够根据数据本地性(data locality)被分配,意思是优先将任务分配到数据存储的节点,从而提高性能。

      3)优雅降级 (degrade gracefully)

      读取数据最快的方式当然是从内存中读取,但是当内存不足的时候,RDD会将大分区溢出存储到磁盘,也能继续提供并行计算的能力。

    • DataFrame的特性?
      1)大数据量级:支持从KB到PB级的数据量

      2)多种数据源:支持多种数据格式和多种存储系统

      3)代码优化:通过Catalyst优化器进行先进的优化生成代码

      4)通用性:通过Spark无缝集成主流大数据工具与基础设施

      5)多种开发语言:API支持Python、Java、Scala和R语言。

    • RDD中关于转换(transformation)与动作(action)的区别?
      transformation操作会产生新的RDD,而action不会,但是它会触发运算,将RDD上某项操作的结果返回给程序。无论发生多少次transformation操作都不会触发运算,只有action操作才会触发运算。

    • RDD中有几种依赖?有什么作用?
      有窄依赖(narrowdependencies)和宽依赖(widedependencies)两种。窄依赖是指父RDD的每个分区都只被子RDD的一个分区所使用。相应的,那么宽依赖就是指父RDD的分区被多个子RDD的分区所依赖。例如,map就是一种窄依赖,而join则会导致宽依赖,主要是看有没有shuffle操作。

      宽窄依赖的作用是用来划分stage。

    • cache和persist的区别?

      它们都是用来进行缓存的。
      1)cache是特定的persist,rdd中cache的缓存级别是MEMORY_ONLY,cache调用了persist;
      3)persist可以设置不同的缓存级别。

    • 什么是RDD?什么是DataFrame?什么是DataSet?以及他们之间的区别?

      RDD全称Resilient Distributed Dataset,弹性分布式数据集,它是记录的只读分区集合,是Spark的基本数据结构,见名释义:

      弹性,表现在两个方面,一是当计算过程中内存不足时可刷写到磁盘等外存上,可与外存做灵活的数据交换;二是RDD使用了一种“血统”的容错机制,在结构更新和丢失后可随时根据血统进行数据模型的重建;

      分布式,可分布在多台机器上进行并行计算;

      数据集,一组只读的、可分区的分布式数据集合,集合内包含了多个分区,分区依照特定规则将具有相同属性的数据记录放在一起,每个分区相当于一个数据集片段。

      理解了RDD,DataFrame理解起来就比较容易了,DataFrame的思想来源于Python的pandas库,RDD是一个数据集,DataFrame在RDD的基础上加了Schema(描述数据的信息,可以认为是元数据,DataFrame曾经就有个名字叫SchemaRDD)。

      DataSet是DataFrame API的扩展。相较于RDD来说,DataSet提供了强类型支持,区别也是给RDD的每行数据加了类型约束。

      共同点

      RDD、DataFrame、DataSet全都是Spark平台下的分布式弹性数据集,为处理超大型数据提供便利。

      三者都有惰性机制,在进行创建、转换等阶段,如map、filter等方法时,不会立即执行,只有在遇到Action如count、collect等时,才会真正开始运算。

      三者都会根据Spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出。

      三者有许多共同的函数,如filter、map等。

      不同点

      RDD不支持Sparksql操作,DataFrame与DataSet均支持Sparksql,比如select,groupby之类,还能注册临时表/视图,实现与sql语句的无缝操作。

      DataSet和DataFrame拥有完全相同的成员函数,区别在于每一行的数据类型和字段类型是否明确。DataFrame也可以叫DataSet[Row],每一行的类型为Row,而DataSet每一行的数据类型是确定的。DataFrame只知道字段,但无法确定字段的具体类型,所以在执行这些操作的时候是没办法在编译的时候检查类型是否匹配的,比如你可以对一个String进行减法操作,在执行的时候才会报错,而DataSet不仅仅知道字段,还知道字段类型,所以有更严格的错误检查。

      相比于RDD,DataFrame与DataSet支持一些特别方便的保存方式,比如保存成csv,且可以带上表头,这样每一列的字段名一目了然。

    • 什么是广播变量?
      广播变量允许开发人员在每个节点缓存只读的变量,而不是在任务之间传递这些变量。实际工作中,当我们需要在分布式计算里面分发大对象,例如:字典,集合,黑白名单等,这个都会由Driver端进行分发,一般来讲,如果这个变量不是广播变量,那么每个task就会分发一份,这在task数目十分多的情况下Driver的带宽会成为系统的瓶颈,而且会大量消耗task服务器上的内存资源,如果将这个变量声明为广播变量,那么只是每个Executor拥有一份,这个Executor启动的task会共享这个变量,从而节省了通信的成本和内存资源。

      使用广播变量的注意事项:

      广播变量只能在Driver端定义,不能在Executor端定义。

      在Driver端可以修改广播变量的值,在Executor端无法修改广播变量的值。

      不能将一个RDD使用广播变量广播出去,因为RDD是不存储数据的。可以将RDD在Driver端collect为一个集合再广播出去。

      被广播的对象必须实现序列化。

    • 什么是累加器?

    在数据分析工作中,我们经常会有这样的需求,如异常监控,调试,记录符合某特性的数据的数目,这种需求都需要用到计数器,如果一个变量不被声明为一个累加器,那么它将在被改变时不会在Driver端进行全局汇总,即在分布式运行时每个task运行的只是原始变量的一个副本,并不能改变原始变量的值,但是当这个变量被声明为累加器后,该变量就会有分布式累加的功能。
    我们可以通过分别调用SparkContext.longAccumulator()或SparkContext.doubleAccumulator() 累积Long或Double类型的值来创建数字累加器。然后,可以使用add方法对累加器进行增加。驱动程序可以使用其value方法读取累加器的值。
    使用累加器的注意事项:

    累加器在Driver端定义赋初始值,且只能在Driver端读取最后的值,在Excutor端更新。
    在Driver端获取累计器值的时候需要一个Action操作来触发,才能拿到值。
    累计器只能执行add操作。

    • rdd的弹性表现在哪几点?

      自动进行内存和磁盘切换;

      基于lineage的高效容错;

      task如果失败会执行特定次数的重试,而且只计算失败的分片;

      具备checkpoint(每次对RDD操作都会产生新的RDD,如果链条比较长,计算比较笨重,就把数据放在硬盘中)和persist (内存或磁盘中对数据进行复用)(检查点、持久化)特性;

      数据调度弹性;

      数据分片的高度弹性repartition。

    • Spark配置的优先级?

      通过SparkConf 对象配置的属性优先级最高;其次是提交作业时传入的命令行参数配置;最后是spark-defaults.conf文件中的默认配置。

    • 哪些算子会产生shuffle。

      去重:distinct

      聚合:reduceByKey、groupBy、groupByKey、aggregateByKey、combineByKey

      排序:sortByKey、sortBy

      重分区:repartition、coalesce(增大分区数时)

      集合或者表操作:intersection、subtract、subtractByKey、join、leftOuterJoin

    • Spark streaming 读取kafka数据的两种方式?
      1.基于Receiver方式

      需要使用单独的Receiver线程来异步获取Kafka数据。Spark Streaming启动时,会在Executor中同时启动Receiver异步线程用于从Kafka持续获取数据,获取的数据先存储在Receiver中(存储方式由StorageLevel决定),后续,当Batch Job触发后,这些数据会被转移到剩下的Executor中被处理。处理完毕后,Receiver会自动更新Zookeeper中的Offset。

      2.基于Direct(No Receiver)方式

      不需要使用单独的Receiver线程从Kafka获取数据。Spark Streaming Batch Job触发时,Driver端确定要读取的Topic-Partition的OffsetRange,然后由Executor并行从Kafka各Partition读取数据并计算。

    • 为什么要进行序列化?

      序列化可以对数据进行压缩减少数据的存储空间和传输速度,但是数据在使用时需要进行反序列化,比较消耗CPU资源。

    • RDD中reduceBykey与groupByKey哪个性能好,为什么?

      reduceByKey会在结果发送至reducer之前对每个mapper在本地进行merge,有点类似于在MapReduce中的combiner。这样做的好处在于,在map端进行一次reduce之后,数据量会大幅度减小,从而减小传输,保证reduce端能够更快的进行结果计算。

      groupByKey会对每一个RDD中的value值进行聚合形成一个序列(Iterator),此操作发生在reduce端,所以势必会将所有的数据通过网络进行传输,造成不必要的浪费。

      所以相比之下reduceBykey的性能更好。

    • Spark为什么要持久化,一般什么场景下要进行persist操作?
      持久化的目的是为了避免重算和提高效率。rdd出错后可以根据血统信息进行还原,如果没有对父rdd进行持久化操作就需要从源头重新计算;还有一种场景是某个rdd被重复使用,而这个rdd的生成的代价也不小,为了提高计算效率可以将这个rdd进行持久化操作,这样提高后续的计算效率。以下场景需要进行persist操作:

      1)计算链条很长,一旦失败重新恢复代价太大;

      2)计算复杂耗时长,避免重新计算;

      3)checkpoint所在的rdd要进行persist;

      4)比较大的shuffle之后最好做persist避免再次shuffle;

    • join操作如何优化?
      1)对于大小表join的时候,使用map-side join替换join;

      2)在join之前对表进行筛选,减少join的数据量

      3)避免出现笛卡尔积,关联字段最好不要有重复的值,可以在join之前做去重处理。

      4)某些场景下可以把join后聚合,优化为聚合后再join,减少join数据量

    • Spark如何防止内存溢出?

    1.driver端的内存溢出

    可以增大driver的内存参数:spark.driver.memory (default 1g);

    2.map过程产生大量对象导致内存溢出

    这种溢出的原因是在单个map中产生了大量的对象导致的,针对这种问题,在不增加内存的情况下,可以通过减少每个Task的大小,以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。具体做法可以在会产生大量对象的map操作之前调用repartition方法,分区成更小的块传入map。

    3.数据不平衡导致内存溢出

    数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。

    4.shuffle后内存溢出

    shuffle内存溢出的情况基本可以说都是shuffle后,单个文件过大导致的。在Spark中,join,reduceByKey这一类的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) ,如果是别的partitioner导致的shuffle内存溢出,就需要从partitioner的代码增加partitions的数量。

    5.standalone模式下资源分配不均匀导致内存溢出

    在standalone的模式下如果配置了–total-executor-cores 和 --executor-memory 这两个参数,但是没有配置–executor-cores这个参数的话,就有可能导致,每个Executor的memory是一样的,但是cores的数量不同,那么在cores数量多的Executor中,由于能够同时执行多个Task,就容易导致内存溢出的情况。这种情况的解决方法就是同时配置–executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。

    6.使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()

    rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。

    导入

    [1]:总结 | 最全的Spark基础知识解答,作者:aaronhoho - https://www.jianshu.com/p/03a0e267c24b
    [2]:Spark知识点总结,作者:身为风帆,要顺其自然 - https://blog.csdn.net/qq_33247435/article/details/83653584
    [3]:spark相关的面试题跟答案,作者:wangxiaojian - http://www.aboutyun.com/?mod=viewthread&action=printable&tid=24246
    [4]:spark 如何防止内存溢出,作者:老子天下最美Samhttps://blog.csdn.net/Sunshine_2211468152/article/details/83050337

    展开全文
  • spark面试题

    2020-08-02 21:08:37
    Spark面试题 1. sparksql执行过程中发生数据倾斜导致任务卡顿该怎么解决??? 分析: 数据倾斜一般都发生在shuffle过程中,部分key存在占用比例过大导致大量数据分发到同一个task中导致任务执行缓慢甚至导致OOM异常...

    Spark面试题

    1. sparksql执行过程中发生数据倾斜导致任务卡顿该怎么解决???

    分析: 数据倾斜一般都发生在shuffle过程中,部分key存在占用比例过大导致大量数据分发到同一个task中导致任务执行缓慢甚至导致OOM异常。

    数据倾斜的现象: 多数task执行速度较快,少数执行时间非常长,或者等待很长时间提示内存不足,执行失败。

    原因: 1.key本身分部不均衡,key设计不合理。2.shuffle并行度不足。

    解决方案:

    1. **聚合源数据:**针对hive表中数据,对key进行重新设计对key打上随机数前缀将key打散,这样就不会造成shuffle操作,也就不存在热点数据的情况。之后再进行自定义的逻辑处理。 —此时数据已经比较均匀的分散在各个task上,再在task上做局部聚合,之后去除key的前缀信息,再做全局聚合。 【局部聚合+全局聚合】

      1. sql中实现需要使用双重groupBy改写SQL,两次groupBy。一先将key拼接随机的前缀[rand(10)],进行groupby,第二次将拼接的key去除拼接前缀再groupBy。
    2. 查找造成数据倾斜的key,使用sql的where条件过滤掉导致数据倾斜的key,即可避免shuffle操作导致的数据倾斜。针对热点key可以采用方案[1] 做处理。

    3. 提高shuffle并行度:groupByKey(1000),spark.sql.shuffle.partitions(默认是200); 也就是增加reduceTask的数量,修改之后会创建指定数量的reduce task从一定程度上可以是sparksql跑的更快。

    4. reduce join转换为map joinspark.sql.autoBroadcastJoinThreshold(默认是10485760);可以自己将表做成RDD,自己手动去实现map join;SparkSQL内置的map join,默认如果有一个10M以内的小表,会将该表进行broadcast,然后执行map join;调节这个阈值,比如调节到20M、50M、甚至1G

    5. 采样倾斜key并单独进行join:纯Spark Core的一种方式,sample、filter等算子。如果发现有一个或几个key对应的数据量特别大。此时只能将数据量多的key拉取出来,然后进行一个优化操作-> 针对热点key添加随机前缀,在执行shuffle操作将数据打散,再进行处理。

      如果你发现整个RDD中有多个key对应的数据量都特别多,此时,只能将数据量多的key拉取出来,然后进行一个优化操作。从另外一个要join的表中,也过滤出来一份数据,比如某个key可能就只有一条数据。
        然后我们再对那个只有一条数据的RDD,进行flatMap操作,打上100个随机数,作为前缀,返回100条数据。
        然后再将刚刚拉取出来的key对应的数据量特别多的RDD,给每一条数据,都打上一个100以内的随机数,作为前缀。然后就可以进行join操作了,join完以后,执行map操作将之前打上的随机数给去掉,然后再和另外一个普通RDD join以后的结果再进行union操作。

    6. 使用随机数以及扩容表进行join

      ​ 这个方案是没办法彻底解决数据倾斜的,只是一种对数据倾斜的缓解。
        1、选择一个RDD,要用flatMap,进行扩容,将每条数据,映射为多条数据,每个映射出来的数据,都带了一个n以内的随机数,通常来说会选择10。
        2、将另外一个RDD,做普通的map映射操作,每条数据都打上一个10以内的随机数。
        3、最后将两个处理后的RDD进行join操作。
        4、因为两个RDD都很大,所以你没有办法去将某一个RDD扩的特别大,一般就是10倍。且需要在最后清楚重复的9倍数据。
        5、如果就是10倍的话,那么数据倾斜问题的确是只能说是缓解和减轻,不能说彻底解决。

    2. spark并行度为200,输入文件block为100个/300个,实际任务的并行度分别是多少?

    ​ **信息介绍:**Spark会根据文件的大小自动设置要在每个文件上运行的“映射”任务的数量。默认的分区数 = 输入文件的大小 / splitsize; 分区数及对应spark的启动task的数量。

    ​ splitsize是mapreduce中每个输入内容的大小。 一般splitsize = dfs.block.size [1]从2.7.3版本开始,官方关于Data Blocks 的说明中,block size由64 MB变成了128 MB的。

    hdfs文件数据读取时的并行度: 文件所占block数(一个block对应一个task)。

    解答: 默认spark并行度与输入文件数保持一致。 block为100时并发度为100,block为300时并发度为200(按照分配的资源最大值)

    3. Spark checkpoint & Accumulators & Broadcast Variables 作用及应用场景。

    ### checkpoint
    

    metadata checkpointing

    >  主要出现在driver failures 的情况下用于恢复作业。
    
    1. configuration

      用于创建流式计算应用的配置信息

    2. DStream operations

      用于定义流式计算应用的 dstream的集合。

    3. Incomplete baches

      在队列中等待执行但还未执行完成的任务批次。

      [1] 针对无状态转换的作业,如果作业恢复过程中 metadatacheckpointing也是适用的,但是会存在部分接收到但是未处理的数据可能会丢失。

    data checkpointing

    适用于有状态转换(stateful transformation)操作的作业中。

    ​ 例如:updateStateByKey,reduceByKeyAndWindow等

    ​ 将生成的RDDs保存到可靠的存储中。在一些跨多个批次合并数据的有状态转换中,创建checkpoint是必须的。计算链路的增长会导致依赖链路的增长也会导致任务在失败是恢复的时间随之增长(与依赖链成正比)。有状态转换的中间RDDs定期被检查点到可靠存储(如HDFS),以切断依赖链。

    job失败时主动从checkpoint中重启

    /** checkpoint使用方法
    */
    //创建一个新的streamcontext的方法,
    // 内部包含了业务处理的所有逻辑
    def functionToCreateContext(): StreamingContext = {
      ...
      //任务停止是否删除checkpoint,默认不删除。也可以在job进行逻辑调整之后从checkpoint中恢复。--问题checkpoint存储算子信息可能导致job恢复失败。‘
      sparkConf.set("spark.cleaner.referenceTracking.cleanCheckpoints","false")
      val ssc = new StreamingContext(...)   // new context
      val lines = ssc.socketTextStream(...) // create DStreams
      ...
      ssc.checkpoint(checkpointDirectory)   // set checkpoint directory
      ssc
    }
    //创建流式计算的上下文环境,并指定任务失败时重启加载的metadatacheckpointing的路径 及新创建流式计算上下文的方法
    /**	1.【创建】 如果checkpointDirectory目录不存在则按照functionToCreateContext中的逻辑创建streamingcontext
    		2.【恢复】如果checkpointDirectory存在则从检查点目录中构建出streamcontext内容,不执行functionToCreateContext中的创建逻辑*/
    val context = StreamingContext.getOrCreate(checkpointDirectory, functionToCreateContext _)
    
    
    // Start the context
    context.start()
    context.awaitTermination()
    

    [1] 由于检查点的设置会导致一定的性能消耗,并增加批处理的时间。 因此,需要仔细设置检查点的间隔。 在小批量(例如1秒)时,每批检查点可能会大大降低操作吞吐量。 相反,检查点太不频繁会导致沿袭和任务规模增加,这可能会产生不利影响。对于需要RDD检查点的有状态转换,默认间隔为批处理间隔的倍数,至少应为10秒。 可以使用dstream.checkpoint(checkpointInterval)进行设置。 通常,DStream的5-10个滑动间隔的检查点间隔是一个不错的尝试。

    [2]spark.streaming.receiver.writeAheadLog.enable =true 开启write ahead logs功能,通过接收器接收的所有输入数据都将保存到预写日志中,以便在驱动程序发生故障后将其恢复。预写日志保存在checkpoint目录中。

    共享变量——Accumulators, Broadcast Variables

    ​ accumulators和boradcast 变量都可以从流处理的checkpoint中恢复。必须为Accumulators和Broadcast变量创建延迟(lazy)实例化的单例实例,以便在驱动程序因故障而重启后可以重新实例化它们。 在下面的示例中显示。

    //自定义广播变量
    //spark默认会将下一阶段需要用到的数据通过广播变量的方式发送到对应的节点上
    //针对跨多个阶段的任务需要相同的数据时或者以反序列化形式缓存数据非常重要时,显示创建广播变量才是有用的。
    object WordBlacklist {
    
      @volatile private var instance: Broadcast[Seq[String]] = null
    
      def getInstance(sc: SparkContext): Broadcast[Seq[String]] = {
        if (instance == null) {
          synchronized {
            if (instance == null) {
              val wordBlacklist = Seq("a", "b", "c")
              instance = sc.broadcast(wordBlacklist)
            }
          }
        }
        instance
      }
    }
    
    //自定义accumulator
    object DroppedWordsCounter {
    
      @volatile private var instance: LongAccumulator = null
    
      def getInstance(sc: SparkContext): LongAccumulator = {
        if (instance == null) {
          synchronized {
            if (instance == null) {
              instance = sc.longAccumulator("WordsInBlacklistCounter")
            }
          }
        }
        instance
      }
    }
    
    wordCounts.foreachRDD { (rdd: RDD[(String, Int)], time: Time) =>
      // Get or register the blacklist Broadcast
      val blacklist = WordBlacklist.getInstance(rdd.sparkContext)
      // Get or register the droppedWordsCounter Accumulator
      val droppedWordsCounter = DroppedWordsCounter.getInstance(rdd.sparkContext)
      // Use blacklist to drop words and use droppedWordsCounter to count them
      val counts = rdd.filter { case (word, count) =>
        if (blacklist.value.contains(word)) {
          droppedWordsCounter.add(count)
          false
        } else {
          true
        }
      }.collect().mkString("[", ", ", "]")
      val output = "Counts at time " + time + " " + counts
    })
    

    accumulator 针对action类型的操作可以保证只执行一次计数,即使任务重启也只会计数一次。但在transformation操作中,不能保证只更新一次。spark lazy模式在有action动作时才会真正执行,对应的累加器也只会在action操作触发式进行计数。

    4. 什么时候spark才会使用Map-side Join?

    Map side join1是针对以下场景进行的优化:两个待连接表中,有一个表非常大,而另一个表非常小,以至于小表可以直接存放到内存中。这样,我们可以将小表复制多份[广播],让每个task内存中存在一份(比如存放到hash table中),然后只扫描大表:对于大表中的每一条记录key/value,在hash table中查找是否有相同的key的记录,如果有,则连接后输出即可。

    只有当要进行join的表的大小小于spark.sql.autoBroadcastJoinThreshold(默认是10M)的时候,才会进行mapjoin。

    -- 手动添加 mapsidejoin注释 达到优化目的
    select /*+ BROADCAST (b) */ * from a where id not in (select id from b)
    
    //广播表b再与表a进行join操作
    private val sqlcontext: SQLContext = session.sqlContext
    sparksession.sparkContext.broadcast(sqlcontext.table("b").join(sqlcontext.table("a")))
    

    [2] 在使用map reduce处理数据的时候,join操作有两种选择:一种选择是在map端执行join操作,即所谓的Map-side Join(Broadcast join);另一种选择是在reduce端执行join操作,即所谓的Reduce-side Join(shuffle join)。在map端执行join操作,适合在有一个表比较小的情况下,能把整个表放到内存,发送到各个节点进行join操作。

    5.spark中如何划分stage

    1. spark application中可以因为不同的action触发众多job。每个job是由一个或者多个stage构成的,stage之间构成前后依赖关系。
    2. stage划分的依据是宽依赖,即产生shuffle操作的算子都会产生宽依赖。例:groupbykey,reducebykey等。
    3. 由action导致sparkcontext.runjob的执行,最终导致了DAGScheduler中submitJob的执行,其核心是通过发送一个case class JobSubmitted对象给eventProcessLoop.eventProcessLoop是DAGSchedulerEventProcessLoop的具体实例,而DAGSchedulerEventProcessLoop是eventLoop的子类,具体实现EventLoop的onReceive方法,onReceive方法转过来回调doOnReceive。
    4. 在doOnReceive中通过模式匹配的方法把执行路由到相应的dagScheduler.handle*处理阶段。
    5. 对于新提交的任务匹配到:JobSubmitted会执行handleJobSubmitted处理创建finalStage,在内部的createResultStage中调用getOrCreateParentStages查找父依赖的stage并做根据是否shuffle操作切分stage。

    [1] 源码中每个action算子中都执行了runjob方法触发任务的执行。跟踪runjob就可以追踪到stage切分以及父依赖的获取。

    6. spark 如何防止内存溢出

    driver端的内存溢出

    可以增大driver的内存参数:spark.driver.memory (default 1g)
    这个参数用来设置Driver的内存。在Spark程序中,SparkContext,DAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。

    map过程产生大量对象导致内存溢出

    这种溢出的原因是在单个map中产生了大量的对象导致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),这个操作在rdd中,每个对象都产生了10000个对象,这肯定很容易产生内存溢出的问题。针对这种问题,在不增加内存的情况下,可以通过减少每个Task的大小,以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。具体做法可以在会产生大量对象的map操作之前调repartition方法,分区成更小的块传入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
    面对这种问题注意,不能使用rdd.coalesce方法,这个方法只能减少分区,不能增加分区,不会有shuffle的过程。

    数据不平衡导致内存溢出

    数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。这里就不再累赘了。

    shuffle后内存溢出

    shuffle内存溢出的情况可以说都是shuffle后,单个文件过大导致的。在Spark中,join,reduceByKey这一类型的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism参数只对HashPartitioner有效,所以如果是别的Partitioner或者自己实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢出,就需要从partitioner的代码增加partitions的数量。

    standalone模式下资源分配不均匀导致内存溢出

    在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 这两个参数,但是没有配置–executor-cores这个参数的话,就有可能导致,每个Executor的memory是一样的,但是cores的数量不同,那么在cores数量多的Executor中,由于能够同时执行多个Task,就容易导致内存溢出的情况。这种情况的解决方法就是同时配置–executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。

    使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()

    rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。

    7.简要描述Spark分布式集群搭建的步骤

    1.进入spark_home/conf 打开spark-env.sh

    # 配置JDK安装位置 
    JAVA_HOME=/usr/java/jdk1.8.0_201 
    # 配置hadoop配置文件的位置 
    HADOOP_CONF_DIR=/usr/app/hadoop-2.6.0-cdh5.15.2/etc/hadoop 
    # 配置zookeeper地址 
    SPARK_DAEMON_JAVA_OPTS="-Dspark.deploy.recoveryMode=ZOOKEEPER -Dspark.deploy.zookeeper.url=hadoop001:2181,hadoop002:2181,hadoop003:2181 -Dspark.deploy.zookeeper.dir=/spark"
    
    1. mv slaves.template slaves 打开slaves文件添加worker节点的host_name。

    2. 将单节点的spark配置分发到集群中的所有节点。

    3. 启动zookeeper zkServer.sh start

    4. 启动hadoop集群 # 启动dfs服务 start-dfs.sh # 启动yarn服务 start-yarn.sh

    5. 启动spark集群 进入到主节点的机器 执行 start-all.sh

    6. 启动master的standby节点 进入需要启动的节点执行start-master.sh

    7. 验证:查看 Spark 的 Web-UI 页面,端口为 8080。master节点状态为ACTIVE。其他两个备用master节点为STANDBY

    8. 验证高可用: 在master节点使用kill杀死master进程。此时备用的master会重新选择一个作为master节点。并获取全部存活的worker。

    9. 作业提交:

      spark-submit \ 
      --class org.apache.spark.examples.SparkPi \ 
      --master yarn \ 
      --deploy-mode client \ 
      --executor-memory 1G \
       --num-executors 10 \ 
      /usr/app/spark-2.4.0-bin-hadoop2.6/examples/jars/spark-examples_2.11-2.4.0.jar \
       100
      

    8. kafka整合sparkStreaming问题

    spark+kafka官网

    	### 1).	如何实现sparkstreaming读取kafka中的数据
    

    ​ 两种方式:在kafka0.10版本之前有二种方式与sparkStreaming整合receiver和direct。0.10之后只有direct方式了。

    **receiver:**是采用了kafka高级api,利用receiver接收器来接受kafka topic中的数据,从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据,kafka中topic的偏移量是保存在zk中的

    ​ 基本使用:

    val topics = Array("click_events").toIterable
    
        val kafkaParams = Map[String, Object](
          ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",
          ConsumerConfig.GROUP_ID_CONFIG -> "spark_stream_test",
          ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer],
          ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG -> classOf[StringDeserializer]
        )
    val kafkaStream = KafkaUtils.createDirectStream[String, String](
          stream,
          LocationStrategies.PreferConsistent,
          ConsumerStrategies.Subscribe[String, String](topics, kafkaParams)
        )
    

    还有几个需要注意的点:

    • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度.
    • 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
    • 在默认配置下,这种方式可能会因为底层的失败而丢失数据. 因为receiver一直在接收数据,在其已经通知zookeeper数据接收完成但是还没有处理的时候,executor突然挂掉(或是driver挂掉通知executor关闭),缓存在其中的数据就会丢失. 如果希望做到高可靠, 让数据零丢失,如果我们启用了**Write Ahead Logs(spark.streaming.receiver.writeAheadLog.enable=true)**该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中. 所以, 即使底层节点出现了失败, 也可以使用预写日志中的数据进行恢复. 复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)

    direct: 在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。(设置spark.streaming.kafka.maxRatePerPartition=10000。限制每秒钟从topic的每个partition最多消费的消息条数)。

    //设定对目标topic每个partition每秒钟拉取的数据条数。总量为 1000*kafkapartitions*每个批次的时间间隔
    sparkconf.set("spark.streaming.kafka.maxRatePerPartition","1000")
    

    2) 对比这2中方式的优缺点:

    • 采用receiver方式:这种方式可以保证数据不丢失,但是无法保证数据只被处理一次,WAL实现的是At-least-once语义(至少被处理一次),如果在写入到外部存储的数据还没有将offset更新到zookeeper就挂掉,这些数据将会被反复消费. 同时,降低了程序的吞吐量。

    • 采用direct方式:相比Receiver模式而言能够确保机制更加健壮. 区别于使用Receiver来被动接收数据, Direct模式会周期性地主动查询Kafka, 来获得每个topic+partition的最新的offset, 从而定义每个batch的offset的范围. 当处理数据的job启动时, 就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。

      • 优点:

        • 1、简化并行读取

          • 如果要读取多个partition, 不需要创建多个输入DStream然后对它们进行union操作. Spark会创建跟Kafka partition一样多的RDD partition, 并且会并行从Kafka中读取数据. 所以在Kafka partition和RDD partition之间, 有一个一对一的映射关系.
        • 2、高性能

          • 如果要保证零数据丢失, 在基于receiver的方式中, 需要开启WAL机制. 这种方式其实效率低下, 因为数据实际上被复制了两份, Kafka自己本身就有高可靠的机制, 会对数据复制一份, 而这里又会复制一份到WAL中. 而基于direct的方式, 不依赖Receiver, 不需要开启WAL机制, 只要Kafka中作了数据的复制, 那么就可以通过Kafka的副本进行恢复.
        • 3、一次且仅一次的事务机制

          • 基于receiver的方式, 是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的. 这是消费Kafka数据的传统方式. 这种方式配合着WAL机制可以保证数据零丢失的高可靠性, 但是却无法保证数据被处理一次且仅一次, 可能会处理两次. 因为Spark和ZooKeeper之间可能是不同步的. 基于direct的方式, 使用kafka的简单api, Spark Streaming自己就负责追踪消费的offset, 并保存在checkpoint中. Spark自己一定是同步的, 因此可以保证数据是消费一次且仅消费一次。不过需要自己完成将offset写入zk的过程,在官方文档中都有相应介绍.
            *简单代码实例:

            messages.foreachRDD(rdd=>{
            val message = rdd.map(_._2)//对数据进行一些操作
            message.map(method)//更新zk上的offset (自己实现)
            updateZKOffsets(rdd)
            })
            

            * sparkStreaming程序自己消费完成后,自己主动去更新zk上面的偏移量。也可以将zk中的偏移量保存在mysql或者redis数据库中,下次重启的时候,直接读取mysql或者redis中的偏移量,获取到上次消费的偏移量,接着读取数据。

    9.如何保证数据不重复/不丢失

    1.设置checkpoint

    ​ 1.检查点的开启可以保证作业在失败时可以充checkpoint中获取最新的数据状态恢复作业,从而保证job在失败前后的联系性。

    ​ 2.checkpoint中还会保存kafka的offset信息,从而保证不重复消费数据。

    2.手动更新kafkaoffset

    ​ 1.Kafka 0.10 之后使用的 direct API 为kafka底层API offset的跟踪交给spark,spark使用checkpoint保存offset。用户代码可以手动在逻辑处理完毕之后将offset信息更新到kafka中,或者保存到外部存储中。

    ​ [1] job重启是怎么指定从checkpoint 答案-job失败时主动从checkpoint中重启 --文章内连接 : command+鼠标点击

    3.保证数据处理的唯一性,使用mysql/redis保存已消费的数据,做唯一性校验。

    10. spark任务暂停,修改计算逻辑再重启,checkpoint保存状态数据是否可用?? checkpoint中都保存了什么数据???

    ​ 1.checkpoint保存的数据不可再用,逻辑的调整会导致job内部的依赖关系发生变化,job的graph发生变化即无法再找到对应关系无法恢复。

    ​ 2. 分两部分 [元数据chechpoint和数据checkpoint 键](#3. Spark checkpoint & Accumulators & Broadcast Variables 作用及应用场景。)

    20. 参考文章

    spark相关面试题

    基于zookeeper搭建spark高可用集群


    1. ↩︎
    展开全文
  • Spark 面试题

    2020-12-03 09:37:55
    2. Spark的优化? 3. Task与Job之间的关系 4. 任务提交流程(18步图) 5. RDD的弹性表现在哪里? 6. Transform 类型的RDD与action类型的RDD各有哪些? 7. 发生Shuffle的算子有哪些? 8. Spark Streaming对应kafka中...

    1. 了解shuffle代码

    HashShuffle
    在这里插入图片描述
    在这里插入图片描述

    SortShuffle

    改进的主要原因
    Linux最大一次能打开的文件数量是1024个,所以优化的方向就是减少文件数量

    hash shuffle 文件数=executor数量* core数* map task数* 分区数
    改进后的hashshuffle文件数=executor数量* core数* 1*分区数
    sorshuffle文件数=executor数量* core数
    

    2. Spark的优化?

    • 合理设置并行度,比如某个stage有150个task要运行,那么可以比如说50个executor,然后每个executo**r 3个core . 如果你只有100个task需要运行,那么这个配置就属于资源浪费了

    3. Task与Job之间的关系

    job -> 一个或多个stage -> 一个或多个task
    

    4. 任务提交流程

    1 提交任务的节点启动一个driver(client)进程;
    2 dirver进程启动以后,首先是构建sparkcontext,sparkcontext主要包含两部分:DAGSchedulerTaskScheduler

    DAGScheduler:
    根据job划分N个stage,每一个stage都会有一个taskSet,
    然后将taskSet发送给taskScheduler
    
    TaskScheduler
    会寻找Master节点,Master节点接收到Application的注册请求后
    ,通过资源调度算法,在自己的集群的worker上启动Executor进程;
    启动的executor也会反向注册到 TaskScheduler上
    

    3 Executor每接收到一个task,都会用TaskRunner封装task,然后从线程池中取出一个线程去执行taskTaskRunner

    主要包含两种task:ShuffleMapTaskResultTask

    5. RDD的弹性表现在哪里?

    1.自动进行内存和磁盘切换
    2.基于lineage的高效容错
    3.task如果失败会特定次数的重试
    4.stage如果失败会自动进行特定次数的重试,而且只会只计算失败的分片
    5.checkpoint【每次对RDD操作都会产生新的RDD,如果链条比较长,计算比较笨重,就把数据放在硬盘中】和persist 【内存或磁盘中对数据进行复用】(检查点、持久化)
    6.数据调度弹性:DAG TASK 和资源管理无关
    7.数据分片的高度弹性repartion

    6. Transform 类型的RDD与action类型的RDD各有哪些?

    Transform 类型:map,filter,mapPartitions,xxbyKey,join,repartition,distinct等等
    action类型:collect,reduce,take,takeOrdered,

    7. 发生Shuffle的算子有哪些?

    1.去重操作:

    Distinct等。
    在这里插入图片描述

    2.聚合,byKey类操作

    reduceByKey、groupByKey、sortByKey等。

    byKey类的操作要对一个key,进行聚合操作,那么肯定要保证集群中,所有节点上的相同的key,移动到同一个节点上进行处理。

    3.排序操作:

    sortByKey等。

    4.重分区操作:

    repartition、repartitionAndSortWithinPartitions、coalesce(shuffle=true)等。

    重分区一般会shuffle,因为需要在整个集群中,对之前所有的分区的数据进行随机,均匀的打乱,然后把数据放入下游新的指定数量的分区内。

    5.集合或者表操作:

    join、cogroup等。

    两个rdd进行join,就必须将相同join key的数据,shuffle到同一个节点上,然后进行相同key的两个rdd数据的笛卡尔乘积。

    8. Spark Streaming对应kafka中的三种语义,分别是什么?

    9. Spark任务提交参数有哪些?(结合项目)

    spark-submit --class org.apache.spark.examples.SparkPi \
        --master yarn \
        --deploy-mode client \
        --driver-memory 2g \
        --executor-memory 2g \
        --executor-cores 2 \
        /usr/local/spark/examples/jars/spark-examples_2.11-2.2.3.jar  \
        100
    

    10. RDD的特性?

    1 不可变的:RDD一旦被创建,数据是只读的,如果更改数据,会产生新的RDD
    2 分区:一个RDD有多个Partition,分区也可以根据业务需求来指定
    3 并行的: 因为有多个分区,所以可以并行计算.
    

    11. 写Spark SQL

    DSL风格

    def test8(): Unit ={
        val spark:SparkSession = SparkSession.builder()
          .master("local[1]")
          .appName("SparkByExamples.com")
          .getOrCreate()
        import spark.implicits._
        spark.conf.set("spark.sql.shuffle.partitions",100)
        val simpleData = Seq(("James","Sales","NY",90000,34,10000),
          ("Michael","Sales","NY",86000,56,20000),
          ("Robert","Sales","CA",81000,30,23000),
          ("Maria","Finance","CA",90000,24,23000),
          ("Raman","Finance","CA",99000,40,24000),
          ("Scott","Finance","NY",83000,36,19000),
          ("Jen","Finance","NY",79000,53,15000),
          ("Jeff","Marketing","CA",80000,25,18000),
          ("Kumar","Marketing","NY",91000,50,21000)
        )
        val df: DataFrame = simpleData.toDF("employee_name","department","state","salary","age","bonus")
        df.select("*").where("state='NY'").show()
      }
    

    运行结果

    +-------------+----------+-----+------+---+-----+
    |employee_name|department|state|salary|age|bonus|
    +-------------+----------+-----+------+---+-----+
    |        James|     Sales|   NY| 90000| 34|10000|
    |      Michael|     Sales|   NY| 86000| 56|20000|
    |        Scott|   Finance|   NY| 83000| 36|19000|
    |          Jen|   Finance|   NY| 79000| 53|15000|
    |        Kumar| Marketing|   NY| 91000| 50|21000|
    +-------------+----------+-----+------+---+-----+
    

    SQL风格

    def test7(): Unit ={
        val spark:SparkSession = SparkSession.builder()
          .master("local[1]")
          .appName("SparkByExamples.com")
          .getOrCreate()
        import spark.implicits._
        spark.conf.set("spark.sql.shuffle.partitions",100)
        val simpleData = Seq(("James","Sales","NY",90000,34,10000),
          ("Michael","Sales","NY",86000,56,20000),
          ("Robert","Sales","CA",81000,30,23000),
          ("Maria","Finance","CA",90000,24,23000),
          ("Raman","Finance","CA",99000,40,24000),
          ("Scott","Finance","NY",83000,36,19000),
          ("Jen","Finance","NY",79000,53,15000),
          ("Jeff","Marketing","CA",80000,25,18000),
          ("Kumar","Marketing","NY",91000,50,21000)
        )
        val df: DataFrame = simpleData.toDF("employee_name","department","state","salary","age","bonus")
        df.createTempView("tmp")
        val sql: String =
          """
            |select * from tmp
            |where state="NY"
            |""".stripMargin
        spark.sql(sql).show()
      }
    

    12. Spark跑任务的方式及时间?

    13. Spark Streaming数据激增如何解决?

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 8,432
精华内容 3,372
关键字:

spark面试题