精华内容
下载资源
问答
  • MapReduce核心功能是将用户编写的业务逻辑代码自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。 5.2_MapReduce的优缺点 优点 MapReduce易编程 它简单的实现一些接口,就可以完成一个...

    由于文章太长,其余部分在我的其他几篇博客中!


    5、MapReduce

    5.1_简介

    MapReduce是一个分布式运算程序的编程框架,是用户开发“基于Hadoop的数据分析应用”的核心框架。

    MapReduce核心功能是将用户编写的业务逻辑代码和自带默认组件整合成一个完整的分布式运算程序,并发运行在一个Hadoop集群上。


    5.2_MapReduce的优缺点

    优点

    1. MapReduce易编程
      它简单的实现一些接口,就可以完成一个分布式程序,这个分布式程序可以分布到大量廉价的PC机器上运行。也就是说你写一个分布式程序,跟写一个简单的串行程序是一模一样的。就是因为这个特点使得MapReduce编程变得非常流行。

    2. 良好的扩展性
      当你的计算资源不能得到满足的时候,你可以通过简单的增加机器来扩展它的计算能力。

    3. 高容错性
      MapRedice设计的初衷就是使程字能够部署在廉价的PC机器上,这就要求它具有很高的容错性。比如其中一台机器挂了,它可以把上面的计算任务转移到另外一个节点上运行,不至于这个任务运行失败,而且这个过程不需要人工参与,而完全是由Hadoop内部完成的。

    4. 适合PB级以上海量数据的离线处理
      可以实现上千台服务器集群并发工作,提供数据处理能力。

    缺点

    1. 不擅长实时计算
      MapReduce无法像MySQL一样,在毫秒或者秒级内返回结果。

    2. 不擅长流式计算
      流式计算的输入数据是动态的,而MapReduce的输入数据集是静态的,不能动态变化。这是因为MapReduce自身的设计特点决定了数据源必须是静态的。

    3. 不擅长DAG(有向图)计算
      多个应用程序存在依赖关系,后一个应用程序的输入为前一个的输出。在这种情况下,MapReduce并不是不能做,而是使用后,每个MapReduce作业的输出结果都会写入到磁盘,会造成大量的磁盘IO,导致性能非常的低下。

    5.3_MapReduce的核心思想

    1. 分布式的运算程序往往需要分成至少2个阶段。
    2. 第一个阶段的MapTask并发实例,完全并行运行,互不相干。
    3. 第二个阶段的ReduceTask并发实例互不相干,但是他们的数据依赖于上一个阶段的所有MapTask并发实例的输出
    4. MapReduce编程模型只能包含一个Map阶段和一个Reduce阶段,如果用户的业务逻辑非常复杂,那就只能多个MapReduce程序,串行运行。

    5.4_MapReduce的节点主从结构

    • jobtracker
      • tasktracker
        • MapTask
        • ReduceTask
      • tasktracker
      • tasktracker

    5.5_MR运行流程

    1. 简单介绍

    在这里插入图片描述

    1. 传入一个文段
    2. 按行拆分
    3. Mapping阶段:拆分每一个单词,形式为<key,value>;key是单词,value是单词的个数
    4. Shuffing阶段:将key相同的键值对汇总到一起
    5. Reducer阶段:合并Shuffing阶段处理后,key相同的键值对(value做一个累加操作)
    6. 输出结果

    2. 详细流程:

    1. 待处理文本

    2. 客户端submit()之前,获取待处理数据的信息,然后根据参数配置,形成一个任务分配的规划

    3. 提交切片信息(到 Yarn RM)

      切片信息、jar包(在不同机器时才需要提交)、xml文件

    4. 计算出MapTask的数量

      在map阶段读取数据前,FileInputFormat会将输入文件分割成split。split的个数决定了map的个数。影响map个数(split个数)的主要因素有:

      1. 文件的大小。当块(dfs.block.size)为128m时,如果输入文件为128m,会被划分为1个split;当块为256m,会被划分为2个split。

      2. 文件的个数。FileInputFormat按照文件分割split,并且只会分割大文件,即那些大小超过HDFS块的大小的文件。如果HDFS中dfs.block.size设置为128m,而输入的目录中文件有100个,则划分后的split个数至少为100个。

      3. splitsize的大小。分片是按照splitszie的大小进行分割的,一个split的大小在没有设置的情况下,默认等于hdfs block的大小。但应用程序可以通过两个参数来对splitsize进行调节

    5. 对每一个MapTask中的数据进行如下处理:

      1. 读取数据(默认是读一行)
      2. 将数据返回给Mapper(map(k,v)、Context.write(k,v))
      3. 在Mapper中处理好数据后,将数据写入到环形缓冲区(容量默认为100M,到80%后反向写入)
      4. 分区(字典顺序)、排序(快排)
      5. 写到磁盘(分区且区有序)
      6. Merge归并排序
      7. 合并
    6. 所有MapTask任务完成后,启动相应数量(前面分区分了多少个,这里就是多少个)的ReduceTask处理数据范围

      1. 将同一分区的数据下载到ReduceTask本地磁盘

      2. 合并文件 归并排序

        Reduce(k,v)

        Context.write(k,v)

    7. 写到指定路径的文件中


    3. Shuffle机制

    Mapreduce确保每个reducer的输入都是按key排序的。系统执行排序的过程(即将mapper输出作为输入传给reducer)称为shuffle。


    4. Partition分区

    分区:把数据扎堆存放

    每个reducetask可以读取一个分区,产生一个结果文件,多个分区可以由多个reducetask来分别读取,并分别产生多个结果文件。



    5.6_序列化

    1. 什么是序列化

    序列化就是把内存中的对象,转换成字节序列(或其他数据传输协议)以便于存储到磁盘(持久化)和网络传输。

    反序列化就是将收到字节序列(或其他数据传输协议)或者是磁盘的持久化数据,转换成内存中的对家。


    2. 为什么要序列化?

    一般来说,“活的”对象只生存在内存里,关机断电就没有了。而且“活的”对象只能由本地的进程使用,不能被发送到网络上的另外一台计算机。然而序列化可以存储“活的”对象,可以将“活的”对象发送到远程计算机。


    3. 为什么不用Java的序列化 - serilazable

    Java的序列化是一个重量级序列化框架(Serilazable),一个对象被序列化后,会附带很多额外的信息(各种校验信息,header,继承体系等),不便于在网络中高效传输。所以,Hadoop自己开发了一套序列化机制(Writable),特点如下:

    1. 紧凑
      紧凑的格式能让我们充分利用网络带宽,而带宽是数据中心最稀缺的资源

    2. 快速
      进程通信形成了分布式系统的骨架,所以需要尽量减少序列化和反序列化的性能开销,这是基本的;

    3. 可扩展
      协议为了满足新的需求变化,所以控制客户端和服务器过程中,需要直接引进相应的协议,这些是新协议,原序列化方式能支持新的协议报文;

    4. 互操作
      能支持不同语言写的客户端和服务端进行交互;


    4. 常用的数据类型对应的Hadoop数据序列化类型

    Java类型 Hadoop Writable类型
    boolean BooleanWritable
    byte ByteWritable
    int IntWritable
    float FloatWritable
    long LongWritable
    double DoubleWritable
    String Text
    map MapWritable
    array ArrayWritable

    5.7_MapReduce编程规范

    用户编写的程序分成三个部分:Mapper、Reducer和Driver。

    1. Mapper阶段
      (1)用户自定义的Mapper要继承自己的父类Mapper

      (2)Mapper的输入数据是KV对的形式(KV的类型可自定义)

      (3)Mapper中的业务逻辑写在map()方法

      (4)Mapper的输出数据是KV对的形式(KV的类型可自定义)

      (5)map()方法(MapTak进程)对每一个<K,v>调用一次

    2. Reducer阶段
      (1)用户自定义的Reducer要继承自己的父类Mapper

      (2)Reducer的输入数据类型对应Mapper的输出数据类型,也是KV

      (3)Reducer的业务逻辑写在reduce()方法中

      (4)ReduceTask进程对每一组相同<k,v>组调用一次reduce()方法

    3. Driver阶段
      相当于YARN集群的客户端,用于提交我们整个程序到YARN集群,提交的是封装了MapReduce程序相关运行参数的job对象


    5.8_统计案例

    1. 统计单词案例

    需求:

    • 统计相同单词的个数
    MapDemo.java
    package com.atSchool.MapReduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * Map 阶段
     * Map的输入是在读一个文本,输入时的key是一个偏移量,value是文本的类型
     * 输入的key是偏移量,value是读到的文本
     * 输出的key是文本类型,value就是Hadoop中的整数类型
     */
    public class MapDemo extends Mapper<LongWritable, Text, Text, IntWritable> {
        /**
    	 * 该类使用标准UTF-8编码存储文本。它提供了在字节级别序列化、反序列化和比较文本的方法。
    	 * 长度类型为整数,并使用零压缩格式序列化.
    	 * 此外,它还提供了字符串遍历的方法,而不将字节数组转换为字符串。 
    	 * 还包括用于序列化/反序列化字符串、编码/解码字符串、检查字节数组是否包含有效UTF-8代码、计算编码字符串长度的实用程序。
    	 */
    	private Text keyword = new Text();
    	private IntWritable keyvalue = new IntWritable(1);
    
    	/**
    	 * 参数说明: context:上下文对象,贯穿整个任务的对象
    	 */
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    			throws IOException, InterruptedException {
    		// 1、获取一行
    		String line = value.toString();
    
    		// 2、截取
    		String[] split = line.split(",");
    
    		// 3、输出到上下文对象中
    		for (String string : split) {
    			keyword.set(string);
    			context.write(keyword, keyvalue);
    		}
    	}
    }
    
    ReduceDemo.java
    package com.atSchool.MapReduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Reduce 阶段
     * Reduce的输入和Map的输出应该是一样的,输出的话保持不变就可以了
     * 到了Reduce阶段,传过来的数据是 hadoop-{1,1,1} 的形式
     */
    public class ReduceDemo extends Reducer<Text, IntWritable, Text, IntWritable> {
    	IntWritable val = new IntWritable();
    
    	/**
    	 * 参数1Text key:Map传过来的key 参数2Iterable<IntWritable> values(是一个迭代器)中存储的是{1,1,1}
    	 */
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values,
    			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    		// 累加
    		int sum = 0;
    		for (IntWritable intWritable : values) {
    			sum += intWritable.get();
    		}
    		
    		// 输出
    		val.set(sum);
    		context.write(key, val);
    	}
    }
    
    WordCountJob.java
    package com.atSchool.MapReduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class WordCountJob extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new WordCountJob(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(WordCountJob.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(MapDemo.class);
    		job.setReducerClass(ReduceDemo.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/world.txt"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/out");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
                System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    

    2. 统计相同字母组成的单词案例

    MapDemo.java
    package com.atSchool.MapReduce;
    
    import java.io.IOException;
    import java.util.Arrays;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * Map 阶段 Map的输入是在读一个文本,输入时的key是一个偏移量,value是文本的类型
     * 输出的key是文本类型,value就是Hadoop中的整数类型
     */
    public class MapDemo extends Mapper<LongWritable, Text, Text, Text> {
    	/**
    	 * 该类使用标准UTF-8编码存储文本。它提供了在字节级别序列化、反序列化和比较文本的方法。 长度类型为整数,并使用零压缩格式序列化.
    	 * 此外,它还提供了字符串遍历的方法,而不将字节数组转换为字符串。
    	 * 还包括用于序列化/反序列化字符串、编码/解码字符串、检查字节数组是否包含有效UTF-8代码、计算编码字符串长度的实用程序。
    	 */
    	private Text outKey = new Text();
    
    	/**
    	 * 参数说明: context:上下文对象,贯穿整个任务的对象
    	 */
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
    			throws IOException, InterruptedException {
    		// 1、获取一行
    		String line = value.toString().trim();
    
    		// 2、转成字符数组,并进行排序
    		char[] charArray = line.toCharArray();
    		Arrays.sort(charArray);
    		String string = new String(charArray);
    
    		// 3、输出到上下文对象中
    		outKey.set(string);
    		// 输出的key是排好序后的数据,value是传过来的原数据
    		context.write(outKey, value);
    	}
    }
    
    ReduceDemo.java
    package com.atSchool.MapReduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Reduce 阶段 Reduce的输入和Map的输出应该是一样的 输出的话保持不变就可以了
     * 到了Reduce阶段,传过来的数据是hadoop{1,1,1}的形式
     */
    public class ReduceDemo extends Reducer<Text, Text, Text, Text> {
    	Text val = new Text();
    
    	/**
    	 * 参数1Text key:Map传过来的key 参数2Iterable<IntWritable> values(是一个迭代器)中存储的是{1,1,1}
    	 */
    	@Override
    	protected void reduce(Text key, Iterable<Text> values, Reducer<Text, Text, Text, Text>.Context context)
    			throws IOException, InterruptedException {
    		StringBuffer stringBuffer = new StringBuffer();
    		// 累加
    		for (Text text : values) {
    			stringBuffer.append("-" + text.toString());
    		}
    
    		// 输出
    		val.set(stringBuffer.toString());
    		context.write(key, val);
    	}
    }
    
    WordCountJob.java
    package com.atSchool.MapReduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class WordCountJob extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new WordCountJob(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(WordCountJob.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(MapDemo.class);
    		job.setReducerClass(ReduceDemo.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/world.txt"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    

    3. 气象平均值案例 (获取文件名)

    分析资料:

    • 链接:https://pan.baidu.com/s/18-U1mQ3bimG7sBH4twds9A 提取码:fs79
    MapDemo.java
    package com.atSchool.MapReduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * Map 阶段 Map的输入是在读一个文本,输入时的key是一个偏移量,value是文本的类型
     * 输出的key是文本类型,value就是Hadoop中的整数类型
     */
    public class MapDemo extends Mapper<LongWritable, Text, Text, IntWritable> {
    	/**
    	 * 该类使用标准UTF-8编码存储文本。它提供了在字节级别序列化、反序列化和比较文本的方法。 长度类型为整数,并使用零压缩格式序列化.
    	 * 此外,它还提供了字符串遍历的方法,而不将字节数组转换为字符串。
    	 * 还包括用于序列化/反序列化字符串、编码/解码字符串、检查字节数组是否包含有效UTF-8代码、计算编码字符串长度的实用程序。
    	 */
    	private Text outKey = new Text();
    	private IntWritable outValue = new IntWritable();
    
    	/**
    	 * 参数说明: context:上下文对象,贯穿整个任务的对象
    	 */
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    			throws IOException, InterruptedException {
    		/**
    		 * 1、Hadoop将MapReduce的输入数据划分成等长的小数据块,称为输入分片(inputsplit)或简称为“分片”。
    		 * 2、Hadoop为每个分片构建一个map任务,并由该任务来运行用户自定义的map函数从而处理分片中的每条数据。
    		 * 3、getSplits()负责将文件切分成多个分片(InputSplit),但InputSplit并没有实际切分文件,而只是说明了如何切分数据,也就是说,InputSplit只是逻辑上的切分。
    		 * 4、每个InputSplit对应一个map任务。作为map的输入,在逻辑上提供了这个map任务所要处理的key-value对。
    		 */
    		// 获取到文件名对应的InputSplit
    		InputSplit inputSplit = context.getInputSplit();
    		// 强转成子类类型FileSplit
    		FileSplit fSplit = (FileSplit) inputSplit;
    		// 获取到路径
    		Path path = fSplit.getPath();
    		// 获取到文件名
    		String name = path.getName();
    
    		// 1、获取一行
    		String line = value.toString();
    
    		// 2、截取
    		String[] split = line.split(" +");
    
    		// 3、输出到上下文对象中
    		outKey.set(name);
    		int valueOf = Integer.valueOf(split[4].trim());
    		// 由于数据文件中存在脏数据,所以要进行判断处理
    		if (valueOf != -9999) {
    			outValue.set(valueOf);
    			context.write(outKey, outValue);
    		}
    	}
    }
    
    ReduceDemo.java
    package com.atSchool.MapReduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Reduce 阶段 Reduce的输入和Map的输出应该是一样的 输出的话保持不变就可以了
     * 到了Reduce阶段,传过来的数据是hadoop{1,1,1}的形式
     */
    public class ReduceDemo extends Reducer<Text, IntWritable, Text, IntWritable> {
    	IntWritable val = new IntWritable();
    
    	/**
    	 * 参数1Text key:Map传过来的key 参数2Iterable<IntWritable> values(是一个迭代器)中存储的是{1,1,1}
    	 */
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values,
    			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    		// 累加
    		int sum = 0; // 存储累加后的值
    		int count = 0; // 统计数据的个数
    		for (IntWritable intWritable : values) {
    			sum += intWritable.get();
    			count++;
    		}
    		sum /= count;
    
    		// 输出
    		val.set(sum);
    		context.write(key, val);
    	}
    }
    
    WordCountJob.java
    package com.atSchool.MapReduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class WordCountJob extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new WordCountJob(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(WordCountJob.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(MapDemo.class);
    		job.setReducerClass(ReduceDemo.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/weather_forecast_source"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    

    4. 统计IP流量案例

    分析资料:

    • 链接:https://pan.baidu.com/s/16e418utH22G09FuLwXtFIQ 提取码:en5r
    MapDemo.java
    package com.atSchool.MapReduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * Map 阶段 Map的输入是在读一个文本,输入时的key是一个偏移量,value是文本的类型
     * 输出的key是文本类型,value就是Hadoop中的整数类型
     */
    public class MapDemo extends Mapper<LongWritable, Text, Text, IntWritable> {
    	/**
    	 * 该类使用标准UTF-8编码存储文本。它提供了在字节级别序列化、反序列化和比较文本的方法。 长度类型为整数,并使用零压缩格式序列化.
    	 * 此外,它还提供了字符串遍历的方法,而不将字节数组转换为字符串。
    	 * 还包括用于序列化/反序列化字符串、编码/解码字符串、检查字节数组是否包含有效UTF-8代码、计算编码字符串长度的实用程序。
    	 */
    	private Text outKey = new Text();
    	private IntWritable outValue = new IntWritable();
    
    	/**
    	 * 参数说明: context:上下文对象,贯穿整个任务的对象
    	 */
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    			throws IOException, InterruptedException {
    		// 1、获取一行
    		String line = value.toString();
    
    		// 2、截取
    		String[] split = line.split(" +");
    
    		// 3、输出到上下文对象中
    		// 由于数据文件中存在脏数据,所以要进行判断处理
    		if (!split[split.length - 1].trim().equals("-")) {
    			outKey.set(split[0].trim()); // name 为ip地址
    			outValue.set(Integer.valueOf(split[split.length - 1].trim()));
    			context.write(outKey, outValue);
    		}
    	}
    }
    
    ReduceDemo.java
    package com.atSchool.MapReduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * Reduce 阶段 Reduce的输入和Map的输出应该是一样的 输出的话保持不变就可以了
     * 到了Reduce阶段,传过来的数据是hadoop{1,1,1}的形式
     */
    public class ReduceDemo extends Reducer<Text, IntWritable, Text, IntWritable> {
    	IntWritable val = new IntWritable();
    
    	/**
    	 * 参数1Text key:Map传过来的key 参数2Iterable<IntWritable> values(是一个迭代器)中存储的是{1,1,1}
    	 */
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values,
    			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    		// 累加
    		int sum = 0; // 存储累加后的值
    		for (IntWritable intWritable : values) {
    			sum += intWritable.get();
    		}
    
    		// 输出
    		val.set(sum);
    		context.write(key, val);
    	}
    }
    
    WordCountJob.java
    package com.atSchool.MapReduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class WordCountJob extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new WordCountJob(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(WordCountJob.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(MapDemo.class);
    		job.setReducerClass(ReduceDemo.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/traffic_source"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    

    运行MapReduce程序没有过程?

    将Windows下安装的Hadoop下的hadoop-2.7.3\etc\hadoop\log4j.properties文件复制到项目的src目录下即可。


    执行成功后生成的两个文件说明?

    _SUCESS:仅仅用来说明执行成功了。

    part-r-00000:存储执行过后的结果。


    5.9_自定义序列化(排序)

    1. 规范

    自定义bean对象要想序列化传输,必须实现序列化接口,需要注意以下几项。

    1. 必须实现writable接口

    2. 反序列化时,需要反射调用空参构造函数,所以必须有空参构造

      public FlowBean() { 
          super();
      }
      
    3. 重写序列化方法

      @override
      public void write(DataOutput out)throws IOException {
      	out.writeLong(upFlow);
          out.writeLong(downFlow);
          out.writeLong(sumFlow);
      }
      
    4. 重写反序列化方法

      @override
      public void readFields(DataInput in)throws IOException {
      	upFlow = in.readLong();
          downFlow = in.readLong();
          sumFlow = in.readLong();
      }
      
    5. 注意反序列化的顺序和序列化的顺序完全一致

    6. 要想把结果显示在文件中,需要重写toString(),可用”\t"分开,方便后续用。

    2. 流量统计案例

    分析资料:

    1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
    1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
    1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
    1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
    1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
    1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
    1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
    1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
    1363157983019 	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
    1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
    1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
    1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
    1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
    1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
    1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
    1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
    1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
    1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
    1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
    1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
    1363157985066 	13560436666	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
    1363157993055 	13560436666	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
    
    // 说明:
    1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com	24		27				2481	  24681		200
    记录报告时间戳		手机号码		AP mac						AC mac		访问的网址	  网址种类  上行数据包个数	  下行数据包个数  上行流量  下行流量
    

    需求说明:

    • 统计每一个号码的上行流量、下行流量、总流量
    FlowBean.java
    package com.atSchool.Serialize;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    /**
     * 自定义序列化类
     * 1、实现Writable接口
     */
    public class FlowBean implements Writable {
    	// 上行流量(即上传时使用的流量)
    	private Long upFlow;
    
    	// 下行流量(即下载时使用的流量)
    	private Long downFlow;
    
    	// 总流量
    	private Long sumFlow;
    
    	// 2、反序列化时,需要反射调用空参构造函数,所以必须有空参构造
    	public FlowBean() {
    		super();
    	}
    
    	// 带参的构造器
    	public FlowBean(Long upFlow, Long downFlow) {
    		this.upFlow = upFlow;
    		this.downFlow = downFlow;
    		this.sumFlow = upFlow + downFlow;
    	}
    
    	// 3、重写序列化方法
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeLong(upFlow);
    		out.writeLong(downFlow);
    		out.writeLong(sumFlow);
    	}
    
    	// 4、重写反序列化方法
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		upFlow = in.readLong();
    		downFlow = in.readLong();
    		sumFlow = in.readLong();
    	}
    
    	/**
    	 * 重写toString方法
    	 * 如果不重写toString方法,则不会输出FlowBean中的内容,回输出以下内容:
    	 * 13480253104	com.atSchool.Serialize.FlowBean@77353c84
    	 * 13502468823	com.atSchool.Serialize.FlowBean@31b3f152
    	 * 13560436666	com.atSchool.Serialize.FlowBean@52a567cf
    	 * 13560439658	com.atSchool.Serialize.FlowBean@3c17b48b
    	 * 13602846565	com.atSchool.Serialize.FlowBean@62186e91
    	 * 13660577991	com.atSchool.Serialize.FlowBean@4c0f3ae1
    	 * 13719199419	com.atSchool.Serialize.FlowBean@356db7b0
    	 */
    	@Override
    	public String toString() {
    		return upFlow + "\t" + downFlow + "\t" + sumFlow;
    	}
    
    	public Long getUpFlow() {
    		return upFlow;
    	}
    
    	public void setUpFlow(Long upFlow) {
    		this.upFlow = upFlow;
    	}
    
    	public Long getDownFlow() {
    		return downFlow;
    	}
    
    	public void setDownFlow(Long downFlow) {
    		this.downFlow = downFlow;
    	}
    
    	public Long getSumFlow() {
    		return sumFlow;
    	}
    
    	public void setSumFlow(Long sumFlow) {
    		this.sumFlow = sumFlow;
    	}
    }
    
    MapSerialize.java
    package com.atSchool.Serialize;
    
    import java.io.IOException;
    
    import javax.xml.transform.OutputKeys;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class MapSerialize extends Mapper<LongWritable, Text, Text, FlowBean> {
    	private FlowBean outValue;
    	private Text outKey = new Text();
    
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FlowBean>.Context context)
    			throws IOException, InterruptedException {
    		// 读取一行
    		String line = value.toString();
    
    		// 分割
    		String[] split = line.split("\t+");
    
    		// 输出到上下文对象中
    		String tel = split[1].trim();
    		String upFlow = split[split.length - 3].trim();
    		String downFlow = split[split.length - 2].trim();
    
    		outKey.set(tel);
    		outValue = new FlowBean(Long.parseLong(upFlow), Long.parseLong(downFlow));
            
            // 输出的key:电话号码;value:FlowBean对象
    		context.write(outKey, outValue);
    	}
    }
    
    ReduceSerialize.java
    package com.atSchool.Serialize;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class ReduceSerialize extends Reducer<Text, FlowBean, Text, FlowBean> {
    	@Override
    	protected void reduce(Text key, Iterable<FlowBean> value, Reducer<Text, FlowBean, Text, FlowBean>.Context context)
    			throws IOException, InterruptedException {
    		long upFlow = 0;
    		long downFlow = 0;
            // 累加上行和下行流量数
    		for (FlowBean flowBean : value) {
    			upFlow += flowBean.getUpFlow();
    			downFlow += flowBean.getDownFlow();
    		}
    
            // 输出的key:电话号码;value:FlowBean对象
    		context.write(key, new FlowBean(upFlow, downFlow));
    	}
    }
    
    FlowJob.java
    package com.atSchool.Serialize;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class FlowJob extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new FlowJob(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(FlowJob.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(MapSerialize.class);
    		job.setReducerClass(ReduceSerialize.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(FlowBean.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(FlowBean.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/phone_traffic.txt"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    

    3. 流量排序案例(对流量统计案例的结果进行排序处理)

    MR程序在处理数据的过程中会对数据排序(map输出的kv对传输到reduce之前,会排序),排序的依据是 map输出的key

    所以,我们如果要实现自己需要的排序规则,则可以考虑将排序因素放到key中,让key实现接口:WritableComparable,然后重写keycompareTo方法,最后在reduce中把传入的maperr处理好的数据的keyvalue进行调换,这样输出结果就是手机号在前,其他的在后了

    FlowBean.java
    package com.atSchool.Serialize;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    /**
     * 自定义序列化类
     * 1、实现WritableComparable接口
     * 说明:
     *		public interface WritableComparable<T> extends Writable, Comparable<T> {}
     *		该接口继承了Writable, Comparable两个接口,所以直接实现该接口即可
     */
    public class FlowBean implements WritableComparable<FlowBean> {
    	// 上行流量(即上传时使用的流量)
    	private Long upFlow;
    
    	// 下行流量(即下载时使用的流量)
    	private Long downFlow;
    
    	// 总流量
    	private Long sumFlow;
    
    	// 2、反序列化时,需要反射调用空参构造函数,所以必须有空参构造
    	public FlowBean() {
    		super();
    	}
    
    	// 带参的构造器
    	public FlowBean(Long upFlow, Long downFlow) {
    		this.upFlow = upFlow;
    		this.downFlow = downFlow;
    		this.sumFlow = upFlow + downFlow;
    	}
    	
    	// 为了在排序的时候使用(排序时已经知道了sumFlow,再用上面的一个进行加减不太好)
    	public FlowBean(Long upFlow, Long downFlow, Long sumFlow) {
    		this.upFlow = upFlow;
    		this.downFlow = downFlow;
    		this.sumFlow = sumFlow;
    	}
    
    	// 3、重写序列化方法
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeLong(upFlow);
    		out.writeLong(downFlow);
    		out.writeLong(sumFlow);
    	}
    
    	// 4、重写反序列化方法
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		upFlow = in.readLong();
    		downFlow = in.readLong();
    		sumFlow = in.readLong();
    	}
    
    	// 重写toString方法
    	@Override
    	public String toString() {
    		return upFlow + "\t" + downFlow + "\t" + sumFlow;
    	}
    
    	public Long getUpFlow() {
    		return upFlow;
    	}
    
    	public void setUpFlow(Long upFlow) {
    		this.upFlow = upFlow;
    	}
    
    	public Long getDownFlow() {
    		return downFlow;
    	}
    
    	public void setDownFlow(Long downFlow) {
    		this.downFlow = downFlow;
    	}
    
    	public Long getSumFlow() {
    		return sumFlow;
    	}
    
    	public void setSumFlow(Long sumFlow) {
    		this.sumFlow = sumFlow;
    	}
    
        // 重写了compareTo方法,设置成按照sumFlow的倒序排序
    	@Override
    	public int compareTo(FlowBean flowBean) {
    		return sumFlow > flowBean.sumFlow ? -1 : 1;
    	}
    }
    
    MapSort.java

    这里先将 keyvalue 颠倒进行排序

    package com.atSchool.Serialize;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * 这里因为是要按sumFlow排序,所以将输出的key设置为FlowBean类型
     */
    public class MapSort extends Mapper<LongWritable, Text, FlowBean, Text> {
    	private Text outValue = new Text();
    
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, FlowBean, Text>.Context context)
    			throws IOException, InterruptedException {
    		String line = value.toString();
    
    		// 分割
    		String[] split = line.split("\t+");
    
    		// 输出到上下文对象中
    		String tel = split[0].trim();
    		String upFlow = split[1].trim();
    		String downFlow = split[2].trim();
    		String sumFlow = split[3].trim();
    		FlowBean flowBean = new FlowBean(Long.parseLong(upFlow), Long.parseLong(downFlow), Long.parseLong(sumFlow));
    
    		outValue.set(tel);
    		context.write(flowBean, outValue);
    	}
    }
    
    ReduceSort.java

    这里再将 keyvalue 换回来。

    package com.atSchool.Serialize;
    
    import java.io.IOException;
    import java.util.Iterator;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class ReduceSort extends Reducer<FlowBean, Text, Text, FlowBean> {
    
    	@Override
    	protected void reduce(FlowBean key, Iterable<Text> value, Reducer<FlowBean, Text, Text, FlowBean>.Context context)
    			throws IOException, InterruptedException {
            // 这里不需要遍历value,因为传过来的value就只是一个电话号码,直接拿出来就好了
    		Iterator<Text> iterator = value.iterator();
    		Text next = iterator.next();
    
    		context.write(next, key);
    	}
    }
    
    SortJob.java
    package com.atSchool.Serialize;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class SortJob extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new SortJob(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(FlowJob.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(MapSort.class);
    		job.setReducerClass(ReduceSort.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(FlowBean.class);
    		job.setMapOutputValueClass(Text.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(FlowBean.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/MapReduceOut/part-r-00000"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/SortOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    



    5.10_计数器

    计数器是用来记录job的执行进度和状态的。它的作用可以理解为日志。我们可以在程序的某个位置插入计数器,记录数据或者进度的变化情况。

    MapReduce计数器(Counter)为我们提供一个窗口,用于观察MapReduce Job运行期的各种细节数据。对MapReduce性能调优很有帮助,MapReduce性能优化的评估大部分都是基于这些Counter的数值表现出来的。

    MapReduce自带了许多默认Counter,现在我们来分析这些默认Counter的含义,方便大家观察Job结果,如输入的字节数、输出的字节数、Map端输入/输出的字节数和条数、Reduce端的输入/输出的字节数和条数等。


    1. 解读:计数器Counter

    ...
    ...
    21/04/02 11:06:49 INFO mapreduce.Job: Counters: 35		// Counters: 35 - 表示运行过程中使用到了35种计数器
    	File System Counters	// 计数器
            FILE: Number of bytes read=46873922
            FILE: Number of bytes written=107355440
            FILE: Number of read operations=0
            FILE: Number of large read operations=0
            FILE: Number of write operations=0
            HDFS: Number of bytes read=765843722
            HDFS: Number of bytes written=624014
            HDFS: Number of read operations=37
            HDFS: Number of large read operations=0
            HDFS: Number of write operations=10
        Map-Reduce Framework	// 计数器
            Map input records=1948789
            Map output records=1151021
            Map output bytes=21132892
            Map output materialized bytes=23434952
            Input split bytes=387
            Combine input records=0
            Combine output records=0
            Reduce input groups=31046
            Reduce shuffle bytes=23434952
            Reduce input records=1151021
            Reduce output records=31046
            Spilled Records=2302042
            Shuffled Maps =3
            Failed Shuffles=0
            Merged Map outputs=3
            GC time elapsed (ms)=1612
            Total committed heap usage (bytes)=3603431424
        Shuffle Errors		// 计数器
            BAD_ID=0		
            CONNECTION=0
            IO_ERROR=0
            WRONG_LENGTH=0
            WRONG_MAP=0
            WRONG_REDUCE=0
        File Input Format Counters 		// 计数器
            Bytes Read=218157941
        File Output Format Counters 	// 计数器
            Bytes Written=624014
    
    1. File System Counters:MR-Job执行依赖的数据来自不同的文件系统,这个group表示job与文件系统交互的读写统计

      • HDFS: Number of bytes read=765843722

        **说明:**map从HDFS读取数据,包括源文件内容、split元数据。所以这个值比FileInputFormatCounters.BYTES_READ 要略大些。

      • FILE: Number of bytes written=107355440

        **说明:**表示map task往本地磁盘中总共写了多少字节(其实,Reduce端的Merge也会写入本地File)

      • FILE: Number of bytes read=46873922

        **说明:**reduce从本地文件系统读取数据(map结果保存在本地磁盘)

      • HDFS: Number of bytes written=624014

        **说明:**最终结果写入HDFS

    2. Job Counters(上面的例子种没有出现):MR子任务统计,即map tasks 和 reduce tasks

      • Launched map tasks=4

        **说明:**启用map task的个数

      • Launched reduce tasks=5

        **说明:**启用reduce task的个数

    3. Map-Reduce Framework:MR框架计数器

      • Map input records=1948789

        **说明:**map task从HDFS读取的文件总行数

      • Reduce input groups=31046

        **说明:**Reduce输入的分组个数,如<hello,{1,1}> <me,1> <you,1>。如果有Combiner的话,那么这里的数值就等于map端Combiner运算后的最后条数,如果没有,那么就应该等于map的输出条数

      • Combine input records=0

        **说明:**Combiner输入 = map输出

      • Spilled Records=2302042

        **说明:**spill过程在map和reduce端都会发生,这里统计在总共从内存往磁盘中spill了多少条数据

    4. Shuffle Errors:

    5. File Input Format Counters:文件输入格式化计数器

      • Bytes Read=218157941

        **说明:**map阶段,各个map task的map方法输入的所有value值字节数之和

    6. File Output Format Counters:文件输出格式化计数器

      • Bytes Written=624014

        **说明:**MR输出总的字节数,包括【单词】,【空格】,【单词个数】及每行的【换行符】


    2. 自定义计数器

    //自定义计数器<Key , Value>的形式
    Counter counter = context.getCounter("查找hello", "hello");
    if (string.contains("hello")) {
    	counter.increment(1L);	//出现一次+1
    }
    

    3. map执行前后说明

    在map方法运行的前后各会有一个方法执行

    • 前:setup(context);方法
    • 后:cleanup(context);方法

    所以写计数器的时候我们不需要reduce,直接重写cleanup进行输出即可。


    4. 统计文件的行数和单词的个数案例

    WordCounterMapper.java
    package com.atSchool.Counter;
    
    import java.io.IOException;
    
    import org.apache.commons.lang.StringUtils;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.counters.CounterGroupBase;
    import org.apache.zookeeper.txn.Txn;
    
    /**
     * 计数总的行数和单词数量
     */
    public class WordCounterMapper extends Mapper<LongWritable, Text, Text, IntWritable> {
    	enum COUNTERS {
    		ALLLINES, ALLWORDS
    	}
    
    	Counter counter = null;
    	Counter counter2 = null;
    
    	/**
    	 * 对输入拆分中的每个键/值对调用一次。大多数应用程序都应该覆盖这一点,但默认的是标识函数。
    	 */
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    			throws IOException, InterruptedException {
    		/**
    		 * 获取一个计数器
    		 * 
    		 * 此处用了动态代理
    		 */
    		counter = context.getCounter(COUNTERS.ALLLINES);
    		counter2 = context.getCounter(COUNTERS.ALLWORDS);
    
    		/**
    		 * void increment(long incr) 按给定值递增此计数器 参数: incr-增加该计数器的价值
    		 */
    		counter.increment(1L); // 出现一次 +1
    
    		String[] split = value.toString().split(" +");
    		for (String string : split) {
    			/**
    			 * public static boolean isNotBlank(java.lang.String str)
    			 * 检查字符串是否为空(“”),不为空,是否仅为空白。
    			 * 示例:
    			 * 		StringUtils.isNotBlank(null) = false
    			 * 		StringUtils.isNotBlank("") = false
    			 * 		StringUtils.isNotBlank(" ") = false
    			 * 		StringUtils.isNotBlank("bob") = true
    			 * 		StringUtils.isNotBlank(" bob ") = true
    			 */
    			if (StringUtils.isNotBlank(string)) {
    				counter2.increment(1L); // 出现一次 +1
    			}
    		}
    	}
    
    	/**
    	 * 在map方法运行的前后各会有一个方法执行
    	 * 前:setup(context);方法
    	 * 后:cleanup(context);方法
    	 * 
    	 * 所以这里我们不需要reduce,直接重写cleanup进行输出即可。
    	 */
    	@Override
    	protected void cleanup(Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    			throws IOException, InterruptedException {
    		// 去除计数器
    		String name = counter.getName();
    		long value = counter.getValue();
    		String name2 = counter2.getName();
    		long value2 = counter2.getValue();
    		context.write(new Text(name), new IntWritable((int) value));
    		context.write(new Text(name2), new IntWritable((int) value2));
    	}
    }
    
    WordCounterJob.java
    package com.atSchool.Counter;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class WordCounterJob extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new WordCounterJob(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(WordCounterJob.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(WordCounterMapper.class);
    		job.setNumReduceTasks(0); // !!!!!!这里用为没有用到reduce所以设置为0!!!!!!!!
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/a.txt"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    

    5. 统计超速车辆案例

    分析资料:

    • 链接:https://pan.baidu.com/s/1XAVzBBEnZkBW_8bvVPqX8w 提取码:5gd0

    数据说明:

    日期		  摄像机ID       地址        车速    纬度        经度               位置
    07/01/2014,CHI003,4124 W FOSTER AVE,123,41.9756053,-87.7316698,"(41.9756053, -87.7316698)"
    07/01/2014,CHI004,5120 N PULASKI,68,41.9743327,-87.728347,"(41.9743327, -87.728347)"
    07/01/2014,CHI005,2080 W PERSHING,68,41.8231888,-87.6773488,"(41.8231888, -87.6773488)"
    07/01/2014,CHI007,3843 S WESTERN,75,41.823564,-87.6847211,"(41.823564, -87.6847211)"
    07/01/2014,CHI008,3655 W JACKSON,15,41.8770708,-87.7181683,"(41.8770708, -87.7181683)"
    07/01/2014,CHI009,3646 W MADISON,50,41.8809382,-87.7178984,"(41.8809382, -87.7178984)"
    07/01/2014,CHI010,1111 N HUMBOLDT,77,,,
    

    需求:统计车速超过120的车辆

    SpeedMaperr.java
    package com.atSchool.Counter.overspeed;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Counter;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class SpeedMaperr extends Mapper<LongWritable, Text, Text, LongWritable> {
    	enum speed {
    		OVERSPPD_CARS
    	}
    
    	Counter counter = null;
    
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
    			throws IOException, InterruptedException {
    		// 获取计数器
    		counter = context.getCounter(speed.OVERSPPD_CARS);
    
    		// 处理数据
    		String[] split = value.toString().split(",");
    
    		// 由于第一行是表头,所以要稍微的处理一下、
    		if (Character.isDigit(split[3].charAt(0))) {
    			// 统计
    			if (Integer.parseInt(split[3].trim()) >= 120) {
    				counter.increment(1L); // 只要车速超过120就算超速+1
    			}
    		}
    	}
    
    	@Override
    	protected void cleanup(Mapper<LongWritable, Text, Text, LongWritable>.Context context)
    			throws IOException, InterruptedException {
    		String name = counter.getName();
    		long value = counter.getValue();
    		context.write(new Text(name), new LongWritable(value));
    	}
    }
    
    SpeedJob.java
    package com.atSchool.Counter.overspeed;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class SpeedJob extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new SpeedJob(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(SpeedJob.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(SpeedMaperr.class);
    		job.setNumReduceTasks(0); // 这里用为没有用到reduce所以设置为0
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(LongWritable.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/overspeed_cars.csv"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    

    5.11_MapReduce中的cleanup和setup

    我们知道reduce和map都有一个局限性就是map是读一行执行一次reduce是每一组执行一次但是当我们想全部得到数据之后,按照需求筛选然后再输出(例如,经典的topN问题)怎么办?这时候只使用map和reduce显然是达不到目的的?那该怎么呢?

    这时候我们想到了setUp和cleanUp的特性,只执行一次。这样我们对于最终数据的过滤,然后输出要放在cleanUp中。这样就能实现对数据,不一组一组输出,而是全部拿到,最后过滤输出。


    topN问题案例

    分析:

    • 我们知道mapreduce有切分、聚合的功能,所以第一步就是:先在map种把每个单词读出来,然后在reduce中聚合,求出每个单词出现的次数

    • 但是怎么控制只输出前三名呢?我们知道,map是读一行执行一次,reduce是每一组执行一次,所以只用map和reduce是无法控制输出的次数的,

      但是我们又知道,无论map或者reduce都有setUp和cleanUp,而且这两个方法只执行一次

      所以我们可以在reduce阶段把每一个单词当做key,单词出现的次数当做value,每一组存放到一个map集合里面(此时只存,不写出)。在reduce的cleanUp阶段对map进行排序,然后输出前三名

    maperrDemo.java
    package com.atSchool.topN;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class maperrDemo extends Mapper<LongWritable, Text, Text, IntWritable> {
    	private Text outKey = new Text();
    	private IntWritable outValue = new IntWritable(1);
    
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    			throws IOException, InterruptedException {
    		String[] split = value.toString().split(" +");
    		for (String string : split) {
    			outKey.set(string);
    			context.write(outKey, outValue);
    		}
    	}
    }
    
    reduceDemo.java
    package com.atSchool.topN;
    
    import java.io.IOException;
    import java.util.Collections;
    import java.util.Comparator;
    import java.util.HashMap;
    import java.util.LinkedList;
    import java.util.Map;
    import java.util.Map.Entry;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class reduceDemo extends Reducer<Text, IntWritable, Text, IntWritable> {
    	// HashMap的key不能重复
    	Map<String, Integer> hashMap = new HashMap<>();
    
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values,
    			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    		int count = 0;
    		for (IntWritable intWritable : values) {
    			count += intWritable.get();
    		}
    
    		// 拿到每个单词和总的数量后
    		hashMap.put(key.toString(), count);
    	}
    
    	@Override
    	protected void cleanup(Reducer<Text, IntWritable, Text, IntWritable>.Context context)
    			throws IOException, InterruptedException {
    		// 进行排序后输出前三名
    		/**
    		 * Set<Map.Entry<K,V>> entrySet():获取 键值对对象 的集合 
    		 * 说明: Map.Entry<K,V>:描述的是键值对的对象实体
    		 * 这个对象的方法有: a
    		 * 		K getKey() 返回与此条目相对应的键。 
    		 * 		V getValue() 返回与此条目相对应的值。
    		 */
    		LinkedList<Entry<String, Integer>> linkedList = new LinkedList<>(hashMap.entrySet());
    		// 排序
    		Collections.sort(linkedList, new Comparator<Map.Entry<String, Integer>>() {
    			@Override
    			public int compare(Entry<String, Integer> o1, Entry<String, Integer> o2) {
    				return o2.getValue() - o1.getValue();
    			}
    		});
    
    		for (int i = 0; i < linkedList.size(); i++) {
    			if (i <= 2) {
    				context.write(new Text(linkedList.get(i).getKey()), new IntWritable(linkedList.get(i).getValue()));
    			}
    		}
    	}
    }
    
    jobDemo.java
    package com.atSchool.topN;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class jobDemo extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new jobDemo(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(jobDemo.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(maperrDemo.class);
    		job.setReducerClass(reduceDemo.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/a.txt"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    

    5.12_MapReduce中的Combiner

    MapReduce中的Combiner是为了避免map任务和reduce任务之间的数据传输而设置的。Hadoop允许用户针对maptask的输出指定一个合并函数。即为了减少传输到Reduce中的数据量。它主要是为了削减Mapper的输出从而减少网络带宽和Reducer之上的负载。


    1. Combiner和Reducer的区别

    1. Combiner和Reducer的区别在于运行的位置:Combiner是在每一个MapTask所在的节点运行,Reducer是接收全局所有Mapper的输出结果
    2. Combiner的输入key-value的类型就是Mapper组件输出的key-value的类型,Combiner的输出key-value要跟reducer的输入key-value类型要对应起来
    3. Combiner的使用要非常谨慎,因为Combiner在MapReduce过程中是可选的组件,可能调用也可能不调用,可能调一次也可能调多次,所以:Combiner使用的原则是:有或没有都不能影响业务逻辑,都不能影向最终结果

    2. 统计单词案例

    1. 没有使用Combiner
    21/04/04 14:14:27 INFO mapreduce.Job: Counters: 35
    	File System Counters
    		FILE: Number of bytes read=5975774
    		FILE: Number of bytes written=9544692
    		FILE: Number of read operations=0
    		FILE: Number of large read operations=0
    		FILE: Number of write operations=0
    		HDFS: Number of bytes read=3562268
    		HDFS: Number of bytes written=39
    		HDFS: Number of read operations=15
    		HDFS: Number of large read operations=0
    		HDFS: Number of write operations=6
    	Map-Reduce Framework
    		Map input records=172368
    		Map output records=229824
    		Map output bytes=2528064
    		Map output materialized bytes=2987718
    		Input split bytes=98
    		Combine input records=0			// 这里显示0
    		Combine output records=0
    		Reduce input groups=3
    		Reduce shuffle bytes=2987718
    		Reduce input records=229824
    		Reduce output records=3
    		Spilled Records=459648
    		Shuffled Maps =1
    		Failed Shuffles=0
    		Merged Map outputs=1
    		GC time elapsed (ms)=28
    		Total committed heap usage (bytes)=466616320
    	Shuffle Errors
    		BAD_ID=0
    		CONNECTION=0
    		IO_ERROR=0
    		WRONG_LENGTH=0
    		WRONG_MAP=0
    		WRONG_REDUCE=0
    	File Input Format Counters 
    		Bytes Read=1781134
    	File Output Format Counters 
    		Bytes Written=39
    执行成功
    

    2. 使用Combiner后
    21/04/04 14:25:39 INFO mapreduce.Job: Counters: 35
    	File System Counters
    		FILE: Number of bytes read=426
    		FILE: Number of bytes written=582500
    		FILE: Number of read operations=0
    		FILE: Number of large read operations=0
    		FILE: Number of write operations=0
    		HDFS: Number of bytes read=3562268
    		HDFS: Number of bytes written=39
    		HDFS: Number of read operations=15
    		HDFS: Number of large read operations=0
    		HDFS: Number of write operations=6
    	Map-Reduce Framework
    		Map input records=172368
    		Map output records=229824
    		Map output bytes=2528064
    		Map output materialized bytes=44
    		Input split bytes=98
    		Combine input records=229824		// 这里说明使用了Combiner
    		Combine output records=3
    		Reduce input groups=3		// Reduce的工作量明显减少了
    		Reduce shuffle bytes=44
    		Reduce input records=3
    		Reduce output records=3
    		Spilled Records=6
    		Shuffled Maps =1
    		Failed Shuffles=0
    		Merged Map outputs=1
    		GC time elapsed (ms)=25
    		Total committed heap usage (bytes)=464519168
    	Shuffle Errors
    		BAD_ID=0
    		CONNECTION=0
    		IO_ERROR=0
    		WRONG_LENGTH=0
    		WRONG_MAP=0
    		WRONG_REDUCE=0
    	File Input Format Counters 
    		Bytes Read=1781134
    	File Output Format Counters 
    		Bytes Written=39
    执行成功
    
    MapDemo.java(和前面一样)
    package com.atSchool.MapReduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    
    public class MapDemo extends Mapper<LongWritable, Text, Text, IntWritable> {
    	private Text keyword = new Text();
    	private IntWritable keyvalue = new IntWritable(1);
    
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, IntWritable>.Context context)
    			throws IOException, InterruptedException {
    		// 1、获取一行
    		String line = value.toString();
    
    		// 2、截取
    		String[] split = line.split(" +");
    
    		// 3、输出到上下文对象中
    		for (String string : split) {
    			keyword.set(string);
    			context.write(keyword, keyvalue);
    		}
    	}
    }
    
    ReduceDemo.java(和前面一样)
    package com.atSchool.MapReduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class ReduceDemo extends Reducer<Text, IntWritable, Text, IntWritable> {
    	IntWritable val = new IntWritable();
    
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values,
    			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    		// 累加
    		int sum = 0;
    		for (IntWritable intWritable : values) {
    			sum += intWritable.get();
    		}
    
    		// 输出
    		val.set(sum);
    		context.write(key, val);
    	}
    }
    
    WordCombiner.java
    package com.atSchool.MapReduce;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class WordCombiner extends Reducer<Text, IntWritable, Text, IntWritable> {
    	IntWritable val = new IntWritable();
    
    	@Override
    	protected void reduce(Text key, Iterable<IntWritable> values,
    			Reducer<Text, IntWritable, Text, IntWritable>.Context context) throws IOException, InterruptedException {
    		// 累加
    		int sum = 0;
    		for (IntWritable intWritable : values) {
    			sum += intWritable.get();
    		}
    
    		// 输出
    		val.set(sum);
    		context.write(key, val);
    	}
    }
    
    WordCountJob.java
    package com.atSchool.MapReduce;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class WordCountJob extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new WordCountJob(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(WordCountJob.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(MapDemo.class);
    
    		job.setCombinerClass(WordCombiner.class);	// 此处指定要使用的Combiner即可
    
    		job.setReducerClass(ReduceDemo.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(IntWritable.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(IntWritable.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/a.txt"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/out");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    

    5.13_倒排索引

    1. 索引的概念

    • 在关系数据库中,索引是一种单独的、物理的对数据库表中一列或多列的值进行排序的一种存储结构,它是某个表中一列或若干列值的集合 和 相应的指向表中物理标识这些值的数据页的逻辑指针清单。

    • 索引的作用相当于图书的目录,可以根据目录中的页码快速找到所需的内容。索引提供指向存储在表的指定列中的数据值的指针,然后根据您指定的排序顺序对这些指针排序。数据库使用索引以找到特定值,然后顺指针找到包含该值的行。这样可以使对应于表的SQL语句执行得更快,可快速访问数据库表中的特定信息。

    • 当表中有大量记录时,若要对表进行查询,第一种搜索信息方式是全表搜索,是将所有记录一一取出,和查询条件进行一一对比,然后返回满足条件的记录,这样做会消耗大量数据库系统时间,并造成大量磁盘I/0操作;

      第二种就是在表中建立索引,然后在索引中找到符合查询条件的索引值,最后通过保存在索引中的ROWID(相当于页码)快速找到表中对应的记录.


    2. 倒排索引

    正向索引结构:
    “文档1”的ID > 单词1:出现次数,出现位置列表;单词2:出现次数,出现位置列表;…………。
    “文档2”的ID > 此文档出现的关键词列表。
    

    一般是通过key,去找value当用户在主页上搜索关键词“华为手机”时,假设只存在正向索引(forward index),那么就需要扫描索引库中的所有文档,找出所有包含关键词“华为手机”的文档,再根据打分模型进行打分,排出名次后呈现给用户。因为互联网上收录在搜索引擎中的文档的数目是个天文数字,这样的索引结构根本无法满足实时返回排名结果的要求

    所以,搜索引擎会将正向索引重新构建为倒排索引,即把文件ID对应到关键词的映射转换为关键词到文件ID的映射,每个关键词都对应着一系列的文件,这些文件中都出现这个关键词.

    得到倒排索引的结构如下:
    “关键词1”:“文档1”的ID,“文档2”的ID.
    “关键词2”:带有此关键词的文档ID列表.
    

    从词的关键字,去找文档


    3. 制作倒排索引案例

    分析资料:
    • 链接:https://pan.baidu.com/s/1Vng4GW0J1qa9jC_-7pGE7Q
      提取码:y3sf
    需求:

    得到day01~03文件中关键字的倒排索引

    具体过程:
    -------------第一步Mapper的输出结果格式如下:--------------------
    context.wirte("love:day01.txt", "1")
    context.wirte("beijing:day01.txt", "1")
    context.wirte("love:day02.txt", "1")
    context.wirte("beijing:day01.txt", "1")
        
    -------------第二步Combiner的得到的输入数据格式如下:-------------
    <"love:day01.txt", {"1"}>
    <"beijing:day01.txt", {"1","1"}>
    <"love:day02.txt", {"1"}>
    
    -------------第二步Combiner的输出数据格式如下---------------------
    context.write("love", "day01.txt:1")
    context.write("beijing", "day01.txt:2")
    context.write("love", "day02.txt:1")
    
    -------------第三步Reducer得到的输入数据格式如下:-----------------
    <"love", {"day01.txt:1", "day02.txt:1"}>
    <"beijing", {"day01.txt:2"}>
        
    -------------第三步Reducer输出的数据格式如下:-----------------
    context.write("love", "day01.txt:1 day02.txt:2 ")  
    context.write("beijing", "day01.txt:2 ")
        
    最终结果为:
    love	day01.txt:1 day02.txt:2
    beijing day01.txt:2
    
    mapperDemo.java
    package com.atSchool.index;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.InputSplit;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    /**
     * 输出的形式:
     * context.wirte("love:day01.txt", "1")
     * context.wirte("beijing:day01.txt", "1")
     * context.wirte("love:day02.txt", "1")
     * context.wirte("beijing:day01.txt", "1")
     */
    public class mapperDemo extends Mapper<LongWritable, Text, Text, Text> {
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
    			throws IOException, InterruptedException {
    		// 分割
    		String[] split = value.toString().split(" +");
    
    		// 获取文件名
    		// 1、获取切片
    		InputSplit inputSplit = context.getInputSplit();
    		// 2、强转
    		FileSplit fSplit = (FileSplit) inputSplit;
    		// 3、得到文件路径再获取文件名
    		String name = fSplit.getPath().getName();
    
    		for (String string : split) {
    			context.write(new Text(string + ":" + name), new Text("1"));
    			System.out.println("mapper:" + "<" + string + ":" + name + "," + "1" + ">");
    		}
    	}
    }
    
    combinerDemo.java
    package com.atSchool.index;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * 输入的形式:
     * 	<"love:day01.txt", {"1"}>
     * 	<"beijing:day01.txt", {"1","1"}>
     * 	<"love:day02.txt", {"1"}>
     * 
     * 输出的形式:
     * 	context.write("love", "day01.txt:1")
     *  context.write("beijing", "day01.txt:2")
     *  context.write("love", "day02.txt:1")
     */
    public class combinerDemo extends Reducer<Text, Text, Text, Text> {
    	private Text outKey = new Text();
    	private Text outValue = new Text();
    
    	@Override
    	protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
    			throws IOException, InterruptedException {
    		// 处理value
    		int sum = 0;
    		for (Text text : value) {
    			int parseInt = Integer.parseInt(text.toString().trim());
    			sum += parseInt;
    		}
    
    		// 处理key
    		String[] split = key.toString().split(":");
    
    		// 输出
    		outKey.set(split[0].trim());
    		outValue.set(split[1].trim() + ":" + String.valueOf(sum));
    
    		context.write(outKey, outValue);
    		System.out
    				.println("combiner:" + "<" + split[0].trim() + "," + split[1].trim() + ":" + String.valueOf(sum) + ">");
    	}
    }
    
    reduceDemo.java
    package com.atSchool.index;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    /**
     * 输入格式:
     *  <"love", {"day01.txt:1", "day02.txt:1"}>
     *  <"beijing", {"day01.txt:2"}>
     * 
     * 输出格式:
     *  context.write("love", "day01.txt:1 day02.txt:2 ")  
     *  context.write("beijing", "day01.txt:2 ")
     */
    public class reduceDemo extends Reducer<Text, Text, Text, Text> {
    	// StringBuilder stringBuilder = new StringBuilder();
    	private Text outValue = new Text();
    
    	@Override
    	protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
    			throws IOException, InterruptedException {
    		/**
    		 * 这里为什么不能将stringBuilder定义在外面?
    		 * 因为MP程序在运行的时候,只会走reduce方法,并不是将这整个类走一遍。
    		 * 这里stringBuilder的append方法会一直将数据在末尾追加,不会覆盖之前的数据。
    		 * 使用Test中的set方法时会将之前的数据覆盖。
    		 */
    		StringBuilder stringBuilder = new StringBuilder();
    		/**
    		 * 迭代器为什么不能多次遍历?
    		 * 当调用next()时,返回当前索引(cursor)指向的元素,然后当前索引值(cursor)会+1,
    		 * 当所有元素遍历完,cursor == Collection.size(),
    		 * 此时再使用while(Iterator.hasNext())做循环条件时,返回的是false,无法进行下次遍历,
    		 * 如果需要多次使用Iterator进行遍历,当一次遍历完成,需要重新初始化Collection的iterator()。
    		 */
    		// for (Text text : value) {
    		// System.out.println("reduce输入的value:" + text.toString());
    		// }
    
    		for (Text text : value) {
    			System.out.println("reduce输入的value:" + text.toString());
    			stringBuilder.append(" " + text.toString());
    		}
    
    		// 输出
    		outValue.set(stringBuilder.toString());
    		context.write(key, outValue);
    		System.out.println("reduce:" + "<" + key.toString() + "," + stringBuilder.toString() + ">");
    	}
    }
    
    jobDemo.java
    package com.atSchool.index;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class jobDemo extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new jobDemo(), null);
    	}
    	
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(jobDemo.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(mapperDemo.class);
    		job.setCombinerClass(combinerDemo.class);
    		job.setReducerClass(reduceDemo.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/index_source"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    
    运行结果
    and	 day01.txt:1
    beijing	 day01.txt:1 day03.txt:1
    capital	 day03.txt:1
    china	 day03.txt:1 day02.txt:1
    i	 day02.txt:1 day01.txt:2
    is	 day03.txt:1
    love	 day01.txt:2 day02.txt:1
    of	 day03.txt:1
    shanghai	 day01.txt:1
    the	 day03.txt:1
    

    4. 统计辩论词频案例

    分析资料:

    • 链接:https://pan.baidu.com/s/1XYZpOWRvxEyi1Chlv8CQRg
      提取码:lvpm

    需求:

    • 统计资料中的speaker所说的话中单词出现的频率,以 单词 文件名:speaker:出次数 的形式输出。
    mapperDemo.java
    package com.atSchool.speak;
    
    import java.io.IOException;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.lib.input.FileSplit;
    
    /**
     * 统计辩论赛的词频
     * 
     * 文件内容
     * "Line","Speaker","Text","Date"
     * 1,"Holt","Good evening from Hofstra University in Hempstead, New York. I'm Lester Holt, anchor of ""NBC Nightly News."" I want to welcome you to the first presidential debate. The participants tonight are Donald Trump and Hillary Clinton. This debate is sponsored by the Commission on Presidential Debates, a nonpartisan, nonprofit organization. The commission drafted tonight's format, and the rules have been agreed to by the campaigns. The 90-minute debate is divided into six segments, each 15 minutes long. We'll explore three topic areas tonight: Achieving prosperity; America's direction; and securing America. At the start of each segment, I will ask the same lead-off question to both candidates, and they will each have up to two minutes to respond. From that point until the end of the segment, we'll have an open discussion. The questions are mine and have not been shared with the commission or the campaigns. The audience here in the room has agreed to remain silent so that we can focus on what the candidates are saying. I will invite you to applaud, however, at this moment, as we welcome the candidates: Democratic nominee for president of the United States, Hillary Clinton, and Republican nominee for president of the United States, Donald J. Trump.","9/26/16"
     * 2,"Audience","(APPLAUSE)","9/26/16"
     * 3,"Clinton","How are you, Donald?","9/26/16"
     * 4,"Audience","(APPLAUSE)","9/26/16"
     * 5,"Holt","Good luck to you.","9/26/16"
     */
    public class mapperDemo extends Mapper<LongWritable, Text, Text, Text> {
    	private Text outKey = new Text();
    	private Text outValue = new Text();
    
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
    			throws IOException, InterruptedException {
    		// 获取按指定规则分割后的一行
    		String[] split = value.toString().split(",");
    
    		if (split.length == 4 && Character.isDigit(split[0].charAt(0))) {
    			String speaker = split[1]; // 辩论者
    			String text = split[2]; // 辩论内容
    
    			// 对辩论内容再进行分割
    			/**
    			 * 字符串StringTokenizer类允许应用程序将字符串拆分成令牌。 
    			 * StringTokenizer方法不区分标识符,数字和引用的字符串,也不识别和跳过注释。 
    			 * 可以在创建时或每个令牌的基础上指定一组分隔符(分隔标记的字符)。 
    			 */
    			StringTokenizer stringTokenizer = new StringTokenizer(text, " (),.?!--\"\"\n#");
    
    			// 获取文件名
    			String fileName = ((FileSplit) context.getInputSplit()).getPath().getName();
    
    			// 遍历每一个单词
    			while (stringTokenizer.hasMoreElements()) {
    				String nextToken = stringTokenizer.nextToken();
    				outKey.set(nextToken + ":" + fileName + ":" + speaker);
    				outValue.set("1");
    				context.write(outKey, outValue);
    			}
    		}
    	}
    }
    
    combinerDemo.java
    package com.atSchool.speak;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class combinerDemo extends Reducer<Text, Text, Text, Text> {
    	@Override
    	protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
    			throws IOException, InterruptedException {
    		// 合并
    		int sum = 0;
    		for (Text text : value) {
    			Integer valueOf = Integer.valueOf(text.toString());
    			sum += valueOf;
    		}
    
    		// 改变kv重新输出
    		String[] split = key.toString().split(":");
    		String outKey = split[0]; // 单词
    		String outValue = split[1] + ":" + split[2] + ":" + String.valueOf(sum); // 文件名:辩论者:出现次数
    
    		context.write(new Text(outKey), new Text(outValue));
    	}
    }
    
    reduceDemo.java
    package com.atSchool.speak;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class reduceDemo extends Reducer<Text, Text, Text, Text> {
    	@Override
    	protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
    			throws IOException, InterruptedException {
    		StringBuffer stringBuffer = new StringBuffer();
    		for (Text text : value) {
    			stringBuffer.append(text.toString() + "\t");
    		}
    
    		context.write(key, new Text(stringBuffer.toString()));
    	}
    }
    
    jobDemo.java
    package com.atSchool.speak;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class jobDemo extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new jobDemo(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(jobDemo.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(mapperDemo.class);
    		job.setCombinerClass(combinerDemo.class);
    		job.setReducerClass(reduceDemo.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/美国两党辩论关键词"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    
    }
    

    5.14_计算共同好友案例

    分析资料:

    A:B,C,D,F,E,O
    B:A,C,E,K
    C:F,A,D,I
    D:A,E,F,L
    E:B,C,D,M,L
    F:A,B,C,D,E,O,M
    G:A,C,D,E,F
    H:A,C,D,E,O
    I:A,O
    J:B,O
    K:A,C,D
    L:D,E,F
    M:E,F,G
    O:A,H,I,J
    

    mapperDemo.java

    package com.atSchool.friend;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * 统计两个人的共同好友
     * 
     * 原数据:
     * A:B,C,D,F,E,O
     * B:A,C,E,K
     * C:F,A,D,I
     * D:A,E,F,L
     * 
     * 性质:好友双方应当都有对方名字,不会存在单一的情况
     * 所以A中有B,即B中有A
     */
    public class mapperDemo extends Mapper<LongWritable, Text, Text, Text> {
    	private Text outKey = new Text();
    	private Text outValue = new Text();
    
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
    			throws IOException, InterruptedException {
    		String[] split = value.toString().trim().split(":");
    
    		// 获取好友列表
    		String[] split2 = split[1].split(",");
    		
    		outValue.set(split[0]);		// 当前用户
    		
    		// 遍历好友列表
    		for (String string : split2) {
    			/**
    			 * 输出:
    			 * B A
    			 * C A
    			 * D A
    			 * F A
    			 * E A
    			 * O A
    			 * A B
    			 * C B
    			 * E B
    			 * K B
    			 */
    			outKey.set(string);
    			context.write(outKey, outValue);
    			System.out.println("mapper:" + string + "\t" + split[0]);
    		}
    	}
    }
    

    reduceDemo.java

    package com.atSchool.friend;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class reduceDemo extends Reducer<Text, Text, Text, Text> {
    	private Text outValue = new Text();
    
    	@Override
    	protected void reduce(Text key, Iterable<Text> value, Reducer<Text, Text, Text, Text>.Context context)
    			throws IOException, InterruptedException {
    		StringBuilder stringBuilder = new StringBuilder();
    		for (Text text : value) {
    			stringBuilder.append(text.toString() + ",");
    		}
    
    		outValue.set(stringBuilder.toString());
    		context.write(key, outValue);
    
    		System.out.println("reduce-in-out:" + key.toString() + "\t" + stringBuilder.toString());
    	}
    }
    

    jobDemo.java

    package com.atSchool.friend;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class jobDemo extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new jobDemo(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(jobDemo.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(mapperDemo.class);
    		job.setReducerClass(reduceDemo.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/friend.txt"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    
    }
    

    mapperDemo2.java

    package com.atSchool.friend;
    
    import java.io.IOException;
    import java.util.Arrays;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class mapperDemo2 extends Mapper<LongWritable, Text, Text, Text> {
    	private Text outKey = new Text();
    	private Text outValue = new Text();
    
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, Text>.Context context)
    			throws IOException, InterruptedException {
    		// 输出的value
    		String[] split = value.toString().split("\t");
    		outValue.set(split[0]);
    
    		// 输出的key
    		String[] split2 = split[1].split(",");
    		Arrays.sort(split2);
    		for (int i = 0; i < split2.length; i++) {
    			for (int j = i + 1; j < split2.length; j++) {
    				outKey.set(split2[i] + split2[j]);
    				/**
    				 * 输出:
    				 * BC	A
    				 * BD	A
    				 * BF	A
    				 * BG	A
    				 * BH	A
    				 * BI	A
    				 * BK	A
    				 * BO	A
    				 * CD	A
    				 * CF	A
    				 * CG	A
    				 * CH	A
    				 * CI	A
    				 * CK	A
    				 * CO	A
    				 */
    				context.write(outKey, outValue);
    				System.out.println("mapper-out:" + outKey.toString() + "\t" + outValue.toString());
    			}
    		}
    	}
    }
    

    jobDemo2.java

    package com.atSchool.friend;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class jobDemo2 extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new jobDemo2(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(jobDemo2.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(mapperDemo2.class);
    		job.setReducerClass(reduceDemo.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(Text.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/MapReduceOut/part-r-00000"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/out");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    
    }
    



    5.15_自定义分区

    1. 自定义Partitioner步骤

    (1)自定义类继承Partitioner,重写getPartition()方法

    (2)在job驱动中,设置自定义partitioner:

    job.setPartitionerClass(CustomPartitioner.class);
    

    (3)自定义partition后,要根据自定义Partitioner的逻辑设置相应数量的reducetask

    job.setNumReduceTasks(5);
    

    注意:

    reduceTask的个数决定了有几个文件!!

    如果reduceTask的数量 > getPartition的结果数,则会多产生几个空的输出文件part-r-000xx;

    如果1 < reduceTask的数量 < getPartition的结果数,则有一部分 分区数据无处安放,会报Exception;

    如果reduceTask的数量=1,则不管mapTask端输出多少个分区文件,最终结果都交给这一个reduceTask,最终也就只会产生一个结果文件part–00000;


    2. 统计手机流量并按手机号划分案例

    分析资料:

    1363157985066 	13726230503	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
    1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
    1363157991076 	13926435656	20-10-7A-28-CC-0A:CMCC	120.196.100.99			2	4	132	1512	200
    1363154400022 	13926251106	5C-0E-8B-8B-B1-50:CMCC	120.197.40.4			4	0	240	0	200
    1363157993044 	18211575961	94-71-AC-CD-E6-18:CMCC-EASY	120.196.100.99	iface.qiyi.com	视频网站	15	12	1527	2106	200
    1363157995074 	84138413	5C-0E-8B-8C-E8-20:7DaysInn	120.197.40.4	122.72.52.12		20	16	4116	1432	200
    1363157993055 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
    1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	20	20	3156	2936	200
    1363157983019 	13719199419	68-A1-B7-03-07-B1:CMCC-EASY	120.196.100.82			4	0	240	0	200
    1363157984041 	13660577991	5C-0E-8B-92-5C-20:CMCC-EASY	120.197.40.4	s19.cnzz.com	站点统计	24	9	6960	690	200
    1363157973098 	15013685858	5C-0E-8B-C7-F7-90:CMCC	120.197.40.4	rank.ie.sogou.com	搜索引擎	28	27	3659	3538	200
    1363157986029 	15989002119	E8-99-C4-4E-93-E0:CMCC-EASY	120.196.100.99	www.umeng.com	站点统计	3	3	1938	180	200
    1363157992093 	13560439658	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			15	9	918	4938	200
    1363157986041 	13480253104	5C-0E-8B-C7-FC-80:CMCC-EASY	120.197.40.4			3	3	180	180	200
    1363157984040 	13602846565	5C-0E-8B-8B-B6-00:CMCC	120.197.40.4	2052.flash2-http.qq.com	综合门户	15	12	1938	2910	200
    1363157995093 	13922314466	00-FD-07-A2-EC-BA:CMCC	120.196.100.82	img.qfc.cn		12	12	3008	3720	200
    1363157982040 	13502468823	5C-0A-5B-6A-0B-D4:CMCC-EASY	120.196.100.99	y0.ifengimg.com	综合门户	57	102	7335	110349	200
    1363157986072 	18320173382	84-25-DB-4F-10-1A:CMCC-EASY	120.196.100.99	input.shouji.sogou.com	搜索引擎	21	18	9531	2412	200
    1363157990043 	13925057413	00-1F-64-E1-E6-9A:CMCC	120.196.100.55	t3.baidu.com	搜索引擎	69	63	11058	48243	200
    1363157988072 	13760778710	00-FD-07-A4-7B-08:CMCC	120.196.100.82			2	2	120	120	200
    1363157985066 	13560436666	00-FD-07-A4-72-B8:CMCC	120.196.100.82	i02.c.aliimg.com		24	27	2481	24681	200
    1363157993055 	13560436666	C4-17-FE-BA-DE-D9:CMCC	120.196.100.99			18	15	1116	954	200
    
    // 说明:(注意不一定每一条数据都有如下项,所以得注意)
    1363157995033 	15920133257	5C-0E-8B-C7-BA-20:CMCC	120.197.40.4	sug.so.360.cn	信息安全	  20	         20	       3156	    2936	    200
    记录报告时间戳		手机号码		AP mac					AC mac		   访问的网址	  网址种类  上行数据包个数	  下行数据包个数  上行流量  下行流量 HTTP Response的状态
    
    mapperDemo.java
    package com.atSchool.partitioner;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class mapperDemo extends Mapper<LongWritable, Text, Text, LongWritable> {
    	private Text outKey = new Text();
    	private LongWritable outValue = new LongWritable();
    
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, LongWritable>.Context context)
    			throws IOException, InterruptedException {
    		// 读取一行
    		String line = value.toString();
    
    		// 分割
    		String[] split = line.split("\t+");
    
    		// 输出到上下文对象中
    		String tel = split[1].trim();
    		String upFlow = split[split.length - 3].trim();
    		String downFlow = split[split.length - 2].trim();
    
    		outKey.set(tel);
    		outValue.set(Long.parseLong(upFlow) + Long.parseLong(downFlow));
    
    		context.write(outKey, outValue);
    	}
    }
    
    partitionerDemo.java
    package com.atSchool.partitioner;
    
    import java.util.HashMap;
    import java.util.Map;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class partitionerDemo extends Partitioner<Text, LongWritable> {
    	// 初始化分区
    	private static Map<String, Integer> maps = new HashMap<>();
    	static {
    		maps.put("135", 0);
    		maps.put("137", 1);
    		maps.put("138", 2);
    		maps.put("139", 3);
    	}
    
    	// 如果不是上面分区的任意一种,则放到第5个分区
    	@Override
    	public int getPartition(Text key, LongWritable value, int numPartitions) {
    		String substring = key.toString().substring(0, 3);
    		if (maps.get(substring) == null) {
    			return 4;
    		} else {
    			return maps.get(substring);
    		}
    	}
    }
    
    reduceDemo.java
    package com.atSchool.partitioner;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class reduceDemo extends Reducer<Text, LongWritable, Text, LongWritable> {
    	@Override
    	protected void reduce(Text key, Iterable<LongWritable> value,
    			Reducer<Text, LongWritable, Text, LongWritable>.Context context) throws IOException, InterruptedException {
    		long sum = 0;
    		for (LongWritable longWritable : value) {
    			sum += longWritable.get();
    		}
    
    		context.write(key, new LongWritable(sum));
    	}
    }
    
    jobDemo.java
    package com.atSchool.partitioner;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class jobDemo extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new jobDemo(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(jobDemo.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(mapperDemo.class);
    
    		job.setPartitionerClass(partitionerDemo.class); // 设置自定义partitioner
    
    		job.setReducerClass(reduceDemo.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(LongWritable.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(LongWritable.class);
    
    		job.setNumReduceTasks(5); // 根据自定义Partitioner的逻辑设置相应数量的reducetask
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/phone_traffic.txt"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    

    5.16_MapReduce综合应用

    统计分析学生课程与成绩案例

    分析资料:

    computer,huangxilaoming,78,90,80,76,87
    computer,huangbo,65,60,0,75,77
    computer,xuzheng,50,60,60,40,80,90,100
    computer,wangbaoqiang,57,87,98,87,54,65,32,21
    java,liuyifei,50,40,90,50,80,70,60,50,40
    java,xuezhiqian,50,40,60,70,80,90,90,50
    java,wanghan,88,99,88, 99,77,55
    java,huangzitao,90,50,80,70,60,50,40
    java,huangbo,70,80,90,90,50
    java,xuzheng,99,88,99,77,55
    scala,wangzulan,50,60,70,80,90,40,50,60
    scala,dengchao,60,50,90,60,40,50,60,70,50
    scala,zhouqi,60,50,40,90,80,70,80,90
    scala,liuqian,80,90,40,50,60
    scala,liutao,60,40,50,60,70,50
    scala,zhourunfa,40,90,80,70,80,90
    mysql,liutao,50,60,70,80,90,100,65,60
    mysql,liuqian,50,60,50,70,80,90,50,60,40
    mysql,zhourunfa,80,50,60,90,70,50,60,40,50
    

    数据说明:

    数据解释数据字段个数不固定:第一个是课程名称,总共四个课程,computer,math,english,algorithm,第二个是学生姓名,后面是每次考试的分数,但是每个学生在某门课程中的考试次数不固定


    1.统计每门课程的参考人数和课程平均分

    mapperDemo.java
    package com.atSchool.partitioner2;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * 统计分析学生课程与成绩案例
     */
    public class mapperDemo extends Mapper<LongWritable, Text, Text, FloatWritable> {
    	private Text outKey = new Text();
    	private FloatWritable outValue = new FloatWritable();
    
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, Text, FloatWritable>.Context context)
    			throws IOException, InterruptedException {
    		// 读取一行+分割
    		String[] split = value.toString().trim().split(",");
    
    		// 输出到上下文对象中
    		String courseName = split[0].trim(); // 课程名
    		float sum = 0;
    		float index = 0;
    		for (int i = 2; i < split.length; i++) {
    			sum += Float.parseFloat(split[i].trim());
    			index++;
    		}
    		float averageScore = sum / index; // 学生平均分
    
    		outKey.set(courseName);
    		outValue.set(averageScore);
    
    		context.write(outKey, outValue);
    	}
    }
    
    reduceDemo.java
    package com.atSchool.partitioner2;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class reduceDemo extends Reducer<Text, FloatWritable, Text, Text> {
    	@Override
    	protected void reduce(Text key, Iterable<FloatWritable> value,
    			Reducer<Text, FloatWritable, Text, Text>.Context context) throws IOException, InterruptedException {
    		float sum = 0;
    		float index = 0;
    		for (FloatWritable floatWritable : value) {
    			sum += floatWritable.get();
    			index++;
    		}
    
    		float averageScore = sum / index;
    		context.write(key, new Text("考试人数:" + index + "\t" + "课程考试平均成绩:" + averageScore));
    	}
    }
    
    jobDemo.java
    package com.atSchool.partitioner2;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class jobDemo extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new jobDemo(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(jobDemo.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(mapperDemo.class);
    		job.setReducerClass(reduceDemo.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(Text.class);
    		job.setMapOutputValueClass(FloatWritable.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(Text.class);
    		job.setOutputValueClass(Text.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/student.txt"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    

    输出结果:

    computer	考试人数:4.0	课程考试平均成绩:67.19911
    java	考试人数:6.0	课程考试平均成绩:71.98823
    mysql	考试人数:3.0	课程考试平均成绩:64.69907
    scala	考试人数:6.0	课程考试平均成绩:64.23148
    

    2.统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,并且按平均分从高到低排序,分数保留一位小数

    beanDemo.java
    package com.atSchool.studentDemo2;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.WritableComparable;
    
    public class beanDemo implements WritableComparable<beanDemo> {
    	private String courseName; // 课程名称
    	private String stuName; // 学生姓名
    	private float avgScore; // 平均成绩
    
    	// 构造方法
    	public beanDemo() {
    		super();
    	}
    
    	public beanDemo(String courseName, String stuName, float avgScore) {
    		super();
    		this.courseName = courseName;
    		this.stuName = stuName;
    		this.avgScore = avgScore;
    	}
    
    	// getter/setter方法
    	public String getCourseName() {
    		return courseName;
    	}
    
    	public void setCourseName(String courseName) {
    		this.courseName = courseName;
    	}
    
    	public String getStuName() {
    		return stuName;
    	}
    
    	public void setStuName(String stuName) {
    		this.stuName = stuName;
    	}
    
    	public float getAvgScore() {
    		return avgScore;
    	}
    
    	public void setAvgScore(float avgScore) {
    		this.avgScore = avgScore;
    	}
    
    	// 重写方法
    	@Override
    	public void write(DataOutput out) throws IOException {
    		out.writeUTF(courseName);
    		out.writeUTF(stuName);
    		out.writeFloat(avgScore);
    	}
    
    	@Override
    	public void readFields(DataInput in) throws IOException {
    		courseName = in.readUTF();
    		stuName = in.readUTF();
    		avgScore = in.readFloat();
    	}
    
    	/**
    	 * 返回一个负整数、零或正整数,因为此对象小于、等于或大于指定对象。
    	 * 
    	 * o1.compareTo(o2);
    	 * 返回正数,比较对象(compareTo传参对象o2)放在 当前对象(调用compareTo方法的对象o1)的前面
    	 * 返回负数,放在后面
    	 */
    	@Override
    	public int compareTo(beanDemo o) {
    		// 判断是不是同一个课程
    		int compareTo = o.courseName.compareTo(this.courseName);
    		// 如果是同一个课程
    		if (compareTo == 0) {
    			// 如果比较的对象比当前对象小,就返回正数,比较对象放在当前对象的后面
    			return avgScore > o.avgScore ? -1 : 1;
    		}
    		return compareTo > 0 ? 1 : -1;
    	}
    
    	@Override
    	public String toString() {
    		return "courseName=" + courseName + ", stuName=" + stuName + ", avgScore=" + avgScore;
    	}
    }
    
    getPartition.java
    package com.atSchool.studentDemo2;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class partitionerDemo extends Partitioner<beanDemo, NullWritable> {
    	@Override
    	public int getPartition(beanDemo key, NullWritable value, int numPartitions) {
    		String courseName = key.getCourseName();
    		if (courseName.equals("java")) {
    			return 0;
    		} else if (courseName.equals("computer")) {
    			return 1;
    		} else if (courseName.equals("scala")) {
    			return 2;
    		} else if (courseName.equals("mysql")) {
    			return 3;
    		} else {
    			return 4;
    		}
    	}
    }
    
    mapperDemo.java
    package com.atSchool.studentDemo2;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * 统计每门课程参考学生的平均分,并且按课程存入不同的结果文件,要求一门课程一个结果文件,
     * 并且按平均分从高到低排序,分数保留一位小数
     */
    public class mapperDemo extends Mapper<LongWritable, Text, beanDemo, NullWritable> {
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, beanDemo, NullWritable>.Context context)
    			throws IOException, InterruptedException {
    		// 读取一行+分割
    		String[] split = value.toString().trim().split(",");
    
    		// 输出到上下文对象中
    		String courseName = split[0].trim(); // 课程名
    		String stuName = split[1].trim();
    		float sum = 0;
    		float index = 0;
    		for (int i = 2; i < split.length; i++) {
    			sum += Float.parseFloat(split[i].trim());
    			index++;
    		}
            String format = String.format("%.1f", sum / index); // 学生平均分,保留一位小数
    
    		beanDemo beanDemo = new beanDemo(courseName, stuName, Float.parseFloat(format););
    		context.write(beanDemo, NullWritable.get());
    	}
    }
    
    jobDemo.java
    package com.atSchool.studentDemo2;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class jobDemo extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new jobDemo(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(jobDemo.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(mapperDemo.class);
    
    		job.setPartitionerClass(partitionerDemo.class); // 设置自定义partitioner
    		job.setNumReduceTasks(5);	// 根据自定义Partitioner的逻辑设置相应数量的reducetask
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(beanDemo.class);
    		job.setMapOutputValueClass(NullWritable.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(beanDemo.class);
    		job.setOutputValueClass(NullWritable.class);
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/student.txt"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    

    3.求出每门课程参考学生成绩排名前3的学生的信息:课程,姓名和平均分(分组)

    groupDemo.java
    package com.atSchool.studentDemo3;
    
    import org.apache.hadoop.io.WritableComparable;
    import org.apache.hadoop.io.WritableComparator;
    
    /**
     * 分组
     */
    public class groupDemo extends WritableComparator {
    	public groupDemo() {
    		super(beanDemo.class, true);
    	}
    
    	@Override
    	public int compare(WritableComparable a, WritableComparable b) {
    		beanDemo s1 = (beanDemo) a;
    		beanDemo s2 = (beanDemo) b;
    
    		return s1.getCourseName().compareTo(s2.getCourseName());
    	}
    
    }
    
    mapperDemo.java
    package com.atSchool.studentDemo3;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    /**
     * 求出每门课程参考学生成绩排名前3的学生的信息:课程,姓名和平均分
     */
    public class mapperDemo extends Mapper<LongWritable, Text, beanDemo, NullWritable> {
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, beanDemo, NullWritable>.Context context)
    			throws IOException, InterruptedException {
    		// 读取一行+分割
    		String[] split = value.toString().trim().split(",");
    
    		// 输出到上下文对象中
    		String courseName = split[0].trim(); // 课程名
    		String stuName = split[1].trim();	// 学生姓名
    		float sum = 0;
    		float index = 0;
    		for (int i = 2; i < split.length; i++) {
    			sum += Float.parseFloat(split[i].trim());
    			index++;
    		}
    		float averageScore = sum / index; // 学生平均分
    
    		beanDemo beanDemo = new beanDemo(courseName, stuName, averageScore);
    		context.write(beanDemo, NullWritable.get());
    	}
    }
    
    reduceDemo.java
    package com.atSchool.studentDemo3;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class reduceDemo extends Reducer<beanDemo, NullWritable, beanDemo, NullWritable> {
    	@Override
    	protected void reduce(beanDemo key, Iterable<NullWritable> value,
    			Reducer<beanDemo, NullWritable, beanDemo, NullWritable>.Context context)
    			throws IOException, InterruptedException {
    		int count = 0;
    		for (NullWritable nullWritable : value) {
    			if (count < 3) {
    				context.write(key, nullWritable);
    				count++;
    			} else {
    				break;
    			}
    		}
    	}
    }
    
    jobDemo.java
    package com.atSchool.studentDemo3;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.FloatWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class jobDemo extends Configured implements Tool {
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new jobDemo(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		// 获取Job
    		Configuration configuration = new Configuration();
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    		Job job = Job.getInstance(configuration);
    
    		// 设置需要运行的任务
    		job.setJarByClass(jobDemo.class);
    
    		// 告诉job Map和Reduce在哪
    		job.setMapperClass(mapperDemo.class);
    		job.setReducerClass(reduceDemo.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(beanDemo.class);
    		job.setMapOutputValueClass(NullWritable.class);
    
    		// 告诉job Reduce输出的key和value的数据类型的是什么
    		job.setOutputKeyClass(beanDemo.class);
    		job.setOutputValueClass(NullWritable.class);
    
    		job.setGroupingComparatorClass(groupDemo.class);	// 指定分组的规则
    
    		// 告诉job输入和输出的路径
    		FileInputFormat.addInputPath(job, new Path("/student.txt"));
    		/**
    		 * 因为输出的文件不允许存在,所以需要处理一下
    		 */
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 提交任务
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    



    5.17_MapReduce+MySQL

    1. Java与MySql数据类型对照表

    在这里插入图片描述


    2. MapReduce读取MySQL中的数据案例

    分析数据库:

    • 链接:https://pan.baidu.com/s/1td4NDtn3xaENAhQrF-F5Ow
      提取码:lc6c
    Shopping.java
    package com.atSchool.db.MR;
    
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    
    public class Shopping implements DBWritable {
    	private int id; // 商品id
    	private String name; // 商品名称
    	private String subtitle; // 商品副标题
    	private float price; // 商品价格
    	private int stock; // 库存数量
    
    	public Shopping() {}
    	
    	public Shopping(int id, String name, String subtitle, float price, int stock) {
    		super();
    		this.id = id;
    		this.name = name;
    		this.subtitle = subtitle;
    		this.price = price;
    		this.stock = stock;
    	}
    
    	public int getId() {
    		return id;
    	}
    
    	public void setId(int id) {
    		this.id = id;
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public String getSubtitle() {
    		return subtitle;
    	}
    
    	public void setSubtitle(String subtitle) {
    		this.subtitle = subtitle;
    	}
    
    	public float getPrice() {
    		return price;
    	}
    
    	public void setPrice(float price) {
    		this.price = price;
    	}
    
    	public int getStock() {
    		return stock;
    	}
    
    	public void setStock(int stock) {
    		this.stock = stock;
    	}
    
        // PreparedStatement在JDBC中用来存储已经预编译好了的sql语句
    	@Override
    	public void write(PreparedStatement statement) throws SQLException {
    		statement.setInt(1, id);
    		statement.setString(2, name);
    		statement.setString(3, subtitle);
    		statement.setFloat(4, price);
    		statement.setInt(5, stock);
    	}
    
        // ResultSet在JDBC中是存储结果集的对象
    	@Override
    	public void readFields(ResultSet resultSet) throws SQLException {
    		this.id = resultSet.getInt(1);
    		this.name = resultSet.getString(2);
    		this.subtitle = resultSet.getString(3);
    		this.price = resultSet.getFloat(4);
    		this.stock = resultSet.getInt(5);
    	}
    
    	@Override
    	public String toString() {
    		return "id=" + id + ", name=" + name + ", subtitle=" + subtitle + ", price=" + price + ", stock=" + stock;
    	}
    }
    
    MapperDemo.java
    package com.atSchool.db.MR;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class MapperDemo extends Mapper<LongWritable, Shopping, NullWritable, Shopping> {
    	@Override
    	protected void map(LongWritable key, Shopping value,
    			Mapper<LongWritable, Shopping, NullWritable, Shopping>.Context context)
    			throws IOException, InterruptedException {
    		context.write(NullWritable.get(), value);
    	}
    }
    
    JobDemo.java
    package com.atSchool.db.MR;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    import com.atSchool.utils.HDFSUtils;
    
    public class JobDemo extends Configured implements Tool {
    	private String className = "com.mysql.cj.jdbc.Driver";
    	private String url = "jdbc:mysql:// 127.0.0.1:3306/shopping?&useSSL=false&serverTimezone=UTC&rewriteBatchedStatements=true";
    	private String user = "root";
    	private String password = "password";
    
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new JobDemo(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		/**
    		 * 获取job:一个工作对象
    		 */
    		// 创建一个 配置 对象
    		Configuration configuration = new Configuration();
    
    		// 设置 name属性 的值。
    		// 如果名称已弃用或有一个弃用的名称与之关联,它会将值设置为两个名称。
    		// 名称将在配置前进行修剪。
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    
    		// 在configuration中设置数据库访问相关字段。
    		DBConfiguration.configureDB(configuration, className, url, user, password);
    
    		// 根据配置文件创建一个job
    		Job job = Job.getInstance(configuration);
    
    		/**
    		 * 设置job
    		 */
    		// sql语句
    		String sql_1 = "SELECT id,name,subtitle,price,stock FROM neuedu_product WHERE price>1999";
    		String sql_2 = "SELECT COUNT(id) FROM neuedu_product WHERE price>1999";
    		/**
    		 * public static void setInput(Job job, Class<? extends DBWritable> inputClass, String inputQuery, String inputCountQuery)
    		 * 使用适当的输入设置初始化作业的映射-部分。
    		 * 参数:
    		 * 		job-The map-reduce job
    		 * 		inputClass-实现DBWritable的类对象,它是保存元组字段的Java对象。
    		 * 		inputQuery-选择字段的输入查询。示例:"SELECT f1, f2, f3 FROM Mytable ORDER BY f1"
    		 * 		inputCountQuery-返回表中记录数的输入查询。示例:"SELECT COUNT(f1) FROM Mytable"
    		 */
    		DBInputFormat.setInput(job, Shopping.class, sql_1, sql_2);
    
    		// 通过查找给定类的来源来设置Jar。
    		job.setJarByClass(JobDemo.class);
    
    		// 给 job 设置 Map和Reduce
    		job.setMapperClass(MapperDemo.class);
    		job.setNumReduceTasks(0); // 这里用为没有用到reduce所以设置为0
    
    		// 给 job 设置InputFormat
    		// InputFormat:描述 Map-Reduce job 的输入规范
    		// DBInputFormat:从一个SQL表中读取输入数据的输入格式。
    		job.setInputFormatClass(DBInputFormat.class);
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(NullWritable.class);
    		job.setMapOutputValueClass(Shopping.class);
    
    		/**
    		 * 设置输出路径
    		 */
    		// 因为输出的文件不允许存在,所以需要处理一下
    		FileSystem fileSystem = HDFSUtils.getFileSystem();
    		Path path = new Path("/MapReduceOut");
    		if (fileSystem.exists(path)) {
    			fileSystem.delete(path, true);
    			System.out.println("删除成功");
    		}
    
    		// 设置map-reduce job的输出目录
    		FileOutputFormat.setOutputPath(job, path);
    
    		// 将job提交到集群并等待它完成。
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    

    3. MapReduce写出数据到MySQL

    注意:写出的时候,写出的表在数据库中要事先建好

    User.java
    package com.atSchool.db.outmysql;
    
    import java.sql.PreparedStatement;
    import java.sql.ResultSet;
    import java.sql.SQLException;
    
    import org.apache.hadoop.mapreduce.lib.db.DBWritable;
    
    public class User implements DBWritable {
    	private int id;
    	private String name;
    	private String password;
    
    	public User() {}
    	
    	public User(int id, String name, String password) {
    		super();
    		this.id = id;
    		this.name = name;
    		this.password = password;
    	}
    
    	public int getId() {
    		return id;
    	}
    
    	public void setId(int id) {
    		this.id = id;
    	}
    
    	public String getName() {
    		return name;
    	}
    
    	public void setName(String name) {
    		this.name = name;
    	}
    
    	public String getPassword() {
    		return password;
    	}
    
    	public void setPassword(String password) {
    		this.password = password;
    	}
    
    	@Override
    	public void write(PreparedStatement statement) throws SQLException {
    		statement.setInt(1, id);
    		statement.setString(2, name);
    		statement.setString(3, password);
    	}
    
    	@Override
    	public void readFields(ResultSet resultSet) throws SQLException {
    		this.id = resultSet.getInt(1);
    		this.name = resultSet.getString(2);
    		this.password = resultSet.getString(3);
    	}
    
    	@Override
    	public String toString() {
    		return "id=" + id + ", name=" + name + ", password=" + password;
    	}
    }
    
    MapperDemo.java
    package com.atSchool.db.outmysql;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    public class MapperDemo extends Mapper<LongWritable, Text, User, NullWritable> {
    	@Override
    	protected void map(LongWritable key, Text value, Mapper<LongWritable, Text, User, NullWritable>.Context context)
    			throws IOException, InterruptedException {
    		String[] split = value.toString().split(",");
    		User user = new User(Integer.parseInt(split[0]), split[1], split[2]);
    		context.write(user, NullWritable.get());
    		System.out.println("mapper-out:" + user.toString());
    	}
    }
    
    JobDemo.java
    package com.atSchool.db.outmysql;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.conf.Configured;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.db.DBConfiguration;
    import org.apache.hadoop.mapreduce.lib.db.DBOutputFormat;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.input.TextInputFormat;
    import org.apache.hadoop.util.Tool;
    import org.apache.hadoop.util.ToolRunner;
    
    public class JobDemo extends Configured implements Tool {
    	private String className = "com.mysql.cj.jdbc.Driver";
    	/**
    	 * rewriteBatchedStatements=true
    	 * 说明:
    	 * 		MySQL Jdbc驱动在默认情况下会无视executeBatch()语句,把我们期望批量执行的一组sql语句拆散,一条一条地发给MySQL数据库,直接造成较低的性能。
    	 * 		而把rewriteBatchedStatements参数置为true, 驱动就会批量执行SQL
    	 * 注意:
    	 * 		在这里就不能将该属性设置成true了,这样会和mapreduce中形成冲突
    	 */
    	private String url = "jdbc:mysql://127.0.0.1:3306/shopping?&charactercEncoding=utf-8&useSSL=false&serverTimezone=UTC";
    	private String user = "root";
    	private String password = "password";
    
    	public static void main(String[] args) throws Exception {
    		new ToolRunner().run(new JobDemo(), null);
    	}
    
    	@Override
    	public int run(String[] args) throws Exception {
    		/**
    		 * 获取job:一个工作对象
    		 */
    		// 创建一个 配置 对象
    		Configuration configuration = new Configuration();
    
    		// 设置 name属性 的值。
    		// 如果名称已弃用或有一个弃用的名称与之关联,它会将值设置为两个名称。
    		// 名称将在配置前进行修剪。
    		configuration.set("fs.defaultFS", "hdfs://192.168.232.129:9000");
    
    		// 在configuration中设置数据库访问相关字段。
    		DBConfiguration.configureDB(configuration, className, url, user, password);
    
    		// 根据配置文件创建一个job
    		Job job = Job.getInstance(configuration);
    
    		/**
    		 * 设置job
    		 */
    		/**
    		 * setOutput(Job job, String tableName, String... fieldNames) throws IOException
    		 * 用适当的输出设置初始化作业的缩减部分
    		 * 参数:
    		 * job:The job
    		 * tableName:要插入数据的表
    		 * fieldNames:表中的字段名。
    		 */
    		DBOutputFormat.setOutput(job, "user", new String[] { "id", "name", "password" });
    
    		// 通过查找给定类的来源来设置Jar。
    		job.setJarByClass(JobDemo.class);
    
    		// 给 job 设置 Map和Reduce
    		job.setMapperClass(MapperDemo.class);
    		job.setNumReduceTasks(0); // 这里用为没有用到reduce所以设置为0
    
    		// 告诉job Map输出的key和value的数据类型的是什么
    		job.setMapOutputKeyClass(User.class);
    		job.setMapOutputValueClass(NullWritable.class);
    
    		// 给 job 设置InputFormat
    		// InputFormat:描述 Map-Reduce job 的输入规范
    		// DBInputFormat:从一个SQL表中读取输入数据的输入格式。
    		job.setInputFormatClass(TextInputFormat.class);
    		job.setOutputFormatClass(DBOutputFormat.class);
    		
    		/**
    		 * 设置输入路径
    		 */
    		FileInputFormat.setInputPaths(job, "/user_for_mysql.txt");
    		
    		// 将job提交到集群并等待它完成。
    		boolean waitForCompletion = job.waitForCompletion(true);
    		System.out.println(waitForCompletion ? "执行成功" : "执行失败");
    		return 0;
    	}
    }
    
    展开全文
  • Oracle SOA实施的最佳实践和案例分析:江苏电力SOA实例分析
  • 在计算机网络,数据库先进的开发平台上,利用现有的软件,配置一定的硬件,开发一个具有开放体系结构的、易扩充的、易维护的、具有良好人机交互界面的就业管理信息系统,实现管理学院毕业生就业信息的自动化的...
  • Matlab开发实例系列图书 -MATLAB神经网络30个案例分析- 程序数据
  • 您可以在 Modals(模态框)中使用 Popover(弹出框) Tooltip(工具提示插件)。页面中的模态框一般分为注册模态框、变更模态框、删除(信息提示)模态框三种基本模态框。 好了看代码。更希望大家互相关注,留下您...
  • 一、XgBoost算法 1.XgBoost简介 xgboost的核心算法思想: 不断地添加树,不断地进行特征分裂来生长一棵树,每次添加一个树,...优化的分布式梯度增强库,旨在实现高效,灵活便携。 2.XgBoost定义 事实上,如果不考虑

    一、XgBoost算法

    1.XgBoost简介

    xgboost的核心算法思想:

    1. 不断地添加树,不断地进行特征分裂来生长一棵树,每次添加一个树,其实是学习一个新函数,去拟合上次预测的残差;
    2. 当我们训练完成得到k棵树,我们要预测一个样本的分数,其实就是根据这个样本的特征,在每棵树中会落到对应的一个叶子节点,每个叶子节点就对应一个分数;
    3. 最后只需要将每棵树对应的分数加起来就是该样本的预测值。

    xgboost的优势:

    优化的分布式梯度增强库,旨在实现高效,灵活和便携。

    2.XgBoost定义

    事实上,如果不考虑工程实现、解决问题上的一些差异,xgboost与gbdt比较大的不同就是目标函数的定义。xgboost的目标函数如下图所示:

    在这里插入图片描述

    • 目标函数第一部分(红色箭头)为损失函数;
    • 目标函数第二部分(红色方框)为正则项;
    • 目标函数第三部分(红色圆圈)为常数项

    下面来解释目标函数如何展开为最终形式:

    在这里插入图片描述

    1)损失函数l是二次函数

    在这里插入图片描述
    在这里插入图片描述

    上一步省略了一项与ftf_{t}无关的平方项,需要注意。

    2)损失函数l一般化(泰勒展开)

    忽略损失函数l中的ftf_{t},因为它是已知的预测值,做如下对应:

    • x对应yi^(t1)\hat{y_{i}}^{(t-1)}
    • x\bigtriangleup x对应ft(xi)f_{t}(x_{i})

    在这里插入图片描述

    即可得到泰勒展开结果:

    在这里插入图片描述

    考虑正则项,即树的复杂度:

    • 树里面叶子节点的个数T;
    • 树上叶子节点的得分w的L2模平方;

    则正则项可表达为:

    Ω(ft)=γT+12λj=1Twj2\Omega (f_{t}) = \gamma T + \frac{1}{2} \lambda \sum_{j=1}^{T}w_{j}^2

    目标函数可变化为:

    在这里插入图片描述
    很显然,一棵树的生成是由一个节点一分为二,然后不断分裂最终形成为整棵树。那么树怎么分裂的就成为了接下来我们要探讨的关键。

    对于一个叶子节点如何进行分裂,xgboost作者在其原始论文中给出了两种分裂节点的方法:

    3)枚举所有不同树结构的贪心法

    在这里插入图片描述

    论文算法:

    对于所有的特征x,我们只要做一遍从左到右的扫描就可以枚举出所有分割的梯度和GL和GR。然后用计算Gain的公式计算每个分割方案的分数就可以了。

    然后后续则依然按照这种划分方法继续第二层、第三层、第四层、第N层的分裂。

    第二个值得注意的事情就是引入分割不一定会使得情况变好,所以我们有一个引入新叶子的惩罚项。优化这个目标对应了树的剪枝, 当引入的分割带来的增益小于一个阀值γ 的时候,则忽略这个分割。

    在这里插入图片描述

    4)近似算法

    主要针对数据太大,不能直接进行计算

    在这里插入图片描述

    3、XgBoost实例

    查看地址:

    在Anaconda中查找XgBoost安装包

    安装代码:

    conda install py-xgboost

    在这里插入图片描述
    之前不能使用conda install pxgboost安装XgBoost,后面发现已经可以正常使用。

    import xgboost as xgb
    # read in data
    dtrain = xgb.DMatrix('agaricus.txt.train')
    dtest = xgb.DMatrix('agaricus.txt.test') 
    # 设置XGB的参数,使用字典形式传入
    param = {'max_depth':2, 'eta':1, 'objective':'binary:logistic' }
    num_round = 2     # 使用线程数
    bst = xgb.train(param, dtrain, num_round)   # 训练
    # make prediction
    preds = bst.predict(dtest)   # 预测
    preds
    
    array([0.28583017, 0.9239239 , 0.28583017, ..., 0.9239239 , 0.05169873,
           0.9239239 ], dtype=float32)
    

    1)回归模型

    import xgboost as xgb
    from xgboost import plot_importance
    from matplotlib import pyplot as plt
    from sklearn.model_selection import train_test_split
    from sklearn.datasets import load_boston
    from sklearn.metrics import mean_squared_error
    
    # 加载数据集
    boston = load_boston()
    X,y = boston.data,boston.target
    
    # XGBoost训练过程
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=0)
    
    params = {
        'booster': 'gbtree',
        'objective': 'reg:squarederror',
        'gamma': 0.1,
        'max_depth': 5,
        'lambda': 3,
        'subsample': 0.7,
        'colsample_bytree': 0.7,
        'min_child_weight': 3,
        'silent': 1,
        'eta': 0.1,
        'seed': 1000,
        'nthread': 4,
    }
    
    dtrain = xgb.DMatrix(X_train, y_train)
    num_rounds = 300
    #这里需要转成list 否则会报错
    plst = list(params.items())
    model = xgb.train(plst, dtrain, num_rounds)
    
    # 对测试集进行预测
    dtest = xgb.DMatrix(X_test)
    ans = model.predict(dtest)
    
    # 显示重要特征
    plot_importance(model)
    plt.show()
    

    在这里插入图片描述

    注意上面的plst要改成list:

    plst = list(params.items())

    否则报错:

    在这里插入图片描述

    下面同理,不再赘述。

    2)分类模型

    from sklearn.datasets import load_iris
    from xgboost import plot_importance
    from sklearn.metrics import accuracy_score   
    iris = load_iris()
    X,y = iris.data,iris.target
    X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=0.2, random_state=1234565) # 数据集分割
    
    # 算法参数
    params = {
        'booster': 'gbtree',
        'objective': 'multi:softmax',
        'num_class': 3,
        'gamma': 0.1,
        'max_depth': 6,
        'lambda': 2,
        'subsample': 0.7,
        'colsample_bytree': 0.75,
        'min_child_weight': 3,
        'silent': 0,
        'eta': 0.1,
        'seed': 1,
        'nthread': 4,
    }
    
    plst = list(params.items())
    
    dtrain = xgb.DMatrix(X_train, y_train) # 生成数据集格式
    num_rounds = 500
    model = xgb.train(plst, dtrain, num_rounds) # xgboost模型训练
    
    # 对测试集进行预测
    dtest = xgb.DMatrix(X_test)
    y_pred = model.predict(dtest)
    
    # 计算准确率
    accuracy = accuracy_score(y_test,y_pred)
    print("accuarcy: %.2f%%" % (accuracy*100.0))
    
    # 显示重要特征
    plot_importance(model)
    plt.show()
    

    在这里插入图片描述

    二、LightGBM算法

    LightGBM

    目前Github各项数据:

    在这里插入图片描述

    LightGBM (Light Gradient Boosting Machine)是一个实现GBDT算法的框架,支持高效率的并行训练,并且具有以下优点:

    • 更快的训练速度

    • 更低的内存消耗

    • 更好的准确率

    • 分布式支持,可以快速处理海量数据

    1.提出的动机

    常用的机器学习算法,例如神经网络等算法,都可以以mini-batch的方式训练,训练数据的大小不会受到内存限制。

    而GBDT在每一次迭代的时候,都需要遍历整个训练数据多次。如果把整个训练数据装进内存则会限制训练数据的大小;如果不装进内存,反复地读写训练数据又会消耗非常大的时间。尤其面对工业级海量的数据,普通的GBDT算法是不能满足其需求的。

    LightGBM提出的主要原因就是为了解决GBDT在海量数据遇到的问题,让GBDT可以更好更快地用于工业实践。

    2.优化内容

    基于Histogram的决策树算法带深度限制的Leaf-wise的叶子生长策略直方图做差加速直接支持类别特征(Categorical Feature)Cache命中率优化基于直方图的稀疏特征优化多线程优化。

    在探寻了LightGBM的优化之后,发现LightGBM还具有支持高效并行的优点。LightGBM原生支持并行学习,目前支持特征并行和数据并行的两种。特征并行的主要思想是在不同机器在不同的特征集合上分别寻找最优的分割点,然后在机器间同步最优的分割点。数据并行则是让不同的机器先在本地构造直方图,然后进行全局的合并,最后在合并的直方图上面寻找最优分割点。LightGBM针对这两种并行方法都做了优化,在特征并行算法中,通过在本地保存全部数据避免对数据切分结果的通信;在数据并行中使用分散规约(Reduce scatter)把直方图合并的任务分摊到不同的机器,降低通信和计算,并利用直方图做差,进一步减少了一半的通信量。基于投票的数据并行则进一步优化数据并行中的通信代价,使通信代价变成常数级别。在数据量很大的时候,使用投票并行可以得到非常好的加速效果。

    在这里插入图片描述

    在这里插入图片描述

    在这里插入图片描述

    三、感受

    经过本次学习,个人对集成学习有了更加全面的感受,对于一些公式,敢于去推导,虽然有些地方仍然不是很懂,但是比学习前提高了不少,加油,坚持下去。

    2021年4月27日

    参考文献

    [1].通俗理解kaggle比赛大杀器xgboost

    [2].DataWhale组队学习-集成学习(中)

    [3].解决报错AttributeError: ‘dict_items‘ object has no attribute ‘copy‘

    [4].windows 下使用conda安装xgboost问题

    [5].开源 | LightGBM:三天内收获GitHub 1000 星

    展开全文
  • 第一节 主成分分析方法 主成分分析的基本原理 主成分分析的计算步骤 主成分分析方法应用实例 地理系统是多要素的复杂系统在地理学研究中多变量问题是经常会遇到的变量太多无疑会增加分析问题的难度与复杂性而且在...
  • pandas 案例分析:美国人口案例分析 ''' 需求: 导入文件,查看原始数据 将人口数据各州简称数据进行合并 将合并的数据中重复的abbreviation列进行删除 查看存在缺失数据的列 找到有哪些state/region使得...

    pandas 案例分析:美国人口案例分析

    '''
    需求:
        导入文件,查看原始数据
        将人口数据和各州简称数据进行合并
        将合并的数据中重复的abbreviation列进行删除
        查看存在缺失数据的列
        找到有哪些state/region使得state的值为NaN,进行去重操作
        为找到的这些state/region的state项补上正确的值,从而去除掉state这一列的所有NaN
        合并各州面积数据areas
        我们会发现area(sq.mi)这一列有缺失数据,找出是哪些行
        去除含有缺失数据的行
        找出2010年的全民人口数据
        计算各州的人口密度
        排序,并找出人口密度最高的五个州 df.sort_values()
    '''
    import numpy as np
    import pandas as pd
    
    # 导入文件,查看原始数据
    # 将人口数据和各州简称数据进行合并
    s_abbrevs = pd.read_csv('../datasets/state-abbrevs.csv')
    print(s_abbrevs.head())
    
    s_population = pd.read_csv('../datasets/state-population.csv')
    print(s_population.head())
    
    s_areas = pd.read_csv('../datasets/state-areas.csv')
    print(s_areas.head())
    
    # 将合并的数据中重复的abbreviation列进行删除
    abb_pop = pd.merge(s_abbrevs, s_population, left_on='abbreviation', right_on='state/region', how='outer')
    print(abb_pop.head())
    abb_pop = abb_pop.drop(columns='abbreviation')
    # 查看存在缺失数据的列
    '''
    state            True
    abbreviation     True
    state/region    False
    ages            False
    year            False
    population       True
    '''
    print(abb_pop.isnull().any(axis=0))
    # 找到有哪些state/region使得state的值为NaN,进行去重操作
    print(abb_pop[abb_pop['state'].isnull()]['state/region'].unique())  # ['PR' 'USA']
    index_PR = abb_pop[abb_pop['state/region'] == 'PR'].index
    print(index_PR)
    # 为找到的这些state/region的state项补上正确的值,从而去除掉state这一列的所有NaN
    abb_pop.loc[index_PR, 'state'] = 'PUERTO'
    index_USA = abb_pop[abb_pop['state/region'] == 'USA'].index
    print(index_USA)
    # 为找到的这些state/region的state项补上正确的值,从而去除掉state这一列的所有NaN
    abb_pop.loc[index_USA, 'state'] = 'America'
    print(abb_pop[abb_pop['state/region'] == 'PR'].head())
    print(abb_pop[abb_pop['state/region'] == 'USA'].head())
    print(abb_pop.isnull().any(axis=0))
    # 合并各州面积数据areas
    areas_abb_pop = pd.merge(s_areas, abb_pop, left_on='state', right_on='state', how='outer')
    print(areas_abb_pop.head())
    areas_abb_pop.set_index('state', inplace=True)
    # 我们会发现area(sq.mi)这一列有缺失数据,找出是哪些行
    print(areas_abb_pop.isnull().any(axis=0))
    # 去除含有缺失数据的行
    areas_abb_pop.dropna(inplace=True)
    # 找出2010年的全民人口数据
    query_2010 = areas_abb_pop.query('ages == "total" & year == 2010')
    print(query_2010.head())
    # 计算各州的人口密度
    midu = query_2010['population'] / query_2010['area (sq. mi)']
    print(midu)
    midu = midu.sort_values(ascending=True)
    # 排序,并找出人口密度最高的五个州 df.sort_values()
    print(midu.sort_values().tail())
    # 排序,并找出人口密度最低的五个州 df.sort_values()
    print(midu.sort_values().head())
    
    
    展开全文
  • 王栋:携程技术保障中心数据库专家,对数据库疑难问题的排查数据库自动化智能化运维工具的开发有强烈的兴趣。 【问题描述】 我们生产环境有一组集群的多台MySQL服务器(MySQL 5.6.21),不定期的会crash,但error ...

    【作者】

    王栋:携程技术保障中心数据库专家,对数据库疑难问题的排查和数据库自动化智能化运维工具的开发有强烈的兴趣。

    【问题描述】

    我们生产环境有一组集群的多台MySQL服务器(MySQL 5.6.21),不定期的会crash,但error log中只记录了重启信息,未记录crash时的堆栈:

    mysqld_safe Number of processes running now: 0
    mysqld_safe mysqld restarted
    

    接下来首先排查系统日志/var/log/message文件,crash时没有其他异常信息,也不是OOM导致的。

    【排查思路】

    由于日志中未记录有价值的信息。为定位crash的原因,首先开启mysql core dump的功能。
    下面是开启core dump的步骤:
    1、 在my.cnf文件中增加2个配置项

    [mysqld]
    core_file
    [mysqld_safe]
    core-file-size=unlimited

    2、修改系统参数,配置suid_dumpable

    echo 1 >/proc/sys/fs/suid_dumpable

    3、重启mysql服务,配置生效

    【问题分析】

    开启core dump后,服务器再次crash时生成了core file。
    用gdb分析生成的core file,可以看到crash时的堆栈信息如下:
    1247138-20181224111324701-1200878176.png

    从函数table_esms_by_digest::delete_all_rows可以看出触发crash的是truncate table events_statements_summary_by_digest操作。
    我们内部有个DML的分析工具,用来统计数据库每分钟增删改查的访问量。该工具的数据源是events_statements_summary_by_digest表,采集程序会每一分钟采集一次这张表的数据,采集完成后执行truncate操作。
    暂停这组集群上DML采集程序后MySQL没有再发生crash。

    进一步分析多个core file,发现最终函数的调用都发生在_lf_pinbox_real_free函数上。
    结合现场环境,有两处地方值得分析:

    1、 内存的不正常值。当打印该变量时,此处变量的地址偏低,不太正常:

    (gdb) p pins->pinbox
    $2 = (LF_PINBOX *) 0x1367208

    2、红字部分为pfs逐条释放digest记录的操作,正在释放某行数据时出现错误:

    void reset_esms_by_digest()
    {
    uint index;
    if (statements_digest_stat_array == NULL)
    return;
    PFS_thread *thread= PFS_thread::get_current_thread();
    if (unlikely(thread == NULL))
    return;
    for (index= 0; index < digest_max; index++)
    {


    }
    digest_index= 1;
    }

    猜测有两种可能导致错误:
    1、高并发下,对内存访问出现冲突;
    2、某个特殊SQL导致,在处理hash时。

    在网上搜索类似的问题,有了进一步的进展,基本确定了这个问题是bug导致

    如下Mysql的bug report中讲述了类似问题
    https://bugs.mysql.com/bug.php?id=73979
    更详细的环境描述如下连接中
    https://bugs.launchpad.net/percona-server/+bug/1351148

    查到5.6.35上的bug fix的修复内容,和我们碰到的情况非常类似。
    对比_lf_pinbox_real_free的修改,该部分确实进行很大的调整。

    下面是MySQL 5.6.35函数_lf_pinbox_real_free的代码片段:


    static void (LF_PINS pins)
    {
    LF_PINBOX
    pinbox= pins->pinbox;
    struct st_match_and_save_arg arg = {pins, pinbox, pins->purgatory};
    pins->purgatory= NULL;
    pins->purgatory_count= 0;
    lf_dynarray_iterate(&pinbox->pinarray,
    (lf_dynarray_func)match_and_save, &arg);
    if (arg.old_purgatory)
    {
    void *last= arg.old_purgatory;
    while (pnext_node(pinbox, last))
    last= pnext_node(pinbox, last);
    pinbox->free_func(arg.old_purgatory, last, pinbox->free_func_arg);
    }
    }

    下面是MySQL 5.6.21函数的_lf_pinbox_real_free的代码片段

    static void (LF_PINS pins)
    {
    int npins;
    void
    list;
    void **addr= NULL;
    void first= NULL, last= NULL;
    LF_PINBOX pinbox= pins->pinbox;
    npins= pinbox->pins_in_array+1;
    if (pins->stack_ends_here != NULL)
    {
    int alloca_size= sizeof(void
    )LF_PINBOX_PINSnpins;
    if (available_stack_size(&pinbox, *pins->stack_ends_here) > alloca_size)
    {
    struct st_harvester hv;
    addr= (void **) alloca(alloca_size);
    hv.granary= addr;
    hv.npins= npins;
    _lf_dynarray_iterate(&pinbox->pinarray,
    (lf_dynarray_func)harvest_pins, &hv);
    npins= hv.granary-addr;
    if (npins)
    qsort(addr, npins, sizeof(void *), (qsort_cmp)ptr_cmp);
    }
    }

    同时观察到出问题的集群有指标异常,QPS不到6000,Threads_connected将近8000。(对比其他高并发的集群,QPS在20000以上,Threads_connected也只有300左右)。

    排查应用端的连接方式,了解到其中一个应用有近百台应用服务器,可能同时发起请求,却没有合理的复用连接,维持大量的连接线程增大了bug触发的概率。

    Bugs Fixed的描述如下:

    Miscalculation of memory requirements for qsort operations could result in stack overflow errors in situations with a large number of concurrent server connections. (Bug #73979, Bug #19678930, Bug #23224078)

    【解决思路】

    我们通过分析crash时的core file文件,找到crash时的触发条件,暂停DML采集程序(truncate table events_statements_summary_by_digest操作)后恢复。
    后面了解到这是MySQL的一个bug,在MySQL 5.6.35版本后已修复。这个bug在应用端与数据库建立大量的连接时,更容易触发。

    转载于:https://www.cnblogs.com/CtripDBA/p/10167608.html

    展开全文
  • XGBoost算法分析案例调参实例 XGBoost算法分析 XGBoost是陈天奇等人开发的一个开源机器学习项目,高效地实现了GBDT算法并进行了算法工程上的许多改进,被广泛应用在Kaggle竞赛及其他许多机器学习竞赛中并取得了...
  • 重复数据处理在DataFrame中主要运用duplicated方法drop_duplicates方法:duplicated方法返回的是一个布尔型的Series,用来只是各行是否重复,如果重复则为True,否则为False。drop_duplicates直接返回已经删除了...
  • Stacking算法分析案例调参实例 Stacking方法是一种分层模型集成框架。以两层为例,首先将数据集分成训练集测试集,利用训练集训练得到多个初级学习器,然后用初级学习器对测试集进行预测,并将输出值作为下一...
  • 理论上合理的加权分位数草图过程,使得能够在近似树学习中处理实例权重; 平行分布式计算使得学习更快,从而能够更快的进行模型探索; 最重要的是XGBoost使用核外计算并且能够让数据科学家在台式机上处.
  • 集成学习-XGBoost算法分析案例调参实例(task11.2021.04.26) 文章目录集成学习-XGBoost算法分析案例调参实例(task11.2021.04.26)XGBoost算法简介XGBoost的核心算法思想不难,基本就是:一、XGBoost分类案例二...
  • 包括: 《MATLAB神经网络30个案例分析》(全书源代码) 《MATLAB神经网络原理与实例精解》 对于初学者有很大帮助,讲解很透彻!
  • XGBoost是陈天奇等人开发的一个开源机器学习项目,高效地实现了GBDT算法并进行了算法工程上的许多改进。 XGBoost本质上还是一个GBDT,但是力争把速度效率发挥到极致,所以叫X (Extreme) GBoosted,包括前面说过...
  • 视觉效果动画效果已经成为产品设计中不可或缺的一部分,开发人员使用动画效果可以创建引人入胜的内容,营销人员可以使用这些效果来获得更好的产品覆盖率良好的转化率。倒数计时器就是这样一种元素,当用户登陆...
  • 本文实例讲述了Python强口令检测算法。分享给大家供大家参考,具体如下:强口令检测题目如下:写一个函数,它使用正则表达式,确保传入的口令字符串是强口令。强口令定义:长度不少于8个字符,同时包含大写小写...
  • 最近又给baggingboosting绕迷糊了,再来复习下,首先bagging聚焦方差的减小,boosting聚焦偏差的减小。然后bagging采用了自助采样bootstrap投票的方法进行集成学习,但是boosting类似于以前的错题本,类似于三个...
  • 课程——基于Python数据分析与机器学习案例实战教程分享网盘下载——...数据分析和机器学习建模成了当下最热门的技术,课程旨在帮助同学们快速掌握python数据分析包以及经典机器学习算法并通过对真实数据集分...
  • C2C案例分析

    2013-03-16 20:28:23
    C2C简介,实例分析,主要对淘宝易趣进行案例分析
  • 预测数据分析的机器学习基础:算法,实例和案例研究,第二版作者:John D. Kelleher(作者),Brian Mac Namee(作者),Aoife D'Arcy(作者)出...
  • 分享知识要点:lubridate包拆解时间 | POSIXlt利用决策树分类,利用随机森林预测利用对数进行fit,exp函数还原训练集来自Kaggle华盛顿自行车共享计划中的自行车租赁数据,分析共享自行车与天气、时间等关系。...
  • XGBoost – 陈天奇等人开发的一个开源ML项目,高效地实现了GBDT算法并进行了算法工程上的许多改进,被广泛应用在Kaggle竞赛 & 其他许多ML竞赛中取得了不错的成绩。 XGBoost本质还是一个GBDT,但是力争把速度...
  • 语法: * :slot的值如果是“header”则会匹配到左边...案例: <!DOCTYPE html> <html lang="en"> <head> <meta charset="UTF-8"> <title>Document</title> </head> ...
  • 文章目录1.具体分析2.具体实现 1.具体分析 群主发普通红包。某群有多名成员,群主给成员发普通红包。普通红包的规则: 群主的一笔金额,从群主余额中扣除,平均... 创建gettersetter方法(成员变量为私有的,只能...
  • 思路:灰度转换,形态学处理,二值化,轮廓处理,拟合圆,位置分析等。 代码: // CountElement.cpp : 此文件包含 "main" 函数。程序执行将在此处开始并结束。 // #include "pch.h" #include <iostream...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,808
精华内容 723
关键字:

案例分析和实例分析