mapreduce 订阅
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。 展开全文
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
信息
思想来源
Google的几篇论文
本    质
一种编程模型
特    点
分布可靠
用    途
大规模数据集的并行运算
应    用
大规模的算法图形处理、文字处理
外文名
MapReduce
MapReduce定义
MapReduce是面向大数据并行处理的计算模型、框架和平台,它隐含了以下三层含义:1)MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure)。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。2)MapReduce是一个并行计算与运行软件框架(Software Framework)。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。3)MapReduce是一个并行程序设计模型与方法(Programming Model & Methodology)。它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理 [1]  。
收起全文
精华内容
下载资源
问答
  • MapReduce

    千次阅读 多人点赞 2019-06-14 15:33:14
    一、MapReduce简介 什么是MapReduceMapreduce 是一个分布式运算程序的编程框架,是用户开发“基于 hadoop 的数据分析 应用”的核心框架。 Mapreduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一...

    一、MapReduce简介

    • 什么是MapReduce?
      Mapreduce 是一个分布式运算程序的编程框架,是用户开发“基于 hadoop 的数据分析 应用”的核心框架。
      Mapreduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的 分布式运算程序,并发运行在一个hadoop 集群上

    • Hadoop的四个组件
      HDFS:分布式存储系统
      MapReduce:分布式计算系统
      YARN: hadoop 的资源调度系统
      Common: 以上三大组件的底层支撑组件,主要提供基础工具包和 RPC 框架等

    • 为什么需要 MapReduce?
      (1) 海量数据在单机上处理因为硬件资源限制,无法胜任
      (2) 而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度
      (3) 引入 MapReduce 框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将 分布式计算中的复杂性交由框架来处理
      (4)从而提高开发效率,减轻开发者的负担

    • MapReduce的优点
      1.易于编程
      2.良好的扩展性
      3.高容错性
      4.适合PB级别以上的大数据的分布式离线批处理

    • MapReduce的缺点
      1.难以实时计算(MapReduce处理的是存储在本地磁盘上的离线数据)
      2.不能流式计算(MapReduce设计处理的数据源是静态的)
      3.难以DAG计算MapReduce这些并行计算大都是基于非循环的数据流模型,也就是说,一次计算过程中,不同计算节点之间保持高度并行,这样的数据流模型使得那些需要反复使用一个特定数据集的迭代算法无法高效地运行

    二、MapReduce的工作流程

    • input
      MapReduce需要把要执行的大文件数据进行切割(split)
      每一个输入分片(input split)对应一个map任务
      输入分片(input split)存储的并非数据本身而是一个分片长度和一个记录数据位置的数组
      输入分片(input split)和HDFS(hadoop2.0之后)的block(块)关系很密切,HDFS(hadoop2.0之后)的block块的大小默认是128M,如果我们执行的大文件是128x10M,MapReduce会分为10个map任务,每个map任务都存在于它所要计算的block(块)的DataNode上
    • map
      程序员编写的map函数
      因此map函数效率相对好控制
      而且一般map操作都是本地化操作也就是在数据存储节点上进行
    • shuffle
      负责将map生成的数据传递给reduce
      因此shuffle分为在map的执行过程和在reduce的执行过程
    • reduce
      负责shuffle传递过来的数据进行合并
    • output
      最后负责输出reduce中的数据信息
      在这里插入图片描述
    展开全文
  • mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce mapreduce ...
  • MapReduce MapReduce 计算 学习mapreduce工作原理的例子
  • MapReduce_mapReduce_源码

    2021-10-03 02:18:37
    MapReduce--1--入门程序WordCountMapReduce界的helloworld程序就是WordCount程序。所谓WordCount,就是单词计数,就是用来统计一篇或者一堆文本文件中的各单词的出现次数。
  • 图文详解 MapReduce 工作流程

    万次阅读 多人点赞 2021-06-17 00:14:28
    MapReduce 编程模型开发简单且功能强大,专门为并行处理大规模数据量而设计,接下来,通过一张图来描述 MapReduce 的工作过程,如图所示。 关于 MapReduce 编程模型的更多细节请参考我的这篇博客——MapReduce ...

    前言

    本文隶属于专栏《1000个问题搞定大数据技术体系》,该专栏为笔者原创,引用请注明来源,不足和错误之处请在评论区帮忙指出,谢谢!

    本专栏目录结构和参考文献请见1000个问题搞定大数据技术体系

    正文

    在这里插入图片描述

    MapReduce 编程模型

    MapReduce 编程模型开发简单且功能强大,专门为并行处理大规模数据量而设计,接下来,通过一张图来描述 MapReduce 的工作过程,如图所示。

    在这里插入图片描述

    关于 MapReduce 编程模型的更多细节请参考我的这篇博客——MapReduce 编程模型到底是怎样的?

    整体流程

    在上图中, MapReduce 的工作流程大致可以分为5步,具体如下:
    在这里插入图片描述

    分片、格式化数据源

    输入 Map 阶段的数据源,必须经过分片和格式化操作。

    • 分片操作:指的是将源文件划分为大小相等的小数据块( Hadoop 2.x 中默认 128MB ),也就是分片( split ),
      Hadoop 会为每一个分片构建一个 Map 任务,并由该任务运行自定义的 map() 函数,从而处理分片里的每一条记录;
    • 格式化操作:将划分好的分片( split )格式化为键值对<key,value>形式的数据,其中, key 代表偏移量, value 代表每一行内容。

    执行 MapTask

    每个 Map 任务都有一个内存缓冲区(缓冲区大小 100MB ),输入的分片( split )数据经过 Map 任务处理后的中间结果会写入内存缓冲区中。
    如果写人的数据达到内存缓冲的阈值( 80MB ),会启动一个线程将内存中的溢出数据写入磁盘,同时不影响 Map 中间结果继续写入缓冲区。
    在溢写过程中, MapReduce 框架会对 key 进行排序,如果中间结果比较大,会形成多个溢写文件,最后的缓冲区数据也会全部溢写入磁盘形成一个溢写文件,如果是多个溢写文件,则最后合并所有的溢写文件为一个文件。

    执行 Shuffle 过程

    MapReduce 工作过程中, Map 阶段处理的数据如何传递给 Reduce 阶段,这是 MapReduce 框架中关键的一个过程,这个过程叫作 Shuffle 。
    Shuffle 会将 MapTask 输出的处理结果数据分发给 ReduceTask ,并在分发的过程中,对数据按 key 进行分区和排序。

    执行 ReduceTask

    输入 ReduceTask 的数据流是<key, {value list}>形式,用户可以自定义 reduce()方法进行逻辑处理,最终以<key, value>的形式输出。

    写入文件

    MapReduce 框架会自动把 ReduceTask 生成的<key, value>传入 OutputFormat 的 write 方法,实现文件的写入操作。

    MapTask

    在这里插入图片描述

    1. Read 阶段: MapTask 通过用户编写的 RecordReader ,从输入的 InputSplit 中解析出一个个 key / value 。
    2. Map 阶段:将解析出的 key / value 交给用户编写的 Map ()函数处理,并产生一系列新的 key / value 。
    3. Collect 阶段:在用户编写的 map() 函数中,数据处理完成后,一般会调用 outputCollector.collect() 输出结果,在该函数内部,它会将生成的 key / value 分片(通过调用 partitioner ),并写入一个环形内存缓冲区中(该缓冲区默认大小是 100MB )。
    4. Spill 阶段:即“溢写”,当缓冲区快要溢出时(默认达到缓冲区大小的 80 %),会在本地文件系统创建一个溢出文件,将该缓冲区的数据写入这个文件。

    将数据写入本地磁盘前,先要对数据进行一次本地排序,并在必要时对数据进行合并、压缩等操作。
    写入磁盘之前,线程会根据 ReduceTask 的数量,将数据分区,一个 Reduce 任务对应一个分区的数据。
    这样做的目的是为了避免有些 Reduce 任务分配到大量数据,而有些 Reduce 任务分到很少的数据,甚至没有分到数据的尴尬局面。
    如果此时设置了 Combiner ,将排序后的结果进行 Combine 操作,这样做的目的是尽可能少地执行数据写入磁盘的操作。

    1. Combine 阶段:当所有数据处理完成以后, MapTask 会对所有临时文件进行一次合并,以确保最终只会生成一个数据文件

    合并的过程中会不断地进行排序和 Combine 操作,
    其目的有两个:一是尽量减少每次写人磁盘的数据量;二是尽量减少下一复制阶段网络传输的数据量。
    最后合并成了一个已分区且已排序的文件。

    ReduceTask

    在这里插入图片描述

    1. Copy 阶段: Reduce 会从各个 MapTask 上远程复制一片数据(每个 MapTask 传来的数据都是有序的),并针对某一片数据,如果其大小超过一定國值,则写到磁盘上,否则直接放到内存中
    2. Merge 阶段:在远程复制数据的同时, ReduceTask 会启动两个后台线程,分别对内存和磁盘上的文件进行合并,以防止内存使用过多或者磁盘文件过多。
    3. Sort 阶段:用户编写 reduce() 方法输入数据是按 key 进行聚集的一组数据。

    为了将 key 相同的数据聚在一起, Hadoop 采用了基于排序的策略。
    由于各个 MapTask 已经实现对自己的处理结果进行了局部排序,因此, ReduceTask 只需对所有数据进行一次归并排序即可。

    1. Reduce 阶段:对排序后的键值对调用 reduce() 方法,键相等的键值对调用一次 reduce()方法,每次调用会产生零个或者多个键值对,最后把这些输出的键值对写入到 HDFS 中
    2. Write 阶段: reduce() 函数将计算结果写到 HDFS 上。

    合并的过程中会产生许多的中间文件(写入磁盘了),但 MapReduce 会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到 Reduce 函数。

    展开全文
  • mapreduce 云计算

    2012-02-10 20:09:29
    mapreduce
  • mapreduce:mapreduce-源码

    2021-05-16 11:49:47
    python中的Mapreduce,带有mrjob和spark 我的幻灯片的源文件位于最初在#PyDayMDZ(2014年8月15日)提供,后来在8月7日)提供)。
  • Mapreduce经验

    2019-07-02 13:39:08
    MapReduce最早是由Google提出的用于一种分布式架构中的计算海量数据集的编程模型,它起源于函数式程程序的map 和reduce两个函数,但它们在MapReduce模型中的应用和原来的使用上的大相径庭。在MapReduce模型中,用户...
  • MapReduce
  • mapreduce详解

    2015-10-21 13:23:44
    mapreduce详解
  • Mapreduce原理

    2018-11-26 15:23:25
    介绍了hadoop的基本构成和原理,mapreduce的原理,适用场景等。
  • MapReduce.Net C# 中的 MapReduce
  • MapReduce 示例
  • Hadoop MapReduce架构

    2021-01-07 03:34:32
    Hadoop MapReduce 是 Hadoop 平台根据 MapReduce 原理实现的计算框架,目前已经实现了两个版本,MapReduce 1.0 和基于 YARN 结构的 MapReduce 2.0。 尽管 MapReduce 1.0 中存在一些问题,但是整体架构比较清晰,更...
  • MapReduce基础

    2014-12-15 17:02:48
    MapReduce基础
  • mapreduce框架

    2017-01-09 17:05:41
    一个实用的MapReduce框架
  • MapReduce详解

    万次阅读 多人点赞 2017-12-11 09:26:41
    1.1 MapReduce是什么  Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的...

    1.1 MapReduce是什么

      Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的海量数据集。这个定义里面有着这些关键词,

    一是软件框架,二是并行处理,三是可靠且容错,四是大规模集群,五是海量数据集。

    1.2 MapReduce做什么

      MapReduce擅长处理大数据,它为什么具有这种能力呢?这可由MapReduce的设计思想发觉。MapReduce的思想就是“分而治之”。

      (1)Mapper负责“,即把复杂的任务分解为若干个“简单的任务”来处理。“简单的任务”包含三层含义:

    一是数据或计算的规模相对原任务要大大缩小;二是就近计算原则,即任务会分配到存放着所需数据的节点上进行计算;三是这些小任务可以并行计算,彼此间几乎没有依赖关系。

      (2)Reducer负责对map阶段的结果进行汇总。至于需要多少个Reducer,用户可以根据具体问题,通过在mapred-site.xml配置文件里设置参数mapred.reduce.tasks的值,缺省值为1。

    一个比较形象的语言解释MapReduce:  

    我们要数图书馆中的所有书。你数1号书架,我数2号书架。这就是“Map”。我们人越多,数书就更快。

    现在我们到一起,把所有人的统计数加在一起。这就是“Reduce”。


    1.3 MapReduce工作机制

    MapReduce的整个工作过程如上图所示,它包含如下4个独立的实体:

      实体一:客户端,用来提交MapReduce作业。

      实体二:JobTracker,用来协调作业的运行。

      实体三:TaskTracker,用来处理作业划分后的任务。

      实体四:HDFS,用来在其它实体间共享作业文件。



    二、Hadoop中的MapReduce框架

                一个MapReduce作业通常会把输入的数据集切分为若干独立的数据块,由Map任务以完全并行的方式去处理它们。

               框架会对Map的输出先进行排序,然后把结果输入给Reduce任务。通常作业的输入和输出都会被存储在文件系统中,整个框架负责任务的调度和监控,以及重新执行已经关闭的任务。


               通常,MapReduce框架和分布式文件系统是运行在一组相同的节点上,也就是说,计算节点和存储节点通常都是在一起的。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使得整个集群的网络带宽被非常高效地利用。


    2.1 MapReduce框架的组成

    mapreduce

      (1)JobTracker

      JobTracker负责调度构成一个作业的所有任务,这些任务分布在不同的TaskTracker上(由上图的JobTracker可以看到2 assign map 和 3 assign reduce)。你可以将其理解为公司的项目经理,项目经理接受项目需求,并划分具体的任务给下面的开发工程师。

      (2)TaskTracker

      TaskTracker负责执行由JobTracker指派的任务,这里我们就可以将其理解为开发工程师,完成项目经理安排的开发任务即可。

    2.2 MapReduce的输入输出

      MapReduce框架运转在<key,value>键值对上,也就是说,框架把作业的输入看成是一组<key,value>键值对,同样也产生一组<key,value>键值对作为作业的输出,这两组键值对有可能是不同的。

      一个MapReduce作业的输入和输出类型如下图所示:可以看出在整个流程中,会有三组<key,value>键值对类型的存在。

    2.3 MapReduce的处理流程

      这里以WordCount单词计数为例,介绍map和reduce两个阶段需要进行哪些处理。单词计数主要完成的功能是:统计一系列文本文件中每个单词出现的次数,如图所示:

      (1)map任务处理

      (2)reduce任务处理

    三、第一个MapReduce程序:WordCount

      WordCount单词计数是最简单也是最能体现MapReduce思想的程序之一,该程序完整的代码可以在Hadoop安装包的src/examples目录下找到。

      WordCount单词计数主要完成的功能是:统计一系列文本文件中每个单词出现的次数

    3.1 初始化一个words.txt文件并上传HDFS

      首先在Linux中通过Vim编辑一个简单的words.txt,其内容很简单如下所示:

    Hello Edison Chou
    Hello Hadoop RPC
    Hello Wncud Chou
    Hello Hadoop MapReduce
    Hello Dick Gu

      通过Shell命令将其上传到一个指定目录中,这里指定为:/testdir/input

    3.2 自定义Map函数

      在Hadoop 中, map 函数位于内置类org.apache.hadoop.mapreduce.Mapper<KEYIN,VALUEIN, KEYOUT, VALUEOUT>中,reduce 函数位于内置类org.apache.hadoop. mapreduce.Reducer<KEYIN, VALUEIN, KEYOUT, VALUEOUT>中。

      我们要做的就是覆盖map 函数和reduce 函数,首先我们来覆盖map函数:继承Mapper类并重写map方法

    复制代码
        /**
         * @author Edison Chou
         * @version 1.0
         * @param KEYIN
         *            →k1 表示每一行的起始位置(偏移量offset)
         * @param VALUEIN
         *            →v1 表示每一行的文本内容
         * @param KEYOUT
         *            →k2 表示每一行中的每个单词
         * @param VALUEOUT
         *            →v2 表示每一行中的每个单词的出现次数,固定值为1
         */
        public static class MyMapper extends
                Mapper<LongWritable, Text, Text, LongWritable> {
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                    throws java.io.IOException, InterruptedException {
                String[] spilted = value.toString().split(" ");
                for (String word : spilted) {
                    context.write(new Text(word), new LongWritable(1L));
                }
            };
        }
    复制代码

      Mapper 类,有四个泛型,分别是KEYIN、VALUEIN、KEYOUT、VALUEOUT,前面两个KEYIN、VALUEIN 指的是map 函数输入的参数key、value 的类型;后面两个KEYOUT、VALUEOUT 指的是map 函数输出的key、value 的类型;

    从代码中可以看出,在Mapper类和Reducer类中都使用了Hadoop自带的基本数据类型,例如String对应Text,long对应LongWritable,int对应IntWritable。这是因为HDFS涉及到序列化的问题,Hadoop的基本数据类型都实现了一个Writable接口,而实现了这个接口的类型都支持序列化。

      这里的map函数中通过空格符号来分割文本内容,并对其进行记录;

    3.3 自定义Reduce函数

      现在我们来覆盖reduce函数:继承Reducer类并重写reduce方法

    复制代码
        /**
         * @author Edison Chou
         * @version 1.0
         * @param KEYIN
         *            →k2 表示每一行中的每个单词
         * @param VALUEIN
         *            →v2 表示每一行中的每个单词的出现次数,固定值为1
         * @param KEYOUT
         *            →k3 表示每一行中的每个单词
         * @param VALUEOUT
         *            →v3 表示每一行中的每个单词的出现次数之和
         */
        public static class MyReducer extends
                Reducer<Text, LongWritable, Text, LongWritable> {
            protected void reduce(Text key,
                    java.lang.Iterable<LongWritable> values,
                    Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                    throws java.io.IOException, InterruptedException {
                long count = 0L;
                for (LongWritable value : values) {
                    count += value.get();
                }
                context.write(key, new LongWritable(count));
            };
        }
    复制代码

      Reducer 类,也有四个泛型,同理,分别指的是reduce 函数输入的key、value类型(这里输入的key、value类型通常和map的输出key、value类型保持一致)和输出的key、value 类型。

      这里的reduce函数主要是将传入的<k2,v2>进行最后的合并统计,形成最后的统计结果。

    3.4 设置Main函数

      (1)设定输入目录,当然也可以作为参数传入

    public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";

      (2)设定输出目录(输出目录需要是空目录),当然也可以作为参数传入

    public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";

      (3)Main函数的主要代码

    复制代码
         public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
    
            // 0.0:首先删除输出路径的已有生成文件
            FileSystem fs = FileSystem.get(new URI(INPUT_PATH), conf);
            Path outPath = new Path(OUTPUT_PATH);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }
    
            Job job = new Job(conf, "WordCount");
            job.setJarByClass(MyWordCountJob.class);
    
            // 1.0:指定输入目录
            FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
            // 1.1:指定对输入数据进行格式化处理的类(可以省略)
            job.setInputFormatClass(TextInputFormat.class);
            // 1.2:指定自定义的Mapper类
            job.setMapperClass(MyMapper.class);
            // 1.3:指定map输出的<K,V>类型(如果<k3,v3>的类型与<k2,v2>的类型一致则可以省略)
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            // 1.4:分区(可以省略)
            job.setPartitionerClass(HashPartitioner.class);
            // 1.5:设置要运行的Reducer的数量(可以省略)
            job.setNumReduceTasks(1);
            // 1.6:指定自定义的Reducer类
            job.setReducerClass(MyReducer.class);
            // 1.7:指定reduce输出的<K,V>类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            // 1.8:指定输出目录
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
            // 1.9:指定对输出数据进行格式化处理的类(可以省略)
            job.setOutputFormatClass(TextOutputFormat.class);
            // 2.0:提交作业
            boolean success = job.waitForCompletion(true);
            if (success) {
                System.out.println("Success");
                System.exit(0);
            } else {
                System.out.println("Failed");
                System.exit(1);
            }
        }
    复制代码

      在Main函数中,主要做了三件事:一是指定输入、输出目录;二是指定自定义的Mapper类和Reducer类;三是提交作业;匆匆看下来,代码有点多,但有些其实是可以省略的。

      (4)完整代码如下所示

    View Code

    3.5 运行吧小DEMO

      (1)调试查看控制台状态信息

      (2)通过Shell命令查看统计结果

    四、使用ToolRunner类改写WordCount

      Hadoop有个ToolRunner类,它是个好东西,简单好用。无论在《Hadoop权威指南》还是Hadoop项目源码自带的example,都推荐使用ToolRunner。

    4.1 最初的写法

      下面我们看下src/example目录下WordCount.java文件,它的代码结构是这样的:

    复制代码
    public class WordCount {
        // 略...
        public static void main(String[] args) throws Exception {
            Configuration conf = new Configuration();
            String[] otherArgs = new GenericOptionsParser(conf, 
                                                args).getRemainingArgs();
            // 略...
            Job job = new Job(conf, "word count");
            // 略...
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    复制代码

      WordCount.java中使用到了GenericOptionsParser这个类,它的作用是将命令行中参数自动设置到变量conf中。举个例子,比如我希望通过命令行设置reduce task数量,就这么写:

    bin/hadoop jar MyJob.jar com.xxx.MyJobDriver -Dmapred.reduce.tasks=5

      上面这样就可以了,不需要将其硬编码到java代码中,很轻松就可以将参数与代码分离开。

    4.2 加入ToolRunner的写法

      至此,我们还没有说到ToolRunner,上面的代码我们使用了GenericOptionsParser帮我们解析命令行参数,编写ToolRunner的程序员更懒,它将 GenericOptionsParser调用隐藏到自身run方法,被自动执行了,修改后的代码变成了这样:

    复制代码
    public class WordCount extends Configured implements Tool {
        @Override
        public int run(String[] arg0) throws Exception {
            Job job = new Job(getConf(), "word count");
            // 略...
            System.exit(job.waitForCompletion(true) ? 0 : 1);
            return 0;
        }
    
        public static void main(String[] args) throws Exception {
            int res = ToolRunner.run(new Configuration(), new WordCount(), args);
            System.exit(res);
        }
    }
    复制代码

      看看这段代码上有什么不同:

      (1)让WordCount继承Configured并实现Tool接口

      (2)重写Tool接口的run方法,run方法不是static类型,这很好。

      (3)在WordCount中我们将通过getConf()获取Configuration对象

      可以看出,通过简单的几步,就可以实现代码与配置隔离、上传文件到DistributeCache等功能。修改MapReduce参数不需要修改java代码、打包、部署,提高工作效率。

    4.3 重写WordCount程序

    复制代码
    public class MyJob extends Configured implements Tool {
        public static class MyMapper extends
                Mapper<LongWritable, Text, Text, LongWritable> {
            protected void map(LongWritable key, Text value,
                    Mapper<LongWritable, Text, Text, LongWritable>.Context context)
                    throws java.io.IOException, InterruptedException {
                           ......
                }
            };
        }
    
        public static class MyReducer extends
                Reducer<Text, LongWritable, Text, LongWritable> {
            protected void reduce(Text key,
                    java.lang.Iterable<LongWritable> values,
                    Reducer<Text, LongWritable, Text, LongWritable>.Context context)
                    throws java.io.IOException, InterruptedException {
                           ......
            };
        }
    
        // 输入文件路径
        public static final String INPUT_PATH = "hdfs://hadoop-master:9000/testdir/input/words.txt";
        // 输出文件路径
        public static final String OUTPUT_PATH = "hdfs://hadoop-master:9000/testdir/output/wordcount";
    
        @Override
        public int run(String[] args) throws Exception {
            // 首先删除输出路径的已有生成文件
            FileSystem fs = FileSystem.get(new URI(INPUT_PATH), getConf());
            Path outPath = new Path(OUTPUT_PATH);
            if (fs.exists(outPath)) {
                fs.delete(outPath, true);
            }
    
            Job job = new Job(getConf(), "WordCount");
            // 设置输入目录
            FileInputFormat.setInputPaths(job, new Path(INPUT_PATH));
            // 设置自定义Mapper
            job.setMapperClass(MyMapper.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(LongWritable.class);
            // 设置自定义Reducer
            job.setReducerClass(MyReducer.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(LongWritable.class);
            // 设置输出目录
            FileOutputFormat.setOutputPath(job, new Path(OUTPUT_PATH));
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
            return 0;
        }
    
        public static void main(String[] args) {
            Configuration conf = new Configuration();
            try {
                int res = ToolRunner.run(conf, new MyJob(), args);
                System.exit(res);
            } catch (Exception e) {
                e.printStackTrace();
            }
        }
    
    }    

    展开全文
  • MapReduce 2.0

    2017-10-31 19:49:29
    主要描述MapReduce 2.0的应用场景及实现原理与基本架构
  • mapreduce例子

    2014-06-30 18:26:36
    mapreduce example
  • mapreduce程序

    2017-12-08 12:39:27
    在hadoop平台关于mapreduce的一些应用程序,实现对大数据的分析处理
  • mapreduce实战

    2018-03-03 17:24:00
    文档是我整理的所有关于mapreduce实战代码,里面所有的程序都是经过我自己全部运行验证通过的,现分享出供大家参考

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 234,316
精华内容 93,726
关键字:

mapreduce