2019-05-22 17:14:32 weixin_42490528 阅读数 479
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35092 人正在学习 去看看 张长志

shuffle是什么:

分布式计算中,每个节点只计算部分数据,也就是只处理一个分片,那么要想求得某个key对应的全部数据,比如reduceByKey、groupByKey,那就需要把相同key的数据拉取到同一个分区,原分区的数据需要被打乱重组,这个按照一定的规则对数据重新分区的过程就是Shuffle(洗牌)。

Shuffle是连接Map和Reduce之间的桥梁,描述的是数据从Map端到Reduce端的过程,

map阶段的结果会写到shuffle,reduce阶段再拉取。

 

Hadoop的Shuffle过程

大致分为排序(sort)、溢写(spill)、合并(merge)、拉取拷贝(Copy)、

合并排序(merge sort)这几个过程,大体流程如下:

上图的红绿蓝代表3种key,shuffle过程就是把相同的Key拉取到同一分区,供Reduce节点处理。

Map端:sort spill merge

1.sort(排序)

map端输出的数据,先写到环形缓冲区kvbuffer,溢出则写到磁盘(也可能一直没有达到阀值,也一样要将内存中的数据写入磁盘)。写磁盘之前会排序,按照partition和key两个关键字来排序,排序结果是数据按照partition为单位聚集在一起,同一partition内数据按照key排序。

2.spill(溢写)

当排序完成,便开始把数据刷到磁盘,刷磁盘的过程以分区为单位,一个分区写完,写下一个分区,分区内数据有序,最终生成多个文件

3.merge(合并)

上一步会生成的多个小文件,对于Reduce端拉取数据是相当低效的,那么这时候就有了merge的过程,该过程会将每一个Spill.out文件合并成为一个大文件,合并的过程也是同分片的合并成一个片段(segment),最终所有的segment组装成一个最终文件。

 

Reduce端:fetch copy和merge sort

1.拉取拷贝(fetch copy)

Reduce任务通过向各个Map任务拉取对应分片。

这个过程都是以Http协议完成,每个Map节点都会启动一个常驻的HTTP server服务,Reduce节点会请求这个Http Server拉取数据,这个过程完全通过网络传输,所以是一个非常重量级的操作。

2.合并排序 (merge sort)

拉取到各个Map节点对应分片的数据之后,会进行再次排序,排序完成,结果丢给Reduce函数进行计算。

 

总结:

1、shuffle过程就是为了对key进行全局聚合

2、排序操作伴随着整个shuffle过程,所以Hadoop的shuffle是sort-based的

 

 

spark的shuffle过程:

Spark shuffle分为write和read两个过程

1.shuffle write

将MapTask产生的数据写到磁盘中,

首先将map的结果文件中的数据记录送到对应的bucket里面(缓冲区),之后,每个bucket里面的数据会不断被写到本地磁盘上,形成一个ShuffleBlockFile,或者简称FileSegment

2.shuffle read

ReduceTask再将数据抓取(fetch)过来

因为不需要排序,fetch一旦开始,就会边fetch边处理(reduce)

 

spark2.0版本后放弃了Hash Based Shuffle,使用Sort Based Shuffle,

目的是在处理大规模的数据上也不会很容易达到性能瓶颈

其实 Sorted-Based Shuffle 也有缺点,其缺点反而是它排序的特性,它强制要求数据在 Mapper 端必须要先进行排序 (注意,这里没有说对计算结果进行排序),所以导致它排序的速度有点慢。而 Tungsten-Sort Shuffle 对它的排序算法进行了改进,优化了排序的速度。

https://www.bbsmax.com/A/lk5anoONd1/

 

需要注意:

Hash Based Shuffle会在磁盘上生成大量中间文件,这些文件将被保留,

这样做是为了在重新计算时不需要重新创建shuffle文件。

这意味着长时间运行的Spark作业可能会占用大量磁盘空间。

 

 

 

Hadoop和Spark的Shuffle区别:

1、Hadoop中有一个Map完成,Reduce便可以去fetch数据了,不必等到所有Map任务完成,而Spark的必须等到父stage完成,也就是父stage的map操作全部完成才能去fetch数据。

这是因为spark必须等到父stage执行完,才能执行子stage,迎合stage规则。

2、Hadoop的Shuffle是sort-base的,那么不管是Map的输出,还是Reduce的输出,都是partion内有序的,而spark不要求这一点。

存在疑问,spark最开始是Hash Based Shuffle,不需要排序。

然后引入Sort Based Shuffle,是要排序的。

Spark 2.0版本Hash Based Shuffle退出历史舞台。

3、Hadoop的Reduce要等到fetch完全部数据,才将数据传入reduce函数进行聚合,而spark是一边fetch一边聚合。

 

2015-02-13 21:41:31 axxbc123 阅读数 946
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35092 人正在学习 去看看 张长志

前面有几篇关于Spark Streaming的博客,那会只是作为Spark入门,快速体验Spark之用,只是照着葫芦画瓢。本文结合Spark官网上Spark Streaming的编程指南对Spark Streaming进行介绍

StreamingContext

如同SparkContext一样,StreamingContext也是Spark Streaming应用程序通往Spark集群的通道,它的定义如下:

/**
 * Main entry point for Spark Streaming functionality. It provides methods used to create
 * [[org.apache.spark.streaming.dstream.DStream]]s from various input sources. It can be either
 * created by providing a Spark master URL and an appName, or from a org.apache.spark.SparkConf
 * configuration (see core Spark documentation), or from an existing org.apache.spark.SparkContext.
 * The associated SparkContext can be accessed using `context.sparkContext`. After
 * creating and transforming DStreams, the streaming computation can be started and stopped
 * using `context.start()` and `context.stop()`, respectively.
 * `context.awaitTermination()` allows the current thread to wait for the termination
 * of the context by `stop()` or by an exception.
 */
class StreamingContext private[streaming] (
    sc_ : SparkContext,
    cp_ : Checkpoint,
    batchDur_ : Duration
  ) extends Logging {

 通过类的文档注释,我们看到:

1. 提供了从各种输入数据源创建DStream的方法

2,参数中的batchDur_是Duration类型的对象,比如Second(10),这个参数的含义是the time interval at which streaming data will be divided into batches,也就是说,假如batchDur_为Second(10)表示Spark Streaming会把每10秒钟的数据作为一个Batch,而一个Batch就是一个RDD?是的,一个RDD的数据对应一个batchInterval累加读取到的数据

 

DStream

/**
 * A Discretized Stream (DStream), the basic abstraction in Spark Streaming, is a continuous
 * sequence of RDDs (of the same type) representing a continuous stream of data (see
 * org.apache.spark.rdd.RDD in the Spark core documentation for more details on RDDs).
 * DStreams can either be created from live data (such as, data from TCP sockets, Kafka, Flume,
 * etc.) using a [[org.apache.spark.streaming.StreamingContext]] or it can be generated by
 * transforming existing DStreams using operations such as `map`,
 * `window` and `reduceByKeyAndWindow`. While a Spark Streaming program is running, each DStream
 * periodically generates a RDD, either from live data or by transforming the RDD generated by a
 * parent DStream.
 *
 * This class contains the basic operations available on all DStreams, such as `map`, `filter` and
 * `window`. In addition, [[org.apache.spark.streaming.dstream.PairDStreamFunctions]] contains
 * operations available only on DStreams of key-value pairs, such as `groupByKeyAndWindow` and
 * `join`. These operations are automatically available on any DStream of pairs
 * (e.g., DStream[(Int, Int)] through implicit conversions when
 * `org.apache.spark.streaming.StreamingContext._` is imported.
 *
 * DStreams internally is characterized by a few basic properties:
 *  - A list of other DStreams that the DStream depends on
 *  - A time interval at which the DStream generates an RDD
 *  - A function that is used to generate an RDD after each time interval
 */

 从文档中,我们可以看到如下几点:

1. 对DStream实施map操作,会转换成另外一个DStream

2. DStream是一组连续的RDD序列,这些RDD中的元素的类型是一样的。DStream是一个时间上连续接收数据但是接受到的数据按照指定的时间(batchInterval)间隔切片,每个batchInterval都会构造一个RDD,因此,Spark Streaming实质上是根据batchInterval切分出来的RDD串,想象成糖葫芦,每个山楂就是一个batchInterval形成的RDD

3. 对DStream实施windows或者reduceByKeyAndWindow操作,也是转换成另外一个DStream(window操作是stateful DStream Transformation)

4. DStream同RDD一样,也定义了map,filter,window等操作,同时,对于元素类型为(K,V)的pair DStream,Spark Streaming提供了一个隐式转换的类,PairStreamFunctions

5. DStream内部有如下三个特性:

-DStream也有依赖关系,一个DStream可能依赖于其它的DStream(依赖关系的产生,同RDD是一样的)

-DStream创建RDD的时间间隔,这个时间间隔是不是就是构造StreamingContext传入的第三个参数?是的!

-在时间间隔到达后,DStream创建RDD的方法


 在DStream内部,DStream表现为一系列的RDD的序列,针对DStream的操作(比如map,filter)会转换到它底层的RDD的操 作,由这个图中可以看出来,0-1这段时间的数据累积构成了RDD@time1,1-2这段时间的数据累积构成了RDD@time2,。。。也就是说,在 Spark Streaming中,DStream中的每个RDD的数据是一个时间窗口的累计。

 

 

下图展示了对DStream实施转换算子flatMap操作。需要指出的是,RDD的转换操作是由Spark Engine来实现的,原因是Spark Engine接受了原始的RDD以及作用于RDD上的算子,在计算结果时才真正的对RDD实施算子操作

 

 

 

 

 

按照下面这幅图所呈现出来的含义是,Spark Streaming用于将输入的数据进行分解成一个一个的RDD,每个RDD交由Spark Engine进行处理以得到最后的处理数据?是的!

 


上图中,Spark Streaming模块用于将接受到数据定时的切分成RDD(上图中定义为batch of input data),这些RDD交由Spark Engine进行计算。Spark Streaming模块负责数据接收并定时转换成一系列RDD,Spark Engine对Spark Streaming送过来的RDD进行计算

 

DStream层次关系

 

 

DStream的window操作

  /**
   * Return a new DStream in which each RDD contains all the elements in seen in a
   * sliding window of time over this DStream.
   * @param windowDuration width of the window; must be a multiple of this DStream's
   *                       batching interval
   * @param slideDuration  sliding interval of the window (i.e., the interval after which
   *                       the new DStream will generate RDDs); must be a multiple of this
   *                       DStream's batching interval
   */
  def window(windowDuration: Duration, slideDuration: Duration): DStream[T] = {
    new WindowedDStream(this, windowDuration, slideDuration)
  }

DStream与window相关的两个参数是windowDuration和slideDuration,这两个参数究竟表示什么含义。通过window操作,DStream转换为了WindowedDStream

windowDuration表示的是对过去的一个windowDuration时间间隔的数据进行统计计算, windowDuration是intervalBatch的整数倍,也就是说,假如windowDuration=n*intervalBatch, 那么window操作就是对过去的n个RDD进行统计计算
如下内容来自于Spark Streaming的官方文档:http://spark.apache.org/docs/latest/streaming-programming-guide.html

 

Spark Streaming也提供了窗口计算(window computations)的功能,允许我们每隔一段时间(sliding duration)对过去一个时间段内(window duration)的数据进行转换操作(tranformation).

slideDruation控制着窗口计算的频度,windowDuration控制着窗口计算的时间跨度。slideDruation和windowDuration都必须是batchInterval的整数倍。假想如下一种场景:

windowDuration=3*batchInterval,

slideDuration=10*batchInterval,

表示的含义是每个10个时间间隔对之前的3个RDD进行统计计算,也意味着有7个RDD没在window窗口的统计范围内。slideDuration的默认值是batchInterval

 

 


下图展示了滑动窗口的概念
 

 

 

如上图所示,一个滑动窗口时间段((sliding window length)内的所有RDD会进行合并以创建windowed DStream所对应的RDDD。每个窗口操作有两个参数:

 

  • window length - The duration of the window (3 in the figure),滑动窗口的时间跨度,指本次window操作所包含的过去的时间间隔(图中包含3个batch interval,可以理解时间单位)
  • sliding interval - The interval at which the window operation is performed (2 in the figure).(窗口操作执行的频率,即每隔多少时间计算一次)

These two parameters must be multiples of the batch interval of the source DStream (1 in the figure). 这表示,sliding window length的时间长度以及sliding interval都要是batch interval的整数倍。
batch interval是在构造StreamingContext时传入的(1 in the figure)

说明:

window length为什么是3?如椭圆形框,它是从第三秒开始算起(包括第三秒),第五秒结束,即包含3,4,5三个1秒,因此是3

sliding interval为什么是2?主要是看圆角矩形框的右边线,虚线的圆角矩形框的右边线在time 3结束, 实线的圆角矩形框的右边线在time 5结束,所以跨度是2。也就是看时间的最右侧即可,以右边线为基准,每个窗口操作(window length)占用了3个时间片。



// Reduce last 30 seconds of data, every 10 seconds
val windowedWordCounts = pairs.reduceByKeyAndWindow((a:Int,b:Int) => (a + b), Seconds(30), Seconds(10))
表示每隔10秒钟对过去30秒钟产生的单词进行计数。这个方法有个不合理的地方,既然要求sliding window length和sliding interval都是batch interval的整数倍,那么此处为什么不用时间单位,而使用绝对的时间长度呢?

 

 

 

Spark Streaming Sources

这是Spark Streaming的数据输入源,包括两类:基本数据源和高级数据源

 

基本数据源

  • file systems
  • socket connections
  • Akka actors

以上数据源,StreamingContext的API直接提供,

  • fileStream,

监听HDFS文件系统的新文件的创建,读取其中内容。如果文件已存在而内容有变化,是不会被监听到的,因此只能将文件内容在某个位置写好后,然后移动到Spark Streaming监听的目录,如果文件在这个目录下内容发生变化,则Spark Streaming无法监听到

 

另外需要注意的是,Spark Streaming启动后,Spark Streaming通过文件的最后修改时间(modify time)来判断一个新加入到监听目录的文件是否有效。如果一个较长时间没有更新的文件move到监听目录,Spark Streaming也不会对它进行读取进而计算

 

  /**
   * Create a input stream that monitors a Hadoop-compatible filesystem
   * for new files and reads them using the given key-value types and input format.
   * Files must be written to the monitored directory by "moving" them from another
   * location within the same file system. File names starting with . are ignored.
   * @param directory HDFS directory to monitor for new file
   * @tparam K Key type for reading HDFS file
   * @tparam V Value type for reading HDFS file
   * @tparam F Input format for reading HDFS file
   */
  def fileStream[
    K: ClassTag,
    V: ClassTag,
    F <: NewInputFormat[K, V]: ClassTag
  ] (directory: String): InputDStream[(K, V)] = {
    new FileInputDStream[K, V, F](this, directory)
  }

 

  • socket connections

 

  /**
   * Create an input stream from TCP source hostname:port. Data is received using
   * a TCP socket and the receive bytes it interepreted as object using the given
   * converter.
   * @param hostname      Hostname to connect to for receiving data
   * @param port          Port to connect to for receiving data
   * @param converter     Function to convert the byte stream to objects
   * @param storageLevel  Storage level to use for storing the received objects
   * @tparam T            Type of the objects received (after converting bytes to objects)
   */
  def socketStream[T: ClassTag](
      hostname: String,
      port: Int,
      converter: (InputStream) => Iterator[T],
      storageLevel: StorageLevel
    ): ReceiverInputDStream[T] = {
    new SocketInputDStream[T](this, hostname, port, converter, storageLevel)
  }

 问题: converter怎么使用?把InputStream转换为Iterator[T]集合

 

 高级数据源

 

Source Artifact
Kafka spark-streaming-kafka_2.10
Flume spark-streaming-flume_2.10
Kinesis spark-streaming-kinesis-asl_2.10 [Amazon Software License]
Twitter spark-streaming-twitter_2.10
ZeroMQ spark-streaming-zeromq_2.10
MQTT spark-streaming-mqtt_2.10

 

Spark Streaming注意点:

 1. When running a Spark Streaming program locally, do not use “local” or “local[1]” as the master URL. Either of these means that only one thread will be used for running tasks locally. If you are using a input DStream based on a receiver (e.g. sockets, Kafka, Flume, etc.), then the single thread will be used to run the receiver, leaving no thread for processing the received data. Hence, when running locally, always use “local[n]” as the master URL where n > number of receivers to run (see Spark Properties for information on how to set the master).Extending the logic to running on a cluster, the number of cores allocated to the Spark Streaming application must be more than the number of receivers. Otherwise the system will receive data, but not be able to process them.

 

 

 

 

2019-06-21 09:31:50 young_0609 阅读数 162
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35092 人正在学习 去看看 张长志

ES也是比较火热,在日志数据分析,规则分析等确实很方便,说实话用es stack可以解决很多公司的数据分析需求。

 

Spark 分析ES的数据,生成的RDD分区数跟什么有关系呢?

稍微猜测一下就能想到跟分片数有关,但是具体是什么关系呢?

可想的具体关系可能是以下两种:

1).就像KafkaRDD的分区与kafka topic分区数的关系一样,一对一。

2).ES支持游标查询,那么是不是也可以对比较大的分片进行拆分成多个RDD分区呢?

 

Spark Core读取ES

ES官网直接提供的有elasticsearch-hadoop 插件,对于ES 7.x,hadoop和Spark版本支持如下:

hadoop2Version  = 2.7.1
hadoop22Version = 2.2.0
spark13Version = 1.6.2
spark20Version = 2.3.0

这了采用的ES版本是7.1.1,测试用的Spark版本是2.3.1,没有问题。整合es和spark,导入相关依赖有两种方式:

a,导入整个elasticsearch-hadoop包

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-hadoop</artifactId>
    <version>7.1.1</version>
</dependency>

b,只导入spark模块的包

<dependency>
    <groupId>org.elasticsearch</groupId>
    <artifactId>elasticsearch-spark-20_2.11</artifactId>
    <version>7.1.1</version>
</dependency>

这里为了测试方便,只是在本机起了一个单节点的ES实例,简单的测试代码如下:

import org.apache.spark.{SparkConf, SparkContext}
import org.elasticsearch.hadoop.cfg.ConfigurationOptions

object es2sparkrdd {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf().setMaster("local[*]").setAppName(this.getClass.getCanonicalName)

    conf.set(ConfigurationOptions.ES_NODES, "127.0.0.1")
    conf.set(ConfigurationOptions.ES_PORT, "9200")
    conf.set(ConfigurationOptions.ES_NODES_WAN_ONLY, "true")
    conf.set(ConfigurationOptions.ES_INDEX_AUTO_CREATE, "true")
    conf.set(ConfigurationOptions.ES_NODES_DISCOVERY, "false")
//    conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_USER, esUser)
//    conf.set(ConfigurationOptions.ES_NET_HTTP_AUTH_PASS, esPwd)
    conf.set("es.write.rest.error.handlers", "ignoreConflict")
    conf.set("es.write.rest.error.handler.ignoreConflict", "com.jointsky.bigdata.handler.IgnoreConflictsHandler")

    val sc = new SparkContext(conf)
    import org.elasticsearch.spark._

    sc.esRDD("posts").foreach(each=>{
      each._2.keys.foreach(println)
    })
    sc.esJsonRDD("posts").foreach(each=>{
      println(each._2)
    })

    sc.stop()
  }
}

可以看到Spark Core读取RDD主要有两种形式的API:

  • esRDD。这种返回的是一个tuple2的类型的RDD,第一个元素是id,第二个是一个map,包含ES的document元素。
RDD[(String, Map[String, AnyRef])]
  • esJsonRDD。这种返回的也是一个tuple2类型的RDD,第一个元素依然是id,第二个是json字符串。
RDD[(String, String)]

虽然是两种类型的RDD,但是RDD都是ScalaEsRDD类型。

要分析Spark Core读取ES的并行度,只需要分析ScalaEsRDD的getPartitions函数即可。

源码分析

首先导入源码https://github.com/elastic/elasticsearch-hadoop这个是gradle工程,可以直接导入idea,然后切换到7.x版本即可。

废话少说直接找到ScalaEsRDD,发现gePartitions是在其父类实现的,方法内容如下:


override def getPartitions: Array[Partition] = {
    esPartitions.zipWithIndex.map { case(esPartition, idx) =>
      new EsPartition(id, idx, esPartition)
    }.toArray
  }

esPartitions是一个lazy型的变量:

@transient private[spark] lazy val esPartitions = {
    RestService.findPartitions(esCfg, logger)
  }

这种声明原因是什么呢?

lazy+transient的原因大家可以考虑一下。

RestService.findPartitions方法也是仅是创建客户端获取分片等信息,然后调用,分两种情况调用两个方法。


final List<PartitionDefinition> partitions;
//            5.x及以后版本 同时没有配置es.input.max.docs.per.partition
if (clusterInfo.getMajorVersion().onOrAfter(EsMajorVersion.V_5_X) && settings.getMaxDocsPerPartition() != null) {
     partitions = findSlicePartitions(client.getRestClient(), settings, mapping, nodesMap, shards, log);
} else {
     partitions = findShardPartitions(settings, mapping, nodesMap, shards, log);
} 
  • findSlicePartitions

这个方法其实就是在5.x及以后的ES版本,同时配置了

es.input.max.docs.per.partition

以后,才会执行,实际上就是将ES的分片按照指定大小进行拆分,必然要先进行分片大小统计,然后计算出拆分的分区数,最后生成分区信息。具体代码如下:

long numDocs;
if (readResource.isTyped()) {
    numDocs = client.count(index, readResource.type(), Integer.toString(shardId), query);
} else {
    numDocs = client.countIndexShard(index, Integer.toString(shardId), query);
}
int numPartitions = (int) Math.max(1, numDocs / maxDocsPerPartition);
for (int i = 0; i < numPartitions; i++) {
    PartitionDefinition.Slice slice = new PartitionDefinition.Slice(i, numPartitions);
    partitions.add(new PartitionDefinition(settings, resolvedMapping, index, shardId, slice, locations));
}

实际上分片就是用游标的方式,对_doc进行排序,然后按照分片计算得到的分区偏移进行数据的读取,组装过程是SearchRequestBuilder.assemble方法来实现的。

这个其实个人觉得会浪费一定的性能,假如真的要ES结合Spark的话,建议合理设置分片数。

  • findShardPartitions方法

这个方法没啥疑问了就是一个RDD分区对应于ES index的一个分片。

PartitionDefinition partition = new PartitionDefinition(settings, resolvedMapping, index, shardId,
locationList.toArray(new String[0]));
partitions.add(partition);

总结

以上就是Spark Core读取ES数据的时候分片和RDD分区的对应关系分析,默认情况下是一个es 索引分片对应Spark RDD的一个分区。假如分片数过大,且ES版本在5.x及以上,可以配置参数

es.input.max.docs.per.partition

进行拆分。

2019-06-07 20:10:00 weixin_30319153 阅读数 12
  • 数据Spark实战视频教程

    大数据Spark实战视频培训教程:本课程内容涉及,Spark虚拟机安装、Spark表配置、平台搭建、快学Scala入门、Spark集群通信、任务调度、持久化等实战内容。Spark是UC Berkeley AMP lab (加州大学伯克利分校的AMP实验室)所开源的类Hadoop MapReduce的通用并行框架,Spark,拥有Hadoop MapReduce所具有的优点;但不同于MapReduce的是Job中间输出结果可以保存在内存中,从而不再需要读写HDFS,因此Spark能更好地适用于数据挖掘与机器学习等需要迭代的MapReduce的算法。

    35092 人正在学习 去看看 张长志

一,简介

二,自定义分区规则

  2.1 普通的分组TopN实现

  2.2 自定义分区规则TopN实现

三,RDD的缓存

  3.1 RDD缓存简介

  3.2 RDD缓存方式

 

 

 

 

正文

一,简介

  在之前的文章中,我们知道RDD的有一个特征:就是一组分片(Partition),即数据集的基本组成单位。对于RDD来说,每个分片都会被一个计算任务处理,并决定并行计算的粒度。用户可以在创建RDD时指定RDD的分片个数,如果没有指定,那么就会采用默认值。默认值就是程序所分配到的CPU Core的数目。这个分配的规则我们是可以自己定制的。同时我们一直在讨论Spark快,快的方式有那些方面可以体现,RDD缓存就是其中的一个形式,这里将对这两者进行介绍。

二,自定义分区规则

  分组求TopN的方式有多种,这里进行简单的几种。这里尊卑一些数据:点击下载

  2.1 普通的分组TopN实现

  实现思路一:先对数据进行处理,然后聚合。最后进行分组排序。

package cn.edu360.sparkTwo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SubjectTopNone {

    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/", 2)
        // 对每一行数据进行整理
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        // 聚合,将学科和老师联合当做key
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)
        //分组排序(按学科进行分组)
        //[学科,该学科对应的老师的数据]
        val grouped: RDD[(String, Iterable[((String, String), Int)])] = reduced.groupBy(_._1._1)
        // 这里取出的是每一组的数据
        // 为什么可以调用scala的sortby方法呢?因为一个学科的数据已经在一台机器上的一个scala集合里面了
        // 弊端,调用scala的sortBy当数据量过大时,有内存溢出的缺陷
        val result: RDD[(String, List[((String, String), Int)])] = grouped.mapValues(_.toList.sortBy(_._2).reverse.take(4))
        println(result.collect.toBuffer)
    }
}

  实现思路二:先对数据进行处理,然后聚合,然后对数据进行单学科过滤,最后进行排序,提交

package cn.edu360.sparkTwo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SubjectTopNtwo {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTwo").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn")
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)
        // 获取所有学科
        val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()
        // 对所有的reduce后的数据进行单学科过滤,在进行排序
        for(sb <- subjects){
            val filter: RDD[((String, String), Int)] = reduced.filter(_._1._1 == sb)
            // 这里进行了多次提交
            val result: Array[((String, String), Int)] = filter.sortBy(_._2, false).take(3)
            print(result.toBuffer)
        }
        sc.stop()
    }
}

  2.2 自定义分区规则TopN实现

  实现方式一:先对数据进行处理,然后聚合,而后对按照学科进行分区,然后对每一个分区进行排序

package cn.edu360.sparkTwo

import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable

object SubjectTopNthree {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/")
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        //聚合,将学科和老师联合当做key ---> 这里有一次shuffle
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)
        //计算有多少学科
        val subjects: Array[String] = reduced.map(_._1._1).distinct().collect()
        //partitionBy按照指定的分区规则进行分区
        //调用partitionBy时RDD的Key是(String, String) --->这里也有一次shuffle
        val partioned: RDD[((String, String), Int)] = reduced.partitionBy(new SubPartitioner(subjects))
        //如果一次拿出一个分区(可以操作一个分区中的数据了)
        val sorted: RDD[((String, String), Int)] = partioned.mapPartitions(it => {
            //将迭代器转换成list,然后排序,在转换成迭代器返回
            it.toList.sortBy(_._2).reverse.take(3).iterator
        })
        val result: Array[((String, String), Int)] = sorted.collect()
        print(result.toBuffer)
    }
}

// 自定义分区规则,需要继承Partitioner
class SubPartitioner(subs: Array[String]) extends Partitioner{
    //相当于主构造器(new的时候回执行一次)
    //用于存放规则的一个map
    private val rules = new mutable.HashMap[String, Int]()
    var i = 0
    for(sub <- subs){
        rules.put(sub, i)
        i += 1
    }
    //返回分区的数量(下一个RDD有多少分区)
    override def numPartitions: Int = subs.length
    //根据传入的key计算分区标号
    //key是一个元组(String, String)
    override def getPartition(key: Any): Int = {
        //获取学科名称
        val s: String = key.asInstanceOf[(String, String)]._1
        //根据规则计算分区编号
        rules(s)
    }
}

  实现方式二:上面的过程可以将聚合和分区操作进行合并,减少shuffle的次数

package cn.edu360.sparkTwo

import org.apache.spark.{Partitioner, SparkConf, SparkContext}
import org.apache.spark.rdd.RDD

import scala.collection.mutable

object SubjectTopNfour {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTopNone").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn/")
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        val subjects: Array[String] = sbToTeacherOne.map(_._1._1).distinct().collect()
        // 在这里传入分区规则,即聚合时就分区
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(new SubPartinerTwo(subjects), _+_)
        // 对每个分区进行排序
        val result: RDD[((String, String), Int)] = reduced.mapPartitions(it => {
            it.toList.sortBy(_._2).reverse.take(3).iterator
        })
        print(result.collect().toBuffer)
    }
}

class SubPartinerTwo(subs: Array[String]) extends Partitioner{
    private val rules = new mutable.HashMap[String, Int]()
    var i = 0
    for(sub <- subs){
        rules.put(sub, i)
        i += 1
    }
    override def numPartitions: Int = subs.length
    override def getPartition(key: Any): Int = {
        val subject: String = key.asInstanceOf[(String, String)]._1
        rules(subject)
    }
}

三,RDD的缓存

  3.1 RDD缓存简介

  Spark速度非常快的原因之一,就是在不同操作中可以在内存中持久化或缓存数据集。当持久化某个RDD后,每一个节点都将把计算的分片结果保存在内存中,并在对此RDD或衍生出的RDD进行的其他动作中重用。这使得后续的动作变得更加迅速。RDD相关的持久化和缓存,是Spark最重要的特征之一。可以说,缓存是Spark构建迭代式算法和快速交互式查询的关键。

  3.2 RDD缓存方式

  RDD通过persist方法或cache方法可以将前面的计算结果缓存,但是并不是这两个方法被调用时立即缓存,而是触发后面的action时,该RDD将会被缓存在计算节点的内存中,并供后面重用。

  

  通过查看源码发现cache最终也是调用了persist方法,默认的存储级别都是仅在内存存储一份,Spark的存储级别还有好多种,存储级别在object StorageLevel中定义的。

  

  缓存有可能丢失,或者存储存储于内存的数据由于内存不足而被删除,RDD的缓存容错机制保证了即使缓存丢失也能保证计算的正确执行。通过基于RDD的一系列转换,丢失的数据会被重算,由于RDD的各个Partition是相对独立的,因此只需要计算丢失的部分即可,并不需要重算全部Partition。

  实例:

package cn.edu360.sparkTwo

import org.apache.spark.rdd.RDD
import org.apache.spark.{SparkConf, SparkContext}

object SubjectTopNtwo {
    def main(args: Array[String]): Unit = {
        val conf: SparkConf = new SparkConf().setAppName("SubjectTwo").setMaster("local[4]")
        val sc: SparkContext = new SparkContext(conf)
        val lines: RDD[String] = sc.textFile("hdfs://hd1:9000/sparkLearn")
        val sbToTeacherOne: RDD[((String, String), Int)] = lines.map(line => {
            val words: Array[String] = line.split("/")
            val teacher: String = words(3)
            val subject: String = words(2).split("[.]")(0)
            ((subject, teacher), 1)
        })
        val reduced: RDD[((String, String), Int)] = sbToTeacherOne.reduceByKey(_+_)
        // 这里讲reduced的数据集到缓存中
        val cached: RDD[((String, String), Int)] = cached.cache()
        // 获取所有学科
        val subjects: Array[String] = cached.map(_._1._1).distinct().collect()
        // 对所有的reduce后的数据进行单学科过滤,在进行排序
        for(sb <- subjects){
            // 因为这里的多次提交和过滤,所以添加到缓存就有必要了
            val filter: RDD[((String, String), Int)] = cached.filter(_._1._1 == sb)
            // 这里进行了多次提交
            val result: Array[((String, String), Int)] = filter.sortBy(_._2, false).take(3)
            print(result.toBuffer)
        }
        sc.stop()
    }
}

 

转载于:https://www.cnblogs.com/tashanzhishi/p/10989078.html

没有更多推荐了,返回首页