精华内容
下载资源
问答
  • MapReduce--OutputFormat详解以及实现自定义OutputFormat 1. OutputFormat源代码解析 MapReuce OutputFormat 输出一般输出到:文件里面或者数据库中,今天就对常用的OutputFormat来分析一下 OutputFormat中的源...

          MapReduce--OutputFormat详解以及实现自定义OutputFormat

     

    1. OutputFormat源代码解析

    • MapReuce OutputFormat 输出一般输出到:文件里面或者数据库中,今天就对常用的OutputFormat来分析一下
    • OutputFormat中的源代码
    /** 
     * <code>OutputFormat</code> describes the output-specification for a 
     * Map-Reduce job.
     *
     * <p>The Map-Reduce framework relies on the <code>OutputFormat</code> of the
     * job to:<p>
     * <ol>
     *   <li>
     *   Validate the output-specification of the job. For e.g. check that the 
     *   output directory doesn't already exist. 
     *   <li>
     *   Provide the {@link RecordWriter} implementation to be used to write out
     *   the output files of the job. Output files are stored in a 
     *   {@link FileSystem}.
     *   </li>
     * </ol>
     * 
     * @see RecordWriter
     */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public abstract class OutputFormat<K, V> {
    
      /** 
       * Get the {@link RecordWriter} for the given task.
       *
       * @param context the information about the current task.
       * @return a {@link RecordWriter} to write the output for the job.
       * @throws IOException
       */
      public abstract RecordWriter<K, V> 
        getRecordWriter(TaskAttemptContext context
                        ) throws IOException, InterruptedException;
    
      /** 
       * Check for validity of the output-specification for the job.
       *  
       * <p>This is to validate the output specification for the job when it is
       * a job is submitted.  Typically checks that it does not already exist,
       * throwing an exception when it already exists, so that output is not
       * overwritten.</p>
       *
       * @param context information about the job
       * @throws IOException when output should not be attempted
       */
      public abstract void checkOutputSpecs(JobContext context
                                            ) throws IOException, 
                                                     InterruptedException;
    
      /**
       * Get the output committer for this output format. This is responsible
       * for ensuring the output is committed correctly.
       * @param context the task context
       * @return an output committer
       * @throws IOException
       * @throws InterruptedException
       */
      public abstract 
      OutputCommitter getOutputCommitter(TaskAttemptContext context
                                         ) throws IOException, InterruptedException;
    }
    • 如需要实现OutputFormat则需要实现RecordWriter
    • OutputFormat 里面的checkOutputSpecs函数是检查输出路径,检查输出路径是否设置以及输出路径是否存在
    • RecordWriter abstract class
    /**
     * <code>RecordWriter</code> writes the output &lt;key, value&gt; pairs 
     * to an output file.
     
     * <p><code>RecordWriter</code> implementations write the job outputs to the
     * {@link FileSystem}.
     * 
     * @see OutputFormat
     */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public abstract class RecordWriter<K, V> {
      /** 
       * Writes a key/value pair.
       *
       * @param key the key to write.
       * @param value the value to write.
       * @throws IOException
       */      
      public abstract void write(K key, V value
                                 ) throws IOException, InterruptedException;
    
      /** 
       * Close this <code>RecordWriter</code> to future operations.
       * 
       * @param context the context of the task
       * @throws IOException
       */ 
      public abstract void close(TaskAttemptContext context
                                 ) throws IOException, InterruptedException;
    • RecordWriter中只要实现write、close就可以实现自定义的RecordWriter
    • 下面就来操作试试,自定义实现输出为文件、数据库

     

    2 自定义实现输出文件

    2.1 需求

    计算每个域名的访问量总和
    1. 把www.baidu.com域名的MapReduce数据结果输出到www.baidu.com.log文件里面
    2. 把www.qq.com域名的MapReduce数据结果输出到www.qq.com.log文件里面

    2.2 数据

    域名,访问次数
    www.baidu.com,10
    www.qq.com,9
    www.baidu.com,7
    www.qq.com,10
    www.qq.com,23
    www.baidu.com,6
    www.qq.com,12
    www.qq.com,24
    www.baidu.com,9

    2.3 Code

    2.3.1 MyFileOutputFormatDriver Code

    package com.xk.bigata.hadoop.mapreduce.outputformat;
    
    import com.xk.bigata.hadoop.utils.FileUtils;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class MyFileOutputFormatDriver {
    
        public static void main(String[] args) throws Exception {
    
            String input = "mapreduce-basic/data/domain.data";
            String output = "mapreduce-basic/out";
    
            // 1 创建 MapReduce job
            Configuration conf = new Configuration();
            Job job = Job.getInstance(conf);
    
            // 删除输出路径
            FileUtils.deleteFile(job.getConfiguration(), output);
    
            // 2 设置运行主类
            job.setJarByClass(MyFileOutputFormatDriver.class);
    
            // 3 设置Map和Reduce运行的类
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
    
            // 4 设置Map 输出的 KEY 和 VALUE 数据类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // 5 设置Reduce 输出 KEY 和 VALUE 数据类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            // 6 设置输入和输出路径
            FileInputFormat.setInputPaths(job, new Path(input));
            FileOutputFormat.setOutputPath(job, new Path(output));
    
            // 设定之定义的output
            job.setOutputFormatClass(MyFileOutputFormat.class);
    
            // 7 提交job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
    
        }
    
        public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String[] spilts = value.toString().split(",");
                context.write(new Text(spilts[0]), new IntWritable(Integer.parseInt(spilts[1])));
            }
        }
    
        public static class MyReducer extends Reducer<Text, IntWritable, Text, IntWritable> {
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int sum = 0;
                for (IntWritable value : values) {
                    sum += value.get();
                }
                context.write(key, new IntWritable(sum));
            }
        }
    }

    2.3.2 MyFileOutputFormat Code

    • 由于写入的是文件,所以直接继承FileOutputFormat class就可以了,可以不用自己再写 checkOutputSpecs、getOutputCommitter这两个函数的实现了
    package com.xk.bigata.hadoop.mapreduce.outputformat;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    public class MyFileOutputFormat extends FileOutputFormat<Text, IntWritable> {
    
        @Override
        public RecordWriter<Text, IntWritable> getRecordWriter(TaskAttemptContext job) throws IOException, InterruptedException {
            return new MyRecordWriter(job);
        }
    }

    2.3.3 MyRecordWriter Code

    package com.xk.bigata.hadoop.mapreduce.outputformat;
    
    import org.apache.hadoop.fs.FSDataOutputStream;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IOUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.RecordWriter;
    import org.apache.hadoop.mapreduce.TaskAttemptContext;
    
    import java.io.IOException;
    
    public class MyRecordWriter extends RecordWriter<Text, IntWritable> {
    
        FileSystem fs = null;
        FSDataOutputStream baiduOut = null;
        FSDataOutputStream qqOut = null;
    
        public MyRecordWriter(TaskAttemptContext job) {
            try {
                fs = FileSystem.get(job.getConfiguration());
                baiduOut = fs.create(new Path("mapreduce-basic/out/www.baidu.com.log"));
                qqOut = fs.create(new Path("mapreduce-basic/out/www.qq.com.log"));
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
    
        @Override
        public void write(Text key, IntWritable value) throws IOException, InterruptedException {
            String domain = key.toString();
            if (domain.equals("www.baidu.com")) {
                baiduOut.write((key.toString() + "\t" + value.toString()).getBytes());
            } else if (domain.equals("www.qq.com")) {
                qqOut.write((key.toString() + "\t" + value.toString()).getBytes());
            }
        }
    
        @Override
        public void close(TaskAttemptContext context) throws IOException, InterruptedException {
            if (null != baiduOut) {
                IOUtils.closeStream(baiduOut);
            }
            if (null != qqOut) {
                IOUtils.closeStream(qqOut);
            }
            if (null != fs) {
                fs.close();
            }
        }
    }

    2.4 结果

    2.4.1 www.baidu.com.log

    www.baidu.com	32

    2.4.2 www.qq.com.log

    www.qq.com	78

    MyFileOutputFormatDriver Code

    3 DBOutputFormat

    3.1 需求

    使用MapReduce进行词频统计
    把MapReduce 计算结果输出到MySQL里面

    3.2 数据

    hadoop,spark,flink
    hbase,hadoop,spark,flink
    spark
    hadoop
    hadoop,spark,flink
    hbase,hadoop,spark,flink
    spark
    hadoop
    hbase,hadoop,spark,flink

    3.3 DDL

    CREATE TABLE `wc` (
     `word` varchar(100) DEFAULT NULL,
     `cnt` int(11) DEFAULT NULL
    );

    3.4 Code

    3.4.1 MysqlWordCountDoamin Code

    package com.xk.bigata.hadoop.mapreduce.domain;
    
    import org.apache.hadoop.io.Writable;
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    public class MysqlWordCountDoamin implements Writable, DBWritable {
    
        private String word;
    
        private int cnt;
    
        public MysqlWordCountDoamin() {
        }
    
        public MysqlWordCountDoamin(String word, int cnt) {
            this.word = word;
            this.cnt = cnt;
        }
    
        @Override
        public String toString() {
            return "MysqlWordCountDoamin{" +
                    "word='" + word + '\'' +
                    ", cnt=" + cnt +
                    '}';
        }
    
        public String getWord() {
            return word;
        }
    
        public void setWord(String word) {
            this.word = word;
        }
    
        public int getCnt() {
            return cnt;
        }
    
        public void setCnt(int cnt) {
            this.cnt = cnt;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeUTF(word);
            out.writeInt(cnt);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            word = in.readUTF();
            cnt = in.readInt();
        }
    
        @Override
        public void write(PreparedStatement statement) throws SQLException {
            statement.setString(1, word);
            statement.setInt(2, cnt);
        }
    
        @Override
        public void readFields(ResultSet resultSet) throws SQLException {
            word = resultSet.getString(1);
            cnt = resultSet.getInt(2);
        }
    }

    3.4.2 MysqlDBOutputFormat Code

    package com.xk.bigata.hadoop.mapreduce.outputformat;
    
    import com.xk.bigata.hadoop.mapreduce.domain.MysqlWordCountDoamin;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import java.io.IOException;
    
    public class MysqlDBOutputFormat {
    
        public static void main(String[] args) throws Exception {
            String input = "mapreduce-basic/data/wc.txt";
    
            // 1 创建 MapReduce job
            Configuration conf = new Configuration();
            // 设置JDBC连接
            DBConfiguration.configureDB(conf,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://bigdatatest01:3306/bigdata",
                    "root",
                    "Jgw@31500");
            Job job = Job.getInstance(conf);
    
            // 2 设置运行主类
            job.setJarByClass(MysqlDBOutputFormat.class);
    
            // 3 设置Map和Reduce运行的类
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
    
            // 4 设置Map 输出的 KEY 和 VALUE 数据类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // 5 设置Reduce 输出 KEY 和 VALUE 数据类型
            job.setOutputKeyClass(NullWritable.class);
            job.setOutputValueClass(MysqlWordCountDoamin.class);
    
            // 设置输出类
            job.setOutputFormatClass(DBOutputFormat.class);
            // 6 设置输入和输出路径
            FileInputFormat.setInputPaths(job, new Path(input));
            DBOutputFormat.setOutput(job, "wc", "word", "cnt");
            // 7 提交job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    
        public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            IntWritable ONE = new IntWritable(1);
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String[] spilts = value.toString().split(",");
                for (String word : spilts) {
                    context.write(new Text(word), ONE);
                }
            }
        }
    
        public static class MyReducer extends Reducer<Text, IntWritable, NullWritable, MysqlWordCountDoamin> {
    
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int count = 0;
                for (IntWritable value : values) {
                    count += value.get();
                }
                MysqlWordCountDoamin mysqlWordCountDoamin = new MysqlWordCountDoamin(key.toString(), count);
                context.write(NullWritable.get(), mysqlWordCountDoamin);
            }
        }
    }

    3.5 报错

    java.lang.Exception: java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
    	at org.apache.hadoop.mapred.LocalJobRunner$Job.runTasks(LocalJobRunner.java:489)
    	at org.apache.hadoop.mapred.LocalJobRunner$Job.run(LocalJobRunner.java:556)
    Caused by: java.lang.ClassCastException: org.apache.hadoop.io.NullWritable cannot be cast to org.apache.hadoop.mapreduce.lib.db.DBWritable
    	at org.apache.hadoop.mapreduce.lib.db.DBOutputFormat$DBRecordWriter.write(DBOutputFormat.java:66)
    	at org.apache.hadoop.mapred.ReduceTask$NewTrackingRecordWriter.write(ReduceTask.java:558)
    	at org.apache.hadoop.mapreduce.task.TaskInputOutputContextImpl.write(TaskInputOutputContextImpl.java:89)
    	at org.apache.hadoop.mapreduce.lib.reduce.WrappedReducer$Context.write(WrappedReducer.java:105)
    	at com.xk.bigata.hadoop.mapreduce.outputformat.MysqlDBOutputFormat$MyReducer.reduce(MysqlDBOutputFormat.java:81)
    	at com.xk.bigata.hadoop.mapreduce.outputformat.MysqlDBOutputFormat$MyReducer.reduce(MysqlDBOutputFormat.java:72)
    	at org.apache.hadoop.mapreduce.Reducer.run(Reducer.java:171)
    	at org.apache.hadoop.mapred.ReduceTask.runNewReducer(ReduceTask.java:627)
    	at org.apache.hadoop.mapred.ReduceTask.run(ReduceTask.java:389)
    	at org.apache.hadoop.mapred.LocalJobRunner$Job$ReduceTaskRunnable.run(LocalJobRunner.java:346)
    	at java.util.concurrent.Executors$RunnableAdapter.call(Executors.java:511)
    	at java.util.concurrent.FutureTask.run(FutureTask.java:266)
    	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    	at java.lang.Thread.run(Thread.java:748)

    解决方案

    • 是由于Reduce输出的Key设置成NullWritable,无法write数据
    • 只要把Reduce里面的key改成自定义的数据类型,value改成NullWritable
    package com.xk.bigata.hadoop.mapreduce.outputformat;
    
    import com.xk.bigata.hadoop.mapreduce.domain.MysqlWordCountDoamin;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    
    import java.io.IOException;
    
    public class MysqlDBOutputFormat {
    
        public static void main(String[] args) throws Exception {
            String input = "mapreduce-basic/data/wc.txt";
    
            // 1 创建 MapReduce job
            Configuration conf = new Configuration();
            // 设置JDBC连接
            DBConfiguration.configureDB(conf,
                    "com.mysql.jdbc.Driver",
                    "jdbc:mysql://bigdatatest01:3306/bigdata",
                    "root",
                    "Jgw@31500");
            Job job = Job.getInstance(conf);
    
            // 2 设置运行主类
            job.setJarByClass(MysqlDBOutputFormat.class);
    
            // 3 设置Map和Reduce运行的类
            job.setMapperClass(MyMapper.class);
            job.setReducerClass(MyReducer.class);
    
            // 4 设置Map 输出的 KEY 和 VALUE 数据类型
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            // 5 设置Reduce 输出 KEY 和 VALUE 数据类型
            job.setOutputKeyClass(MysqlWordCountDoamin.class);
            job.setOutputValueClass(NullWritable.class);
    
            // 设置输出类
            job.setOutputFormatClass(DBOutputFormat.class);
            // 6 设置输入和输出路径
            FileInputFormat.setInputPaths(job, new Path(input));
            DBOutputFormat.setOutput(job, "wc", "word", "cnt");
            // 7 提交job
            boolean result = job.waitForCompletion(true);
            System.exit(result ? 0 : 1);
        }
    
        public static class MyMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
            IntWritable ONE = new IntWritable(1);
    
            @Override
            protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String[] spilts = value.toString().split(",");
                for (String word : spilts) {
                    context.write(new Text(word), ONE);
                }
            }
        }
    
        public static class MyReducer extends Reducer<Text, IntWritable, MysqlWordCountDoamin, NullWritable> {
    
            @Override
            protected void reduce(Text key, Iterable<IntWritable> values, Context context) throws IOException, InterruptedException {
                int count = 0;
                for (IntWritable value : values) {
                    count += value.get();
                }
                MysqlWordCountDoamin mysqlWordCountDoamin = new MysqlWordCountDoamin(key.toString(), count);
                context.write(mysqlWordCountDoamin, NullWritable.get());
            }
        }
    }

    3.6 结果

    word,cnt
    flink,5
    hadoop,7
    hbase,3
    spark,7

    MysqlDBOutputFormat Code

     

    展开全文
  • 什么是OutputFormat及其运行机制? 如何自定义自己的OutputFormat? 实战自定义mysql OutputFormat。 一丶什么是OutputFormat? 定义了 spark 的输出规则的类。这也许会让你想到 Hadoop Mapreduce 的 OutputFormat...

    前言

    本文主要内容

    1. 什么是OutputFormat及其运行机制?
    2. 如何自定义自己的OutputFormat?
    3. 实战自定义mysql OutputFormat。

    一丶什么是OutputFormat?

    定义了 spark 的输出规则的类。这也许会让你想到 Hadoop Mapreduce 的 OutputFormat,没错,其实他们是一个东西,嗯,完全一样。Spark 本身只是一个计算框架,其输入和输出都是依赖于 Hadoop 的
    OutputFormat,但是因为 Spark 本身自带 Hadoop 相关 Jar 包,所以不需要我们额外考虑这些东西,下面我们以saveAsTextFile源码来验证我们的结论

     def saveAsTextFile(path: String): Unit = withScope {
        val nullWritableClassTag = implicitly[ClassTag[NullWritable]]
        val textClassTag = implicitly[ClassTag[Text]]
        val r = this.mapPartitions { iter =>
          val text = new Text()
          iter.map { x =>
            text.set(x.toString)
            (NullWritable.get(), text)
          }
        }
        //最后调用的 saveAsHadoopFile()  并且泛型是 org.apache.hadoop.mapred.TextOutputFormat,
        //是属于 hadoop 包下的一个outputformat,以此简单来验证我们的结论
        RDD.rddToPairRDDFunctions(r)(nullWritableClassTag, textClassTag, null)
          .saveAsHadoopFile[TextOutputFormat[NullWritable, Text]](path)
      }
    

    二丶OutputFormat运行机制?

    我们知道 Spark 是分布式计算框架,其计算是一个个 Executor 为单元进行的,当运行到 类似于 saveAsTextFile等输出型算子时,会根据其定义的 Outputformat 规则进行输出,在每个Executor 单元内的每个task有且只有一个 Outputformat 实例

    三丶自定义 OutputFormat 解析

    首先我们来看一下 OutputFormat 接口

    public interface OutputFormat<K, V> {
    
      /** 
       * 根据给予的参数返回一个 RecordWriter 对象
       *
       * @param ignored 基本没什么用
       * @param job 可以用来获取各种配置,定制特别的 RecordWriter
       * @param name 一个唯一的名字,比如:part-0001
       * @param progress mechanism for reporting progress while writing to file.
       */
      RecordWriter<K, V> getRecordWriter(FileSystem ignored, JobConf job,
                                         String name, Progressable progress)
      throws IOException;
    
      /** 
       * 用来做输出前的各种检查
       */
      void checkOutputSpecs(FileSystem ignored, JobConf job) throws IOException;
    
    //获取一个 OutputCommitter,用来保证输出的正确执行
      public abstract  OutputCommitter getOutputCommitter(TaskAttemptContext context) throws IOException, InterruptedException;
    }
    

    checkOutputSpecs很好理解,用来做输出前的检查,比如 Spark 会对输出路径做检查,如果存在就抛出异常,那么接下来我们先理解下 RecordWriter 和 OutputCommitter

    • RecordWriter
    public abstract class RecordWriter<K, V> {
      /** 
       * outputformat 是针对于 kv格式的RDD的,
       * Rdd数据的每条记录都会调用一次 write 方法 用来写入数据
       */      
      public abstract void write(K key, V value
                                 ) throws IOException, InterruptedException;
    
      /** 
       * 在数据写完之后,会进行调用,一般执行一些 IO 的 close 操作
       */ 
      public abstract void close(TaskAttemptContext context) throws IOException, InterruptedException;
    }
    

    这里我们可以发现,如果你不是 KV 格式的 Rdd,那么能调用的只有有限的几个输出型算子,比如saveAsTextFile,其实底层是给你加格式化成了 kv 格式 Rdd 的,其 key 为 NullWritable,这块一般是我们自定义的重点。

    • OutputCommitter
    package com.inveno.data.analysis.user.statistical;
    import java.io.IOException;
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    
    public abstract class OutputCommitter {
        /**
         * 每个job执行之前都会调用一次或者多次,用来进行一些初始化操作
         */
        public abstract void setupJob(JobContext jobContext) throws IOException;
    
        /**
         * 每个job执行之后都会调用一次或者多次,用来进行一些初始化操作
         */
        @Deprecated
        public void cleanupJob(JobContext jobContext) throws IOException {
        }
    
        /**
         * 每个job执行完成都会调用一次
         */
        public void commitJob(JobContext jobContext) throws IOException {
            cleanupJob(jobContext);
        }
    
    
        /**
         * 每个job中断执行会调用一次或者多次
         */
        public void abortJob(JobContext jobContext, JobStatus.State state)
                throws IOException {
            cleanupJob(jobContext);
        }
    
        /**
         * 每个 task 执行之前都会调用一次或者多次,用来进行一些初始化操作
         */
        public abstract void setupTask(TaskAttemptContext taskContext)
                throws IOException;
    
        /**
         * 需要输出到 hdfs 上的 task 用来检测是否有输出需要提交
         */
        public abstract boolean needsTaskCommit(TaskAttemptContext taskContext)
                throws IOException;
    
        /**
         * 每个 needsTaskCommit 为 true 的 task 执行完成都会调用一次或者多次
         */
        public abstract void commitTask(TaskAttemptContext taskContext)
                throws IOException;
    
        /**
         * task 中断会被调用一次或多次
         */
        public abstract void abortTask(TaskAttemptContext taskContext)
                throws IOException;
    
        /**
         * 是否支持输出恢复
         */
        public boolean isRecoverySupported() {
            return false;
        }
    
        /**
         * 恢复task输出
         */
        public void recoverTask(TaskAttemptContext taskContext)
                throws IOException {
        }
    }
    

    其中代码注释说的调用多次,一般都是因为重试机制导致的,一般只会调用一次,这个我们一般使用系统自带的实现类,然后在各个生命周期添加一些自定义操作。

    四丶实战---定义一个自己的 MysqlOutputFormat

    1. 每当你想自定义一个东西,第一步应该想的是:我有这个需求,别人有没有?我是不是在重复造轮子?别人的轮子适合我吗?我可以做的更好吗?
    2. 有了上面的思考,我们果断在源码包里面找到了一个叫做 DBOutputFormat的类,轮子果然是有的,那么好不好用呢?能不能优化一下呢?
    3. ok,废话不多说了,我们来看看今天我们自定义的 MysqlOutputFormat,因为要用在 Spark 上 所以我们使用的是 Scala 语言
    abstract class MysqlOutputFormat[K, V]() extends OutputFormat[K, V] {
    
      val logger = LoggerFactory.getLogger(getClass)
      //直接返回一个 MysqlWriter 对象
      override def getRecordWriter(taskAttemptContext: TaskAttemptContext): RecordWriter[K, V] = {
        new MysqlWriter[K, V](getDBFlag(), getValueConvert(), taskAttemptContext)
      }
    
    //空实现,这里可以根据你的需求实现,比如删除一些老旧数据
      override def checkOutputSpecs(jobContext: JobContext): Unit = {
      }
    //因为我们数据读入的KV格式,这里定义了一个 SQLValueConvert trait,来让使用者自定义输入规则
      def getValueConvert(): SQLValueConvert[K, V]
    //用于给 mysqlwriter 获取mysql相关参数的 flag
      def getDBFlag(): String
    
      //我们这里直接使用系统自带的就ok了,可以根据你的需求来做相关修改
      override def getOutputCommitter(taskAttemptContext: TaskAttemptContext): OutputCommitter = {
        new FileOutputCommitter(null, taskAttemptContext)
      }
    }
    

    实现比较简单,值得注意的是,在 Spark 中 OutputFormat 是通过反射生产的实例,所以需要提供一个无参的构造方法。那么接下来我们看看最重要的部分 MysqlWriter

    class MysqlWrite[K, V](db_flag: String, converter: SQLValueConvert[K, V], context: TaskAttemptContext) extends RecordWriter[K, V] {
      val logger = LoggerFactory.getLogger(getClass)
      
    //加载resource mysql配置文件
      val conf: Configuration = context.getConfiguration
      conf.addResource("mysql.xml")
    
    //根据传入的 flag 读取resource mysql 相应的配置文件
      val table: String = conf.get(String.format(JDBCManager.JDBC_TABLE_NAME, db_flag))//table name
      private val batch_size = conf.get(String.format(JDBCManager.BATCH, db_flag)).toInt// batch size
    
      var count = 0
    
      var committerStatement: PreparedStatement = _
      var conn: Connection = _
    
    //执行批量写入 mysql
      def commit(): Unit = {
        if (conn == null || committerStatement == null) {
          return
        }
        try {
          committerStatement.executeBatch()
          conn.commit()
    
          committerStatement.clearBatch()
          count = 0
        } catch {
          case e: Exception =>
            //出错回滚 并抛出异常
            conn.rollback()
            logger.error("在writer中写数据出现异常", e.printStackTrace())
            throw e
        }
      }
    
    //相关资源释放
      override def close(taskAttemptContext: TaskAttemptContext): Unit = {
        try {
        //提交剩余的数据
          commit()
        } catch {
          case e: Throwable =>
            throw new SQLException()
        } finally {
          if (committerStatement != null) {
            committerStatement.close()
          }
          if (conn != null) {
            conn.close()
          }
        }
      }
    
    
      override def write(key: K, value: V): Unit = {
        if (key == null || value == null) {
            return
         }
        try {
          //根据自定义规则 将KV转换成 array(),
          val values = converter.convert(key, value)
        
          //创建数据库链接
          if (conn == null) {
            conn = JDBCManager.getConnection(conf, db_flag)
            conn.setAutoCommit(false)
          }
          //创建Statement
          if (committerStatement == null) {
            committerStatement = conn.prepareStatement(
            //"INSERT INTO %s VALUES(%s)" 创建 sql 语句
              MysqlOperation.insertByParameter(table, values.length))
          }
        //添加参数
          for (i <- values.indices) {
            committerStatement.setObject(i + 1, values.apply(i))
          }
          committerStatement.addBatch()
    
          count = count + 1
          //大于batch_size进行提交
          if (count >= batch_size) {
            commit()
          }
        } catch {
          case e: Throwable =>
            println("在writer中写数据出现异常", e.printStackTrace())
            throw new Exception(e)
        }
      }
    

    上面的代码都比较简单,这里读者可以思考一下,数据库的连接是否可以放到 setupTask?提交任务是否可以放到 commitTask ?
    这边 mysql.xml 相关配置就不贴了,项目实际应用过程我们一般都需要将配置属性写到额外的文件,方便管理和维护。

    五丶额外的思考

    能否自定义一个outputformat来实现控制spark 文件的输出数量呢?这里主要考虑的多个task同时写入一个文件,必然涉及到文件的追加,而我们知道 hdfs虽然支持文件的追加,但是性能并不是很好,至于效率到底怎么样?笔者也没验证过。。。如果你有好的想法,欢迎留言。。。一起讨论!!!

    展开全文
  • OutputFormat 源码详解

    2018-12-20 15:36:32
    OutputFormat源码详解 1. 源码 package org.apache.hadoop.mapreduce; import java.io.IOException; import org.apache.hadoop.classification.InterfaceAudience; import org.apache.hadoop.classification....

    OutputFormat源码详解

    1. 源码

    package org.apache.hadoop.mapreduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.classification.InterfaceAudience;
    import org.apache.hadoop.classification.InterfaceStability;
    import org.apache.hadoop.fs.FileSystem;
    
    /** 
     * <code>OutputFormat</code> describes the output-specification for a 
     * Map-Reduce job.
    	OutputFormat 描述了指定的输出对于一个Map-Reduce job。
     
     *
     * <p>The Map-Reduce framework relies on the <code>OutputFormat</code> of the
     * job to:<p>
     * <ol>
     *   <li>
     *   Validate the output-specification of the job. For e.g. check that the 
     *   output directory doesn't already exist. 
     *   <li>
     *   Provide the {@link RecordWriter} implementation to be used to write out
     *   the output files of the job. Output files are stored in a 
     *   {@link FileSystem}.
     *   </li>
     * </ol>
     * 
     * @see RecordWriter
     */
    @InterfaceAudience.Public
    @InterfaceStability.Stable
    public abstract class OutputFormat<K, V> {
    
      /** 
       * Get the {@link RecordWriter} for the given task.
       *
       * @param context the information about the current task.
       * @return a {@link RecordWriter} to write the output for the job.
       * @throws IOException
       */
      public abstract RecordWriter<K, V> 
        getRecordWriter(TaskAttemptContext context
                        ) throws IOException, InterruptedException;
    
      /** 
       * Check for validity of the output-specification for the job.
       *  
       * <p>This is to validate the output specification for the job when it is
       * a job is submitted.  Typically checks that it does not already exist,
       * throwing an exception when it already exists, so that output is not
       * overwritten.</p>
       *
       * @param context information about the job
       * @throws IOException when output should not be attempted
       */
      public abstract void checkOutputSpecs(JobContext context
                                            ) throws IOException, 
                                                     InterruptedException;
    
      /**
       * Get the output committer for this output format. This is responsible
       * for ensuring the output is committed correctly.
       * @param context the task context
       * @return an output committer
       * @throws IOException
       * @throws InterruptedException
       */
      public abstract 
      OutputCommitter getOutputCommitter(TaskAttemptContext context
                                         ) throws IOException, InterruptedException;
    }
    
    展开全文
  • 主要介绍了java 中 自定义OutputFormat的实例详解的相关资料,这里提供实例帮助大家学习理解这部分内容,希望通过本文能帮助到大家,需要的朋友可以参考下
  • Hadoop之OutputFormat数据输出详解 目录 OutputFormat接口实现类 自定义OutputFormat 1. OutputFormat接口实现类 OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了 OutputFormat接口。下面...

    Hadoop之OutputFormat数据输出详解


    目录

    1. OutputFormat接口实现类
    2. 自定义OutputFormat

    1. OutputFormat接口实现类

    OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了 OutputFormat接口。下面我们介绍几种常见的OutputFormat实现类。

    1. 文本输出TextOutputFormat
      默认的输出格式是TextOutputFormat,它把每条记录写为文本行。它的键和值可以是任意类型,因为TextOutputFormat调用toString()方法把它们转换为字符串。

    2. SequenceFileOutputFormat
      SequenceFileOutputFormat将它的输出写为一个序列化文件。如果输出需要作为后续 MapReduce任务的输入,这便是一种好的输出格式,因为它的格式紧凑,很容易被压缩。

    3. 自定义OutputFormat
      根据用户需求,自定义实现输出。


    2. 自定义OutputFormat

    为了实现控制最终文件的输出路径,可以自定义OutputFormat。
    要在一个mapreduce程序中根据数据的不同输出两类结果到不同目录,这类灵活的输出需求可以通过自定义outputformat来实现。

    1. 自定义OutputFormat步骤
      1. 自定义一个类继承FileOutputFormat。
      2. 改写recordwriter,具体改写输出数据的方法write()。
    展开全文
  • 一、OutputFormat接口实现类 OutputFormat是MapReduce输出的基类,所有实现MapReduce输出都实现了OutputFormat接口。 二、自定义OutputFormat 1. 应用场景 例如:输出数据到MySQL、HBase、 ...3. 详解 https://blog
  • Hadoop中OutputFormat解析

    2019-09-23 22:40:30
    一、OutputFormat OutputFormat描述的是MapReduce的输出格式,它主要的任务是: ...OutputFormat的主要是由三个抽象方法组成,下面根据源代码介绍每个方法的功能,源代码详解如下: 1 public...
  • OutputFormat输出过程的学习

    千次阅读 2014-11-19 10:40:39
    今天花了一点点的时间,把MapReduce的最后一个阶段,输出OutputFormat给做了分析,这个过程跟InputFormat刚刚好是对着干的,二者极具对称性。为什么这么说呢,待我一一分析。  OutputFormat过程的作用就是定义数
  • JSF 2 outputFormat示例

    2020-05-27 03:52:04
    在JSF Web应用程序中,“ h:outputFormat ”标记与“ h:outputText ”标记相似,但是具有呈现参数化消息的额外功能。 例如, <h:outputFormat value="param0 : {0}, param1 : {1}" > <f:param value=...
  • OutputFormat的自我理解

    千次阅读 2016-06-28 22:34:04
    主要接口 1, getRecordWriter(FileSystem ...这个方法会调用其实现wrapper类的mapContext或者reduceContext的write方法,而这个调用的是一个同一个实例TaskInputOutputContextImpl类的outputFormat.write方法。
  • OutPutFormat介绍 (一)

    千次阅读 2015-02-02 17:11:12
    你可能已经熟悉了默认的OutputFormat,也就是TextOutputFormat,它是一种以行分隔,包含制表符界定的键值对的文本文件格式。尽管如此,对多数类型的数据而言,如再常见不过的数字,文本序列化会浪费一些空间,由此...
  • Hive中的InputFormat、OutputFormat与SerDe

    千次阅读 2016-06-03 18:53:16
    在有些时候,我们往往面对多行,结构化的文档,并需要将其导入Hive处理,此时,就需要自定义InputFormat、OutputFormat,以及SerDe了。 首先来理清这三者之间的关系,我们直接引用Hive官方说法: SerDe is a short...
  • Hive中压缩使用详解与性能分析

    千次阅读 多人点赞 2019-01-05 22:16:50
    所以HIVE可以使用hadoop自带的InputFormat和Outputformat实现从不同的数据源读取文件和写出不同格式的文件到文件系统中。同理,HIVE也可以使用hadoop配置的压缩方法对中间结果或最终数据进行压缩。 1.什么是压缩及...
  • MapReduce详解

    2019-09-21 23:28:42
    1.概述 MapReduce是一种编程模型,用于大规模数据集(大于1TB)的并行运算。概念“Map”(映射)和“Reduce(归约)”,是它们的主要思想,都是从函数式编程语言里借来的,还有从矢量编程语言里借来的特性。...
  • MapReduce框架详解

    2020-02-29 04:03:55
    文章目录MapReduce框架详解1、Job提交源码分析2、输入端InputFormat2.1、FilelnputFormat切片原则(默认)2.1.1、切片的原则2.1.2、修改切片大小2.1.3、获取切片信息 API2.1.4、代码测试2.2、CombineTextinputFomat...
  • hadoop自带wordcount代码详解

    千次阅读 2016-06-01 05:25:36
    hadoop中自带wordcount代码详解wordcount代码详解package cn.chinahadoop;import java.io.IOException; import java.util.StringTokenizer;import org.apache.hadoop.conf.Configuration; import org.apache.hadoop....
  • Hive学习3:Hive三种建表语句详解

    万次阅读 多人点赞 2017-10-29 11:29:47
    Hive学习3:Hive三种建表语句详解
  • Hadoop 新 MapReduce 框架 Yarn 详解
  • Android开发之MediaRecorder类详解

    万次阅读 多人点赞 2014-08-14 20:23:47
    详解 手机一般都有麦克风和摄像头,而 Android 系统就可以利用这些硬件来录制音视频了。 为了增加对录制音视频的支持, Android 系统提供了一个 MediaRecorder 的类。该类的使用也非常简单,下面让...
  • Sqoop详解

    2018-12-06 17:35:27
    原理:将导入导出命令翻译成MR成功徐来实现,在翻译出的MR中主要是对inputformat和outputformat进行定制。 安装与配置 1)解压安装 (这里以sqoop1.4.7为例) 2)修改配置文件 重命名文件mv sqoop-env-template.sh...
  • 标签详解

    2010-08-27 14:30:59
    JSF学习之十一:标签详解 一、 輸出類標籤輸出類的標籤包括了outputLabel、outputLink、outputFormat與 outputText,分別舉例說明如下: outputLabel  產生&lt;label&gt; HTML標籤,使用for屬性指定...
  • WordCount运行详解

    2014-06-19 16:07:38
    1、MapReduce理论简介   1.1 MapReduce编程模型 ... MapReduce采用"分而治之"的思想,把对大规模数据集的操作,分发给一个主节点管理下的各个分节点共同完成,然后通过整合各个节点的... (2)OutputFormat ...
  • MapReduce 详解

    千次阅读 2013-04-26 15:53:39
    函数式编程概念  MapReduce程序是设计用来并行计算大规模海量数据的,这需要把工作流分划到大量的机器上去,如果组件(component)之间可以任意的共享数据,那这个模型就没法扩展到大规模集群上去了(数百或数千个...
  • Hadoop InputFormat详解

    2014-04-08 20:30:00
    Hadoop InputFormat详解 InputFormat是MapReduce编程模型包括5个可编程组件之一,其余4个是Mapper、Partitioner、Reducer和OutputFormat。 新版HadoopInputFormat是一个抽象类,之前的InputFormat是...
  • 这里自定义的列的分割符是‘,’,无法直接定义行的分割符,如果要定义行要重写INPUTFORMAT和outputformat类。这里使用的数据存储格式还是默认的TEXTFILE。注意下系统show create table后建表语句的变化。 建表...
  • Jsf标签详解

    2014-04-18 16:19:17
    Jsf标签详解(一个不漏) 整理来自:www.web-tag.net      actionListener    f:actionListener标签为h:commandLink,h:commandButton等指定自定义的事件侦听类。 f:actionListener使用: JSP: &lt;...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,769
精华内容 707
关键字:

outputformat详解