spark 生成多个rdd_spark rdd生成多个子rdd - CSDN
  • Spark的WordCount到底产生了多少个RDD

    万次阅读 2018-09-05 08:43:23
    Spark的WordCount到底产生了多少个RDD 不少的同学在面试中会被问到:这样的一句标准的sparkcore的wordcount的代码到底能要产生几个RDD呢。相信大家对于一标准的WordCount的代码一定不陌生: sc.textFile("...

    Spark的WordCount到底产生了多少个RDD

    不少的同学在面试中会被问到:这样的一句标准的sparkcore的wordcount的代码到底能要产生几个RDD呢。相信大家对于一个标准的WordCount的代码一定不陌生:

    sc.textFile("hdfs://myha01/wc/input/words.txt")
      .flatMap(_.split(" "))
      .map((_,1))
      .reduceByKey(_+_)
      .saveAsTextFile("hdfs://myha01/wc/output/")

    这局代码:

    1、开始使用了一个textFile用来读取数据的方法

    2、中间使用了三个标准的RDD的操作算子:

    flatMap(_.split(" ")) 负责把由每一行组成的RDD按照空格切开压平成标准的由单词组成的RDD

    map((_,1)) 负责把每个单词word变成(word,1)每个单词出现一次

    reduceByKey(_+_) 负责把按照key相同也就是单词相同的key-value划分成一组,然后每一组做count聚合,最终就得出了输入文件中,每个单词出现了多少次。

    3、最后,使用了一个saveAsTextFile的方法来存储数据

    那到底这句代码中执行过程中,是不是刚好每个算子生成一个RDD呢? 很不幸,不是的。如果需要知晓答案,最好的方式,就是翻阅参与运算的每个算子到底做了什么事情。

     

    接下来是详细分析:

    1、首先看sc.textFile("hdfs://myha01/wc/input/words.txt"):textFile方法在SparkContext类中

    接着看textFile中的hadoopFile方法的实现:

    通过这个代码可以得知,在hadoopFile的内部产生了第一个RDD:HadoopRDD

    接着回到textFile方法:

    发现,其实返回的HadoopRDD又调用了map算子,看map算子的实现:

    map算子的内部实现中,又创建了一个RDD,这就是第二个RDD: MapPartitionsRDD

    那也就是说,textFile算子的最终返回值就是第二个RDD:MapPartitionsRDD

     

    接着看:flatMap(_.split(" "))算子的操作实现:flatMap算子在RDD中

    所以flatMap(_.split(" "))算子操作产生了第三个RDD:MapPartitionsRDD

     

    接着看map((_,1))算子操作:map算子在RDD类中

    map((_,1))算子的具体实现依然是简单的new MapPartitionRDD的方式生成第四个RDD:MapPartitionsRDD

     

    接着看:reduceByKey(_+_)算子的具体实现:reduceByKey在PairRDDFunctions类中

    跳到:

    跳到:

    到这个地方说明:reduceByKey算子的返回值其实是创建了第五个RDD:ShuffledRDD

     

    接着看:saveAsTextFile("hdfs://myha01/wc/output/")算子的具体实现:saveAsTextFile算子在RDD类中

    this.mapPartitions这句代码在调用的时候,在mapPartitions的内部,其实又创建了第六个RDD:MapPartitionRDD

    接着回到:saveAsTextFile方法的实现,其实返现,最后一句话在调用中,也会生成一个RDD

    这就是第七个RDD:MapPartitionRDD

    到底为止,其他的地方,是没有再产生RDD的。

    所以按照刚才的分析得出的最终结论是:

    第一个RDD:HadoopRDD

    第二个RDD:MapPartitionsRDD

    第三个RDD:MapPartitionsRDD

    第四个RDD:MapPartitionsRDD

    第五个RDD:ShuffledRDD

    第六个RDD:MapPartitionRDD

    第七个RDD:MapPartitionRDD

    其实,在执行saveAsTextFile之前,我们可以通过RDD提供的toDebugString看到这些个算子在调用的时候到底产生了多少个RDD:

    望各位仁兄牢记。如果不记得,请翻阅源码。本篇文章是基于最新的Spark-2.3.1的版本

     

     

    展开全文
  • spark个rdd求交集,差集,并集

    万次阅读 2017-11-24 22:58:50
    1.前言spark中两个rdd,经常需要做交集,差集,并集等操作。好比任何一门编程语言中两集合,交并差也是常见的需求。现在我们看看在spark中怎么实现两个rdd的这种操作。 为了方便看到结果,在spark shell中测试...

    1.前言

    spark中两个rdd,经常需要做交集,差集,并集等操作。好比任何一门编程语言中两个集合,交并差也是常见的需求。现在我们看看在spark中怎么实现两个rdd的这种操作。
    为了方便看到结果,在spark shell中测试如下代码。

    先生成两个rdd

    scala> val rdd1 = sc.parallelize(List("a", "b","c"))
    rdd1: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[5] at parallelize at <console>:27
    
    scala> val rdd2 = sc.parallelize(List("e", "d","c"))
    rdd2: org.apache.spark.rdd.RDD[String] = ParallelCollectionRDD[6] at parallelize at <console>:27

    生成了两个rdd,分别为rdd1与rdd2,然后开始测试。

    2.求并集,union操作

    如果是要求并集,用union操作即可。

    scala> rdd1.union(rdd2).collect
    ...
    res5: Array[String] = Array(a, b, c, e, d, c)

    可以看出,union的结果是没有去重的,就是将rdd1与rdd2的元素放一块。
    有的小伙伴说我就要去重,那怎么办。很简单,distinct嘛。

    scala> rdd1.union(rdd2).distinct.collect
    ...
    res6: Array[String] = Array(a, b, c, d, e)

    3.求交集,intersection操作

    求两个交集也是常见的需求。很简单,用intersection操作就可以了。

    scala> rdd1.intersection(rdd2).collect
    ...
    res7: Array[String] = Array(c)
    
    scala> rdd2.intersection(rdd1).collect
    ...
    res9: Array[String] = Array(c)

    4.求差集,subtract

    subtract的方法原型为:

    def subtract(other: RDD[T]): RDD[T]

    该函数类似于intersection,返回在RDD中出现,并且不在otherRDD中出现的元素,不去重。

    scala> rdd1.subtract(rdd2).collect
    ...
    res10: Array[String] = Array(a, b)
    
    scala> rdd2.subtract(rdd1).collect
    ...
    res11: Array[String] = Array(d, e)

    这几个方法都很实用,使用起来也简单快捷,希望能对大家有所帮助。

    展开全文
  • Spark RDD的创建

    2019-03-25 19:05:57
    ——创建RDD 2种方式 读取外部数据集 或 在驱动器程序中对一集合进行并行化 最简单方式把已有的集合传给SparkContext的Parallelize()方法 lines=sc.parallelize(["pandas","apple"]) //python中的parallelize...

    ——创建RDD

    2种方式 读取外部数据集 或 在驱动器程序中对一个集合进行并行化

    最简单方式把已有的集合传给SparkContext的Parallelize()方法

    lines=sc.parallelize(["pandas","apple"])  //python中的parallelize()方法
    
    val lines=sc.parallelize(list("pandas","apple")) //Scala中的parallelize()方法
    
    JavaRDD<String> lines=sc.parallelize(Arrays.aslist("pandas","apple"));//Java中的parallelize()方法

    更常用的方法是从外部存储中读取数据来创建RDD

    lines=sc.textFile("/path/test.txt")//python中的textFile()方法
    
    val lines=sc.textFile("/path/test.txt")//Scala中的textFile()方法
    
    JavaRDD<String> lines=sc.textFile("/path/test.txt");//Java中的textFile()方法
    
    

     ——RDD操作

    转化操作

    RDD的转化操作返回新的RDD 转化的RDD是惰性求值的,只有延迟到行动操作才会优化计算,转化操作一般针对RDD数据集的单个元素的,也就是说,这些转化操作大多每次只会操作RDD的一个元素

    //Python实现filter()转化操作
    inputRDD=sc.textFile("log.txt")
    errorRDD=inputRDD.filter(lambda x:"error" in x)
    
    //Scala实现filter()转化操作
    val inputRDD=sc.textFile("log.txt")
    val errorRDD=inputRDD.filter(lambda x:"error" in x)
    
    //Java实现filter()转化操作
    JavaRDD<String> inputRDD=sc.textFile("log.txt");
    JavaRDD<String> errorRDD=inputRDD.filter(
    new Function<String,boolean>()
    {
      public boolean call(String x)
      {
    
         return x.contain("error");
    
       }
    }
    
    );
    

    filter()操作不会该变已有的inputRDD中的数据,该操作会返回一个全新的RDD,inputRDD在后面的程序还可以继续使用 

    只会类似的求得warningRDD 并调用union()操作合并RDD

     

    行动操作

    对于需要对数据集RDD进行实际的计算,就需要行动操作的支持,它们会把计算结果返回到驱动器程序,或写入外部存储中,由于行动操作需要结果,因而会触发真正的计算链

    //读取unionRDD的前10条数据
    print unionRDD.count()
    for line in unionRDD.take(10)
        print line
    
    //Java中读取前10条
    System.out.println(unionRDD.count());
    for(String line:unionRDD.take(10))
        System.out.println(line);

    在驱动器程序中执行take()获取RDD的少量元素,然后读取打印这些元素

    RDD还有一个collect()函数,可以获取整个RDD的元素,如果RDD数据集已经被筛选为很小一个时,可以考虑使用本地处理,不过不建议使用,即collect()不能用在大规模的数据集上

    而一般对RDD的之后处理是把数据写入HDFS等分布式存储系统中,可以使用saveAsTextFile()  saveAsSequenceFile()把数据按照一定格式进行存储 

    由于调用一个新的行动操作会触发计算链,因此为了避免低效的行为,应当适当的把中间结果持久化

     

     

    展开全文
  • 1)由一已经存在的Scala集合创建。 val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8)) 2)由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFS、Cassandra、HBase等 val ...

    1)由一个已经存在的Scala集合创建。

    val rdd1 = sc.parallelize(Array(1,2,3,4,5,6,7,8))
    

    2)由外部存储系统的数据集创建,包括本地的文件系统,还有所有Hadoop支持的数据集,比如HDFSCassandraHBase

    val rdd2 = sc.textFile("hdfs://node1.itcast.cn:9000/words.txt")
    
    展开全文
  • Spark程序中,RDD是由SparkContext上下文生成的,一个数据源只能生成一个RDD对象(流处理场景中,指定多个消息源可以生成多个RDD,存在DStream中)。 RDD(Resilient Distributed Dataset)是Spark中最基本的数据...
  • [Spark]Spark RDD 指南四 RDD操作

    千次阅读 2018-03-13 16:39:28
    Spark2.3.0版本: Spark2.3.0 RDD操作RDD支持两种类型的操作:转移(transformations):从现有数据集创建一新数据集 动作(actions):在数据集上进行计算后将值返回给驱动程序例如,map是一转移操作,传递给每数据...
  • Spark之深入理解RDD结构

    万次阅读 多人点赞 2018-01-08 17:19:19
    RDD(Resilient Distributed Datasets,弹性分布式数据集),是Spark最为核心的概念,自然也是理解Apache Spark 工作原理的最佳入口之一。 RDD的特点: 1. 是一分区的只读记录的集合; 2. 一具有容错机制的特殊...
  • Spark 键值对RDD操作

    2019-02-14 15:43:41
    键值对RDDSpark操作中最常用的RDD,它是很程序的构成要素,因为他们提供了并行操作各个键或跨界点重新进行数据分组的操作接口。 创建 Spark中有许多中创建键值对RDD的方式,其中包括 1文件读取时直接返回键值...
  • 问题描述:我们知道在spark当中是对RDD进行操作的。所以我们想把数据源当中的数据转化成很的数据集,这也就是partition的由来。 而我们在将数据转换成RDD之后。我们可以通过设置partition的数量来让计算的效率更...
  • Spark计算模型RDD

    千次阅读 2018-03-05 22:31:44
    Spark计算模型RDD1. RDD概述1.1 什么是RDDRDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是...RDD允许用户在执行多个查询时显式地将数据缓存在内存中,后续的查询能够重用这些数据,这极大地提升了查询
  • SparkRDD 的详细介绍

    千次阅读 2018-09-27 09:14:46
    为了解决开发人员能在大规模的集群中以一种容错的方式进行内存计算,提出了 RDD 的概念,而当前的很框架对迭代式算法场景与交互性数据挖掘场景的处理性能非常差, 这是RDDs 的提出的动机。 什么是 RD...
  • spark(二) rdd具体介绍

    2019-06-09 23:42:17
    spark(二) rdd具体介绍 看完这篇 你可以学到一下内容 1:掌握RDD的原理 ...RDD(Resilient Distributed Dataset)叫做弹性分布式数据集,是Spark中最基本的数据抽象,它代表一不可变、可分区、...
  • Spark基础 -- Spark Shell -- RDD -- 算子

    千次阅读 2018-11-20 09:26:57
    文章目录Spark基础 -- Spark Shell -- RDD -- 算子一、简介二、Spark 1.6.3部署准备工作解压安装配置spark,master高可用配置环境变量分发配置好的Spark到其他节点三、Spark集群启动和测试启动测试四、Spark Shell...
  • SparkRDD的理解

    千次阅读 2018-07-10 08:17:33
    1.什么是RDDRDDRDDSpark的计算模型 RDD(Resilient Distributed Dataset)叫做弹性的分布式数据集合,是Spark... RDDSpark中的一基本抽象(可以理解为代理)有了RDD,就可以像操作本地的集合一样,有很...
  • SPARK RDD JAVA API 用法指南

    千次阅读 2018-08-14 19:19:47
    1.RDD介绍:  RDD,弹性分布式数据集,即分布式的元素集合。在spark中,对所有数据的操作不外乎是创建RDD、转化已有的RDD以及...每个RDD都被分为多个分区,这些分区运行在集群中的不同节点上。RDD可以包含Pyt...
  • Java接入Spark之创建RDD的两种方式和操作RDD
  • 键值对RDDSpark中许多操作的常见数据类型,键值对RDD通常用来进行聚合计算,一般先通过ETL 抽取,转化,装载操作来将数据转化为键值对形式,这类RDD称为 pair RDD ,提供了并行操作各个键或跨节点重新进行数据分组...
  • SparkRDD操作

    千次阅读 2016-10-09 20:38:30
    根据传入的函数处理原有的RDD对象中每一元素,每一新元素处理完成后返回一对象,这些新对象组装得到一新的RDD,新的RDD和旧的RDD元素都是一一对应的 filter(func) 根据传入的函数来过滤RDD中每一元素,...
  • Spark学习--RDD编码

    2020-07-30 23:32:10
    个RDD都倍分为多个分区,这些分区运行在集群中的不同节点。RDD可以包含Python、Java、Scala中任意类型的对象,甚至可以包含用户自定义对象,本文主要通过Java实现相关示例。 Spark程序或shell会话工作流程 1. 从...
  • Spark提供了三种主要的与数据相关的API:RDD、DataFrame、Dataset RDD (Spark1.0) —>...RDDSpark提供的最主要的一抽象概念(Resilient Distributed Dataset),它是一element的collec...
1 2 3 4 5 ... 20
收藏数 15,020
精华内容 6,008
关键字:

spark 生成多个rdd