精华内容
下载资源
问答
  • 常见的二次营销
    千次阅读
    2018-01-25 14:17:20

    常用开发方式一:基础资料插件
    客户、供应商、物料等基础资料录入界面可以控制编码的生成规则以及任意字段的生成方式。
    常用开发方式二:业务单据插件
    业务单据插件用于进行一些复杂的业务计算。
    常用开发方式三:触发器
    用触发器处理一些简单的数据提取、反写。
    常用开发方式四:存储过程
    用SQL存储过程处理一些复杂的数据提取、反写。如销售出库单表头费用在出库单审核时,自动生成对应的其他应付单等关联单据。
    常用开发方式五:复杂报表
    如一些复杂的对账单,需要通过代码来提取生成,导出至现有格式的EXCEL内或生成个性化很强的报表格式。
    用开发方式六:外部业务数据生成凭证
    一些使用非金蝶ERP的系统,可以把外部业务数据生成金蝶凭证。

    更多相关内容
  • 想必大家都听说过微信二次开发,那么什么是微信二次开发呢?如何进行微信二次开发呢?下面就由艺形艺意工作室创始人黎想将从3W角度为大家详细介绍什么是微信二次开发,如何进行微信二次开发。那么也请大家带着这个...

    近年来,随着微信业务的突飞猛进,微信可谓是开创了中国即时通讯业的又一先河。想必大家都听说过微信二次开发,那么什么是微信二次开发呢?如何进行微信二次开发呢?下面就由艺形艺意工作室创始人黎想将从3W角度为大家详细介绍什么是微信二次开发,如何进行微信二次开发。那么也请大家带着这个问题和我一起看下去!

    因为微信庞大的用户群,间接意义上是企业搭建了一个良好的营销推广平台,所以许多个人、企业都纷纷借助微信开展了属于自己的微信模块业务,最常见的一类便是微商,这也是最基础的一类。

    什么是微信二次开发?

    所谓的微信二次开发,其实就是在微信上面建设自己需要的一些板块,而这些板块是原先并没有的,微信团队留给用户的一块开垦地,也就是微信二次开发。例如大转盘、刮刮乐等这些功能也属于二次开发,微信二次开发是微信营销的营销神器,通过开发一些互动功能,促进客户的粘度,跟客户一对一的交流了解客户最真实的需求。

    为什么要进行微信二次开发?

    如果微信公众号不进行开发,只是用免费的功能每天发布一些广告、新闻、动态长此以往客户潜意识就认为你这打广告没有任何的实力。微信公众号认证后可以加上企业简介、企业动态、产品图片、团队介绍等这些栏目,一:可以向可以展示我们公司的实力,二:可以提高客户对我们的信任度,三:可以和客户进行互动不再是需要你点击1或是2等待系统给你回复了。其次:可以加入微官网、微活动、微服务、微相册、会员系统、留言系统等可以更好的跟客户交流,了解客户的需求以便达成交易。

    微信二次开发的意义何在?

    1、通过微信公众平台的二次开发,将企业品牌展示给微信用户,减少宣传成本,建立企业与消费者、客户的一对一互动和沟通,将消费者接入企业CRM系统,进行促销、推广、宣传、售后等,形成了一种主流的线上线下微信互动营销方式。
    2、再小的个体也有自己品牌,提升企业品牌知名度
    3、顺应时代,积极创新

    微信营销是互联网经济时代企业营销模式的又一次创新,是伴随着微信的火热而兴起的一种新型网络营销模式。微信突破了时间、距离的限制,用户注册微信后,可与周围同样注册的“朋友”形成一种联系,用户订阅自己所需的信息,商家通过提供用户需要的信息,推广自己的产品,从而实现点对点的精准营销。
    微信营销主要体现在商家通过微信公众平台,结合微信会员卡展示商家微官网、微会员、微推送、微支付、微活动,已经形成了一种主流的线上线下微信互动营销方式。

    最后,在和大家分享一下微信二次开发常用功能有哪些?

    (一)微信官网
    1、公司介绍:支持多级分类;
    2、产品展示:产品支持多图显示,手指滑动浏览图片;
    3、新闻资讯:对接微信公众平台消息推送;
    4、联系方式:LBS地图位置标注,点击电话号码直接拨打;
    5、信息推送:以微信官方规定的消息推送为标准

    (二)微信客服
    1、智能客服:实现多个人工客服在线与微信公众平台客户沟通;
    2、LBS位置服务:用户经过微信提供位置,公众平台自动应答离用户最近的地方;
    3、建议/投诉/售后:微信平台内嵌售后服务表单。

    (三)微信商城
    1、在线订购:支持现有商城系统进行对接;
    2、会员系统:支持现有会员系统进行对接;
    3、在线支付:支持在线支付功能,若无需在线支付,则只记录订单信息与流程。

    2018年9月30日,即日起至12月底,用户在韩国、日本、新加坡、泰国、澳大利亚、新西兰等地的微信支付合作门店,再次彰显了微信的未来发展趋势是势不可挡!

    展开全文
  • 二次排序  任务描述 过程分析 代码 执行结果 倒排索引  任务描述 设计思路 代码 执行过程 执行结果   一次排序 熟悉MapReduce的人都知道,排序是MapReduce的天然特性!在数据达到reducer之前...

    目录

    一次排序

    MapReduce的默认排序规则

    Map、Reduce任务中Shuffle和排序的过程

    流程分析

    任务描述

    代码

    执行结果

    二次排序 

    任务描述

    过程分析

    代码

    执行结果

    倒排索引 

    任务描述

    设计思路

    代码

    执行过程

    执行结果


     

    一次排序

    熟悉MapReduce的人都知道,排序是MapReduce的天然特性!在数据达到reducer之前,MapReduce框架已经对这些数据按键排序了。

     

    MapReduce的默认排序规则

    它是按照key值进行排序的,如果key为封装的int为IntWritable类型,那么MapReduce按照数字大小对key排序;

    如果Key为封装String的Text类型,那么MapReduce将按照数据字典顺序对字符排序。

     

    Map、Reduce任务中Shuffle和排序的过程

    流程分析

    1. Map端:

    (1)每个输入分片会让一个map任务来处理,默认情况下,以HDFS的一个块的大小(默认为64M)为一个分片,当然我们也可以设置块的大小。map输出的结果会暂且放在一个环形内存缓冲区中(该缓冲区的大小默认为100M,由io.sort.mb属性控制),当该缓冲区快要溢出时(默认为缓冲区大小的80%,由io.sort.spill.percent属性控制),会在本地文件系统中创建一个溢出文件,将该缓冲区中的数据写入这个文件。

    (2)在写入磁盘之前,线程首先根据reduce任务的数目将数据划分为相同数目的分区,也就是一个reduce任务对应一个分区的数据。这样做是为了避免有些reduce任务分配到大量数据,而有些reduce任务却分到很少数据,甚至没有分到数据的尴尬局面。其实分区就是对数据进行hash的过程。然后对每个分区中的数据进行排序,如果此时设置了Combiner,将排序后的结果进行Combia操作,这样做的目的是让尽可能少的数据写入到磁盘。

    (3)当map任务输出最后一个记录时,可能会有很多的溢出文件,这时需要将这些文件合并。合并的过程中会不断地进行排序和combia操作,目的有两个:①尽量减少每次写入磁盘的数据量。②尽量减少下一复制阶段网络传输的数据量。最后合并成了一个已分区且已排序的文件。为了减少网络传输的数据量,这里可以将数据压缩,只要将mapred.compress.map.out设置为true就可以了。

    (4)将分区中的数据拷贝给相对应的reduce任务。有人可能会问:分区中的数据怎么知道它对应的reduce是哪个呢?其实map任务一直和其父TaskTracker保持联系,而TaskTracker又一直和JobTracker保持心跳。所以JobTracker中保存了整个集群中的宏观信息。只要reduce任务向JobTracker获取对应的map输出位置就ok了哦。

    到这里,map端就分析完了。那到底什么是Shuffle呢?Shuffle的中文意思是“洗牌”,如果我们这样看:一个map产生的数据,结果通过hash过程分区却分配给了不同的reduce任务,是不是一个对数据洗牌的过程呢?

    2.Reduce端:

    (1)Reduce会接收到不同map任务传来的数据,并且每个map传来的数据都是有序的。如果reduce端接受的数据量相当小,则直接存储在内存中(缓冲区大小由mapred.job.shuffle.input.buffer.percent属性控制,表示用作此用途的堆空间的百分比),如果数据量超过了该缓冲区大小的一定比例(由mapred.job.shuffle.merge.percent决定),则对数据合并后溢写到磁盘中。

    (2)随着溢写文件的增多,后台线程会将它们合并成一个更大的有序的文件,这样做是为了给后面的合并节省时间。其实不管在map端还是reduce端,MapReduce都是反复地执行排序,合并操作,现在终于明白了有些人为什么会说:排序是hadoop的灵魂。

    (3)合并的过程中会产生许多的中间文件(写入磁盘了),但MapReduce会让写入磁盘的数据尽可能地少,并且最后一次合并的结果并没有写入磁盘,而是直接输入到reduce函数。

     

    任务描述

    现有用户对商品访问情况的数据文件goods_visit1,包含商品id ,点击次数两个字段,内容以“\t”分割,数据内容如下:

    商品id  点击次数
    1010037	100
    1010102	100
    1010152	97
    1010178	96
    1010280	104
    1010320	103
    1010510	104
    1010603	96
    1010637	97

    要求编写mapreduce程序来对商品点击次数实现由低到高的排序。

     

    代码

    package MapReduce.sort;
    
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    import java.io.IOException;
    
    // goods_visit1中包含(商品id ,点击次数)两个字段,内容以“\t”分割
    // 对商品点击次数由低到高进行排序
    public class OneSort {
    
        public static class Map extends Mapper<Object, Text, IntWritable, Text>{
            private static Text goods=new Text();
            private static IntWritable num=new IntWritable();
            @Override
            protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String line=value.toString();
                String arr[]=line.split("\t");
                num.set(Integer.parseInt(arr[1]));//把要排序的点击次数字段转化为IntWritable类型并设置为key
                goods.set(arr[0]);//商品id字段设置为value
                context.write(num,goods);//输出<key,value>
            }
        }
    
        // 在数据达到reducer之前,MapReduce框架已经按照key值对这些数据按键排序了,就是shuffle()
        // 如果key为封装的int为IntWritable类型,那么MapReduce按照数字大小对key排序
        // 如果Key为封装String的Text类型,那么MapReduce将按照数据字典顺序对字符排序
        // 所以一般在map中把要排序的字段使用IntWritable类型,作为key,不排序的字段作为value
        public static class Reduce extends Reducer<IntWritable, Text, IntWritable, Text>{
            @Override
            protected void reduce(IntWritable key, Iterable <Text> values, Context context) throws IOException, InterruptedException {
                for(Text value : values){
                    context.write(key,value);
                }
            }
        }
    
        public static void main(String[] args) throws Exception {
            Job job = Job.getInstance();
            job.setJobName("OneSort");
            job.setJarByClass(OneSort.class);
    
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
            job.setOutputKeyClass(IntWritable.class);
            job.setOutputValueClass(Text.class);
    
            Path in = new Path("hdfs://localhost:9000/mr/in/goods_visit1");
            Path out = new Path("hdfs://localhost:9000/mr/out/onesort/goods_visit1");
    
            FileInputFormat.addInputPath(job, in);
            FileOutputFormat.setOutputPath(job, out);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }

     

    执行结果

     

     

     

     

    二次排序 

    在mapreduce操作时,shuffle阶段会多次根据key值排序。但是在shuffle分组后,相同key值的values序列的顺序是不确定的。如果想要此时value值也是排序好的,这种需求就是二次排序。

     

    任务描述

    用户对商品的访问情况记录为goods_visit2表,包含(goods_id,click_num)两个字段。 要求编写MapReduce代码,功能为根据商品的点击次数(click_num)进行降序排序,再根据goods_id升序排序,并输出所有商品。

    数据内容如下:

    goods_id click_num
    1010037	100
    1010102	100
    1010152	97
    1010178	96
    1010280	104
    1010320	103
    1010510	104
    1010603	96
    1010637	97

     

    过程分析

    在Map阶段:

    1.使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。本实验中使用的是TextInputFormat,他提供的RecordReder会将文本的字节偏移量作为key,这一行的文本作为value。这就是自定义Map的输入是<LongWritable, Text>的原因。

    2.然后调用自定义Map的map方法,将一个个<LongWritable, Text>键值对输入给Map的map方法。注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。

    3.在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二
    次排序。如果没有通过job.setSortComparatorClass设置key比较函数类,则可以使用key实现的compareTo方法进行排序。


    在Reduce阶段:

    1.reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序。

    2.然后开始构造一个key对应的value迭代器。这时就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类。只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,
    而这个迭代器的key使用属于同一个组的所有key的第一个key。

    3.最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器)。同样注意输入与输出的类型必须与自定义的Reducer中声明的一致。

     

    代码

    package MapReduce.sort;
    import java.io.*;
    import java.util.StringTokenizer;
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.*;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.*;
    import org.apache.hadoop.mapreduce.lib.output.*;
    
    // 二次排序
    // goods_visit2表,包含(goods_id,click_num)两个字段
    // 根据商品的点击次数(click_num)进行降序排序,再根据goods_id升序排序,并输出所有商品
    public class SecondarySort
    {
        public static class IntPair implements WritableComparable<IntPair>// 自定义组合key,让类中个每个成员变量都参与计算和比较
        {
            int first;//第一个成员变量
            int second;//第二个成员变量
            public void set(int left, int right) {
                first = left;
                second = right;
            }
            public int getFirst() {
                return first;
            }
            public int getSecond() {
                return second;
            }
    
            @Override
            public void readFields(DataInput in) throws IOException {//反序列化,从流中的二进制转换成IntPair
                first = in.readInt();
                second = in.readInt();
            }
    
            @Override
            public void write(DataOutput out) throws IOException {//序列化,将IntPair转化成使用流传送的二进制
                out.writeInt(first);
                out.writeInt(second);
            }
    
            @Override
            public int compareTo(IntPair o) {// 自定义key比较
                if (first != o.first)
                    return first < o.first ? 1 : -1;
                else if (second != o.second)
                    return second < o.second ? -1 : 1;
                else
                    return 0;
            }
    
            // 由于后面进行了自定义组合key对象的相等比较操作,最好重写hashCode()和equal()方法
            @Override
            public int hashCode(){
                return first * 157 + second;
            }
    
            @Override
            public boolean equals(Object right){
                if (right == null)
                    return false;
                if (this == right)
                    return true;
                if (right instanceof IntPair) {
                    IntPair r = (IntPair) right;
                    return r.first == first && r.second == second;
                }
                else
                    return false;
            }
        }
    
        // 分区函数类代码
        public static class FirstPartitioner extends Partitioner<IntPair, IntWritable>
        {
            @Override
            public int getPartition(IntPair key, IntWritable value,int numPartitions) {
                /**
                 *  数据输入来源:map输出
                 *  @param key map输出键值,自定义组合key
                 *  @param value map输出value值
                 *  @param numPartitions 分区总数,即reduce task个数
                **/
                // 数字的分区写法:
                // 根据自定义key中first(click_num)乘以127取绝对值,再对numPartions取余来进行分区,主要是为实现了第一次排序
                return Math.abs(key.getFirst() * 127) % numPartitions;
            }
        }
    
        // 分组函数类代码,即自定义比较器,自定义二次排序策略
        public static class GroupingComparator extends WritableComparator // 这是一个比较器,需要继承WritableComparator
        {
            protected GroupingComparator() {
                super(IntPair.class, true);
            }
            @Override
            public int compare(WritableComparable w1, WritableComparable w2) {
                // 在reduce阶段,构造一个key对应的value迭代器的时候,只要first相同就属于同一个组,放在一个value迭代器
                IntPair ip1 = (IntPair) w1;
                IntPair ip2 = (IntPair) w2;
                int l = ip1.getFirst();//click_num
                int r = ip2.getFirst();
                return l == r ? 0 : (l < r ? -1 : 1);//比较click_num大小,相等返回0,小于返回-1,大于返回1
            }
        }
    
        // 在Map阶段:
        // 1. 使用job.setInputFormatClass定义的InputFormat将输入的数据集分割成小数据块splites,同时InputFormat提供一个RecordReder的实现。
        //    本实验中使用的是TextInputFormat,他提供的RecordReder会将文本的字节偏移量作为key,这一行的文本作为value。
        //    这就是自定义Map的输入是<LongWritable, Text>的原因。
        // 2. 然后调用自定义Map的map方法,将一个个<LongWritable, Text>键值对输入给Map的map方法。
        //    注意输出应该符合自定义Map中定义的输出<IntPair, IntWritable>。最终是生成一个List<IntPair, IntWritable>。
        // 3. 在map阶段的最后,会先调用job.setPartitionerClass对这个List进行分区,每个分区映射到一个reducer。
        //    每个分区内又调用job.setSortComparatorClass设置的key比较函数类排序。可以看到,这本身就是一个二次排序。
        //    如果没有通过job.setSortComparatorClass设置key比较函数类,则可以使用key实现的compareTo方法进行排序。
    
        // 将map端输出的<key,value>中的key和value组合成一个新的key(称为newKey),value值不变,变成<(key,value),value>
        // 在针对newKey排序的时候,如果key相同,就再对value进行排序。
        public static class Map extends Mapper<LongWritable, Text, IntPair, IntWritable>
        {
            private final IntPair intkey = new IntPair();
            private final IntWritable intvalue = new IntWritable();//相当于int
            public void map(LongWritable key, Text value, Context context) throws IOException, InterruptedException {
                String line = value.toString();
                StringTokenizer tokenizer = new StringTokenizer(line);
                int left = 0;
                int right = 0;
                if (tokenizer.hasMoreTokens())//如果还存在下一个记录
                {
                    left = Integer.parseInt(tokenizer.nextToken());//goods_id
                    if (tokenizer.hasMoreTokens())
                        right = Integer.parseInt(tokenizer.nextToken());//click_num
                    intkey.set(right, left);
                    intvalue.set(left);
                    context.write(intkey, intvalue);//组合为新的键<(key,value),value>,即<(click_num,goods_id),goods_id>
                }
            }
        }
    
    
        // 在Reduce阶段:
        // 1. reducer接收到所有映射到这个reducer的map输出后,也是会调用job.setSortComparatorClass设置的key比较函数类对所有数据对排序
        // 2. 然后开始构造一个key对应的value迭代器。这时就要用到分组,使用job.setGroupingComparatorClass设置的分组函数类
        //    只要这个比较器比较的两个key相同,他们就属于同一个组,它们的value放在一个value迭代器,而这个迭代器的key使用属于同一个组的所有key的第一个key
        // 3. 最后就是进入Reducer的reduce方法,reduce方法的输入是所有的(key和它的value迭代器),同样注意输入与输出的类型必须与自定义的Reducer中声明的一致
        public static class Reduce extends Reducer<IntPair, IntWritable, Text, IntWritable>
        {
            private final Text left = new Text();
            private static final Text SEPARATOR = new Text("------------------------------------------------");
            public void reduce(IntPair key, Iterable<IntWritable> values,Context context) throws IOException, InterruptedException {
                context.write(SEPARATOR, null);
                left.set(Integer.toString(key.getFirst()));//click_num
                for (IntWritable val : values)//goods_id
                    context.write(left, val);
            }
        }
    
        public static void main(String[] args) throws IOException, InterruptedException, ClassNotFoundException {
            Configuration conf = new Configuration();
            Job job = new Job(conf, "SecondarySort");
            job.setJarByClass(SecondarySort.class);
    
            job.setMapperClass(Map.class);
            job.setReducerClass(Reduce.class);
    
            //设置分区函数类,实现第一次排序
            job.setPartitionerClass(FirstPartitioner.class);
    
            // 指定分组排序使用的比较器,默认使用key对象(IntPair)自身的compareTo()方法,实现第二次排序
            job.setGroupingComparatorClass(GroupingComparator.class);
    
            //设置map输出类型
            job.setMapOutputKeyClass(IntPair.class);
            job.setMapOutputValueClass(IntWritable.class);
    
            //设置reduce输出类型
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(IntWritable.class);
    
            job.setInputFormatClass(TextInputFormat.class);
            job.setOutputFormatClass(TextOutputFormat.class);
    
    //        job.setNumReduceTasks(1);//设置reduce  Task的数量,默认是1
    
            String[] otherArgs=new String[]{
                    "hdfs://localhost:9000/mr/in/goods_visit2",
                    "hdfs://localhost:9000/mr/out/secondarysort/goods_visit2"
            };
            FileInputFormat.setInputPaths(job, new Path(otherArgs[0]));
            FileOutputFormat.setOutputPath(job, new Path(otherArgs[1]));
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    

     

    执行结果

     

     

    倒排索引 

    "倒排索引"是文档检索系统中最常用的数据结构,被广泛地应用于全文搜索引擎。它主要是用来存储某个单词(或词组)在一个文档或一组文档中的存储位置的映射,即提供了一种根据内容来查找文档的方式。由于不是根据文档来确定文档所包含的内容,而是进行相反的操作,因而称为倒排索引(Inverted Index)。

    实现"倒排索引"主要关注的信息为:单词、文档URL及词频。

     

    任务描述

    现有3张信息数据表,分别为商品库表goods3,商品访问情况表goods_visit3,订单明细表order_items3,goods表记录了商品的状态数据,goods_visit3记录了商品的点击情况,order_items3记录了用户购买的商品的信息数据,它们的表结构及内容如下:

    goods3(goods_id,goods_status,cat_id,goods_score)

    商品ID 商品状态 分类ID 评分
    1024600	6	52006	0
    1024593	1	52121	0
    1024592	1	52121	0
    1024590	1	52119	0
    1024589	1	52119	0
    1024588	1	52030	0
    1024587	1	52021	0
    1024586	1	52029	0
    1024585	1	52014	0
    1024584	1	52029	0

    goods_visit3(goods_id,click_num)

    商品ID 商品点击次数
    1024600	2
    1024593	0
    1024592	0
    1024590	0
    1024589	0
    1024588	0
    1024587	0
    1024586	0
    1024585	0
    1024584	0

    order_items3(item_id,order_id,goods_id,goods_number,shop_price,goods_price,goods_amount)

    明细ID 订单ID 商品ID 购买数据 商品销售价格 商品最终单价 商品金额
    251688	52107	1024600	1	31.6	31.6	15.8
    252165	52209	1024600	1	31.6	31.6	15.8
    251870	52146	1024481	1	15.6	15.6	7.8
    251935	52158	1024481	1	15.6	15.6	7.8
    252415	52264	1024480	1	69.0	69.0	69.0
    250983	51937	1024480	1	69.0	69.0	69.0
    252609	52299	1024480	1	69.0	69.0	69.0
    251689	52107	1024440	1	31.6	31.6	15.8
    239369	49183	1024256	1	759.0	759.0	759.0
    249222	51513	1024140	1	198.0	198.0	198.0

    要求查询goods_id相同的商品都在哪几张表中,并统计出现了多少次。

     

    设计思路

    (1)Map过程

    首先使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容。显然,Map过程首先必须分析输入的<key,value>对,得到倒排索引中需要的三个信息:单词、文档URL和词频,接着我们对读入的数据利用Map操作进行预处理。如下图所示:

    这里存在两个问题:

    第一,<key,value>对只能有两个值,在不使用Hadoop自定义数据类型的情况下,需要根据情况将其中两个值合并成一个值,作为key或value值。

    第二,通过一个Reduce过程无法同时完成词频统计和生成文档列表,所以必须增加一个Combine过程完成词频统计。

    这里将商品ID和URL组成key值(如"1024600:goods3"),将词频(商品ID出现次数)作为value,这样做的好处是可以利用MapReduce框架自带的Map端排序,将同一文档的相同单词的词频组成列表,传递给Combine过程,实现类似于WordCount的功能。

    (2)Combine过程

    经过map方法处理后,Combine过程将key值相同的value值累加,得到一个单词在文档中的词频,如下图所示。如果直接将下图所示的输出作为Reduce过程的输入,在Shuffle过程时将面临一个问题:所有具有相同单词的记录(由单词、URL和词频组成)应该交由同一个Reducer处理,但当前的key值无法保证这一点,所以必须修改key值和value值。这次将单词(商品ID)作为key值,URL和词频组成value值(如"goods3:1")。这样做的好处是可以利用MapReduce框架默认的HashPartitioner类完成Shuffle过程,将相同单词的所有记录发送给同一个Reducer进行处理。如下图所示:

    (3)Reduce过程

    经过上述两个过程后,Reduce过程只需将相同key值的所有value值组合成倒排索引文件所需的格式即可,剩下的事情就可以直接交给MapReduce框架进行处理了。如下图所示:

     

    代码

    package MapReduce.sort;
    import java.io.IOException;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.*;
    import org.apache.hadoop.mapreduce.lib.input.*;
    import org.apache.hadoop.mapreduce.lib.output.*;
    
    // 倒排索引
    //goods3(goods_id,goods_status,cat_id,goods_score)
    //goods_visit3(goods_id,click_num)
    //order_items3(item_id,order_id,goods_id,goods_number,shop_price,goods_price,goods_amount)
    //查询goods_id相同的商品都在哪几张表,并统计出现了多少次
    public class InvertedIndex {
    
        public static class doMapper extends Mapper<Object, Text, Text, Text>{
            public static Text myKey = new Text();   // 存储单词和URL组合
            public static Text myValue = new Text();  // 存储词频
            //private FileSplit filePath;     // 存储Split对象
            @Override
            protected void map(Object key, Text value, Context context) throws IOException, InterruptedException {
                String filePath=((FileSplit)context.getInputSplit()).getPath().toString();
                System.out.println("filePath= "+filePath);
                System.out.println("传给map的key为 "+key);//偏移量
                System.out.println("传给map的value为 "+value);//文件每行内容
                // Map过程必须分析输入的<key,value>对,得到倒排索引中需要的三个信息:单词、文档URL和词频
                String val[]=value.toString().split("\t");
                if(filePath.contains("goods")){
                    int splitIndex =filePath.indexOf("goods");
                    myKey.set(val[0] + ":" + filePath.substring(splitIndex));
                }else if(filePath.contains("order")){
                    int splitIndex =filePath.indexOf("order");//获取字符串中含有order的起始索引位置
                    //order表中的goods_id位于第三列,即val[2]
                    //以“goods_id:文件名”格式组成key
                    myKey.set(val[2] + ":" + filePath.substring(splitIndex));//获取字符串中指定索引位置开始的子串
                }
                myValue.set("1");
                context.write(myKey, myValue);
                System.out.println("map的key为 "+myKey.toString());
                System.out.println("map的value为 "+myValue.toString());
            }
        }
        // 这里存在两个问题:
        // 第一,<key,value>对只能有两个值,在不使用Hadoop自定义数据类型的情况下,需要根据情况将其中两个值合并成一个值,作为key或value值
        // 第二,通过一个Reduce过程无法同时完成词频统计和生成文档列表,所以必须增加一个Combine过程完成词频统计
    
    
        public static class doCombiner extends Reducer<Text, Text, Text, Text>{
            public static Text myKey = new Text();
            public static Text myValue = new Text();
            @Override
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                //传给combine的key为map中set的myKey,如“1024140:order_items3”
                System.out.println("传给combine的key为 "+key);
                //key可能重复,一个key对应多个value,这些value组成了一个Iterable<Text> values的list,list中每个值都为1
                int sum = 0 ;
                for (Text value : values)
                    sum += Integer.parseInt(value.toString());//将key值相同的value值累加,得到一个单词在文档中的词频
    
                //分隔map传来的key(goods_id:文件名)
                int mysplit = key.toString().indexOf(":");
                myKey.set(key.toString().substring(0, mysplit));//goods_id
                myValue.set(key.toString().substring(mysplit + 1) + ":" + sum);//文件名:词频
                context.write(myKey, myValue);
                System.out.println("combiner key "+myKey.toString());
                System.out.println("combiner value "+myValue.toString());
            }
        }
        // 如果直接将输出作为Reduce过程的输入,在Shuffle过程时将面临一个问题:
        // 所有具有相同单词的记录(由单词、URL和词频组成)应该交由同一个Reducer处理,但当前的key值无法保证这一点,所以必须修改key值和value值
        // 这次将单词(goods_id)作为key值,URL和词频组成value值
        // 这样做的好处是可以利用MapReduce框架默认的HashPartitioner类完成Shuffle过程,将相同单词的所有记录发送给同一个Reducer进行处理
    
    
        public static class doReducer extends Reducer<Text, Text, Text, Text>{
            public static Text myKey = new Text();
            public static Text myValue = new Text();
            @Override
            //经过上述两个过程后,Reduce过程只需将相同key值的value值组合成倒排索引文件所需的格式即可,剩下的事情就可以直接交给MapReduce框架进行处理了。
            protected void reduce(Text key, Iterable<Text> values, Context context) throws IOException, InterruptedException {
                System.out.println("传给reduce的key为 "+key);
                System.out.println("传给reduce的values为 "+values);
                String myList = new String();
                for (Text value : values)
                    myList += value.toString() + ";";
                myKey.set(key);
                myValue.set(myList);
                context.write(myKey, myValue);
                System.out.println("reduce key "+myKey.toString());
                System.out.println("reduce value "+myValue.toString());
            }
        }
    
        public static void main(String[] args) throws IOException, ClassNotFoundException, InterruptedException {
            Job job = Job.getInstance();
            job.setJobName("InversedIndex");
            job.setJarByClass(InvertedIndex.class);
    
            job.setMapperClass(doMapper.class);
            job.setCombinerClass(doCombiner.class);
            job.setReducerClass(doReducer.class);
    
            job.setOutputKeyClass(Text.class);
            job.setOutputValueClass(Text.class);
    
            Path in1 = new Path("hdfs://localhost:9000/mr/in/goods3");
            Path in2 = new Path("hdfs://localhost:9000/mr/in/goods_visit3");
            Path in3 = new Path("hdfs://localhost:9000/mr/in/order_items3");
            Path out = new Path("hdfs://localhost:9000/mr/out/invertedindex");
    
            // 使用默认的TextInputFormat类对输入文件进行处理,得到文本中每行的偏移量及其内容,移交给Map
            FileInputFormat.addInputPath(job, in1);
            FileInputFormat.addInputPath(job, in2);
            FileInputFormat.addInputPath(job, in3);
            FileOutputFormat.setOutputPath(job, out);
    
            System.exit(job.waitForCompletion(true) ? 0 : 1);
        }
    }
    
    

     

    执行过程

    上述代码的执行过程为:
    1. 一个path作为filePath传给map,path中的文件的行偏移量作为传给map的key,path中文件的每行内容作为传给map的value
    2. 以'\t'split value,存储到val[]中
    3. 截取path中的文件名
    4. 根据path判断传给map的是具体哪个文件,根据文件名选择goods_id所在val[]列,设置key为goods_id:文件名,value为1,传给combine
    5. 重复1-4步,直到该文件读取完毕
    6. combine获取map传来的key和values list(key可能重复,一个key对应多个value,这些value组成了一个Iterable<Text> values的list,list中每个值都为1)
    7. 对每个key进行词频统计,遍历values累加其value值,赋给sum
    8. split map传来的key,获取goods_id和文件名
    9. 设置key为goods_id,value为“文件名:sum(词频)”
    10. 重复6-9步,直至map传来的全部combine完毕
    11. 重复1-10步,直至所有文件都combine完毕,将combian的key,value传给reduce
    12. 根据combine传来的key,遍历其values,以分号间隔集成一个String,设置成reduce的value,key不变
    13. 重复12步,直至combine传来的全部reduce完毕
    14. 将结果写入out文件

     

    执行结果

     

    展开全文
  • {管理信息化 ERPMRP}浪 潮 erp 销售常见问题解说 辞参考 浪潮 ERP 销售常见问题解说辞 目录 浪潮 ERP 销售常见问题解说辞 2 目录2 文档说明 5 浪潮 ERP 销售常见问题解说辞 6 一.产品类 6 1.浪潮 PS 产品为什么搞 c/...
  • Hadoop二次排序及MapReduce处理流程实例详解

    万次阅读 多人点赞 2015-01-19 17:17:17
    对于二次排序的实现,网络上已经有很多人分享过了,但是对二次排序的实现原理及整个MapReduce框架的处理流程的分析还是有非常大的出入,而且部分分析是没有经过验证的。本文将通过一个实际的MapRe

    一、概述

    MapReduce框架对处理结果的输出会根据key值进行默认的排序,这个默认排序可以满足一部分需求,但是也是十分有限的,在我们实际的需求当中,往往有要对reduce输出结果进行二次排序的需求。对于二次排序的实现,网络上已经有很多人分享过了,但是对二次排序的实现原理及整个MapReduce框架的处理流程的分析还是有非常大的出入,而且部分分析是没有经过验证的。本文将通过一个实际的MapReduce二次排序的例子,讲述二次排序的实现和其MapReduce的整个处理流程,并且通过结果和Map、Reduce端的日志来验证描述的处理流程的正确性。


    二、需求描述

    1.输入数据

    sort1	1
    sort2	3
    sort2	88
    sort2	54
    sort1	2
    sort6	22
    sort6	888
    sort6	58

    2.目标输出

    sort1	1,2
    sort2	3,54,88
    sort6	22,58,888

    三、解决思路

    1.首先,在思考解决问题思路时,我们应该先深刻的理解MapReduce处理数据的整个流程,这是最基础的,不然的话是不可能找到解决问题的思路的。我描述一下MapReduce处理数据的大概流程:首先,MapReduce框架通过getSplits()方法实现对原始文件的切片之后,每一个切片对应着一个MapTask,InputSplit输入到map()函数进行处理,中间结果经过环形缓冲区的排序,然后分区、自定义二次排序(如果有的话)和合并,再通过Shuffle操作将数据传输到reduce Task端,reduce端也存在着缓冲区,数据也会在缓冲区和磁盘中进行合并排序等操作,然后对数据按照key值进行分组,然后每处理完一个分组之后就会去调用一次reduce()函数,最终输出结果。大概流程 我画了一下,如下图:


    2.具体解决思路

    (1):Map端处理

    根据上面的需求,我们有一个非常明确的目标就是要对第一列相同的记录,并且对合并后的数字进行排序。我们都知道MapReduce框架不管是默认排序或者是自定义排序都只是对key值进行排序,现在的情况是这些数据不是key值,怎么办?其实我们可以将原始数据的key值和其对应的数据组合成一个新的key值,然后新的key值对应的value还是原始数据中的valu。那么我们就可以将原始数据的map输出变成类似下面的数据结构:

    {[sort1,1],1}
    {[sort2,3],3}
    {[sort2,88],88}
    {[sort2,54],54}
    {[sort1,2],2}
    {[sort6,22],22}
    {[sort6,888],888}
    {[sort6,58],58}

    那么我们只需要对[]里面的心key值进行排序就OK了,然后我们需要自定义一个分区处理器,因为我的目标不是想将新key相同的记录传到一个reduce中,而是想将新key中第一个字段相同的记录放到同一个reduce中进行分组合并,所以我们需要根据新key值的第一个字段来自定义一个分区处理器。通过分区操作后,得到的数据流如下:

    Partition1:{[sort1,1],1}、{[sort1,2],2}
    
    Partition2:{[sort2,3],3}、{[sort2,88],88}、{[sort2,54],54}
    
    Partition3:{[sort6,22],22}、{[sort6,888],888}、{[sort6,58],58}

    分区操作完成之后,我调用自己的自定义排序器对新的key值进行排序。

    {[sort1,1],1}
    {[sort1,2],2}
    {[sort2,3],3}
    {[sort2,54],54}
    {[sort2,88],88}
    {[sort6,22],22}
    {[sort6,58],58}
    {[sort6,888],888}


    (2).Reduce端处理

    经过Shuffle处理之后,数据传输到Reducer端了。在Reducer端按照组合键的第一个字段进行分组,并且每处理完一次分组之后就会调用一次reduce函数来对这个分组进行处理和输出。最终各个分组的数据结果变成类似下面的数据结构:

    sort1	1,2
    sort2	3,54,88
    sort6	22,58,888


    四、具体实现

    1.自定义组合键

    public class CombinationKey implements WritableComparable<CombinationKey>{
    
    	private Text firstKey;
    	private IntWritable secondKey;
    	
    	//无参构造函数
    	public CombinationKey() {
    		this.firstKey = new Text();
    		this.secondKey = new IntWritable();
    	}
    	
    	//有参构造函数
    	public CombinationKey(Text firstKey, IntWritable secondKey) {
    		this.firstKey = firstKey;
    		this.secondKey = secondKey;
    	}
    
    	public Text getFirstKey() {
    		return firstKey;
    	}
    
    	public void setFirstKey(Text firstKey) {
    		this.firstKey = firstKey;
    	}
    
    	public IntWritable getSecondKey() {
    		return secondKey;
    	}
    
    	public void setSecondKey(IntWritable secondKey) {
    		this.secondKey = secondKey;
    	}
    
    	public void write(DataOutput out) throws IOException {
    		this.firstKey.write(out);
    		this.secondKey.write(out);
    	}
    
    	public void readFields(DataInput in) throws IOException {
    		this.firstKey.readFields(in);
    		this.secondKey.readFields(in);
    	}
    
    	
    	/*public int compareTo(CombinationKey combinationKey) {
    		int minus = this.getFirstKey().compareTo(combinationKey.getFirstKey());
    		if (minus != 0){
    			return minus;
    		}
    		return this.getSecondKey().get() - combinationKey.getSecondKey().get();
    	}*/
    	/**
    	 * 自定义比较策略
    	 * 注意:该比较策略用于MapReduce的第一次默认排序
    	 * 也就是发生在Map端的sort阶段
    	 * 发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整)
    	 */
    	public int compareTo(CombinationKey combinationKey) {
    		System.out.println("------------------------CombineKey flag-------------------");
    		return this.firstKey.compareTo(combinationKey.getFirstKey());
    	}
    
    	@Override
    	public int hashCode() {
    		final int prime = 31;
    		int result = 1;
    		result = prime * result + ((firstKey == null) ? 0 : firstKey.hashCode());
    		return result;
    	}
    
    	@Override
    	public boolean equals(Object obj) {
    		if (this == obj)
    			return true;
    		if (obj == null)
    			return false;
    		if (getClass() != obj.getClass())
    			return false;
    		CombinationKey other = (CombinationKey) obj;
    		if (firstKey == null) {
    			if (other.firstKey != null)
    				return false;
    		} else if (!firstKey.equals(other.firstKey))
    			return false;
    		return true;
    	}
    
    	
    }
    
    说明:在自定义组合键的时候,我们需要特别注意,一定要实现WritableComparable接口,并且实现compareTo()方法的比较策略。这个用于MapReduce的第一次默认排序,也就是发生在Map阶段的sort小阶段,发生地点为环形缓冲区(可以通过io.sort.mb进行大小调整),但是其对我们最终的二次排序结果是没有影响的,我们二次排序的最终结果是由我们的自定义比较器决定的。

    2.自定义分区器

    /**
     * 自定义分区
     * @author 廖钟*民
     * time : 2015年1月19日下午12:13:54
     * @version
     */
    public class DefinedPartition extends Partitioner<CombinationKey, IntWritable>{
    
    	/**
    	 * 数据输入来源:map输出 我们这里根据组合键的第一个值作为分区
    	 * 如果不自定义分区的话,MapReduce会根据默认的Hash分区方法
    	 * 将整个组合键相等的分到一个分区中,这样的话显然不是我们要的效果
    	 * @param key map输出键值
    	 * @param value map输出value值
    	 * @param numPartitions 分区总数,即reduce task个数
    	 */
    	public int getPartition(CombinationKey key, IntWritable value, int numPartitions) {
    		System.out.println("---------------------进入自定义分区---------------------");
    		System.out.println("---------------------结束自定义分区---------------------");
    		return (key.getFirstKey().hashCode() & Integer.MAX_VALUE) % numPartitions;
    	}
    
    }

    3.自定义比较器

    public class DefinedComparator extends WritableComparator{
    
    	protected DefinedComparator() {
    		super(CombinationKey.class,true);
    	}
    
    	/**
    	 * 第一列按升序排列,第二列也按升序排列
    	 */
    	public int compare(WritableComparable a, WritableComparable b) {
    		System.out.println("------------------进入二次排序-------------------");
    		CombinationKey c1 = (CombinationKey) a;
    		CombinationKey c2 = (CombinationKey) b;
    		int minus = c1.getFirstKey().compareTo(c2.getFirstKey());
    		
    		if (minus != 0){
    			System.out.println("------------------结束二次排序-------------------");
    			return minus;
    		} else {
    			System.out.println("------------------结束二次排序-------------------");
    			return c1.getSecondKey().get() -c2.getSecondKey().get();
    		}
    	}
    }
    

    4.自定义分组

    /**
     * 自定义分组有中方式,一种是继承WritableComparator
     * 另外一种是实现RawComparator接口
     * @author 廖*民
     * time : 2015年1月19日下午3:30:11
     * @version
     */
    public class DefinedGroupSort extends WritableComparator{
    
    
    	protected DefinedGroupSort() {
    		super(CombinationKey.class,true);
    	}
    
    	@Override
    	public int compare(WritableComparable a, WritableComparable b) {
    		System.out.println("---------------------进入自定义分组---------------------");
    		CombinationKey combinationKey1 = (CombinationKey) a;
    		CombinationKey combinationKey2 = (CombinationKey) b;
    		System.out.println("---------------------分组结果:" + combinationKey1.getFirstKey().compareTo(combinationKey2.getFirstKey()));
    		System.out.println("---------------------结束自定义分组---------------------");
    		//自定义按原始数据中第一个key分组
    		return combinationKey1.getFirstKey().compareTo(combinationKey2.getFirstKey());
    	}
    
    
    }

    5.主体程序实现

    public class SecondSortMapReduce {
    
    		// 定义输入路径
    		private static final String INPUT_PATH = "hdfs://liaozhongmin:9000/sort_data";
    		// 定义输出路径
    		private static final String OUT_PATH = "hdfs://liaozhongmin:9000/out";
    
    		public static void main(String[] args) {
    
    			try {
    				// 创建配置信息
    				Configuration conf = new Configuration();
    				conf.set(KeyValueLineRecordReader.KEY_VALUE_SEPERATOR, "\t");
    
    				// 创建文件系统
    				FileSystem fileSystem = FileSystem.get(new URI(OUT_PATH), conf);
    				// 如果输出目录存在,我们就删除
    				if (fileSystem.exists(new Path(OUT_PATH))) {
    					fileSystem.delete(new Path(OUT_PATH), true);
    				}
    
    				// 创建任务
    				Job job = new Job(conf, SecondSortMapReduce.class.getName());
    
    				//1.1	设置输入目录和设置输入数据格式化的类
    				FileInputFormat.setInputPaths(job, INPUT_PATH);
    				job.setInputFormatClass(KeyValueTextInputFormat.class);
    
    				//1.2	设置自定义Mapper类和设置map函数输出数据的key和value的类型
    				job.setMapperClass(SecondSortMapper.class);
    				job.setMapOutputKeyClass(CombinationKey.class);
    				job.setMapOutputValueClass(IntWritable.class);
    
    				//1.3	设置分区和reduce数量(reduce的数量,和分区的数量对应,因为分区为一个,所以reduce的数量也是一个)
    				job.setPartitionerClass(DefinedPartition.class);
    				job.setNumReduceTasks(1);
    				
    				//设置自定义分组策略
    				job.setGroupingComparatorClass(DefinedGroupSort.class);
    				//设置自定义比较策略(因为我的CombineKey重写了compareTo方法,所以这个可以省略)
    				job.setSortComparatorClass(DefinedComparator.class);
    				
    				//1.4	排序
    				//1.5	归约
    				//2.1	Shuffle把数据从Map端拷贝到Reduce端。
    				//2.2	指定Reducer类和输出key和value的类型
    				job.setReducerClass(SecondSortReducer.class);
    				job.setOutputKeyClass(Text.class);
    				job.setOutputValueClass(Text.class);
    
    				//2.3	指定输出的路径和设置输出的格式化类
    				FileOutputFormat.setOutputPath(job, new Path(OUT_PATH));
    				job.setOutputFormatClass(TextOutputFormat.class);
    
    
    				// 提交作业 退出
    				System.exit(job.waitForCompletion(true) ? 0 : 1);
    			
    			} catch (Exception e) {
    				e.printStackTrace();
    			}
    		}
    		
    	public static class SecondSortMapper extends Mapper<Text, Text, CombinationKey, IntWritable>{
    		/**
    		 * 这里要特殊说明一下,为什么要将这些变量写在map函数外边
    		 * 对于分布式的程序,我们一定要注意到内存的使用情况,对于MapReduce框架
    		 * 每一行的原始记录的处理都要调用一次map()函数,假设,这个map()函数要处理1一亿
    		 * 条输入记录,如果将这些变量都定义在map函数里面则会导致这4个变量的对象句柄
    		 * 非常的多(极端情况下将产生4*1亿个句柄,当然java也是有自动的GC机制的,一定不会达到这么多)
    		 * 导致栈内存被浪费掉,我们将其写在map函数外面,顶多就只有4个对象句柄
    		 */
    		private CombinationKey combinationKey = new CombinationKey();
    		Text sortName = new Text();
    		IntWritable score = new IntWritable();
    		String[] splits = null;
    		protected void map(Text key, Text value, Mapper<Text, Text, CombinationKey, IntWritable>.Context context) throws IOException, InterruptedException {
    			System.out.println("---------------------进入map()函数---------------------");
    			//过滤非法记录(这里用计数器比较好)
    			if (key == null || value == null || key.toString().equals("")){
    				return;
    			}
    			//构造相关属性
    			sortName.set(key.toString());
    			score.set(Integer.parseInt(value.toString()));
    			//设置联合key
    			combinationKey.setFirstKey(sortName);
    			combinationKey.setSecondKey(score);
    			
    			//通过context把map处理后的结果输出
    			context.write(combinationKey, score);
    			System.out.println("---------------------结束map()函数---------------------");
    		}
    		
    	}
    	
    	
    	public static class SecondSortReducer extends Reducer<CombinationKey, IntWritable, Text, Text>{
    		
    		StringBuffer sb = new StringBuffer();
    		Text score = new Text();
    		/**
    		 * 这里要注意一下reduce的调用时机和次数:
    		 * reduce每次处理一个分组的时候会调用一次reduce函数。
    		 * 所谓的分组就是将相同的key对应的value放在一个集合中
    		 * 例如:<sort1,1> <sort1,2>
    		 * 分组后的结果就是
    		 * <sort1,{1,2}>这个分组会调用一次reduce函数
    		 */
    		protected void reduce(CombinationKey key, Iterable<IntWritable> values, Reducer<CombinationKey, IntWritable, Text, Text>.Context context)
    				throws IOException, InterruptedException {
    			
    			
    			//先清除上一个组的数据
    			sb.delete(0, sb.length());
    			
    			for (IntWritable val : values){
    				sb.append(val.get() + ",");
    			}
    			
    			//取出最后一个逗号
    			if (sb.length() > 0){
    				sb.deleteCharAt(sb.length() - 1);
    			}
    			
    			//设置写出去的value
    			score.set(sb.toString());
    			
    			//将联合Key的第一个元素作为新的key,将score作为value写出去
    			context.write(key.getFirstKey(), score);
    			
    			System.out.println("---------------------进入reduce()函数---------------------");
    			System.out.println("---------------------{[" + key.getFirstKey()+"," + key.getSecondKey() + "],[" +score +"]}");
    			System.out.println("---------------------结束reduce()函数---------------------");
    		}
    	}
    }

    程序运行的结果:


    五、处理流程

    看到前面的代码,都知道我在各个组件上已经设置好了相应的标志,用于追踪整个MapReduce处理二次排序的处理流程。现在让我们分别看看Map端和Reduce端的日志情况。

    (1)Map端日志分析

    15/01/19 15:32:29 INFO input.FileInputFormat: Total input paths to process : 1
    15/01/19 15:32:29 WARN snappy.LoadSnappy: Snappy native library not loaded
    15/01/19 15:32:30 INFO mapred.JobClient: Running job: job_local_0001
    15/01/19 15:32:30 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
    15/01/19 15:32:30 INFO mapred.MapTask: io.sort.mb = 100
    15/01/19 15:32:30 INFO mapred.MapTask: data buffer = 79691776/99614720
    15/01/19 15:32:30 INFO mapred.MapTask: record buffer = 262144/327680
    ---------------------进入map()函数---------------------
    ---------------------进入自定义分区---------------------
    ---------------------结束自定义分区---------------------
    ---------------------结束map()函数---------------------
    ---------------------进入map()函数---------------------
    ---------------------进入自定义分区---------------------
    ---------------------结束自定义分区---------------------
    ---------------------结束map()函数---------------------
    ---------------------进入map()函数---------------------
    ---------------------进入自定义分区---------------------
    ---------------------结束自定义分区---------------------
    ---------------------结束map()函数---------------------
    ---------------------进入map()函数---------------------
    ---------------------进入自定义分区---------------------
    ---------------------结束自定义分区---------------------
    ---------------------结束map()函数---------------------
    ---------------------进入map()函数---------------------
    ---------------------进入自定义分区---------------------
    ---------------------结束自定义分区---------------------
    ---------------------结束map()函数---------------------
    ---------------------进入map()函数---------------------
    ---------------------进入自定义分区---------------------
    ---------------------结束自定义分区---------------------
    ---------------------结束map()函数---------------------
    ---------------------进入map()函数---------------------
    ---------------------进入自定义分区---------------------
    ---------------------结束自定义分区---------------------
    ---------------------结束map()函数---------------------
    ---------------------进入map()函数---------------------
    ---------------------进入自定义分区---------------------
    ---------------------结束自定义分区---------------------
    ---------------------结束map()函数---------------------
    15/01/19 15:32:30 INFO mapred.MapTask: Starting flush of map output
    ------------------进入二次排序-------------------
    ------------------结束二次排序-------------------
    ------------------进入二次排序-------------------
    ------------------结束二次排序-------------------
    ------------------进入二次排序-------------------
    ------------------结束二次排序-------------------
    ------------------进入二次排序-------------------
    ------------------结束二次排序-------------------
    ------------------进入二次排序-------------------
    ------------------结束二次排序-------------------
    ------------------进入二次排序-------------------
    ------------------结束二次排序-------------------
    ------------------进入二次排序-------------------
    ------------------结束二次排序-------------------
    ------------------进入二次排序-------------------
    ------------------结束二次排序-------------------
    ------------------进入二次排序-------------------
    ------------------结束二次排序-------------------
    ------------------进入二次排序-------------------
    ------------------结束二次排序-------------------
    ------------------进入二次排序-------------------
    ------------------结束二次排序-------------------
    ------------------进入二次排序-------------------
    ------------------结束二次排序-------------------
    15/01/19 15:32:30 INFO mapred.MapTask: Finished spill 0
    15/01/19 15:32:30 INFO mapred.Task: Task:attempt_local_0001_m_000000_0 is done. And is in the process of commiting
    15/01/19 15:32:30 INFO mapred.LocalJobRunner: 
    15/01/19 15:32:30 INFO mapred.Task: Task 'attempt_local_0001_m_000000_0' done.
    15/01/19 15:32:30 INFO mapred.Task:  Using ResourceCalculatorPlugin : null
    15/01/19 15:32:30 INFO mapred.LocalJobRunner: 
    从Map端的日志,我们可以很容易的看出来每一条记录开始时进入到map()函数进行处理,处理完了之后立马就自定义分区函数中对其进行分区,当所有输入数据经过map()函数和分区函数处理之后,就调用自定义二次排序函数对其进行排序。

    (2)Reduce端日志分析

    15/01/19 15:32:30 INFO mapred.Merger: Merging 1 sorted segments
    15/01/19 15:32:30 INFO mapred.Merger: Down to the last merge-pass, with 1 segments left of total size: 130 bytes
    15/01/19 15:32:30 INFO mapred.LocalJobRunner: 
    ---------------------进入自定义分组---------------------
    ---------------------分组结果:0
    ---------------------结束自定义分组---------------------
    ---------------------进入自定义分组---------------------
    ---------------------分组结果:-1
    ---------------------结束自定义分组---------------------
    ---------------------进入reduce()函数---------------------
    ---------------------{[sort1,2],[1,2]}
    ---------------------结束reduce()函数---------------------
    ---------------------进入自定义分组---------------------
    ---------------------分组结果:0
    ---------------------结束自定义分组---------------------
    ---------------------进入自定义分组---------------------
    ---------------------分组结果:0
    ---------------------结束自定义分组---------------------
    ---------------------进入自定义分组---------------------
    ---------------------分组结果:-4
    ---------------------结束自定义分组---------------------
    ---------------------进入reduce()函数---------------------
    ---------------------{[sort2,88],[3,54,88]}
    ---------------------结束reduce()函数---------------------
    ---------------------进入自定义分组---------------------
    ---------------------分组结果:0
    ---------------------结束自定义分组---------------------
    ---------------------进入自定义分组---------------------
    ---------------------分组结果:0
    ---------------------结束自定义分组---------------------
    ---------------------进入reduce()函数---------------------
    ---------------------{[sort6,888],[22,58,888]}
    ---------------------结束reduce()函数---------------------
    15/01/19 15:32:30 INFO mapred.Task: Task:attempt_local_0001_r_000000_0 is done. And is in the process of commiting
    15/01/19 15:32:30 INFO mapred.LocalJobRunner: 
    15/01/19 15:32:30 INFO mapred.Task: Task attempt_local_0001_r_000000_0 is allowed to commit now
    15/01/19 15:32:30 INFO output.FileOutputCommitter: Saved output of task 'attempt_local_0001_r_000000_0' to hdfs://liaozhongmin:9000/out
    15/01/19 15:32:30 INFO mapred.LocalJobRunner: reduce > reduce
    15/01/19 15:32:30 INFO mapred.Task: Task 'attempt_local_0001_r_000000_0' done.
    15/01/19 15:32:31 INFO mapred.JobClient:  map 100% reduce 100%
    15/01/19 15:32:31 INFO mapred.JobClient: Job complete: job_local_0001
    15/01/19 15:32:31 INFO mapred.JobClient: Counters: 19
    15/01/19 15:32:31 INFO mapred.JobClient:   File Output Format Counters 
    15/01/19 15:32:31 INFO mapred.JobClient:     Bytes Written=40
    15/01/19 15:32:31 INFO mapred.JobClient:   FileSystemCounters
    15/01/19 15:32:31 INFO mapred.JobClient:     FILE_BYTES_READ=446
    15/01/19 15:32:31 INFO mapred.JobClient:     HDFS_BYTES_READ=140
    15/01/19 15:32:31 INFO mapred.JobClient:     FILE_BYTES_WRITTEN=131394
    15/01/19 15:32:31 INFO mapred.JobClient:     HDFS_BYTES_WRITTEN=40
    15/01/19 15:32:31 INFO mapred.JobClient:   File Input Format Counters 
    15/01/19 15:32:31 INFO mapred.JobClient:     Bytes Read=70
    15/01/19 15:32:31 INFO mapred.JobClient:   Map-Reduce Framework
    15/01/19 15:32:31 INFO mapred.JobClient:     Reduce input groups=3
    15/01/19 15:32:31 INFO mapred.JobClient:     Map output materialized bytes=134
    15/01/19 15:32:31 INFO mapred.JobClient:     Combine output records=0
    15/01/19 15:32:31 INFO mapred.JobClient:     Map input records=8
    15/01/19 15:32:31 INFO mapred.JobClient:     Reduce shuffle bytes=0
    15/01/19 15:32:31 INFO mapred.JobClient:     Reduce output records=3
    15/01/19 15:32:31 INFO mapred.JobClient:     Spilled Records=16
    15/01/19 15:32:31 INFO mapred.JobClient:     Map output bytes=112
    15/01/19 15:32:31 INFO mapred.JobClient:     Total committed heap usage (bytes)=391118848
    15/01/19 15:32:31 INFO mapred.JobClient:     Combine input records=0
    15/01/19 15:32:31 INFO mapred.JobClient:     Map output records=8
    15/01/19 15:32:31 INFO mapred.JobClient:     SPLIT_RAW_BYTES=99
    15/01/19 15:32:31 INFO mapred.JobClient:     Reduce input records=8
    首先,我们看了Reduce端的日志,第一个信息我应该很容易能够很容易看出来,就是分组和reduce()函数处理都是在Shuffle完成之后才进行的。另外一点我们也非常容易看出,就是每次处理完一个分组数据就会去调用一次的reduce()函数对这个分组进行处理和输出。此外,说明一些分组函数的返回值问题,当返回0时才会被分到同一个组中。另外一点我们也可以看出来,一个分组中每合并n个值就会有n-1分组函数返回0值,也就是说进行了n-1次比较。


    六、总结

    本文主要从MapReduce框架执行的流程,去分析了如何去实现二次排序,通过代码进行了实现,并且对整个流程进行了验证。另外,要吐槽一下,网络上有很多文章都记录了MapReudce处理二次排序问题,但是对MapReduce框架整个处理流程的描述错漏很多,而且他们最终的流程描述也没有证据可以支撑。所以,对于网络上的学习资源不能够完全依赖,要融入自己的思想,并且要重要的观点进行代码或者实践的验证。另外,今天在一个hadoop交流群上听到少部分人在讨论,有了hive我们就不用学习些MapReduce程序?对这这个问题我是这么认为:我不相信写不好MapReduce程序的程序员会写好hive语句,最起码的他们对整个执行流程是一无所知的,更不用说性能问题了,有可能连最常见的数据倾斜问题的弄不清楚。


     如果文章写的有问题,欢迎指出,共同学习!




    展开全文
  • 因此一般来说,销售、客服、商务、文案等一线人员,转行比较难。总部的运营、产品、开发等人员,转行难度相对较小。问题Q:不转岗,但仍然很喜欢做分析,可以吗?A:当然可以,实际上业务职能里的高级岗位,都和...
  • 常见需求函数模型商品的最优价格,林承初,,商品的销售价格要依据产品成本和销售情况而定.常见的商品需求函数有线性函数、二次函数、指数函数等.在成本函数为已知的条件下
  • 源码依然开源发布,大家可以尽情二次开发。 产品性能:现在市场上的电话机器人最核心的功能就除了​‌‌有智能电话机器人话术配置,一键导入数据资料,根据时间设置进行外呼。还可以对筛选出意向客户进行按意向度...
  • 比如说生产销售白酒、啤酒的公司股票,其生产和销售的产品白酒、啤酒是用于个人直接消费的就是消费股,相反生产啤酒花、或者酒精的公司,他们生产和销售的产品是直接用于二次生产,不能被个人直接消费,因此不能叫消费股...
  • 网络营销重点(

    千次阅读 2018-07-07 20:46:33
    网络营销重点:1. 网络营销基础服务市场:域名注册 虚拟主机 企业网站建设2. 网络营销推广服务市场:分类目录登录、专业搜索引擎的关键词广告和竞价排名、供求信息发布、专业e-mail策略电子商务平台 推荐网站:...
  • 这些营销活动以电话为基础,一般,银行的客服人员需要联系客户至少一,以此确认客户是否将认购该银行的产品(定期存款)。 因此,与该数据集对应的任务是「分类任务」,「分类目标」是预测客户是(’ 1 ‘)或者否...
  • 想要从事互联网营销的朋友,那么有一些常见名词是不可不记,具体有哪些常见名词呢? CTR (Click Through Rate):点击率 具体公式:CTR=点击量/展现量 运营人员用来分析广告投放创意的质量,如果点击率较低,说明...
  • 常见彩票种类

    千次阅读 2021-05-13 20:17:32
    即开型彩票,全称应该是“即开即兑型彩票”,就是购票者在一个销售点上一完成购票和兑奖全过程的一种彩票。 即开型彩票的优点首先是即开即兑。当你买到彩票后,刮开、撕开或揭开兑奖区后,马上就可以知道
  • 相比其他投入费用过高的营销方式,邮件营销以及低廉高效特点吸引了无数企业竞相使用,一有效的企业电子邮件营销能带来丰厚回报,如何避免EDM营销失败又能高效发挥其效果呢?我们总结出企业实际操作中遇到的常见...
  • 本文介绍六种概率分布的原理、举例、均值、方差、期望、概率分布图等,分布分别为伯努利分布、均匀分布、项分布、正态分布、泊松分布、指数分布;另外介绍各种分布之间的关系和不同
  • 市场营销理论详解

    千次阅读 2022-03-27 19:52:59
    市场营销学1. 市场营销概念2. 市场营销理论3. 营销组合理论3.1 4Ps营销理论3.2 4Cs营销理论3.3 4Rs营销理论 1. 市场营销概念 2. 市场营销理论 3. 营销组合理论 3.1 4Ps营销理论 3.2 4Cs营销理论 3.3 4Rs营销理论 .....
  • 这是涛哥给你推荐的第42篇文来源 :苦逼的码农 | 作者:帅地这个春招估计也要介绍了吧,自己投的公司也不多吧,投简历的时候,如果你提前批和正常网申都投的话,可能会获得两...
  • 七种常见的回归分析

    万次阅读 多人点赞 2020-09-26 10:09:32
    因为在这里我们使用的是的项分布(因变量),我们需要选择一个对于这个分布最佳的连结函数。它就是Logit函数。在上述方程中,通过观测样本的极大似然估计值来选择参数,而不是最小化平方和误差(如在普通回归使用...
  • 病毒营销和裂变营销

    千次阅读 2020-10-21 16:19:34
    病毒营销 ...病毒营销是一种常见的网络营销方法,常用于进行网站推广、品牌推广等。由于这种传播是用户之间自发进行的,因此是几乎不需要费用的网络营销手段。 也就是说,病毒营销是通过提供有价值的
  • 目录用户获取渠道到达量渠道转化率渠道ROI日应用下载量日新增用户数用户获取成本一会话用户数用户活跃活跃用户PV和UV用户会话次数用户访问时长功能使用率用户留存留存率用户流失率退出率和跳出率市场营销用户生命...
  • U8常见问题汇总

    万次阅读 2020-11-26 09:11:39
    U8常见问题汇总 1、帐套如何进行输出 系统管理——账套——输出; 2、帐套如何进行引入 开始——程序——用友ERP-U8——系统管理——操作员admin——密码791121——账套(找到备份的数据)——引入——**账套引入...
  • 数据分析方法有非常多,但大致可以分为两类 第一,营销模型类,第统计方法。 本篇主要是营销类,整理的框架为:定义、模型内容、应用场景 一、5W2H分析法 1、定义 5W2H分析法又叫七问分析法,即what,why,who,when...
  • Elasticsearch聚合学习之:区间聚合

    千次阅读 多人点赞 2019-05-03 01:13:35
    本文是《Elasticsearch聚合学习》系列的第篇,上一篇是我们熟悉了聚合的基本操作,本篇的内容是按照区间聚合的实战操作;
  • 下面就给大家详细拆解七种常见的数据分析法,让我们的数据分析少走弯路。 01 象限分析法 从这张图,你能分析出来什么呢? X轴从左到右是点击率的高低,Y轴从下到上是转化率的高低,形成了4个象限,这就是...
  • 几种常见的开源协议

    千次阅读 2018-11-13 20:53:26
    但”为所欲为”的前提当你发布使用了BSD协议的代码,或则以BSD协议代码为基础做二次开发自己的产品时,需要满足三个条件: (1)如果再发布的产品中包含源代码,则在源代码中必须带有原来代码中的BSD协议。...
  • Hive常见面试题整理(HQL)

    千次阅读 2021-12-04 20:56:49
    Hive常见面试题整理(HQL)
  • 移动安全大黄书《Android应用安全防护和逆向分析》在开售不到一个月第一次印刷就销售完了,紧急开始了第二次印刷,感谢各位同学对本书的支持:关于这本书的内容之前已经介绍了,这里就就简单的大致说一下吧:本书...
  • 常见的属于流失的状态定义示例: ·会员已经退订公司的促销活动; ·会员打电话要求将自己的信息加入通知黑名单; ·会员已经连续6个月没有登录过网站; ·针对会员发送的关怀激励活动中没有任何有效反馈和互动; ·...
  • matlab三维图像变

    千次阅读 2021-05-05 07:09:42
    matlab实验三维图形和三维图形的创建_计算机软件及应用_IT/计算机_专业资料。MATLAB维图形的绘制,图形的标注,三维曲线和曲面图形的绘制。 实验三:维图形和三维图形的创建 一、实验目的 1.掌握维图形的绘制。...
  • 网络营销特点及职能

    千次阅读 2020-09-28 20:51:52
    网络营销特点及功能 一、网络营销的特点 1. 跨时空性 由于互联网能够不受时间和空间的约束来进行信息的交换,所以网络营销可以最大程度地利用时间和空间进行营销活动,使得网络营销可以脱离时空限制进行交易。 2....

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 52,229
精华内容 20,891
热门标签
关键字:

常见的二次营销