精华内容
下载资源
问答
  • 集合操作符集合操作符是一个项目符号,它利用关键路径对集合中的元素进行操作。下面阐释有效的集合操作符,关键路径应用实例,以及他们产生的结果。 集合操作符是关键路径,它作为 valueForKeyPath:方法参数。...

    集合操作符

    集合操作符是一个项目符号,它利用关键路径对集合中的元素进行操作。下面阐释有效的集合操作符,关键路径应用实例,以及他们产生的结果。
    集合操作符是关键路径,它作为 valueForKeyPath:方法参数。操作符字符串以符号@为开头,关键路径集合在操作符的左边,数组和集合都适用于操作符,在操作符的右边是一个集合属性列表。
    实例1 操作关键路径格式

    这里写图片描述

    除了@count操作符以外,都有一个集合属性列表在操作符的右边。

    注意:目前是不支持自定义操作符的。

    操作符返回结果有几种类型:
    - 简单的操作符,返回字符串,数字或者日期,主要依靠右边的关键字(参考简单操作符)
    - 对象操作符返回一个数组实例,参考对象操作符
    - 数组和集合返回一个数组或者集合,参考数组和集合操作符

    简单集合运算符

    例子:Transactions对象数据
    这里写图片描述
    在数组或者集合中,简单集合运算符对运算左边的属性进行操作。

    @avg

    @avg操作符使用valueForKeyPath: 函数,以关键路径为参数,返回在操作位右边属性的平均值,类型为double,这个值被NSNumber包装。如果值为nil,将会以0替代。

    NSNumber *transactionAverage = [transactions valueForKeyPath:@"@avg.amount"];

    transactionAverage is $456.54

    @count
    @count操作符返回在操作符左边的关键路径指定的对象的个数(NSNumber对象),操作符右边关键路径被忽略。

    NSNumber *numberOfTransactions = [transactions valueForKeyPath:@"@count"];

    numberOfTransactions is 13.

    @max
    @max操作符通过比较操作符右边指定的属性的值,来返回一个最大值。通过对象的compare:方法来比较 根据指定的关键路径来决定最大值。这个参与比较的对象的属性必须支持相互比较。如果操作符右边关键路径指定的值是nil,将会忽略。

    NSDate *latestDate = [transactions valueForKeyPath:@"@max.date"];

    latestDate is Jul 15, 2010

    @min

    NSDate *earliestDate = [transactions valueForKeyPath:@"@min.date"];

    earliestDate is Dec 1, 2009

    @sum
    @sum返回操作符右边关键路径指定的属性的值的和,将每一个数值转成double类型,在计算值的和,其结果被包装在一个NSNumber对象中,如果操作符右边关键路径指定的值是nil,将被忽略.

    NSNumber *amountSum = [transactions valueForKeyPath:@"@sum.amount"];

    amountSum is $5,935.00.

    对象操作符

    @distinctUnionOfObjects
    @distinctUnionOfObjects操作符根据操作符右边的关键路径指定的属性返回一个包含不同对象的数组。

    NSArray *payees = [transactions valueForKeyPath:@"@distinctUnionOfObjects.payee"];

    payees数组包含字符串: Car Loan, General Cable, Animal Hospital, Green Power, Mortgage.
    @unionOfObjects操作符和它类似,但是不会移除相同对象的副本。
    注意:如果右关键路径的属性为nil,则会抛出一个异常。

    数组和集合操作符

    数组和集合操作符能在集合中嵌套操作,这个集合中的每一个元素也是一个集合。
    arrayOfTransactions变量在每个操作符中都有使用。它是一个包含transaction对象的2个数组的数组。

    // Create the array that contains additional arrays.
    
    self.arrayOfTransactionsArray = [NSMutableArray array];
    
    
    
    // Add the array of objects used in the above examples.
    
    [arrayOfTransactionsArray addObject:transactions];
    
    
    
    // Add a second array of objects; this array contains alternate values.
    
    [arrayOfTransactionsArrays addObject:moreTransactions];

    transaction包含上图中的数据,moreTransactions包含下图中的数据。
    moreTransactions数组中的数据

    @distinctUnionOfArrays
    @distinctUnionOfArrays操作符根据操作符右边的关键路径指定的属性返回一个包含不同对象的数组

    NSArray *payees = [arrayOfTransactionsArrays valueForKeyPath:@"@distinctUnionOfArrays.payee"];

    结果:payees数组包含下列值:Hobby Shop, Mortgage, Animal Hospital, Second Mortgage, Car Loan, General Cable - Cottage, General Cable, Green Power
    @unionOfArrays和它类似,只是不移除副本。
    注意:如果右关键路径的属性为nil,则会抛出一个异常。

    @distinctUnionOfSets
    @distinctUnionOfSets操作符返回一个包含不同对象的集合,根据操作符右边的关键路径指定的属性。
    和@distinctUnionOfArrays类似,除了返回一个集合而不是数组。
    @distinctUnionOfArrays

    展开全文
  • MapReduce中文翻译

    2008-11-07 10:00:19
    MapReduce中文翻译,MapReduce是一个编程模型、和处理,产生大数据集的相关实现。用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集。然后再指定一个reduce函数合并所有的具有相同中间key的中间...
  • MapReduce:超大机群上的简单数据处理 ...用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间value.下面将列举许多可以用这...

    MapReduce:超大机群上的简单数据处理

     

                                              摘要

    MapReduce是一个编程模型,和处理,产生大数据集的相关实现.用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间value.下面将列举许多可以用这个模型来表示的现实世界的工作.

    以这种方式写的程序能自动的在大规模的普通机器上实现并行化.这个运行时系统关心这些细节:分割输入数据,在机群上的调度,机器的错误处理,管理机器之间必要的通信.这样就可以让那些没有并行分布式处理系统经验的程序员利用大量分布式系统的资源.

    我们的MapReduce实现运行在规模可以灵活调整的由普通机器组成的机群上,一个典型的MapReduce计算处理几千台机器上的以TB计算的数据.程序员发现这个系统非常好用:已经实现了数以百计的MapReduce程序,每天在Google的机群上都有1000多个MapReduce程序在执行.

    1.介绍

    在过去的5年里,作者和Google的许多人已经实现了数以百计的为专门目的而写的计算来处理大量的原始数据,比如,爬行的文档,Web请求日志,等等.为了计算各种类型的派生数据,比如,倒排索引,Web文档的图结构的各种表示,每个主机上爬行的页面数量的概要,每天被请求数量最多的集合,等等.很多这样的计算在概念上很容易理解.然而,输入的数据量很大,并且只有计算被分布在成百上千的机器上才能在可以接受的时间内完成.怎样并行计算,分发数据,处理错误,所有这些问题综合在一起,使得原本很简介的计算,因为要大量的复杂代码来处理这些问题,而变得让人难以处理.

    作为对这个复杂性的回应,我们设计一个新的抽象模型,它让我们表示我们将要执行的简单计算,而隐藏并行化,容错,数据分布,负载均衡的那些杂乱的细节,在一个库里.我们的抽象模型的灵感来自Lisp和许多其他函数语言的map和reduce的原始表示.我们认识到我们的许多计算都包含这样的操作:在我们输入数据的逻辑记录上应用map操作,来计算出一个中间key/value对集,在所有具有相同key的value上应用reduce操作,来适当的合并派生的数据.功能模型的使用,再结合用户指定的map和reduce操作,让我们可以非常容易的实现大规模并行化计算,和使用再次执行作为初级机制来实现容错.

    这个工作的主要贡献是通过简单有力的接口来实现自动的并行化和大规模分布式计算,结合这个接口的实现来在大量普通的PC机上实现高性能计算.

    第二部分描述基本的编程模型,并且给一些例子.第三部分描述符合我们的基于集群的计算环境的MapReduce的接口的实现.第四部分描述我们觉得编程模型中一些有用的技巧.第五部分对于各种不同的任务,测量我们实现的性能.第六部分探究在Google内部使用MapReduce作为基础来重写我们的索引系统产品.第七部分讨论相关的,和未来的工作.

    2.编程模型

    计算利用一个输入key/value对集,来产生一个输出key/value对集.MapReduce库的用户用两个函数表达这个计算:map和reduce.

    用户自定义的map函数,接受一个输入对,然后产生一个中间key/value对集.MapReduce库把所有具有相同中间key I的中间value聚合在一起,然后把它们传递给reduce函数.

    用户自定义的reduce函数,接受一个中间key I和相关的一个value集.它合并这些value,形成一个比较小的value集.一般的,每次reduce调用只产生0或1个输出value.通过一个迭代器把中间value提供给用户自定义的reduce函数.这样可以使我们根据内存来控制value列表的大小.

    2.1 实例

    考虑这个问题:计算在一个大的文档集合中每个词出现的次数.用户将写和下面类似的伪代码:

    map(String key,String value):

     //key:文档的名字

     //value:文档的内容

     for each word w in value:

        EmitIntermediate(w,"1");

     

    reduce(String key,Iterator values):

    //key:一个词

    //values:一个计数列表

     int result=0;

     for each v in values:

       result+=ParseInt(v);

     Emit(AsString(resut));

    map函数产生每个词和这个词的出现次数(在这个简单的例子里就是1).reduce函数把产生的每一个特定的词的计数加在一起.

    另外,用户用输入输出文件的名字和可选的调节参数来填充一个mapreduce规范对象.用户然后调用MapReduce函数,并把规范对象传递给它.用户的代码和MapReduce库链接在一起(用C++实现).附录A包含这个实例的全部文本.

    2.2类型

    即使前面的伪代码写成了字符串输入和输出的term格式,但是概念上用户写的map和reduce函数有关联的类型:

     map(k1,v1) ->list(k2,v2)

     reduce(k2,list(v2)) ->list(v2)

    例如,输入的key,value和输出的key,value的域不同.此外,中间key,value和输出key,values的域相同.

    我们的C++实现传递字符串来和用户自定义的函数交互,并把它留给用户的代码,来在字符串和适当的类型间进行转换.

    2.3更多实例

    这里有一些让人感兴趣的简单程序,可以容易的用MapReduce计算来表示.

    分布式的Grep(UNIX工具程序, 可做文件内的字符串查找):如果输入行匹配给定的样式,map函数就输出这一行.reduce函数就是把中间数据复制到输出.

    计算URL访问频率:map函数处理web页面请求的记录,输出(URL,1).reduce函数把相同URL的value都加起来,产生一个(URL,记录总数)的对.

    倒转网络链接图:map函数为每个链接输出(目标,源)对,一个URL叫做目标,包含这个URL的页面叫做源.reduce函数根据给定的相关目标URLs连接所有的源URLs形成一个列表,产生(目标,源列表)对.

    每个主机的术语向量:一个术语向量用一个(词,频率)列表来概述出现在一个文档或一个文档集中的最重要的一些词.map函数为每一个输入文档产生一个(主机名,术语向量)对(主机名来自文档的URL).reduce函数接收给定主机的所有文档的术语向量.它把这些术语向量加在一起,丢弃低频的术语,然后产生一个最终的(主机名,术语向量)对.

    倒排索引:map函数分析每个文档,然后产生一个(词,文档号)对的序列.reduce函数接受一个给定词的所有对,排序相应的文档IDs,并且产生一个(词,文档ID列表)对.所有的输出对集形成一个简单的倒排索引.它可以简单的增加跟踪词位置的计算.

    分布式排序:map函数从每个记录提取key,并且产生一个(key,record)对.reduce函数不改变任何的对.这个计算依赖分割工具(在4.1描述)和排序属性(在4.2描述).

    3实现

    MapReduce接口可能有许多不同的实现.根据环境进行正确的选择.例如,一个实现对一个共享内存较小的机器是合适的,另外的适合一个大NUMA的多处理器的机器,而有的适合一个更大的网络机器的集合.

    这部分描述一个在Google广泛使用的计算环境的实现:用交换机连接的普通PC机的大机群.我们的环境是:

    1.Linux操作系统,双处理器,2-4GB内存的机器.

    2.普通的网络硬件,每个机器的带宽或者是百兆或者千兆,但是平均小于全部带宽的一半.

    3.因为一个机群包含成百上千的机器,所有机器会经常出现问题.

    4.存储用直接连到每个机器上的廉价IDE硬盘.一个从内部文件系统发展起来的分布式文件系统被用来管理存储在这些磁盘上的数据.文件系统用复制的方式在不可靠的硬件上来保证可靠性和有效性.

    5.用户提交工作给调度系统.每个工作包含一个任务集,每个工作被调度者映射到机群中一个可用的机器集上.

     

    3.1执行预览

    通过自动分割输入数据成一个有M个split的集,map调用被分布到多台机器上.输入的split能够在不同的机器上被并行处理.通过用分割函数分割中间key,来形成R个片(例如,hash(key) mod R),reduce调用被分布到多台机器上.分割数量(R)和分割函数由用户来指定.

    图1显示了我们实现的MapReduce操作的全部流程.当用户的程序调用MapReduce的函数的时候,将发生下面的一系列动作(下面的数字和图1中的数字标签相对应):

        1.在用户程序里的MapReduce库首先分割输入文件成M个片,每个片的大小一般从 16到64MB(用户可以通过可选的参数来控制).然后在机群中开始大量的拷贝程序.

          2.这些程序拷贝中的一个是master,其他的都是由master分配任务的worker.有M 个map任务和R个reduce任务将被分配.管理者分配一个map任务或reduce任务给一个空闲的worker.

    3.一个被分配了map任务的worker读取相关输入split的内容.它从输入数据中分析出key/value对,然后把key/value对传递给用户自定义的map函数.由map函数产生的中间key/value对被缓存在内存中.

    4.缓存在内存中的key/value对被周期性的写入到本地磁盘上,通过分割函数把它们写入R个区域.在本地磁盘上的缓存对的位置被传送给master,master负责把这些位置传送给reduce worker.

    5.当一个reduce worker得到master的位置通知的时候,它使用远程过程调用来从map worker的磁盘上读取缓存的数据.当reduce worker读取了所有的中间数据后,它通过排序使具有相同key的内容聚合在一起.因为许多不同的key映射到相同的reduce任务,所以排序是必须的.如果中间数据比内存还大,那么还需要一个外部排序.

          6.reduce worker迭代排过序的中间数据,对于遇到的每一个唯一的中间key,它把key和相关的中间value集传递给用户自定义的reduce函数.reduce函数的输出被添加到这个reduce分割的最终的输出文件中.

    7.当所有的map和reduce任务都完成了,管理者唤醒用户程序.在这个时候,在用户程序里的MapReduce调用返回到用户代码.

    在成功完成之后,mapreduce执行的输出存放在R个输出文件中(每一个reduce任务产生一个由用户指定名字的文件).一般,用户不需要合并这R个输出文件成一个文件--他们经常把这些文件当作一个输入传递给其他的MapReduce调用,或者在可以处理多个分割文件的分布式应用中使用他们.

    3.2master数据结构

    master保持一些数据结构.它为每一个map和reduce任务存储它们的状态(空闲,工作中,完成),和worker机器(非空闲任务的机器)的标识.

    master就像一个管道,通过它,中间文件区域的位置从map任务传递到reduce任务.因此,对于每个完成的map任务,master存储由map任务产生的R个中间文件区域的大小和位置.当map任务完成的时候,位置和大小的更新信息被接受.这些信息被逐步增加的传递给那些正在工作的reduce任务.

    3.3容错

    因为MapReduce库被设计用来使用成百上千的机器来帮助处理非常大规模的数据,所以这个库必须要能很好的处理机器故障.

    worker故障

    master周期性的ping每个worker.如果master在一个确定的时间段内没有收到worker返回的信息,那么它将把这个worker标记成失效.因为每一个由这个失效的worker完成的map任务被重新设置成它初始的空闲状态,所以它可以被安排给其他的worker.同样的,每一个在失败的worker上正在运行的map或reduce任务,也被重新设置成空闲状态,并且将被重新调度.

    在一个失败机器上已经完成的map任务将被再次执行,因为它的输出存储在它的磁盘上,所以不可访问.已经完成的reduce任务将不会再次执行,因为它的输出存储在全局文件系统中.

    当一个map任务首先被worker A执行之后,又被B执行了(因为A失效了),重新执行这个情况被通知给所有执行reduce任务的worker.任何还没有从A读数据的reduce任务将从worker B读取数据.

    MapReduce可以处理大规模worker失败的情况.例如,在一个MapReduce操作期间,在正在运行的机群上进行网络维护引起80台机器在几分钟内不可访问了,MapReduce master只是简单的再次执行已经被不可访问的worker完成的工作,继续执行,最终完成这个MapReduce操作.

    master失败

    可以很容易的让管理者周期的写入上面描述的数据结构的checkpoints.如果这个master任务失效了,可以从上次最后一个checkpoint开始启动另一个master进程.然而,因为只有一个master,所以它的失败是比较麻烦的,因此我们现在的实现是,如果master失败,就中止MapReduce计算.客户可以检查这个状态,并且可以根据需要重新执行MapReduce操作.

    在错误面前的处理机制

    当用户提供的map和reduce操作对它的输出值是确定的函数时,我们的分布式实现产生,和全部程序没有错误的顺序执行一样,相同的输出.

    我们依赖对map和reduce任务的输出进行原子提交来完成这个性质.每个工作中的任务把它的输出写到私有临时文件中.一个reduce任务产生一个这样的文件,而一个map任务产生R个这样的文件(一个reduce任务对应一个文件).当一个map任务完成的时候,worker发送一个消息给master,在这个消息中包含这R个临时文件的名字.如果master从一个已经完成的map任务再次收到一个完成的消息,它将忽略这个消息.否则,它在master的数据结构里记录这R个文件的名字.

    当一个reduce任务完成的时候,这个reduce worker原子的把临时文件重命名成最终的输出文件.如果相同的reduce任务在多个机器上执行,多个重命名调用将被执行,并产生相同的输出文件.我们依赖由底层文件系统提供的原子重命名操作来保证,最终的文件系统状态仅仅包含一个reduce任务产生的数据.

    我们的map和reduce操作大部分都是确定的,并且我们的处理机制等价于一个顺序的执行的这个事实,使得程序员可以很容易的理解程序的行为.当map或/和reduce操作是不确定的时候,我们提供虽然比较弱但是合理的处理机制.当在一个非确定操作的前面,一个reduce任务R1的输出等价于一个非确定顺序程序执行产生的输出.然而,一个不同的reduce任务R2的输出也许符合一个不同的非确定顺序程序执行产生的输出.

    考虑map任务M和reduce任务R1,R2的情况.我们设定e(Ri)为已经提交的Ri的执行(有且仅有一个这样的执行).这个比较弱的语义出现,因为e(R1)也许已经读取了由M的执行产生的输出,而e(R2)也许已经读取了由M的不同执行产生的输出.

    3.4存储位置

    在我们的计算机环境里,网络带宽是一个相当缺乏的资源.我们利用把输入数据(由GFS管理)存储在机器的本地磁盘上来保存网络带宽.GFS把每个文件分成64MB的一些块,然后每个块的几个拷贝存储在不同的机器上(一般是3个拷贝).MapReduce的master考虑输入文件的位置信息,并且努力在一个包含相关输入数据的机器上安排一个map任务.如果这样做失败了,它尝试在那个任务的输入数据的附近安排一个map任务(例如,分配到一个和包含输入数据块在一个switch里的worker机器上执行).当运行巨大的MapReduce操作在一个机群中的一部分机器上的时候,大部分输入数据在本地被读取,从而不消耗网络带宽.

    3.5任务粒度

    象上面描述的那样,我们细分map阶段成M个片,reduce阶段成R个片.M和R应当比worker机器的数量大许多.每个worker执行许多不同的工作来提高动态负载均衡,也可以加速从一个worker失效中的恢复,这个机器上的许多已经完成的map任务可以被分配到所有其他的worker机器上.

    在我们的实现里,M和R的范围是有大小限制的,因为master必须做O(M+R)次调度,并且保存O(M*R)个状态在内存中.(这个因素使用的内存是很少的,在O(M*R)个状态片里,大约每个map任务/reduce任务对使用一个字节的数据).

    此外,R经常被用户限制,因为每一个reduce任务最终都是一个独立的输出文件.实际上,我们倾向于选择M,以便每一个单独的任务大概都是16到64MB的输入数据(以便上面描述的位置优化是最有效的),我们把R设置成我们希望使用的worker机器数量的小倍数.我们经常执行MapReduce计算,在M=200000,R=5000,使用2000台工作者机器的情况下.

    3.6备用任务

    一个落后者是延长MapReduce操作时间的原因之一:一个机器花费一个异乎寻常地的长时间来完成最后的一些map或reduce任务中的一个.有很多原因可能产生落后者.例如,一个有坏磁盘的机器经常发生可以纠正的错误,这样就使读性能从30MB/s降低到3MB/s.机群调度系统也许已经安排其他的任务在这个机器上,由于计算要使用CPU,内存,本地磁盘,网络带宽的原因,引起它执行MapReduce代码很慢.我们最近遇到的一个问题是,一个在机器初始化时的Bug引起处理器缓存的失效:在一个被影响的机器上的计算性能有上百倍的影响.

    我们有一个一般的机制来减轻这个落后者的问题.当一个MapReduce操作将要完成的时候,master调度备用进程来执行那些剩下的还在执行的任务.无论是原来的还是备用的执行完成了,工作都被标记成完成.我们已经调整了这个机制,通常只会占用多几个百分点的机器资源.我们发现这可以显著的减少完成大规模MapReduce操作的时间.作为一个例子,将要在5.3描述的排序程序,在关闭掉备用任务的情况下,要比有备用任务的情况下多花44%的时间.

    4技巧

    尽管简单的map和reduce函数的功能对于大多数需求是足够的了,但是我们开发了一些有用的扩充.这些将在这个部分描述.

    4.1分割函数

    MapReduce用户指定reduce任务和reduce任务需要的输出文件的数量.在中间key上使用分割函数,使数据分割后通过这些任务.一个缺省的分割函数使用hash方法(例如,hash(key) mod R).这个导致非常平衡的分割.然后,有的时候,使用其他的key分割函数来分割数据有非常有用的.例如,有时候,输出的key是URLs,并且我们希望每个主机的所有条目保持在同一个输出文件中.为了支持像这样的情况,MapReduce库的用户可以提供专门的分割函数.例如,使用"hash(Hostname(urlkey)) mod R"作为分割函数,使所有来自同一个主机的URLs保存在同一个输出文件中.

    4.2顺序保证

    我们保证在一个给定的分割里面,中间key/value对以key递增的顺序处理.这个顺序保证可以使每个分割产出一个有序的输出文件,当输出文件的格式需要支持有效率的随机访问key的时候,或者对输出数据集再作排序的时候,就很容易.

     

    4.3combiner函数

    在某些情况下,允许中间结果key重复会占据相当的比重,并且用户定义的reduce函数

    满足结合律和交换律.一个很好的例子就是在2.1部分的词统计程序.因为词频率倾向于一个zipf分布(齐夫分布),每个map任务将产生成百上千个这样的记录<the,1>.所有的这些计数将通过网络被传输到一个单独的reduce任务,然后由reduce函数加在一起产生一个数字.我们允许用户指定一个可选的combiner函数,先在本地进行合并一下,然后再通过网络发送.

    在每一个执行map任务的机器上combiner函数被执行.一般的,相同的代码被用在combiner和reduce函数.在combiner和reduce函数之间唯一的区别是MapReduce库怎样控制函数的输出.reduce函数的输出被保存最终输出文件里.combiner函数的输出被写到中间文件里,然后被发送给reduce任务.

    部分使用combiner可以显著的提高一些MapReduce操作的速度.附录A包含一个使用combiner函数的例子.

    4.4输入输出类型

    MapReduce库支持以几种不同的格式读取输入数据.例如,文本模式输入把每一行看作是一个key/value对.key是文件的偏移量,value是那一行的内容.其他普通的支持格式以key的顺序存储key/value对序列.每一个输入类型的实现知道怎样把输入分割成对每个单独的map任务来说是有意义的(例如,文本模式的范围分割确保仅仅在每行的边界进行范围分割).虽然许多用户仅仅使用很少的预定意输入类型的一个,但是用户可以通过提供一个简单的reader接口来支持一个新的输入类型.

    一个reader不必要从文件里读数据.例如,我们可以很容易的定义它从数据库里读记录,或从内存中的数据结构读取.

    4.5副作用

    有的时候,MapReduce的用户发现在map操作或/和reduce操作时产生辅助文件作为一个附加的输出是很方便的.我们依靠应用程序写来使这个副作用成为原子的.一般的,应用程序写一个临时文件,然后一旦这个文件全部产生完,就自动的被重命名.

    对于单个任务产生的多个输出文件来说,我们没有提供其上的两阶段提交的原子操作支持.因此,一个产生需要交叉文件连接的多个输出文件的任务,应该使确定性的任务.不过这个限制在实际的工作中并不是一个问题.

    4.6跳过错误记录

    有的时候因为用户的代码里有bug,导致在某一个记录上map或reduce函数突然crash掉.这样的bug使得MapReduce操作不能完成.虽然一般是修复这个bug,但是有时候这是不现实的;也许这个bug是在源代码不可得到的第三方库里.有的时候也可以忽略一些记录,例如,当在一个大的数据集上进行统计分析.我们提供一个可选的执行模式,在这个模式下,MapReduce库检测那些记录引起的crash,然后跳过那些记录,来继续执行程序.

    每个worker程序安装一个信号处理器来获取内存段异常和总线错误.在调用一个用户自定义的map或reduce操作之前,MapReduce库把记录的序列号存储在一个全局变量里.如果用户代码产生一个信号,那个信号处理器就会发送一个包含序号的"last gasp"UDP包给MapReduce的master.当master不止一次看到同一个记录的时候,它就会指出,当相关的map或reduce任务再次执行的时候,这个记录应当被跳过.

    4.7本地执行

    调试在map或reduce函数中问题是很困难的,因为实际的计算发生在一个分布式的系统中,经常是有一个master动态的分配工作给几千台机器.为了简化调试和测试,我们开发了一个可替换的实现,这个实现在本地执行所有的MapReduce操作.用户可以控制执行,这样计算可以限制到特定的map任务上.用户以一个标志调用他们的程序,然后可以容易的使用他们认为好用的任何调试和测试工具(例如,gdb).

    4.8状态信息

    master运行一个HTTP服务器,并且可以输出一组状况页来供人们使用.状态页显示计算进度,象多少个任务已经完成,多少个还在运行,输入的字节数,中间数据字节数,输出字节数,处理百分比,等等.这个页也包含到标准错误的链接,和由每个任务产生的标准输出的链接.用户可以根据这些数据预测计算需要花费的时间,和是否需要更多的资源.当计算比预期的要慢很多的时候,这些页面也可以被用来判断是不是这样.

    此外,最上面的状态页显示已经有多少个工作者失败了,和当它们失败的时候,那个map和reduce任务正在运行.当试图诊断在用户代码里的bug时,这个信息也是有用的.

    4.9计数器

    MapReduce库提供一个计数器工具,来计算各种事件的发生次数.例如,用户代码想要计算所有处理的词的个数,或者被索引的德文文档的数量.

    为了使用这个工具,用户代码创建一个命名的计数器对象,然后在map或/和reduce函数里适当的增加计数器.例如:

    Counter * uppercase;

    uppercase=GetCounter("uppercase");

    map(String name,String contents):

     for each word w in contents:

        if(IsCapitalized(w)):

          uppercase->Increment();

        EmitIntermediate(w,"1");

    来自不同worker机器上的计数器值被周期性的传送给master(在ping回应里).master把来自成功的map和reduce任务的计数器值加起来,在MapReduce操作完成的时候,把它返回给用户代码.当前计数器的值也被显示在master状态页里,以便人们可以查看实际的计算进度.当计算计数器值的时候消除重复执行的影响,避免数据的累加.(在备用任务的使用,和由于出错的重新执行,可以产生重复执行)

    有些计数器值被MapReduce库自动的维护,比如,被处理的输入key/value对的数量,和被产生的输出key/value对的数量.

    用户发现计数器工具对于检查MapReduce操作的完整性很有用.例如,在一些MapReduce操作中,用户代码也许想要确保输出对的数量完全等于输入对的数量,或者处理过的德文文档的数量是在全部被处理的文档数量中属于合理的范围.

    5性能

    在本节,我们用在一个大型集群上运行的两个计算来衡量MapReduce的性能.一个计算用来在一个大概1TB的数据中查找特定的匹配串.另一个计算排序大概1TB的数据.

    这两个程序代表了MapReduce的用户实现的真实的程序的一个大子集.一类是,把数据从一种表示转化到另一种表示.另一类是,从一个大的数据集中提取少量的关心的数据.

    5.1机群配置

    所有的程序在包含大概1800台机器的机群上执行.机器的配置是:2个2G的Intel Xeon超线程处理器,4GB内存,两个160GB IDE磁盘,一个千兆网卡.这些机器部署在一个由两层的,树形交换网络中,在根节点上大概有100到2000G的带宽.所有这些机器都有相同的部署(对等部署),因此任意两点之间的来回时间小于1毫秒.

     

    在4GB的内存里,大概有1-1.5GB被用来运行在机群中其他的任务.这个程序是在周末的下午开始执行的,这个时候CPU,磁盘,网络基本上是空闲的.

    5.2Grep

    这个Grep程序扫描大概10^10个,每个100字节的记录,查找比较少的3字符的查找串(这个查找串出现在92337个记录中).输入数据被分割成大概64MB的片(M=15000),全部 的输出存放在一个文件中(R=1).

    图2显示计算过程随时间变化的情况.Y轴表示输入数据被扫描的速度.随着更多的机群被分配给这个MapReduce计算,速度在逐步的提高,当有1764个worker的时候这个速度达到最高的30GB/s.当map任务完成的时候,速度开始下降,在计算开始后80秒,输入的速度降到0.这个计算持续的时间大概是150秒.这包括了前面大概一分钟的启动时间.启动时间用来把程序传播到所有的机器上,等待GFS打开1000个输入文件,得到必要的位置优化信息.

    5.3排序

    这个sort程序排序10^10个记录,每个记录100个字节(大概1TB的数据).这个程序是模仿TeraSort的.

    这个排序程序只包含不到50行的用户代码.其中有3行map函数用来从文本行提取10字节的排序key,并且产生一个由这个key和原始文本行组成的中间key/value对.我们使用一个内置的Identity函数作为reduce操作.这个函数直接把中间key/value对作为输出的key/value对.最终的排序输出写到一个2路复制的GFS文件中(也就是,程序的输出会写2TB的数据).

    象以前一样,输入数据被分割成64MB的片(M=15000).我们把排序后的输出写到4000个文件中(R=4000).分区函数使用key的原始字节来把数据分区到R个小片中.

    我们以这个基准的分割函数,知道key的分布情况.在一般的排序程序中,我们会增加一个预处理的MapReduce操作,这个操作用于采样key的情况,并且用这个采样的key的分布情况来计算对最终排序处理的分割点。

    图3(a)显示这个排序程序的正常执行情况.左上图显示输入数据的读取速度.这个速度最高到达13GB/s,并且在不到200秒所有map任务完成之后迅速滑落到0.注意到这个输入速度小于Grep.这是因为这个排序map任务花费大概一半的时间和带宽,来把中间数据写到本地硬盘中.而Grep相关的中间数据可以忽略不计.

    左中图显示数据通过网络从map任务传输给reduce任务的速度.当第一个map任务完成后,这个排序过程就开始了.图示上的第一个高峰是启动了第一批大概1700个reduce任务(整个MapReduce任务被分配到1700台机器上,每个机器一次只执行一个reduce任务).大概开始计算后的300秒,第一批reduce任务中的一些完成了,我们开始执行剩下的reduce任务.全部的排序过程持续了大概600秒的时间.

    左下图显示排序后的数据被reduce任务写入最终文件的速度.因为机器忙于排序中间数据,所以在第一个排序阶段的结束和写阶段的开始有一个延迟.写的速度大概是2-4GB/s.大概开始计算后的850秒写过程结束.包括前面的启动过程,全部的计算任务持续的891秒.这个和TeraSort benchmark的最高纪录1057秒差不多.

    需要注意的事情是:因此位置优化的原因,很多数据都是从本地磁盘读取的而没有通过我们有限带宽的网络,所以输入速度比排序速度和输出速度都要快.排序速度比输出速度快的原因是输出阶段写两个排序后数据的拷贝(我们写两个副本的原因是为了可靠性和可用性).我们写两份的原因是因为底层文件系统的可靠性和可用性的要求.如果底层文件系统用类似容错编码(erasure coding)的方式,而不采用复制写的方式,在写盘阶段可以降低网络带宽的要求。

    5.4备用任务的影响

    在图3(b)中,显示我们不用备用任务的排序程序的执行情况.除了它有一个很长的几乎没有写动作发生的尾巴外,执行流程和图3(a)相似.在960秒后,只有5个reduce任务没有完成.然而,就是这最后几个落后者知道300秒后才完成.全部的计算任务执行了1283秒,多花了44%的时间.

    5.5机器失效

    在图3(c)中,显示我们有意的在排序程序计算过程中停止1746台worker中的200台机器上的程序的情况.底层机群调度者在这些机器上马上重新开始新的worker程序(因为仅仅程序被停止,而机器仍然在正常运行).

    因为已经完成的map工作丢失了(由于相关的map worker被杀掉了),需要重新再作,所以worker死掉会导致一个负数的输入速率.相关map任务的重新执行很快就重新执行了.整个计算过程在933秒内完成,包括了前边的启动时间(只比正常执行时间多了5%的时间).

    6经验

    我们在2003年的2月写了MapReduce库的第一个版本,并且在2003年的8月做了显著的增强,包括位置优化,worker机器间任务执行的动态负载均衡,等等.从那个时候起,我们惊奇的发现MapReduce函数库广泛用于我们日常处理的问题.它现在在Google内部各个领域内广泛应用,包括:

        大规模机器学习问题

    Google News和Froogle产品的机器问题.

    提取数据产生一个流行查询的报告(例如,Google Zeitgeist).

    为新的试验和产品提取网页的属性(例如,从一个web页的大集合中提取位置信息   用在位置查询).

       大规模的图计算.

    图4显示了我们主要的源代码管理系统中,随着时间推移,MapReduce程序的显著增加,从2003年早先时候的0个增长到2004年9月份的差不多900个不同的程序.MapReduce之所以这样的成功,是因为他能够在不到半小时时间内写出一个简单的能够应用于上千台机器的大规模并发程序,并且极大的提高了开发和原形设计的周期效率.并且,他可以让一个完全没有分布式和/或并行系统经验的程序员,能够很容易的利用大量的资源.

    在每一个任务结束的时候,MapReduce函数库记录使用的计算资源的统计信息.在图1里,我们列出了2004年8月份在Google运行的一些MapReduce的工作的统计信息.

    6.1大规模索引

    到目前为止,最成功的MapReduce的应用就是重写了Google web 搜索服务所使用到的index系统.索引系统处理爬虫系统抓回来的超大量的文档集,这些文档集保存在GFS文件里.这些文档的原始内容的大小,超过了20TB.索引程序是通过一系列的,大概5到10次MapReduce操作来建立索引.通过利用MapReduce(替换掉上一个版本的特别设计的分布处理的索引程序版本)有这样一些好处:

       索引的代码简单,量少,容易理解,因为容错,分布式,并行处理都隐藏在MapReduce库中了.例如,当使用MapReduce函数库的时候,计算的代码行数从原来的3800行C++代码一下减少到大概700行代码.

       MapReduce的函数库的性能已经非常好,所以我们可以把概念上不相关的计算步骤分开处理,而不是混在一起以期减少在数据上的处理.这使得改变索引过程很容易.例如,我们对老索引系统的一个小更改可能要好几个月的时间,但是在新系统内,只需要花几天时间就可以了.

       索引系统的操作更容易了,这是因为机器的失效,速度慢的机器,以及网络失效都已经由MapReduce自己解决了,而不需要操作人员的交互.另外,我们可以简单的通过对索引系统增加机器的方式提高处理性能.

    7相关工作

    很多系统都提供了严格的设计模式,并且通过对编程的严格限制来实现自动的并行计算.例如,一个结合函数可以通过N个元素的数组的前缀在N个处理器上使用并行前缀计算在log N的时间内计算完.MapReduce是基于我们的大型现实计算的经验,对这些模型的一个简化和精炼.并且,我们还提供了基于上千台处理器的容错实现.而大部分并发处理系统都只在小规模的尺度上实现,并且机器的容错还是程序员来控制的.

    Bulk Synchronous Programming以及一些MPI primitives提供了更高级别的抽象,可以更容易写出并行处理的程序.这些系统和MapReduce系统的不同之处在,MapReduce利用严格的编程模式自动实现用户程序的并发处理,并且提供了透明的容错处理.

    我们本地的优化策略是受active disks等技术的启发,在active disks中,计算任务是尽量推送到靠近本地磁盘的处理单元上,这样就减少了通过I/O子系统或网络的数据量.我们在少量磁盘直接连接到普通处理机运行,来代替直接连接到磁盘控制器的处理机上,但是一般的步骤是相似的.

    我们的备用任务的机制和在Charlotte系统上的积极调度机制相似.这个简单的积极调度的一个缺陷是,如果一个任务引起了一个重复性的失败,那个整个计算将无法完成.我们通过在故障情况下跳过故障记录的机制,在某种程度上解决了这个问题.

    MapReduce实现依赖一个内置的机群管理系统来在一个大规模共享机器组上分布和运行用户任务.虽然这个不是本论文的重点,但是集群管理系统在理念上和Condor等其他系统是一样的.

    在MapReduce库中的排序工具在操作上和NOW-Sort相似.源机器(map worker)分割将要被排序的数据,然后把它发送到R个reduce worker中的一个上.每个reduce worker来本地排序它的数据(如果可能,就在内存中).当然,NOW-Sort没有用户自定义的map和reduce函数,使得我们的库可以广泛的应用.

    River提供一个编程模型,在这个模型下,处理进程可以靠在分布式的队列上发送数据进行彼此通讯.和MapReduce一样,River系统尝试提供对不同应用有近似平均的性能,即使在不对等的硬件环境下或者在系统颠簸的情况下也能提供近似平均的性.River是通过精心调度硬盘和网络的通讯,来平衡任务的完成时间.MapReduce不和它不同.利用严格编程模型,MapReduce构架来把问题分割成大量的任务.这些任务被自动的在可用的worker上调度,以便速度快的worker可以处理更多的任务.这个严格编程模型也让我们可以在工作快要结束的时候安排冗余的执行,来在非一致处理的情况减少完成时间(比如,在有慢机或者阻塞的worker的时候).

    BAD-FS是一个很MapReduce完全不同的编程模型,它的目标是在一个广阔的网络上执行工作.然而,它们有两个基本原理是相同的.(1)这两个系统使用冗余的执行来从由失效引起的数据丢失中恢复.(2)这两个系统使用本地化调度策略,来减少通过拥挤的网络连接发送的数据数量.

    TACC是一个被设计用来简化高有效性网络服务结构的系统.和MapReduce一样,它通过再次执行来实现容错.

    8结束语

    MapReduce编程模型已经在Google成功的用在不同的目的.我们把这个成功归于以下几个原因:第一,这个模型使用简单,甚至对没有并行和分布式经验的程序员也是如此,因为它隐藏了并行化,容错,位置优化和负载均衡的细节.第二,大量不同的问题可以用MapReduce计算来表达.例如,MapReduce被用来,为Google的产品web搜索服务,排序,数据挖掘,机器学习,和其他许多系统,产生数据.第三,我们已经在一个好几千台计算机的大型集群上开发实现了这个MapReduce.这个实现使得对于这些机器资源的利用非常简单,因此也适用于解决Google遇到的其他很多需要大量计算的问题.

    从这个工作中我们也学习到了一些东西.首先,严格的编程模型使得并行化和分布式计算简单,并且也易于构造这样的容错计算环境.第二,网络带宽是系统的瓶颈.因此在我们的系统中大量的优化目标是减少通过网络发送的数据量,本地优化使用我们从本地磁盘读取数据,并且把中间数据写到本地磁盘,以保留网络带宽.第三,冗余的执行可以用来减少速度慢的机器的影响,和控制机器失效和数据丢失.

    感谢

    Josh Levenberg校定和扩展了用户级别的MapReduce API,并且结合他的适用经验和其他人的改进建议,增加了很多新的功能.MapReduce从GFS中读取和写入数据.我们要感谢Mohit Aron,Howard Gobioff,Markus Gutschke,David Krame,Shun-Tak Leung,和Josh Redstone,他们在开发GFS中的工作.我们还感谢Percy Liang Olcan Sercinoglu 在开发用于MapReduce的集群管理系统得工作.Mike Burrows,Wilson Hsieh,Josh Levenberg,Sharon Perl,RobPike,Debby Wallach为本论文提出了宝贵的意见.OSDI的无名审阅者,以及我们的审核者Eric Brewer,在论文应当如何改进方面给出了有益的意见.最后,我们感谢Google的工程部的所有MapReduce的用户,感谢他们提供了有用的反馈,建议,以及错误报告等等.

    A单词频率统计

    本节包含了一个完整的程序,用于统计在一组命令行指定的输入文件中,每一个不同的单词出现频率.

    #include "mapreduce/mapreduce.h"

    //用户map函数

    class WordCounter : public Mapper {

     public:

        virtual void Map(const MapInput& input) {

          const string& text = input.value();

          const int n = text.size();

          for (int i = 0; i < n; ) {

            //跳过前导空格

            while ((i < n) && isspace(text[i]))

                 i++;

             // 查找单词的结束位置

             int start = i;

             while ((i < n) && !isspace(text[i]))

                  i++;

             if (start < i)

                Emit(text.substr(start,i-start),"1");

            }

     

         }

     

    };

     

    REGISTER_MAPPER(WordCounter);

    //用户的reduce函数

    class Adder : public Reducer {

        virtual void Reduce(ReduceInput* input) {

                 //迭代具有相同key的所有条目,并且累加它们的value

                  int64 value = 0;

                  while (!input->done()) {

                         value += StringToInt(input->value());

                         input->NextValue();

                  }

                   //提交这个输入key的综合

                  Emit(IntToString(value));

           }

     

    };

    REGISTER_REDUCER(Adder);

    int main(int argc, char** argv) {

           ParseCommandLineFlags(argc, argv);

           MapReduceSpecification spec;

           // 把输入文件列表存入"spec"

           for (int i = 1; i < argc; i++) {

                  MapReduceInput* input = spec.add_input();

                  input->set_format("text");

                  input->set_filepattern(argv[i]);

                  input->set_mapper_class("WordCounter");

           }

            //指定输出文件:

           // /gfs/test/freq-00000-of-00100

           // /gfs/test/freq-00001-of-00100

          // ...

           MapReduceOutput* out = spec.output();

           out->set_filebase("/gfs/test/freq");

           out->set_num_tasks(100);

           out->set_format("text");

           out->set_reducer_class("Adder");

           // 可选操作:在map任务中做部分累加工作,以便节省带宽

           out->set_combiner_class("Adder");

           // 调整参数: 使用2000台机器,每个任务100MB内存

           spec.set_machines(2000);

           spec.set_map_megabytes(100);

           spec.set_reduce_megabytes(100);

           // 运行它

           MapReduceResult result;

           if (!MapReduce(spec, &result)) abort();

           // 完成: 'result'结构包含计数,花费时间,和使用机器的信息

           return 0;

    }

    展开全文
  • MapReduce论文中文翻译

    千次阅读 2017-03-14 11:34:24
    原文地址:  ... 译者: alex ...MapReduce是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。... pair的数据集合,输出中间的基于key/value pair的数据集合;然后再创建一个Reduce函数用

    原文地址: 
    http://labs.google.com/papers/mapreduce.html

    译者: alex

    摘要

    MapReduce是一个编程模型,也是一个处理和生成超大数据集的算法模型的相关实现。用户首先创建一个Map函数处理一个基于key/value pair的数据集合,输出中间的基于key/value pair的数据集合;然后再创建一个Reduce函数用来合并所有的具有相同中间key值的中间value值。现实世界中有很多满足上述处理模型的例子,本论文将详细描述这个模型。

    MapReduce架构的程序能够在大量的普通配置的计算机上实现并行化处理。这个系统在运行时只关心:如何分割输入数据,在大量计算机组成的集群上的调度,集群中计算机的错误处理,管理集群中计算机之间必要的通信。采用MapReduce架构可以使那些没有并行计算和分布式处理系统开发经验的程序员有效利用分布式系统的丰富资源。

    我们的MapReduce实现运行在规模可以灵活调整的由普通机器组成的集群上:一个典型的MapReduce计算往往由几千台机器组成、处理以TB计算的数据。程序员发现这个系统非常好用:已经实现了数以百计的MapReduce程序,在Google的集群上,每天都有1000多个MapReduce程序在执行。

    介绍

    在过去的5年里,包括本文作者在内的Google的很多程序员,为了处理海量的原始数据,已经实现了数以百计的、专用的计算方法。这些计算方法用来处理大量的原始数据,比如,文档抓取(类似网络爬虫的程序)、Web请求日志等等;也为了计算处理各种类型的衍生数据,比如倒排索引、Web文档的图结构的各种表示形势、每台主机上网络爬虫抓取的页面数量的汇总、每天被请求的最多的查询的集合等等。大多数这样的数据处理运算在概念上很容易理解。然而由于输入的数据量巨大,因此要想在可接受的时间内完成运算,只有将这些计算分布在成百上千的主机上。如何处理并行计算、如何分发数据、如何处理错误?所有这些问题综合在一起,需要大量的代码处理,因此也使得原本简单的运算变得难以处理。

    为了解决上述复杂的问题,我们设计一个新的抽象模型,使用这个抽象模型,我们只要表述我们想要执行的简单运算即可,而不必关心并行计算、容错、数据分布、负载均衡等复杂的细节,这些问题都被封装在了一个库里面。设计这个抽象模型的灵感来自Lisp和许多其他函数式语言的Map和Reduce的原语。我们意识到我们大多数的运算都包含这样的操作:在输入数据的“逻辑”记录上应用Map操作得出一个中间key/value pair集合,然后在所有具有相同key值的value值上应用Reduce操作,从而达到合并中间的数据,得到一个想要的结果的目的。使用MapReduce模型,再结合用户实现的Map和Reduce函数,我们就可以非常容易的实现大规模并行化计算;通过MapReduce模型自带的“再次执行”(re-execution)功能,也提供了初级的容灾实现方案。

    这个工作(实现一个MapReduce框架模型)的主要贡献是通过简单的接口来实现自动的并行化和大规模的分布式计算,通过使用MapReduce模型接口实现在大量普通的PC机上高性能计算。

    第二部分描述基本的编程模型和一些使用案例。第三部分描述了一个经过裁剪的、适合我们的基于集群的计算环境的MapReduce实现。第四部分描述我们认为在MapReduce编程模型中一些实用的技巧。第五部分对于各种不同的任务,测量我们MapReduce实现的性能。第六部分揭示了在Google内部如何使用MapReduce作为基础重写我们的索引系统产品,包括其它一些使用MapReduce的经验。第七部分讨论相关的和未来的工作。

    编程模型

    MapReduce编程模型的原理是:利用一个输入key/value pair集合来产生一个输出的key/value pair集合。MapReduce库的用户用两个函数表达这个计算:Map和Reduce。

    用户自定义的Map函数接受一个输入的key/value pair值,然后产生一个中间key/value pair值的集合。MapReduce库把所有具有相同中间key值I的中间value值集合在一起后传递给reduce函数。

    用户自定义的Reduce函数接受一个中间key的值I和相关的一个value值的集合。Reduce函数合并这些value值,形成一个较小的value值的集合。一般的,每次Reduce函数调用只产生0或1个输出value值。通常我们通过一个迭代器把中间value值提供给Reduce函数,这样我们就可以处理无法全部放入内存中的大量的value值的集合。

    例子

    例如,计算一个大的文档集合中每个单词出现的次数,下面是伪代码段:

    map(String key, String value):
        // key: document name
        // value: document contents
        for each word w in value:
            EmitIntermediate(w, “1″);
    reduce(String key, Iterator values):
        // key: a word
        // values: a list of counts
        int result = 0;
        for each v in values:
            result += ParseInt(v);
        Emit(AsString(result));
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 8
    • 9
    • 10
    • 11
    • 12

    Map函数输出文档中的每个词、以及这个词的出现次数(在这个简单的例子里就是1)。Reduce函数把Map函数产生的每一个特定的词的计数累加起来。

    另外,用户编写代码,使用输入和输出文件的名字、可选的调节参数来完成一个符合MapReduce模型规范的对象,然后调用MapReduce函数,并把这个规范对象传递给它。用户的代码和MapReduce库链接在一起(用C++实现)。附录A包含了这个实例的全部程序代码。

    类型

    尽管在前面例子的伪代码中使用了以字符串表示的输入输出值,但是在概念上,用户定义的Map和Reduce函数都有相关联的类型:

    map(k1,v1) ->list(k2,v2)
    reduce(k2,list(v2)) ->list(v2)
    • 1
    • 2
    • 1
    • 2

    比如,输入的key和value值与输出的key和value值在类型上推导的域不同。此外,中间key和value值与输出key和value值在类型上推导的域相同。 
    (alex注:原文中这个domain的含义不是很清楚,我参考Hadoop、KFS等实现,map和reduce都使用了泛型,因此,我把domain翻译成类型推导的域)。 
    我们的C++中使用字符串类型作为用户自定义函数的输入输出,用户在自己的代码中对字符串进行适当的类型转换。

    更多的例子

    这里还有一些有趣的简单例子,可以很容易的使用MapReduce模型来表示:

    • 分布式的Grep:Map函数输出匹配某个模式的一行,Reduce函数是一个恒等函数,即把中间数据复制到输出。
    • 计算URL访问频率:Map函数处理日志中web页面请求的记录,然后输出(URL,1)。Reduce函数把相同URL的value值都累加起来,产生(URL,记录总数)结果。
    • 倒转网络链接图:Map函数在源页面(source)中搜索所有的链接目标(target)并输出为(target,source)。Reduce函数把给定链接目标(target)的链接组合成一个列表,输出(target,list(source))。
    • 每个主机的检索词向量:检索词向量用一个(词,频率)列表来概述出现在文档或文档集中的最重要的一些词。Map函数为每一个输入文档输出(主机名,检索词向量),其中主机名来自文档的URL。Reduce函数接收给定主机的所有文档的检索词向量,并把这些检索词向量加在一起,丢弃掉低频的检索词,输出一个最终的(主机名,检索词向量)。
    • 倒排索引:Map函数分析每个文档输出一个(词,文档号)的列表,Reduce函数的输入是一个给定词的所有(词,文档号),排序所有的文档号,输出(词,list(文档号))。所有的输出集合形成一个简单的倒排索引,它以一种简单的算法跟踪词在文档中的位置。
    • 分布式排序:Map函数从每个记录提取key,输出(key,record)。Reduce函数不改变任何的值。这个运算依赖分区机制(在4.1描述)和排序属性(在4.2描述)。

    实现

    MapReduce模型可以有多种不同的实现方式。如何正确选择取决于具体的环境。例如,一种实现方式适用于小型的共享内存方式的机器,另外一种实现方式则适用于大型NUMA架构的多处理器的主机,而有的实现方式更适合大型的网络连接集群。 
    本章节描述一个适用于Google内部广泛使用的运算环境的实现:用以太网交换机连接、由普通PC机组成的大型集群。在我们的环境里包括:

    • x86架构、运行Linux操作系统、双处理器、2-4GB内存的机器。
    • 普通的网络硬件设备,每个机器的带宽为百兆或者千兆,但是远小于网络的平均带宽的一半。 (alex注:这里需要网络专家解释一下了)
    • 集群中包含成百上千的机器,因此,机器故障是常态。
    • 存储为廉价的内置IDE硬盘。一个内部分布式文件系统用来管理存储在这些磁盘上的数据。文件系统通过数据复制来在不可靠的硬件上保证数据的可靠性和有效性。
    • 用户提交工作(job)给调度系统。每个工作(job)都包含一系列的任务(task),调度系统将这些任务调度到集群中多台可用的机器上。

    执行概括

    通过将Map调用的输入数据自动分割为M个数据片段的集合,Map调用被分布到多台机器上执行。输入的数据片段能够在不同的机器上并行处理。使用分区函数将Map调用产生的中间key值分成R个不同分区(例如,hash(key) mod R),Reduce调用也被分布到多台机器上执行。分区数量(R)和分区函数由用户来指定。 
    这里写图片描述 
    图1展示了我们的MapReduce实现中操作的全部流程。当用户调用MapReduce函数时,将发生下面的一系列动作(下面的序号和图1中的序号一一对应):

    • 用户程序首先调用的MapReduce库将输入文件分成M个数据片度,每个数据片段的大小一般从 16MB到64MB(可以通过可选的参数来控制每个数据片段的大小)。然后用户程序在机群中创建大量的程序副本。 (alex:copies of the program还真难翻译)
    • 这些程序副本中的有一个特殊的程序–master。副本中其它的程序都是worker程序,由master分配任务。有M个Map任务和R个Reduce任务将被分配,master将一个Map任务或Reduce任务分配给一个空闲的worker。
    • 被分配了map任务的worker程序读取相关的输入数据片段,从输入的数据片段中解析出key/value pair,然后把key/value pair传递给用户自定义的Map函数,由Map函数生成并输出的中间key/value pair,并缓存在内存中。
    • 缓存中的key/value pair通过分区函数分成R个区域,之后周期性的写入到本地磁盘上。缓存的key/value pair在本地磁盘上的存储位置将被回传给master,由master负责把这些存储位置再传送给Reduce worker。
    • 当Reduce worker程序接收到master程序发来的数据存储位置信息后,使用RPC从Map worker所在主机的磁盘上读取这些缓存数据。当Reduce worker读取了所有的中间数据后,通过对key进行排序后使得具有相同key值的数据聚合在一起。由于许多不同的key值会映射到相同的Reduce任务上,因此必须进行排序。如果中间数据太大无法在内存中完成排序,那么就要在外部进行排序。
    • Reduce worker程序遍历排序后的中间数据,对于每一个唯一的中间key值,Reduce worker程序将这个key值和它相关的中间value值的集合传递给用户自定义的Reduce函数。Reduce函数的输出被追加到所属分区的输出文件。
    • 当所有的Map和Reduce任务都完成之后,master唤醒用户程序。在这个时候,在用户程序里的对MapReduce调用才返回。

    在成功完成任务之后,MapReduce的输出存放在R个输出文件中(对应每个Reduce任务产生一个输出文件,文件名由用户指定)。一般情况下,用户不需要将这R个输出文件合并成一个文件–他们经常把这些文件作为另外一个MapReduce的输入,或者在另外一个可以处理多个分割文件的分布式应用中使用。

    Master数据结构

    Master持有一些数据结构,它存储每一个Map和Reduce任务的状态(空闲、工作中或完成),以及Worker机器(非空闲任务的机器)的标识。

    Master就像一个数据管道,中间文件存储区域的位置信息通过这个管道从Map传递到Reduce。因此,对于每个已经完成的Map任务,master存储了Map任务产生的R个中间文件存储区域的大小和位置。当Map任务完成时,Master接收到位置和大小的更新信息,这些信息被逐步递增的推送给那些正在工作的Reduce任务。

    容错

    因为MapReduce库的设计初衷是使用由成百上千的机器组成的集群来处理超大规模的数据,所以,这个库必须要能很好的处理机器故障。

    worker故障

    master周期性的ping每个worker。如果在一个约定的时间范围内没有收到worker返回的信息,master将把这个worker标记为失效。所有由这个失效的worker完成的Map任务被重设为初始的空闲状态,之后这些任务就可以被安排给其他的worker。同样的,worker失效时正在运行的Map或Reduce任务也将被重新置为空闲状态,等待重新调度。

    当worker故障时,由于已经完成的Map任务的输出存储在这台机器上,Map任务的输出已不可访问了,因此必须重新执行。而已经完成的Reduce任务的输出存储在全局文件系统上,因此不需要再次执行。

    当一个Map任务首先被worker A执行,之后由于worker A失效了又被调度到worker B执行,这个“重新执行”的动作会被通知给所有执行Reduce任务的worker。任何还没有从worker A读取数据的Reduce任务将从worker B读取数据。

    MapReduce可以处理大规模worker失效的情况。比如,在一个MapReduce操作执行期间,在正在运行的集群上进行网络维护引起80台机器在几分钟内不可访问了,MapReduce master只需要简单的再次执行那些不可访问的worker完成的工作,之后继续执行未完成的任务,直到最终完成这个MapReduce操作。

    master失败

    一个简单的解决办法是让master周期性的将上面描述的数据结构(alex注:指3.2节)的写入磁盘,即检查点(checkpoint)。如果这个master任务失效了,可以从最后一个检查点(checkpoint)开始启动另一个master进程。然而,由于只有一个master进程,master失效后再恢复是比较麻烦的,因此我们现在的实现是如果master失效,就中止MapReduce运算。客户可以检查到这个状态,并且可以根据需要重新执行MapReduce操作。

    在失效方面的处理机制

    (alex注:原文为”semantics in the presence of failures”) 
    当用户提供的Map和Reduce操作是输入确定性函数(即相同的输入产生相同的输出)时,我们的分布式实现在任何情况下的输出都和所有程序没有出现任何错误、顺序的执行产生的输出是一样的。

    我们依赖对Map和Reduce任务的输出是原子提交的来完成这个特性。每个工作中的任务把它的输出写到私有的临时文件中。每个Reduce任务生成一个这样的文件,而每个Map任务则生成R个这样的文件(一个Reduce任务对应一个文件)。当一个Map任务完成的时,worker发送一个包含R个临时文件名的完成消息给master。如果master从一个已经完成的Map任务再次接收到到一个完成消息,master将忽略这个消息;否则,master将这R个文件的名字记录在数据结构里。

    当Reduce任务完成时,Reduce worker进程以原子的方式把临时文件重命名为最终的输出文件。如果同一个Reduce任务在多台机器上执行,针对同一个最终的输出文件将有多个重命名操作执行。我们依赖底层文件系统提供的重命名操作的原子性来保证最终的文件系统状态仅仅包含一个Reduce任务产生的数据。

    使用MapReduce模型的程序员可以很容易的理解他们程序的行为,因为我们绝大多数的Map和Reduce操作是确定性的,而且存在这样的一个事实:我们的失效处理机制等价于一个顺序的执行的操作。当Map或/和Reduce操作是不确定性的时候,我们提供虽然较弱但是依然合理的处理机制。当使用非确定操作的时候,一个Reduce任务R1的输出等价于一个非确定性程序顺序执行产生时的输出。但是,另一个Reduce任务R2的输出也许符合一个不同的非确定顺序程序执行产生的R2的输出。

    考虑Map任务M和Reduce任务R1、R2的情况。我们设定e(Ri)是Ri已经提交的执行过程(有且仅有一个这样的执行过程)。当e(R1)读取了由M一次执行产生的输出,而e(R2)读取了由M的另一次执行产生的输出,导致了较弱的失效处理。

    存储位置

    在我们的计算运行环境中,网络带宽是一个相当匮乏的资源。我们通过尽量把输入数据(由GFS管理)存储在集群中机器的本地磁盘上来节省网络带宽。GFS把每个文件按64MB一个Block分隔,每个Block保存在多台机器上,环境中就存放了多份拷贝(一般是3个拷贝)。MapReduce的master在调度Map任务时会考虑输入文件的位置信息,尽量将一个Map任务调度在包含相关输入数据拷贝的机器上执行;如果上述努力失败了,master将尝试在保存有输入数据拷贝的机器附近的机器上执行Map任务(例如,分配到一个和包含输入数据的机器在一个switch里的worker机器上执行)。当在一个足够大的cluster集群上运行大型MapReduce操作的时候,大部分的输入数据都能从本地机器读取,因此消耗非常少的网络带宽。

    任务粒度

    如前所述,我们把Map拆分成了M个片段、把Reduce拆分成R个片段执行。理想情况下,M和R应当比集群中worker的机器数量要多得多。在每台worker机器都执行大量的不同任务能够提高集群的动态的负载均衡能力,并且能够加快故障恢复的速度:失效机器上执行的大量Map任务都可以分布到所有其他的worker机器上去执行。

    但是实际上,在我们的具体实现中对M和R的取值都有一定的客观限制,因为master必须执行O(M+R)次调度,并且在内存中保存O(M*R)个状态(对影响内存使用的因素还是比较小的:O(M*R)块状态,大概每对Map任务/Reduce任务1个字节就可以了)。

    更进一步,R值通常是由用户指定的,因为每个Reduce任务最终都会生成一个独立的输出文件。实际使用时我们也倾向于选择合适的M值,以使得每一个独立任务都是处理大约16M到64M的输入数据(这样,上面描写的输入数据本地存储优化策略才最有效),另外,我们把R值设置为我们想使用的worker机器数量的小的倍数。我们通常会用这样的比例来执行MapReduce:M=200000,R=5000,使用2000台worker机器。

    备用任务

    影响一个MapReduce的总执行时间最通常的因素是“落伍者”:在运算过程中,如果有一台机器花了很长的时间才完成最后几个Map或Reduce任务,导致MapReduce操作总的执行时间超过预期。出现“落伍者”的原因非常多。比如:如果一个机器的硬盘出了问题,在读取的时候要经常的进行读取纠错操作,导致读取数据的速度从30M/s降低到1M/s。如果cluster的调度系统在这台机器上又调度了其他的任务,由于CPU、内存、本地硬盘和网络带宽等竞争因素的存在,导致执行MapReduce代码的执行效率更加缓慢。我们最近遇到的一个问题是由于机器的初始化代码有bug,导致关闭了的处理器的缓存:在这些机器上执行任务的性能和正常情况相差上百倍。

    我们有一个通用的机制来减少“落伍者”出现的情况。当一个MapReduce操作接近完成的时候,master调度备用(backup)任务进程来执行剩下的、处于处理中状态(in-progress)的任务。无论是最初的执行进程、还是备用(backup)任务进程完成了任务,我们都把这个任务标记成为已经完成。我们调优了这个机制,通常只会占用比正常操作多几个百分点的计算资源。我们发现采用这样的机制对于减少超大MapReduce操作的总处理时间效果显著。例如,在5.3节描述的排序任务,在关闭掉备用任务的情况下要多花44%的时间完成排序任务。

    技巧

    虽然简单的Map和Reduce函数提供的基本功能已经能够满足大部分的计算需要,我们还是发掘出了一些有价值的扩展功能。本节将描述这些扩展功能。

    分区函数

    MapReduce的使用者通常会指定Reduce任务和Reduce任务输出文件的数量(R)。我们在中间key上使用分区函数来对数据进行分区,之后再输入到后续任务执行进程。一个缺省的分区函数是使用hash方法(比如,hash(key) mod R)进行分区。hash方法能产生非常平衡的分区。然而,有的时候,其它的一些分区函数对key值进行的分区将非常有用。比如,输出的key值是URLs,我们希望每个主机的所有条目保持在同一个输出文件中。为了支持类似的情况,MapReduce库的用户需要提供专门的分区函数。例如,使用“hash(Hostname(urlkey)) mod R”作为分区函数就可以把所有来自同一个主机的URLs保存在同一个输出文件中。

    顺序保证

    我们确保在给定的分区中,中间key/value pair数据的处理顺序是按照key值增量顺序处理的。这样的顺序保证对每个分成生成一个有序的输出文件,这对于需要对输出文件按key值随机存取的应用非常有意义,对在排序输出的数据集也很有帮助。

    Combiner函数

    在某些情况下,Map函数产生的中间key值的重复数据会占很大的比重,并且,用户自定义的Reduce函数满足结合律和交换律。在2.1节的词数统计程序是个很好的例子。由于词频率倾向于一个zipf分布(齐夫分布),每个Map任务将产生成千上万个这样的记录the,1.所有的这些记录将通过网络被发送到一个单独的Reduce任务,然后由这个Reduce任务把所有这些记录累加起来产生一个数字。我们允许用户指定一个可选的combiner函数,combiner函数首先在本地将这些记录进行一次合并,然后将合并的结果再通过网络发送出去。

    Combiner函数在每台执行Map任务的机器上都会被执行一次。一般情况下,Combiner和Reduce函数是一样的。Combiner函数和Reduce函数之间唯一的区别是MapReduce库怎样控制函数的输出。Reduce函数的输出被保存在最终的输出文件里,而Combiner函数的输出被写到中间文件里,然后被发送给Reduce任务。

    部分的合并中间结果可以显著的提高一些MapReduce操作的速度。附录A包含一个使用combiner函数的例子。

    输入和输出的类型

    MapReduce库支持几种不同的格式的输入数据。比如,文本模式的输入数据的每一行被视为是一个key/value pair。key是文件的偏移量,value是那一行的内容。另外一种常见的格式是以key进行排序来存储的key/value pair的序列。每种输入类型的实现都必须能够把输入数据分割成数据片段,该数据片段能够由单独的Map任务来进行后续处理(例如,文本模式的范围分割必须确保仅仅在每行的边界进行范围分割)。虽然大多数MapReduce的使用者仅仅使用很少的预定义输入类型就满足要求了,但是使用者依然可以通过提供一个简单的Reader接口实现就能够支持一个新的输入类型。

    Reader并非一定要从文件中读取数据,比如,我们可以很容易的实现一个从数据库里读记录的Reader,或者从内存中的数据结构读取数据的Reader。 
    类似的,我们提供了一些预定义的输出数据的类型,通过这些预定义类型能够产生不同格式的数据。用户采用类似添加新的输入数据类型的方式增加新的输出类型。

    副作用

    在某些情况下,MapReduce的使用者发现,如果在Map和/或Reduce操作过程中增加辅助的输出文件会比较省事。我们依靠程序writer把这种“副作用”变成原子的和幂等的(alex注:幂等的指一个总是产生相同结果的数学运算)。通常应用程序首先把输出结果写到一个临时文件中,在输出全部数据之后,在使用系统级的原子操作rename重新命名这个临时文件。

    如果一个任务产生了多个输出文件,我们没有提供类似两阶段提交的原子操作支持这种情况。因此,对于会产生多个输出文件、并且对于跨文件有一致性要求的任务,都必须是确定性的任务。但是在实际应用过程中,这个限制还没有给我们带来过麻烦。

    跳过损坏的记录

    有时候,用户程序中的bug导致Map或者Reduce函数在处理某些记录的时候crash掉,MapReduce操作无法顺利完成。惯常的做法是修复bug后再次执行MapReduce操作,但是,有时候找出这些bug并修复它们不是一件容易的事情;这些bug也许是在第三方库里边,而我们手头没有这些库的源代码。而且在很多时候,忽略一些有问题的记录也是可以接受的,比如在一个巨大的数据集上进行统计分析的时候。我们提供了一种执行模式,在这种模式下,为了保证保证整个处理能继续进行,MapReduce会检测哪些记录导致确定性的crash,并且跳过这些记录不处理。

    每个worker进程都设置了信号处理函数捕获内存段异常(segmentation violation)和总线错误(bus error)。在执行Map或者Reduce操作之前,MapReduce库通过全局变量保存记录序号。如果用户程序触发了一个系统信号,消息处理函数将用“最后一口气”通过UDP包向master发送处理的最后一条记录的序号。当master看到在处理某条特定记录不止失败一次时,master就标志着条记录需要被跳过,并且在下次重新执行相关的Map或者Reduce任务的时候跳过这条记录。

    本地执行

    调试Map和Reduce函数的bug是非常困难的,因为实际执行操作时不但是分布在系统中执行的,而且通常是在好几千台计算机上执行,具体的执行位置是由master进行动态调度的,这又大大增加了调试的难度。为了简化调试、profile和小规模测试,我们开发了一套MapReduce库的本地实现版本,通过使用本地版本的MapReduce库,MapReduce操作在本地计算机上顺序的执行。用户可以控制MapReduce操作的执行,可以把操作限制到特定的Map任务上。用户通过设定特别的标志来在本地执行他们的程序,之后就可以很容易的使用本地调试和测试工具(比如gdb)。

    状态信息

    master使用嵌入式的HTTP服务器(如Jetty)显示一组状态信息页面,用户可以监控各种执行状态。状态信息页面显示了包括计算执行的进度,比如已经完成了多少任务、有多少任务正在处理、输入的字节数、中间数据的字节数、输出的字节数、处理百分比等等。页面还包含了指向每个任务的stderr和stdout文件的链接。用户根据这些数据预测计算需要执行大约多长时间、是否需要增加额外的计算资源。这些页面也可以用来分析什么时候计算执行的比预期的要慢。

    另外,处于最顶层的状态页面显示了哪些worker失效了,以及他们失效的时候正在运行的Map和Reduce任务。这些信息对于调试用户代码中的bug很有帮助。

    计数器

    MapReduce库使用计数器统计不同事件发生次数。比如,用户可能想统计已经处理了多少个单词、已经索引的多少篇German文档等等。

    为了使用这个特性,用户在程序中创建一个命名的计数器对象,在Map和Reduce函数中相应的增加计数器的值。例如:

    Counter* uppercase;
    uppercase = GetCounter(“uppercase”);
    map(String name, String contents):
     for each word w in contents:
      if (IsCapitalized(w)):
       uppercase->Increment();
      EmitIntermediate(w, “1″);
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7
    • 1
    • 2
    • 3
    • 4
    • 5
    • 6
    • 7

    这些计数器的值周期性的从各个单独的worker机器上传递给master(附加在ping的应答包中传递)。master把执行成功的Map和Reduce任务的计数器值进行累计,当MapReduce操作完成之后,返回给用户代码。

    计数器当前的值也会显示在master的状态页面上,这样用户就可以看到当前计算的进度。当累加计数器的值的时候,master要检查重复运行的Map或者Reduce任务,避免重复累加(之前提到的备用任务和失效后重新执行任务这两种情况会导致相同的任务被多次执行)。

    有些计数器的值是由MapReduce库自动维持的,比如已经处理的输入的key/value pair的数量、输出的key/value pair的数量等等。

    计数器机制对于MapReduce操作的完整性检查非常有用。比如,在某些MapReduce操作中,用户需要确保输出的key value pair精确的等于输入的key value pair,或者处理的German文档数量在处理的整个文档数量中属于合理范围。

    性能

    本节我们用在一个大型集群上运行的两个计算来衡量MapReduce的性能。一个计算在大约1TB的数据中进行特定的模式匹配,另一个计算对大约1TB的数据进行排序。

    这两个程序在大量的使用MapReduce的实际应用中是非常典型的 — 一类是对数据格式进行转换,从一种表现形式转换为另外一种表现形式;另一类是从海量数据中抽取少部分的用户感兴趣的数据。

    集群配置

    所有这些程序都运行在一个大约由1800台机器构成的集群上。每台机器配置2个2G主频、支持超线程的Intel Xeon处理器,4GB的物理内存,两个160GB的IDE硬盘和一个千兆以太网卡。这些机器部署在一个两层的树形交换网络中,在root节点大概有100-200GBPS的传输带宽。所有这些机器都采用相同的部署(对等部署),因此任意两点之间的网络来回时间小于1毫秒。

    在4GB内存里,大概有1-1.5G用于运行在集群上的其他任务。测试程序在周末下午开始执行,这时主机的CPU、磁盘和网络基本上处于空闲状态。

    GREP

    这个分布式的grep程序需要扫描大概10的10次方个由100个字节组成的记录,查找出现概率较小的3个字符的模式(这个模式在92337个记录中出现)。输入数据被拆分成大约64M的Block(M=15000),整个输出数据存放在一个文件中(R=1)。 
    这里写图片描述 
    图2显示了这个运算随时间的处理过程。其中Y轴表示输入数据的处理速度。处理速度随着参与MapReduce计算的机器数量的增加而增加,当1764台worker参与计算的时,处理速度达到了30GB/s。当Map任务结束的时候,即在计算开始后80秒,输入的处理速度降到0。整个计算过程从开始到结束一共花了大概150秒。这包括了大约一分钟的初始启动阶段。初始启动阶段消耗的时间包括了是把这个程序传送到各个worker机器上的时间、等待GFS文件系统打开1000个输入文件集合的时间、获取相关的文件本地位置优化信息的时间。

    排序

    排序程序处理10的10次方个100个字节组成的记录(大概1TB的数据)。这个程序模仿TeraSort benchmark[10]。

    排序程序由不到50行代码组成。只有三行的Map函数从文本行中解析出10个字节的key值作为排序的key,并且把这个key和原始文本行作为中间的key/value pair值输出。我们使用了一个内置的恒等函数作为Reduce操作函数。这个函数把中间的key/value pair值不作任何改变输出。最终排序结果输出到两路复制的GFS文件系统(也就是说,程序输出2TB的数据)。

    如前所述,输入数据被分成64MB的Block(M=15000)。我们把排序后的输出结果分区后存储到4000个文件(R=4000)。分区函数使用key的原始字节来把数据分区到R个片段中。

    在这个benchmark测试中,我们使用的分区函数知道key的分区情况。通常对于排序程序来说,我们会增加一个预处理的MapReduce操作用于采样key值的分布情况,通过采样的数据来计算对最终排序处理的分区点。 
    这里写图片描述 
    图三(a)显示了这个排序程序的正常执行过程。左上的图显示了输入数据读取的速度。数据读取速度峰值会达到13GB/s,并且所有Map任务完成之后,即大约200秒之后迅速滑落到0。值得注意的是,排序程序输入数据读取速度小于分布式grep程序。这是因为排序程序的Map任务花了大约一半的处理时间和I/O带宽把中间输出结果写到本地硬盘。相应的分布式grep程序的中间结果输出几乎可以忽略不计。

    左边中间的图显示了中间数据从Map任务发送到Reduce任务的网络速度。这个过程从第一个Map任务完成之后就开始缓慢启动了。图示的第一个高峰是启动了第一批大概1700个Reduce任务(整个MapReduce分布到大概1700台机器上,每台机器1次最多执行1个Reduce任务)。排序程序运行大约300秒后,第一批启动的Reduce任务有些完成了,我们开始执行剩下的Reduce任务。所有的处理在大约600秒后结束。

    左下图表示Reduce任务把排序后的数据写到最终的输出文件的速度。在第一个排序阶段结束和数据开始写入磁盘之间有一个小的延时,这是因为worker机器正在忙于排序中间数据。磁盘写入速度在2-4GB/s持续一段时间。输出数据写入磁盘大约持续850秒。计入初始启动部分的时间,整个运算消耗了891秒。这个速度和TeraSort benchmark[18]的最高纪录1057秒相差不多。

    还有一些值得注意的现象:输入数据的读取速度比排序速度和输出数据写入磁盘速度要高不少,这是因为我们的输入数据本地化优化策略起了作用 — 绝大部分数据都是从本地硬盘读取的,从而节省了网络带宽。排序速度比输出数据写入到磁盘的速度快,这是因为输出数据写了两份(我们使用了2路的GFS文件系统,写入复制节点的原因是为了保证数据可靠性和可用性)。我们把输出数据写入到两个复制节点的原因是因为这是底层文件系统的保证数据可靠性和可用性的实现机制。如果底层文件系统使用类似容错编码[14](erasure coding)的方式而不是复制的方式保证数据的可靠性和可用性,那么在输出数据写入磁盘的时候,就可以降低网络带宽的使用。

    高效的backup任务

    图三(b)显示了关闭了备用任务后排序程序执行情况。执行的过程和图3(a)很相似,除了输出数据写磁盘的动作在时间上拖了一个很长的尾巴,而且在这段时间里,几乎没有什么写入动作。在960秒后,只有5个Reduce任务没有完成。这些拖后腿的任务又执行了300秒才完成。整个计算消耗了1283秒,多了44%的执行时间。

    失效的机器

    在图三(c)中演示的排序程序执行的过程中,我们在程序开始后几分钟有意的kill了1746个worker中的200个。集群底层的调度立刻在这些机器上重新开始新的worker处理进程(因为只是worker机器上的处理进程被kill了,机器本身还在工作)。

    图三(c)显示出了一个“负”的输入数据读取速度,这是因为一些已经完成的Map任务丢失了(由于相应的执行Map任务的worker进程被kill了),需要重新执行这些任务。相关Map任务很快就被重新执行了。整个运算在933秒内完成,包括了初始启动时间(只比正常执行多消耗了5%的时间)。

    经验

    我们在2003年1月完成了第一个版本的MapReduce库,在2003年8月的版本有了显著的增强,这包括了输入数据本地优化、worker机器之间的动态负载均衡等等。从那以后,我们惊喜的发现,MapReduce库能广泛应用于我们日常工作中遇到的各类问题。它现在在Google内部各个领域得到广泛应用,包括:

    • 大规模机器学习问题
    • Google News和Froogle产品的集群问题
    • 从公众查询产品(比如Google的Zeitgeist)的报告中抽取数据。
    • 从大量的新应用和新产品的网页中提取有用信息(比如,从大量的位置搜索网页中抽取地理位置信息)。
    • 大规模的图形计算。 
      这里写图片描述 
      图四显示了在我们的源代码管理系统中,随着时间推移,独立的MapReduce程序数量的显著增加。从2003年早些时候的0个增长到2004年9月份的差不多900个不同的程序。MapReduce的成功取决于采用MapReduce库能够在不到半个小时时间内写出一个简单的程序,这个简单的程序能够在上千台机器的组成的集群上做大规模并发处理,这极大的加快了开发和原形设计的周期。另外,采用MapReduce库,可以让完全没有分布式和/或并行系统开发经验的程序员很容易的利用大量的资源,开发出分布式和/或并行处理的应用。

    在每个任务结束的时候,MapReduce库统计计算资源的使用状况。在表1,我们列出了2004年8月份MapReduce运行的任务所占用的相关资源。

    大规模索引

    到目前为止,MapReduce最成功的应用就是重写了Google网络搜索服务所使用到的index系统。索引系统的输入数据是网络爬虫抓取回来的海量的文档,这些文档数据都保存在GFS文件系统里。这些文档原始内容(alex注:raw contents,我认为就是网页中的剔除html标记后的内容、pdf和word等有格式文档中提取的文本内容等)的大小超过了20TB。索引程序是通过一系列的MapReduce操作(大约5到10次)来建立索引。使用MapReduce(替换上一个特别设计的、分布式处理的索引程序)带来这些好处:

    • 实现索引部分的代码简单、小巧、容易理解,因为对于容错、分布式以及并行计算的处理都是MapReduce库提供的。比如,使用MapReduce库,计算的代码行数从原来的3800行C++代码减少到大概700行代码。
    • MapReduce库的性能已经足够好了,因此我们可以把在概念上不相关的计算步骤分开处理,而不是混在一起以期减少数据传递的额外消耗。概念上不相关的计算步骤的隔离也使得我们可以很容易改变索引处理方式。比如,对之前的索引系统的一个小更改可能要耗费好几个月的时间,但是在使用MapReduce的新系统上,这样的更改只需要花几天时间就可以了。
    • 索引系统的操作管理更容易了。因为由机器失效、机器处理速度缓慢、以及网络的瞬间阻塞等引起的绝大部分问题都已经由MapReduce库解决了,不再需要操作人员的介入了。另外,我们可以通过在索引系统集群中增加机器的简单方法提高整体处理性能。

    相关工作

    很多系统都提供了严格的编程模式,并且通过对编程的严格限制来实现并行计算。例如,一个结合函数可以通过把N个元素的数组的前缀在N个处理器上使用并行前缀算法,在log N的时间内计算完[6,9,13](alex注:完全没有明白作者在说啥,具体参考相关6、9、13文档)。MapReduce可以看作是我们结合在真实环境下处理海量数据的经验,对这些经典模型进行简化和萃取的成果。更加值得骄傲的是,我们还实现了基于上千台处理器的集群的容错处理。相比而言,大部分并发处理系统都只在小规模的集群上实现,并且把容错处理交给了程序员。

    Bulk Synchronous Programming[17]和一些MPI原语[11]提供了更高级别的并行处理抽象,可以更容易写出并行处理的程序。MapReduce和这些系统的关键不同之处在于,MapReduce利用限制性编程模式实现了用户程序的自动并发处理,并且提供了透明的容错处理。

    我们数据本地优化策略的灵感来源于active disks[12,15]等技术,在active disks中,计算任务是尽量推送到数据存储的节点处理(alex注:即靠近数据源处理),这样就减少了网络和IO子系统的吞吐量。我们在挂载几个硬盘的普通机器上执行我们的运算,而不是在磁盘处理器上执行我们的工作,但是达到的目的一样的。

    我们的备用任务机制和Charlotte System[3]提出的eager调度机制比较类似。Eager调度机制的一个缺点是如果一个任务反复失效,那么整个计算就不能完成。我们通过忽略引起故障的记录的方式在某种程度上解决了这个问题。

    MapReduce的实现依赖于一个内部的集群管理系统,这个集群管理系统负责在一个超大的、共享机器的集群上分布和运行用户任务。虽然这个不是本论文的重点,但是有必要提一下,这个集群管理系统在理念上和其它系统,如Condor[16]是一样。

    MapReduce库的排序机制和NOW-Sort[1]的操作上很类似。读取输入源的机器(map workers)把待排序的数据进行分区后,发送到R个Reduce worker中的一个进行处理。每个Reduce worker在本地对数据进行排序(尽可能在内存中排序)。当然,NOW-Sort没有给用户自定义的Map和Reduce函数的机会,因此不具备MapReduce库广泛的实用性。

    River[2]提供了一个编程模型:处理进程通过分布式队列传送数据的方式进行互相通讯。和MapReduce类似,River系统尝试在不对等的硬件环境下,或者在系统颠簸的情况下也能提供近似平均的性能。River是通过精心调度硬盘和网络的通讯来平衡任务的完成时间。MapReduce库采用了其它的方法。通过对编程模型进行限制,MapReduce框架把问题分解成为大量的“小”任务。这些任务在可用的worker集群上动态的调度,这样快速的worker就可以执行更多的任务。通过对编程模型进行限制,我们可用在工作接近完成的时候调度备用任务,缩短在硬件配置不均衡的情况下缩小整个操作完成的时间(比如有的机器性能差、或者机器被某些操作阻塞了)。

    BAD-FS[5]采用了和MapReduce完全不同的编程模式,它是面向广域网(alex注:wide-area network)的。不过,这两个系统有两个基础功能很类似。(1)两个系统采用重新执行的方式来防止由于失效导致的数据丢失。(2)两个都使用数据本地化调度策略,减少网络通讯的数据量。

    TACC[7]是一个用于简化构造高可用性网络服务的系统。和MapReduce一样,它也依靠重新执行机制来实现的容错处理。

    结束语

    MapReduce编程模型在Google内部成功应用于多个领域。我们把这种成功归结为几个方面:首先,由于MapReduce封装了并行处理、容错处理、数据本地化优化、负载均衡等等技术难点的细节,这使得MapReduce库易于使用。即便对于完全没有并行或者分布式系统开发经验的程序员而言;其次,大量不同类型的问题都可以通过MapReduce简单的解决。比如,MapReduce用于生成Google的网络搜索服务所需要的数据、用来排序、用来数据挖掘、用于机器学习,以及很多其它的系统;第三,我们实现了一个在数千台计算机组成的大型集群上灵活部署运行的MapReduce。这个实现使得有效利用这些丰富的计算资源变得非常简单,因此也适合用来解决Google遇到的其他很多需要大量计算的问题。

    我们也从MapReduce开发过程中学到了不少东西。首先,约束编程模式使得并行和分布式计算非常容易,也易于构造容错的计算环境;其次,网络带宽是稀有资源。大量的系统优化是针对减少网络传输量为目的的:本地优化策略使大量的数据从本地磁盘读取,中间文件写入本地磁盘、并且只写一份中间文件也节约了网络带宽;第三,多次执行相同的任务可以减少性能缓慢的机器带来的负面影响(alex注:即硬件配置的不平衡),同时解决了由于机器失效导致的数据丢失问题。

    感谢

    (alex注:还是原汁原味的感谢词比较好,这个就不翻译了)Josh Levenberg has been instrumental in revising and extending the user-level MapReduce API with a number of new features based on his experience with using MapReduce and other people’s suggestions for enhancements. MapReduce reads its input from and writes its output to the Google File System [8]. We would like to thank Mohit Aron, Howard Gobioff, Markus Gutschke, David Kramer, Shun-Tak Leung, and Josh Redstone for their work in developing GFS. We would also like to thank Percy Liang and Olcan Sercinoglu for their work in developing the cluster management system used by MapReduce. Mike Burrows, Wilson Hsieh, Josh Levenberg, Sharon Perl, Rob Pike, and Debby Wallach provided helpful comments on earlier drafts of this paper.The anonymous OSDI reviewers, and our shepherd, Eric Brewer, provided many useful suggestions of areas where the paper could be improved. Finally, we thank all the users of MapReduce within Google’s engineering organization for providing helpful feedback, suggestions, and bug reports. 
    10、参考资料 
    [1] Andrea C. Arpaci-Dusseau, Remzi H. Arpaci-Dusseau,David E. Culler, Joseph M. Hellerstein, and David A. Patterson.High-performance sorting on networks of workstations.In Proceedings of the 1997 ACM SIGMOD InternationalConference on Management of Data, Tucson,Arizona, May 1997. 
    [2] Remzi H. Arpaci-Dusseau, Eric Anderson, NoahTreuhaft, David E. Culler, Joseph M. Hellerstein, David Patterson, and Kathy Yelick. Cluster I/O with River:Making the fast case common. In Proceedings of the Sixth Workshop on Input/Output in Parallel and Distributed Systems (IOPADS ’99), pages 10.22, Atlanta, Georgia, May 1999. 
    [3] Arash Baratloo, Mehmet Karaul, Zvi Kedem, and Peter Wyckoff. Charlotte: Metacomputing on the web. In Proceedings of the 9th International Conference on Parallel and Distributed Computing Systems, 1996. [4] Luiz A. Barroso, Jeffrey Dean, and Urs H¨olzle. Web search for a planet: The Google cluster architecture. IEEE Micro, 23(2):22.28, April 2003. 
    [5] John Bent, Douglas Thain, Andrea C.Arpaci-Dusseau, Remzi H. Arpaci-Dusseau, and Miron Livny. Explicit control in a batch-aware distributed file system. In Proceedings of the 1st USENIX Symposium on Networked Systems Design and Implementation NSDI, March 2004. 
    [6] Guy E. Blelloch. Scans as primitive parallel operations.IEEE Transactions on Computers, C-38(11), November 1989. 
    [7] Armando Fox, Steven D. Gribble, Yatin Chawathe, Eric A. Brewer, and Paul Gauthier. Cluster-based scalable network services. In Proceedings of the 16th ACM Symposium on Operating System Principles, pages 78. 91, Saint-Malo, France, 1997. 
    [8] Sanjay Ghemawat, Howard Gobioff, and Shun-Tak Leung. The Google file system. In 19th Symposium on Operating Systems Principles, pages 29.43, Lake George, New York, 2003. To appear in OSDI 2004 12 
    [9] S. Gorlatch. Systematic efficient parallelization of scan and other list homomorphisms. In L. Bouge, P. Fraigniaud, A. Mignotte, and Y. Robert, editors, Euro-Par’96. Parallel Processing, Lecture Notes in Computer Science 1124, pages 401.408. Springer-Verlag, 1996. 
    [10] Jim Gray. Sort benchmark home page. http://research.microsoft.com/barc/SortBenchmark/
    [11] William Gropp, Ewing Lusk, and Anthony Skjellum. Using MPI: Portable Parallel Programming with the Message-Passing Interface. MIT Press, Cambridge, MA, 1999. 
    [12] L. Huston, R. Sukthankar, R.Wickremesinghe, M. Satyanarayanan, G. R. Ganger, E. Riedel, and A. Ailamaki. Diamond: A storage architecture for early discard in interactive search. In Proceedings of the 2004 USENIX File and Storage Technologies FAST Conference, April 2004. 
    [13] Richard E. Ladner and Michael J. Fischer. Parallel prefix computation. Journal of the ACM, 27(4):831.838, 1980. 
    [14] Michael O. Rabin. Efficient dispersal of information for security, load balancing and fault tolerance. Journal of the ACM, 36(2):335.348, 1989. 
    [15] Erik Riedel, Christos Faloutsos, Garth A. Gibson, and David Nagle. Active disks for large-scale data processing. IEEE Computer, pages 68.74, June 2001. 
    [16] Douglas Thain, Todd Tannenbaum, and Miron Livny. Distributed computing in practice: The Condor experience. Concurrency and Computation: Practice and Experience, 2004. 
    [17] L. G. Valiant. A bridging model for parallel computation. Communications of the ACM, 33(8):103.111, 1997. 
    [18] Jim Wyllie. Spsort: How to sort a terabyte quickly. http://alme1.almaden.ibm.com/cs/spsort.pdf.

    附录A、单词频率统计

    本节包含了一个完整的程序,用于统计在一组命令行指定的输入文件中,每一个不同的单词出现频率。

    #include “mapreduce/mapreduce.h”
    // User’s map function
    class WordCounter : public Mapper {
     public:
      virtual void Map(const MapInput& input) {
       const string& text = input.value();
       const int n = text.size();
       for (int i = 0; i < n; ) {
        // Skip past leading whitespace
        while ((i < n) && isspace(text[i]))
         i++;
       // Find word end
       int start = i;
       while ((i < n) && !isspace(text[i]))
        i++;
       if (start < i)
        Emit(text.substr(start,i-start),”1″);
      }
     }
    };
    REGISTER_MAPPER(WordCounter);
    // User’s reduce function
    class Adder : public Reducer {
     virtual void Reduce(ReduceInput* input) {
      // Iterate over all entries with the
      // same key and add the values
      int64 value = 0;
      while (!input->done()) {
       value += StringToInt(input->value());
       input->NextValue();
      }
      // Emit sum for input->key()
      Emit(IntToString(value));
     }
    };
    REGISTER_REDUCER(Adder);
    int main(int argc, char** argv) {
     ParseCommandLineFlags(argc, argv);
    
     MapReduceSpecification spec;
    
     // Store list of input files into “spec”
     for (int i = 1; i < argc; i++) {
      MapReduceInput* input = spec.add_input();
      input->set_format(“text”);
      input->set_filepattern(argv[i]);
      input->set_mapper_class(“WordCounter”);
     }
     // Specify the output files:
     // /gfs/test/freq-00000-of-00100
     // /gfs/test/freq-00001-of-00100
     // …
     MapReduceOutput* out = spec.output();
     out->set_filebase(“/gfs/test/freq”);
     out->set_num_tasks(100);
     out->set_format(“text”);
     out->set_reducer_class(“Adder”);
    
     // Optional: do partial sums within map
     // tasks to save network bandwidth
     out->set_combiner_class(“Adder”);
     // Tuning parameters: use at most 2000
     // machines and 100 MB of memory per task
     spec.set_machines(2000);
     spec.set_map_megabytes(100);
     spec.set_reduce_megabytes(100);
    
     // Now run it
     MapReduceResult result;
     if (!MapReduce(spec, &result)) abort();
    
     // Done: ‘result’ structure contains info
     // about counters, time taken, number of
     // machines used, etc.
     return 0;
    }
    展开全文
  • MapReduce:超大机群上的简单数据处理 摘要...用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间value.下面将列举许多可以用这个模

    MapReduce:超大机群上的简单数据处理
     
                                              摘要
    MapReduce是一个编程模型,和处理,产生大数据集的相关实现.用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间value.下面将列举许多可以用这个模型来表示的现实世界的工作.
    以这种方式写的程序能自动的在大规模的普通机器上实现并行化.这个运行时系统关心这些细节:分割输入数据,在机群上的调度,机器的错误处理,管理机器之间必要的通信.这样就可以让那些没有并行分布式处理系统经验的程序员利用大量分布式系统的资源.
    我们的MapReduce实现运行在规模可以灵活调整的由普通机器组成的机群上,一个典型的MapReduce计算处理几千台机器上的以TB计算的数据.程序员发现这个系统非常好用:已经实现了数以百计的MapReduce程序,每天在Google的机群上都有1000多个MapReduce程序在执行.
    1.介绍
    在过去的5年里,作者和Google的许多人已经实现了数以百计的为专门目的而写的计算来处理大量的原始数据,比如,爬行的文档,Web请求日志,等等.为了计算各种类型的派生数据,比如,倒排索引,Web文档的图结构的各种表示,每个主机上爬行的页面数量的概要,每天被请求数量最多的集合,等等.很多这样的计算在概念上很容易理解.然而,输入的数据量很大,并且只有计算被分布在成百上千的机器上才能在可以接受的时间内完成.怎样并行计算,分发数据,处理错误,所有这些问题综合在一起,使得原本很简介的计算,因为要大量的复杂代码来处理这些问题,而变得让人难以处理.
    作为对这个复杂性的回应,我们设计一个新的抽象模型,它让我们表示我们将要执行的简单计算,而隐藏并行化,容错,数据分布,负载均衡的那些杂乱的细节,在一个库里.我们的抽象模型的灵感来自Lisp和许多其他函数语言的map和reduce的原始表示.我们认识到我们的许多计算都包含这样的操作:在我们输入数据的逻辑记录上应用map操作,来计算出一个中间key/value对集,在所有具有相同key的value上应用reduce操作,来适当的合并派生的数据.功能模型的使用,再结合用户指定的map和reduce操作,让我们可以非常容易的实现大规模并行化计算,和使用再次执行作为初级机制来实现容错.
    这个工作的主要贡献是通过简单有力的接口来实现自动的并行化和大规模分布式计算,结合这个接口的实现来在大量普通的PC机上实现高性能计算.
    第二部分描述基本的编程模型,并且给一些例子.第三部分描述符合我们的基于集群的计算环境的MapReduce的接口的实现.第四部分描述我们觉得编程模型中一些有用的技巧.第五部分对于各种不同的任务,测量我们实现的性能.第六部分探究在Google内部使用MapReduce作为基础来重写我们的索引系统产品.第七部分讨论相关的,和未来的工作.
    2.编程模型
    计算利用一个输入key/value对集,来产生一个输出key/value对集.MapReduce库的用户用两个函数表达这个计算:map和reduce.
    用户自定义的map函数,接受一个输入对,然后产生一个中间key/value对集.MapReduce库把所有具有相同中间key I的中间value聚合在一起,然后把它们传递给reduce函数.
    用户自定义的reduce函数,接受一个中间key I和相关的一个value集.它合并这些value,形成一个比较小的value集.一般的,每次reduce调用只产生0或1个输出value.通过一个迭代器把中间value提供给用户自定义的reduce函数.这样可以使我们根据内存来控制value列表的大小.
    2.1 实例
    考虑这个问题:计算在一个大的文档集合中每个词出现的次数.用户将写和下面类似的伪代码:
    map(String key,String value):
     //key:文档的名字
     //value:文档的内容
     for each word w in value:
        EmitIntermediate(w,"1");
     
    reduce(String key,Iterator values):
    //key:一个词
    //values:一个计数列表
     int result=0;
     for each v in values:
       result+=ParseInt(v);
     Emit(AsString(resut));
    map函数产生每个词和这个词的出现次数(在这个简单的例子里就是1).reduce函数把产生的每一个特定的词的计数加在一起.
    另外,用户用输入输出文件的名字和可选的调节参数来填充一个mapreduce规范对象.用户然后调用MapReduce函数,并把规范对象传递给它.用户的代码和MapReduce库链接在一起(用C++实现).附录A包含这个实例的全部文本.
    2.2类型
    即使前面的伪代码写成了字符串输入和输出的term格式,但是概念上用户写的map和reduce函数有关联的类型:
     map(k1,v1) ->list(k2,v2)
     reduce(k2,list(v2)) ->list(v2)
    例如,输入的key,value和输出的key,value的域不同.此外,中间key,value和输出key,values的域相同.
    我们的C++实现传递字符串来和用户自定义的函数交互,并把它留给用户的代码,来在字符串和适当的类型间进行转换.
    2.3更多实例
    这里有一些让人感兴趣的简单程序,可以容易的用MapReduce计算来表示.
    分布式的Grep(UNIX工具程序, 可做文件内的字符串查找):如果输入行匹配给定的样式,map函数就输出这一行.reduce函数就是把中间数据复制到输出.
    计算URL访问频率:map函数处理web页面请求的记录,输出(URL,1).reduce函数把相同URL的value都加起来,产生一个(URL,记录总数)的对.
    倒转网络链接图:map函数为每个链接输出(目标,源)对,一个URL叫做目标,包含这个URL的页面叫做源.reduce函数根据给定的相关目标URLs连接所有的源URLs形成一个列表,产生(目标,源列表)对.
    每个主机的术语向量:一个术语向量用一个(词,频率)列表来概述出现在一个文档或一个文档集中的最重要的一些词.map函数为每一个输入文档产生一个(主机名,术语向量)对(主机名来自文档的URL).reduce函数接收给定主机的所有文档的术语向量.它把这些术语向量加在一起,丢弃低频的术语,然后产生一个最终的(主机名,术语向量)对.
    倒排索引:map函数分析每个文档,然后产生一个(词,文档号)对的序列.reduce函数接受一个给定词的所有对,排序相应的文档IDs,并且产生一个(词,文档ID列表)对.所有的输出对集形成一个简单的倒排索引.它可以简单的增加跟踪词位置的计算.
    分布式排序:map函数从每个记录提取key,并且产生一个(key,record)对.reduce函数不改变任何的对.这个计算依赖分割工具(在4.1描述)和排序属性(在4.2描述).
    3实现
    MapReduce接口可能有许多不同的实现.根据环境进行正确的选择.例如,一个实现对一个共享内存较小的机器是合适的,另外的适合一个大NUMA的多处理器的机器,而有的适合一个更大的网络机器的集合.
    这部分描述一个在Google广泛使用的计算环境的实现:用交换机连接的普通PC机的大机群.我们的环境是:
    1.Linux操作系统,双处理器,2-4GB内存的机器.
    2.普通的网络硬件,每个机器的带宽或者是百兆或者千兆,但是平均小于全部带宽的一半.
    3.因为一个机群包含成百上千的机器,所有机器会经常出现问题.
    4.存储用直接连到每个机器上的廉价IDE硬盘.一个从内部文件系统发展起来的分布式文件系统被用来管理存储在这些磁盘上的数据.文件系统用复制的方式在不可靠的硬件上来保证可靠性和有效性.
    5.用户提交工作给调度系统.每个工作包含一个任务集,每个工作被调度者映射到机群中一个可用的机器集上.
     
    3.1执行预览
    通过自动分割输入数据成一个有M个split的集,map调用被分布到多台机器上.输入的split能够在不同的机器上被并行处理.通过用分割函数分割中间key,来形成R个片(例如,hash(key) mod R),reduce调用被分布到多台机器上.分割数量(R)和分割函数由用户来指定.
    图1显示了我们实现的MapReduce操作的全部流程.当用户的程序调用MapReduce的函数的时候,将发生下面的一系列动作(下面的数字和图1中的数字标签相对应):
        1.在用户程序里的MapReduce库首先分割输入文件成M个片,每个片的大小一般从 16到64MB(用户可以通过可选的参数来控制).然后在机群中开始大量的拷贝程序.
          2.这些程序拷贝中的一个是master,其他的都是由master分配任务的worker.有M 个map任务和R个reduce任务将被分配.管理者分配一个map任务或reduce任务给一个空闲的worker.
    3.一个被分配了map任务的worker读取相关输入split的内容.它从输入数据中分析出key/value对,然后把key/value对传递给用户自定义的map函数.由map函数产生的中间key/value对被缓存在内存中.
    4.缓存在内存中的key/value对被周期性的写入到本地磁盘上,通过分割函数把它们写入R个区域.在本地磁盘上的缓存对的位置被传送给master,master负责把这些位置传送给reduce worker.
    5.当一个reduce worker得到master的位置通知的时候,它使用远程过程调用来从map worker的磁盘上读取缓存的数据.当reduce worker读取了所有的中间数据后,它通过排序使具有相同key的内容聚合在一起.因为许多不同的key映射到相同的reduce任务,所以排序是必须的.如果中间数据比内存还大,那么还需要一个外部排序.
          6.reduce worker迭代排过序的中间数据,对于遇到的每一个唯一的中间key,它把key和相关的中间value集传递给用户自定义的reduce函数.reduce函数的输出被添加到这个reduce分割的最终的输出文件中.
    7.当所有的map和reduce任务都完成了,管理者唤醒用户程序.在这个时候,在用户程序里的MapReduce调用返回到用户代码.
    在成功完成之后,mapreduce执行的输出存放在R个输出文件中(每一个reduce任务产生一个由用户指定名字的文件).一般,用户不需要合并这R个输出文件成一个文件--他们经常把这些文件当作一个输入传递给其他的MapReduce调用,或者在可以处理多个分割文件的分布式应用中使用他们.
    3.2master数据结构
    master保持一些数据结构.它为每一个map和reduce任务存储它们的状态(空闲,工作中,完成),和worker机器(非空闲任务的机器)的标识.
    master就像一个管道,通过它,中间文件区域的位置从map任务传递到reduce任务.因此,对于每个完成的map任务,master存储由map任务产生的R个中间文件区域的大小和位置.当map任务完成的时候,位置和大小的更新信息被接受.这些信息被逐步增加的传递给那些正在工作的reduce任务.
    3.3容错
    因为MapReduce库被设计用来使用成百上千的机器来帮助处理非常大规模的数据,所以这个库必须要能很好的处理机器故障.
    worker故障
    master周期性的ping每个worker.如果master在一个确定的时间段内没有收到worker返回的信息,那么它将把这个worker标记成失效.因为每一个由这个失效的worker完成的map任务被重新设置成它初始的空闲状态,所以它可以被安排给其他的worker.同样的,每一个在失败的worker上正在运行的map或reduce任务,也被重新设置成空闲状态,并且将被重新调度.
    在一个失败机器上已经完成的map任务将被再次执行,因为它的输出存储在它的磁盘上,所以不可访问.已经完成的reduce任务将不会再次执行,因为它的输出存储在全局文件系统中.
    当一个map任务首先被worker A执行之后,又被B执行了(因为A失效了),重新执行这个情况被通知给所有执行reduce任务的worker.任何还没有从A读数据的reduce任务将从worker B读取数据.
    MapReduce可以处理大规模worker失败的情况.例如,在一个MapReduce操作期间,在正在运行的机群上进行网络维护引起80台机器在几分钟内不可访问了,MapReduce master只是简单的再次执行已经被不可访问的worker完成的工作,继续执行,最终完成这个MapReduce操作.
    master失败
    可以很容易的让管理者周期的写入上面描述的数据结构的checkpoints.如果这个master任务失效了,可以从上次最后一个checkpoint开始启动另一个master进程.然而,因为只有一个master,所以它的失败是比较麻烦的,因此我们现在的实现是,如果master失败,就中止MapReduce计算.客户可以检查这个状态,并且可以根据需要重新执行MapReduce操作.
    在错误面前的处理机制
    当用户提供的map和reduce操作对它的输出值是确定的函数时,我们的分布式实现产生,和全部程序没有错误的顺序执行一样,相同的输出.
    我们依赖对map和reduce任务的输出进行原子提交来完成这个性质.每个工作中的任务把它的输出写到私有临时文件中.一个reduce任务产生一个这样的文件,而一个map任务产生R个这样的文件(一个reduce任务对应一个文件).当一个map任务完成的时候,worker发送一个消息给master,在这个消息中包含这R个临时文件的名字.如果master从一个已经完成的map任务再次收到一个完成的消息,它将忽略这个消息.否则,它在master的数据结构里记录这R个文件的名字.
    当一个reduce任务完成的时候,这个reduce worker原子的把临时文件重命名成最终的输出文件.如果相同的reduce任务在多个机器上执行,多个重命名调用将被执行,并产生相同的输出文件.我们依赖由底层文件系统提供的原子重命名操作来保证,最终的文件系统状态仅仅包含一个reduce任务产生的数据.
    我们的map和reduce操作大部分都是确定的,并且我们的处理机制等价于一个顺序的执行的这个事实,使得程序员可以很容易的理解程序的行为.当map或/和reduce操作是不确定的时候,我们提供虽然比较弱但是合理的处理机制.当在一个非确定操作的前面,一个reduce任务R1的输出等价于一个非确定顺序程序执行产生的输出.然而,一个不同的reduce任务R2的输出也许符合一个不同的非确定顺序程序执行产生的输出.
    考虑map任务M和reduce任务R1,R2的情况.我们设定e(Ri)为已经提交的Ri的执行(有且仅有一个这样的执行).这个比较弱的语义出现,因为e(R1)也许已经读取了由M的执行产生的输出,而e(R2)也许已经读取了由M的不同执行产生的输出.
    3.4存储位置
    在我们的计算机环境里,网络带宽是一个相当缺乏的资源.我们利用把输入数据(由GFS管理)存储在机器的本地磁盘上来保存网络带宽.GFS把每个文件分成64MB的一些块,然后每个块的几个拷贝存储在不同的机器上(一般是3个拷贝).MapReduce的master考虑输入文件的位置信息,并且努力在一个包含相关输入数据的机器上安排一个map任务.如果这样做失败了,它尝试在那个任务的输入数据的附近安排一个map任务(例如,分配到一个和包含输入数据块在一个switch里的worker机器上执行).当运行巨大的MapReduce操作在一个机群中的一部分机器上的时候,大部分输入数据在本地被读取,从而不消耗网络带宽.
    3.5任务粒度
    象上面描述的那样,我们细分map阶段成M个片,reduce阶段成R个片.M和R应当比worker机器的数量大许多.每个worker执行许多不同的工作来提高动态负载均衡,也可以加速从一个worker失效中的恢复,这个机器上的许多已经完成的map任务可以被分配到所有其他的worker机器上.
    在我们的实现里,M和R的范围是有大小限制的,因为master必须做O(M+R)次调度,并且保存O(M*R)个状态在内存中.(这个因素使用的内存是很少的,在O(M*R)个状态片里,大约每个map任务/reduce任务对使用一个字节的数据).
    此外,R经常被用户限制,因为每一个reduce任务最终都是一个独立的输出文件.实际上,我们倾向于选择M,以便每一个单独的任务大概都是16到64MB的输入数据(以便上面描述的位置优化是最有效的),我们把R设置成我们希望使用的worker机器数量的小倍数.我们经常执行MapReduce计算,在M=200000,R=5000,使用2000台工作者机器的情况下.
    3.6备用任务
    一个落后者是延长MapReduce操作时间的原因之一:一个机器花费一个异乎寻常地的长时间来完成最后的一些map或reduce任务中的一个.有很多原因可能产生落后者.例如,一个有坏磁盘的机器经常发生可以纠正的错误,这样就使读性能从30MB/s降低到3MB/s.机群调度系统也许已经安排其他的任务在这个机器上,由于计算要使用CPU,内存,本地磁盘,网络带宽的原因,引起它执行MapReduce代码很慢.我们最近遇到的一个问题是,一个在机器初始化时的Bug引起处理器缓存的失效:在一个被影响的机器上的计算性能有上百倍的影响.
    我们有一个一般的机制来减轻这个落后者的问题.当一个MapReduce操作将要完成的时候,master调度备用进程来执行那些剩下的还在执行的任务.无论是原来的还是备用的执行完成了,工作都被标记成完成.我们已经调整了这个机制,通常只会占用多几个百分点的机器资源.我们发现这可以显著的减少完成大规模MapReduce操作的时间.作为一个例子,将要在5.3描述的排序程序,在关闭掉备用任务的情况下,要比有备用任务的情况下多花44%的时间.
    4技巧
    尽管简单的map和reduce函数的功能对于大多数需求是足够的了,但是我们开发了一些有用的扩充.这些将在这个部分描述.
    4.1分割函数
    MapReduce用户指定reduce任务和reduce任务需要的输出文件的数量.在中间key上使用分割函数,使数据分割后通过这些任务.一个缺省的分割函数使用hash方法(例如,hash(key) mod R).这个导致非常平衡的分割.然后,有的时候,使用其他的key分割函数来分割数据有非常有用的.例如,有时候,输出的key是URLs,并且我们希望每个主机的所有条目保持在同一个输出文件中.为了支持像这样的情况,MapReduce库的用户可以提供专门的分割函数.例如,使用"hash(Hostname(urlkey)) mod R"作为分割函数,使所有来自同一个主机的URLs保存在同一个输出文件中.
    4.2顺序保证
    我们保证在一个给定的分割里面,中间key/value对以key递增的顺序处理.这个顺序保证可以使每个分割产出一个有序的输出文件,当输出文件的格式需要支持有效率的随机访问key的时候,或者对输出数据集再作排序的时候,就很容易.
     
    4.3combiner函数
    在某些情况下,允许中间结果key重复会占据相当的比重,并且用户定义的reduce函数
    满足结合律和交换律.一个很好的例子就是在2.1部分的词统计程序.因为词频率倾向于一个zipf分布(齐夫分布),每个map任务将产生成百上千个这样的记录<the,1>.所有的这些计数将通过网络被传输到一个单独的reduce任务,然后由reduce函数加在一起产生一个数字.我们允许用户指定一个可选的combiner函数,先在本地进行合并一下,然后再通过网络发送.
    在每一个执行map任务的机器上combiner函数被执行.一般的,相同的代码被用在combiner和reduce函数.在combiner和reduce函数之间唯一的区别是MapReduce库怎样控制函数的输出.reduce函数的输出被保存最终输出文件里.combiner函数的输出被写到中间文件里,然后被发送给reduce任务.
    部分使用combiner可以显著的提高一些MapReduce操作的速度.附录A包含一个使用combiner函数的例子.
    4.4输入输出类型
    MapReduce库支持以几种不同的格式读取输入数据.例如,文本模式输入把每一行看作是一个key/value对.key是文件的偏移量,value是那一行的内容.其他普通的支持格式以key的顺序存储key/value对序列.每一个输入类型的实现知道怎样把输入分割成对每个单独的map任务来说是有意义的(例如,文本模式的范围分割确保仅仅在每行的边界进行范围分割).虽然许多用户仅仅使用很少的预定意输入类型的一个,但是用户可以通过提供一个简单的reader接口来支持一个新的输入类型.
    一个reader不必要从文件里读数据.例如,我们可以很容易的定义它从数据库里读记录,或从内存中的数据结构读取.
    4.5副作用
    有的时候,MapReduce的用户发现在map操作或/和reduce操作时产生辅助文件作为一个附加的输出是很方便的.我们依靠应用程序写来使这个副作用成为原子的.一般的,应用程序写一个临时文件,然后一旦这个文件全部产生完,就自动的被重命名.
    对于单个任务产生的多个输出文件来说,我们没有提供其上的两阶段提交的原子操作支持.因此,一个产生需要交叉文件连接的多个输出文件的任务,应该使确定性的任务.不过这个限制在实际的工作中并不是一个问题.
    4.6跳过错误记录
    有的时候因为用户的代码里有bug,导致在某一个记录上map或reduce函数突然crash掉.这样的bug使得MapReduce操作不能完成.虽然一般是修复这个bug,但是有时候这是不现实的;也许这个bug是在源代码不可得到的第三方库里.有的时候也可以忽略一些记录,例如,当在一个大的数据集上进行统计分析.我们提供一个可选的执行模式,在这个模式下,MapReduce库检测那些记录引起的crash,然后跳过那些记录,来继续执行程序.
    每个worker程序安装一个信号处理器来获取内存段异常和总线错误.在调用一个用户自定义的map或reduce操作之前,MapReduce库把记录的序列号存储在一个全局变量里.如果用户代码产生一个信号,那个信号处理器就会发送一个包含序号的"last gasp"UDP包给MapReduce的master.当master不止一次看到同一个记录的时候,它就会指出,当相关的map或reduce任务再次执行的时候,这个记录应当被跳过.
    4.7本地执行
    调试在map或reduce函数中问题是很困难的,因为实际的计算发生在一个分布式的系统中,经常是有一个master动态的分配工作给几千台机器.为了简化调试和测试,我们开发了一个可替换的实现,这个实现在本地执行所有的MapReduce操作.用户可以控制执行,这样计算可以限制到特定的map任务上.用户以一个标志调用他们的程序,然后可以容易的使用他们认为好用的任何调试和测试工具(例如,gdb).
    4.8状态信息
    master运行一个HTTP服务器,并且可以输出一组状况页来供人们使用.状态页显示计算进度,象多少个任务已经完成,多少个还在运行,输入的字节数,中间数据字节数,输出字节数,处理百分比,等等.这个页也包含到标准错误的链接,和由每个任务产生的标准输出的链接.用户可以根据这些数据预测计算需要花费的时间,和是否需要更多的资源.当计算比预期的要慢很多的时候,这些页面也可以被用来判断是不是这样.
    此外,最上面的状态页显示已经有多少个工作者失败了,和当它们失败的时候,那个map和reduce任务正在运行.当试图诊断在用户代码里的bug时,这个信息也是有用的.
    4.9计数器
    MapReduce库提供一个计数器工具,来计算各种事件的发生次数.例如,用户代码想要计算所有处理的词的个数,或者被索引的德文文档的数量.
    为了使用这个工具,用户代码创建一个命名的计数器对象,然后在map或/和reduce函数里适当的增加计数器.例如:
    Counter * uppercase;
    uppercase=GetCounter("uppercase");
    map(String name,String contents):
     for each word w in contents:
        if(IsCapitalized(w)):
          uppercase->Increment();
        EmitIntermediate(w,"1");
    来自不同worker机器上的计数器值被周期性的传送给master(在ping回应里).master把来自成功的map和reduce任务的计数器值加起来,在MapReduce操作完成的时候,把它返回给用户代码.当前计数器的值也被显示在master状态页里,以便人们可以查看实际的计算进度.当计算计数器值的时候消除重复执行的影响,避免数据的累加.(在备用任务的使用,和由于出错的重新执行,可以产生重复执行)
    有些计数器值被MapReduce库自动的维护,比如,被处理的输入key/value对的数量,和被产生的输出key/value对的数量.
    用户发现计数器工具对于检查MapReduce操作的完整性很有用.例如,在一些MapReduce操作中,用户代码也许想要确保输出对的数量完全等于输入对的数量,或者处理过的德文文档的数量是在全部被处理的文档数量中属于合理的范围.
    5性能
    在本节,我们用在一个大型集群上运行的两个计算来衡量MapReduce的性能.一个计算用来在一个大概1TB的数据中查找特定的匹配串.另一个计算排序大概1TB的数据.
    这两个程序代表了MapReduce的用户实现的真实的程序的一个大子集.一类是,把数据从一种表示转化到另一种表示.另一类是,从一个大的数据集中提取少量的关心的数据.
    5.1机群配置
    所有的程序在包含大概1800台机器的机群上执行.机器的配置是:2个2G的Intel Xeon超线程处理器,4GB内存,两个160GB IDE磁盘,一个千兆网卡.这些机器部署在一个由两层的,树形交换网络中,在根节点上大概有100到2000G的带宽.所有这些机器都有相同的部署(对等部署),因此任意两点之间的来回时间小于1毫秒.
     
    在4GB的内存里,大概有1-1.5GB被用来运行在机群中其他的任务.这个程序是在周末的下午开始执行的,这个时候CPU,磁盘,网络基本上是空闲的.
    5.2Grep
    这个Grep程序扫描大概10^10个,每个100字节的记录,查找比较少的3字符的查找串(这个查找串出现在92337个记录中).输入数据被分割成大概64MB的片(M=15000),全部 的输出存放在一个文件中(R=1).
    图2显示计算过程随时间变化的情况.Y轴表示输入数据被扫描的速度.随着更多的机群被分配给这个MapReduce计算,速度在逐步的提高,当有1764个worker的时候这个速度达到最高的30GB/s.当map任务完成的时候,速度开始下降,在计算开始后80秒,输入的速度降到0.这个计算持续的时间大概是150秒.这包括了前面大概一分钟的启动时间.启动时间用来把程序传播到所有的机器上,等待GFS打开1000个输入文件,得到必要的位置优化信息.
    5.3排序
    这个sort程序排序10^10个记录,每个记录100个字节(大概1TB的数据).这个程序是模仿TeraSort的.
    这个排序程序只包含不到50行的用户代码.其中有3行map函数用来从文本行提取10字节的排序key,并且产生一个由这个key和原始文本行组成的中间key/value对.我们使用一个内置的Identity函数作为reduce操作.这个函数直接把中间key/value对作为输出的key/value对.最终的排序输出写到一个2路复制的GFS文件中(也就是,程序的输出会写2TB的数据).
    象以前一样,输入数据被分割成64MB的片(M=15000).我们把排序后的输出写到4000个文件中(R=4000).分区函数使用key的原始字节来把数据分区到R个小片中.
    我们以这个基准的分割函数,知道key的分布情况.在一般的排序程序中,我们会增加一个预处理的MapReduce操作,这个操作用于采样key的情况,并且用这个采样的key的分布情况来计算对最终排序处理的分割点。
    图3(a)显示这个排序程序的正常执行情况.左上图显示输入数据的读取速度.这个速度最高到达13GB/s,并且在不到200秒所有map任务完成之后迅速滑落到0.注意到这个输入速度小于Grep.这是因为这个排序map任务花费大概一半的时间和带宽,来把中间数据写到本地硬盘中.而Grep相关的中间数据可以忽略不计.
    左中图显示数据通过网络从map任务传输给reduce任务的速度.当第一个map任务完成后,这个排序过程就开始了.图示上的第一个高峰是启动了第一批大概1700个reduce任务(整个MapReduce任务被分配到1700台机器上,每个机器一次只执行一个reduce任务).大概开始计算后的300秒,第一批reduce任务中的一些完成了,我们开始执行剩下的reduce任务.全部的排序过程持续了大概600秒的时间.
    左下图显示排序后的数据被reduce任务写入最终文件的速度.因为机器忙于排序中间数据,所以在第一个排序阶段的结束和写阶段的开始有一个延迟.写的速度大概是2-4GB/s.大概开始计算后的850秒写过程结束.包括前面的启动过程,全部的计算任务持续的891秒.这个和TeraSort benchmark的最高纪录1057秒差不多.
    需要注意的事情是:因此位置优化的原因,很多数据都是从本地磁盘读取的而没有通过我们有限带宽的网络,所以输入速度比排序速度和输出速度都要快.排序速度比输出速度快的原因是输出阶段写两个排序后数据的拷贝(我们写两个副本的原因是为了可靠性和可用性).我们写两份的原因是因为底层文件系统的可靠性和可用性的要求.如果底层文件系统用类似容错编码(erasure coding)的方式,而不采用复制写的方式,在写盘阶段可以降低网络带宽的要求。
    5.4备用任务的影响
    在图3(b)中,显示我们不用备用任务的排序程序的执行情况.除了它有一个很长的几乎没有写动作发生的尾巴外,执行流程和图3(a)相似.在960秒后,只有5个reduce任务没有完成.然而,就是这最后几个落后者知道300秒后才完成.全部的计算任务执行了1283秒,多花了44%的时间.
    5.5机器失效
    在图3(c)中,显示我们有意的在排序程序计算过程中停止1746台worker中的200台机器上的程序的情况.底层机群调度者在这些机器上马上重新开始新的worker程序(因为仅仅程序被停止,而机器仍然在正常运行).
    因为已经完成的map工作丢失了(由于相关的map worker被杀掉了),需要重新再作,所以worker死掉会导致一个负数的输入速率.相关map任务的重新执行很快就重新执行了.整个计算过程在933秒内完成,包括了前边的启动时间(只比正常执行时间多了5%的时间).
    6经验
    我们在2003年的2月写了MapReduce库的第一个版本,并且在2003年的8月做了显著的增强,包括位置优化,worker机器间任务执行的动态负载均衡,等等.从那个时候起,我们惊奇的发现MapReduce函数库广泛用于我们日常处理的问题.它现在在Google内部各个领域内广泛应用,包括:
        大规模机器学习问题
    Google News和Froogle产品的机器问题.
    提取数据产生一个流行查询的报告(例如,Google Zeitgeist).
    为新的试验和产品提取网页的属性(例如,从一个web页的大集合中提取位置信息   用在位置查询).
       大规模的图计算.
    图4显示了我们主要的源代码管理系统中,随着时间推移,MapReduce程序的显著增加,从2003年早先时候的0个增长到2004年9月份的差不多900个不同的程序.MapReduce之所以这样的成功,是因为他能够在不到半小时时间内写出一个简单的能够应用于上千台机器的大规模并发程序,并且极大的提高了开发和原形设计的周期效率.并且,他可以让一个完全没有分布式和/或并行系统经验的程序员,能够很容易的利用大量的资源.
    在每一个任务结束的时候,MapReduce函数库记录使用的计算资源的统计信息.在图1里,我们列出了2004年8月份在Google运行的一些MapReduce的工作的统计信息.
    6.1大规模索引
    到目前为止,最成功的MapReduce的应用就是重写了Google web 搜索服务所使用到的index系统.索引系统处理爬虫系统抓回来的超大量的文档集,这些文档集保存在GFS文件里.这些文档的原始内容的大小,超过了20TB.索引程序是通过一系列的,大概5到10次MapReduce操作来建立索引.通过利用MapReduce(替换掉上一个版本的特别设计的分布处理的索引程序版本)有这样一些好处:
       索引的代码简单,量少,容易理解,因为容错,分布式,并行处理都隐藏在MapReduce库中了.例如,当使用MapReduce函数库的时候,计算的代码行数从原来的3800行C++代码一下减少到大概700行代码.
       MapReduce的函数库的性能已经非常好,所以我们可以把概念上不相关的计算步骤分开处理,而不是混在一起以期减少在数据上的处理.这使得改变索引过程很容易.例如,我们对老索引系统的一个小更改可能要好几个月的时间,但是在新系统内,只需要花几天时间就可以了.
       索引系统的操作更容易了,这是因为机器的失效,速度慢的机器,以及网络失效都已经由MapReduce自己解决了,而不需要操作人员的交互.另外,我们可以简单的通过对索引系统增加机器的方式提高处理性能.
    7相关工作
    很多系统都提供了严格的设计模式,并且通过对编程的严格限制来实现自动的并行计算.例如,一个结合函数可以通过N个元素的数组的前缀在N个处理器上使用并行前缀计算在log N的时间内计算完.MapReduce是基于我们的大型现实计算的经验,对这些模型的一个简化和精炼.并且,我们还提供了基于上千台处理器的容错实现.而大部分并发处理系统都只在小规模的尺度上实现,并且机器的容错还是程序员来控制的.
    Bulk Synchronous Programming以及一些MPI primitives提供了更高级别的抽象,可以更容易写出并行处理的程序.这些系统和MapReduce系统的不同之处在,MapReduce利用严格的编程模式自动实现用户程序的并发处理,并且提供了透明的容错处理.
    我们本地的优化策略是受active disks等技术的启发,在active disks中,计算任务是尽量推送到靠近本地磁盘的处理单元上,这样就减少了通过I/O子系统或网络的数据量.我们在少量磁盘直接连接到普通处理机运行,来代替直接连接到磁盘控制器的处理机上,但是一般的步骤是相似的.
    我们的备用任务的机制和在Charlotte系统上的积极调度机制相似.这个简单的积极调度的一个缺陷是,如果一个任务引起了一个重复性的失败,那个整个计算将无法完成.我们通过在故障情况下跳过故障记录的机制,在某种程度上解决了这个问题.
    MapReduce实现依赖一个内置的机群管理系统来在一个大规模共享机器组上分布和运行用户任务.虽然这个不是本论文的重点,但是集群管理系统在理念上和Condor等其他系统是一样的.
    在MapReduce库中的排序工具在操作上和NOW-Sort相似.源机器(map worker)分割将要被排序的数据,然后把它发送到R个reduce worker中的一个上.每个reduce worker来本地排序它的数据(如果可能,就在内存中).当然,NOW-Sort没有用户自定义的map和reduce函数,使得我们的库可以广泛的应用.
    River提供一个编程模型,在这个模型下,处理进程可以靠在分布式的队列上发送数据进行彼此通讯.和MapReduce一样,River系统尝试提供对不同应用有近似平均的性能,即使在不对等的硬件环境下或者在系统颠簸的情况下也能提供近似平均的性.River是通过精心调度硬盘和网络的通讯,来平衡任务的完成时间.MapReduce不和它不同.利用严格编程模型,MapReduce构架来把问题分割成大量的任务.这些任务被自动的在可用的worker上调度,以便速度快的worker可以处理更多的任务.这个严格编程模型也让我们可以在工作快要结束的时候安排冗余的执行,来在非一致处理的情况减少完成时间(比如,在有慢机或者阻塞的worker的时候).
    BAD-FS是一个很MapReduce完全不同的编程模型,它的目标是在一个广阔的网络上执行工作.然而,它们有两个基本原理是相同的.(1)这两个系统使用冗余的执行来从由失效引起的数据丢失中恢复.(2)这两个系统使用本地化调度策略,来减少通过拥挤的网络连接发送的数据数量.
    TACC是一个被设计用来简化高有效性网络服务结构的系统.和MapReduce一样,它通过再次执行来实现容错.
    8结束语
    MapReduce编程模型已经在Google成功的用在不同的目的.我们把这个成功归于以下几个原因:第一,这个模型使用简单,甚至对没有并行和分布式经验的程序员也是如此,因为它隐藏了并行化,容错,位置优化和负载均衡的细节.第二,大量不同的问题可以用MapReduce计算来表达.例如,MapReduce被用来,为Google的产品web搜索服务,排序,数据挖掘,机器学习,和其他许多系统,产生数据.第三,我们已经在一个好几千台计算机的大型集群上开发实现了这个MapReduce.这个实现使得对于这些机器资源的利用非常简单,因此也适用于解决Google遇到的其他很多需要大量计算的问题.
    从这个工作中我们也学习到了一些东西.首先,严格的编程模型使得并行化和分布式计算简单,并且也易于构造这样的容错计算环境.第二,网络带宽是系统的瓶颈.因此在我们的系统中大量的优化目标是减少通过网络发送的数据量,本地优化使用我们从本地磁盘读取数据,并且把中间数据写到本地磁盘,以保留网络带宽.第三,冗余的执行可以用来减少速度慢的机器的影响,和控制机器失效和数据丢失.
    感谢
    Josh Levenberg校定和扩展了用户级别的MapReduce API,并且结合他的适用经验和其他人的改进建议,增加了很多新的功能.MapReduce从GFS中读取和写入数据.我们要感谢Mohit Aron,Howard Gobioff,Markus Gutschke,David Krame,Shun-Tak Leung,和Josh Redstone,他们在开发GFS中的工作.我们还感谢Percy Liang Olcan Sercinoglu 在开发用于MapReduce的集群管理系统得工作.Mike Burrows,Wilson Hsieh,Josh Levenberg,Sharon Perl,RobPike,Debby Wallach为本论文提出了宝贵的意见.OSDI的无名审阅者,以及我们的审核者Eric Brewer,在论文应当如何改进方面给出了有益的意见.最后,我们感谢Google的工程部的所有MapReduce的用户,感谢他们提供了有用的反馈,建议,以及错误报告等等.
    A单词频率统计
    本节包含了一个完整的程序,用于统计在一组命令行指定的输入文件中,每一个不同的单词出现频率.
    #include "mapreduce/mapreduce.h"
    //用户map函数
    class WordCounter : public Mapper {
     public:
        virtual void Map(const MapInput& input) {
          const string& text = input.value();
          const int n = text.size();
          for (int i = 0; i < n; ) {
            //跳过前导空格
            while ((i < n) && isspace(text[i]))
                 i++;
             // 查找单词的结束位置
             int start = i;
             while ((i < n) && !isspace(text[i]))
                  i++;
             if (start < i)
                Emit(text.substr(start,i-start),"1");
            }
     
         }
     
    };
     
    REGISTER_MAPPER(WordCounter);
    //用户的reduce函数
    class Adder : public Reducer {
        virtual void Reduce(ReduceInput* input) {
                 //迭代具有相同key的所有条目,并且累加它们的value
                  int64 value = 0;
                  while (!input->done()) {
                         value += StringToInt(input->value());
                         input->NextValue();
                  }
                   //提交这个输入key的综合
                  Emit(IntToString(value));
           }
     
    };
    REGISTER_REDUCER(Adder);
    int main(int argc, char** argv) {
           ParseCommandLineFlags(argc, argv);
           MapReduceSpecification spec;
           // 把输入文件列表存入"spec"
           for (int i = 1; i < argc; i++) {
                  MapReduceInput* input = spec.add_input();
                  input->set_format("text");
                  input->set_filepattern(argv[i]);
                  input->set_mapper_class("WordCounter");
           }
            //指定输出文件:
           // /gfs/test/freq-00000-of-00100
           // /gfs/test/freq-00001-of-00100
          // ...
           MapReduceOutput* out = spec.output();
           out->set_filebase("/gfs/test/freq");
           out->set_num_tasks(100);
           out->set_format("text");
           out->set_reducer_class("Adder");
           // 可选操作:在map任务中做部分累加工作,以便节省带宽
           out->set_combiner_class("Adder");
           // 调整参数: 使用2000台机器,每个任务100MB内存
           spec.set_machines(2000);
           spec.set_map_megabytes(100);
           spec.set_reduce_megabytes(100);
           // 运行它
           MapReduceResult result;
           if (!MapReduce(spec, &result)) abort();
           // 完成: 'result'结构包含计数,花费时间,和使用机器的信息
           return 0;
    }
     
    <script type="text/javascript" src="http://pagead2.googlesyndication.com/pagead/show_ads.js"> </script>
    展开全文
  • MapReduce(中文翻译)

    2017-05-17 10:21:23
    用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间value.下面将列举许多可以用这个模型来表示的现实世界的工作. 以这种方式写的...
  • MapReduce:超大机群上的简单数据处理 ...用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间value.下面将列举许多可以用
  • PHP ini文件中文翻译

    2016-03-10 10:16:23
    ;;;;;;;;;;; ;; 语法 ;; ;;;;;;;;;;;; ; 该文件的语法非常简单。空白字符和以分号开始的行被... directive = value ; 指令名(directive)是大小写敏感的!所以"foo=bar"不同于"FOO=bar"。...
  • 包含美赛2018年C题的所有优秀论文及其中文翻译 例: We first build CAFE, a novel framework for Characterization, Analysis, Forecast and Evaluation on the energy profile (EP) of a state. We constitute ...
  • argument is the type value ``Bool``. </man-singleton-types></man-singleton-types></code></pre> <ul><li> 由于Julia现在更换为了markdown作为文档语言,我们也更换到...
  • 请设计程序和相应的数据结构,使小王能记录新学的英文单词和其中文翻译,并能很方便地根据英文来查找中文。(参考:数据结构建议用集合。集合添加: dic[key]=value 判断 key 是否在集合中: if key in dic)   一...
  • bindActionCreators就是给action创建函数绑定了dispatch, 可以直接以普通函数执行,而不用dispatch...比如下面,bindActionCreators生成一个对象,对象里面的value值是function, 那么可以直接this.boundActionCreators....
  • CakePHP manual 中文翻译4

    2008-11-10 10:29:00
    注意save操作是如何放置在一个条件语句中:如果你试图保存数据到model中,cake自动尝试确数据正确(根据你提供的规则)。可以查看第十章了解更多... ●savaFieldId($name,$value) ○保存一个单field的值 ...
  • Language Guide注:这是本人的翻译,可能不准确,可能有错误,但是基本上可以理解,希望能对大家有所帮助!(转载请注明出处:本文来自learnhard的CSDN博客:http://blog.csdn.net/learnhard/)· Defining A ...
  • MapReduce:超大机群上的简单数据处理   ...用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间value.下面将列举
  • You need to return multiple values, and each value may require some asynchronous work. This point is commonly reached from one of two paths: 您需要返回多个值,每个值可能需要一些异步工作。通常有两种...
  • MapReduce:超大机群上的简单数据处理 ...用户指定一个map函数处理一个key/value对,从而产生中间的key/value对集.然后再指定一个reduce函数合并所有的具有相同中间key的中间value.下面将列举许多可以用...
  • 在UML中,采用stereotype、...这里我想说一说stereotype在UML中的意思及其在中文中的翻译问题。 Stereotyp英文的原意是印刷中的铅字。比如,如果我们需要印一本书,而这本书中有一个字目前没有其铅字,那么我们就...

空空如也

空空如也

1 2 3 4 5 ... 17
收藏数 325
精华内容 130
关键字:

value中文翻译