精华内容
下载资源
问答
  • MR 运行原理

    万次阅读 2017-11-17 14:58:23
    下图大概描述了Map-Reduce的Job运行的基本原理:   下面我们讨论JobConf,其有很多的项可以进行配置: setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, ...

    1、Map-Reduce的逻辑过程

    假设我们需要处理一批有关天气的数据,其格式如下:

    • 按照ASCII码存储,每行一条记录
    • 每一行字符从0开始计数,第15个到第18个字符为年
    • 第25个到第29个字符为温度,其中第25位是符号+/-

    0067011990999991950051507+0000+

    0043011990999991950051512+0022+

    0043011990999991950051518-0011+

    0043012650999991949032412+0111+

    0043012650999991949032418+0078+

    0067011990999991937051507+0001+

    0043011990999991937051512-0002+

    0043011990999991945051518+0001+

    0043012650999991945032412+0002+

    0043012650999991945032418+0078+

    现在需要统计出每年的最高温度。

    Map-Reduce主要包括两个步骤:Map和Reduce

    每一步都有key-value对作为输入和输出:

    • map阶段的key-value对的格式是由输入的格式所决定的,如果是默认的TextInputFormat,则每行作为一个记录进程处理,其中key为此行的开头相对于文件的起始位置,value就是此行的字符文本
    • map阶段的输出的key-value对的格式必须同reduce阶段的输入key-value对的格式相对应

    对于上面的例子,在map过程,输入的key-value对如下:

    (0, 0067011990999991950051507+0000+)

    (33, 0043011990999991950051512+0022+)

    (66, 0043011990999991950051518-0011+)

    (99, 0043012650999991949032412+0111+)

    (132, 0043012650999991949032418+0078+)

    (165, 0067011990999991937051507+0001+)

    (198, 0043011990999991937051512-0002+)

    (231, 0043011990999991945051518+0001+)

    (264, 0043012650999991945032412+0002+)

    (297, 0043012650999991945032418+0078+)

    在map过程中,通过对每一行字符串的解析,得到年-温度的key-value对作为输出:

    (1950, 0)

    (1950, 22)

    (1950, -11)

    (1949, 111)

    (1949, 78)

    (1937, 1)

    (1937, -2)

    (1945, 1)

    (1945, 2)

    (1945, 78)

    在reduce过程,将map过程中的输出,按照相同的key将value放到同一个列表中作为reduce的输入

    (1950, [0, 22, –11])

    (1949, [111, 78])

    (1937, [1, -2])

    (1945, [1, 2, 78])

    在reduce过程中,在列表中选择出最大的温度,将年-最大温度的key-value作为输出:

    (1950, 22)

    (1949, 111)

    (1937, 1)

    (1945, 78)

    其逻辑过程可用如下图表示:

    image

    下图大概描述了Map-Reduce的Job运行的基本原理:

    image

     

    下面我们讨论JobConf,其有很多的项可以进行配置:

    • setInputFormat:设置map的输入格式,默认为TextInputFormat,key为LongWritable, value为Text
    • setNumMapTasks:设置map任务的个数,此设置通常不起作用,map任务的个数取决于输入的数据所能分成的input split的个数
    • setMapperClass:设置Mapper,默认为IdentityMapper
    • setMapRunnerClass:设置MapRunner, map task是由MapRunner运行的,默认为MapRunnable,其功能为读取input split的一个个record,依次调用Mapper的map函数
    • setMapOutputKeyClass和setMapOutputValueClass:设置Mapper的输出的key-value对的格式
    • setOutputKeyClass和setOutputValueClass:设置Reducer的输出的key-value对的格式
    • setPartitionerClass和setNumReduceTasks:设置Partitioner,默认为HashPartitioner,其根据key的hash值来决定进入哪个partition,每个partition被一个reduce task处理,所以partition的个数等于reduce task的个数
    • setReducerClass:设置Reducer,默认为IdentityReducer
    • setOutputFormat:设置任务的输出格式,默认为TextOutputFormat
    • FileInputFormat.addInputPath:设置输入文件的路径,可以使一个文件,一个路径,一个通配符。可以被调用多次添加多个路径
    • FileOutputFormat.setOutputPath:设置输出文件的路径,在job运行前此路径不应该存在

    当然不用所有的都设置,由上面的例子,可以编写Map-Reduce程序如下:

    public class MaxTemperature {

        public static void main(String[] args) throws IOException {

            if (args.length != 2) {

                System.err.println("Usage: MaxTemperature <input path> <output path>");

                System.exit(-1);

            }

            JobConf conf = new JobConf(MaxTemperature.class);

            conf.setJobName("Max temperature");

            FileInputFormat.addInputPath(conf, new Path(args[0]));

            FileOutputFormat.setOutputPath(conf, new Path(args[1]));

            conf.setMapperClass(MaxTemperatureMapper.class);

            conf.setReducerClass(MaxTemperatureReducer.class);

            conf.setOutputKeyClass(Text.class);

            conf.setOutputValueClass(IntWritable.class);

            JobClient.runJob(conf);

        }

    }

    3、Map-Reduce数据流(data flow)

    Map-Reduce的处理过程主要涉及以下四个部分:

    • 客户端Client:用于提交Map-reduce任务job
    • JobTracker:协调整个job的运行,其为一个Java进程,其main class为JobTracker
    • TaskTracker:运行此job的task,处理input split,其为一个Java进程,其main class为TaskTracker
    • HDFS:hadoop分布式文件系统,用于在各个进程间共享Job相关的文件

    image

    3.1、任务提交

    JobClient.runJob()创建一个新的JobClient实例,调用其submitJob()函数。

    • 向JobTracker请求一个新的job ID
    • 检测此job的output配置
    • 计算此job的input splits
    • 将Job运行所需的资源拷贝到JobTracker的文件系统中的文件夹中,包括job jar文件,job.xml配置文件,input splits
    • 通知JobTracker此Job已经可以运行了

    提交任务后,runJob每隔一秒钟轮询一次job的进度,将进度返回到命令行,直到任务运行完毕。

     

    3.2、任务初始化

     

    当JobTracker收到submitJob调用的时候,将此任务放到一个队列中,job调度器将从队列中获取任务并初始化任务。

    初始化首先创建一个对象来封装job运行的tasks, status以及progress。

    在创建task之前,job调度器首先从共享文件系统中获得JobClient计算出的input splits。

    其为每个input split创建一个map task。

    每个task被分配一个ID。

     

    3.3、任务分配

     

    TaskTracker周期性的向JobTracker发送heartbeat。

    在heartbeat中,TaskTracker告知JobTracker其已经准备运行一个新的task,JobTracker将分配给其一个task。

    在JobTracker为TaskTracker选择一个task之前,JobTracker必须首先按照优先级选择一个Job,在最高优先级的Job中选择一个task。

    TaskTracker有固定数量的位置来运行map task或者reduce task。

    默认的调度器对待map task优先于reduce task

    当选择reduce task的时候,JobTracker并不在多个task之间进行选择,而是直接取下一个,因为reduce task没有数据本地化的概念。

     

    3.4、任务执行

     

    TaskTracker被分配了一个task,下面便要运行此task。

    首先,TaskTracker将此job的jar从共享文件系统中拷贝到TaskTracker的文件系统中。

    TaskTracker从distributed cache中将job运行所需要的文件拷贝到本地磁盘。

    其次,其为每个task创建一个本地的工作目录,将jar解压缩到文件目录中。

    其三,其创建一个TaskRunner来运行task。

    TaskRunner创建一个新的JVM来运行task。

    被创建的child JVM和TaskTracker通信来报告运行进度。

     

    3.4.1、Map的过程

    MapRunnable从input split中读取一个个的record,然后依次调用Mapper的map函数,将结果输出。

    map的输出并不是直接写入硬盘,而是将其写入缓存memory buffer。

    当buffer中数据的到达一定的大小,一个背景线程将数据开始写入硬盘。

    在写入硬盘之前,内存中的数据通过partitioner分成多个partition。

    在同一个partition中,背景线程会将数据按照key在内存中排序。

    每次从内存向硬盘flush数据,都生成一个新的spill文件。

    当此task结束之前,所有的spill文件被合并为一个整的被partition的而且排好序的文件。

    reducer可以通过http协议请求map的输出文件,tracker.http.threads可以设置http服务线程数。

    3.4.2、Reduce的过程

    当map task结束后,其通知TaskTracker,TaskTracker通知JobTracker。

    对于一个job,JobTracker知道TaskTracer和map输出的对应关系。

    reducer中一个线程周期性的向JobTracker请求map输出的位置,直到其取得了所有的map输出。

    reduce task需要其对应的partition的所有的map输出。

    reduce task中的copy过程即当每个map task结束的时候就开始拷贝输出,因为不同的map task完成时间不同。

    reduce task中有多个copy线程,可以并行拷贝map输出。

    当很多map输出拷贝到reduce task后,一个背景线程将其合并为一个大的排好序的文件。

    当所有的map输出都拷贝到reduce task后,进入sort过程,将所有的map输出合并为大的排好序的文件。

    最后进入reduce过程,调用reducer的reduce函数,处理排好序的输出的每个key,最后的结果写入HDFS。

     

    image

     

    3.5、任务结束

     

    当JobTracker获得最后一个task的运行成功的报告后,将job得状态改为成功。

    当JobClient从JobTracker轮询的时候,发现此job已经成功结束,则向用户打印消息,从runJob函数中返回。


    ------------------------------------------------------------------------------------------------------------------------------------

    市面上的hadoop权威指南一类的都是老版本的书籍了,索性学习并翻译了下最新版的Hadoop:The Definitive Guide, 4th Edition与大家共同学习。

      我们通过提交jar包,进行MapReduce处理,那么整个运行过程分为五个环节:

      1、向client端提交MapReduce job.

      2、随后yarn的ResourceManager进行资源的分配.

      3、由NodeManager进行加载与监控containers.

      4、通过applicationMaster与ResourceManager进行资源的申请及状态的交互,由NodeManagers进行MapReduce运行时job的管理.

      5、通过hdfs进行job配置文件、jar包的各节点分发。

    Job 提交过程

      job的提交通过调用submit()方法创建一个JobSubmitter实例,并调用submitJobInternal()方法。整个job的运行过程如下:

      1、向ResourceManager申请application ID,此ID为该MapReduce的jobId。

      2、检查output的路径是否正确,是否已经被创建。

      3、计算input的splits。

      4、拷贝运行job 需要的jar包、配置文件以及计算input的split 到各个节点。

      5、在ResourceManager中调用submitAppliction()方法,执行job

    Job 初始化过程

      1、当resourceManager收到了submitApplication()方法的调用通知后,scheduler开始分配container,随之ResouceManager发送applicationMaster进程,告知每个nodeManager管理器。

      2、由applicationMaster决定如何运行tasks,如果job数据量比较小,applicationMaster便选择将tasks运行在一个JVM中。那么如何判别这个job是大是小呢?当一个job的mappers数量小于10个只有一个reducer或者读取的文件大小要小于一个HDFS block时,(可通过修改配置项mapreduce.job.ubertask.maxmaps,mapreduce.job.ubertask.maxreduces以及mapreduce.job.ubertask.maxbytes 进行调整)

      3、在运行tasks之前,applicationMaster将会调用setupJob()方法,随之创建output的输出路径(这就能够解释,不管你的mapreduce一开始是否报错,输出路径都会创建)

    Task 任务分配

      1、接下来applicationMaster向ResourceManager请求containers用于执行map与reduce的tasks(step 8),这里map task的优先级要高于reduce task,当所有的map tasks结束后,随之进行sort(这里是shuffle过程后面再说),最后进行reduce task的开始。(这里有一点,当map tasks执行了百分之5%的时候,将会请求reduce,具体下面再总结)

      2、运行tasks的是需要消耗内存与CPU资源的,默认情况下,map和reduce的task资源分配为1024MB与一个核,(可修改运行的最小与最大参数配置,mapreduce.map.memory.mb,mapreduce.reduce.memory.mb,mapreduce.map.cpu.vcores,mapreduce.reduce.reduce.cpu.vcores.)

    Task 任务执行

      1、这时一个task已经被ResourceManager分配到一个container中,由applicationMaster告知nodemanager启动container,这个task将会被一个主函数为YarnChild的java application运行,但在运行task之前,首先定位task需要的jar包、配置文件以及加载在缓存中的文件

      2、YarnChild运行于一个专属的JVM中,所以任何一个map或reduce任务出现问题,都不会影响整个nodemanager的crash或者hang

      3、每个task都可以在相同的JVM task中完成,随之将完成的处理数据写入临时文件中。

    Mapreduce数据流

    运行进度与状态更新

      1、MapReduce是一个较长运行时间的批处理过程,可以是一小时、几小时甚至几天,那么Job的运行状态监控就非常重要。每个job以及每个task都有一个包含job(running,successfully completed,failed)的状态,以及value的计数器,状态信息及描述信息(描述信息一般都是在代码中加的打印信息),那么,这些信息是如何与客户端进行通信的呢?

      2、当一个task开始执行,它将会保持运行记录,记录task完成的比例,对于map的任务,将会记录其运行的百分比,对于reduce来说可能复杂点,但系统依旧会估计reduce的完成比例。当一个map或reduce任务执行时,子进程会持续每三秒钟与applicationMaster进行交互

    Job 完成

       最终,applicationMaster会收到一个job完成的通知,随后改变job的状态为successful。最终,applicationMaster与task containers被清空。

     

    Shuffle与Sort

      从map到reduce的过程,被称之为shuffle过程,MapReduce使到reduce的数据一定是经过key的排序的,那么shuffle是如何运作的呢?

      当map任务将数据output时,不仅仅是将结果输出到磁盘,它是将其写入内存缓冲区域,并进行一些预分类

      

      1、The Map Side

      首先map任务的output过程是一个环状的内存缓冲区,缓冲区的大小默认为100MB(可通过修改配置项mpareduce.task.io.sort.mb进行修改),当写入内存的大小到达一定比例,默认为80%(可通过mapreduce.map.sort.spill.percent配置项修改),便开始写入磁盘。

      在写入磁盘之前,线程将会指定数据写入与reduce相应的patitions中,最终传送给reduce.在每个partition中,后台线程将会在内存中进行Key的排序,(如果代码中有combiner方法,则会在output时就进行sort排序,这里,如果只有少于3个写入磁盘的文件,combiner将会在outputfile前启动,如果只有一个或两个,那么将不会调用)

      这里将map输出的结果进行压缩会大大减少磁盘IO与网络传输的开销(配置参数mapreduce.map .output.compress 设置为true,如果使用第三方压缩jar,可通过mapreduce.map.output.compress.codec进行设置)

       随后这些paritions输出文件将会通过HTTP发送至reducers,传送的最大启动线程通过mapreduce.shuffle.max.threads进行配置。

      2、The Reduce Side

      首先上面每个节点的map都将结果写入了本地磁盘中,现在reduce需要将map的结果通过集群拉取过来,这里要注意的是,需要等到所有map任务结束后,reduce才会对map的结果进行拷贝,由于reduce函数有少数几个复制线程,以至于它可以同时拉取多个map的输出结果。默认的为5个线程(可通过修改配置mapreduce.reduce.shuffle.parallelcopies来修改其个数)

      这里有个问题,那么reducers怎么知道从哪些机器拉取数据呢? 

      当所有map的任务结束后,applicationMaster通过心跳机制(heartbeat mechanism),由它知道mapping的输出结果与机器host,所以reducer会定时的通过一个线程访问applicationmaster请求map的输出结果

      Map的结果将会被拷贝到reduce task的JVM的内存中(内存大小可在mapreduce.reduce.shuffle.input.buffer.percent中设置)如果不够用,则会写入磁盘。当内存缓冲区的大小到达一定比例时(可通过mapreduce.reduce.shuffle.merge.percent设置)或map的输出结果文件过多时(可通过配置mapreduce.reduce.merge.inmen.threshold),将会除法合并(merged)随之写入磁盘。

      这时要注意,所有的map结果这时都是被压缩过的,需要先在内存中进行解压缩,以便后续合并它们。(合并最终文件的数量可通过mapreduce.task.io.sort.factor进行配置) 最终reduce进行运算进行输出。

    参考文献:《Hadoop:The Definitive Guide, 4th Edition》 



    展开全文
  • hadoop MR运行原理

    2018-08-25 11:36:04
    hadoop MR运行原理 [url]http://s5.51cto.com/wyfs02/M01/86/7B/wKiom1fADFHCZ-igAAFuYAeFdr0171.png-wh_500x0-wm_3-wmp_4-s_2360924780.png[/url] [url]https://www.cnblogs.com/dream-to-pku/p/7249954.html...
    hadoop MR运行原理
    

    [url]http://s5.51cto.com/wyfs02/M01/86/7B/wKiom1fADFHCZ-igAAFuYAeFdr0171.png-wh_500x0-wm_3-wmp_4-s_2360924780.png[/url]

    [url]https://www.cnblogs.com/dream-to-pku/p/7249954.html[/url]
    展开全文
  • mr运行原理详解

    千次阅读 2018-03-13 17:37:36
    3.上面的输出会先写到一个缓冲区里面(环形缓冲区,默认100M),当写入百分之80的时候会对里面的数据进行dump,dump的过程会对里面的数据先进行分区然后排序,如果有combiner会进行局部的combiner,之后写入运行map...

    hadoop2.x的三大核心:mapreduce 、hdfs以及yarn ,其中核心之一mapreduce,利用了分而治之的思想,Map(映射)和 Reduce(归约),分布式多处理然后进行汇总的思想,比如:清点扑克牌把里面的花色都分开,一个人清点那么可能耗时4分钟,如果利用mapreduce的思想,把扑克牌分成4份,每个人对自己的那一份进行清点,然后4个人都清点完成之后把各自的相同花色放一起进行汇总,那么这样可能只会耗时1分钟。这就是mapreduce的思想,其中每个人就相当于一个map,汇总就相当于是reduce,最开始的分牌就是patition分区(如果不均匀分配就相当于是数据倾斜),从map到reduce的过程就是shuffle。

    下面就简单说说我理解的mapreduce执行流程

    略过提交任务的流程以及yarn的相关过程,可以参考:http://blog.csdn.net/qq_20641565/article/details/54729059

    这里直接从map任务被执行说起。

    1.当map任务开始执行的时候,会先根据用户设置的FileInputFormat去读取数据源,默认是TextFileInputFormat读取hdfs中的文件(当然mapreduce内部也提供了其他的实现类,比如读取数据库的等等),我们也可以自己定义一个FileInputFormat,继承FileInputFormat类就可以重写isSplitable和createRecordReader方法并在createRecordReader方法里面返回一个自定义的RecordReader实例就行(也是继承RecordReader类就行,需要重写里面的getCurrentKey、close、getCurrentValue、getProgress、initialize、nextKeyValue这几个方法可以参考:http://blog.csdn.net/qq_20641565/article/details/52770522)

    这里写图片描述

    2.当上面的处理完成后会进入到map方法,就以wordcount为例,这里map收到的参数为文件的一行例如数据是:“hello lijie hello word spark scala java java java”,然后对上面的一行进行split(” “)切分,然后用context.write输出(hello ,1)(lijie ,1)(hello ,1)(word ,1)(spark ,1)(scala ,1)(java,1)(java,1)(java,1)

    3.上面的输出会先写到一个缓冲区里面(环形缓冲区,默认100M),当写入百分之80的时候会对里面的数据进行dump,dump的过程会对里面的数据先进行分区然后排序,如果有combiner会进行局部的combiner,之后写入运行map程序的那台服务器的本地磁盘中,如果map一直执行,那么会每满百分之80又会执行上面的过程直到map执行完成

    这里写图片描述

    3.上面的步骤走完之后产生了很多小的文件,然后会触发mr的文件合并,把多个文件进行合并,合并过程中又会进行排序和局部的combiner,如果定义的是2个reduce,那么最后每个map端就会生产2个分区文件,并且文件里面的内容会已经排序且局部combiner(前提是设置了combiner)

    这里写图片描述

    4.map任务执行完成之后,reduce会从map端下载对应的文件,并且又会对下载过来的文件进行合并且排序并且会调用GroupComparator 对象(用来自定义哪些是一组的,mr程序默认key相同为同一组,但是可以自己定义GroupComparator,这样不同的key也可以进入同一个reduce方法进行处理,详情见:http://blog.csdn.net/qq_20641565/article/details/53491257

    这里写图片描述

    5.执行reduce方法,并且执行reduce的逻辑,执行完成之后就会调用FileOutputFormat,这个同FileInputFormat一样,同样可以自定义,也是继承FileOutPutFormat返回一个RecordWriter,这里就不过多介绍,默认是TextFileOutputFormat写入到hdfs里面

    6.整个mapreduce的执行流程就完成了,如果FileOutputFormat和FileInputFormat是默认的话,那么数据流就是:HDFS -> 本地磁盘 ->HDFS,并且多次进行IO操作,所以mr的瓶颈在于他的IO操作,只适合进行离线计算

                                                                                                                                                              原创地址

    展开全文
  • MR原理运行流程

    万次阅读 2018-12-13 21:37:50
    文章目录MR的原理和运行流程Map的运行过程Reduce处理过程Shuffle过程MR运行过程Yarn &amp;&amp; Job MR的原理和运行流程 Map的运行过程 以HDFS上的文件作为默认输入源为例(MR也可以有其他的输入源) ...

    MR的原理和运行流程

    Map的运行过程

    以HDFS上的文件作为默认输入源为例(MR也可以有其他的输入源)

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-419qFeIb-1589376928673)(https://github.com/jiaoqiyuan/163-bigdate-note/raw/master/%E6%97%A5%E5%BF%97%E8%A7%A3%E6%9E%90%E5%8F%8A%E8%AE%A1%E7%AE%97%EF%BC%9AMR/img/Map%E8%BF%90%E8%A1%8C%E8%BF%87%E7%A8%8B.png “”)]

    • block是HDFS上的文件块,split是文件的分片(逻辑划分,不包含具体数据,只包含这些数据的位置信息)。

      • 一个split包含一个或多个block,默认是一对一的关系。
      • 一个split不包含两个文件的block, 不会跨越file边界,也就是说一个split是不会跨文件进行划分的。
    • 当分片完成后,MR程序会将split中的数据以K/V(key/value)的形式读取出来,然后将这些数据交给用户自定义的Map函数进行处理。

      • 一个Map处理一个split。
    • 用户用Map函数处理完数据后将处理后,同样将结果以K/V的形式交给MR的计算框架。

    • MR计算框架会将不同的数据划分成不同的partition,数据相同的多个partition最后会分到同一个reduce节点上面进行处理,也就是说一类partition对应一个reduce。

    • Map默认使用Hash算法对key值进行Hash计算,这样保证了相同key值的数据能够划分到相同的partition中,同时也保证了不同的partition之间的数据量时大致相当的,参考链接

    • 一个程序中Map和Reduce的数量是有split和partition的数据决定的。

    Reduce处理过程

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-yDsRinfZ-1589376928676)(https://github.com/jiaoqiyuan/163-bigdate-note/raw/master/%E6%97%A5%E5%BF%97%E8%A7%A3%E6%9E%90%E5%8F%8A%E8%AE%A1%E7%AE%97%EF%BC%9AMR/img/Reduce%E5%A4%84%E7%90%86%E8%BF%87%E7%A8%8B.png “”)]

    • Map处理完后,reduce处理程序在各个Map节点将属于自己的数据拷贝到自己的内存缓冲区中
    • 最后将这些数据合并成一个大的数据集,并且按照key值进行聚合,把聚合后的value值作为一个迭代器给用户使用。
    • 用户使用自定义的reduce函数处理完迭代器中的数据后,一般把结果以K/V的格式存储到HDFS上的文件中。

    Shuffle过程

    • 在上面介绍的MR过程中,还存在一个shuffle过程,发生与Map和Reduce之中。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nXBx9zAU-1589376928679)(https://github.com/jiaoqiyuan/163-bigdate-note/raw/master/%E6%97%A5%E5%BF%97%E8%A7%A3%E6%9E%90%E5%8F%8A%E8%AE%A1%E7%AE%97%EF%BC%9AMR/img/shuffle%E8%BF%87%E7%A8%8B.png “”)]

    Map中的shuffle

    • Collec阶段键数据放在环形缓冲区,唤醒缓冲区分为数据区和索引区。
    • sort阶段对在统一partition内的索引按照key值排序。
    • spill(溢写)阶段根据拍好序的索引将数据按顺序写到文件中。
    • Merge阶段将Spill生成的小文件分批合并排序成一个大文件。
    • Reduce中的shuffle
      • Copy阶段将Map段的数据分批拷贝到Reduce的缓冲区。
      • Spill阶段将内存缓冲区的数据按照顺序写到文件中。
      • Merge阶段将溢出文件合并成一个排好序的数据集。
    • Combine优化
      • 整个过程中可以提前对聚合好的value值进行计算,这个过程就叫Combine。

      • Combine在Map端发生时间

        • 在数据排序后,溢写到磁盘前,相同key值的value是紧挨在一起的,可以进行聚合运算,运行一次combiner。
        • 再合并溢出文件输出到磁盘前,如果存在至少3个溢出文件,则运行combiner,可以通过min.num.spills.for.combine设置阈值。
      • Reduce端

        • 在合并溢出文件输出到磁盘前,运行combiner。
      • Combiner不是任何情况下都适用的,需要根据业务需要进行设置。

    MR运行过程

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-RJX3RjGX-1589376928682)(https://github.com/jiaoqiyuan/163-bigdate-note/raw/master/%E6%97%A5%E5%BF%97%E8%A7%A3%E6%9E%90%E5%8F%8A%E8%AE%A1%E7%AE%97%EF%BC%9AMR/img/MR%E8%BF%90%E8%A1%8C%E8%BF%87%E7%A8%8B.png “”)]

    • 一个文件分成多个split数据片。
    • 每个split由多一个map进行处理。
    • Map处理完一个数据就把处理结果放到一个环形缓冲区内存中。
    • 环形缓冲区满后里面的数据会被溢写到一个个小文件中。
    • 小文件会被合并成一个大文件,大文件会按照partition进行排序。
    • reduce节点将所有属于自己的数据从partition中拷贝到自己的缓冲区中,并进行合并。
    • 最后合并后的数据交给reduce处理程序进行处理。
    • 处理后的结果存放到HDFS上。
    • MR运行在集群上:YARN(Yet Another Resource Negotiator)

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-gxnQd48w-1589376928686)(https://github.com/jiaoqiyuan/163-bigdate-note/raw/master/%E6%97%A5%E5%BF%97%E8%A7%A3%E6%9E%90%E5%8F%8A%E8%AE%A1%E7%AE%97%EF%BC%9AMR/img/YARN%E6%A1%86%E6%9E%B6.png “”)]
    __ ResourceManager负责调度和管理整个集群的资源__

    * 主要职责是调度,对应用程序的整体进行资源分配
    * Nodemanager负责节点上的计算资源,内部包含Container, App Master,管理Container生命周期,资源使用情况,节点健康状况,并将这些信息回报给RM。
        * Container中包含一些资源信息,如cpu核数,内存大小
        * 一个应用程序由一个App Master管理,App Master负责将应用程序运行在各个节点的Container中,App Master与RM协商资源分配的问题。
        * ## <a name="uo0fvt"></a>MapReduce On Yarn
    

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-nCnd4etj-1589376928688)(https://github.com/jiaoqiyuan/163-bigdate-note/raw/master/%E6%97%A5%E5%BF%97%E8%A7%A3%E6%9E%90%E5%8F%8A%E8%AE%A1%E7%AE%97%EF%BC%9AMR/img/MRonYarn.png “”)]

    * MR程序在客户端启动,客户端会向RM发送一个请求。
    * RM收到请求后返回一个AppID给客户端。
    * 然后客户端拿着AppID,用户名,队列,令牌向RM发出资源请求。
    * 客户端这时会将程序用到的jar包,资源文件,程序运行中需要的数据等传送到HDFS上。
    * RM接收到客户端的资源请求后,分配一个container0的资源包,由NodeManager启动一个AppMaster。
    * RM将集群的容量信息发送给AppMaster,AppMaster计算这个程序需要的资源量后,根据需要想RM请求更多的container。
    * 最后由各个NodeManager在节点上启动MapTask和ReduceTask。  
    

    Yarn && Job

    上面的 Yarn 管理 MR 任务是不是比较粗略,下面我将介绍比较详细的处理流程:

    这也是今日头条的一个面试题,引发的思考:

    MR 任务为例,讲一下 Yarn 的整个过程。

    Yarn 中的主要组件包括:Resourcemanager,ApplicationMaster, NodeManager。

    Resourcemanager:每个Hadoop集群只会有一个ResourceManager(如果是HA的话会存在两个,但是有且只有一个处于active状态),启动每一个 Job 所属的 ApplicationMaster,另外监控ApplicationMaster 以及NodeManager 的存在情况,并且负责协调计算节点上计算资源的分配。ResourceManager 内部主要有两个组件:

    • Scheduler:这个组件完全是插拔式的,用户可以根据自己的需求实现不同的调度器,目前YARN提供了FIFO、容量以及公平调度器。这个组件的唯一功能就是给提交到集群的应用程序分配资源,并且对可用的资源和运行的队列进行限制。Scheduler并不对作业进行监控;
    • ApplicationsManager :这个组件用于管理整个集群应用程序的 application masters,负责接收应用程序的提交;为application master启动提供资源;监控应用程序的运行进度以及在应用程序出现故障时重启它。

    ApplicationMaster:每个 Job 都有对应一个 ApplicationMaster ,并且负责运行 mapreduce 任务,并负责报告任务的状态。ApplicationMaster是应用程序级别的,每个ApplicationMaster管理运行在YARN上的应用程序。YARN 将 ApplicationMaster看做是第三方组件,ApplicationMaster负责和ResourceManager scheduler协商资源,并且和NodeManager通信来运行相应的task。ResourceManager 为 ApplicationMaster 分配容器,这些容器将会用来运行task。ApplicationMaster 也会追踪应用程序的状态,监控容器的运行进度。当容器运行完成, ApplicationMaster 将会向 ResourceManager 注销这个容器;如果是整个作业运行完成,其也会向 ResourceManager 注销自己,这样这些资源就可以分配给其他的应用程序使用了。

    NodeManager:负责启动和管理节点的容器。NodeManager是YARN中每个节点上的代理,它管理Hadoop集群中单个计算节点,根据相关的设置来启动容器的。NodeManager会定期向ResourceManager发送心跳信息来更新其健康状态。同时其也会监督Container的生命周期管理,监控每个Container的资源使用(内存、CPU等)情况,追踪节点健康状况,管理日志和不同应用程序用到的附属服务(auxiliary service)。

    Container: Container是与特定节点绑定的,其包含了内存、CPU磁盘等逻辑资源。不过在现在的容器实现中,这些资源只包括了内存和CPU。容器是由 ResourceManager scheduler 服务动态分配的资源构成。容器授予 ApplicationMaster 使用特定主机的特定数量资源的权限。ApplicationMaster 也是在容器中运行的,其在应用程序分配的第一个容器中运行。

    必须牢记yarn只是一个资源管理的框架,并不是一个计算框架,计算框架可以运行在yarn上。我们所能做的就是向RM申请container,然后配合NM一起来启动container。

    下面是请求资源和分配资源的流程:

    1.客户端向 ResourceManager 发送 job 请求,客户端产生的 RunJar 进程与 ResourceManager 通过 RPC 通信。
    2.ResourceManager 向客户端返回 job 相关资源的提交路径以及 jobID。
    3.客户端将 job 相关的资源提交到相应的共享文件夹下。
    4.客户端向 ResourceManager 提交 job
    5.ResourceManager 通过__调度器__在 NodeManager 创建一个容器,并且在容器中启用MRAppmaster 进程,该进程由 ResourceManager 启动。
    6.该 MRAppmaster 进程对作业进行初始化,创建多个对象对作业进行跟踪。
    7.MRAppmaster 从文件系统获取计算得到输入分片,只获取切片信息,不需要jar等资源,为每个分片创建一个 map 以及指定数量的 reduce 对象,之后 MRAppmaster 决定如何运行构成 mapreduce 的各个任务。
    8.若作业很大,MRAppmaster 为所有的 map 任务和reduce 任务向 ResourceManger 发起申请容器的请求,请求中包含 map 任务的数据本地化信息以及数据分片等信息。
    9.ResourceManager 为任务分配了容器之后,MRAppmaster 就通过 与 NodeManger 通信启动容器,由 MRAppmaster 负责分配在哪些 NodeManager 负责分配在哪些 NodeManager 上运行map (即 yarnchild 进程)和reduce 任务。
    10.运行 mao 和 reduce 任务的 NodeManager 从共享系统中获取 job 的相关县,包括 jar 文件,配置文件等。
    11.关于查询状态,不经过 reourcemanager ,而是任务周期性的 MRAppmaster 汇报状态以及进度,客户端每秒通过查询一次 MRAppmaster 来更新状态和信息。

    上面可以很乱,重点是辅助理解细节,认知到位了,无关细节了吧。

    下面总结一下,大概的流程:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-X10IXJnE-1589376928689)(https://cdn.nlark.com/yuque/0/2018/png/199648/1544534623921-5cd78e48-8181-404c-a67b-41898ca4574a.png “”)]

    流程大致如下:
    · client客户端向yarn集群(resourcemanager)提交任务
    · resourcemanager选择一个node创建appmaster
    · appmaster根据任务向rm申请资源
    · rm返回资源申请的结果
    · appmaster去对应的node上创建任务需要的资源(container形式,包括内存和CPU)
    · appmaster负责与nodemanager进行沟通,监控任务运行
    · 最后任务运行成功,汇总结果。
    其中Resourcemanager里面一个很重要的东西,就是调度器Scheduler,调度规则可以使用官方提供的,也可以自定义。


    专注大数据技术、架构、实战
    关注我,带你不同角度看数据架构

    在这里插入图片描述

    展开全文
  • 大数据-MR运行原理

    2020-01-05 17:06:06
    运行reduce task的节点通过过程5 将来自多个map任务的属于自己的分区数据下载到本地磁盘工作目录 这多个分区文件通过归并排序合并成大文件 并根据key值分好组(key值相同的 value值会以迭代器的形式组在一起) ...
  • MR1和MR2的运行原理

    千次阅读 2018-09-26 11:15:11
    MapReduce1 分为6个步骤: ... 2)、计算作业的输入分片,将运行作业所需要的资源(包括jar文件、配置文件和计算得到的输入分片)复制到一个以ID命名的jobtracker的文件系统中(HDFS),见第3步  3)、告知jo...
  • Hadoop之MR运行流程

    2020-01-01 20:48:03
    1.MR运行原理图: 2.MapTask的详细流程: 1.提交相应的信息到mr appmaster (1)都回提交哪些信息? split.xml 配置信息 jar包 切片信息.mrappmaster根据切片信息开启对应数量的maptask (2) 切片信息怎么得到? ...
  • Hadoop MR 核心原理

    千次阅读 多人点赞 2019-04-07 02:29:58
    在之前的系列内, 多数都是介绍Hadoop MR的基本操作, 对于运行原理涉及较少. 本章, 主要补充下这部分的内容. 方便以后的理解. 本章主要分为如下几个部分: 正文 WordCount运行设计 WordCount的Map/Reduce主要分为如下...
  • mr执行原理

    2019-02-23 15:39:20
    指的是在本地以线程模型mr运行的环境 由本地机器提供运算资源 这种模式是开发环境使用 这种模式程序是单机版的 不是分布式的程序 这种模式通常数据可以位于本地文件系统 至于mr程序到底是是本地模式还是集群模式...
  • Hive运行原理

    千次阅读 2016-09-18 22:18:48
    在执行Hive语句中,难免会好奇Hive的执行原理,通过explain可以查看Hive转换后的抽象语法树和操作符树 hive> explain extended select sum(shopid) from shopt1 limit 10; 生成的语法解析树AST Tree如下所示...
  • Spark Shuffle运行原理

    千次阅读 2020-09-07 12:48:19
    集合或者表操作:intersection,subtract,subtractByKey,join,leftOuterJoin 3.shuffle运行原理 在Spark的源码中,负责shuffle过程的执行、计算和处理的组件主要就是ShuffleManager。在Spark 1.2以前,默认的...
  • Spark:运行原理

    2018-09-27 19:53:47
    spark运行流程图如下(Spark job运行原理): spark-submit提交Spark应用程序后,其执行流程如下: 构建Spark Application的运行环境,启动SparkContext SparkContext向资源管理器Clutser Manager(可以是...
  • Electron运行原理

    千次阅读 2019-08-13 10:23:25
    Node.js是一个让JavaScript运行在服务端的开发平台,Node使用事件驱动,非阻塞I/O模型而得以轻量和高效。 单单靠Chromium是不能具备直接操作原生GUI能力的,Electron内集成了Nodejs,这让其在开发界面的同时也有了...
  • 运行一个MapReduce的作业,可以调用job对象的submit()方法(一般调用job的的waitForCompletion),主要是提交一个job。 整个作业的提交过程,涉及5个重要的实体对象: (1) Client,提交MapReduce作业 (2) YARN...
  • Hadoop基本知识,(以及MR编程原理

    千次阅读 2013-12-09 16:09:33
    1,一个map可能在多个节点上运行:  如果map运行过慢,就会在别的节点上重开一个,两个谁先跑完就取谁的结果,然后杀掉另一个。   2,如果有百个节点左右的集群想要做hadoop版本升级,有没有什么好办法?希望能...
  • Spark基本架构及运行原理

    万次阅读 多人点赞 2018-04-12 13:20:39
    2) Apache Mesos:与hadoop MR兼容性良好的一种资源调度框架 3) Hadoop Yarn: 主要是指Yarn中的ResourceManager 一个Application由一个Driver和若干个Job构成,一个Job由多个Stage构成,一个Stage由多个没有...
  • 1、基本概念 2、YARN、MR交互流程 3、源码解读
  • Yarn的体系结构和运行原理:运行MapReduce的容器hadoop jar hadoop-mapreduce-examples-2.7.3.jar wordcount /input/data.txt /output/wc12061.主节点:ResourceManager 职责: (*) 接受任务请求 (*) 资源的分配 ...
  • job任务在hadoop集群上的运行流程原理:yarn为调度资源(申请机器,运行任务),确定切片数量;数据上传HDFS;mapereduce全程
  • MFC程序运行原理

    2015-11-07 17:31:20
    如果没有这个全局对象,程序在编译时不会出错,但是在运行时会出错。 二 、调用全局应用程序对象的构造函数,从而就会先调用基类CWinApp的构造函数。后者完成应用程序的一些初始化工作,并将应用程序对象指针保存...
  • 通俗理解YARN运行原理

    2020-08-21 09:55:26
    根据资源,向相关的Node Manager通信,要求其启动程序 6、Node Manager(多个)启动MR(每个MR任务都是一个job,可以在job日志中查看程序运行日志) 7、Node Manager不断汇报MR状态和进展给Application Master 8、当...
  • Spark运行原理和RDD解密

    千次阅读 2016-05-06 11:22:55
    1.实战解析Spark运行原理 交互式查询(shell,sql) 批处理(机器学习,图计算) 首先,spark是基于内存的分布式高效计算框架,采用一栈式管理机制,同时支持流处理,实时交互式出,批处理三种方式,Spark特别...
  • spark 调优及运行原理

    千次阅读 2017-10-30 15:15:50
    运行Spark最简单的方法是通过Local模式(即伪分布式模式)。运行命令为:./bin/run-example org.apache.spark.examples.SparkPi local 基于standalone的Spark架构与作业执行流程 Standalone模式下,集群启动时包括...
  • Hadoop不仅仅是大数据技术的核心重点,还是我们面试官面试的时候经常会问道的问题,本文将详细介绍Hadoop的运行原理。 hadoop运行原理包括HDFS和Mapreduce两部分。 1)HDFS自动保存多个副本,移动计算。缺点...
  • 前言:想知道jmeter压测的原理是什么,得先知道性能测试的核心三原则: 基于协议,多线程,场景模拟! 基于协议:基于应用层和传输层的各种协议。比如http,udp,ftp,tcp等 多线程:通过进程下启动线程的方式来模拟...
  • 本篇博客将围绕Hadoop伪分布安装+MapReduce运行原理+基于MapReduce的KNN算法实现这三个方面进行叙述。 (一)Hadoop伪分布安装1、简述Hadoop的安装模式中–伪分布模式与集群模式的区别与联系. Hadoop的安装方式...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 17,849
精华内容 7,139
关键字:

mr的运行原理