2017-08-19 21:18:13 qq_33813365 阅读数 3935
  • Spark快速大数据处理

    课程的主要内容包括: 1.ZooKeeper-分布式过程协同组件 2.Hadoop3-大数据基础组件 3.Tez-Yarn底层计算引擎 4.Hive3-大数据仓库 5.Spark2实时大数据处理 6.Oozie5-大数据流程引擎 课程特点: 1.最新API: Hadoop3/Spark2/Hive3/Oozie5 2.手工搭建集群环境:编译+搭建 3.配套资源:分阶段镜像+课件+安装资源,其中安装资源包括案例源码、脚本等 4.案例为主:分模块案例+天池数据分析竞赛 5.故障教学 6.完整实战项目:天池数据分析

    12319 人正在学习 去看看 余海峰

——8.16开始整理
Spark快速大数据分析

推荐序:
一套大数据解决方案通常包含多个组件,从存储、计算和网络硬件层,到数据处理引擎,再到利用改良的统计和计算算法、数据可视化来获得商业洞见的分析层,这其中数据处理引擎起到了十分重要的作用,毫不夸张的说数据处理引擎至于大数据就相当于CPU之于计算机

spark起源:
2009年加州大学伯克利分校AMPlab 创立spark大数据处理和计算框架。不同于传统数据处理框架,spark基于内存的基本类型,为一些应用程序带来了100倍的性能提升。spark允许允许应用将数据加载到集群内存中反复查询,非擦汗那个适合于大数据处理和机器学习

spark发展:
spark已超越spark核心,发展到了spark streaming、sql、MLlib、GraphX、sparkR等模块,企业、交通、医疗、零售,推进商业洞见,加速决策;
作为MapReduce的继承者,spark主要有三大优点:1.spark非常好用,由于高级API剥离了对集群本身的关注,只关注任务实现的逻辑。2.spark很快,支持交互使用和复杂算法。3.spark是通用引擎,可以用它来完成各种各样的运算,包括SQL查询、文本处理、机器学习

第一章:spark数据分析导论
1.1 spark是什么
快速通用集群计算平台

spark扩展了mapreduce计算模型,高效的支持更多的计算模式,包括交互式查询和流处理(在处理大规模数据集时,速度非常重要,速度快就意味着我们可以进行交互式的数据操作),能够在内存中进行计算(不过就算必须在磁盘中进行复杂计算,spark依然比mapreduce更加高效)

spark适用于各种各样原来需要多种不同分布式平台的场景,包括批处理、迭代算法、交互式查询、流处理。通过在这一个统一的框架下支持这些不同类型的计算,spark是我们可以简单而低耗地把各种处理流程整合到一起。而这样的组合在实际开发中很有意义,进而减轻原来需要对各种平台分别管理的负担

spark可以运行在hadoop集群上,访问hadoop上任意数据源

1.2一个大一统的软件栈

spark的核心是一个对有许多计算任务组成的、运行在多个工作机器或者是一个计算集群上的应用进行调度、分发以及监控的计算引擎。该引擎支持多种不同应用场景专门设计的高级组件,例如 spark streaming、saprk sql、MLlib、GraphX ,这些组件关系密切并且可以相互调用,这样就可像平常软件项目中使用程序库那样,组合使用这些组件

各组间密切集成于spark平台的优点:
都可以从下层改进中受益(当spark核心引擎引入一个优化,sql和机器学习程序库都能自动获得性能提升)
运行整个软件栈的代价变小了(不需要同时运行支持多种不同应用场景所需的多个软件系统,只需在spark上调用各种库)这些代价包括系统的部署、维护、测试、支持、升级
能够无缝整合不同处理模型的应用

spark软件栈:独立调度器/YARN/Mesos——>spark core ——> spark sql/spark streaming/MLlib/GraphX

1.2.1 Spark Core
Spark Core 上实现了Spark的基本功能,包含任务调度、内存管理、错误恢复、与存储系统交互等模块。Spark Core还包含了对弹性分布式数据集的API定义。RDD表示分布在多个节点可并并行操作的元素集合,是spark主要的编程对象。spark core提供了创建和操作这些集合的多个API

1.2.2 Spark SQL
Spark SQL 是spark用来操作结构化数据的程序包,可以使用HiveQL来查询数据。Spark SQL支持多种数据源,比如 Hive表、Parquest以及Json。saprk SQL还支持开发者将SQL和传统的RDD编程数据操作方式结合(python、java、scala皆有相应程序包)

1.2.3 Spark streaming
Spark提供的对流式数据进行流式计算的组件。比如网页服务器日志或者网络服务中用户提交的状态更新组成的消息队列,都是数据流 可操作磁盘、内存、实时数据流

1.2.4 MLlib
Spark还提供一个机器学习的程序库,MLlib,提供了许多种机器学习的算法,包括分类、回归、聚类、协同过滤以及模型评估和数据导入等功能

1.2.5 GraphX
操作图的程序库,例如 社交网络的朋友关系图,可以进行并行的图计算以及常用的图算法

1.2.6 集群管理器
底层而言,spark设计为高效的在一个计算节点到数千个计算节点伸缩计算,spark支持多种集群管理器(Hadoop YARN、Apache Mesos),以及spark自带的独立调度器

第二章:spark下载与入门
spark 可以通过Python、java或者Scala使用
spark 本身是scala编写的,运行在java虚拟机(为什么要使用java虚拟机?java语言一个非常重要特点就是与平台无关性。而使用java虚拟机是实现这这一特点的关键。一般的高级语言如果在不同的平台运行,至少需要编译成不同的目标代码。而引入java虚拟机后java语言在不同平台与性是不需要重新编译。java语言使用java虚拟机屏蔽了具体平台的不同,使得java语言编译程序只需生成在java虚拟机上运行的字节码文件,就可以在不同平台不需修改的运行),所以需要集群都安装java环境,要是使用python接口,还需要一个python解释器

ps:.tgz(.tar.gz)

spark有python shell 和 scala版的shell

第三章:RDD编程
简介:
Spark对数据的核心抽象——弹性分布式数据集(Resilient Distributed Dataset)。RDD实际上是分布式的元素集合。在Spark中,对数据的所有操作不外乎创建RDD、转化已有RDD以及调用RDD操作进行求值,而在一切的背后,Spark会自动将RDD中的数据分发到集群中,并将操作并行执行

3.1 RDD基础
Spark中的RDD就是一个不可变的分布式对象集合。每个RDD被分成多个分区,这些分区运行在集群中不同节点
从外部数据创建出输入RDD
使用诸如filter()这样的转化操作对RDD进行转化,以定义新的RDD
对需要被重用的中间结果RDD执行persist()操作
使用行动操作(count()、first())来触发一次并行操作,spark会对计算优化后执行

3.2创建RDD
创建RDD两种方式:读取外部数据、驱动器中对一个集合进行转化

3.3 RDD操作
RDD支持两种操作:转化操作、行动操作。RDD的转化操作返回的是一个新的RDD,比如map()、filter(),而行动操作向驱动器系统返回结果或者把结果写到外部系统的操作,会触发实际的计算,比如 count()、first()。

3.3.1 转化操作

通过转化操作,从已有的RDD派生出新的RDD,spark会用 谱系图 来记录这些不同RDD的依赖关系。spark需要用这些信息计算每个RDD,可以依靠系谱图在持久化RDD丢失数据时恢复所丢失数据

inputRDD——》errorsRDD/warningRDD——》badlinesRDD

3.3.2 行动操作
转化操作是惰性操作,只有行动操作需要生成实际的输出,会强制执行用的RDD执行转化操作
take()取RDD少量元素,collect()函数可以用来获取整个RDD中的元素,不需注意的是,必须确定整个RDD元素能在单台内存中装的下,因此collect()不能用在大规模数据集
需要注意的是:每当我们调用一个新的行动操作时,整个RDD都会重新开始计算,所以要将中间结果持久化

3.3.3 惰性求值
RDD的转化操作是惰性的,RDD依赖关系由spark系谱图管理记录,当执行操作必须用到指定RDD时才强制执行生成该RDD的转化操作
spark用惰性求值,这样就可以将一些操作和起来来减少计算数据的步骤

2019-03-15 11:31:29 shenmanli 阅读数 279
  • Spark快速大数据处理

    课程的主要内容包括: 1.ZooKeeper-分布式过程协同组件 2.Hadoop3-大数据基础组件 3.Tez-Yarn底层计算引擎 4.Hive3-大数据仓库 5.Spark2实时大数据处理 6.Oozie5-大数据流程引擎 课程特点: 1.最新API: Hadoop3/Spark2/Hive3/Oozie5 2.手工搭建集群环境:编译+搭建 3.配套资源:分阶段镜像+课件+安装资源,其中安装资源包括案例源码、脚本等 4.案例为主:分模块案例+天池数据分析竞赛 5.故障教学 6.完整实战项目:天池数据分析

    12319 人正在学习 去看看 余海峰

Spark与Hadoop大数据分析比较系统地讲解了利用Hadoop和Spark及其生态系统里的一系列工具进行大数据分析的方法,既涵盖ApacheSpark和Hadoop的基础知识,又深入探讨所有Spark组件——SparkCore、SparkSQL、DataFrame、DataSet、普通流、结构化流、MLlib、Graphx,以及Hadoop的核心组件(HDFS、MapReduce和Yarn)等,并配套详细的实现示例,是快速掌握大数据分析基础架构及其实施方法的详实参考。

全书共10章,第1章从宏观的角度讲解大数据分析的概念,并介绍在Hadoop和Spark平台上使用的工具和技术,以及一些*常见的用例;第2章介绍Hadoop和Spark平台的基础知识;第3章深入探讨并学习Spark;第4章主要介绍DataSourcesAPI、DataFrameAPI和新的DatasetAPI;第5章讲解如何用SparkStreaming进行实时分析;第6章介绍Spark和Hadoop配套的笔记本和数据流;第7章讲解Spark和Hadoop上的机器学习技术;第8章介绍如何构建推荐系统;第9章介绍如何使用GraphX进行图分析;第10章介绍如何使用SparkR。

目录:

第1章 从宏观视角看大数据分析··········1

1.1 大数据分析以及Hadoop和Spark

在其中承担的角色····························3

1.1.1 典型大攻据分析项目的

生名周期.....................4

1.1.2 Hadoop中Spark承担的角色·············6

1.2 大数据札学以及Hadoop和

Spark在其中承扣的角色…………6

1.2.1 从数据分析到数据科学的

根本性转变···························6

1.2.2 典型数据科学项目的生命周期··········8

1.2.3 Hadoop和Spark承担的角色·················9

1.3 工具和技术··························9

1.4 实际环境中的用例·············11

1.5 小结········································12

第2章 Apache Hadoop和ApacheSpark 入门····13

2.1 Apache Hadoop概述..…………13

2.1.1 Hadoop分布式文件系统····14

2.1.2 HDFS的特性·······························15

2.1.3 MapReduce··························16

2.1.4 MapReduce的特性······················17

2.1.5 MapReduce v 1与

MapRcduce v2 对比······················17

2.1.6 YARN··································18

2.1.7 Hadoop上的存储选择······················20

2.2 Apache Spark概述···························24

2.2.1 Spark的发展历史······················24

2.2.2 Apache Spark是什么······················25

2.2.3 Apache Spark不是什么·······26

2.2.4 MapReduce的问题······················27

2.2.5 Spark的架构························28

2.3 为何把Hadoop和Spark结合使用·······31

2.3.1 Hadoop的持性······················31

2.3.2 Spark的特性·······························31

2.4 安装Hadoop和Spark集群···············33

2.5 小结··················································36

第3章 深入剖析Apache Spark ··········37

3.1 启动Spark守护进程·······························37

3.1.1 使用CDH ····························38

3.1.2 使用HDP 、MapR和Spark预制软件包··············38

3.2 学习Spark的核心概念························39

3.2.1 使用Spark的方法.··························39

3.2.2 弹性分布式数据集······················41

3.2.3 Spark环境································13

3.2.4 变换和动作..........................44

3.2.5 ROD中的并行度·························46

3.2.6 延迟评估·······························49

3.2.7 谱系图··································50

3.2.8 序列化·································51

3.2.9 在Spark 中利用Hadoop文件格式····52

3.2.10 数据的本地性··················53

3.2.11 共享变量........................... 54

3.2.12 键值对RDD ······················55

3.3 Spark 程序的生命周期………………55

3.3.1 流水线............................... 57

3.3.2 Spark执行的摘要....………58

3.4 Spark应用程序······························59

3.4.1 Spark Shell和Spark应用程序·········59

3.4.2 创建Spark环境…….............59

3.4.3 SparkConf·························59

3.4.4 SparkSubmit ························60

3.4.5 Spark 配置项的优先顺序····61

3.4.6 重要的应用程序配置··········61

3. 5 持久化与缓存··························62

3.5.1 存储级别............................. 62

3.5.2 应该选择哪个存储级别·····63

3.6 Spark 资源管理器: Standalone 、

YARN和Mesos·······························63

3.6.1 本地和集群模式··················63

3.6.2 集群资源管理器························64

3.7 小结·················································67

第4章 利用Spark SQL 、DataFrame

和Dataset 进行大数据分析····················69

4.1 Spark SQL的发展史····························70

4.2 Spark SQL的架构·······················71

4.3 介绍Spark SQL的四个组件················72

4.4 DataFrame和Dataset的演变············74

4.4.1 ROD 有什么问题····························74

4.4.2 ROD 变换与Dataset和

DataFramc 变换....................75

4.5 为什么要使用Dataset和Dataframe·····75

4.5.1 优化·····································76

4.5.2 速度·····································76

4.5.3 自动模式发现························77

4.5.4 多数据源,多种编程语言··················77

4.5.5 ROD和其包API之间的互操作性.......77

4.5.6 仅选择和读取为要的数据···········78

4.6 何时使用ROD 、Dataset

和DataFrame·············78

4.7 利用DataFraine进行分析.......……78

4.7.1 创建SparkSession …………...79

4.7.2 创建DataFrame·····························79

4.7.3 把DataFrame转换为RDD·············82

4.7.4 常用的Dataset DataFrame操作······83

4.7.5 缓存数据··································84

4.7.6 性能优化·····························84

4.8 利用DatasetAPl进行分析················85

4.8.1 创建Dataset·····························85

4.8.2 把Dataframe转换为Dataset····86

4.8.3 利用数据字典访问元数据···············87

4.9 Data Sources API ............................87

4.9.1 读和写函数································88

4.9.2 内置数据库····································88

4.9.3 外部数据源··························93

4.10 把Spark SQL作为分布式SQL引擎····97

4.10.1 把Spark SQL的Thrift服务器

用于JDBC/ODBC访问............97

4.10.2 使用beeline客户端查询数据·········98

4.10.3 使用spark-sqI CLI从Hive查询数据....99

4.10.4 与BI工具集成··························100

4.11 Hive on Spark ...........................…100

4.12 小结..............................................100

第5章 利用Spark Streaming和Structured Streaming 进行

实时分析···102

5.1 实时处理概述··························103

5.1.1 Spark Streaming 的优缺点...104

5.1.2 Spark Strcruning的发展史····104

5.2 Spark Streaming的架构···············104

5.2.1 Spark Streaming应用程序流··········106

5.2.2 无状态和有状态的准处理·················107

5.3 Spark Streaming的变换和动作········109

5.3.1 union·································· 109

5.3.2 join···························109

5.3.3 transform操作··························109

5.3.4 updateStateByKey·····················109

5.3.5 mapWithState ····················110

5.3.6 窗口操作······ ·····················110

5.3.7 输出操作........................... 1 11

5.4 输人数据源和输出存储·············111

5.4.1 基本数据源·······112

5.4.2 高级数据源····················112

5.4.3 自定义数据源.···················112

5.4.4 接收器的可靠性························ 112

5.4.5 输出存储··························113

5.5 使用Katlca和HBase的SparkStreaming···113

5.5.1 基于接收器的方法·······················114

5.5.2 直接方法(无接收器······················116

5.5.3 与HBase集成···························117

5.6 Spark Streaming的高级概念·········118

5.6.1 使用DataF rame······················118

5.6.2 MLlib操作·······················119

5.6.3 缓存/持久化·······················119

5.6.4 Spark Streaming中的容错机制······119

5.6.5 Spark Streaming应用程序的

性能调优············121

5.7 监控应用程序·······························122

5.8 结构化流概述································123

5.8.1 结构化流应用程序的工作流··········123

5.8.2 流式Dataset和流式DataFrame·····125

5.8.3 流式Dataset和流式

DataFrame的操作·················126

5.9 小结········································129

第6章 利用Spark 和Hadoop的

笔记本与数据流····················130

6.1 基下网络的笔记本概述·····················130

6.2 Jupyter概述..·························· 131

6.2.1 安装Jupyter···················132

6.2.2 用Jupyter进行分析···················134

6.3 Apache Zeppelin 概述····················· 135

6.3.1 Jupyter和Zeppelin对比····136

6.3.2 安装ApacheZeppelin···················137

6.3.3 使用Zeppelin进行兮析····139

6.4 Livy REST作业服务器和Hue笔记本····140

6.4.1 安装设置Livy服务器和Hue········141

6.4.2 使用Livy服务器····················1 42

6.4.3 Livy和Hue笔记本搭配使用·········145

6.4.4 Livy和Zeppelin搭配使用·············148

6.5 用于数据流的ApacheNiFi概述········148

6.5.1 安装ApacheNiFi··················148

6.5.2 把N iF1用干数据流和分析·····149

6.6 小结·····························152

第7章 利用Spark 和Hadoop 进行机器学习...153

7.1 机器学习概述........….................... 153

7.2 在Spark和Hadoop上进行机器学习.....154

7.3 机器学习算法··················155

7.3.1 有监督学习........…............. 156

7.3.2 无监督学习···················156

7.3.3 推荐系统…................…..... 157

7.3.4 特征提取和变换……...…157

7.3.5 优化...................................158

7.3.6 Spark MLlib的数据类型…158

7.4 机器学习算法示例·················160

7.5 构建机器学习流水线·················163

7.5.1 流水线工作流的一个示例···········163

7.5.2 构建一个ML流水线··················164

7.5.3 保存和加载模型··················166

7.6 利用H2O和Spark进行机器学习·····167

7.6.1 为什么使用SparklingWatcr······167

7.6.2 YARN上的一个应用程序流.........167

7 .6.3 Sparkling Water入门........168

7.7 Hivemall概述……..…………..169

7.8 Hivemall for Spark概述.. ……........170

7.9 小结······························170

第8章 利用Spark和Mahout构建推荐系统...171

8.1 构建推荐系统..............…171

8.1.1 基干内容的过滤························172

8.1.2 协同过滤······························ 172

8.2 推荐系统的局限性··························· 173

8.3 用MLlib实现推荐系统·······················173

8.3.1 准备环境·······················174

8.3.2 创建RDD······················175

8.3.3 利用DataFrame探索数据·······176

8.3.4 创建训练和测试数据集················178

8.3.5 创建一个模型···················178

8.3.6 做出预测··························179

8.3.7 利用测试数据对模型进行评估·······179

8.3.8 检查误型的准确度……......180

8.3.9 显式和隐式反馈····················181

8.4 Mahout和Spark的集成·····················181

8.4.1 安装Mahout····················181

8.4.2 探索Mahout shell ·····················182

8.4.3 利可Mahout和搜索工具

构建一个通用的推荐系统········186

8.5 小结····················189

第9章 利用GraphX进行图分析···190

9.1 图处理概述···································190

9.1.1 图是什么···························191

9.1.2 图数据库和图处理系统····191

9.1.3 GraphX概述·······················192

9.1.4 图算法···································192

9.2 GraphX入门·······················193

9.2.1 GraphX的基本操作·······················193

9.2.2 图的变换·············198

9.2.3 GraphX算法·······················202

9.3 利用GraphX分析航班数据···········205

9.4 GraphFrames概述························209

9.4.1 模式发现··························· 211

9.4.2 加载和保存Graphframes···212

9.5 小结...............................................212

第10章 利用SparkR进行交互式分析······213

10.1 R语言和Spark.R概述·······················213

10.1.1 R语言是什么.··························214

10.1.2 SparkR慨述.....................214

10.1.3 SparkR架构..................... 216

10.2 SparkR入门·······················216

10.2.1 安装和配置R·························216

10.2.2 使用SparkR shell··········218

10.2.3 使甲Spark.R脚本·······················222

10.3 在 SparkR里使用Dataframe······223

10.4 在RStudio里使用SparkR···········228

10.5 利用SparkR进行机器学习·······230

10.5.1 利用朴素贝叶斯模型······230

10.5.2 利用K均值模型·······················232

10.6 在Zeppelin里使用SparkR·······233

10.7 小结·······················234

如果想得到下载地址,请访问中科院计算所培训中心官网http://www.tcict.cn/添加官网上的微信客服号索取!

2017-12-07 10:01:58 MrGeroge 阅读数 3166
  • Spark快速大数据处理

    课程的主要内容包括: 1.ZooKeeper-分布式过程协同组件 2.Hadoop3-大数据基础组件 3.Tez-Yarn底层计算引擎 4.Hive3-大数据仓库 5.Spark2实时大数据处理 6.Oozie5-大数据流程引擎 课程特点: 1.最新API: Hadoop3/Spark2/Hive3/Oozie5 2.手工搭建集群环境:编译+搭建 3.配套资源:分阶段镜像+课件+安装资源,其中安装资源包括案例源码、脚本等 4.案例为主:分模块案例+天池数据分析竞赛 5.故障教学 6.完整实战项目:天池数据分析

    12319 人正在学习 去看看 余海峰

第一章 大数据分析

1.数据科学面临的挑战

1)数据分析绝大部分工作是数据预处理包括清洗,处理,融合,挖掘以及其他操作,即使模型调优阶段,特征提取和选择上花费的时间比选择和实现算法时间更多

2)迭代与数据科学紧密相关,比如模型优化时采用的随机梯度下降和最大似然估计,构建模型时需要选择正确特征,挑选合适算法,运行显著性测试,找到合适超参数

3)构建完表现卓越的模型不等于大功告成,模型需要定期甚至实时重建

第二章 用ScalaSpark进行数据分析

数据清洗是数据分析的第一步,数据分析工具语言Scala能完成所有事情

数据清洗一般包括身份解析,记录去重,合并-清除,列表清洗,统称为记录关联

 

记录关联问题描述:

需要从一个或者多个源系统的记录中,根据记录的属性判断记录是否是相同实体

由于RDD都是临时存在的,因此如果后面仍然需要用到此RDD,则需要在当前就缓存此RDD,这样再次使用此RDD时可直接从内存中获取,节约计算开销

 

2.1 聚合

mdsRDD[MatchData]

val grouped=mds.groupBy(md=>md.matched)//根据matched来聚合分为true,false两类

grouped.mapValues(x=>x.size) //grouped中每个元素Key=true or falseValues则为多行记录

 

2.2 概要统计

Parsed.map(md=>md.score(0)).filter(!isNaN(_)).stats() //可以获得score(0)列的统计信息包括总数,平均值,stdev,最大值,最小值

 

2.3 直方图

parsed.map(md=>md.matched).countByValue().toSeq.sortBy(_._1)

 

2.4zip

val nas1=Array(1.0,Double.NaN).map(d=>NAStatCounter(d)) //nas1=Array(NAStatCounter(1.0),NAStatCounter(NaN))

val nas2=Array(Double.NaN,2.0).map(d=>NAStatCounter(d)) //同理nas2

val nas3=nas1.zip(nas2)//Array((NAStatCounter(1.0),NAStatCounter(NaN)),(NAStatCounter(NaN),NAStatCounter(2.0)))

nas3.map(case(a,b)=>a.merge(b))//相当于Array((NAStatCounter(1.0).merge(NAStatCounter(NaN)),NAStatCounter(NaN).merge(NAStatCounter(2.0)))

 

第三章 音乐推荐和Audioscrobbler数据集

隐式反馈数据:Audioscrobbler数据集覆盖了大量用户和艺术家,同时包含了更多总体信息,而单条记录信息较少,这类数据称为隐式反馈数据

user_artist_data.txt 用户ID-艺术家ID-播放次数

artist_data.txt 艺术家ID-艺术家名

artist-alias.txt 将拼写错误的艺术家ID或者ID变体对应到该艺术家的规范ID 错误ID-规范ID

 

3.1 交替最小二乘法推荐

根据两个用户播放过许多相同的歌曲来判断他们可能都喜欢某首歌,称为协同过滤

利用user_artist_data.txt可构建一个稀疏矩阵,有m个用户,和n个艺术家,则此稀疏矩阵为m*n(i,j)的值表示第i个用户播放第j个艺术家的作品次数,此稀疏矩阵A=X*YT

 

用户-特征矩阵和特征-艺术家矩阵的乘积结果约等于稠密用户-艺术家相互关系矩阵的完整估计,求解X,Y可采用交替最小二乘法

AiY(YTY)-1=XI(X的每一行可分别计算,因此并行化),目标是最小化|AiY(YTY)-1-XI|,或者最小化两个矩阵的平方误差,实际会借助QR分解

 

ALS算法特征是稀疏的输入数据、可以用简单的线性代数运算求最优解、数据本身可并行化这三点导致大数据计算非常快

 

Spark MllibALS算法要求用户和产品的ID必须为整数型,而且是32位非负整数

 

1)数据清洗,过滤掉不合规范的数据

2)构建训练集和模型

mkString()常用作把集合元素表示成某种形式分隔的字符串

userFraction 用户特征矩阵

productFraction 商品特征矩阵

3)核查推荐结果 

看实际用户点击过得艺术家和推荐的艺术家是否一致

4)评价推荐质量

AUCBinaryClassificationMetrics:随机选择的好推荐高于坏推荐的概率,作为普遍和总和的测量整体模型质量的手段

AUC的输入为检验集CV(每个用户对应的“正面的”或者“好的”艺术家)和预测函数(每个用户-艺术家对转换成预测Rating

MAPRankingMetrics):准确率,召回率,平均准确度,更强调最前面的推荐质量

选择模型并评估模型准确度是通用做法,数据通常分为训练集(训练模型)、检验集(评估模型)、测试集

5)选择超参数

ALS.trainImplicit()参数包括:

rank=10模型的潜在因素个数,即“用户-特征”和“产品-特征”矩阵的列数(rank影响不明显)

Iterations=5 迭代次数

lambda=0.01标准的过拟合参数,值越大越不容易产生过拟合,但值太大会降低分解的准确性(lambda越大模型效果越好)

alpha=1.0 被观察到的“用户-产品”交互相对没被观察到的交互的权重(模型知道用户听过什么时比不知道用户听过什么时要好)

可以将推荐引擎得到的结果保存在HBase便于快速查询,且对于推荐模型可以保存在文件中 

 

第四章 用决策树算法预测森林植被

趋均数回归:父代豌豆大,子代豌豆也大,但比父代小;父代豌豆小,子代豌豆也小,但比父代大,因此斜率为正但小于1

何谓回归:根据某个值预测另外一个值的思想,回归通常用作预测,回归通常一侧数值型数量,比如大小,收入和温度;分类则只预测标号或者类别

监督学习:需要告诉其类别,然后训练出预测规则,分类(支持向量机,逻辑回归,朴素贝叶斯,神经网络,深度学习),预测

非监督学习:比如聚类

 

4.1向量和特征

维度、预测指标或者变量都称为特征,特征分为类别型特征和数值型特征,可将目标作为附加特征

比如当天天气的特征向量为12.5,15.5,0.10,晴朗,0,17,2,其中17.2作为目标是当天天气的附加特征

回归问题的目标是数值型特征,分类问题的目标是类别型特征

 

4.2 决策树和决策森林

基于决策树的优点:

1)容易并行化,对数据中的离散点具有鲁棒性

2)算法可以接受不同类型和量纲的数据,对数据类型和尺度不相同的情况不需要做预处理或者规范化

3)理解和推理起来相对直观

 

决策树类似于多重if语句判断

 

4.3 Covtype数据集

由于Vector+Label=LabeledPointVector为特征值,Label为目标,由于Spark Mllib库中Vector只能是Double类型,因此若数据中存在类别特征需要编码转换成数值特征

1)采用one-hot编码可以将N个类别特征->N个数值特征,比如多云,有雨,晴朗->(1,0,0),(0,1,0),(0,0,1)

2)采用直接映射比如规定多云=1.0,有雨=2.0,晴朗=3.0

 

4.4 构建决策树模型

val model=DecisionTree.trainClassifier(trainData,7,Map[Int,Int](),”gini”,4,100) 

除了第一个参数外,其余均为超参数

第二个参数为数据集中目标的取值个数7个,即目标可能取值为7

第三个参数为Map保存类别型特征的信息

第四个参数gini ,补偿函数

第五个参数4为最大深度

第六个参数为最大桶数100

MulticlassMetrics计算分类器预测质量的标准指标

BinaryclassficationMetrics提供类似的评价指标,但只适用于类别型目标只有两个可能取值

所谓的混淆矩阵的每个行对应实际的正确类别值;每一列对应预测值,第i行第j列元素代表正确类别为i的样本被预测为类别为j的次数,对角线上的元素表示预测正确的次数,剩余的为预测错误的次数

准确度:预测正确的样本数占整个样本的比例

召回率:被分类器标记为“正”而且确实为“正”的样本与所有本来为“正”的样本比例

每种类别在训练集所占比例*每种类别在检验集所占比例=每种类别的总体准确度,将每种类别的总体准确度累加所得即为整体的准确度

 

4.5决策树的超参数

包括最大深度,最大桶数,不纯性度量

1)最大深度:对决策树的最大层数限制,避免过拟合

 

决策树算法为每层生成可能的决策规则,对于数值型特征,决策采用特征>=值的形式;对于类别型特征,决策采用在(值1,值2...)中的形式;决策规则的集合称为“桶”,桶的数目越多,需要处理的时间越多但找到决策规则可能更优;好规则可把训练集数据的目标值分为相对是同类或者纯的子集,最好的规则意味着最小化规则对应的两个子集的不纯性;不纯性的度量方式:Gini不纯度或者熵

Gini不纯度直接与随机猜测分类器的准确度相关(1-每个类别的比例与自身的乘积之和)

熵不纯度来源于信息论中熵的计算,Spark的实现默认采用Gini不纯度

 

4.6决策树调优

最大桶数越多越好,会减慢模型构造过程且增加内存的使用量

最大深度能提高准确度,但有个拐点,超过它之后就没有用

在所有情况下都应该试试两种不纯性度量,有时候gini表现好点,有时候熵表现好点

 

检验集评估适合训练集的参数,测试集评估适合检验集的超参数,测试集保证了对最终选定超参数即模型准确度的无偏估计

 

最后一步需要利用测试集评估检验集选定的超参数和(训练集+检验集)下构造的模型,若得到的准确率与CV集所得的准确率差不多,则估计可靠

 

整体过程大致为:

1)利用trainData训练得到决策树模型

2)利用cvData找到最优的超参数组合

3)利用trainData&cvData以及最优的超参数组合训练出新的决策树模型

4)利用testData评估该模型的准确度,并与以trainData作为训练集,cvData作为测试集时的准确度比较,若相差不大,则估计可靠

 

4.7重谈类别型特征

直接使用类别型特征

Map(),不把任何特征当做类别型

Map()中的key指输入向量Vector的下标,value指类别型特征的不同取值个数

通过吧类别型特征真正当做类别型,分类器的准确度提高近3%

 

4.8 随机决策森林

随机森林的关键在于所及的构建多颗决策树,多颗决策树的平均预测作为最终的结果

val forest=RandomForest.trainClassifier(trainData,7,Map(10->4,11->40),20,”auto”,”entropy”,30,300)

20为随机森林由20个独立的决策树组成

auto特征决策树每层的评估特征选择策略,决策规则只考虑全部特征的一个子集,往往不会产生过拟合现象

 

第五章 基于K均值聚类的网络流量异常检测

K-Means算法是最经典的无监督学习算法吗,K值即为该聚类模型的超参数

点与点相似代表其距离小,这里的距离只欧式距离,簇群的中心为质心,它是簇群中所有点的算术平均值,数据点实际上等同于特征向量

 

5.1数据准备

http://kdd.ics.uci.edu/databases/kddcup99/kddcup99.html 网络流量异常检测,数据集大小为708MB,包括490万连接

每个连接占一行,共38个特征,包括发送字节数,登录次数,TCP错误数

由数据分析可知smurfneptune类型的连接攻击较多

数据清洗:删除类别型列和最后的标识列

5.2K值选择

随着K值的增加,平均质心距离先减少后增加,但只能取得局部最优情况,Spark Mllib采用K均值||

setRuns()设置给定K值时运行次数,每次随机选择初始质心,然后从多次聚类结果中选择最优的情况

setEpsilon()增加迭代时间可以优化聚类结果,控制聚类过程中簇质心进行有效移动的最小值

5.3特征的规范化

 

第六章 基于潜在语义分析算法分析维基百科

潜在语义分析:是一种自然语言处理和信息检索技术,目的是更好地理解文档语料库以及文档中词项的关系,它将语料库提炼成一组相关概念,每个概念捕捉数据中一个不同主题,且通常与语料库讨论的主题相符,每个概念分为三个属性:1)语料库中文档的相关度2)语料库中词项的相关度3)概念对描述主题的重要性评分;它可以计算词项与词项,文档与文档,词项与文档之间的相似度评分,这些相似度度量方法适合根据词项查询相关文档,按主题将文档分组和找到相关词项等任务

 

LSA在降维过程中采用奇异值分解的线性代数技术,首先根据词项在文档中出现的次数构造一个词项-文档矩阵,每个文档对应一列,每个词项对应一行,矩阵中的每个元素代表某个词项在对应文档中的重要性(权重),接着SVD可将矩阵分解为三个子矩阵,其中一个矩阵代表文档出现的概念,两一个代表词项对应的概念,还有一个代表每个概念的重要度

 

6.1 词项-文档矩阵

每一行代表词项,每一列代表文档,每个值代表词项在文档中的权重,其中权重的计算规则为TF-IDF        

需要对原始的维基百科到处文件转换成词项-文档矩阵需要进行许多预处理:

1)输入XML文件,每个文档由<page>标签分隔

2)将纯文本拆成词条(token

3)将词条的不同曲折词缀还原成词根,称为词形归并

4)词条用于计算词项频率和文档频率

6.2 获取数据 

1)需要利用XmlInputFormat读取HDFS上的每个page的内容                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                                          

2)利用wikipedia提供的API将每个page的内容->(page.getTitle,page.getContent)

 

6.3词形合并

1)像theis之类的常用词不会为模型提供有用信息,因此要去掉这些停用词

2)相同意思的词项可能存在不同词形,比如monkeymonkeys不能算作不同词项,再比如nationalizenationalization,将这些不同曲折词缀合并成单个词项的过程称为词干还原或者词形归并

 

6.5 计算TF-IDF

1)得到每个文档中每个词项出现的频次

2)得到每个文档中总的频次

3)得到总的文档数

4)docTermFreqs.flatMap(_.keySet).map((_,1)).reduceByKey(_+_) 得到每个item在多少个文档出现过

5)由于spark Mllib只能处理数值型特征,因此需要将item=>ididfs.keys.zipWithIndex.toMap

 

6.6 奇异值分解

M=USVT

Um*k维矩阵,U中的列是文档空间的正交基,m个文档和k个概念

Sk*k对角矩阵,每个对角代表概念的强度

Vn*k型矩阵,V中的列是词项空间的正交基,k个概念和n个词项

 

m为文档个数,n为词项个数,k为要保留的概念个数

计算奇异值分解的过程:

1)val mat=new RowMatrix(termDocMatrix)

2)val k=1000

3)val svd=mat.computeSVD(k,computeU=true)

 

6.7找出重要概念

Val v=svd.V //V矩阵,词项对概念的重要度

val topTerms=new ArrayBuffer[Seq(String,Double)]()

val arr=v.toAttay //将矩阵转换成数组

for(i<-0 until numConcepts){

val offs=i*v.numRows

val termWeights=arr.slice(offs,offs+v.numRows).zipWithIndex //求每个term的权重

val sorted=termWeights.sortBy((-)_._1)

topTerms+=sorted.take(numTerms).map{

case(score,id) =>(termIds(id,score))

}

topTerms

 

得到每个概念(主题)相关的词项和文档

 

6.8 基于低维近似的查询和评分

余弦相关度度量可以明确的标识两个词项的相关度得分,余弦相似度可以通过点乘除以向量的长度得到,广泛使用余弦相关度度量作为文档和词项相似度的指标

 

6.9词项-词项相关度

LSA算法背后的思想是低阶矩阵对数据更有用的表示

1)合并相关词项处理同义词

2)对词项的不同含义赋予低的权重处理多义词

3)过滤噪声

 

如何得到词项-词项之间的相似度

1)V点乘s=>Vs,然后对Vs进行归一化

2)根据normalizedVS和查询词项termId得到查询词项所在行向量rowVec

3)查询词项行向量rowVec*normalizedVS->termScores //得到每次词项与查询词项的相似度

4)按照相似度从大到小排序->得到与查询词项最相关的N个词项

 

6.10 文档-文档相关度

1)创建US

Val US=mutiplyByDiagonalMatrix(svd.U,svd.s) //创建US

2)归一化US

Val normalizedUS=rowsNormalized(US)

3)根据查询文档IDdocId与归一化矩阵normalizedUS得到docRowArr

val docRowArr=row(normalizedUS,docId) //US中查找给定docId对应的行

4)将文档行->文档行向量

Val docRowVec=Matrices.dense(docRowArr.length,1,docRowArr)

5)文档行向量与归一化US相乘

Val docScores=normalizedUS.multiply(docRowVec) //每个文档与查询文档的相关度

6)找出相关度最高的前N个文档

Val allDocWeights=docScores.rows.map(_.toArray(0)).zipWithUniqueId().filter(!_._1.isNaN)

 

6.11 词项-文档相关度

 1)在V中根据termId找到查询词对应的行<以及行向量

Val rowArr=row(V,termId).toArray //找到termId所对应的行向量

Val rowVec=Matrics.dense(termRowArr.length,1,termRowArr)

2)找到查询词相关的文档

Val docScores=US.multiply(termRowVec)

3)找到查询词最相关的前N个文档

Val allDocWeights=docScores.rows.map(_.toArray(0)).zipWithUniqueId().top(10)

 

6.12 多项词查询

1)构造查询词的稀疏向量

new BSparseVector[Double](indices,values,idTerms.size) //indices查询词的IDvalues为查询词对应的逆文档频率

2)得到查询词对应的行向量

val termRowArr=(breezeV.t*query).toArray

val termRowVec=Matrices.dense(termRowArr.length,1,termRowArr) 

3)US点乘行向量得到查询词对应的文档得分

Val docScores=US.multiply(termRowVec) 

4)取得查询词最相关的N个文档

docScores.rows.map(_.toArray(0)).zipWithUniqueId().top(10)

 

第七章 用GraphX分析伴生网络

Spark之上运行的GraphX并行图计算框架,支持PregelGraphGraphLab可分析用户-商品购买关系图,社交网络等

 

7.1MEDLINE文献引用索引的网络分析

基本思路:

1)研究数据集中的主要主题和它们的伴生关系

2)找到数据集中的连通组件

3)图的度分布,描述了主题的相关度变化并有助于找到那些与其他主题相关联最多的主题

4)计算图的统计量:聚类系数和平均路径长度

 

7.2获取数据

样本问价那种的每条记录是MedlineCitation类型的记录,该记录包括杂志名称、发行期号、发行日期、作者姓名、摘要、MeSH关键字集合

 

7.3Scala XML工具解析XML文档

通过scala.xml工具能快速方便的解析XML文件

 

7.4分析MeSH主要主题和伴生关系

高频topic占少数,低频topic占多数,因此呈现出长尾

 

7.5GraphX来建立一个伴生网络

把伴生网络真正当做网络分析,主题作为图的顶点,连接两个主题的引用记录看作图的边

VertexRDD[VD]RDD[(VertexId,VD)]的特殊实现,VertexId为顶点标识,VD为顶点属性;EdgeRDD[ED]RDD[Edge[ED]]的特殊实现,每个Edge包含两个VertexId和一个ED边属性

 

1)由于每个主题都是字符串型,因此需要将其转换成64位的Long

2)利用Hashing哈希算法将每个topic->唯一的vertexId,即生成顶点

3)利用伴生频率计数生成边,保证左边的vertexId(src)比右边的vertexId(dst)

4)准备好顶点和边后便可以创建GraphX实例,GraphX可以自动完成顶点的去重,但不会进行边去重,这样Graph可以为我们生成多图

 

7.6理解网络结构

判断该图是否连通图,若不是连通图,则可以进一步划分成子图研究其特性,GraphX内置了连通部件
val connectedComponentGraph[VertexId,Int]=topicGraph.connectedComponents()//VertexId是每个顶点所属连通组件的唯一标识符

 

7.7度分布

注意点

1)degrees RDD中条目个数比概念图中的顶点数少,这是由于部分顶点没有连接边,只有一个主题词

2)度的均值很小,意味着大部分顶点只连接少数的其他节点,当然也存在度很高的顶点

 

7.8过滤噪声边

 

2019-05-26 00:47:04 alionsss 阅读数 2556
  • Spark快速大数据处理

    课程的主要内容包括: 1.ZooKeeper-分布式过程协同组件 2.Hadoop3-大数据基础组件 3.Tez-Yarn底层计算引擎 4.Hive3-大数据仓库 5.Spark2实时大数据处理 6.Oozie5-大数据流程引擎 课程特点: 1.最新API: Hadoop3/Spark2/Hive3/Oozie5 2.手工搭建集群环境:编译+搭建 3.配套资源:分阶段镜像+课件+安装资源,其中安装资源包括案例源码、脚本等 4.案例为主:分模块案例+天池数据分析竞赛 5.故障教学 6.完整实战项目:天池数据分析

    12319 人正在学习 去看看 余海峰

《Spark高级数据分析》——音乐推荐(ALS算法)

0. 简介

  • 来源: 《Spark高级数据分析》
  • 原书GitHub地址: https://github.com/sryza/aas
  • 内容简述:利用Spark中ALS算法,为音乐用户推荐合适的艺术家

1. 数据准备

  • 艺术家id和名称的关系映射数据 artist_data.txt
// 读取原始数据
// 2 columns: artistid artist_name
val path1 = "./profiledata_06-May-2005/artist_data.txt"
val rawArtistData = spark.read.textFile(path1)
// 数据预处理
def transformArtistData(rawArtistData: Dataset[String]): DataFrame = {
   import rawArtistData.sparkSession.implicits._

   rawArtistData.flatMap(line => {
     val (id, name) = line.span(_ != '\t')
     try {
       if (name.nonEmpty)
         Some(id.toInt, name.trim)
       else
         None
     } catch {
       case _: Exception => None
     }
   }).toDF("id", "name")
}
val artistIdDF = transformArtistData(rawArtistData)  
  • 艺术家与其别名 artist_alias.txt
// 读取原始数据
// 2 columns: badid, goodid
 val path2 = "./profiledata_06-May-2005/artist_alias.txt"
 val rawAliasData = spark.read.textFile(path2)
 // 数据预处理
def transformAliasData(rawAliasData: Dataset[String]): Dataset[(Int, Int)] = {
  import rawAliasData.sparkSession.implicits._

  rawAliasData.flatMap(line => {
    val Array(artist, alias) = line.split('\t')
    try {
      if (artist.nonEmpty)
        Some(artist.toInt, alias.toInt)
      else
        None
    } catch {
      case _: Exception => None
    }
  })
}
val artistAlias = transformAliasData(rawAliasData).collect().toMap
  • 用户与艺术家的关系数据集(某个用户听了某艺术家的音乐多少次) user_artist_data.txt
// 原始数据
// 3 columns: userid artistid playcount
val path0 = "./profiledata_06-May-2005/user_artist_data.txt"
val rawUserArtistData = spark.read.textFile(path0)
// 数据预处理
def tarnsformUserArtistData(spark: SparkSession, rawUserArtistDS: Dataset[String], artistAlias: Map[Int, Int]): DataFrame = {
  import spark.implicits._

  val bArtistAlias = spark.sparkContext.broadcast(artistAlias)

  rawUserArtistDS.map(line => {
    val Array(userId, artistId, count) = line.split(' ').map(_.toInt)
    // 将id转成别称,没有的就还是用id
    val finalArtistId = bArtistAlias.value.getOrElse(artistId, artistId)
    (userId, finalArtistId, count)
  }).toDF("user", "artist", "count")
}
val allDF = transformUserArtistData(spark, rawUserArtistData, artistAlias)
  • 拆分训练集、测试集
val Array(trainDF, testDF) = allDF.randomSplit(Array(0.9, 0.1))
// 对训练数据集做好缓存,后续ALS模型迭代计算会多次使用
trainDF.unpersist()

2. 训练ALS模型

  • 构建ALS模型,开始训练
// 构建模型
val als = new ALS()
  .setSeed(Random.nextLong())
  .setImplicitPrefs(true)
  .setRank(30)
  .setRegParam(0.0001)
  .setAlpha(1.0)
  .setMaxIter(5)
  .setUserCol("user")
  .setItemCol("artist")
  .setRatingCol("count")
  .setPredictionCol("prediction")

// 训练模型
val model = als.fit(trainDF)

// 释放缓存资源
trainDF.unpersist()

3. 为用户推荐音乐家

  • 准备用户数据
val someUsers = testDF.select("user").as[Int].take(10).distinct
  • 推荐音乐家
// 构建推荐的方法
def recommend(model: ALSModel, userId: Int, howMany: Int, artistIdDF: DataFrame): DataFrame = {
  import artistIdDF.sparkSession.implicits._

  // 为用户userId推荐艺术家
  val toRecommend = model.itemFactors
    .select($"id".as("artist"))
    .withColumn("user", lit(userId))

  // 获取推荐的前几名艺术家
  val topRecommendtions = model.transform(toRecommend)
    .select("artist", "prediction")
    .orderBy($"prediction".desc)
    .limit(howMany)

  // 得到需要推荐的艺术家的id
  val recommendedArtistIds = topRecommendtions.select("artist").as[Int].collect()

  artistIdDF.filter($"id" isin (recommendedArtistIds: _*))
}

// 开始推荐
someUsers.map { user =>
  // 推荐
  val recommendDF = recommend(model, user, 5, artistIdDF)
  val strings = recommendDF.map(_.mkString("|")).collect()

  (user, strings.toBuffer)
}.foreach(println)

4. 利用网格搜索与AUC评分

  • 网格搜索
// 利用for循环,生成不同的超参数配置
for (rank <- Seq(5, 30);
     regParam <- Seq(4.0, 0.0001);
     alpha <- Seq(1.0, 40.0))
  yield {
    // 构建模型,训练
    ……
    // 返回结果
  }
  • AUC评分(此部分代码来自于原书GitHub)
    注:关于理解AUC,可以参考这篇博客
/**
  * 计算AUC评分
  *
  * @param positiveData    测试数据
  * @param bAllArtistIDs   所有的艺术家ID
  * @param predictFunction model.transform
  * @return 评分 0-1
  */
def areaUnderCurve(positiveData: DataFrame,
                   bAllArtistIDs: Broadcast[Array[Int]],
                   predictFunction: DataFrame => DataFrame): Double = {
  import positiveData.sparkSession.implicits._

  // What this actually computes is AUC, per user. The result is actually something
  // that might be called "mean AUC".

  // Take held-out data as the "positive".
  // Make predictions for each of them, including a numeric score
  val positivePredictions = predictFunction(positiveData.select("user", "artist")).
    withColumnRenamed("prediction", "positivePrediction")

  // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
  // small AUC problems, and it would be inefficient, when a direct computation is available.

  // Create a set of "negative" products for each user. These are randomly chosen
  // from among all of the other artists, excluding those that are "positive" for the user.
  val negativeData = positiveData.select("user", "artist").as[(Int, Int)].
    groupByKey { case (user, _) => user }.
    flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
      val random = new Random()
      val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet
      val negative = new ArrayBuffer[Int]()
      val allArtistIDs = bAllArtistIDs.value
      var i = 0
      // Make at most one pass over all artists to avoid an infinite loop.
      // Also stop when number of negative equals positive set size
      while (i < allArtistIDs.length && negative.size < posItemIDSet.size) {
        val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
        // Only add new distinct IDs
        if (!posItemIDSet.contains(artistID)) {
          negative += artistID
        }
        i += 1
      }
      // Return the set with user ID added back
      negative.map(artistID => (userID, artistID))
    }.toDF("user", "artist")

  // Make predictions on the rest:
  val negativePredictions = predictFunction(negativeData).
    withColumnRenamed("prediction", "negativePrediction")

  // Join positive predictions to negative predictions by user, only.
  // This will result in a row for every possible pairing of positive and negative
  // predictions within each user.
  val joinedPredictions = positivePredictions.join(negativePredictions, "user").
    select("user", "positivePrediction", "negativePrediction").cache()

  // Count the number of pairs per user
  val allCounts = joinedPredictions.
    groupBy("user").agg(count(lit("1")).as("total")).
    select("user", "total")
  // Count the number of correctly ordered pairs per user
  val correctCounts = joinedPredictions.
    filter($"positivePrediction" > $"negativePrediction").
    groupBy("user").agg(count("user").as("correct")).
    select("user", "correct")

  // Combine these, compute their ratio, and average over all users
  val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
    select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
    agg(mean("auc")).
    as[Double].first()

  joinedPredictions.unpersist()

  meanAUC
}
  • 合并网格搜索+AUC评分的调用
// 艺术家id数据,用于AUC评分
val allArtistIds = allDF.select("artist").as[Int].distinct().collect()
val bAllArtistIds = spark.sparkContext.broadcast(allArtistIds)
 
// 网格搜索
val evaluations =
  // 利用for循环,生成不同的超参数配置
  for (rank <- Seq(5, 30);
       regParam <- Seq(4.0, 0.0001);
       alpha <- Seq(1.0, 40.0))
    yield {
      // 构建模型
      val als = new ALS()
        .setSeed(Random.nextLong())
        .setImplicitPrefs(true)
        .setRank(10)
        .setRegParam(0.01)
        .setAlpha(1.0)
        .setMaxIter(5)
        .setUserCol("user")
        .setItemCol("artist")
        .setRatingCol("count")
        .setPredictionCol("prediction")
      // 训练模型
      val model = als.fit(trainDF)

	  // 进行AUC评分
      val auc = areaUnderCurve(testDF, bAllArtistIds, model.transform)

      // 释放资源
      model.userFactors.unpersist()
      model.itemFactors.unpersist()

      (auc, (rank, regParam, alpha))
    }

evaluations.sorted.reverse.foreach(println)
  • 结果示例
(0.9039124436650243,(30,1.0E-4,1.0))
(0.9034269912559532,(5,1.0E-4,1.0))
(0.9032449249724098,(30,1.0E-4,40.0))
(0.9028574761056848,(30,4.0,1.0))
(0.9019663966459797,(5,1.0E-4,40.0))
(0.9017698705975027,(30,4.0,40.0))
(0.9015351771563618,(5,4.0,40.0))
(0.9011632951254114,(5,4.0,1.0))

5. 完整代码

package com.skey.analytics.ch03

import org.apache.spark.SparkConf
import org.apache.spark.broadcast.Broadcast
import org.apache.spark.ml.recommendation.{ALS, ALSModel}
import org.apache.spark.sql.functions._
import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}

import scala.collection.mutable.ArrayBuffer
import scala.util.Random

/**
  * 第三章 - 音乐推荐
  *
  * @author ALion
  * @version 2019/2/24 10:53
  */
object Recommender {

  def main(args: Array[String]): Unit = {
    val conf = new SparkConf()
      .setMaster("local[4]")
      .setAppName("RecommenderApp")
    val spark = new SparkSession.Builder()
      .config(conf)
      .enableHiveSupport()
      .getOrCreate()

    import spark.implicits._

    //artist_data.txt
    //2 columns: artistid artist_name
    val path1 = "./profiledata_06-May-2005/artist_data.txt"
    val rawArtistData = spark.read.textFile(path1)
    val artistIdDF = transformArtistData(rawArtistData)

    //artist_alias.txt
    //2 columns: badid, goodid
    val path2 = "./profiledata_06-May-2005/artist_alias.txt"
    val rawAliasData = spark.read.textFile(path2)
    val artistAlias = transformAliasData(rawAliasData).collect().toMap

    //user_artist_data.txt
    //3 columns: userid artistid playcount
    val path0 = "./profiledata_06-May-2005/user_artist_data.txt"
    val rawUserArtistData = spark.read.textFile(path0)

    val allDF = transformUserArtistData(spark, rawUserArtistData, artistAlias)
    allDF.persist()

    // 拆分训练集和测试集
    val Array(trainDF, testDF) = allDF.randomSplit(Array(0.9, 0.1))
    trainDF.persist()

    //    // 构建模型
    //    val als = new ALS()
    //      .setSeed(Random.nextLong())
    //      .setImplicitPrefs(true)
    //      .setRank(30)
    //      .setRegParam(0.0001)
    //      .setAlpha(1.0)
    //      .setMaxIter(5)
    //      .setUserCol("user")
    //      .setItemCol("artist")
    //      .setRatingCol("count")
    //      .setPredictionCol("prediction")
    //
    //    // 训练模型
    //    val model = als.fit(trainDF)
    //
    //    // 释放缓存资源
    //    trainDF.unpersist()
    //
    //    // 开始推荐
    //
    //    // 准备需要推荐的用户
    //    val someUsers = testDF.select("user").as[Int].take(10).distinct
    //
    //    someUsers.map { user =>
    //      // 推荐
    //      val recommendDF = recommend(model, user, 5, artistIdDF)
    //      val strings = recommendDF.map(_.mkString("|")).collect()
    //
    //      (user, strings.toBuffer)
    //    }.foreach(println)

    // 艺术家id数据,用于AUC评分
    val allArtistIds = allDF.select("artist").as[Int].distinct().collect()
    val bAllArtistIds = spark.sparkContext.broadcast(allArtistIds)

    // 网格搜索
    val evaluations =
    // 利用for循环,生成不同的超参数配置
      for (rank <- Seq(5, 30);
           regParam <- Seq(4.0, 0.0001);
           alpha <- Seq(1.0, 40.0))
        yield {
          // 构建模型
          val als = new ALS()
            .setSeed(Random.nextLong())
            .setImplicitPrefs(true)
            .setRank(10)
            .setRegParam(0.01)
            .setAlpha(1.0)
            .setMaxIter(5)
            .setUserCol("user")
            .setItemCol("artist")
            .setRatingCol("count")
            .setPredictionCol("prediction")

          val model = als.fit(trainDF)

          val auc = areaUnderCurve(testDF, bAllArtistIds, model.transform)

          // 释放资源
          model.userFactors.unpersist()
          model.itemFactors.unpersist()

          (auc, (rank, regParam, alpha))
        }

    evaluations.sorted.reverse.foreach(println)
    //(0.9039124436650243,(30,1.0E-4,1.0))
    //(0.9034269912559532,(5,1.0E-4,1.0))
    //(0.9032449249724098,(30,1.0E-4,40.0))
    //(0.9028574761056848,(30,4.0,1.0))
    //(0.9019663966459797,(5,1.0E-4,40.0))
    //(0.9017698705975027,(30,4.0,40.0))
    //(0.9015351771563618,(5,4.0,40.0))
    //(0.9011632951254114,(5,4.0,1.0))

    spark.stop()
  }

  /**
    * 合并数据,创建一个总的数据集
    *
    * @param spark           SparkSession
    * @param rawUserArtistDS 用户和艺术家的关系数据集
    * @param artistAlias     艺术家别名id,用于补全
    * @return
    */
  def transformUserArtistData(spark: SparkSession, rawUserArtistDS: Dataset[String], artistAlias: Map[Int, Int]): DataFrame = {
    import spark.implicits._

    val bArtistAlias = spark.sparkContext.broadcast(artistAlias)

    rawUserArtistDS.map(line => {
      val Array(userId, artistId, count) = line.split(' ').map(_.toInt)
      val finalArtistId = bArtistAlias.value.getOrElse(artistId, artistId)
      (userId, finalArtistId, count)
    }).toDF("user", "artist", "count")
  }

  def transformArtistData(rawArtistData: Dataset[String]): DataFrame = {
    import rawArtistData.sparkSession.implicits._

    rawArtistData.flatMap(line => {
      val (id, name) = line.span(_ != '\t')
      try {
        if (name.nonEmpty)
          Some(id.toInt, name.trim)
        else
          None
      } catch {
        case _: Exception => None
      }
    }).toDF("id", "name")
  }

  def transformAliasData(rawAliasData: Dataset[String]): Dataset[(Int, Int)] = {
    import rawAliasData.sparkSession.implicits._

    rawAliasData.flatMap(line => {
      val Array(artist, alias) = line.split('\t')
      try {
        if (artist.nonEmpty)
          Some(artist.toInt, alias.toInt)
        else
          None
      } catch {
        case _: Exception => None
      }
    })
  }

  /**
    * 为指定用户推荐艺术家
    *
    * @param model      训练好的ALS模型
    * @param userId     用户id
    * @param howMany    推荐多少个艺术家
    * @param artistIdDF 艺术家id和名称的关系映射
    * @return
    */
  def recommend(model: ALSModel, userId: Int, howMany: Int, artistIdDF: DataFrame): DataFrame = {
    import artistIdDF.sparkSession.implicits._

    val toRecommend = model.itemFactors
      .select($"id".as("artist"))
      .withColumn("user", lit(userId))

    val topRecommendtions = model.transform(toRecommend)
      .select("artist", "prediction")
      .orderBy($"prediction".desc)
      .limit(howMany)

    // 得到需要推荐的艺术家的id
    val recommendedArtistIds = topRecommendtions.select("artist").as[Int].collect()

    artistIdDF.filter($"id" isin (recommendedArtistIds: _*))
  }

  /**
    * 计算AUC评分
    *
    * @param positiveData    测试数据
    * @param bAllArtistIDs   所有的艺术家ID
    * @param predictFunction model.transform
    * @return 评分 0-1
    */
  def areaUnderCurve(positiveData: DataFrame,
                     bAllArtistIDs: Broadcast[Array[Int]],
                     predictFunction: DataFrame => DataFrame): Double = {
    import positiveData.sparkSession.implicits._

    // What this actually computes is AUC, per user. The result is actually something
    // that might be called "mean AUC".

    // Take held-out data as the "positive".
    // Make predictions for each of them, including a numeric score
    val positivePredictions = predictFunction(positiveData.select("user", "artist")).
      withColumnRenamed("prediction", "positivePrediction")

    // BinaryClassificationMetrics.areaUnderROC is not used here since there are really lots of
    // small AUC problems, and it would be inefficient, when a direct computation is available.

    // Create a set of "negative" products for each user. These are randomly chosen
    // from among all of the other artists, excluding those that are "positive" for the user.
    val negativeData = positiveData.select("user", "artist").as[(Int, Int)].
      groupByKey { case (user, _) => user }.
      flatMapGroups { case (userID, userIDAndPosArtistIDs) =>
        val random = new Random()
        val posItemIDSet = userIDAndPosArtistIDs.map { case (_, artist) => artist }.toSet
        val negative = new ArrayBuffer[Int]()
        val allArtistIDs = bAllArtistIDs.value
        var i = 0
        // Make at most one pass over all artists to avoid an infinite loop.
        // Also stop when number of negative equals positive set size
        while (i < allArtistIDs.length && negative.size < posItemIDSet.size) {
          val artistID = allArtistIDs(random.nextInt(allArtistIDs.length))
          // Only add new distinct IDs
          if (!posItemIDSet.contains(artistID)) {
            negative += artistID
          }
          i += 1
        }
        // Return the set with user ID added back
        negative.map(artistID => (userID, artistID))
      }.toDF("user", "artist")

    // Make predictions on the rest:
    val negativePredictions = predictFunction(negativeData).
      withColumnRenamed("prediction", "negativePrediction")

    // Join positive predictions to negative predictions by user, only.
    // This will result in a row for every possible pairing of positive and negative
    // predictions within each user.
    val joinedPredictions = positivePredictions.join(negativePredictions, "user").
      select("user", "positivePrediction", "negativePrediction").cache()

    // Count the number of pairs per user
    val allCounts = joinedPredictions.
      groupBy("user").agg(count(lit("1")).as("total")).
      select("user", "total")
    // Count the number of correctly ordered pairs per user
    val correctCounts = joinedPredictions.
      filter($"positivePrediction" > $"negativePrediction").
      groupBy("user").agg(count("user").as("correct")).
      select("user", "correct")

    // Combine these, compute their ratio, and average over all users
    val meanAUC = allCounts.join(correctCounts, Seq("user"), "left_outer").
      select($"user", (coalesce($"correct", lit(0)) / $"total").as("auc")).
      agg(mean("auc")).
      as[Double].first()

    joinedPredictions.unpersist()

    meanAUC
  }

}

2017-11-07 20:17:39 fengzhimohan 阅读数 3454
  • Spark快速大数据处理

    课程的主要内容包括: 1.ZooKeeper-分布式过程协同组件 2.Hadoop3-大数据基础组件 3.Tez-Yarn底层计算引擎 4.Hive3-大数据仓库 5.Spark2实时大数据处理 6.Oozie5-大数据流程引擎 课程特点: 1.最新API: Hadoop3/Spark2/Hive3/Oozie5 2.手工搭建集群环境:编译+搭建 3.配套资源:分阶段镜像+课件+安装资源,其中安装资源包括案例源码、脚本等 4.案例为主:分模块案例+天池数据分析竞赛 5.故障教学 6.完整实战项目:天池数据分析

    12319 人正在学习 去看看 余海峰

项目应用需要利用Spark读取mysql数据进行数据分析,然后将分析结果保存到mysql中。
开发环境:
java:1.8
IDEA
spark:1.6.2

一.读取mysql数据
1.创建一个mysql数据库
user_test表结构如下:

create table user_test (
id int(11) default null comment "id",
name varchar(64) default null comment "用户名",
password varchar(64) default null comment "密码",
age int(11) default null comment "年龄"
)engine=InnoDB default charset=utf-8;

2.插入数据

insert into user_test values(12, 'cassie', '123456', 25);
insert into user_test values(11, 'zhangs', '1234562', 26);
insert into user_test values(23, 'zhangs', '2321312', 27);
insert into user_test values(22, 'tom', 'asdfg', 28);

3.创建maven工程,命名为Test,添加java类SparkMysql
这里写图片描述
添加依赖包

pom文件内容:

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <modelVersion>4.0.0</modelVersion>

    <groupId>SparkSQL</groupId>
    <artifactId>com.sparksql.test</artifactId>
    <version>1.0-SNAPSHOT</version>
    <properties>
         <java.version>1.8</java.version>
    </properties>
    <dependencies>
        <dependency>
            <groupId>mysql</groupId>
            <artifactId>mysql-connector-java</artifactId>
            <version>5.1.24</version>
        </dependency>
        <dependency>
            <groupId>org.apache.hadoop</groupId>
            <artifactId>hadoop-common</artifactId>
            <version>2.6.0</version>
        </dependency>
        <dependency>
            <groupId>net.sf.json-lib</groupId>
            <artifactId>json-lib</artifactId>
            <version>2.4</version>
            <classifier>jdk15</classifier>
        </dependency>

    </dependencies>

</project>

4.编写spark代码

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.SQLContext;

import java.util.Properties;

/**
 * Created by Administrator on 2017/11/6.
 */
public class SparkMysql {
    public static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(SparkMysql.class);

    public static void main(String[] args) {
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
        SQLContext sqlContext = new SQLContext(sparkContext);
        //读取mysql数据
        readMySQL(sqlContext);

        //停止SparkContext
        sparkContext.stop();
    }
        private static void readMySQL(SQLContext sqlContext){
        //jdbc.url=jdbc:mysql://localhost:3306/database
        String url = "jdbc:mysql://localhost:3306/test";
        //查找的表名
        String table = "user_test";
        //增加数据库的用户名(user)密码(password),指定test数据库的驱动(driver)
        Properties connectionProperties = new Properties();
        connectionProperties.put("user","root");
        connectionProperties.put("password","123456");
        connectionProperties.put("driver","com.mysql.jdbc.Driver");

        //SparkJdbc读取Postgresql的products表内容
        System.out.println("读取test数据库中的user_test表内容");
        // 读取表中所有数据
        DataFrame jdbcDF = sqlContext.read().jdbc(url,table,connectionProperties).select("*");
        //显示数据
        jdbcDF.show();
    }
}

运行结果:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cQJdK41p-1572347591249)(http://img.blog.csdn.net/20171107200353583?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvZmVuZ3poaW1vaGFu/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)]

二.写入数据到mysql中

import org.apache.spark.SparkConf;
import org.apache.spark.api.java.JavaRDD;
import org.apache.spark.api.java.JavaSparkContext;
import org.apache.spark.api.java.function.Function;
import org.apache.spark.sql.DataFrame;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.SQLContext;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.Properties;

/**
 * Created by Administrator on 2017/11/6.
 */
public class SparkMysql {
    public static org.apache.log4j.Logger logger = org.apache.log4j.Logger.getLogger(SparkMysql.class);

    public static void main(String[] args) {
        JavaSparkContext sparkContext = new JavaSparkContext(new SparkConf().setAppName("SparkMysql").setMaster("local[5]"));
        SQLContext sqlContext = new SQLContext(sparkContext);
        //写入的数据内容
        JavaRDD<String> personData = sparkContext.parallelize(Arrays.asList("1 tom 5","2 jack 6","3 alex 7"));
        //数据库内容
        String url = "jdbc:mysql://localhost:3306/test";
        Properties connectionProperties = new Properties();
        connectionProperties.put("user","root");
        connectionProperties.put("password","123456");
        connectionProperties.put("driver","com.mysql.jdbc.Driver");
        /**
         * 第一步:在RDD的基础上创建类型为Row的RDD
         */
        //将RDD变成以Row为类型的RDD。Row可以简单理解为Table的一行数据
        JavaRDD<Row> personsRDD = personData.map(new Function<String,Row>(){
            public Row call(String line) throws Exception {
                String[] splited = line.split(" ");
                return RowFactory.create(Integer.valueOf(splited[0]),splited[1],Integer.valueOf(splited[2]));
            }
        });

        /**
         * 第二步:动态构造DataFrame的元数据。
         */
        List structFields = new ArrayList();
        structFields.add(DataTypes.createStructField("id",DataTypes.IntegerType,true));
        structFields.add(DataTypes.createStructField("name",DataTypes.StringType,true));
        structFields.add(DataTypes.createStructField("age",DataTypes.IntegerType,true));

        //构建StructType,用于最后DataFrame元数据的描述
        StructType structType = DataTypes.createStructType(structFields);

        /**
         * 第三步:基于已有的元数据以及RDD<Row>来构造DataFrame
         */
        DataFrame personsDF = sqlContext.createDataFrame(personsRDD,structType);

        /**
         * 第四步:将数据写入到person表中
         */
        personsDF.write().mode("append").jdbc(url,"person",connectionProperties);
        
        //停止SparkContext
        sparkContext.stop();
    }
 }

运行结果:
[外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-xy8h8CCz-1572347591250)(http://img.blog.csdn.net/20171107201600768?watermark/2/text/aHR0cDovL2Jsb2cuY3Nkbi5uZXQvZmVuZ3poaW1vaGFu/font/5a6L5L2T/fontsize/400/fill/I0JBQkFCMA==/dissolve/70/gravity/SouthEast)]

代码下载:

转载请标明出处,谢谢!。

如果感觉本文对您有帮助,请留下您的赞,您的支持是我坚持写作最大的动力,谢谢!

没有更多推荐了,返回首页