精华内容
下载资源
问答
  • FileInputFormat

    2015-02-21 20:59:00
    MapReduce框架要处理数据的文件类型 FileInputFormat这个类决定。 TextInputFormat是框架默认的文件类型,可以处理Text文件类型,如果你要处理的文件类型不是Text, 譬如说是Xml或DB,你就需要自己实现或用库中已有的...
    MapReduce框架要处理数据的文件类型 FileInputFormat这个类决定。
    TextInputFormat是框架默认的文件类型,可以处理Text文件类型,如果你要处理的文件类型不是Text,
    譬如说是Xml或DB,你就需要自己实现或用库中已有的类型。
    FileInputFormat的主要方法之一getSplits完成的功能是获取job要处理的路径文件所在的block信息。
    数据结构:FileInputSplit 存储了文件的位置信息,如Host,所属文件信息,开始offset,还有长度信息。
    public class FileSplit extends InputSplit implements Writable {
      private Path file;
      private long start;
      private long length;
      private String[] hosts;
      private SplitLocationInfo[] hostInfos;
    …
    }
    方法介绍:
    blockSize:块大小
    minSize:最小分片大小,由参数mapred.min.split.size设置,默认为1
    maxSize:最大分片大小,由参数mapred.max.split.size设置,默认Long.MAX-VALUE
    计算splitsize的方法:Math.max(minSize,Math.min(maxSize,blockSize)
    FileInputFormat的另一个重要方法是CreateRecordReader.在这个方法里面会用到前面方法所获取到的InpustSplit.这个RecordReader会用来去读取数据,传递给maptask去执行处理。

    当InputSplit尺寸大于block并且其对应的所有block(包含副本)不在同一个节点上时,Map Task不可能完全实现数据的本地化,

    也就是说,总有一部分数据需要从远程节点上读取,因此得出,当使用基于FileInputFormat实现InputFormat时,为了提高数据本地性,应该尽量使InputSplit大小与block大小一致。

    因为不同的文件,在上传的时候可以具体指定blocksize,若不指定则使用系统默认的blocksize,所以在代码中它使用的是file.getblocksize().

    若文件的blocksize是32M,我们的文件是70M,而且文件是可以切分的,则系统是如何分片的呢?(根据源代码进行分析)

    如果我们的minsize=1,maxsize=128,则计算得到的splitsize=32M,每一个block一个inputsplit.

    如果我们的minsize=64,maxsize=128,则计算得到的splitsize=64M, 但因为不满足70/64>1.1的情况,所以还是只会分成一个fileinputsplit,这一个inputsplit包含了两个block的信息。

    试想一下,如果还拆分成两个inputsplit让两个map task去做,第二个maptask只获取一点点的数据,利用率不高。

    若我们的文件是xml文件类型,不管我们的文件是多大,都只能分给一个InputSplit去处理,因为它的isSplitable=false,xml不能切开处理,那样数据就会乱掉。

    /** 
       * Generate the list of files and make them into FileSplits.
       * @param job the job context
       * @throws IOException
       */
      public List<InputSplit> getSplits(JobContext job) throws IOException {
        Stopwatch sw = new Stopwatch().start();
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);
    
        // generate splits
        List<InputSplit> splits = new ArrayList<InputSplit>();
        List<FileStatus> files = listStatus(job);
        for (FileStatus file: files) {
          Path path = file.getPath();
          long length = file.getLen();
          if (length != 0) {
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              FileSystem fs = path.getFileSystem(job.getConfiguration());
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            if (isSplitable(job, path)) {
              long blockSize = file.getBlockSize();
              long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    
              long bytesRemaining = length;
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                            blkLocations[blkIndex].getHosts(),
                            blkLocations[blkIndex].getCachedHosts()));
                bytesRemaining -= splitSize;
              }
    
              if (bytesRemaining != 0) {
                int blkIndex = getBlockIndex(blkLocations, length-bytesRemaining);
                splits.add(makeSplit(path, length-bytesRemaining, bytesRemaining,
                           blkLocations[blkIndex].getHosts(),
                           blkLocations[blkIndex].getCachedHosts()));
              }
            } else { // not splitable
              splits.add(makeSplit(path, 0, length, blkLocations[0].getHosts(),
                          blkLocations[0].getCachedHosts()));
            }
          } else { 
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
          }
        }
        // Save the number of input files for metrics/loadgen
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        sw.stop();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Total # of splits generated by getSplits: " + splits.size()
              + ", TimeTaken: " + sw.elapsedMillis());
        }
        return splits;
      }

    转载于:https://www.cnblogs.com/huaxiaoyao/p/4297178.html

    展开全文
  • FileInputFormat详解

    千次阅读 2017-07-23 09:32:47
    FileInputFormat 详解笔记

    转载:http://blog.csdn.net/hellozpc/article/details/45771933 https://my.oschina.net/leejun2005/blog/133424

    1. 概述

    我们在设置MapReduce输入格式的时候,会调用这样一条语句:

    job.setInputFormatClass(KeyValueTextInputFormat.class);

    这条语句保证了输入文件会按照我们预设的格式被读取。KeyValueTextInputFormat即为我们设定的数据读取格式。

    所有的输入格式类都继承自InputFormat,这是一个抽象类。其子类有例如专门用于读取普通文件的FileInputFormat,还有用来读取数据库的DBInputFormat等等。相关类图简单画出如下:

    2. InputFormat

    从InputFormat类图看,InputFormat抽象类仅有两个抽象方法:

     List<InputSplit> getSplits(), 获取由输入文件计算出输入分片(InputSplit),解决数据或文件分割成片问题。
        RecordReader<K,V> createRecordReader(),创建RecordReader,从InputSplit中读取数据,解决读取分片中数据问题。

    在后面说到InputSplits的时候,会介绍在getSplits()时需要验证输入文件是否可分割、文件存储时分块的大小和文件大小等因素,所以总体来说,通过InputFormat,Mapreduce框架可以做到:

      验证作业输入的正确性
        将输入文件切割成逻辑分片(InputSplit),一个InputSplit将会被分配给一个独立的MapTask
        提供RecordReader实现,读取InputSplit中的“K-V对”供Mapper使用

    InputFormat抽象类源码也很简单,如下供参考(文章格式考虑,删除了部分注释,添加了部分中文注释):

    public abstract class InputFormat<K, V> {
    
        /**
         * 仅仅是逻辑分片,并没有物理分片,所以每一个分片类似于这样一个元组 <input-file-path, start, offset>
         */
        public abstract List<InputSplit> getSplits(JobContext context)
                throws IOException, InterruptedException;
    
        /**
         * Create a record reader for a given split.
         */
        public abstract RecordReader<K, V> createRecordReader(InputSplit split,
                TaskAttemptContext context) throws IOException,
                InterruptedException;
    
    }
    

    不同的InputFormat会各自实现不同的文件读取方式以及分片方式,每个输入分片会被单独的map task作为数据源。下面详细介绍输入分片(inputSplit)是什么。

    3.InputSplit

    Mappers的输入是一个一个的输入分片,称InputSplit。看源码可知,InputSplit也是一个抽象类,它在逻辑上包含了提供给处理这个InputSplit的Mapper的所有K-V对。

    public abstract class InputSplit {
          /**
           * 获取Split的大小,支持根据size对InputSplit排序.
           */
          public abstract long getLength() throws IOException, InterruptedException;
    
          /**
           * 获取存储该分片的数据所在的节点位置.
           */
          public abstract 
            String[] getLocations() throws IOException, InterruptedException;
    }

    下面深入看一个InputSplit的子类:FileSplit类

    public class FileSplit extends InputSplit implements Writable {
        private Path file;
        private long start;
        private long length;
        private String[] hosts;
    
        /**
         * Constructs a split with host information
         * 
         * @param file
         *            the file name
         * @param start
         *            the position of the first byte in the file to process
         * @param length
         *            the number of bytes in the file to process
         * @param hosts
         *            the list of hosts containing the block, possibly null
         */
        public FileSplit(Path file, long start, long length, String[] hosts) {
            this.file = file;
            this.start = start;
            this.length = length;
            this.hosts = hosts;
        }
    
        /** The number of bytes in the file to process. */
        @Override
        public long getLength() {
            return length;
        }
    
        @Override
        public String[] getLocations() throws IOException {
            if (this.hosts == null) {
                return new String[] {};
            } else {
                return this.hosts;
            }
        }
    
        // 略掉部分方法
    }
    

    从源码中可以看出,FileSplit有四个属性:文件路径,分片起始位置,分片长度和存储分片的hosts。用这四项数据,就可以计算出提供给每个Mapper的分片数据。在InputFormat的getSplit()方法中构造分片,分片的四个属性会通过调用FileSplit的Constructor设置。

    再看一个InputSplit的子类:CombineFileSplit。源码如下:

    public class CombineFileSplit extends InputSplit implements Writable {
    
        private Path[] paths;
        private long[] startoffset;
        private long[] lengths;
        private String[] locations;
        private long totLength;
    
        public CombineFileSplit(Path[] files, long[] start, long[] lengths,
                String[] locations) {
            initSplit(files, start, lengths, locations);
        }
    
        private void initSplit(Path[] files, long[] start, long[] lengths,
                String[] locations) {
            this.startoffset = start;
            this.lengths = lengths;
            this.paths = files;
            this.totLength = 0;
            this.locations = locations;
            for (long length : lengths) {
                totLength += length;
            }
        }
    
        public long getLength() {
            return totLength;
        }
    
        /** Returns all the Paths where this input-split resides */
        public String[] getLocations() throws IOException {
            return locations;
        }
    
        //省略了部分构造函数和方法,深入学习请阅读源文件
    }

    为什么介绍该类呢,因为接下来要学习《Hadoop学习(五) – 小文件处理》,深入理解该类,将有助于该节学习。

    上面我们介绍的FileSplit对应的是一个输入文件,也就是说,如果用FileSplit对应的FileInputFormat作为输入格式,那么即使文件特别小,也是作为一个单独的InputSplit来处理,而每一个InputSplit将会由一个独立的Mapper Task来处理。在输入数据是由大量小文件组成的情形下,就会有同样大量的InputSplit,从而需要同样大量的Mapper来处理,大量的Mapper Task创建销毁开销将是巨大的,甚至对集群来说,是灾难性的!

    CombineFileSplit是针对小文件的分片,它将一系列小文件封装在一个InputSplit内,这样一个Mapper就可以处理多个小文件。可以有效的降低进程开销。与FileSplit类似,CombineFileSplit同样包含文件路径,分片起始位置,分片大小和分片数据所在的host列表四个属性,只不过这些属性不再是一个值,而是一个列表。

    需要注意的一点是,CombineFileSplit的getLength()方法,返回的是这一系列数据的数据的总长度。

    现在,我们已深入的了解了InputSplit的概念,看了其源码,知道了其属性。我们知道数据分片是在InputFormat中实现的,接下来,我们就深入InputFormat的一个子类,FileInputFormat看看分片是如何进行的。
    4. FileInputFormat

    FileInputFormat中,分片方法代码及详细注释如下,就不再详细解释该方法:

    public List<InputSplit> getSplits(JobContext job) throws IOException {
        // 首先计算分片的最大和最小值。这两个值将会用来计算分片的大小。
        // 由源码可知,这两个值可以通过mapred.min.split.size和mapred.max.split.size来设置
        long minSize = Math.max(getFormatMinSplitSize(), getMinSplitSize(job));
        long maxSize = getMaxSplitSize(job);
    
        // splits链表用来存储计算得到的输入分片结果
        List<InputSplit> splits = new ArrayList<InputSplit>();
        // files链表存储由listStatus()获取的输入文件列表,listStatus比较特殊,我们在下面详细研究
        List<FileStatus> files = listStatus(job);
        for (FileStatus file : files) {
            Path path = file.getPath();
            FileSystem fs = path.getFileSystem(job.getConfiguration());
            long length = file.getLen();
            // 获取该文件所有的block信息列表[hostname, offset, length]
            BlockLocation[] blkLocations = fs.getFileBlockLocations(file, 0,
                    length);
            // 判断文件是否可分割,通常是可分割的,但如果文件是压缩的,将不可分割
            // 是否分割可以自行重写FileInputFormat的isSplitable来控制
            if ((length != 0) && isSplitable(job, path)) {
                long blockSize = file.getBlockSize();
                // 计算分片大小
                // 即 Math.max(minSize, Math.min(maxSize, blockSize));
                // 也就是保证在minSize和maxSize之间,且如果minSize<=blockSize<=maxSize,则设为blockSize
                long splitSize = computeSplitSize(blockSize, minSize, maxSize);
    
                long bytesRemaining = length;
                // 循环分片。
                // 当剩余数据与分片大小比值大于Split_Slop时,继续分片, 小于等于时,停止分片
                while (((double) bytesRemaining) / splitSize > SPLIT_SLOP) {
                    int blkIndex = getBlockIndex(blkLocations, length
                            - bytesRemaining);
                    splits.add(new FileSplit(path, length - bytesRemaining,
                            splitSize, blkLocations[blkIndex].getHosts()));
                    bytesRemaining -= splitSize;
                }
                // 处理余下的数据
                if (bytesRemaining != 0) {
                    splits.add(new FileSplit(path, length - bytesRemaining,
                            bytesRemaining,
                            blkLocations[blkLocations.length - 1].getHosts()));
                }
            } else if (length != 0) {
                // 不可split,整块返回
                splits.add(new FileSplit(path, 0, length, blkLocations[0]
                        .getHosts()));
            } else {
                // 对于长度为0的文件,创建空Hosts列表,返回
                splits.add(new FileSplit(path, 0, length, new String[0]));
            }
        }
    
        // 设置输入文件数量
        job.getConfiguration().setLong(NUM_INPUT_FILES, files.size());
        return splits;
    }
    

    在getSplits()方法中,我们提到了一个方法,listStatus(),我们先来看一下这个方法:

    protected List<FileStatus> listStatus(JobContext job) throws IOException {
    
        // 省略部分代码...
    
        List<PathFilter> filters = new ArrayList<PathFilter>();
        filters.add(hiddenFileFilter);
        PathFilter jobFilter = getInputPathFilter(job);
        if (jobFilter != null) {
            filters.add(jobFilter);
        }
        // 创建了一个MultiPathFilter,其内部包含了两个PathFilter
        // 一个为过滤隐藏文件的Filter,一个为用户自定义Filter(如果制定了)
        PathFilter inputFilter = new MultiPathFilter(filters);
    
        for (int i = 0; i < dirs.length; ++i) {
            Path p = dirs[i];
            FileSystem fs = p.getFileSystem(job.getConfiguration());
            FileStatus[] matches = fs.globStatus(p, inputFilter);
            if (matches == null) {
                errors.add(new IOException("Input path does not exist: " + p));
            } else if (matches.length == 0) {
                errors.add(new IOException("Input Pattern " + p
                        + " matches 0 files"));
            } else {
                for (FileStatus globStat : matches) {
                    if (globStat.isDir()) {
                        for (FileStatus stat : fs.listStatus(
                                globStat.getPath(), inputFilter)) {
                            result.add(stat);
                        }
                    } else {
                        result.add(globStat);
                    }
                }
            }
        }
    
        // 省略部分代码
    }

    NLineInputFormat是一个很有意思的FileInputFormat的子类,有时间可以了解一下。

    5. PathFilter

    PathFilter文件筛选器接口,使用它我们可以控制哪些文件要作为输入,哪些不作为输入。PathFilter有一个accept(Path)方法,当接收的Path要被包含进来,就返回true,否则返回false。可以通过设置mapred.input.pathFilter.class来设置用户自定义的PathFilter。

    public interface PathFilter {
      /**
       * Tests whether or not the specified abstract pathname should be
       * included in a pathname list.
       *
       * @param  path  The abstract pathname to be tested
       * @return  <code>true</code> if and only if <code>pathname</code>
       *          should be included
       */
      boolean accept(Path path);
    }
    
    FileInputFormat类有hiddenFileFilter属性:
    
    private static final PathFilter hiddenFileFilter = new PathFilter() {
        public boolean accept(Path p) {
            String name = p.getName();
            return !name.startsWith("_") && !name.startsWith(".");
        }
    };
    
    hiddenFileFilter过滤掉隐藏文件。
    
    FileInputFormat类还有一个内部类:
    
    private static class MultiPathFilter implements PathFilter {
        private List<PathFilter> filters;
    
        public MultiPathFilter(List<PathFilter> filters) {
            this.filters = filters;
        }
    
        public boolean accept(Path path) {
            for (PathFilter filter : filters) {
                if (!filter.accept(path)) {
                    return false;
                }
            }
            return true;
        }
    }

    MultiPathFilter类类似于一个PathFilter代理,其内部有一个PathFilter list属性,只有符合其内部所有filter的路径,才被作为输入。在FileInputFormat类中,它被listStatus()方法调用,而listStatus()又被getSplits()方法调用来获取输入文件,也即实现了在获取输入分片前进行文件过滤。

    至此,我们已经利用PathFilter过滤了文件,利用FileInputFormat 的getSplits方法,计算出了Mapreduce的Map的InputSplit。作业的输入分片有了,而这些分片,是怎么被Map读取的呢?

    这由InputFormat中的另一个方法createRecordReader()来负责。FileInputFormat没有对于这个方法的实现,而是交给子类自行去实现它。

    6. RecordReader

    RecordReader将读入到Map的数据拆分成

    public abstract class RecordReader<KEYIN, VALUEIN> implements Closeable {
    
        /**
         * 由一个InputSplit初始化
         */
        public abstract void initialize(InputSplit split, TaskAttemptContext context)
                throws IOException, InterruptedException;
    
        /**
         * 顾名思义,读取分片下一个<key, value>对
         */
        public abstract boolean nextKeyValue() throws IOException,
                InterruptedException;
    
        /**
         * Get the current key
         */
        public abstract KEYIN getCurrentKey() throws IOException,
                InterruptedException;
    
        /**
         * Get the current value.
         */
        public abstract VALUEIN getCurrentValue() throws IOException,
                InterruptedException;
    
        /**
         * 跟踪读取分片的进度
         */
        public abstract float getProgress() throws IOException,
                InterruptedException;
    
        /**
         * Close the record reader.
         */
        public abstract void close() throws IOException;
    }
    

    从源码可以看出,一个RecordReader主要来完成这几项功能。接下来,通过一个具体的RecordReader实现类,来详细了解一下各功能的具体操作。

    public class LineRecordReader extends RecordReader<LongWritable, Text> {
        private CompressionCodecFactory compressionCodecs = null;
        private long start;
        private long pos;
        private long end;
        private LineReader in;
        private int maxLineLength;
        private LongWritable key = null;
        private Text value = null;
    
        // initialize函数即对LineRecordReader的一个初始化
        // 主要是计算分片的始末位置,打开输入流以供读取K-V对,处理分片经过压缩的情况等
        public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                    Integer.MAX_VALUE);
            start = split.getStart();
            end = start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);
    
            // 打开文件,并定位到分片读取的起始位置
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            boolean skipFirstLine = false;
            if (codec != null) {
                // 文件是压缩文件的话,直接打开文件
                in = new LineReader(codec.createInputStream(fileIn), job);
                end = Long.MAX_VALUE;
            } else {
                //
                if (start != 0) {
                    skipFirstLine = true;
                    --start;
                    // 定位到偏移位置,下次读取就会从便宜位置开始
                    fileIn.seek(start);
                }
                in = new LineReader(fileIn, job);
            }
            if (skipFirstLine) { // skip first line and re-establish "start".
                start += in.readLine(new Text(), 0,
                        (int) Math.min((long) Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
        }
    
        public boolean nextKeyValue() throws IOException {
            if (key == null) {
                key = new LongWritable();
            }
            key.set(pos);// key即为偏移量
            if (value == null) {
                value = new Text();
            }
            int newSize = 0;
            while (pos < end) {
                newSize = in.readLine(value, maxLineLength,
                        Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                                maxLineLength));
                // 读取的数据长度为0,则说明已读完
                if (newSize == 0) {
                    break;
                }
                pos += newSize;
                // 读取的数据长度小于最大行长度,也说明已读取完毕
                if (newSize < maxLineLength) {
                    break;
                }
                // 执行到此处,说明该行数据没读完,继续读入
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;
            } else {
                return true;
            }
        }
        // 省略了部分方法
    }

    数据从InputSplit分片中读出已经解决,但是RecordReader是如何被Mapreduce框架利用的呢?我们先看一下Mapper类

    7. Mapper

    public class Mapper<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
    
        public class Context extends MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
            public Context(Configuration conf, TaskAttemptID taskid,
                    RecordReader<KEYIN, VALUEIN> reader,
                    RecordWriter<KEYOUT, VALUEOUT> writer,
                    OutputCommitter committer, StatusReporter reporter,
                    InputSplit split) throws IOException, InterruptedException {
                super(conf, taskid, reader, writer, committer, reporter, split);
            }
        }
    
        /**
         * 预处理,仅在map task启动时运行一次
         */
        protected void setup(Context context) throws IOException,
                InterruptedException {
        }
    
        /**
         * 对于InputSplit中的每一对<key, value>都会运行一次
         */
        @SuppressWarnings("unchecked")
        protected void map(KEYIN key, VALUEIN value, Context context)
                throws IOException, InterruptedException {
            context.write((KEYOUT) key, (VALUEOUT) value);
        }
    
        /**
         * 扫尾工作,比如关闭流等
         */
        protected void cleanup(Context context) throws IOException,
                InterruptedException {
        }
    
        /**
         * map task的驱动器
         */
        public void run(Context context) throws IOException, InterruptedException {
            setup(context);
            while (context.nextKeyValue()) {
                map(context.getCurrentKey(), context.getCurrentValue(), context);
            }
            cleanup(context);
        }
    }
    

    重点看一下Mapper.class中的run()方法,它相当于map task的驱动。

    run()方法首先调用setup()进行初始操作
    然后循环对每个从context.nextKeyValue()获取的“K-V对”调用map()函数进行处理
    最后调用cleanup()做最后的处理

    事实上,content.nextKeyValue()就是使用了相应的RecordReader来获取“K-V对”。Mapper.class中的Context类,它继承自MapContext类,使用一个RecordReader进行构造。下面我们再看这个MapContext。

    public class MapContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> extends
            TaskInputOutputContext<KEYIN, VALUEIN, KEYOUT, VALUEOUT> {
        private RecordReader<KEYIN, VALUEIN> reader;
        private InputSplit split;
    
        public MapContext(Configuration conf, TaskAttemptID taskid,
                RecordReader<KEYIN, VALUEIN> reader,
                RecordWriter<KEYOUT, VALUEOUT> writer, OutputCommitter committer,
                StatusReporter reporter, InputSplit split) {
            super(conf, taskid, writer, committer, reporter);
            this.reader = reader;
            this.split = split;
        }
    
        /**
         * Get the input split for this map.
         */
        public InputSplit getInputSplit() {
            return split;
        }
    
        @Override
        public KEYIN getCurrentKey() throws IOException, InterruptedException {
            return reader.getCurrentKey();
        }
    
        @Override
        public VALUEIN getCurrentValue() throws IOException, InterruptedException {
            return reader.getCurrentValue();
        }
    
        @Override
        public boolean nextKeyValue() throws IOException, InterruptedException {
            return reader.nextKeyValue();
        }
    
    }

    从MapContent类中的方法可见,content.getCurrentKey(),content.getCurrentValue()以及nextKeyValue(),其实都是对RecordReader方法的封装,即MapContext是直接使用传入的RecordReader来对InputSplit进行“K-V对”读取的。

    至此,我们已经清楚的知道Mapreduce的输入文件是如何被过滤、读取、分片、读出“K-V对”,然后交给Mapper类来处理的。

    自定义 hadoop MapReduce InputFormat 切分输入文件

    在上一篇中,我们实现了按 cookieId 和 time 进行二次排序,现在又有新问题:假如我需要按 cookieId 和 cookieId&time 的组合进行分析呢?此时最好的办法是自定义 InputFormat,让 mapreduce 一次读取一个 cookieId 下的所有记录,然后再按 time 进行切分 session,逻辑伪码如下:

    for OneSplit in MyInputFormat.getSplit() // OneSplit 是某个 cookieId 下的所有记录
    
        for session in OneSplit // session 是按 time 把 OneSplit 进行了二次分割
    
            for line in session // line 是 session 中的每条记录,对应原始日志的某条记录

    1、原理:

    InputFormat是MapReduce中一个很常用的概念,它在程序的运行中到底起到了什么作用呢?
    InputFormat其实是一个接口,包含了两个方法:

    
    public interface InputFormat<K, V> {
      InputSplit[] getSplits(JobConf job, int numSplits) throws IOException;
    
      RecordReader<K, V> createRecordReader(InputSplit split, 
    
                                      TaskAttemptContext context)  throws IOException;
    }

    这两个方法有分别完成着以下工作:方法 getSplits 将输入数据切分成splits,splits的个数即为map tasks的个数,splits的大小默认为块大小,即64M方法 方法 getRecordReader 将每个 split 解析成records, 再依次将record解析成K,V对
    也就是说 InputFormat完成以下工作:
    InputFile –> splits –> K,V

    系统常用的 InputFormat 又有哪些呢?
    这里写图片描述

    其中Text InputFormat便是最常用的,它的

    InputFormat(interface), FileInputFormat(abstract class), TextInputFormat(class),
    RecordReader (interface), Line RecordReader(class)的关系
          FileInputFormat implements  InputFormat
          TextInputFormat extends  FileInputFormat
          TextInputFormat.get RecordReader calls  Line RecordReader
          Line RecordReader  implements  RecordReader

    对于InputFormat接口,上面已经有详细的描述
    再看看 FileInputFormat,它实现了 InputFormat接口中的 getSplits方法,而将 getRecordReader与isSplitable留给具体类(如 TextInputFormat )实现, isSplitable方法通常不用修改,所以只需要在自定义的 InputFormat中实现
    getRecordReader方法即可,而该方法的核心是调用 Line RecordReader(即由LineRecorderReader类来实现 ” 将每个s plit解析成records, 再依次将record解析成

      public interface RecordReader<K, V> {
      boolean   next(K key, V value) throws IOException;
      K   createKey();
      V   createValue();
      long   getPos() throws IOException;
      public void   close() throws IOException;
      float   getProgress() throws IOException;
    }

    因此自定义InputFormat的核心是自定义一个实现接口RecordReader类似于LineRecordReader的类,该类的核心也正是重写接口RecordReader中的几大方法,
    定义一个InputFormat的核心是定义一个类似于LineRecordReader的,自己的RecordReader
    2、代码:

    package MyInputFormat;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.JobContext;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    public class TrackInputFormat extends FileInputFormat<LongWritable, Text> {
    
        @SuppressWarnings("deprecation")
        @Override
        public RecordReader<LongWritable, Text> createRecordReader(
                InputSplit split, TaskAttemptContext context) {
            return new TrackRecordReader();
        }
    
        @Override
        protected boolean isSplitable(JobContext context, Path file) {
            CompressionCodec codec = new CompressionCodecFactory(
                    context.getConfiguration()).getCodec(file);
            return codec == null;
        }
    
    }
    package MyInputFormat;
    
    import java.io.IOException;
    import java.io.InputStream;
    
    import org.apache.commons.logging.Log;
    import org.apache.commons.logging.LogFactory;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FSDataInputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.io.compress.CompressionCodec;
    import org.apache.hadoop.io.compress.CompressionCodecFactory;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.RecordReader;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    /**
     * Treats keys as offset in file and value as line.
     * 
     * @deprecated Use
     *             {@link org.apache.hadoop.mapreduce.lib.input.LineRecordReader}
     *             instead.
     */
    public class TrackRecordReader extends RecordReader<LongWritable, Text> {
        private static final Log LOG = LogFactory.getLog(TrackRecordReader.class);
    
        private CompressionCodecFactory compressionCodecs = null;
        private long start;
        private long pos;
        private long end;
        private NewLineReader in;
        private int maxLineLength;
        private LongWritable key = null;
        private Text value = null;
        // ----------------------
        // 行分隔符,即一条记录的分隔符
        private byte[] separator = "END\n".getBytes();
    
        // --------------------
    
        public void initialize(InputSplit genericSplit, TaskAttemptContext context)
                throws IOException {
            FileSplit split = (FileSplit) genericSplit;
            Configuration job = context.getConfiguration();
            this.maxLineLength = job.getInt("mapred.linerecordreader.maxlength",
                    Integer.MAX_VALUE);
            start = split.getStart();
            end = start + split.getLength();
            final Path file = split.getPath();
            compressionCodecs = new CompressionCodecFactory(job);
            final CompressionCodec codec = compressionCodecs.getCodec(file);
    
            FileSystem fs = file.getFileSystem(job);
            FSDataInputStream fileIn = fs.open(split.getPath());
            boolean skipFirstLine = false;
            if (codec != null) {
                in = new NewLineReader(codec.createInputStream(fileIn), job);
                end = Long.MAX_VALUE;
            } else {
                if (start != 0) {
                    skipFirstLine = true;
                    this.start -= separator.length;//
                    // --start;
                    fileIn.seek(start);
                }
                in = new NewLineReader(fileIn, job);
            }
            if (skipFirstLine) { // skip first line and re-establish "start".
                start += in.readLine(new Text(), 0,
                        (int) Math.min((long) Integer.MAX_VALUE, end - start));
            }
            this.pos = start;
        }
    
        public boolean nextKeyValue() throws IOException {
            if (key == null) {
                key = new LongWritable();
            }
            key.set(pos);
            if (value == null) {
                value = new Text();
            }
            int newSize = 0;
            while (pos < end) {
                newSize = in.readLine(value, maxLineLength,
                        Math.max((int) Math.min(Integer.MAX_VALUE, end - pos),
                                maxLineLength));
                if (newSize == 0) {
                    break;
                }
                pos += newSize;
                if (newSize < maxLineLength) {
                    break;
                }
    
                LOG.info("Skipped line of size " + newSize + " at pos "
                        + (pos - newSize));
            }
            if (newSize == 0) {
                key = null;
                value = null;
                return false;
            } else {
                return true;
            }
        }
    
        @Override
        public LongWritable getCurrentKey() {
            return key;
        }
    
        @Override
        public Text getCurrentValue() {
            return value;
        }
    
        /**
         * Get the progress within the split
         */
        public float getProgress() {
            if (start == end) {
                return 0.0f;
            } else {
                return Math.min(1.0f, (pos - start) / (float) (end - start));
            }
        }
    
        public synchronized void close() throws IOException {
            if (in != null) {
                in.close();
            }
        }
    
        public class NewLineReader {
            private static final int DEFAULT_BUFFER_SIZE = 64 * 1024;
            private int bufferSize = DEFAULT_BUFFER_SIZE;
            private InputStream in;
            private byte[] buffer;
            private int bufferLength = 0;
            private int bufferPosn = 0;
    
            public NewLineReader(InputStream in) {
                this(in, DEFAULT_BUFFER_SIZE);
            }
    
            public NewLineReader(InputStream in, int bufferSize) {
                this.in = in;
                this.bufferSize = bufferSize;
                this.buffer = new byte[this.bufferSize];
            }
    
            public NewLineReader(InputStream in, Configuration conf)
                    throws IOException {
                this(in, conf.getInt("io.file.buffer.size", DEFAULT_BUFFER_SIZE));
            }
    
            public void close() throws IOException {
                in.close();
            }
    
            public int readLine(Text str, int maxLineLength, int maxBytesToConsume)
                    throws IOException {
                str.clear();
                Text record = new Text();
                int txtLength = 0;
                long bytesConsumed = 0L;
                boolean newline = false;
                int sepPosn = 0;
                do {
                    // 已经读到buffer的末尾了,读下一个buffer
                    if (this.bufferPosn >= this.bufferLength) {
                        bufferPosn = 0;
                        bufferLength = in.read(buffer);
                        // 读到文件末尾了,则跳出,进行下一个文件的读取
                        if (bufferLength <= 0) {
                            break;
                        }
                    }
                    int startPosn = this.bufferPosn;
                    for (; bufferPosn < bufferLength; bufferPosn++) {
                        // 处理上一个buffer的尾巴被切成了两半的分隔符(如果分隔符中重复字符过多在这里会有问题)
                        if (sepPosn > 0 && buffer[bufferPosn] != separator[sepPosn]) {
                            sepPosn = 0;
                        }
                        // 遇到行分隔符的第一个字符
                        if (buffer[bufferPosn] == separator[sepPosn]) {
                            bufferPosn++;
                            int i = 0;
                            // 判断接下来的字符是否也是行分隔符中的字符
                            for (++sepPosn; sepPosn < separator.length; i++, sepPosn++) {
                                // buffer的最后刚好是分隔符,且分隔符被不幸地切成了两半
                                if (bufferPosn + i >= bufferLength) {
                                    bufferPosn += i - 1;
                                    break;
                                }
                                // 一旦其中有一个字符不相同,就判定为不是分隔符
                                if (this.buffer[this.bufferPosn + i] != separator[sepPosn]) {
                                    sepPosn = 0;
                                    break;
                                }
                            }
                            // 的确遇到了行分隔符
                            if (sepPosn == separator.length) {
                                bufferPosn += i;
                                newline = true;
                                sepPosn = 0;
                                break;
                            }
                        }
                    }
                    int readLength = this.bufferPosn - startPosn;
                    bytesConsumed += readLength;
                    // 行分隔符不放入块中
                    if (readLength > maxLineLength - txtLength) {
                        readLength = maxLineLength - txtLength;
                    }
                    if (readLength > 0) {
                        record.append(this.buffer, startPosn, readLength);
                        txtLength += readLength;
                        // 去掉记录的分隔符
                        if (newline) {
                            str.set(record.getBytes(), 0, record.getLength()
                                    - separator.length);
                        }
                    }
                } while (!newline && (bytesConsumed < maxBytesToConsume));
                if (bytesConsumed > (long) Integer.MAX_VALUE) {
                    throw new IOException("Too many bytes before newline: "
                            + bytesConsumed);
                }
    
                return (int) bytesConsumed;
            }
    
            public int readLine(Text str, int maxLineLength) throws IOException {
                return readLine(str, maxLineLength, Integer.MAX_VALUE);
            }
    
            public int readLine(Text str) throws IOException {
                return readLine(str, Integer.MAX_VALUE, Integer.MAX_VALUE);
            }
        }
    }
    package MyInputFormat;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    public class TestMyInputFormat {
    
        public static class MapperClass extends Mapper<LongWritable, Text, Text, Text> {
    
            public void map(LongWritable key, Text value, Context context) throws IOException,
                    InterruptedException {
                System.out.println("key:\t " + key);
                System.out.println("value:\t " + value);
                System.out.println("-------------------------");
            }
        }
    
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            Configuration conf = new Configuration();
             Path outPath = new Path("/hive/11");
             FileSystem.get(conf).delete(outPath, true);
            Job job = new Job(conf, "TestMyInputFormat");
            job.setInputFormatClass(TrackInputFormat.class);
            job.setJarByClass(TestMyInputFormat.class);
            job.setMapperClass(TestMyInputFormat.MapperClass.class);
            job.setNumReduceTasks(0);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(Text.class);
    
            FileInputFormat.addInputPath(job, new Path(args[0]));
            org.apache.hadoop.mapreduce.lib.output.FileOutputFormat.setOutputPath(job, outPath);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

    3、测试数据:
    cookieId   time   url    cookieOverFlag
    1       a   1_hao123
    1       a   1_baidu
    1       b   1_google   2END
    2       c   2_google
    2       c   2_hao123
    2       c   2_google   1END
    3       a   3_baidu
    3       a   3_sougou
    3       b   3_soso     2END
    4、结果:

    key:     0
    value:   1  a   1_hao123    
    1   a    1_baidu    
    1   b    1_google   2
    -------------------------
    key:     47
    value:   2  c    2_google   
    2   c    2_hao123   
    2   c    2_google   1
    -------------------------
    key:     96
    value:   3  a    3_baidu    
    3   a    3_sougou   
    3   b    3_soso 2
    -------------------------
    展开全文
  • 8、FileInputFormat

    2019-12-25 11:46:55
    FileInputFormat切片分析 FileInputFormat默认切片策略 TextInputFormat KeyValueInputFormat NLineInputFormat CombineTextInputFormat 自定义类继承FileInputFormat FileInputFormat切片分析 FileInpu...

    InputFormat

    FileInputFormat切片分析

    FileInputFormat是所有使用文件作为数据源的InputFormat实现的基类。一个是用于指出job文件的输入路径,一个是为输出文件生成分片的代码实现。

    FileInputFormat默认切片策略

    切片的大小由minSize,gloabSize和blocksize共同作用后确定。FileInputFormat更加适合用于大文件的切割,因为他是按照文件切割,一个小文件也会成为一个块。

    ArrayList<FileSplit> splits = new ArrayList<FileSplit>(numSplits);
        NetworkTopology clusterMap = new NetworkTopology();
        // 遍历文件,一个文件一个文件的切
        for (FileStatus file: files) {
          Path path = file.getPath();
          long length = file.getLen();
          if (length != 0) {
            //获得一个文件系统
            FileSystem fs = path.getFileSystem(job);
            BlockLocation[] blkLocations;
            if (file instanceof LocatedFileStatus) {
              blkLocations = ((LocatedFileStatus) file).getBlockLocations();
            } else {
              blkLocations = fs.getFileBlockLocations(file, 0, length);
            }
            // 判断是否可切。压缩文件什么的就不能切。如果不能切的话整个文件作为一个split
            if (isSplitable(fs, path)) {
              long blockSize = file.getBlockSize();
            // computeSplitSize(long goalSize, long minSize,long blockSize) {Math.max(minSize, Math.min(goalSize, blockSize))};,
            // 通过修改minSize和maxSize就可以控制切片的大小。
              long splitSize = computeSplitSize(goalSize, minSize, blockSize);
              long bytesRemaining = length;
            // 当文件剩下的大小小于SPLIT_SLOP时就不切了,SPLIT_SLOP默认为1.1,也就是说如果最后块的大小小于splitSize*1.1就不切了。
              while (((double) bytesRemaining)/splitSize > SPLIT_SLOP) {
                String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,
                    length-bytesRemaining, splitSize, clusterMap);
                splits.add(makeSplit(path, length-bytesRemaining, splitSize,
                    splitHosts[0], splitHosts[1]));
                bytesRemaining -= splitSize;
              }
    
            // 剩下的大于1.1小于1.1的块单独作为一个切片,如果切片最后的是个空片,也要作为一个片。
              if (bytesRemaining != 0) {
                String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations, length
                    - bytesRemaining, bytesRemaining, clusterMap);
                splits.add(makeSplit(path, length - bytesRemaining, bytesRemaining,
                    splitHosts[0], splitHosts[1]));
              }
            } else {
              String[][] splitHosts = getSplitHostsAndCachedHosts(blkLocations,0,length,clusterMap);
              splits.add(makeSplit(path, 0, length, splitHosts[0], splitHosts[1]));
            }
          } else {
            //Create empty hosts array for zero length files
            splits.add(makeSplit(path, 0, length, new String[0]));
          }
        }
        sw.stop();
        if (LOG.isDebugEnabled()) {
          LOG.debug("Total # of splits generated by getSplits: " + splits.size()
              + ", TimeTaken: " + sw.now(TimeUnit.MILLISECONDS));
        }
        return splits.toArray(new FileSplit[splits.size()]);
      }

    TextInputFormat

    切片:采用父类切片策略。 RecordReader:采用LineRecorder,读取一行作为一个key-value.key是改行的偏移量,Text是改行的内容。

    KeyValueInputFormat

    切片:默认的切片策略! RecordReader: KeyValueLineRecordReader : 读取一行,使用分隔符分割每行,封装key-value Text Key: 分隔符前内容 Text value: 分隔符后的内容

    NLineInputFormat

    切片: 以文件为单位,切分文件的mapreduce.input.lineinputformat.linespermap行(如果没有指定,默认为1)作为一个切片! RecordReader: LineRecordReader : 读取一行作为一个key-value LongWritable key: 每行的偏移量 Text value: 每行的内容 适合宽行数据的处理!(一次切几行增加并行度)

    CombineTextInputFormat

    CombineTextInputFormat可以将多个小文件合并到一个切片中处理!主要是为了解决小文件过多的问题,决定哪些块放在一起时,CombineTextInputFormat也会考虑节点和机架的因素。 设置一个参数,每片的大小mapreduce.input.fileinputformat.split.maxsiz(maxSize)

    切片策略:

    (a)判断虚拟存储的文件大小是否大于setMaxInputSplitSize值,大于等于则单独形成一个切片。
    (b)如果不大于则跟下一个虚拟存储文件进行合并,共同形成一个切片。\

    RecordReader: LineRecordReader : 读取一行作为一个key-value
    LongWritable:key: 每行的偏移量
    Text value: 每行的内容

    自定义类继承FileInputFormat

    展开全文
  • FileInputFormat切片机制

    2019-10-12 23:05:43
    FileInputFormat切片机制

    FileInputFormat切片机制

     

     

    展开全文
  • MapReduce-FileInputFormat

    2019-04-29 19:20:00
    在运行 MapReduce 程序时,输入...FileInputFormat 用来读取数据,其本身为一个抽象类,继承自 InputFormat 抽象类,针对不同的类型的数据有不同的子类来处理。 FileInputFormat 常见的接口实现类包括:TextInputF...
  • 写MR过程中经常会遇到多个源路径的输入,我们可以在MR程序主函数中通过FileInputFormat.addInputPaths(job,args[0])方法来实现, args[0]可以是folder1或者folder1,folder2只要以逗号分隔就可以了。 这样在执行...
  • FileInputFormat源码解析

    2019-11-03 17:07:03
    FileInputFormat源码解析(input.getSplits(job)) (1)找到你数据存储的目录。 (2)开始遍历处理(规划切片)目录下的每一个文件 (3)遍历第一个文件ss.txt a)获取文件大小fs.sizeOf(ss.txt); b)...
  • FileInputFormat切片源码解析 一、FileInputFormat切片机制 1.简单按照文件的内容长度进行切片 2.切片大小,默认等于Block大小 3.切片时不考虑数据集体性,而是针对每一个文件单独切片 二、切片过程 三、...
  • fileInputformat切片总结

    2018-11-11 21:22:12
    FileInputFormat源码解析(input.getSplits(job)) 找到你数据存储目录 开始遍历处理(规划切片)目录下的每一个文件 遍历第一个文件 aa.txt (1) 获取文件大小fs.sizeOf(aa.txt) (2)计算切片的大小 ...
  • Hadoop--入门FileInputFormat切片机制和实现类FileInputFormat切片机制CombineTextInputFormat切片机制CombineTextInputFormat案例FileInputFormat实现类1. TextInputFormat2. KeyValueTextInputFormat3. ...
  • 6. FileInputFormat实现类

    2020-07-10 22:29:52
    抽象类FileInputFormat`常见的`实现类`包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat`等。
  • 1.FileInputFormat 类介绍 FileInputFormat 是所有使用文件作为数据源的 InputFormat 实现的基类 提供两个功能: 1.用于支出作业的输入文件的位置; 2.输入文件生成分片的实现代码段; 类结构图: 2.通过 ...
  • hadoop mapreduce相关类 FileInputFormat 官方链接 http://hadoop.apache.org/docs/r2.9.1/api/ 类继承关系 abstract InputFormat 来自官方api文档中关于InputFormat的描述。 InputFormat describes the input-...
  • FileInputFormat 常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat 和 自定义 InputFormat 等。 二、TextInputFormat TextInputFormat 是默认的 ...
  • 写MR过程中经常会遇到多个源路径的输入,我们可以在MR程序主函数中通过FileInputFormat.addInputPaths(job, conf.get("input_dir"))方法来实现, 在提交脚本中将多个源的路径用“,”分隔,input_dir=$INPUT_FILE...
  • 一、切片机制 二、FileInputFormat 切片大小参数设置
  • 1. FileInputFormat中默认的切片机制 (1)简单地按照文件的内容长度进行切片 (2)切片大小,默认等于block大小 (本地模式块大小32M,yarn模式128M,老的版本64M) (3)切片时不考虑数据集整体,而是逐个针对每...
  • FileInputFormat 切片机制源码分析: FileInputFormat类中的getSplits()方法 .
  • fileinputformat切片机制 本地运行时为切片32m,Hadoop1.x集群切片为64m,Hadoop2.x集群切片为128m 要注意的事情是在计算切片大小的时候我们如果要改动切片的大小blocksize,那么就要在这个公式上 面下手:Math.max...
  • FileInputFormat在MapReduce中主要负责将文件拆分为多个切片InputSplit,并根据不同的记录读取器RecordReaders去读取切片数据,转换为为标准的<key,value>键值对,作为map 的输入 而FileInputFormat有多种...
  • 1. FileInputFormat切片机制 2. CombineTextInputFormat切片机制 框架默认的TextInputFormat切片机制是对任务按文件规划切片,不管文件多小,都会是一个单独的切片,都会交给一个MapTask,这样如果有大量小文件,...
  • 但是我们可以通过重写FileInputFormat和FileOutputFormat类来实现自定义的mapper输入和reducer输出。 1.自定义mapper输入是重写FileInputFormat和RecordReader类,而真正产生mapper输入的key和
  • Hadoop 源码详解之FileInputFormat类【updating…】 1. 类释义 A base class for file-based InputFormats. 针对基于文件的 InputFormats 一个基类 FileInputFormat is the base class for all file-based...
  • FileInputFormat.setInputPaths的执行原理

    千次阅读 2019-07-11 10:38:29
    今天在看 MapReduce 源码的时候看了一下 FileInputFormat 的 setInputPaths 方法,内容如下: /** * Set the array of {@link Path}s as the list of inputs * for the map-reduce job. * * @param job The ...
  • FileInputFormat常见的接口实现类包括:TextInputFormat、KeyValueTextInputFormat、NLineInputFormat、CombineTextInputFormat和自定义InputFormat等。 1.TextInputFormat(输入k,v类型为LongWritable,Text) 2....
  • Hadoop_FileInputFormat分片

    2018-02-02 14:15:07
    Hadoop_FileInputFormat分片 Hadoop学习笔记总结 01. InputFormat和OutFormat 1. 整个MapReduce组件 InputFormat类和OutFormat类都是抽象类。 可以实现文件系统的读写,数据库的读写,...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,054
精华内容 421
关键字:

fileinputformat