kv求交集 spark_spark算子 转化为kv值 - CSDN
  • reduceByKey、foldByKey、aggregateByKey、combineByKey之前的区别和联系 源码主要逻辑函数比较 算子 源码 ...combineByKetWithClassTag[V] ((v:V)=>...combineByKetWithClassTag[V] ((v:...

    reduceByKey、foldByKey、aggregateByKey、combineByKey之间的区别和联系

    源码主要逻辑函数比较

    算子 源码
    reduceByKey( ) combineByKeyWithClassTag[V] ((v:V)=>v,func,func)
    foldByKey( )( ) combineByKeyWithClassTag[V] ((v:V)=>cleanedFunc(createZero(),v),cleanedFunc,cleanedFunc)
    aggregateByKey( )( , ) combineByKeyWithClassTag[V] ((v:V)=>cleanedSeqOp(createZero(),v),cleanedSeqOp,combOp)
    combineByKey( , , ) combineByKeyWithClassTag(createCombiner,mergeValue,mergeCombiners)

    灵活性:
    reduceByKey < foldByKey < aggregateByKey < combineByKey

    都有一个默认参数:mapSideCombine=true,即都在map端进行的combine操作,进行了提前的预聚合。

    展开全文
  • 1. 交集 intersecion 1.1 源码 /** * Return the intersection of this RDD and another one. The output will not contain any duplicate * elements, even if the input RDDs did.//交集结果将会去重 * * @...

    1. 交集 intersecion

    1.1 源码

    /**
       * Return the intersection of this RDD and another one. The output will not contain any duplicate
       * elements, even if the input RDDs did.//交集结果将会去重
       * 
       * @note This method performs a shuffle internally.//属于shuffle类算子
       */
       //参与计算的两个RDD的元素泛型必须一致,也是返回的RDD的元素泛型
      def intersection(other: RDD[T]): RDD[T] = withScope {
        this.map(v => (v, null)).cogroup(other.map(v => (v, null)))
            .filter { case (_, (leftGroup, rightGroup)) => leftGroup.nonEmpty && rightGroup.nonEmpty }
            .keys
      }
    

    源码分析:

    1. thisRDD.intersection(otherRDD):计算 thisRDD 和 otherRDD 的交集,交集结果将不会包含重复的元素,即使有的元素在两个 RDD 中都出现多次;
    2. intersection 属于 shuffleDependency 类算子;
    3. 其内部调用了cogroup算子;
    4. Note:凡是涉及两个RDD的计算,并且计算是以相同 key分组的数据为对象进行的,那么一定会调用 cogroup(otherDataSet,[numTasks]) 算子。

    1.2 代码实例:

       val list1 = List(1,2,3,4,5,6,7,7,20)
       val list2 = List(4,5,6,7,8,9,10)
       val rdd1: RDD[Int] = sc.parallelize(list1 , 3) //3为分区数,默认分区数为2
       val rdd2: RDD[Int] = sc.parallelize(list2)
       //交集:rdd1交rdd2
       rdd1.intersection(rdd2).foreach(println)
    

    运行结果如下:

    6
    4
    7
    5
    

    2. 差集 subtract

    2.1 源码

       /**//默认保持thisRDD的分区器 和 分区数量
       * Return an RDD with the elements from `this` that are not in `other`.
       * 
       * Uses `this` partitioner/partition size, because even if `other` is huge, the resulting
       * RDD will be &lt;= us.
       */
      def subtract(other: RDD[T]): RDD[T] = withScope {
        subtract(other, partitioner.getOrElse(new HashPartitioner(partitions.length)))
      }
    
      /**//可以传入参数,控制新生成RDD的分区数量(仍保持thisRDD分区规则)
       * Return an RDD with the elements from `this` that are not in `other`.
       */
      def subtract(other: RDD[T], numPartitions: Int): RDD[T] = withScope {
        subtract(other, new HashPartitioner(numPartitions))
      }
    
      /**//可以传入参数,控制使用自定义的分区器
       * Return an RDD with the elements from `this` that are not in `other`.
       */
      def subtract(
          other: RDD[T],
          p: Partitioner)(implicit ord: Ordering[T] = null): RDD[T] = withScope {
        if (partitioner == Some(p)) {
          // Our partitioner knows how to handle T (which, since we have a partitioner, is
          // really (K, V)) so make a new Partitioner that will de-tuple our fake tuples
          val p2 = new Partitioner() {
            override def numPartitions: Int = p.numPartitions
            override def getPartition(k: Any): Int = p.getPartition(k.asInstanceOf[(Any, _)]._1)
          }
          // Unfortunately, since we're making a new p2, we'll get ShuffleDependencies
          // anyway, and when calling .keys, will not have a partitioner set, even though
          // the SubtractedRDD will, thanks to p2's de-tupled partitioning, already be
          // partitioned by the right/real keys (e.g. p).
          this.map(x => (x, null)).subtractByKey(other.map((_, null)), p2).keys
        } else {
          this.map(x => (x, null)).subtractByKey(other.map((_, null)), p).keys
        }
      }
    
        
    

    2.2 代码实例
    2.2.1 参与运算的RDD的泛型必须完全一致(统一类型)

        //准备数据集
        val list1 = List(1,2,3,4,5,6,7,7,20)
        val list2 = List(4,5,6,7,8,9,10)
        val array = Array("hello huangbo","hello xuzheng","hello huangxiaoming")
        val kv = Array(("a",1), ("b",2),("c",3),("a",1),("b",1),("c",1))
        val rdd1: RDD[Int] = sc.parallelize(list1 , 3) 
        val rdd2: RDD[Int] = sc.parallelize(list2)
        val rdd3: RDD[String] = sc.makeRDD(array)
        //k-v型的PairRDD
        val rdd4:RDD[(String,Int)] = sc.makeRDD(kv)  //会自动将元组的第一个元素作为key
        
        /** 开始计算差集
          * subtract():差集,参与运算的RDD必须具有相同泛型(元素类型一致);
          *     1、当为单值元素时,直接求差集
          *     2、当为(K,V)时,仍然按照整个元素进行求差集(而不是按照key);
          */
        val subtractRes: RDD[Int] = rdd1.subtract(rdd2)
        subtractRes.foreach(x => print(x + "\t"));println() //差集: 3	1	2	20
        //rdd3.subtract(rdd4)  //错误,参与运算的RDD必须泛型相同
    

    2.2.2 当RDD的元素为元组时,元组内部的构成元素也必须一致:

        //错误:泛型不统一,无法进行差集计算(上虽然都是元组,但是元组的泛型不一致)
        val list01 = Array(("a",1), ("b",2), ("c",3))
        val rdd01: RDD[(String, Int)] = sc.parallelize(list01)
        val list02 = Array(("a","lily"),("b","lucy"),("c","rose"),("c",3))
        val rdd02: RDD[(String, Any)] = sc.makeRDD(list02)
        //rdd01.subtract(rdd02).foreach(print) //错误,元组的泛型不一致
    

    但是可以使用多态,向上进行类型抽象,将类型统一:

        //正确:泛型统一了,结果为:(a,1)(b,2)
        //手动指定泛型Any,以统一类型
        val list03: Array[(String, Any)] = Array(("a",1), ("b",2), ("c",3))
        val rdd03 = sc.parallelize(list03)
        val list04: Array[(String, Any)] = Array(("a","lily"),("b","lucy"),("c","rose"),("c",3))
        val rdd04 = sc.makeRDD(list04)
        rdd03.subtract(rdd04).foreach(print)
    

    3. 按照key取差集 subtractByKey

    thisPairRDD.subtractByKey(otherPairRDD):以key值作为元素的唯一性标志,记性差集运算,与value的类型和值无关。

    注意:参与运算的必须是PairRDD。

    代码实例

        /**
          * subtractByKey(otherRDD):只针对于key做差集,返回主RDD中存在的KEY,而otherRDD中不存在的KEY的元素;
          *           ----针对于PairRDD
          */
        val rdd10 = sc.makeRDD(Array(("a",1), ("b",2), ("c",3), ("a",5), ("d",5)))
        val rdd11 = sc.makeRDD(Array(("a",1), ("b",2), ("c",3)))
        //结果为 (d,5): 因为只有key="d" 在rdd11中没有出现
        rdd10.subtractByKey(rdd11).foreach(print)
    

    4. 并集

    4.1 拼接算子 union

    /** 交集、并集、差集
       * union(): 直接拼接,并不会去重(并不是数学意义上的并集)
       * count():统计 RDD的元素个数!
       */
    /*
        rdd1 = {1,2,3,4,5,6,7,7,20}
        rdd2 = {4,5,6,7,8,9,10}
     */
        println(rdd1.union(rdd2).count())//16个元素
    

    4.2 求交集(先union,再distinct)

        //并集:先union拼接,再distinct去重
        rdd1.union(rdd2).distinct().foreach(println)
    
    展开全文
  • 本文始发于个人公众号:TechFlow,原创不易,个关注 今天是spark专题的第四篇文章,我们一起来看下Pair RDD。 定义小说网 m.198200.com 在之前的文章当中,我们已经熟悉了RDD的相关概念,也了解了RDD基本的转化...
        

    本文始发于个人公众号:TechFlow,原创不易,求个关注


    今天是spark专题的第四篇文章,我们一起来看下Pair RDD。

    定义

    小说网 m.198200.com

    在之前的文章当中,我们已经熟悉了RDD的相关概念,也了解了RDD基本的转化操作和行动操作。今天我们来看一下RDD当中非常常见的PairRDD,也叫做键值对RDD,可以理解成KVRDD。

    KV很好理解,就是key和value的组合,比如Python当中的dict或者是C++以及Java当中的map中的基本元素都是键值对。相比于之前基本的RDD,pariRDD可以支持更多的操作,相对来说更加灵活,可以完成更加复杂的功能。比如我们可以根据key进行聚合,或者是计算交集等。

    所以本身pairRDD只不过是数据类型是KV结构的RDD而已,并没有太多的内涵,大家不需要担心。

    Pair RDD转化操作

    Pair RDD也是RDD,所以之前介绍的RDD的转化操作Pair RDD自然也可以使用。它们两者有些像是类继承的关系,RDD是父类,Pair RDD是实现了一些新特性的子类。子类可以调用父类当中所有的方法,但是父类却不能调用子类中的方法。

    调用的时候需要注意,由于我们的Pair RDD中的数据格式是KV的二元组,所以我们传入的函数必须是针对二元组数据的,不然的话可能运算的结果会有问题。下面我们来列举一些最常用的转化操作。

    为了方便演示,我们用一个固定的RDD来运行各种转化操作,来直观了解一下这些转化操作究竟起什么样的作用。

    ex1 = sc.parallelize([[12], [34], [35]])

    keys,values和sortByKey

    这三个转化操作应该是最常用也是最简单的,简单到我们通过字面意思就可以猜出它们的意思。

    我们先来看keys和values:

    我们的RDD当中二元组当中的第一个元素会被当做key,第二个元素当做value,需要注意的是,它并不是一个map或者是dict,所以key和value都是可以重复的

    sortByKey也很直观,我们从字面意思就看得出来是对RDD当中的数据根据key值进行排序,同样,我们也来看下结果:

    mapValues和flatMapValues

    mapValues不能直接使用,而必须要传入一个函数作为参数。它的意思是对所有的value执行这个函数,比如我们想把所有的value全部转变成字符串,我们可以这么操作:

    flatMapValues的操作和我们的认知有些相反,我们都知道flatMap操作是可以将一个嵌套的数组打散,但是我们怎么对一个value打散嵌套呢?毕竟我们的value不一定就是一个数组,这就要说到我们传入的函数了,这个flatMap的操作其实是针对函数返回的结果的,也就是说函数会返回一个迭代器,然后打散的内容其实是这个迭代器当中的值。

    我这么表述可能有些枯燥,我们来看一个例子就明白了:

    不知道这个结果有没有出乎大家的意料,它的整个流程是这样的,我们调用flatMapValues运算之后返回一个迭代器,迭代器的内容是range(x, x+3)。其实是每一个key对应一个这样的迭代器,之后再将迭代器当中的内容打散,和key构成新的pair。

    groupByKey,reduceByKey和foldByKey

    这两个功能也比较接近,我们先说第一个,如果学过SQL的同学对于group by操作的含义应该非常熟悉。如果没有了解过也没有关系,group by可以简单理解成归并或者是分桶。也就是说将key值相同的value归并到一起,得到的结果是key-list的Pair RDD,也就是我们把key值相同的value放在了一个list当中。

    我们也来看下例子:

    我们调用完groupby之后得到的结果是一个对象,所以需要调用一下mapValues将它转成list才可以使用,否则的话是不能使用collect获取的。

    reduceByKey和groupByKey类似,只不过groupByKey只是归并到一起,然而reduceByKey是传入reduce函数,执行reduce之后的结果。我们来看一个例子:

    在这个例子当中我们执行了累加,把key值相同的value加在了一起。

    foldByKey和fold的用法差别并不大,唯一不同的是我们加上了根据key值聚合的逻辑。如果我们把分区的初始值设置成0的话,那么它用起来和reduceByKey几乎没有区别:

    我们只需要清楚foldByKey当中的初始值针对的是分区即可。

    combineByKey

    这个也是一个很核心并且不太容易理解的转化操作,我们先来看它的参数,它一共接受5个参数。我们一个一个来说,首先是第一个参数,是createCombiner

    它的作用是初始化,将value根据我们的需要做初始化,比如将string类型的转化成int,或者是其他的操作。我们用记号可以写成是V => C,这里的V就是value,C是我们初始化之后的新值。

    它会和value一起被当成新的pair传入第二个函数,所以第二个函数的接受参数是(C, V)的二元组。我们要做的是定义这个二元组的合并,所以第二个函数可以写成(C, V) => C。源码里的注释和网上的教程都是这么写的,但我觉得由于出现了两个C,可能会让人难以理解,我觉得可以写成(C, V) => D,比较好。

    最后一个函数是将D进行合并,所以它可以写成是(D, D) => D。

    到这里我们看似好像明白了它的原理,但是又好像有很多问号,总觉得哪里有些不太对劲。我想了很久,才找到了问题的根源,出在哪里呢,在于合并。有没有发现第二个函数和第三个函数都是用来合并的,为什么我们要合并两次,它们之间的区别是什么?如果这个问题没搞明白,那么对于它的使用一定是错误的,我个人觉得这个问题才是这个转化操作的核心,没讲清楚这个问题的博客都是不够清楚的。

    其实这两次合并的逻辑大同小异,但是合并的范围不一样,第一次合并是针对分区的,第二次合并是针对key的。因为在spark当中数据可能不止存放在一个分区内,所以我们要合并两次,第一次先将分区内部的数据整合在一起,第二次再跨分区合并。由于不同分区的数据可能相隔很远,所以会导致网络传输的时间过长,所以我们希望传输的数据尽量小,这才有了groupby两次的原因。

    我们再来看一个例子:

    在这个例子当中我们计算了每个单词出现的平均个数,我们一点一点来看。首先,我们第一个函数将value转化成了(1, value)的元组,元组的第0号元素表示出现该单词的文档数,第1号元素表示文档内出现的次数。所以第二个函数,也就是在分组内聚合的函数,我们对于出现的文档数只需要加一即可,对于出现的次数要进行累加。因为这一次聚合的对象都是(1, value)类型的元素,也就是没有聚合之前的结果。

    在第三个函数当中,我们对于出现的总数也进行累加,是因为这一个函数处理的结果是各个分区已经聚合一次的结果了。比如apple在一个分区内出现在了两个文档内,一共出现了20次,在一个分区出现在了三个文档中,一共出现了30次,那么显然我们一共出现在了5个文档中,一共出现了50次。

    由于我们要计算平均,所以我们要用出现的总次数除以出现的文档数。最后经过map之后由于我们得到的还是一个二元组,我们不能直接collect,需要用collectAsMap。

    我们把上面这个例子用图来展示,会很容易理解:

    连接操作

    在spark当中,除了基础的转化操作之外,spark还提供了额外的连接操作给pair RDD。通过连接,我们可以很方便地像是操作集合一样操作RDD。操作的方法也非常简单,和SQL当中操作数据表的形式很像,就是join操作。join操作又可以分为join(inner join)、left join和right join。

    如果你熟悉SQL的话,想必这三者的区别应该非常清楚,它和SQL当中的join是一样的。如果不熟悉也没有关系,解释起来并不复杂。在join的时候我们往往是用一张表去join另外一张表,就好像两个数相减,我们用一个数减去另外一个数一样。比如A.join(B),我们把A叫做左表,B叫做右表。所谓的join,就是把两张表当中某一个字段或者是某些字段值相同的行连接在一起。

    比如一张表是学生表,一张表是出勤表。我们两张表用学生的id一关联,就得到了学生的出勤记录。但是既然是集合关联,就会出现数据关联不上的情况。比如某个学生没有出勤,或者是出勤表里记错了学生id。对于数据关联不上的情况,我们的处理方式有四种。第一种是全都丢弃,关联不上的数据就不要了。第二种是全部保留,关联不上的字段就记为NULL。第三种是左表关联不上的保留,右表丢弃。第四种是右表保留,左表丢弃。

    下图展示了这四种join,非常形象。

    我们看几个实际的例子来体会一下。

    首先创建数据集:

    ex1 = sc.parallelize([['frank'30], ['bob'9], ['silly'3]])
    ex2 = sc.parallelize([['frank'80], ['bob'12], ['marry'22], ['frank'21], ['bob'22]])

    接着,我们分别运行这四种join,观察一下join之后的结果。

    从结果当中我们可以看到,如果两个数据集当中都存在多条key值相同的数据,spark会将它们两两相乘匹配在一起。

    行动操作

    最后,我们看下pair RDD的行动操作。pair RDD同样是rdd,所以普通rdd适用的行动操作,同样适用于pair rdd。但是除此之外,spark还为它开发了独有的行动操作。

    countByKey

    countByKey这个操作顾名思义就是根据Key值计算每个Key值出现的条数,它等价于count groupby的SQL语句。我们来看个具体的例子:

    collectAsMap

    这个也很好理解,其实就是讲最后的结果以map的形式输出

    从返回的结果可以看到,输出的是一个dict类型。也就是Python当中的"map"。

    lookup

    这个单词看起来比较少见,其实它代表的是根据key值查找对应的value的意思。也就是常用的get函数,我们传入一个key值,会自动返回key值对应的所有的value。如果有多个value,则会返回list。

    总结

    到这里,所有的pair RDD相关的操作就算是介绍完了。pair rdd在我们日常的使用当中出现的频率非常高,利用它可以非常方便地实现一些比较复杂的操作。

    另外,今天的这篇文章内容不少,想要完全吃透,需要一点功夫。这不是看一篇文章就可以实现的,但是也没有关系,我们初学的时候只需要对这些api和使用方法有一个大概的印象即可,具体的使用细节可以等用到的时候再去查阅相关的资料。

    今天的文章就是这些,如果觉得有所收获,请顺手点个关注或者转发吧,你们的举手之劳对我来说很重要。

    展开全文
  • Spark RDD 学习 导入pyspark import pyspark 初始化SparkContext sc = pyspark.SparkContext(master="local[*]",appName="test1") RDD Transform算子 将一个列表构建成一个rdd rdd1 = sc.parallelize([1,2,3,4,5])...

    Spark RDD 学习

    导入pyspark

    import pyspark
    

    初始化SparkContext

    sc = pyspark.SparkContext(master="local[*]",appName="test1")
    

    RDD Transform算子

    将一个列表构建成一个rdd

    rdd1 = sc.parallelize([1,2,3,4,5])
    

    收集并显示rdd中的数据

    rdd1.collect()
    
    [1, 2, 3, 4, 5]
    

    map方法

    具名函数方式

    def adder(x):
        """
            使得传入的数加一
        """
        return x+1
    
    rdd1.map(adder).collect()
    
    [2, 3, 4, 5, 6]
    

    匿名函数方式

    rdd1.map(lambda x:x+1).collect()
    
    [2, 3, 4, 5, 6]
    

    filter用法

    print("rdd1 rdd1中小于2的数:", rdd1.filter(lambda x:x<2).collect())
    print("rdd1 中的偶数为:", rdd1.filter(lambda x:x%2==0).collect())
    
    rdd1 rdd1中小于2的数: [1]
    rdd1 中的偶数为: [2, 4]
    

    distinct用法

    rdd1.distinct().collect()
    
    [1, 2, 3, 4, 5]
    

    randomSplit用法

    rdd_r1,rdd_r2 = rdd1.randomSplit([0.5,0.5])
    print("随机划分集合1:",rdd_r1.collect())
    print("随机划分集合2:",rdd_r2.collect())
    
    随机划分集合1: [2, 4, 5]
    随机划分集合2: [1, 3]
    

    groupBy用法

    group_rdd1 = rdd1.groupBy(lambda x:"偶数:" if x%2==0 else "奇数:").collect()
    print("group_rdd1 type:", type(group_rdd1))
    print("group_rdd1 content type:",type(group_rdd1[0]))
    print("group name:",group_rdd1[0][0],"group member:",list(group_rdd1[0][1]))
    print("group name:",group_rdd1[1][0],"group member:",list(group_rdd1[1][1]))
    
    group_rdd1 type: <class 'list'>
    group_rdd1 content type: <class 'tuple'>
    group name: 奇数: group member: [1, 3, 5]
    group name: 偶数: group member: [2, 4]
    

    多个RDD的用法

    intrdd1 = sc.parallelize((1,2,3,4))
    intrdd2 = sc.parallelize((3,3,4))
    intrdd3 = sc.parallelize((5,1))
    

    并集计算

    print(intrdd1.union(intrdd2).collect())
    print(intrdd2.union(intrdd3).collect())
    print(intrdd1.union(intrdd2).union(intrdd3).collect())
    
    [1, 2, 3, 4, 3, 3, 4]
    [3, 3, 4, 5, 1]
    [1, 2, 3, 4, 3, 3, 4, 5, 1]
    

    交集计算

    print(intrdd1.intersection(intrdd2).collect())
    print(intrdd1.intersection(intrdd3).collect())
    
    [4, 3]
    [1]
    

    差集计算

    print(intrdd1.subtract(intrdd2).collect())
    print(intrdd2.subtract(intrdd1).collect())
    
    [2, 1]
    []
    

    笛卡尔积运算

    print(intrdd1.cartesian(intrdd2).collect())
    print(intrdd1.cartesian(intrdd2).cartesian(intrdd3).collect())
    
    [(1, 3), (1, 3), (1, 4), (2, 3), (2, 3), (2, 4), (3, 3), (3, 3), (3, 4), (4, 3), (4, 3), (4, 4)]
    [((1, 3), 5), ((1, 3), 1), ((1, 3), 5), ((1, 3), 1), ((1, 4), 5), ((1, 4), 1), ((2, 3), 5), ((2, 3), 1), ((2, 3), 5), ((2, 3), 1), ((2, 4), 5), ((2, 4), 1), ((3, 3), 5), ((3, 3), 1), ((3, 3), 5), ((3, 3), 1), ((3, 4), 5), ((3, 4), 1), ((4, 3), 5), ((4, 3), 1), ((4, 3), 5), ((4, 3), 1), ((4, 4), 5), ((4, 4), 1)]
    

    RDD Action 算子

    first用法

    print(rdd1.first())
    
    1
    

    take用法

    print(rdd1.take(4))
    print(rdd1.take(2))
    
    [1, 2, 3, 4]
    [1, 2]
    

    takeOrdered 用法

    print(rdd1.takeOrdered(3))
    print(rdd1.takeOrdered(num=2,key=lambda x:-x))
    
    [1, 2, 3]
    [5, 4]
    

    (全统计) stats 用法

    print(rdd1.stats())
    # 总数,均值,标准差,最大值,最小值
    
    (count: 5, mean: 3.0, stdev: 1.4142135623730951, max: 5.0, min: 1.0)
    

    (总数) count 用法

    print(rdd1.count())
    
    5
    

    (均值) mean 用法

    print(rdd1.mean())
    
    3.0
    

    (标准差) stdev 用法

    print(rdd1.stdev())
    
    1.4142135623730951
    

    (最大值) max 用法

    print(rdd1.max())
    
    5
    

    (最小值) min 用法

    print(rdd1.min())
    
    1
    

    (求和) sum 用法

    print(rdd1.sum())
    
    15
    

    key-value RDD Transform 算子

    创建key-value RDD

    kvRdd1 = sc.parallelize([("one",1),("two",2),("three",3),("four",4),("five",5)])
    kvRdd1.collect()
    
    [('one', 1), ('two', 2), ('three', 3), ('four', 4), ('five', 5)]
    

    keys 用法

    print(kvRdd1.keys())
    print(kvRdd1.keys().collect())
    
    ['one', 'two', 'three', 'four', 'five']
    

    values 用法

    print(kvRdd1.values())
    print(kvRdd1.values().collect())
    
    PythonRDD[301] at RDD at PythonRDD.scala:53
    [1, 2, 3, 4, 5]
    

    filter 用法

    # 筛选key
    print(kvRdd1.filter(lambda kv:kv[0] == "two").collect())
    # 筛选value
    print(kvRdd1.filter(lambda kv:kv[1] <= 3).collect())
    
    [('two', 2)]
    [('one', 1), ('two', 2), ('three', 3)]
    

    mapValues 用法

    # 每个元素加1
    print(kvRdd1.mapValues(lambda x:x+1).collect())
    # 每个元素取二次幂
    print(kvRdd1.mapValues(lambda x:x**2).collect())
    
    [('one', 2), ('two', 3), ('three', 4), ('four', 5), ('five', 6)]
    [('one', 1), ('two', 4), ('three', 9), ('four', 16), ('five', 25)]
    

    sortByKey 用法

    # 键如果是字符串,是按照字典序来进行排列
    # 正序: 小 -> 大
    print(kvRdd1.sortByKey(ascending=True).collect())
    # 逆序: 大 -> 小
    print(kvRdd1.sortByKey(ascending=False).collect())
    
    [('five', 5), ('four', 4), ('one', 1), ('three', 3), ('two', 2)]
    [('two', 2), ('three', 3), ('one', 1), ('four', 4), ('five', 5)]
    

    sortBy 用法

    # 对其键进行排序
    print(kvRdd1.sortBy(lambda kv:kv[0]).collect())
    # 对其值进行排序 并且是逆序
    print(kvRdd1.sortBy(lambda kv:kv[1],False).collect())
    
    [('five', 5), ('four', 4), ('one', 1), ('three', 3), ('two', 2)]
    [('five', 5), ('four', 4), ('three', 3), ('two', 2), ('one', 1)]
    

    reduceByKey 用法

    print(kvRdd1.union(kvRdd1).collect())
    # 把相同的键聚合起来并相加
    print(kvRdd1.union(kvRdd1).reduceByKey(lambda v1,v2:v1 + v2).collect())
    
    [('one', 1), ('two', 2), ('three', 3), ('four', 4), ('five', 5), ('one', 1), ('two', 2), ('three', 3), ('four', 4), ('five', 5)]
    [('two', 4), ('three', 6), ('four', 8), ('one', 2), ('five', 10)]
    

    reduce 用法

    # 将数据集中所有的值求和
    kvRdd1.reduce(lambda kv1,kv2:("sum",kv1[1]+kv2[1]))
    
    ('sum', 15)
    

    多个kev-value RDD的运算

    kv_student1_rdd = sc.parallelize([("name","nick"),("age",18,),("gender","F")])
    kv_student2_rdd = sc.parallelize([("name","nick"),("age",18,)])
    kv_student3_rdd = sc.parallelize([("name","nick"),("gender","F")])
    

    join用法

    # 当其中一个rdd中没有该键时,双方会进行抛弃此键
    kv_student1_rdd.join(kv_student2_rdd).collect()
    
    [('name', ('nick', 'nick')), ('age', (18, 18))]
    

    leftOuterJoin 用法

    # 以左边的rdd 为主,右边没有左边的键时会以None代替
    print(kv_student1_rdd.leftOuterJoin(kv_student3_rdd).collect())
    # 当左边没有右边的键时 会抛弃此键
    print(kv_student3_rdd.leftOuterJoin(kv_student1_rdd).collect())
    
    [('name', ('nick', 'nick')), ('gender', ('F', 'F')), ('age', (18, None))]
    [('name', ('nick', 'nick')), ('gender', ('F', 'F'))]
    

    rightOuterJoin 用法

    # 以右边的rdd 为主,左边没有右边的键时会以None代替
    print(kv_student2_rdd.rightOuterJoin(kv_student1_rdd).collect())
    # 当右边没有左边的键时 会抛弃此键
    print(kv_student1_rdd.rightOuterJoin(kv_student2_rdd).collect())
    
    [('name', ('nick', 'nick')), ('gender', (None, 'F')), ('age', (18, 18))]
    [('name', ('nick', 'nick')), ('age', (18, 18))]
    

    subtractByKey 用法

    # 删除两个数据集中相同键 的键值对
    print(kv_student1_rdd.subtractByKey(kv_student2_rdd).collect())
    
    [('gender', 'F')]
    

    key-value RDD Action算子

    first 用法

    print(kvRdd1.first())
    
    ('one', 1)
    

    taken 用法

    # 取头部 n 个
    print(kvRdd1.take(1))
    print(kvRdd1.take(2))
    
    [('one', 1)]
    [('one', 1), ('two', 2)]
    

    统计kev-value RDD 中每个key的个数

    print(sc.parallelize([("int",1),("int",3),("int",4),("float",2.0),("float",3.0)]).countByKey())
    
    defaultdict(<class 'int'>, {'int': 3, 'float': 2})
    

    collectAsMap 用法

    # 转换成python中的字典
    print(kvRdd1.collectAsMap())
    
    {'one': 1, 'two': 2, 'three': 3, 'four': 4, 'five': 5}
    

    lookup 用法

    # 根据传入键找值
    print(kvRdd1.lookup("one"))
    print(kvRdd1.lookup("four"))
    print(kvRdd1.lookup("five"))
    
    [1]
    [4]
    [5]
    

    释放SparkContext

    sc.stop()
    
    展开全文
  • spark学习

    2018-06-24 20:59:40
     一旦有了上下文,就可以创建RDD,子集群上创建累加器和广播变量每个jvm只能激活一个sparkcontext,创建新的时候必须停止前一个sparkcontext需要传入sparkconf ,用来设置spark参数,参数是kv对RDD:是不可变的,...
  • import org.apache.spark.Partitioner; import org.apache.spark.SparkConf; import org.apache.spark.api.java.JavaPairRDD; import org.apache.spark.api.java.JavaRDD; import org.apache....
  • spark(2)

    2018-06-21 15:19:14
    1. spark 任务运行的资源DIY 默认启动任务时, executor 占用worker 中所有的内核,每一个 executor 占用 1g内存。默认情况下,一个工作人员启动一个执行者 1.1。 火花提交任务提交时的常用选项:火花提交选项 ...
  • Spark系列--SparkCore(三)RDD基本操作

    千次阅读 2018-08-03 14:44:53
    前言 RDD的基本操作分为两种,一种是转换Transformation,一种是行动Action RDD中的所有转换都是延迟加载的,也就是说,它们并不会直接计算...这种设计让Spark更加有效率地运行。 一、常用的Transformation 1...
  • Spark - 运行架构&原理

    2020-06-21 17:55:25
    Spark运行架构 运行架构 Spark框架的核心是一个计算引擎,整体来说,它采用了标准 master-slave 的结构。如下图所示,它展示了一个 Spark执行时的基本结构。图形中的Driver表示master,负责管理整个集群中的作业任务...
  • Spark常用的算子以及Scala函数总结

    千次阅读 2018-04-01 09:52:30
    Spark与Scala首先,介绍一下scala语言:Scala 是一种把面向对象和函数式编程理念加入到静态类型语言中的混血儿。为什么学scala?spark提供了R、Python等语言的接口,为什么还要重新学一门新的语言呢?1、spark本身...
  • spark 基础和spark sql翻译

    千次阅读 2015-06-15 10:56:51
    Spark1.3.0 基础文档和spark1.3.0 sql 部分翻译
  • Spark算子使用示例

    万次阅读 2017-08-24 11:36:03
    1. 算子分类从大方向来说,Spark 算子大致可以分为以下两类 Transformation:操作是延迟计算的,也就是说从一个RDD 转换生成另一个 RDD 的转换操作不是马上执行,需要等到有 Action 操作的时候才会真正触发运算。 ...
  • Spark基础知识点儿汇总

    千次阅读 2018-04-13 19:26:05
    *spark的理解spark是一个快速的、统一的大规模数据处理引擎它是基于内存计算的它的特点是:快速、易用、适用于各种数据处理场景(批处理、流处理、交互式处理)、它可以运行在多种分布式计算框架中,如yarn和...
  • SparkRDD函数详解

    千次阅读 2018-03-12 21:13:45
    1、RDD操作详解启动spark-shellspark-shell --master spark://node-1:70771.1 基本转换1) mapmap是对RDD中的每个元素都执行一个指定的函数来产生一个新的RDD。 任何原RDD中的元素在新RDD中都有且只有一个元素与之...
  • spark

    2020-06-09 06:55:26
    Spark 第一代引擎:mapreduce 第二代引擎:impala(DAG) 第三代引擎:spark 第四代引擎:flink 定义 大数据的统一的计算引擎。采用 DAG来进行计算。 Spark是一种快速、通用、可扩展的大数据分析引擎 Spark部署模式 ...
  • Spark 一、Spark介绍 Spark 是用于大规模数据处理的统一分析引擎。 Spark是一种与Hadoop相似的开源集群计算环境,Spark使用了内存分布数据集,除了能够提供交互式查询外,它还可以优化迭代工作负载。 相对于...
  • 大数据的面试除了手撕算法之外,还有手撕Hql,手撕Spark RDD等“额外套餐”,本文就是专为应对面试中手撕Spark准备的操作指南,如有错误还请指正。 我们先准备一个数据集。 val rdd1: RDD[Int] = sc.parallelize...
  • 一、Spark基础1.1Spark简介• MR有什么问题?– 调度慢,启动map、 reduce太耗时;计算慢,每一步都要保存中间结果落磁盘;API抽象简单,只有map和reduce两个原语;缺乏作业流描述,一项任务需要多轮mr。• 什么是...
  • 文章目录第1章Spark GraphX概述1.1什么是Spark GraphX1.2弹性分布式属性图1.3运行图计算程序第2章Spark GraphX解析2.1存储模式2.1.1图存储模式2.1.2GraphX存储模式2.1.2.1 RandomVertexCut2.1.2.2 ...
1 2 3 4 5 ... 20
收藏数 407
精华内容 162
关键字:

kv求交集 spark