精华内容
下载资源
问答
  • 在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念:正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的...

    原标题:什么是数据倾斜?如何解决数据倾斜?

    导读

    相信很多接触MapReduce的朋友对'数据倾斜'这四个字并不陌生,那么究竟什么是数据倾斜?又该怎样解决这种该死的情况呢?

    02dfb6cd81762444e9f31cf51425fc0c.png

    何为数据倾斜?

    在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念:

    正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , 不同的数据字段可能的数据倾斜一般有两种情况:

    一种是唯一值非常少,极少数值有非常多的记录值(唯一值少于几千)

    一种是唯一值比较多,这个字段的某些值有远远多于其他值的记录数,但是它的占比也小于百分之一或千分之一

    数据倾斜:

    数据倾斜在MapReduce编程模型中十分常见,用最通俗易懂的话来说,数据倾斜无非就是大量的相同key被partition分配到一个分区里,造成了'一个人累死,其他人闲死'的情况,这种情况是我们不能接受的,这也违背了并行计算的初衷,首先一个节点要承受着巨大的压力,而其他节点计算完毕后要一直等待这个忙碌的节点,也拖累了整体的计算时间,可以说效率是十分低下的。

    解决方案:

    1.增加jvm内存,这适用于第一种情况(唯一值非常少,极少数值有非常多的记录值(唯一值少于几千)),这种情况下,往往只能通过硬件的手段来进行调优,增加jvm内存可以显著的提高运行效率。

    2.增加reduce的个数,这适用于第二种情况(唯一值比较多,这个字段的某些值有远远多于其他值的记录数,但是它的占比也小于百分之一或千分之一),我们知道,这种情况下,最容易造成的结果就是大量相同key被partition到一个分区,从而一个reduce执行了大量的工作,而如果我们增加了reduce的个数,这种情况相对来说会减轻很多,毕竟计算的节点多了,就算工作量还是不均匀的,那也要小很多。

    3.自定义分区,这需要用户自己继承partition类,指定分区策略,这种方式效果比较显著。

    4.重新设计key,有一种方案是在map阶段时给key加上一个随机数,有了随机数的key就不会被大量的分配到同一节点(小几率),待到reduce后再把随机数去掉即可。

    5.使用combinner合并,combinner是在map阶段,reduce之前的一个中间阶段,在这个阶段可以选择性的把大量的相同key数据先进行一个合并,可以看做是local reduce,然后再交给reduce来处理,这样做的好处很多,即减轻了map端向reduce端发送的数据量(减轻了网络带宽),也减轻了map端和reduce端中间的shuffle阶段的数据拉取数量(本地化磁盘IO速率),推荐使用这种方法。

    摘自:http://www.shcpda.com/shujuliliang/1166.html返回搜狐,查看更多

    责任编辑:

    展开全文
  • 1 Data Skew 数据倾斜 1.1 数据倾斜概念 对Hive、Spark、Flink等大数据计算框架来讲,数据量大并不可怕,可怕的是数据倾斜。 数据倾斜是指并行处理的数据集中某一部分的数据显著多于其它部分,从而使得该部分的处理...

    1 Data Skew 数据倾斜

    1.1 数据倾斜概念

    对Hive、Spark、Flink等大数据计算框架来讲,数据量大并不可怕,可怕的是数据倾斜。

    数据倾斜是指并行处理的数据集中某一部分的数据显著多于其它部分,从而使得该部分的处理速度成为整个数据集处理的瓶颈。
    数据倾斜是大数据计算中一个最棘手的问题,发生数据倾斜后,Spark作业的性能会比期望差很多。

    举个 word count 的入门例子:若进行 word count 的文本有100G,其中 80G 全部是 “aaa” ,剩下 20G 是其余单词,那就会形成 80G 的数据量交给同一个 reduce task进行相加,其余 20G 根据分配到不同 reduce task进行相加的情况,如此就造成了数据倾斜,结果是绝大部分task在几秒内跑完,reduce 整体跑到 99%,然后一直在原地等着那80G 的reduce task 跑完。
    在这里插入图片描述

    1.2 发生数据倾斜时的现象

    • 绝大多数task执行得都非常快,但个别task执行极慢。比如,总共有1000个task,990个task都在1分钟之内执行完了,但是剩余10来个task却要一两个小时。这种情况很常见,如上面的示例
    • 原本能够正常执行的Spark作业,某天突然报出OOM(内存溢出)异常,观察异常栈,是业务代码导致的,这种情况比较少见。

    1.3 数据倾斜的原因

    Shuffle数据之后导致数据分布不均匀,不同的Task处理的数据量差异太大,导致在机器性能、代码一样的情况下,task的执行时间差异太大。

    在进行shuffle的时候,必须将各个节点上相同的key拉取到某个节点上的一个task来进行处理【比如按照key进行聚合或join等操作】,此时如果某个key对应的数据量特别大的话,就会发生数据倾斜。如上面word count的示例。

    因此出现数据倾斜的时候,Spark作业看起来会运行得非常缓慢,甚至可能因为某个task处理的数据量过大导致内存溢出

    1.4 定位数据倾斜的代码

    数据倾斜只会发生在shuffle过程中。这里罗列一些常用的并且可能会触发shuffle操作的算子:

    • 重分区操作:比如repartition、repartitionAndSortWithinPartitions、coalesce等;重分区操作一般引起shuffle,因为需要在整个集群中,对之前所有的分区的数据进行随机,均匀的打乱,然后把数据放入下游新的指定数量的分区内
    • byKey类操作:比如reduceByKey、groupByKey、sortByKey、countByKey、combineByKey、aggregateByKey、foldByKey等;因为byKey操作是针对key进行操作【最常用用的是聚合操作】,需要保证相同的key,一定是到同一个节点上进行处理
    • join类操作:比如join、cogroup等,两个rdd进行join,就必须将相同join key的数据,shuffle到同一个节点上,然后进行相同key的两个rdd数据的笛卡尔乘积
    • 去重操作: distinct
    • 排序操作: sortByKey, sort
    • 集合或者表操作: intersection、subtractByKey,subtract

    出现数据倾斜时,可能就是代码中使用了这些算子中的某一个所导致的。

    1.4.1 某个task执行特别慢的情况

    通过Spark Web UI查看任务执行情况:

    • 查看当前spark application运行到了第几个stage
    • 深入看一下当前这个stage各个task分配的数据量,从而进一步确定是不是task分配的数据不均匀导致了数据倾斜。
    • 回到代码中定位出这个stage导致shuffle类的算子,此时基本就可以确定是哪个算子导致的数据倾斜问题

    知道数据倾斜发生在哪一个stage之后,接着就需要根据stage划分原理,推算出来发生倾斜的那个stage对应代码中的哪一部分,这部分代码中肯定会有一个shuffle类算子。

    精准推算stage与代码的对应关系,需要对Spark的源码有深入的理解,这里介绍一个相对简单实用的推算方法:只要看到Spark代码中出现了一个shuffle类算子或者是Spark SQL的SQL语句中出现了会导致shuffle的语句(比如group by语句),那么就可以判定,以那个地方为界限划分出了前后两个stage。

    以Spark单词计数来举例:在整个代码中,只有一个reduceByKey是会发生shuffle的算子,因此可以认为以这个算子为界限,会划分出前后两个stage。

    • stage0:主要执行从textFile到map操作,以及执行shuffle write操作。shuffle write操作,可以简单理解为对pairs RDD中的数据进行分区操作,每个task处理的数据中,相同的key会写入同一个磁盘文件内
    • stage1:主要执行从reduceByKey到collect操作,stage1的各个task一开始运行,首先执行shuffle read操作。执行shuffle read操作的task,会从stage0的各个task所在节点拉取属于自己处理的那些key,然后对同一个key进行全局性的聚合或join等操作,在这里就是对key的value值进行累加。stage1在执行完reduceByKey算子之后,就计算出了最终的wordCounts RDD,然后会执行collect算子,将所有数据拉取到Driver上,供遍历和打印输出。
    import org.apache.spark.{SparkConf, SparkContext}
    
    object ScalaWordCount {
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Scala Word Count")
        val sc = new SparkContext(conf)
    
        val lines = sc.textFile("hdfs://...")
        val words = lines.flatMap(_.split(" "))
        val pairs = words.map((_, 1))
        val wordCounts = pairs.reduceByKey(_ + _)
    
        wordCounts.collect().foreach(println(_))
        sc.stop()
      }
    }
    

    1.4.2 某个task莫名其妙内存溢出的情况

    这种情况下去定位出问题的代码就比较容易了。建议直接看yarn-client模式下本地log的异常栈,或者是通过YARN查看yarn-cluster模式下的log中的异常栈。
    一般来说,通过异常栈信息就可以定位到代码中哪一行发生了内存溢出。然后在那行代码附近找找,一般也会有shuffle类算子,此时很可能就是这个算子导致了数据倾斜。

    注意: 不能单纯靠偶然的内存溢出就判定发生了数据倾斜。因代码的bug以及偶然出现的数据异常,也可能会导致内存溢出。因此需要通过Spark Web UI查看报错的那个stage的各个task的运行时间以及分配的数据量,才能确定是否是由于数据倾斜才导致了这次内存溢出。

    1.5 导致数据倾斜的key分布情况

    查看key分布的方式:

    • 如果是Spark SQL中的group by、join语句导致的数据倾斜,那么就查询一下使用表的key分布情况。
    • 如果是对Spark RDD执行shuffle算子导致的数据倾斜,那么可以在Spark作业中加入查看key分布的代码,比如RDD.countByKey()。然后对统计出来的各个key出现的次数,collect/take到客户端打印一下,就可以看到key的分布情况。
    val sampledPairs = pairs.sample(false, 0.2)
    val sampledWordCounts = sampledPairs.countByKey()
    sampledWordCounts.foreach(println(_))
    

    2 数据倾斜的解决方案

    2.1 使用Hive ETL预处理数据

    实现原理:将shuffle操作前移到hive中执行,从根源上解决了数据倾斜,因为彻底避免了在Spark中执行shuffle类算子。

    方案缺点:治标不治本,Hive ETL中还是会发生数据倾斜。

    方案优点:实现起来简单便捷,效果还非常好,完全规避掉了数据倾斜,Spark作业的性能会大幅度提升。

    适用场景:数据源头表的数据存在倾斜,业务场景需要频繁使用Spark对Hive表执行某个分析操作

    hive的数据倾斜解决方案如下:

    • 当出现小文件过多时:合并小文件 set hive.merge.mapfiles=true
    • 当group by分组的维度过少,每个维度的值过多时:调优参数【注意以下2个参数不能同时使用,否则统计结果有问题】
      1. 设置在map阶段做部分聚合操作 hive.map.aggr=true
      2. 设置数据倾斜时负载均衡 hive.groupby.skewindata=true : 它分为了两个mapreduce,第一个在shuffle过程中partition时随机给key打标记,使其分布在不同的reduce上计算,但不能完成全部运算,所以需要第二次mapreduce整合回归正常的shuffle,由于数据分布不均问题在第一次时得到改善,所以基本解决数据倾斜问题。
    • 调节SQL语句
      1. 关联字段带空值的两表Join时:把空值的key变成一个字符串加上随机数,这样就可以把倾斜的数据分到不同的reduce上,此外由于空值关联不起来,所以处理后并不影响最终结果
      2. 大小表Join时:使用map join让小表(10000条以下的记录条数) 先进内存,在map端完成reduce
        set hive.auto.convert.join=true; //设置 MapJoin 优化自动开启
        set hive.mapjoin.smalltable.filesize=25000000 //设置小表不超过多大时开启 mapjoin 优化
    • count(distinct xx)时有大量相同的特殊值:用sum() group by的方式来替换count(distinct)完成计算。如:如select a,count(distinct b) from t group by a,用select a,sum(1) from (select a,b from t group by a,b) group by a替代。

    2.2 过滤少数导致倾斜的key

    实现原理:将导致数据倾斜的key给过滤掉之后,这些key就不会参与计算了,自然不可能产生数据倾斜。

    方案缺点:适用场景不多,大多数情况下,导致倾斜的key还是很多的,并不是只有少数几个。可用于数据异常时导致某一个key产生的数据量暴增。

    方案优点:实现简单,而且效果也很好,可以完全规避掉数据倾斜。

    适用场景: 如果发现导致数据倾斜的key是极少数,并且对计算本身影响不大,那么这种方案比较适用。

    通过spark的sample算子,定位到数据倾斜的key,然后使用filter算子或sql where条件将其过滤即可。

    2.3 提高shuffle操作的并行度

    实现原理: 增加shuffle read task的数量,让原本分配给一个task的多个key分配给多个task,从而让每个task处理比原来更少的数据。举例来说,如果原本有5个key,每个key对应10条数据,这5个key都是分配给一个task的,那么这个task就要处理50条数据。而增加了shuffle read task以后,每个task就分配到一个key,即每个task就处理10条数据,那么自然每个task的执行时间都会变短了

    方案优点:实现起来比较简单,可以有效缓解和减轻数据倾斜的影响。

    方案缺点:只是缓解了数据倾斜而已,没有彻底根除问题,根据实践经验来看,其效果有限。

    适用场景:如果必须要对数据倾斜迎难而上,请优先使用这种方案,因为这是处理数据倾斜最简单的一种方案。

    此方案通常无法彻底解决数据倾斜,因为如果出现一些极端情况,比如某个key对应的数据量有100万,那么无论你的task数量增加到多少,这个对应着100万数据的key肯定还是会分配到一个task中去处理,因此注定还是会发生数据倾斜的。所以这种方案只能说是在发现数据倾斜时尝试使用的第一种手段,尝试去用嘴简单的方法缓解数据倾斜而已,或者是和其他方案结合起来使用。

    2.4 两阶段聚合(局部聚合+全局聚合)

    实现原理:将原本相同的key通过附加随机前缀的方式,变成多个不同的key,就可以让原本被一个task处理的数据分散到多个task上去做局部聚合,进而解决单个task处理数据量过多的问题。接着去除掉随机前缀,再次进行全局聚合,就可以得到最终的结果。具体原理见下图。类似于hive中设置数据倾斜时负载均衡参数 hive.groupby.skewindata=true

    方案优点:对于聚合类的shuffle操作导致的数据倾斜,效果是非常不错的。通常都可以解决掉数据倾斜,或者至少是大幅度缓解数据倾斜,将Spark作业的性能提升数倍以上。

    方案缺点:仅仅适用于聚合类的shuffle操作,适用范围相对较窄。如果是join类的shuffle操作,还得用其他的解决方案。

    适用场景: 适用于聚合类的shuffle操作

    在这里插入图片描述
    示例代码如下:

    val logger: Logger = LoggerFactory.getLogger(this.getClass)
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Scala Word Count").setMaster("local")
        val sc = new SparkContext(conf)
    
        val list = List(
          "hello you hello hello me",
          "hello you hello hello are you ok",
          "oh hello hello world"
        )
        val listRDD = sc.parallelize(list)
        val pairsRDD = listRDD.flatMap(line => line.split("\\s+")).map((_, 1))
    
        //step 1 【可选】找到发生数据倾斜key
        val sampleRDD = pairsRDD.sample(false, 0.2)
        val cbk= sampleRDD.countByKey()
        val dsKeys = cbk.toBuffer.sortWith((t1, t2) => t1._2 > t2._2)
        println("================>sample,打印key的分布情况:")
        dsKeys.foreach(print(_))
        println()
    
        //step 2给RDD中的每个key都打上一个随机前缀。
        val prefixPairsRDD = pairsRDD.map{case(word,count) => {
            val random = new Random()
            val prefix = random.nextInt(3)
            (s"${prefix}_${word}", count)
        }}
        //step 3 对打上随机前缀的key进行局部聚合。
        val partAggrInfo = prefixPairsRDD.reduceByKey(_+_)
        println("===============>局部聚合之后的结果:")
        partAggrInfo.foreach(println)
        //step 4 全局聚合
        //step 4.1 去掉前缀
        val unPrefixPairRDD = partAggrInfo.map{case (word, count) => {
            (word.substring(word.indexOf("_") + 1), count)
        }}
        println("================>去掉随机前缀之后的结果:")
        unPrefixPairRDD.foreach(println)
        // step 4.2 全局聚合
        val fullAggrInfo = unPrefixPairRDD.reduceByKey(_+_)
        println("===============>全局聚合之后的结果:")
        fullAggrInfo.foreach(println)
        sc.stop()
      }
    

    2.5 将reduce join转为map join

    实现原理:如果一个RDD是比较小的,采用广播小RDD全量数据+map算子来实现与join同样的效果,也就是map join,此时就不会发生shuffle操作,也就不会发生数据倾斜。类似于hive 大小表Join时:使用map join让小表先进内存,在map端完成reduce。

    方案优点:对join操作导致的数据倾斜,效果非常好,因为根本就不会发生shuffle,也就根本不会发生数据倾斜。

    方案缺点:适用场景较少,因为只适用于一个大表和一个小表的情况。
    因为需要将小表进行广播,会比较消耗内存资源,driver和每个Executor内存中都会驻留一份小RDD的全量数据。
    如果广播出去的RDD数据比较大,比如10G以上,那么就可能发生内存溢出了。因此并不适合两个都是大表的情况。

    适用场景: 适用于一个大表和一个小表(几百MB以内)的情况

    示例代码如下:

    def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Scala Word Count").setMaster("local")
        val sc = new SparkContext(conf)
    
        val students = List(
          "1 张三 1",
          "2 李四 1",
          "3 赵美丽 2",
          "4 赵六 1",
          "5 王五 1",
          "6 jasen 1"
        )
    
        val cls = List(
          "1 2000级1班",
          "2 2000级2班",
          "3 2000级3班",
          "4 2000级4班",
          "7 2000级5班"
        )
    
        val stuRDD = sc.parallelize(students)
    
        //step 1 Broadcast小表
        // 将较小RDD中的数据直接通过collect算子拉取到Driver端的内存中来,
        // 然后使用Spark的广播功能,将小RDD的数据转换成广播变量,这样每个Executor就只有一份RDD的数据。
    	// 可以尽可能节省内存空间,并且减少网络传输性能开销。
        val map = cls.map(line => {
          val cols = line.split("\\s+")
          (cols(0), cols(1))
        }).toMap
        val clsMapBC: Broadcast[Map[String, String]] = sc.broadcast(map)
    
        stuRDD.map { case line => {
            //在算子函数中,通过广播变量,获取到本地Executor中的rdd1数据, 将
            val map = clsMapBC.value
            val cols = line.split("\\s+")
            val cid = cols(2)
            val className = map.getOrElse(cid, "UnKnown")
            s"${cols(0)}\t${cols(1)}\t${className}"
          }
        }.foreach(println)
        sc.stop()
      }
    

    2.6 采样倾斜key并分拆join操作

    实现原理:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,可以将少数几个key分拆成独立RDD,并附加随机前缀打散成n份去进行join,此时这几个key对应的数据就不会集中在少数几个task上,而是分散到多个task进行join了。具体原理见下图。

    方案优点:对于join导致的数据倾斜,如果只是某几个key导致了倾斜,采用该方式可以用最有效的方式打散key进行join。而且只需要针对少数倾斜key对应的数据进行扩容n倍,不需要对全量数据进行扩容。避免了占用过多内存。

    方案缺点:如果导致倾斜的key特别多的话,比如成千上万个key都导致数据倾斜,那么这种方式也不适合。

    适用场景:两个RDD/Hive表进行join的时候,如果数据量都比较大,无法采用“解决方案五”,那么此时可以看一下两个RDD/Hive表中的key分布情况。如果出现数据倾斜,是因为其中某一个RDD/Hive表中的少数几个key的数据量过大,而另一个RDD/Hive表中的所有key都分布比较均匀,那么采用这个解决方案是比较合适的。

    具体实现思路如下:

    • 对包含少数几个数据量过大的key的那个RDD,通过sample算子采样出一份样本来,然后统计一下每个key的数量,计算出来数据量最大的是哪几个key。
    • 然后将这几个key对应的数据从原来的RDD中拆分出来,形成一个单独的RDD,并给每个key都打上n以内的随机数作为前缀,而不会导致倾斜的大部分key形成另外一个RDD。
    • 接着将需要join的另一个RDD,也过滤出来那几个倾斜key对应的数据并形成一个单独的RDD,将每条数据膨胀成n条数据,这n条数据都按顺序附加一个0~n的前缀,不会导致倾斜的大部分key也形成另外一个RDD。
    • 再将附加了随机前缀的独立RDD与另一个膨胀n倍的独立RDD进行join,此时就可以将原先相同的key打散成n份,分散到多个task中去进行join了。
    • 另外两个普通的RDD就照常join即可。
    • 最后将两次join的结果使用union算子合并起来即可,就是最终的join结果。
      在这里插入图片描述
      示例代码如下:
      def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Scala Word Count").setMaster("local")
        val sc = new SparkContext(conf)
    
        val list1 = List(
          "hello 1",
          "hello 2",
          "hello 3",
          "hello 4",
          "you 1",
          "me 2"
        )
    
        val list2 = List(
          "hello zhangsan 1",
          "hello lisi 1",
          "you wangwu 2",
          "me zhouqi 2"
        )
    
        val listRDD1 = sc.parallelize(list1).map(line => {
          val fields = line.split("\\s+")
          (fields(0), fields(1))
        })
        val listRDD2 = sc.parallelize(list2).map(line => {
          val fields = line.split("\\s+")
          (fields(0), fields(1))
        })
    
        val joinRDD: RDD[(String, (String, String))] = dataSkewRDDJoin(sc, listRDD1, listRDD2)
        println("最后进行join的结果:")
        joinRDD.foreach(println)
        sc.stop()
      }
    
      private def dataSkewRDDJoin(sc: SparkContext, listRDD1: RDD[(String, String)], listRDD2: RDD[(String, String)]) = {
        //假设rdd1中的部分key有数据倾斜,所以在进行join操作的时候,需要进行拆分计算
        //step 1 找到发生数据倾斜的key
        val dsKeys = listRDD1.sample(false, 0.6).countByKey().toList.sortWith((t1, t2) => t1._2 > t2._2).take(1).map(t => t._1)
        println("通过sample算子得到的可能发生数据倾斜的key:" + dsKeys)
    
        //step 2 对rdd1和rdd2中的数据按照dsKeys各拆分成两个部分
        //step 2.1 讲dataSkewKeys进行广播
        val dskBC = sc.broadcast(dsKeys)
        // step 2.2 进行拆分
        val dataSkewRDD1 = listRDD1.filter { case (word, value) => {
              val dsks = dskBC.value
              dsks.contains(word)
            }
        }
    
        val commonRDD1 = listRDD1.filter { case (word, value) => {
            val dsks = dskBC.value
            !dsks.contains(word)
          }
        }
        val dataSkewRDD2 = listRDD2.filter { case (word, value) => {
            val dsks = dskBC.value
            dsks.contains(word)
          }
        }
    
        val commonRDD2 = listRDD2.filter { case (word, value) => {
            val dsks = dskBC.value
            !dsks.contains(word)
          }
        }
    
        //step 3 对dataskewRDD进行添加N以内随机前缀
        // step 3.1 添加随机前缀
        val prefixDSRDD1: RDD[(String, String)] = dataSkewRDD1.map { case (word, value) => {
            val random = new Random()
            val prefix = random.nextInt(3)
            (s"${prefix}_${word}", value)
          }
        }
        // step 3.2 另一个rdd进行扩容
        val prefixDSRDD2: RDD[(String, String)] = dataSkewRDD2.flatMap { case (word, value) => {
            val ab = ArrayBuffer[(String, String)]()
            for (i <- 0 until 2) {
              ab.append((s"${i}_${word}", value))
            }
            ab
          }
        }
        println("---->有数据倾斜RDD1添加前缀成prefixDSRDD1的结果:" + prefixDSRDD1.collect().mkString(","))
        println("---->有数据倾斜RDD2扩容之后成prefixDSRDD2的结果:" + prefixDSRDD2.collect().mkString(","))
    
        // step 4 分步进行join操作
        // step 4.1 有数据倾斜的prefixDSRDD1和prefixDSRDD2进行join
        val prefixJoinDSRDD = prefixDSRDD1.join(prefixDSRDD2)
        //ste 4.2 无数据倾斜的commonRDD1和commonRDD2进行join
        val commonJoinRDD = commonRDD1.join(commonRDD2)
        // step 4.3 将随机前缀去除
        val dsJionRDD = prefixJoinDSRDD.map { case (word, (value1, value2)) => {
            (word.substring(2), (value1, value2))
          }
        }
        //step 5 将拆分进行join之后的结果进行union连接,得到最后的结果 sql union all
        val joinRDD = dsJionRDD.union(commonJoinRDD)
        joinRDD
      }
    

    2.7 使用随机前缀和扩容RDD进行join

    实现原理:将原先一样的key通过附加随机前缀变成不一样的key,然后就可以将这些处理后的“不同key”分散到多个task中去处理,而不是让一个task处理大量的相同key。该方案与“解决方案六”的不同之处就在于,上一种方案是尽量只对少数倾斜key对应的数据进行特殊处理,由于处理过程需要扩容RDD,因此上一种方案扩容RDD后对内存的占用并不大;而这一种方案是针对有大量倾斜key的情况,没法将部分key拆分出来进行单独处理,因此只能对整个RDD进行数据扩容,对内存资源要求很高。

    方案优点:对join类型的数据倾斜基本都可以处理,而且效果也相对比较显著,性能提升效果非常不错。

    方案缺点:该方案更多的是缓解数据倾斜,而不是彻底避免数据倾斜。而且需要对整个RDD进行扩容,对内存资源要求很高。

    适用场景:如果在进行join操作时,RDD中有大量的key导致数据倾斜,那么进行分拆key也没什么意义,此时就只能使用最后一种方案来解决问题了。

    方案实现思路:

    • 将该RDD的每条数据都打上一个n以内的随机前缀。
    • 同时对另外一个正常的RDD进行扩容,将每条数据都扩容成n条数据,扩容出来的每条数据都依次打上一个0~n的前缀。
    • 最后将两个处理后的RDD进行join即可。

    示例代码如下:

     def main(args: Array[String]): Unit = {
        val conf = new SparkConf().setAppName("Scala Word Count").setMaster("local")
        val sc = new SparkContext(conf)
    
        val list1 = List(
          "hello 1",
          "hello 2",
          "hello 3",
          "hello 4",
          "you 1",
          "me 2"
        )
    
        val list2 = List(
          "hello zhangsan 1",
          "hello lisi 1",
          "you wangwu 2",
          "me zhouqi 2"
        )
    
        val listRDD1 = sc.parallelize(list1).map(line => {
          val fields = line.split("\\s+")
          (fields(0), fields(1))
        })
        val listRDD2 = sc.parallelize(list2).map(line => {
          val fields = line.split("\\s+")
          (fields(0), fields(1))
        })
    
        val joinRDD: RDD[(String, (String, String))] = dataSkewRDDJoin(sc, listRDD1, listRDD2)
        println("最后进行join的结果:")
        joinRDD.foreach(println)
        sc.stop()
      }
    
      private def dataSkewRDDJoin(sc: SparkContext, listRDD1: RDD[(String, String)], listRDD2: RDD[(String, String)]) = {
    
        //将一个有数据倾斜key的RDD,每条数据都打上100以内的随机前缀。
        val mappedRDD: RDD[(String, String)] = listRDD1.map { case (word, value) => {
            val random = new Random()
            val prefix = random.nextInt(3)
            (s"${prefix}_${word}", value)
          }
        }
        // 将其中一个key分布相对较为均匀的RDD膨胀100倍。
        val expandedRDD: RDD[(String, String)] = listRDD2.flatMap { case (word, value) => {
            val ab = ArrayBuffer[(String, String)]()
            for (i <- 0 until 3) {
              ab.append((s"${i}_${word}", value))
            }
            ab
          }
        }
    
        val joinedRDD = mappedRDD.join(expandedRDD)
    
    
        // 将随机前缀去除
        val dsJionRDD = joinedRDD.map { case (word, (value1, value2)) => {
            (word.substring(2), (value1, value2))
          }
        }
        dsJionRDD
      }
    

    2.8 多种方案组合使用

    在实践中发现,很多情况下,如果只是处理较为简单的数据倾斜场景,那么使用上述方案中的某一种基本就可以解决。
    但是如果要处理一个较为复杂的数据倾斜场景,那么可能需要将多种方案组合起来使用。比如说,针对出现了多个数据倾斜环节的Spark作业,可以先运用解决方案一和二,预处理一部分数据,并过滤一部分数据来缓解;其次可以对某些shuffle操作提升并行度,优化其性能;最后还可以针对不同的聚合或join操作,选择一种方案来优化其性能。

    需要对这些方案的思路和原理都透彻理解之后,在实践中根据各种不同的情况,灵活运用多种方案,来解决自己的数据倾斜问题。

    3 总结

    大多数Spark作业的性能主要就是消耗在了shuffle环节,因为该环节包含了大量的磁盘IO、序列化、网络数据传输等操作。
    因此,如果要让作业的性能更上一层楼,就有必要对shuffle过程进行调优。

    注意: 影响一个Spark作业性能的因素,主要还是代码开发、资源参数以及数据倾斜,shuffle调优只能在整个Spark的性能调优中占到一小部分而已。

    因此务必把握住调优的基本原则,千万不要舍本逐末。

    参考

    Spark性能优化指南——高级篇
    spark调优之数据倾斜

    展开全文
  • 数据倾斜解读

    2021-10-25 13:36:59
    本文将为你深入浅出地讲解什么是Hive数据倾斜、数据倾斜产生的原因以及面对数据倾斜的解决方法,从而帮你快速完成工作! 什么是数据倾斜? 数据倾斜在 MapReduce 计算框架中经常发生。通俗理解,该现...


     

    背景

    我们日常使用HIVE SQL的时候可能会遇到这样一个令人苦恼的场景:执行一个非常简单的SQL语句,任务的进度条长时间卡在99%,不确定还需多久才能结束,这种现象称之为数据倾斜。这一现象出现的原因在于数据研发工程师主要关注分析逻辑和数据结果的正确性,却很少关注SQL语句的执行过程与效率。

    本文将为你深入浅出地讲解什么是Hive数据倾斜、数据倾斜产生的原因以及面对数据倾斜的解决方法,从而帮你快速完成工作!

    什么是数据倾斜?

    数据倾斜在 MapReduce 计算框架中经常发生。通俗理解,该现象指的是在整个计算过程中,大量相同的key被分配到了同一个任务上,造成“一个人累死、其他人闲死”的状况,这违背了分布式计算的初衷,使得整体的执行效率十分低下。

    数据倾斜后直观表现是任务进度长时间维持在99%(或100%),查看任务监控页面,发现只有少量(1个或几个)reduce子任务未完成。因为其处理的数据量和其他reduce差异过大,单一reduce的记录数与平均记录数差异过大,最长时长远大于平均时长。

    数据倾斜表现形式

    什么情况下会出现数据倾斜?

    日常工作中数据倾斜主要发生在Reduce阶段,而很少发生在 Map阶段,其原因是Map端的数据倾斜一般是由于 HDFS 数据存储不均匀造成的(一般存储都是均匀分块存储,每个文件大小基本固定),而Reduce阶段的数据倾斜几乎都是因为数据研发工程师没有考虑到某种key值数据量偏多的情况而导致的。

    Reduce阶段最容易出现数据倾斜的两个场景分别是Join和Count Distinct。有些博客和文档将Group By也纳入其中,但是我们认为如果不使用Distinct,仅仅对分组数据进行Sum和Count操作,并不会出现数据倾斜,作者的话这里也单独拉出来算作一种场景,毕竟也是会造成的。

    数据倾斜产生的原因?

    • key分布不均匀
    • 业务数据本身的特性
    • 建表时考虑不周
    • 某些SQL语句本身就有数据倾斜

    数据倾斜的解决方案?

    一、优先开启负载均衡

    -- map端的Combiner,默认为ture
    set hive.map.aggr=true;
    -- 开启负载均衡
    set hive.groupby.skewindata=true (默认为false)
    

    如果发生数据倾斜,我们首先需要调整参数,进行负载均衡处理,这样 MapReduce 进程则会生成两个额外的 MR Job,这两个任务的主要操作如下:

    第一步:MR Job 中Map 输出的结果集合首先会随机分配到 Reduce 中,然后每个 Reduce 做局部聚合操作并输出结果,这样处理的原因是相同的Group By Key有可能被分发到不同的 Reduce Job中,从而达到负载均衡的目的。第二步:MR Job 再根据预处理的数据结果按照 Group By Key 分布到 Reduce 中(这个过程可以保证相同的 Group By Key 被分布到同一个 Reduce 中),最后完成聚合操作。

    二、表join连接时引发的数据倾斜

    两表进行join时,如果表连接的key存在倾斜,那么在 Shuffle 阶段必然会引起数据倾斜。

    2.1 小表join大表,某个key过大

    通常做法是将倾斜的数据存到分布式缓存中,分发到各个Map任务所在节点。在Map阶段完成join操作,即MapJoin,这避免了 Shuffle,从而避免了数据倾斜。

    MapJoin是Hive的一种优化操作,其适用于小表JOIN大表的场景,由于表的JOIN操作是在Map端且在内存进行的,所以其并不需要启动Reduce任务也就不需要经过shuffle阶段,从而能在一定程度上节省资源提高JOIN效率。

    在Hive 0.11版本之前,如果想在Map阶段完成join操作,必须使用MAPJOIN来标记显示地启动该优化操作,由于其需要将小表加载进内存所以要注意小表的大小。

    -- 常规join
    SELECT
            pis.id_ id,
            service_name serviceName,
            service_code serviceCode,
            bd.code_text serviceType,
    FROM
        prd_price_increment_service pis
        left join prd_price_increment_product pip on pis.id_ = pip.increment_service_id
    
    -- Hive 0.11版本之前开启mapjoin
    -- 将小表prd_price_increment_service,别名为pis的表放到map端的内存中
    -- 如果想将多个表放到Map端内存中,只需在mapjoin()中写多个表名称即可,用逗号分隔,如将a表和c表放到Map端内存中,则 /* +mapjoin(a,c) */ 。
    SELECT
          /*+ mapjoin(pis)*/
            pis.id_ id,
            service_name serviceName,
            service_code serviceCode,
            bd.code_text serviceType,
    FROM
        prd_price_increment_service pis
        left join prd_price_increment_product pip on pis.id_ = pip.increment_service_id
    

    在Hive 0.11版本及之后,Hive默认启动该优化,也就是不在需要显示的使用MAPJOIN标记,其会在必要的时候触发该优化操作将普通JOIN转换成MapJoin,可以通过以下两个属性来设置该优化的触发时机:

    -- 自动开启MAPJOIN优化,默认值为true,
    set hive.auto.convert.join=true;
    -- 通过配置该属性来确定使用该优化的表的大小,如果表的大小小于此值就会被加载进内存中,默认值为2500000(25M),
    set hive.mapjoin.smalltable.filesize=2500000;
    SELECT
            pis.id_ id,
            service_name serviceName,
            service_code serviceCode,
            bd.code_text serviceType,
    FROM
        prd_price_increment_service pis
        left join prd_price_increment_product pip on pis.id_ = pip.increment_service_id
    
    -- 特殊说明
    使用默认启动该优化的方式如果出现莫名其妙的BUG(比如MAPJOIN并不起作用),就将以下两个属性置为fase手动使用MAPJOIN标记来启动该优化:
    -- 关闭自动MAPJOIN转换操作
    set hive.auto.convert.join=false;
    -- 不忽略MAPJOIN标记
    set hive.ignore.mapjoin.hint=false;
    SELECT
          /*+ mapjoin(pis)*/
            pis.id_ id,
            service_name serviceName,
            service_code serviceCode,
            bd.code_text serviceType,
    FROM
        prd_price_increment_service pis
        left join prd_price_increment_product pip on pis.id_ = pip.increment_service_id
    

    特说说明:将表放到Map端内存时,如果节点的内存很大,但还是出现内存溢出的情况,我们可以通过这个参数 mapreduce.map.memory.mb 调节Map端内存的大小。

    2.2 表中作为关联条件的字段值为0或空值的较多

    解决方式:给空值添加随机key值,将其分发到不同的reduce中处理。由于null值关联不上,所以对结果无影响。

    -- 方案一、给空值添加随机key值,将其分发到不同的reduce中处理。由于null值关联不上,所以对结果无影响。
    SELECT * 
    FROM log a left join users b 
    on case when a.user_id is null then concat('hive',rand()) else a.user_id end = b.user_id;
    
    -- 方案二:去重空值
    SELECT a.*,b.name
    FROM 
        (SELECT * FROM users WHERE LENGTH(user_id) > 1 OR user_id IS NOT NULL ) a
    JOIN
        (SELECT * FROM log WHERE LENGTH(user_id) > 1 OR user_id IS NOT NULL) B
    ON a.user_id; = b.user_id;
    

    2.3 表中作为关联条件的字段重复值过多

    SELECT a.*,b.name
    FROM 
        (SELECT * FROM users WHERE LENGTH(user_id) > 1 OR user_id IS NOT NULL ) a
    JOIN
        (SELECT * FROM (SELECT *,row_number() over(partition by user_id order by create_time desc) rk FROM log WHERE LENGTH(user_id) > 1 OR user_id IS NOT NULL) temp where rk = 1) B
    ON a.user_id; = b.user_id;
    

    三、空值引发的数据倾斜

    实际业务中有些大量的null值或者一些无意义的数据参与到计算作业中,表中有大量的null值,如果表之间进行join操作,就会有shuffle产生,这样所有的null值都会被分配到一个reduce中,必然产生数据倾斜。

    之前有小伙伴问,如果A、B两表join操作,假如A表中需要join的字段为null,但是B表中需要join的字段不为null,这两个字段根本就join不上啊,为什么还会放到一个reduce中呢?

    这里我们需要明确一个概念,数据放到同一个reduce中的原因不是因为字段能不能join上,而是因为shuffle阶段的hash操作,只要key的hash结果是一样的,它们就会被拉到同一个reduce中。

    -- 解决方案
    -- 场景:如日志中,常会有信息丢失的问题,比如日志中的 user_id,如果取其中的 user_id 和 用户表中的user_id 关联,会碰到数据倾斜的问题。
    -- 方案一:可以直接不让null值参与join操作,即不让null值有shuffle阶段,所以user_id为空的不参与关联
    SELECT * FROM log a JOIN users b  ON a.user_id IS NOT NULL   AND a.user_id = b.user_id UNION ALL SELECT * FROM log a WHERE a.user_id IS NULL; 
    
    -- 方案二:因为null值参与shuffle时的hash结果是一样的,那么我们可以给null值随机赋值,这样它们的hash结果就不一样,就会进到不同的reduce中:
    SELECT * FROM log a  LEFT JOIN users b ON CASE     WHEN a.user_id IS NULL THEN concat('hive_', rand())    ELSE a.user_id   END = b.user_id;
    

    特殊说明:针对上述方案进行分析,方案二比方案一效率更好,不但io少了,而且作业数也少了。方案一中对log读取可两次,jobs是2。方案二job数是1。本优化适合对无效id (比如 -99, ’’, null等) 产生的倾斜问题。把空值的 key 变成一个字符串加上随机数,就能把倾斜的数据分到不同的reduce上,解决数据倾斜问题。

    四、不同数据类型关联产生数据倾斜

    对于两个表join,表a中需要join的字段key为int,表b中key字段既有string类型也有int类型。当按照key进行两个表的join操作时,默认的Hash操作会按int型的id来进行分配,这样所有的string类型都被分配成同一个id,结果就是所有的string类型的字段进入到一个reduce中,引发数据倾斜。

    -- 如果key字段既有string类型也有int类型,默认的hash就都会按int类型来分配,那我们直接把int类型都转为string就好了,这样key字段都为string,hash时就按照string类型分配了:
    
    方案一:把数字类型转换成字符串类型
    SELECT * FROM users a  LEFT JOIN logs b ON a.usr_id = CAST(b.user_id AS string); 
    
    方案二:建表时按照规范建设,统一词根,同一词根数据类型一致
    

    五、count distinct 大量相同特殊值

    由于SQL中的Distinct操作本身会有一个全局排序的过程,一般情况下,不建议采用Count Distinct方式进行去重计数,除非表的数量比较小。当SQL中不存在分组字段时,Count Distinct操作仅生成一个Reduce 任务,该任务会对全部数据进行去重统计;当SQL中存在分组字段时,可能某些 Reduce 任务需要去重统计的数量非常大。在这种情况下,我们可以通过以下方式替换。

    -- 可能会造成数据倾斜的sql
    select a,count(distinct b) from t group by a;
    -- 先去重、然后分组统计
    select a,sum(1) from (select a, b from t group by a,b) group by a;
    -- 总结: 如果分组统计的数据存在多个distinct结果,可以先将值为空的数据占位处理,分sql统计数据,然后将两组结果union all进行汇总结算。
    

    六、数据膨胀引发的数据倾斜

    在多维聚合计算时,如果进行分组聚合的字段过多,且数据量很大,Map端的聚合不能很好地起到数据压缩的情况下,会导致Map端产出的数据急速膨胀,这种情况容易导致作业内存溢出的异常。如果log表含有数据倾斜key,会加剧Shuffle过程的数据倾斜。

    -- 造成倾斜或内存溢出的情况
    -- sql01
    select a,b,c,count(1)from log group by a,b,c with rollup;
    -- sql02
    select a,b,c,count(1)from log grouping sets a,b,c;
    
    -- 解决方案
    -- 可以拆分上面的sql,将with rollup拆分成如下几个sql
    select a,b,c,sum(1) from (
        SELECT a, b, c, COUNT(1) FROM log GROUP BY a, b, c
        union all
        SELECT a, b, NULL, COUNT(1) FROM log GROUP BY a, b 
        union all
        SELECT a, NULL, NULL, COUNT(1) FROM log GROUP BY a
        union all
        SELECT NULL, NULL, NULL, COUNT(1) FROM log
    ) temp;
    -- 结论:但是上面这种方式不太好,因为现在是对3个字段进行分组聚合,那如果是5个或者10个字段呢,那么需要拆解的SQL语句会更多。
    
    -- 在Hive中可以通过参数 hive.new.job.grouping.set.cardinality 配置的方式自动控制作业的拆解,该参数默认值是30。
    -- 该参数主要针对grouping sets/rollups/cubes这类多维聚合的操作生效,如果最后拆解的键组合大于该值,会启用新的任务去处理大于该值之外的组合。如果在处理数据时,某个分组聚合的列有较大的倾斜,可以适当调小该值。
    set hive.new.job.grouping.set.cardinality=10;
    select a,b,c,count(1)from log group by a,b,c with rollup;
    

    总结

    上文为你深入浅出地讲解什么是Hive数据倾斜、数据倾斜产生的原因以及面对数据倾斜的解决方法。概括而言,让Map端的输出数据更均匀地分布到Reduce中,是我们的终极目标,也是解决Reduce端倾斜的必然途径。

    在此过程中,掌握四点可以帮助我们更好地解决数据倾斜问题:

    1. 如果任务长时间卡在99%则基本可以认为是发生了数据倾斜,建议开发者调整参数以实现负载均衡:set hive.groupby.skewindata=true
    2. 小表关联大表操作,需要先看能否使用子查询,再看能否使用Mapjoin
    3. Join操作注意关联字段不能出现大量的重复值或者空值
    4. Count(distinct id ) 去重统计要慎用,尽量通过其他方式替换

    最后,作者的想法还是遇到问题再解决问题,不要过度设计,正常情况下按照sql研发规范编写,遇到数据倾斜的场景一般不多,即使遇到数据倾斜问题、且数据量不大,如对结果产出延迟在可接受范围,就不必过度重视。

    展开全文
  • 所谓“站在巨人的肩膀上”在这篇文章中就很好的体现出来了,本文的一些图和一些概念就是借鉴“巨人”发表的文章中的图和概念。这还得感谢这些“巨人”把我们领向一条光明大道。我不是摄影测量的专业人士,GIS圈也...

    所谓“站在巨人的肩膀上”在这篇文章中就很好的体现出来了,本文的一些图和一些概念就是借鉴“巨人”发表的文章中的图和概念。这还得感谢这些“巨人”把我们领向一条光明大道。我不是摄影测量的专业人士,GIS圈也只是个刚入门的菜鸟。请允许我从一个菜鸟的角度,来说说那些菜鸟入门级的倾斜摄影测量知识。 近些年,倾斜摄影测量在GIS圈掀起了一阵巨浪,有人说他颠覆传统的测绘领域,有人说他替代了传统建模方式。正因为倾斜摄影测量被带上了这么多光环,人们开始对他有形形色色的猜想,让它变得越来越神秘。就让咱们来看看倾斜摄影数据的效果图。 这里写图片描述

    我们先来看看倾斜摄影测量,和我们传统的影像有什么区别? 从数据采集的方式来看,传统影像是通过飞机上搭载的航摄仪对地面连续摄取相片,而后经过一系列的内业处理得到的影像数据,获取的成果只有地物俯视角度信息,也就是视角垂直于地面。而倾斜摄影测量测试通过飞机或无人机搭载5个相机从前、后、左、右、垂直五个方向对地物进行拍摄,再通过内业的几何校正、平差、多视影像匹配等一系列的处理得到的具有地物全方位信息的数据。简单理解就是,影像上地物是在一个平面的,倾斜摄影测量地物是具有真实高度的。 这里写图片描述

    我们知道了倾斜摄影数据采集的方式,通过倾斜摄影数据加工的关键技术,比如多视影像联合平差、多视影像关键匹配、数字表面模型生产和真正射影像纠正等,得到地表数据更多的侧面信息,加上内业数据处理,得到数据的三维模型。 下面,小伙伴们一起来揭开倾斜摄影测量数据神秘的面纱吧!倾斜摄影测量的数据本质上来看是mesh模型,什么是mesh模型呢?mesh模型就是网格面模型,它是点云通过一些算法,比如区域增长法、八叉树算法和波前算法等等构成的。而点云是在同一空间参考系下用来表示目标空间分布和目标表面特性的海量点集合。内业软件基于几何校正,联合平差等处理流程,可计算出基于影像的超高密度点云,如下图: 这里写图片描述 最后经过纹理映射构建真实三维模型,如下图: 这里写图片描述 下面我们说说倾斜摄影测量常用的处理软件。目前国内外比较流行的倾斜摄影自动建模软件如下: 1) 法国Acute3D公司的Smart3DCapture。基于图形运算单元GPU的快速三维场景运算软件,可运算生成基于真实影像的超高密度点云,它能无需人工干预地从简单连续影像中生成逼真的三维场景模型。 国内有多家重要的数据生产单位正在使用该软件。 2) 法国INFOTERRA公司的像素工厂(Pixel Factory)StreetFactory子系统通过对获得的倾斜影像进行几何处理、多视匹配、三角网构建,提取典型地物的纹理特征,并对该纹理进行可视化处理,最终得到三维模型。 3) 美国苹果公司收购C3公司所采用自动建模技术。 4) 美国Pictometry公司的Pictometry倾斜影像处理软件提供了EFSElectronic Field Study。 5) 国内有多家企业及单位对倾斜模型也有比较深入的研究,形成自己独特的模型工艺流程。 目前网络上可以方便找到Smart3DCapture Viewer,并且在Acute3D官网提供了示范的数据, 感兴趣的同学不妨下载试用,下载地址:http://www.acute3d.com/s3c-viewer/。 好,啰嗦了这么多,咱们言归正传,超图可以用倾斜摄影测量的数据做些啥呢? 倾斜摄影测量的数据格式很多,超图目前支持的是OSGB文件格式的倾斜模型,OSGB文件格式自带了多级金子塔的模型精度级别,为了充分利用其LOD结构,超图平台通过scp索引文件直接加载模型,SuperMap iDesktop 7C版本已经为用户提供了生成scp的工具,通过在场景中直接加载scp的方式,可以快速的在三维场景中显示不同精度的LOD层级。这里得说明下LOD这个概念,LOD(Level of Detail)是GIS平台提高性能的一个重要法宝,即对同一个数据从清晰到模糊有多层。当屏幕视角距离某个地物近时,软件自动调用最清晰层的数据;当屏幕视角远离该地物时,则自动切换为模糊层的数据。想在前年使用超图对接OSGB数据时,使用的还是数据导入,再生成模型缓存的方式,这种方式耗时长而且数据加载慢,这样做完全不利于用户体验。现在以这种scp直接加载数据的方式,真是好太多太多了。 倾斜摄影测量的数据有这么多的好处,比如浏览速度快、数据加载快、数据精度高、数据真实性高、人员成本低、和数据获取耗时短等等,但是用过的人都知道,单体化是倾斜摄影测量应用的一大问题,为什么这么说呢?咱们的倾斜摄影数据是一个整体,不像简单模型那样是一个或者同类对象组成的,所以咱们想要倾斜模型像简单模型那样能够进行查询、分析和编辑等功能,这就是倾斜模型存在的一个大问题。 那么什么叫单体化?我们有必要先了解下这个概念。简单的说单体化也就是获取或者分离一个场景中的单个或者一类对象。怎样获取或者分离呢?不同的人有不同的看法,比如说一些人认为应该用切割的方式来获取或者分离;而超图则是用矢量化的方式来获取或者分离,这主要是超图得益于二三维一体化的技术,将二三维的矢量数据与倾斜摄影测量数据相结合,实现了数据的单体化,有了这个法宝,什么查询,分析,编辑等等功能的实现就水到渠成了。小菜我就以这两种方式来谈谈单体化问题。 提倡切割的人认为,单体化单体化,不仅仅是要单体,而且还要实现分离的效果,最后把同类的给分离出来,完成分离之后给后期基于此数据做查询、分析和编辑等功能时提供方便。现在我们来看看切割出来的效果: 这里写图片描述

    上面两张图看来切割的方式展现出来的效果还蛮不错的,有条有理的,但是当你拉近到一定程度的时候效果是这样的: 这里写图片描述 这样看来切割的缺点就不言而喻了,出现的锯齿状是用户最不愿看到的,而且这也不是倾斜摄影该有的展示效果。这是效果问题,那么对于倾斜摄影的话,我们后续还要进行数据处理,切割之后的倾斜摄影要替换人工精细建模的时候, 如何才能把锯齿状边缘与精细模型的边缘对接上去,使它们能够达到无缝结合?要隐藏某种地物的时候露出锯齿状的空洞,该如何解决?而且这样切割之后,也抛弃了倾斜摄影数据自带的LOD的优点,导致GIS平台只能用按照普通模型的方法来构建LOD。这些都应该是 “切割工程师”需要考虑的。 说完了切割咱们来一起来探讨下矢量化的方式,何谓矢量化?矢量化则是用简单的面数据,通过贴附的方式展现在倾斜摄影模型数据上,在保证效果、不破坏原始数据和LOD的同时,最大的好处还在于它打通了基于三维的倾斜摄影与基于二维的矢量面之间的关键“关卡”,实现三维和二维GIS的完美一体化。现在小菜我就给大家带来大神做的矢量化的单体化的效果图: 这里写图片描述 看到了这样的效果,小伙伴们是怎样想的呢?上面咱们介绍了单体化的内容,现在咱们来说说矢量化解决了倾斜摄影数据的一个重大问题水面“破洞”现象,“破洞”是由于水面的高反射现象使得相机在获取水面信息的时候被“致盲”了,使得那片区域的信息不准确。下面咱们来看看修补前后的效果图: 这里写图片描述 这里写图片描述 超图弥补的还不止一个倾斜摄影数据本身的问题,比如树的还原度不高等问题。大家都知道相机是有拍摄盲区,比如立交桥的下面,建筑的屋檐下面,这些被遮挡的地方,而这盲区也是不可避免的,就像人的眼睛不能看到遮挡物后面的区域,这片区域就是我们所谓的盲区 。对于这个区域目前各个公司有不同的想法,目前听来,才疏学浅的我觉得,倾斜摄影测量+街景的想法很靠谱,既可以利用现有的资源,同时还能弥补互相的不足 ,不过街景数据的覆盖程度和数据更新的算法也是个问题,那就让我们拭目以待。下面这幅图就给大家说明了相机盲区和树的还原度问题。 这里写图片描述 在小菜我看来,倾斜数据的价值还需要挖掘,它能提供给我们的价值远不止目前看到的,随之数据精度的不断提高,采集速度的加快,有可能在未来的某一天,倾斜摄影测量的数据可能会成为人们了解现实环境的重要手段。 但是犹如上文提及到的一系列功能模块,我们也可以看到倾斜摄影的一些缺陷:比如倾斜摄影相机的盲点区域,比如立交桥下、房屋遮挡处;水面的“破洞”现象;不规则模型的还原度不高,比如说树等等,造成这些缺陷的主要原因还在于倾斜摄影相机的拍摄技巧、硬件、还原模型的计算方式等因素。在这里小菜我也希望大家一起努力的学习把倾斜摄影这个技术提升到一个新的高度,也让我们的数字化城市、数字化地球更加真实、漂亮、更具观赏性。未来是咱们的!
    本文转载自https://blog.csdn.net/supermapsupport/article/details/47129077

    展开全文
  • 检查数据倾斜分布

    2021-01-19 03:38:27
    从传统数据库迁移到GP中一个重要的且经常被开发人员忽略的概念是数据分布,没有良好的设计表的分布键会导致严重的性能问题,以下函数将给开发人员及DBA检测一个表的数据倾斜情况。 -- Function: gpmg.data_skew...
  • hive数据倾斜及处理

    2020-12-24 20:12:09
    数据倾斜的直白概念:数据倾斜就是数据的分布不平衡,某些地方特别多,某些地方又特别少,导致的在处理数据的时候,有些很快就处理完了,而有些又迟迟未能处理完,导致整体任务最终迟迟无法完成,这种现象就是数据...
  • 数据倾斜原理及解决方案

    万次阅读 多人点赞 2018-11-20 16:03:22
    在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念: 正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , ...
  • 什么是数据倾斜及数据倾斜是怎么产生 简单来说数据倾斜就是数据的key 的分化严重不均,造成一部分数据很多,一部分数据很少的局面。 举个 word count 的入门例子,它的map 阶段就是形成 (“aaa”,1)的形式,然后...
  • 基于深度学习的高中数学概念课教学探析——以人教版必修二《直线的倾斜角与斜率》为例.pdf
  • 在弄清什么是数据倾斜之前,我想让大家看看数据分布的概念: 正常的数据分布理论上都是倾斜的,就是我们所说的20-80原理:80%的财富集中在20%的人手中, 80%的用户只使用20%的功能 , 20%的用户贡献了80%的访问量 , ...
  • Cesium--倾斜摄影加载详细攻略

    万次阅读 热门讨论 2019-06-25 15:37:10
    几个概念重要的 摄影测量的格式 常见的有: osgb dae gltf b3dm …其他格式 这里我主要介绍OSGB格式的倾斜摄影测量 OSGB是OSG三维引擎定义的数据格式.OSG为二进制格式,然后贴上纹理图片就可以转换为OSGB 几大主流...
  • 倾斜摄影行业背景知识介绍

    千次阅读 2017-07-07 23:10:55
    概念和原理就不讲了,听得太多了。首先我们来聊一下倾斜摄影的行业发展,大家就是大概了解一下,出去跑客户的时候也是谈资嘛。 整个行业的发展差不多有十来年的发展历史吧:国外代表性的有苹果公司收购C3公司采用的...
  • 倾斜摄影是最近比较火的一个话题,但也引出了一些问题,有些需要倾斜摄影数据的单位或者公司,想和已有数据做结合,对外提需求就是,能否做到1:500的精度?我们现在有很多1:500的线画图,dlg图,能否做到完美结合...
  • Hive 数据倾斜 常用解决办法

    千次阅读 2020-05-24 20:22:04
    1.数据倾斜的原因 数据分布不均匀,造成数据热点问题 2.数据倾斜的现象 Hive任务进度长时间维持在99%或者100%的附近,进度好久没变化。通过查看任务监控页面Web,发现只有一个或者少数的reduce任务未完成,...
  • 一、概念 数据处理中的数据倾斜:个人理解,在数据处理的MapReduce程序中,由于数据的特殊性,数据中存在大量相同key的数据,根据业务需求需要对这个key进行分区操作(group by/join)时,在map的partition阶段将大...
  • kafka 平衡leade (倾斜)

    2021-08-23 23:13:53
    在kafkaManager Web 上查看leader Skewed(倾斜)在Broker节点为true,表示所有分区的leader在同一Broker上 为了避免这种不平衡,Kafka有一个首选副本的概念。如果一个分区的副本列表是11,12,13,节点11...
  • 7 Hive数据倾斜及处理

    2021-02-01 22:52:35
    Hive数据倾斜及处理① 数据倾斜概念及原因1.1 数据倾斜数据倾斜就是数据的分布不平衡,某些地方特别多,某些地方又特别少,导致的在处理数据的时候,有些很快就处理完了,而有些又迟迟未能处理完,导致整体任务...
  • 数据倾斜 概念 在大数据处理的过程中,出现数据分配不均匀,导致整体任务完成缓慢的现象 特点 分布式任务中,大部分任务均已完成,只有少部分卡在99%. 类似**木桶原理,**任务完成是时间取决于最后一个任务的完成...
  • 菜鸟都应该知道的倾斜摄影测量知识

    万次阅读 多人点赞 2015-07-29 15:54:12
    所谓“站在巨人的肩膀上”在这篇文章中就很好的体现出来了,本文的一些图和一些概念就是借鉴“巨人”发表的文章中的图和概念。这还得感谢这些“巨人”把我们领向一条光明大道。我不是摄影测量的专业人士,GIS圈也...
  • Hive的HQL语句及数据倾斜解决方案

    万次阅读 2016-06-14 23:05:39
    一、Hive的基本概念 在Hive中没有插入操作,但是可以通过load data批量导入数据文件。  Hive中分为内部表和外部表。  内部表:表数据存放在统一的/user/hive/warehouse目录下; drop表时会将表的数据及表的元信息...
  • 三维GIS:倾斜摄影技术倾斜摄影技术简介倾斜摄影模型处理海量数据单体化效果修补 倾斜摄影技术简介 倾斜摄影技术是国际测绘领域近些年发展起来的一项高新技术,它颠覆了以往正射影像只能从垂直角度拍摄的局限,通过...
  • 一:什么是数据倾斜 数据倾斜其根本原因是由于原始数据的分布不均匀,造成部分数据大量的集中在某一个节点上,形成了数据热点,导致这一节点运行时间远远大于其他节点的时间 二:Hive中有那些容易造成数据倾斜的情况...
  • 倾斜模型单体化的研究

    千次阅读 2019-03-13 10:52:11
    倾斜摄影三维建模及应用是近年来测绘领域关注的热点,产业链上下游的企业为此都在积极探索,以推动该项技术的健康发展和落地应用。然而,什么样的技术才是真正符合用户实际应用需求的?在这里,我们要和大家讲解倾斜...
  • OpenCV 文字识别(一):图像预处理 上一篇的博客中我使用了C++对印刷体汉字进行... 2.python矩阵在概念上要着重理解。写代码时踩了很多坑。 3.pycharm这个IDE很强大,一些语法上的不规范便有各色波浪线,强迫症福利。
  • 大数据篇--数据倾斜

    2021-05-31 20:59:59
    文章目录一、什么是数据倾斜 一、什么是数据倾斜   简单的讲,数据倾斜就是我们在计算数据的时候,数据的分散度不够,导致大量的数据集中到了一台或者几台机器上计算,造成数据热点问题(数据倾斜的另一种说法),...
  • Hive千亿级数据倾斜 数据倾斜问题 数据倾斜是分布式系统不可避免的问题,任何分布式系统都有几率发生数据倾斜; 只有数据达到一个量级时,一台机器应付不了这么多的数据,这时如果发生数据倾斜,那么最后就很难算出...
  • Spark 数据倾斜 join 调优

    千次阅读 2017-08-17 13:22:52
    原文出处:https://tech.meituan.com/spark-tuning-pro.html前言继基础篇讲解了每个Spark开发人员都...数据倾斜调优调优概述有的时候,我们可能会遇到大数据计算中一个最棘手的问题——数据倾斜,此时Spark作业的性能
  • 倾斜摄影三维建模及应用是近年来测绘 领域关注的热点,产业链上下游的企业为此都在积极探索,以推动该项技术的健康发展和落地应用。然而,什么样的技术才是真正符合用户实际应用需求的?在这 里,我们要和大家讲解...
  • 相控阵的概念可理解为用相位补偿的方式实现空间滤波,其基本假设之一是接收或发射信号为窄带的(带宽/载波频率<<1),当不满足该假设时,就会出现两种效应,其中之一称为波束倾斜(Beam Squint)123。 波束...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 22,267
精华内容 8,906
关键字:

倾斜的概念

友情链接: tmk_keyboard-master.zip