精华内容
下载资源
问答
  • 目录手动设置多个ReduceTask在idea中启动MapReduce手动设置Combiner通过jar包在linux终端执行 手动设置多个ReduceTask 先来看只有一个ReduceTask时的词频统计的结果 当没有手动设置ReduceTask的数量时,默认只有一个...

    手动设置多个ReduceTask

    先来看只有一个ReduceTask时的词频统计的结果
    当没有手动设置ReduceTask的数量时,默认只有一个reduceTask

    数据为
    在这里插入图片描述

    package Demo.mr.WordCount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    public class WordCountMapper extends Mapper<LongWritable, Text,Text, IntWritable> {
        @Override
        protected void map(LongWritable key,Text value,Context context) throws IOException,InterruptedException{
            String datas = value.toString();
            String[] split = datas.split(",");
            for (String data : split){
                context.write(new Text(data),new IntWritable(1));
            }
        }
    }
    
    
    package Demo.mr.WordCount;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    import java.io.IOException;
    
    public class WordCountReducer extends Reducer<Text, IntWritable,Text,IntWritable> {
        @Override
        protected void reduce(Text key,Iterable<IntWritable> values,Context context) throws IOException,InterruptedException{
            int sum=0;
            for(IntWritable val:values){
                sum=sum + val.get();
            }
            context.write(key,new IntWritable(sum));
        }
    }
    
    
    package Demo.mr.WordCount;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class WordCountDriver {
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            //向yarn申请一个job任务用于执行mapreduce程序
            Job job = Job.getInstance(new Configuration());
            //设置入口类
            job.setJarByClass(WordCountDriver.class);
            //设置mapper类
            job.setMapperClass(WordCountMapper.class);
            //设置reduce类
            job.setReducerClass(WordCountReducer.class);
            //设置Mapper类的输出
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            //设置reduce类的输出
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            //设置要处理的文件
            FileInputFormat.addInputPath(job,new Path("hdfs://master:9000/data/words.txt"));
            //设置输出路径
            FileOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/output"));
            //启动
            job.waitForCompletion(true);
        }
    }
    
    在idea中启动MapReduce

    这里是因为给出路径时,“hdfs://master:9000/data/words.txt” 直接连接到了hdfs中的文件路径,所以可以在idea中直接运行
    在这里插入图片描述

    结果为
    在这里插入图片描述
    part-r-00000的内容为
    hadoop 4
    hive 2
    java 3
    python 2
    word 2

    然后手动设置ReduceTask的数量为2
    在WordCountDriver类,也就是主方法中设置,只需要一条语句 job.setNumReduceTasks(2);

    public class WordCountDriver {
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            Job job = Job.getInstance(new Configuration());
            job.setJarByClass(WordCountDriver.class);
    
            //设置reduceTask数量
            job.setNumReduceTasks(2);
    
            job.setMapperClass(WordCountMapper.class);
            job.setReducerClass(WordCountReducer.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            FileInputFormat.addInputPath(job,new Path("hdfs://master:9000/data/words.txt"));
            FileOutputFormat.setOutputPath(job,new Path("hdfs://master:9000/output"));
            job.waitForCompletion(true);
        }
    }
    

    Map端和Reduce端的代码不需要改变

    看一下结果
    在这里插入图片描述
    这里就会发现,每个ReduceTask都会产生一个自己的结果文件,这里的两个ReduceTask分别产生了part-r-00000以及part-r-00001文件
    分别打开这两个文件

    part-r-00000的内容为
    hadoop 4

    part-r-00000的内容为
    hive 2
    java 3
    python 2
    word 2

    这里的part-r-00000以及part-r-00001两个文件的内容合在一起才是上面单个ReduceTask任务结果文件的内容。
    这里是因为不同key的键值对的partition值不一样,因此会被传入不同的reduceTask中

    简单的测试一下partiton

    package Demo.mr;
    
    public class Test {
        public static void main(String[] args) {
            int h = "hadoop".hashCode();
            System.out.println(h%2);
    
            int h1 = "hive".hashCode();
            int h2 ="java".hashCode();
            int h3 ="python".hashCode();
            int h4 = "word".hashCode();
            System.out.println(h1%2);
            System.out.println(h2%2);
            System.out.println(h3%2);
            System.out.println(h4%2);
        }
    }
    
    

    在这里插入图片描述
    会发现hadoop的值,与剩下四个hive,java,python,word都不相同
    所以key为hadoop的键值对单独进入一个reduceTask里面,然后计算后被输出在当前reduceTask对应的结果文件part-r-00000里面

    key为hive,java,python,word的这些键值对会被送往另一个接收partition值为0的reduceTask中,然后被输出在文件part-r-00001里面

    手动设置Combiner

    Combiner类如果不自己定义的话,默认的shuffle过程中是不会combine的

    先来看看没有combine的执行情况

    package Demo.mr;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class Gender {
        public static class GenderMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String[] split = line.split(",");
                if("男".equals(split[3])){
                    context.write(new Text(split[3]),new IntWritable(1));
                }
            }
        }
    
        public static class GenderReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                context.write(key,new IntWritable(sum));
            }
        }
    
        public static void main(String[] args) throws Exception{
            Job job = Job.getInstance();
            job.setNumReduceTasks(2);
            job.setJobName("class age sum");
            job.setJarByClass(ClazzAgeSum.class);
    
            //map端
            job.setMapperClass(GenderMapper.class);
            job.setOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            //reduce端
            job.setReducerClass(GenderReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            //指定输入输出路径
            Path input = new Path("/data/students.txt");
            FileInputFormat.addInputPath(job,input);
            Path output = new Path("/output");
    
            FileSystem fs = FileSystem.get(new Configuration());
    
            if(fs.exists(output)){
                fs.delete(output,true);
            }
            FileOutputFormat.setOutputPath(job,output);
    
            //启动job
            job.waitForCompletion(true);
    
        }
    }
    
    

    在这里插入图片描述

    通过jar包在linux终端执行

    这里没有给出确切的hdfs的文件位置,所以不能像上面设置多个reduceTask的代码一样直接在idea里面运行,需要打成jar包然后传到linux里面用命令运行
    在这里插入图片描述
    hadoop jar /usr/local/jar/hdfs-1.0-SNAPSHOT.jar Demo.mr.Gender
    格式为 hadoop   jar   jar的位置   jar包里面具体执行的类名

    运行过程的信息如图所示
    在这里插入图片描述
    再来看看写了combine的情况

    需要写具体的Combiner类,还要在主方法里面加上一句job.setCombinerClass(CombineReducer.class);

    package Demo.mr;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class Gender {
    	//Map端
        public static class GenderMapper extends Mapper<LongWritable,Text,Text,IntWritable> {
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                String[] split = line.split(",");
                if("男".equals(split[3])){
                    context.write(new Text(split[3]),new IntWritable(1));
                }
            }
        }
    
    	//Combiner预聚合
        public static class CombineReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                context.write(key,new IntWritable(sum));
            }
        }
    
    	//Reduce端
        public static class GenderReducer extends Reducer<Text,IntWritable,Text,IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                context.write(key,new IntWritable(sum));
            }
        }
    
        public static void main(String[] args) throws Exception{
            Job job = Job.getInstance();
            job.setNumReduceTasks(2);
            job.setJobName("class age sum");
            job.setJarByClass(ClazzAgeSum.class);
            job.setMapperClass(GenderMapper.class);
            job.setOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            //combine端 预聚合
            job.setCombinerClass(CombineReducer.class);
    
            job.setReducerClass(GenderReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
            Path input = new Path("/data/students.txt");
            FileInputFormat.addInputPath(job,input);
            Path output = new Path("/output");
            FileSystem fs = FileSystem.get(new Configuration());
            if(fs.exists(output)){
                fs.delete(output,true);
            }
            FileOutputFormat.setOutputPath(job,output);
            job.waitForCompletion(true);
    
        }
    }
    
    

    写好后重新打包,打包的时候继续双击package即可,会自动覆盖原先的旧的jar包,然后再重新上传
    在这里插入图片描述
    之前没有设置Combiner时,红线标出来的地方 Combine的input和output后面的值都为0,说明没有combine过程。这里设置Combiner后,就有值了,说明经过了combine过程

    再来看一下具体的数据,Combine input records读取的数据量为507,而上面几行的Map output records的值同样为507。Combine就相当于一个发生在reduce之前的reduce端,接收一个MapTask输出的值进行combine过程后,等待map的shuffle阶段结束,将不同map的combine输出结果传送到对应的reduceTask那里

    展开全文
  • 一、MapTask运行机制详解以及Map任务的并行度 整个Map阶段流程大体如上图所示。简单概述:inputFile通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理,数据被map...

    一、MapTask运行机制详解以及Map任务的并行度

     

    整个Map阶段流程大体如上图所示。简单概述:inputFile通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理,数据被map处理结束之后交给OutputCollector收集器,对其结果key进行分区(默认使用hash分区),然后写入buffer,每个map task都有一个内存缓冲区,存储着map的输出结果,当缓冲区快满的时候需要将缓冲区的数据以一个临时文件的方式存放到磁盘,当整个map task结束后再对磁盘中这个map task产生的所有临时文件做合并,生成最终的正式输出文件,然后等待reduce task来拉数据。

    详细步骤:

    1、首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。默认情况下split与block的对应关系默认是一对一。

    2、将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行文本内容。

    3、读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行用户重写的map调用一次,并输出一个<key,value>。

    4、Map输出的数据会写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。

    环形缓冲区其实是一个数组,数组中存放着key、value的序列化数据和key、value的元数据信息,包括partition、key的起始位置、value的起始位置以及value的长度。环形结构是一个抽象概念。

    缓冲区是有大小限制,默认是100MB。当map task的输出结果很多时,就可能会撑爆内存,所以需要在一定条件下将缓冲区中的数据临时写入磁盘,然后重新利用这块缓冲区。这个从内存往磁盘写数据的过程被称为Spill,中文可译为溢写。这个溢写是由单独线程来完成,不影响往缓冲区写map结果的线程。溢写线程启动时不应该阻止map的结果输出,所以整个缓冲区有个溢写的比例spill.percent。这个比例默认是0.8,也就是当缓冲区的数据已经达到阈值(buffer size * spill percent = 100MB * 0.8 = 80MB),溢写线程启动,锁定这80MB的内存,执行溢写过程。Map task的输出结果还可以往剩下的20MB内存中写,互不影响。

    5、合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。

    至此map整个阶段结束。

     

    mapTask的一些基础设置配置(mapred-site.xml当中社会):

    设置一:设置环型缓冲区的内存值大小(默认设置如下)

    mapreduce.task.io.sort.mb:100

    设置二:设置溢写百分比(默认设置如下)

    mapreduce.map.sort.spill.percent:0.80

     设置三:设置溢写数据目录(默认设置)

    mapreduce.cluster.local.dir:${hadoop.tmp.dir}/mapred/local

     设置四:设置一次最多合并多少个溢写文件(默认设置如下)

    mapreduce.task.io.sort.factor:10

     

    二、 ReduceTask 工作机制以及reduceTask的并行度

     

    Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作,纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。

    详细步骤:

    1、Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。

    2、Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。

    3、合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。

    4、对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

    5、map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。

    MapReduce提供Partitioner接口,它的作用就是根据key或value及reduce的数量来决定当前的这对输出数据最终应该交由哪个reduce task处理。默认对key hash后再以reduce task数量取模。默认的取模方式只是为了平均reduce的处理能力,如果用户自己对Partitioner有需求,可以订制并设置到job上。

    6、当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。

     

    如果job设置过Combiner,那么现在就是使用Combiner的时候了。将有相同key的key/value对的value加起来,减少溢写到磁盘的数据量。Combiner会优化MapReduce的中间结果,所以它在整个模型中会多次使用。

    哪些场景才能使用Combiner呢?从这里分析,Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等(求平均值绝不能用Combiner)。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。

    MapReduce总体工作机制

     

     

     

    三、shuffle

    核心机制:数据分区,排序,分组,ComBine,合并等过程)。

    shuffle是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段。一般把从Map产生输出开始到Reduce取得数据作为输入之前的过程称作shuffle

     

    维度一、流程维度(从Map输出到Reduce输入)

     

    维度二,内存维度(从Map输出到Reduce输入)

     

    Shffle阶段的内存与流程的关系

    1.Collect阶段:将MapTask的结果输出到默认大小为100M(可以修改)的环形缓冲区,保存的是key/value,Partition分区信息等。

    2.Spill阶段:当内存中的数据量达到一定的阀值的时候,就会将数据写入本地磁盘,在将数据写入磁盘之前需要对数据进行一次排序的操作,如果配置了combiner,还会将有相同分区号和key的数据进行排序。

    3.Merge阶段:把所有溢出的临时文件进行一次合并操作,以确保一个MapTask最终只产生一个中间数据文件。

    4.Copy阶段:ReduceTask启动Fetcher线程到已经完成MapTask的节点上复制一份属于自己的数据,这些数据默认会保存在内存的缓冲区中,当内存的缓冲区达到一定的阀值的时候,就会将数据写到磁盘之上。

    5.Merge阶段:在ReduceTask远程复制数据的同时,会在后台开启两个线程对内存到本地的数据文件进行合并操作。

    6.Sort阶段:在对数据进行合并的同时,会进行排序操作,由于MapTask阶段已经对数据进行了局部的排序,ReduceTask只需保证Copy的数据的最终整体有效性即可。

    Shuffle中的缓冲区大小会影响到mapreduce程序的执行效率,原则上说,缓冲区越大,磁盘io的次数越少,执行速度就越快

    缓冲区的大小可以通过参数调整,  参数:mapreduce.task.io.sort.mb  默认100M

     

    Combiner

    Map结束后,在Map端进行局部聚和。

    作用:较少网络传入次数,降低了网络开销。

    哪些场景才能使用Combiner?

    Combiner的输出是Reducer的输入,Combiner绝不能改变最终的计算结果。Combiner只应该用于那种Reduce的输入key/value与输出key/value类型完全一致,且不影响最终结果的场景。比如累加,最大值等(求平均值绝不能用Combiner)。Combiner的使用一定得慎重,如果用好,它对job执行效率有帮助,反之会影响reduce的最终结果。 

    Split

    Split对数据进行逻辑切分

    hdfs数据块大小是128 ,split 逻辑切分数据块大小是128M

    HDFS 128 是存储层面的概念,是切分数据的分界点。

    split 128 是一个逻辑切分。

    这两个128没有关系。

     

    四、MapReduce参数优化

     

    MapReduce重要配置参数

    1 资源相关参数

    以下调整参数都在mapred-site.xml这个配置文件当中有

    //以下参数是在用户自己的mr应用程序中配置就可以生效

    (1) mapreduce.map.memory.mb: 一个Map Task可使用的资源上限(单位:MB),默认为1024。如果Map Task实际使用的资源量超过该值,则会被强制杀死。

    (2) mapreduce.reduce.memory.mb: 一个Reduce Task可使用的资源上限(单位:MB),默认为1024。如果Reduce Task实际使用的资源量超过该值,则会被强制杀死。

    (3) mapred.child.java.opts  配置每个map或者reduce使用的内存的大小,默认是200M

     (4) mapreduce.map.cpu.vcores: 每个Map task可使用的最多cpu core数目, 默认值: 1

    (5) mapreduce.reduce.cpu.vcores: 每个Reduce task可使用的最多cpu core数目, 默认值: 1

    virtual 虚拟的

     

    //shuffle性能优化的关键参数,应在yarn启动之前就配置好

    (6)mapreduce.task.io.sort.mb   100         //shuffle的环形缓冲区大小,默认100m

    (7)mapreduce.map.sort.spill.percent   0.8    //环形缓冲区溢出的阈值,默认80%

     

    //应该在yarn启动之前就配置在服务器的配置文件中才能生效

    以下配置都在yarn-site.xml配置文件当中配置

    (8) yarn.scheduler.minimum-allocation-mb   1024   给应用程序container分配的最小内存

    (9) yarn.scheduler.maximum-allocation-mb   8192 给应用程序container分配的最大内存

    (10) yarn.scheduler.minimum-allocation-vcores 1 container最小的虚拟内核的个数

    (11)yarn.scheduler.maximum-allocation-vcores 32 container最大的虚拟内核的个数

    (12)yarn.nodemanager.resource.memory-mb   8192  每个nodemanager给多少内存

     

    2 容错相关参数

    (1) mapreduce.map.maxattempts: 每个Map Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

    (2) mapreduce.reduce.maxattempts: 每个Reduce Task最大重试次数,一旦重试参数超过该值,则认为Map Task运行失败,默认值:4。

    (3) mapreduce.job.maxtaskfailures.per.tracker: 当失败的Map Task失败比例超过该值为,整个作业则失败,默认值为0. 如果你的应用程序允许丢弃部分输入数据,则该该值设为一个大于0的值,比如5,表示如果有低于5%的Map Task失败(如果一个Map Task重试次数超过mapreduce.map.maxattempts,则认为这个Map Task失败,其对应的输入数据将不会产生任何结果),整个作业仍认为成功。

     (5) mapreduce.task.timeout: Task超时时间,默认值为600000毫秒,经常需要设置的一个参数,该参数表达的意思为:如果一个task在一定时间内没有任何进入,即不会读取新的数据,也没有输出数据,则认为该task处于block状态,可能是卡住了,也许永远会卡主,为了防止因为用户程序永远block住不退出,则强制设置了一个该超时时间(单位毫秒)。如果你的程序对每条输入数据的处理时间过长(比如会访问数据库,通过网络拉取数据等),建议将该参数调大,该参数过小常出现的错误提示是“AttemptID:attempt_14267829456721_123456_m_000224_0 Timed out after 300 secsContainer killed by the ApplicationMaster.”。

    3 本地运行mapreduce 作业

    设置以下几个参数: file:///

    mapreduce.framework.name=local

    mapreduce.jobtracker.address=local

    fs.defaultFS=local

    4 效率和稳定性相关参数

    (1) mapreduce.map.speculative: 是否为Map Task打开推测执行机制,默认为true,如果为true,如果Map执行时间比较长,那么集群就会推测这个Map已经卡住了,会重新启动同样的Map进行并行的执行,哪个先执行完了,就采取哪个的结果来作为最终结果,一般直接关闭推测执行

    (2) mapreduce.reduce.speculative: 是否为Reduce Task打开推测执行机制,默认为true,如果reduce执行时间比较长,那么集群就会推测这个reduce已经卡住了,会重新启动同样的reduce进行并行的执行,哪个先执行完了,就采取哪个的结果来作为最终结果,一般直接关闭推测执行

     (3) mapreduce.input.fileinputformat.split.minsize: FileInputFormat做切片时的最小切片大小,默认为0

     

     

    Map端的最高效率是:尽量减少环形缓冲区flflush的次数(减少磁盘IO 的使用次数)

       如何能够减少环形缓冲区flflush的次数:

          1、加大环形缓冲区的内存

          2、增大缓冲区阈值的大小 (考虑剩余的空间是不是够系统使用)

          3、对输出的进行压缩(压缩-解压的过程会消耗CPU)

     

    Reduce端的最高效率是:

       尽量减少环形缓冲区flflush的次数

          尽量将所有的数据在内存中计算

     

    在网络带宽、磁盘IO是瓶颈的前提下

       能不使用IO 网络就不使用,在必须使用的前提下,能少用就少用。

       所有的,只要能够减少网络带宽的开销,只要能够减少磁盘io的使用的次数的配置项,都是集群调优的可选项。

    (可选项包括: 软件层面【系统软件和集群软件】,硬件层面,网络层面)

    展开全文
  • MapTask与ReduceTask详解

    2020-08-17 21:03:58
    1、MapReduce的分区与reduceTask的数量 1.1 MapReduce的分区 在MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个reduce当中进行处理,例如我们为了数据的统计,我们可以把一批类似的数据发 送到同...

    1、MapReduce的分区与reduceTask的数量

    1.1 MapReduce的分区

    在MapReduce中,通过我们指定分区,会将同一个分区的数据发送到同一个reduce当中进行处理,例如我们为了数据的统计,我们可以把一批类似的数据发 送到同一个reduce当中去,在同一个reduce当中统计相同类型的数据,就可以实现类似数据的分区,统计等。

    1.2 reduceTask的数量

    reducetask的数量通过我们自己手动指定。
    如指定3个reducetask。 job.setNumReduceTasks(3);

    2、MapTask运行机制详解以及Map任务的并行度

    2.1 MapTask运行机制

    1.首先,读取数据组件InputFormat(默认TextInputFormat)会通过getSplits方法对输入目录中文件进行逻辑切片规划得到splits,有多少个split就对应启动多少个MapTask。split与block的对应关系默认是一对一。

    2.将输入文件切分为splits之后,由RecordReader对象(默认LineRecordReader)进行读取,以\n作为分隔符,读取一行数据,返回<key,value>。Key表示每行首字符偏移值,value表示这一行文本内容。

    3.读取split返回<key,value>,进入用户自己继承的Mapper类中,执行用户重写的map函数。RecordReader读取一行这里调用一次。

    4.map逻辑完之后,将map的每条结果通过context.write进行collect数据收集。在collect中,会先对其进行分区处理,默认使用HashPartitioner。

    5.接下来,会将数据写入内存,内存中这片区域叫做环形缓冲区,缓冲区的作用是批量收集map结果,减少磁盘IO的影响。我们的key/value对以及Partition的结果都会被写入缓冲区。当然写入之前,key与value值都会被序列化成字节数组。

    6.当溢写线程启动后,需要对这80MB空间内的key做排序(Sort)。排序是MapReduce模型默认的行为,这里的排序也是对序列化的字节做的排序。

    7.合并溢写文件:每次溢写会在磁盘上生成一个临时文件(写之前判断是否有combiner),如果map的输出结果真的很大,有多次这样的溢写发生,磁盘上相应的就会有多个临时文件存在。当整个数据处理结束之后开始对磁盘中的临时文件进行merge合并,因为最终的文件只有一个,写入磁盘,并且为这个文件提供了一个索引文件,以记录每个reduce对应数据的偏移量。
    至此map整个阶段结束。

    3、ReduceTask 工作机制以及reduceTask的并行度

    3.1 Reduce的三个阶段

    Reduce大致分为copy、sort、reduce三个阶段,重点在前两个阶段。
    copy阶段包含一个eventFetcher来获取已完成的map列表,由Fetcher线程去copy数据,在此过程中会启动两个merge线程,分别为inMemoryMerger和onDiskMerger,分别将内存中的数据merge到磁盘和将磁盘中的数据进行merge。
    待数据copy完成之后,copy阶段就完成了,开始进行sort阶段,sort阶段主要是执行finalMerge操作。
    纯粹的sort阶段,完成之后就是reduce阶段,调用用户定义的reduce函数进行处理。

    3.2 详细步骤

    1、Copy阶段,简单地拉取数据。Reduce进程启动一些数据copy线程(Fetcher),通过HTTP方式请求maptask获取属于自己的文件。
    2、Merge阶段。这里的merge如map端的merge动作,只是数组中存放的是不同map端copy来的数值。Copy过来的数据会先放入内存缓冲区中,这里的缓冲区大小要比map端的更为灵活。merge有三种形式:内存到内存;内存到磁盘;磁盘到磁盘。默认情况下第一种形式不启用。当内存中的数据量到达一定阈值,就启动内存到磁盘的merge。与map 端类似,这也是溢写的过程,这个过程中如果你设置有Combiner,也是会启用的,然后在磁盘中生成了众多的溢写文件。第二种merge方式一直在运行,直到没有map端的数据时才结束,然后启动第三种磁盘到磁盘的merge方式生成最终的文件。
    3、合并排序。把分散的数据合并成一个大的数据后,还会再对合并后的数据排序。
    4、对排序后的键值对调用reduce方法,键相等的键值对调用一次reduce方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到HDFS文件中。

    展开全文
  • ReduceTask阶段

    2019-04-19 23:12:41
    ReduceTask阶段1.图解2.数据输出介绍3.设置ReduceTask并行度(个数)4.注意事项 1.图解 Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接...

    1.图解

    在这里插入图片描述
    Copy阶段:ReduceTask从各个MapTask上远程拷贝一片数据,并针对某一片数据,如果其大小超过一定阈值,则写到磁盘上,否则直接放到内存中。
    Merge阶段:在远程拷贝数据的同时,ReduceTask启动了两个后台线程对内存和磁盘上的文件进行合并,以防止内存使用过多或磁盘上文件过多。
    Sort阶段:按照MapReduce语义,用户编写reduce()函数输入数据是按key进行聚集的一组数据。为了将key相同的数据聚在一起,Hadoop采用了基于排序的策略。由于各个MapTask已经实现对自己的处理结果进行了局部排序,因此,ReduceTask只需对所有数据进行一次归并排序即可。
    Reduce阶段:reduce()函数将计算结果写到HDFS上。

    2.数据输出介绍

    OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了 OutputFormat接口。
    文本输出TextOutputFormat:默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。
    SequenceFileOutputFormat:将SequenceFileOutputFormat输出作为后续 MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩
    自定义OutputFormat:为了实现控制最终文件的输出路径和输出格式,可以自定义OutputFormat。

    3.设置ReduceTask并行度(个数)

    ReduceTask的并行度同样影响整个job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,Reducetask数量的决定是可以直接手动设置:

    //默认值是1,手动设置为4
    job.setNumReduceTasks(4);
    

    4.注意事项

    1. ReduceTask=0,表示没有Reduce阶段,输出文件个数和Map个数一致
    2. ReduceTask默认值就是1,所以输出文件个数为一个
    3. 如果数据分布不均匀,就有可能在Reduce阶段产生数据倾斜
    4. ReduceTask数量并不是任意设置,还要考虑业务逻辑需求,有些情况下,需要计算全局汇总结果,就只能有1个ReduceTask
    5. 具体多少个ReduceTask,需要根据集群性能而定
    6. 如果分区数不是1,但是ReduceTask为1,是否执行分区过程。答案是:不执行分区过程。因为在MapTask的源码中,执行分区的前提是先判断reduceNum个数是否大于1。不大于1肯定不执行

    版权声明:本博客为记录本人自学感悟,转载需注明出处!
    https://me.csdn.net/qq_39657909

    展开全文
  • MapTask,ReduceTask,MapReduce运行机制详解

    千次阅读 2019-11-16 16:14:05
      ...这篇博客,主要针对MapTask与ReduceTask运行机制的一个详解与MapReduce总体运行机制做一个较为详细的介绍! MapTask运行机制详解以及Map任务的并行度     &nb...
  • MapReduce分区和reduceTask的数量 1.MapReduce分区:相同key的数据发送到同一个reduce里面去。 mapTask处理的是文件切片filesplit。 注意:block的概念是在hdfs当中的,mapreduce当中,每一个mapTask处理的数据都是...
  • MapReduce的分区与ReduceTask的数量

    千次阅读 2019-11-20 08:37:37
    MapReduce的分区与ReduceTask的数量 在MapReduce中,通过指定分区,会将同一个分区的数据发送到同一个reduce中,例如为了数据的统计,可以把一批类似的数据发 送到同一个reduce当中去,在同一个reduce中统计相同类型...
  • MapReduce的自定义分区与ReduceTask数量

    千次阅读 2019-11-14 20:13:43
    本篇博客小菌为大家带来的是MapReduce的自定义分区与ReduceTask内容的分享(ReduceMap具体计算流程见《MapReduce中shuffle阶段概述及计算任务流程》)。         在MapReduce...
  • 如果 ReduceTask 数量过多,一个 ReduceTask 会产生一个结果文件,这样就会生成很多小文件,那么如果这些结果文件会作为下一个 Job 的输入,则会出现小文件需要进行合并的问题,而且启动和初始化ReduceTask 需要耗费...
  • Hadoop之ReduceTask工作机制 目录 设置ReduceTask并行度(个数) 注意 实验:测试reducetask多少合适 ReduceTask工作机制 1. 设置ReduceTask并行度(个数) reducetask的并行度同样影响整个job的执行并发度和...
  • ReduceTask 工作机制

    2020-08-12 13:11:09
    ReduceTask 工作机制 概述: ReduceTask 分了四个阶段,第一阶段是copy阶段,每一个ReduceTask会从所有的MapTask中拷贝同一个分区的数据,就是说每一个ReduceTask单独负责处理一个分区,互不影响。第二阶段是Merge...
  • ReduceTask的工作原理 1.reduceTask获取map阶段的输出-copy阶段 两点说明: (1)map任务和reduce任务不一定在同一台机器上。 (2)map任务和reduce任务不是只有一个。可能有多个map任务和多个reduce任务同时执行。 ...
  • 1、MapTask运行机制详解以及Map任务的并行度 整个Map阶段流程大体如下图所示。 简单概述:inputFile通过split被逻辑切分为多个split文件,通过Record按行读取内容给map(用户自己实现的)进行处理,数据被map...
  • 第一步: 读取文件 &amp;nbsp;&amp;nbsp; &amp;nbsp;&amp;nbsp; FileInputFormat切片机制: ...将每一个split分配给一个对应的maptask处理。block是HDFS上物理上存储的存储的数据,切片是对
  • 2.ReduceTask工作机制 (1)Copy阶段:Reduce Task从各个Map Task上远程拷贝一片数据,并针对某一个片数据,如果其大小超过一定阈值,则写入磁盘中,否则直接放到内存中 (2)Merge阶段:在远程拷贝数据的...
  • 1、mapTask工作机制() 1、Read阶段:mapTask通过用户编写的RecordReader,从输入InputSplit中解析出一个个key/value对; 2、Map阶段:该节点主要是将解析出的key、value(k1,v2)交给用户编写map()函数...
  • Map Task数目的确定和Reduce Task数目的指定————自然得到结论,前者是后者决定的,后者是人为指定的。查看源码可以很容易看懂 1、MapReduce作业中Map Task数目的确定: 1)MapReduce从HDFS中分割读取Split文件...
  • MapReduce运行时的mapTask和reduceTask 1 mapTask任务 1.1 mapTask & mapTask并行度 1.2 如何修改mapTask并行度 2 reduceTask任务 2.1 reduceTask & reduceTask并行度 2.2 如何设置reduceTask的并行度...
  • reducetask的并行度

    2020-02-01 09:14:07
    reducetask的并行度 //指定reducetask的个数 job.setNumReduceTasks(4); 参数决定的 默认值是1 当有多个reducetask的时候,每一个reducetask的数据如何分配的,由分区算法决定 1、默认的分区算法 mapkey.hash % ...
  • MapReduce工作流程一: MapReduce工作流程二: 上面的流程是整个MapReduce最全工作流程,但是Shuffle过程只是从第7步开始到第16步结束,具体Shuffle过程详解,如下: ...(5)ReduceTask根据自己的分
  • 一、Reduce Task 并行度决定机制 reduce task的并行度,也就是同时开启了几个reduce task。分为两种情况: 1、如果我们自己定义了分区器,我们能够确定自己的分区器能够形成几个物理分区,加入我们要生成5个分区,...
  • 在Mapreduce中,Shuffle过程是Mapreduce的核心,它分布在Mapreduce的map阶段和reduce阶段,共可分为6个详细的阶段: 1).Collect阶段:将MapTask的结果输出到默认大小为100M的MapOutputBuffer内部环形内存缓冲区,...
  • MapTask、ReduceTask并行度决定机制

    千次阅读 2018-03-28 16:03:47
    MapTask的并行度决定map阶段的任务处理并发度,进而影响到整个job的处理速度。那么,MapTask并行实例是否越多越好呢?其并行度又是如何决定呢? 1、mapTask并行度的决定机制 一个job的map阶段并行度由客户端在...
  • Hive--优化 Hive中的执行引擎目前支持:MapReduce、Spark、Tez 本文设定的执行引擎为...1.1 hive.fetch.task.conversion Default Value: minimal in Hive 0.10.0 through 0.13.1, more in Hive 0.14.0 and la...
  • 13. ReduceTask并行度

    2020-07-12 22:40:23
    ReduceTask并行度 ReduceTask的并行度同样影响整个Job的执行并发度和执行效率,但与MapTask的并发数由切片数决定不同,ReduceTask数量的决定是可以直接手动设置: // 默认值是1,手动设置为4 job.setNumReduceTasks...
  • Hive进阶 设置reduce Task数量

    千次阅读 2019-07-08 22:14:55
    思考:某个MapReduce程序,到底有多少个Map Task和reduceTask mapTask: 要处理的数据放在HDFS上,有多少个Block就有几个Maptask,一个block被一个MapTask处理。 reduceTask: 自己编写MapReduce程序,自己设定 hive...
  • 文章目录4 、MapTask 工作机制5 、ReduceTask工作机制 4 、MapTask 工作机制 (1)Read阶段:MapTask 通过 InputFormat 获得的 RecordReader,从输入 InputSplit 中解析出一个个 key/value。 (2)Map 阶段:该节点...
  • MapReduce作业可以细分为map taskreduce task,而MRAppMaster又将map taskreduce task分为四种状态:  1、pending:刚启动但尚未向resourcemanager发送资源请求;  2、scheduled:已经向resourceManager发送...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 64,985
精华内容 25,994
关键字:

reducetask