  • Count the total upstream traffic, downstream traffic, and combined traffic consumed by each phone number (MapReduce).

    1. Requirement:

    • Count the total upstream traffic, downstream traffic, and combined traffic consumed by each phone number.

    2. Data preparation:

    (1) Input record format:
    timestamp, phone number, base-station MAC address, IP of the visited site, website domain, packets sent, packets received, upstream (upload) bytes, downstream (download) bytes, response code
    

    (screenshot: the input data)

    (2) Final output format:
    phone number		upstream traffic        downstream traffic		total traffic
    

    (screenshot: the output data)

    3. Analysis:

    Basic approach:

    (1) Map phase:
    • (a) Read one line of input and split it into fields.
    • (b) Extract the phone number, upstream traffic, and downstream traffic.
    • (c) Emit the phone number as key and a bean object as value, i.e. context.write(phoneNumber, bean);
    (2) Reduce phase:
    • (a) Accumulate the upstream and downstream traffic to get the total.
    • (b) Implement a custom bean that encapsulates the traffic fields; such a bean can also travel as the map output key.
    • (c) The MR framework sorts the data while processing it (map output kv pairs are sorted before they reach the reducer), and the sort criterion is the map output key.

    So, to impose a custom sort order, put the sorting criteria into the key and have the key class implement the WritableComparable interface,
    then override the key's compareTo method (a sketch follows below).
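
    In this walkthrough the bean below only implements Writable, since the job keys by phone number. As a minimal sketch, assuming a hypothetical variant that is keyed by the bean and sorted by total flow in descending order, the WritableComparable version could look like this:

    package phoneData;

    import org.apache.hadoop.io.WritableComparable;

    // Hypothetical key variant of FlowBean: the shuffle will sort map
    // output by this compareTo, i.e. by total flow, largest first.
    public class FlowBeanSorted extends FlowBean implements WritableComparable<FlowBeanSorted> {

        @Override
        public int compareTo(FlowBeanSorted other) {
            // descending order of total flow
            return Long.compare(other.getSumFlow(), this.getSumFlow());
        }
    }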

    4. Program code:

    (1) Write the traffic-statistics bean: FlowBean

    FlowBean.java

    package phoneData;
    
    import lombok.Getter;
    import lombok.Setter;
    import org.apache.hadoop.io.Writable;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    // 1 implement the Writable interface
    @Setter
    @Getter
    public class FlowBean implements Writable {
        //upstream (upload) traffic
        private long upFlow;
        //downstream (download) traffic
        private long downFlow;
        //total traffic
        private long sumFlow;
    
        //required: deserialization invokes the no-arg constructor
        public FlowBean() {
        }
    
        public FlowBean(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }
    
        public void set(long upFlow, long downFlow){
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }
    
    
        /**
         * Serialization
         *
         * @param out
         * @throws IOException
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    
        /**
         * Deserialization
         * Note: fields must be read in the same order they were written
         *
         * @param in
         * @throws IOException
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            this.upFlow = in.readLong();
            this.downFlow = in.readLong();
            this.sumFlow = in.readLong();
        }
        @Override
        public String toString() {
            return upFlow + "\t" + downFlow + "\t" + sumFlow;
        }
    }
    
    (2) Write the Mapper

    FlowCountMapper.java

    package phoneData;
    
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Mapper;
    
    import java.io.IOException;
    
    /**
     * LongWritable, Text ===> map input:  <byte offset, one line of the log>
     * Text, FlowBean  ======> map output: <phone number, bean carrying up/down/total flow>
     */
    public class FlowCountMapper extends Mapper<LongWritable, Text, Text, FlowBean> {
        Text k = new Text();
        FlowBean v = new FlowBean();
    
        @Override
        protected void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
            //get one line of input
            String line = value.toString();
    
            //split the fields; sample record:
            //1363157995052 	13826544101	5C-0E-8B-C7-F1-E0:CMCC	120.197.40.4			4	0	264	0	200
            String[] fields = line.split("\t");
            //phone number
            String phoneNum = fields[1];
    
            //upstream and downstream traffic, indexed from the end of the
            //record because some records omit the middle (domain) fields
            long upFlow = Long.parseLong(fields[fields.length - 3]);
            long downFlow = Long.parseLong(fields[fields.length - 2]);
    
            k.set(phoneNum);
            //reuse the preallocated bean rather than creating one per record
            v.set(upFlow, downFlow);
            context.write(k, v);
        }
    }
    
    (3) Write the Reducer

    FlowCountReducer.java

    package phoneData;
    
    import java.io.IOException;
    
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Reducer;
    
    public class FlowCountReducer extends Reducer<Text, FlowBean, Text, FlowBean> {
    
        @Override
        protected void reduce(Text key, Iterable<FlowBean> values, Context context) throws IOException, InterruptedException {
            //initialize the upstream and downstream totals
            long sum_upFlow = 0;
            long sum_downFlow = 0;
    
            // 1 iterate over all the beans, accumulating upstream and downstream traffic separately
            for (FlowBean flowBean : values) {
                //add up all the upstream traffic
                sum_upFlow += flowBean.getUpFlow();
                //add up all the downstream traffic
                sum_downFlow += flowBean.getDownFlow();
            }
            // 2 wrap the totals in a result bean
            FlowBean resultBean = new FlowBean(sum_upFlow, sum_downFlow);
            // 3 write out
            context.write(key, resultBean);
        }
    }
    
    (4) Write the driver

    FlowsumDriver.java

    package phoneData;
    
    import java.io.IOException;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class FlowsumDriver {
    
        public static void main(String[] args) throws IllegalArgumentException, IOException, ClassNotFoundException, InterruptedException {
    
            //args = new String[]{"/Users/macbook/TestInfo/phone_data.txt", "/Users/macbook/TestInfo/MovlePhone1"};
    
            // 1 get the configuration and create a job instance
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
    
            // 6 set the class whose local jar contains this program
            job.setJarByClass(FlowsumDriver.class);
    
            // 2 set the Mapper/Reducer business classes this job uses
            job.setMapperClass(FlowCountMapper.class);
            job.setReducerClass(FlowCountReducer.class);
    
            // 3 set the kv types of the mapper output
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowBean.class);
    
            // 4 set the kv types of the final output
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowBean.class);
    
    //        job.setPartitionerClass(ProvincePartitioner.class);
    //        job.setNumReduceTasks(6);
    
            // 5 set the directory of the job's raw input files and the output path
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
    
            // 7 submit the job's configuration and the jar containing its classes to YARN
            job.waitForCompletion(true);
        }
    }
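
    The two commented-out lines above refer to a custom ProvincePartitioner that would split the output into 6 files by phone-number prefix; that class is not shown in this post, so here is a minimal sketch (the prefix-to-partition mapping is invented for illustration):

    package phoneData;

    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Partitioner;

    public class ProvincePartitioner extends Partitioner<Text, FlowBean> {
        @Override
        public int getPartition(Text key, FlowBean value, int numPartitions) {
            // route each record by the first three digits of the phone number;
            // these prefixes and the 6-partition layout are assumptions
            String prefix = key.toString().substring(0, 3);
            switch (prefix) {
                case "136": return 0;
                case "137": return 1;
                case "138": return 2;
                case "139": return 3;
                case "135": return 4;
                default:    return 5; // everything else
            }
        }
    }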
    

    5. Run results:

    (1) Package the jar and upload it to the Hadoop cluster.
    (2) Run:
    hadoop jar Hadoop-1.0-SNAPSHOT.jar phoneData.FlowsumDriver /phone_data.txt /out2
    

    (screenshot: run results)
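
    After the job finishes, the per-phone totals can be inspected straight from HDFS; for example (the part file name assumes the default single reducer):

    hdfs dfs -cat /out2/part-r-00000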

  • The code is similar to the word-count MapReduce example; see the previous post for detailed commentary.

    The code carries few comments; they would be much the same as in the previous blog post.

    The image on the left is the input; its three columns are: phone number, upstream traffic, downstream traffic.

    The image on the right is the output; its four columns are: phone number, upstream traffic, downstream traffic, total traffic.

  • Spark SQL: merge adjacent traffic records of the same id (time gap within 10 minutes) and sum their flow. The input view v_flow:
    | id|         start_time|           end_time|flow|
    +---+-------------------+-------------------+----+
    |  1|2020-02-18 14:20:30|2020-02-18 14:46:30|  20|
    |  1|2020-02-18 14:47:20|2020-02-18 15:20:30|  30|
    |  1|2020-02-18 15:37:23|2020-02-18 16:05:26|  40|
    |  1|2020-02-18 16:06:27|2020-02-18 17:20:49|  50|
    |  1|2020-02-18 17:21:50|2020-02-18 18:03:27|  60|
    |  2|2020-02-18 14:18:24|2020-02-18 15:01:40|  20|
    |  2|2020-02-18 15:20:49|2020-02-18 15:30:24|  30|
    |  2|2020-02-18 16:01:23|2020-02-18 16:40:32|  40|
    |  2|2020-02-18 16:44:56|2020-02-18 17:40:52|  50|
    |  3|2020-02-18 14:39:58|2020-02-18 15:35:53|  20|
    |  3|2020-02-18 15:36:39|2020-02-18 15:24:54|  30|
    +---+-------------------+-------------------+----+
    Requirement: compare each row's start_time with the previous row's end_time (e.g. 2020-02-18 14:47:20 vs 2020-02-18 14:46:30).
    If the gap is within 10 minutes, the rows belong to the same session and their flow values are summed; if the gap exceeds 10 minutes, a new session starts. And so on down each id's timeline.
     SELECT
        id,
        start_time,
        end_time,
        lag(end_time,1,start_time) over(partition by id order by start_time) as flag,
        flow
        FROM
        v_flow
    First, use lag to pull the previous row's end_time down alongside each row (defaulting to the row's own start_time for the first row of each id):
    +---+-------------------+-------------------+-------------------+----+
    | id|         start_time|           end_time|               flag|flow|
    +---+-------------------+-------------------+-------------------+----+
    |  3|2020-02-18 14:39:58|2020-02-18 15:35:53|2020-02-18 14:39:58|  20|
    |  3|2020-02-18 15:36:39|2020-02-18 15:24:54|2020-02-18 15:35:53|  30|
    |  1|2020-02-18 14:20:30|2020-02-18 14:46:30|2020-02-18 14:20:30|  20|
    |  1|2020-02-18 14:47:20|2020-02-18 15:20:30|2020-02-18 14:46:30|  30|
    |  1|2020-02-18 15:37:23|2020-02-18 16:05:26|2020-02-18 15:20:30|  40|
    |  1|2020-02-18 16:06:27|2020-02-18 17:20:49|2020-02-18 16:05:26|  50|
    |  1|2020-02-18 17:21:50|2020-02-18 18:03:27|2020-02-18 17:20:49|  60|
    |  2|2020-02-18 14:18:24|2020-02-18 15:01:40|2020-02-18 14:18:24|  20|
    |  2|2020-02-18 15:20:49|2020-02-18 15:30:24|2020-02-18 15:01:40|  30|
    |  2|2020-02-18 16:01:23|2020-02-18 16:40:32|2020-02-18 15:30:24|  40|
    |  2|2020-02-18 16:44:56|2020-02-18 17:40:52|2020-02-18 16:40:32|  50|
    +---+-------------------+-------------------+-------------------+----+
    
     SELECT
      id,
      start_time,
      end_time,
      IF((UNIX_TIMESTAMP(start_time) - UNIX_TIMESTAMP(flag)) / 60 > 10, 1, 0) as fid,
      flow
      FROM
        (
        SELECT
        id,
        start_time,
        end_time,
        lag(end_time,1,start_time) over(partition by id order by start_time) as flag,
        flow
        FROM
        v_flow
        ) t
     
    Now subtract the previous end_time from start_time. UNIX_TIMESTAMP converts a timestamp to epoch seconds (note: seconds, not milliseconds), so dividing by 60 gives minutes; the IF emits 1 when the gap exceeds 10 minutes and 0 otherwise.
    +---+-------------------+-------------------+---+----+
    | id|         start_time|           end_time|fid|flow|
    +---+-------------------+-------------------+---+----+
    |  3|2020-02-18 14:39:58|2020-02-18 15:35:53|  0|  20|
    |  3|2020-02-18 15:36:39|2020-02-18 15:24:54|  0|  30|
    |  1|2020-02-18 14:20:30|2020-02-18 14:46:30|  0|  20|
    |  1|2020-02-18 14:47:20|2020-02-18 15:20:30|  0|  30|
    |  1|2020-02-18 15:37:23|2020-02-18 16:05:26|  1|  40|
    |  1|2020-02-18 16:06:27|2020-02-18 17:20:49|  0|  50|
    |  1|2020-02-18 17:21:50|2020-02-18 18:03:27|  0|  60|
    |  2|2020-02-18 14:18:24|2020-02-18 15:01:40|  0|  20|
    |  2|2020-02-18 15:20:49|2020-02-18 15:30:24|  1|  30|
    |  2|2020-02-18 16:01:23|2020-02-18 16:40:32|  1|  40|
    |  2|2020-02-18 16:44:56|2020-02-18 17:40:52|  0|  50|
    +---+-------------------+-------------------+---+----+
    
    
     If the gap is greater than 10 minutes the flag is 1, otherwise 0. A running sum of these flags per id (fids) then gives every session a stable group number:
    +---+-------------------+-------------------+----+----+
    | id|         start_time|           end_time|fids|flow|
    +---+-------------------+-------------------+----+----+
    |  3|2020-02-18 14:39:58|2020-02-18 15:35:53|   0|  20|
    |  3|2020-02-18 15:36:39|2020-02-18 15:24:54|   0|  30|
    |  1|2020-02-18 14:20:30|2020-02-18 14:46:30|   0|  20|
    |  1|2020-02-18 14:47:20|2020-02-18 15:20:30|   0|  30|
    |  1|2020-02-18 15:37:23|2020-02-18 16:05:26|   1|  40|
    |  1|2020-02-18 16:06:27|2020-02-18 17:20:49|   1|  50|
    |  1|2020-02-18 17:21:50|2020-02-18 18:03:27|   1|  60|
    |  2|2020-02-18 14:18:24|2020-02-18 15:01:40|   0|  20|
    |  2|2020-02-18 15:20:49|2020-02-18 15:30:24|   1|  30|
    |  2|2020-02-18 16:01:23|2020-02-18 16:40:32|   2|  40|
    |  2|2020-02-18 16:44:56|2020-02-18 17:40:52|   2|  50|
    +---+-------------------+-------------------+----+----+
    SELECT
    id,
    min(start_time),
    max(end_time),
    sum(flow) as flows
    FROM
    (
    SELECT
    id,
    start_time,
    end_time,
    sum(fid) over(partition by id order by start_time) as fids,
    flow
    FROM
      (
      SELECT
      id,
      start_time,
      end_time,
      flow,
      IF((unix_timestamp(start_time)-unix_timestamp(flag)) / 60 > 10 ,1,0) as fid
      FROM
        (
         SELECT
         id,
         start_time,
         lag(end_time,1,start_time) over(partition by id order by start_time) flag,
         end_time,
         flow
         from
         v_flow
        ) t
      ) t2
    ) t3
    group by id, fids
    
    
    Finally, group by id and fids and take the minimum start time, the maximum end time, and the flow total of each session:
    +---+-------------------+-------------------+-----+
    | id|    min(start_time)|      max(end_time)|flows|
    +---+-------------------+-------------------+-----+
    |  3|2020-02-18 14:39:58|2020-02-18 15:35:53| 50.0|
    |  1|2020-02-18 14:20:30|2020-02-18 15:20:30| 50.0|
    |  1|2020-02-18 15:37:23|2020-02-18 18:03:27|150.0|
    |  2|2020-02-18 14:18:24|2020-02-18 15:01:40| 20.0|
    |  2|2020-02-18 15:20:49|2020-02-18 15:30:24| 30.0|
    |  2|2020-02-18 16:01:23|2020-02-18 17:40:52| 90.0|
    +---+-------------------+-------------------+-----+
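
    The queries above assume a temp view named v_flow already exists. A minimal Java sketch of wiring that up through SparkSession (the file path, schema, and app name are invented for illustration):

    import org.apache.spark.sql.Dataset;
    import org.apache.spark.sql.Row;
    import org.apache.spark.sql.SparkSession;

    public class FlowSessionsDemo {
        public static void main(String[] args) {
            SparkSession spark = SparkSession.builder()
                    .appName("flow-sessions")
                    .master("local[*]")   // assumption: run locally while testing
                    .getOrCreate();

            // assumption: a headered CSV with columns id, start_time, end_time, flow
            Dataset<Row> df = spark.read()
                    .option("header", "true")
                    .csv("/data/flow.csv");
            df.createOrReplaceTempView("v_flow");

            // quick sanity check; substitute the full session-merging query from above
            spark.sql("SELECT id, sum(flow) AS flows FROM v_flow GROUP BY id").show();

            spark.stop();
        }
    }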
  • A second implementation (package com.dong.flow.phone): the FlowPhoneBean Writable plus a driver class with the Mapper and Reducer as static inner classes:

    package com.dong.flow.phone;
    
    import java.io.DataInput;
    import java.io.DataOutput;
    import java.io.IOException;
    
    import org.apache.hadoop.io.Writable;
    
    public class FlowPhoneBean implements Writable {
        public long upFlow;
        public long downFlow;
        public long sumFlow;
    
        //required: deserialization invokes the no-arg constructor
        public FlowPhoneBean() {
        }
    
        public FlowPhoneBean(long upFlow, long downFlow) {
            this.upFlow = upFlow;
            this.downFlow = downFlow;
            this.sumFlow = upFlow + downFlow;
        }
    
        public long getUpFlow() {
            return upFlow;
        }
    
        public void setUpFlow(long upFlow) {
            this.upFlow = upFlow;
        }
    
        public long getDownFlow() {
            return downFlow;
        }
    
        public void setDownFlow(long downFlow) {
            this.downFlow = downFlow;
        }
    
        public long getSumFlow() {
            return sumFlow;
        }
    
        public void setSumFlow(long sumFlow) {
            this.sumFlow = sumFlow;
        }
    
        @Override
        public String toString() {
            return "FlowPhoneBean [upFlow=" + upFlow + ", downFlow=" + downFlow
                    + ", sumFlow=" + sumFlow + "]";
        }
    
        /*
         * Deserialization: read the fields in the same order they were written
         * @see org.apache.hadoop.io.Writable#readFields(java.io.DataInput)
         */
        @Override
        public void readFields(DataInput in) throws IOException {
            upFlow = in.readLong();
            downFlow = in.readLong();
            sumFlow = in.readLong();
        }
    
        /*
         * Serialization
         */
        @Override
        public void write(DataOutput out) throws IOException {
            out.writeLong(upFlow);
            out.writeLong(downFlow);
            out.writeLong(sumFlow);
        }
    }
    
    package com.dong.flow.phone;
    
    import java.io.IOException;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    public class FlowStaPhone {
    
        static class FlowMap extends Mapper<LongWritable, Text, Text, FlowPhoneBean> {
            @Override
            protected void map(LongWritable key, Text value, Context context)
                    throws IOException, InterruptedException {
                String info = value.toString();
                String[] infospilt = info.split("\t");
                //the phone number is the second field
                String phone = infospilt[1];
                //upstream/downstream traffic, indexed from the end of the record
                long upFlow = Long.parseLong(infospilt[infospilt.length - 3]);
                long downFlow = Long.parseLong(infospilt[infospilt.length - 2]);
                context.write(new Text(phone), new FlowPhoneBean(upFlow, downFlow));
            }
        }
    
        static class FlowReduce extends Reducer<Text, FlowPhoneBean, Text, FlowPhoneBean> {
            @Override
            protected void reduce(Text text, Iterable<FlowPhoneBean> iteror,
                    Context context) throws IOException, InterruptedException {
                long upsum = 0;
                long downsum = 0;
                //accumulate upstream and downstream traffic for this phone number
                for (FlowPhoneBean flowPhoneBean : iteror) {
                    upsum += flowPhoneBean.getUpFlow();
                    downsum += flowPhoneBean.getDownFlow();
                }
                context.write(text, new FlowPhoneBean(upsum, downsum));
            }
        }
    
        public static void main(String[] args) throws Exception {
            Configuration configuration = new Configuration();
            Job job = Job.getInstance(configuration);
            job.setJarByClass(FlowStaPhone.class);
            job.setMapperClass(FlowMap.class);
            job.setReducerClass(FlowReduce.class);
            job.setMapOutputKeyClass(Text.class);
            job.setMapOutputValueClass(FlowPhoneBean.class);
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(FlowPhoneBean.class);
            FileInputFormat.setInputPaths(job, new Path(args[0]));
            FileOutputFormat.setOutputPath(job, new Path(args[1]));
            boolean res = job.waitForCompletion(true);
            System.exit(res ? 0 : 1);
        }
    }
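
    As with the first version, this variant is packaged into a jar and submitted the same way; a sketch of the invocation (the jar name here is an assumption):

    hadoop jar flow-phone.jar com.dong.flow.phone.FlowStaPhone /phone_data.txt /out_phone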

  • 1. Sorting flow.log by upstream and downstream traffic 1363157985066 13726230503 00-FD-07-A4-72-B8:CMCC 120.196.100.82 i02.c.aliimg....8B-8C-E8-20:7DaysInn 120.197.40.4 122.72.52.12 20 16 4116 1432 200 1363157993055 ...
  • MapReduce statistics of upstream, downstream, and total traffic: dataset and requirement analysis, concrete steps, defining a custom data type, the Map phase, a custom partitioner, the Reduce phase, the Driver phase, packaging the program into a jar (with an IDEA packaging flow chart), and running it on the cluster. Dataset and requirement analysis ...
  • Approach: Map phase: (1) read one line of data, ... (1) accumulate upstream and downstream traffic to get the total. (2) Implement a custom bean to encapsulate the traffic fields and transmit the bean as the map output key. (3) The MR framework sorts the data while processing it (map output ...
  • I. Preparation: (1) Windows can connect to the Hadoop cluster; (2) configure Hadoop and the JDK ... (2) extract the phone number, upstream and downstream traffic; (3) emit the phone number as key and a bean object as value, i.e. context.write(phoneNumber, bean); Reduce phase: (1) accumulate the up...
  • Compute the upstream, downstream, and total traffic for each identical IMSI (International Mobile Subscriber Identity) and TAC (Tracking Area Code). Requirement: extract the VOLUME field (data traffic) together with the IMSI and TAC fields from the S1-U data, and aggregate VOLUME by identical IMSI and TAC ...
  • What do a firewall's upstream and downstream interfaces represent? Both interfaces carry sent and received traffic, so the two interfaces yield four traffic flows; what does each of the four refer to?
  • The default partitioner implementation is HashPartitioner, which maps a key to a partition via its hashCode (a sketch appears after this list). During the Reduce stage, the mapped data can be reduced into different files according to actual needs (e.g. archiving by some dimension, similar to a database GROUP BY). The partition setup must be coordinated with the number of Reduce ...
  • Server bandwidth is split into upstream and downstream bandwidth; which direction is which? Taking the server as the reference point: traffic flowing into the server is downstream, traffic flowing out of the server is ...
  • 1. MapReduce input and output. The MapReduce execution flow, diagrammed in detail: the map task reads the data line by line through the built-in TextInputFormat, using each line's starting byte offset as the key and the line's content as the value to form <k1,v1> pairs, ...
  • After the BaoTa (BT) panel is installed, its home page shows network traffic, updating interface traffic data in real time. What do upstream speed and downstream speed mean, and which direction of traffic does each represent? Upstream speed: traffic the server sends out...
  • Upstream and downstream bandwidth of a server

    Repost: https://segmentfault.com/q/1010000009242784 For a server... so clients downloading (e.g. fetching an image) consume the server's upstream traffic, while clients uploading consume its downstream. For purchased servers, e.g. on Aliyun, the advertised bandwidth generally refers to upstream bandwidth, and downstream is usually unmetered. As for how traffic is calcu...
  • First, for a mobile phone, what carries our internet access is cellular data, and that network is the so-called 4G network. Whether for China Unicom or China Mobile, it is LTE, just FDD mode for one and TDD mode for the other. So how does the phone actually communicate? Once this framework is clear, you can roughly ...

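    For reference, the default hash partitioning mentioned in the list above boils down to one line; a minimal sketch equivalent to Hadoop's HashPartitioner (masking the sign bit keeps negative hash codes mapping to a valid partition index):

    import org.apache.hadoop.mapreduce.Partitioner;

    public class SimpleHashPartitioner<K, V> extends Partitioner<K, V> {
        @Override
        public int getPartition(K key, V value, int numReduceTasks) {
            // clear the sign bit, then take the key's hash modulo the reducer count
            return (key.hashCode() & Integer.MAX_VALUE) % numReduceTasks;
        }
    }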