精华内容
参与话题
问答
  • 在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark 0.9.0之后才引入的(可以参见SPARK-1063)。而sortByKey函数是对PairRDD进行排序,也就是有Key和...

    在很多应用场景都需要对结果数据进行排序,Spark中有时也不例外。在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark 0.9.0之后才引入的(可以参见SPARK-1063)。而sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。下面将分别对这两个函数的实现以及使用进行说明。

    展开全文
  • top.txt文件 ...取前三位数(Java)sortByKey 运行结果 取前三位数(Java)sortBy 运行结果 **取最大的前三位数(Scala)sortByKey ** 运行结果 取最大的前三位数(Scala)sortBy 运行结果 ...

    top.txt文件
    在这里插入图片描述
    取前三位数(Java)sortByKey
    在这里插入图片描述
    运行结果
    在这里插入图片描述
    取前三位数(Java)sortBy

    在这里插入图片描述

    运行结果
    在这里插入图片描述

    取最大的前三位数(Scala)sortByKey
    在这里插入图片描述
    运行结果
    在这里插入图片描述
    取最大的前三位数(Scala)sortBy
    在这里插入图片描述
    运行结果
    在这里插入图片描述

    展开全文
  • Spark:sortBy和sortByKey的函数详解

    千次阅读 2017-01-19 13:45:09
    sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark 0.9.0之后才引入的(可以参见SPARK-1063)。而sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。下面将分别对这两个函数的实现以及...

    在很多应用场景都需要对结果数据进行排序,Spark中有时也不例外。在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark 0.9.0之后才引入的(可以参见SPARK-1063)。而sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。下面将分别对这两个函数的实现以及使用进行说明。
      

    一、sortBy函数实现以及使用

      sortBy函数是在org.apache.spark.rdd.RDD类中实现的,它的实现如下:

    01 /**
    02  * Return this RDD sorted by the given key function.
    03  */
    04 def sortBy[K](
    05     f: (T) => K,
    06     ascending: Boolean = true,
    07     numPartitions: Int = this.partitions.size)
    08     (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] =
    09   this.keyBy[K](f)
    10       .sortByKey(ascending, numPartitions)
    11       .values

      该函数最多可以传三个参数:
      第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;
      第二个参数是ascending,从字面的意思大家应该可以猜到,是的,这参数决定排序后RDD中的元素是升序还是降序,默认是true,也就是升序;
      第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认排序后的分区个数和排序之前的个数相等,即为this.partitions.size
      从sortBy函数的实现可以看出,第一个参数是必须传入的,而后面的两个参数可以不传入。而且sortBy函数函数的实现依赖于sortByKey函数,关于sortByKey函数后面会进行说明。keyBy函数也是RDD类中进行实现的,它的主要作用就是将将传进来的每个元素作用于f(x)中,并返回tuples类型的

    元素,也就变成了Key-Value类型的RDD了,它的实现如下:

    1 /**
    2 * Creates tuples of the elements in this RDD by applying `f`.
    3 */
    4 def keyBy[K](f: => K): RDD[(K, T)] = {
    5     map(x => (f(x), x))
    6 }

      那么,如何使用sortBy函数呢?

    01 /**
    02  * User: 过往记忆
    03  * Date: 14-12-26
    04  * Time: 上午10:16
    05  * bolg: http://www.iteblog.com
    06  * 本文地址:http://www.iteblog.com/archives/1240
    07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
    08  * 过往记忆博客微信公共帐号:iteblog_hadoop
    09  */
    10 scala> val data = List(3,1,90,3,5,12)
    11 data: List[Int] = List(31903512)
    12  
    13 scala> val rdd = sc.parallelize(data)
    14 rdd: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[0] at parallelize at <console>:14
    15  
    16 scala> rdd.collect
    17 res0: Array[Int] = Array(31903512)
    18  
    19 scala> rdd.sortBy(x => x).collect
    20 res1: Array[Int] = Array(13351290)
    21  
    22 scala> rdd.sortBy(x => x, false).collect
    23 res3: Array[Int] = Array(90125331)
    24  
    25 scala> val result = rdd.sortBy(x => x, false)
    26 result: org.apache.spark.rdd.RDD[Int] = MappedRDD[23] at sortBy at <console>:16
    27  
    28 scala> result.partitions.size
    29 res9: Int = 2
    30  
    31 scala> val result = rdd.sortBy(x => x, false1)
    32 result: org.apache.spark.rdd.RDD[Int] = MappedRDD[26] at sortBy at <console>:16
    33  
    34 scala> result.partitions.size
    35 res10: Int = 1

      上面的实例对rdd中的元素进行升序排序。并对排序后的RDD的分区个数进行了修改,上面的result就是排序后的RDD,默认的分区个数是2,而我们对它进行了修改,所以最后变成了1。
      

    二、sortByKey函数实现以及使用

    sortByKey函数作用于Key-Value形式的RDD,并对Key进行排序。它是在org.apache.spark.rdd.OrderedRDDFunctions中实现的,实现如下

    1 def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.size)
    2     : RDD[(K, V)] =
    3 {
    4   val part = new RangePartitioner(numPartitions, self, ascending)
    5   new ShuffledRDD[K, V, V](self, part)
    6     .setKeyOrdering(if (ascending) ordering else ordering.reverse)
    7 }

      从函数的实现可以看出,它主要接受两个函数,含义和sortBy一样,这里就不进行解释了。该函数返回的RDD一定是ShuffledRDD类型的,因为对源RDD进行排序,必须进行Shuffle操作,而Shuffle操作的结果RDD就是ShuffledRDD。其实这个函数的实现很优雅,里面用到了RangePartitioner,它可以使得相应的范围Key数据分到同一个partition中,然后内部用到了mapPartitions对每个partition中的数据进行排序,而每个partition中数据的排序用到了标准的sort机制,避免了大量数据的shuffle。下面对sortByKey的使用进行说明:

    01 /**
    02  * User: 过往记忆
    03  * Date: 14-12-26
    04  * Time: 上午10:16
    05  * bolg: http://www.iteblog.com
    06  *
    07  * 过往记忆博客,专注于hadoop、hive、spark、shark、flume的技术博客,大量的干货
    08  * 过往记忆博客微信公共帐号:iteblog_hadoop
    09  */
    10 scala> val = sc.parallelize(List("wyp""iteblog""com""397090770""test"), 2)
    11 a: org.apache.spark.rdd.RDD[String] =
    12 ParallelCollectionRDD[30] at parallelize at <console>:12
    13  
    14 scala> val = sc. parallelize (1 to a.count.toInt , 2)
    15 b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[31] at parallelize at <console>:14
    16  
    17 scala> val = a.zip(b)
    18 c: org.apache.spark.rdd.RDD[(String, Int)] = ZippedPartitionsRDD2[32] at zip at <console>:16
    19  
    20 scala> c.sortByKey().collect
    21 res11: Array[(String, Int)] = Array((397090770,4), (com,3), (iteblog,2), (test,5), (wyp,1))

      上面对Key进行了排序。细心的读者可能会问,soryKy函数中的第一个参数可以对排序方式进行重写。为什么sortByKey没有呢?难道只能用默认的排序规则。不是,是有的。其实在OrderedRDDFunctions类中有个变量ordering它是隐形的:private val ordering = implicitly[Ordering[K]]。他就是默认的排序规则,我们可以对它进行重写,如下:

    01 scala> val = sc.parallelize(List(3,1,9,12,4))
    02 b: org.apache.spark.rdd.RDD[Int] = ParallelCollectionRDD[38] at parallelize at <console>:12
    03  
    04 scala> val = b.zip(a)
    05 c: org.apache.spark.rdd.RDD[(Int, String)] = ZippedPartitionsRDD2[39] at zip at <console>:16
    06  
    07 scala> c.sortByKey().collect
    08 res15: Array[(Int, String)] = Array((1,iteblog), (3,wyp), (4,test), (9,com), (12,397090770))
    09  
    10 scala> implicit val sortIntegersByString = new Ordering[Int]{
    11      override def compare(a: Int, b: Int) =
    12      | a.toString.compare(b.toString)}
    13 sortIntegersByString: Ordering[Int] = $iwC$$iwC$$iwC$$iwC$$iwC$$anon$1@5d533f7a
    14  
    15 scala>  c.sortByKey().collect
    16 res17: Array[(Int, String)] = Array((1,iteblog), (12,397090770), (3,wyp), (4,test), (9,com))

      例子中的sortIntegersByString就是修改了默认的排序规则。这样将默认按照Int大小排序改成了对字符串的排序,所以12会排序在3之前。

    本博客文章除特别声明,全部都是原创!
    尊重原创,转载请注明: 转载自过往记忆(http://www.iteblog.com/)
    本文链接地址: 《Spark: sortBy和sortByKey函数详解》(http://www.iteblog.com/archives/1240)

    展开全文
  • spark sortBy sortByKey实战详解

    千次阅读 2018-08-27 19:13:41
    日常工作中,排序是道绕过不过去的侃,我们每天都会面对各种各样的排序需求。那么在spark中如何排序呢?我们来看一些很有代表性的例子。 1.最简单的排序 假设有个RDD[Int]类型的数据,需要按数据大小进行排序,那...

    项目github地址:bitcarmanlee easy-algorithm-interview-and-practice
    欢迎大家star,留言,一起学习进步

    日常工作中,排序是道绕过不过去的侃,我们每天都会面对各种各样的排序需求。那么在spark中如何排序呢?我们来看一些很有代表性的例子。

    1.最简单的排序

    假设有个RDD[Int]类型的数据,需要按数据大小进行排序,那这个排序算最简单的:

    sc.parallelize(Array(1,3,2,4,6,5)).sortBy(x => x).collect()
    

    代码运行的结果:

     Array[Int] = Array(1, 2, 3, 4, 5, 6)
    

    2.kv结构的排序

    在kv结构的数据中,按value排序是常见的需求:

    sc.parallelize(Array(("a", 1), ("c", 3), ("b", 2), ("d", 4))).sortBy(_._2)
    

    代码运行的结果:

    Array[(String, Int)] = Array((a,1), (b,2), (c,3), (d,4))
    

    3.定制排序规则

    有如下结构的数据:

    <10 6094308
    <100 234975
    <20 2286079
    <200 1336431
    

    希望按照<后面的数字大小排序,得到如下结果:

    <10 6094308
    <20 2286079
    <100 234975
    <200 1336431
    

    代码如下:

    val array = Array(("<10",6094308), ("<100",234975), ("<20",2286079),("<200",1336431));
    sc.parallelize(array).sortBy({item => item._1.substring(1, item._1.length).toInt}).collect()
    

    要理解上述代码的原理,我们需要分析一下sortBy的源码。

      /**
       * Return this RDD sorted by the given key function.
       */
      def sortBy[K](
          f: (T) => K,
          ascending: Boolean = true,
          numPartitions: Int = this.partitions.length)
          (implicit ord: Ordering[K], ctag: ClassTag[K]): RDD[T] = withScope {
        this.keyBy[K](f)
            .sortByKey(ascending, numPartitions)
            .values
      }
    

    sortBy必需传入的一个参数为f: (T) => KT为array中的元素类型。

      /**
       * Creates tuples of the elements in this RDD by applying `f`.
       */
      def keyBy[K](f: T => K): RDD[(K, T)] = withScope {
        val cleanedF = sc.clean(f)
        map(x => (cleanedF(x), x))
      }
    

    传入的f: (T) => K作用在keyBy方法上,生成了一个RDD[(K, T)]的数据。
    然后调用sortByKey,最后取出里面的T,得到的就是原始array中的类型!

    4.用sortByKey实现上面的功能

    我们再来看看sortByKey的源码

      /**
       * Sort the RDD by key, so that each partition contains a sorted range of the elements. Calling
       * `collect` or `save` on the resulting RDD will return or output an ordered list of records
       * (in the `save` case, they will be written to multiple `part-X` files in the filesystem, in
       * order of the keys).
       */
      // TODO: this currently doesn't work on P other than Tuple2!
      def sortByKey(ascending: Boolean = true, numPartitions: Int = self.partitions.length)
          : RDD[(K, V)] = self.withScope
      {
        val part = new RangePartitioner(numPartitions, self, ascending)
        new ShuffledRDD[K, V, V](self, part)
          .setKeyOrdering(if (ascending) ordering else ordering.reverse)
      }
    

    大家看到sortByKey的源码可能会有疑惑,难道sortByKey不能指定排序方式么?不能像sortBy那样传入一个函数么?
    其实是可以的。sortByKey位于OrderedRDDFunctions类中,OrderedRDDFunctions中有一个隐藏变量:

    private val ordering = implicitly[Ordering[K]]
    

    我们重写这个变量以后,就可以改变排序规则。
    以第三部分的需求为例,我们用sortByKey可以这么做:

    implicit val sortByNum = new Ordering[String] { override def compare(x: String, y: String): Int = x.substring(1, x.length).toInt.compareTo(y.substring(1, y.length).toInt)};
    val array = Array(("<10",6094308), ("<100",234975), ("<20",2286079),("<200",1336431));
    sc.parallelize(array).sortByKey().collect()
    

    最后的输出结果为:

    Array[(String, Int)] = Array((<10,6094308), (<20,2286079), (<100,234975), (<200,1336431))
    

    同样达到了我们的目的!

    展开全文
  • Spark中sortByKey和sortBy对(key,value)数据分别 根据key和value排序
  • Spark: sortBy和sortByKey函数详解

    千次阅读 2018-07-23 09:55:16
    在Spark中存在两种对RDD进行排序的函数,分别是 sortBy和sortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark 0.9.0之后才引入的(可以参见SPARK-1063)。而sortByKey函数是对PairRDD进行排序,也就是有Key和...
  • 浅析sortByKey算子

    千次阅读 2017-02-10 12:55:41
    一、简介spark中用于排序的算子主要有两个,sortByKey与sortBy,其中sortBy是引用sortByKey来实现的。下面主要对sortByKey算子进行分析,该方法的实现代码如下: def sortByKey(ascending: Boolean = true, ...
  • Spark: sortBy sortByKey 二次排序

    千次阅读 2017-09-12 12:29:12
    Sample data(考场号,班级号,学号)–> 考场号升序,班级号升序,学号降序1 1 3 1 1 4 1 2 8 1 3 7 3 2 9 3 5 11 1 4 13 1 5 12 2 1 14 2 1 10 2 4 1 2 3 5 2 4 6 3 5 2 3 2 15 1 1 16 2 2 17 ...
  • sortBysortByKey区别

    2019-11-27 22:24:09
    sortBysortBy可以定义排序方式 object sortByTest{ def main(args: Array[String]): Unit = { val conf: SparkConf = new SparkConf().setMaster("local[*]").setAppName("reduceTest") val sc = ...
  • sortBy函数 sortBy函数是在org.apache.spark.rdd.RDD类中实现的。 该函数有三个参数:  第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的;  第二个参数是ascending,从...
  • import org.apache.spark.rdd.RDD import org.apache.spark.{SparkConf, SparkContext} /** * Created with IntelliJ IDEA. ...* Date: 2020/9/27 20:07 * Version: 1.0 * Description:排序相关的算子的应用 ...
  • 记录pyspark中的sortBykeysortBy的问题

    千次阅读 2019-05-14 13:50:00
    当我在复习pyspark中的sortByKey时,我试图使用...我用sortBy方法再次进行操作,能够得到正确结果。我尝试查看了一下源码,但还没有解决问题,因此记录一下问题,以待后续解决和更新。具体运行情况如下: ...
  • spark中的sortBysortByKey

    千次阅读 2017-04-06 12:01:59
    spark中对RDD的数据进行排序...比如我们对wordcount的结果进行排序,除了将(key,value)倒过来根据key排序外,我们可以直接用sortBy. 用法如下: 第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD
  • 排序sortByKey,和sortBy

    2019-06-20 23:02:12
    普通排序sortByKey package com.aura.liu.Dayof20 import org.apache.log4j.{Level, Logger} import org.apache.spark.{SparkConf, SparkContext} import org.apache.spark.util.AccumulatorV2 import scala....
  • sortBy 该函数会对原数据进行Shuffle操作,里面用到了RangePartitioner
  • PairRDD的创建 可以采用多种方式创建Pair RDD,其中一种主要的方式是使用map()函数来实现。 scala> val lines = sc.textFile("pathToFile") scala> val pairRDD = lines.flatMap(line =>...
  • 1.sortByKey() 功能:  返回一个根据键排序的RDD 示例 val list = List(("a",3),("b",2),("c",1)) val pairRdd = sc.parallelize(list) pairRdd.sortByKey().collect.foreach(println) 结果 (a,3) ...
  • 一、数据集 fruits.txt apple banana canary melon grap lemon orange pineapple strawberry ...numFruitsByLength = fruits.map(lambda fruit: (len(fruit), 1)).reduceByKey(lambda x, y: x + y) print
  • https://www.jianshu.com/p/9baf6039c71e
  • spark之sortBysortByKey

    2019-11-27 19:03:26
    在Spark中存在两种对RDD进行排序的函数,分别是 sortBysortByKey函数。sortBy是对标准的RDD进行排序,它是从Spark 0.9.0之后才引入的(可以参见SPARK-1063)。而sortByKey函数是对PairRDD进行排序,也就是有Key和...
  • //统计单词top10def main(args: Array[String]): Unit = { val conf = new SparkConf().setAppName("tst").setMaster("local[3]") val sc = new SparkContext(conf) //wc val res = sc.text...
  • 【Spark】sortBy[T]和sortByKey[T]排序详解

    千次阅读 2017-12-19 15:22:30
    问题导读: 1. 排序算子是如何做排序的? 2. 完整的排序流程是? 解决方案: 1 前言 ...在前面一系列博客中,特别在Shuffle博客系列中,曾描述过在生成ShuffleWrite的文件的时候,对每个partition会先进行排序并...
  • sortby中默认传入排序规则是 ascending true.升序 第一个参数是一个函数,该函数的也有一个带T泛型的参数,返回类型和RDD中元素的类型是一致的 第三个参数是numPartitions,该参数决定排序后的RDD的分区个数,默认...
  • 1 前言在前面一系列博客中,特别在Shuffle博客系列中,曾今描述过在生成ShuffleWrite的文件的时候,对每个partition会先进行排序并spill到文件中,最后合并成ShuffleWrite的文件,也就是每个Partition里的内容已经...
  • 不知道大家有没有注意到,大家在编写spark程序调用sortBy/sortByKey这两个算子的时候大家会不会有这样子的疑问,他们两个明明是transformation,为啥在执行的时候却触发了作业的执行呢?今天就和大家一起一探究竟? ...
  • sortBy是对标准的RDD进行排序。[在scala语言中,RDD与PairRDD没有太严格的界限]。 sortByKey函数是对PairRDD进行排序,也就是有Key和Value的RDD。sortBy源码 /** * RDD.scala * Return this RDD sorted by the ...

空空如也

1 2 3 4 5 ... 20
收藏数 329,083
精华内容 131,633
关键字:

sortbykey