精华内容
下载资源
问答
  • Hadoop map调优参数

    千次阅读 2015-12-26 20:13:20
    参数:io.sort.mb(default 100) 当map task开始运算,并产生中间数据时,其产生的中间结果并非直接就简单的写入磁盘。 而是会利用到了内存buffer来进行已经产生的部分结果的缓存, 并在内存buffer中进行一些预...

    参数:io.sort.mb(default 100)
    当map task开始运算,并产生中间数据时,其产生的中间结果并非直接就简单的写入磁盘。
    而是会利用到了内存buffer来进行已经产生的部分结果的缓存,
    并在内存buffer中进行一些预排序来优化整个map的性能。
    每一个map都会对应存在一个内存buffer,map会将已经产生的部分结果先写入到该buffer中,
    这个buffer默认是100MB大小,
    但是这个大小是可以根据job提交时的参数设定来调整的,
    当map的产生数据非常大时,并且把io.sort.mb调大,
    那么map在整个计算过程中spill的次数就势必会降低,
    map task对磁盘的操作就会变少,
    如果map tasks的瓶颈在磁盘上,这样调整就会大大提高map的计算性能。

    参数:o.sort.spill.percent(default 0.80,也就是80%)
    map在运行过程中,不停的向该buffer中写入已有的计算结果,
    但是该buffer并不一定能将全部的map输出缓存下来,
    当map输出超出一定阈值(比如100M),那么map就必须将该buffer中的数据写入到磁盘中去,
    这个过程在mapreduce中叫做spill。
    map并不是要等到将该buffer全部写满时才进行spill,
    因为如果全部写满了再去写spill,势必会造成map的计算部分等待buffer释放空间的情况。
    所以,map其实是当buffer被写满到一定程度(比如80%)时,就开始进行spill。
    这个阈值也是由一个job的配置参数来控制,
    这个参数同样也是影响spill频繁程度,进而影响map task运行周期对磁盘的读写频率的。
    但非特殊情况下,通常不需要人为的调整。调整io.sort.mb对用户来说更加方便。

    参数:io.sort.factor
    当map task的计算部分全部完成后,如果map有输出,就会生成一个或者多个spill文件,这些文件就是map的输出结果。
    map在正常退出之前,需要将这些spill合并(merge)成一个,所以map在结束之前还有一个merge的过程。
    merge的过程中,有一个参数可以调整这个过程的行为,该参数为:io.sort.factor。
    该参数默认为10。它表示当merge spill文件时,最多能有多少并行的stream向merge文件中写入。
    比如如果map产生的数据非常的大,产生的spill文件大于10,而io.sort.factor使用的是默认的10,
    那么当map计算完成做merge时,就没有办法一次将所有的spill文件merge成一个,而是会分多次,每次最多10个stream。
    这也就是说,当map的中间结果非常大,调大io.sort.factor,
    有利于减少merge次数,进而减少map对磁盘的读写频率,有可能达到优化作业的目的。

    参数:min.num.spill.for.combine(default 3)
    当job指定了combiner的时候,我们都知道map介绍后会在map端根据combiner定义的函数将map结果进行合并。
    运行combiner函数的时机有可能会是merge完成之前,或者之后,这个时机可以由一个参数控制,
    即min.num.spill.for.combine(default 3),当job中设定了combiner,并且spill数最少有3个的时候,
    那么combiner函数就会在merge产生结果文件之前运行。
    通过这样的方式,就可以在spill非常多需要merge,并且很多数据需要做conbine的时候,
    减少写入到磁盘文件的数据数量,同样是为了减少对磁盘的读写频率,有可能达到优化作业的目的。

    参数:mapred.compress.map.output(default false)
    减少中间结果读写进出磁盘的方法不止这些,还有就是压缩。
    也就是说map的中间,无论是spill的时候,还是最后merge产生的结果文件,都是可以压缩的。
    压缩的好处在于,通过压缩减少写入读出磁盘的数据量。
    对中间结果非常大,磁盘速度成为map执行瓶颈的job,尤其有用。
    控制map中间结果是否使用压缩的参数为:mapred.compress.map.output(true/false)。
    将这个参数设置为true时,那么map在写中间结果时,就会将数据压缩后再写入磁盘,读结果时也会采用先解压后读取数据。
    这样做的后果就是:写入磁盘的中间结果数据量会变少,但是cpu会消耗一些用来压缩和解压。
    所以这种方式通常适合job中间结果非常大,瓶颈不在cpu,而是在磁盘的读写的情况。
    说的直白一些就是用cpu换IO。
    根据观察,通常大部分的作业cpu都不是瓶颈,除非运算逻辑异常复杂。所以对中间结果采用压缩通常来说是有收益的。

    参数:mapred.map.output.compression.codec( default org.apache.hadoop.io.compress.DefaultCodec)
    当采用map中间结果压缩的情况下,用户还可以选择压缩时采用哪种压缩格式进行压缩,
    现在hadoop支持的压缩格式有:GzipCodec,LzoCodec,BZip2Codec,LzmaCodec等压缩格式。
    通常来说,想要达到比较平衡的cpu和磁盘压缩比,LzoCodec比较适合。但也要取决于job的具体情况。
    用户若想要自行选择中间结果的压缩算法,
    可以设置配置参数:mapred.map.output.compression.codec=org.apache.hadoop.io.compress.DefaultCodec或者其他用户自行选择的压缩方式

    原文出处:http://sishuok.com/forum/blogPost/list/5396.html

    展开全文
  • hadoop map端的超时参数

    千次阅读 2017-04-11 09:55:24
    目前集群上某台机器卡住导致出现大量的Map端任务FAIL,当定位到具体的机器上时,无法ssh或进去后terminal中无响应,退出的相关信息如下: [hadoop@xxx ~]$ Received disconnect from xxx: Timeout, your session ...

     

     目前集群上某台机器卡住导致出现大量的Map端任务FAIL,当定位到具体的机器上时,无法ssh或进去后terminal中无响应,退出的相关信息如下:

    [hadoop@xxx ~]$ Received disconnect from xxx: Timeout, your session not responding.
    
     
    任务执行失败的错误日志:
    AttemptID:attempt_1413206225298_24177_m_000001_0 Timed out after 1200 secsContainer killed by the ApplicationMaster. Container killed on request. Exit code is 143
    
     
    经过查找后1200s是配置的参数mapreduce.task.timeout,
    关于参数mapreduce.task.timeout的解释:
    The number of milliseconds before a task will be terminated if it neither reads an input, writes an output, nor updates its status string. A value of 0 disables the timeout.
    
     
    通过翻hadoop2.2.0的源代码,类TaskHeartbeatHandler会作为一个独立的线程来运行。它会定期去检查当前所有运行的TaskAttempt,时间间隔为:mapreduce.task.timeout.check-interval-ms(默认30s),
    Map.Entry<TaskAttemptId, ReportTime> entry = iterator.next();
              boolean taskTimedOut = (taskTimeOut > 0) &&
                  (currentTime > (entry.getValue().getLastProgress() + taskTimeOut));
              
              if(taskTimedOut) {
                // task is lost, remove from the list and raise lost event
                iterator.remove();
                eventHandler.handle(new TaskAttemptDiagnosticsUpdateEvent(entry
                    .getKey(), "AttemptID:" + entry.getKey().toString()
                    + " Timed out after " + taskTimeOut / 1000 + " secs"));
                eventHandler.handle(new TaskAttemptEvent(entry.getKey(),
                    TaskAttemptEventType.TA_TIMED_OUT));
              }
     
    如果监测到有一个task_attempt没有在规定的时间间隔内(mapreduce.task.timeout)汇报进度,那么就认为该attempt已经失败,并发送一个TA_TIMED_OUT的Event,通知ApplicationMaster去Kill掉该Attempt。
    Attempt的进度会定期报告给该线程,调用progressing方法:
      
    public void progressing(TaskAttemptId attemptID) {
      //only put for the registered attempts
        //TODO throw an exception if the task isn't registered.
        ReportTime time = runningAttempts.get(attemptID);
        if(time != null) {
          time.setLastProgress(clock.getTime());
        }
      }
     
     
    在TaskAttemptListenerImpl类中会调用报告进度的方法,在任务的不同阶段,都会对任务向ApplicationMaster报告,提交进度信息。更详细的方法这里就不再深入研究。


     
    一般情况下,我们的任务都是在运行过程中出现的这个错误,这就需要我们检查哪些资源的限制导致任务无法进行下去而出现这种问题。
    在Cloudera中有一篇文章教你如何能够避免这个问题:

     Report progress

    If your task reports no progress for 10 minutes (see the mapred.task.timeout property) then it will be killed by Hadoop. Most tasks don’t encounter this situation since they report progress implicitly by reading input and writing output. However, some jobs which don’t process records in this way may fall foul of this behavior and have their tasks killed. Simulations are a good example, since they do a lot of CPU-intensive processing in each map and typically only write the result at the end of the computation. They should be written in such a way as to report progress on a regular basis (more frequently than every 10 minutes). This may be achieved in a number of ways:

    • Call setStatus() on Reporter to set a human-readable description of
      the task’s progress
    • Call incrCounter() on Reporter to increment a user counter
    • Call progress() on Reporter to tell Hadoop that your task is still there (and making progress)

    但是,事情还没完,集群中会不定时地有任务卡死在某个点上导致任务无法继续下去:

     

    "main" prio=10 tid=0x000000000293f000 nid=0x1e06 runnable [0x0000000041b20000]
       java.lang.Thread.State: RUNNABLE
    at sun.nio.ch.EPollArrayWrapper.epollWait(Native Method)
    at sun.nio.ch.EPollArrayWrapper.poll(EPollArrayWrapper.java:228)
    at sun.nio.ch.EPollSelectorImpl.doSelect(EPollSelectorImpl.java:81)
    at sun.nio.ch.SelectorImpl.lockAndDoSelect(SelectorImpl.java:87)
    - locked <0x00000006e243c3f0> (a sun.nio.ch.Util$2)
    - locked <0x00000006e243c3e0> (a java.util.Collections$UnmodifiableSet)
    - locked <0x00000006e243c1a0> (a sun.nio.ch.EPollSelectorImpl)
    at sun.nio.ch.SelectorImpl.select(SelectorImpl.java:98)
    at org.apache.hadoop.net.SocketIOWithTimeout$SelectorPool.select(SocketIOWithTimeout.java:335)
    at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:157)
    at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
    
     

     

     

    读源码分析这个问题,找到SocketIOWithTimeout类中的doIO方法,157行附近,
    /now wait for socket to be ready.
          int count = 0;
          try {
            count = selector.select(channel, ops, timeout); 
          } catch (IOException e) { //unexpected IOException.
            closed = true;
            throw e;
          }
    
          if (count == 0) {
            throw new SocketTimeoutException(timeoutExceptionString(channel,
                                                                    timeout, ops));
          }
    
     
    当经过超时时间,但是却并没有读出任何数据时,抛出错误:
     
    Error: java.net.SocketTimeoutException: 70000 millis timeout while waiting for channel to be ready for read. ch : java.nio.channels.SocketChannel[connected local=xxx remote=/xxx]
        at org.apache.hadoop.net.SocketIOWithTimeout.doIO(SocketIOWithTimeout.java:164)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:161)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:131)
        at org.apache.hadoop.net.SocketInputStream.read(SocketInputStream.java:118)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at java.io.FilterInputStream.read(FilterInputStream.java:83)
        at org.apache.hadoop.hdfs.protocolPB.PBHelper.vintPrefixed(PBHelper.java:1490)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.transfer(DFSOutputStream.java:962)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.addDatanode2ExistingPipeline(DFSOutputStream.java:930)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.setupPipelineForAppendOrRecovery(DFSOutputStream.java:1031)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.processDatanodeError(DFSOutputStream.java:823)
        at org.apache.hadoop.hdfs.DFSOutputStream$DataStreamer.run(DFSOutputStream.java:475)
     
    超时时间通过(dfs.client.socket-timeout)来计算,如果在该时间范围内,没有读到任何的数据,那么就抛出这个异常。
    进入SocketIOTimeout.select方法,发现其中会执行一段轮询:
    while (true) {
              long start = (timeout == 0) ? 0 : Time.now();
              key = channel.register(info.selector, ops);
              ret = info.selector.select(timeout);         
              if (ret != 0) {
                return ret;
              }
             
              /* Sometimes select() returns 0 much before timeout for
               * unknown reasons. So select again if required.
               */
              if (timeout > 0) {
                timeout -= Time.now() - start;
                if (timeout <= 0) {
                  return 0;
                }
              }
             
              if (Thread.currentThread().isInterrupted()) {
                throw new InterruptedIOException("Interruped while waiting for " +
                                                 "IO on channel " + channel +
                                                 ". " + timeout +
                                                 " millis timeout left.");
              }
            }
     
    此时由于是读数据,ops一般就是指SelectionKey.OP_READ,我们设置的timeout不等于0,也就是说会执行一段总时间为timeout的任务,”Sometimes select() returns 0 much before timeout for  * unknown reasons. So select again if required.” 这个注释写的有点含糊,看来NIO有些问题当前都没确定清楚。
     
    我们看一下方法的介绍:
    java.nio.channels.Selector
    public abstract int select(long timeout)
                       throws java.io.IOException
    Selects a set of keys whose corresponding channels are ready for I/O operations.
    This method performs a blocking selection operation. It returns only after at least one channel is selected, this selector's wakeup method is invoked, the current thread is interrupted, or the given timeout period expires, whichever comes first.
     
    Selector选择的方法,仅当下面三个事件之一发生的情况下:
    • 至少一个已经注册的Channel被选择,返回的就是被选择的Channel数量;
    • Selector被中断;
    • 给定的超时时间已到;

     

    如果被中断了,会抛出中断异常,因此当前仅可能是超时时间已到,返回的ret=0,导致抛出上述的异常。

    但是,这也没完,难道超时了不会重试?到底会重试几次?

     

    经过继续分析,发现往下的堆栈中的DFSInputStream调用了readBuffer方法,可以看到retryCurrentNode在第一次失败后,将IOException捕获,会进行必要的重试操作,如果还是发生超时,并且找不到就将其加入黑名单作为失败的DataNode(可能下次不会进行重试?),并转移到另外的DataNode上(执行seekToNewSource方法),经过几次后才会将IOException真正抛出。

     
    try {
            return reader.doRead(blockReader, off, len, readStatistics);
          } catch ( ChecksumException ce ) {
            DFSClient.LOG.warn("Found Checksum error for "
                + getCurrentBlock() + " from " + currentNode
                + " at " + ce.getPos());       
            ioe = ce;
            retryCurrentNode = false;
            // we want to remember which block replicas we have tried
            addIntoCorruptedBlockMap(getCurrentBlock(), currentNode,
                corruptedBlockMap);
          } catch ( IOException e ) {
            if (!retryCurrentNode) {
              DFSClient.LOG.warn("Exception while reading from "
                  + getCurrentBlock() + " of " + src + " from "
                  + currentNode, e);
            }
            ioe = e;
          }
          boolean sourceFound = false;
          if (retryCurrentNode) {
            /* possibly retry the same node so that transient errors don't
             * result in application level failures (e.g. Datanode could have
             * closed the connection because the client is idle for too long).
             */
            sourceFound = seekToBlockSource(pos);
          } else {
            addToDeadNodes(currentNode);
            sourceFound = seekToNewSource(pos);
          }
          if (!sourceFound) {
            throw ioe;
          }
          retryCurrentNode = false;
        }
     
    总之,这部分的问题还是很多,继续研究中...
    • 大小: 133.8 KB
    展开全文
  • hadoop map端reduce端调优参数

    千次阅读 2012-05-21 12:18:20
    map端: io.sort.mb 类型int默认100=》map的内存缓冲区 io.sort.record.precent 类型:float默认0.05=》io.sort.mb的缓存区记录索引kvindices和缓存区记录索引排序工作数组kvoffsets占用空间比例 io.sort.spill....

    map端:

    io.sort.mb    类型int默认100=》map的内存缓冲区

    io.sort.record.precent 类型:float默认0.05=》io.sort.mb的缓存区记录索引kvindices和缓存区记录索引排序工作数组kvoffsets占用空间比例

    io.sort.spill.percent  类型:float默认0.8=》io.sort.mb的缓冲数据边界阙值

    io.sort.factor 类型int默认10=》每次合并文件数

    min.mum.spills.for.combine类型int默认3=》运行combiner需要的最少溢出文件数

    mapred.compress.map.output类型boolean默认false=》是否压缩map输出

    mapred.map.output.compression.coderc类型classname默认DefaultCodec=》map输出的压缩编码器

    tasktracker.http.threads类型int默认40=》每个tasktracker的工作线程数,将map输出到reduce#只能全局设定


    reduce端:

    mapred.reduce.parallel.copies类型int默认5=》复制map输出数据的线程数

    mapred.reduce.copy.backoff类型int默认300=》获取一个map数据的最大时间

    io.sort.factor

    mapred.job.shuffer.input.buffer.percent类型float默认0.70=>shuffer的复制阶段,分配给map输出的缓冲区的比例

    mapred.job.shuffer.merge.percent类型float默认0.66=》mapred.job.shuffer.input.buffer.percent的阙值

    mapred.inmem.merge.threshold类型int默认1000=》mapred.job.shuffer.input.buffer.percent的文件数阙值

    mapred.job.reduce.input.buffer.percent类型float默认0.0=》reduce过程中在内存中保存map输出的比例

    展开全文
  • Hadoop Map/Reduce教程

    千次阅读 2012-11-27 14:24:46
    这篇教程从用户的角度出发,全面地介绍了Hadoop Map/Reduce框架的各个方面。 先决条件 请先确认Hadoop被正确安装、配置和正常运行中。更多信息见: Hadoop快速入门对初次使用者。Hadoop集群搭建对大...

    目的

    这篇教程从用户的角度出发,全面地介绍了Hadoop Map/Reduce框架的各个方面。

    先决条件

    请先确认Hadoop被正确安装、配置和正常运行中。更多信息见:

    概述

    Hadoop Map/Reduce是一个使用简易的软件框架,基于它写出来的应用程序能够运行在由上千个商用机器组成的大型集群上,并以一种可靠容错的方式并行处理上T级别的数据集。

    一个Map/Reduce 作业(job) 通常会把输入的数据集切分为若干独立的数据块,由 map任务(task)以完全并行的方式处理它们。框架会对map的输出先进行排序, 然后把结果输入给reduce任务。通常作业的输入和输出都会被存储在文件系统中。 整个框架负责任务的调度和监控,以及重新执行已经失败的任务。

    通常,Map/Reduce框架和分布式文件系统是运行在一组相同的节点上的,也就是说,计算节点和存储节点通常在一起。这种配置允许框架在那些已经存好数据的节点上高效地调度任务,这可以使整个集群的网络带宽被非常高效地利用。

    Map/Reduce框架由一个单独的master JobTracker 和每个集群节点一个slave TaskTracker共同组成。master负责调度构成一个作业的所有任务,这些任务分布在不同的slave上,master监控它们的执行,重新执行已经失败的任务。而slave仅负责执行由master指派的任务。

    应用程序至少应该指明输入/输出的位置(路径),并通过实现合适的接口或抽象类提供map和reduce函数。再加上其他作业的参数,就构成了作业配置(job configuration)。然后,Hadoop的 job client提交作业(jar包/可执行程序等)和配置信息给JobTracker,后者负责分发这些软件和配置信息给slave、调度任务并监控它们的执行,同时提供状态和诊断信息给job-client。

    虽然Hadoop框架是用JavaTM实现的,但Map/Reduce应用程序则不一定要用 Java来写 。

    • Hadoop Streaming是一种运行作业的实用工具,它允许用户创建和运行任何可执行程序 (例如:Shell工具)来做为mapper和reducer。
    • Hadoop Pipes是一个与SWIG兼容的C++ API (没有基于JNITM技术),它也可用于实现Map/Reduce应用程序。

    输入与输出

    Map/Reduce框架运转在<key, value> 键值对上,也就是说, 框架把作业的输入看为是一组<key, value> 键值对,同样也产出一组 <key, value> 键值对做为作业的输出,这两组键值对的类型可能不同。

    框架需要对keyvalue的类(classes)进行序列化操作, 因此,这些类需要实现 Writable接口。 另外,为了方便框架执行排序操作,key类必须实现 WritableComparable接口。

    一个Map/Reduce 作业的输入和输出类型如下所示:

    (input) <k1, v1> -> map -> <k2, v2> -> combine -> <k2, v2> -> reduce -> <k3, v3> (output)

    例子:WordCount v1.0

    在深入细节之前,让我们先看一个Map/Reduce的应用示例,以便对它们的工作方式有一个初步的认识。

    WordCount是一个简单的应用,它可以计算出指定数据集中每一个单词出现的次数。

    这个应用适用于 单机模式, 伪分布式模式 或 完全分布式模式 三种Hadoop安装方式。

    源代码

      WordCount.java
    1. package org.myorg;
    2.  
    3. import java.io.IOException;
    4. import java.util.*;
    5.  
    6. import org.apache.hadoop.fs.Path;
    7. import org.apache.hadoop.conf.*;
    8. import org.apache.hadoop.io.*;
    9. import org.apache.hadoop.mapred.*;
    10. import org.apache.hadoop.util.*;
    11.  
    12. public class WordCount {
    13.  
    14.    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    15.      private final static IntWritable one = new IntWritable(1);
    16.      private Text word = new Text();
    17.  
    18.      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    19.        String line = value.toString();
    20.        StringTokenizer tokenizer = new StringTokenizer(line);
    21.        while (tokenizer.hasMoreTokens()) {
    22.          word.set(tokenizer.nextToken());
    23.          output.collect(word, one);
    24.        }
    25.      }
    26.    }
    27.  
    28.    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    29.      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    30.        int sum = 0;
    31.        while (values.hasNext()) {
    32.          sum += values.next().get();
    33.        }
    34.        output.collect(key, new IntWritable(sum));
    35.      }
    36.    }
    37.  
    38.    public static void main(String[] args) throws Exception {
    39.      JobConf conf = new JobConf(WordCount.class);
    40.      conf.setJobName("wordcount");
    41.  
    42.      conf.setOutputKeyClass(Text.class);
    43.      conf.setOutputValueClass(IntWritable.class);
    44.  
    45.      conf.setMapperClass(Map.class);
    46.      conf.setCombinerClass(Reduce.class);
    47.      conf.setReducerClass(Reduce.class);
    48.  
    49.      conf.setInputFormat(TextInputFormat.class);
    50.      conf.setOutputFormat(TextOutputFormat.class);
    51.  
    52.      FileInputFormat.setInputPaths(conf, new Path(args[0]));
    53.      FileOutputFormat.setOutputPath(conf, new Path(args[1]));
    54.  
    55.      JobClient.runJob(conf);
    57.    }
    58. }
    59.  

    用法

    假设环境变量HADOOP_HOME对应安装时的根目录,HADOOP_VERSION对应Hadoop的当前安装版本,编译WordCount.java来创建jar包,可如下操作:

    $ mkdir wordcount_classes 
    $ javac -classpath ${HADOOP_HOME}/hadoop-${HADOOP_VERSION}-core.jar -d wordcount_classes WordCount.java 
    $ jar -cvf /usr/joe/wordcount.jar -C wordcount_classes/ .

    假设:

    • /usr/joe/wordcount/input - 是HDFS中的输入路径
    • /usr/joe/wordcount/output - 是HDFS中的输出路径

    用示例文本文件做为输入:

    $ bin/hadoop dfs -ls /usr/joe/wordcount/input/ 
    /usr/joe/wordcount/input/file01 
    /usr/joe/wordcount/input/file02 

    $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01 
    Hello World Bye World 

    $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02 
    Hello Hadoop Goodbye Hadoop

    运行应用程序:

    $ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

    输出是:

    $ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000 
    Bye 1 
    Goodbye 1 
    Hadoop 2 
    Hello 2 
    World 2 

    应用程序能够使用-files选项来指定一个由逗号分隔的路径列表,这些路径是task的当前工作目录。使用选项-libjars可以向map和reduce的classpath中添加jar包。使用-archives选项程序可以传递档案文件做为参数,这些档案文件会被解压并且在task的当前工作目录下会创建一个指向解压生成的目录的符号链接(以压缩包的名字命名)。 有关命令行选项的更多细节请参考 Commands manual

    使用-libjars-files运行wordcount例子:
    hadoop jar hadoop-examples.jar wordcount -files cachefile.txt -libjars mylib.jar input output

    解释

    WordCount应用程序非常直截了当。

    Mapper(14-26行)中的map方法(18-25行)通过指定的 TextInputFormat(49行)一次处理一行。然后,它通过StringTokenizer 以空格为分隔符将一行切分为若干tokens,之后,输出< <word>, 1> 形式的键值对。

    对于示例中的第一个输入,map输出是:
    < Hello, 1> 
    < World, 1> 
    < Bye, 1> 
    < World, 1> 

    第二个输入,map输出是:
    < Hello, 1> 
    < Hadoop, 1> 
    < Goodbye, 1> 
    < Hadoop, 1> 

    关于组成一个指定作业的map数目的确定,以及如何以更精细的方式去控制这些map,我们将在教程的后续部分学习到更多的内容。

    WordCount还指定了一个combiner (46行)。因此,每次map运行之后,会对输出按照key进行排序,然后把输出传递给本地的combiner(按照作业的配置与Reducer一样),进行本地聚合。

    第一个map的输出是:
    < Bye, 1> 
    < Hello, 1> 
    < World, 2> 

    第二个map的输出是:
    < Goodbye, 1> 
    < Hadoop, 2> 
    < Hello, 1> 

    Reducer(28-36行)中的reduce方法(29-35行) 仅是将每个key(本例中就是单词)出现的次数求和。

    因此这个作业的输出就是:
    < Bye, 1> 
    < Goodbye, 1> 
    < Hadoop, 2> 
    < Hello, 2> 
    < World, 2> 

    代码中的run方法中指定了作业的几个方面, 例如:通过命令行传递过来的输入/输出路径、key/value的类型、输入/输出的格式等等JobConf中的配置信息。随后程序调用了JobClient.runJob(55行)来提交作业并且监控它的执行。

    我们将在本教程的后续部分学习更多的关于JobConf, JobClient, Tool和其他接口及类(class)。

    Map/Reduce - 用户界面

    这部分文档为用户将会面临的Map/Reduce框架中的各个环节提供了适当的细节。这应该会帮助用户更细粒度地去实现、配置和调优作业。然而,请注意每个类/接口的javadoc文档提供最全面的文档;本文只是想起到指南的作用。

    我们会先看看MapperReducer接口。应用程序通常会通过提供mapreduce方法来实现它们。

    然后,我们会讨论其他的核心接口,其中包括: JobConfJobClientPartitioner, OutputCollectorReporter, InputFormatOutputFormat等等。

    最后,我们将通过讨论框架中一些有用的功能点(例如:DistributedCache, IsolationRunner等等)来收尾。

    核心功能描述

    应用程序通常会通过提供mapreduce来实现 MapperReducer接口,它们组成作业的核心。

    Mapper

    Mapper将输入键值对(key/value pair)映射到一组中间格式的键值对集合。

    Map是一类将输入记录集转换为中间格式记录集的独立任务。 这种转换的中间格式记录集不需要与输入记录集的类型一致。一个给定的输入键值对可以映射成0个或多个输出键值对。

    Hadoop Map/Reduce框架为每一个InputSplit产生一个map任务,而每个InputSplit是由该作业的InputFormat产生的。

    概括地说,对Mapper的实现者需要重写 JobConfigurable.configure(JobConf)方法,这个方法需要传递一个JobConf参数,目的是完成Mapper的初始化工作。然后,框架为这个任务的InputSplit中每个键值对调用一次 map(WritableComparable, Writable, OutputCollector, Reporter)操作。应用程序可以通过重写Closeable.close()方法来执行相应的清理工作。

    输出键值对不需要与输入键值对的类型一致。一个给定的输入键值对可以映射成0个或多个输出键值对。通过调用 OutputCollector.collect(WritableComparable,Writable)可以收集输出的键值对。

    应用程序可以使用Reporter报告进度,设定应用级别的状态消息,更新Counters(计数器),或者仅是表明自己运行正常。

    框架随后会把与一个特定key关联的所有中间过程的值(value)分成组,然后把它们传给Reducer以产出最终的结果。用户可以通过 JobConf.setOutputKeyComparatorClass(Class)来指定具体负责分组的Comparator

    Mapper的输出被排序后,就被划分给每个Reducer。分块的总数目和一个作业的reduce任务的数目是一样的。用户可以通过实现自定义的 Partitioner来控制哪个key被分配给哪个 Reducer

    用户可选择通过 JobConf.setCombinerClass(Class)指定一个combiner,它负责对中间过程的输出进行本地的聚集,这会有助于降低从Mapper到 Reducer数据传输量。

    这些被排好序的中间过程的输出结果保存的格式是(key-len, key, value-len, value),应用程序可以通过JobConf控制对这些中间结果是否进行压缩以及怎么压缩,使用哪种 CompressionCodec

    需要多少个Map?

    Map的数目通常是由输入数据的大小决定的,一般就是所有输入文件的总块(block)数。

    Map正常的并行规模大致是每个节点(node)大约10到100个map,对于CPU 消耗较小的map任务可以设到300个左右。由于每个任务初始化需要一定的时间,因此,比较合理的情况是map执行的时间至少超过1分钟。

    这样,如果你输入10TB的数据,每个块(block)的大小是128MB,你将需要大约82,000个map来完成任务,除非使用 setNumMapTasks(int)(注意:这里仅仅是对框架进行了一个提示(hint),实际决定因素见这里)将这个数值设置得更高。

    Reducer

    Reducer将与一个key关联的一组中间数值集归约(reduce)为一个更小的数值集。

    用户可以通过 JobConf.setNumReduceTasks(int)设定一个作业中reduce任务的数目。

    概括地说,对Reducer的实现者需要重写 JobConfigurable.configure(JobConf)方法,这个方法需要传递一个JobConf参数,目的是完成Reducer的初始化工作。然后,框架为成组的输入数据中的每个<key, (list of values)>对调用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。之后,应用程序可以通过重写Closeable.close()来执行相应的清理工作。

    Reducer有3个主要阶段:shuffle、sort和reduce。

    Shuffle

    Reducer的输入就是Mapper已经排好序的输出。在这个阶段,框架通过HTTP为每个Reducer获得所有Mapper输出中与之相关的分块。

    Sort

    这个阶段,框架将按照key的值对Reducer的输入进行分组 (因为不同mapper的输出中可能会有相同的key)。

    Shuffle和Sort两个阶段是同时进行的;map的输出也是一边被取回一边被合并的。

    Secondary Sort

    如果需要中间过程对key的分组规则和reduce前对key的分组规则不同,那么可以通过 JobConf.setOutputValueGroupingComparator(Class)来指定一个Comparator。再加上JobConf.setOutputKeyComparatorClass(Class)可用于控制中间过程的key如何被分组,所以结合两者可以实现按值的二次排序

    Reduce

    在这个阶段,框架为已分组的输入数据中的每个 <key, (list of values)>对调用一次 reduce(WritableComparable, Iterator, OutputCollector, Reporter)方法。

    Reduce任务的输出通常是通过调用 OutputCollector.collect(WritableComparable, Writable)写入 文件系统的。

    应用程序可以使用Reporter报告进度,设定应用程序级别的状态消息,更新Counters(计数器),或者仅是表明自己运行正常。

    Reducer的输出是没有排序的

    需要多少个Reduce?

    Reduce的数目建议是0.951.75乘以 (<no. of nodes> * mapred.tasktracker.reduce.tasks.maximum)。

    用0.95,所有reduce可以在maps一完成时就立刻启动,开始传输map的输出结果。用1.75,速度快的节点可以在完成第一轮reduce任务后,可以开始第二轮,这样可以得到比较好的负载均衡的效果。

    增加reduce的数目会增加整个框架的开销,但可以改善负载均衡,降低由于执行失败带来的负面影响。

    上述比例因子比整体数目稍小一些是为了给框架中的推测性任务(speculative-tasks) 或失败的任务预留一些reduce的资源。

    无Reducer

    如果没有归约要进行,那么设置reduce任务的数目为是合法的。

    这种情况下,map任务的输出会直接被写入由 setOutputPath(Path)指定的输出路径。框架在把它们写入FileSystem之前没有对它们进行排序。

    Partitioner

    Partitioner用于划分键值空间(key space)。

    Partitioner负责控制map输出结果key的分割。Key(或者一个key子集)被用于产生分区,通常使用的是Hash函数。分区的数目与一个作业的reduce任务的数目是一样的。因此,它控制将中间过程的key(也就是这条记录)应该发送给m个reduce任务中的哪一个来进行reduce操作。

    HashPartitioner是默认的 Partitioner

    Reporter

    Reporter是用于Map/Reduce应用程序报告进度,设定应用级别的状态消息, 更新Counters(计数器)的机制。

    MapperReducer的实现可以利用Reporter 来报告进度,或者仅是表明自己运行正常。在那种应用程序需要花很长时间处理个别键值对的场景中,这种机制是很关键的,因为框架可能会以为这个任务超时了,从而将它强行杀死。另一个避免这种情况发生的方式是,将配置参数mapred.task.timeout设置为一个足够高的值(或者干脆设置为零,则没有超时限制了)。

    应用程序可以用Reporter来更新Counter(计数器)。

    OutputCollector

    OutputCollector是一个Map/Reduce框架提供的用于收集 MapperReducer输出数据的通用机制 (包括中间输出结果和作业的输出结果)。

    Hadoop Map/Reduce框架附带了一个包含许多实用型的mapper、reducer和partitioner 的类库

    作业配置

    JobConf代表一个Map/Reduce作业的配置。

    JobConf是用户向Hadoop框架描述一个Map/Reduce作业如何执行的主要接口。框架会按照JobConf描述的信息忠实地去尝试完成这个作业,然而:

    • 一些参数可能会被管理者标记为 final,这意味它们不能被更改。
    • 一些作业的参数可以被直截了当地进行设置(例如: setNumReduceTasks(int)),而另一些参数则与框架或者作业的其他参数之间微妙地相互影响,并且设置起来比较复杂(例如:setNumMapTasks(int))。

    通常,JobConf会指明Mapper、Combiner(如果有的话)、 PartitionerReducerInputFormat和 OutputFormat的具体实现。JobConf还能指定一组输入文件 (setInputPaths(JobConf, Path...)/addInputPath(JobConf, Path)) 和(setInputPaths(JobConf, String) /addInputPaths(JobConf, String)) 以及输出文件应该写在哪儿 (setOutputPath(Path))。

    JobConf可选择地对作业设置一些高级选项,例如:设置Comparator; 放到DistributedCache上的文件;中间结果或者作业输出结果是否需要压缩以及怎么压缩; 利用用户提供的脚本(setMapDebugScript(String)/setReduceDebugScript(String)) 进行调试;作业是否允许预防性(speculative)任务的执行 (setMapSpeculativeExecution(boolean))/(setReduceSpeculativeExecution(boolean)) ;每个任务最大的尝试次数 (setMaxMapAttempts(int)/setMaxReduceAttempts(int)) ;一个作业能容忍的任务失败的百分比 (setMaxMapTaskFailuresPercent(int)/setMaxReduceTaskFailuresPercent(int)) ;等等。

    当然,用户能使用 set(String, String)/get(String, String) 来设置或者取得应用程序需要的任意参数。然而,DistributedCache的使用是面向大规模只读数据的。

    任务的执行和环境

    TaskTracker是在一个单独的jvm上以子进程的形式执行 Mapper/Reducer任务(Task)的。

    子任务会继承父TaskTracker的环境。用户可以通过JobConf中的 mapred.child.java.opts配置参数来设定子jvm上的附加选项,例如: 通过-Djava.library.path=<> 将一个非标准路径设为运行时的链接用以搜索共享库,等等。如果mapred.child.java.opts包含一个符号@taskid@, 它会被替换成map/reduce的taskid的值。

    下面是一个包含多个参数和替换的例子,其中包括:记录jvm GC日志; JVM JMX代理程序以无密码的方式启动,这样它就能连接到jconsole上,从而可以查看子进程的内存和线程,得到线程的dump;还把子jvm的最大堆尺寸设置为512MB, 并为子jvm的java.library.path添加了一个附加路径。

    <property> 
      <name>mapred.child.java.opts</name> 
      <value> 
         -Xmx512M -Djava.library.path=/home/mycompany/lib -verbose:gc -Xloggc:/tmp/@taskid@.gc 
         -Dcom.sun.management.jmxremote.authenticate=false -Dcom.sun.management.jmxremote.ssl=false 
      </value> 
    </property>

    用户或管理员也可以使用mapred.child.ulimit设定运行的子任务的最大虚拟内存。mapred.child.ulimit的值以(KB)为单位,并且必须大于或等于-Xmx参数传给JavaVM的值,否则VM会无法启动。

    注意:mapred.child.java.opts只用于设置task tracker启动的子任务。为守护进程设置内存选项请查看 cluster_setup.html

    ${mapred.local.dir}/taskTracker/是task tracker的本地目录, 用于创建本地缓存和job。它可以指定多个目录(跨越多个磁盘),文件会半随机的保存到本地路径下的某个目录。当job启动时,task tracker根据配置文档创建本地job目录,目录结构如以下所示:

    • ${mapred.local.dir}/taskTracker/archive/ :分布式缓存。这个目录保存本地的分布式缓存。因此本地分布式缓存是在所有task和job间共享的。
    • ${mapred.local.dir}/taskTracker/jobcache/$jobid/ : 本地job目录。
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/work/: job指定的共享目录。各个任务可以使用这个空间做为暂存空间,用于它们之间共享文件。这个目录通过job.local.dir 参数暴露给用户。这个路径可以通过API JobConf.getJobLocalDir()来访问。它也可以被做为系统属性获得。因此,用户(比如运行streaming)可以调用System.getProperty("job.local.dir")获得该目录。
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/jars/: 存放jar包的路径,用于存放作业的jar文件和展开的jar。job.jar是应用程序的jar文件,它会被自动分发到各台机器,在task启动前会被自动展开。使用api JobConf.getJar() 函数可以得到job.jar的位置。使用JobConf.getJar().getParent()可以访问存放展开的jar包的目录。
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/job.xml: 一个job.xml文件,本地的通用的作业配置文件。
      • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid: 每个任务有一个目录task-id,它里面有如下的目录结构:
        • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/job.xml: 一个job.xml文件,本地化的任务作业配置文件。任务本地化是指为该task设定特定的属性值。这些值会在下面具体说明。
        • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/output 一个存放中间过程的输出文件的目录。它保存了由framwork产生的临时map reduce数据,比如map的输出文件等。
        • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work: task的当前工作目录。
        • ${mapred.local.dir}/taskTracker/jobcache/$jobid/$taskid/work/tmp: task的临时目录。(用户可以设定属性mapred.child.tmp 来为map和reduce task设定临时目录。缺省值是./tmp。如果这个值不是绝对路径, 它会把task的工作路径加到该路径前面作为task的临时文件路径。如果这个值是绝对路径则直接使用这个值。 如果指定的目录不存在,会自动创建该目录。之后,按照选项 -Djava.io.tmpdir='临时文件的绝对路径'执行java子任务。 pipes和streaming的临时文件路径是通过环境变量TMPDIR='the absolute path of the tmp dir'设定的)。 如果mapred.child.tmp./tmp值,这个目录会被创建。

    下面的属性是为每个task执行时使用的本地参数,它们保存在本地化的任务作业配置文件里:

    名称 类型 描述
    mapred.job.id String job id
    mapred.jar String job目录下job.jar的位置
    job.local.dir String job指定的共享存储空间
    mapred.tip.id String task id
    mapred.task.id String task尝试id
    mapred.task.is.map boolean 是否是map task
    mapred.task.partition int task在job中的id
    map.input.file String map读取的文件名
    map.input.start long map输入的数据块的起始位置偏移
    map.input.length long map输入的数据块的字节数
    mapred.work.output.dir String task临时输出目录

    task的标准输出和错误输出流会被读到TaskTracker中,并且记录到 ${HADOOP_LOG_DIR}/userlogs

    DistributedCache 可用于map或reduce task中分发jar包和本地库。子jvm总是把 当前工作目录 加到 java.library.path 和 LD_LIBRARY_PATH。 因此,可以通过 System.loadLibrarySystem.load装载缓存的库。有关使用分布式缓存加载共享库的细节请参考 native_libraries.html

    作业的提交与监控

    JobClient是用户提交的作业与JobTracker交互的主要接口。

    JobClient 提供提交作业,追踪进程,访问子任务的日志记录,获得Map/Reduce集群状态信息等功能。

    作业提交过程包括:

    1. 检查作业输入输出样式细节
    2. 为作业计算InputSplit值。
    3. 如果需要的话,为作业的DistributedCache建立必须的统计信息。
    4. 拷贝作业的jar包和配置文件到FileSystem上的Map/Reduce系统目录下。
    5. 提交作业到JobTracker并且监控它的状态。

    作业的历史文件记录到指定目录的"_logs/history/"子目录下。这个指定目录由hadoop.job.history.user.location设定,默认是作业输出的目录。因此默认情况下,文件会存放在mapred.output.dir/_logs/history目录下。用户可以设置hadoop.job.history.user.locationnone来停止日志记录。

    用户使用下面的命令可以看到在指定目录下的历史日志记录的摘要。 
    $ bin/hadoop job -history output-dir 
    这个命令会打印出作业的细节,以及失败的和被杀死的任务细节。
    要查看有关作业的更多细节例如成功的任务、每个任务尝试的次数(task attempt)等,可以使用下面的命令 
    $ bin/hadoop job -history all output-dir 

    用户可以使用 OutputLogFilter 从输出目录列表中筛选日志文件。

    一般情况,用户利用JobConf创建应用程序并配置作业属性, 然后用 JobClient 提交作业并监视它的进程。

    作业的控制

    有时候,用一个单独的Map/Reduce作业并不能完成一个复杂的任务,用户也许要链接多个Map/Reduce作业才行。这是容易实现的,因为作业通常输出到分布式文件系统上的,所以可以把这个作业的输出作为下一个作业的输入实现串联。

    然而,这也意味着,确保每一作业完成(成功或失败)的责任就直接落在了客户身上。在这种情况下,可以用的控制作业的选项有:

    作业的输入

    InputFormat 为Map/Reduce作业描述输入的细节规范。

    Map/Reduce框架根据作业的InputFormat来:

    1. 检查作业输入的有效性。
    2. 把输入文件切分成多个逻辑InputSplit实例, 并把每一实例分别分发给一个 Mapper
    3. 提供RecordReader的实现,这个RecordReader从逻辑InputSplit中获得输入记录, 这些记录将由Mapper处理。

    基于文件的InputFormat实现(通常是 FileInputFormat的子类) 默认行为是按照输入文件的字节大小,把输入数据切分成逻辑分块(logical InputSplit )。 其中输入文件所在的FileSystem的数据块尺寸是分块大小的上限。下限可以设置mapred.min.split.size 的值。

    考虑到边界情况,对于很多应用程序来说,很明显按照文件大小进行逻辑分割是不能满足需求的。 在这种情况下,应用程序需要实现一个RecordReader来处理记录的边界并为每个任务提供一个逻辑分块的面向记录的视图。

    TextInputFormat 是默认的InputFormat

    如果一个作业的InputformatTextInputFormat, 并且框架检测到输入文件的后缀是.gz.lzo,就会使用对应的CompressionCodec自动解压缩这些文件。 但是需要注意,上述带后缀的压缩文件不会被切分,并且整个压缩文件会分给一个mapper来处理。

    InputSplit

    InputSplit 是一个单独的Mapper要处理的数据块。

    一般的InputSplit 是字节样式输入,然后由RecordReader处理并转化成记录样式。

    FileSplit 是默认的InputSplit。 它把 map.input.file 设定为输入文件的路径,输入文件是逻辑分块文件。

    RecordReader

    RecordReader 从InputSlit读入<key, value>对。

    一般的,RecordReader 把由InputSplit 提供的字节样式的输入文件,转化成由Mapper处理的记录样式的文件。 因此RecordReader负责处理记录的边界情况和把数据表示成keys/values对形式。

    作业的输出

    OutputFormat 描述Map/Reduce作业的输出样式。

    Map/Reduce框架根据作业的OutputFormat来:

    1. 检验作业的输出,例如检查输出路径是否已经存在。
    2. 提供一个RecordWriter的实现,用来输出作业结果。 输出文件保存在FileSystem上。

    TextOutputFormat是默认的 OutputFormat

    任务的Side-Effect File

    在一些应用程序中,子任务需要产生一些side-file,这些文件与作业实际输出结果的文件不同。

    在这种情况下,同一个Mapper或者Reducer的两个实例(比如预防性任务)同时打开或者写 FileSystem上的同一文件就会产生冲突。因此应用程序在写文件的时候需要为每次任务尝试(不仅仅是每次任务,每个任务可以尝试执行很多次)选取一个独一无二的文件名(使用attemptid,例如task_200709221812_0001_m_000000_0)。

    为了避免冲突,Map/Reduce框架为每次尝试执行任务都建立和维护一个特殊的 ${mapred.output.dir}/_temporary/_${taskid}子目录,这个目录位于本次尝试执行任务输出结果所在的FileSystem上,可以通过 ${mapred.work.output.dir}来访问这个子目录。 对于成功完成的任务尝试,只有${mapred.output.dir}/_temporary/_${taskid}下的文件会移动${mapred.output.dir}。当然,框架会丢弃那些失败的任务尝试的子目录。这种处理过程对于应用程序来说是完全透明的。

    在任务执行期间,应用程序在写文件时可以利用这个特性,比如 通过 FileOutputFormat.getWorkOutputPath()获得${mapred.work.output.dir}目录, 并在其下创建任意任务执行时所需的side-file,框架在任务尝试成功时会马上移动这些文件,因此不需要在程序内为每次任务尝试选取一个独一无二的名字。

    注意:在每次任务尝试执行期间,${mapred.work.output.dir} 的值实际上是 ${mapred.output.dir}/_temporary/_{$taskid},这个值是Map/Reduce框架创建的。 所以使用这个特性的方法是,在 FileOutputFormat.getWorkOutputPath() 路径下创建side-file即可。

    对于只使用map不使用reduce的作业,这个结论也成立。这种情况下,map的输出结果直接生成到HDFS上。

    RecordWriter

    RecordWriter 生成<key, value> 对到输出文件。

    RecordWriter的实现把作业的输出结果写到 FileSystem

    其他有用的特性

    Counters

    Counters 是多个由Map/Reduce框架或者应用程序定义的全局计数器。 每一个Counter可以是任何一种 Enum类型。同一特定Enum类型的Counter可以汇集到一个组,其类型为Counters.Group

    应用程序可以定义任意(Enum类型)的Counters并且可以通过 map 或者 reduce方法中的 Reporter.incrCounter(Enum, long)或者 Reporter.incrCounter(String, String, long) 更新。之后框架会汇总这些全局counters。

    DistributedCache

    DistributedCache 可将具体应用相关的、大尺寸的、只读的文件有效地分布放置。

    DistributedCache 是Map/Reduce框架提供的功能,能够缓存应用程序所需的文件 (包括文本,档案文件,jar文件等)。

    应用程序在JobConf中通过url(hdfs://)指定需要被缓存的文件。 DistributedCache假定由hdfs://格式url指定的文件已经在 FileSystem上了。

    Map-Redcue框架在作业所有任务执行之前会把必要的文件拷贝到slave节点上。 它运行高效是因为每个作业的文件只拷贝一次并且为那些没有文档的slave节点缓存文档。

    DistributedCache 根据缓存文档修改的时间戳进行追踪。 在作业执行期间,当前应用程序或者外部程序不能修改缓存文件。

    distributedCache可以分发简单的只读数据或文本文件,也可以分发复杂类型的文件例如归档文件和jar文件。归档文件(zip,tar,tgz和tar.gz文件)在slave节点上会被解档(un-archived)。 这些文件可以设置执行权限

    用户可以通过设置mapred.cache.{files|archives}来分发文件。 如果要分发多个文件,可以使用逗号分隔文件所在路径。也可以利用API来设置该属性: DistributedCache.addCacheFile(URI,conf)/DistributedCache.addCacheArchive(URI,conf) and DistributedCache.setCacheFiles(URIs,conf)DistributedCache.setCacheArchives(URIs,conf) 其中URI的形式是hdfs://host:port/absolute-path#link-name 在Streaming程序中,可以通过命令行选项 -cacheFile/-cacheArchive 分发文件。

    用户可以通过 DistributedCache.createSymlink(Configuration)方法让DistributedCache 在当前工作目录下创建到缓存文件的符号链接。 或者通过设置配置文件属性mapred.create.symlinkyes。 分布式缓存会截取URI的片段作为链接的名字。 例如,URI是 hdfs://namenode:port/lib.so.1#lib.so, 则在task当前工作目录会有名为lib.so的链接, 它会链接分布式缓存中的lib.so.1

    DistributedCache可在map/reduce任务中作为 一种基础软件分发机制使用。它可以被用于分发jar包和本地库(native libraries)。 DistributedCache.addArchiveToClassPath(Path, Configuration)DistributedCache.addFileToClassPath(Path, Configuration) API能够被用于 缓存文件和jar包,并把它们加入子jvm的classpath。也可以通过设置配置文档里的属性 mapred.job.classpath.{files|archives}达到相同的效果。缓存文件可用于分发和装载本地库。

    Tool

    Tool 接口支持处理常用的Hadoop命令行选项。

    Tool 是Map/Reduce工具或应用的标准。应用程序应只处理其定制参数, 要把标准命令行选项通过 ToolRunner.run(Tool, String[]) 委托给 GenericOptionsParser处理。

    Hadoop命令行的常用选项有:
    -conf <configuration file> 
    -D <property=value> 
    -fs <local|namenode:port> 
    -jt <local|jobtracker:port>

    IsolationRunner

    IsolationRunner 是帮助调试Map/Reduce程序的工具。

    使用IsolationRunner的方法是,首先设置 keep.failed.tasks.files属性为true (同时参考keep.tasks.files.pattern)。

    然后,登录到任务运行失败的节点上,进入 TaskTracker的本地路径运行 IsolationRunner
    $ cd <local path>/taskTracker/${taskid}/work 
    $ bin/hadoop org.apache.hadoop.mapred.IsolationRunner ../job.xml

    IsolationRunner会把失败的任务放在单独的一个能够调试的jvm上运行,并且采用和之前完全一样的输入数据。

    Profiling

    Profiling是一个工具,它使用内置的java profiler工具进行分析获得(2-3个)map或reduce样例运行分析报告。

    用户可以通过设置属性mapred.task.profile指定系统是否采集profiler信息。 利用api JobConf.setProfileEnabled(boolean)可以修改属性值。如果设为true, 则开启profiling功能。profiler信息保存在用户日志目录下。缺省情况,profiling功能是关闭的。

    如果用户设定使用profiling功能,可以使用配置文档里的属性 mapred.task.profile.{maps|reduces} 设置要profile map/reduce task的范围。设置该属性值的api是JobConf.setProfileTaskRange(boolean,String)。 范围的缺省值是0-2

    用户可以通过设定配置文档里的属性mapred.task.profile.params 来指定profiler配置参数。修改属性要使用api JobConf.setProfileParams(String)。当运行task时,如果字符串包含%s。 它会被替换成profileing的输出文件名。这些参数会在命令行里传递到子JVM中。缺省的profiling 参数是 -agentlib:hprof=cpu=samples,heap=sites,force=n,thread=y,verbose=n,file=%s

    调试

    Map/Reduce框架能够运行用户提供的用于调试的脚本程序。 当map/reduce任务失败时,用户可以通过运行脚本在任务日志(例如任务的标准输出、标准错误、系统日志以及作业配置文件)上做后续处理工作。用户提供的调试脚本程序的标准输出和标准错误会输出为诊断文件。如果需要的话这些输出结果也可以打印在用户界面上。

    在接下来的章节,我们讨论如何与作业一起提交调试脚本。为了提交调试脚本, 首先要把这个脚本分发出去,而且还要在配置文件里设置。

    如何分发脚本文件:

    用户要用 DistributedCache 机制来分发链接脚本文件

    如何提交脚本:

    一个快速提交调试脚本的方法是分别为需要调试的map任务和reduce任务设置 "mapred.map.task.debug.script" 和 "mapred.reduce.task.debug.script" 属性的值。这些属性也可以通过JobConf.setMapDebugScript(String) 和 JobConf.setReduceDebugScript(String) API来设置。对于streaming, 可以分别为需要调试的map任务和reduce任务使用命令行选项-mapdebug 和 -reducedegug来提交调试脚本。

    脚本的参数是任务的标准输出、标准错误、系统日志以及作业配置文件。在运行map/reduce失败的节点上运行调试命令是: 
    $script $stdout $stderr $syslog $jobconf

    Pipes 程序根据第五个参数获得c++程序名。 因此调试pipes程序的命令是
    $script $stdout $stderr $syslog $jobconf $program

    默认行为

    对于pipes,默认的脚本会用gdb处理core dump, 打印 stack trace并且给出正在运行线程的信息。

    JobControl

    JobControl是一个工具,它封装了一组Map/Reduce作业以及他们之间的依赖关系。

    数据压缩

    Hadoop Map/Reduce框架为应用程序的写入文件操作提供压缩工具,这些工具可以为map输出的中间数据和作业最终输出数据(例如reduce的输出)提供支持。它还附带了一些 CompressionCodec的实现,比如实现了 zliblzo压缩算法。 Hadoop同样支持gzip文件格式。

    考虑到性能问题(zlib)以及Java类库的缺失(lzo)等因素,Hadoop也为上述压缩解压算法提供本地库的实现。更多的细节请参考 这里

    中间输出

    应用程序可以通过 JobConf.setCompressMapOutput(boolean)api控制map输出的中间结果,并且可以通过 JobConf.setMapOutputCompressorClass(Class)api指定 CompressionCodec

    作业输出

    应用程序可以通过 FileOutputFormat.setCompressOutput(JobConf, boolean) api控制输出是否需要压缩并且可以使用 FileOutputFormat.setOutputCompressorClass(JobConf, Class)api指定CompressionCodec

    如果作业输出要保存成 SequenceFileOutputFormat格式,需要使用 SequenceFileOutputFormat.setOutputCompressionType(JobConf, SequenceFile.CompressionType)api,来设定SequenceFile.CompressionType (i.e. RECORD / BLOCK - 默认是RECORD)。

    例子:WordCount v2.0

    这里是一个更全面的WordCount例子,它使用了我们已经讨论过的很多Map/Reduce框架提供的功能。

    运行这个例子需要HDFS的某些功能,特别是 DistributedCache相关功能。因此这个例子只能运行在 伪分布式 或者 完全分布式模式的 Hadoop上。

    源代码

      WordCount.java
    1. package org.myorg;
    2.  
    3. import java.io.*;
    4. import java.util.*;
    5.  
    6. import org.apache.hadoop.fs.Path;
    7. import org.apache.hadoop.filecache.DistributedCache;
    8. import org.apache.hadoop.conf.*;
    9. import org.apache.hadoop.io.*;
    10. import org.apache.hadoop.mapred.*;
    11. import org.apache.hadoop.util.*;
    12.  
    13. public class WordCount extends Configured implements Tool {
    14.  
    15.    public static class Map extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    16.  
    17.      static enum Counters { INPUT_WORDS }
    18.  
    19.      private final static IntWritable one = new IntWritable(1);
    20.      private Text word = new Text();
    21.  
    22.      private boolean caseSensitive = true;
    23.      private Set<String> patternsToSkip = new HashSet<String>();
    24.  
    25.      private long numRecords = 0;
    26.      private String inputFile;
    27.  
    28.      public void configure(JobConf job) {
    29.        caseSensitive = job.getBoolean("wordcount.case.sensitive", true);
    30.        inputFile = job.get("map.input.file");
    31.  
    32.        if (job.getBoolean("wordcount.skip.patterns", false)) {
    33.          Path[] patternsFiles = new Path[0];
    34.          try {
    35.            patternsFiles = DistributedCache.getLocalCacheFiles(job);
    36.          } catch (IOException ioe) {
    37.            System.err.println("Caught exception while getting cached files: " + StringUtils.stringifyException(ioe));
    38.          }
    39.          for (Path patternsFile : patternsFiles) {
    40.            parseSkipFile(patternsFile);
    41.          }
    42.        }
    43.      }
    44.  
    45.      private void parseSkipFile(Path patternsFile) {
    46.        try {
    47.          BufferedReader fis = new BufferedReader(new FileReader(patternsFile.toString()));
    48.          String pattern = null;
    49.          while ((pattern = fis.readLine()) != null) {
    50.            patternsToSkip.add(pattern);
    51.          }
    52.        } catch (IOException ioe) {
    53.          System.err.println("Caught exception while parsing the cached file '" + patternsFile + "' : " + StringUtils.stringifyException(ioe));
    54.        }
    55.      }
    56.  
    57.      public void map(LongWritable key, Text value, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    58.        String line = (caseSensitive) ? value.toString() : value.toString().toLowerCase();
    59.  
    60.        for (String pattern : patternsToSkip) {
    61.          line = line.replaceAll(pattern, "");
    62.        }
    63.  
    64.        StringTokenizer tokenizer = new StringTokenizer(line);
    65.        while (tokenizer.hasMoreTokens()) {
    66.          word.set(tokenizer.nextToken());
    67.          output.collect(word, one);
    68.          reporter.incrCounter(Counters.INPUT_WORDS, 1);
    69.        }
    70.  
    71.        if ((++numRecords % 100) == 0) {
    72.          reporter.setStatus("Finished processing " + numRecords + " records " + "from the input file: " + inputFile);
    73.        }
    74.      }
    75.    }
    76.  
    77.    public static class Reduce extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    78.      public void reduce(Text key, Iterator<IntWritable> values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    79.        int sum = 0;
    80.        while (values.hasNext()) {
    81.          sum += values.next().get();
    82.        }
    83.        output.collect(key, new IntWritable(sum));
    84.      }
    85.    }
    86.  
    87.    public int run(String[] args) throws Exception {
    88.      JobConf conf = new JobConf(getConf(), WordCount.class);
    89.      conf.setJobName("wordcount");
    90.  
    91.      conf.setOutputKeyClass(Text.class);
    92.      conf.setOutputValueClass(IntWritable.class);
    93.  
    94.      conf.setMapperClass(Map.class);
    95.      conf.setCombinerClass(Reduce.class);
    96.      conf.setReducerClass(Reduce.class);
    97.  
    98.      conf.setInputFormat(TextInputFormat.class);
    99.      conf.setOutputFormat(TextOutputFormat.class);
    100.  
    101.      List<String> other_args = new ArrayList<String>();
    102.      for (int i=0; i < args.length; ++i) {
    103.        if ("-skip".equals(args[i])) {
    104.          DistributedCache.addCacheFile(new Path(args[++i]).toUri(), conf);
    105.          conf.setBoolean("wordcount.skip.patterns", true);
    106.        } else {
    107.          other_args.add(args[i]);
    108.        }
    109.      }
    110.  
    111.      FileInputFormat.setInputPaths(conf, new Path(other_args.get(0)));
    112.      FileOutputFormat.setOutputPath(conf, new Path(other_args.get(1)));
    113.  
    114.      JobClient.runJob(conf);
    115.      return 0;
    116.    }
    117.  
    118.    public static void main(String[] args) throws Exception {
    119.      int res = ToolRunner.run(new Configuration(), new WordCount(), args);
    120.      System.exit(res);
    121.    }
    122. }
    123.  

    运行样例

    输入样例:

    $ bin/hadoop dfs -ls /usr/joe/wordcount/input/ 
    /usr/joe/wordcount/input/file01 
    /usr/joe/wordcount/input/file02 

    $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file01 
    Hello World, Bye World! 

    $ bin/hadoop dfs -cat /usr/joe/wordcount/input/file02 
    Hello Hadoop, Goodbye to hadoop.

    运行程序:

    $ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount /usr/joe/wordcount/input /usr/joe/wordcount/output

    输出:

    $ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000 
    Bye 1 
    Goodbye 1 
    Hadoop, 1 
    Hello 2 
    World! 1 
    World, 1 
    hadoop. 1 
    to 1 

    注意此时的输入与第一个版本的不同,输出的结果也有不同。

    现在通过DistributedCache插入一个模式文件,文件中保存了要被忽略的单词模式。

    $ hadoop dfs -cat /user/joe/wordcount/patterns.txt 
    \. 
    \, 
    \! 
    to 

    再运行一次,这次使用更多的选项:

    $ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=true /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

    应该得到这样的输出:

    $ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000 
    Bye 1 
    Goodbye 1 
    Hadoop 1 
    Hello 2 
    World 2 
    hadoop 1 

    再运行一次,这一次关闭大小写敏感性(case-sensitivity):

    $ bin/hadoop jar /usr/joe/wordcount.jar org.myorg.WordCount -Dwordcount.case.sensitive=false /usr/joe/wordcount/input /usr/joe/wordcount/output -skip /user/joe/wordcount/patterns.txt

    输出:

    $ bin/hadoop dfs -cat /usr/joe/wordcount/output/part-00000 
    bye 1 
    goodbye 1 
    hadoop 2 
    hello 2 
    world 2 

    程序要点

    通过使用一些Map/Reduce框架提供的功能,WordCount的第二个版本在原始版本基础上有了如下的改进:

    • 展示了应用程序如何在Mapper (和Reducer)中通过configure方法 修改配置参数(28-43行)。
    • 展示了作业如何使用DistributedCache 来分发只读数据。 这里允许用户指定单词的模式,在计数时忽略那些符合模式的单词(104行)。
    • 展示Tool接口和GenericOptionsParser处理Hadoop命令行选项的功能 (87-116, 119行)。
    • 展示了应用程序如何使用Counters(68行),如何通过传递给map(和reduce) 方法的Reporter实例来设置应用程序的状态信息(72行)。

    Java和JNI是Sun Microsystems, Inc.在美国和其它国家的注册商标。

    展开全文
  • Hadoop map和reduce的个数设置,困扰了很多学习Hadoop的成员,为什么设置了配置参数就是不生效那?Hadoop Map和Reduce个数,到底跟什么有关系。首先他的参数很多,而且可能随着版本不同一些配置参数,会发生一些变化...
  • Hadoop map reduce 过程获取环境变量 Hadoop任务执行过程中,在每一个map节点或者reduce节点能获取一下环境变量,利用这些变量可以为特殊的需求服务,例如:获取当前map节点处理的数据文件的路径。 hadoop...
  • Hadoop Map/Reduce说明  hadoop Map/Reduce是一个使用简易... 一个Map/Reduce作业经常讲数据集切分成独立的块,这些块通过map任务并行处理,框架对map的输出进行排序,排序结果会被reduce以输入参数进行接收。通常作
  • 云计算(十九)- Hadoop Map/Reduce教程

    千次阅读 2013-12-16 22:19:11
    Hadoop Map/Reduce教程 目的先决条件概述输入与输出例子:WordCount v1.0 源代码用法解释 Map/Reduce - 用户界面 核心功能描述 MapperReducerPartitionerReporterOutputCollector 作业配置任务...
  • hadoop map reduce 过程获取环境变量

    千次阅读 2012-07-24 12:54:37
    hadoop任务执行过程中,在每一个map节点或者reduce节点能获取一下环境变量,利用这些变量可以为特殊的需求服务,例如:获取当前map节点处理的数据文件的路径。 hadoop是java实现的,利用java可以很方便的获取...
  • Hadoop Map/Reduce编程模型实现海量数据处理—数字求和,分别用不同的方法来使用Hadoop,进而了解Hadoop在数据处理的不同处理方式
  • Hadoop Map-Reduce 天气示例

    千次阅读 2013-09-06 10:34:42
    我们照着Hadoop教程简单的写了一个例子,它可以用于分析天气数据然后找到某年的最高气温。 我们是用hadoop 0.20的新的API写的,具体代码如下: Mapper类: /* */ package com.charles.parseweather; import ...
  • hadoop默认参数

    万次阅读 2012-08-14 18:47:08
    1 常用的端口配置 ...参数 描述 默认 配置文件 例子值 fs.default.name namenode namenode RPC交互端口 8020 core-site.xml hdfs://ma
  • Hadoop调优参数汇总

    千次阅读 2018-02-23 20:14:09
    linux参数以下参数最好优化一下: 文件描述符ulimit -n 用户最大进程 nproc (hbase需要 hbse book) 关闭swap分区 设置合理的预读取缓冲区 Linux的内核的IO调度器JVM参数JVM方面的优化项Hadoop Performance ...
  • 介绍 基于HadoopETL和Hadoop和Storm的各种实用程序类 哲学 使用简单 CSV格式的输入输出 在简单的JSON文件中定义的元数据 可以通过许多配置旋钮进行高度配置 解决方案 ... 该项目中的Map reduce作业可用于其
  • hadoop streaming参数整理

    2017-09-21 17:01:08
    Hadoop Streaming 是Hadoop提供的一个编程工具,Streamining框架允许任何可执行文件或者脚本文件作为Mapper和Reducer在Hadoop MapReduce中使用,方便已有程序向Hadoop平台移植。因此可以说对于hadoop的扩展性意义...
  • hadoop streaming 参数设置

    千次阅读 2013-08-29 16:23:08
    Hadoop Streaming用法 Usage: $HADOOP_HOME/bin/hadoop jar \ $HADOOP_HOME/hadoop-streaming.jar [options] options: (1)-input:输入文件路径 (2)-output:输出文件路径 (3)-mapper:用户自己写的...
  • 最近用Hadoop统计将近一亿行的数据的统计,由于每一行的列再加上Overall的统计counter数量超过了120,故在Hadoop的运行过程中,抛出如下异常: org.apache.hadoop.mapreduce.counters.LimitExceededException: Too many...
  • hadoop streaming参数配置

    千次阅读 2016-12-13 12:23:06
    Streaming简介Streamining框架允许任何程序语言实现的程序在Hadoop MapReduce中使用,方便已有程序向Hadoop平台移植。因此可以说对于hadoop的扩展性意义重大。Streamining的原理是用Java实现一个包装用户程序的...
  • hadoop stream 参数详解

    千次阅读 2014-03-20 15:20:07
    原文地址:streaming" style="background-color:inherit">Hadoop streaming作者:tivoli_chen ...它允许用户创建和执行使用任何程序或者脚本编写的map或者reduce的mapreducejobs。譬如, $HADOOP_H
  • hadoop 参数配置

    万次阅读 2015-12-18 10:04:37
    hadoop 参数配置 hadoop 参数 hadoop配置参数 hadoop优化 目录[-] Hadoop参数汇总 linux参数 JVM参数 Hadoop参数大全 core-default.xml hdfs-default.xml yarn-default.xml Hadoop...
  • Hadoop与JVM重用对应的参数是mapred.job.reuse.jvm.num.tasks,默认是1,表示一个JVM上最多可以顺序执行的task数目(属于同一个Job)是1。也就是说一个task启一个JVM。    比如在集群中配置每个slave...
  • Hadoop 可以在作业的Configuration对象中通过设定一系列参数来改变作业的行为,比如,我们需要进行一个map-reduce作业,并且吧最终作业reduce过程的结果输出为压缩的格式,我们可以在一般的map-reduce上进行一些定制...
  • 基于C++的Hadoop Map/Reduce框架--HCE

    千次阅读 2015-09-09 11:18:08
    Hadoop系统提供了MapReduce计算框架的开源实现,像Yahoo!、Facebook、淘宝、中移动、百度、腾讯等公司都...Hadoop系统性能不仅取决于任务调度器的分配策略,还受到分配后实际任务执行效率的影响,任务执行常常涉及
  • hadoop distcp 参数详解

    千次阅读 2017-11-13 10:13:39
    # hadoop distcp  usage: distcp OPTIONS [source_path...]  OPTIONS  -append Reuse existing data in target files and  append new data
  • Hadoop集群参数配置原则

    千次阅读 2011-08-28 21:54:37
    执行merge sort的时候,每次同时打开多少个spill文件由该参数决定。打开的文件越多,不一定merge sort就越快,所以要根据数据情况适当的调整。 mapred.child.java.opts 设置JVM堆的最大可用内存,需从应用程序...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 57,876
精华内容 23,150
关键字:

hadoopmap参数