spark面试题_spark streaming 面试题 - CSDN
精华内容
参与话题
  • Spark面试,Spark面试题,Spark面试汇总

    千次阅读 多人点赞 2020-01-09 16:53:31
    1、你觉得spark 可以完全替代hadoop 么? Spark 会替代 MR,Spark 存储依赖 HDFS,资源调度依赖 YARN,集群管理依赖 Zookeeper。 2、Spark消费 Kafka,分布式的情况下,如何保证消息的顺序? Kafka 分布式的单位是 ...

    Table of Contents

    1、你觉得spark 可以完全替代hadoop 么?

    2、Spark消费 Kafka,分布式的情况下,如何保证消息的顺序?

    3、对于 Spark 中的数据倾斜问题你有什么好的方案?

    4、你所理解的 Spark 的 shuffle 过程?

    5、Spark有哪些聚合类的算子,我们应该尽量避免什么类型的算子?

    6、spark on yarn 作业执行流程,yarn-client 和 yarn cluster 有什么区别

    7、Spark为什么快,Spark SQL 一定比 Hive 快吗

    8、RDD, DAG, Stage怎么理解?

    9、RDD 如何通过记录更新的方式容错

    10、宽依赖、窄依赖怎么理解?

    11、Job 和 Task 怎么理解

    ​12、Spark 血统的概念

    13、任务的概念

    14、容错方法

    15、Spark 粗粒度和细粒度

    16、Spark优越性

    17、Transformation和action是什么?区别?举几个常用方法

    18、Spark作业提交流程是怎么样的

    19、Spark streamning工作流程是怎么样的,和Storm比有什么区别

    ​20、Spark 机器学习和 Spark 图计算接触过没有,举例说明你用它做过什么?

    21、Spark RDD是怎么容错的,基本原理是什么?

    22、为什么要用Yarn来部署Spark?

    23、说说yarn-cluster和yarn-client的异同点。

    24、解释一下 groupByKey, reduceByKey 还有 reduceByKeyLocally

    25、说说 persist() 和 cache() 的异同

    26、可以解释一下这两段程序的异同吗

    27、说说map和mapPartitions的区别

    28、groupByKey和reduceByKey是属于Transformation还是 Action?

    29、说说Spark支持的3种集群管理器

    30、说说Worker和Excutor的异同

    31、说说Spark提供的两种共享变量

    32、说说检查点的意义

    33、说说Spark的高可用和容错

    34、解释一下Spark Master的选举过程

    35、说说Spark如何实现序列化组件的

    36、说说对Master的理解

    37、说说什么是窗口间隔和滑动间隔

    38、Spark Streaming小文件问题

    39、Spark的UDF?

    40、Mesos下粗粒度和细粒度对比?

    41、Spark Local和Standalone有什么区别

    42、说说SparkContext和SparkSession有什么区别?

    43、如果Spark Streaming停掉了,如何保证Kafka的重新运作是合理的呢

    44、列举Spark中 Transformation 和 Action算子

    45、Spark经常说的Repartition是个什么玩意

    46、Spark Streaming Duration的概念

    47、简单写一个WordCount程序

    48、说说Yarn-cluster的运行阶段

    49、Mesos粗细度对比

    50、说说Standalone模式下运行Spark程序的大概流程

    51、如何区分 Appliction(应用程序)还有 Driver(驱动程序)

    52、介绍一下 Spark 通信的启动方式

    53、介绍一下 Spark 运行时候的消息通信

    54、解释一下Stage

    55、描述一下Worker异常的情况

    56、描述一下Master异常的情况

    57、Spark的存储体系

    ​58、简述Spark Streaming

    59、知道 Hadoop MRv1 的局限吗

    60、说说Spark的特点,相对于MR来说

    61、说说Spark Narrow Dependency的分类

    62、Task和Stage的分类

    63、Spark的编程模型

    64、Spark的计算模型

    65、总述Spark的架构

    66、一句话说说 Spark Streaming 是如何收集和处理数据的

    67、解释一下窗口间隔window duration和滑动间隔slide duration

    68、介绍一下Spark Streaming的foreachRDD(func)方法

    69、简单描述一下Spark Streaming的容错原理

    70、DStream 有几种转换操作

    71、聊聊Spark Streaming的运行架构

    ​72、说说DStreamGraph

    73、创建RDD的方式以及如何继承创建RDD

    74、分析一下Spark Streaming的transform()和updateStateByKey()两个操作

    75、说说Spark Streaming的输出操作

    76、谈谈Spark Streaming Driver端重启会发生什么

    77、再谈Spark Streaming的容错性

    78、流数据如何存储

    79、StreamingContext启动时序图吗

    80、说说RDD和DataFrame和DataSet的关系


    1、你觉得spark 可以完全替代hadoop 么?

    Spark 会替代 MR,Spark 存储依赖 HDFS,资源调度依赖 YARN,集群管理依赖 Zookeeper。

    2、Spark消费 Kafka,分布式的情况下,如何保证消息的顺序?

    Kafka 分布式的单位是 Partition。如何保证消息有序,需要分几个情况讨论。

    • 同一个 Partition 用一个 write ahead log 组织,所以可以保证 FIFO 的顺序。

    • 不同 Partition 之间不能保证顺序。但是绝大多数用户都可以通过 message key 来定义,因为同一个 key 的 message 可以保证只发送到同一个 Partition。比如说 key 是 user id,table row id 等等,所以同一个 user 或者同一个 record 的消息永远只会发送到同一个 Partition上,保证了同一个 user 或 record 的顺序。

    • 当然,如果你有 key skewness 就有些麻烦,需要特殊处理。

    实际情况中: (1)不关注顺序的业务大量存在;(2)队列无序不代表消息无序。

    第(2)条的意思是说::我们不保证队列的全局有序,但可以保证消息的局部有序。举个例子: 保证来自同1个 order id 的消息,是有序的!

    Kafka 中发送1条消息的时候,可以指定(topic, partition, key) 3个参数。partiton 和 key 是可选的。如果你指定了 partition,那就是所有消息发往同1个 partition,就是有序的。并且在消费端,Kafka 保证,1个 partition 只能被1个 consumer 消费。或者你指定 key(比如 order id),具有同1个 key 的所有消息,会发往同1个 partition。也是有序的。

    3、对于 Spark 中的数据倾斜问题你有什么好的方案?

    简单一句:Spark 数据倾斜的几种场景以及对应的解决方案,包括避免数据源倾斜,调整并行度,使用自定义 Partitioner,使用 Map 侧 Join 代替 Reduce 侧 Join(内存表合并),给倾斜 Key 加上随机前缀等。

    什么是数据倾斜 对 Spark/Hadoop 这样的大数据系统来讲,数据量大并不可怕,可怕的是数据倾斜。数据倾斜指的是,并行处理的数据集中,某一部分(如 Spark 或 Kafka 的一个 Partition)的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈(木桶效应)。

    数据倾斜是如何造成的 在 Spark 中,同一个 Stage 的不同 Partition 可以并行处理,而具有依赖关系的不同 Stage 之间是串行处理的。假设某个 Spark Job 分为 Stage 0和 Stage 1两个 Stage,且 Stage 1依赖于 Stage 0,那 Stage 0完全处理结束之前不会处理Stage 1。而 Stage 0可能包含 N 个 Task,这 N 个 Task 可以并行进行。如果其中 N-1个 Task 都在10秒内完成,而另外一个 Task 却耗时1分钟,那该 Stage 的总时间至少为1分钟。换句话说,一个 Stage 所耗费的时间,主要由最慢的那个 Task 决定。由于同一个 Stage 内的所有 Task 执行相同的计算,在排除不同计算节点计算能力差异的前提下,不同 Task 之间耗时的差异主要由该 Task 所处理的数据量决定。

    具体解决方案 :

    1. 调整并行度分散同一个 Task 的不同 Key:Spark 在做 Shuffle 时,默认使用 HashPartitioner对数据进行分区。如果并行度设置的不合适,可能造成大量不相同的 Key 对应的数据被分配到了同一个 Task 上,造成该 Task 所处理的数据远大于其它 Task,从而造成数据倾斜。如果调整 Shuffle 时的并行度,使得原本被分配到同一 Task 的不同 Key 发配到不同 Task 上处理,则可降低原 Task 所需处理的数据量,从而缓解数据倾斜问题造成的短板效应。

    2. 自定义Partitioner:使用自定义的 Partitioner(默认为 HashPartitioner),将原本被分配到同一个 Task 的不同 Key 分配到不同 Task,可以拿上图继续想象一下,通过自定义 Partitioner 可以把原本分到 Task0 的 Key 分到 Task1,那么 Task0 的要处理的数据量就少了。 

    3. 将 Reduce side(侧) Join 转变为 Map side(侧) Join:通过 Spark 的 Broadcast 机制,将 Reduce 侧 Join 转化为 Map 侧 Join,避免 Shuffle 从而完全消除 Shuffle 带来的数据倾斜。可以看到 RDD2 被加载到内存中了。

    4. 为 skew 的 key 增加随机前/后缀:为数据量特别大的 Key 增加随机前/后缀,使得原来 Key 相同的数据变为 Key 不相同的数据,从而使倾斜的数据集分散到不同的 Task 中,彻底解决数据倾斜问题。Join 另一则的数据中,与倾斜 Key 对应的部分数据,与随机前缀集作笛卡尔乘积,从而保证无论数据倾斜侧倾斜 Key 如何加前缀,都能与之正常 Join。

    5. 大表随机添加 N 种随机前缀,小表扩大 N 倍:如果出现数据倾斜的 Key 比较多,上一种方法将这些大量的倾斜 Key 分拆出来,意义不大(很难一个 Key 一个 Key 都加上后缀)。此时更适合直接对存在数据倾斜的数据集全部加上随机前缀,然后对另外一个不存在严重数据倾斜的数据集整体与随机前缀集作笛卡尔乘积(即将数据量扩大 N 倍),可以看到 RDD2 扩大了 N 倍了,再和加完前缀的大数据做笛卡尔积。

    4、你所理解的 Spark 的 shuffle 过程?

    Spark shuffle 处于一个宽依赖,可以实现类似混洗的功能,将相同的 Key 分发至同一个 Reducer上进行处理。

    5、Spark有哪些聚合类的算子,我们应该尽量避免什么类型的算子?

    在我们的开发过程中,能避免则尽可能避免使用 reduceByKey、join、distinct、repartition 等会进行 shuffle 的算子,尽量使用 map 类的非 shuffle 算子。这样的话,没有 shuffle 操作或者仅有较少 shuffle 操作的 Spark 作业,可以大大减少性能开销。

    6、spark on yarn 作业执行流程,yarn-client 和 yarn cluster 有什么区别

    Spark On Yarn 的优势 

    1. Spark 支持资源动态共享,运行于 Yarn 的框架都共享一个集中配置好的资源池 

    2. 可以很方便的利用 Yarn 的资源调度特性来做分类·,隔离以及优先级控制负载,拥有更灵活的调度策略 

    3. Yarn 可以自由地选择 executor 数量 

    4. Yarn 是唯一支持 Spark 安全的集群管理器,使用 Yarn,Spark 可以运行于 Kerberos Hadoop 之上,在它们进程之间进行安全认证

    yarn-client 和 yarn cluster 的异同 :

    1. 从广义上讲,yarn-cluster 适用于生产环境。而 yarn-client 适用于交互和调试,也就是希望快速地看到 application 的输出。

    2. 从深层次的含义讲,yarn-cluster 和 yarn-client 模式的区别其实就是 Application Master 进程的区别,yarn-cluster 模式下,driver 运行在 AM(Application Master)中,它负责向 YARN 申请资源,并监督作业的运行状况。当用户提交了作业之后,就可以关掉 Client,作业会继续在 YARN 上运行。然而 yarn-cluster 模式不适合运行交互类型的作业。而 yarn-client 模式下,Application Master 仅仅向 YARN 请求 executor,Client 会和请求的 container 通信来调度他们工作,也就是说 Client 不能离开。

    7、Spark为什么快,Spark SQL 一定比 Hive 快吗

    Spark SQL 比 Hadoop Hive 快,是有一定条件的,而且不是 Spark SQL 的引擎比 Hive 的引擎快,相反,Hive 的 HQL 引擎还比 Spark SQL 的引擎更快。其实,关键还是在于 Spark 本身快。

    1. 消除了冗余的 HDFS 读写:Hadoop 每次 shuffle 操作后,必须写到磁盘,而 Spark 在 shuffle 后不一定落盘,可以 cache 到内存中,以便迭代时使用。如果操作复杂,很多的 shufle 操作,那么 Hadoop 的读写 IO 时间会大大增加,也是 Hive 更慢的主要原因了。

    2. 消除了冗余的 MapReduce 阶段:Hadoop 的 shuffle 操作一定连着完整的 MapReduce 操作,冗余繁琐。而 Spark 基于 RDD 提供了丰富的算子操作,且 reduce 操作产生 shuffle 数据,可以缓存在内存中。

    3. JVM 的优化:Hadoop 每次 MapReduce 操作,启动一个 Task 便会启动一次 JVM,基于进程的操作。而 Spark 每次 MapReduce 操作是基于线程的,只在启动 Executor 是启动一次 JVM,内存的 Task 操作是在线程复用的。每次启动 JVM 的时间可能就需要几秒甚至十几秒,那么当 Task 多了,这个时间 Hadoop 不知道比 Spark 慢了多少。

    记住一种反例 考虑一种极端查询:

    Select month_id, sum(sales) from T group by month_id;

    这个查询只有一次 shuffle 操作,此时,也许 Hive HQL 的运行时间也许比 Spark 还快,反正 shuffle 完了都会落一次盘,或者都不落盘。

    结论 Spark 快不是绝对的,但是绝大多数,Spark 都比 Hadoop 计算要快。这主要得益于其对 mapreduce 操作的优化以及对 JVM 使用的优化。

    8、RDD, DAG, Stage怎么理解?

    DAG:Spark 中使用 DAG 对 RDD 的关系进行建模,描述了 RDD 的依赖关系,这种关系也被称之为 lineage(血缘),RDD 的依赖关系使用 Dependency 维护。DAG 在 Spark 中的对应的实现为 DAGScheduler。

    RDD:RDD 是 Spark 的灵魂,也称为弹性分布式数据集。一个 RDD 代表一个可以被分区的只读数据集。RDD 内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。

    Rdd的五个特征:

    1. dependencies:建立 RDD 的依赖关系,主要 RDD 之间是宽窄依赖的关系,具有窄依赖关系的 RDD 可以在同一个 stage 中进行计算。

    2. partition:一个 RDD 会有若干个分区,分区的大小决定了对这个 RDD 计算的粒度,每个 RDD 的分区的计算都在一个单独的任务中进行。

    3. preferedlocations:按照“移动数据不如移动计算”原则,在 Spark 进行任务调度的时候,优先将任务分配到数据块存储的位置。

    4. compute:Spark 中的计算都是以分区为基本单位的,compute 函数只是对迭代器进行复合,并不保存单次计算的结果。

    5. partitioner: 只存在于(K,V)类型的 RDD 中,非(K,V)类型的 partitioner 的值就是 None。

    RDD 的算子主要分成2类,action 和 transformation。这里的算子概念,可以理解成就是对数据集的变换。action 会触发真正的作业提交,而 transformation 算子是不会立即触发作业提交的。每一个 transformation 方法返回一个新的 RDD。只是某些 transformation 比较复杂,会包含多个子 transformation,因而会生成多个 RDD。这就是实际 RDD 个数比我们想象的多一些 的原因。通常是,当遇到 action 算子时会触发一个job的提交,然后反推回去看前面的 transformation 算子,进而形成一张有向无环图。

    Stage 在 DAG 中又进行 stage 的划分,划分的依据是依赖是否是 shuffle 的,每个 stage 又可以划分成若干 task。接下来的事情就是 driver 发送 task 到 executor,executor 自己的线程池去执行这些 task,完成之后将结果返回给 driver。action 算子是划分不同 job 的依据。

    9、RDD 如何通过记录更新的方式容错

    RDD 的容错机制实现分布式数据集容错方法有两种:1. 数据检查点 2. 记录更新。

    RDD 采用记录更新的方式:记录所有更新点的成本很高。所以,RDD只支持粗颗粒变换,即只记录单个块(分区)上执行的单个操作,然后创建某个 RDD 的变换序列(血统 lineage)存储下来;变换序列指,每个 RDD 都包含了它是如何由其他 RDD 变换过来的以及如何重建某一块数据的信息。因此 RDD 的容错机制又称“血统”容错。

    10、宽依赖、窄依赖怎么理解?

    窄依赖指的是每一个 parent RDD 的 partition 最多被子 RDD 的一个 partition 使用(一子一亲)。

    宽依赖指的是多个子 RDD 的 partition 会依赖同一个 parent RDD的 partition(多子一亲)。

    RDD 作为数据结构,本质上是一个只读的分区记录集合。一个 RDD 可以包含多个分区,每个分区就是一个 dataset 片段。RDD 可以相互依赖。

    首先,窄依赖可以支持在同一个 cluster node上,以 pipeline 形式执行多条命令(也叫同一个 stage 的操作),例如在执行了 map 后,紧接着执行 filter。相反,宽依赖需要所有的父分区都是可用的,可能还需要调用类似 MapReduce 之类的操作进行跨节点传递。

    其次,则是从失败恢复的角度考虑。窄依赖的失败恢复更有效,因为它只需要重新计算丢失的 parent partition 即可,而且可以并行地在不同节点进行重计算(一台机器太慢就会分配到多个节点进行),相反,宽依赖牵涉 RDD 各级的多个 parent partition。

    11、Job 和 Task 怎么理解

    Job:Spark 的 Job 来源于用户执行 action 操作(这是 Spark 中实际意义的 Job),就是从 RDD 中获取结果的操作,而不是将一个 RDD 转换成另一个 RDD 的 transformation 操作。

    Task:一个 Stage 内,最终的 RDD 有多少个 partition,就会产生多少个 task。看一看图就明白了,可以数一数每个 Stage 有多少个 Task。

    640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1

     

     

    https://mmbiz.qpic.cn/mmbiz_png/UdK9ByfMT2OSwS8tHQeMicc0egREicTZ5RScIjsWSWADIAVBvBZ5NuKxvnbQnFljGSdib0IeCLklf7BleljHhjmWw/640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1
    12、Spark 血统的概念

    RDD 的 lineage 记录的是粗颗粒度的特定数据转换(transformation)操作(filter, map, join etc.)行为。当这个 RDD 的部分分区数据丢失时,它可以通过 lineage 获取足够的信息来重新运算和恢复丢失的数据分区。这种粗颗粒的数据模型,限制了 Spark 的运用场合,但同时相比细颗粒度的数据模型,也带来了性能的提升。

    13、任务的概念

    包含很多 task 的并行计算,可以认为是 Spark RDD 里面的 action,每个 action 的计算会生成一个 job。用户提交的 job 会提交给 DAGScheduler,job 会被分解成 Stage 和 Task。


    14、容错方法

    Spark 选择记录更新的方式。但是,如果更新粒度太细太多,那么记录更新成本也不低。因此,RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建 RDD 的一系列变换序列(每个 RDD 都包含了他是如何由其他 RDD 变换过来的以及如何重建某一块数据的信息。因此 RDD 的容错机制又称血统容错)记录下来,以便恢复丢失的分区。lineage本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做进而恢复数据。

    相比其他系统的细颗粒度的内存数据更新级别的备份或者 LOG 机制,RDD 的 lineage 记录的是粗颗粒度的特定数据 transformation 操作行为。当这个 RDD 的部分分区数据丢失时,它可以通过 lineage 获取足够的信息来重新运算和恢复丢失的数据分区。


    15、Spark 粗粒度和细粒度

    Spark 中,每个 application 对应一个 SparkContext。对于 SparkContext 之间的调度关系,取决于 Spark 的运行模式。对 Standalone 模式而言,Spark Master 节点先计算集群内的计算资源能否满足等待队列中的应用对内存和 CPU 资源的需求,如果可以,则 Master 创建 Spark Driver,启动应用的执行。宏观上来讲,这种对应用的调度类似于 FIFO 策略。在 Mesos 和 Yarn 模式下,底层的资源调度系统的调度策略都是由 Mesos 和 Yarn 决定的。具体分类描述如下:

    1. Standalone 模式:默认以用户提交 Applicaiton 的顺序来调度,即 FIFO 策略。每个应用执行时独占所有资源。如果有多个用户要共享集群资源,则可以使用参数 spark.cores.max 来配置应用在集群中可以使用的最大 CPU 核的数量。如果不配置,则采用默认参数 spark.deploy.defaultCore 的值来确定。

    2. Mesos 模式:如果在 Mesos 上运行 Spark,用户想要静态配置资源的话,可以设置 spark.mesos.coarse 为 true,这样 Mesos 变为粗粒度调度模式。然后可以设置 spark.cores.max 指定集群中可以使用的最大核数,与上面 Standalone 模式类似。同时,在 Mesos 模式下,用户还可以设置参数 spark.executor.memory 来配置每个 executor 的内存使用量。如果想使 Mesos 在细粒度模式下运行,可以通过 mesos://<url-info> 设置动态共享 CPU core 的执行模式。在这种模式下,应用不执行时的空闲 CPU 资源得以被其他用户使用,提升了 CPU 使用率。

    16、Spark优越性

    一、Spark 的5大优势:

    1. 更高的性能。因为数据被加载到集群主机的分布式内存中。数据可以被快速的转换迭代,并缓存用以后续的频繁访问需求。在数据全部加载到内存的情况下,Spark可以比Hadoop快100倍,在内存不够存放所有数据的情况下快hadoop10倍。

    2. 通过建立在Java,Scala,Python,SQL(应对交互式查询)的标准API以方便各行各业使用,同时还含有大量开箱即用的机器学习库。

    3. 与现有Hadoop 1和2.x(YARN)生态兼容,因此机构可以无缝迁移。

    4. 方便下载和安装。方便的shell(REPL: Read-Eval-Print-Loop)可以对API进行交互式的学习。

    5. 借助高等级的架构提高生产力,从而可以讲精力放到计算上。

    二、MapReduce与Spark相比,有哪些异同点:

    1、基本原理上:(1) MapReduce:基于磁盘的大数据批量处理系统 (2)Spark:基于RDD(弹性分布式数据集)数据处理,显示将RDD数据存储到磁盘和内存中。

    2、模型上:(1) MapReduce可以处理超大规模的数据,适合日志分析挖掘等较少的迭代的长任务需求,结合了数据的分布式的计算。(2) Spark:适合数据的挖掘,机器学习等多轮迭代式计算任务。

    17、Transformation和action是什么?区别?举几个常用方法

    RDD 创建后就可以在 RDD 上进行数据处理。RDD 支持两种操作:1. 转换(transformation): 即从现有的数据集创建一个新的数据集 2. 动作(action): 即在数据集上进行计算后,返回一个值给 Driver 程序

    RDD 的转化操作是返回一个新的 RDD 的操作,比如 map() 和 filter() ,而行动操作则是向驱动器程序返回结果或把结果写入外部系统的操作,会触发实际的计算,比如 count() 和 first() 。Spark 对待转化操作和行动操作的方式很不一样,因此理解你正在进行的操作的类型是很重要的。如果对于一个特定的函数是属于转化操作还是行动操作感到困惑,你可以看看它的返回值类型:转化操作返回的是 RDD,而行动操作返回的是其他的数据类型。

    RDD 中所有的 Transformation 都是惰性的,也就是说,它们并不会直接计算结果。相反的它们只是记住了这些应用到基础数据集(例如一个文件)上的转换动作。只有当发生一个要求返回结果给 Driver 的 Action 时,这些 Transformation 才会真正运行。

    这个设计让 Spark 更加有效的运行。

    18、Spark作业提交流程是怎么样的

    • spark-submit 提交代码,执行 new SparkContext(),在 SparkContext 里构造 DAGScheduler 和 TaskScheduler

    • TaskScheduler 会通过后台的一个进程,连接 Master,向 Master 注册 Application。

    • Master 接收到 Application 请求后,会使用相应的资源调度算法,在 Worker 上为这个 Application 启动多个 Executer。

    • Executor 启动后,会自己反向注册到 TaskScheduler 中。所有 Executor 都注册到 Driver 上之后,SparkContext 结束初始化,接下来往下执行我们自己的代码。

    • 每执行到一个 Action,就会创建一个 Job。Job 会提交给 DAGScheduler。

    • DAGScheduler 会将 Job划分为多个 stage,然后每个 stage 创建一个 TaskSet。

    • TaskScheduler 会把每一个 TaskSet 里的 Task,提交到 Executor 上执行。

    • Executor 上有线程池,每接收到一个 Task,就用 TaskRunner 封装,然后从线程池里取出一个线程执行这个 task。(TaskRunner 将我们编写的代码,拷贝,反序列化,执行 Task,每个 Task 执行 RDD 里的一个 partition)

    19、Spark streamning工作流程是怎么样的,和Storm比有什么区别

    Spark Streaming 与 Storm 都可以用于进行实时流计算。但是他们两者的区别是非常大的。其中区别之一,就是,Spark Streaming 和 Storm 的计算模型完全不一样,Spark Streaming 是基于 RDD 的,因此需要将一小段时间内的,比如1秒内的数据,收集起来,作为一个 RDD,然后再针对这个 batch 的数据进行处理。而 Storm 却可以做到每来一条数据,都可以立即进行处理和计算。因此,Spark Streaming 实际上严格意义上来说,只能称作准实时的流计算框架;而 Storm 是真正意义上的实时计算框架。此外,Storm 支持的一项高级特性,是 Spark Streaming 暂时不具备的,即 Storm 支持在分布式流式计算程序(Topology)在运行过程中,可以动态地调整并行度,从而动态提高并发处理能力。而 Spark Streaming 是无法动态调整并行度的。但是 Spark Streaming 也有其优点,首先 Spark Streaming 由于是基于 batch 进行处理的,因此相较于 Storm 基于单条数据进行处理,具有数倍甚至数十倍的吞吐量。此外,Spark Streaming 由于也身处于 Spark 生态圈内,因此Spark Streaming可以与Spark Core、Spark SQL,甚至是Spark MLlib、Spark GraphX进行无缝整合。流式处理完的数据,可以立即进行各种map、reduce转换操作,可以立即使用sql进行查询,甚至可以立即使用machine learning或者图计算算法进行处理。这种一站式的大数据处理功能和优势,是 Storm 无法匹敌的。因此,综合上述来看,通常在对实时性要求特别高,而且实时数据量不稳定,比如在白天有高峰期的情况下,可以选择使用 Storm。但是如果是对实时性要求一般,允许1秒的准实时处理,而且不要求动态调整并行度的话,选择Spark Streaming是更好的选择。


    20、Spark 机器学习和 Spark 图计算接触过没有,举例说明你用它做过什么?

    Spark 提供了很多机器学习库,我们只需要填入数据,设置参数就可以用了。使用起来非常方便。另外一方面,由于它把所有的东西都写到了内部,我们无法修改其实现过程。要想修改里面的某个环节,还的修改源码,重新编译。比如 kmeans 算法,如果没有特殊需求,很方便。但是spark内部使用的两个向量间的距离是欧式距离。如果你想改为余弦或者马氏距离,就的重新编译源码了。Spark 里面的机器学习库都是一些经典的算法,这些代码网上也好找。这些代码使用起来叫麻烦,但是很灵活。Spark 有一个很大的优势,那就是 RDD。模型的训练完全是并行的。

    Spark 的 ML 和 MLLib 两个包区别和联系

    1. 技术角度上,面向的数据集类型不一样:ML 的 API 是面向 Dataset 的(Dataframe 是 Dataset 的子集,也就是 Dataset[Row]), mllib 是面对 RDD 的。Dataset 和 RDD 有啥不一样呢?Dataset 的底端是 RDD。Dataset 对 RDD 进行了更深一层的优化,比如说有 sql 语言类似的黑魔法,Dataset 支持静态类型分析所以在 compile time 就能报错,各种 combinators(map,foreach 等)性能会更好,等等。

    2. 编程过程上,构建机器学习算法的过程不一样:ML 提倡使用 pipelines,把数据想成水,水从管道的一段流入,从另一端流出。ML 是1.4比 Mllib 更高抽象的库,它解决如果简洁的设计一个机器学习工作流的问题,而不是具体的某种机器学习算法。未来这两个库会并行发展。

    21、Spark RDD是怎么容错的,基本原理是什么?

    一般来说,分布式数据集的容错性有两种方式:数据检查点和记录数据的更新。 

    面向大规模数据分析,数据检查点操作成本很高,需要通过数据中心的网络连接在机器之间复制庞大的数据集,而网络带宽往往比内存带宽低得多,同时还需要消耗更多的存储资源。 

    因此,Spark选择记录更新的方式。但是,如果更新粒度太细太多,那么记录更新成本也不低。因此,RDD只支持粗粒度转换,即只记录单个块上执行的单个操作,然后将创建RDD的一系列变换序列(每个RDD都包含了他是如何由其他RDD变换过来的以及如何重建某一块数据的信息。因此RDD的容错机制又称“血统(Lineage)”容错)记录下来,以便恢复丢失的分区。 

    Lineage本质上很类似于数据库中的重做日志(Redo Log),只不过这个重做日志粒度很大,是对全局数据做同样的重做进而恢复数据。

    22、为什么要用Yarn来部署Spark?

    因为 Yarn 支持动态资源配置。Standalone 模式只支持简单的固定资源分配策略,每个任务固定数量的 core,各 Job 按顺序依次分配在资源,资源不够的时候就排队。这种模式比较适合单用户的情况,多用户的情境下,会有可能有些用户的任务得不到资源。

    Yarn 作为通用的种子资源调度平台,除了 Spark 提供调度服务之外,还可以为其他系统提供调度,如 Hadoop MapReduce, Hive 等。

    23、说说yarn-cluster和yarn-client的异同点。

    • cluster 模式会在集群的某个节点上为 Spark 程序启动一个称为 Master 的进程,然后 Driver 程序会运行正在这个 Master 进程内部,由这种进程来启动 Driver 程序,客户端完成提交的步骤后就可以退出,不需要等待 Spark 程序运行结束,这是四一职中适合生产环境的运行方式

    • client 模式也有一个 Master 进程,但是 Driver 程序不会运行在这个 Master 进程内部,而是运行在本地,只是通过 Master 来申请资源,直到运行结束,这种模式非常适合需要交互的计算。显然 Driver 在 client 模式下会对本地资源造成一定的压力。

    24、解释一下 groupByKey, reduceByKey 还有 reduceByKeyLocally

    Spark RDD 算子

    25、说说 persist() 和 cache() 的异同

    RDD的cache和persist的区别

    cache()是persist()的简化方式,调用persist的无参版本,也就是调用persist(StorageLevel.MEMORY_ONLY),cache只有一个默认的缓存级别MEMORY_ONLY,即将数据持久化到内存中,而persist可以通过传递一个 StorageLevel 对象来设置缓存的存储级别。

    DataFrame的cache和persist的区别

    cache()依然调用的persist(),但是persist调用cacheQuery,而cacheQuery的默认存储级别为MEMORY_AND_DISK,这点和rdd是不一样的。

    26、可以解释一下这两段程序的异同吗

    val counter = 0
    val data = Seq(1, 2, 3)
    data.foreach(x => counter += x)
    println("Counter value: " + counter)
    val counter = 0
    val data = Seq(1, 2, 3)
    var rdd = sc.parallelizze(data)
    rdd.foreach(x => counter += x)
    println("Counter value: " + counter)

    所有在 Driver 程序追踪的代码看上去好像在 Driver 上计算,实际上都不在本地,每个 RDD 操作都被转换成 Job 分发至集群的执行器 Executor 进程中运行,即便是单机本地运行模式,也是在单独的执行器进程上运行,与 Driver 进程属于不用的进程。所以每个 Job 的执行,都会经历序列化、网络传输、反序列化和运行的过程。

    再具体一点解释是 foreach 中的匿名函数 x => counter += x 首先会被序列化然后被传入计算节点,反序列化之后再运行,因为 foreach 是 Action 操作,结果会返回到 Driver 进程中。

    在序列化的时候,Spark 会将 Job 运行所依赖的变量、方法全部打包在一起序列化,相当于它们的副本,所以 counter 会一起被序列化,然后传输到计算节点,是计算节点上的 counter 会自增,而 Driver 程序追踪的 counter 则不会发生变化。执行完成之后,结果会返回到 Driver 程序中。而 Driver 中的 counter 依然是当初的那个 Driver 的值为0。

    因此说,RDD 操作不能嵌套调用,即在 RDD 操作传入的函数参数的函数体中,不可以出现 RDD 调用。

    27、说说map和mapPartitions的区别

    map 中的 func 作用的是 RDD 中每一个元素,而 mapPartitioons 中的 func 作用的对象是 RDD 的一整个分区。所以 func 的类型是 Iterator<T> => Iterator<T>,其中 T 是输入 RDD 的元素类型。

    28、groupByKey和reduceByKey是属于Transformation还是 Action?

    前者,因为 Action 输出的不再是 RDD 了,也就意味着输出不是分布式的,而是回送到 Driver 程序。以上两种操作都是返回 RDD,所以应该属于 Transformation。

    29、说说Spark支持的3种集群管理器

    Standalone 模式:资源管理器是 Master 节点,调度策略相对单一,只支持先进先出模式。

    Hadoop Yarn 模式:资源管理器是 Yarn 集群,主要用来管理资源。Yarn 支持动态资源的管理,还可以调度其他实现了 Yarn 调度接口的集群计算,非常适用于多个集群同时部署的场景,是目前最流行的一种资源管理系统。

    Apache Mesos:Mesos 是专门用于分布式系统资源管理的开源系统,与 Yarn 一样是 C++ 开发,可以对集群中的资源做弹性管理。

    30、说说Worker和Excutor的异同

    Worker 是指每个及节点上启动的一个进程,负责管理本节点,jps 可以看到 Worker 进程在运行。Excutor 每个Spark 程序在每个节点上启动的一个进程,专属于一个 Spark 程序,与 Spark 程序有相同的生命周期,负责 Spark 在节点上启动的 Task,管理内存和磁盘。如果一个节点上有多个 Spark 程序,那么相应就会启动多个执行器。

    31、说说Spark提供的两种共享变量

    Spark 程序的大部分操作都是 RDD 操作,通过传入函数给 RDD 操作函数来计算,这些函数在不同的节点上并发执行,内部的变量有不同的作用域,不能相互访问,有些情况下不太方便。

    1. 广播变量,是一个只读对象,在所有节点上都有一份缓存,创建方法是 SparkContext.broadcast()。创建之后再更新它的值是没有意义的,一般用 val 来修改定义。

    2. 计数器,只能增加,可以用计数或求和,支持自定义类型。创建方法是 SparkContext.accumulator(V, name)。只有 Driver 程序可以读这个计算器的变量,RDD 操作中读取计数器变量是无意义的。

    以上两种类型都是 Spark 的共享变量。

    32、说说检查点的意义

    在容错机制中,如果一个节点死机了,而且运算窄依赖,则只要把丢失的父RDD分区重算即可,不依赖于其他节点。而宽依赖需要父RDD的所有分区都存在,重算就很昂贵了。可以这样理解开销的经济与否:在窄依赖中,在子RDD的分区丢失、重算父RDD分区时,父RDD相应分区的所有数据都是子RDD分区的数据,并不存在冗余计算。在宽依赖情况下,丢失一个子RDD分区重算的每个父RDD的每个分区的所有数据并不是都给丢失的子RDD分区用的,会有一部分数据相当于对应的是未丢失的子RDD分区中需要的数据,这样就会产生冗余计算开销,这也是宽依赖开销更大的原因。因此如果使用Checkpoint算子来做检查点,不仅要考虑Lineage是否足够长,也要考虑是否有宽依赖,对宽依赖加Checkpoint是最物有所值的。

    33、说说Spark的高可用和容错

    Spark 应用程序的高可用性主要包含两个部分:集群环境的高可用以及应用程序的容错特性;集群环境的高可用,主要由集群框架来控制,比如 spark on yarn 模式下的 ResourceManager 的 HA、Spark Standalone 模式下的 Master HA 等特性的设置保障集群的高可用性;至于应用程序的容错需要考虑应用的各个组成部分的容错。

    spark 应用程序执行过程中,一般存在以下失败的情况:

    • Driver 集成宕机:Driver 运行机器宕机、Driver 程序运行过程中异常导致宕机
    • Executor 进程宕机:Executor 所在的work 宕机,Exector 和 Driver 通信超时
    • Task 执行失败:task 执行过程发生异常导致失败

    Driver 进程宕机解决方案:

    • 监控机器机器是否存活,如果机器宕机,重启服务机器和 spark 集群
    • 通过 spark job 的 history 服务监控应用是否执行成功,如果执行失败,通过开发人员重启服务即可
    • SparkStreaming 中,重启spark应用后,可通过 checkpoint 进行job数据恢复

    Executor 宕机解决方案:选择一个work 节点重启Executor 进程,Driver 重新分配任务

    Task 执行失败解决方案:

    • Spark 会自动进行 task 重试机制,如果某个 task 失败重试次数超过3次(spark.task.maxFailures)后,当前job 执行失败;local 模式默认不启用 task 重试机制
    • Task 数据恢复/重新运行的机制实际上是 RDD 容错机制,即 Lineage 机制,RDD的 Lineage 机制记录的是粗粒度的特定数据的 Transformation 操作行为。当这个 RDD 的部分数据丢失时,它可以通过 lineage 获取足够的信息来重新运算和恢复丢失的数据分区;该机制体现在RDD上就是RDD依赖特性
    • 如果 rdd 的 lineage 的生命线特别长,此时某些 task 执行失败的恢复成本就会比较高,那么可以采用检查点或缓存的方式将数据冗余下来,当检查点/缓存点之后的rdd的task出现异常的时候,可以直接从检查点重新构建lineage,可以减少执行开销。

    34、解释一下Spark Master的选举过程

    Master作为Spark standalone模式的核心,如果Master出现异常,那么集群就不能正常工作。所以Spark会从Standby中选择一个节点作为Master.

     Spark支持以下几种策略,这种策略可以通过配置文件spark-env.sh配置spark.deploy.recoveryMode

    • ZOOKEEPER: 集群元数据持久化到zookeeper,当master出现异常的时候,zookeeper会通过选举机制选举出新的Master,新的Master接管集群时需要从zookeeper获取持久化信息,并根据这些信息恢复集群状态
    • FILESYSTEM: 集群的元数据持久化到文件系统,当Master出现异常的时候,只要在该机器上重启Master,启动后的Master获取持久化信息并根据持久化信息恢复集群状态
    • CUSTOM: 自定义恢复模式,实现StandaloneRecoveryModeFactory抽象类进行实现,并把该类配置到配置文件,当Master出现异常,会根据用户自定义的方式进行恢复集群状况
    • NONE: 不持久化集群元数据,当Master出现异常时,新启动的Master不进行恢复集群状态

    35、说说Spark如何实现序列化组件的

    Spark通过两种方式来创建序列化器

    Java序列化

    在默认情况下,Spark 采用 Java的 ObjectOutputStream 序列化一个对象。该方式适用于所有实现了 java.io.Serializable 的类。通过继承 java.io.Externalizable,你能进一步控制序列化的性能。Java序列化非常灵活,但是速度较慢,在某些情况下序列化的结果也比较大。

    Kryo序列化

    Spark 也能使用 Kryo(版本2)序列化对象。Kryo 不但速度极快,而且产生的结果更为紧凑(通常能提高10倍)。Kryo 的缺点是不支持所有类型,为了更好的性能,你需要提前注册程序中所使用的类(class)。

    Java 的序列化比较简单,就和前面的一样,下面主要介绍Kryo序列化的使用。

    Kryo序列化怎么用?

    可以在创建 SparkContext 之前,通过调用 System.setProperty("spark.serializer", "spark.KryoSerializer"),将序列化方式切换成Kryo。

    但是 Kryo 需要用户进行注册,这也是为什么 Kryo 不能成为 Spark 序列化默认方式的唯一原因,但是建议对于任何“网络密集型”(network-intensive)的应用,都采用这种方式进行序列化方式。

    Kryo文档描述了很多便于注册的高级选项,例如添加用户自定义的序列化代码。

    如果对象非常大,你还需要增加属性 spark.kryoserializer.buffer.mb 的值。该属性的默认值是32,但是该属性需要足够大以便能够容纳需要序列化的最大对象。

    最后,如果你不注册你的类,Kryo仍然可以工作,但是需要为了每一个对象保存其对应的全类名(full class name),这是非常浪费的。

    package com.zhangpengfei.spark_demo.kyro
    
    import com.esotericsoftware.kryo.Kryo
    import org.apache.spark.SparkContext
    import org.apache.spark.serializer.KryoRegistrator
    
    
    class KryoDemo1 {}
    
    class KyroClass extends KryoRegistrator {
      override def registerClasses(kryo: Kryo): Unit = {
        kryo.register(classOf[KryoDemo1])
      }
    }
    object KryoClass {
      def main(args: Array[String]): Unit = {
        System.setProperty("spark.serializer", "spark.KryoSerializer")
        System.setProperty("spark.kryo.registrator", "com.zhangpengfei.spark_demo.kyro.KyroClass")
        val context = new SparkContext()
      }
    }

    36、说说对Master的理解

    Master 是 local-cluster 部署模式和 Standalone 部署模式中,整个 Spark 集群最为重要的组件之一,分担了对整个集群资源的管理和分配的工作。

    local-cluser 下,Master 作为 JVM 进程的对象启动,而在 Standalone 模式下,就是单独的进程启动。 

    37、说说什么是窗口间隔和滑动间隔

    也叫 WriteAheadLogs,通常被用于数据库和文件系统中,保证数据操作的持久性。预写日志通常是先将操作写入到一个持久可靠的日志文件中,然后才对数据施加该操作,当加入施加该操作中出现异常,可以通过读取日志文件并重新施加该操作,从而恢复系统。

    当 WAL 开启后,所有收到的数据同时保存到了容错文件系统的日志文件中,当 Spark Streaming 失败,这些接受到的数据也不会丢失。另外,接收数据的正确性只在数据被预写到日志以后接收器才会确认。已经缓存但还没有保存的数据可以在 Driver 重新启动之后由数据源再发送一次(经常问)。

    这两个机制保证了数据的零丢失,即所有的数据要么从日志中恢复,要么由数据源重发。

    38、Spark Streaming小文件问题

    使用 Spark Streaming 时,如果实时计算结果要写入到 HDFS,那么不可避免的会遇到一个问题,那就是在默认情况下会产生非常多的小文件,这是由 Spark Streaming 的微批处理模式和 DStream(RDD) 的分布式(partition)特性导致的,Spark Streaming 为每个 Partition 启动一个独立的线程(一个 task/partition 一个线程)来处理数据,一旦文件输出到 HDFS,那么这个文件流就关闭了,再来一个 batch 的 parttition 任务,就再使用一个新的文件流,那么假设,一个 batch 为10s,每个输出的 DStream 有32个 partition,那么一个小时产生的文件数将会达到(3600/10)*32=11520个之多。众多小文件带来的结果是有大量的文件元信息,比如文件的 location、文件大小、block number 等需要 NameNode 来维护,NameNode 会因此鸭梨山大。不管是什么格式的文件,parquet、text、JSON 或者 Avro,都会遇到这种小文件问题,这里讨论几种处理 Spark Streaming 小文件的典型方法。

    1. 增加 batch 大小:这种方法很容易理解,batch 越大,从外部接收的 event 就越多,内存积累的数据也就越多,那么输出的文件数也就会变少,比如上边的时间从10s增加为100s,那么一个小时的文件数量就会减少到1152个。但别高兴太早,实时业务能等那么久吗,本来人家10s看到结果更新一次,现在要等快两分钟,是人都会骂娘。所以这种方法适用的场景是消息实时到达,但不想挤压在一起处理,因为挤压在一起处理的话,批处理任务在干等,这时就可以采用这种方法。

    2. Coalesce大法好:文章开头讲了,小文件的基数是 batch_number * partition_number,而第一种方法是减少 batch_number,那么这种方法就是减少 partition_number 了,这个 api 不细说,就是减少初始的分区个数。看过 spark 源码的童鞋都知道,对于窄依赖,一个子 RDD 的 partition 规则继承父 RDD,对于宽依赖(就是那些个xxxByKey操作),如果没有特殊指定分区个数,也继承自父 rdd。那么初始的 SourceDstream 是几个 partiion,最终的输出就是几个 partition。所以 Coalesce 大法的好处就是,可以在最终要输出的时候,来减少一把 partition 个数。但是这个方法的缺点也很明显,本来是32个线程在写256M数据,现在可能变成了4个线程在写256M数据,而没有写完成这256M数据,这个 batch 是不算结束的。那么一个 batch 的处理时延必定增长,batch 挤压会逐渐增大。

    3. Spark Streaming 外部来处理:我们既然把数据输出到 hdfs,那么说明肯定是要用 Hive 或者 Spark Sql 这样的“sql on hadoop”系统类进一步进行数据分析,而这些表一般都是按照半小时或者一小时、一天,这样来分区的(注意不要和 Spark Streaming 的分区混淆,这里的分区,是用来做分区裁剪优化的),那么我们可以考虑在 Spark Streaming 外再启动定时的批处理任务来合并 Spark Streaming 产生的小文件。这种方法不是很直接,但是却比较有用,“性价比”较高,唯一要注意的是,批处理的合并任务在时间切割上要把握好,搞不好就可能会去合并一个还在写入的 Spark Streaming 小文件。

    4. 自己调用 foreach 去 append:Spark Streaming 提供的 foreach 这个 outout 类 api (一种 Action 操作),可以让我们自定义输出计算结果的方法。那么我们其实也可以利用这个特性,那就是每个 batch 在要写文件时,并不是去生成一个新的文件流,而是把之前的文件打开。考虑这种方法的可行性,首先,HDFS 上的文件不支持修改,但是很多都支持追加,那么每个 batch 的每个 partition 就对应一个输出文件,每次都去追加这个 partition 对应的输出文件,这样也可以实现减少文件数量的目的。这种方法要注意的就是不能无限制的追加,当判断一个文件已经达到某一个阈值时,就要产生一个新的文件进行追加了。所以大概就是一直32个文件。

    39、Spark的UDF?

    因为目前 Spark SQL 本身支持的函数有限,一些常用的函数都没有,比如 len, concat...etc 但是使用 UDF 来自己实现根据业务需要的功能是非常方便的。Spark SQL UDF 其实是一个 Scala 函数,被 catalyst 封装成一个 Expression 结点,最后通过 eval 方法计根据当前 Row 计算 UDF 的结果。UDF 对表中的单行进行转换,以便为每行生成单个对应的输出值。例如,大多数 SQL 环境提供 UPPER 函数返回作为输入提供的字符串的大写版本。

    用户自定义函数可以在 Spark SQL 中定义和注册为 UDF,并且可以关联别名,这个别名可以在后面的 SQL 查询中使用。作为一个简单的示例,我们将定义一个 UDF 来将以下 JSON 数据中的温度从摄氏度(degrees Celsius)转换为华氏度(degrees Fahrenheit)。

    {"city":"St. John's","avgHigh":8.7,"avgLow":0.6}
    {"city":"Charlottetown","avgHigh":9.7,"avgLow":0.9}
    {"city":"Halifax","avgHigh":11.0,"avgLow":1.6}
    {"city":"Fredericton","avgHigh":11.2,"avgLow":-0.5}
    {"city":"Quebec","avgHigh":9.0,"avgLow":-1.0}
    {"city":"Montreal","avgHigh":11.1,"avgLow":1.4}
    ...

    以下示例代码使用 SQL 别名为 CTOF 来注册我们的转换 UDF,然后在 SQL 查询使用它来转换每个城市的温度。为简洁起见,省略了 SQLContext 对象和其他代码的创建,每段代码下面都提供了完整的代码链接。

    # Python
    df = sqlContext.read.json("temperatures.json")
    
    df.registerTempTable("citytemps")
    # Register the UDF with our SQLContext
    
    sqlContext.registerFunction("CTOF", lambda degreesCelsius: ((degreesCelsius * 9.0 / 5.0) + 32.0))
    sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()
    # Scala
    val df = sqlContext.read.json("temperatures.json")
    
    df.registerTempTable("citytemps")
    // Register the UDF with our SQLContext
    
    sqlContext.udf.register("CTOF", (degreesCelcius: Double) => ((degreesCelcius * 9.0 / 5.0) + 32.0))
    sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show()
    # Java
    DataFrame df = sqlContext.read().json("temperatures.json");
    df.registerTempTable("citytemps");
    
    // Register the UDF with our SQLContext
    sqlContext.udf().register("CTOF", new UDF1<Double, Double>() {
      @Override
      public Double call(Double degreesCelcius) {
        return ((degreesCelcius * 9.0 / 5.0) + 32.0);
      }
    }, DataTypes.DoubleType);
    sqlContext.sql("SELECT city, CTOF(avgLow) AS avgLowF, CTOF(avgHigh) AS avgHighF FROM citytemps").show();

    40、Mesos下粗粒度和细粒度对比?

    1. 粗粒度运行模式:Spark 应用程序在注册到 Mesos 时会分配对应系统资源,在执行过程中由 SparkContext 和 Executor 直接交互,该模式优点是由于资源长期持有减少了资源调度的时间开销,缺点是该模式下 Mesos 无法感知资源使用的变化,容易造成系统资源的闲置,无法被 Mesos 其他框架使用,造成资源浪费。
    2. 细粒度的运行模式:Spark 应用程序是以单个任务的粒度发送到 Mesos 中执行,在执行过程中 SparkContext 并不能和 Executor 直接交互,而是由 Mesos Master 进行统一的调度管理,这样能够根据整个 Mesos 集群资源使用的情况动态调整。该模式的优点是系统资源能够得到充分利用,缺点是该模式中每个人物都需要从 Mesos 获取资源,调度延迟较大,对于 Mesos Master 开销较大。

    41、Spark Local和Standalone有什么区别

    Spark一共有5种运行模式:Local,Standalone,Yarn-Cluster,Yarn-Client 和 Mesos。

    1. Local:Local 模式即单机模式,如果在命令语句中不加任何配置,则默认是 Local 模式,在本地运行。这也是部署、设置最简单的一种模式,所有的 Spark 进程都运行在一台机器或一个虚拟机上面。

    2. Standalone:Standalone 是 Spark 自身实现的资源调度框架。如果我们只使用 Spark 进行大数据计算,不使用其他的计算框架(如MapReduce或者Storm)时,就采用 Standalone 模式就够了,尤其是单用户的情况下。Standalone 模式是 Spark 实现的资源调度框架,其主要的节点有 Client 节点、Master 节点和 Worker 节点。其中 Driver 既可以运行在 Master 节点上中,也可以运行在本地 Client 端。当用 spark-shell 交互式工具提交 Spark 的 Job 时,Driver 在 Master 节点上运行;当使用 spark-submit 工具提交 Job 或者在 Eclipse、IDEA 等开发平台上使用 new SparkConf.setManager(“spark://master:7077”) 方式运行 Spark 任务时,Driver 是运行在本地 Client 端上的。

    Standalone 模式的部署比较繁琐,需要把 Spark 的部署包安装到每一台节点机器上,并且部署的目录也必须相同,而且需要 Master 节点和其他节点实现 SSH 无密码登录。启动时,需要先启动 Spark 的 Master 和 Slave 节点。提交命令类似于:

    ./bin/spark-submit \
      --class org.apache.spark.examples.SparkPi \
      --master spark://Oscar-2.local:7077 \
      /tmp/spark-2.2.0-bin-hadoop2.7/examples/jars/spark-examples_2.11-2.2.0.jar \
      100

    其中 master:7077是 Spark 的 Master 节点的主机名和端口号。当然集群是需要提前启动。

    42、说说SparkContext和SparkSession有什么区别?

    1. Application: 用户编写的 Spark 应用程序,Driver 即运行上述 Application 的 main() 函数并且创建 SparkContext。Application 也叫应用。

    2. SparkContext: 整个应用的上下文,控制应用的生命周期。

    3. RDD: 不可变的数据集合,可由 SparkContext 创建,是 Spark 的基本计算单元。

    4. SparkSession: 可以由上节图中看出,Application、SparkSession、SparkContext、RDD之间具有包含关系,并且前三者是1对1的关系。SparkSession 是 Spark 2.0 版本引入的新入口,在这之前,创建一个 Application 对应的上下文是这样的:

    //set up the spark configuration and create contexts
    val sparkConf = new SparkConf().setAppName("SparkSessionZipsExample").setMaster("local")
    // your handle to SparkContext to access other context like SQLContext
    val sc = new SparkContext(sparkConf).set("spark.some.config.option", "some-value")
    val sqlContext = new org.apache.spark.sql.SQLContext(sc)

    现在 SparkConf、SparkContext 和 SQLContext 都已经被封装在 SparkSession 当中,并且可以通过 builder 的方式创建:

    // Create a SparkSession. No need to create SparkContext
    // You automatically get it as part of the SparkSession
    val warehouseLocation = "file:${system:user.dir}/spark-warehouse"
    val spark = SparkSession
       .builder()
       .appName("SparkSessionZipsExample")
       .config("spark.sql.warehouse.dir", warehouseLocation)
       .enableHiveSupport()
       .getOrCreate()

    通过 SparkSession 创建并操作 Dataset 和 DataFrame,代码中的 spark 对象就是 SparkSession:

    //create a Dataset using spark.range starting from 5 to 100, with increments of 5
    val numDS = spark.range(5, 100, 5)
    // reverse the order and display first 5 items
    numDS.orderBy(desc("id")).show(5)
    //compute descriptive stats and display them
    numDs.describe().show()
    // create a DataFrame using spark.createDataFrame from a List or Seq
    val langPercentDF = spark.createDataFrame(List(("Scala", 35), ("Python", 30), ("R", 15), ("Java", 20)))
    //rename the columns
    val lpDF = langPercentDF.withColumnRenamed("_1", "language").withColumnRenamed("_2", "percent")
    //order the DataFrame in descending order of percentage
    lpDF.orderBy(desc("percent")).show(false)

    43、如果Spark Streaming停掉了,如何保证Kafka的重新运作是合理的呢

    首先要说一下 Spark 的快速故障恢复机制,在节点出现故障的情况下,传统流处理系统会在其他节点上重启失败的连续算子,并可能冲洗能运行先前数据流处理操作获取部分丢失数据。在此过程中只有该节点重新处理失败的过程。只有在新节点完成故障前所有计算后,整个系统才能够处理其他任务。在 Spark 中,计算将会分成许多小的任务,保证能在任何节点运行后能够正确合并,因此,就算某个节点出现故障,这个节点的任务将均匀地分散到集群中的节点进行计算,相对于传递故障恢复机制能够更快地恢复。

    44、列举Spark中 Transformation 和 Action算子

    Transformantion:Map, Filter, FlatMap, Sample, GroupByKey, ReduceByKey, Union, Join, Cogroup, MapValues, Sort, PartionBy

    Action:Collect, Reduce, Lookup, Save (主要记住,结果不是 RDD 的就是 Action)

    45、Spark经常说的Repartition是个什么玩意

    简单的说:返回一个恰好有numPartitions个分区的RDD,可以增加或者减少此RDD的并行度。内部,这将使用shuffle重新分布数据,如果你减少分区数,考虑使用coalesce,这样可以避免执行shuffle。目的:

    • 避免小文件

    • 减少 Task 个数

    • 但是会增加每个 Task 处理的数据量

    46、Spark Streaming Duration的概念

    Spark Streaming 是微批处理。

     SparkConf sparkConf = new SparkConf().setAppName("SparkStreaming").setMaster("local[*]"); 
     JavaStreamingContext javaStreamingContext = new JavaStreamingContext(sparkConf, Durations.seconds(1000));

    Durations.seconds(1000)设置的是 sparkstreaming 批处理的时间间隔,每隔 Batch Duration 时间去提交一次 job,如果 job 的处理时间超过 Batch Duration,会使得 job 无法按时提交,随着时间推移,越来越多的作业被拖延,最后导致整个 Streaming 作业被阻塞,无法做到实时处理数据。

    47、简单写一个WordCount程序

    sc.textFile("/Users/runzhliu/workspace/spark-2.2.1-bin-hadoop2.7/README.md")
    .flatMap(_.split(" "))
    .map(x => (x, 1))
    .reduceByKey(_ + _)
    .map(x => (x._2, x._1))
    .sortByKey(false)
    .map(x => (x._2, x._1))
    .take(10)

    48、说说Yarn-cluster的运行阶段

    在 Yarn-cluset 模式下,当用户向 Yarn 提交一个应用程序后,Yarn 将两个阶段运行该应用程序:

    1. 第一阶段是把 Spark 的 Driver 作为一个 Application Master 在 Yarn 集群中先启动。

    2. 第二阶段是由 Application Master 创建应用程序,然后为它向 Resource Manager 申请资源,并启动 Executor 来运行任务集,同时监控它的整个过程,直到运行介绍结束。

    49、Mesos粗细度对比

    Mesos 粗粒度运行模式中,Spark 程序在注册到 Mesos 的时候会分配对应系统资源,在执行过程中由 SparkContext 和 Executor 直接进行交互。该模式优点是由于资源长期持有,减少了资源调度的时间开销,缺点是该模式之下,Mesos 无法感知资源使用的变化,容易造成资源的闲置,无法被 Mesos 其他框架所使用,从而造成资源浪费。

    而在细粒度运行模式下,Spark 应用程序是以单个任务的粒度发送到 Mesos 中执行,在执行过程中 SparkContext 并不能与 Executor 直接进行交互,而是由 Mesos Master 进行统一的调度管理,这样能够根据整个 Mesos 集群资源使用的情况动态调整。该模式的优点是系统资源能够得到充分利用,缺点是该模式中每个任务都需要从 Mesos 获取资源,调度延迟比较大,对于 Mesos 开销比较大。

    50、说说Standalone模式下运行Spark程序的大概流程

    Standalone 模式分别由客户端、Master 节点和 Worker 节点组成。在 Spark Shell 提交计算代码的时候,所在机器作为客户端启动应用程序,然后向 Master 注册应用程序,由 Master 通知 Worker 节点启动 Executor,Executor 启动之后向客户端的 Driver 注册,最后由 Driver 发送执行任务给 Executor 并监控任务执行情况。该程序代码中,在触发计算行数动作之前,需要设置缓存代码,这样在执行计算行数行为的时候进行缓存数据,缓存后再运行计算行数。

    51、如何区分 Appliction(应用程序)还有 Driver(驱动程序)

    Application 是指用户编写的 Spark 应用程序,包含驱动程序 Driver 和分布在集群中多个节点上运行的 Executor 代码,在执行过程之中由一个或多个做作业组成。

    Driver 是 Spark 中的 Driver 即运行上述 Application 的 main 函数并且创建 SparkContext,其中创建 SparkContext 的目的是为了准备 Spark 应用程序的运行环境。在 Spark 中由 sc 负责与 ClusterManager 通信,进行资源的申请,任务的分配和监控等。当 Executor 部分运行完毕后,Driver 负责把 sc 关闭,通常 Driver 会拿 SparkContext 来代表。

    52、介绍一下 Spark 通信的启动方式

    Spark 启动过程主要是 Master 与 Worker 之间的通信,首先由 Worker 节点向 Master 发送注册消息,然后 Master 处理完毕后,返回注册成功消息或失败消息,如果成功注册,那么 Worker 就会定时发送心跳消息给 Master。

    53、介绍一下 Spark 运行时候的消息通信

    用户提交应用程序时,应用程序的 SparkContext 会向 Master 发送应用注册消息,并由 Master 给该应用分配 Executor,Excecutor 启动之后,Executor 会向 SparkContext 发送注册成功消息。当 SparkContext 的 RDD 触发行动操作之后,将创建 RDD 的 DAG。通过 DAGScheduler 进行划分 Stage 并把 Stage 转化为 TaskSet,接着 TaskScheduler 向注册的 Executor 发送执行消息,Executor 接收到任务消息后启动并运行。最后当所有任务运行时候,由 Driver 处理结果并回收资源。

    54、解释一下Stage

    每个作业会因为 RDD 之间的依赖关系拆分成多组任务集合,称为调度阶段,也叫做任务集。调度阶段的划分由 DAGScheduler 划分,调度阶段有 Shuffle Map Stage 和 Result Stage 两种。

    55、描述一下Worker异常的情况

    Spark 独立运行模式 Standalone 采用的是 Master/Slave 的结构,其中 Slave 是由 Worker 来担任的,在运行的时候会发送心跳给 Master,让 Master 知道 Worker 的实时状态,另一方面,Master 也会检测注册的 Worker 是否超时,因为在集群运行的过程中,可能由于机器宕机或者进程被杀死等原因造成 Worker 异常退出。

    56、描述一下Master异常的情况

    Master 出现异常的时候,会有几种情况,而在独立运行模式 Standalone 中,Spark 支持几种策略,来让 Standby Master 来接管集群。主要配置的地方在于 spark-env.sh 文件中。配置项是 spark.deploy.recoveryMode 进行设置,默认是 None。

    1. ZOOKEEPER:集群元数据持久化到 Zookeeper 中,当 Master 出现异常,ZK 通过选举机制选举新的 Master,新的 Master 接管的时候只要从 ZK 获取持久化信息并根据这些信息恢复集群状态。StandBy 的 Master 随时候命的。

    2. FILESYSTEM:集群元数据持久化到本地文件系统中,当 Master 出现异常的时候,只要在该机器上重新启动 Master,启动后新的 Master 获取持久化信息并根据这些信息恢复集群的状态。

    3. CUSTOM:自定义恢复方式,对 StandaloneRecoveryModeFactory 抽象类进行实现并把该类配置到系统中,当 Master 出现异常的时候,会根据用户自定义的方式进行恢复集群状态。

    4. NONE:不持久化集群的元数据,当出现异常的是,新启动 Master 不进行信息恢复集群状态,而是直接接管集群。

    57、Spark的存储体系

    简单来讲,Spark存储体系是各个Driver与Executor实例中的BlockManager所组成的;但是从一个整体来看,把各个节点的BlockManager看成存储体系的一部分,那存储体系就有了更多衍生的内容,比如块传输服务、map任务输出跟踪器、Shuffle管理器等。


    58、简述Spark Streaming

    具有高吞吐量和容错能力强的特点,输入源有很多,如 Kafka, Flume, Twitter 等待。

    关于流式计算的做法,如果按照传统工具的做法把数据存储到数据库中再进行计算,这样是无法做到实时的,而完全把数据放到内存中计算,万一宕机、断电了,数据也就丢失了。

    因此 Spark 流式计算引入了检查点 CheckPoint 和日志,以便能够从中恢复计算结果。而本质上 Spark Streaming 是接收实时输入数据流并把他们按批次划分,然后交给 Spark 计算引擎处理生成按照批次划分的结果流。

    59、知道 Hadoop MRv1 的局限吗

    1. 可扩展性查,在运行的时候,JobTracker 既负责资源管理,又负责任务调度,当集群繁忙的时候,JobTracker 很容易成为瓶颈,最终导致可扩展性的问题。

    2. 可用性差,采用单节点的 Master 没有备用 Master 以及选举操作,这导致一旦 Master 出现故障,整个集群将不可用。

    3. 资源利用率低,TaskTracker 使用 slot 等量划分本节点上的资源量,slot 代表计算资源将各个 TaskTracker 上的空闲 slot 分配给 Task 使用,一些 Task 并不能充分利用 slot,而其他 Task 无法使用这些空闲的资源。有时会因为作业刚刚启动等原因导致 MapTask 很多,而 Reduce Task 任务还没调度的情况,这时 Reduce slot 也会被闲置。

    4. 不能支持多种 MapReduce 框架,无法通过可插拔方式将自身的 MapReduce 框架替换为其他实现,例如 Spark,Storm。

    60、说说Spark的特点,相对于MR来说

    1. 减少磁盘 I/O,MR 会把 map 端将中间输出和结果存储在磁盘中,reduce 端又需要从磁盘读写中间结果,势必造成磁盘 I/O 称为瓶颈。Spark 允许将 map 端的中间结果输出和结果存储在内存中,reduce 端在拉取中间结果的时候避免了大量的磁盘 I/O。

    2. 增加并行度,由于把中间结果写到磁盘与从磁盘读取中间结果属于不同的缓解,Hadoop 将他们简单地通过串行执行衔接起来,Spark 则把不同的环节抽象成为 Stage,允许多个 Stage 既可以串行又可以并行执行。

    3. 避免重新计算,当 Stage 中某个分区的 Task 执行失败后,会重新对此 Stage 调度,但在重新调度的时候会过滤已经执行成功的分区任务,所以不会造成重复计算和资源浪费。

    4. 可选的 Shuffle 排序,MR 在 Shuffle 之前有着固定的排序操作,而 Spark 则可以根据不同场景选择在 map 端排序还是 reduce 排序。

    5. 灵活的内存管理策略,Spark 将内存分为堆上的存储内存、堆外的存储内存,堆上的执行内存,堆外的执行内存4个部分。

    61、说说Spark Narrow Dependency的分类

    • OneToOneDependency

    • RangeDependency

    62、Task和Stage的分类

    Task 指具体的执行任务,一个 Job 在每个 Stage 内都会按照 RDD 的 Partition 数量,创建多个 Task,Task 分为 ShuffleMapTask 和 ResultTask 两种。ShuffleMapStage 中的 Task 为 ShuffleMapTask,而 ResultStage 中的 Task 为 ResultTask。ShuffleMapTask 和 ResultTask 类似于 Hadoop 中的 Map 任务和 Reduce 任务。

    63、Spark的编程模型

    1.创建应用程序 SparkContext

    2.创建RDD,有两种方式,方式一:输入算子,即读取外部存储创建RDD,Spark与Hadoop完全兼容,所以对Hadoop所支持的文件类型或者数据库类型,Spark同样支持。方式二:从集合创建RDD

    3.Transformation 算子,这种变换并不触发提交作业,完成作业中间过程处理。也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。

    4.Action 算子,这类算子会触发 SparkContext 提交 Job 作业。并将数据输出 Spark系统。

    5.保存结果

    6.关闭应用程序

    64、Spark的计算模型

    用户程序对 RDD 通过多个函数进行操作,将 RDD 进行转换。

    Block-Manager 管理 RDD 的物理分区,每个 Block 就是节点上对应的一个数据块,可以存储在内存或者磁盘。

    而 RDD 中的 partition 是一个逻辑数据块,对应相应的物理块 Block。

    本质上一个 RDD 在代码中相当于是数据的一个元数据结构,存储着数据分区及其逻辑结构映射关系,存储着 RDD 之前的依赖转换关系

    65、总述Spark的架构

    从集群部署的角度来看,Spark 集群由集群管理器 Cluster Manager、工作节点 Worker、执行器 Executor、驱动器 Driver、应用程序 Application 等部分组成。

    Cluster Manager:主要负责对集群资源的分配和管理,Cluster Manager 在 YARN 部署模式下为 RM,在 Mesos 下为 Mesos Master,Standalone 模式下为 Master。CM 分配的资源属于一级分配,它将各个 Worker 上的内存、CPU 等资源分配给 Application,但是不负责对 Executor 的资源分类。Standalone 模式下的 Master 会直接给 Application 分配内存、CPU 及 Executor 等资源。

    Worker:Spark 的工作节点。在 YARN 部署模式下实际由 NodeManager 替代。Worker 节点主要负责,把自己的内存、CPU 等资源通过注册机制告知 CM,创建 Executor,把资源和任务进一步分配给 Executor,同步资源信息,Executor 状态信息给 CM 等等。Standalone 部署模式下,Master 将 Worker 上的内存、CPU 以及 Executor 等资源分配给 Application 后,将命令 Worker 启动 CoarseGrainedExecutorBackend 进程(此进程会创建 Executor 实例)。

    Executor:执行计算任务的一线组件,主要负责任务的执行及与 Worker Driver 信息同步。

    Driver:Application 的驱动程序,Application 通过 Driver 与 CM、Executor 进行通信。Driver 可以运行在 Application 中,也可以由 Application 提交给 CM 并由 CM 安排 Worker 运行。

    Application:用户使用 Spark 提供的 API 编写的应用程序,Application 通过 Spark API 将进行 RDD 的转换和 DAG 的创建,并通过 Driver 将 Application 注册到 CM,CM 将会根据 Application 的资源需求,通过一级资源分配将 Excutor、内存、CPU 等资源分配给 Application。Drvier 通过二级资源分配将 Executor 等资源分配给每一个任务,Application 最后通过 Driver 告诉 Executor 运行任务。

    66、一句话说说 Spark Streaming 是如何收集和处理数据的

    在 Spark Streaming 中,数据采集是逐条进行的,而数据处理是按批 mini batch进行的,因此 Spark Streaming 会先设置好批处理间隔 batch duration,当超过批处理间隔就会把采集到的数据汇总起来成为一批数据交给系统去处理。

    67、解释一下窗口间隔window duration和滑动间隔slide duration

    Spark Streaming

    1. 红色的矩形就是一个窗口,窗口 hold 的是一段时间内的数据流。

    2. 这里面每一个 time 都是时间单元,在官方的例子中,每隔 window size 是3 time unit, 而且每隔2个单位时间,窗口会 slide 一次。

    所以基于窗口的操作,需要指定2个参数:

    • window length - The duration of the window (3 in the figure)
    • slide interval - The interval at which the window-based operation is performed (2 in the figure). 
    1. 窗口大小,个人感觉是一段时间内数据的容器。

    2. 滑动间隔,就是我们可以理解的 cron 表达式吧。

    窗口间隔一般大于(批处理间隔、滑动间隔)。这都是理解窗口操作的关键。

    68、介绍一下Spark Streaming的foreachRDD(func)方法

    将函数应用于 DStream 的 RDD 上,这个操作会输出数据到外部系统,比如保存 RDD 到文件或者网络数据库等。需要注意的是 func 函数是运行该 Streaming 应用的 Driver 进程里执行的。

    69、简单描述一下Spark Streaming的容错原理

    Spark Streaming 的一个特点就是高容错。

    首先 Spark RDD 就有容错机制,每一个 RDD 都是不可变的分布式可重算的数据集,其记录这确定性的操作血统,所以只要输入数据是可容错的,那么任意一个 RDD 的分区出错或不可用,都是可以利用原始输入数据通过转换操作而重新计算出来的。

    预写日志通常被用于数据库和文件系统中,保证数据操作的持久性。预写日志通常是先将操作写入到一个持久可靠的日志文件中,然后才对数据施加该操作,当加入施加操作中出现了异常,可以通过读取日志文件并重新施加该操作。

    另外接收数据的正确性只在数据被预写到日志以后接收器才会确认,已经缓存但还没保存的数据可以在 Driver 重新启动之后由数据源再发送一次,这两个机制确保了零数据丢失,所有数据或者从日志中恢复,或者由数据源重发。

    70、DStream 有几种转换操作

    Transform Operation、Window Operations、Join Operations

    71、聊聊Spark Streaming的运行架构

    https://mmbiz.qpic.cn/mmbiz_png/UdK9ByfMT2OSwS8tHQeMicc0egREicTZ5RxdvYTIReQqncsxUlDW5bMcW7hRKibAASOtAf7CzibG3kEplufYzTPdsQ/640?wx_fmt=png&tp=webp&wxfrom=5&wx_lazy=1&wx_co=1
    72、说说DStreamGraph

    Spark Streaming 中作业生成与 Spark 核心类似,对 DStream 进行的各种操作让它们之间的操作会被记录到名为 DStream 使用输出操作时,这些依赖关系以及它们之间的操作会被记录到明伟 DStreamGraph 的对象中表示一个作业。这些作业注册到 DStreamGraph 并不会立即运行,而是等到 Spark Streaming 启动之后,达到批处理时间,才根据 DG 生成作业处理该批处理时间内接收的数据。

    73、创建RDD的方式以及如何继承创建RDD

    Spark 可以从 Hadoop 支持的任何存储源创建分布式数据集,包括本地文件系统、HDFS、Cassandra、HBase、Amazon S3等。Spark 支持文本文件、SequenceFile 和任何其他 Hadoop InputFormat。可以使用SparkContext的textFile方法创建文本文件RDDs。

    val distFile = sc.textFile("data.txt")

    74、分析一下Spark Streaming的transform()和updateStateByKey()两个操作

    • transform(func) 操作:允许 DStream 任意的 RDD-to-RDD 函数。

    • updateStateByKey 操作:可以保持任意状态,同时进行信息更新,先定义状态,后定义状态更新函数。

    75、说说Spark Streaming的输出操作

    print()、saveAsTextFiles(prefix, [suffix])、saveAsObjectFiles(prefix, [suffix])、saveAsHadoopFiles(prefix, [suffix])、foreachRDD(func)

    76、谈谈Spark Streaming Driver端重启会发生什么

    1. 恢复计算:使用检查点信息重启 Driver 端,重构上下文并重启接收器

    2. 恢复元数据块:为了保证能够继续下去所必备的全部元数据块都被恢复

    3. 未完成作业的重新形成:由于失败而没有处理完成的批处理,将使用恢复的元数据再次产生 RDD 和对应的作业

    4. 读取保存在日志中的块数据:在这些作业执行的时候,块数据直接从预写日志中读出,这将恢复在日志中可靠地保存所有必要的数据

    5. 重发尚未确认的数据:失败时没有保存到日志中的缓存数据将由数据源再次发送

    77、再谈Spark Streaming的容错性

    实时流处理系统需要长时间接收并处理数据,这个过程中出现异常是难以避免的,需要流程系统具备高容错性。Spark Streaming 一开始就考虑了两个方面。

    1. 利用 Spark 自身的容错设计、存储级别和 RDD 抽象设计能够处理集群中任何 Worker 节点的故障

    2. Spark 运行多种运行模式,其 Driver 端可能运行在 Master 节点或者集群中的任意节点,这样让 Driver 端具备容错能力是很大的挑战,但是由于其接收的数据是按照批进行存储和处理,这些批次数据的元数据可以通过执行检查点的方式定期写入到可靠的存储中,在 Driver 端重新启动中恢复这些状态

    当接收到的数据缓存在 Executor 内存中的丢失风险要怎么处理呢?

    如果是独立运行模式/Yarn/Mesos 模式,当 Driver 端失败的时候,该 Driver 端所管理的 Executor 以及内存中数据将终止,即时 Driver 端重新启动这些缓存的数据也不能被恢复。为了避免这种数据损失,就需要预写日志功能了。

    当 Spark Streaming 应用开始的时候,也就是 Driver 开始的时候,接收器成为长驻运行任务,这些接收器接收并保存流数据到 Spark 内存以供处理。

    1. 接收器将数据分成一系列小块,存储到 Executor 内存或磁盘中,如果启动预写日志,数据同时还写入到容错文件系统的预写日志文件。

    2. 通知 StreamingContext,接收块中的元数据被发送到 Driver 的 StreamingContext,这个元数据包括两种,一是定位其 Executor 内存或磁盘中数据位置的块编号,二是块数据在日志中的偏移信息(如果启用 WAL 的话)。

    78、流数据如何存储

    作为流数据接收器调用 Receiver.store 方式进行数据存储,该方法有多个重载方法,如果数据量很小,则攒多条数据成数据块再进行块存储,如果数据量大,则直接进行块存储。


    79、StreamingContext启动时序图吗

    1. 初始化 StreamingContext 中的 DStreamGraph 和 JobScheduler,进而启动 JobScheduler 的 ReceiveTracker 和 JobGenerator。

    2. 初始化阶段会进行成员变量的初始化,重要的包括 DStreamGraph(包含 DStream 之间相互依赖的有向无环图),JobScheduler(定时查看 DStreamGraph,然后根据流入的数据生成运行作业),StreamingTab(在 Spark Streaming 运行的时候对流数据处理的监控)。

    3. 然后就是创建 InputDStream,接着就是对 InputDStream 进行 flatMap, map, reduceByKey, print 等操作,类似于 RDD 的转换操作。

    4. 启动 JobScheduler,实例化并启动 ReceiveTracker 和 JobGenerator。

    5. 启动 JobGenerator

    6. 启动 ReceiverTracker

    80、说说RDD和DataFrame和DataSet的关系

    共性:

    1、RDD、DataFrame、Dataset全都是spark平台下的分布式弹性数据集,为处理超大型数据提供便利

    2、三者都有惰性机制,在进行创建、转换,如map方法时,不会立即执行,只有在遇到Action如foreach时,三者才会开始遍历运算,极端情况下,如果代码里面有创建、转换,但是后面没有在Action中使用对应的结果,在执行时会被直接跳过,如

    1

    2

    3

    4

    5

    6

    7

    8

    val sparkconf = new SparkConf().setMaster("local").setAppName("test").set("spark.port.maxRetries","1000")

    val spark = SparkSession.builder().config(sparkconf).getOrCreate()

    val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

     

    rdd.map{line=>

      println("运行")

      line._1

    }

    map中的println("运行")并不会运行

    3、三者都会根据spark的内存情况自动缓存运算,这样即使数据量很大,也不用担心会内存溢出

    4、三者都有partition的概念,如

    1

    2

    3

    4

    5

    6

    7

    8

    var predata=data.repartition(24).mapPartitions{

          PartLine => {

            PartLine.map{

              line =>

                 println(“转换操作”)

                                }

                             }

    这样对每一个分区进行操作时,就跟在操作数组一样,不但数据量比较小,而且可以方便的将map中的运算结果拿出来,如果直接用map,map中对外面的操作是无效的,如

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    16

    17

    18

    19

    20

    val rdd=spark.sparkContext.parallelize(Seq(("a", 1), ("b", 1), ("a", 1)))

        var flag=0

        val test=rdd.map{line=>

          println("运行")

          flag+=1

          println(flag)

          line._1

        }

    println(test.count)

    println(flag)

        /**

        运行

        1

        运行

        2

        运行

        3

        3

        0

       * */

    不使用partition时,对map之外的操作无法对map之外的变量造成影响

    5、三者有许多共同的函数,如filter,sort等

    6、在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持

    1

    2

    import spark.implicits._

    //这里的spark是SparkSession的变量名

    7、DataFrame和Dataset均可使用模式匹配获取各个字段的值和类型

    DataFrame:

    1

    2

    3

    4

    5

    6

    7

    testDF.map{

          case Row(col1:String,col2:Int)=>

            println(col1);println(col2)

            col1

          case _=>

            ""

        }

    为了提高稳健性,最好后面有一个_通配操作,这里提供了DataFrame一个解析字段的方法

    Dataset:

    1

    2

    3

    4

    5

    6

    7

    8

    case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型

        testDS.map{

          case Coltest(col1:String,col2:Int)=>

            println(col1);println(col2)

            col1

          case _=>

            ""

        }

      

    区别:

    RDD:

    1、RDD一般和spark mlib同时使用

    2、RDD不支持sparksql操作

    DataFrame:

    1、与RDD和Dataset不同,DataFrame每一行的类型固定为Row,只有通过解析才能获取各个字段的值,如

    1

    2

    3

    4

    5

    testDF.foreach{

      line =>

        val col1=line.getAs[String]("col1")

        val col2=line.getAs[String]("col2")

    }

    每一列的值没法直接访问

    2、DataFrame与Dataset一般与spark ml同时使用

    3、DataFrame与Dataset均支持sparksql的操作,比如select,groupby之类,还能注册临时表/视窗,进行sql语句操作,如

    1

    2

    dataDF.createOrReplaceTempView("tmp")

    spark.sql("select  ROW,DATE from tmp where DATE is not null order by DATE").show(100,false)

    4、DataFrame与Dataset支持一些特别方便的保存方式,比如保存成csv,可以带上表头,这样每一列的字段名一目了然

    1

    2

    3

    4

    5

    6

    //保存

    val saveoptions = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test")

    datawDF.write.format("com.databricks.spark.csv").mode(SaveMode.Overwrite).options(saveoptions).save()

    //读取

    val options = Map("header" -> "true", "delimiter" -> "\t", "path" -> "hdfs://172.xx.xx.xx:9000/test")

    val datarDF= spark.read.options(options).format("com.databricks.spark.csv").load()

    利用这样的保存方式,可以方便的获得字段名和列的对应,而且分隔符(delimiter)可以自由指定

    Dataset:

    这里主要对比Dataset和DataFrame,因为Dataset和DataFrame拥有完全相同的成员函数,区别只是每一行的数据类型不同

    DataFrame也可以叫Dataset[Row],每一行的类型是Row,不解析,每一行究竟有哪些字段,各个字段又是什么类型都无从得知,只能用上面提到的getAS方法或者共性中的第七条提到的模式匹配拿出特定字段

    而Dataset中,每一行是什么类型是不一定的,在自定义了case class之后可以很自由的获得每一行的信息

    1

    2

    3

    4

    5

    6

    7

    8

    9

    10

    11

    12

    13

    14

    15

    case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型

    /**

          rdd

          ("a", 1)

          ("b", 1)

          ("a", 1)

          * */

    val test: Dataset[Coltest]=rdd.map{line=>

          Coltest(line._1,line._2)

        }.toDS

    test.map{

          line=>

            println(line.col1)

            println(line.col2)

        }

    可以看出,Dataset在需要访问列中的某个字段时是非常方便的,然而,如果要写一些适配性很强的函数时,如果使用Dataset,行的类型又不确定,可能是各种case class,无法实现适配,这时候用DataFrame即Dataset[Row]就能比较好的解决问题

    转化:

    RDD、DataFrame、Dataset三者有许多共性,有各自适用的场景常常需要在三者之间转换

    DataFrame/Dataset转RDD:

    这个转换很简单

    1

    2

    val rdd1=testDF.rdd

    val rdd2=testDS.rdd

    RDD转DataFrame:

    1

    2

    3

    4

    import spark.implicits._

    val testDF = rdd.map {line=>

          (line._1,line._2)

        }.toDF("col1","col2")

    一般用元组把一行的数据写在一起,然后在toDF中指定字段名

    RDD转Dataset:

    1

    2

    3

    4

    5

    import spark.implicits._

    case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型

    val testDS = rdd.map {line=>

          Coltest(line._1,line._2)

        }.toDS

    可以注意到,定义每一行的类型(case class)时,已经给出了字段名和类型,后面只要往case class里面添加值即可

    Dataset转DataFrame:

    这个也很简单,因为只是把case class封装成Row

    1

    2

    import spark.implicits._

    val testDF = testDS.toDF

    DataFrame转Dataset:

    1

    2

    3

    import spark.implicits._

    case class Coltest(col1:String,col2:Int)extends Serializable //定义字段名和类型

    val testDS = testDF.as[Coltest]

    这种方法就是在给出每一列的类型后,使用as方法,转成Dataset,这在数据类型是DataFrame又需要针对各个字段处理时极为方便

    特别注意:

    在使用一些特殊的操作时,一定要加上 import spark.implicits._ 不然toDF、toDS无法使用

     

     

     

     

     

     

     

     

     

     

    附录:

    独孤九剑-Spark面试80连击(上)

    独孤九剑-Spark面试80连击(下)

    展开全文
  • 1、什么是宽依赖,什么是窄依赖?哪些算子是宽依赖,哪些是窄依赖? 窄依赖就是一个父RDD分区对应一个子RDD分区,如map,filter 或者多个父RDD分区对应一个子RDD分区,如co-partioned join 宽依赖是一个父RDD分区...

    1、什么是宽依赖,什么是窄依赖?哪些算子是宽依赖,哪些是窄依赖?
    窄依赖就是一个父RDD分区对应一个子RDD分区,如map,filter
    或者多个父RDD分区对应一个子RDD分区,如co-partioned join

    宽依赖是一个父RDD分区对应非全部的子RDD分区,如groupByKey,ruduceByKey
    或者一个父RDD分区对应全部的子RDD分区,如未经协同划分的join
    https://www.jianshu.com/p/736a4e628f0f

    2、Transformation和action算子有什么区别?举例说明
    Transformation 变换/转换:这种变换并不触发提交作业,完成作业中间过程处理。Transformation 操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算

    map, filter

    Action 行动算子:这类算子会触发 SparkContext 提交 Job 作业。
    Action 算子会触发 Spark 提交作业(Job)。

    count

    3、讲解spark shuffle原理和特性?shuffle write 和 shuffle read过程做些什么?
    https://blog.csdn.net/zhanglh046/article/details/78360762

    4、Shuffle数据块有多少种不同的存储方式?分别是什么

    1. RDD数据块:用来存储所缓存的RDD数据。
    2. Shuffle数据块:用来存储持久化的Shuffle数据。
    3. 广播变量数据块:用来存储所存储的广播变量数据。
    4. 任务返回结果数据块:用来存储在存储管理模块内部的任务返回结果。通常情况下任务返回结果随任务一起通过Akka返回到Driver端。但是当任务返回结果很大时,会引起Akka帧溢出,这时的另一种方案是将返回结果以块的形式放入存储管理模块,然后在Driver端获取该数据块即可,因为存储管理模块内部数据块的传输是通过Socket连接的,因此就不会出现Akka帧溢出了。
    5. 流式数据块:只用在Spark Streaming中,用来存储所接收到的流式数据块

    5、哪些spark算子会有shuffle?

    1. 去重,distinct
    2. 排序,groupByKey,reduceByKey等
    3. 重分区,repartition,coalesce
    4. 集合或者表操作,interection,join

    https://kuncle.github.io/spark/2017/03/13/Spark的shuffle算子.html

    6、讲解spark schedule(任务调度)?
    在这里插入图片描述
    https://www.cnblogs.com/missmzt/p/6734078.html

    7、Spark stage是如何划分的?

    1. 从hdfs中读取文件后,创建 RDD 对象
    2. DAGScheduler模块介入运算,计算RDD之间的依赖关系。RDD之间的依赖关系就形成了DAG
    3. 每一个JOB被分为多个Stage,划分Stage的一个主要依据是当前计算因子的输入是否是确定的,如果是则将其分在同一个Stage,避免多个Stage之间的消息传递开销。

    因此spark划分stage的整体思路是:从后往前推,遇到宽依赖就断开,划分为一个stage;遇到窄依赖就将这个RDD加入该stage中。

    8、Spark cache一定能提升计算性能么?说明原因?
    不一定啊,cache是将数据缓存到内存里,当小数据量的时候是能提升效率,但数据大的时候内存放不下就会报溢出。

    9、Cache和persist有什么区别和联系?
    cache调用了persist方法,cache只有一个默认的缓存级别MEMORY_ONLY ,而persist可以根据情况设置其它的缓存级别。
    https://blog.csdn.net/houmou/article/details/52491419

    10、RDD是弹性数据集,“弹性”体现在哪里呢?你觉得RDD有哪些缺陷?

    1. 自动进行内存和磁盘切换
    2. 基于lineage的高效容错
    3. task如果失败会特定次数的重试
    4. stage如果失败会自动进行特定次数的重试,而且只会只计算失败的分片
    5. checkpoint【每次对RDD操作都会产生新的RDD,如果链条比较长,计算比较笨重,就把数据放在硬盘中】和persist 【内存或磁盘中对数据进行复用】(检查点、持久化)
    6. 数据调度弹性:DAG TASK 和资源管理无关
    7. 数据分片的高度弹性repartion

    缺陷:
    惰性计算的缺陷也是明显的:中间数据默认不会保存,每次动作操作都会对数据重复计算,某些计算量比较大的操作可能会影响到系统的运算效率

    11、RDD有多少种持久化方式?memory_only如果内存存储不了,会怎么操作?
    cache和persist
    memory_and_disk,放一部分到磁盘
    MEMORY_ONLY_SER:同MEMORY_ONLY,但是会使用Java序列化方式,将Java对象序列化后进行持久化。可以减少内存开销,但是需要进行反序列化,因此会加大CPU开销。
    MEMORY_AND_DSK_SER:同MEMORY_AND_DSK。但是使用序列化方式持久化Java对象。
    DISK_ONLY:使用非序列化Java对象的方式持久化,完全存储到磁盘上。
    MEMORY_ONLY_2或者MEMORY_AND_DISK_2等:如果是尾部加了2的持久化级别,表示会将持久化数据复用一份,保存到其他节点,从而在数据丢失时,不需要再次计算,只需要使用备份数据即可。

    12、RDD分区和数据块有啥联系?

    13、当GC时间占比很大可能的原因有哪些?对应的优化方法是?
    垃圾回收的开销和对象合数成正比,所以减少对象的个数,就能大大减少垃圾回收的开销。序列化存储数据,每个RDD就是一个对象。缓存RDD占用的内存可能跟工作所需的内存打架,需要控制好

    14、Spark中repartition和coalesce异同?coalesce什么时候效果更高,为什么

    repartition(numPartitions:Int):RDD[T]
    coalesce(numPartitions:Int, shuffle:Boolean=false):RDD[T]
    

    以上为他们的定义,区别就是repartition一定会触发shuffle,而coalesce默认是不触发shuffle的。

    他们两个都是RDD的分区进行重新划分,repartition只是coalesce接口中shuffle为true的简易实现,(假设RDD有N个分区,需要重新划分成M个分区)

    减少分区提高效率

    15、Groupbykey和reducebykey哪个性能更高,为什么?
    reduceByKey性能高,更适合大数据集
    https://www.jianshu.com/p/0c6705724cff

    16、你是如何理解caseclass的?
    https://blog.csdn.net/hellojoy/article/details/81034528

    17、Scala里trait有什么功能,与class有何异同?什么时候用trait什么时候该用class
    它可以被继承,而且支持多重继承,其实它更像我们熟悉的接口(interface),但它与接口又有不同之处是:
    trait中可以写方法的实现,interface不可以(java8开始支持接口中允许写方法实现代码了),这样看起来trait又很像抽象类

    18、Scala 语法中to 和 until有啥区别
    to 包含上界,until不包含上界

    19、讲解Scala伴生对象和伴生类
    单例对象与类同名时,这个单例对象被称为这个类的伴生对象,而这个类被称为这个单例对象的伴生类。伴生类和伴生对象要在同一个源文件中定义,伴生对象和伴生类可以互相访问其私有成员。不与伴生类同名的单例对象称为孤立对象。

    import scala.collection.mutable.Map
     
    class ChecksumAccumulator {
      private var sum = 0
      def add(b: Byte) {
        sum += b
      }
      def checksum(): Int = ~(sum & 0xFF) + 1
    }
     
    object ChecksumAccumulator {
      private val cache = Map[String, Int]()
      def calculate(s: String): Int =
        if (cache.contains(s))
        cache(s)
      else {
          val acc = new ChecksumAccumulator
          for (c <- s)
            acc.add(c.toByte)
          val cs = acc.checksum()
          cache += (s -> cs)
          println("s:"+s+" cs:"+cs)
          cs
        }
     
      def main(args: Array[String]) {
        println("Java 1:"+calculate("Java"))
        println("Java 2:"+calculate("Java"))
        println("Scala :"+calculate("Scala"))
      }
    }
    

    20、spark作业执行流程

    1. 客户端提交作业
    2. Driver启动流程
    3. Driver申请资源并启动其余Executor(即Container)
    4. Executor启动流程
    5. 作业调度,生成stages与tasks。
    6. Task调度到Executor上,Executor启动线程执行Task逻辑
    7. Driver管理Task状态
    8. Task完成,Stage完成,作业完成
    展开全文
  • 史上最全Spark面试题

    2019-10-06 08:23:51
    【面试妥了】史上最全Spark面试题 Spark问题精华 Q:什么是Spark? A:简单理解,Spark是在Hadoop基础上的改进,是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce...

    【面试妥了】史上最全Spark面试题

    Spark问题精华
    Q:什么是Spark?

     

    A:简单理解,Spark是在Hadoop基础上的改进,是UC Berkeley AMP lab所开源的类Hadoop MapReduce的通用的并行计算框架,Spark基于map reduce算法实现的分布式计算,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出和结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的map reduce的算法。

     

    Q:为什么要学Spark?

     

    A:基于MapReduce的计算引擎通常会将中间结果输出到磁盘上,进行存储和容错。出于任务管道承接的考虑,当一些查询翻译到MapReduce任务时,往往会产生多个Stage,而这些串联的Stage又依赖于底层文件系统(如HDFS)来存储每一个Stage的输出结果。

     

    Spark是MapReduce的替代方案,而且兼容HDFS、Hive,可融入Hadoop的生态系统,以弥补MapReduce的不足。

     

    Q:Spark有什么特性?

     

    A:1、高效性

     

    运行速度提高100倍。Apache Spark使用最先进的DAG调度程序,查询优化程序和物理执行引擎,实现批量和流式数据的高性能。

     

    2、易用性

     

    Spark支持Java、Python和Scala的API,还支持超过80种高级算法,使用户可以快速构建不同的应用。而且Spark支持交互式的Python和Scala的shell,可以非常方便地在这些shell中使用Spark集群来验证解决问题的方法。

     

    3、通用性

     

    Spark提供了统一的解决方案。Spark可以用于批处理、交互式查询(Spark SQL)、实时流处理(Spark Streaming)、机器学习(Spark MLlib)和图计算(GraphX)。这些不同类型的处理都可以在同一个应用中无缝使用。Spark统一的解决方案非常具有吸引力,毕竟任何公司都想用统一的平台去处理遇到的问题,减少开发和维护的人力成本和部署平台的物力成本。

     

    4、兼容性

     

    Spark可以非常方便地与其他的开源产品进行融合。比如,Spark可以使用Hadoop的YARN和Apache Mesos作为它的资源管理和调度器,器,并且可以处理所有Hadoop支持的数据,包括HDFS、HBase和Cassandra等。这对于已经部署Hadoop集群的用户特别重要,因为不需要做任何数据迁移就可以使用Spark的强大处理能力。Spark也可以不依赖于第三方的资源管理和调度器,它实现了Standalone作为其内置的资源管理和调度框架,这样进一步降低了Spark的使用门槛,使得所有人都可以非常容易地部署和使用Spark。此外,Spark还提供了在EC2上部署Standalone的Spark集群的工具。

     

     

     

    Q:Spark生态圈介绍

     

    A:Spark力图整合机器学习(MLib)、图算法(GraphX)、流式计算(Spark Streaming)和数据仓库(Spark SQL)等领域,通过计算引擎Spark,弹性分布式数据集(RDD),架构出一个新的大数据应用平台。

     

    Spark生态圈以HDFS、S3、Techyon为底层存储引擎,以Yarn、Mesos和Standlone作为资源调度引擎;使用Spark,可以实现MapReduce应用;基于Spark,Spark SQL可以实现即席查询,Spark Streaming可以处理实时应用,MLib可以实现机器学习算法,GraphX可以实现图计算,SparkR可以实现复杂数学计算。

     

    Q:Spark与Hadoop的对比

     

    A:Spark的中间数据放到内存中,对于迭代运算效率更高。Spark更适合于迭代运算比较多的ML和DM运算。因为在Spark里面,有RDD的抽象概念。所以,Spark比Hadoop更通用。

     

    Q:spark的组成有哪些?

     

    A:Spark组成(BDAS):全称伯克利数据分析栈,通过大规模集成算法、机器、人之间展现大数据应用的一个平台。也是处理大数据、云计算、通信的技术解决方案。

     

     

     

    它的主要组件有:

     

    SparkCore:将分布式数据抽象为弹性分布式数据集(RDD),实现了应用任务调度、RPC、序列化和压缩,并为运行在其上的上层组件提供API。

     

    SparkSQL:Spark Sql 是Spark来操作结构化数据的程序包,可以让我使用SQL语句的方式来查询数据,Spark支持 多种数据源,包含Hive表,parquest以及JSON等内容。

     

    SparkStreaming:是Spark提供的实时数据进行流式计算的组件。

     

    MLlib:提供常用机器学习算法的实现库。

     

    GraphX:提供一个分布式图计算框架,能高效进行图计算。

     

    BlinkDB:用于在海量数据上进行交互式SQL的近似查询引擎。

     

    Tachyon:以内存为中心高容错的的分布式文件系统。

     

    Q:Spark的工作流程是什么样的呢?

     

    A:通俗的解释就是:Spark是为了处理数据而生的平台,用一个比喻来形容它是餐馆。餐馆搭建好了后,就会有顾客,顾客的各种需求都得有人去处理,那么这时的Master就像是服务员,负责了解顾客的要求并把需求按照一定规律分配给厨师(Worker),这个顾客的需求就是一个APP,但这个APP不止包括了一个菜(job),整个订单里有很多个job,每个job都得由这些厨师处理,厨师的手就像是具体处理的Executor,负责所有的包括shuffle啊,filter啊,map啊,reduce等等具体的对原材料(RDD)的处理。driver就像是懒惰的厨师长,worker向它申请资源,同时它负责接收下面的人处理好的半成品材料或者完成品的菜品,但它自己并不干具体的活,如果是别人处理好的半成品,driver就将它分配给它认为有空的人接着处理(可能是map后要reduce的东西),直到目前的stage结束得到具体想要的结果,如果是直接就是想要的数据形式(一个job的完成),那么driver就通知master收货并反馈给顾客(可能是python程序,scala程序等等)。

     

    Q:Apache Spark和Apache Storm之间有什么差异,用户应该根据什么来加以选择?

     

     

     

    A:Apache Spark是一个内存中的分布式数据分析平台- 主要针对加快批量分析工作,反复机器学习的工作,交互式查询和图形处理。一个最主要区别是Spark使用弹性分布式数据集(RDD)。RDD是通过并行运算符来进行计算,并根据定义它是一成不变的。RDD允许Spark基于谱系信息容错的独特的形式。如果你对执行Hadoop MapReduce作业更快,那么Spark是一个很好的选择(即使在这里需要考虑内存的因素)。

     

    Apache Storm是专注于流处理或者一些所谓复杂事件的处理。Storm实现容错的方法进行计算或者以流水线的方式多次计算一个事件,由于Storm进入一个需要特定格式的系统,那么可能导致它转换为一个非结构化的数据。

     

    Storm和Spark存在相当不同的使用情况。Storm和Spark流更多是类似“苹果和苹果”比较。由于Spark的SSD本身是不可变的,Spark流实现在用户定义的时间间隔“定量”来实现更新,得到改造成自己的RDD的方法,从而Spark的并行操作人员可以对这些RDD进行计算。这是与Storm处理每个事的不同之处。

     

    这两种技术之间的一个主要区别是,Spark进行数据的并行计算,而Storm则是任务的并行计算。无论是那种方法,都有它表现价值的一方面。

     

    Q:RDD的核心概念是什么?

     

    A:Client:客户端进程,负责提交作业到Master。

     

    Master:Standalone模式中主控节点,负责接收Client提交的作业,管理Worker,并命令Worker启动分配Driver的资源和启动Executor的资源。

     

    Worker:Standalone模式中slave节点上的守护进程,负责管理本节点的资源,定期向Master汇报心跳,接收Master的命令,启动Driver和Executor。

     

    Driver:一个Spark作业运行时包括一个Driver进程,也是作业的主进程,负责作业的解析、生成Stage并调度Task到Executor上。包括DAGScheduler,TaskScheduler。

     

    Executor:即真正执行作业的地方,一个集群一般包含多个Executor,每个Executor接收Driver的命令Launch Task,一个Executor可以执行一到多个Task。

     

    Q:RDD有哪些常见术语?

     

    A:DAGScheduler:实现将Spark作业分解成一到多个Stage,每个Stage根据RDD的Partition个数决定Task的个数,然后生成相应的Task set放到TaskScheduler中。

     

    TaskScheduler:实现Task分配到Executor上执行。

     

    Task:运行在Executor上的工作单元。

     

    Job:SparkContext提交的具体Action操作,常和Action对应。

     

    Stage:每个Job会被拆分很多组任务(task),每组任务被称为Stage,也称TaskSet。

     

    RDD:Resilient Distributed Datasets的简称,弹性分布式数据集,是Spark最核心的模块和类。

     

    Transformation/Action:SparkAPI的两种类型;Transformation返回值还是一个RDD,Action返回值不少一个RDD,而是一个Scala的集合;所有的Transformation都是采用的懒策略,如果只是将Transformation提交是不会执行计算的,计算只有在Action被提交时才会被触发。

     

     

     

    Q:RDD提供了哪些操作?

     

    A:RDD提供了两种类型的操作:

     

    transformation和action

     

    1. transformation是得到一个新的RDD,方式很多,比如从数据源生成一个新的RDD,从RDD生成一个新的RDD

     

    2. action是得到一个值,或者一个结果(直接将RDD cache到内存中)

     

    3. 所有的transformation都是采用的懒策略,就是如果只是将transformation提交是不会执行计算的,计算只有在action被提交的时候才被触发。

     

    DataFrame:带有Schema信息的RDD,主要是对结构化数据的高度抽象。

     

    DataSet:结合了DataFrame和RDD两者的优势,既允许用户很方便的操作领域对象,又具有SQL执行引擎的高效表现。

     

    Q:RDD中关于转换(transformation)与动作(action)有什么区别?

     

    A:transformation会生成新的RDD,而后者只是将RDD上某项操作的结果返回给程序,而不会生成新的RDD;无论执行了多少次transformation操作,RDD都不会真正执行运算(记录lineage),只有当action操作被执行时,运算才会触发。

     

     

     

    Q:RDD 与 DSM的最大不同是什么?

     

    A:RDD只能通过粗粒度转换来创建,而DSM则允许对每个内存位置上数据的读和写。在这种定义下,DSM不仅包括了传统的共享内存系统,也包括了像提供了共享 DHT(distributed hash table) 的 Piccolo 以及分布式数据库等。

    展开全文
  • spark相关面试题

    万次阅读 多人点赞 2018-03-18 23:51:09
    spark面试问题收集 spark面试问题 1、spark中的RDD是什么,有哪些特性 RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行...

    spark面试问题收集

    spark面试问题

    1、spark中的RDD是什么,有哪些特性

    • RDD(Resilient Distributed Dataset)叫做分布式数据集,是Spark中最基本的数据抽象,它代表一个不可变、可分区、里面的元素可并行计算的集合。
      • Dataset:就是一个集合,用于存放数据的
      • Distributed:分布式,可以并行在集群计算
      • Resilient:表示弹性的
        • 弹性表示
          • 1、RDD中的数据可以存储在内存或者是磁盘
          • 2、RDD中的分区是可以改变的
    • 五大特性:
      • A list of partitions
        一个分区列表,RDD中的数据都存在一个分区列表里面
      • A function for computing each split
        作用在每一个分区中的函数
      • A list of dependencies on other RDDs
        一个RDD依赖于其他多个RDD,这个点很重要,RDD的容错机制就是依据这个特性而来的
      • Optionally, a Partitioner for key-value RDDs (e.g. to say that the RDD is hash-partitioned)
        可选的,针对于kv类型的RDD才具有这个特性,作用是决定了数据的来源以及数据处理后的去向
      • Optionally, a list of preferred locations to compute each split on (e.g. block locations for an HDFS file)
        可选项,数据本地性,数据位置最优

    2、概述一下spark中的常用算子区别(map、mapPartitions、foreach、foreachPartition)

    • map:用于遍历RDD,将函数f应用于每一个元素,返回新的RDD(transformation算子)。
    • foreach:用于遍历RDD,将函数f应用于每一个元素,无返回值(action算子)。
    • mapPartitions:用于遍历操作RDD中的每一个分区,返回生成一个新的RDD(transformation算子)。
    • foreachPartition: 用于遍历操作RDD中的每一个分区。无返回值(action算子)。

    • 总结:一般使用mapPartitions或者foreachPartition算子比map和foreach更加高效,推荐使用。

    3、谈谈spark中的宽窄依赖

    • RDD和它依赖的父RDD(s)的关系有两种不同的类型,即窄依赖(narrow dependency)和宽依赖(wide dependency)。
    • 宽依赖:指的是多个子RDD的Partition会依赖同一个父RDD的Partition
    • 窄依赖:指的是每一个父RDD的Partition最多被子RDD的一个Partition使用。

    4、spark中如何划分stage

    • 1.Spark Application中可以因为不同的Action触发众多的job,一个Application中可以有很多的job,每个job是由一个或者多个Stage构成的,后面的Stage依赖于前面的Stage,也就是说只有前面依赖的Stage计算完毕后,后面的Stage才会运行。
    • 2.Stage划分的依据就是宽依赖,何时产生宽依赖,例如reduceByKey,groupByKey的算子,会导致宽依赖的产生。
    • 3.由Action(例如collect)导致了SparkContext.runJob的执行,最终导致了DAGScheduler中的submitJob的执行,其核心是通过发送一个case class JobSubmitted对象给eventProcessLoop。
      eventProcessLoop是DAGSchedulerEventProcessLoop的具体实例,而DAGSchedulerEventProcessLoop是eventLoop的子类,具体实现EventLoop的onReceive方法,onReceive方法转过来回调doOnReceive
    • 4.在doOnReceive中通过模式匹配的方法把执行路由到
    • 5.在handleJobSubmitted中首先创建finalStage,创建finalStage时候会建立父Stage的依赖链条

    • 总结:以来是从代码的逻辑层面上来展开说的,可以简单点说:写介绍什么是RDD中的宽窄依赖,然后在根据DAG有向无环图进行划分,从当前job的最后一个算子往前推,遇到宽依赖,那么当前在这个批次中的所有算子操作都划分成一个stage,然后继续按照这种方式在继续往前推,如在遇到宽依赖,又划分成一个stage,一直到最前面的一个算子。最后整个job会被划分成多个stage,而stage之间又存在依赖关系,后面的stage依赖于前面的stage。

    5、spark-submit的时候如何引入外部jar包

    • 在通过spark-submit提交任务时,可以通过添加配置参数来指定
      • –driver-class-path 外部jar包
      • –jars 外部jar包

    6、spark 如何防止内存溢出

    • driver端的内存溢出
      • 可以增大driver的内存参数:spark.driver.memory (default 1g)
      • 这个参数用来设置Driver的内存。在Spark程序中,SparkContext,DAGScheduler都是运行在Driver端的。对应rdd的Stage切分也是在Driver端运行,如果用户自己写的程序有过多的步骤,切分出过多的Stage,这部分信息消耗的是Driver的内存,这个时候就需要调大Driver的内存。
    • map过程产生大量对象导致内存溢出
      • 这种溢出的原因是在单个map中产生了大量的对象导致的,例如:rdd.map(x=>for(i <- 1 to 10000) yield i.toString),这个操作在rdd中,每个对象都产生了10000个对象,这肯定很容易产生内存溢出的问题。针对这种问题,在不增加内存的情况下,可以通过减少每个Task的大小,以便达到每个Task即使产生大量的对象Executor的内存也能够装得下。具体做法可以在会产生大量对象的map操作之前调用repartition方法,分区成更小的块传入map。例如:rdd.repartition(10000).map(x=>for(i <- 1 to 10000) yield i.toString)。
        面对这种问题注意,不能使用rdd.coalesce方法,这个方法只能减少分区,不能增加分区,不会有shuffle的过程。
    • 数据不平衡导致内存溢出
      • 数据不平衡除了有可能导致内存溢出外,也有可能导致性能的问题,解决方法和上面说的类似,就是调用repartition重新分区。这里就不再累赘了。
    • shuffle后内存溢出
      • shuffle内存溢出的情况可以说都是shuffle后,单个文件过大导致的。在Spark中,join,reduceByKey这一类型的过程,都会有shuffle的过程,在shuffle的使用,需要传入一个partitioner,大部分Spark中的shuffle操作,默认的partitioner都是HashPatitioner,默认值是父RDD中最大的分区数,这个参数通过spark.default.parallelism控制(在spark-sql中用spark.sql.shuffle.partitions) , spark.default.parallelism参数只对HashPartitioner有效,所以如果是别的Partitioner或者自己实现的Partitioner就不能使用spark.default.parallelism这个参数来控制shuffle的并发量了。如果是别的partitioner导致的shuffle内存溢出,就需要从partitioner的代码增加partitions的数量。
    • standalone模式下资源分配不均匀导致内存溢出

      • 在standalone的模式下如果配置了–total-executor-cores 和 –executor-memory 这两个参数,但是没有配置–executor-cores这个参数的话,就有可能导致,每个Executor的memory是一样的,但是cores的数量不同,那么在cores数量多的Executor中,由于能够同时执行多个Task,就容易导致内存溢出的情况。这种情况的解决方法就是同时配置–executor-cores或者spark.executor.cores参数,确保Executor资源分配均匀。
    • 使用rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)代替rdd.cache()

      • rdd.cache()和rdd.persist(Storage.MEMORY_ONLY)是等价的,在内存不足的时候rdd.cache()的数据会丢失,再次使用的时候会重算,而rdd.persist(StorageLevel.MEMORY_AND_DISK_SER)在内存不足的时候会存储在磁盘,避免重算,只是消耗点IO时间。

    7、spark中cache和persist的区别

    • cache:缓存数据,默认是缓存在内存中,其本质还是调用persist
    • persist:缓存数据,有丰富的数据缓存策略。数据可以保存在内存也可以保存在磁盘中,使用的时候指定对应的缓存级别就可以了。

    8、简要描述Spark分布式集群搭建的步骤

    • 地球人都知道
    • 这里可以概述下如何搭建高可用的spark集群(HA)
      • 主要是引入了zookeeper

    9、spark中的数据倾斜的现象、原因、后果

    • (1)、数据倾斜的现象
      • 多数task执行速度较快,少数task执行时间非常长,或者等待很长时间后提示你内存不足,执行失败。
    • (2)、数据倾斜的原因
      • 数据问题
        • 1、key本身分布不均衡(包括大量的key为空)
        • 2、key的设置不合理
      • spark使用问题
        • 1、shuffle时的并发度不够
        • 2、计算方式有误
    • (3)、数据倾斜的后果
      • 1、spark中的stage的执行时间受限于最后那个执行完成的task,因此运行缓慢的任务会拖垮整个程序的运行速度(分布式程序运行的速度是由最慢的那个task决定的)。
      • 2、过多的数据在同一个task中运行,将会把executor撑爆。

    10、如何解决spark中的数据倾斜问题

    • 发现数据倾斜的时候,不要急于提高executor的资源,修改参数或是修改程序,首先要检查数据本身,是否存在异常数据。

      • 1、数据问题造成的数据倾斜

        • 找出异常的key
          • 如果任务长时间卡在最后最后1个(几个)任务,首先要对key进行抽样分析,判断是哪些key造成的。
            选取key,对数据进行抽样,统计出现的次数,根据出现次数大小排序取出前几个。
          • 比如: df.select(“key”).sample(false,0.1).(k=>(k,1)).reduceBykey(+).map(k=>(k._2,k._1)).sortByKey(false).take(10)
          • 如果发现多数数据分布都较为平均,而个别数据比其他数据大上若干个数量级,则说明发生了数据倾斜。
        • 经过分析,倾斜的数据主要有以下三种情况:
          • 1、null(空值)或是一些无意义的信息()之类的,大多是这个原因引起。
          • 2、无效数据,大量重复的测试数据或是对结果影响不大的有效数据。
          • 3、有效数据,业务导致的正常数据分布。
        • 解决办法
          • 第1,2种情况,直接对数据进行过滤即可(因为该数据对当前业务不会产生影响)。
          • 第3种情况则需要进行一些特殊操作,常见的有以下几种做法
            • (1) 隔离执行,将异常的key过滤出来单独处理,最后与正常数据的处理结果进行union操作。
            • (2) 对key先添加随机值,进行操作后,去掉随机值,再进行一次操作。
            • (3) 使用reduceByKey 代替 groupByKey(reduceByKey用于对每个key对应的多个value进行merge操作,最重要的是它能够在本地先进行merge操作,并且merge操作可以通过函数自定义.)
            • (4) 使用map join。
        • 案例
          • 如果使用reduceByKey因为数据倾斜造成运行失败的问题。具体操作流程如下:
            • (1) 将原始的 key 转化为 key + 随机值(例如Random.nextInt)
            • (2) 对数据进行 reduceByKey(func)
            • (3) 将 key + 随机值 转成 key
            • (4) 再对数据进行 reduceByKey(func)
        • 案例操作流程分析:
          • 假设说有倾斜的Key,我们给所有的Key加上一个随机数,然后进行reduceByKey操作;此时同一个Key会有不同的随机数前缀,在进行reduceByKey操作的时候原来的一个非常大的倾斜的Key就分而治之变成若干个更小的Key,不过此时结果和原来不一样,怎么破?进行map操作,目的是把随机数前缀去掉,然后再次进行reduceByKey操作。(当然,如果你很无聊,可以再次做随机数前缀),这样我们就可以把原本倾斜的Key通过分而治之方案分散开来,最后又进行了全局聚合
          • 注意1: 如果此时依旧存在问题,建议筛选出倾斜的数据单独处理。最后将这份数据与正常的数据进行union即可。
          • 注意2: 单独处理异常数据时,可以配合使用Map Join解决。
      • 2、spark使用不当造成的数据倾斜

        • 提高shuffle并行度

          • dataFrame和sparkSql可以设置spark.sql.shuffle.partitions参数控制shuffle的并发度,默认为200。
          • rdd操作可以设置spark.default.parallelism控制并发度,默认参数由不同的Cluster Manager控制。
          • 局限性: 只是让每个task执行更少的不同的key。无法解决个别key特别大的情况造成的倾斜,如果某些key的大小非常大,即使一个task单独执行它,也会受到数据倾斜的困扰。
        • 使用map join 代替reduce join

    11、flume整合sparkStreaming问题

    • (1)、如何实现sparkStreaming读取flume中的数据
      • 可以这样说:
        • 前期经过技术调研,查看官网相关资料,发现sparkStreaming整合flume有2种模式,一种是拉模式,一种是推模式,然后在简单的聊聊这2种模式的特点,以及如何部署实现,需要做哪些事情,最后对比两种模式的特点,选择那种模式更好。
          • 推模式:Flume将数据Push推给Spark Streaming
          • 拉模式:Spark Streaming从flume 中Poll拉取数据
    • (2)、在实际开发的时候是如何保证数据不丢失的

      • 可以这样说:
        • flume那边采用的channel是将数据落地到磁盘中,保证数据源端安全性(可以在补充一下,flume在这里的channel可以设置为memory内存中,提高数据接收处理的效率,但是由于数据在内存中,安全机制保证不了,故选择channel为磁盘存储。整个流程运行有一点的延迟性)
        • sparkStreaming通过拉模式整合的时候,使用了FlumeUtils这样一个类,该类是需要依赖一个额外的jar包(spark-streaming-flume_2.10)
        • 要想保证数据不丢失,数据的准确性,可以在构建StreamingConext的时候,利用StreamingContext.getOrCreate(checkpoint, creatingFunc: () => StreamingContext)来创建一个StreamingContext,使用StreamingContext.getOrCreate来创建StreamingContext对象,传入的第一个参数是checkpoint的存放目录,第二参数是生成StreamingContext对象的用户自定义函数。如果checkpoint的存放目录存在,则从这个目录中生成StreamingContext对象;如果不存在,才会调用第二个函数来生成新的StreamingContext对象。在creatingFunc函数中,除了生成一个新的StreamingContext操作,还需要完成各种操作,然后调用ssc.checkpoint(checkpointDirectory)来初始化checkpoint功能,最后再返回StreamingContext对象。
          这样,在StreamingContext.getOrCreate之后,就可以直接调用start()函数来启动(或者是从中断点继续运行)流式应用了。如果有其他在启动或继续运行都要做的工作,可以在start()调用前执行。
        • 流失计算中使用checkpoint的作用:
          • 保存元数据,包括流式应用的配置、流式没崩溃之前定义的各种操作、未完成所有操作的batch。元数据被存储到容忍失败的存储系统上,如HDFS。这种ckeckpoint主要针对driver失败后的修复。
          • 保存流式数据,也是存储到容忍失败的存储系统上,如HDFS。这种ckeckpoint主要针对window operation、有状态的操作。无论是driver失败了,还是worker失败了,这种checkpoint都够快速恢复,而不需要将很长的历史数据都重新计算一遍(以便得到当前的状态)。
        • 设置流式数据checkpoint的周期
          • 对于一个需要做checkpoint的DStream结构,可以通过调用DStream.checkpoint(checkpointInterval)来设置ckeckpoint的周期,经验上一般将这个checkpoint周期设置成batch周期的5至10倍。
        • 使用write ahead logs功能
          • 这是一个可选功能,建议加上。这个功能将使得输入数据写入之前配置的checkpoint目录。这样有状态的数据可以从上一个checkpoint开始计算。开启的方法是把spark.streaming.receiver.writeAheadLogs.enable这个property设置为true。另外,由于输入RDD的默认StorageLevel是MEMORY_AND_DISK_2,即数据会在两台worker上做replication。实际上,Spark Streaming模式下,任何从网络输入数据的Receiver(如kafka、flume、socket)都会在两台机器上做数据备份。如果开启了write ahead logs的功能,建议把StorageLevel改成MEMORY_AND_DISK_SER。修改的方法是,在创建RDD时由参数传入。
        • 使用以上的checkpoint机制,确实可以保证数据0丢失。但是一个前提条件是,数据发送端必须要有缓存功能,这样才能保证在spark应用重启期间,数据发送端不会因为spark streaming服务不可用而把数据丢弃。而flume具备这种特性,同样kafka也具备。
    • (3)Spark Streaming的数据可靠性

      • 有了checkpoint机制、write ahead log机制、Receiver缓存机器、可靠的Receiver(即数据接收并备份成功后会发送ack),可以保证无论是worker失效还是driver失效,都是数据0丢失。原因是:如果没有Receiver服务的worker失效了,RDD数据可以依赖血统来重新计算;如果Receiver所在worker失败了,由于Reciever是可靠的,并有write ahead log机制,则收到的数据可以保证不丢;如果driver失败了,可以从checkpoint中恢复数据重新构建。

    12、kafka整合sparkStreaming问题

    • (1)、如何实现sparkStreaming读取kafka中的数据

      • 可以这样说:在kafka0.10版本之前有二种方式与sparkStreaming整合,一种是基于receiver,一种是direct,然后分别阐述这2种方式分别是什么
        • receiver:是采用了kafka高级api,利用receiver接收器来接受kafka topic中的数据,从kafka接收来的数据会存储在spark的executor中,之后spark streaming提交的job会处理这些数据,kafka中topic的偏移量是保存在zk中的。
          • 基本使用: val kafkaStream = KafkaUtils.createStream(streamingContext,
            [ZK quorum], [consumer group id], [per-topic number of Kafka partitions to consume])
          • 还有几个需要注意的点:
            • 在Receiver的方式中,Spark中的partition和kafka中的partition并不是相关的,所以如果我们加大每个topic的partition数量,仅仅是增加线程来处理由单一Receiver消费的主题。但是这并没有增加Spark在处理数据上的并行度.
            • 对于不同的Group和topic我们可以使用多个Receiver创建不同的Dstream来并行接收数据,之后可以利用union来统一成一个Dstream。
            • 在默认配置下,这种方式可能会因为底层的失败而丢失数据. 因为receiver一直在接收数据,在其已经通知zookeeper数据接收完成但是还没有处理的时候,executor突然挂掉(或是driver挂掉通知executor关闭),缓存在其中的数据就会丢失. 如果希望做到高可靠, 让数据零丢失,如果我们启用了Write Ahead Logs(spark.streaming.receiver.writeAheadLog.enable=true)该机制会同步地将接收到的Kafka数据写入分布式文件系统(比如HDFS)上的预写日志中. 所以, 即使底层节点出现了失败, 也可以使用预写日志中的数据进行恢复. 复制到文件系统如HDFS,那么storage level需要设置成 StorageLevel.MEMORY_AND_DISK_SER,也就是KafkaUtils.createStream(…, StorageLevel.MEMORY_AND_DISK_SER)
        • direct:在spark1.3之后,引入了Direct方式。不同于Receiver的方式,Direct方式没有receiver这一层,其会周期性的获取Kafka中每个topic的每个partition中的最新offsets,之后根据设定的maxRatePerPartition来处理每个batch。(设置spark.streaming.kafka.maxRatePerPartition=10000。限制每秒钟从topic的每个partition最多消费的消息条数)。
    • (2) 对比这2中方式的优缺点:

      • 采用receiver方式:这种方式可以保证数据不丢失,但是无法保证数据只被处理一次,WAL实现的是At-least-once语义(至少被处理一次),如果在写入到外部存储的数据还没有将offset更新到zookeeper就挂掉,这些数据将会被反复消费. 同时,降低了程序的吞吐量。
      • 采用direct方式:相比Receiver模式而言能够确保机制更加健壮. 区别于使用Receiver来被动接收数据, Direct模式会周期性地主动查询Kafka, 来获得每个topic+partition的最新的offset, 从而定义每个batch的offset的范围. 当处理数据的job启动时, 就会使用Kafka的简单consumer api来获取Kafka指定offset范围的数据。
        • 优点:
          • 1、简化并行读取
            • 如果要读取多个partition, 不需要创建多个输入DStream然后对它们进行union操作. Spark会创建跟Kafka partition一样多的RDD partition, 并且会并行从Kafka中读取数据. 所以在Kafka partition和RDD partition之间, 有一个一对一的映射关系.
          • 2、高性能
            • 如果要保证零数据丢失, 在基于receiver的方式中, 需要开启WAL机制. 这种方式其实效率低下, 因为数据实际上被复制了两份, Kafka自己本身就有高可靠的机制, 会对数据复制一份, 而这里又会复制一份到WAL中. 而基于direct的方式, 不依赖Receiver, 不需要开启WAL机制, 只要Kafka中作了数据的复制, 那么就可以通过Kafka的副本进行恢复.
          • 3、一次且仅一次的事务机制
            • 基于receiver的方式, 是使用Kafka的高阶API来在ZooKeeper中保存消费过的offset的. 这是消费Kafka数据的传统方式. 这种方式配合着WAL机制可以保证数据零丢失的高可靠性, 但是却无法保证数据被处理一次且仅一次, 可能会处理两次. 因为Spark和ZooKeeper之间可能是不同步的. 基于direct的方式, 使用kafka的简单api, Spark Streaming自己就负责追踪消费的offset, 并保存在checkpoint中. Spark自己一定是同步的, 因此可以保证数据是消费一次且仅消费一次。不过需要自己完成将offset写入zk的过程,在官方文档中都有相应介绍.
              *简单代码实例:
              * messages.foreachRDD(rdd=>{
              val message = rdd.map(_._2)//对数据进行一些操作
              message.map(method)//更新zk上的offset (自己实现)
              updateZKOffsets(rdd)
              })
              * sparkStreaming程序自己消费完成后,自己主动去更新zk上面的偏移量。也可以将zk中的偏移量保存在mysql或者redis数据库中,下次重启的时候,直接读取mysql或者redis中的偏移量,获取到上次消费的偏移量,接着读取数据。

    13、利用scala语言实现排序

    • (1)冒泡排序:

      • package cn.itcast.sort
      • //冒泡排序
      • class BubbleSort {
      • def main(args: Array[String]): Unit = {
      • val list = List(3, 12, 43, 23, 7, 1, 2, 0)
      • println(sort(list))
      • }
      • //定义一个方法,传入的参数是要进行排序的List集合,输出的是排序后的List集合
      • def sort(list: List[Int]): List[Int] = list match {
      • case List() => List()
      • case head :: tail => compute(head, sort(tail))
      • }
      • def compute(data: Int, dataSet: List[Int]): List[Int] = dataSet match {
      • case List() => List(data)
      • case head :: tail => if (data <= head) data :: dataSet else * head :: compute(data, tail)
      • }
      • }
    • (2) 快读排序

      • package cn.itcast.sort
      • //快速排序
      • object QuickSort {
      • def main(args: Array[String]): Unit = {
      • val list = List(3, 12, 43, 23, 7, 1, 2, 0)
      • println(quickSort(list))
        *
      • }
      • //定义一个方法,传入的参数是要进行排序的List集合,输出的是排序后的List集合
      • def quickSort(list: List[Int]): List[Int] = {
      • //对输入参数list进行模式匹配
      • list match {
      • //如果是空,返回nil
      • case Nil => Nil
      • case List() => List()
      • //不为空从list中提取出首元素和剩余元素组成的列表分别到head和tail中
      • case head :: tail =>
      • //对剩余元素列表调用partition方法,这个方法会将列表分为两部分。
      • // 划分依据接受的参数,这个参数是一个函数(这里是(_ < x))。
      • // partition方法会对每个元素调用这个函数,根据返回的true,false分成两部分。
      • // 这里’_ < x’是一个匿名函数(又称lambda),’_’关键字是函数输入参数的占位符,
      • // 输入参数这里是列表中的每个元素。
      • val (left, right) = tail.partition(_ < head)
      • //最后对划分好的两部分递归调用quickSort
      • //其中head::quickSort(right) 这里::是List定义的一个方法,用于将两部分合成一个列表
      • quickSort(left) ++ (head :: quickSort(right))
      • }
      • }
      • }
    展开全文
  • Spark面试题及其答案

    千次阅读 2019-04-25 11:03:53
    一、简答 1.Spark master使用zookeeper进行HA的,有哪些元数据保存在Zookeeper? 答:spark通过这个参数spark.deploy.zookeeper.dir指定master元数据在zookeeper中保存的位置,包括Worker,Driver和Application...
  • Spark应用转换流程 1、spark应用提交后,经历了一系列的转换,最后成为task在每个节点上执行 2、RDD的Action算子触发Job的提交,生成RDD DAG 3、由DAGScheduler将RDD DAG转化为Stage DAG,每个Stage中...
  • spark相关面试题总结

    千次阅读 2018-09-26 13:51:45
    1.spark中的RDD是什么,有哪些特性? 答:RDD(Resilient Distributed Dataset)叫做分布式数据集,是spark中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可以并行计算的集合 Dataset:就是一个集合,...
  • spark精华面试题

    万次阅读 2018-05-18 16:25:11
    1)一个Spark作业运行时包括一个Driver进程,也是作业的主进程,具有main函数,并且有SparkContext的实例,是程序的人口点;2)功能:负责向集群申请资源,向master注册信息,负责了作业的调度,,负责作业的解析、...
  • 史上最全的spark面试题——持续更新中

    万次阅读 多人点赞 2018-09-09 16:34:10
    1.spark中的RDD是什么,有哪些特性? 答:RDD(Resilient Distributed Dataset)叫做分布式数据集,是spark中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可以并行计算的集合 Dataset:就是一个集合,...
  • spark-面试题(含答案)

    千次阅读 2019-06-03 09:23:58
    1 var, val和def三个关键字之间的区别? var immutable variable val mutable variable def function defined keyword 2.object 和 class 的区别? object 单例 无构造器 成员变量和method都是static 可以直接访问main...
  • spark 面试题

    2020-02-29 17:59:55
    Hadoop 相关试题 Hive 相关试题 1、 hive表关联查询,如何解决数据倾斜的问题? 倾斜原因: map输出数据按key Hash的分配到reduce中,由于key分布不均匀、业务数据本身的特点、建表时考虑不周、等原因造成的...
  • Spark面试题

    2019-03-14 16:12:00
    Spark面试题 RDD怎么理解? RDD 是 Spark 的灵魂,也称为弹性分布式数据集。一个 RDD 代表一个可以被分区的只读数据集。RDD 内部可以有许多分区(partitions),每个分区又拥有大量的记录(records)。...
  • Spark面试题、答案

    2018-11-03 20:27:31
    一、简答 1.Spark master使用zookeeper进行HA的,有哪些元数据保存在Zookeeper? 答:spark通过这个参数spark.deploy.zookeeper.dir指定master元数据在zookeeper中保存的位置,包括Worker,Driver和Application...
  • Spark 面试题

    2020-01-19 10:19:19
    1、Spark与Apache Hadoop有何关系? Spark是与Hadoop数据兼容的快速通用处理引擎。它可以通过YARN或Spark的独立模式在Hadoop群集中运行,并且可以处理HDFS,HBase,Cassandra,Hive和任何Hadoop InputFormat中的数据...
  • 1.spark中的RDD是什么,有哪些特性? RDD(Resilient Distributed Dataset)叫做分布式数据集,是spark中最基本的数据抽象,它代表一个不可变,可分区,里面的元素可以并行计算的集合。 三个特性分区,不可变,...
  • spark面试题

    2018-05-16 19:42:15
    1.Spark master使用zookeeper进行HA的,有哪些元数据保存在Zookeeper?偏向于运维,暂不作讨论。2.Spark master HA 主从切换过程不会影响集群已有的作业运行,为什么?3.Spark on Mesos中,什么是的粗粒度分配,什么...
  • 大概说一些这三个框架各自是什么,解决了什么问题。 Hadoop Hadoop是一个开源框架,允许使用简单的编程模型在跨计算机集群的分布式环境中存储和处理大数据。它的设计是从单个服务器扩展到数千个机器,每个都提供本地...
  • spark面试题

    千次阅读 2018-07-02 20:18:52
    前期分享了很多关于Spark的学习视频和文章,为了进一步巩固和掌握Spark,在原有spark专刊基础上,新增《Spark面试2000》专刊,题集包含基础概念、原理、编码开发、性能调优、运维、源代码以及Spark周边生态系统等...
  • 2018最新版spark面试题及答案

    千次阅读 2018-04-02 18:12:15
    Spark是一个围绕速度、...成为大数据核心技术之一,自然也成为了众多企业面试的核心专业问题,小编针对spark整理了一套相关的面试题,正在学习大数据和正在面试大数据岗位的小伙伴可以参考使用了!1、SDD,DAG,Sta...
  • 第二章 Spark 2.1 Spark 原理 2.1.1 Shuffle 原理 2.1.1.1 SortShuffle mapTask将map(聚合算子)或array(join算子)写入内存 达到阀值发生溢写,溢写前根据key排序,分批写入磁盘,最终将所有临时文件...
1 2 3 4 5 ... 20
收藏数 5,294
精华内容 2,117
关键字:

spark面试题