精华内容
下载资源
问答
  • 倾斜模型数据优化和处理 今天通过我的经验吧,给你们讲一下关于倾斜模型数据优化和处理问题,因为我也是也是小白,这几天测试得出的最后结论 一.服务器的改变 服务器最好安装两个,一个nginx和一个tomact这样实现...

    **

    倾斜模型数据优化和处理

    今天通过我的经验吧,给你们讲一下关于倾斜模型数据优化和处理问题,因为我也是也是小白,这几天测试得出的最后结论

    一.服务器的改变

    服务器最好安装两个,一个nginx和一个tomact这样实现动静分离,把静态的数据(前端)放在nginx上,因为nginx可以对B3DM数据压缩,吞吐量快一些,把动态(后端)的数据放在tomact,实现前端完全分离。

    二.数据的改变

    国内对数据本身优化最好的软件分为三个:
    1.超图
    2.cesiumlab
    3.图新
    先来介绍一下超图吧,超图是有自己专属的API,用官方的cesium.js,毕竟这个两家不同的公司,所以两个版本不兼容的,导致他转换的S3mb我这边不可以用,说实话。他这个真的香,对数据优化是真的可以,但是你必须用他们家JS文件,这个导致我对他家放弃的原因。
    再来说说cesium lab这个软件,你可以换这个手机号申请,他们体验一天的,才综合优化和跨平台优化,纹理压缩,具体操作的话你可以加他们群,然后询问管理员,但是他们家唯一的缺点转换就是太慢了,太吃内存,因为我的数据200G数据,数据太大,所以我放弃cesium lab
    **最后说一下图新吧,你可以使用它们家会员一个周,只要你手机号够多,你可以一直飘到最后,你可以先采用他们根节点合并,

    具体操作方法在这个连接里,这是图新官方API

    http://www.tuxingis.com/document/product/1/64/8/8-6/8-6-1.html

    在通过osg2cesiumApp v1.3

    注意:当你的界面 在1的位置自动生成数据和2的位置自动生成位置的时候,你才可以点击确定按钮

    进行转换倾斜模型数据,什么都要修改默认就好,然后等生成完之后。**

    重头戏来了

    你加载倾斜摄影数据的时候,打开浏览器的开发者模式,你在network—ALL 查看它加载的层级是13-22级,当镜头飞过的时候,它是先加载18级数据,然后根据你的相机高度调整加载的加载的层级增大还是减少。
    他这个数据属于二叉树结构类型,你的级别越大,他的数据量越大,导致你加载的数据越多,这就是造成卡顿的原因。
    其实你可以直接加载22级数据就行,但是这样会造成卡顿和卡屏现象,为了造成不卡顿数据,当用户缩小想看另一个数据的时候,把它变成了13级数据,这样减少内存的消耗,用户体验效果更佳。我的方法是保留的13 15 18 20 22 级,把其他级别的给删了。我删完之后做过对比,原来数据200G,删完之后他的的数据成80G,这样减少数据的容量。
    本人提个建议:你删的时候最好做个备份你再删
    在这里插入图片描述
    这是我删完加载的时候,14级他已经找不到了。
    删除文件:你主要删除Tile_+000_+002_L13_0.json类似的文件,因为所有的B3DM连接都是通过这个json串连接,当你在删除完这Tile_+000_+002_L13_0.json,你在加载数据你会发现,14 16 17 19 21 的层级的B3DM不加载。

    展开全文
  • MapReduce 数据倾斜以及解决思路 1.小背景 在mapreduce的分布式解决框架中,数据处理主要分为2个步骤,map阶段和reduce阶段 map阶段主要是数据转换,也就是按照预期把输入的数据进行转换,变为中间数据 中间数据再...

    MapReduce 数据倾斜以及解决思路

    1.小背景

    1. 在mapreduce的分布式解决框架中,数据处理主要分为2个步骤,map阶段和reduce阶段
    2. map阶段主要是数据转换,也就是按照预期把输入的数据进行转换,变为中间数据
    3. 中间数据再分发给reduce阶段作为输入数据源进行处理。
    4. 在这个过程中,由于数据本身按照设计的key规则划分时就不均匀,再加上使用默认的分区规则进行对key进行计算时,也有产生数据倾斜的可能(key.hashcode() % numberOfReduceTasks)

    2.大背景

    1. 为了解决传统计算机处理数据受限于单台计算机性能上限的问题,新的数据处理思路,分布式计算被提出来。
    2. 单台计算机处理,提升处理效率主要就是从硬件和软件上提升。硬件提升如单CPU多核心、多CPU多核心、内存容量增加例如部分服务器主板可以加内存到521GB甚至1TB、硬盘改为高速硬盘甚至固态硬盘,硬盘Raid技术等等。
    3. 分布式计算则换一种思路,打破单台计算机处理性能上限的问题。程序=数据+逻辑(算法/代码),如果能够让数据做切分,让代码也做切分,让后把切分后的数据和代码同时让多台计算机处理,这样就可以做到以往一台计算机需要处理十小时的任务,使用100台计算机可能几分钟就出结果的效果。
    4. 但分布式计算有一个大前提,需要针对分布式计算的特点,对程序要处理的数据和程序执行代码做处理,方便让程序和数据分发到计算机集群中进行并行执行。

    3.MapReduce简介

    1. Hadoop的mapreduce处理框架,结合hadoop自带的hdfs,就可以做到数据分布式存储,程序分布式计算的目标。
    2. mapreduce是一种编程框架,也可以看作是一种编程思路。就是把问题分成2个大的步骤,第一步,map阶段,就是把输入数据进行转换。转换后的数据称之为中间数据。第二步,reduce阶段,就是把map阶段产生的中间数据进行聚合处理,最后得出结果。
    3. 这里面有几个关键点,hdfs可以把数据进行分布式存储,例如一个200MB文件,默认就会分为0–128MB,128MB–200MB 2个数据块存储到hdfs文件系统中。每个数据块会有3个备份来保证数据安全性。这样就隐形带来一个好处,数据被切块了。
    4. map task处理任务,如果能够让集群中计算机分别处理一个文件的不同数据块,这样就可做到集群分布式并行计算。hdfs已经做到让数据分布式存储。剩下的就是怎样让程序分别处理不同的数据块。
    5. 其实很简单,就是安排一个协调者,指定某台计算机就只处理分布式文件系统中的某一块数据即可。
    6. map task阶段结束后,也会产生中间结果数据,这个数据没错,也是会存到分布式文件系统中的。这时候执行reduce task的计算机,也一样,处理对应数据块的数据即可。最后进行结果汇总就是最后的结果。

    4.数据倾斜

    1. 简单示意图如下:
      在这里插入图片描述
    2. 如图所示,数据倾斜主要就是在中间数据阶段,由于对数据分区处理不均匀,导致有些文件中数据量会非常大,有些文件数据量会少很多。这样一来,后续进行reduce task的计算机节点的任务量就会有很大差异。旱涝不同,雨露不均。

    5.解决思路

    1. 加内存和提升CPU性能,数据倾斜导致最明显一个问题就是个别或者部分计算机节点数据量处理过大,导致整体处理速度变慢。提升集群中计算机硬件水平,可以缓解这个问题,就是比较费钱。。。
    2. 提升并行度,可以看到,我们如果让中间数据文件数量增多,这样数据就会分不到更多文件,对于单个文件数据量过大问题就能得到较好缓解。hadoop中的中间文件数量默认和reduce task数量一致,所以增加reduce task数量可以缓解数据倾斜问题。不过增加并行度其实也受限于集群中计算机节点数量,除非愿意增加计算机物理节点数,这又回到费钱的方式上
    3. 既然数据倾斜是由中间数据分区导致的,那就干脆不执行reduce阶段,直接把数据输出到文件中,这样就不需要进行中间的数据分区操作。实际业务场景就是,启动一个mapreduce,但只是利用map阶段针对数据做清洗和转换。
    4. 既然中间文件数量受限于reduce task任务数,如果设置为1,文件就一个,也不存在数据倾斜问题,数据都写入到一个文件中去了。但这样不符合分布式计算的准则,会极大降低处理效率。
    5. 既然分区是由针对key的处理而产生的,我让key变得均匀散布,也就是把key打散。常见操作就是加一个随机数前缀或者后缀,这样一来,以往相同的key,由于新增了随机数前后缀,变得不一样,中间数据分区时,自然而然就会分到不同文件中去,数据倾斜就被很大缓解了。不过带来另外一个问题,需要对key打散后reduce聚合后数据再做一次mapreduce操作来得到最后数据。这时候可能还会有数据倾斜,但由于已经聚合过一次,数据量会降低很多,数据倾斜也会由较大缓解。
    6. 自定义shuffle分区算法,也就是换一种算法。既然默认的分区算法有问题,那我换一个算法来对数据分区即可。生活中类比来看就是,如果某方面大家都一样,那我就换一个标准和角度来重新划分,一定可以把物品相对均匀的分成相同多份。

    6.代码演示

    6.1输入数据源:

    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    a b a b a c hu jn jgh op a a a a b  c c c c a a a a a a a c c c c c c c c c c c a a a a 
    

    6.2不做key打散的mapreduce代码

    1. 代码
    package com.doit.hadoop.skew;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * @author hulc
     * @slogan: just do it
     * @date 2020/8/20 22:48
     */
    
    public class DataSkewDriver {
        public static void main(String[] args) {
    
            // 配置文件
            Configuration conf = new Configuration();
            conf.set("mapreduce.framework.name", "local");
    
            // job  注意由于引入了配置文件,需要设置为local模式运行
            try {
                Job skew = Job.getInstance(conf, "skew");
    
                // mapper 和reducer类
                skew.setMapperClass(SkewMapper.class);
                skew.setReducerClass(SkewReducer.class);
    
                // map输出的key value
                skew.setMapOutputKeyClass(Text.class);
                skew.setMapOutputValueClass(IntWritable.class);
    
                // reduce 输出的key 和value
                skew.setOutputKeyClass(Text.class);
                skew.setOutputValueClass(IntWritable.class);
    
                // 输入和输出数据源  E:\DOITLearning\8.Hadoop\mrdata\flow\input\dataskew.txt
                FileInputFormat.setInputPaths(skew, new Path("E:\\DOITLearning\\8.Hadoop\\mrdata\\flow\\input\\dataskew.txt"));
                FileOutputFormat.setOutputPath(skew, new Path("E:\\DOITLearning\\8.Hadoop\\mrdata\\flow\\skew_output1"));
    
                // reduce 任务数
                skew.setNumReduceTasks(2);
    
                // 启动任务
                boolean b = skew.waitForCompletion(true);
    
                if(b) {
                    System.out.println("success");
                } else {
                    System.out.println("failed");
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    class SkewMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        IntWritable mapValue = new IntWritable(1);
        Text mapKey = new Text();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            // 简单的统计单词个数
            String line = value.toString();
    
            String[] split = line.split("\\s+");
    
            for (String s : split) {
                mapKey.set(s);
                context.write(mapKey, mapValue);
            }
        }
    }
    
    class SkewReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable reduceValue = new IntWritable();
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // 聚合统计单词个数
            int count = 0;
            for (IntWritable value : values) {
                count ++;
            }
            reduceValue.set(count);
    
            context.write(key, reduceValue);
        }
    }
    
    1. 输出结果
      从结果看,数据倾斜其实比较严重,单词a和c数量过多,这样依赖,处理这些数据的reduce task的节点任务就比另外一个要重一些。主要看单词数据量对比,是7 -8倍的处理数据量的差距
      在这里插入图片描述

    6.3 增加reduce task并行度的结果

    1. 代码
    package com.doit.hadoop.skew;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    /**
     * @author hulc
     * @slogan: just do it
     * @date 2020/8/20 22:48
     */
    
    public class DataSkewDriver {
        public static void main(String[] args) {
    
            // 配置文件
            Configuration conf = new Configuration();
            conf.set("mapreduce.framework.name", "local");
    
            // job  注意由于引入了配置文件,需要设置为local模式运行
            try {
                Job skew = Job.getInstance(conf, "skew");
    
                // mapper 和reducer类
                skew.setMapperClass(SkewMapper.class);
                skew.setReducerClass(SkewReducer.class);
    
                // map输出的key value
                skew.setMapOutputKeyClass(Text.class);
                skew.setMapOutputValueClass(IntWritable.class);
    
                // reduce 输出的key 和value
                skew.setOutputKeyClass(Text.class);
                skew.setOutputValueClass(IntWritable.class);
    
                // 输入和输出数据源  E:\DOITLearning\8.Hadoop\mrdata\flow\input\dataskew.txt
                FileInputFormat.setInputPaths(skew, new Path("E:\\DOITLearning\\8.Hadoop\\mrdata\\flow\\input\\dataskew.txt"));
                FileOutputFormat.setOutputPath(skew, new Path("E:\\DOITLearning\\8.Hadoop\\mrdata\\flow\\skew_output2"));
    
                // reduce 任务数
                skew.setNumReduceTasks(5);
    
                // 启动任务
                boolean b = skew.waitForCompletion(true);
    
                if(b) {
                    System.out.println("success");
                } else {
                    System.out.println("failed");
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    class SkewMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        IntWritable mapValue = new IntWritable(1);
        Text mapKey = new Text();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            // 简单的统计单词个数
            String line = value.toString();
    
            String[] split = line.split("\\s+");
    
            for (String s : split) {
                mapKey.set(s);
                context.write(mapKey, mapValue);
            }
        }
    }
    
    class SkewReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable reduceValue = new IntWritable();
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // 聚合统计单词个数
            int count = 0;
            for (IntWritable value : values) {
                count ++;
            }
            reduceValue.set(count);
    
            context.write(key, reduceValue);
        }
    }
    
    
    1. 运行结果
      注意这里有一个文件是空的,还记得之前说的,中间数据分区时,是根据reducetask任务数。key.hashcode() % 5,如果这时候这些key的hashcode模除5没有等于1的,中间文件对应1这个文件夹就是空的,自然而然,reduce task处理这个文件得出的结果也会是空的。
      在这里插入图片描述

    6.3 对key进行打散处理(随机数)

    1. 代码,这里的reducetask设置为2,和初始时的代码一样,注意并没有设置为5,单一变量,方便做对比实验。
    package com.doit.hadoop.skew;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    import java.util.Random;
    
    /**
     * @author hulc
     * @slogan: just do it
     * @date 2020/8/20 22:48
     */
    
    public class DataSkewDriver {
        public static void main(String[] args) {
    
            // 配置文件
            Configuration conf = new Configuration();
            conf.set("mapreduce.framework.name", "local");
    
            // job  注意由于引入了配置文件,需要设置为local模式运行
            try {
                Job skew = Job.getInstance(conf, "skew");
    
                // mapper 和reducer类
                skew.setMapperClass(SkewMapper.class);
                skew.setReducerClass(SkewReducer.class);
    
                // map输出的key value
                skew.setMapOutputKeyClass(Text.class);
                skew.setMapOutputValueClass(IntWritable.class);
    
                // reduce 输出的key 和value
                skew.setOutputKeyClass(Text.class);
                skew.setOutputValueClass(IntWritable.class);
    
                // 输入和输出数据源  E:\DOITLearning\8.Hadoop\mrdata\flow\input\dataskew.txt
                FileInputFormat.setInputPaths(skew, new Path("E:\\DOITLearning\\8.Hadoop\\mrdata\\flow\\input\\dataskew.txt"));
                FileOutputFormat.setOutputPath(skew, new Path("E:\\DOITLearning\\8.Hadoop\\mrdata\\flow\\skew_output3"));
    
                // reduce 任务数
                skew.setNumReduceTasks(2);
    
                // 启动任务
                boolean b = skew.waitForCompletion(true);
    
                if(b) {
                    System.out.println("success");
                } else {
                    System.out.println("failed");
                }
    
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    }
    
    class SkewMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
        IntWritable mapValue = new IntWritable(1);
        Text mapKey = new Text();
    
        Random random = new Random();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
    
            // 简单的统计单词个数
            String line = value.toString();
    
            String[] split = line.split("\\s+");
    
            for (String s : split) {
                // 打散key时,为了让数据更加均匀,一般都是取reducetask的倍数进行随机数生成并拼接。这样打散后数据分配到各个区中的概率会相对平均一些
                mapKey.set(s+random.nextInt(4));
                context.write(mapKey, mapValue);
            }
        }
    }
    
    class SkewReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
        IntWritable reduceValue = new IntWritable();
    
        @Override
        protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
            // 聚合统计单词个数
            int count = 0;
            for (IntWritable value : values) {
                count ++;
            }
            reduceValue.set(count);
    
            context.write(key, reduceValue);
        }
    }
    
    
    1. 数据结果,注意这里来看,单词a作为key,加了随机数后缀后,确实被打散到了不同的文件中去,2个输出结果文件现在的数量看起来要均匀很多。
      在这里插入图片描述
    2. 注意key打散之后,需要对打散后的数据做二次mapreduce聚合处理,但这个时候数据倾斜已经有了较大缓解了。

    总而言之,数据倾斜的来源是为了方便分布式计算而对中间数据做切分,但切分代码并不是所有场景下都能相对均匀切分,这时候就会出现多分数据,但有些数据多,有些数据少。这样后续的计算几点处理数据时,运行负载就会有差异,所以需要想办法进行重新划分,其实就是负载均衡的一种体现

    展开全文
  • 数据倾斜问题是分布式架构的重要难题,它破坏了MPP架构中各个节点对等的要求,导致单节点(倾斜节点)所存储或者计算的数据量远大于其他节点,所以会造成以下危害: 存储上的倾斜会严重限制系统容量,在系统容量不...

    什么是数据倾斜?

    数据倾斜问题是分布式架构的重要难题,它破坏了MPP架构中各个节点对等的要求,导致单节点(倾斜节点)所存储或者计算的数据量远大于其他节点,所以会造成以下危害: 存储上的倾斜会严重限制系统容量,在系统容量不饱和的情况下,由于单节点倾斜的限制,使得整个系统容量无法继续增长。

    FLINK中,如何定位数据倾斜?

    1、进入flink-webUI界面

     

    2、哪类算子易出现数据倾斜?

     

     3、为什么keyedProcess易出现数据倾斜?

            1)非KeyProcess的分区策略(默认shuffle 或 forward):

            rebalance(以轮循方式均匀地分布到下一个操作)
            shuffle(输出元素均匀随机地打乱到下一个操作)

                    forward(并行分区传输到下一个操作算子)

                    rescale(如果上游操作的并行度为2,而下游操作的并行度为4,那么一个上游操作将把元素分配给两个下游操作,而另一个上游操作将分配给另外两个下游操作)

                    global 输出值都转到下一个处理操作符的第一个实例

            2)如果非KeyedProcess的默认分区策略不满足需求,还可以调用partitionCustom自定义分区。

            3)keyProcess的分区策略:

                    KeyGroupRangeAssignment的assignToKeyGroup 这个api决定的分区策略。

                    简而言之:你的key,经过hash再hash之后,除以分区数取余得到数据的分区号。

                    所以如果你的key本身分布就不是均匀的,那么大概率会出现倾斜问题。

    4、数据倾斜现象(选择keyStream算子,查看subTasks,观察数据量是否均匀)

     5、优化方案

            1)keyProcess的分区数选择为质数,因为质数取余更均匀。

            2)key再针对细分(没有key状态计算的场景可以如此用,如果是key状态计算,比如某类商品的windowCount,要么容忍倾斜,要么细分状态后再用算子聚合状态计算)

                   比如数据为: 

                        

            Tuple3.of("a","东京","m")

            Tuple3.of("a","东京1","m1")       

            Tuple3.of("a","东京","m2")

            Tuple3.of("a","东京1","m")   

            Tuple3.of("b","东京1","m1")

            Tuple3.of("b","东京","m2")  

    如果你选择的key是tuple.f1,那么出来两个分区,分区数据量分别为 4和2。但是如果选择为tuple.f1+tuple.f2 那么分区数据量分别为:2,2,1,1。数据倾斜便没有那么严重。

    6、进阶解决方案:

    其实我们没有必要生成的key一定是数据本身的key,例如第五步的例子,即使你采取第二种解决方案,出来的key也是(a,"东京")这种样式,但如果你在生成key的时候,把这类key转换为指定范围内的数字(前提是没有key状态计算,只是保证同类key顺序执行你的process),例如:

     这样你的key相对而已,就是在一个可控的范围内,比如范围是1-1000,而你的分区数是127,这种情况下,分区相对而言会均匀很多。

    当然,最优!最优的解决方案,是你按权重去对key进行分配,这种方案需要你对key的权重比较了解,再针对性使用例如switch这种方式去给对应的权重数据分配指定的key。

        

        

    展开全文
  • 大型的互联网服务中对于redis的使用,往往采用集群架构,通过横向扩展redis实例规模的方式,以较低的成本,来提升缓存系统对数据请求的处理效率和数据存储容量。 redis集群架构虽然有众多优点,但是事物往

    在服务端系统服务开发中,缓存是一种常用的技术,它可以提高系统对请求的处理效率,而redis又是缓存技术栈中的一个佼佼者,广泛的应用于各种服务系统中。在大型互联网服务中,每天需要处理的请求和存储的缓存数据都是海量的,在这些大型系统中,使用单实例的redis,很难满足系统超高的并发请求以及海量数据缓存需求。大型的互联网服务中对于redis的使用,往往采用集群架构,通过横向扩展redis实例规模的方式,以较低的成本,来提升缓存系统对数据请求的处理效率和数据存储容量。

    redis集群架构虽然有众多优点,但是事物往往都是有双面性的,在解决某些场景下的问题后,又会在另外的场景下有带来问题。当然redis集群模式,也是符合这个规律的。redis集群模式下,会增加运维的复杂度,限制了redis中某些命令的使用(范围查询,事务操作的使用等),除了这些常见的问题外,数据倾斜问题,是redis集群模式下一个比较隐蔽的问题,只有在一些特殊的场景下才会出现,但是该问题带来的影响确实巨大的。

    一、什么是数据倾斜

       在redis集群模式下,数据会按照一定的分布规则分散到不同的实例上。如果由于业务数据特殊性,按照指定的分布规则,可能导致不同的实例上数据分布不均匀,如以下场景:有些切片实例上数据分布量较大,有些实例上数据分布量较少;有些实例上保存了热点数据,数据访问量较大,有些实例上保存数据相对较"冷",几乎没有访问量。那么存储数据量大的实例,或者保存热点数据的实例,资源利用率会比较高,负载压力较大,导致其对数据请求响应变慢。此时就产生了数据倾斜。

    二、有哪几种数据倾斜

    通常来说,数据倾斜分为以下两种:数据量倾斜数据访问倾斜

    三、数据量倾斜

    数据量倾斜产生的根本原因是:数据在各个redis实例上分布不均匀。

    产生数据分布不均匀的情况主要有以下三种。

    1) 存在bigkey

    当一个实例上存储了一个或者多个bigkey时,例如一个占用空间很大的string类型的数据,或者保存了一个包含很多数据元素的集合时,那么会导致该实例的内存资源消耗严重。而且对于bigkey的操作通常会造成实例IO线程阻塞,使该实例对数据请求处理效率严重下降。

    对于bigkey造成的数据倾斜问题,一个根本的解决方案就是,避免bigkey的产生。常用的做法是对bigkey进行"化整为零",在业务层面生成数据的时候,尽量避免把过多的数据保存到同一个键中。具体可以参考 redis 使用规范

    2) HashTag使用不当

    HashTag是使用redis的一个小技巧,使用方式是 在 key中添加一对花括号 {},这个 {} 可以将key的一部分内容包裹起来,而redis server会对于加上{}的key进行识别,并进行特殊处理。使用HashTag可以增强业务层面对redis集模式下,key分区的控制。具体来说:正常情况下,客户端根据key的完整内容,按照CRC16算法生成一个CRC16值,redis-server按照这个CRC16值,给这个key分配slot,而是用HashTag后,客户端计算key的CRC16值,不在是整个key的内容,而是{}括起来的那部分部分。使用HashTag,可以让我们根据业务属性,在key的适当位置添加{},让业务相关的一些key存储到同一个slot中,同时也就分配到同一个实例上,当相关的数据全部分配到同一个实例上后,就可以执行实例内事务操作(目前redis 集群不支持跨实例事务操作)和范围查询等先关操作了。

    虽然,HashTag,有利于实现跨实例的事务操作和范围查询,但是带来的风险就是,所有业务相关的的数据,都要放到同一个实例上,如果这个业务相关的数据量比较大的话,就会导致,业务所在实例的内存资源消耗严重。产生数据倾斜问题。如果使用 Hash Tag 进行切片的数据会带来较大的访问压力,就优先考虑避免数据倾斜,最好不要使用 Hash Tag 进行数据切片。因为事务和范围查询都还可以放在客户端来执行,而数据倾斜会导致实例不稳定,造成服务不可用

    3)slot分配不均匀

    在redis 集群中,slot是数据在多个redis实例间分配的的基本单元。在一个redis集群中,一共有16384个slot,这些slot会分配到集群中的实例上。如果slot在实例上分配不均匀的话,使某个或者某些实例上分配过多的slot,那么这些实例上被分配数据量也会更多一些,也就会产生类似HashTag一样的数据倾斜问题。

    对于由于slot分配不均导致的数据倾斜问题,可以在slot分配前,通过人工分配的方式,来避免多个slot分配到同一个实例的问题,如果一个集群中slot已经分配完毕,可以通过redis提供的运维工具和相应的运维命令,来查看redis集群中,slot的分配情况,如果存在某些实例上slot分配过多的情况,那么可以将这些过多的slot转移到其他实例上。

    四、数据访问倾斜

    除了数据量过多导致的数据倾斜问题外,如果实例上存在热点数据,如电商秒杀场景中的商品信息。那么会导致所在实例数据请求访问量远高于其他实例,产生数据访问倾斜。

    对于热点数据问题,通常采用的解决方案是多副本冗余。具体做法:我们把热点数据复制多份,在每一个数据副本的 key 中增加一个随机前缀,让它和其它副本数据不会被映射到同一个 Slot 中。这样一来,热点数据既有多个副本可以同时服务请求,同时,这些副本数据的 key 又不一样,会被映射到不同的 Slot 中。在给这些 Slot 分配实例时,我们也要注意把它们分配到不同的实例上,那么,热点数据的访问压力就被分散到不同的实例上了。这样可以有效解决数据访问倾斜问题。但是这种多副本的方式,仅仅适用于对热点数据的读操作,而对于存在写操作的热点数据,就不太适用了,因为对于写操作需要保证多个副本的数据一致性,而保证数据一致性,是一个比较复杂的问题,业务上需要较大的额外开销。

    五、总结

    以集群方式使用redis时,数据倾斜问题,是一个很难发现的问题,当出现数据请求响应变慢时,有可能是数据倾斜问题导致的,这个时候,可以参考上文提到的一些可能产生数据倾斜问题的场景进行排查,然后根据对应的方案进行问题的解决。

    展开全文
  • 数据处理中的数据倾斜:个人理解,在数据处理的MapReduce程序中,由于数据的特殊性,数据中存在大量相同key的数据,根据业务需求需要对这个key进行分区操作(group by/join)时,在map的partition阶段将大数据量的...
  • A表:单副本14.9G,1002354875条数据 ...在sql结尾使用distribute by rand() ,reduce个数88个,但是还是有数据倾斜 select * from A full outer join B on a.partition_path =b.join_path distribute by r...
  • Hive数据倾斜问题总结

    千次阅读 2017-11-25 21:45:55
    Hive数据倾斜问题总结1、MapReduce数据倾斜Hive查询最终转换为MapReduce操作,所以要先了解MapReduce数据倾斜问题。MapReduce程序执行时,reduce节点大部分执行完毕,但是有一个或者几个reduce节点运行很慢,导致...
  • hive或者MR处理数据,不怕数据量大,就怕倾斜。hive里大表join的时候,数据倾斜就是个很头疼的问题。(用该用小表去join大表) 参考: hive join 数据倾斜 真实案例
  • Spark调优 数据倾斜

    2019-04-26 13:28:00
    Spark中的数据倾斜问题主要指shuffle过程中出现的数据倾斜问题,是由于不同的key对应的数据量不同导致的不同task所处理的数据量不同的问题。 例如,reduce点一共要处理100万条数据,第一个和第二个task分别被分配到...
  • 1.Hive数据倾斜优化分为哪两类?2.Hive开发中,为何会出现倾斜?3.Hive倾斜本文有哪些解决方案?实际搞过离线数据处理的同学都知道,Hive SQL 的各种优化方法都是和数据倾斜密切相关的,所以我会先来聊一聊 “「数据...
  • hive 数据倾斜实际问题中总结

    千次阅读 2016-10-25 13:16:14
    1.数据倾斜原因 a.大表(2.8G)与小表关联(580K) b.大表(2.8G)与大表(3.0G)关联 首先谈论大表与小表的关联导致数据倾斜问题  实例如下: 大表数据格式: 小表数据格式: 关联语句 hive>...
  • 提高reduce端并行度并没有从根本上改变数据倾斜的本质和问题(方案一和方案二从根本上避免了数据倾斜的发生),只是尽可能地去缓解和减轻shuffle reduce task的数据压力,以及数据倾斜的问题,适用于有较多key对应的...
  •  balance工具的运行过程中,迭代的将文件块从高使用率的datanode移动到低使用率的datanode上,每一个迭代过程中移动的数据量不超过下面两个值的较小者:10G或者指定阀值*容量,且每次迭代不超过20分钟。每次迭代...
  • Hadoop Streaming 中的数据倾斜坑 1. 背景 最近用 hadoop streaming 跑一个数据集,不算大,每小时150G左右,但是每次耗时特别长,而且基本是卡在了reduce 98%的地方。 &nbsp; &nbsp;&nbsp; 看了下...
  • 内容摘要 硬件技术的不断发展推动了数据处理系统的更新迭代在计算方面现代 CPU 多核众核技术的成熟让大多数据处理系统选择采用数据分区的方式实现 并行计算从而充分发挥CPU 的性能在存储方面大容量内存集群让数据处 ...
  • hive 数据倾斜

    2017-03-09 10:25:45
    1.数据倾斜原因 a.大表(2.8G)与小表关联(580K) b.大表(2.8G)与大表(3.0G)关联 首先谈论大表与小表的关联导致数据倾斜问题  实例如下: 大表数据格式: 小表数据格式: ...
  • 倾斜摄影数据发布

    2021-09-27 14:41:04
    数据处理与开发应用一站式解决方案 技术服务:数据中心 + 桌面端 + 移动端(APP) + WEB端 关 键 词:地图 导航 定位 编辑 开发 分析 智慧 可视化 (以上产品均可立即下载体验) 以技术孵化产品 以产品打动用户 ...
  • 前端面试题

    万次阅读 多人点赞 2019-08-08 11:49:01
    typeof运算符返回值中有一个跟javascript数据类型不一致,它是________”function”_________。 68 定义了一个变量,但没有为该变量赋值,如果alert该变量,javascript弹出的对话框中显示___undefined______ 。 68...
  • 数据倾斜时负载均衡,当选项设定为true,生成的查询计划会有两个MRJob。第一个MRJob 中,Map的输出结果集合会随机分布到Reduce中,每个Reduce做部分聚合操作,并输出结果,这样处理的结果是相同的GroupBy Key有可能...
  • 小白最近很长一段时间,都遇到了大数据量,JOB运行慢的问题,看一些优化方法的时候经常提起spark的SQL语句执行过程,对于没有认真研究过SPARK的运行过程的小白来说,看的一知半解,为了打破这个情况,小白认真学习了...
  • 数据倾斜就是数据的 key 的分化严重不均,造成一部分 Reduce 数据很多,一部分数据 Reduce 很少的负载不均衡的问题。 比如:(盗图) 产生数据倾斜的原因 分组:group by 维度过小,某值的数量过多 去重...
  • task会通过所在节点的BlockManager获取数据,BlockManager发现数据不在本地时,户通过网络传输组件从数据所在节点的BlockManager处获取数据。 网络传输数据的情况是我们不愿意看到的,大量的网络传输会严重影响...
  • Dremio 数据湖以及数据仓库一. 数据湖和数据仓库什么是数据湖?数据湖的目的和优势什么数据仓库?数据湖和数据仓库之间差异数据湖引擎二. Dremio分离数据和计算使用基于Apache Arrow的查询引擎加速—并节省90%DREMIO...
  • DDR,DDR2,DDR3,DDR4,LPDDR区别

    万次阅读 多人点赞 2019-12-22 22:47:42
    目前内存的读写基本都是连续的,因为与CPU交换的数据量以一个Cache Line(即CPU内Cache的存储单位)的容量为准,一般为64字节。而现有的Rank位宽为8字节(64bit),那么就要一次连续传输8次,这就涉及到我们也经常能...
  • final:最终聚合 keys:分组的字段,如果没有分组,则没有此字段 outputColumnNames:聚合之后输出列名 Statistics:表统计信息,包含分组聚合之后的数据条数,数据大小等 Reduce Output Operator:输出到reduce操作...
  • 如下所示: 同样类似于数据库,当单表数据大于500W的时候需要对其进行分库分表,当数据量很大的时候(标准可能不一样,要看Redis服务器容量)我们同样可以对Redis进行类似的操作,就是分库分表。 假设,我们有一个...
  • 2020年数据术语的故事

    2021-01-21 00:00:00
    点击上方蓝色字体,选择“设为星标”回复”资源“获取更多资源2020年整个技术圈子要说话题最多的,应该是大数据方向。新感念层出不穷,数据湖概念就是其中之一。这篇文章是关于数据仓库、数据湖、...
  • 浅析Redis分布式集群倾斜问题

    千次阅读 2019-08-14 17:45:28
    对于分布式系统而言,整个集群处理请求的效率和存储容量,往往取决于集群中响应最慢或存储增长最快的节点。所以在系统设计和容量规划时,我们尽量保障集群中各节点的“数据和请求分布...
  • 关系型数据库本身比较容易成为系统性能瓶颈,单机存储容量、连接数、处理能力等都很有限,数据库本身的“有状态性”导致了它并不像Web和应用服务器那么容易扩展。在互联网行业海量数据和高并发访问的考验下,聪明的...
  • 数据架构:概念与冷热分离 公众号:程序员架构进阶 一 概述 上一篇文章数据架构:概念与冷热分离中介绍了数据架构的概念和意义。并抛出了数据冷热分离的问题。事实上,这并不是新的概念,各公司在很早之前就...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 5,532
精华内容 2,212
关键字:

倾斜数据容量