2016-01-31 18:27:59 kwu_ganymede 阅读数 2451
  • 大数据—Scala

    一、Scala核心编程课程简介 近年来随着大数据的兴起,大数据核心框架Spark和Kafka也受到到广泛关注, Spark底层是Scala实现的, 因此也带火了Scala语言,目前Scala被全球知名公司(如:谷歌、百度、阿里、Twitter、京东等)广泛用于Spark开发。新一代的物联网时代到来,会对大数据应用人才的需求越加紧迫。 尚硅谷网罗和整合了学员很喜爱的师资,打造出专注于Spark开发的Scala课程,我们讲解Scala核心编程技术,同时也讲解编程思想、设计模式和Scala底层实现,让您有豁然开朗的感受。 二、课程内容和目标 本课程重点讲解Scala核心编程,内容包括: Scala语言概述、运算符、程序流程控制、数据结构之集合、Map映射、过滤、化简、折叠、扫描、拉链、视图、并行集合、高阶函数、函数柯里化、偏函数、参数推断、控制抽象、Trait、面向对象编程、异常处理、惰性函数、Akka及Actor模型、Spark Master和Worker通讯、隐式转换、隐式参数、工厂模式、单例模式、观察者模式、装饰者模式、代理模式、泛型、上下界、视图界定、上下文界定、协变逆变不变和源码剖析。通过系统全面的学习,学员能掌握Scala编程思想和Scala底层机制,为进一步学习Spark打下坚实基础。 三、谁适合学 1.希望以较低的投入和风险,来了解自己是否适合转型从事Spark开发的求职人员。 2.有一定的Java基础,或者自学过一些Java书籍与视频资料,想系统掌握Scala语言的程序员。

    3716 人正在学习 去看看 缪传海

Spark爆发式的成长导致今年Spark相关书籍明显比去年那时候多得多,这里给出国内外目前所有的书籍,推荐给大家,希望能够对大家有所帮助。因为在网络上,不便于将书籍的电子版公开出来,需要的可以私信我或加我微信:zhouzhihubeyond

国内Spark书籍: 
《Spark大数据处理:技术、应用与性能优化》,作者:高彦杰,出版社:机械工业出版社,出版时间:2014年11月,Spark版本:1.0; 
《Spark大数据处理技术》,作者:夏俊鸾,黄洁,程浩 等,出版社:电子工业出版社,出版时间:2015-01-01,Spark版本:0.9.0; 
《Spark技术内幕:深入解析Spark内核架构设计与实现原理》,作者:张安站,出版社:机械工业出版社,出版时间:2015-09-01,Spark版本:1.2.0;
《Apache Spark源码剖析》,作者:许鹏,出版社:电子工业出版社,出版时间:2015-03-01,Spark版本:1.0; 
《大数据Spark企业级实战》,作者:王家林,出版社:电子工业出版社,出版时间:2015-01-01,Spark版本:1.3;

国外Spark书籍: 
《Spark机器学习》,作者:彭特里思(Nick Pentreath) 著,译者:蔡立宇,黄章帅,周济民,出版社:人民邮电出版社,出版时间:2015-09-01;

《Fast Data Processing with Spark, 2nd Edition》,作者:Krishna Sankar, Holden Karau,出版社:Packt Publishing,出版时间:2015 
《Mastering Apache Spark》,作者:Mike Frampton,出版社:Packt Publishing,出版时间:2015 
《Advanced Analytics with Spark》,作者:Sandy Ryza, Uri Laserson, Sean Owen, Josh Wills,出版社:O’Reilly Media,出版时间:2015 
《Learning Spark》,作者:Holden Karau, Andy Konwinski, Patrick Wendell, Matei Zaharia,出版社:O’Reilly Media,出版时间:2015 
《Spark Cookbook》,作者:Rishi Yadav,出版社:Packt Publishing,出版时间:July 2015

2018-03-24 16:34:35 u013560925 阅读数 6859
  • 大数据—Scala

    一、Scala核心编程课程简介 近年来随着大数据的兴起,大数据核心框架Spark和Kafka也受到到广泛关注, Spark底层是Scala实现的, 因此也带火了Scala语言,目前Scala被全球知名公司(如:谷歌、百度、阿里、Twitter、京东等)广泛用于Spark开发。新一代的物联网时代到来,会对大数据应用人才的需求越加紧迫。 尚硅谷网罗和整合了学员很喜爱的师资,打造出专注于Spark开发的Scala课程,我们讲解Scala核心编程技术,同时也讲解编程思想、设计模式和Scala底层实现,让您有豁然开朗的感受。 二、课程内容和目标 本课程重点讲解Scala核心编程,内容包括: Scala语言概述、运算符、程序流程控制、数据结构之集合、Map映射、过滤、化简、折叠、扫描、拉链、视图、并行集合、高阶函数、函数柯里化、偏函数、参数推断、控制抽象、Trait、面向对象编程、异常处理、惰性函数、Akka及Actor模型、Spark Master和Worker通讯、隐式转换、隐式参数、工厂模式、单例模式、观察者模式、装饰者模式、代理模式、泛型、上下界、视图界定、上下文界定、协变逆变不变和源码剖析。通过系统全面的学习,学员能掌握Scala编程思想和Scala底层机制,为进一步学习Spark打下坚实基础。 三、谁适合学 1.希望以较低的投入和风险,来了解自己是否适合转型从事Spark开发的求职人员。 2.有一定的Java基础,或者自学过一些Java书籍与视频资料,想系统掌握Scala语言的程序员。

    3716 人正在学习 去看看 缪传海

1.Spark

   (1)Spark快速大数据分析

         介绍:由spark开发者编写,无过多实现细节,注重基础理念,适合小白版可以让数据科学家和工程师即刻上手。你能学到如何使用简短的代码实现复杂的并行作业,还能了解从简单的批处理作业到流处理以及机器学习等应用。

         封皮:


     (2)  Spark大数据商业实战三部曲:内核解密|商业案例|性能调优

          介绍:国内作者,1000多页,涵盖源码、实践和调优,目前正在读,18年3月第一版,虽存在许多文字错误,但是感觉内容蛮不错的,如果你已经入门spark一段时间了,可以看一下这本书,边看源码部分,边做实践部分,最后把调优部分看了,应该会有很大的提升。

           封皮:


2.Hadoop

        等待添加。。。。


3.学习网站

    (1) http://www.chinahadoop.cn/bigdata/all/list 小象学院

       介绍:很多不错的免费课程,也有收费的,感觉还行,课程举例:《实时大数据处理平台的设计与实现》、《广告系统中缓存应用》

       网站截图:

 



    (2) 极客学院

     介绍:收费很合理,其他网站动不动就几百几千的课程,简直就是抢钱,按会员模式收费,一个月会员也就几十块钱,视频都可以看,更加注重基础,内容覆盖面广。

             !!!重点是,他有不同职业的技术路线,职业规划和知识体系这种总结性的信息,很不错的网站。

     网站截图:



2018-08-19 15:15:04 weixin_41366941 阅读数 2182
  • 大数据—Scala

    一、Scala核心编程课程简介 近年来随着大数据的兴起,大数据核心框架Spark和Kafka也受到到广泛关注, Spark底层是Scala实现的, 因此也带火了Scala语言,目前Scala被全球知名公司(如:谷歌、百度、阿里、Twitter、京东等)广泛用于Spark开发。新一代的物联网时代到来,会对大数据应用人才的需求越加紧迫。 尚硅谷网罗和整合了学员很喜爱的师资,打造出专注于Spark开发的Scala课程,我们讲解Scala核心编程技术,同时也讲解编程思想、设计模式和Scala底层实现,让您有豁然开朗的感受。 二、课程内容和目标 本课程重点讲解Scala核心编程,内容包括: Scala语言概述、运算符、程序流程控制、数据结构之集合、Map映射、过滤、化简、折叠、扫描、拉链、视图、并行集合、高阶函数、函数柯里化、偏函数、参数推断、控制抽象、Trait、面向对象编程、异常处理、惰性函数、Akka及Actor模型、Spark Master和Worker通讯、隐式转换、隐式参数、工厂模式、单例模式、观察者模式、装饰者模式、代理模式、泛型、上下界、视图界定、上下文界定、协变逆变不变和源码剖析。通过系统全面的学习,学员能掌握Scala编程思想和Scala底层机制,为进一步学习Spark打下坚实基础。 三、谁适合学 1.希望以较低的投入和风险,来了解自己是否适合转型从事Spark开发的求职人员。 2.有一定的Java基础,或者自学过一些Java书籍与视频资料,想系统掌握Scala语言的程序员。

    3716 人正在学习 去看看 缪传海

前言:这两个月来一直在处理接手实验室师兄的一个图书推荐项目,期间从读懂其用python构建的简易推荐系统到在spark上写pyspark、scala程序来实现一个基于大数据平台的分布式推荐系统,对于我这样一个无人指点的小白着实是费了一番功夫,现在做记录如下。

一、在spark分布式平台运到的坑

1、 如何在spark ui上监听到spark的历史运行记录

利用spark UI 调试和监控运行的spark程序非常的方便,它可以让我们很直观的看到正在运行的stages、某stage运行的时间和executors的运行状况等,但有个比较不便的是没用经过特殊设置,spark UI 在spark程序运行完就不能再连接上了。市场上常见的spark书籍在教搭建spark平台时都不会教怎么配置spark环境变量以使我们可以看到spark平台运行的历史记录。在网上进行搜索一番后发现如何配置环境也都是众说纷纭,不同的人配置的方式也不用。
经过一番比较后,我配置spark的环境变量以使其可以看到spark的历史运行记录的方式如下:
在环境变量文件spark-defaults.conf中添加如下几行:

spark.history.ui.port=Master:18080   //设置监听历史记录的端口号为18080
spark.eventLog.enabled=true   //设置为ture,表示设置记录Spark事件,用于应用程序在完成后重构webUI
spark.eventLog.dir=hdfs://Master:9000/tmp/spark/events   //设置hdfs缓存历史的任务目录,目录应在hdfs中相应的创建好
spark.history.retainedApplications=30  //设置可查看的最大历史记录数

在环境变量文件spark-env.sh中添加如下几行:

export SPARK_HISTORY_OPTS="-Dspark.history.ui.port=18080 -Dspark.history.retainedApplications=3 -Dspark.history.fs.logDirectory=hdfs://Master:9000/tmp/spark/events" //设置后相应hdfs目录需相应的创建好

参考连接官方文档博客1
在上述两个环境变量文件中添加好相应代码后,在启动spark集群时同时打开spark-history-server.sh脚本即可在spark UI中监听到spark 的历史运行记录了。

2、 用spark-submit提交pyspark代码没有按预期运行分布式模式

异常描述: 分布式平台(hadoop+spark)配置好后,我在spark-shell master spark://Mastter:7070入口进入交互式环境下和以spark-submit为入口在yarn集群和spark standalone 集群模式上提交sbt打包的scala程序都是以运行分布式模式运行程序的。但在spark-submit为入口,按书上的提示在yarn集群和spark standalone 集群模式上提交pyspark.py程序欲使其运行分布式时就出现了问题,在运行记录中看到的是提交的pyspark代码是以localhost模式形式运行的,也就是说跑的是单机。这个问题困扰我两天,因为我的spark和hadoop是照着几本书配置的,也参考了师兄配置的集群且提交打包好的jar程序运行时并无异常。期间在网上查阅许久,也把spark和hadoop的配置文件检查了数遍,及在一些技术群里询问但都没有得到答案,最后问题的解决是来自于一个无意的尝试。平常在spark-submit提交的pyspark程序的格式是 “spark-submit /opt/data/LibraryProject/programSet/base_user_submit.py(程序名) --master yarn --deploy-mode client --queue default --executor-memory 5g --num-executors 3 “后来改为“spark-submit --master yarn --deploy-mode client --queue default --executor-memory 4g --num-executors 3 --executor-cores 4 --driver-memory 1G /opt/data/LibraryProject/programSet/base_user_submit.py”即将代码的的位置放在提交指令的最后一栏就可以如愿在yarn上运行分布式程序了。
  异常分析:经过分析后发现,spark-submit提交方式应该是读到所提交的程序文件就不会再读后面如–master yarn --deploy-mode client…等提交的参数设置了,而我所提交的程序中在创建spark实例的时候刚好没有进行任何给参数的操作(我创建spark的方式是spark=SparkSession(context),context=SparkContext()),所以出现这样spark平台即以默认提交方式localhost形式运行程序了。这真的是一个大坑,现阶段介绍pyspark的书比较少,且介绍的较浅,尽管python在越来越多的应用到编写spark程序实现分布式任务当中。

3、spark在执行程序中出现too much file open 的错误

写在我的另一个博客中
Uncaught exception while reverting partial writes to file …(Too many open files)

4、运行pyspark程序运到python版本问题

运行错误警告

问题分析:因为在集群中存在python官网中下载安装的python和安装的anaconda带的python两个版本,程序在执行时不知道使用哪个版本python的解释器。
  
  问题解决:
在环境变量文件spark-env.sh中添加如下设置:

export PYSPARK_PYTHON=/opt/app/anaconda2/bin/python

二、 基于读者与读者的图书推荐实现

1、基于用户的协同过滤的基本思想是:根据用户对物品的爱好找到相邻邻居用户,然后将邻居用户偏爱的物品中但是当前用户又没有购买的推荐给他。它的原理就是将某个用户对全部物品的爱好作为一个向量,计算出各个用户之间的相似度,找到邻居后,依据邻居们的相似度及购物历史,预测当前用户可能会喜欢的但尚未购买的物品,计算得出按照一定排列顺序的物品推荐类表给用户。
2、基于用户的算法在很多方面有着非常成功的运用,但是随着推荐系统规模的日益扩充,特别是用户数和项目数的日益增多,该算法存在的弱点开始体现出来了。
a、稀疏性问题。据研究结果表明,当用户评价项目数少于总项目数的,就很容易造成评价矩阵数据相当稀疏,导致算法难以找到一个用户的偏好相似邻居。
b、冷启动问题。基于用户协同过滤是建立在有大量用户对某个产品的评价上的,由于在新产品开始阶段没有人购买,也没有对其进行评价,那么在开始阶段也将无法对其进行推荐。
c、算法扩展性问题。随着物品数尤其是用户数的剧烈增加,最近邻居算法的计算量也相应增加,所以不太适合数据量大的情况使用,所以推荐系统性能也会大大受影响,而现在的推荐系统几乎是结构,没有快速的相应速度,对网络用户来说无法忍受的,因此这在某种程度上限制了基于用户协同过滤在推荐系统中的使用。
 d、特殊用户问题。在生活中,往往有一部分人的偏好是比较特殊,他没有相对固定的兴趣爱好,而这刚好是基于用户协同过滤的前提,那么系统很难为他找出邻居,也就是很难给出比较精确的推荐信息了。
3、基于读者与读者的协同过滤如何实现
a、构建读者-图书评价矩阵
b、利用余弦相似度计算公式,以每个读者的借阅评分向量来计算不同用户之间的相似度。
c、制定推荐策略,为每一位读者推荐图书
读者-图书评价矩阵

4、在实现过程中遇到的理论问题
  图书馆没有对图书进行借阅打分的机制,因而在构建用户-图书评价矩阵时缺失读者借阅评分数据,只能以简单的是否借阅某书来代替读者对某书的评分。其中借阅过表示为1,代表完全满意;没有借阅过表示为0,代表完全不满意。这种无可奈何的办法会导致不能准确计算读者的相似度,且容易造成推荐热门借阅图书,难以对读者进行个性化推荐。所以如果想要实现对读者进行个性化推荐,后期图书馆应该制定一个可以让读者对其所借阅的图书进行评分的机制。

三、 代码中觉得值得记录的操作

1、读取csv文件并将其转化为dataframe的操作指令

user_book=spark.read.csv('logSet/user_book.csv',inferSchema='true',header="true")//pyspark版,读取时设置为自动推断列的类型,取第一列为列名
var user_book_list=spark.read.format("csv").option("header","true").load("dataSet/user_book_id.csv")//scala版本,读取时设置为自动推断列的类型,取第一列为列名

2、保存datafram到hdfs(df.repartition(1)表示将数据聚合在一个分区上进行保存,这样保存的数据不会别切分成多个)

book_list.write.repartition(1).csv('test_book.csv',header='true',mode='overwrite')//pyspark版,保存时时设置未取第一列为列名,如有相同文件名则进行覆盖
user_sim.repartition(1).write.format("csv").option("header","true").option("mode","overwrite").save("spark_result/user_Sim3.csv")scala版,保存时时设置未取第一列为列名,如有相同文件名则进行覆盖

3、创建dataframe的几种方式
pyspark版

schema = StructType([StructField("user_id", IntegerType(), True), StructField("sim_id", IntegerType(), True),StructField("sim", FloatType(), True)])
df=spark.createDataFrame([(user_id,i,sim_values)],schema)

scala版
a、创建带制定格式的dataframe

case class userSim(user_id:String,sim_id:String,sim_value:Double)
var df_tmp = List(userSim(user_id,user_i,sim_value))
var user_sim=sc.parallelize(df_tmp).toDF()
或者:
var list_tmp = List(userSim(user_book_cv_list(0)(0).toString,  user_book_cv_list(0)(0).toString, 1.0)).toBuffer//预设df的格式
list_tmp += userSim(user_book_cv_list(i)(0).toString,  user_book_cv_list(j)(0).toString, cosineSim)
val user_sim = sc.parallelize(list_tmp).toDF()//将得出的读者相似度list创建为df(user_id,sim_id,sim_value)

b、不带格式的dataframe

val df = Seq(("00",Array("机器学习","深度学习","spark机器学习"))).toDF("user_id","userName")
var arr =("大数据导论","快速大数据分析")
val df4 = Seq(("02",Array(arr))).toDF("user_id","userName")

4、合并两个dataframe(在代码实现过程的用到多次,很好用)

user_book=user_book.join(book_id,"bookName","left").cache()//pyspark版,以两个df都有的col(bookName)进行合并,合并模式是"left",数据向df user_book倾斜
var user_book_list=user_book.join(book_list,$"bookName"===$"bookName1","left")//scala版
var user_sim_list = sim_list.join(all_user,sim_list("sim_id")===all_user("user_id"),"inner")//scala版,合并两个df中col(sim_id)===col(user_id)相同的值,,join的模式是"inner",注意用的是三个“=”

5、将某列值相同的标签聚合到同一行

var all_user = all_user.withColumn("bookName1", $"bookName").groupBy("user_id").agg(collect_list($"bookName1")).toDF("user_id","bookName").persist(StorageLevel.MEMORY_ONLY)/以读者ID聚合,得到一个型为(user_id,Array(user_all_book))的dataframe

6、删除col(1)中包含col(0)中具有的值(这个在实现对每个读者进行最终推荐图书时非常好用,而且实现起来很简单)

//自定义一个函数
val filterList = udf((a: Seq[String], b: Seq[String]) => a diff b)
//在col(recomBook)去除掉user_id用户借阅过的书籍
var user_sim_recomBook=user_sim_book.withColumn("left", filterList($"recomBook", $"user_bookName")).show()

7、构建读者-评分矩阵,(这里用了一个取巧的方法–调用分词函数,处理起来也很快,120万行的数据只需要几分钟。实验室用同学写过类似的代码,借鉴了一部分。不过这里可以这么用刚好是因为除处理的图书没有评分数据,只能以借阅与否代表满意度,否则不能直接用下示方法)

import org.apache.spark.ml.feature.CountVectorizer
import org.apache.spark.storage.StorageLevel

var user_book_list=spark.read.format("csv").option("header","true").load("dataSet/user_book_id.csv").select("id","bookName","book_id").toDF("user_id","bookName","book_id")//加载全部读者的图书借阅记录
user_book_list.persist(StorageLevel.MEMORY_ONLY)//相当于调用cache()函数
var all_user=user_book_list.select("user_id","bookName").cache()//取用户名和用户借阅书名
val user_book = all_user.withColumn("bookName1", $"bookName").groupBy("user_id").agg(collect_list($"bookName1")).toDF("user_id","bookName").cache()//以读者ID聚合,得到一个型为(user_id,Array(user_all_book))的dataframe
val book_cv = new CountVectorizer().setInputCol("bookName").setOutputCol("TitleVectors")//以待分词的列为输入创建分词实例
val bookmodel = book_cv.fit(user_book)//调用fit实例
val user_book_cv = bookmodel.transform(user_book)//得到形为(user_id,bookName,TitleVectors)的dataframe,其中的col(TitleVectors)为每个读者的读者-评分矩阵

四、低效率编写代码的体现(spark平台用的好的人可以实现快速计算数以Tb的数据,但代码没有优化好,写出来的程序可能还不如用pyython单机来的快,这一点自己深有体会,以下是自己遇到的一些坑)

1、不会学会看spark官方文档。
2、重复使用active操作,特别是在大的循环里。
3、不会为重复调用的rdd或者df设置缓存。
4、想实现某一功能,特别是spark官方文档没有直接写到的,不是先googel而是自己硬着写。
5、写代码之前没有提前都构思好,想到什么写什么,写到后面才发现方向错了。
6、能用scala实现的功能要用pyspark实现。官方指出,pyspark代码执行的速度比scala慢一倍(rdd)。

五、项目处理过程中查阅的觉得不错的博客

1、How to create an empty DataFrame? Why “ValueError: RDD is empty”?
2、How to create an empty DataFrame with a specified schema?
3、Spark DataFrame按某列降序排序](https://blog.csdn.net/dkl12/article/details/80961981)
4、Python pyspark.sql.SparkSession() Examples
5、Merging multiple data frames row-wise in PySpark
6、Extract column values of Dataframe as List in Apache Spark
7、spark - Converting dataframe to list improving performance
8、How do I add a new column to a Spark DataFrame (using PySpark)?
9、PySpark 学习笔记三
10、spark dataframe 一列分隔多列,一列分隔多行(scala)
11、PySpark︱DataFrame操作指南:增/删/改/查/合并/统计与数据处理
12、Trying to create dataframe with two columns [Seq(), String] - Spark
13、how to convert rows into columns in spark dataframe, scala
14、Difference between columns of ArrayType in dataframe
15、python—pandas.merge使用

2019-03-13 10:08:15 keyue123 阅读数 817
  • 大数据—Scala

    一、Scala核心编程课程简介 近年来随着大数据的兴起,大数据核心框架Spark和Kafka也受到到广泛关注, Spark底层是Scala实现的, 因此也带火了Scala语言,目前Scala被全球知名公司(如:谷歌、百度、阿里、Twitter、京东等)广泛用于Spark开发。新一代的物联网时代到来,会对大数据应用人才的需求越加紧迫。 尚硅谷网罗和整合了学员很喜爱的师资,打造出专注于Spark开发的Scala课程,我们讲解Scala核心编程技术,同时也讲解编程思想、设计模式和Scala底层实现,让您有豁然开朗的感受。 二、课程内容和目标 本课程重点讲解Scala核心编程,内容包括: Scala语言概述、运算符、程序流程控制、数据结构之集合、Map映射、过滤、化简、折叠、扫描、拉链、视图、并行集合、高阶函数、函数柯里化、偏函数、参数推断、控制抽象、Trait、面向对象编程、异常处理、惰性函数、Akka及Actor模型、Spark Master和Worker通讯、隐式转换、隐式参数、工厂模式、单例模式、观察者模式、装饰者模式、代理模式、泛型、上下界、视图界定、上下文界定、协变逆变不变和源码剖析。通过系统全面的学习,学员能掌握Scala编程思想和Scala底层机制,为进一步学习Spark打下坚实基础。 三、谁适合学 1.希望以较低的投入和风险,来了解自己是否适合转型从事Spark开发的求职人员。 2.有一定的Java基础,或者自学过一些Java书籍与视频资料,想系统掌握Scala语言的程序员。

    3716 人正在学习 去看看 缪传海

  在《推荐系统》中,详细的介绍了常用的推荐系统,大家基本上对推荐系统有了一个认识。这里我就简单的讲述一下基于spark的推荐系统。
  spark有着处理速度快,容易使用,而且可以和很多数据库融合的优势,所以在大数据分析中经常使用。具体的介绍我就不在这里赘述了,有兴趣的童鞋可以去看看《Spark快速大数据分析》,这本书详细的介绍了spark的基础知识和使用方法,是我学过spark中讲解最清晰和最容易入门的书籍,下面我们就开始推荐系统的介绍了。
  下面使用的是MovieLens数据集,由于是个人练习,我使用的是 1M 数据集 ml-latest-small.zip 。spark 版本为 2.4.0。

创建spark对象

	import findspark
	findspark.init("/opt/spark", edit_rc=True)   
	
	from pyspark import SparkContext
	
	sc = SparkContext("local", "john-spark")

  findspark 引入 pyspark 依赖,/opt/spark 为我的 spark 安装路径。
  local: 本地运行
  john-spark:应用名

数据准备

  1. 导入数据
	text = sc.textFile("./data/ml-latest-small/ratings.csv")
	text = text .filter(lambda x: "movieId" not in x)    # 去除标题行

  导入数据,我们也可以熟悉一下数据:

	text.count()      # 查看数据数量
	text.take(5)      # 查看前5条数据

  数据格式为:

	['1,1,4.0,964982703',
	 '1,3,4.0,964981247',
	 '1,6,4.0,964982224',
	 '1,47,5.0,964983815',
	 '1,50,5.0,964982931']
  1. 数据处理
    spark主要有四种数据类型:Vector(向量),LabeledPoint(数据点,主要用在分类回归这些算法中)、Rating(评分,用于推荐)、model(模型,训练结果)。我们这里使用的是Rating,Rating定义如下:
    Rating(user,product,product)Rating(user, product, product)

  所以我们这里只需要使用前三列:userId,movieId,rating。

	movieRatings = text.map(lambda x: x.split(",")[:3])   # 取出前三列
	movieRatings.take(5)

  不熟悉命令的可以查看《spark快速大数据分析》第四章:RDD 编程。取出来的结果如下所示:

	[['1', '1', '4.0'],
	 ['1', '3', '4.0'],
	 ['1', '6', '4.0'],
	 ['1', '47', '5.0'],
	 ['1', '50', '5.0']]

  根据前三列的userID, movieID, rate,我们可以获得电影的一些信息

	movieRatings.map(lambda x: x[0]).distinct().count()  # 用户数量
	movieRatings.map(lambda x: x[1]).distinct().count()  # 电影数量

  可轻松分别获得在这个数据集中有用户610个,电影9724部。

模型训练

  有了数据之后,我们就可以开始训练推荐的模型,在 spark 的机器学习库 MLlib 中有直接训练推荐模型的库,具体可以参考 pyspark 的官方API,我们这里主要使用的是 ALS 模块,导入方式为:

	from pyspark.mllib.recommendation import ALS

  我们将使用ALS.train训练,ALS.train可以分为显性训练和隐性训练
  显性训练:
ALS.train(ratings,rank,iterations,lambda)ALS.train(ratings, rank, iterations, lambda)
  隐性训练:
ALS.trainImplicit(ratings,rank,iterations,lambda,alpha)ALS.trainImplicit(ratings, rank, iterations, lambda, alpha)
  ratings 为训练数据,格式为Rating,rank为矩阵分解时的中间数,A(m,n)X(m,rank)Y(rank,n)A(m, n) ≈ X(m, rank)*Y(rank, n),iterations 重复计算次数,默认值为5,lambda 默认值为0.01。

	model = ALS.train(movieRatings, 10, 10, 0.01)   # 模型训练

推荐

  1. 针对用户推荐
    ALS针对用户,可以使用model.recommendProducts(user: int, num: int) ,其中user为用户ID,num 为推荐数量。
	model.recommendProducts(1, 5)  # 为用户 1 推荐 5 部电影

  推荐结果:

	[Rating(user=1, product=3036, rating=6.581469154740313),
	 Rating(user=1, product=52885, rating=6.231709580290815),
	 Rating(user=1, product=3477, rating=6.128453453362306),
	 Rating(user=1, product=3635, rating=6.080639765555475),
	 Rating(user=1, product=3836, rating=6.0476067796079995)]
  1. 这对电影推荐
    ALS针对用户,可以使用model.recommendUsers(product: int, num: int) ,其中product为电影ID,num 为推荐数量。
	model.recommendUsers(7034, 5)  # 将电影 7034 推送给 5 个用户

  推荐结果:

	[Rating(user=433, product=7034, rating=9.687131642078953),
	 Rating(user=258, product=7034, rating=9.22972547340868),
	 Rating(user=236, product=7034, rating=9.027718039618472),
	 Rating(user=549, product=7034, rating=8.945649337741106),
	 Rating(user=197, product=7034, rating=8.85388839846953)]

电影名显示

上面推荐的都是id,我们无法知道id对应的是哪一个用户和哪一部电影,电影名存放在 movies.csv 这份文件中,我们看看数据:

	movies = sc.textFile("./data/ml-latest-small/movies.csv")
	movies = movies.filter(lambda x: "movieId" not in x)   # 去除标题行
	
	movies.count()
	movies.take(5)

  数据显示:

	9743
	['1,Toy Story (1995),Adventure|Animation|Children|Comedy|Fantasy',
	 '2,Jumanji (1995),Adventure|Children|Fantasy',
	 '3,Grumpier Old Men (1995),Comedy|Romance',
	 '4,Waiting to Exhale (1995),Comedy|Drama|Romance',
	 '5,Father of the Bride Part II (1995),Comedy']

  数据为movieId, title, genres。我们只需要id和title组成字典就可以将它们一一对应。

	movieTitle = movies.map(lambda x: x.split(",")[: 2]).collectAsMap()# 创建字典

可以看看对应的结果:

  重新推荐:

	recommandP = model.recommendProducts(1, 5)  # 为用户推荐电影
	for p in recommandP:
	    print("为用户", str(p[0]), "推荐电影《", movieTitle[str(p[1])], "》,推荐评分:", p[2])
	    
	print("------------------------------------------------------------------------------------------")
	
	recommandP = model.recommendUsers(1, 5)  # 为用户推荐电影
	for p in recommandP:
	    print("为用户", str(p[0]), "推荐电影《", movieTitle[str(p[1])], "》,推荐评分:", p[2])

  推荐结果:

	为用户 1 推荐电影《 Stranger Than Paradise (1984) 》,推荐评分: 7.409966775717861
	为用户 1 推荐电影《 Burnt by the Sun (Utomlyonnye solntsem) (1994) 》,推荐评分: 7.162750014751613
	为用户 1 推荐电影《 "World's End 》,推荐评分: 7.009930175524299
	为用户 1 推荐电影《 Barcelona (1994) 》,推荐评分: 6.876207011935259
	为用户 1 推荐电影《 Antonia's Line (Antonia) (1995) 》,推荐评分: 6.849347899803536
	------------------------------------------------------------------------------------------
	为用户 99 推荐电影《 Toy Story (1995) 》,推荐评分: 5.7616037185959375
	为用户 267 推荐电影《 Toy Story (1995) 》,推荐评分: 5.399197893665546
	为用户 498 推荐电影《 Toy Story (1995) 》,推荐评分: 5.3928558642123905
	为用户 35 推荐电影《 Toy Story (1995) 》,推荐评分: 5.3299243033839465
	为用户 558 推荐电影《 Toy Story (1995) 》,推荐评分: 5.284973512896739

spark读书笔记

阅读数 537

没有更多推荐了,返回首页