精华内容
下载资源
问答
  • MR mapper数量怎么确定? 看过MR的处理流程的人应该都知道,在MR处理的时候有个split,这个split数量决定了mapper的数量,那split是怎么来的呢?我们在写MR代码的时候也没有接口可以定义split的数量,那...

    看过MR的处理流程的人应该都知道,在MR处理的时候有个split,这个split数量决定了mapper的数量,那split是怎么来的呢?我们在写MR代码的时候也没有接口可以定义split的数量,那split怎么来的? 有人说是block数量,真是是这样吗? 我们来看一下源码:

    public List<InputSplit> getSplits(JobContext job) throws IOException {
        StopWatch sw = new StopWatch().start();
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);
    
        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file: files) {
          Path path = file.getPath();
          long length = file.getLen();
          if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(job, path)) {
              long blockSize = file.getBlockSize();
              long splitSize = computeSplitSize(blockSize, minSize, maxSize);//这里计算splitSize
    
              long bytesRemaining = length;
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                bytesRemaining -= splitSize;
              }
    
              if (bytesRemaining != 0) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                           blkLocations[blkIndex].getHosts(),
                           blkLocations[blkIndex].getCachedHosts()));
              }
            } else { // not splitable
              splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
            }
          } else { 
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
          }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        sw.stop();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Total # of splits generated by getSplits: " + splits.size()
              + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
        }
        return splits;
      }
      protected long computeSplitSize(long blockSize, long minSize,
                                      long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize)); //获取splitSize
      }

    其中的getMinSplitSize和getMaxSplitSize方法分别用于获取最小InputSplit和最大InputSplit的值,对应的配置参数分别为mapreduce.input.fileinputformat.split.minsize,默认值为1L和mapreduce.input.fileinputformat.split.maxsize,默认值为Long.MAX_VALUE

    所以,hadoop中默认的block size为128M,所以split的size一般对应为block的大小,所以,Mapper的数量就是文件个数的数量;

    这样可以做到数据本地性,提示效率;

     

    1.如果根据特殊情况的需要非要自定义mapper的数量怎么办?

    那就只有修改块的大小、split的最小值和最大值来影响mapper的数量;

    posted on 2018-04-16 12:40 qiezijiajia 阅读(...) 评论(...) 编辑 收藏

    转载于:https://www.cnblogs.com/dpains/p/8855313.html

    展开全文
  • MR 的 mapper 数量问题

    2015-02-11 15:49:00
    看到群里面一篇文章涨了贱识 ... 之前关注过 reduceer 的数量问题,还没注意到 mapper 的数量怎么确定的 文章中可以提炼出三点: ...2.mapper数量是怎么确定的;3.一个split不会包含两个File的Block,不...

    看到群里面一篇文章涨了贱识

    http://www.cnblogs.com/xuxm2007/archive/2011/09/01/2162011.html

    之前关注过 reduceer 的数量问题,还没注意到 mapper 的数量怎么确定的

    文章中可以提炼出三点:

    1.block和split的关系;2.mapper数量是怎么确定的;3.一个split不会包含两个File的Block,不会跨越File边界

    还好自己手贱去翻了一下源码


     在hadoop2.2.0 的源码中关于mapper数量确定的核心代码为:

     1 for (FileStatus file: files) {
     2       Path path = file.getPath();
     3       long length = file.getLen();
     4       if (length != 0) {
     5         BlockLocation[] blkLocations;
     6         if (file instanceof LocatedFileStatus) {
     7           blkLocations = ((LocatedFileStatus) file).getBlockLocations();
     8         } else {
     9           FileSystem fs = path.getFileSystem(job.getConfiguration());
    10           blkLocations = fs.getFileBlockLocations(file, 0, length);
    11         }
    12         if (isSplitable(job, path)) {
    13           long blockSize = file.getBlockSize();
    14           long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    15 
    16           long bytesRemaining = length;
    17           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    18             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    19             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
    20                                      blkLocations[blkIndex].getHosts()));
    21             bytesRemaining -= splitSize;
    22           }
    23 
    24           if (bytesRemaining != 0) {
    25             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    26             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
    27                        blkLocations[blkIndex].getHosts()));
    28           }
    29         } else { // not splitable
    30           splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
    31         }
    32       } else { 
    33         //Create empty hosts array for zero length files
    34         splits.add(makeSplit(path, 0, length, new String[0]));
    35       }
    36     }

    并没有看到文章中提到的goalSize,读一下源码就会发现和自己设定的mapper数量变量一点关系都没有

    再看hadoop1.x的代码,一样没有,又翻到以前下载的hadoop0.x的源代码,才找到了,读了一下就会发现文章中博主的观点都是对的,不过已经过时了而已

    那么现在的mapper数量是怎么确定的?


     

    想要自己设定mapper数量并不像设定reduceer数量那么简单直接调用Job.setNumReduceTasks(int)就可以了,Job类没有setNumMapTasks方法

    但可以通过Configuration.set(JobContext.NUM_MAPS, int)和在hadoop jar命令行提交时加参数-Dmapreduce.job.maps

    但测试并没有效果

    根据hadoop作业提交过程跟读源码发现在hadoop通过JobSubmitter类的submitJobInternal(Jobjob, Cluster cluster)方法向系统提交作业时有跟mapper数量有关的代码

        // Create the splits for the job
          LOG.debug("Creating splits at " + jtFs.makeQualified(submitJobDir));
          int maps = writeSplits(job, submitJobDir);
          conf.setInt(MRJobConfig.NUM_MAPS, maps);
          LOG.info("number of splits:" + maps);

    mapper的数量通过writeSplits方法返回,该方法相关源代码:

      private int writeSplits(org.apache.hadoop.mapreduce.JobContext job,
          Path jobSubmitDir) throws IOException,
          InterruptedException, ClassNotFoundException {
         JobConf jConf = (JobConf)job.getConfiguration();
          int maps;
          if (jConf.getUseNewMapper()) {
            maps = writeNewSplits(job, jobSubmitDir);
          } else {
            maps = writeOldSplits(jConf, jobSubmitDir);
          }
          return maps;
      }

    新旧版本的jobcontext暂且不论,现在一般都是新的,所以由writeNewSplits方法确定

     1   @SuppressWarnings("unchecked")
     2   private <T extends InputSplit>
     3   int writeNewSplits(JobContext job, Path jobSubmitDir) throws IOException,
     4       InterruptedException, ClassNotFoundException {
     5      Configuration conf = job.getConfiguration();
     6       InputFormat<?, ?> input =
     7       ReflectionUtils.newInstance(job.getInputFormatClass(), conf);
     8 
     9       List<InputSplit> splits = input.getSplits(job);
    10       T[] array = (T[]) splits.toArray(new InputSplit[splits.size()]);
    11 
    12       // sort the splits into order based on size, so that the biggest
    13       // go first
    14       Arrays.sort(array, new SplitComparator());
    15       JobSplitWriter.createSplitFiles(jobSubmitDir, conf, 
    16         jobSubmitDir.getFileSystem(conf), array);
    17       return array.length;
    18   }

    从代码中可以读出1.map的数量就是split的数量;2.map的数量是由反射出的inputformat类算出的;

    inputfomat是一个接口,最常用的是它的实现类FileInputFormat和其子类TextInputFormat,在MR中如果不指定则默认为TextInputFormat

    split的计算方法在TextInputFormat中没有,在其父类FileInputFormat中

    在这里看到最开始贴出的问题源头源代码

     1 for (FileStatus file: files) {
     2       Path path = file.getPath();
     3       long length = file.getLen();
     4       if (length != 0) {
     5         BlockLocation[] blkLocations;
     6         if (file instanceof LocatedFileStatus) {
     7           blkLocations = ((LocatedFileStatus) file).getBlockLocations();
     8         } else {
     9           FileSystem fs = path.getFileSystem(job.getConfiguration());
    10           blkLocations = fs.getFileBlockLocations(file, 0, length);
    11         }
    12         if (isSplitable(job, path)) {
    13           long blockSize = file.getBlockSize();
    14           long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    15 
    16           long bytesRemaining = length;
    17           while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
    18             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    19             splits.add(makeSplit(path, length-bytesRemaining, splitSize,
    20                                      blkLocations[blkIndex].getHosts()));
    21             bytesRemaining -= splitSize;
    22           }
    23 
    24           if (bytesRemaining != 0) {
    25             int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
    26             splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
    27                        blkLocations[blkIndex].getHosts()));
    28           }
    29         } else { // not splitable
    30           splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts()));
    31         }
    32       } else { 
    33         //Create empty hosts array for zero length files
    34         splits.add(makeSplit(path, 0, length, new String[0]));
    35       }
    36     }

    其中核心是

     long blockSize = file.getBlockSize();
     long splitSize = computeSplitSize(blockSize, minSize, maxSize);

      protected long computeSplitSize(long blockSize, long minSize,
        long maxSize) {
        return Math.max(minSize, Math.min(maxSize, blockSize));
      }

    其中的getMinSplitSize和getMaxSplitSize方法分别用于获取最小InputSplit和最大InputSplit的值,对应的配置参数分别为mapreduce.input.fileinputformat.split.minsize,默认值为1L和mapreduce.input.fileinputformat.split.maxsize,默认值为Long.MAX_VALUE,十六进制数值为 0x7fffffffffffffffL,对应的十进制为9223372036854775807,getFormatMinSplitSize方法返回该输入格式下InputSplit的下限。以上数字的单位都是byte。由此得出minSize的大小为1L,maxSize的大小为Long.MAX_VALUE,而blockSize就是hadoop块的大小,hadoop2.x后一般为128M,结合代码可以发现splitSize一般就是块的大小

    所以,结论来了,一般mapper的数量就是文件块的数量。

    不过这样设计也很有道理,因为块都是分散和副本存储的,所以可以参考块在哪个主机上就跟哪个主机分配map任务(不是唯一因素),实现本地性,提高效率。


     

    不过还存在三个问题值得思考一下

    1.如果根据特殊情况的需要非要自定义mapper的数量怎么办?

    那就只有修改块的大小、split的最小值和最大值来影响mapper的数量;

    2.如果多文件呢?

    这里还发现源头文章中说在FileInputFormat的getSplits方法中计算单位都是块的数量,这个结论是不正确的,单位还是byte

    代码隐藏的一个规律就是split是按文件划分的,虽然划的时候采用了SPLIT_SLOP(默认1.1),也难免会有大于0.1*blockSize小于blockSize的split

    不过没有往下个文件推,所以 一个split不会包含两个File的Block,不会跨越File边界

    (一个split也不一定就是blockSize的大小,除了最小split和最大split的值影响外还可能小于blockSize和大于blockSize小于1.1*blockSize)

    3.如果通过Configuration.set(JobContext.NUM_MAPS, int)自定义了mapper的数量会出现什么情况?

    结合hadoop作业的提交过程可发现这个值会被计算后的NUM_MAPS覆盖,所以设置了也没用


     

    都是手贱惹的祸

     

    转载于:https://www.cnblogs.com/admln/p/hadoop-mapper-numbers-question.html

    展开全文
  • 本篇分两部分,第一部分分析使用 java 提交 mapreduce 任务时对 mapper 数量的控制,第二部分分析使用 streaming 形式提交 mapreduce 任务时对 mapper 数量的控制。 环境:hadoop-3.0.2 前言: 熟悉 hadoop ...

    本篇分两部分,第一部分分析使用 java 提交 mapreduce 任务时对 mapper 数量的控制,第二部分分析使用 streaming 形式提交 mapreduce 任务时对 mapper 数量的控制。

     

    环境:hadoop-3.0.2

    前言:

    熟悉 hadoop mapreduce 的人可能已经知道,即使在程序里对 conf 显式地设置了 mapred.map.tasks 或 mapreduce.job.maps,程序也并没有运行期望数量的 mapper。

    这是因为,mapper 的数量由输入的大小、HDFS 当前设置的 BlockSize、以及当前配置中的 split min size 和 split max size 等参数共同确定,并不会受到简单的人工设置 mapper num 的影响。

    因此,对于 mapper num 的控制,需要我们理解 hadoop 中对于 FileInputFormat 类中 getSplit() 方法的实现,针对性地配置 BlockSize、split min size、split max size 等参数,才能达到目的。

    重点:

    值得一提并且容易忽略的是,要区分 org.apache.hadoop.mapred.FileInputFormat类和 org.apache.hadoop.mapreduce.lib.input.FileInputFormat类,两者虽然相似,但在getSplit()上的实现是有区别的。

    重要区别是,hadoop streaming 中使用的 InputFormat 类,使用的是 org.apache.hadoop.mapred.FileInputFormat,仅仅需要指定 mapreduce.job.maps ,就能够设置 mapper num了(具体源码分析在第二部分)。而使用JAVA设计的 mapreduce 任务中使用的 InputFormat 类,使用的是 org.apache.hadoop.mapreduce.lib.input.FileInputFormat,则需要通过配置BlockSize、split min size、split max size 等参数来间接性地控制 mapper num。

     

    一、Java 本地提交 mapreduce 任务, org.apache.hadoop.mapreduce.lib.input.FileInputFormat 的 mapper num 控制

    1. 在java本地编辑 mapreduce 任务,(默认)使用 FileInputFormat 类的子类 TextInputFormat

    job.setInputFormatClass(TextInputFormat.class);
    
    

     

    2. mapper 的切分逻辑在 FileInputFormat 类中的 getSplits()实现:

    public List<InputSplit> getSplits(JobContext job) throws IOException {
            StopWatch sw = (new StopWatch()).start();
            long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
            long maxSize = getMaxSplitSize(job);
            List<InputSplit> splits = new ArrayList();
            List<FileStatus> files = this.listStatus(job);
            Iterator var9 = files.iterator();
    
            while(true) {
                while(true) {
                    while(var9.hasNext()) {
                        FileStatus file = (FileStatus)var9.next();
                        Path path = file.getPath();
                        long length = file.getLen();
                        if (length != 0L) {
                            BlockLocation[] blkLocations;
                            if (file instanceof LocatedFileStatus) {
                                blkLocations = ((LocatedFileStatus)file).getBlockLocations();
                            } else {
                                FileSystem fs = path.getFileSystem(job.getConfiguration());
                                blkLocations = fs.getFileBlockLocations(file, 0L, length);
                            }
    
                            if (this.isSplitable(job, path)) {
                                long blockSize = file.getBlockSize();
                                long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
    
                                long bytesRemaining;
                                int blkIndex;
                                for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
                                    blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                    splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                                }
    
                                if (bytesRemaining != 0L) {
                                    blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
                                    splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
                                }
                            } else {
                                if (LOG.isDebugEnabled() && length > Math.min(file.getBlockSize(), minSize)) {
                                    LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath());
                                }
    
                                splits.add(this.makeSplit(path, 0L, length, blkLocations[0].getHosts(), blkLocations[0].getCachedHosts()));
                            }
                        } else {
                            splits.add(this.makeSplit(path, 0L, length, new String[0]));
                        }
                    }
    
                    job.getConfiguration().setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.size());
                    sw.stop();
                    if (LOG.isDebugEnabled()) {
                        LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
                    }
    
                    return splits;
                }
            }
        }

     

    3. 最后确定 mapper 数量在这里:

     1                         if (this.isSplitable(job, path)) {
     2                             long blockSize = file.getBlockSize();
     3                             long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);
     4 
     5                             long bytesRemaining;
     6                             int blkIndex;
     7                             for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
     8                                 blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
     9                                 splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
    10                             }
    11 
    12                             if (bytesRemaining != 0L) {
    13                                 blkIndex = this.getBlockIndex(blkLocations, length - bytesRemaining);
    14                                 splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, blkLocations[blkIndex].getHosts(), blkLocations[blkIndex].getCachedHosts()));
    15                             }

    含义:

    a. 当 this.isSplitable 开启时,只要当前未分配的大小 bytesRemaining 大于 splitSize 的 1.1 倍,就添加一个 inputSplit, 即一个mapper 被生成。 

    b. 最后,不足 1.1 倍splitSize 的残余,补充为一个 mapper。因此,经常发现实际分配的 mapper 数比自己定义的会多 1 个。

    c. 为什么设置1.1倍?避免将不足 0.1 倍 splitSize 的量分配为一个 mapper, 避免浪费。

     

    4.  重要的两个量:BlockSize 和 splitSize

    long blockSize = file.getBlockSize();
    long splitSize = this.computeSplitSize(blockSize, minSize, maxSize);

    其中,blockSize 是 hdfs 设置的,一般是 64MB 或 128MB,我的 hdfs 中为 128 MB = 132217728L。这个量可认为静态,我们不宜修改。

    观察 splitSize 的获得:

    1     protected long computeSplitSize(long blockSize, long minSize, long maxSize) {
    2         return Math.max(minSize, Math.min(maxSize, blockSize));
    3     }

    在 getSplits()中找到 minSize, maxSize, blockSize 的赋值:

    long minSize = Math.max(this.getFormatMinSplitSize(), getMinSplitSize(job));
    long maxSize = getMaxSplitSize(job);

    找到这些量的赋值、默认值:

    maxSize 的 setter/getter:    除非用户重新设置,否则 maxSize 的默认值为 Long 的最大值 

    1 public static void setMaxInputSplitSize(Job job, long size) {
    2         job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.maxsize", size);
    3     }
    4 
    5 public static long getMaxSplitSize(JobContext context) {
    6         return context.getConfiguration().getLong("mapreduce.input.fileinputformat.split.maxsize", 9223372036854775807L);
    7     }

    minSize 的 setter/getter:  除非用户重新设置,否则 minSize 的默认值为 1L

    protected long getFormatMinSplitSize() {
            return 1L;
        }
    
    public static void setMinInputSplitSize(Job job, long size) {
            job.getConfiguration().setLong("mapreduce.input.fileinputformat.split.minsize", size);
        }
    
    public static long getMinSplitSize(JobContext job) {
            return job.getConfiguration().getLong("mapreduce.input.fileinputformat.split.minsize", 1L);
        }

     

    因此容易算出,默认情况下,

    long splitSize = this.computeSplitSize(blockSize, minSize, maxSize) = Math.max(Math.max(1L,1L), Math.min(9223372036854775807L, 128M=132217728L)) = 132217728L = 128M

    5. 控制 mapper 数量

    知道了上面的计算过程,我们要控制 mapper,在 BlockSize 不能动的情况下,就必须控制 minSize 和 maxSize 了。这里主要控制 maxSize。 

     

    TextInputFormat.setMinInputSplitSize(job, 1L);//设置minSize
    TextInputFormat.setMaxInputSplitSize(job, 10 * 1024 * 1024);//设置maxSize

     

    测试输入文件大小为 40MB, 很小, 在默认情况下, 被分配为 1 个或 2 个 mapper 执行成功。

    现在希望分配 4 个mapper:那么设置 maxSize 为10M ,那么 splitSize 计算为 10M。对于 40MB 的输入文件,理应分配 4 个mapper。

    实际运行,运行了 5 个mapper,认为成功摆脱了默认启动 2 个mapper 的限制,额外多出的 1 个 mapper 则猜测是上文提到的,对残余量的补充 mapper。

     

    6. 至此,对Java 本地提交 mapreduce 任务, org.apache.hadoop.mapreduce.lib.input.FileInputFormat 的 mapper num 控制方法如上。接下来讨论 streaming 使用的 org.apache.hadoop.mapred.FileInputFormat 的 mapper 控制。

     

    二、streaming 提交 mapreduce 任务, org.apache.hadoop.mapred.FileInputFormat 的 mapper num 控制

    1. 可通过 mapreduce.job.maps 直接控制,即使不是绝对精确。原因在下面的源码分析中可以看到。

    1 hadoop dfs -rm -r -f /output && \
    2 
    3 hadoop jar /opt/hadoop-3.0.2/share/hadoop/tools/lib/hadoop-streaming-3.0.2.jar \
    4 -D mapreduce.reduce.tasks=0 \
    5 -D mapreduce.job.maps=7 \
    6 -input /input \
    7 -output /output \
    8 -mapper "cat" \
    9 -inputformat TextInputFormat

     

    2. 将 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 中的 maxSize,尝试通过 streaming 的 -D 设置,是无效的。因为 streaming 使用的是 org.apache.hadoop.mapred.FileInputFormat,在下面的源码分析中可以看到。

     

    3. 查看 FileInputFormat 的 getSplits 源码

     1     public InputSplit[] getSplits(JobConf job, int numSplits) throws IOException {
     2         StopWatch sw = (new StopWatch()).start();
     3         FileStatus[] files = this.listStatus(job);
     4         job.setLong("mapreduce.input.fileinputformat.numinputfiles", (long)files.length);
     5         long totalSize = 0L;
     6         FileStatus[] var7 = files;
     7         int var8 = files.length;
     8 
     9         for(int var9 = 0; var9 < var8; ++var9) {
    10             FileStatus file = var7[var9];
    11             if (file.isDirectory()) {
    12                 throw new IOException("Not a file: " + file.getPath());
    13             }
    14 
    15             totalSize += file.getLen();
    16         }
    17 
    18         long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
    19         long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);
    20         ArrayList<FileSplit> splits = new ArrayList(numSplits);
    21         NetworkTopology clusterMap = new NetworkTopology();
    22         FileStatus[] var13 = files;
    23         int var14 = files.length;
    24 
    25         for(int var15 = 0; var15 < var14; ++var15) {
    26             FileStatus file = var13[var15];
    27             Path path = file.getPath();
    28             long length = file.getLen();
    29             if (length == 0L) {
    30                 splits.add(this.makeSplit(path, 0L, length, new String[0]));
    31             } else {
    32                 FileSystem fs = path.getFileSystem(job);
    33                 BlockLocation[] blkLocations;
    34                 if (file instanceof LocatedFileStatus) {
    35                     blkLocations = ((LocatedFileStatus)file).getBlockLocations();
    36                 } else {
    37                     blkLocations = fs.getFileBlockLocations(file, 0L, length);
    38                 }
    39 
    40                 if (!this.isSplitable(fs, path)) {
    41                     if (LOG.isDebugEnabled() && length > Math.min(file.getBlockSize(), minSize)) {
    42                         LOG.debug("File is not splittable so no parallelization is possible: " + file.getPath());
    43                     }
    44 
    45                     String[][] splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, 0L, length, clusterMap);
    46                     splits.add(this.makeSplit(path, 0L, length, splitHosts[0], splitHosts[1]));
    47                 } else {
    48                     long blockSize = file.getBlockSize();
    49                     long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);
    50 
    51                     long bytesRemaining;
    52                     String[][] splitHosts;
    53                     for(bytesRemaining = length; (double)bytesRemaining / (double)splitSize > 1.1D; bytesRemaining -= splitSize) {
    54                         splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, splitSize, clusterMap);
    55                         splits.add(this.makeSplit(path, length - bytesRemaining, splitSize, splitHosts[0], splitHosts[1]));
    56                     }
    57 
    58                     if (bytesRemaining != 0L) {
    59                         splitHosts = this.getSplitHostsAndCachedHosts(blkLocations, length - bytesRemaining, bytesRemaining, clusterMap);
    60                         splits.add(this.makeSplit(path, length - bytesRemaining, bytesRemaining, splitHosts[0], splitHosts[1]));
    61                     }
    62                 }
    63             }
    64         }
    65 
    66         sw.stop();
    67         if (LOG.isDebugEnabled()) {
    68             LOG.debug("Total # of splits generated by getSplits: " + splits.size() + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
    69         }
    70 
    71         return (InputSplit[])splits.toArray(new FileSplit[splits.size()]);
    72     }

    与 org.apache.hadoop.mapreduce.lib.input.FileInputFormat 相似,但不同之处还是很重要的。主要在

    long blockSize = file.getBlockSize();
    long splitSize = this.computeSplitSize(goalSize, minSize, blockSize);

    赋值:

    long goalSize = totalSize / (long)(numSplits == 0 ? 1 : numSplits);
    long minSize = Math.max(job.getLong("mapreduce.input.fileinputformat.split.minsize", 1L), this.minSplitSize);

     

    4. 追溯这些量

    protected long computeSplitSize(long goalSize, long minSize, long blockSize) {
        return Math.max(minSize, Math.min(goalSize, blockSize));
    }
    private long minSplitSize = 1L;

     

    5. 分析

    minSize 默认为1L,blocksize是当前目标文件的块大小,而 splitSize 就是 BlockSize 和 goalSize 的小值。

    goalSize 的计算,就是输入文件总大小与 numSplits 的比值。而 numSplits 就是我们在streaming 中设置的 -D mapreduce.job.maps

    因此,在streaming中才可以简单地直接设置 mapper 的数量了。

    但是,只有当 goalsize 小于 blocksize 时,mapreduce.job.maps 才会生效!

    当 goalsize < blocksize,splitsize = goalsize,此时你设置的 mapreduce.job.maps 数量一般大于输入块的数量,因此配置生效。

    当 goalsize > blocksize,splitsize = blocksize,此时你设置的 mapreduce.job.maps 不足,一般少于输入块的数量,因此配置不生效。

    换句话说,如果输入只有一个文件,那么只要 -D mapreduce.job.maps > 1,配置大多数会生效。

     

    转载于:https://www.cnblogs.com/PigeonNoir/p/9229611.html

    展开全文
  • 如何处理mapper数量的大小的问题

    千次阅读 2016-02-22 16:30:25
    减小Map-Reduce job 启动时创建的Mapper数量当处理大批量的大数据时,一种常见的情况是job启动的mapper数量太多而超出了系统限制,导致Hadoop抛出异常终止执行。解决这种异常的思路是减少mapper的数量。具体如下: ...

    减小Map-Reduce job 启动时创建的Mapper数量当处理大批量的大数据时,一种常见的情况是job启动的mapper数量太多而超出了系统限制,导致Hadoop抛出异常终止执行。解决这种异常的思路是减少mapper的数量。具体如下:


    一,输入文件size巨大,但不是小文件这种情况可以通过增大每个mapper的input size,即增大minSize或者增大blockSize来减少所需的mapper的数量。增大blockSize通常不可行,因为当HDFS被hadoop namenode -format之后,blockSize就已经确定了(由格式化时dfs.block.size决定),如果要更改blockSize,需要重新格式化HDFS,这样当然会丢失已有的数据。所以通常情况下只能通过增大minSize,即增大mapred.min.split.size的值。


    二,输入文件数量巨大,且都是小文件所谓小文件,就是单个文件的size小于blockSize。这种情况通过增大mapred.min.split.size不可行,需要使用FileInputFormat衍生的CombineFileInputFormat将多个input path合并成一个InputSplit送给mapper处理,从而减少mapper的数量。具体细节稍后会更新并展开。


    增加Map-Reduce job 启动时创建的Mapper数量增加mapper的数量,可以通过减小每个mapper的输入做到,即减小blockSize或者减小mapred.min.split.size的值。
    展开全文
  • hadoop mapper数量确定

    2017-05-01 14:33:21
    reducer 的数目可以通过参数指定--Job.setNumReduceTasks(int) 通常设置为 (0.95~1.75 ) * 节点数量 * 每个节点上最大的容器数 而mapper的数目如何确定呢? inputsplit由如下算法得到:Math.max(minSize, Math....
  • MapReduce框架的优势是能够在集群中并行运行mapper和reducer任务,那怎样确定mapper和reducer的数量呢,或者说怎样以编程的方式控制作业启动的mapper和reducer数量呢?在《Hadoop-2.4.1学习之Mapper和Reducer》中...
  • 设置Mapper数量与各节点container数量

    千次阅读 2018-04-20 13:45:42
    HashPartitioner是处理Mapper任务输出的,getPartition()方法有三个形参,key、value分别指的是Mapper任务的输出,numReduceTasks指的是设置的Reducer任务数量,默认值是1。 那么任何整数与1相除的余数肯定是0。也...
  • mapper数量 Hadoop采用“数据本地化优化”的策略,使得map端任务到达最佳性能。 在application master为每个分片创建map作业的过程中,MapReduce框架优先选择 存储有输入数据的节点 来执行map任务。  如果所有...
  • MapReduce框架的优势是可以在集群中并行运行mapper和reducer任务,那如何确定mapper和reducer的数量呢,或者说如何以编程的方式控制作业启动的mapper和reducer数量呢?在《Hadoop-2.4.1学习之Mapper和Reducer》中...
  • 根据旧版API中splitSize计算公式,当goalSize小于blockSize,大于minSize时,Mapper数量大致等于参数"mapreduce.job.maps"的值。 新版本API: 可以通过修改"mapreduce.input.fileinputformat.split.minsize"、...
  • Hadoop(17) MR 决定Mapper数量因素

    千次阅读 2016-10-03 19:49:57
    问会启动几个Mapper。 答:启动 3 个, 看个几个block 然后再真正提交作业 submitClient .submitJob (jobId, submitJobDir .toString (), job .getCredentials ()) ; ==>submitClient也是server的代理对象
  • 是这样的,我在nutch中调优是,想增大mapper数量来 加快其中几个job的处理速度,但是发现 代码中 没有 setNumMapTasks的函数, 在脚本中设置了 mapred.map.tasks 后也不起作用,请大神给指条明路,不胜感激
  • 很多文档中描述,...如果输入的文件数量巨大,但是每个文件的size都小于HDFS的blockSize,那么会造成启动的Mapper等于文件的数量(即每个文件都占据了一个block),那么很可能造成启动的Mapper数量超出限制而导...
  • 很多文档中描述,...如果输入的文件数量巨大,但是每个文件的size都小于HDFS的blockSize,那么会造成启动的Mapper等于文件的数量(即每个文件都占据了一个block),那么很可能造成启动的Mapper数量超出限制而导...
  • 如果不OK,如何增加map的数量?  a. 并不一定。当文件接近128M,但是里的内容却非常多的时候并且map处理的逻辑比较复杂。那么用一个map处理,则时间会比较长  b. 把原来的单个文件拆分成多个的文件, 然后使用...
  • 深入认识hive 以mapreduce 为计算引擎时Mapper 和Reducer的设置 背景; 运营团队反映,公司广告业务的日活用户数据量偶尔呈剧烈下降趋势,同时出现用户数低于日活用户数据的问题,后来查看离线解析任务,执行过程...
  • 但是通过这种方式设置map的个数,并不是每次都有效的。原因是mapred.map.tasks只是一个hadoop的参考数值,最终map的个数,还取决于其他的因素。 为了方便介绍,先来看几个名词: block_size : hdfs的文件块大小,...
  • 参考文章: 1. hadoop mapper和reduce数量设置 https://447214075.iteye.com/blog/2153694 2.Hive中如何确定map数 源码级别 文章揭示了不同 InputFormat 下的划分标准 ... Hiv...
  • Mapper数量在默认情况下不可直接控制干预,因为Mapper数量由输入的大小和个数决定。在默认情况下,最终input占据了多少block,就应该启动多少个MapperMapper数量在默认情况下不可直接控制干预,因为Mapper的...
  • 1,sqlsession的真实类型和数量 由于使用spring管理bean,当我们在代码中需要使用这个bean的时候,会首先去容器中找,第一次需要调用MapperFactoryBean的getObject方法获取一个bean,并保存到容器中。 ...
  • Application Master根据作业代码中指定的数据地址(数据源一般来自HDFS)进行数据分片,以确定Mapper任务数,具体每个Mapper任务发往哪个计算节点,Hadoop会考虑数据本地性,本地数据本地性、本机架数据本地性以及...
  • 只 要每个task都运行至少30-40秒钟,就可以考虑将mapper数扩大,比如集群的map slots为100个,那么就不要将一个job的mapper设成101,这样前100个map能够并行完成,而最后一个map要在前100个 mapper结束后才开始,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 414
精华内容 165
关键字:

mapper数量