精华内容
下载资源
问答
  • Hadoop实战之MapReducer项目结构分析

    千次阅读 2018-05-06 17:59:22
    一.MapReducer项目结构分析 1.前言 参考本例子前: 1.需要确保搭建好了hadoop集群环境。 2.安装了eclipse开发hadoop的环境。 3.这是关于MapReducer中的Mapper、Reduce、Job三个类的学习分析。 2.hadoop的...

    一.MapReducer项目结构分析

    1.前言

    参考本例子前: 1.需要确保搭建好了hadoop集群环境。 2.安装了eclipse开发hadoop的环境。 3.这是关于MapReducer中的Mapper、Reduce、Job三个类的学习分析。

    2.hadoop的MapReducer模型结构

    (1)在eclipse中做hadoop开发: 在系统上打开eclipse,新建一个MapReducer 项目:
    1. 打开 eclipse,file->other->Map/Reducer Project->Next->Project name 创建名为WordCount 的项目名,然后创建名为cn.edu.gznc的包,接着在包里面创建三个类,分别是WordCountMapper、WordCountReduce、WordCountJob。
      图示:
      这里写图片描述
      这里写图片描述
      这里是在演示通常的hadoop实战时候建立的 MapReducer项目结构。
      接下来,我们先分析一下什么是MapReduce。

    3.分析

    总所周知,HDFS和MapReduce是Hadoop的两个重要核心,其中MapReduce是Hadoop的分布式计算模型。 而典型的MapReduce主要分为两步Map步和Reduce步,为了方便学习和理解,这里引用一个故事来解释:
    现在派你统计一个图书馆里面有多少本书,为了完成这个任务,你可以指派小明去统计书架1,指派小红去统计书架2.....这个指派的过程就是Map步,最后,每个人统计完属于自己负责的书架后,再对每个人的结果进行累加统计,累计统计过程就是Reduce步。

    这就是一个简单的理解,为了方便我们hadoop实战学习,如果想深入理解,自行百度即可。

    接下来,我们分析一下MapReduce项目中三个必要的类。

    1. xxxMapper.java
      一般在 xxxmapper.java 类中需要写一个map方法。
      也就是上面说的Map步:
      首先,要实现Map步,实际上就是实现一个类,这个类继承了Mapper类并且重写其中的map方法。

    重写这个map方法有什么意义呢?
    继续拿统计图书的例子来说,当小明被指派到书架1统计图书的时候,小明可以偷懒,对于那些他不想统计的书,他可以不统计;小明也可以很尽责,统计的结果达到百分百准确。
    总而言之,小明只要拿出统计结果给负责汇总的人就可以了,至于他是怎么处理的,负责汇总的人管不着。
    而重写这个map方法,就对应于实现这个处理的过程,负责将输入的<key,value>对进行处理统计,并且输出<key,value>对给下一步处理。

    比如:

    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{ 
        /* 
         * map方法是提供给map task进程来调用的,map task进程是每读取一行文本来调用一次我们自定义的map方法 
         * map task在调用map方法时,传递的参数: 
         *      一行的起始偏移量LongWritable作为key 
         *      一行的文本内容Text作为value 
         */  
        @Override  
        protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {  
            //拿到一行文本内容,转换成String 类型  
            String line = value.toString();  
            //将这行文本切分成单词  
            String[] words=line.split(" ");  
              
            //输出<单词,1>  
            for(String word:words){  
                context.write(new Text(word), new IntWritable(1));  
            }  
        }  
    
    1. xxxReducer.java
      要实现Reduce步,就需要实现一个类,这个类继承了Reducer类并且重写其中的reduce方法。所以 xxxReducer.java里面写一个reduce方法。

    提醒:
    在Map步的输出结果为<单词,1>这样的形式,而且做了合并处理,将拥有相同key值的键值对进行合并,形成一个<key,valuelist>的形式,这个<key,valuelist>的键值对集合,又作为Reduce步的输入。
    <key,valuelist>或者Iterable 的键值对集合,作为Reduce步的输入。

    继续理解:
    这个Reduce步就相当于在统计图书中那个汇总统计的人,负责对手下的工作结果进行汇总,Reduce步的输入和输出同样为<key,value>。这部分代码参见图三中的第一个绿色框(Reduce步的输入)和第二个绿色框(Reduce步的输出)。
    context应该是用来传递数据以及其他运行状态信息,map中的key、value写入context,让它传递给下一层。

    代码如下:

    public class WordCountReducer  extends Reducer<Text, IntWritable, Text, IntWritable>{
    	@Override  
        /* 
         * reduce方法提供给reduce task进程来调用 
         *  
         * reduce task会将shuffle阶段分发过来的大量kv数据对进行聚合,聚合的机制是相同key的kv对聚合为一组 
         * 然后reduce task对每一组聚合kv调用一次我们自定义的reduce方法 
         * 比如:<hello,1><hello,1><hello,1><tom,1><tom,1><tom,1> 
         *  hello组会调用一次reduce方法进行处理,tom组也会调用一次reduce方法进行处理 
         *  调用时传递的参数: 
         *          key:一组kv中的key 
         *          values:一组kv中所有value的迭代器 
         */  
        protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {  
            //定义一个计数器  
            int count = 0;  
            //通过value这个迭代器,遍历这一组kv中所有的value,进行累加  
            for(IntWritable value:values){  
                count+=value.get();  
            }  
              
            //输出这个单词的统计结果  
            context.write(key, new IntWritable(count));  
        }  
    

    3.xxxJob.java
    这一步,一般是提供主函数入口,加载job下的jar包,mapper,reducer文件,将任务提交给hadoop集群等工作。
    在Hadoop中,每个MapReduce任务被当做一个Job(作业),在执行任务之前,首先要对任务进行一些配置。
    xxxJob.java 中一般需要设置如下东西:
    • 设置处理该作业的类,setJarByClass()
    • 设置这个作业的名字,setJobName()
    • 设置这个作业输入数据所在的路径
    • 设置这个作业输出结果保存的路径
    • 设置实现了Map步的类,setMapperClass()
    • 设置实现了Reduce步的类,setReducerClass()
    • 设置输出结果key的类型,setOutputKeyClass()
    • 设置输出结果value的类型,setOuputValueClass()
    • 执行作业(提交给hadoop集群)

    因为我们编码完成以后了,一般不再eclipse中运行,而是编码好了以后,打包为jar包,导出到hadoop下面去运行。


    上诉的分析就是整个MapReducer项目结构简要 分析,主要是帮助自身理解为什么做一个hadoop分析数据,需要这么三个java类文件和几个必要的方法。

    下一篇,开始记录hadoop实战之WordCount统计单词数目。
    Hadoop实战之WordCount统计单词数目


    You got a dream, you gotta protect it.
    如果你有梦想的话,就要去捍卫它 。 ——《当幸福来敲门》

    展开全文
  • 一.Hadoop实战之WordCount统计单词数目 1.前言: 上一篇文章MapReducer项目结构分析 分析了hadoop的计算模型MapReducer的项目结构,下面就仿照先做一个WordCount统计单词数目的实战,便于理解。 参考本例子前: ...

    一.Hadoop实战之mapreduce的WordCount统计单词数目

    1.前言:

    上一篇文章[MapReducer项目结构分析](https://blog.csdn.net/ITBigGod/article/details/80216622) 分析了hadoop的计算模型MapReducer的项目结构,下面就仿照先做一个WordCount统计单词数目的实战,便于理解。

    参考本例子前:
    1.需要确保搭建好了hadoop集群环境。
    2.安装了eclipse开发hadoop的环境。
    3.这是关于Hadoop实战之WordCount统计单词数目。

    2.实战:

    (1)在eclipse中编码:

    新建一个MapReducer项目:

    1. 打开eclipse,file->other->Map/Reducer Project->Next->Project name创建名为WordCount的项目名,然后创建名为cn.edu.gznc的包,接着在包里面创建三个类,分别是WordCountMapper、WordCountReduce、WordCountJob。
      如图所示:
      这里写图片描述这里写图片描述

    2. xxxMapper.java
      一个map方法.
      代码如下:
    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    /* 
     * KEYIN:输入kv数据对中key的数据类型 
     * VALUEIN:输入kv数据对中value的数据类型 
     * KEYOUT:输出kv数据对中key的数据类型 
     * VALUEOUT:输出kv数据对中value的数据类型 
     */  
    public class WordCountMapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    	  
        /* 
         * map方法是提供给map task进程来调用的,map task进程是每读取一行文本来调用一次我们自定义的map方法 
         * map task在调用map方法时,传递的参数: 
         *      一行的起始偏移量LongWritable作为key 
         *      一行的文本内容Text作为value 
         */  
        @Override  
        protected void map(LongWritable key, Text value,Context context) throws IOException, InterruptedException {  
            //拿到一行文本内容,转换成String 类型  
            String line = value.toString();  
            //将这行文本切分成单词,以空格切分。 
            String[] words=line.split(" "); 
            
            //输出格式<单词,1>  
            for(String word:words){  
                context.write(new Text(word), new IntWritable(1));  
            }  
        }  
    }
    
    
    1. xxxReducer.java
      一个reduce方法:

    WordCount程序中的Map步的输出结果为<单词,1>对,上面有一个合并处理,将拥有相同key值的键值对进行合并,形成一个<key,valuelist>,这个<key,valuelist>的键值对集合,作为Reduce步的输入。
    <key,valuelist>或者Iterable 的键值对集合,作为Reduce步的输入。
    代码如下:

    import java.io.IOException;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    /* 
     * KEYIN:对应mapper阶段输出的key类型 
     * VALUEIN:对应mapper阶段输出的value类型 
     * KEYOUT:reduce处理完之后输出的结果kv对中key的类型 
     * VALUEOUT:reduce处理完之后输出的结果kv对中value的类型 
     */  
    public class WordCountReducer  extends Reducer<Text, IntWritable, Text, IntWritable>{
    	@Override  
        /* 
         * reduce方法提供给reduce task进程来调用 
         *  
         * reduce task会将shuffle阶段分发过来的大量kv数据对进行聚合,聚合的机制是相同key的kv对聚合为一组 
         * 然后reduce task对每一组聚合kv调用一次我们自定义的reduce方法 
         * 比如:<hello,1><hello,1><hello,1><tom,1><tom,1><tom,1> 
         *  hello组会调用一次reduce方法进行处理,tom组也会调用一次reduce方法进行处理 
         *  调用时传递的参数: 
         *          key:一组kv中的key 
         *          values:一组kv中所有value的迭代器 
         */  
        protected void reduce(Text key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {  
            //定义一个计数器  
            int count = 0;  
            //通过value这个迭代器,遍历这一组kv中所有的value,进行累加  
            for(IntWritable value:values){  
                count+=value.get();  
            }  
              
            //输出这个单词的统计结果  
            context.write(key, new IntWritable(count));  
        }  
    }
    
    

    context 对象是用来传递数据以及其他运行状态信息,map中的key、value写入context,让它传递给下一层。

    3.xxxJob.java
    提供主函数入口,加载job下的jar包,mapper,reducer文件。
    代码如下:

    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;  
    import org.apache.hadoop.io.*;  
    import org.apache.hadoop.mapreduce.*;  
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;  
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;  
    public class WordCountJob {
    	public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {  
            Configuration conf = new Configuration();  
            Job wordCountJob = Job.getInstance(conf);  
              
            //重要:指定本job所在的jar包  
            wordCountJob.setJarByClass(WordCountJob.class);  
              
            //设置wordCountJob所用的mapper逻辑类为哪个类  
            wordCountJob.setMapperClass(WordCountMapper.class);  
            //设置wordCountJob所用的reducer逻辑类为哪个类  
            wordCountJob.setReducerClass(WordCountReducer.class);  
              
            //设置map阶段输出的kv数据类型  
            wordCountJob.setMapOutputKeyClass(Text.class);  
            wordCountJob.setMapOutputValueClass(IntWritable.class);  
              
            //设置最终输出的kv数据类型  
            wordCountJob.setOutputKeyClass(Text.class);  
            wordCountJob.setOutputValueClass(IntWritable.class);  
              
            //设置要处理的文本数据所存放的路径  
            FileInputFormat.setInputPaths(wordCountJob, args[0]);  
            FileOutputFormat.setOutputPath(wordCountJob, new Path(args[1]));  
              
            //提交job给hadoop集群  
            wordCountJob.waitForCompletion(true);  
        }  
    }
    
    

    编码完成以后了,一般不再eclipse中运行,而是编码好了以后,打包为jar包以后,导出到hadoop下面去运行。

    (2)导出jar包和准备单词文本:

    这里写图片描述这里写图片描述
    这里写图片描述
    这里写图片描述

    然后linux桌面上便出现了导出来的jar包
    这里写图片描述
    《注意:由于我是先做了再写文章的,可能图中名字不太一样,但是不影响。》

    在home/gznc/下新建一个myjar文件夹,把wc.jar移动到myjar文件夹下。(你可以自己选择其他路径。无影响)
    这里写图片描述

    其次准备一个文本文件1.txt,放在那里无所谓。
    里面直接填入一些单词!
    这里我就随便写一个了:
    这里写图片描述

    (3)启动hadoop集群环境:

    启动你的hadoop集群环境。
    我的是在root权限下操作。(你可以不一样。根据你自己的来启动)
    1.命令:start-all.sh
    或者单独启动:
    使用start-dfs.sh 和 start-yarn.sh代替start-all.sh命令
    启动完成:
    这里写图片描述

    (4)上传需要分析的数据到hdfs:

    这里需要注意的是在将文件内容上传至HDFS时,需要是已经创建了存放文件内容的文件夹,如果在上传之间我没有建立wordconut这个文件夹名,
    那么在上传之前我需要利用命令hadoop fs –mkdir /user/gznc/wordconut先创建wordconut文件夹。

    1.使用hadoop命令新建一个wordcount文件夹

    hadoop fs -mkdir /wordcount
    

    这里写图片描述

    查看是否创建成功:
    hadoop fs -ls / 或者 –ls –R / 查看所有
    这里写图片描述

    2.使用hadoop命令把txt文本上传到新建的那个文件夹中去。

    把1.txt上传到新建的文件夹里面去和查看是否上传成功:

    hadoop fs -put /home/gznc/1.txt /wordcount

    hadoop fs -ls /wordcount
    这里写图片描述

    查看一下文本内容是不是正常的:
    hadoop fs -cat /wordcount/1.txt

    (5)使用hadoop命令运行统计单词:

    命令:
    hadoop jar /home/gznc/myjar/wc.jar cn.edu.gznc.wc.WcJob hdfs://master:9000/wordcount hdfs://master:9000/outputfile
    

    解释如下:
    hadoop jar :hadoop命令
    wc.jar:导出的jar包
    cn.edu.gznc.wc.WcJob:wcjob文件的路径。
    Wordcount:新建的,用来保存单词文本的文件夹。
    Outputfile:统计以后输出的文件夹—自动创建。

    需要注意的是这里输出的路径在之前不需要建立,但每次跑集群时一定要注意输出路径或者路径名不能一致。

    运行结果:
    这里写图片描述

    4.使用hadoop命令查看统计的单词结果文本:
    查看统计结果:
    hadoop fs -ls /outputfile
    hadoop fs -cat /outputfile/part-r-00000
    这里写图片描述

    延展知识:
    hadoop fs –ls /----查看建立的hadoop创建的文件目录
    hadoop fs –ls –R /—查看文件系统目录的根目录
    hadoop fs –rm –r /output 删除hadoop文件系统目录:

    总结:
    HDFS和MapReduce是Hadoop的两个重要核心,其中MapReduce是Hadoop的分布式计算模型。
    MapReduce主要分为两步Map步和Reduce步,引用一个故事来解释:
    现在你要统计一个图书馆里面有多少本书,为了完成这个任务,你可以指派小明去统计书架1,指派小红去统计书架2,这个指派的过程就是Map步,最后,每个人统计完属于自己负责的书架后,再对每个人的结果进行累加统计,累计统计过程就是Reduce步。

    假设现在有n个文本,WordCount程序就是利用MR计算模型来统计这n个文本中每个单词出现的总次数。
    现在有两个文件:
    •   File 0:有两行,第一行的内容为“Hello World”,第二行的内容为“Hello Hadoop”
    •   File 1:有两行,第一行的内容为“Bye World”,第二行的内容为“Bye Hadoop”
    假设我们现在要统计这两个文件每种单词出现的次数,首先我们要对每个文本进行处理,即把其中的句子划分成词语,按照上面讲到的统计图书的故事,我们会将这两个文件分派给两个人,让这两个人各自去处理,待这两个人都处理完成之后,再对结果进行汇总统计,在图中充当这两个人角色的就是Map1和Map2,Map步的输入为<key,value>对,输出也为<key,value>对。

    统计单词原理图示:
    这里写图片描述


    到这里我们的Hadoop实战之WordCount统计单词数目就完成了。
    下一篇文章,将继续hadoop实战。
    需要源代码的可以下方留下邮箱。


    You got a dream, you gotta protect it.
    如果你有梦想的话,就要去捍卫它 。 ——《当幸福来敲门》

    展开全文
  • 前一篇说到项目记录了很多埋点日志,当有一天项目需求需要对这些日志做分析时那hadoop就是一把好手了,下面简单介绍下用java调用hadoop分布式计算的例子 首先我们需要做hadoop配置 [code="java"] ...
    前一篇说到项目记录了很多埋点日志,当有一天项目需求需要对这些日志做分析时那hadoop就是一把好手了,下面简单介绍下用java调用hadoop分布式计算的例子

    首先我们需要做hadoop配置

    public static Configuration getConf(){
    Configuration conf = new Configuration();
    conf.set("fs.default.name", "hdfs://192.168.217.129:9100");
    conf.set("mapred.job.tracker", "192.168.217.129:9101");
    return conf;
    }


    这里面的配置项是根据服务端hadoop的配置来定的,总之要能连接的上hadoop

    然后提供获取hdsf文件系统的方法

    public static FileSystem getHdfs(){
    if(hdfs != null){
    return hdfs;
    }
    try {
    hdfs = FileSystem.get(getConf());
    } catch (IOException e) {
    // TODO Auto-generated catch block
    e.printStackTrace();
    }

    return hdfs;
    }


    FileSysem是hdfs的核心类,封装了很多文件处理的方法比如我们要把我们的日志文件上传到hdfs

    getHdfs().copyFromLocalFile(new Path("D://op.log"), new Path(getHdfs().getWorkingDirectory().toString() + "/input/op.log"));

    执行完成后在eclipse hadoop视图中就能看到对应的文件

    [img]http://dl2.iteye.com/upload/attachment/0112/1910/5cb85570-e6ab-3174-a7d4-37ef2fc09789.png[/img]

    然后我们来看下文件的内容,里面都是用户的操作记录,我们要做的任务是把操作日志中用户发送消息的消息内容提取出来,并且统计发送的次数

    [img]http://dl2.iteye.com/upload/attachment/0112/1912/d0e90642-bc8d-36e7-bb0b-c229c3d40414.png[/img]

    都知道hadoop是用mapreduce的方式来计算的,那就先来看看这两块代码

    /**
    * 映射器
    * 用于将我们的数据进行预处理
    */
    public static class MyMapper extends Mapper<LongWritable, Text, Text, Text>{
    @Override
    protected void map(LongWritable key, Text value, Context context)
    throws IOException, InterruptedException {
    //获取单行数据
    String str = value.toString();
    String [] arr = str.split("\\|");
    if("/account!sendMessage.action".equals(arr[5]))
    //把用户名作为key 操作数据作为值输出
    context.write(new Text(arr[1]), new Text(","+arr[6]));
    }
    }

    /**
    * 处理器
    * 用于将mapper预处理的数据记录进行业务计算,然后输出
    */
    public static class MyReducer extends Reducer<Text, Text, Text, Text>{
    @Override
    //每个key都会调用这个reduce方法 values参数是用户的操作数据的集合,
    //hadoop会自动把相同key的values用集合的方式存储,一起穿个reduce处理
    protected void reduce(Text key, Iterable<Text> values,
    Context context)
    throws IOException, InterruptedException {
    int i = 0;
    for (Text v : values) {
    //统计用户发送的次数
    i ++;
    context.write(new Text(key), new Text(v));
    }

    context.write(new Text(key), new Text(i+"次发信息"));
    }
    }


    代码还是非常简洁的,mapper只做数据的解析,把日志数据进行拆分,索取需要的数据,reducer则做计算操作,最后输出我们想要的结果,我们要做的就是告诉hadoop输入输出的参数类型

    然后再来看看如果执行这段mapreduce代码

    public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
    System.setProperty("hadoop.home.dir", "D:/development/hadoop-2.2.0");


    //getHdfs().copyFromLocalFile(new Path("D://op.log"), new Path(getHdfs().getWorkingDirectory().toString() + "/input/op.log"));
    Job job = new Job(getConf(),"job 1");
    job.setJarByClass(JobToSomeThing.class);
    job.setMapperClass(MyMapper.class);
    job.setReducerClass(MyReducer.class);

    job.setInputFormatClass(TextInputFormat.class);
    job.setOutputKeyClass(Text.class);
    job.setOutputValueClass(Text.class);

    FileInputFormat.addInputPath(job, getHdfsPath("input/op*.log"));
    String outFileExt = "_" + new SimpleDateFormat("yyyyMMddHHmmss").format(new Date());
    FileOutputFormat.setOutputPath(job, getHdfsPath("out/helloworld"+outFileExt));
    System.out.println(job.waitForCompletion(true));
    getHdfs().copyToLocalFile(getHdfsPath("out/helloworld"+outFileExt), new Path("D://helloworld"+outFileExt));

    }

    是以单个job的方式,设置mapper类 reducer类,数据源来完成一个计算,把生成结果保存到hdfs的out/hellowrold下,最后我们把这个结果文件夹保存到我们本地D盘查看

    [img]http://dl2.iteye.com/upload/attachment/0112/1916/0a9cecc6-1a68-3c3e-9da7-fdf1d2242495.png[/img]
    展开全文
  • sso-cookie 跨域读写cookie的例子 思维导图 和java相关的 更多干货 分布式实战(干货) spring cloud 实战(干货) mybatis 实战(干货) spring boot 实战(干货) React 入门实战(干货) 构建中...
  • 仿微信朋友圈动态实战项目 APIJSONApp.apk 测试及自动生成代码工具 APIJSONTest.apk 开源许可 使用 Apache License 2.0,对 公司、团队、个人 等 商用、非商用 都自由免费且非常友好,请放心使用和登记 使用登记 ...

空空如也

空空如也

1 2 3
收藏数 49
精华内容 19
关键字:

hadoop实战项目例子