mapreduce 订阅
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。 展开全文
MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念"Map(映射)"和"Reduce(归约)",是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。它极大地方便了编程人员在不会分布式并行编程的情况下,将自己的程序运行在分布式系统上。 当前的软件实现是指定一个Map(映射)函数,用来把一组键值对映射成一组新的键值对,指定并发的Reduce(归约)函数,用来保证所有映射的键值对中的每一个共享相同的键组。
信息
思想来源
Google的几篇论文
本    质
一种编程模型
特    点
分布可靠
用    途
大规模数据集的并行运算
应    用
大规模的算法图形处理、文字处理
外文名
MapReduce
MapReduce定义
MapReduce是面向大数据并行处理的计算模型、框架和平台,它隐含了以下三层含义:1)MapReduce是一个基于集群的高性能并行计算平台(Cluster Infrastructure)。它允许用市场上普通的商用服务器构成一个包含数十、数百至数千个节点的分布和并行计算集群。2)MapReduce是一个并行计算与运行软件框架(Software Framework)。它提供了一个庞大但设计精良的并行计算软件框架,能自动完成计算任务的并行化处理,自动划分计算数据和计算任务,在集群节点上自动分配和执行任务以及收集计算结果,将数据分布存储、数据通信、容错处理等并行计算涉及到的很多系统底层的复杂细节交由系统负责处理,大大减少了软件开发人员的负担。3)MapReduce是一个并行程序设计模型与方法(Programming Model & Methodology)。它借助于函数式程序设计语言Lisp的设计思想,提供了一种简便的并行程序设计方法,用Map和Reduce两个函数编程实现基本的并行计算任务,提供了抽象的操作和并行编程接口,以简单方便地完成大规模数据的编程和计算处理 [1]  。
收起全文
精华内容
下载资源
问答
  • MapReduce

    千次阅读 多人点赞 2019-06-14 15:33:14
    一、MapReduce简介 什么是MapReduceMapreduce 是一个分布式运算程序的编程框架,是用户开发“基于 hadoop 的数据分析 应用”的核心框架。 Mapreduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一...

    一、MapReduce简介

    • 什么是MapReduce?
      Mapreduce 是一个分布式运算程序的编程框架,是用户开发“基于 hadoop 的数据分析 应用”的核心框架。
      Mapreduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的 分布式运算程序,并发运行在一个hadoop 集群上

    • Hadoop的四个组件
      HDFS:分布式存储系统
      MapReduce:分布式计算系统
      YARN: hadoop 的资源调度系统
      Common: 以上三大组件的底层支撑组件,主要提供基础工具包和 RPC 框架等

    • 为什么需要 MapReduce?
      (1) 海量数据在单机上处理因为硬件资源限制,无法胜任
      (2) 而一旦将单机版程序扩展到集群来分布式运行,将极大增加程序的复杂度和开发难度
      (3) 引入 MapReduce 框架后,开发人员可以将绝大部分工作集中在业务逻辑的开发上,而将 分布式计算中的复杂性交由框架来处理
      (4)从而提高开发效率,减轻开发者的负担

    • MapReduce的优点
      1.易于编程
      2.良好的扩展性
      3.高容错性
      4.适合PB级别以上的大数据的分布式离线批处理

    • MapReduce的缺点
      1.难以实时计算(MapReduce处理的是存储在本地磁盘上的离线数据)
      2.不能流式计算(MapReduce设计处理的数据源是静态的)
      3.难以DAG计算MapReduce这些并行计算大都是基于非循环的数据流模型,也就是说,一次计算过程中,不同计算节点之间保持高度并行,这样的数据流模型使得那些需要反复使用一个特定数据集的迭代算法无法高效地运行

    二、MapReduce的工作流程

    • input
      MapReduce需要把要执行的大文件数据进行切割(split)
      每一个输入分片(input split)对应一个map任务
      输入分片(input split)存储的并非数据本身而是一个分片长度和一个记录数据位置的数组
      输入分片(input split)和HDFS(hadoop2.0之后)的block(块)关系很密切,HDFS(hadoop2.0之后)的block块的大小默认是128M,如果我们执行的大文件是128x10M,MapReduce会分为10个map任务,每个map任务都存在于它所要计算的block(块)的DataNode上
    • map
      程序员编写的map函数
      因此map函数效率相对好控制
      而且一般map操作都是本地化操作也就是在数据存储节点上进行
    • shuffle
      负责将map生成的数据传递给reduce
      因此shuffle分为在map的执行过程和在reduce的执行过程
    • reduce
      负责shuffle传递过来的数据进行合并
    • output
      最后负责输出reduce中的数据信息
      在这里插入图片描述
    展开全文
  • mapreduce

    千次阅读 2016-07-05 08:48:38
    Hadoop mapreduce对外提供了5个可编程组件,分别是InputFormat,Mapper,Partitioner,Reducer,OutputFormat mapreduce能解决的问题有一个共同特点:任务可被分解成多个子问题,且这些子问题相对独立,彼此不会...
    Hadoop mapreduce对外提供了5个可编程组件,分别是InputFormat,Mapper,Partitioner,Reducer,OutputFormat
    mapreduce能解决的问题有一个共同特点:任务可被分解成多个子问题,且这些子问题相对独立,彼此不会相互牵制。
    分治的思想。
    task分为maptask和reducetask。hdfs以固定大小的block为基本的存储单元,而对于mapreduce而言,其处理单位是split,
    split是逻辑概念,它包含一些元数据信息,比如数据的起始位置,数据长度,数据所在节点等。它的划分由用户自己决定,
    split的多少决定了maptask的数目,因为每个split会交给一个maptask处理。


    maptask执行过程(map,buffer,split,sort(partition,key),combiner):先将对应的split迭代解析成一个个key/value对,依次
    调用map()函数进行处理,Map的输出是由collector控制的,输出的数据首先被写进
    环形内存缓冲区,这个缓冲区默认大小是100M,可以通过io.sort.mb属性来设置具体的大小,当缓冲区中的数据量达到一个特定
    的阀值(io.sort.mb * io.sort.spill.percent,其中io.sort.spill.percent 默认是0.80)时,系统将会启动一个后台线程把
    缓冲区中的内容spill 到磁盘(Map输出总是写到本地磁盘)。在spill过程中,Map的输出将会继续写入到缓冲区,但如果缓冲区已经满了,Map就会被阻塞
    直到spill完成。spill线程在把缓冲区的数据写到磁盘前,会对他进行一个二次排序,首先根据数据所属的partition排序,
    然后每个partition中再按Key排序。输出包括一个索引文件和数据文件,如果设定了Combiner,将在排序输出的基础上进行。
    Combiner就是一个Mini Reducer,它在执行Map任务的节点本身运行,先对Map的输出作一次简单的Reduce,使得Map的输出更紧凑,
    更少的数据会被写入磁盘和传送到Reducer。Spill文件保存在由mapred.local.dir指定的目录中,Map任务结束后删除。
    每当内存中的数据达到spill阀值的时候,都会产生一个新的spill文件,所以在Map任务写完他的最后一个输出记录的时候,可能
    会有多个spill文件,在Map任务完成前,所有的spill文件将会被归并排序为一个索引文件和数据文件。这是一个多路归并过程,
    最大归并路数由io.sort.factor 控制(默认是10)。


    reduce执行过程(复制map输出,排序合并,读取<key,value list>进行reduce处理):
    Reduce任务的输入数据分布在集群内的多个Map任务的输出中,只要有其中一个Map任务完成,
    Reduce任务就开始拷贝他的输出。这个阶段称为复制阶段,Reduce任务拥有多个拷贝线程,可以并行的获取Map输出。可以通过
    设定mapred.reduce.parallel.copies来改变线程数。
    如果reduce端接受的数据量相当小,则直接存储在ReduceTask内存中
    (缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比),
    如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merge.percent决定),则对数据合并后溢写到磁盘中
    或者达到了Map输出的阀值的大小(由mapred.inmem.merge.threshold控制),
    在reduce复制map输出的同时,reduce任务就进入了合并排序阶段
    缓冲区中的数据将会被归并然后spill到磁盘。拷贝来的数据叠加在磁盘上,有一个后台线程会将它们归并为更大的排序文件,
    这样做节省了后期归并的时间。对于经过压缩的Map 输出,系统会自动把它们解压到内存方便对其执行归并。
    当所有的Map 输出都被拷贝后,Reduce 任务进入排序阶段(更恰当的说应该是归并阶段,因为排序在Map 端就已经完成),
    这个阶段会对所有的Map 输出进行归并排序,这个工作会重复多次才能完成。
    JVM重用


    •启动JVM是一个比较耗时的工作,所以在MapReduce中有JVM重用的机制。
    •条件是统一个作业的任务。
    •可以通过mapred.job.reuse.jvm.num.tasks定义重用次数,如果属性是-1那么为无限制。




    第三章 Mapreduce编程模型
    3.1.1 mapreduce编程接口体系结构
    mapreduce编程模型位于应用程序层和mapreduce执行器之间,可分两层,第一层是最基本的Java API,主要有五个可编程组件
    InputFormat,Mapper,Partitioner,Reducer,OutputFormat。Hadoop自带了很多可以直接使用的InputFormat,Partitioner,OutputFormat
    第二层是工具层,四个工具包JobControl(编写有依赖关系的作业),ChainMapper/ChainReducer(编写链式作业),Hadoop Streaming(脚本),Hadoop Pipes(c/c++编程)
    3.1.2 新旧API对比
    旧版api在org.apache.hadoop.mapred包中,新版在org.apache.hadoop.mapreduce包及子包中
    接口变抽象类,抽象类具有良好的向后兼容性,当需要为抽象类添加新方法时,只要新添加的方法提供了默认实现,用户之前的代码不必修改,
    编程组件变抽象类。
    3.2.1 序列化
    序列化的两个作用:永久存储和进程间通信
    Hadoop mapreduce中使对象可序列化的方法是让其对应的类实现Writable接口,对于key需要比较排序,key要实现WritableComparable接口
    3.2.3 回调机制
    mapreduce对外提供的5个组件,全部属于回调接口。当用户按照约定实现这几个接口后,mapreduce运行时环境会自动调用他们。
    3.3.1 Hadoop配置文件和作业配置
    后添加的属性取值覆盖掉前面所添加资源中的属性,被final修饰的属性不能被后面定义的属性覆盖
    Hadoop默认先加载core-default.xml,hdfs-default.xml和mapred-default.xml,然后加载管理员自定义配置文件core-site.xml,
    hdfs-site.xml和mapred-site.xml
    3.3.2 InputFormat的设计和实现
    InputFormat主要描述输入数据的格式,它提供以下两个功能:数据切分,将输入数据切分为若干个split,以便确定map task数量;
    为Mapper提供输入数据,将给定的split解析成一个个key value对
    新版API中InputFormat是抽象类,包含两种方法
    public abstract List<InputSplit> getSplits(JobContext context) throws IOException, InterruptedException;
    public abstract RecordReader<K,V> createRecordReader(InputSplit split, TaskAttemptContext context) throws IOException,InterruptedException;
    getSplits()方法在逻辑上对输入数据分片,InputSplit只记录了分片的元数据信息,比如起始位置,长度及所在的节点列表
    createRecordReader()方法返回RecordReader对象,该对象可将输入的InputSplit解析成key/value对。map执行过程中会不断调用
    RecordReader对象中的方法,迭代获取key/value对交给map函数处理
    整个基于文件的InputFormat体系的设计思路是由公共基类FileInputFormat采用统一的方法对各种输入文件进行切分。而由各个派生
    InputFormat自己提供机制将进一步解析InputSplit,具体的实现是基类FileInputFormat提供getSplit实现,派生类提供createRecordReader实现
    系统自带的各种InputFormat实现。他们都集成自InputFormat,基于数据库的InputFormat实现DBInputFormat,基于文件的InputFormat
    实现基类FileInputFormat,并由此派生出TextInputFormat和,KeyValueTextInputFormat,NLInputFormat。针对二进制格式的SequenceFileInputFormat等
    FileInputFormat基类的实现,它的重要功能是为各种InputFormat提供统一的getSplit函数,该函数最核心的两个算法是文件切分算法
    和host选择算法。
    (1)文件切分算法
    旧API:
    splitSize=max{minSize,min{goalSize,blockSize}}
    minSize,InputSplit的最小值,由配置参数mapred.min.split.size确定,默认是1
    goalSize,根据totalSize/numSplits,其中totalSize为文件总大小,numSplits为用户设定的Map Task个数,默认是1
    blockSize,文件在hdfs中块的大小,默认64M
    新API:
    splitSize=max(minSize, min(maxSize, blockSize));
    -----------------
    blockSize = file.getBlockSize();
    -----------------
    minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
    getFormatMinSplitSize() return 1;
    getMinSplitSize(JobContext job) return job.getConfiguration().getLong(SPLIT_MINSIZE, 1L);
    SPLIT_MINSIZE = "mapreduce.input.fileinputformat.split.minsize";
    -----------------
    maxSize = getMaxSplitSize(job);
    getMaxSplitSize(JobContext context) return context.getConfiguration().getLong(SPLIT_MAXSIZE,Long.MAX_VALUE);
    SPLIT_MAXSIZE = "mapreduce.input.fileinputformat.split.maxsize";


    一旦确定splitSize值后,FileInputFormat将文件依次切分成splitSize的InputSplit,最后剩余不足的splitSize的数据块单独成为
    一个InputSplit。
    (2) host选择算法
    待InputSplit切分方案确定后,下一步要确定每个InputSplit的元数据信息。这通常由四部分组成:<file,start,length,hosts>,分别表示
    InputSplit所在的文件、起始位置,长度,以及所在的host(节点)列表。在进行任务调度时,优先让空闲资源处理本节点的数据,
    如果节点上没有可处理的数据,则处理同一机架上的数据,最差的情况是处理其他机架上的数据。
    虽然InputSplit对应的block可能位于多个节点上,但考虑任务调度的效率,通常不会把所有节点加到InputSplit的host列表中,而是选择包含(InputSplit)
    数据总量最大的前几个节点(Hadoop限制最多选择10个,多余的会过滤掉),以作为人物调度时判断任务是否具有本地性的主要凭证。为此
    FileInputFormat设计了一个简单有效的启发式算法:首先按照rack包含的数据量对rack进行排序,然后在rack内部按照每个node包含的数据量对node排序。
    最后取前N个node的host作为InputSplit的host列表,这里的N为block副本数。
    host选择算法可知,当InputSplit尺寸大于block尺寸时,map Task并不能实现完全数据本地性,所以当使用基于FileInputFormat时,为提高map task的数据
    本地性,尽量使用InputSplit与block大小相同。
    派生类实现getRecordReader函数,该函数返回一个RecordReader对象,它实现了类似迭代器的功能,将某个InputSplit解析成
    一个个key/value对,在具体实现时RecordReader应考虑以下两点:定位记录边界,为了能够识别一条完整的记录,记录之间应该添加一些同步标识,
    对于TextInputFormat,每两条记录之间存在换行符。另外,FileInputFormat对文件的切分是严格按照偏移量来的,因而InputSplit的第一条记录和最后一条记录
    可能会被从中间分开,为了解决这种记录跨InputSplit的读取问题,RecordReader规定每个InputSplit的第一条不完整记录划给前一个InputSplit处理。
    TextInputFormat关联的是LineRecordReader,对于跨InputSplit的行,LineRecordReader会自动跨InputSplit去读取。
    以行记录形式的文本,还真可能存在一行记录被划分到不同的Block,甚至不同的DataNode上去。通过分析FileInputFormat里面的getSplits方法,可以得出,某一行记录同样也可能被划分到不同的InputSplit。
    第二点解析key/value,对于TextInputFormat,每一行的内容即为value,而该行在整个文件中的偏移量为key
    几个简单的结论:
    1. 一个split不会包含零点几或者几点几个Block,一定是包含大于等于1个整数个Block
    2. 一个split不会包含两个File的Block,不会跨越File边界
    3. split和Block的关系是一对多的关系
    4. maptasks的个数最终决定于splits的长度


    文件输入
    •实现类:FileInputFormat
    •通过文件作为输入源的基类。
    •四个方法:
    •addInputPath()
    •addInputPaths()
    •setInputPath()
    •setInputPaths()
    •FileInputFormat会按HDFS块的大小来分割文件
    •避免分割
    •继承FileInputFormat 重载isSplitable()
    •return false
    •实现类:TextInputFormat
    •TextInputFormat 是默认的输入格式。
    •包括:
    •KeyValueTextInputFormat
    •NLineInputFormat
    •XML
    •输入分片与HDFS块之间的关系
    •TextInputFormat的某一条记录可能跨块存在


    二进制输入


    •实现类:SequenceFileInputFormat
    •处理二进制数据
    •包括:
    •SequenceFileAsTextInputFormat
    •SequenceFileAsBinaryInputFormat


    多文件输入


    •实现类:MultipleInputs
    •处理多种文件输入
    •包括:
    •addInputPath


    数据库输入


    •实现类:DBInputFormat
    •注意使用,因为连接过多,数据库无法承受。
    3.3.3 OutputFormat抽象类设计与实现
    OutputFormat主要用于描述输出数据的格式,它能够将用户提供的key/value对写入特定格式的文件中。
    OutputFormat抽象类提供了checkOutputSpecs方法在用户提交作业前,检查输出目录是否存在,存在则抛出异常
    getRecordWriter方法返回一个RecordWriter对象,该类中的方法write接收一个key/value对,并将其写入文件。
    基类FileOutputFormat需要提供所有基于文件的OutputFormat实现的公共功能,主要有以下两个:
    (1)实现checkOutputSpecs抽象方法
    (2)处理side-effect file,任务的side-effect file并不是任务的最终输出文件,而是具有特殊用途的任务专属文件,
    它的典型应用是推测式任务,在hadoop中,同一作业的某些任务可能慢于其他任务,这种任务会拖慢整个作业的执行速度,Hadoop会在另一个节点上启动一个相同的
    任务,该任务便被称为推测式任务,为防止这两个任务同时往输出文件中写入数据发生冲突,FileOutputFormat会为每个task的数据创建一个side-effect file,并将
    产生的数据临时写入该文件,待task结束后,再移动到最终输出目录。这些文件的创建,删除,移动等均由OutputCommitter完成。它是一个抽象类,FileOutputCommitter
    继承自OutputCommitter,并重写相关方法。用户可以编写自己的OutputCommitter,并通过参数mapred.output.committer.class指定
    MultipleOutputs合并了旧版本的MultipleOutputs功能和MultipleOutputFormat功能,新api都是用mapreduce包。
    用法:在setup中new MultipleOutputs 对象,mapreduce中mos.write,cleanup中close()。
    public <K, V> void write(String namedOutput, K key, V value)
          throws IOException, InterruptedException


    public <K, V> void write(String namedOutput, K key, V value,
          String baseOutputPath) throws IOException, InterruptedException 


    public void write(KEYOUT key, VALUEOUT value, String baseOutputPath) 
          throws IOException, InterruptedException 


    LazyOutputFormat.setOutputFormatClass(job,
                    TextOutputFormat.class);


    3.3.4 Mapper与Reducer解析
    mapreduce提供了很多Mapper/Reducer实现:
    ChainMapper/ChainReducer:用于支持链式作业
    IdentityMapper/IdentityReducer:对于key/value不进行任何处理直接输出
    InvertMapper:交换key/value位置
    RegexMapper:正则表达式字符串匹配
    TokenMapper:将字符串分割成若干个token(单词),可做WordCount的Mapper
    LongSumReducer:以key为组,对long类型的value求累加和
    Mapper包括初始化setup,map操作(mapreduce框架会通过InputFormat的RecordReader从InputSplit获取一个个keyvalue对,交给map函数处理)和清理cleanup三部分
    新版API和旧版API区别:Mapper由接口变为抽象类,且不再继承jobConfiguration和Closeable两个接口,而是直接添加setup和cleanup两个方法
    将参数封装到Context对象中
    去掉MapperRunnable接口,在Mapper中添加run方法,以方便用户定制map()函数的调用方法,实现与原来一致。
    对于一个Mapreduce应用程序,不一定非要存在Mapper。mapreduce提供了run方法,用户可以重写该方法自己实现处理逻辑


    3.3.5 Partitioner抽象类的设计与实现
    Partitioner的作用是对Mapper产生的中间结果进行分片,以便将同一分组的数据交给同一个Reduce处理,他直接影响Reduce阶段的负载均衡
    包含一个抽象方法getPartition(KEY key, VALUE value, int numPartitions),numPartitions指reduce的个数,
    mapreduce提供了两个Partitioner实现HashPartitioner和TotalOrderPartitioner,HashPartitioner是默认实现,它是基于hash值得分片
    方式(key.hashCode() & Integer.MAX_VALUE) % numReduceTasks。
    TotalOrderPartitioner提供了一种基于区间的分片方法,它能够按照大小将数据分成若干个区间,保证后一个区间的所有数据均大于
    前一个区间的数据。(首先数据采样后根据reduce task数量获得分割点(Hadoop自带很多采样算法IntercalSampler,RandomSampler,SplitSampler),
    ,然后map阶段,将分割点保存到trie树中,以便于快速定位查找,最后reduce阶段,每个reduce对分配的区间数据进行局部排序,最终得到全排序数据)
    基于TotalOrderPartitioner全排序的效率跟key分布规律和采样算法有直接关系,key值分布越均匀且采样越具有代表性,则reduce task负载
    越均衡,全排序效率越高。
    TotalOrderPartitioner有两个典型的应用实例:TeraSort和Hbase批量数据导入
    Hadoop中TeraSort算法分析http://dongxicheng.org/mapreduce/hadoop-terasort-analyse/
    Hbase以resion为单位划分数据,resion有序,resion内部数据有序(按key排列)
    3.4.1 Hadoop Streaming的实现原理
    Hadoop Streaming允许用户将任何可执行文件或脚本作为Mapper/Reducer,Hadoop Streaming要求用户编写的Mapper和Reducer
    从标准输入中读取数据,并将结果写到标准数据中,类似以Linux的管道。
    提交作业的命令:hadoop jar hadoop-streaming.jar -file myScript -input myInputDirs -output myOutputDirs -mapper Myscript -reducer /bin/wc
    -file 可以使可执行文件成为作业的一部分,并且会一起打包提交
    实现Hadoop Streaming的技术关键点是如何使用标准输入输出实现java与可执行文件或脚本文件之间的通信,为此Hadoop Streaming使用了JDK中的
    java.lang.ProcessBuilder类,该类提供了一整套管理操作系统进程的方法,包括创建、启动和停止进程等。
    3.4.2 Hadoop Pipes实现原理
    Hadoop Pipes是hadoop方便c/c++用户编写mapreduce程序而设计的工具。其设计思想是将应用逻辑相关的C++代码放到单独进程中,然后通过
    Socket让java代码与c++代码通信完成数据计算。
    Hadoop Streaming与Hadoop Pipes的不同之处是hadoop streaming 采用标准输入输出,而Hadoop Pipes采用Socket
    3.5.1 JobControl的实现原理
    用户只需要使用job.addDepending()函数添加作业依赖关系接口,jobControl会按照依赖关系调度各个作业
    //创建configuration对象,配置configuration
    //创建job
    //设置依赖关系
    //创建jobControl对象,jobControl.addJob(job),jobControl.run()
    jobControl由两个类组成jobControl和job,job封装了一个mapreduce作业及其对应的依赖关系,主要负责监控各个依赖作业的运行状态
    以此更新自己的状态(WAITING,REDAY,RUNNING,SUCCESS,FALDED),如果一个作业的依赖作业失败,则该作业也失败。
    jobControl封装了一系列mapreduce作业及其对应的依赖关系,它将处于不同状态的作业放入不同的hash表中,按照job的状态转移作业,直到所有作业完成
    在实现的时候,jobControl包含一个线程用于周期性的监控和更新各个作业的运行状态,调度依赖作业运行完成的作业,提交处于ready状态的作业。
    3.5.2 ChainMapper/ChainReducer的实现原理
    ChainMapper和ChainReducer主要为了解决线性链式Mapper提出的,也就是说在map或者reduce阶段存在多个Mapper,这些Mapper像Linux管道一样,
    前一个Mapper的输出结果直接重定向到下一个Mapper的输入,形成一个流水线,形式类似于[MAP+,REDUCE,MAP*]。注意对于任意一个mapreduce作业,
    map和reduce阶段可以有无限个map,但reduce只能有一个。
    3.5.3 Hadoop工作流引擎
    Hadoop之上出现了很多开源的工作流引擎,主要概括为两类:隐式工作流引擎和显式工作流引擎。
    隐式工作流引擎在mapreduce之上添加了一个语言抽象层,允许用户使用更简单的方式编写应用程序,如hive和pig
    显式工作流引擎直接面向mapreduce应用程序开发,提供了一种作业依赖关系描述,并按照这种方式进行作业调度,典型代表如oozie(采用xml)






    map reduce 个数
    调度器


    mapreduce压缩
    但可以很简单的设置mapred.compress.map.output为true 启用该功能。
    压缩所使用的库由mapred.map.output.compression.codec来设定
    mapred.output.compress=true
    mapred.output.compression.codec=org.apache.Hadoop.io.compress.GzipCode
    Using Compression in MapReduce
    MapReduce读取输入路径中的压缩文件时会自动完成数据解压(可参考CompressionCodecFactory)。
    如果MapReduce Job的结果输出需要使用压缩,可以通过设置Job的相关配置属性来实现:
    mapreduce.output.fileoutputformat.compress:true
    mapreduce.output.fileoutputformat.compress.codec:CompressionCodec全限定类名eg:org.apache.Hadoop.io.compress.GzipCode
    也可以通过FileOutputFormat提供的静态方法设置,如:
    FileOutputFormat.setCompressOutput(job, true);
    FileOutputFormat.setOutputCompressorClass(job, GzipCodec.class);
    不同的输出文件格式可能相应的设置属性会有不同。
    Compressing map output
    Map Task的输出被写出到本地磁盘,而且需要通过网络传输至Reduce Task的节点,只要简单地使用一个快速的压缩算法(如LZO、LZ4、Snappy)就可以带来性能的提升,因为压缩机制的使用避免了Map Tasks与Reduce Tasks之间大量中间结果数据被传输。可以通过设置相应的Job配置属性开启:
    mapreduce.map.output.compress:true
    mapreduce.map.output.compress.codec:CompressionCodec全限定类名
    也可以通过Configuration API进行设置:
    new API:
    Configuration conf = new Configuration();
    conf.setBoolean(Job.MAP_OUTPUT_COMPRESS, true);
    conf.setClass(Job.MAP_OUTPUT_COMPRESS_CODEC, GzipCodec.class, CompressionCodec.class);
    Job job = new Job(conf);


    old API:
    conf.setCompressMapOutput(true);
    conf.setMapOutputCompressorClass(GzipCodec.class);
    展开全文
  • Hadoop_MapReduce_数据去重示例

    万次阅读 2020-05-13 16:46:44
    Hadoop_MapReduce_数据去重 示例:删除输入文件中的重复数据,重复的数据只保留一个 输入文件1:D:\data\distinct\file1.txt zhangsan 500 450 jan lisi 200 150 jan jerry 200 150 feb amy 200 150 feb 输入文件2...

    Hadoop_MapReduce_数据去重

    示例:删除输入文件中的重复数据,重复的数据只保留一个

    输入文件1:D:\data\distinct\file1.txt

    zhangsan 500 450 jan
    lisi 200 150 jan
    jerry 200 150 feb
    amy 200 150 feb
    

    输入文件2:D:\data\distinct\file2.txt

    zhangsan 500 450 jan
    lisi 200 150 jan
    jack 150 160 jan
    tom 500 500 feb
    jerry 200 150 feb
    zhangsan 500 450 jan
    lisi 200 150 jan
    
    1. DistinctMapper
    package com.blu.distinct;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class DistinctMapper extends Mapper<LongWritable, Text, Text, NullWritable> {
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, NullWritable>.Context context)
    			throws IOException, InterruptedException {
    		context.write(value, NullWritable.get());
    	}
    }
    
    1. DistinctReducer
    package com.blu.distinct;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class DistinctReducer extends Reducer<Text, NullWritable, Text, NullWritable>{
    	@Override
    	protected void reduce(Text key, Iterable<NullWritable> value,
    			Reducer<Text, NullWritable, Text, NullWritable>.Context context) throws IOException, InterruptedException {
    		context.write(key, NullWritable.get());
    	}
    }
    
    1. DistinctJob
    package com.blu.distinct;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class DistinctJob {
    
    	public static void main(String[] args) throws Exception {
    		Job job = Job.getInstance();
    		job.setJarByClass(DistinctJob.class);
    		job.setMapperClass(DistinctMapper.class);
    		job.setReducerClass(DistinctReducer.class);
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(NullWritable.class);
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(NullWritable.class);
    		FileInputFormat.addInputPath(job, new Path(args[0]));
    		FileOutputFormat.setOutputPath(job, new Path(args[1]));
    		boolean flag = job.waitForCompletion(true);
    		System.exit(flag ? 0 : 1);
    
    	}
    
    }
    
    1. 运行参数:
    D:\data\distinct\ D:\data\output
    
    1. 运行结果:
    amy 200 150 feb
    jack 150 160 jan
    jerry 200 150 feb
    lisi 200 150 jan
    tom 500 500 feb
    zhangsan 500 450 jan
    
    展开全文
  • 初学MapReduce

    万次阅读 多人点赞 2019-11-13 08:24:52
    MapReduce计算框架 并行计算框架 一个大的任务拆分成多个小任务,将多个小任务分发到多个节点上。每个节点同时执行计算。 MapReduce核心思想 分而治之,先分后和:将一个大的、复杂的工作或任务,拆分成多个小的...

    MapReduce计算框架

    在这里插入图片描述

    并行计算框架

    一个大的任务拆分成多个小任务,将多个小任务分发到多个节点上。每个节点同时执行计算。
    在这里插入图片描述

    MapReduce核心思想

    分而治之,先分后和:将一个大的、复杂的工作或任务,拆分成多个小的任务,并行处理,最终进行合并。
    MapReduce由Map和Reduce组成
    Map: 将数据进行拆分
    Reduce:对数据进行汇总
    在这里插入图片描述

    Shuffle阶段4个步骤

    在这里插入图片描述

    第三步:对输出的key,value对进行分区。相同key的数据发送到同一个reduce里面去,相同key合并,value形成一个集合
    
    第四步:对不同分区的数据按照相同的key进行排序
    
    第五步:对分组后的数据进行规约(combine操作),降低数据的网络拷贝(可选步骤)
    
    第六步:对排序后的额数据进行分组,分组的过程中,将相同key的value放到一个集合当中
    

    MapReduce计算任务的步骤

    在这里插入图片描述

    第1步:InputFormat
          InputFormat 到hdfs上读取数据
          将数据传给Split
          
    第1.1步:Split
          Split将数据进行逻辑切分,
          将数据传给RR
          
    第1.2步:RR
          RR:将传入的数据转换成一行一行的数据,输出行首字母偏移量和偏移量对应的数据
          将数据传给Map
          
    第2步:Map
          Map:根据业务需求实现自定义代码
          将数据传给Shuffle的partition
          
    第3步:partition
          partition:按照一定的分区规则,将key value的list进行分区。
          将数据传给Shuffle的Sort
          
    第4步:Sort
          Sort:对分区内的数据进行排序
          将数据传给Shuffle的combiner
          
    第5步:combiner
          combiner:对数据进行局部聚合。
          将数据传给Shuffle的Group
          
    第6步:Group
          Group:将相同key的key提取出来作为唯一的key,
          将相同key对应的value获取出来作为value的list
          将数据传给Reduce
          
    第7步:Reduce
          Reduce:根据业务需求进行最终的合并汇总。
          将数据传给outputFormat
          
    第8步:outputFormat
           outputFormat:将数据写入HDFS
    
     MapReduce程序的输入:若是一个路径,那么程序会计算路径下的所有文件。
                         若是一个文件,那么只计算这个文件。
     MapReduce程序的输出:输出的路径必须不能存在
     Map的数量不能人为设置,Reduce的数量可以人为设置。
     Reduce数量越多,计算速度越快。
    

    Shuffle阶段的Partition分区算法

    算法:对key 进行哈希,获取到一个哈希值,用这个哈希值与reducetask的数量取余。余几,这个数据就放在余数编号的partition中。

    Split的逻辑切分

      获取读取到的数据,对数据进行逻辑切分,切分的大小是128M.
      这里的128 与HDFS数据块的128没有任何关系
      HDFS 128 是存储层面的数据切分
      split128 是计算层面的128,只不过数据恰好相等。
      两个128相同的原因是,一个集成程序能够正好计算一个数据块。
    

    在这里插入图片描述

    Map的输出到内存

    • Map的输出先写入环形缓冲区(默认大小100M-可以人为调整)(可以再输出的同时写入数据),当缓冲区内的数据达到阈值(默认0.8-可以人为调整)时,对数据进行flash。
    • flash 出去的数据的数量达到一定量(默认4个)时,进行数据的合并。
      在这里插入图片描述

    Reduce数据读取

    • Reduce 主动发出拷贝进程(默认5个copy进程)到Map端获取数据。
    • 获取到数据后,将数据写入内存,当数据达到阈值,将数据flash出去。
    • 当flash出去文件达到一定的量时,进行数据的合并。最终将数据发送给reduce
      在这里插入图片描述

    Map到reduce内存角度宏观流程

    在这里插入图片描述

    Map到reduce处理流程角度宏观步骤

    在这里插入图片描述

    Shuffle阶段

    从Map的输出到reduce的输入
    流程角度
    在这里插入图片描述
    内存角度
    在这里插入图片描述

    展开全文
  • MapReduce介绍

    万次阅读 多人点赞 2018-10-21 12:45:11
    MapReduce产生背景 如果让你统计日志里面的出现的某个URL的总次数,让你自己去写个单机版的程序,写个逻辑:无非就是读这个文件一行,然后把那个地方截取出来,截取出来之后,然后可以把它放到一个HashMap里面...
  • MapReduce的学习和使用

    万次阅读 2020-05-23 11:24:29
    MapReduce的学习和使用 本文是基于CentOS 7.3系统环境,进行MapReduce的学习和使用 CentOS 7.3 1. MapReduce简介 1.1 MapReduce定义 MapReduce是一个分布式运算程序的编程框架,是基于Hadoop的数据分析计算的核心...
  • MapReduce 简介

    千次阅读 2020-08-30 01:44:17
    1. MapReduce 介绍 MapReduce思想在生活中处处可见。或多或少都曾接触过这种思想。MapReduce的思 想核心是“分而治之”,适用于大量复杂的任务处理场景(大规模数据处理场景)。 Map负责“分”,即把复杂的...
  • MapReduce简介

    千次阅读 2020-10-09 16:08:09
    什么是MapReduceMapreduce 是一个分布式运算程序的编程框架,是用户开发“基于 hadoop 的数据分析 应用”的核心框架。 Mapreduce 核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的 分布式运算...
  • MapReduce--2--MapReduce全局计数器

    万次阅读 2017-10-23 13:29:17
    MapReduce的全局计数器 1.1、介绍 计数器是用来记录job的执行进度和状态的。它的作用可以理解为日志。我们可以在程序的某个位置插入计数器,记录数据或者进度的变化情况。   MapReduce 计数器(Counter)为我们...
  • MapReduce详解

    万次阅读 多人点赞 2017-12-11 09:26:41
    1.1 MapReduce是什么  Hadoop MapReduce是一个软件框架,基于该框架能够容易地编写应用程序,这些应用程序能够运行在由上千个商用机器组成的大集群上,并以一种可靠的,具有容错能力的方式并行地处理上TB级别的...
  • MapReduce框架源码解析

    万次阅读 2019-11-01 17:01:07
    MapReduce框架源码解析
  • mapreduce代码示例Welcome to MapReduce algorithm example. Before writing MapReduce programs in CloudEra Environment, first we will discuss how MapReduce algorithm works in theory with some simple ...
  • MapReduce概述

    2019-11-04 16:24:08
    MapReduce概述 源自于Google的MapReduce论文,论文发表于2004年12月 Hadoop的MapReduce是Google MapReduce的克隆版。与HDFS非常类似。 MapReduce是分布式框架中的计算框架。分布式其实就是将一个作业在多个节点上...
  • MapReduce1.0和MapReduce2.0

    千次阅读 2017-06-29 11:48:04
    Hadoop:The Definitive Guid 总结 Chapter 6 MapReduce的工作原理   1.剖析MapReduce作业运行机制 1).经典MapReduce--MapReduce1.0 整个过程有有4个独立的实体 客户端:提交MapReduceJobTracker:协调作业...
  • MapReduce快速入门系列(1) | 什么是MapReduce

    千次阅读 多人点赞 2020-04-25 12:03:09
    随着HDFS系列的完结,下面就到了MapReduce系列了,很荣幸各位小伙伴们能够继续一如既往的观看博主的博文。 目录1. MapReduce的核心思想2. 分布式并行计算框架MapReduce3. MapReduce设计构思1. 如何对付大数据处理:...
  • mapreduce练习资源

    2018-04-03 07:44:31
    该资源中中包含MapReduce的练习题,可以是你同过这些练习题更好的掌握MapReduce
  • Hadoop MapReduce

    千次阅读 2019-12-16 11:21:11
    将会按照HDFS ,MapReduce, Yarn的顺序更新, 近期还会整理Zookeper, Hive, pig等相关 如果对您有帮助或者解决了您的问题, 就帮我点个赞或者评论关注支持吧, 您的鼓励是我写博客的最大支持, 感谢! 目前是大四研一...
  • MapReduce 设计模式

    热门讨论 2014-03-19 17:02:39
    MapReduce 设计模式,深入理解MapReduce编程模式,更好的利用MapReduce模型
  • MapReduce编程案例系列篇(01-15)

    万次阅读 多人点赞 2017-10-23 18:27:32
    由于本人最开始接触大数据工作,主要以写MapReduce程序为主,虽然现在有流行的言论称MapReduce这种运行很慢的分布式计算编程框架将要被各种内存计算框架取代。但是MapRedcue也会吸收很多流行的内存计算的各种优点,...
  • WARN mapreduce.JobResourceUploader: Hadoop command-line option parsing not performed. Implement the Tool interface and execute your application with ToolRunner to remedy this. 这个警告的意思是代码里...
  • 大数据mapreduce案例

    2018-08-25 21:08:29
    大数据mapreduce案例介绍,包括代码解释,详解MRS工作流程

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 73,758
精华内容 29,503
关键字:

mapreduce