精华内容
下载资源
问答
  • 同现矩阵
    2015-07-27 13:47:29
    http://zengzhaozheng.blog.51cto.com/8219051/1557054?utm_source=tuicool
    更多相关内容
  • 昏迷的目标是基于空间数据创建共现矩阵,包括加权共现矩阵( wecoma )和集成共现矩阵( incoma )。 安装 您可以使用以下方法从安装comat的发行版本: install.packages( " comat " ) 您可以使用以下命令从安装...
  • coom.rar_同现矩阵

    2022-07-15 04:34:21
    同现矩阵(或共生矩阵)的求解与显示,输入可以是图像路径,也可以是矩阵,有注释。还有实验报告,伪代码介绍详尽。
  • 为什么同现矩阵*评分矩阵=推荐结果

    千次阅读 2015-10-30 11:17:20
    同现矩阵其实就是 物品与物品之间的关联度, 这个关联度由所有用户对所有物品的评分决定 比如第一条, 说明同时有3个用户喜欢 101 与 102 物品ID1 物品ID2 物品ID1与物品ID2...
    举个用户3对物品102是否感兴趣的例子:
    用户3对所有物品的评分, 有评分说明用户3喜欢这东西
    用户ID
    物品ID1
    用户评分
    3
    101
    2
    3
    102
    0
    3
    103
    0
    3
    104
    4
    3
    105
    4.5
    3
    106
    0
    3
    107
    5


    同现矩阵其实就是 物品与物品之间的关联度, 这个关联度由所有用户对所有物品的评分决定
    比如第一条, 说明同时有3个用户喜欢 101 与 102
    物品ID1
    物品ID2
    物品ID1与物品ID2关联权重
    101
    102
    3
    102
    102
    4
    103
    102
    4
    104
    102
    2
    105
    102
    2
    106
    102
    1
    107
    102
    1



    计算推荐得分,  其意义是: 比如第5行, 用户3很喜欢物品105(给了得了4.5), 同时由其他用户也知道喜欢物品105的对物品102的喜欢程度(得分2),  两者相乘表示用户3因为 喜欢物品105 推算出 同时也喜欢物品102的得分。
    用户ID
    物品ID1
    用户评分
    物品ID2
    物品ID1与物品ID2关联权重
    得分=用户评分*物品ID1与物品ID2关联权重
    3
    101
    2
    102
    3
    2*3=6
    3
    102
    0
    102
    4
    0
    3
    103
    0
    102
    4
    0
    3
    104
    4
    102
    2
    4*2=8
    3
    105
    4.5
    102
    2
    4.5*2=9
    3
    106
    0
    102
    1
    0
    3
    107
    5
    102
    1
    5*1=5


    用户3对物品102 兴趣总得分

    用户ID
    物品ID2
    总得分
    3
    102
    28










    本帖最后由 pig2 于 2014-6-17 19:43 编辑

    问题导读:
    1. 推荐系统概述;
    2. 推荐系统指标设计;
    3. Hadoop并行算法;
    4. 推荐系统架构;
    5. MapReduce程序实现。





    前言
    Netflix电影推荐的百万美金比赛,把“推荐”变成了时下最热门的数据挖掘算法之一。也正是由于Netflix的比赛,让企业界和学科界有了更深层次的技术碰撞。引发了各种网站“推荐”热,个性时代已经到来。

    一、 推荐系统概述
    电子商务网站是个性化推荐系统重要地应用的领域之一,亚马逊就是个性化推荐系统的积极应用者和推广者,亚马逊的推荐系统深入到网站的各类商品,为亚马逊带来了至少30%的销售额。
    不光是电商类,推荐系统无处不在。QQ,人人网的好友推荐;新浪微博的你可能感觉兴趣的人;优酷,土豆的电影推荐;豆瓣的图书推荐;大从点评的餐饮推荐;世纪佳缘的相亲推荐;天际网的职业推荐等。

    推荐算法分类:

    按数据使用划分:
    协同过滤算法:UserCF, ItemCF, ModelCF
    基于内容的推荐: 用户内容属性和物品内容属性
    社会化过滤:基于用户的社会网络关系

    按模型划分:
    最近邻模型:基于距离的协同过滤算法
    Latent Factor Mode(SVD):基于矩阵分解的模型
    Graph:图模型,社会网络图模型

    基于用户的协同过滤算法UserCF
    基于用户的协同过滤,通过不同用户对物品的评分来评测用户之间的相似性,基于用户之间的相似性做出推荐。简单来讲就是:给用户推荐和他兴趣相似的其他用户喜欢的物品。
    用例说明:
     
    算法实现及使用介绍,请参考文章:Mahout推荐算法API详解

    基于物品的协同过滤算法ItemCF
    基于item的协同过滤,通过用户对不同item的评分来评测item之间的相似性,基于item之间的相似性做出推荐。简单来讲就是:给用户推荐和他之前喜欢的物品相似的物品。
    用例说明:

    算法实现及使用介绍,请参考文章:Mahout推荐算法API详解
    注:基于物品的协同过滤算法,是目前商用最广泛的推荐算法。

    协同过滤算法实现,分为2个步骤
    • 1. 计算物品之间的相似度
    • 2. 根据物品的相似度和用户的历史行为给用户生成推荐列表
    有关协同过滤的另一篇文章,请参考:RHadoop实践系列之三 R实现MapReduce的协同过滤算法


    二、 需求分析:推荐系统指标设计
    下面我们将从一个公司案例出发来全面的解释,如何进行推荐系统指标设计。

    案例介绍
    Netflix电影推荐百万奖金比赛,http://www.netflixprize.com/
    Netflix官方网站:www.netflix.com

    Netflix,2006年组织比赛是的时候,是一家以在线电影租赁为生的公司。他们根据网友对电影的打分来判断用户有可能喜欢什么电影,并结合会员看过的电影以及口味偏好设置做出判断,混搭出各种电影风格的需求。
    收集会员的一些信息,为他们指定个性化的电影推荐后,有许多冷门电影竟然进入了候租榜单。从公司的电影资源成本方面考量,热门电影的成本一般较高,如果Netflix公司能够在电影租赁中增加冷门电影的比例,自然能够提升自身盈利能力。
    Netflix公司曾宣称60%左右的会员根据推荐名单定制租赁顺序,如果推荐系统不能准确地猜测会员喜欢的电影类型,容易造成多次租借冷门电影而并不符合个人口味的会员流失。为了更高效地为会员推荐电影,Netflix一直致力于不断改进和完善个性化推荐服务,在2006年推出百万美元大奖,无论是谁能最好地优化Netflix推荐算法就可获奖励100万美元。到2009年,奖金被一个7人开发小组夺得,Netflix随后又立即推出第二个百万美金悬赏。这充分说明一套好的推荐算法系统是多么重要,同时又是多么困难。
     
    上图为比赛的各支队伍的排名!

    补充说明:
    1. Netflix的比赛是基于静态数据的,就是给定“训练级”,匹配“结果集”,“结果集”也是提前就做好的,所以这与我们每天运营的系统,其实是不一样的。
    2. Netflix用于比赛的数据集是小量的,整个全集才666MB,而实际的推荐系统都要基于大量历史数据的,动不动就会上GB,TB等

    所以,我们在真实的环境中设计推荐的时候,要全面考量数据量,算法性能,结果准确度等的指标。

    推荐算法选型:基于物品的协同过滤算法ItemCF,并行实现
    数据量:基于Hadoop架构,支持GB,TB,PB级数据量
    算法检验:可以通过 准确率,召回率,覆盖率,流行度 等指标评判。
    结果解读:通过ItemCF的定义,合理给出结果解释

    三、 算法模型:Hadoop并行算法
    这里我使用”Mahout In Action”书里,第一章第六节介绍的分步式基于物品的协同过滤算法进行实现。Chapter 6: Distributing recommendation computations
    测试数据集:small.csv

    1. 1,101,5.0
    2. 1,102,3.0
    3. 1,103,2.5
    4. 2,101,2.0
    5. 2,102,2.5
    6. 2,103,5.0
    7. 2,104,2.0
    8. 3,101,2.0
    9. 3,104,4.0
    10. 3,105,4.5
    11. 3,107,5.0
    12. 4,101,5.0
    13. 4,103,3.0
    14. 4,104,4.5
    15. 4,106,4.0
    16. 5,101,4.0
    17. 5,102,3.0
    18. 5,103,2.0
    19. 5,104,4.0
    20. 5,105,3.5
    21. 5,106,4.0
    复制代码


    每行3个字段,依次是用户ID,电影ID,用户对电影的评分(0-5分,每0.5为一个评分点!)
    算法的思想:
    1. 建立物品的同现矩阵
    2. 建立用户对物品的评分矩阵
    3. 矩阵计算推荐结果

    1). 建立物品的同现矩阵
    按用户分组,找到每个用户所选的物品,单独出现计数及两两一组计数。

    1.         [101] [102] [103] [104] [105] [106] [107]
    2. [101]    5      3      4      4       2       2      1
    3. [102]    3      3      3      2       1       1      0
    4. [103]    4      3      4      3       1       2      0
    5. [104]    4      2      3      4       2       2      1
    6. [105]    2      1      1      2       2       1      1
    7. [106]    2      1      2      2       1       2      0
    8. [107]    1      0      0      1       1       0      1
    复制代码



    2). 建立用户对物品的评分矩阵
    按用户分组,找到每个用户所选的物品及评分
         
    1.         U3
    2. [101] 2.0
    3. [102] 0.0
    4. [103] 0.0
    5. [104] 4.0
    6. [105] 4.5
    7. [106] 0.0
    8. [107] 5.0
    复制代码



    3). 矩阵计算推荐结果
    同现矩阵*评分矩阵=推荐结果
     
    图片摘自”Mahout In Action”

    MapReduce任务设计
     
    图片摘自”Mahout In Action”

    解读MapRduce任务:
    步骤1: 按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵
    步骤2: 对物品组合列表进行计数,建立物品的同现矩阵
    步骤3: 合并同现矩阵和评分矩阵
    步骤4: 计算推荐结果列表

    四、 架构设计:推荐系统架构







    上图中,左边是Application业务系统,右边是Hadoop的HDFS, MapReduce。

    • 业务系统记录了用户的行为和对物品的打分
    • 设置系统定时器CRON,每xx小时,增量向HDFS导入数据(userid,itemid,value,time)。
    • 完成导入后,设置系统定时器,启动MapReduce程序,运行推荐算法。
    • 完成计算后,设置系统定时器,从HDFS导出推荐结果数据到数据库,方便以后的及时查询。


    五、 程序开发:MapReduce程序实现
    win7的开发环境 和 Hadoop的运行环境 ,请参考文章:用Maven构建Hadoop项目
    新建Java类:
    Recommend.java,主任务启动程序
    Step1.java,按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵
    Step2.java,对物品组合列表进行计数,建立物品的同现矩阵
    Step3.java,合并同现矩阵和评分矩阵
    Step4.java,计算推荐结果列表
    HdfsDAO.java,HDFS操作工具类

    1). Recommend.java,主任务启动程序
    源代码:
    1. package org.conan.myhadoop.recommend;

    2. import java.util.HashMap;
    3. import java.util.Map;
    4. import java.util.regex.Pattern;

    5. import org.apache.hadoop.mapred.JobConf;

    6. public class Recommend {

    7.     public static final String HDFS = "hdfs://192.168.1.210:9000";
    8.     public static final Pattern DELIMITER = Pattern.compile("[\t,]");

    9.     public static void main(String[] args) throws Exception {
    10.         Map<String, String> path = new HashMap<String, String>();
    11.         path.put("data", "logfile/small.csv");
    12.         path.put("Step1Input", HDFS + "/user/hdfs/recommend");
    13.         path.put("Step1Output", path.get("Step1Input") + "/step1");
    14.         path.put("Step2Input", path.get("Step1Output"));
    15.         path.put("Step2Output", path.get("Step1Input") + "/step2");
    16.         path.put("Step3Input1", path.get("Step1Output"));
    17.         path.put("Step3Output1", path.get("Step1Input") + "/step3_1");
    18.         path.put("Step3Input2", path.get("Step2Output"));
    19.         path.put("Step3Output2", path.get("Step1Input") + "/step3_2");
    20.         path.put("Step4Input1", path.get("Step3Output1"));
    21.         path.put("Step4Input2", path.get("Step3Output2"));
    22.         path.put("Step4Output", path.get("Step1Input") + "/step4");

    23.         Step1.run(path);
    24.         Step2.run(path);
    25.         Step3.run1(path);
    26.         Step3.run2(path);
    27.         Step4.run(path);
    28.         System.exit(0);
    29.     }

    30.     public static JobConf config() {
    31.         JobConf conf = new JobConf(Recommend.class);
    32.         conf.setJobName("Recommend");
    33.         conf.addResource("classpath:/hadoop/core-site.xml");
    34.         conf.addResource("classpath:/hadoop/hdfs-site.xml");
    35.         conf.addResource("classpath:/hadoop/mapred-site.xml");
    36.         return conf;
    37.     }

    38. }
    复制代码


    1. </blockquote></div></font></div><div align="left" style="font-size: 13px;"><font color="#4d4d4f"><b>2). Step1.java,按用户分组,计算所有物品出现的组合列表,得到用户对物品的评分矩阵</b></font></div>
    2. <div align="left" style="font-size: 13px;"><font color="#4d4d4f">源代码:</font></div><div align="left" style="font-size: 13px;"><span style="color: rgb(77, 77, 79); line-height: 1.5;"><div class="blockcode"><blockquote>package org.conan.myhadoop.recommend;

    3. import java.io.IOException;
    4. import java.util.Iterator;
    5. import java.util.Map;

    6. import org.apache.hadoop.fs.Path;
    7. import org.apache.hadoop.io.IntWritable;
    8. import org.apache.hadoop.io.Text;
    9. import org.apache.hadoop.mapred.FileInputFormat;
    10. import org.apache.hadoop.mapred.FileOutputFormat;
    11. import org.apache.hadoop.mapred.JobClient;
    12. import org.apache.hadoop.mapred.JobConf;
    13. import org.apache.hadoop.mapred.MapReduceBase;
    14. import org.apache.hadoop.mapred.Mapper;
    15. import org.apache.hadoop.mapred.OutputCollector;
    16. import org.apache.hadoop.mapred.Reducer;
    17. import org.apache.hadoop.mapred.Reporter;
    18. import org.apache.hadoop.mapred.RunningJob;
    19. import org.apache.hadoop.mapred.TextInputFormat;
    20. import org.apache.hadoop.mapred.TextOutputFormat;
    21. import org.conan.myhadoop.hdfs.HdfsDAO;

    22. public class Step1 {

    23.     public static class Step1_ToItemPreMapper extends MapReduceBase implements Mapper<Object, Text, IntWritable, Text> {
    24.         private final static IntWritable k = new IntWritable();
    25.         private final static Text v = new Text();

    26.         @Override
    27.         public void map(Object key, Text value, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
    28.             String[] tokens = Recommend.DELIMITER.split(value.toString());
    29.             int userID = Integer.parseInt(tokens[0]);
    30.             String itemID = tokens[1];
    31.             String pref = tokens[2];
    32.             k.set(userID);
    33.             v.set(itemID + ":" + pref);
    34.             output.collect(k, v);
    35.         }
    36.     }

    37.     public static class Step1_ToUserVectorReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
    38.         private final static Text v = new Text();

    39.         @Override
    40.         public void reduce(IntWritable key, Iterator values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
    41.             StringBuilder sb = new StringBuilder();
    42.             while (values.hasNext()) {
    43.                 sb.append("," + values.next());
    44.             }
    45.             v.set(sb.toString().replaceFirst(",", ""));
    46.             output.collect(key, v);
    47.         }
    48.     }

    49.     public static void run(Map<String, String> path) throws IOException {
    50.         JobConf conf = Recommend.config();

    51.         String input = path.get("Step1Input");
    52.         String output = path.get("Step1Output");

    53.         HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
    54.         hdfs.rmr(input);
    55.         hdfs.mkdirs(input);
    56.         hdfs.copyFile(path.get("data"), input);

    57.         conf.setMapOutputKeyClass(IntWritable.class);
    58.         conf.setMapOutputValueClass(Text.class);

    59.         conf.setOutputKeyClass(IntWritable.class);
    60.         conf.setOutputValueClass(Text.class);

    61.         conf.setMapperClass(Step1_ToItemPreMapper.class);
    62.         conf.setCombinerClass(Step1_ToUserVectorReducer.class);
    63.         conf.setReducerClass(Step1_ToUserVectorReducer.class);

    64.         conf.setInputFormat(TextInputFormat.class);
    65.         conf.setOutputFormat(TextOutputFormat.class);

    66.         FileInputFormat.setInputPaths(conf, new Path(input));
    67.         FileOutputFormat.setOutputPath(conf, new Path(output));

    68.         RunningJob job = JobClient.runJob(conf);
    69.         while (!job.isComplete()) {
    70.             job.waitForCompletion();
    71.         }
    72.     }

    73. }
    复制代码



    计算结果:
    1. ~ hadoop fs -cat /user/hdfs/recommend/step1/part-00000

    2. 1       102:3.0,103:2.5,101:5.0
    3. 2       101:2.0,102:2.5,103:5.0,104:2.0
    4. 3       107:5.0,101:2.0,104:4.0,105:4.5
    5. 4       101:5.0,103:3.0,104:4.5,106:4.0
    6. 5       101:4.0,102:3.0,103:2.0,104:4.0,105:3.5,106:4.0
    复制代码



    3). Step2.java,对物品组合列表进行计数,建立物品的同现矩阵
    源代码:

    1. package org.conan.myhadoop.recommend;

    2. import java.io.IOException;
    3. import java.util.Iterator;
    4. import java.util.Map;

    5. import org.apache.hadoop.fs.Path;
    6. import org.apache.hadoop.io.IntWritable;
    7. import org.apache.hadoop.io.LongWritable;
    8. import org.apache.hadoop.io.Text;
    9. import org.apache.hadoop.mapred.FileInputFormat;
    10. import org.apache.hadoop.mapred.FileOutputFormat;
    11. import org.apache.hadoop.mapred.JobClient;
    12. import org.apache.hadoop.mapred.JobConf;
    13. import org.apache.hadoop.mapred.MapReduceBase;
    14. import org.apache.hadoop.mapred.Mapper;
    15. import org.apache.hadoop.mapred.OutputCollector;
    16. import org.apache.hadoop.mapred.Reducer;
    17. import org.apache.hadoop.mapred.Reporter;
    18. import org.apache.hadoop.mapred.RunningJob;
    19. import org.apache.hadoop.mapred.TextInputFormat;
    20. import org.apache.hadoop.mapred.TextOutputFormat;
    21. import org.conan.myhadoop.hdfs.HdfsDAO;

    22. public class Step2 {
    23.     public static class Step2_UserVectorToCooccurrenceMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    24.         private final static Text k = new Text();
    25.         private final static IntWritable v = new IntWritable(1);

    26.         @Override
    27.         public void map(LongWritable key, Text values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    28.             String[] tokens = Recommend.DELIMITER.split(values.toString());
    29.             for (int i = 1; i < tokens.length; i++) {
    30.                 String itemID = tokens[i].split(":")[0];
    31.                 for (int j = 1; j < tokens.length; j++) {
    32.                     String itemID2 = tokens[j].split(":")[0];
    33.                     k.set(itemID + ":" + itemID2);
    34.                     output.collect(k, v);
    35.                 }
    36.             }
    37.         }
    38.     }

    39.     public static class Step2_UserVectorToConoccurrenceReducer extends MapReduceBase implements Reducer<Text, IntWritable, Text, IntWritable> {
    40.         private IntWritable result = new IntWritable();

    41.         @Override
    42.         public void reduce(Text key, Iterator values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    43.             int sum = 0;
    44.             while (values.hasNext()) {
    45.                 sum += values.next().get();
    46.             }
    47.             result.set(sum);
    48.             output.collect(key, result);
    49.         }
    50.     }

    51.     public static void run(Map<String, String> path) throws IOException {
    52.         JobConf conf = Recommend.config();

    53.         String input = path.get("Step2Input");
    54.         String output = path.get("Step2Output");

    55.         HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
    56.         hdfs.rmr(output);

    57.         conf.setOutputKeyClass(Text.class);
    58.         conf.setOutputValueClass(IntWritable.class);

    59.         conf.setMapperClass(Step2_UserVectorToCooccurrenceMapper.class);
    60.         conf.setCombinerClass(Step2_UserVectorToConoccurrenceReducer.class);
    61.         conf.setReducerClass(Step2_UserVectorToConoccurrenceReducer.class);

    62.         conf.setInputFormat(TextInputFormat.class);
    63.         conf.setOutputFormat(TextOutputFormat.class);

    64.         FileInputFormat.setInputPaths(conf, new Path(input));
    65.         FileOutputFormat.setOutputPath(conf, new Path(output));

    66.         RunningJob job = JobClient.runJob(conf);
    67.         while (!job.isComplete()) {
    68.             job.waitForCompletion();
    69.         }
    70.     }
    71. }
    复制代码



    计算结果:
    1. ~ hadoop fs -cat /user/hdfs/recommend/step2/part-00000

    2. 101:101 5
    3. 101:102 3
    4. 101:103 4
    5. 101:104 4
    6. 101:105 2
    7. 101:106 2
    8. 101:107 1
    9. 102:101 3
    10. 102:102 3
    11. 102:103 3
    12. 102:104 2
    13. 102:105 1
    14. 102:106 1
    15. 103:101 4
    16. 103:102 3
    17. 103:103 4
    18. 103:104 3
    19. 103:105 1
    20. 103:106 2
    21. 104:101 4
    22. 104:102 2
    23. 104:103 3
    24. 104:104 4
    25. 104:105 2
    26. 104:106 2
    27. 104:107 1
    28. 105:101 2
    29. 105:102 1
    30. 105:103 1
    31. 105:104 2
    32. 105:105 2
    33. 105:106 1
    34. 105:107 1
    35. 106:101 2
    36. 106:102 1
    37. 106:103 2
    38. 106:104 2
    39. 106:105 1
    40. 106:106 2
    41. 107:101 1
    42. 107:104 1
    43. 107:105 1
    44. 107:107 1
    复制代码


    4). Step3.java,合并同现矩阵和评分矩阵
    源代码:

    1. package org.conan.myhadoop.recommend;

    2. import java.io.IOException;
    3. import java.util.Map;

    4. import org.apache.hadoop.fs.Path;
    5. import org.apache.hadoop.io.IntWritable;
    6. import org.apache.hadoop.io.LongWritable;
    7. import org.apache.hadoop.io.Text;
    8. import org.apache.hadoop.mapred.FileInputFormat;
    9. import org.apache.hadoop.mapred.FileOutputFormat;
    10. import org.apache.hadoop.mapred.JobClient;
    11. import org.apache.hadoop.mapred.JobConf;
    12. import org.apache.hadoop.mapred.MapReduceBase;
    13. import org.apache.hadoop.mapred.Mapper;
    14. import org.apache.hadoop.mapred.OutputCollector;
    15. import org.apache.hadoop.mapred.Reporter;
    16. import org.apache.hadoop.mapred.RunningJob;
    17. import org.apache.hadoop.mapred.TextInputFormat;
    18. import org.apache.hadoop.mapred.TextOutputFormat;
    19. import org.conan.myhadoop.hdfs.HdfsDAO;

    20. public class Step3 {

    21.     public static class Step31_UserVectorSplitterMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
    22.         private final static IntWritable k = new IntWritable();
    23.         private final static Text v = new Text();

    24.         @Override
    25.         public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
    26.             String[] tokens = Recommend.DELIMITER.split(values.toString());
    27.             for (int i = 1; i < tokens.length; i++) {
    28.                 String[] vector = tokens[i].split(":");
    29.                 int itemID = Integer.parseInt(vector[0]);
    30.                 String pref = vector[1];

    31.                 k.set(itemID);
    32.                 v.set(tokens[0] + ":" + pref);
    33.                 output.collect(k, v);
    34.             }
    35.         }
    36.     }

    37.     public static void run1(Map<String, String> path) throws IOException {
    38.         JobConf conf = Recommend.config();

    39.         String input = path.get("Step3Input1");
    40.         String output = path.get("Step3Output1");

    41.         HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
    42.         hdfs.rmr(output);

    43.         conf.setOutputKeyClass(IntWritable.class);
    44.         conf.setOutputValueClass(Text.class);

    45.         conf.setMapperClass(Step31_UserVectorSplitterMapper.class);

    46.         conf.setInputFormat(TextInputFormat.class);
    47.         conf.setOutputFormat(TextOutputFormat.class);

    48.         FileInputFormat.setInputPaths(conf, new Path(input));
    49.         FileOutputFormat.setOutputPath(conf, new Path(output));

    50.         RunningJob job = JobClient.runJob(conf);
    51.         while (!job.isComplete()) {
    52.             job.waitForCompletion();
    53.         }
    54.     }

    55.     public static class Step32_CooccurrenceColumnWrapperMapper extends MapReduceBase implements Mapper<LongWritable, Text, Text, IntWritable> {
    56.         private final static Text k = new Text();
    57.         private final static IntWritable v = new IntWritable();

    58.         @Override
    59.         public void map(LongWritable key, Text values, OutputCollector<Text, IntWritable> output, Reporter reporter) throws IOException {
    60.             String[] tokens = Recommend.DELIMITER.split(values.toString());
    61.             k.set(tokens[0]);
    62.             v.set(Integer.parseInt(tokens[1]));
    63.             output.collect(k, v);
    64.         }
    65.     }

    66.     public static void run2(Map<String, String> path) throws IOException {
    67.         JobConf conf = Recommend.config();

    68.         String input = path.get("Step3Input2");
    69.         String output = path.get("Step3Output2");

    70.         HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
    71.         hdfs.rmr(output);

    72.         conf.setOutputKeyClass(Text.class);
    73.         conf.setOutputValueClass(IntWritable.class);

    74.         conf.setMapperClass(Step32_CooccurrenceColumnWrapperMapper.class);

    75.         conf.setInputFormat(TextInputFormat.class);
    76.         conf.setOutputFormat(TextOutputFormat.class);

    77.         FileInputFormat.setInputPaths(conf, new Path(input));
    78.         FileOutputFormat.setOutputPath(conf, new Path(output));

    79.         RunningJob job = JobClient.runJob(conf);
    80.         while (!job.isComplete()) {
    81.             job.waitForCompletion();
    82.         }
    83.     }

    84. }
    复制代码



    计算结果:

    1. ~ hadoop fs -cat /user/hdfs/recommend/step3_1/part-00000

    2. 101     5:4.0
    3. 101     1:5.0
    4. 101     2:2.0
    5. 101     3:2.0
    6. 101     4:5.0
    7. 102     1:3.0
    8. 102     5:3.0
    9. 102     2:2.5
    10. 103     2:5.0
    11. 103     5:2.0
    12. 103     1:2.5
    13. 103     4:3.0
    14. 104     2:2.0
    15. 104     5:4.0
    16. 104     3:4.0
    17. 104     4:4.5
    18. 105     3:4.5
    19. 105     5:3.5
    20. 106     5:4.0
    21. 106     4:4.0
    22. 107     3:5.0

    23. ~ hadoop fs -cat /user/hdfs/recommend/step3_2/part-00000

    24. 101:101 5
    25. 101:102 3
    26. 101:103 4
    27. 101:104 4
    28. 101:105 2
    29. 101:106 2
    30. 101:107 1
    31. 102:101 3
    32. 102:102 3
    33. 102:103 3
    34. 102:104 2
    35. 102:105 1
    36. 102:106 1
    37. 103:101 4
    38. 103:102 3
    39. 103:103 4
    40. 103:104 3
    41. 103:105 1
    42. 103:106 2
    43. 104:101 4
    44. 104:102 2
    45. 104:103 3
    46. 104:104 4
    47. 104:105 2
    48. 104:106 2
    49. 104:107 1
    50. 105:101 2
    51. 105:102 1
    52. 105:103 1
    53. 105:104 2
    54. 105:105 2
    55. 105:106 1
    56. 105:107 1
    57. 106:101 2
    58. 106:102 1
    59. 106:103 2
    60. 106:104 2
    61. 106:105 1
    62. 106:106 2
    63. 107:101 1
    64. 107:104 1
    65. 107:105 1
    66. 107:107 1
    复制代码



    5). Step4.java,计算推荐结果列表
    源代码:

    1. package org.conan.myhadoop.recommend;

    2. import java.io.IOException;
    3. import java.util.ArrayList;
    4. import java.util.HashMap;
    5. import java.util.Iterator;
    6. import java.util.List;
    7. import java.util.Map;

    8. import org.apache.hadoop.fs.Path;
    9. import org.apache.hadoop.io.IntWritable;
    10. import org.apache.hadoop.io.LongWritable;
    11. import org.apache.hadoop.io.Text;
    12. import org.apache.hadoop.mapred.FileInputFormat;
    13. import org.apache.hadoop.mapred.FileOutputFormat;
    14. import org.apache.hadoop.mapred.JobClient;
    15. import org.apache.hadoop.mapred.JobConf;
    16. import org.apache.hadoop.mapred.MapReduceBase;
    17. import org.apache.hadoop.mapred.Mapper;
    18. import org.apache.hadoop.mapred.OutputCollector;
    19. import org.apache.hadoop.mapred.Reducer;
    20. import org.apache.hadoop.mapred.Reporter;
    21. import org.apache.hadoop.mapred.RunningJob;
    22. import org.apache.hadoop.mapred.TextInputFormat;
    23. import org.apache.hadoop.mapred.TextOutputFormat;
    24. import org.conan.myhadoop.hdfs.HdfsDAO;

    25. public class Step4 {

    26.     public static class Step4_PartialMultiplyMapper extends MapReduceBase implements Mapper<LongWritable, Text, IntWritable, Text> {
    27.         private final static IntWritable k = new IntWritable();
    28.         private final static Text v = new Text();

    29.         private final static Map<Integer, List> cooccurrenceMatrix = new HashMap<Integer, List>();

    30.         @Override
    31.         public void map(LongWritable key, Text values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
    32.             String[] tokens = Recommend.DELIMITER.split(values.toString());

    33.             String[] v1 = tokens[0].split(":");
    34.             String[] v2 = tokens[1].split(":");

    35.             if (v1.length > 1) {// cooccurrence
    36.                 int itemID1 = Integer.parseInt(v1[0]);
    37.                 int itemID2 = Integer.parseInt(v1[1]);
    38.                 int num = Integer.parseInt(tokens[1]);

    39.                 List list = null;
    40.                 if (!cooccurrenceMatrix.containsKey(itemID1)) {
    41.                     list = new ArrayList();
    42.                 } else {
    43.                     list = cooccurrenceMatrix.get(itemID1);
    44.                 }
    45.                 list.add(new Cooccurrence(itemID1, itemID2, num));
    46.                 cooccurrenceMatrix.put(itemID1, list);
    47.             }

    48.             if (v2.length > 1) {// userVector
    49.                 int itemID = Integer.parseInt(tokens[0]);
    50.                 int userID = Integer.parseInt(v2[0]);
    51.                 double pref = Double.parseDouble(v2[1]);
    52.                 k.set(userID);
    53.                 for (Cooccurrence co : cooccurrenceMatrix.get(itemID)) {
    54.                     v.set(co.getItemID2() + "," + pref * co.getNum());
    55.                     output.collect(k, v);
    56.                 }

    57.             }
    58.         }
    59.     }

    60.     public static class Step4_AggregateAndRecommendReducer extends MapReduceBase implements Reducer<IntWritable, Text, IntWritable, Text> {
    61.         private final static Text v = new Text();

    62.         @Override
    63.         public void reduce(IntWritable key, Iterator values, OutputCollector<IntWritable, Text> output, Reporter reporter) throws IOException {
    64.             Map<String, Double> result = new HashMap<String, Double>();
    65.             while (values.hasNext()) {
    66.                 String[] str = values.next().toString().split(",");
    67.                 if (result.containsKey(str[0])) {
    68.                     result.put(str[0], result.get(str[0]) + Double.parseDouble(str[1]));
    69.                 } else {
    70.                     result.put(str[0], Double.parseDouble(str[1]));
    71.                 }
    72.             }
    73.             Iterator iter = result.keySet().iterator();
    74.             while (iter.hasNext()) {
    75.                 String itemID = iter.next();
    76.                 double score = result.get(itemID);
    77.                 v.set(itemID + "," + score);
    78.                 output.collect(key, v);
    79.             }
    80.         }
    81.     }

    82.     public static void run(Map<String, String> path) throws IOException {
    83.         JobConf conf = Recommend.config();

    84.         String input1 = path.get("Step4Input1");
    85.         String input2 = path.get("Step4Input2");
    86.         String output = path.get("Step4Output");

    87.         HdfsDAO hdfs = new HdfsDAO(Recommend.HDFS, conf);
    88.         hdfs.rmr(output);

    89.         conf.setOutputKeyClass(IntWritable.class);
    90.         conf.setOutputValueClass(Text.class);

    91.         conf.setMapperClass(Step4_PartialMultiplyMapper.class);
    92.         conf.setCombinerClass(Step4_AggregateAndRecommendReducer.class);
    93.         conf.setReducerClass(Step4_AggregateAndRecommendReducer.class);

    94.         conf.setInputFormat(TextInputFormat.class);
    95.         conf.setOutputFormat(TextOutputFormat.class);

    96.         FileInputFormat.setInputPaths(conf, new Path(input1), new Path(input2));
    97.         FileOutputFormat.setOutputPath(conf, new Path(output));

    98.         RunningJob job = JobClient.runJob(conf);
    99.         while (!job.isComplete()) {
    100.             job.waitForCompletion();
    101.         }
    102.     }

    103. }

    104. class Cooccurrence {
    105.     private int itemID1;
    106.     private int itemID2;
    107.     private int num;

    108.     public Cooccurrence(int itemID1, int itemID2, int num) {
    109.         super();
    110.         this.itemID1 = itemID1;
    111.         this.itemID2 = itemID2;
    112.         this.num = num;
    113.     }

    114.     public int getItemID1() {
    115.         return itemID1;
    116.     }

    117.     public void setItemID1(int itemID1) {
    118.         this.itemID1 = itemID1;
    119.     }

    120.     public int getItemID2() {
    121.         return itemID2;
    122.     }

    123.     public void setItemID2(int itemID2) {
    124.         this.itemID2 = itemID2;
    125.     }

    126.     public int getNum() {
    127.         return num;
    128.     }

    129.     public void setNum(int num) {
    130.         this.num = num;
    131.     }

    132. }
    复制代码



    计算结果:

    1. ~ hadoop fs -cat /user/hdfs/recommend/step4/part-00000

    2. 1       107,5.0
    3. 1       106,18.0
    4. 1       105,15.5
    5. 1       104,33.5
    6. 1       103,39.0
    7. 1       102,31.5
    8. 1       101,44.0
    9. 2       107,4.0
    10. 2       106,20.5
    11. 2       105,15.5
    12. 2       104,36.0
    13. 2       103,41.5
    14. 2       102,32.5
    15. 2       101,45.5
    16. 3       107,15.5
    17. 3       106,16.5
    18. 3       105,26.0
    19. 3       104,38.0
    20. 3       103,24.5
    21. 3       102,18.5
    22. 3       101,40.0
    23. 4       107,9.5
    24. 4       106,33.0
    25. 4       105,26.0
    26. 4       104,55.0
    27. 4       103,53.5
    28. 4       102,37.0
    29. 4       101,63.0
    30. 5       107,11.5
    31. 5       106,34.5
    32. 5       105,32.0
    33. 5       104,59.0
    34. 5       103,56.5
    35. 5       102,42.5
    36. 5       101,68.0
    复制代码



    对Step4过程优化,请参考本文最后的补充内容。

    6). HdfsDAO.java,HDFS操作工具类
    详细解释,请参考文章:Hadoop编程调用HDFS
    源代码:

    1. package org.conan.myhadoop.hdfs;

    2. import java.io.IOException;
    3. import java.net.URI;

    4. import org.apache.hadoop.conf.Configuration;
    5. import org.apache.hadoop.fs.FSDataInputStream;
    6. import org.apache.hadoop.fs.FSDataOutputStream;
    7. import org.apache.hadoop.fs.FileStatus;
    8. import org.apache.hadoop.fs.FileSystem;
    9. import org.apache.hadoop.fs.Path;
    10. import org.apache.hadoop.io.IOUtils;
    11. import org.apache.hadoop.mapred.JobConf;

    12. public class HdfsDAO {

    13.     private static final String HDFS = "hdfs://192.168.1.210:9000/";

    14.     public HdfsDAO(Configuration conf) {
    15.         this(HDFS, conf);
    16.     }

    17.     public HdfsDAO(String hdfs, Configuration conf) {
    18.         this.hdfsPath = hdfs;
    19.         this.conf = conf;
    20.     }

    21.     private String hdfsPath;
    22.     private Configuration conf;

    23.     public static void main(String[] args) throws IOException {
    24.         JobConf conf = config();
    25.         HdfsDAO hdfs = new HdfsDAO(conf);
    26.         hdfs.copyFile("datafile/item.csv", "/tmp/new");
    27.         hdfs.ls("/tmp/new");
    28.     }        

    29.     public static JobConf config(){
    30.         JobConf conf = new JobConf(HdfsDAO.class);
    31.         conf.setJobName("HdfsDAO");
    32.         conf.addResource("classpath:/hadoop/core-site.xml");
    33.         conf.addResource("classpath:/hadoop/hdfs-site.xml");
    34.         conf.addResource("classpath:/hadoop/mapred-site.xml");
    35.         return conf;
    36.     }

    37.     public void mkdirs(String folder) throws IOException {
    38.         Path path = new Path(folder);
    39.         FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    40.         if (!fs.exists(path)) {
    41.             fs.mkdirs(path);
    42.             System.out.println("Create: " + folder);
    43.         }
    44.         fs.close();
    45.     }

    46.     public void rmr(String folder) throws IOException {
    47.         Path path = new Path(folder);
    48.         FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    49.         fs.deleteOnExit(path);
    50.         System.out.println("Delete: " + folder);
    51.         fs.close();
    52.     }

    53.     public void ls(String folder) throws IOException {
    54.         Path path = new Path(folder);
    55.         FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    56.         FileStatus[] list = fs.listStatus(path);
    57.         System.out.println("ls: " + folder);
    58.         System.out.println("==========================================================");
    59.         for (FileStatus f : list) {
    60.             System.out.printf("name: %s, folder: %s, size: %d\n", f.getPath(), f.isDir(), f.getLen());
    61.         }
    62.         System.out.println("==========================================================");
    63.         fs.close();
    64.     }

    65.     public void createFile(String file, String content) throws IOException {
    66.         FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    67.         byte[] buff = content.getBytes();
    68.         FSDataOutputStream os = null;
    69.         try {
    70.             os = fs.create(new Path(file));
    71.             os.write(buff, 0, buff.length);
    72.             System.out.println("Create: " + file);
    73.         } finally {
    74.             if (os != null)
    75.                 os.close();
    76.         }
    77.         fs.close();
    78.     }

    79.     public void copyFile(String local, String remote) throws IOException {
    80.         FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    81.         fs.copyFromLocalFile(new Path(local), new Path(remote));
    82.         System.out.println("copy from: " + local + " to " + remote);
    83.         fs.close();
    84.     }

    85.     public void download(String remote, String local) throws IOException {
    86.         Path path = new Path(remote);
    87.         FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    88.         fs.copyToLocalFile(path, new Path(local));
    89.         System.out.println("download: from" + remote + " to " + local);
    90.         fs.close();
    91.     }

    92.     public void cat(String remoteFile) throws IOException {
    93.         Path path = new Path(remoteFile);
    94.         FileSystem fs = FileSystem.get(URI.create(hdfsPath), conf);
    95.         FSDataInputStream fsdis = null;
    96.         System.out.println("cat: " + remoteFile);
    97.         try {  
    98.             fsdis =fs.open(path);
    99.             IOUtils.copyBytes(fsdis, System.out, 4096, false);  
    100.           } finally {  
    101.             IOUtils.closeStream(fsdis);
    102.             fs.close();
    103.           }
    104.     }
    105. }
    复制代码



    这样我们就自己编程实现了MapReduce化基于物品的协同过滤算法。
    RHadoop的实现方案,请参考文章:RHadoop实践系列之三 R实现MapReduce的协同过滤算法
    Mahout的实现方案,请参考文章:Mahout分步式程序开发 基于物品的协同过滤ItemCF
    我已经把整个MapReduce的实现都放到了github上面:
    https://github.com/bsspirit/maven_hadoop_template/releases/tag/recommend




    http://f.dataguru.cn/thread-229459-1-1.html


    展开全文
  • 建立同现矩阵的过程在协同过滤算法中同现矩阵的建立是极其关键的一部,而在大多数的博客文章中大多都没有详细说明矩阵的建立过程。在此详细说明一下:原始数据如下所示:1,101,5.0 1,102,3.0 1,103,2.5 2,101,2.0 2,...

    建立同现矩阵的过程

    在协同过滤算法中同现矩阵的建立是极其关键的一部,而在大多数的博客文章中大多都没有详细说明矩阵的建立过程。在此详细说明一下:

    原始数据如下所示:

    1,101,5.0
    1,102,3.0
    1,103,2.5
    2,101,2.0
    2,102,2.5
    2,103,5.0
    2,104,2.0
    3,101,2.0
    3,104,4.0
    3,105,4.5
    3,107,5.0
    4,101,5.0
    4,103,3.0
    4,104,4.5
    4,106,4.0
    5,101,4.0
    5,102,3.0
    5,103,2.0
    5,104,4.0
    5,105,3.5
    5,106,4.0

    对这些数据建立的同现矩阵如下:

          [101] [102] [103] [104] [105] [106] [107]
    [101]   5     3     4     4     2     2     1
    [102]   3     3     3     2     1     1     0
    [103]   4     3     4     3     1     2     0
    [104]   4     2     3     4     2     2     1
    [105]   2     1     1     2     2     1     1
    [106]   2     1     2     2     1     2     0
    [107]   1     0     0     1     1     0     1

    建立过程解释:
    1.针对用户 1 对电影101 102 103 进行了评分,那么会在上述的矩阵如下位置写 1 :

    (101,101)(101,102)(101,103)
    (102,101)(102,102)(102,103)
    (103,101)(103,102)(103,103)

    即对101,102,103进行组合产生3*3个位置点,在对应的位置写1就行。
    针对所有的用户进行上述操作,然后将所有的结果相加,最后就可以得到同现矩阵。


    同现矩阵*评分矩阵的意义

    举个用户3对物品102是否感兴趣的例子:
    用户3对所有物品的评分, 有评分说明用户3喜欢这东西
    这里写图片描述

    同现矩阵其实就是物品与物品之间的关联度, 这个关联度由所有用户对所有物品的评分决定。

    同现的意思就是同时出现的意思,就是说,喜欢 101 的时候同时喜欢 102 。

    比如第一条, 说明同时有3个用户喜欢 101 与 102。意思就是说,在3个用户的各自喜好列表中 101 和 102 同时出现了。
    这里写图片描述

    计算推荐得分, 其意义是: 比如第5行, 用户3很喜欢物品105(给了得了4.5), 同时由其他用户也知道喜欢物品105的对物品102的喜欢程度(得分2), 两者相乘表示用户3因为喜欢物品105 推算出同时也喜欢物品102的得分。
    这里写图片描述

    用户3对物品102 兴趣总得分
    这里写图片描述

    展开全文
  • 同现矩阵其实就是 物品与物品之间的关联度, 这个关联度由所有用户对所有物品的评分决定 比如第一条, 说明同时有3个用户喜欢 101 与 102 物品ID1 物品ID2 物品ID1与物品ID2...

    该讲主要是基于ItemCF的物品推荐。

    1,常用的两大协同过滤算法。

    a,基于物品的协同过滤算法
    基于 item 的协同过滤,通过用户对不同 item 的评分来评测 item 之间的相似性,基于 item 之间的相似性做出推荐。简单来讲就是:给用户推荐和他之前喜欢的物品相似的物品。

    b, 基于用户的协同过滤算法 UserCF
    基于用户的协同过滤,通过不同用户对物品的评分来评测用户之间的相似性,基于用户之间的相似性做出推荐。简单来讲就是:给用户推荐和他兴趣相似的其他用户喜欢的物品。

    数据源:

    评分矩阵:

    举个用户3对物品102是否感兴趣的例子:
    用户3对所有物品的评分, 有评分说明用户3喜欢这东西
    用户ID
    物品ID1
    用户评分
    3
    101
    2
    3
    102
    0
    3
    103
    0
    3
    104
    4
    3
    105
    4.5
    3
    106
    0
    3
    107
    5

    同现矩阵:
    同现矩阵其实就是 物品与物品之间的关联度, 这个关联度由所有用户对所有物品的评分决定
    比如第一条, 说明同时有3个用户喜欢 101 与 102
    物品ID1
    物品ID2
    物品ID1与物品ID2关联权重
    101
    102
    3
    102
    102
    4
    103
    102
    4
    104
    102
    2
    105
    102
    2
    106
    102
    1
    107
    102
    1


    2,mapreduce代码一共分为5部分:

    1,去除数据中的重复记录:
    package com.laoxiao.tuijian;
    
    import java.io.IOException;
    import java.util.Map;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    
    /**
     * 去重复
     * @author root
     *
     */
    public class Step1 {
    
    	
    	public static boolean run(Configuration config,Map<String, String> paths){
    		try {
    			FileSystem fs =FileSystem.get(config);
    			Job job =Job.getInstance(config);
    			job.setJobName("step1");
    			job.setJarByClass(Step1.class);
    			job.setMapperClass(Step1_Mapper.class);
    			job.setReducerClass(Step1_Reducer.class);
    //			
    			job.setMapOutputKeyClass(Text.class);
    			job.setMapOutputValueClass(NullWritable.class);
    			
    			
    			
    			FileInputFormat.addInputPath(job, new Path(paths.get("Step1Input")));
    			Path outpath=new Path(paths.get("Step1Output"));
    			if(fs.exists(outpath)){
    				fs.delete(outpath,true);
    			}
    			FileOutputFormat.setOutputPath(job, outpath);
    			
    			boolean f= job.waitForCompletion(true);
    			return f;
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		return false;
    	}
    	
    	 static class Step1_Mapper extends Mapper<LongWritable, Text, Text, NullWritable>{
    
    		protected void map(LongWritable key, Text value,
    				Context context)
    				throws IOException, InterruptedException {
    			if(key.get()!=0){//为了去掉表头
    				context.write(value, NullWritable.get());
    			}
    		}
    	}
    	
    	 
    	 static class Step1_Reducer extends Reducer<Text, IntWritable, Text, NullWritable>{
    
    			protected void reduce(Text key, Iterable<IntWritable> i,
    					Context context)
    					throws IOException, InterruptedException {
    				context.write(key,NullWritable.get());
    			}
    		}
    }
    
    2,按照用户分组,得出用户对物品的推荐得分矩阵
    package com.laoxiao.tuijian;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.log4j.Logger;
    
    /**
     * 按用户分组,计算所有物品出现的组合列表,得到用户对物品的推荐得分矩阵
    u13	i160:1,
    u14	i25:1,i223:1,
    u16	i252:1,
    u21	i266:1,
    u24	i64:1,i218:1,i185:1,
    u26	i276:1,i201:1,i348:1,i321:1,i136:1,
     * @author root
     *
     */
    public class Step2 {
    
    	
    	public static boolean run(Configuration config,Map<String, String> paths){
    		try {
    //			config.set("mapred.jar", "C:\\Users\\Administrator\\Desktop\\wc.jar");
    			FileSystem fs =FileSystem.get(config);
    			Job job =Job.getInstance(config);
    			job.setJobName("step2");
    			job.setJarByClass(StartRun.class);
    			job.setMapperClass(Step2_Mapper.class);
    			job.setReducerClass(Step2_Reducer.class);
    //			
    			job.setMapOutputKeyClass(Text.class);
    			job.setMapOutputValueClass(Text.class);
    			
    			
    			
    			FileInputFormat.addInputPath(job, new Path(paths.get("Step2Input")));
    			Path outpath=new Path(paths.get("Step2Output"));
    			if(fs.exists(outpath)){
    				fs.delete(outpath,true);
    			}
    			FileOutputFormat.setOutputPath(job, outpath);
    			
    			boolean f= job.waitForCompletion(true);
    			return f;
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		return false;
    	}
    	
    	 static class Step2_Mapper extends Mapper<LongWritable, Text, Text, Text>{
    
    		protected void map(LongWritable key, Text value,
    				Context context)
    				throws IOException, InterruptedException {
    			String[]  tokens=value.toString().split(",");
    			String item=tokens[0];//取得物品id
    			String user=tokens[1];//取得用户id
    			String action =tokens[2];//取得用户行为
    			Text k= new Text(user);
    			Integer rv =StartRun.R.get(action);//从一个map集合中得出某种行为应该得到的分数权重
    //			if(rv!=null){
    			Text v =new Text(item+":"+ rv.intValue());
    			context.write(k, v);//按照userid分组,values是物品id:分数
    		}
    	}
    	
    	 
    	 static class Step2_Reducer extends Reducer<Text, Text, Text, Text>{
    
    			protected void reduce(Text key, Iterable<Text> i,
    					Context context)
    					throws IOException, InterruptedException {
    				Map<String, Integer> r =new HashMap<String, Integer>();//为了累加对同样一件物品的多次评分
    				
    				for(Text value :i){//循环每个用户,对不同物品的打分情况
    					String[] vs =value.toString().split(":");
    					String item=vs[0];//这个是物品Id
    					Integer action=Integer.parseInt(vs[1]);//这个是该物品的得分
    					action = ((Integer) (r.get(item)==null?  0:r.get(item))).intValue() + action;
    					r.put(item,action);
    				}
    				StringBuffer sb =new StringBuffer();
    				for(Entry<String, Integer> entry :r.entrySet() ){
    					sb.append(entry.getKey()+":"+entry.getValue().intValue()+",");
    				}
    				
    				context.write(key,new Text(sb.toString()));
    			}
    		}
    }
    
    3,计算物品的同现矩阵
    package com.laoxiao.tuijian;
    
    import java.io.IOException;
    import java.util.HashMap;
    import java.util.Map;
    import java.util.Map.Entry;
    import java.util.StringTokenizer;
    
    import org.apache.hadoop.conf.Configuration;
    import org.apache.hadoop.fs.FileSystem;
    import org.apache.hadoop.fs.Path;
    import org.apache.hadoop.io.IntWritable;
    import org.apache.hadoop.io.LongWritable;
    import org.apache.hadoop.io.MapWritable;
    import org.apache.hadoop.io.NullWritable;
    import org.apache.hadoop.io.Text;
    import org.apache.hadoop.mapreduce.Job;
    import org.apache.hadoop.mapreduce.Mapper;
    import org.apache.hadoop.mapreduce.Reducer;
    import org.apache.hadoop.mapreduce.lib.input.FileInputFormat;
    import org.apache.hadoop.mapreduce.lib.output.FileOutputFormat;
    import org.apache.log4j.Logger;
    /**
     * 对物品组合列表进行计数,建立物品的同现矩阵
    i100:i100	3
    i100:i105	1
    i100:i106	1
    i100:i109	1
    i100:i114	1
    i100:i124	1
     * @author root
     *
     */
    public class Step3 {
    	 private final static Text K = new Text();
         private final static IntWritable V = new IntWritable(1);
    	
    	public static boolean run(Configuration config,Map<String, String> paths){
    		try {
    			FileSystem fs =FileSystem.get(config);
    			Job job =Job.getInstance(config);
    			job.setJobName("step3");
    			job.setJarByClass(StartRun.class);
    			job.setMapperClass(Step3_Mapper.class);
    			job.setReducerClass(Step3_Reducer.class);
    			job.setCombinerClass(Step3_Reducer.class);
    //			
    			job.setMapOutputKeyClass(Text.class);
    			job.setMapOutputValueClass(IntWritable.class);
    			
    			
    			
    			FileInputFormat.addInputPath(job, new Path(paths.get("Step3Input")));
    			Path outpath=new Path(paths.get("Step3Output"));
    			if(fs.exists(outpath)){
    				fs.delete(outpath,true);
    			}
    			FileOutputFormat.setOutputPath(job, outpath);
    			
    			boolean f= job.waitForCompletion(true);
    			return f;
    		} catch (Exception e) {
    			e.printStackTrace();
    		}
    		return false;
    	}
    	
    	 static class Step3_Mapper extends Mapper<LongWritable, Text, Text, IntWritable>{
    
    		protected void map(LongWritable key, Text value,
    				Context context)
    				throws IOException, InterruptedException {
    			String[]  tokens=value.toString().split("\t");
    			String[] items =tokens[1].split(",");//取得每一个物品id和对应的评分
    			for (int i = 0; i < items.length; i++) {
    				String itemA = items[i].split(":")[0];//取得物品id
    				for (int j = 0; j < items.length; j++) {
    					String itemB = items[j].split(":")[0];
    					K.set(itemA+":"+itemB);//设置物品之间的同现,次数为1,因为在一次用户行为中
    					context.write(K, V);
    				}
    			}
    			
    		}
    	}
    	
    	 
    	 static class Step3_Reducer extends Reducer<Text, IntWritable, Text, IntWritable>{
    
    			protected void reduce(Text key, Iterable<IntWritable> i,
    					Context context)
    					throws IOException, InterruptedException {
    				int sum =0;
    				for(IntWritable v :i ){
    					sum =sum+v.get();
    				}
    				V.set(sum);
    				context.write(key, V);
    			}
    		}
    	 
    }
    

    4,同现矩阵和得分矩阵相乘
    (待续)

    展开全文
  • 如何建立物品的同现矩阵

    千次阅读 2014-08-20 17:34:30
    在协同过滤算法中同现矩阵的建立是极其关键的一部,而在大多数的博客文章中大多都没有详细说明矩阵的建立过程。...针对所有的用户进行上述操作,然后将所有的结果相加,最后就可以得到同现矩阵
  • 灰度图像之同现矩阵的求解算法与实现 一、 算法 输入:灰度图像矩阵 、位移矢量 。 输出:灰度图像矩阵 对应的同现矩阵 。   Step1:统计灰度图像矩阵 (以下简称矩阵 )的元素的最大值(即灰度图像的...
  • 协同过滤与同现矩阵

    千次阅读 2018-09-24 23:09:47
    协同过滤与同现矩阵:                            
  • 同现矩阵:  
  • 推荐系统中的同现矩阵问题

    千次阅读 2016-12-07 15:35:35
    最近在研究Hadoop大数据方面的知识,正巧看到了推荐系统,其中里面有个同现矩阵很是让我蒙圈,查了很多blog之后,写下来与大家分享。 1.建立同现矩阵的过程 在协同过滤算法中同现矩阵的建立是极其关键的一部,...
  • 同现矩阵*评分矩阵=推荐结果

    千次阅读 2016-05-22 10:38:17
    同现矩阵其实就是 物品与物品之间的关联度, 这个关联度由所有用户对所有物品的评分决定 比如第一条, 说明同时有3个用户喜欢 101 与 102 物品ID1 物品ID2 物品ID1与物品ID2关联权重 ...
  • 同现矩阵其实就是 物品与物品之间的关联度, 这个关联度由所有用户对所有物品的评分决定 比如第一条, 说明有3个用户既喜欢 101 也喜欢 102 物品ID1 物品ID2 物品ID1与物品ID2...
  • 推荐系统协同过滤算法ItemCF同现矩阵的构建
  • Spark机器学习:同现相似度矩阵

    千次阅读 2016-10-25 11:26:23
    相似度可用于为协调过滤推荐中,查找相似的物品或者用户。下面对相似度进行简单的定义 物品i和物品j的相似度公式定义:   其中,分母是喜欢物品i的用户数,而分子则是同时喜欢物品i和物品j的用户数。因此,...
  • (1)建立用户物品倒排表 A a b d B a c e C b e null D b d e ( 2 ) 构建同现矩阵 同现矩阵表示同时喜欢两个物品的用户数,根据用户物品倒排表计算出来 (3) 统计每个物品有行为的用户数 (4) 计算物品之间的相似度...
  • 共现矩阵

    万次阅读 2019-08-02 19:54:52
    共现矩阵 主要用于发现主题,解决词向量相近关系的表示; 将共现矩阵行(列)作为词向量 例如:语料库如下: • I like deep learning. • I like NLP. • I enjoy flying. 则共现矩阵表示如下:(使用对称的窗函数...
  • python 共现矩阵的实现

    千次阅读 2020-02-29 15:26:58
    文章目录前言什么是共现矩阵共现矩阵的构建思路 前言 最近在学习python词库的可视化,其中有一个依据共现矩阵制作的可视化,感觉十分炫酷,便以此复刻。 什么是共现矩阵 比如我们有两句话: ls = ['我永远喜欢三上...
  • 训练是使用语料库中的同现矩阵来完成的。 结果表示形式包含对许多其他任务有用的结构。 描述模型的论文在。 可以在该机器学习模型的原始实现。 @作者乔纳森·赖曼(Jonathan Raiman) 例子 要使用此软件包,您...
  • 利用MapReduce计算单词共现矩阵
  • Movielens数据集 -用户物品评分矩阵

    千次阅读 2020-04-01 12:04:43
    Movielens数据集 -用户物品评分矩阵转化 #代码实现 // An highlighted block file = open("ratings.csv", 'r', encoding='UTF-8') # 记得读取文件时加‘r’, encoding='UTF-8' # 读取data.csv中每行中除了名字的...
  • 最长公共子串问题——矩阵法求解

    千次阅读 2019-03-03 21:07:46
    最大公共子串长度问题就是: 求两个串的所有子串中能够匹配上的最大长度是多少。...2、比较二维矩阵中每个点对应行列字符中否相等,相等的话值设置为1,否则设置为0。 3、通过查找出值为1的最长对角线...
  • Spark MLlib之矩阵

    千次阅读 2016-09-19 13:24:34
    将一个分布式矩阵转换为另一种不同格式的分布式矩阵,可能引发全局shuffle操作,开销很大 四种分布式矩阵实现 RowMatrix IndexedRowMatrix CoordinateMatrix BlockMatrix 行矩阵(RowMatrix) ...
  • 词向量发展史-共现矩阵-SVD-NNLM-Word2Vec-Glove-ELMo

    万次阅读 多人点赞 2018-12-13 15:47:10
    话不多说,直接上干货。 首先介绍相关概念: 词嵌入:把词映射为实数域上向量的技术也叫词嵌入(word embedding)。...具体来说,我们通过从大量的语料文本中构建一个共现矩阵来定义word representation。 ...
  • 深度学习自学记录(3)——两种多分类混淆矩阵的Python实现(含代码)1、什么是混淆矩阵2、分类模型评价指标3、两种多分类混淆矩阵3.1直接打印出每一个类别的分类准确率。3.2打印具体的分类结果的数值4、总结 1、...
  • 目前矩阵式变频器因采用具有输入功率因数可调,输出频率连续,功率双向流动且无直流母线的矩阵式变换器(MC)而倍受关注。虽然三相用电设备广泛应用于生产领域,但是在一些行业(如感应加热和感应熔炼)仍需要单相电源,...
  • 刘丁酉矩阵分析

    2019-03-14 19:11:56
    1994年1月起任武汉测绘科技大学数理系副主任、主任及武汉大学数学与统计学院公共课部主任,为武汉大学数学与统计学院基础数学系教授、硕博士生导师、湖北省优秀教师、湖北省高校数学学科跨世纪学科带头人、测绘...
  • 对腔镜有限分划,基于衍射积分理论构造光腔的传输数值矩阵, 并对其进行特征值和特征向量求解, 从而实 光腔模式计算。计算了非虚共焦非稳腔内的光场特性和远场光斑分布, 并与实际结果吻合。

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 39,571
精华内容 15,828
关键字:

同现矩阵

友情链接: 风扇.rar