精华内容
下载资源
问答
  • 题 目: 基于Spark机器学习的电商推荐系统的设计与实现 这是我去年本科毕业时做的毕业设计论文,全文三万多字,知网查重对重复率1%,由于本科论文不会被发表到知网上,再加上我已毕业近一年,现在将论文发表到CSDN...

    毕 业 设 计(论 文)
    题   目:    基于Spark机器学习的电商推荐系统的设计与实现

    这是我去年本科毕业时做的毕业设计论文,全文三万多字,知网查重对重复率1%,由于本科论文不会被发表到知网上,再加上我已毕业近一年,现在将论文发表到CSDN。如有需要做毕设论文可引用本文对内容,先到先得(内容纯原创,少有重复)。

    由于作者对水平有限,文章中难免有错误对内容或作者对相关技术有错误对见解,望读者予以谅解,谢谢!

    接上篇文章:电商推荐系统(下):实时推荐服务、实时框架、实时推荐算法、获取用户的K次最近评分、商品推荐优先级、实时系统联调、更新实时推荐结果


    摘要
    本论文作者在2019年11月到2020年5月期间在中国民营企业500强(2019年胡润榜)公司“深圳传音控股股份有限公司(上海)”参加大数据平台开发实习,主要负责数据仓库上层数据的ETL、解析等开发任务。作者所在的公司是一家移动互联网行业的公司,主营业务为非洲智能手机研发、生产、销售、智能软件研发等,公司在2019年间完成了全球1.37亿部智能手机的出货量,该年的智能手机销量位居世界第四。在这个年代,大数据正随着互联网用户数据的兴起而蓬勃发展。智能手机设备每天会产生大量的数据,而智能手机用户每天的各种数据也随着用户量和用户对手机的使用频率而剧增。作者在参加实习前就曾通过学校或自学网课视频学习了大数据相关技术栈,研读过很多大数据技术资料。在实习期间通过实习巩固了大数据技术,有一定的海量数据批处理、实时计算和数据挖掘经验。此外,作者还曾对电商行业的一些业务有深入研究。电商行业是当今时代的热门行业,作者希望通过大数据技术实现电商推荐系统,以帮助推动电商行业的发展。
    近些年来,电商、大数据逐渐成为当今时代信息技术发展的主题,基于大数据应用技术实现的电商推荐系统逐渐成为各大电商平台研究推荐系统服务的重点。阿里巴巴集团创始人马云也多次对外界强调阿里巴巴是一家大数据公司而不是电商企业,这足以说明数据是电商行业的命脉,可见大数据在电商行业中的重要性。然而,如何处理海量的用户行为数据,从用户行为中挖掘出用户的商品购买偏好是所有电商平台的重点难题,至今没有一家公司能做到推荐服务100%准确。在新的大数据时代里,一款名为Spark的大数据计算框架如同它的名字一样让星星之火照亮整个大数据世界,Spark以高效的海量数据处理能力等诸多优点,现已成为众多电商平台实现推荐服务的首选技术框架。SparkMLlib是Spark的机器学习库,本文会重点介绍SparkMLlib在电商行业中的具体实现,此外还对SparkSQL、SparkStreaming、微服务、非关系型数据库等技术做简单介绍,通过使用多项技术,查阅大量相关开源贡献资料,综合完成基于Spark机器学习的电商推荐系统的设计与实现。
    关键词:Java;电商;Scala;微服务;大数据;机器学习;Spark;推荐系统;数据挖掘 

    (此处省略英文翻译)

    目录
    第1章 绪论    1
    1.1 选题背景    1
    1.2 本课题研究的意义    1
    1.3 国内外研究现状    2
    1.3.1 国外研究现状    2
    1.3.2 国内研究现状    2
    1.4 论文的主要内容及组织结构    3
    第2章 相关理论及技术    4
    2.1 JavaEE技术    4
    2.1.1 SpringBoot后端框架    4
    2.1.2 AngularJS前端框架    4
    2.2 数据库    4
    2.2.1 MongoDB    4
    2.2.2 Redis    5
    2.3 日志采集与消息缓冲    5
    2.3.1 Flume-ng    5
    2.3.2 Kafka    5
    2.4 Spark技术栈    5
    2.4.1 Scala    6
    2.4.2 SparkSQL    6
    2.4.3 SparkMLlib    6
    2.4.4 SparkStreaming    6
    2.5 推荐算法    6
    2.5.1 统计分析算法    7
    2.5.2 ALS交替最小二乘算法    7
    2.5.3 过拟合与欠拟合    7
    2.5.4 正则化项    8
    2.5.5 协同过滤算法    8
    2.5.6 实时推荐算法    9
    2.5.7 其他推荐算法    9
    2.6 Linux系统    9
    2.6.1 CentOS虚拟机    9
    2.6.2 云主机    9
    2.7 未来技术选型    10
    2.7.1 EMR-Hadoop    10
    2.7.2 Hive    10
    2.7.3 Flink    11
    2.7.4 Dubbo与SpringCloud    11
    2.7.5 用户画像    12
    第3章 系统可行性分析和需求分析    14
    3.1 系统可行性分析    14
    3.1.1 技术可行性    14
    3.1.2 经济可行性    14
    3.1.3 操作可行性    15
    3.2 系统需求分析    15
    3.2.1 JavaEE业务功能需求    15
    3.2.2 商品推荐服务功能需求    16
    第4章 系统方案的设计    17
    4.1 系统功能设计原则    17
    4.1.1 微服务    17
    4.1.2 MVC架构    18
    4.1.3 系统的可靠性    18
    4.1.4 系统的维护性    19
    4.2 项目架构设计    19
    4.3 JavaEE模块的设计    20
    4.3.1 前端模块的设计    20
    4.3.2 后端模块的设计    21
    4.4 数据仓库设计    21
    4.4.1 业务数据库    21
    4.4.2 缓存数据库    22
    4.4.3 业务数据表的设计    22
    4.4.4 系统数据流程设计    25
    4.5 离线推荐模块的设计    26
    4.5.1 离线统计推荐模块的设计    26
    4.5.2 离线个性化智能推荐模块的设计    26
    4.6 实时推荐模块的设计    27
    第5章 推荐算法设计    1
    5.1 离线统计推荐算法设计    1
    5.1.1 历史热门商品统计推荐算法设计    1
    5.1.2 近期热门商品统计推荐算法设计    1
    5.1.3 商品平均评分统计推荐算法设计    2
    5.2 离线个性化智能推荐算法设计    2
    5.2.1 计算用户商品推荐列表    2
    5.2.2 模型评估和参数选取    4
    5.2.3 计算商品相似度矩阵    4
    5.3 实时推荐算法设计    4
    5.3.1 实时推荐算法思想    5
    5.3.2 初步计算备选商品的推荐优先级    6
    5.3.3 计算偏移项    6
    5.3.4 计算备选商品的最终推荐优先级    6
    5.4 其他推荐算法    6
    5.4.1 基于内容的相似推荐算法    6
    5.4.2 基于物品的协同过滤相似推荐算法    7
    5.4.3 用户画像系统算法    7
    第6章 系统实现    8
    6.1 前端用户系统实现    8
    6.1.1 登录注册模块的实现    8
    6.1.2 商品页功能的实现    8
    6.2 推荐系统服务环境搭建    9
    6.2.1 环境准备    9
    6.2.2 Linux虚拟机搭建    10
    6.2.3 安装框架组件    10
    6.2.4 创建微服务项目    12
    6.2.5 创建推荐服务子模块    14
    6.3 离线统计推荐系统实现    16
    6.3.1 历史热门商品统计    17
    6.3.2 最近热门商品统计    17
    6.3.3 商品平均得分统计    18
    6.4 基于隐语义模型的协同过滤推荐    20
    6.4.1 计算用户商品推荐列表    20
    6.4.2 模型评估和参数选取    21
    6.4.3 计算商品相似度矩阵    22
    6.5 实时推荐系统实现    24
    6.5.1 实时推荐功能的实现    26
    6.5.2 商品推荐优先级计算    26
    6.5.3 更新实时推荐结果    28
    6.6 后台服务系统实现    28
    6.6.1 后台系统注册登录模块实现    28
    6.6.2 后台商品模糊查询功能实现    28
    6.6.3 后台用户评分日志模块的实现    29
    6.7 系统测试    30
    6.7.1 系统演示    31
    6.7.2 冷启动问题    33
    第7章 总结    34
    7.1 课题总结    34
    7.2 项目亮点    35
    7.2.1 使用高效率的JavaEE框架    35
    7.2.2 使用Spark作为推荐服务工具    35
    7.2.3 使用多种智能推荐算法    35
    7.2.4 使用非关系型数据库替换传统数据库    36
    7.3 存在的问题    36
    7.3.1 研究更准确的推荐算法    36
    7.3.2 研究更高效的实时推荐框架    36
    7.3.3 提升系统性能    36
    致谢    38
    参考文献    39

    第1章绪论
    1.1选题背景
    截止2020年1月1日,阿里巴巴集团宣布淘宝用户已突破8亿,并且中国新零售市场月活跃用户超过8.24亿。网络资源在日益丰富,电商用户尤其是有“选择困难症”的人群要在海量的商品中挑选出自己喜欢的商品已经是一件非常困难的事。随着电商行业现在的“信息过载”现象日益严峻,使用电商推荐系统如何给用户精准推荐商品已经成为电商平台的重要难题。电商推荐系统的出现就是为了解决电商行业难以实现精准营销的情况,即使用技术帮助用户在海量商品中挑选出最合适、最喜欢的商品,为用户节省浏览商品的时间,丰富用户的个性化消费购物体验,同时为卖家提升商品的销量[1]。电商推荐系统现今日益受到各大电商平台开发者的青睐,也受到越来越多专家学者和推荐系统研究者的关注。
    而实现电商推荐系统的技术选型也尤为重要,需要考虑系统的运行效率、推荐结果的准确性。经过对比后,本课题选用当前最为先进的推荐技术,即Spark框架,SparkMLlib就是Spark机器学习库,它本次课题项目实现的核心组件。
    1.2本课题研究的意义
    本课题采用Spark全栈技术实现商品的个性化推荐,借助大数据项目常用的一些工具,使JavaEE系统部署业务服务,将个性化推荐作为电商网络营销的一种新的手段,能为电商公司带来巨大的利润。国内最大的电商平台——阿里巴巴(淘宝)自从使用了电商推荐服务以后,淘宝的商品关联支付转化率提升了近100%,商品的客单价提升了29%,可见推荐系统在电子商务实现精准营销[2]技术研究中的重要意义。
    电商推荐系统的目标是根据用户浏览商品以及对商品评分等行为,以此推测用户对其他相似商品的喜好程度,智能预测符合用户口味偏好的商品,并将推荐结果展示给用户[3]。可想而知,系统仅仅可通过用户近期浏览过的商品和对商品的评分情况就能知道用户喜欢什么样的口味、价位、质量等因素的商品,可以减少用户浏览商品的时间,而这样的推荐系统,一定会受到电商平台和电商用户的青睐。因此,有了电商个性化推荐系统和准确度较高的推荐算法,能为电商企业和消费者提升粘合度,达到商品的“精准营销”的目的,不仅能给电商平台提升市场竞争力,而且给用户提升网上购物的消费体验,实现电商交易的“双赢”。
    1.3国内外研究现状
    电商的概念最先起源于国外,而自从20世纪末以来电商在国内开始发展。直到现在,国外的亚马逊、EBay,国内的淘宝、天猫商城、京东、拼多多等电商大厂都赚的盆满钵满。
    1.3.1国外研究现状
    国外早在1994年就有人设计了新闻推荐系统,随后,全球著名的电商巨头亚马逊Amazon公司将推荐系统应用在了电商中,通过用户近期购买、浏览商品的行为猜测用户喜欢什么样的商品并推荐给用户[1],这为今后二十多年的推荐系统发展产生了巨大的影响。
    2006年Netflix通过悬赏百万美金大赛寻求将推荐算法的准确度提升10%以上的参赛者,重赏之下必有勇夫,从那以后各种高准确度的推荐算法就如同雨后春笋般涌现出来。虽然现在推荐系统在全球范围都在使用,在国外的电商推荐系统中主要是基于用户的点击、浏览、购买商品等行为预测用户是否喜欢该商品[4]。但是,如何提高推荐的准确度是各大电商公司都在思考的话题,因此,无论什么年代都要研究准确度更高的推荐算法。
    1.3.2国内研究现状
    与国外相比,虽然国内电商推荐系统的研究起步较晚,但是国内电商推荐系统发展速度较快,再加上当前用户的数据量过大以及技术的不断更新与迭代,国内各大电商企业对电商推荐系统的研究也越来越多。自从亚马逊公司将个性化电商推荐系统的成功应用以后,全球的电商推荐系统开始蓬勃发展,国内以阿里巴巴为首的电商巨头逐渐将商品推荐服务有原来的推荐算法发展到现今的“千人千面”的个性化精准智能推荐[2]。此外,国内的其他巨头企业如百度、腾讯、头条等公司都在发展电商业务,也在研究电商推荐系统。
    2006年,国内的当当网首次使用个性化图书商品推荐系统,将书籍的给类信息智能推荐给用户,当时就收到很多读者用户的青睐;2008年,淘宝网也推出自己的电商个性化推荐服务模块“i淘宝”,这款产品可以根据用户对商品的浏览情况,短时间为用户找到自己喜欢的商品以及可能感兴趣的东西[4],这就是淘宝电商推荐系统的雏形;2011年,百度也推出了搜索引擎的个性化推荐技术“一人一世界”,这个功能为百度积累用户的行为数据,方便于机器学习进行模型训练,为不同用户推荐不同的个性化内容;2016年,阿里巴巴的算法团队YunOS在ACM大赛上获得金牌。由此可见,国内在电商推荐系统的研究现今已经不逊色于国外。
    1.4论文的主要内容及组织结构
    本文将在第二章介绍实现基于Spark机器学习的电商推荐系统的设计原理与架构介绍,包括项目的整体架构、JavaEE介绍、数据库介绍、日志采集技术介绍、Spark技术栈介绍、Linux介绍等。由于电商推荐系统的原理包含多个方面,第二章结尾还将介绍项目的未来技术选型以及发展规划。在第三章中,本文将对电商推荐系统项目进行系统可行性分析与需求分析,以了解用户的需求和系统实现的可行性。在第四章里将介绍系统的设计方案,并在第五章中,文章会详细介绍系统的实现流程,包括JavaEE项目实现、统计推荐实现、基于隐语义模型协同过滤算法的实现、实时推荐以及其他推荐算法等等。在第六章对搭建商推荐系统的设计与实现进行全文总结,并在第七章对本次毕业设计毕业设计期间为我提供帮助的老师、同学们、实习公司以及全球开发者的开源贡献表示感谢。
    本文不讲解大数据、人工智能、数据库相关的理论概念,若读者对相关技术概念还很陌生的话,请读者先参考其他相关的文章理解大数据与人工智能相关的技术理论。
    在文章的结尾列举了设计与实现电商推荐系统的一些参考文献,需要特别声明的是,大数据、机器学习数据挖掘、数据采集等相关技术栈是近期来新兴起的技术,电商推荐系统目前还仅仅是一些电商互联网巨头企业才用得起的技术,当今无论国内还是国外都鲜有此类文献发表,学习相关技术主要依赖于官网的开源贡献文档。因此,本论文作者希望读者多参考如Apache Spark官网这样网站。在此,本文笔者祈愿能有更多的开发者或相关文献作者能早日在“大数据与机器学习”这块“新大陆”上创造出相关文献资料,为全球大数据、人工智能与电商行业的发展添砖加瓦。

    第2章
    相关理论及技术
    2.1JavaEE技术
    作为连续几年蝉联全球开发者使用频率最高的一门语言,Java是众多企业进行企业级项目开发的首选。由于其跨平台性、高效的计算性能,以及全球众多开发者在类库和框架方面的开源贡献,Java语言是本次课题项目实现处理系统综合业务的首选,此次选择的微服务框架是SpringBoot。
    2.1.1SpringBoot后端框架
    SpringBoot是Spring全家桶中的框架成员之一,也是当今最流行的JavaEE框架之一。它相当于以前流行的Spring框架的简化版,使用SpringBoot开发JavaEE项目实现成本低,项目运行效率高,并且支持高并发的运行环境,现在深受企业级Java项目开发者的喜爱。
    2.1.2AngularJS前端框架
    AngularJS是谷歌公司的开源框架之一,主要被用于前端开发,底层采用JS语言编写,由于本文项目采用的是微服务技术,前后端分离,因此可选择AngularJS作为前端框架,由前端程序员开发。
    2.2数据库
    本课题项目选择非关系型一来是非关系型数据库逐渐有取代关系型数据库的趋势,二来是大数据环境下业务的需要,具体会在文章后文进行说明。
    2.2.1MongoDB
    MongoDB是采用C++语言编写的文档型非结构化数据库,由于它存储的数据是JSON格式,并且性能比较好,现在逐渐被众多公司所采用。本课题项目的电商推荐系统摒弃了传统的关系型数据库(如Oracle、MySQL、SQL Server)而采用MongoDB作为业务数据库,不仅是考虑到MongoDB能存储大量的小文件,并且对存储海量数据有着非常好的支持,还考虑后期将业务数据导入到Hadoop、Hive表(底层数据存储在Hadoop-HDFS或AWS-S3上)的简便性,同时,MongoDB在安装和运维上也相对传统的关系型数据库更为简单。
    2.2.2Redis
    与MongoDB相同,Redis也是一种非结构化数据库,但是Redis是一种基于内存的缓存数据库,它的读写性能极高。现在众多企业也在使用Redis,本文的电商推荐系统就是借助Redis存储和传输实时数据用于实时推荐模块。
    2.3日志采集与消息缓冲
    在大数据实时计算中,数据的实时性非常重要,因此需要有日志采集与消息缓冲服务,在大数据环境中常用的日志采集与消息缓冲服务组件分别是Flume与Kafka。
    2.3.1Flume-ng
    Flume是一种被用于数据采集的框架,主要通过类似管道的形式采集与发送数据,它具有很强的拓展性与良好的容错性等诸多优点[5],成为大数据行业中必备的工具之一。本文的电商推荐系统通过Flume采集Nginx收集到的埋点数据,实时发送给Kafka。
    2.3.2Kafka
    与Flume不同的是,Kafka更做适合下游的数据消费,而Flume更做适合上游的数据采集,但是也有公司使用Kafka采集数据发送到Flume。本文的电商推荐系统使用Kafka广播数据传到SparkStreaming进行实时计算。
    2.4Spark技术栈
    Spark于2014年被加州大学开源捐赠给Apache基金会,随后其火热程度就如它的名字一样以星星之火迅速燎原,不到一年就成为Apache的顶级项目[6]。从被开源至今仅6年的发展历史,它虽然非常火爆,但是迄今为止Spark相关的期刊论文还是极少,研究相关的技术主要依赖于官网及其他开发者的博客贡献,因此本文的参考文献以Apache Spark官网为主。
    在Spark Core的基础上,Spark技术栈包含的组件有SparkSQL、SparkStreaming、SparkMLlib和GraphX,它们分别被用于离线交互式计算、实时流失计算、机器学习、图计算[7]。由于本课题电商推荐系统没有使用到图计算相关的技术,因此本次不介绍和使用GraphX。
    2.4.1Scala
    Scala是一种类似于Java但语法比Java简单的语言。Scalac会通过类似Javac编译器将Scala语言编译成.class文件,因此,Scala可以是一种能“运行在JVM上的语言(说法不准确)”,它的运行效率和Java相当[7],主要被用在大数据行业。此外,本文的电商推荐系统使用的Spark框架正是用Scala编写的,因此,本课题项目使用Scala语言写电商推荐系统的推荐服务。
    2.4.2SparkSQL
    SparkSQL是Spark框架的组件之一,它可以充当分布式SQL查询引擎,也可以通过使用DataFrame或DataSet对大数据进行开发,主要被用于交互式计算[7]。本课题项目选择SparkSQL来处理电商推荐系统的统计推荐服务,一方面是因为SparkSQL开发起来比Hadoop的MapReduce简单,二是因为Spark(基于内存)的运行速度比MapReduce(基于磁盘,有频繁的I/O操作)更快[7],Spark是众多公司的大数据开发处理数据的首选。
    2.4.3SparkMLlib
    SparkMLlib也是Spark框架的组件之一,全称为“Spark machine learning library”,即Spark的机器学习库[9],它降低大数据与人工智能技术之间的技术成本。虽然Hadoop也提供了机器学习库——Mahout[10],但是人们更喜欢用SparkMLlib。SparkMLlib包含了一些如聚类、分类、回归、协同过滤、分类等机器学习算法[8],本文的电商推荐系统的个性化推荐服务主要借助于SparkMLlib的协同过滤算法实现。
    2.4.4SparkStreaming
    SparkStreaming也是Spark框架的组件之一,它被用于大数据的流式处理实时计算[11]。课题项目电商推荐系统就是借助SparkStreaming接收Kafka的实时数据,再借助推荐算法完成实时推荐服务。
    2.5推荐算法
    推荐算法是实现电商推荐系统的推荐功能的核心,也是本课题项目的关键所在,本项目包含统计分析、机器学习智能推荐和实时推荐算法,下面将对相关算法做简要说明。
    2.5.1统计分析算法
    统计推荐模块使用SparkSQL进行统计分析实现,仅仅需要在Spark脚本程序中写SQL和UDF函数(用于处理数据的自定义函数)即可,具体的算法实现有SparkSQL封装的函数完成,在此不对该算法做重点介绍,读者感兴趣的话可以研读SparkSession.sql()函数中具体算法的实现流程。
    2.5.2ALS交替最小二乘算法
    ALS(alternating least squares,交替最小二乘法)是一种常常被用于推荐系统中的算法,该算法的核心原理就是将一个矩阵分解成两个包含隐特征的矩阵。与传统的矩阵分解算法SVD相比,ALS算法能解决数据中的缺失项问题。
    电商推荐系统会使用ALS算法进行模型训练,通过计算将用户(行)-商品(列)-评分(值)矩阵R(m×n)分解成两个低维度的用户对商品隐特征偏好矩阵X(m×k)和商品包含的隐特征矩阵Y(n×k),在计算的过程中,为使分解后的矩阵X与Y相乘后更接近于R,需要计算损失函数,具体算法会在后文中介绍。
    2.5.3过拟合与欠拟合
    计算损失函数的算法就是计算方差,即真实值减去预测值后计算平方取平均值以后再开方。但是这个方差值并不是越低越好:若损失函数值偏大,则拟合度不够;若损失函数过小(趋近于0),则出现过拟合问题。
    什么是过拟合与欠拟合?过拟合就是特征集过大,导致系统不能很好地识别数据,如果一味地追求预测能力,则很容易出现过拟合问题;而欠拟合就是特征集过小,模型不能很好地拟合数据。
    如图2-1所示,“人工智能之父”艾伦·图灵曾以一片树叶做形象地比喻:


    图 2-1过拟合与欠拟合
    套用毛泽东的话说:“每片树叶都不同”。通俗地讲,在人工智能领域里,若模型训练出现过拟合问题,则机器学习识别人脸的时候,若此人化妆了,或者是双胞胎兄弟姐妹,程序就识别不出来,此时仅能识别本人刚刚被拍照的样子。若模型训练出现欠拟合的问题,则机器学习识别人脸的时候,会把猴子、大猩猩甚至其他动物识别成人,这样的识别效果自然不言而喻。
    2.5.4正则化项
    正则化是风险最小化的实现,它是模型复杂度的单调递增函数,即在风险上增加一个正则化项或者罚项,模型越复杂,正则化项的值也就也大。
    通俗地讲,电商推荐系统的训练模型越复杂,就越有可能出现过拟合的问题,因此正则化项系数也就越大。那么此时损失函数的计算结果就不可能小,因此使用正则化项可以降低复杂模型的复杂度,从而避免出现过拟合的现象。
    2.5.5协同过滤算法
    协作过滤算法通常用于推荐系统。这些技术旨在填充用户项关联矩阵的缺失条目。 SparkMLlib当前支持基于模型的协作过滤,其中通过一小部分潜在因素来描述用户和产品,这些潜在因素可用于预测缺少的条目。SparkMLlib使用交替最小二乘(ALS)算法来学习这些潜在因素,本课题项目使用基于隐语义模型协同过滤算法计算用户推荐列表和商品推荐列表,具体实现算法将在后文说明。
    2.5.6实时推荐算法
    实时推荐算法主要依托于离线推荐的计算结果,推荐列表具备实时性,并且更新推荐结果的速度快。推荐原理是根据离线智能推荐计算的商品预测评分与用户对商品的真实评分相乘再取平均值,从而得出候选商品的推荐优先级。此外,还要考虑用户对商品的好评与差评对商品实时推荐列表优先级的影响。
    2.5.7其他推荐算法
    其他的混合推荐算法主要包含基于内容的推荐、基于物品的协同过滤推荐和借助用户画像系统实现推荐。除用户画像系统以外,具体算法将在后文中简单地做介绍。
    2.6Linux系统
    Linux有基于商业开发版如Redhat Linux和社区开发版如CentOS(RedHat简化免费版),由于其免费、开源、稳定等诸多优点,Linux系统是众多企业部署项目的首选,而本课题项目电商推荐系统的大部分组件(如Flume、Kafka)更适合在Linux系统中运行,项目的开发环境在Windows系统中,Spark生态系统组件运行在Linux系统中。
    2.6.1CentOS虚拟机
    由于CentOS是一款免费、开源的Linux社区版,本课题项目选择CentOS6.8版本的Linux操作系统部署电商推荐系统项目。本课题项目目前仅需在本地部署实现运行,无需部署上线,因此,需要在Window操作系统中搭建CentOS Linux虚拟机用来部署电商推荐系统。
    2.6.2云主机
    云主机产品来自于云计算平台,主要包含公有云和私有云。在公司的大数据平台中,推荐使用亚马逊云或私有云,但是本课题项目电商推荐系统对用户信息数据安全比较重要,而用户用户行为数据安全就没那么重要了。因此,可以使用私有云存储用户信息数据,使用公有云存储用户用户行为数据。
    2.7未来技术选型
    大数据、人工智能数据挖掘这些技术是近几年开始火起来的新技术,由于时代的发展和技术的更新迭代,势必将会使用新技术来取代旧技术,下面就以当前时间背景下的电商推荐系统的未来技术选型做简单的介绍。
    注意本课题项目设计时间为2019年9月,论文提交时间为2020年5月,本文中的“未来”是以论文提交时间为起点,以下技术也许将在多年以后被淘汰,仅以当前时间点为参考。
    2.7.1EMR-Hadoop
    尽管Spark生态系统先进已经非常强大,甚至可以独立于Hadoop运行,但是如果本课题项目采用Yarn调度Spark任务的话,Spark的运行效率可能会比部署在本地(local)更高一些。此外,Spark目前还没有像HDFS(即Hadoop Distributed File System)那样的数据存储系统[12],本课题项目电商推荐系统的业务数据目前还存储在MongoDB中,在未来的电商推荐系统架构中,本课题项目需要将数据存储在多台不同的机器上,未来会用到Hadoop或EMR。
    Yarn、HDFS都是Hadoop框架的组件,此外还有MapReduce[13],在现今众多企业中还在使用HDFS存储数据。因此,使用Hadoop框架还是很有必要的。
    Amazon EMR是全球领先的云计算大数据平台,它能和Apache Spark、Hive、HBase、Flink等大数据组件兼容,Amazon S3可以取代Hadoop的HDFS来存储海量分布式数据,因此,本文的电商推荐系统在未来可以选取EMR云大数据服务。
    EMR-Hadoop是Amazon EMR对Hadoop的封装,保留Hadoop的功能不变,使用Amazon S3存储数据,使用Hadoop的Yarn作为调度器。
    2.7.2Hive
    Hive是由FaceBook公司开发的一款大数据分析工具,Hive通过建立映射关系存储数据,它的底层通过将Hive SQL转换成MapReduce程序处理和分析数据,具有低延迟高效率等优点。在本文的电商系统的未来架构规划中,免不了要使用Hive建表和数据分析,因此,未来需要使用Hive。
    2.7.3Flink
    本文的电商推荐系统既然使用Spark框架,为什么还要使用Flink?因为Flink是一款纯流式计算框架,而Spark的流式处理框架SparkStreaming还只是微批处理思想[14],在实时性要求更高的情况下,使用Flink处理实时计算任务效率更高。此外,Flink还对离线计算、机器学习这方面技术有着很好的支持。
    既然Flink这么优秀,为什么现在不使用Flink呢?Apache Flink是一款新框架,在稳定性以及安全性等多方面目前还有待考证,未知Bug可能比较多。另一方面,在对数据量大的数据处理和历史数据交互查询计算方面,Spark的表现还是非常优秀的,因此,本课题项目推荐系统目前使用Spark框架。
    2.7.4Dubbo与SpringCloud
    Apache Dubbo是阿里巴巴公司开发并捐给Apache的一款开源RPC框架,它在分布式与微服务系统中被广泛应用,曾经风靡国内乃至全球。Dubbo曾在2014年停止更新,一度让Dubbo开发者和爱好者失落,而在2018年Dubbo传来继续更新的消息,Dubbo再次吸引了众多开发者的关注,Dubbo是电商推荐系统的未来技术框架选型之一。
    SpringCloud是一款微服务框架,本课题的电商推荐系统是使用SpringBoot作为业务服务,而SpringCloud对SpringBoot灵活地架构支持,项目后期会使用SpringCloud。
    与Dubbo相比,尽管SpringCloud目前对资源的消耗更高,效率更低,但是SpringCloud支持的核心功能比Dubbo多,且开发难度比Dubbo小,目前很多公司的微服务架构都从Dubbo往SpringCloud方向迁移。
    在未来的架构设计中,将设计电商微服务系统,现在的电商推荐系统将作为电商系统的一个子模块为电商业务服务,其中的用户对商品的偏好信息主要来源于用户对商品的浏览以及用户对商品的购买情况,采用代理链条模式,借助SpringCloud实现微服务电商系统,未来的电商微服务系统设计图如图2-2所示。

    图 2-2未来架构:SpringCloud代理链条设计模式
    在未来的电商微服务系统中,采用Consumer管理注册中心,分别管理User服务、Product服务、Recommend服务、Order服务、Trade服务。每个服务模块功能高度内聚,分别负责各自的服务模块,不同的服务模块之间借助Consumer通信。当用户使用电商微服务系统时,通过User服务实现注册登录,完成以后方可浏览Product服务中的商品;用户浏览商品以后将浏览行为信息通知给Recommend服务实现浏览推荐功能;若用户对商品下订单,Order模块将通知Recommend模块实现购买推荐;若用户通过Trade模块完成了对订单支付,则Trade模块通知Order模块更新订单支付结果,并通知Recommend模块提升已支付订单中商品的推荐优先级。
    2.7.5用户画像
    用户画像技术是一种能有效预测用户对商品的喜好的技术,它能根据用户的众多行为虚构出用户的“形象”,如图2-3所示。比如:性别、年龄、婚否、育否、收入、消费能力、消费价位、消费口味等等标签,即用户信息“标签化”,从而提升电商推荐系统的推荐准确度[15]。比如用户为男性、青少年,用户在搜索“服装”的时候就不会被推荐到女装、老年服装等等商品。

    图 2-3使用用户画像对用户进行聚类分析
    在后期的电商推荐系统设计中,将会对用户进行聚类分析,给每位用户构建一个虚拟的“模型”,从而提升电商推荐系统的推荐准确度。此外,使用用户画像技术在用户首次注册时手机用户的购物偏好标签,从而解决电商推荐系统的“冷启动”问题(文章后面会介绍),使电商推荐系统能懂用户的“初心”,由于课题主题与用户画像无关,本文不对用户画像技术的具体实现做介绍。
    第3章
    系统可行性分析和需求分析
    本课题项目要明确电商平台的用户需求,对用户的需求进行合理性的分析,如果需求合理,方可对推荐系统进行开发。
    3.1系统可行性分析
    系统的可行性分析是衡量系统是否可实现的重要指标,主要包含以下三点:
    3.1.1技术可行性
    系统目前借助Angular实现用户的可视化,由于项目前后端分离,未来会考虑微信与支付宝小程序、安卓、IOS等用户可视化服务的开发;综合业务层采用JavaEE的SpringBoot微服务实现,部署在Tomcat上;数据存储部分借助于MongoDB和Redis;离线推荐部分采用SparkSQL与SparkMLlib实现;实时推荐部分采用Flume-ng对用户评分行为数据进行采集,采用Kafka作为流式消息缓冲,将数据推送到SparkStreaming进行实时推荐。
    在研究这个课题的时候,作者使用的开发环境是Windows10操作系统,最终将项目部署到单节点伪分布式Linux操作系统中,Linux系统采用8G内存+8核16线程的CPU+500G磁盘,系统运行正常。
    这些技术是当今前沿新技术,尚存在很多未知Bug,但项目选择比较稳定的版本,因此在开发中很少存在问题。
    3.1.2经济可行性
    系统的经济可行性分析是衡量系统能否实现的重要标准,是企业开发信息系统对经济花费进行评估的前提,包括人力、物力、投资成本等诸多方面,同时考虑使用系统以后可能带来的收益。因此,务必在系统实施之前对系统进行经济可行性分析后方可进行系统开发。
    本课题项目虽然使用了很多新技术框架,如Apache基金会的项目、SpringFrameWork,但是这些框架全部都是开源的,在选择项目框架上完全是“0成本”。由于使用电商推荐系统的过程中可能会有系统运维上的成本,但由于大数据技术可以使用Zookeeper(上述架构中未描述)搭建高可用,在分布式系统中某个节点宕机后,系统还能正常运行而不受任何影响,以确保系统的稳定性,降低运维成本。
    在技术可行性分析中提到,目前采用的单节点Linux服务器成本并不高,但随着后期将电商推荐系统项目上线后,未来会搭建分布式系统,扩大系统可用磁盘空间、内存空间、CPU核数等等,也会搭建分布式文件系统、内存集群与CPU集群等等,这些开销将会根据公司上线项目的具体业务进行计算,但总体成本远比项目收入低。
    尽管现在国内外很多大公司(如阿里巴巴、亚马逊)的电商推荐系统做得已非常成熟,但我此次研究推荐系统的在一方面可以帮助众电商平台尤其是中小企业降低个性化推荐系统的实现成本。
    3.1.3操作可行性
    本次课题电商推荐系统选用选用B/S的架构模式开发,由于采用了微服务技术,前后端分离,未来仅需开发小程序界面、安卓应用界面等即可实现C/S的架构模式,而不需要再次进行后端开发。因此,项目不仅降低了前端开发成本,也可以提供多个流量入口,既丰富了用户的购物体验,又给电商平台增加了用户流量。
    在B/S架构的基础上,显示层用JavaEE前端技术的AngularJS框架,用户只需要打开浏览器输入网址即可即可访问Web页面,使用非常方便。
    在未来的C/S架构的基础上,用户的操作更简单了,只需要打开电商平台APP或者打开微信、支付宝使用小程序进行本课题项目的电商平台就可以“一键”登录系统。
    3.2系统需求分析
    系统包含了多个模块,需要对不同的模块进行需求分析,下面简单地介绍系统需求。
    3.2.1JavaEE业务功能需求
    JavaEE模块主要用于处理用户前后端业务交互的需求,不是本课题项目的重点,在JavaEE模块中仅需要提供用户注册、登录、采集用户行为数据和商品查询功能。
    (1)用户注册和登录
    电商推荐系统目前只设计用户注册和登录功能,用户首次使用电商推荐系统时,用户需要先注册方可登录。由于新用户尚未对任何商品有过评分和浏览行为,系统无法预测用户的商品口味偏好,因此需要收集用户的偏好标签。
    (2)用户可视化
    电商推荐系统服务的用户是普通消费者,因此需要提供JavaEE(或微信小程序、安卓界面等等)前端界面展示。
    (3)埋点数据采集
    用户的每次浏览商品、对商品进行评分等行为都会被系统记录下来,系统将用户的行为数据发送给推荐模块和数据库方可实现电商推荐系统的推荐功能[11]。
    (4)商品模糊查询
    用户登录操作系统后除了查看推荐商品以外,还可以根据自己的购物需求查找商品。用户不需要知道商品的详细信息,仅需输入商品名称关键字即可,商品模糊查询功能可为用户查看商品提供便利。
    3.2.2商品推荐服务功能需求
    项目的核心在推荐服务功能模块上,推荐功能主要包含离线推荐与实时推荐功能需求,项目中的数据需要被持久化,因此需要考虑数据存储需求。
    (1)数据存储部分
    存储数据主要需要使用业务数据库和缓存数据库,主要有有两点需求:
    一是业务数据库需要存储对海量数据频繁增删改查的支持。
    二是缓存数据库需要高效率,是其满足实时计算的需求。
    (2)离线推荐部分
    离线推荐要求海量数据处理能力比较强,并且推荐准确度要高,主要有有两点需求:
    一是统计推荐需要使用能进行交互式计算处理海量数据框架。
    二是机器学习个性化推荐要求推荐的准确度较高[8]。
    (3)实时推荐部分
    实时推荐要求高效率而不要求特别高的准确率,主要有有两点需求:
    一是日志采集和消息缓冲服务要求传输效率高、延迟低,避免低效率的处理速度影响用户的购物体验感[16]。
    二是实时推荐计算框架要求计算效率高,计算的复杂度低,使计算结果能尽快被推送到前端界面。
    第4章
    系统方案的设计
    4.1系统功能设计原则
    通过以上的分析,综合当前JavaEE开发和大数据开发行业的原则,本课题电商推荐系统设计主要遵循以下几点原则:
    4.1.1微服务
    项目采用SpringBoot搭建微服务项目,微服务系统主要设计原则如下图4-1所示。

    图 4-1商品表数据样例
    以下是微服务六大设计原则概述:
    (1)高内聚低耦合
    高内聚主要体现在系统的功能强度内联系,保证系统的某个功能的各个元素之间紧密联系[3]。除了功能内聚以外,还要保证通信、顺序、逻辑等多个内聚指标。
    耦合性主要体现在系统的各个模块之间的联系,若系统的不同模块之间联系很紧密,则耦合性就强,系统开发不需要这么强的耦合性,各个模块之间要要松耦合。
    (2)高度自治
    高度自治原则便于服务进行拓展开发和运行,使项目团队对整个生命周期负责,对于不同团队开发的模块由各自所在的团队进行维护和开发。
    (3)以业务为中心
    以业务为中心原则认为不同的服务代表不同的业务逻辑,使整个服务能快速响应和更新业务的变化。
    (4)弹性设计
    弹性设计原则使系统的可容错性提高,使系统具有更强的自我保护能力。
    (5)日志与监控
    日志与监控原则在监控系统的时候,如果发现系统出现异常或者错误,能够快速定位出错的位置,方便排查错误,并且检测可能发生的意外和错误,使运维人员能提前做好防范。
    (6)自动化
    自动化原则可以淘汰传统的手工运维,使运维变得自动化,这样就能极大地节约电商平台的运维成本。
    4.1.2MVC架构
    MVC即模型、视图、控制器,是现今JavaEE项目开发的常用架构,在开发电商推荐系统的时候可以根据不同的技术人员的专项擅长技术进行开发。
    在数据模型层模型需要将代码实现多个视图的复用,减少代码的重复。
    在视图层可以开发不同的图形界面,如JavaWeb前端、微信支付宝小程序、安卓,而实现电商推荐系统的功能主要依赖于调用后端程序API,这样前端开发人员不用处理后端代码,后端开发人员也不用处理前端代码。
    在业务逻辑层使用Controller控制器接受用户不同的操作行为来处理用户请求即可。
    4.1.3系统的可靠性
    在企业级JavaEE大数据项目中,经常会出现通信中断、高并发、服务器宕机等情况,因此,务必要保证系统的可靠性。
    (1)微服务的可靠性
    在微服务的设计原则中提到,使用日志与监控原则能提高系统的可靠性。此外,还要对数据链路进行检测,以杜绝网络通讯时出现故障;使用断链重连机制在系统断开与客户端连接后自动重连;使用消息缓存重发实现异步地发送消息;使用高并发技术使系统能承受众多用户的请求等等。
    (2)大数据平台的可靠性
    本课题项目电商推荐系统目前虽然是单节点伪分布式项目,如果遇到问题很容易追溯问题位置并解决问题,而在电商公司的企业级项目中,使用单节点项目是完全不可以的。在企业级电商项目中,通常使用分布式系统存储数据和处理数据,因为数据量实在是太大了。在前面的分析中,为了保证环境系统环境的可靠性,部署分布式系统时考虑到部署HA(高可用);在此之外为了保证内存、CPU的可靠性,可以根据业务实际情况搭建内存、CPU集群;为了保证实时推荐的可靠性,使用基于内存的缓存数据库Redis作为辅助数据库。
    4.1.4系统的维护性
    系统的可维护性是衡量系统在出现故障或者错误以后可改进的难易程度。目前电商推荐系统的可维护性包括JavaEE微服务项目的可维护性与Linux服务器Spark生态系统的可维护性。
    JavaEE模块可由Java开发团队进行维护,谁开发由谁维护。Spark生态系统的维护则有点复杂,如果电商企业对数据安全要求不高,可以选用公有云如阿里云、百度云、腾讯云等公有云平台部署项目,如果对数据安全要去较高则应当使用公司的私有云;在技术选型上,如果电商企业有雄厚的专业运维团队,可选取Apache Hadoop搭建Hadoop生态系统,该框架开源但维护成本高。若果如果电商企业没有专业的运维团队,可选取CDH(Cloudera’s Distribution Including Apache Hadoop)版本的Hadoop搭建Hadoop生态系统,该发行版部分闭源且免费,运维简单,但如果出现故障需要Cloudare公司维护则需要支付维护费用。
    4.2项目架构设计
    电商推荐系统的架构主要包含四大模块,分别是JavaEE模块、数据库模块、离线推荐服务模块、实时推荐服务模块,电商推荐系统架构设计图如图42所示。

    图 4-2电商推荐系统架构设计图
    目前架构设计如下:
    JavaEE前端采用AngularJS作为可视化界面,后端采用SpringBoot实现综合业务服务,在SpringBoot服务中部署埋点用于用户评分行为数据采集。
    数据持久化层采用MongoDB和Redis分别作为电商推荐系统的业务数据库和缓存数据库。
    离线推荐服务采用适合离线交互式计算的SparkSQL进行统计推荐,智能推荐服务采用SparkMLlib实现。
    实时推荐采用SparkStreaming实现,需要借助Flume、Kafka作为实时数据传输的中间组件。
    4.3JavaEE模块的设计
    本课题设计的电商系统借助微服务部署,主要包含JavaEE模块和大数据模块两个部分。JavaEE模块包含前端界面和后端服务,大数据模块包含离线推荐、实时推荐模块。
    4.3.1前端模块的设计
    前端模块使用AngularJS设计,由于电商推荐系统前后端分离,前端界面仅需前端开发人员设计,而后端开发人员仅需提供调用API即可。
    目前电商推荐系统的JavaWeb前端模块仅需开发首页为用户提供注册、登录、浏览商品、对商品评分功能。
    首页:用户输入电商推荐系统网址后需要先登录,若用户已登录,则显示商品页面。
    用户注册:若是新用户,必须先注册才能登录。
    商品搜索:在电商行业中搜索引擎非常重要,在本项目中仅仅设计模糊查询功能。
    商品页:展示商品的近期热门、猜你喜欢等;若是用户首次使用,由于尚未收集到用户的评分数据,系统只能展示近期热门商品。
    商品详情页:用户通过点击商品页的商品进入商品详情页,可以看到商品详情,此处有商品评分按钮,用户可以对商品进行评分。
    4.3.2后端模块的设计
    后端模块借助SparingBoot封装Tomcat,用来处理前端发送的请求,主要设计如下:
    用户登录:通过判断用户输入的信息是否正确实现登录。
    用户注册:前端界面校验用户输入合法后方可完成注册。
    商品页面:商品页页面仅提供查看商品详情页的API。
    商品详情页:商品详情页仅提供对商品评分的API。
    日志采集:使用Nginx部署埋点采集用户对商品的浏览和评分数据,追加写入到日志文件中方便进行实时推荐。
    商品搜索:项目采用Elasticsearch提供的API来实现商品搜索功能。
    4.4数据仓库设计
    如果说推荐算法是电商推荐系统的灵魂,那么数据就是电商推荐系统的筋骨了,因此,进行数据库设计是非常必要的。由于业务的需要以及对海量数据的管理,本课题项目电商推荐系统摒弃了传统的关系型数据库(如MySQL、Oracle),选择MongoDB和Redis作为电商推荐系统的业务数据库和缓存数据库。
    4.4.1业务数据库
    由于系统有频繁的对海量数据进行增删改操作,使用传统的关系型数据库可能有点力不从心[4]。因此,本课题项目采用现今被广泛使用的文档型数据库MongoDB,MongoDB主要负责电商推荐系统的逻辑数据的存储。
    令人感到遗憾的是,MongoDB在4.0版本更新以后才增加对事务的支持。而出于对项目的稳定性以及与各个大数据组件之间的兼容性的考虑,本课设项目目前采用的是不支持事务的MongoDB-RHEL-3.4.3版本,因此会遗留数据库事务上的问题,具体会在文章后面讨论。
    4.4.2缓存数据库
    在电商推荐系统项目中,由于系统需要根据用户近期的浏览行为快速地[16]了解到用户的购物偏好,因此必须使用实时推荐,因此需要借助Redis作为缓存数据库来满足实时推荐需求。
    以上两种数据库的表设计方案将在文章的后面进行详细地说明。
    4.4.3业务数据表的设计
    本课题属于大数据项目,由于需要存储海量的数据,因此不能像设计关系型数据库那样设计表。在数据仓库中,通常选择维度建模,各表之间不能有太多的Join(连接)操作。在模型架构上,选取星型架构,根据OLAP的需求,确保系统的稳定性、高效率能得以保证。
    项目的业务数据模型的具体表结构如下:
    1.Product【商品表】
    表 41 商品表

    字段名

    字段类型

    字段描述

    _id

    ObjectId

    主键

    productId

    Int

    商品的ID

    name

    String

    商品的名称

    categories

    String

    商品所属类别, 用“|”分割

    imageUrl

    String

    商品图片的URL

    tags

    String

    商品的UGC标签, 用“|”分割


    商品表数据样例如下图43所示:

    图 4-3商品表数据样例
    2.Rating【用户评分表】
    表 42 用户评分表

    字段名

    字段类型

    字段描述

    _id

    ObjectId

    主键

    userId

    Int

    用户的ID

    productId

    Int

    商品的ID

    score

    Double

    商品的分值

    timestamp

    Long

    评分的时间


    用户评分表数据样例如下图44所示:

    图 4-4用户评分表数据样例
    3.User【用户表】
    表 43 用户表

    字段名

    字段类型

    字段描述

    _id

    ObjectId

    主键

    userId

    Int

    用户的ID

    username

    String

    用户名

    password

    String

    用户密码

    timestamp

    Lon0067

    用户创建的时间

    字段名    字段类型    字段描述

    字段名

    字段类型

    字段描述

    _id

    ObjectId

    主键

    userId

    Int

    用户的ID

    username

    String

    用户名

    password

    String

    用户密码

    timestamp

    Long

    用户创建的时间


    用户表数据样例如下图45所示:

    图  45用户表数据样例
    4.RateMoreRecentlyProducts【最近商品评分个数统计表】
    表 44 最近商品评分个数统计表

    字段名

    字段类型

    字段描述

    _id

    ObjectId

    主键

    productId

    Int

    商品的ID

    count

    Int

    商品的评分数

    yearmonth

    String

    评分的时段


    最近商品评分个数统计表数据样例如下图46所示:

    图  46近期商品评分个数统计表数据样例
    5.RateMoreProducts【商品评分个数统计表】
    表 45 商品评分个数统计表

    字段名

    字段类型

    字段描述

    _id

    ObjectId

    主键

    productId

    Int

    商品的ID

    count

    Int

    商品的评分数


    商品评分个数统计表数据样例如下图47所示:

    图 47商品评分个数统计表数据样例
    6.AverageProducts【商品平均评分表】
    表 46 商品平均评分表

    字段名

    字段类型

    字段描述

    _id

    ObjectId

    主键

    productId

    Int

    商品的ID

    avg

    Double

    商品的平均评分


    商品平均评分表数据样例如下图48所示:

    图 48商品平均评分表数据样例
    7.ProductRecs【商品相似性矩阵】
    表 47 商品相似性矩阵

    字段名

    字段类型

    字段描述

    _id

    ObjectId

    主键

    productId

    Int

    商品的ID

    recs

    Array[(productId:Int,score:Double)]

    商品最相似的商品集合


    商品相似性矩阵数据样例如下图49所示:

    图  49商品相似度矩阵表数据样例
    以上是电商推荐系统的业务数据表及样例数据内容,可以看到MongoDB存储的是Json数据,但是实际上读取相关数据也可以是关系型的,其中key为字段名,value为字段内容。
    4.4.4系统数据流程设计
    所有数据的来源均为JavaEE服务获取的数据,其中包含用户注册信息和系统通过Nginx埋点收集得到的用户行为数据,系统的各个服务中数据流程如下图4-10所示。

    图  410 电商推荐系统数据流程设计图
    在用户首次注册之前,为了提供热门商品进行展示,系统需要提前在MongoDB中存储部分业务数据,使用统计推荐服务对商品进行“预推荐”。
    用户首次注册时,用户输入的注册信息直接被存储到MongoDB数据库中。
    用户浏览商品或对商品进行评分以后,Flume将收集到的数据发送到Kafka,Kafka通过生产者/消费者模式将数据广播给SparkStreaming进行实时计算。
    用户的部分评分数据可直接通过Redis缓存数据库实时发送到SparkStreaming进行实时计算。
    实时推荐服务将计算结果存储到MongoDB同时给用户进行展示。一段时间后,MongoDB中历史数据有更新,此时离线计算模块还要重新计算推荐列表以更新推荐结果。
    4.5离线推荐模块的设计
    离线推荐模块使用的是离线的历史数据,数据没有没有实时性,推荐原理是在设定的时间周期内计算推荐结果,其推荐内容在一定的时间周期内是一成不变的。根据业务需要,随着时间调整,离线推荐结果才会产生变动。在离线推荐模块中,需要预先设计推荐指标,以展示商品给用户,方便收集用户的实时评分数据,为实时计算做支撑。
    4.5.1离线统计推荐模块的设计
    统计推荐模块仅需借助SparkSQL实现,主要以下统计三个指标:
    (1)历史热门商品统计
    统计历史热门商品的实现思路是根据商品ID聚合,再进行COUNT(DISTINCT)统计,最后按照从大到小降序排列展示出来[7]。
    (2)近期热门商品统计
    统计近期热门商品原理与统计历史热门商品相同,不同的是限制时间段,这个时间段可根据业务设定。
    (3)商品平均评分统计
    统计商品的平均分主要根据商品的ID聚合,再使用AVG函数计算平均分,最后将计算结果按照从大到小的顺序排列展现出来。此外,若业务对实时性有要求,可以设置时间段滑动为过滤条件。
    4.5.2离线个性化智能推荐模块的设计
    本课题项目采用SparkMLlib的ALS算法作为协同过滤算法,需要先选取模型训练,然后计算用户推荐列表和商品相似度矩阵[8]。
    (1)用户推荐列表计算
    计算用户推荐矩阵的主要实现思路是将用户ID和商品ID做笛卡尔积,再通过训练的模型预测用于对商品的评分,将预测结果排序后商品来作为当前用户的商品推荐列表。
    (2)商品相似度矩阵计算
    计算商品相似度矩阵,主要实现原理是我们熟悉的计算余弦相似度,即COS(A,B)的值,两件商品的余弦相似度越趋近于1说明两件商品越相似,否则就越不相似[17]。由于候选商品有很多种,因此需要计算多种商品与备选商品的余弦相似度。
    4.6实时推荐模块的设计
    与离线推荐模块不同的是,实习推荐模块反映的是用户最近一段时间的商品口味偏好,而离线推荐反映的是总体用户在长期或者某个时间段内的商品偏好[11]。实时推荐对推荐结果的时效性要求比较高,而对商品推荐的准确度要求就没那么高了,当然,如果能做得推荐准确度也高那也更好。
    在实时推荐模块中,用户需要更新后的商品更加满足用户的近期商品口味偏好,在后文的系统实现章节中,文章会详细介绍实时推荐算法。

    第5章推荐算法设计
    5.1离线统计推荐算法设计
    统计推荐算法主要借助SQL和UDF函数实现,其目的在于为离线个性化智能推荐与实时推荐提供数据支持,并且粗略地反映大众用户在某个时间段内对不同商品的喜好程度。例如:用户在搜索“手机充电器”的时候,由于曾经风靡全球的“万能充”早已过时,因此推荐系统会优先给用户推荐USB接口的充电器而尽量少地推荐“万能充”,如图5-1所示。

    图 5-1统计推荐将过滤掉曾经热门商品“万能充电器”
    下面简单地介绍统计推荐算法。
    5.1.1历史热门商品统计推荐算法设计
    统计热门商品的核心算法思路如图5-2所示:

    图 5-2统计热门商品的SQL
    统计历史热门商品仅需要根据商品ID聚合汇总再降序排列即可,将SQL代码注入到SparkSession.sql()函数中实现。
    5.1.2近期热门商品统计推荐算法设计
    统计近期热门商品的核心算法思路如图5-3所示:

    图 5-3统计近期热门商品的SQL
    统计近期热门商品需要将商品评分表中的时间戳进行“规范化”,即使用UDF函数(图中的CHANGEDATE函数)将时间戳转化成“年-月”的形式,再根据年-月与商品ID聚合,根据商品ID汇总,再根据年-月、商品ID汇总结果降序排序即可,将SQL代码注入到SparkSession.sql()函数中实现。
    5.1.3商品平均评分统计推荐算法设计
    统计商品的平均分的核心算法思路如图5-4所示:

    图 5-4统计商品的平均分的SQL
    统计商品的平均分仅需要根据商品ID聚合,使用系统函数(图中的AVG函数)计算商品ID对应的平均法再降序排序,将SQL代码注入到SparkSession.sql()函数中实现。
    5.2离线个性化智能推荐算法设计
    该算法是电商推荐系统算法的核心,呈上依赖于统计推荐模块提供的数据集与推荐结果,启下则被用于计算实时推荐列表的商品推荐优先级。核心算法思想是使用ALS算法进行矩阵分解,从而得到用户推荐矩阵与商品推荐矩阵,进而计算推荐列表。
    5.2.1计算用户商品推荐列表
    使用ALS算法的矩阵分解技术将训练出来的模型用来计算用户的商品推荐列表[17],ALS算法交替最小二乘法的原理为矩阵分解技术,主要思想如图5-5所示。

    图 5-5ALS算法的矩阵分解思想
    我们知道两个矩阵相乘可以得到一个新的矩阵,例如将用户特征矩阵的转置与商品特征矩阵相乘可以得到一个新的评分矩阵,如图5-6所示。但是如何将一个评分矩阵拆分成一个用户对商品隐含特征的偏好矩阵矩阵和一个商品所包含的隐含特征矩阵,并且两个新矩阵相乘近似逼近原评分矩阵呢?ALS算法就能帮我们解决这个问题。

    图 5-6用户特征矩阵的转置与商品特征矩阵相乘
    使用ALS算法可进行显式评分训练与隐式评分训练,进行训练模型时需要传入四个参数。若进行显式训练,则参数为ALS.train(ratings, rank, iterations=5, lambda_=0.01),若进行隐式训练,则参数为ALS.trainImplict(ratings, rank, iterations=5, lambda_=0.01)。其中,ratings指训练模型数据源,rank指的是当进行矩阵分解时,将原矩阵分解成X(m*rank)矩阵和Y(rank*n)矩阵,Iterations指ALS算法的迭代次数,lambda为正则化项系数,默认为0.01。使用ALS算法就可以将用户评分矩阵进行分解,思想如图5-7所示。

    图 5-7ALS算法将评分矩阵分解
    使用ALS算法训练结果是否是我们想要的结果,这个需要通过模型评估来判断,以下是模型评估与参数选取算法。
    5.2.2模型评估和参数选取
    由于使用ALS算法进行模型训练是传的参数是手动设置的,因此需要调整参数,最简单的做法是计算均方根误差[8],计算公式如下:
                   (5.1)
    本文在前文中提到,这个均方根误差并非越小越好,当然均方根误差也不能太大,否则就会出现欠拟合与过拟合,因此在实际开发中需要合理调整ALS算法的相关参数,使计算结果更加准确,最优参数选取结果将在后文的“系统实现”章节说明。
    5.2.3计算商品相似度矩阵
    计算商品相似度矩阵步骤如下:假设用户有k个特征,商品也有k个特征,则商品p和q之间的相似度可以由p与q的余弦值来表示[20],具体公式如下:
                        (5.2)
    5.3实时推荐算法设计
    实时推荐算法的基本原理是考虑用户在最近一段时间内对商品口味是相似的,因为随之时间的推移,用户对商品的购买欲望会随着年龄、季节、职业、婚育情况等多个方面的改变而发生改变。如图5-8所示,用户为已婚女性,她在去年冬天曾给儿子买过一件棉袄并对该棉袄给予了很好的评价,但是当前季节为夏天,那么在搜索“服装”的时候,此时系统会优先给用户推荐女性的夏装T恤而不是男性的童装棉袄。

    图 5-8实时推荐将潜在屏蔽用户曾经喜爱但已“过时”的商品
    需要区分清楚的是,实时推荐与统计推荐的实现思虑、应用场景是完全不同的。统计推荐反映的是大众用户的历史数据生成的推荐列表,而实时推荐反映的是用户实时购物偏好,推荐结果会随着季节、温度、用户年龄等因素的变化而改变。
    5.3.1实时推荐算法思想
    假设u代表用户,p代表商品,用户对p进行评分后,系统需要对k个候选商品进行评分计算[11]。若用户对商品进行好评,则给商品的相似候选进行加分计算,若对商品进行了差评,则给该商品的相似商品进行扣分计算。
    具体来说:获取用户的最近k次评分PK和与商品p相似的k个商品S,假设商品的推荐优先级,那么计算公式如下:
      (5.3)
    其中:
    表示用户u 对商品r 的评分,sim(q,r)为商品p与r的余弦相似度,该数值来源于离线个性化推荐模块中商品相似度矩阵计算结果。sim_sum 表示用户评分商品个数,incount、recount 表示用户给商品好评和差评的个数。
    5.3.2初步计算备选商品的推荐优先级
    初步计算备选商品的推荐优先级公式的如下:
                        (5.4)
    公式的意义如下:计算候选商品的余弦相似度,只需计算商品余弦相似度与用户对商品评分的乘积,将计算结果取平均数[14]即可。
    5.3.3计算偏移项
    初步计算备选商品的推荐优先级后,根据商品的平均评分确定用户对商品的评价是好评还是差评,根据用户的评分计算偏移项,若是好评则优先推荐,若是差评则降低推荐优先度,计算公式如下:
                   (5.5)
    5.3.4计算备选商品的最终推荐优先级
    通过对上述两个指标的计算,备选商品的最终推荐优先级的计算仅需将初步计算备选商品的推荐优先级与偏移项相加即可。比如用户对三件商品的评分分别为5分、4分、1分,假设3分为中评,其中有两件好评和一件差评,则候选商品的计算得分为结果如下:
        (5.6)
    5.4其他推荐算法
    为提升商品推荐准确率,还可以使用其他的推荐算法进行混合智能推荐,以下是除上述推荐算法以外两种常用的推荐算法。
    5.4.1基于内容的相似推荐算法
    基于内容推荐需要用户给商品打标签,然后借助程序提取标签内容,从而获取商品的特征向量[15]。该方法也可以得出用户商品推荐矩阵,并且可以将计算结果传到实时推荐模块,提升推荐的准确度。
    该算法的具体实现如下:
    首先提取商品的标签信息来提取特征向量,算法公式[23]如下:


    接下来就是计算特征向量的余弦值,跟文章上述计算原理一样,在此不再重复介绍,结果余弦值接近于1的商品进行推荐[24],具体公式如下:
            (5.9)
    5.4.2基于物品的协同过滤相似推荐算法
    基于物品的协同过滤(Item-CF),系统通过埋点收集用户的多种行为就可以预测不同商品的相似度[19],这比通过用户评分预测商品相似度更加方便。
    利用Item-CF算法可以分析用户喜欢什么样的商品,在电商乃至零售行业中都被广泛地应用。假设Ni、Nj分别为对商品i、j评分过的用户列表,那么具体的推荐计算公式[25]如下:
                             (5.10)
    5.4.3用户画像系统算法
    用户画像系统的算法常常使用K-means聚类分析算法,核心思路是选择K个点作为初始聚类中心,计算欧几里得距离,C代表中心点,X代表任意一个非中心点。在此不做重点介绍,核心算法公式如下:
                             (5.11)

    第6章
    系统实现
    6.1前台用户系统实现
    前台不做重点介绍,简单地实现了用户注册登录、浏览商品页面,以满足最基本的功能需求。
    6.1.1登录注册模块的实现
    用户登录:用户进入电商推荐系统前需要完成登录,前端页面如图6-1所示。

    图 6-1电商推荐系统登录页面
    用户注册:新用户输入指定信息经校验正确即可完成注册,前端页面如图6-2所示。

    图 6-2电商推荐系统注册页面
    6.1.2商品页功能的实现
    用户在完成注册与登录后就能看到商品,用户可直接给商品打分,电商推荐系统的前端商品推荐页面如图6-3所示。

    图 6-3电商推荐系统推荐页面
    在商品推荐页面分别展示了统计推荐模块的推荐(如“热门推荐”、“评分最多”)结果和智能推荐(如“猜你喜欢”)结果,需要注意的是,若是新用户首次注册登录,“猜你喜欢”模块的推荐结果为空。
    6.2推荐系统服务环境搭建
    电商推荐系统项目中用到了许多工具对数据进行存储、计算、采集、传输等操作,因此部署服务器环境也是件非常困难的事,现在对环境搭建进行简介。
    6.2.1环境准备
    硬件环境包括4核8进程的CPU、40G磁盘(20G也可),8G内存(4G也可),使用VMware Workstations安装Linux虚拟机的硬件配置如图6-4所示。

    图 6-4VMware Workstations安装Linux虚拟机硬件配置
    软件环境包括VMware Workstations、JDK1.8、Scala2.11、Maven、CentOS6.8、IDEA、MongoDB4.3与Redis客户端。
    Spark生态系统组件包括JDK1.8、Scala2.11、Spark、Kafka、Flume-ng、Redis、MongoDB。
    6.2.2Linux虚拟机搭建
    搭建虚拟机的主要步骤如下:安装CentOS、修改网络IP和网关、修改主机名称为ECommerceRecommendSystem、关闭Linux防火墙。完成后使用Windows的DOS命令ping虚拟机,若ping通,则说明已连上Linux虚拟机,使用Windows下DOS命令ping通Linux虚拟机如下图6-5所示。

    图 6-5使用Windows下DOS命令ping通Linux虚拟机
    注意在安装Redis和MongoDB时也要关闭相应的保护机制,是其可被本地Windows系统连接。
    6.2.3安装框架组件
    搭建好虚拟机够就要安装电商推荐系统的各个插件,其中包括JDK、MongoDB、Redis、Kafka、Flume、Zookeeper、Spark,在此不做详细介绍。
    安装MongoDB(单节点)并配置,步骤省略,安装并启动MongoDB结果如下图6-6所示。

    图 6-6安装并启动MongoDB
    安装Redis(单节点)并配置启动,步骤省略,安装并启动Redis结果如下图6-7所示。

    图 6-7安装并启动Redis
    安装Java并配置环境变量,步骤省略,查看Linux中JDK版本如下图6-8所示。

    图 6-8安装JDK Linux版
    安装Spark(单节点)并启动,步骤省略,查看Spark启动状态如下图6-9所示。

    图 6-9安装并启动Spark,查看Spark启动状态
    安装Zookeeper(单节点)并启动,步骤省略,启动Zookeeper并查看Jps进程如下图6-10所示。

    图 6-10启动Zookeeper并查看Jps进程
    安装Flume-ng、Kafka并启动,步骤省略,启动Kafka并查看Jps进程如下图6-11所示。

    图 6-11启动Kafka并查看jps进程
    以上只是完成电商推荐系统的实现的第一步,安装过程会出现很多问题,在此就不一一列举了。
    6.2.4创建微服务项目
    电商推荐系统项目的推荐业务模块均由Scala代码编写,使用IDEA作为编辑器,使用SpringBoot和Maven作为项目管理和构建工具。
    使用ECommerceRecommendSystem来命名父项目,recommender和businessServer为子项目,分别管理JavaEE模块和推荐服务模块,创建业务服务和推荐服务模块结果如图6-12所示。

    图 6-12创建业务服务和推荐服务模块
    为了减少电商推荐系统使用的各种工具之间兼容性的影响,需要在父模块中声明所有子模块的配置信息,修改系统根模块的pom.xml文件,父模块中添加相关依赖信息如下图6-13所示。

    图 6-13在Maven的父模块中添加相关依赖
    为了方便日志管理,需要在父模块中引入公共依赖,其中包括slf4j、log4j,同时为了构建Maven项目,需要向其中引入引入共有插件,省略代码。
    在recommender模块中,由于本课题项目使用Spark技术栈开发电商推荐系统,因此需要引入Spark的相关依赖,父模块recommender的pom.xml中添加如下配置,推荐模块中添加相关依赖如下图6-14所示。

    图 6-14在Maven的推荐模块中添加相关依赖
    以上是完成父模块相关依赖的配置流程,目前这些版本的依赖之间兼容性比较好,为了考虑兼容性的问题不建议随便修改依赖的版本。
    6.2.5创建推荐服务子模块
    在推荐模块要添加DataLoader、KafkaStreaming、OfflineRecommender、OnlineRecommender、StatisticsRecommender模块,分别负责加载商品及评分数据到MongoDB、发送实时用户评分数据、离线推荐、实时推荐和统计推荐服务,推荐模块中添加不同的服务模块如下图6-15所示。

    图 6-15在推荐模块中添加不同的服务模块
    创建好推荐模块的各个子模块以后,首先要加载数据供离线统计推荐服务和模型训练使用。
    (1)加载数据
    在DataLoader中需要添加Spark的相关版本依赖以及MongoDB的相关依赖,本课题项目选取的Spark和MongoDB的API版本均为2.11,省略相关依赖代码。
    (2)电商数据准备
    将准备好的某电商巨头的商品数据集合评分数据集加载到数据库中,数据来源于亚马逊公司,下面将介绍数据的字段内容。
    (3)Products数据集
    数据格式:productId,name,categoryIds, amazonId, imageUrl, categories,tags
    Products数据集样例如图616所示。

    图 6-16Products数据集
    从Product选取如下几个字段。
    表 61 Products表字段

    字段名

    字段类型

    字段描述

    productId

    Int

    商品ID

    name

    String

    商品名称

    categories

    String

    商品分类,用“|”分割

    imageUrl

    String

    商品图片URL

    tags

    String

    商品UGC标签,用“|”分割


    (4)Ratings数据集
    数据格式:userId,prudcutId,rating,timestamp
    Ratings数据集样例如图617所示。

    图 6-17Ratings数据集
     Rating数据集有4个字段,每个字段之间通过“,”分割。
    表 62 Ratings表字段

    字段名

    字段类型

    字段描述

    userId

    Int

    用户ID

    produtId

    Int

    商品ID

    score

    Double

    评分值

    timestamp

    Long

    评分的时间


    (5)数据初始化到MongoDB
    首先定义Product、Rating、MongoConfig三个样例类,类的属性分别为两表所需的字段和MongoDB的配置信息,商品、评分、MongoDB配置样例类如下图6-18所示。

    图 6-18商品、评分、MongoDB配置样例类
    然后读取Product和Rating数据,将数据以overwrite的方式保存在MongoDB中,执行代码代码后查看linux中的MongoDB表,发现商品表和评分表已被创建,创建结果如图6-19所示。

    图 6-19商品表和评分表
    查看表Product,看商品信息是否被写入MongoDB,发现数据已被写入商品表,Product表中数据如下图6 20所示。

    图 6-20Product表中数据
    查看表Rating表中的数据,发现数据已被写入商品表,Rating表中数据如下图6 21所示。

    图 6-21Rating表中的数据
    到此为止,电商推荐系统的数据准备阶段已基本完成,接下来就是实现电商推荐系统的统计推荐模块功能。
    6.3离线统计推荐系统实现
    电商推荐系统的统计推荐模块主要借助于SparkSQL处理历史数据,根据历史热评、近期热评、平均评分等不同的统计指标统计不同的历史数据,反映的是商品在一段时间内的评分情况[18],为后面的离线推荐(统计推荐也归属于离线推荐,在离线推荐系统功能实现模块中将重点介绍SparkMLlib)与实时推荐提供数据支持。
    在StatisticsRecommender模块中,需要在StatisticsRecommender.pom.xml中配置Scala、Spark、MongoDB的相关依赖,相关代码省略。
    然后需要创建Rating和MongoConfig两个样例类,分别定义各自的属性,Rating和MongoConfig样例类及属性如下图6-22所示。

    图 6-22Rating和MongoConfig样例类
    创建好样例类以后接下来就可以编写统计推荐业务逻辑代码了。
    6.3.1历史热门商品统计
    该模块的核心原理是SparkSQL统计评分次数最多的商品,使用SparkSQL读取评分数据集,通过统计计算得出所有评分最多的商品,使用函数将结果从小到大排序,将计算结果写入数据库,具体实现SQL语句和代码如下如6-23所示。

    图 6-23历史热门商品统计代码
    执行该Spark脚本后查看数据库中历史热门商品统计数据分析结果,具体实现在下文中说明。
    6.3.2最近热门商品统计
    “最近”可以表示为近一个月、进一周、近三天都行,根据设定的时间跨度计算近期时间商品评分最多的商品集合。
    使用SparkSQL读取评分表,通过UDF(Spark中的自定义函数)将评分时间格式化为月、周或者天,再按月、周、天统计商品的评分数,以下以月为例,将统计结果写入到MongoDB,具体实现SQL语句和代码如下如6-24所示。

    图 6-24最近热门商品统计代码
    执行该Spark脚本后查看数据库中近期热门商品统计数据分析结果,具体实现在下文中说明。
    6.3.3商品平均得分统计
    统计MongoDB表中所有历史评分数据,每天计算每个商品的评价得分。实现方式为使用SparkSQL读取Rating表的数据,再计算商品的平均分,将结果写入到MongoDB,具体实现SQL语句和代码如下如6-25所示。

    图 6-25商品平均得分统计代码
    执行程序后,查看MongoDB表,发现三个统计评分表已被创建,统计推荐结果表如图6-26所示。

    图 6-26统计推荐结果表
    点击查看AverageProducts表中的数据,发现数据已被写入商品平均评分表,AverageProducts表中的数据样例如下图6-27所示。

    图 6-27AverageProducts表中的数据
    查看RateMoreProducts表中的数据,发现数据已被写入商品最多评分表,RateMoreProducts表中的数据如下图6-28所示。

    图 6-28RateMoreProducts表中的数据
    查看RateMoreRecentlyProducts表中的数据,发现数据已被写入商品近期评分表,RateMoreRecentlyProducts表中的数据如下图6-29所示。

    图 6-29RateMoreRecentlyProducts表中的数据
    根据以上结果可以看出,使用SparkSQL统计推荐模块已被基本实现。
    6.4基于隐语义模型的协同过滤推荐
    该模块是离线推荐的核心,使用的技术是SparkMLlib实现,具体步骤如下:
    6.4.1计算用户商品推荐列表
    计算用户推荐列表的具体做法为:
    1.读取MongoDB获取userId和productId作为训练数据集;
    2.使用模型预测(userId,productId)对应的商品评分;
    3.过滤计算分值最大的几个商品作为用户的商品推荐列表[19],将数据存储到MongoDB的UserRecs表中。
    实现过程:在OfflineRecommender模块的pom.xml,引入Spark、Scala、Mongo和Jblas的依赖,代码省略。
    分别创建Recommendation、UserRecs、ProductRecs三个样例类并分别定义类的属性,Recommendation、UserRecs、ProductRecs样例类及属性如图6-30所示。

    图 6-30Recommendation、UserRecs、ProductRecs样例类
    首先要进行模型训练,ALS的模型训练代码如下图6-31所示。

    图 6-31ALS模型训练代码
    接着对User表和Product表进行笛卡尔积计算,得到空的User-Product评分矩阵,主要实现代码如下图6-32所示。

    图 6-32计算用户-商品笛卡尔积代码
    计算用户推荐列表的主要代码如下图6-33所示。

    图 6-33计算用户推荐列表代码
    执行该Spark脚本后查看数据库中用户推荐列表结果,具体结果在下文中说明。
    6.4.2模型评估和参数选取
    通过手动调整参数,选择均方根误差最小的一组参数作为最终参数,核心代码实现如下图634所示。

    图 6-34ALS参数调整代码
    计算均方根误差的函数的代码实现如下图6-35所示。

    图 6-35计算RMSE代码
    运行代码,就可以得到目前数据的最优模型参数,最优模型参数如下图6-36所示。当然,这个最优模型参数不唯一,要根据实际计算结果而确定。

    图 6-36最优模型参数
    根据以上结果可以看出,使用SparkMLlib基于隐语义模型协同过滤推荐模块已被基本实现。
    6.4.3计算商品相似度矩阵
    计算商品相似度的代码如下图6-37所示,执行该Spark脚本后查看数据库中商品推荐列表结果,具体结果在下文中说明。

    图 6-37计算商品相似度代码
    求两个向量余弦相似度的函数关键代码实现如下6-38所示。

    图 6-38计算商品余弦相似度代码
    运行程序,查看MongoDB表,发现用户推荐表、商品推荐表已被创建,如图6-39所示。

    图 6-39用户推荐表、商品推荐表
    查看用户推荐列表UserRecs表中的内容,发现数据已被写入用户推荐列表,如图6-40所示。

    图 6-40UserRecs表中的内容
    查看商品推荐列表ProductRecs表中的内容,发现数据已被写入商品推荐列表,如图6-41所示。

    图 6-41ProductRecs表中的内容
    到此为止基于隐语义模型协同过滤的个性化智能推荐功能已基本实现,接下来将介绍实时推荐功能。
    6.5实时推荐系统实现
    电商推荐系统的实时推荐主要反映用户的近期偏好,而离线推荐反映的是用户的长期偏好。因此,用户的实时偏好比长期偏好更能反映用户的购买偏好。
    假设用户近期对A商品评了高分,而对B商品却评了低分,那么实时计算就要求系统近期会多给用户推荐A商品,而潜在地给用户屏蔽B商品。但是实时计算的复杂度不能太高,因此要使用时间复杂度稍微简单的算法。
    实现思路:在StreamingRecommende模块中添加Scala、Spark、MongoDB、Redis、Kafka、Fume依赖,代码省略。
    创建Recommendation、UserRecs、ProductRecs三个样例类并定义属性,标准推荐对象、用户推荐列表、商品相似度列表样例类及属性如下图6-42所示。

    图 6-42标准推荐对象、用户推荐列表、商品相似度列表样例类
    使用KafkaStream处理评分流数据具体算法实现如下图6-43所示。

    图 6-43KafkaStream处理评分流数据算法
    处理数据的核心算法部分如下图6-44所示。

    图 6-44处理评分流核心代码
    实时推荐需要借助Redis获取实时数据,从Redis里获取最近num次评分的核心代码如下图6-45所示。

    图 6-45从redis里获取最近评分代码
    获取当前商品的相似列表计算备选商品的代码如下图6-46所示。

    图 6-46计算备选商品相似列表代码
    以上是实时推荐模型算法介绍,下面将介绍实时推荐算法的具体实现。
    6.5.1实时推荐功能的实现
    实时推荐算法的前提是数据库中已经存储了部分商品推荐信息,并且实时计算框架已经获取到了用户的实时浏览和评分信息。
    实时推荐算法过程主要为:首先获取用户的实时评分,获取相似的关联商品,再计算商品的推荐优先级,下面将详细介绍实时推荐算法。
    6.5.2商品推荐优先级计算
    计算备选商品推荐得分的算法代码如下图6-47所示。

    图 6-47计算备选商品推荐得分算法
    计算结果需要加上偏移项,即对好评进行加分,对差评商品进行扣分,从而得出商品最终预测得分,最终得出商品推荐列表,具体算法代码如下图6-48所示。

    图 6-48计算商品推荐列表核心代码
    计算候选商品和已评分商品的相似度代码核心算法如下图6-49所示。

    图 6-49计算候选商品和已评分商品的相似度代码
    使用logN进行偏移项计算,这里实现为取10的对数(常用对数),计算偏移量代码如下图6-50所示。

    图 6-50计算偏移量代码
    运行以上Spark脚本,系统开始启动实时推荐,实时推荐功能基本完成。
    6.5.3更新实时推荐结果
    通过实时计算得出新的商品推荐优先级以后,实时计算结果要和历史计算结果进行合并,并覆盖替换掉过时的数据,以保持对推荐结果的实时性,在后文将会展示实时推荐结果。
    6.6后台服务系统实现
    JavaEE模块不是电商推荐系统的重点,在此不做重点介绍,在项目的businessServer模块下的java目录、log目录、resources目录、webapp目录下分别存放Java后端代码、系统日志(包含埋点采集日志)、配置文件和前端代码,电商推荐系统自动生成日志如下图6-51所示。简要介绍下用户注册与登录、用户评分模块以及埋点数据采集部分。

    图 6-51电商推荐系统自动生成日志
    执行JavaEE模块,系统将会自动在log目录下生成日志记录。
    6.6.1后台系统注册登录模块实现
    系统前端接收用户的请求,经用户名和密码都被校验正确后方可跳转到商品页。
    在注册用户模块中,仅需判断MongoDB中是否有该用户信息即可。
    6.6.2后台商品模糊查询功能实现
    后台通过获取前端提交的查询请求即可使用Elasticsearch进行模糊匹配查找商品,实现模糊查询代码如下图652所示。

    图 6-52模糊查询Controller代码
    使用Elasticsearch查询商品的业务代码如下图653所示。

    图 6-53Elasticsearch的Configuration代码
    用户在前端页面搜索框输入商品名,前端展示商品的查询结果如图654所示。

    图 6-54模糊查询结果展示
    模糊查询是电商系统的必备功能,不是推荐服务的核心功能,在此不重点介绍,读者可以参考ELK的相关官网。
    6.6.3后台用户评分日志模块的实现
    项目实现个性化推荐技术需要采集用户行为日志,通过Nginx生成埋点日志代码如下图6-55所示。

    图 6-55Nginx生成埋点日志代码
    启动项目以后,系统自动通过Nginx获取用户评分日志,agent.log日志文件如图6-56所示。

    图 6-56 agent.log日志文件
    系统自动将日志追加写入到agent.log文件中,通过Flume-ng此采集服务实时发送到Kafka和写入到Redis,再传到SparkStreaming进行实时计算。
    用户在浏览商品或对商品评分以后,系统会自动收集用户的评分信息并写入到日志文件中,日志中用户评分行为记录结果如图6-57所示。

    图 6-57日志中详细地记载了用户评分行为
    从上述结果看来,JavaEE前台服务系统已基本完成用户行为数据采集,接下来就进行系统测试。
    6.7系统测试
    完成电商推荐系统的开发以后,首先要进行测试,确保测试无误后方可上线。
    6.7.1系统演示
    演示系统的主要流程如下:
    1、创建四个测试用户a1、b1、c1、d1,密码均为“123”。
    2、分别登录用户a1、b1、c1、d1的账户,模拟浏览商品、对商品评分。
    3、一段时间后查看更新后的商品推荐结果。
    假设用户a1初次使用电商推荐系统,那么系统一开始不会知道用户a1的商品口味偏好,因此在“猜你喜欢”模块中不会有任何推荐内容,这就出现了电商推荐系统的“冷启动”问题(如图6-58所示),后文会详细介绍。

    图 6-58用户首次使用电视推荐系统出现“冷启动”问题
    分别使用用户a1、b1、c1、d1对商品进行模拟浏览和评分,其中,用户分别代表的购物口味如下:
    (1)用户a1喜欢看书,偶尔会看生活用品。
    (2)用户b1与用户a1的口味相似,也喜欢书和生活用品,但是用户b1和用户a1对商品的评分不完全相同。
    (3)用户c1与用户a1、b1的口味不同,喜欢数码产品而不喜欢书。
    (4)用户d1为系统的流失用户,即注册后就再也没有使用过电商推荐系统,也没对浏览过任何商品和对任何商品评分,主要被用于对照。
    一段时间后数据更新的推荐结果如下:
    用户a1的商品推荐结果如图6-59所示,电商推荐系统给用户a1推荐了4种书。

    图 6-59用户a1的个性化推荐结果
    用户b1的商品推荐结果如图6-60所示,电商推荐系统给用户b1也推荐了4种书,但是书名及内容与用户a1不完全相同。

    图 6-60用户b1的个性化推荐结果
    用户c1的商品推荐结果如图6-61所示,电商推荐系统给用户a1推荐了4种数码而仅仅推荐了一种书。

    图 6-61用户c1的个性化推荐结果
    用户d1的商品推荐结果如图6-62所示,电商推荐系统没有给d1推荐任何商品,因为用户d1未曾浏览过任何商品,也未给任何商品评分。

    图 6-62用户d1的个性化推荐结果
    从最终的推荐结果来看,用户a1、b1、c1的“猜你喜欢”模块中的推荐数据都不同,这是因为用户a1、b1、c1的购物口味偏好不尽相同。而“热门推荐”和“评分最多”推荐模块的历史推荐结果始终保持不变,这证明本课题项目的已基本成功地实现。
    6.7.2冷启动问题
    冷启动问题主要体现在用户首次注册系统时,由于用户从未对任何商品进行过评分,因此系统无法判断任何用户偏好[21],所以不能实现个性化推荐,这时候系统就看似出了问题而实际上没出问题。
    解决方式:在用户首次注册电商推荐系统的时候,系统发出弹窗收集用户的行为偏好,让用户选择自己的标签,这样用户在注册的时候系统就知道用户的商品口味偏好了[22],这也是各大APP在针对新用户收集用户标签的原因之一。

    第7章
    总结
    7.1课题总结
    本文基于当前大数据环境下的电商业务,介绍了当前主流的Spark相关技术与推荐算法,并借助Spark尤其是SparkMLlib以及其他的辅助框架工具实现了电商推荐系统,完成了推荐功能。在此期间,我查阅了大量的理论资料和技术文档,再加上对相关技术的深入学习,最终设计并实现了该系统,实属不易,以下是本次课设的总结。
    在文章的一开始介分析了一下国内外推荐系统的发展状况以及应用,从而得出推荐系统在电商行业中的“隐藏价值”。如果将大数据比喻成未来电商行业信息技术发展的“石油”,那么电商推荐系统就是“石油开采机”。
    在文章的第二章介绍了当前主流的JavaEE微服务技术,并对当前快速发展的非关系型数据库进行了介绍,同时介绍了在大数据背景下实时采集日志数据的两大框架,接着介绍了Spark技术栈以及本课题项目使用到的Spark组件,此外还对电商推荐系统的相关算法理论做了简单的介绍。在介绍了Linux服务器以后,最后介绍了电商推荐系统的未来技术选型,讲解了一下当前正在热门发展的Flink框架以及和Spark的对比,说明了本课题当前选择Spark而不使用Flink原因,并在结尾简单地概述了用户画像技术在电商推荐系统中的应用。
    在文章的第三章首先从技术、经济、操作的角度出发,综合地分析了电商推荐系统的实现可行性,结果证明本课题电商推荐系统是可行的。接着对系统进行需求分析,分别从系统的业务需求、离线推荐服务需求、实时推荐需求、用户需求、功能需求等进行分析。
    在文章的第四章简单地说明了设计电商推荐系统的思想,从系统功能设计原则出发,简单地介绍了JavaEE模块的设计、数据仓库设计与表设计、电商推荐系统的数据流程设计,再详细地介绍电商推荐系统的离线推荐模块设计与实时推荐模块的设计。本章从外到内,由浅到深,逐步介绍电商推荐系统的设计思想,为后面的电商推荐系统的实现做模型架构准备。
    第五章是本文的精髓,本章介绍了电商推荐系统的各种算法,包括统计推荐算法、ALS最小二乘法、协同过滤算法、实时推荐算法与其他常用的电商推荐系统的推荐算法,为后文的系统实现提供理论基础。
    第六章先简单地介绍前端服务模块的实现,从搭建Linux虚拟机开始,逐步实现Spark生态系统组件的安装,再分别介绍统计推荐、基于隐语义模型协同过滤推荐算法的离线推荐、实时推荐模块的详细实现方案,同时将项目的核心源码展示出来,并介绍推荐思想,再介绍如何部署、启动项目,实现搭建电商推荐系统的“一条龙”介绍,方便读者阅读。在本章的结尾说明了项目会遇到的非功能性的“冷启动”问题以及解决方案,同时介绍了其他的推荐算法。
    在实现本课题项目之后,我对电商推荐系统的推荐准确性、推荐效率进行了测试与评估,测试证明项目的推荐结果比较准确,并且推荐的实时性比较高。但是迄今为止还没有准确率高达100%的推荐系统,在今后的工作和学习中,需要研究更加准确的推荐算法,来使得电商推荐系统的推荐准确率更高。
    7.2项目亮点
    本文诞生于大数据技术被电商行业广泛应用的时代,本课题项目主要有以下几个亮点,可供电商行业和大数据行业进行参考:
    7.2.1使用高效率的JavaEE框架
    本课题项目采用轻量级且高效率的SpringBoot作为JavaEE微服务框架,摒弃了已被淘汰的SSH、SSM框架,使得系统的业务服务效率得到了很大的提升。同时,SpringBoot被公认为是Spring框架的简化配置版,使用SpringBoot会使开发成本变低,能降低项目的成本投入并提高系统运行效率。
    7.2.2使用Spark作为推荐服务工具
    本课题项目的推荐模块采用Scala语言开发,因此项目的拓展性更好,稳定性更强,项目的运行效率更快。
    项目没有依赖于Hadoop,而是使用SparkSQL作为统计推荐服务引擎,其效率远比使用MapReduce高;使用SparkMLlib进行机器学习推荐算法的实现,缩短了大数据与机器学习人工智能领域的边界距离;使用SparkStreaming进行实时推荐,不仅效率高而且推荐准确度也高。
    7.2.3使用多种智能推荐算法
    除了使用SparkSQL实现统计推荐以外,本课题项目主要借助基于隐语义模型协同过滤算法和实时推荐算法完成商品推荐。在此之外,文章还介绍了多种推荐算法,使用不同的算法能有效地提升商品推荐的准确度。
    7.2.4使用非关系型数据库替换传统数据库
    随着时间的推移,非关系型数据库越来越受开发者的欢迎。2019年11月的数据库流行度排行榜中,众所周知的三大关系型数据库前三甲已经大幅下跌,但是目前绝大多数企业仍然使用关系型数据库存储业务数据。本文希望能引导开发者们将视线从关系型数据库向非关系型数据库转移。
    本课题项目选择MongoDB存储业务数据,使其在存储海量数据以后对性能不会造成影响;使用Redis作为缓存数据库,使得实时推荐模块的推荐结果更加准确。无论从数据的存储量还是处理的速度来看,本课题项目对推进大数据技术应用发展都有非常积极的作用。
    7.3存在的问题
    尽管本课题实现的电商推荐系统在推荐准确度和推荐效率等诸多方面有很多优势,但是它仍有很大的改善空间,以下是项目需要改善的地方:
    7.3.1研究更准确的推荐算法
    文章一开始提到,世界上没有100%准确的推荐算法,即便是你身上细胞都不肯能完全知道你对不同商品的口味偏好。因此,推荐算法只有更准确的,没有最准确的。今后要研究更加准确的推荐算法,结合用户行为以及用户的需求分析,使用用户行为数据进行多次机器学习模型训练,从而使得电商推荐系统的推荐准确性得到提升。
    7.3.2研究更高效的实时推荐框架
    本文的前面提到了Flink,由于诸多原因目前还不适合使用Flink,但是Flink处理实时计算的效率比SparkStreaming高,在后期的设计中可以逐步测试Flink,确保测试稳定且推荐准确无误后再慢慢将实时推荐模块往Flink上迁移。
    7.3.3提升系统性能
    在实现该电商推荐系统的过程中,系统选用不支持数据库事务管理MongoDB-RHEL-3.4.3版本作为业务数据库,而在进行用户推荐列表与商品推荐列表更新,由于要考虑到用户的实时数据对推荐结果的影响,目前采用“overwrite”覆盖写入的方式更新业务数据库。我们知道在覆盖数据表的第一步就是清空原表数据,再写入数据,而这期间系统将花费一部分的时间开销。此外,本系统采用本地的CentOS(Redhat简化版)虚拟机搭建搭建单节点伪分布式项目运行环境,在性能上远比不上各大巨头企业的分布式系统。因此,本课题项目在进行个性化推荐的时候系统会有几秒的数据更新延迟。
    在此,未来项目可以考虑以下两种解决方案:
    (1)升级MongoDB组件
    在大数据环境中,本课题项目使用的绝大多数组件都是近几年才开源的组件,尽管很多组件已有新的版本,但是新版本会存在很多未知的Bug,并且新版本组件的相关技术文档非常少。在企业中使用这样的组件是很危险的,虽然能在一定程度上提升系统的性能,但是会给系统遗留更多的安全漏洞,并且运维成本也高。因此,在运维技术不够雄厚的企业不建议使用这种方式提升系统性能。
    (2)搭建分布式系统或集群
    大数据环境主要使用分布式计算与分布式存储技术,此外还可以使用分布式服务,它能在很大程度上提升数据的存储容量和计算效率。在分布式系统中,多台独立的主机分别负责各自的服务模块,通过资源的动态分配、负载均衡等技术,系统的多个模块协作共同完成一个任务,使其更像是单个系统。推荐本课题项目使用搭建完全分布式系统来提升系统的运行、计算效率。
    与分布式系统不同的是,集群主要包含一个系统的多个相同组件,而分布式表示一个系统的多个不同组件。在我们看来,百度、淘宝、腾讯等公司的服务器好像从来都不用重启,并且它们的系统更新数据的速度特别快。真是这样吗?其实它们的服务器也是要经常重启的,之所以没影响到用户的体验,重启速度快是一个原因,其主要原因还是它们的系统很多个集群并实现了“高可用”。它们采用系统热备份的方式部署系统集群,系统的任意一个节点宕机都不会影响到整体系统的性能,从而对用户体验毫无影响。因此,在企业级电商推荐系统项目中,搭建系统集群或MongoDB集群,将数据存储在不同节点的MongoDB数据库中,这样系统每次覆盖更新用户推荐列表与商品推荐列表只用更新单个节点的表数据信息,从而对用户体验感无影响。

    致谢
    近四个月的毕业设计已经结束,在这段时间里得到老师和同学的很大的帮助,首先我要感谢我的导师蔡亮老师。无论从论文的写作与修改,还是相关操作技术的指导,蔡老师都给予了我极大的帮助。毫不夸张的说,如果没有蔡老师的细心直到,我根本无法完成本次毕业设计。在这个过程中,蔡老师在各方面给予了我极大的帮助。这将对我以后的学习生活产生很大的帮助,让我不断前进、进步。谨以此表达我对蔡老师最崇高的敬意和深深的感谢,他将对我的学习及工作产生很大的影响。
    此外,作为一个“技术的使用者、代码的搬运工”,我一直对全球各大开源技术源码和资料的提供者保持尊敬、感谢的态度。本课题项目从头到尾使用的组件包括Apache Spark、Apache Flume、Apache Kafka、Apache Zookeeper、Scala、Java、Redis、MongoDB、CentOS-Linux、Maven、SpringBoot、Angular等等软件无一例外全部都是永久免费的。在此,我衷心地对Apache等开源贡献组织以及全球的开源项目贡献开发者们表示感谢!是他们推动了大数据的发展,推动了时代的进步。
    另外,我还要感谢我的实习公司给我提升技术的机会,感谢我的同学家人在百忙之中予以支持,感谢在此期间帮助过我的所有人。
    最后,我还要感谢我的学校,在这里,我实现从一个懵懵懂懂的高中生到大学生的蜕变。我相信,我在这四年里学到的知识,将会是我一生最大的财富。


    参考文献
    [1]流言.亚马逊如何猜你喜欢[J].电脑爱好者,2013(02):26-27.
    [2]洪亮,任秋圜,梁树贤.国内电子商务网站推荐系统信息服务质量比较研究——以淘宝、京东、亚马逊为例[J].图书情报工作,2016,60(23):97-110.
    [3]伍之昂,曹杰. 电子商务推荐系统导论[M]. 科学出版社 2019.2
    [4]颜颖.个性化推荐系统在电子商务中的应用研究[J].太原城市职业技术学院学报,2019(11):35-37.
    [5]K. G. Srinivasa,Siddesh G. M.,Srinidhi H.. Apache Flume[M].Springer International Publishing:2018-05-16.
    [6]Jason Bell John Wiley & Sons, Inc. Apache Spark [J]. 2020 
    [7]纪涵,靖晓文,赵政达著. Spark SQL入门与实践指南[M]. 清华大学出版社.2018: 82-120.
    [8]王晓华,夏毓彦. SparkMLlib机器学习实践(第2版) [M]. 清华大学出版社 2017.3
    [9]Dominique Laffly John Wiley & Sons, Inc.Spark and Machine Learning Library[J]. 2020 
    [10]Karima Sid,Mohamed Batouche. Ensemble Learning for Large Scale Virtual Screening on Apache Spark[M].Springer International Publishing:2018-05-16.
    [11]王佳娴,王中杰.基于Spark的分布式实时推荐系统[J].系统仿真技术,2017,13(02):158-161.
    [12]Mikhail Genkin,Frank Dehne,Pablo Navarro,Siyu Zhou. Machine-Learning Based Spark and Hadoop Workload Classification Using Container Performance Patterns[M].Springer International Publishing:2019-10-08.
    [13]周恒新. 基于Hadoop架构的商业推荐引擎协同过滤算法设计与实现[D].电子科技大学,2016.
    [14]岑凯伦,于红岩,杨腾霄.大数据下基于Spark的电商实时推荐系统的设计与实现[J].现代计算机(专业版),2016(24):61-69.
    [15]单京晶. 基于内容的个性化推荐系统研究[D].东北师范大学,2015.
    [16]韩德志,陈旭光,雷雨馨,戴永涛,张肖.基于Spark Streaming的实时数据分析系统及其应用[J].计算机应用,2017,37(05):1263-1269.
    [17]郑凤飞,黄文培,贾明正.基于Spark的矩阵分解推荐算法[J].计算机应用,2015,35(10):2781-2783+2788.
    [18]Hien Luu. Spark SQL (Foundations)[M].Apress:2018-08-20.
    [19]王建芳. 机器学习算法实践——推荐系统的协同过滤理论及其应用[M]. 清华大学出版社 2018.11
    [20]陆俊尧,李玲娟.基于Spark的协同过滤算法并行化研究[J].计算机技术与发展,2019,29(01):85-89.
    [21]李聪,马丽.  电子商务推荐系统瓶颈问题研究[M]. 科学出版社有限责任公司 2017.4
    [22]李思. 基于Spark平台的个性化新闻推荐系统研究[D].华北理工大学,2019.
    [23]黄美灵. 推荐系统算法实践[M]. 电子工业出版社. 2019.9
    [24]郇长武,朱琳,陈俊材,周锋奇,张智勇.基于“互联网+”的多能融合个性化方案推荐平台设计[J].电子设计工程,2020,28(08):30-33+38.
    [25]车晋强,谢红薇.基于Spark的分层协同过滤推荐算法[J].电子技术应用,2015,41(09):135-138.

    展开全文
  • 基于django+mysql的电商推荐系统
  • 大数据电商推荐系统

    2020-10-28 08:15:18
    大数据电商推荐系统 文章目录大数据电商推荐系统前言第一章、项目体系架构设计1.1 项目系统架构1.2 项目数据流程1.3 数据模型第二章 工具环境搭建2.0 Jdk 环境配置2.1 MongoDB(单节点)环境配置2.2 Redis(单节点)...

    大数据电商推荐系统


    前言

    随着大数据时代的到来,大数据机器学习成为了重要的研究热点。电商网站越来越重视对于用户推荐系统的搭建,然而推荐系统对于要过滤的信息的规模往往是巨大的,Spark平台作为新一代计算框架,具有对大数据处理能力强等特点,大大提高了推荐系统的运行效率。本文基于Spark平台的以上特点,提出并设计了基于Spark平台的电商推荐系统,设计和实现了协同过滤算法,提高了系统的运行效率。


    提示:以下是本篇文章正文内容,下面案例可供参考

    第一章、项目体系架构设计

    1.1 项目系统架构

    项目以推荐系统建设领域知名的经过修改过的中文亚马逊电商数据集作为依托,以某电商网站真实业务数据架构为基础,构建了符合教学体系的一体化的电商推荐系统,包含了离线推荐与实时推荐体系,综合利用了协同过滤算法以及基于内容的推荐方法来提供混合推荐。提供了从前端应用、后台服务、算法设计实现、平台部署等多方位的闭环的业务实现。

    在这里插入图片描述
    用户可视化:主要负责实现和用户的交互以及业务数据的展示,主体采用AngularJS2进行实现,部署在Apache服务上。

    综合业务服务:主要实现JavaEE层面整体的业务逻辑,通过Spring进行构建,对接业务需求。部署在Tomcat上。

    【数据存储部分】
    业务数据库:项目采用广泛应用的文档数据库MongDB作为主数据库,主要负责平台业务逻辑数据的存储。
    缓存数据库:项目采用Redis作为缓存数据库,主要用来支撑实时推荐系统部分对于数据的高速获取需求。

    【离线推荐部分】
    离线统计服务:批处理统计性业务采用Spark Core + Spark SQL进行实现,实现对指标类数据的统计任务。
    离线推荐服务:离线推荐业务采用Spark Core + Spark MLlib进行实现,采用ALS算法进行实现。

    【实时推荐部分】
    日志采集服务:通过利用Flume-ng对业务平台中用户对于商品的一次评分行为进行采集,实时发送到Kafka集群。
    消息缓冲服务:项目采用Kafka作为流式数据的缓存组件,接受来自Flume的数据采集请求。并将数据推送到项目的实时推荐系统部分。
    实时推荐服务:项目采用Spark Streaming作为实时推荐系统,通过接收Kafka中缓存的数据,通过设计的推荐算法实现对实时推荐的数据处理,并将结构合并更新到MongoDB数据库。

    1.2 项目数据流程

    在这里插入图片描述
    【系统初始化部分】
    1.通过Spark SQL将系统初始化数据加载到MongoDB中。
    【离线推荐部分
    1.可以通过Azkaban实现对于离线统计服务以离线推荐服务的调度,通过设定的运行时间完成对任务的触发执行。
    2. 离线统计服务从MongoDB中加载数据,将【商品平均评分统计】、【商品评分个数统计】、【最近商品评分个数统计】三个统计算法进行运行实现,并将计算结果回写到MongoDB中;离线推荐服务从MongoDB中加载数据,通过ALS算法分别将【用户推荐结果矩阵】、【影片相似度矩阵】回写到MongoDB中。
    【实时推荐部分】
    3. Flume从综合业务服务的运行日志中读取日志更新,并将更新的日志实时推送到Kafka中;Kafka在收到这些日志之后,通过kafkaStream程序对获取的日志信息进行过滤处理,获取用户评分数据流【UID|MID|SCORE|TIMESTAMP】,并发送到另外一个Kafka队列;Spark Streaming监听Kafka队列,实时获取Kafka过滤出来的用户评分数据流,融合存储在Redis中的用户最近评分队列数据,提交给实时推荐算法,完成对用户新的推荐结果计算;计算完成之后,将新的推荐结构和MongDB数据库中的推荐结果进行合并。
    【业务系统部分】
    4. 推荐结果展示部分,从MongoDB中将离线推荐结果、实时推荐结果、内容推荐结果进行混合,综合给出相对应的数据。
    5. 商品信息查询服务通过对接MongoDB实现对商品信息的查询操作。
    6. 商品评分部分,获取用户通过UI给出的评分动作,后台服务进行数据库记录后,一方面将数据推动到Redis群中,另一方面,通过预设的日志框架输出到Tomcat中的日志中。
    7. 商品标签部分,项目提供用户对商品打标签服务。

    1.3 数据模型

    1. Product【商品数据表】
      在这里插入图片描述

    2. Rating【用户评分表】
      在这里插入图片描述

    3. Tag【商品标签表】在这里插入图片描述

    4. User【用户表】在这里插入图片描述

    5. RateMoreProductsRecently【最近商品评分个数统计表】在这里插入图片描述

    6. RateMoreProducts【商品评分个数统计表】在这里插入图片描述

    7. AverageProductsScore【商品平均评分表】在这里插入图片描述

    8. ProductRecs【商品相似性矩阵】在这里插入图片描述

    9. UserRecs【用户商品推荐矩阵】在这里插入图片描述

    10. StreamRecs【用户实时商品推荐矩阵】
      在这里插入图片描述

    第二章 环境,工具环境搭建

    我们的项目中用到了多种工具进行数据的存储、计算、采集和传输,本章主要简单介绍设计的工具环境搭建。

    如果机器的配置不足,推荐只采用一台虚拟机进行配置,而非完全分布式,将该虚拟机CPU的内存设置的尽可能大,推荐为CPU > 4、MEM > 4GB。

    虚拟机搭建

    首先我们访问官网地址https://www.vmware.com/cn.html
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述

    在这里插入图片描述

    2.0 Jdk 环境配置

    1.首先去Oracle官网下载相应的版本,这里使用的是1.8
    下载地址https://www.oracle.com/java/technologies/javase/javase-jdk8-downloads.html 选择jdk-8u221-linux-x64.tar.gz
    2、解压文件:
    执行 tar -zxf jdk-8u221-linux-x64.tar.gz 命令

    3、修改文件配置
    执行 vi /etc/profile 命令,在最底部加入
    JAVA_HOME=/usr/local/jdk1.8.0_221
    PATH=JAVA_HOME/bin:PATH
    CLASSPATH=JAVA_HOME/jre/lib/ext:$JAVA_HOME/lib/tools.jar
    export PATH JAVA_HOME CLASSPATH
    4、使配置文件立即生效
    执行source /etc/profile命令

    5、然后执行以下命令验证安装jdk是否成功
    java -version

    2.1 MongoDB(单节点)环境配置

    // 通过WGET下载Linux版本的MongoDB
    [root@linux ~]$ wget https://fastdl.mongodb.org/linux/mongodb-linux-x86_64-rhel62-3.4.3.tgz
    // 将压缩包解压到指定目录
    [root@linux backup]$ tar -xf mongodb-linux-x86_64-rhel62-3.4.3.tgz -C ~/
    // 将解压后的文件移动到最终的安装目录
    [root@linux ~]$ mv mongodb-linux-x86_64-rhel62-3.4.3/ /usr/local/mongodb
    // 在安装目录下创建data文件夹用于存放数据和日志
    [root@linux mongodb]$ mkdir /usr/local/mongodb/data/
    // 在data文件夹下创建db文件夹,用于存放数据
    [root@linux mongodb]$ mkdir /usr/local/mongodb/data/db/
    // 在data文件夹下创建logs文件夹,用于存放日志
    [root@linux mongodb]$ mkdir /usr/local/mongodb/data/logs/
    // 在logs文件夹下创建log文件
    [root@linux mongodb]$ touch /usr/local/mongodb/data/logs/ mongodb.log
    // 在data文件夹下创建mongodb.conf配置文件
    [root@linux mongodb]$ touch /usr/local/mongodb/data/mongodb.conf
    // 在mongodb.conf文件中输入如下内容
    [root@linux mongodb]$ vi ./data/mongodb.conf
    #端口号port = 27017
    #数据目录
    dbpath = /usr/local/mongodb/data/db
    #日志目录
    logpath = /usr/local/mongodb/data/logs/mongodb.log
    #设置后台运行
    fork = true
    #日志输出方式
    logappend = true
    #开启认证
    #auth = true
    完成MongoDB的安装后,启动MongoDB服务器:
    // 启动MongoDB服务器
    [root@linux mongodb]$ sudo /usr/local/mongodb/bin/mongod -config /usr/local/mongodb/data/mongodb.conf
    // 访问MongoDB服务器
    [root@linux mongodb]$ /usr/local/mongodb/bin/mongo
    // 停止MongoDB服务器
    [root@linux mongodb]$ sudo /usr/local/mongodb/bin/mongod -shutdown -config /usr/local/mongodb/data/mongodb.conf
    查看数据库
    在这里插入图片描述

    2.2 Redis(单节点)环境配置

    // 通过WGET下载REDIS的源码
    [root@linux ~]wgethttp://download.redis.io/releases/redis4.0.2.tar.gz//[root@linux ]wget http://download.redis.io/releases/redis-4.0.2.tar.gz // 将源代码解压到安装目录 [root@linux ~] tar -xf redis-4.0.2.tar.gz -C ~/
    // 进入Redis源代码目录,编译安装
    [root@linux ~]$ cd redis-4.0.2/
    // 安装GCC
    [root@linux ~]$ sudo yum install gcc
    // 编译源代码
    [root@linux redis-4.0.2]$ make MALLOC=libc
    // 编译安装
    [root@linux redis-4.0.2]$ sudo make install
    // 创建配置文件
    [root@linux redis-4.0.2]$ sudo cp ~/redis-4.0.2/redis.conf /etc/
    // 修改配置文件中以下内容
    [root@linux redis-4.0.2]$ sudo vi /etc/redis.conf
    daemonize yes #37行 #是否以后台daemon方式运行,默认不是后台运行
    pidfile /var/run/redis/redis.pid #41行 #redis的PID文件路径(可选)
    bind 0.0.0.0 #64行 #绑定主机IP,默认值为127.0.0.1,我们是跨机器运行,所以需要更改
    logfile /var/log/redis/redis.log #104行 #定义log文件位置,模式log信息定向到stdout,输出到/dev/null(可选)
    dir “/usr/local/rdbfile” #188行 #本地数据库存放路径,默认为./,编译安装默认存在在/usr/local/bin下(可选)
    在安装完Redis之后,启动Redis
    // 启动Redis服务器
    [root@linux redis-4.0.2]$ redis-server /etc/redis.conf
    在这里插入图片描述

    // 连接Redis服务器
    [root@linux redis-4.0.2]$ redis-cli
    在这里插入图片描述

    // 停止Redis服务器
    [root@linux redis-4.0.2]$ redis-cli shutdown
    ,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0lUX3Rhbg==,size_16,color_FFFFFF,t_70#pic_center)

    2.3 Spark(单节点)环境配置

    // 通过wget下载zookeeper安装包
    [root@linux ~]$ wget https://d3kbcqa49mib13.cloudfront.net/spark-2.1.1-bin-hadoop2.7.tgz
    // 将spark解压到安装目录
    [root@linux ~]$ tar –xf spark-2.1.1-bin-hadoop2.7.tgz –C ./cluster
    // 进入spark安装目录
    [root@linux cluster]$ cd spark-2.1.1-bin-hadoop2.7/
    // 复制slave配置文件
    [root@linux spark-2.1.1-bin-hadoop2.7]$ cp ./conf/slaves.template ./conf/slaves
    // 修改slave配置文件
    [root@linux spark-2.1.1-bin-hadoop2.7]$ vi ./conf/slaves
    linux #在文件最后将本机主机名进行添加
    // 复制Spark-Env配置文件
    [root@linux spark-2.1.1-bin-hadoop2.7]$ cp ./conf/spark-env.sh.template ./conf/spark-env.sh
    SPARK_MASTER_HOST=linux #添加spark master的主机名
    SPARK_MASTER_PORT=7077 #添加spark master的端口号
    [root@linux spark-2.1.1-bin-hadoop2.7]$ sbin/start-all.sh
    // 访问Spark集群,浏览器访问http://linux:8080
    1:查看防火状态
    systemctl status firewalld
    2:暂时关闭防火墙
    systemctl stop firewalld
    3:永久关闭防火墙
    systemctl disable firewalld
    在这里插入图片描述

    // 关闭Spark集群
    [root@linux spark-2.1.1-bin-hadoop2.7]$ sbin/stop-all.sh

    2.4 Zookeeper(单节点)环境配置

    // 通过wget下载zookeeper安装包
    [root@linux ~]$ wget http://mirror.bit.edu.cn/apache/zookeeper/zookeeper-3.4.10/zookeeper-3.4.10.tar.gz
    // 将zookeeper解压到安装目录
    [root@linux ~]$ tar –xf zookeeper-3.4.10.tar.gz –C ./cluster
    // 进入zookeeper安装目录
    [root@linux cluster]$ cd zookeeper-3.4.10/
    // 创建data数据目录
    [root@linux zookeeper-3.4.10]$ mkdir data/
    // 复制zookeeper配置文件
    [root@linux zookeeper-3.4.10]$ cp ./conf/zoo_sample.cfg ./conf/zoo.cfg
    // 修改zookeeper配置文件
    [root@linux zookeeper-3.4.10]$ vi conf/zoo.cfg
    dataDir=/home/bigdata/cluster/zookeeper-3.4.10/data #将数据目录地址修改为创建的目录
    // 启动Zookeeper服务
    [root@linux zookeeper-3.4.10]$ bin/zkServer.sh start
    // 查看Zookeeper服务状态
    [root@linux zookeeper-3.4.10]$ bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /home/bigdata/cluster/zookeeper-3.4.10/bin/…/conf/zoo.cfg
    Mode: standalone
    // 关闭Zookeeper服务
    [root@linux zookeeper-3.4.10]$ bin/zkServer.sh stop

    2.5 Flume-ng(单节点)环境配置

    // 通过wget下载zookeeper安装包
    [root@linux ~]$ wget http://www.apache.org/dyn/closer.lua/flume/1.8.0/apache-flume-1.8.0-bin.tar.gz
    // 将zookeeper解压到安装目录
    [root@linux ~]$ tar –xf apache-flume-1.8.0-bin.tar.gz –C ./cluster
    // 等待项目部署时使用

    2.6 Kafka(单节点)环境配置

    // 通过wget下载zookeeper安装包
    [root@linux ~]$ wget http://mirrors.tuna.tsinghua.edu.cn/apache/kafka/0.10.2.1/kafka_2.11-0.10.2.1.tgz
    // 将kafka解压到安装目录
    [root@linux ~]$ tar –xf kafka_2.12-0.10.2.1.tgz –C ./cluster
    // 进入kafka安装目录
    [root@linux cluster]$ cd kafka_2.12-0.10.2.1/
    // 修改kafka配置文件
    [root@linux kafka_2.12-0.10.2.1]$ vi config/server.properties
    host.name=linux #修改主机名
    port=9092 #修改服务端口号
    zookeeper.connect=linux:2181 #修改Zookeeper服务器地址
    // 启动kafka服务 !!! 启动之前需要启动Zookeeper服务
    [root@linux kafka_2.12-0.10.2.1]$ bin/kafka-server-start.sh -daemon ./config/server.properties
    // 关闭kafka服务
    [root@linux kafka_2.12-0.10.2.1]$ bin/kafka-server-stop.sh
    // 创建topic
    [root@linux kafka_2.12-0.10.2.1]$ bin/kafka-topics.sh --create --zookeeper linux:2181 --replication-factor 1 --partitions 1 --topic recommender
    // kafka-console-producer
    [root@linux kafka_2.12-0.10.2.1]$ bin/kafka-console-producer.sh --broker-list linux:9092 --topic recommender
    // kafka-console-consumer
    [root@linux kafka_2.12-0.10.2.1]$ bin/kafka-console-consumer.sh --bootstrap-server linux:9092 --topic recommender

    第三章 创建项目并初始化业务数据

    我们的项目主体用Scala编写,采用IDEA作为开发环境进行项目编写,采用maven作为项目构建和管理工具。

    3.1 在IDEA中创建maven项目

    打开IDEA,创建一个maven项目,命名为EcRecommendSystem1。为了方便后期的联调,我们会把业务系统的代码也添加进来,所以我们可以以ECommerceRecommendSystem作为父项目,并在其下建一个名为recommender的子项目,然后再在下面搭建多个子项目用于提供不同的推荐服务。
    3.1.1 项目框架搭建
    在EcRecommendSystem1下新建一个 maven module作为子项目,命名为recommender。同样的,再以recommender为父项目,新建一个maven module作为子项目。我们的第一步是初始化业务数据,所以子项目命名为 DataLoader。
    父项目只是为了规范化项目结构,方便依赖管理,本身是不需要代码实现的,所以EcRecommendSystem1和recommender下的src文件夹都可以删掉。
    目前的整体项目框架如下:
    在这里插入图片描述
    3.1.2 声明项目中工具的版本信息
    我们整个项目需要用到多个工具,它们的不同版本可能会对程序运行造成影响,所以应该在最外层的EcRecommendSystem1中声明所有子项目共用的版本信息。
    在pom.xml中加入以下配置:
    EcRecommendSystem1/pom.xml

    <properties>
        <log4j.version>1.2.17</log4j.version>
    <slf4j.version>1.7.22</slf4j.version>
    <mongodb-spark.version>2.0.0</mongodb-spark.version>
    <casbah.version>3.1.1</casbah.version>
    <redis.version>2.9.0</redis.version>
    <kafka.version>0.10.2.1</kafka.version>
    <spark.version>2.1.1</spark.version>
    <scala.version>2.11.8</scala.version>
    <jblas.version>1.2.1</jblas.version>
    </properties>
    

    3.1.3 添加项目依赖
    首先,对于整个项目而言,应该有同样的日志管理,我们在EcRecommendSystes1中引入公有依赖:
    EcRecommendSystem1/pom.xml

    <dependencies>
        <!-- 引入共同的日志管理工具 -->
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>jcl-over-slf4j</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-api</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>org.slf4j</groupId>
            <artifactId>slf4j-log4j12</artifactId>
            <version>${slf4j.version}</version>
        </dependency>
        <dependency>
            <groupId>log4j</groupId>
            <artifactId>log4j</artifactId>
            <version>${log4j.version}</version>
        </dependency>
    </dependencies>
    

    同样,对于maven项目的构建,可以引入公有的插件:

    <build>
        <!--声明并引入子项目共有的插件-->
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-compiler-plugin</artifactId>
                <version>3.6.1</version>
                <!--所有的编译用JDK1.8-->
                <configuration>
                    <source>1.8</source>
                    <target>1.8</target>
                </configuration>
            </plugin>
        </plugins>
        <pluginManagement>
            <plugins>
                <!--maven的打包插件-->
                <plugin>
                    <groupId>org.apache.maven.plugins</groupId>
                    <artifactId>maven-assembly-plugin</artifactId>
                    <version>3.0.0</version>
                    <executions>
                        <execution>
                            <id>make-assembly</id>
                            <phase>package</phase>
                            <goals>
                                <goal>single</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
                <!--该插件用于将scala代码编译成class文件-->
                <plugin>
                    <groupId>net.alchim31.maven</groupId>
                    <artifactId>scala-maven-plugin</artifactId>
                    <version>3.2.2</version>
                    <executions>
                        <!--绑定到maven的编译阶段-->
                        <execution>
                            <goals>
                                <goal>compile</goal>
                                <goal>testCompile</goal>
                            </goals>
                        </execution>
                    </executions>
                </plugin>
            </plugins>
        </pluginManagement>
    </build>
    

    然后,在recommender模块中,我们可以为所有的推荐模块声明spark相关依赖(这里的dependencyManagement表示仅声明相关信息,子项目如果依赖需要自行引入):
    EcRecommendSystem1/recommender/pom.xml

    <dependencyManagement>
        <dependencies>
            <!-- 引入Spark相关的Jar包 -->
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-core_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-sql_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-streaming_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-mllib_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
            <dependency>
                <groupId>org.apache.spark</groupId>
                <artifactId>spark-graphx_2.11</artifactId>
                <version>${spark.version}</version>
            </dependency>
    <dependency>
                <groupId>org.scala-lang</groupId>
    <artifactId>scala-library</artifactId>
    <version>${scala.version}</version>
    </dependency>
        </dependencies>
    </dependencyManagement>
    

    由于各推荐模块都是scala代码,还应该引入scala-maven-plugin插件,用于scala程序的编译。因为插件已经在父项目中声明,所以这里不需要再声明版本和具体配置:

    <build>
        <plugins>
            <!-- 父项目已声明该plugin,子项目在引入的时候,不用声明版本和已经声明的配置 -->
            <plugin>
                <groupId>net.alchim31.maven</groupId>
                <artifactId>scala-maven-plugin</artifactId>
            </plugin>
        </plugins>
    </build>
    

    对于具体的DataLoader子项目,需要spark相关组件,还需要mongodb的相关依赖,我们在pom.xml文件中引入所有依赖(在父项目中已声明的不需要再加详细信息):
    EcRecommendSystem1/recommender/DataLoader/pom.xml

    <dependencies>
        <!-- Spark的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <!-- 引入Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
        <!-- 加入MongoDB的驱动 -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.11</artifactId>
            <version>${casbah.version}</version>
        </dependency>
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>${mongodb-spark.version}</version>
        </dependency>
    </dependencies>
    

    至此,我们做数据加载需要的依赖都已配置好,可以开始写代码了。

    3.2 数据加载准备

    在src/main/目录下,可以看到已有的默认源文件目录是java,我们可以将其改名为scala。将数据文products.csv,ratings.csv复制到资源文件目录src/main/resources下,我们将从这里读取数据并加载到mongodb中。

    3.2.1 Products数据集
    数据格式:
    productId,name,categoryIds, amazonId, imageUrl, categories, tags
    例如:
    3982^Fuhlen 富勒 M8眩光舞者时尚节能无线鼠标(草绿)(眩光.悦动.时尚炫舞鼠标 12个月免换电池 高精度光学寻迹引擎 超细微接收器10米传输距离) ^1057,439,736 ^B009EJN4T2 ^https://images-cn-4.ssl-images-amazon.com/images/I/31QPvUDNavL.SY300_QL70.jpg ^外设产品|鼠标|电脑/办公 ^富勒|鼠标|电子产品|好用|外观漂亮
    在这里插入图片描述

    Product数据集有7个字段,每个字段之间通过“^”符号进行分割。其中的categoryIds、amazonId对于内容特征没有实质帮助,我们只需要其它5个字段:
    在这里插入图片描述
    3.2.2 Ratings数据集
    数据格式:
    在这里插入图片描述

    userId,prudcutId,rating,timestamp
    例如:
    4867,457976,5.0,1395676800
    Rating数据集有4个字段,每个字段之间通过“,”分割。
    在这里插入图片描述
    3.2.3 日志管理配置文件
    log4j对日志的管理,需要通过配置文件来生效。在src/main/resources下新建配置文件log4j.properties,写入以下内容:

    log4j.rootLogger=info, stdout
    log4j.appender.stdout=org.apache.log4j.ConsoleAppender
    log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
    log4j.appender.stdout.layout.ConversionPattern=%d{yyyy-MM-dd HH:mm:ss,SSS}  %5p --- [%50t]  %-80c(line:%5L)  :  %m%n
    

    3.3 数据初始化到MongoDB

    **

    3.3.1 启动MongoDB数据库
    // 启动MongoDB服务器
    [bigdata@linux mongodb]$ sudo /usr/local/mongodb/bin/mongod -config /usr/local/mongodb/data/mongodb.conf
    // 访问MongoDB服务器
    [bigdata@linux mongodb]$ /usr/local/mongodb/bin/mongo
    // 停止MongoDB服务器
    [bigdata@linux mongodb]$ sudo /usr/local/mongodb/bin/mongod -shutdown -config /usr/local/mongodb/data/mongodb.conf

    **
    3.3.2 数据加载程序主体实现
    我们会为原始数据定义几个样例类,通过SparkContext的textFile方法从文件中读取数据,并转换成DataFrame,再利用Spark SQL提供的write方法进行数据的分布式插入。
    在DataLoader/src/main/scala下新建package,命名为com.atguigu.recommender,新建名为DataLoader的scala class文件。
    程序主体代码如下:
    DataLoader/src/main/scala/com.tcj.recommerder/DataLoader.scala

    /**
      * Copyright (c) 2018-2028 尚硅谷 All Rights Reserved 
      *
      * Project: ECommerceRecommendSystem
      * Package: com.atguigu.recommender
      * Version: 1.0
      *
      * Created by wushengran on 2019/4/26 15:43
      */
    
    /**
      * Product数据集
      * 3982                            商品ID
      * Fuhlen 富勒 M8眩光舞者时尚节能    商品名称
      * 1057,439,736                    商品分类ID,不需要
      * B009EJN4T2                      亚马逊ID,不需要
      * https://images-cn-4.ssl-image   商品的图片URL
      * 外设产品|鼠标|电脑/办公           商品分类
      * 富勒|鼠标|电子产品|好用|外观漂亮   商品UGC标签
      */
    
    // 定义样例类
    case class Product(productId: Int, name: String, imageUrl: String, categories: String, 
    tags: String)
    /**
      * Rating数据集
      * 4867        用户ID
      * 457976      商品ID
      * 5.0         评分
      * 1395676800  时间戳
      */
    case class Rating(userId: Int, productId: Int, score: Double, timestamp: Int)
    
    /**
      * MongoDB连接配置
      * @param uri    MongoDB的连接uri
      * @param db     要操作的db
      */
    
    case class MongoConfig(uri:String, db:String)
    
    object DataLoader {
      // 以window下为例,需替换成自己的路径,linux下为 /YOUR_PATH/resources/products.csv
      val PRODUCT_DATA_PATH = " YOUR_PATH\\resources\\products.csv"
      val RATING_DATA_PATH = " YOUR_PATH\\resources\\ratings.csv"
    
      val MONGODB_PRODUCT_COLLECTION = "Product"
      val MONGODB_RATING_COLLECTION = "Rating"
     
    // 主程序的入口
      def main(args: Array[String]): Unit = {
        // 定义用到的配置参数
        val config = Map(
          "spark.cores" -> "local[*]",
          "mongo.uri" -> "mongodb://localhost:27017/recommender",
          "mongo.db" -> "recommender"
        )
        // 创建一个SparkConf配置
        val sparkConf = new
    SparkConf().setAppName("DataLoader").setMaster(config("spark.cores"))
        // 创建一个SparkSession 
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    
        // 在对DataFrame和Dataset进行操作许多操作都需要这个包进行支持
        import spark.implicits._
    
        // 将Product、Rating数据集加载进来
        val productRDD = spark.sparkContext.textFile(PRODUCT_DATA_PATH)
          //将ProdcutRDD装换为DataFrame
        val productDF = productRDD.map(item =>{
            val attr = item.split("\\^")
    Product(attr(0).toInt,attr(1).trim,attr(4).trim,attr(5).trim,attr(6).trim)
        }).toDF()
    
        val ratingRDD = spark.sparkContext.textFile(RATING_DATA_PATH)
          //将ratingRDD转换为DataFrame
        val ratingDF = ratingRDD.map(item => {
            val attr = item.split(",")
    Rating(attr(0).toInt,attr(1).toInt,attr(2).toDouble,attr(3).toInt)
        }).toDF()
    
        // 声明一个隐式的配置对象
        implicit val mongoConfig =
    	MongoConfig(config.get("mongo.uri").get,config.get("mongo.db").get)
        // 将数据保存到MongoDB中
        storeDataInMongoDB(productDF, ratingDF)
    
        // 关闭Spark
        spark.stop()
      }
    

    3.3.3 将数据写入MongoDB
    接下来,实现storeDataInMongo方法,将数据写入mongodb中:

    def storeDataInMongoDB(productDF: DataFrame, ratingDF:DataFrame)
    	(implicit mongoConfig: MongoConfig): Unit = {
    
      //新建一个到MongoDB的连接
      val mongoClient = MongoClient(MongoClientURI(mongoConfig.uri))
    
      // 定义通过MongoDB客户端拿到的表操作对象
      val productCollection = mongoClient(mongoConfig.db)(MONGODB_PRODUCT_COLLECTION)
    val ratingCollection = mongoClient(mongoConfig.db)(MONGODB_RATING_COLLECTION)
    
      //如果MongoDB中有对应的数据库,那么应该删除
      productCollection.dropCollection()
      ratingCollection.dropCollection()
    
      //将当前数据写入到MongoDB
      productDF
        .write
        .option("uri",mongoConfig.uri)
        .option("collection",MONGODB_PRODUCT_COLLECTION)
        .mode("overwrite")
        .format("com.mongodb.spark.sql")
        .save()
      ratingDF
        .write
        .option("uri",mongoConfig.uri)
        .option("collection",MONGODB_RATING_COLLECTION)
        .mode("overwrite")
        .format("com.mongodb.spark.sql")
        .save()
    
      //对数据表建索引
    productCollection.createIndex(MongoDBObject("productId" -> 1))
    ratingCollection.createIndex(MongoDBObject("userId" -> 1))
    ratingCollection.createIndex(MongoDBObject("productId" -> 1))
    
      //关闭MongoDB的连接
      mongoClient.close()
    }6
    

    进入数据库
    在这里插入图片描述
    在这里插入图片描述
    查看我们的表内容
    db.Product.find().pretty()
    在这里插入图片描述
    在这里插入图片描述

    第四章 离线推荐服务建设

    4.1 离线推荐服务

    离线推荐服务是综合用户所有的历史数据,利用设定的离线统计算法和离线推荐算法周期性的进行结果统计与保存,计算的结果在一定时间周期内是固定不变的,变更的频率取决于算法调度的频率。
    离线推荐服务主要计算一些可以预先进行统计和计算的指标,为实时计算和前端业务相应提供数据支撑。
    离线推荐服务主要分为统计推荐、基于隐语义模型的协同过滤推荐以及基于内容和基于Item-CF的相似推荐。我们这一章主要介绍前两部分,基于内容和Item-CF的推荐在整体结构和实现上是类似的

    4.2 离线统计服务

    4.2.1 统计服务主体框架

    在recommender下新建子项目StatisticsRecommender,pom.xml文件中只需引入spark、scala和mongodb的相关依赖:
    在这里插入图片描述

    <dependencies>
        <!-- Spark的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <!-- 引入Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
        <!-- 加入MongoDB的驱动 -->
        <!-- 用于代码方式连接MongoDB -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.11</artifactId>
            <version>${casbah.version}</version>
        </dependency>
        <!-- 用于Spark和MongoDB的对接 -->
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>${mongodb-spark.version}</version>
        </dependency>
    </dependencies>
    

    在resources文件夹下引入log4j.properties,然后在src/main/scala下新建scala 单例对象com.atguigu.statistics.StatisticsRecommender。
    同样,我们应该先建好样例类,在main()方法中定义配置、创建SparkSession并加载数据,最后关闭spark。代码如下:
    src/main/scala/com.atguigu.statistics/StatisticsRecommender.scala

    case class Rating(userId: Int, productId: Int, score: Double, timestamp: Int)
    
    case class MongoConfig(uri:String, db:String)
    
    object StatisticsRecommender {
    
      val MONGODB_RATING_COLLECTION = "Rating"
    
      //统计的表的名称
      val RATE_MORE_PRODUCTS = "RateMoreProducts"
      val RATE_MORE_RECENTLY_PRODUCTS = "RateMoreRecentlyProducts"
      val AVERAGE_PRODUCTS = "AverageProducts"
    
      // 入口方法
      def main(args: Array[String]): Unit = {
    
        val config = Map(
          "spark.cores" -> "local[*]",
          "mongo.uri" -> "mongodb://localhost:27017/recommender",
          "mongo.db" -> "recommender"
        )
    
        //创建SparkConf配置
        val sparkConf = new SparkConf().setAppName("StatisticsRecommender").setMaster(config("spark.cores"))
        //创建SparkSession
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    
        val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
    
        //加入隐式转换
        import spark.implicits._
    
        //数据加载进来
        val ratingDF = spark
          .read
          .option("uri",mongoConfig.uri)
          .option("collection",MONGODB_RATING_COLLECTION)
          .format("com.mongodb.spark.sql")
          .load()
          .as[Rating]
          .toDF()
    
        //创建一张名叫ratings的表
        ratingDF.createOrReplaceTempView("ratings")
    //TODO: 不同的统计推荐结果
    spark.stop()
    }
    

    4.2.2 历史热门商品统计
    根据所有历史评分数据,计算历史评分次数最多的商品。
    实现思路:
    通过Spark SQL读取评分数据集,统计所有评分中评分数最多的商品,然后按照从大到小排序,将最终结果写入MongoDB的RateMoreProducts数据集中。
    //统计所有历史数据中每个商品的评分数
    //数据结构 -》 productId,count

    val rateMoreProductsDF = spark.sql("select productId, count(productId) as count from ratings group by productId ")
    
    rateMoreProductsDF
      .write
      .option("uri",mongoConfig.uri)
      .option("collection",RATE_MORE_PRODUCTS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
    

    4.2.3 最近热门商品统计
    根据评分,按月为单位计算最近时间的月份里面评分数最多的商品集合。
    实现思路:
    通过Spark SQL读取评分数据集,通过UDF函数将评分的数据时间修改为月,然后统计每月商品的评分数。统计完成之后将数据写入到MongoDB的RateMoreRecentlyProducts数据集中。
    //统计以月为单位拟每个商品的评分数
    //数据结构 -》 productId,count,time

    //创建一个日期格式化工具
    val simpleDateFormat = new SimpleDateFormat("yyyyMM")
    
    //注册一个UDF函数,用于将timestamp装换成年月格式   1260759144000  => 201605
    spark.udf.register("changeDate",(x:Int) => simpleDateFormat.format(new Date(x * 1000L)).toInt)
    
    // 将原来的Rating数据集中的时间转换成年月的格式
    val ratingOfYearMonth = spark.sql("select productId, score, changeDate(timestamp) as yearmonth from ratings")
    
    // 将新的数据集注册成为一张表
    ratingOfYearMonth.createOrReplaceTempView("ratingOfMonth")
    
    val rateMoreRecentlyProducts = spark.sql("select productId, count(productId) as count ,yearmonth from ratingOfMonth group by yearmonth,productId order by yearmonth desc, count desc")
    
    rateMoreRecentlyProducts
      .write
      .option("uri",mongoConfig.uri)
      .option("collection",RATE_MORE_RECENTLY_PRODUCTS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
    

    4.2.4 商品平均得分统计
    根据历史数据中所有用户对商品的评分,周期性的计算每个商品的平均得分。
    实现思路:
    通过Spark SQL读取保存在MongDB中的Rating数据集,通过执行以下SQL语句实现对于商品的平均分统计:
    //统计每个商品的平均评分

    val averageProductsDF = spark.sql("select productId, avg(score) as avg from ratings group by productId ")
    
    averageProductsDF
      .write
      .option("uri",mongoConfig.uri)
      .option("collection",AVERAGE_PRODUCTS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
    

    统计完成之后将生成的新的DataFrame写出到MongoDB的AverageProducts集合中。
    在这里插入图片描述
    查看AverageProducts
    db.AverageProducts.find().pretty()
    在这里插入图片描述

    4.3 基于隐语义模型的协同过滤推荐

    项目采用ALS作为协同过滤算法,根据MongoDB中的用户评分表计算离线的用户商品推荐列表以及商品相似度矩阵。
    4.3.1 用户商品推荐列表
    通过ALS训练出来的Model来计算所有当前用户商品的推荐列表,主要思路如下:

    1. userId和productId做笛卡尔积,产生(userId,productId)的元组
    2. 通过模型预测(userId,productId)对应的评分。
    3. 将预测结果通过预测分值进行排序。
    4. 返回分值最大的K个商品,作为当前用户的推荐列表。
      最后生成的数据结构如下:将数据保存到MongoDB的UserRecs表中
      在这里插入图片描述
      新建recommender的子项目OfflineRecommender,引入spark、scala、mongo和jblas的依赖:
    <dependencies>
    
        <dependency>
            <groupId>org.scalanlp</groupId>
            <artifactId>jblas</artifactId>
            <version>${jblas.version}</version>
        </dependency>
    
        <!-- Spark的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-mllib_2.11</artifactId>
        </dependency>
        <!-- 引入Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
    
        <!-- 加入MongoDB的驱动 -->
        <!-- 用于代码方式连接MongoDB -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.11</artifactId>
            <version>${casbah.version}</version>
        </dependency>
        <!-- 用于Spark和MongoDB的对接 -->
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>${mongodb-spark.version}</version>
        </dependency>
    </dependencies>
    

    同样经过前期的构建样例类、声明配置、创建SparkSession等步骤,可以加载数据开始计算模型了。
    核心代码如下:
    src/main/scala/com.atguigu.offline/OfflineRecommender.scala

    case class ProductRating(userId: Int, productId: Int, score: Double, timestamp: Int)
    
    case class MongoConfig(uri:String, db:String)
    
    // 标准推荐对象,productId,score
    case class Recommendation(productId: Int, score:Double)
    
    // 用户推荐列表
    case class UserRecs(userId: Int, recs: Seq[Recommendation])
    
    // 商品相似度(商品推荐)
    case class ProductRecs(productId: Int, recs: Seq[Recommendation])
    
    object OfflineRecommmeder {
    
      // 定义常量
      val MONGODB_RATING_COLLECTION = "Rating"
    
      // 推荐表的名称
      val USER_RECS = "UserRecs"
      val PRODUCT_RECS = "ProductRecs"
    
      val USER_MAX_RECOMMENDATION = 20
    
      def main(args: Array[String]): Unit = {
        // 定义配置
        val config = Map(
          "spark.cores" -> "local[*]",
          "mongo.uri" -> "mongodb://localhost:27017/recommender",
          "mongo.db" -> "recommender"
        )
    
        // 创建spark session
        val sparkConf = new SparkConf().setMaster(config("spark.cores")).setAppName("OfflineRecommender")
        val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    
        implicit val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
    
        import spark.implicits._
    //读取mongoDB中的业务数据
    val ratingRDD = spark
    .read
    .option("uri",mongoConfig.uri)
    .option("collection",MONGODB_RATING_COLLECTION)
    .format("com.mongodb.spark.sql")
    .load()
    .as[ProductRating]
    .rdd
    .map(rating=> (rating.userId, rating.productId, rating.score)).cache()
    //用户的数据集 RDD[Int]
    val userRDD = ratingRDD.map(_._1).distinct()
    val prodcutRDD = ratingRDD.map(_._2).distinct()
    
    //创建训练数据集
    val trainData = ratingRDD.map(x => Rating(x._1,x._2,x._3))
    // rank 是模型中隐语义因子的个数, iterations 是迭代的次数, lambda 是ALS的正则化参
    val (rank,iterations,lambda) = (50, 5, 0.01)
    // 调用ALS算法训练隐语义模型
    val model = ALS.train(trainData,rank,iterations,lambda)
    
    //计算用户推荐矩阵
    val userProducts = userRDD.cartesian(productRDD)
    // model已训练好,把id传进去就可以得到预测评分列表RDD[Rating] (userId,productId,rating)
    val preRatings = model.predict(userProducts)
    
    val userRecs = preRatings
    .filter(_.rating > 0)
    .map(rating => (rating.user,(rating.product, rating.rating)))
    .groupByKey()    
    .map{
    case (userId,recs) => UserRecs(userId,recs.toList.sortWith(_._2 >
    _._2).take(USER_MAX_RECOMMENDATION).map(x => Recommendation(x._1,x._2)))
    }.toDF()
    
    userRecs.write
    .option("uri",mongoConfig.uri)
    .option("collection",USER_RECS)
    .mode("overwrite")
    .format("com.mongodb.spark.sql")
    .save()
    
    //TODO:计算商品相似度矩阵
    
    // 关闭spark
    spark.stop()
    }
    }
    

    4.3.2 商品相似度矩阵

    通过ALS计算商品相似度矩阵,该矩阵用于查询当前商品的相似商品并为实时推荐系统服务。
    离线计算的ALS 算法,算法最终会为用户、商品分别生成最终的特征矩阵,分别是表示用户特征矩阵的U(m x k)矩阵,每个用户由 k个特征描述;表示物品特征矩阵的V(n x k)矩阵,每个物品也由 k 个特征描述。
    V(n x k)表示物品特征矩阵,每一行是一个 k 维向量,虽然我们并不知道每一个维度的特征意义是什么,但是k 个维度的数学向量表示了该行对应商品的特征。
    所以,每个商品用V(n x k)每一行的 向量表示其特征,于是任意两个商品 p:特征向量为 ,商品q:特征向量为 之间的相似度sim(p,q)可以使用 和 的余弦值来表示:
    在这里插入图片描述

    数据集中任意两个商品间相似度都可以由公式计算得到,商品与商品之间的相似度在一段时间内基本是固定值。最后生成的数据保存到MongoDB的ProductRecs表中。

    在这里插入图片描述
    核心代码如下:

    //计算商品相似度矩阵
    //获取商品的特征矩阵,数据格式 RDD[(scala.Int, scala.Array[scala.Double])]
    val productFeatures = model.productFeatures.map{case (productId,features) =>
      (productId, new DoubleMatrix(features))
    }
    
    // 计算笛卡尔积并过滤合并
    val productRecs = productFeatures.cartesian(productFeatures)
      .filter{case (a,b) => a._1 != b._1}  
      .map{case (a,b) =>
        val simScore = this.consinSim(a._2,b._2) // 求余弦相似度
        (a._1,(b._1,simScore))
      }.filter(_._2._2 > 0.6)    
      .groupByKey()             
      .map{case (productId,items) =>
        ProductRecs(productId,items.toList.map(x => Recommendation(x._1,x._2)))
      }.toDF()
    
    productRecs
      .write
      .option("uri", mongoConfig.uri)
      .option("collection",PRODUCT_RECS)
      .mode("overwrite")
      .format("com.mongodb.spark.sql")
      .save()
    

    其中,consinSim是求两个向量余弦相似度的函数,代码实现如下:

    //计算两个商品之间的余弦相似度
    def consinSim(product1: DoubleMatrix, product2:DoubleMatrix) : Double ={
      product1.dot(product2) / ( product1.norm2()  * product2.norm2() )
    

    运行代码:
    在这里插入图片描述
    查看UserRces表
    在这里插入图片描述
    查看ProductRces表
    在这里插入图片描述

    4.3.3 模型评估和参数选取
    在上述模型训练的过程中,我们直接给定了隐语义模型的rank,iterations,lambda三个参数。对于我们的模型,这并不一定是最优的参数选取,所以我们需要对模型进行评估。通常的做法是计算均方根误差(RMSE),考察预测评分与实际评分之间的误差。
    在这里插入图片描述
    有了RMSE,我们可以就可以通过多次调整参数值,来选取RMSE最小的一组作为我们模型的优化选择。
    在scala/com.atguigu.offline/下新建单例对象ALSTrainer,代码主体架构如下:

    def main(args: Array[String]): Unit = {
      val config = Map(
        "spark.cores" -> "local[*]",
        "mongo.uri" -> "mongodb://localhost:27017/recommender",
        "mongo.db" -> "recommender"
      )
      //创建SparkConf
      val sparkConf = new SparkConf().setAppName("ALSTrainer").setMaster(config("spark.cores"))
      //创建SparkSession
      val spark = SparkSession.builder().config(sparkConf).getOrCreate()
    
      val mongoConfig = MongoConfig(config("mongo.uri"),config("mongo.db"))
    
      import spark.implicits._
    
      //加载评分数据
      val ratingRDD = spark
        .read
        .option("uri",mongoConfig.uri)
        .option("collection",OfflineRecommender.MONGODB_RATING_COLLECTION)
        .format("com.mongodb.spark.sql")
        .load()
        .as[ProductRating]
        .rdd
        .map(rating => Rating(rating.userId,rating.productId,rating.score)).cache()
    
      // 将一个RDD随机切分成两个RDD,用以划分训练集和测试集
      val splits = ratingRDD.randomSplit(Array(0.8, 0.2))
    
      val trainingRDD = splits(0)
      val testingRDD = splits(1)
    
      //输出最优参数
      adjustALSParams(trainingRDD, testingRDD)
    
      //关闭Spark
      spark.close()
    }
    

    其中adjustALSParams方法是模型评估的核心,输入一组训练数据和测试数据,输出计算得到最小RMSE的那组参数。代码实现如下:

    // 输出最终的最优参数
    def adjustALSParams(trainData:RDD[Rating], testData:RDD[Rating]): Unit ={
    // 这里指定迭代次数为5,rank和lambda在几个值中选取调整
      val result = for(rank <- Array(100,200,250); lambda <- Array(1, 0.1, 0.01, 0.001))
        yield {
          val model = ALS.train(trainData,rank,5,lambda)
          val rmse = getRMSE(model, testData)
          (rank,lambda,rmse)
        }
      // 按照rmse排序
      println(result.sortBy(_._3).head)
    }
    

    计算RMSE的函数getRMSE代码实现如下:

    def getRMSE(model:MatrixFactorizationModel, data:RDD[Rating]):Double={
      val userProducts = data.map(item => (item.user,item.product))
      val predictRating = model.predict(userProducts)
    val real = data.map(item => ((item.user,item.product),item.rating))
      val predict = predictRating.map(item => ((item.user,item.product),item.rating))
      // 计算RMSE
      sqrt(
        real.join(predict).map{case ((userId,productId),(real,pre))=>
          // 真实值和预测值之间的差
          val err = real - pre
          err * err
        }.mean()
      )
    }
    

    运行代码,我们就可以得到目前数据的最优模型参数。
    在这里插入图片描述

    第五章 实时推荐服务建设

    5.1 实时推荐服务

    实时计算与离线计算应用于推荐系统上最大的不同在于实时计算推荐结果应该反映最近一段时间用户近期的偏好,而离线计算推荐结果则是根据用户从第一次评分起的所有评分记录来计算用户总体的偏好。
    用户对物品的偏好随着时间的推移总是会改变的。比如一个用户u 在某时刻对商品p 给予了极高的评分,那么在近期一段时候,u 极有可能很喜欢与商品p 类似的其他商品;而如果用户u 在某时刻对商品q 给予了极低的评分,那么在近期一段时候,u 极有可能不喜欢与商品q 类似的其他商品。所以对于实时推荐,当用户对一个商品进行了评价后,用户会希望推荐结果基于最近这几次评分进行一定的更新,使得推荐结果匹配用户近期的偏好,满足用户近期的口味。
    如果实时推荐继续采用离线推荐中的ALS 算法,由于算法运行时间巨大,不具有实时得到新的推荐结果的能力;并且由于算法本身的使用的是评分表,用户本次评分后只更新了总评分表中的一项,使得算法运行后的推荐结果与用户本次评分之前的推荐结果基本没有多少差别,从而给用户一种推荐结果一直没变化的感觉,很影响用户体验。
    另外,在实时推荐中由于时间性能上要满足实时或者准实时的要求,所以算法的计算量不能太大,避免复杂、过多的计算造成用户体验的下降。鉴于此,推荐精度往往不会很高。实时推荐系统更关心推荐结果的动态变化能力,只要更新推荐结果的理由合理即可,至于推荐的精度要求则可以适当放宽。
    所以对于实时推荐算法,主要有两点需求:
    (1)用户本次评分后、或最近几个评分后系统可以明显的更新推荐结果;
    (2)计算量不大,满足响应时间上的实时或者准实时要求;

    5.2 实时推荐模型和代码框架

    5.2.1 实时推荐模型算法设计
    当用户u 对商品p 进行了评分,将触发一次对u 的推荐结果的更新。由于用户u 对商品p 评分,对于用户u 来说,他与p 最相似的商品们之间的推荐强度将发生变化,所以选取与商品p 最相似的K 个商品作为候选商品。
    每个候选商品按照“推荐优先级”这一权重作为衡量这个商品被推荐给用户u 的优先级。
    这些商品将根据用户u 最近的若干评分计算出各自对用户u 的推荐优先级,然后与上次对用户u 的实时推荐结果的进行基于推荐优先级的合并、替换得到更新后的推荐结果。
    具体来说:
    首先,获取用户u 按时间顺序最近的K 个评分,记为RK;获取商品p 的最相似的K 个商品集合,记为S;
    然后,对于每个商品q S ,计算其推荐优先级 ,计算公式如下:
    在这里插入图片描述

    其中:
    表示用户u 对商品r 的评分;
    sim(q,r)表示商品q 与商品r 的相似度,设定最小相似度为0.6,当商品q和商品r 相似度低于0.6 的阈值,则视为两者不相关并忽略;
    sim_sum 表示q 与RK 中商品相似度大于最小阈值的个数;
    incount 表示RK 中与商品q 相似的、且本身评分较高(>=3)的商品个数;
    recount 表示RK 中与商品q 相似的、且本身评分较低(< 3)的商品个数;

    公式的意义如下:
    首先对于每个候选商品q,从u 最近的K 个评分中,找出与q 相似度较高(>=0.6)的u 已评分商品们,对于这些商品们中的每个商品r,将r 与q 的相似度乘以用户u 对r 的评分,将这些乘积计算平均数,作为用户u 对商品q 的评分预测即
    在这里插入图片描述

    然后,将u 最近的K 个评分中与商品q 相似的、且本身评分较高(>=3)的商品个数记为 incount,计算lgmax{incount,1}作为商品 q 的“增强因子”,意义在于商品q 与u 的最近K 个评分中的n 个高评分(>=3)商品相似,则商品q 的优先级被增加lgmax{incount,1}。如果商品 q 与 u 的最近 K 个评分中相似的高评分商品越多,也就是说n 越大,则商品q 更应该被推荐,所以推荐优先级被增强的幅度较大;如果商品q 与u 的最近K 个评分中相似的高评分商品越少,也就是n 越小,则推荐优先级被增强的幅度较小;
    而后,将u 最近的K 个评分中与商品q 相似的、且本身评分较低(< 3)的商品个数记为 recount,计算lgmax{recount,1}作为商品 q 的“削弱因子”,意义在于商品q 与u 的最近K 个评分中的n 个低评分(< 3)商品相似,则商品q 的优先级被削减lgmax{incount,1}。如果商品 q 与 u 的最近 K 个评分中相似的低评分商品越多,也就是说n 越大,则商品q 更不应该被推荐,所以推荐优先级被减弱的幅度较大;如果商品q 与u 的最近K 个评分中相似的低评分商品越少,也就是n 越小,则推荐优先级被减弱的幅度较小;
    最后,将增强因子增加到上述的预测评分中,并减去削弱因子,得到最终的q 商品对于u 的推荐优先级。在计算完每个候选商品q 的 后,将生成一组<商品q 的ID, q 的推荐优先级>的列表updatedList:
    在这里插入图片描述

    而在本次为用户u 实时推荐之前的上一次实时推荐结果Rec 也是一组<商品m,m 的推荐优先级>的列表,其大小也为K:
    在这里插入图片描述

    接下来,将updated_S 与本次为u 实时推荐之前的上一次实时推荐结果Rec进行基于合并、替换形成新的推荐结果NewRec:
    在这里插入图片描述

    其中,i表示updated_S 与Rec 的商品集合中的每个商品,topK 是一个函数,表示从 Rec updated _ S中选择出最大的 K 个商品,cmp = 表示topK 函数将推荐优先级 值最大的K 个商品选出来。最终,NewRec 即为经过用户u 对商品p 评分后触发的实时推荐得到的最新推荐结果。

    总之,实时推荐算法流程流程基本如下:
    (1)用户u 对商品p 进行了评分,触发了实时推荐的一次计算;
    (2)选出商品p 最相似的K 个商品作为集合S;
    (3)获取用户u 最近时间内的K 条评分,包含本次评分,作为集合RK;
    (4)计算商品的推荐优先级,产生<qID,>集合updated_S;

    将updated_S 与上次对用户u 的推荐结果Rec 利用公式(4-4)进行合并,产生新的推荐结果NewRec;作为最终输出。
    5.2.2 实时推荐模块框架
    我们在recommender下新建子项目OnlineRecommender,引入spark、scala、mongo、redis和kafka的依赖:

    <dependencies>
        <!-- Spark的依赖引入 -->
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-core_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-sql_2.11</artifactId>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming_2.11</artifactId>
        </dependency>
        <!-- 引入Scala -->
        <dependency>
            <groupId>org.scala-lang</groupId>
            <artifactId>scala-library</artifactId>
        </dependency>
    
        <!-- 加入MongoDB的驱动 -->
        <!-- 用于代码方式连接MongoDB -->
        <dependency>
            <groupId>org.mongodb</groupId>
            <artifactId>casbah-core_2.11</artifactId>
            <version>${casbah.version}</version>
        </dependency>
        <!-- 用于Spark和MongoDB的对接 -->
        <dependency>
            <groupId>org.mongodb.spark</groupId>
            <artifactId>mongo-spark-connector_2.11</artifactId>
            <version>${mongodb-spark.version}</version>
        </dependency>
    
        <!-- redis -->
        <dependency>
            <groupId>redis.clients</groupId>
            <artifactId>jedis</artifactId>
            <version>2.9.0</version>
        </dependency>
    
        <!-- kafka -->
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.spark</groupId>
            <artifactId>spark-streaming-kafka-0-10_2.11</artifactId>
            <version>${spark.version}</version>
        </dependency>
    
    </dependencies>
    

    代码中首先定义样例类和一个连接助手对象(用于建立redis和mongo连接),并在StreamingRecommender中定义一些常量:
    src/main/scala/com.tcj.streaming/StreamingRecommender.scala

    // 连接助手对象
    object ConnHelper extends Serializable{
      lazy val jedis = new Jedis("localhost")
      lazy val mongoClient = MongoClient(MongoClientURI("mongodb://localhost:27017/recommender"))
    }
    
    case class MongConfig(uri:String,db:String)
    
    // 标准推荐
    case class Recommendation(productId:Int, score:Double)
    
    // 用户的推荐
    case class UserRecs(userId:Int, recs:Seq[Recommendation])
    
    //商品的相似度
    case class ProductRecs(productId:Int, recs:Seq[Recommendation])
    
    object StreamingRecommender {
    
      val MAX_USER_RATINGS_NUM = 20
      val MAX_SIM_PRODUCTS_NUM = 20
      val MONGODB_STREAM_RECS_COLLECTION = "StreamRecs"
      val MONGODB_RATING_COLLECTION = "Rating"
      val MONGODB_PRODUCT_RECS_COLLECTION = "ProductRecs"
    //入口方法
    def main(args: Array[String]): Unit = {
    }
    }
    

    实时推荐主体代码如下:

    def main(args: Array[String]): Unit = {
    
      val config = Map(
        "spark.cores" -> "local[*]",
        "mongo.uri" -> "mongodb://localhost:27017/recommender",
        "mongo.db" -> "recommender",
        "kafka.topic" -> "recommender"
      )
      //创建一个SparkConf配置
      val sparkConf = new SparkConf().setAppName("StreamingRecommender").setMaster(config("spark.cores"))
      val spark = SparkSession.builder().config(sparkConf).getOrCreate()
      val sc = spark.sparkContext
      val ssc = new StreamingContext(sc,Seconds(2))
    
      implicit val mongConfig = MongConfig(config("mongo.uri"),config("mongo.db"))
      import spark.implicits._
    
      // 广播商品相似度矩阵
      //装换成为 Map[Int, Map[Int,Double]]
      val simProductsMatrix = spark
        .read
        .option("uri",config("mongo.uri"))
        .option("collection",MONGODB_PRODUCT_RECS_COLLECTION)
        .format("com.mongodb.spark.sql")
        .load()
        .as[ProductRecs]   
        .rdd
        .map{recs =>
          (recs.productId,recs.recs.map(x=> (x.productId,x.score)).toMap)
        }.collectAsMap()  
    
      val simProductsMatrixBroadCast = sc.broadcast(simProductsMatrix)
    
      //创建到Kafka的连接
      val kafkaPara = Map(
        "bootstrap.servers" -> "localhost:9092",
        "key.deserializer" -> classOf[StringDeserializer],
        "value.deserializer" -> classOf[StringDeserializer],
        "group.id" -> "recommender",
        "auto.offset.reset" -> "latest"
      )
    
      val kafkaStream = KafkaUtils.createDirectStream[String,String](ssc,LocationStrategies.PreferConsistent,ConsumerStrategies.Subscribe[String,String](Array(config("kafka.topic")),kafkaPara))
    
      // UID|MID|SCORE|TIMESTAMP
      // 产生评分流
      val ratingStream = kafkaStream.map{case msg=>
        var attr = msg.value().split("\\|")
        (attr(0).toInt,attr(1).toInt,attr(2).toDouble,attr(3).toInt)
      }
    
    // 核心实时推荐算法
      ratingStream.foreachRDD{rdd =>
        rdd.map{case (userId,productId,score,timestamp) =>
          println(">>>>>>>>>>>>>>>>")
    
          //获取当前最近的M次商品评分
          val userRecentlyRatings = getUserRecentlyRating(MAX_USER_RATINGS_NUM,userId,ConnHelper.jedis)
    
          //获取商品P最相似的K个商品
          val simProducts = getTopSimProducts(MAX_SIM_PRODUCTS_NUM,productId,userId,simProductsMatrixBroadCast.value)
    
          //计算待选商品的推荐优先级
          val streamRecs = computeProductScores(simProductsMatrixBroadCast.value,userRecentlyRatings,simProducts)
    
          //将数据保存到MongoDB
          saveRecsToMongoDB(userId,streamRecs)
    
        }.count()
      }
    
      //启动Streaming程序
      ssc.start()
      ssc.awaitTermination()
    }
    

    5.3 实时推荐算法的实现

    实时推荐算法的前提:

    1. 在Redis集群中存储了每一个用户最近对商品的K次评分。实时算法可以快速获取。
    2. 离线推荐算法已经将商品相似度矩阵提前计算到了MongoDB中。
    3. Kafka已经获取到了用户实时的评分数据。
      算法过程如下:
      实时推荐算法输入为一个评分<userId, productId, rate, timestamp>,而执行的核心内容包括:获取userId 最近K 次评分、获取productId 最相似K 个商品、计算候选商品的推荐优先级、更新对userId 的实时推荐结果。

    5.3.1 获取用户的K次最近评分
    业务服务器在接收用户评分的时候,默认会将该评分情况以userId, productId, rate, timestamp的格式插入到Redis中该用户对应的队列当中,在实时算法中,只需要通过Redis客户端获取相对应的队列内容即可。

    import scala.collection.JavaConversions._
    /**
      * 获取当前最近的M次商品评分
      * @param num  评分的个数
      * @param userId  谁的评分
      * @return
      */
    def getUserRecentlyRating(num:Int, userId:Int,jedis:Jedis): Array[(Int,Double)] ={
      //从用户的队列中取出num个评分
      jedis.lrange("userId:"+userId.toString, 0, num).map{item =>
        val attr = item.split("\\:")
        (attr(0).trim.toInt, attr(1).trim.toDouble)
      }.toArray
    }
    

    5.3.2 获取当前商品最相似的K个商品
    在离线算法中,已经预先将商品的相似度矩阵进行了计算,所以每个商品productId 的最相似的K 个商品很容易获取:从MongoDB中读取ProductRecs数据,从productId 在simHash 对应的子哈希表中获取相似度前K 大的那些商品。输出是数据类型为Array[Int]的数组,表示与productId 最相似的商品集合,并命名为candidateProducts 以作为候选商品集合。

    /**
      * 获取当前商品K个相似的商品
      * @param num          相似商品的数量
      * @param productId          当前商品的ID
      * @param userId          当前的评分用户
      * @param simProducts    商品相似度矩阵的广播变量值
      * @param mongConfig   MongoDB的配置
      * @return
      */
    def getTopSimProducts(num:Int, productId:Int, userId:Int, simProducts:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]])(implicit mongConfig: MongConfig): Array[Int] ={
      //从广播变量的商品相似度矩阵中获取当前商品所有的相似商品
      val allSimProducts = simProducts.get(productId).get.toArray
      //获取用户已经观看过得商品
      val ratingExist = ConnHelper.mongoClient(mongConfig.db)(MONGODB_RATING_COLLECTION).find(MongoDBObject("userId" -> userId)).toArray.map{item =>
        item.get("productId").toString.toInt
      }
      //过滤掉已经评分过得商品,并排序输出
      allSimProducts.filter(x => !ratingExist.contains(x._1)).sortWith(_._2 > _._2).take(num).map(x => x._1)
    }
    

    5.3.3 商品推荐优先级计算
    对于候选商品集合simiHash和userId 的最近K 个评分recentRatings,算法代码内容如下:

    /**
      * 计算待选商品的推荐分数
      * @param simProducts            商品相似度矩阵
      * @param userRecentlyRatings  用户最近的k次评分
      * @param topSimProducts         当前商品最相似的K个商品
      * @return
      */
    def computeProductScores(
    	simProducts:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Doub
    	le]],userRecentlyRatings:Array[(Int,Double)],topSimProducts: Array[Int]): 
    	Array[(Int,Double)] ={
    
      //用于保存每一个待选商品和最近评分的每一个商品的权重得分
      val score = scala.collection.mutable.ArrayBuffer[(Int,Double)]()
    
      //用于保存每一个商品的增强因子数
      val increMap = scala.collection.mutable.HashMap[Int,Int]()
    
      //用于保存每一个商品的减弱因子数
      val decreMap = scala.collection.mutable.HashMap[Int,Int]()
    
      for (topSimProduct <- topSimProducts; userRecentlyRating <- userRecentlyRatings){
        val simScore = getProductsSimScore(simProducts,userRecentlyRating._1,topSimProduct)
        if(simScore > 0.6){
          score += ((topSimProduct, simScore * userRecentlyRating._2 ))
          if(userRecentlyRating._2 > 3){
            increMap(topSimProduct) = increMap.getOrDefault(topSimProduct,0) + 1
          }else{
            decreMap(topSimProduct) = decreMap.getOrDefault(topSimProduct,0) + 1
          }
        }
      }
    
      score.groupBy(_._1).map{case (productId,sims) =>
        (productId,sims.map(_._2).sum / sims.length + log(increMap.getOrDefault(productId, 1)) - log(decreMap.getOrDefault(productId, 1)))
      }.toArray.sortWith(_._2>_._2)
    
    }
    

    其中,getProductSimScore是取候选商品和已评分商品的相似度,代码如下:

    /**
      * 获取当个商品之间的相似度
      * @param simProducts       商品相似度矩阵
      * @param userRatingProduct 用户已经评分的商品
      * @param topSimProduct     候选商品
      * @return
      */
    def getProductsSimScore(
    simProducts:scala.collection.Map[Int,scala.collection.immutable.Map[Int,Double]], userRatingProduct:Int, topSimProduct:Int): Double ={
      simProducts.get(topSimProduct) match {
        case Some(sim) => sim.get(userRatingProduct) match {
          case Some(score) => score
          case None => 0.0
        }
        case None => 0.0
      }
    }
    而log是对数运算,这里实现为取10的对数(常用对数):
    //取10的对数
    def log(m:Int):Double ={
      math.log(m) / math.log(10)
    }
    5.3.4 将结果保存到mongoDB
    saveRecsToMongoDB函数实现了结果的保存:
    /**
      * 将数据保存到MongoDB    userId -> 1,  recs -> 22:4.5|45:3.8
      * @param streamRecs  流式的推荐结果
      * @param mongConfig  MongoDB的配置
      */
    def saveRecsToMongoDB(userId:Int,streamRecs:Array[(Int,Double)])(implicit mongConfig: MongConfig): Unit ={
      //到StreamRecs的连接
      val streaRecsCollection = ConnHelper.mongoClient(mongConfig.db)(MONGODB_STREAM_RECS_COLLECTION)
    
      streaRecsCollection.findAndRemove(MongoDBObject("userId" -> userId))
      streaRecsCollection.insert(MongoDBObject("userId" -> userId, "recs" ->
    	streamRecs.map( x => MongoDBObject("productId"->x._1,"score"->x._2)) ))
    }
    

    实时推荐模块测试

    // 启动Zookeeper服务
    [bigdata@linux zookeeper-3.4.10]$ bin/zkServer.sh start
    // 查看Zookeeper服务状态
    [bigdata@linux zookeeper-3.4.10]$ bin/zkServer.sh status
    ZooKeeper JMX enabled by default
    Using config: /home/bigdata/cluster/zookeeper-3.4.10/bin/…/conf/zoo.cfg
    Mode: standalone

    // 启动Redis服务器
    [bigdata@linux redis-4.0.2]$ redis-server /etc/redis.conf
    // 连接Redis服务器
    [bigdata@linux redis-4.0.2]$ redis-cli

    选择一条数据进行测试,这里查找到4867,一共5条数据
    在这里插入图片描述
    在Redis中加入一条数据
    在这里插入图片描述
    查看数据
    在这里插入图片描述
    再插入几条数据
    在这里插入图片描述
    运行OnlineRecommender.scala
    这里我们调高一些日志级别
    在这里插入图片描述

    //启动Kafka
    [bigdata@linux kafka]$ bin/kafka-server-start.sh -daemon ./config/server.properties
    // 关闭kafka服务
    [bigdata@linux kafka]$ bin/kafka-server-stop.sh
    
    // 创建topic
    [bigdata@linux kafka1]$ bin/kafka-topics.sh --create --zookeeper 192.168.111.152:2181 --replication-factor 1 --partitions 1 --topic recommender
    
    // kafka-console-producer
    [bigdata@linux kafka]$ bin/kafka-console-producer.sh --broker-list 192.168.111.152:9092 --topic recommender
    
    // kafka-console-consumer
    [bigdata@linux kafka]$ bin/kafka-console-consumer.sh --bootstrap-server l192.168.111.152:9092 --topic recommender
    

    5.3.5 更新实时推荐结果
    当计算出候选商品的推荐优先级的数组updatedRecommends<productId, E>后,这个数组将被发送到Web 后台服务器,与后台服务器上userId 的上次实时推荐结果recentRecommends<productId, E>进行合并、替换并选出优先级E 前K大的商品作为本次新的实时推荐。具体而言:
    a.合并:将updatedRecommends 与recentRecommends 并集合成为一个新的<productId, E>数组;
    b.替换(去重):当updatedRecommends 与recentRecommends 有重复的商品productId 时,recentRecommends 中productId 的推荐优先级由于是上次实时推荐的结果,于是将作废,被替换成代表了更新后的updatedRecommends的productId 的推荐优先级;
    c.选取TopK:在合并、替换后的<productId, E>数组上,根据每个product 的推荐优先级,选择出前K 大的商品,作为本次实时推荐的最终结果。

    5.4 实时系统联调

    我们的系统实时推荐的数据流向是:业务系统 -> 日志 -> flume 日志采集 -> kafka streaming数据清洗和预处理 -> spark streaming 流式计算。在我们完成实时推荐服务的代码后,应该与其它工具进行联调测试,确保系统正常运行。

    5.4.1 启动实时系统的基本组件
    启动实时推荐系统StreamingRecommender以及mongodb、redis

    5.4.2 启动zookeeper

    bin/zkServer.sh start
    

    5.4.3 启动kafka

    bin/kafka-server-start.sh -daemon ./config/server.properties
    

    5.4.4 构建Kafka Streaming程序
    在recommender下新建module,KafkaStreaming,主要用来做日志数据的预处理,过滤出需要的内容。pom.xml文件需要引入依赖:

    <dependencies>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-streams</artifactId>
            <version>0.10.2.1</version>
        </dependency>
        <dependency>
            <groupId>org.apache.kafka</groupId>
            <artifactId>kafka-clients</artifactId>
            <version>0.10.2.1</version>
        </dependency>
    </dependencies>
    
    <build>
        <finalName>kafkastream</finalName>
        <plugins>
            <plugin>
                <groupId>org.apache.maven.plugins</groupId>
                <artifactId>maven-assembly-plugin</artifactId>
                <configuration>
                    <archive>
                        <manifest>
                            <mainClass>com.atguigu.kafkastream.Application</mainClass>
                        </manifest>
                    </archive>
                    <descriptorRefs>
                        <descriptorRef>jar-with-dependencies</descriptorRef>
                    </descriptorRefs>
                </configuration>
                <executions>
                    <execution>
                        <id>make-assembly</id>
                        <phase>package</phase>
                        <goals>
                            <goal>single</goal>
                        </goals>
                    </execution>
                </executions>
            </plugin>
        </plugins>
    </build>
    

    在src/main/java下新建java类com.atguigu.kafkastreaming.Application

    public class Application {
        public static void main(String[] args){
    
            String brokers = "localhost:9092";
            String zookeepers = "localhost:2181";
    
            // 定义输入和输出的topic
            String from = "log";
            String to = "recommender";
    
            // 定义kafka streaming的配置
            Properties settings = new Properties();
            settings.put(StreamsConfig.APPLICATION_ID_CONFIG, "logFilter");
            settings.put(StreamsConfig.BOOTSTRAP_SERVERS_CONFIG, brokers);
            settings.put(StreamsConfig.ZOOKEEPER_CONNECT_CONFIG, zookeepers);
    
            StreamsConfig config = new StreamsConfig(settings);
    
            // 拓扑建构器
            TopologyBuilder builder = new TopologyBuilder();
    
            // 定义流处理的拓扑结构
            builder.addSource("SOURCE", from)
                    .addProcessor("PROCESS", () -> new LogProcessor(), "SOURCE")
                    .addSink("SINK", to, "PROCESS");
    
            KafkaStreams streams = new KafkaStreams(builder, config);
            streams.start();
        }
    }
    

    这个程序会将topic为“log”的信息流获取来做处理,并以“recommender”为新的topic转发出去。
    流处理程序 LogProcess.java

    public class LogProcessor implements Processor<byte[],byte[]> {
        private ProcessorContext context;
    
        public void init(ProcessorContext context) {
            this.context = context;
        }
    
        public void process(byte[] dummy, byte[] line) {
            String input = new String(line);
            // 根据前缀过滤日志信息,提取后面的内容
            if(input.contains("PRODUCT_RATING_PREFIX:")){
                System.out.println("product rating coming!!!!" + input);
                input = input.split("PRODUCT_RATING_PREFIX:")[1].trim();
                context.forward("logProcessor".getBytes(), input.getBytes());
            }
        }
        public void punctuate(long timestamp) {
        }
        public void close() {
        }
    }
    

    完成代码后,启动Application。
    在这里插入图片描述

    5.4.5 配置并启动flume
    在flume的conf目录下新建log-kafka.properties,对flume连接kafka做配置:

    agent.sources = exectail
    agent.channels = memoryChannel
    agent.sinks = kafkasink
    
    # For each one of the sources, the type is defined
    agent.sources.exectail.type = exec
    # 下面这个路径是需要收集日志的绝对路径,改为自己的日志目录
    agent.sources.exectail.command = tail –f
    //d/Projects/BigData/ECommerceRecommenderSystem/businessServer/src/main/log/agent.log
    agent.sources.exectail.interceptors=i1
    agent.sources.exectail.interceptors.i1.type=regex_filter
    # 定义日志过滤前缀的正则
    agent.sources.exectail.interceptors.i1.regex=.+PRODUCT_RATING_PREFIX.+
    # The channel can be defined as follows.
    agent.sources.exectail.channels = memoryChannel
    
    # Each sink's type must be defined
    agent.sinks.kafkasink.type = org.apache.flume.sink.kafka.KafkaSink
    agent.sinks.kafkasink.kafka.topic = log
    agent.sinks.kafkasink.kafka.bootstrap.servers = 192.168.111.152:9092
    agent.sinks.kafkasink.kafka.producer.acks = 1
    agent.sinks.kafkasink.kafka.flumeBatchSize = 20
    
    #Specify the channel the sink should use
    agent.sinks.kafkasink.channel = memoryChannel
    
    # Each channel's type is defined.
    agent.channels.memoryChannel.type = memory
    
    # Other config values specific to each type of channel(sink or source)
    # can be defined as well
    # In this case, it specifies the capacity of the memory channel
    agent.channels.memoryChannel.capacity = 10000
    

    配置好后,启动flume:

    ./bin/flume-ng agent -c ./conf/ -f ./conf/log-kafka.properties -n agent -Dflume.root.logger=INFO,console
    

    ![在这里插入图片描述](https://img-blog.csdnimg.cn/20201221160934369.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0lUX3Rhbg==,size_16,color_FFFFFF,t_70在这里插入图片描述
    在这里插入图片描述
    在这里插入图片描述
    ![在这里插入图片描述](https://img-blog.csdnimg.cn/20201221161742108.png?x-oss-process=image/watermark,type_ZmFuZ3poZW5naGVpdGk,shadow_10,text_aHR0cHM6Ly9ibG9nLmNzZG4ubmV0L0lUX3Rhbg==,size_16,color_FFFFFF,t_70

    在这里插入图片描述

    在这里插入图片描述

    第六章 其它形式的离线相似推荐服务

    6.1 基于内容的相似推荐

    原始数据中的tag文件,是用户给商品打上的标签,这部分内容想要直接转成评分并不容易,不过我们可以将标签内容进行提取,得到商品的内容特征向量,进而可以通过求取相似度矩阵。这部分可以与实时推荐系统直接对接,计算出与用户当前评分商品的相似商品,实现基于内容的实时推荐。为了避免热门标签对特征提取的影响,我们还可以通过TF-IDF算法对标签的权重进行调整,从而尽可能地接近用户偏好。
    基于以上思想,加入TF-IDF算法的求取商品特征向量的核心代码如下:

    
    ```java
    // 载入商品数据集
    val productTagsDF = spark
      .read
      .option("uri",mongoConfig.uri)
      .option("collection",MONGODB_PRODUCT_COLLECTION)
      .format("com.mongodb.spark.sql")
      .load()
      .as[Product]
      .map(x => (x.productId, x.name, x.genres.map(c => if(c == '|') ' ' else c)))
      .toDF("productId", "name", "tags").cache()
    
    // 实例化一个分词器,默认按空格分
    val tokenizer = new Tokenizer().setInputCol("tags").setOutputCol("words")
    
    // 用分词器做转换
    val wordsData = tokenizer.transform(productTagsDF)
    
    // 定义一个HashingTF工具
    val hashingTF = new HashingTF().setInputCol("words").setOutputCol("rawFeatures").setNumFeatures(200)
    
    // 用 HashingTF 做处理
    val featurizedData = hashingTF.transform(wordsData)
    
    // 定义一个IDF工具
    val idf = new IDF().setInputCol("rawFeatures").setOutputCol("features")
    
    // 将词频数据传入,得到idf模型(统计文档)
    val idfModel = idf.fit(featurizedData)
    
    // 用tf-idf算法得到新的特征矩阵
    val rescaledData = idfModel.transform(featurizedData)
    
    // 从计算得到的 rescaledData 中提取特征向量
    val productFeatures = rescaledData.map{
      case row => ( row.getAs[Int]("productId"),row.getAs[SparseVector]("features").toArray )
    }
      .rdd
      .map(x => {
        (x._1, new DoubleMatrix(x._2) )
      })
    
    
    然后通过商品特征向量进而求出相似度矩阵,就可以在商品详情页给出相似推荐了;通常在电商网站中,用户浏览商品或者购买完成之后,都会显示类似的推荐列表。
    得到的相似度矩阵也可以为实时推荐提供基础,得到用户推荐列表。可以看出,基于内容和基于隐语义模型,目的都是为了提取出物品的特征向量,从而可以计算出相似度矩阵。而我们的实时推荐系统算法正是基于相似度来定义的。
    
    ## 6.2 基于物品的协同过滤相似推荐
    
    基于物品的协同过滤(Item-CF),只需收集用户的常规行为数据(比如点击、收藏、购买)就可以得到商品间的相似度,在实际项目中应用很广。
    我们的整体思想是,如果两个商品有同样的受众(感兴趣的人群),那么它们就是有内在相关性的。所以可以利用已有的行为数据,分析商品受众的相似程度,进而得出商品间的相似度。我们把这种方法定义为物品的“同现相似度”,公式如下:
    ![在这里插入图片描述](https://img-blog.csdnimg.cn/20201221134247937.png)
    
     
    其中,Ni 是购买商品 i (或对商品 i 评分)的用户列表,Nj 是购买商品 j 的用户列表。
    核心代码实现如下:
    
    ```java
        val ratingDF = spark.read
          .option("uri", mongoConfig.uri)
          .option("collection", MONGODB_RATING_COLLECTION)
          .format("com.mongodb.spark.sql")
          .load()
          .as[Rating]
          .map(x=> (x.userId, x.productId, x.score) )
          .toDF("userId", "productId", "rating")
    
        // 统计每个商品的评分个数,并通过内连接添加到 ratingDF 中
        val numRatersPerProduct = ratingDF.groupBy("productId").count()
        val ratingWithCountDF = ratingDF.join(numRatersPerProduct, "productId")
    
        // 将商品评分按 userId 两两配对,可以统计两个商品被同一用户做出评分的次数
        val joinedDF = ratingWithCountDF.join(ratingWithCountDF, "userId")
          .toDF("userId", "product1", "rating1", "count1", "product2", "rating2", "count2")
          .select("userId", "product1", "count1", "product2", "count2")
        joinedDF.createOrReplaceTempView("joined")
        val cooccurrenceDF = spark.sql(
          """
            |select product1
            |, product2
            |, count(userId) as coocount
            |, first(count1) as count1
            |, first(count2) as count2
            |from joined
            |group by product1, product2
          """.stripMargin
        ).cache()
    
        val simDF = cooccurrenceDF.map{ row =>
          // 用同现的次数和各自的次数,计算同现相似度
          val coocSim = cooccurrenceSim( row.getAs[Long]("coocount"), row.getAs[Long]("count1"), row.getAs[Long]("count2") )
          ( row.getAs[Int]("product1"), ( row.getAs[Int]("product2"), coocSim ) )
        }
          .rdd
          .groupByKey()
          .map{
            case (productId, recs) =>
              ProductRecs( productId,
                recs.toList
                  .filter(x=>x._1 != productId)
                  .sortWith(_._2>_._2)
                  .map(x=>Recommendation(x._1,x._2))
                  .take(MAX_RECOMMENDATION)
              )
          }
          .toDF()
    其中,计算同现相似度的函数代码实现如下:
    def cooccurrenceSim(cooCount: Long, count1: Long, count2: Long): Double ={
          cooCount / math.sqrt( count1 * count2 )
        }
    

    第七章 程序部署与运行

    第八章 项目总结

    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    
    展开全文
  • 大数据项目实战:电商推荐系统

    千人学习 2019-03-01 10:46:10
    打造的电商推荐系统项目,就是以经过修改的中文亚马逊电商数据集作为依托,并以某电商网站真实的业务架构作为基础来实现的,其中包含了离线推荐与实时推荐体系,综合利用了协同过滤算法以及基于内容的推荐方法来提供...
  • 电商推荐系统实践

    2020-12-16 09:45:10
    今天为大家分享下京东电商推荐系统实践方面的经验,主要包括: 简介 排序模块 实时更新 召回和首轮排序 实验平台 简介 说到推荐系统,最经典的就是协同过滤,上图是一个协同过滤的例子...

    今天为大家分享下京东电商推荐系统实践方面的经验,主要包括:

    • 简介

    • 排序模块

    • 实时更新

    • 召回和首轮排序

    • 实验平台

    简介

    图片

    说到推荐系统,最经典的就是协同过滤,上图是一个协同过滤的例子。协同过滤主要分为俩种:user-based 基于用户的协同过滤和 item-based 基于商品的协同过滤

    但是,现在绝大多数推荐系统都不会直接使用协同过滤来做推荐。目前主要用的是 learning to rank 框架

    这里,是推荐系统的框架,整个推荐系统可以分为两部分,在线部分和离线部分

    • 在线部分主要负责当用户访问时,如何把结果拼装好,然后返回给用户。主要模块有召回、排序和对结果的调整。

    • 离线部分主要是对用户日志的数据分析,应用于线上。

    整个推荐系统大概就是这样的一个框架。

    和新闻、视频这类的内容推荐相比,电商推荐系统又有一些特殊的地方,比如:

    优化方向(点击、销售额、时长、用户留存等)。另外,电商中推荐的内容也会有很多种,尤其像是活动类的内容,很多推荐都是算法和人工运营共同完成的。这就是电商推荐和新闻推荐等的区别之处。

    我们展开看下在线推荐系统:

    除了刚才说的召回和排序以及最终的调整之外,还有实践过程中的一些细节。

    • 召回:这里召回会有很多种方法,如协同过滤,热门商品、实时促销等和应用场景相关的召回,还有一些基于 KNN 的召回。

    • 过滤:召回之后,会进行过滤,主要是和应用场景相关,如已购商品过滤掉、没有库存的过滤掉,或者敏感的商品过滤掉等等这些逻辑

    • 排序:排序目前主要用到的是 DNN 模型,某些流量比较小的地方会用到 GBDT。

    • 过滤:排序之后还会有些分页、同商品过滤等逻辑。

    调整:最终调整过程中,主要有两部分逻辑,多样性和探索逻辑

    排序模块

    1. 模型结构

    图片

    深度学习 ranking 模型结构我们不作为重点讨论,这里列举了一种最经典的模型,它们都用到了很多 id 的 Embedding,然后这些 Embedding 规模都很大,这样训练和上线都比较耗时。因此,我们做了一些优化,比如做分布式的训练,并且会有一套 Pipeline 来完成模型的上线。另外,虽然模型很复杂,并且能带来很好的效果,但是特征工程还是必不可少的,很多指标的提升还是依赖于特征工程,当然也包括一些模型调整的工作。

    2. 实践

    图片

    那么如何把这些模型落地呢?我们看下整个模型的上线过程:

    首先最重要的部分是模型训练平台和排序服务,因为很多深度模型计算量要求很高,为了能达到比较快的效果,需要部署单独的排序服务。目前比较流行的是 TensorFlow serving,可以很快速的来上线一个深度模型,并充分利用对分片、单机并行,达到很高的计算效率。

    模型线上线下一致性问题对于模型效果非常重要,我们使用特征日志来实时记录特征,保证特征的一致性。这样离线处理的时候会把实时的用户反馈,和特征日志做一个结合生成训练样本,然后更新到模型训练平台上,平台更新之后在推送到线上,这样整个排序形成了一个闭环。

    3. 实时更新

    我们的特征和模型都需要做实时的更新。因为我们经常需要很快的 catch 一些实时的信号,比如需要实时的用户画像来抓住实时的用户兴趣的变化,还比如需要抓住实时的商品画像,因为经常会有一些活动或者爆品,我们需要快速的捕捉这些信号,并应用到推荐中。另外还有一些实时的召回和特征,比如一些交叉的特征,实时的点击率,实时订单等特征

    除了特征外,模型也需要实时更新,对于电商场景来说这是有一定困难的,因为订单是有延时的,延时可能是十几分钟到十几小时不等,这样实时模型更新上就会采取一些保守的策略,比如用点击率对模型做些微调,然后订单数据再通过离线来获得,这属于业务场景的限制。

    思考

    排序可以算是推荐系统中比较重要的一个环节,但是只有排序肯定是不够的,事实上,有一些问题是目前的排序框架无法解决的:

    • 排序得到的结果非常相似,影响体验。

    • 有多个优化目标,需要一个平衡(点击率、订单金额、用户交互时长等)

    • 计算能力有限,如果有无限的计算力,可以直接对全部候选集进行排序。

    1. 多样性

    使用模型输出的结果一般都会非常相似,如果直接给用户看体验会很差,因此在模型之后我们需要加入多样性的逻辑。

    比较通用的解决办法是多样性的 ranking,这是一个贪心算法,从第一个商品开始选,当选第二个商品的时候,会重新计算下候选集中每个商品的 score,然后选择一个 score 最高的。我们的方法是看 novelty score 候选商品的产品词分布和之前 N 个商品的产品词分布的 KL 距离。这样做的思路,就是选一个和已有商品最不像的商品,来更好的保证商品推荐结果的多样性。

    由于纯基于算法的多样性可能会出现 badcase,因此还需要一个规则来进行兜底,确保在极端情况下结果也能接受。

    最后,我们思考一个问题,有没有更好的方法实现多样性的逻辑呢?当然有,比如是否可以考虑使用 list wise ranking。这里只是为大家分享一个比较容易的,并且效果比较好的方法。

    2. 多目标

    图片

    我们的优化目标有很多,比如点击、转化、时长等,问题会变得比较复杂,单一的模型训练很难覆盖到所有指标。另外,经常我们需要在各个指标之间进行权衡,因此可调试性也非常重要。

    一种很有用的方式是多模型 ranking,然后用某种方式把所有模型的结果 combine。

    这也体现了一个思想,在算法的实际应用中,其实需要在算法的先进性和系统可维护性、可调试性之间做一个平衡。往往 paper 里很有创意的算法落地的时候是有些困难的。

    3. 多轮排序

    图片

    下面我们讨论一下多轮排序的问题。多轮排序是 learning to rank 实践中很重要的一个思想。使用多轮排序主要是因为计算资源的限制,无法使用复杂的模型进行大规模的候选集排序。右图描述了一个多轮排序的框架。这像是一个漏斗模型,从上往下模型的复杂度是递增的,同时候选集是逐渐减少的,就是越到后面用越复杂的模型来保证效果更好,越到前面可能只需要简单的模型来保证能拿到一些商品就可以了。

    这样会存在一个问题,由于训练样本可能有偏,导致只有被用户看到的样本才有 label,但是一般不会有太大的影响。

    基于索引的首轮排序

    1. 索引召回

    下面我们重点介绍一下第一轮排序。倒排索引很常见,是信息检索里常用的工具。它通过把 doc 的内容索引到 doc id 的方式,快速通过内容来查找 doc。我们很多召回都是通过索引实现的。这里我列举了一些基于索引的召回方式,如 item cf 的 key、产品词、热门类目、促销产品词等。

    虽然索引能够很大程度上的缩小候选集的范围,但是经常情况下,第一轮排序的 doc 数量仍然可能会很大。为了保证性能,截断逻辑是必不可少的。通过情况下可以通过 quality score 截断,保留质量好的 doc。经过线性的 LR 或者 GBDT 模型就可以有结果了。另外截断之后需要有些多样性的逻辑,因为只有在召回的时候保持多样性,最终结果才会有多样性。

     

    基于 quality score 截断是一种 naive 的算法,这里我们讨论另一种业界也较常用的算法,wand。wand 其实是 weak and,它的重点是 wand 操作符。wand 操作符是一个布尔操作符,当 Xi wi 比 θ 大时,它的值是1,否则是0。之所以叫做 weak-and,是因为当 w 都取1, θ 取 K 时,wand 操作符就变成了 and,当 w 取1,θ 取1时,wand 操作符就变成了 or。可以看出 wand 是介于 and 和 or 之间的操作。对 Xi wi 求和的操作其实和我们线性模型很相似。通过 wand 操作符,我们可以定义一些上界,因为是倒排索引,可以给每个索引链赋予一个估计值,这样就可以拿到权重上界 UBt,这样通过和 wand 操作符对比,就可以快速的判断 UBt 是否满足条件,如果满足条件就可以快速的把一些 doc 扔掉,这样就可以快速的使用线性模型对全户做 ranking。可以看到,基于线性模型的分数做截断,比完全基于 quality score 截断的策略要稍微好一点。

    图片

    这里我列了 paper 中 wand 算法的伪代码。出于时间关系,我们不会过算法逻辑的细节。我认为它的主要的思路是通过快速使用 upper bound 做截断和跳转,可以略过很多明显不符合的候选 doc,从而减少计算 score 的次数。当然这种方法对于线性模型来说,有一个缺点,当我们需要多样性的时候,没办法很好的实现在模型中增加多样性的。

    wand 算法目前已经应用非常广泛了,在很多开源的索引如 lucene 中,也会用到这种方法快速计算文本相关分。

    图片

    刚刚我们介绍了使用倒排索引做第一轮排序,以及一个常见的排序加速算法,回过来我们思考一下倒排索引本身,它适用于什么场景,不适用于什么场景。

    首先它适用于 kv 查找这种场景,并且 kv 查找也属于实际应用很多的情况。但是对于更复杂的方式,类似 graph 的召回方式不友好,比如找用户看过的商品中相似商品的相关商品,这时实现起来会比较麻烦,这是它的一些限制。再一个,我们需要有较好的截断策略,例如底层使用 relevence score 截断,排序使用 GBDT

    当然,索引还会受到机器本身的内存限制,限于机器的大小,很多时候我们需要多机分片部署索引,这样会带来一定的复杂性。虽然有些限制,但是索引是目前应用很广泛、有效的方式,包括在推荐、搜索等领域都会使用到。

    2. KNN 召回

    图片

    除了索引召回,KNN 也是现在较常用的一种召回方式。首先,我们把所有的候选集转换成 embedding,我们把用户兴趣也可以转换成 embedding,通过定义 embedding 之间距离计算公式,我们可以定义 KNN 召回问题,也就是在全部候选池中,找到与用户最接近的 k 个结果。

    图片

    定义好 KNN 召回的问题,下一步就是如何找到最近的 K 个候选集。由于整个候选集非常大,每次都使用用户的 embedding 去全量计算距离是不现实的,只能使用一种近似算法。我们今天分享其中的一种近似算法。是 facebook 开源的 KNN 计算库 faiss 使用的。其原理:

    首先需要对全部候选集进行分块,每一块都会有自己的质心。paper 中使用 Lloyd 算法,将整个空间划分开。分块后,就需要对每一块构建索引,进而通过索引实现快速检索的功能。

    右图是索引构建和检索的方法。

    上半部分是如何构建索引(这里的优化点是使用了二级索引):首先拿到 y 候选集之后,做一个 quantizer 分类得到一个一级索引,把它放到索引表中,另外还得到残差 compute residual,可以对残差再进行一次 quantizer,得到一个二级索引,通过两级索引来加快检索的速度,同理,在真正的 quary 的时候,拿到的是用户的向量 x,先做一个 quantizer,得到 k 近邻的一级索引,然后查找 k 个一级索引,同时拿到 k 个二级索引,然后在二级索引中查找,然后这里还有很多加速的算法(这里就不展开了),通过这样一种多层的查询方式来做到加速 K 近邻的算法。

    PS:关于 KNN 的一些思考,KNN 是一种有效的方式,但是不是唯一有效的方式。比如之后分享的 TDM,能够比 KNN 更加灵活。

    实验平台

    最后简单介绍下分层实验平台,因为大家想快速迭代特征和模型,离不开实验,经常会遇到的情况是实验流量不够用了,这时就需要对实验做分层。分层的逻辑见右图,通过在不同的 Layer 使用不同的哈希函数,保证每个 Layer 之间流量是正交的,这样就可以在不同的 Layer 上做不同的实验。

    图片

    分层实验的具体做法:召回->排序->后处理->业务,另外还有一部分对齐流量,用来做全量的验证。

    分层的优点,可以用于做实验的流量多,适合快速迭代;缺点,需要严格控制层与层之间的关系,防止相互干扰。

    展开全文
  • 大数据项目电商推荐系统经修改过的源码,已运行出来,论文可联系我获取。 免积分下载,如遇需要积分是csdn根据下载量自动累加设置的,请联系我设置免积分下载。 文章链接...
  • 目录:/80 尚硅谷大数据项目之电商推荐系统 ┣━━4.视频 ┃┣━━01 电商推荐系统_课程简介.wmv ┃┣━━02 电商推荐系统_项目系统设计(上).wmv ┃┣━━03 电商推荐系统_项目系统设计(中).wmv ┃┣━━04 电商...

    目录:/80 尚硅谷大数据项目之电商推荐系统
    ┣━━4.视频
    ┃    ┣━━01 电商推荐系统_课程简介.wmv
    ┃    ┣━━02 电商推荐系统_项目系统设计(上).wmv
    ┃    ┣━━03 电商推荐系统_项目系统设计(中).wmv
    ┃    ┣━━04 电商推荐系统_项目系统设计(下).wmv
    ┃    ┣━━05 电商推荐系统_项目框架搭建.wmv
    ┃    ┣━━06 电商推荐系统_数据加载模块(上).wmv
    ┃    ┣━━07 电商推荐系统_数据加载模块(中).wmv
    ┃    ┣━━08 电商推荐系统_数据加载模块(下).wmv
    ┃    ┣━━09 电商推荐系统_统计推荐模块(上).wmv
    ┃    ┣━━10 电商推荐系统_统计推荐模块(下).wmv
    ┃    ┣━━11 电商推荐系统_基于LFM的离线推荐模块(上).wmv
    ┃    ┣━━12 电商推荐系统_基于LFM的离线推荐模块(中).wmv
    ┃    ┣━━13 电商推荐系统_基于LFM的离线推荐模块(下).wmv
    ┃    ┣━━14 电商推荐系统_ALS模型评估和参数选择(上).wmv
    ┃    ┣━━15 电商推荐系统_ALS模型评估和参数选取(下).wmv
    ┃    ┣━━16 电商推荐系统_实时推荐模块(一).wmv
    ┃    ┣━━17 电商推荐系统_实时推荐模块(二).wmv
    ┃    ┣━━18 电商推荐系统_实时推荐模块(三).wmv
    ┃    ┣━━19 电商推荐系统_实时推荐模块(四).wmv
    ┃    ┣━━20 电商推荐系统_实时推荐模块(五).wmv
    ┃    ┣━━21 电商推荐系统_实时推荐模块(六).wmv
    ┃    ┣━━22 电商推荐系统_实时推荐模块测试.wmv
    ┃    ┣━━23 电商推荐系统_实时系统联调(上).wmv
    ┃    ┣━━24 电商推荐系统_实时系统联调(下).wmv
    ┃    ┣━━25 电商推荐系统_基于内容的离线推荐模块(上).wmv
    ┃    ┣━━26 电商推荐系统_基于内容的离线推荐模块(下).wmv
    ┃    ┣━━27 电商推荐系统_基于ItemCF的离线推荐(上).wmv
    ┃    ┣━━28 电商推荐系统_基于ItemCF的离线推荐(下).wmv
    ┃    ┗━━29 电商推荐系统_项目总结和部署.wmv
    ┣━━1.笔记.zip
    ┣━━2.资料.zip
    ┗━━3.代码.zip

     

    下载地址:百度云盘

    展开全文
  • 本教程为官方授权出品 如今大数据已经成了各大互联网公司工作的重点方向,而推荐系统可以说就是大数据最好的落地应用之一,已经为企业带来了...量身定制打造的电商推荐系统项目,就是以经过修改的中文亚马逊...
  • 本教程为官方授权出品 如今大数据已经成了各大互联网公司工作的重点方向,而推荐系统可以说就是大数据最好的落地应用之一,已经为企业带来了...量身定制打造的电商推荐系统项目,就是以经过修改的中文亚马逊...
  • 今天为大家分享下京东电商推荐系统实践方面的经验,主要包括:简介排序模块实时更新召回和首轮排序实验平台▌简介说到推荐系统,最经典的就是协同过滤,上图是一个协同过滤的例子。协同过滤主要分为俩...
  • #大数据之电商推荐系统# 项目系统架构 数据整理 商品数据 商品ID 商品名称 商品种类 商品图片URL 商品标签 productId name categories imageUrl tags 评分数据 用户ID 商品ID 商品评分 评分时间...
  • 基于Spark的电商推荐系统

    千次阅读 2019-12-06 10:46:55
    项目以推荐系统建设领域知名的经过修改过的中文亚马逊电商数据集作为依托,以某电商网站真实业务数据架构为基础,构建了符合教学体系的一体化的电商推荐系统,包含了离线推荐与实时推荐体系,综合利用了协同过滤算法...
  • 电商推荐系统(一)

    2021-03-22 20:15:32
    项目以推荐系统建设领域知名的经过修改过的中文亚马逊电商数据集作为依托,以某电商网站真实业务数据架构为基础,构建了符合教学体系的一体化的电商推荐系统,包含了离线推荐与实时推荐体系,综合利用了协同过滤算法...
  • 电商推荐系统(一)

    2020-07-30 23:35:12
    电商推荐系统设计数据生命周期大数据处理流程项目系统结构数据源主要数据模型统计推荐模块历史热门商品统计近期热门商品统计商品平均评分统计基于LFM的离线推荐模块用ALS算法训练隐语义模型(LFM)基于模型的实时...
  • Hive电商推荐系统开发实战1.构建数据仓库2.数据清洗3.推荐算法实现4.数据ETL 项目的主要流程: #mermaid-svg-y1KC8DzwYk96JI79 .label{font-family:'trebuchet ms', verdana, arial;font-family:var(--mermaid-font-...
  • 项目以推荐系统建设领域知名的经过修改过的中文亚马逊电商数据集作为依托,以某电商网站真实业务数据架构为基础,构建的电商推荐系统,包含了离线推荐与实时推荐体系,综合利用了协同过滤算法以及基于内容的推荐方法...
  • 项目以推荐系统建设领域知名的经过修改过的中文亚马逊电商数据集作为依托,以某电商网站真实业务数据架构为基础,构建了符合教学体系的一体化的电商推荐系统,包含了离线推荐与实时推荐体系,综合利用了协同过滤算法...
  • 今天给大家带来京东推荐广告算法负责人彭长平先生分享的《京东电商推荐系统的应用实践》。数字化信息时代,推荐系统已成为To C互联网产品的标配技术,而推荐算法对于业务收益的提升也起到了至关重...
  • 大数据项目实战:电商推荐系统 尚硅谷讲师,辽宁工程技术大学硕士,曾先后就职于...
  • python+django
  • 项目以推荐系统建设领域知名的经过修改过的中文亚马逊电商数据集作为依托,以某电商网站真实业务数据架构为基础,构建了符合教学体系的一体化的电商推荐系统,包含了离线推荐与实时推荐体系,综合利用了协同过滤...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,286
精华内容 914
关键字:

电商推荐系统