MapReduce: computing the total upstream traffic, downstream traffic, and combined traffic per phone number

    1. Requirement:

    • Compute the total upstream traffic, downstream traffic, and combined total traffic consumed by each phone number.

    2. Data preparation:

    (1) Input record format:
    timestamp, phone number, base-station MAC address, IP of the visited site, website domain, packets sent, packets received, upstream (upload) traffic, downstream (download) traffic, HTTP status code
    

    (Figure: sample input data)

    (2) Final output format:
    phone number		upstream traffic	downstream traffic		total traffic
    

    (Figure: sample output data)

    3. Analysis:

    Basic approach:

    (1) Map phase:
    • (a) Read one line of input and split it into fields.
    • (b) Extract the phone number, the upstream traffic, and the downstream traffic.
    • (c) Emit the phone number as the key and a bean object as the value, i.e. context.write(phoneNumber, bean);
    (2) Reduce phase:
    • (a) Sum the upstream and downstream traffic to obtain the total traffic.
    • (b) Implement a custom bean to encapsulate the traffic fields; to sort by traffic, the bean is emitted as the map output key.
    • (c) The MR framework sorts data as it processes it (the map output key-value pairs are sorted before they reach the reducer), and that sort is keyed on the map output key.

    Therefore, to impose a custom sort order, put the sort criterion into the key and make the key class implement the WritableComparable interface,
    then override the key's compareTo method, as in the sketch below.
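
    As a hedged sketch of that sorting variant (not code from the original post; the class name FlowBeanSortable and the descending order are my assumptions), the key class could look like this:

    package phoneData;
    
    import org.apache.hadoop.io.WritableComparable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    //Minimal sketch: a bean usable as the map output key, ordered by total traffic.
    public class FlowBeanSortable implements WritableComparable<FlowBeanSortable> {
        private long upFlow;
        private long downFlow;
        private long sumFlow;
    
        //Required: deserialization calls the no-arg constructor
        public FlowBeanSortable() {
        }
    
        public void set(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }
    
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    
        @Override
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
    
        //Sort key: larger totals first (descending)
        @Override
        public int compareTo(FlowBeanSortable other) {
            return Long.compare(other.sumFlow, this.sumFlow);
        }
    }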

    4. Code:

    (1) Write the traffic-statistics bean, FlowBean

    FlowBean.java

    package phoneData;
    
    import lombok.Getter;
    import lombok.Setter;
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    // 1. Implement the Writable interface
    @Setter
    @Getter
    public class FlowBean implements Writable {
        //upstream (upload) traffic
        private long upFlow;
        //downstream (download) traffic
        private long downFlow;
        //total traffic
        private long sumFlow;
    
        //Required: deserialization calls the no-arg constructor
        public FlowBean() {
        }
    
        public FlowBean(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }
    
        public void set(long upFlow, long downFlow){
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }
    
    
        /**
         * Serialization: write the fields to the output stream.
         *
         * @param out the output to write to
         * @throws IOException on I/O failure
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    
        /**
         * Deserialization.
         * Note: fields must be read in exactly the same order they were written.
         *
         * @param in the input to read from
         * @throws IOException on I/O failure
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }
        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
    }
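
    To sanity-check the Writable contract outside Hadoop, a quick round-trip through plain java.io streams can be used; this test class is my addition, not part of the original post (the 264/0 values come from the sample record used in the Mapper below):

    package phoneData;
    
    import java.io.ByteArrayInputStream;
    import java.io.ByteArrayOutputStream;
    import java.io.DataInputStream;
    import java.io.DataOutputStream;
    import java.io.IOException;
    
    public class FlowBeanRoundTrip {
        public static void main(String[] args) throws IOException {
            FlowBean original = new FlowBean(264, 0);
    
            //Serialize: write() emits upFlow, downFlow, sumFlow as three longs
            ByteArrayOutputStream bytes = new ByteArrayOutputStream();
            original.write(new DataOutputStream(bytes));
    
            //Deserialize: readFields() must read the longs back in the same order
            FlowBean copy = new FlowBean();
            copy.readFields(new DataInputStream(new ByteArrayInputStream(bytes.toByteArray())));
    
            System.out.println(copy);  //expected: 264	0	264
        }
    }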
    
    (2) Write the Mapper

    FlowCountMapper.java

    package phoneData;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * LongWritable, Text ===> map input:  <byte offset, one line of the log file>
     * Text, FlowBean  ======> map output: <phone number, bean holding up/down/total traffic>
     */
    public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        Text k = new Text();
        FlowBean v = new FlowBean();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //Get one line of input
            String line = value.toString();
    
            //Split into fields
            //1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
            String[] fields = line.split("\t");
            //Phone number
            String phoneNum = fields[1];
    
            //Upstream and downstream traffic; indexed from the end of the array
            //because the number of middle columns can vary between records
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);
    
            k.set(phoneNum);
            v.set(upFlow, downFlow);
    
            //Reuse the k and v instances rather than allocating a new bean per record
            context.write(k, v);
        }
    }
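
    One detail worth calling out: the traffic columns are indexed from the end of the array because the middle columns (such as the domain) can be empty or missing. A tiny standalone check against the sample record (my illustration, not from the original post):

    public class SplitCheck {
        public static void main(String[] args) {
            String line = "1363157995052\t13826544101\t5C-0E-8B-C7-F1-E0:CMCC\t120.197.40.4\t\t\t4\t0\t264\t0\t200";
            String[] fields = line.split("\t");
            //length - 3 -> upstream, length - 2 -> downstream, length - 1 -> status code
            System.out.println(fields[1]
                    + " up=" + fields[fields.length - 3]
                    + " down=" + fields[fields.length - 2]);  //13826544101 up=264 down=0
        }
    }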
    
    (3) Write the Reducer

    FlowCountReducer.java

    package phoneData;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            //Initialize the upstream and downstream totals
            long sum_upFlow = 0;
            long sum_downFlow = 0;
    
            // 1. Iterate over all beans for this phone number, accumulating upstream and downstream traffic separately
            for (FlowBean flowBean : values) {
                //Accumulate all upstream traffic
                sum_upFlow += flowBean.getUpFlow();
                //Accumulate all downstream traffic
                sum_downFlow += flowBean.getDownFlow();
            }
            // 2. Wrap the totals in a bean
            FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
            // 3. Emit
            context.write(key, resultBean);
        }
    }
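
    Since this reduction is a plain associative sum and the reducer's input and output types match, the same class can also serve as a combiner to shrink the shuffle; this is an optional tweak of mine, not part of the original driver:

    //Optional: pre-aggregate on the map side (types match: Text/FlowBean in and out)
    job.setCombinerClass(FlowCountReducer.class);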
    
    (4) Write the driver

    FlowsumDriver.java

    package phoneData;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class FlowsumDriver {
    
        public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
    
            //args = new String[]{"/Users/macbook/TestInfo/phone_data.txt", "/Users/macbook/TestInfo/MovlePhone1"};
    
            // 1. Get the configuration and a Job instance
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
    
            // 2. Specify a class in this program's jar so Hadoop can locate the jar
            job.setJarByClass(FlowsumDriver.class);
    
            // 3. Set the Mapper and Reducer classes for this job
            job.setMapperClass(FlowCountMapper.class);
            job.setReducerClass(FlowCountReducer.class);
    
            // 4. Set the map output key/value types
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
    
            // 5. Set the final output key/value types
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
    
    //        job.setPartitionerClass(ProvincePartitioner.class);
    //        job.setNumReduceTasks(6);
    
            // 6. Set the input directory and the (not yet existing) output directory
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 7. Submit the job, with its configuration and jar, to the cluster and wait
            job.waitForCompletion(true);
        }
    }
    

    5. Running the job:

    (1) Package the jar and upload it to the Hadoop cluster.
    (2) Run:
    hadoop jar Hadoop-1.0-SNAPSHOT.jar phoneData.FlowsumDriver /phone_data.txt /out2
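
    For instance, applied to the sample record shown in the Mapper (and assuming 13826544101 appears in no other record), the corresponding output line would be:

    13826544101	264	0	264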
    

    (Figure: run results)

  • A shorter take: the code is analogous to the word-count MapReduce example, so it carries few comments; the commentary would mirror the previous walkthrough.

    Input (figure on the left): three columns: phone number, upstream traffic, downstream traffic.

    Output (figure on the right): four columns: phone number, upstream traffic, downstream traffic, total traffic.
  • The same job written as a single source file: a FlowPhoneBean Writable plus a FlowStaPhone class containing the nested Mapper and Reducer and the driver main().

    package com.dong.flow.phone;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class FlowPhoneBean implements Writable {
        public long upFlow;
        public long downFlow;
        public long sumFlow;
    
        //Required: deserialization calls the no-arg constructor
        public FlowPhoneBean() {
        }
    
        public FlowPhoneBean(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }
    
        @Override
        public String toString() {
            return "FlowPhoneBean [upFlow=" + upFlow + ", downFlow=" + downFlow
                    + ", sumFlow=" + sumFlow + "]";
        }
    
        /*
         * Deserialization
         * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
    
        /*
         * Serialization
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    }
    
    package com.dong.flow.phone;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class FlowStaPhone {
    
        static class FlowMap extends Mapper<LongWritable, Text, Text, FlowPhoneBean> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String info = value.toString();
                String[] infospilt = info.split("\t");
                String phone = infospilt[1];
                //Index from the end of the array: the middle columns may be missing
                long upFlow = Long.parseLong(infospilt[infospilt.length - 3]);
                long downFlow = Long.parseLong(infospilt[infospilt.length - 2]);
                context.write(new Text(phone), new FlowPhoneBean(upFlow, downFlow));
            }
        }
    
        static class FlowReduce extends Reducer<Text, FlowPhoneBean, Text, FlowPhoneBean> {
            @Override
            protected void reduce(Text text, Iterable<FlowPhoneBean> iteror,
                    Context context) throws IOException, InterruptedException {
                long upsum = 0;
                long downsum = 0;
                for (FlowPhoneBean flowPhoneBean : iteror) {
                    upsum += flowPhoneBean.getUpFlow();
                    downsum += flowPhoneBean.getDownFlow();
                }
                context.write(text, new FlowPhoneBean(upsum, downsum));
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
            job.setJarByClass(FlowStaPhone.class);
            job.setMapperClass(FlowMap.class);
            job.setReducerClass(FlowReduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowPhoneBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowPhoneBean.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean res = job.waitForCompletion(true);
            System.exit(res ? 0 : 1);
        }
    }
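
    Two design points worth noting about this version: the Mapper and Reducer are static nested classes, which keeps the whole job in one source file while still letting the framework instantiate them reflectively (non-static inner classes would fail for lack of an enclosing instance); and unlike the first walkthrough, this map() allocates a new Text and FlowPhoneBean per record instead of reusing instances, which is simpler but generates more garbage on large inputs.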

  • A follow-up variant that adds a custom Partitioner (ProvincesFlow) so the results are split into separate output files by phone-number prefix.

    FlowPhoneBean is identical to the listing above and is omitted here.

    package com.dong.flow.phone;
    
    import java.util.HashMap;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;
    
    public class ProvincesFlow extends Partitioner<Text, FlowPhoneBean> {
        //Map a phone-number prefix to a partition (and hence a reduce task / output file)
        public static HashMap<String, Integer> hashMap = new HashMap<String, Integer>();
        static {
            hashMap.put("136", 0);
            hashMap.put("137", 1);
            hashMap.put("138", 2);
            hashMap.put("139", 3);
        }
    
        @Override
        public int getPartition(Text key, FlowPhoneBean value, int numPartitions) {
            String prefix = key.toString().substring(0, 3);
            Integer id = hashMap.get(prefix);
            //Any other prefix goes to the catch-all partition 4
            return id == null ? 4 : id;
        }
    }
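
    Note the contract this partitioner implies (my reading, not spelled out in the original post): getPartition can return 0 through 4, so the driver must request at least five reduce tasks. With job.setNumReduceTasks(5) the job produces five output files, part-r-00000 through part-r-00004, one per mapped prefix plus the catch-all partition 4; a smaller reduce-task count would make Hadoop reject the out-of-range partition number at runtime.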

    FlowStaPhone is unchanged from the previous listing except that the driver additionally registers the partitioner and sets a matching number of reduce tasks:
    
             //Register the custom data partitioner
             job.setPartitionerClass(ProvincesFlow.class);
             job.setNumReduceTasks(5);
