2019-03-28 13:21:10 gucapg 阅读数 434
  • [老汤]Spark 2.x实战应用系列一之怎样学习Spark

    系统的讲解了我们为什么需要去认识spark、spark有什么内容以及我们该怎么去学习spark。在学习spark过程中遵循的几个原则。内容如下: 1 大数据是什么 2 需要什么知识(除了scala,java和python都行) 3 spark可以做什么 4 spark整体结构图 5 spark中有什么 6 怎么学spark 7 学习spark的四个原则 8 学习spark需要的环境

    3116 人正在学习 去看看 老汤

idea + spark在win10系统本地调试中,出现以下错误:

Exception in thread "main" org.apache.spark.SparkException: Unable to create database default as failed to create its directory file:/D:/My%2520Works/dev/pro/git/dhe/spark-warehouse
	at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.liftedTree1$1(InMemoryCatalog.scala:115)
	at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.doCreateDatabase(InMemoryCatalog.scala:109)
	at org.apache.spark.sql.catalyst.catalog.ExternalCatalog.createDatabase(ExternalCatalog.scala:69)
	at org.apache.spark.sql.internal.SharedState.externalCatalog$lzycompute(SharedState.scala:117)
	at org.apache.spark.sql.internal.SharedState.externalCatalog(SharedState.scala:102)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.catalog$lzycompute(BaseSessionStateBuilder.scala:133)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.catalog(BaseSessionStateBuilder.scala:131)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anon$1.<init>(BaseSessionStateBuilder.scala:157)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder.analyzer(BaseSessionStateBuilder.scala:157)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
	at org.apache.spark.sql.internal.BaseSessionStateBuilder$$anonfun$build$2.apply(BaseSessionStateBuilder.scala:293)
	at org.apache.spark.sql.internal.SessionState.analyzer$lzycompute(SessionState.scala:79)
	at org.apache.spark.sql.internal.SessionState.analyzer(SessionState.scala:79)
	at org.apache.spark.sql.execution.QueryExecution.analyzed$lzycompute(QueryExecution.scala:57)
	at org.apache.spark.sql.execution.QueryExecution.analyzed(QueryExecution.scala:55)
	at org.apache.spark.sql.execution.QueryExecution.assertAnalyzed(QueryExecution.scala:47)
	at org.apache.spark.sql.Dataset$.ofRows(Dataset.scala:74)
	at org.apache.spark.sql.SparkSession.baseRelationToDataFrame(SparkSession.scala:432)
	at org.apache.spark.sql.DataFrameReader.loadV1Source(DataFrameReader.scala:233)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:227)
	at org.apache.spark.sql.DataFrameReader.load(DataFrameReader.scala:164)
	at org.apache.spark.sql.DataFrameReader.jdbc(DataFrameReader.scala:254)
	at TestRule$.main(TestRule.scala:25)
	at TestRule.main(TestRule.scala)
Caused by: ExitCodeException exitCode=-1073741515: 
	at org.apache.hadoop.util.Shell.runCommand(Shell.java:1008)
	at org.apache.hadoop.util.Shell.run(Shell.java:901)
	at org.apache.hadoop.util.Shell$ShellCommandExecutor.execute(Shell.java:1213)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1307)
	at org.apache.hadoop.util.Shell.execCommand(Shell.java:1289)
	at org.apache.hadoop.fs.RawLocalFileSystem.setPermission(RawLocalFileSystem.java:840)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkOneDirWithMode(RawLocalFileSystem.java:522)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:562)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:534)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:561)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:534)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:561)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:534)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:561)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:534)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:561)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:534)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirsWithOptionalPermission(RawLocalFileSystem.java:561)
	at org.apache.hadoop.fs.RawLocalFileSystem.mkdirs(RawLocalFileSystem.java:534)
	at org.apache.hadoop.fs.ChecksumFileSystem.mkdirs(ChecksumFileSystem.java:705)
	at org.apache.spark.sql.catalyst.catalog.InMemoryCatalog.liftedTree1$1(InMemoryCatalog.scala:112)
	... 23 more
19/03/28 10:22:17 INFO SparkContext: Invoking stop() from shutdown hook
19/03/28 10:22:17 INFO SparkUI: Stopped Spark web UI at http://DESKTOP-UGQ3VF9:4040
19/03/28 10:22:17 INFO MapOutputTrackerMasterEndpoint: MapOutputTrackerMasterEndpoint stopped!
19/03/28 10:22:17 INFO MemoryStore: MemoryStore cleared
19/03/28 10:22:17 INFO BlockManager: BlockManager stopped
19/03/28 10:22:17 INFO BlockManagerMaster: BlockManagerMaster stopped
19/03/28 10:22:17 INFO OutputCommitCoordinator$OutputCommitCoordinatorEndpoint: OutputCommitCoordinator stopped!
19/03/28 10:22:17 INFO SparkContext: Successfully stopped SparkContext
19/03/28 10:22:17 INFO ShutdownHookManager: Shutdown hook called
19/03/28 10:22:17 INFO ShutdownHookManager: Deleting directory C:\Users\gu\AppData\Local\Temp\spark-606e2938-e589-4e1e-a106-a163f6b205dc

代码是spark读取mysql数据源:

def main(args: Array[String]): Unit = {
 
   val session =  SparkSession.builder().master("local").appName("TestMysql").getOrCreate()



    val url = "jdbc:mysql://10.12.52.133:3306/test"
    val table = "vps_key_account_user_tkl_test"
    val properties = new Properties()
    properties.setProperty("user","bdpdoop")
    properties.setProperty("password","q1w2e3r4ys")
    //需要传入Mysql的URL、表明、properties(连接数据库的用户名密码)
    val df = session.read.jdbc(url,table,properties)
    df.createOrReplaceTempView(table)
    session.sql(s"select * from ${table}").show()

  }

解决方案:根据错误提示,是在D盘下,没有D:\My%20Works\dev\pro\git\dhe\spark-warehouse目录,可以手动创建相应目录,也可以多执行几次,一直到创建spark-warehoure目录为止。

2019-05-02 22:46:50 yolohohohoho 阅读数 171
  • [老汤]Spark 2.x实战应用系列一之怎样学习Spark

    系统的讲解了我们为什么需要去认识spark、spark有什么内容以及我们该怎么去学习spark。在学习spark过程中遵循的几个原则。内容如下: 1 大数据是什么 2 需要什么知识(除了scala,java和python都行) 3 spark可以做什么 4 spark整体结构图 5 spark中有什么 6 怎么学spark 7 学习spark的四个原则 8 学习spark需要的环境

    3116 人正在学习 去看看 老汤

org.apache.spark.SparkConf

Spark应用程序的配置。 用于将各种Spark参数设置为键值对。
大多数情况下,你可以使用new SparkConf()创建一个SparkConf对象,该对象将会从Java系统属性中载入所有spark.*的配置。
在这种情况下,你直接在SparkConf对象上所设置的参数优先于系统属性。
我们可以通过以下代码来访问它的配置:

scala> sc.getConf.getAll
res0: Array[(String, String)] = Array((spark.driver.host,lestats-mbp), (spark.driver.port,59836), 
(spark.repl.class.uri,spark://lestats-mbp:59836/classes), 
(spark.sql.catalogImplementation,in-memory), (spark.executor.id,driver), 
(spark.app.name,Spark shell), 
(spark.app.id,local-1556807997948), (spark.repl.class.outputDir,/private/var/folders/d6/_ffg20cj5tdg72nnx9n8pgc00000gn/T/spark-d6e58c7e-63f0-420c-b122-5f49521c3efb/repl-1afaa45c-f5f2-4edb-ba61-1aa4cdce4c28), (spark.jars,""), 
(spark.master,local[*]), 
(spark.submit.pyFiles,""), 
(spark.submit.deployMode,client), 
(spark.home,/Users/lestat/Documents/GitHub/spark), (spark.ui.showConsoleProgress,true))

注意:
将SparkConf对象传递给Spark后,它将被克隆,用户无法再对其进行修改。 Spark不支持在运行时修改配置。如果需要,则需要停止该sparkContext,用新的sparkConf创建sparkContext。

org.apache.spark.sql.RuntimeConfig

Spark的运行时配置(RuntimeConfig)的接口。 在这里设置的选项会在I/O期间自动更新到Hadoop配置。
我们可以使用SparkSession.conf来访问它,例如:

scala> spark.conf.getAll
res1: Map[String,String] = Map(spark.driver.host -> lestats-mbp, spark.driver.port -> 59676, 
spark.repl.class.uri -> spark://lestats-mbp:59676/classes, spark.jars -> "", 
spark.repl.class.outputDir -> /private/var/folders/d6/_ffg20cj5tdg72nnx9n8pgc00000gn/T/spark-22e2f2d6-db54-4269-9617-2ab5daabe511/repl-48b8ee6c-ed9b-4c38-8e69-8acc7959e1ab, 
spark.app.name -> Spark shell, 
spark.submit.pyFiles -> "", spark.ui.showConsoleProgress -> true, 
spark.executor.id -> driver, 
spark.submit.deployMode -> client, 
spark.master -> local[*], 
spark.home -> /Users/lestat/Documents/GitHub/spark,
spark.sql.catalogImplementation -> in-memory, spark.app.id -> local-1556807258642)
2017-02-25 22:13:18 zeroder 阅读数 5531
  • [老汤]Spark 2.x实战应用系列一之怎样学习Spark

    系统的讲解了我们为什么需要去认识spark、spark有什么内容以及我们该怎么去学习spark。在学习spark过程中遵循的几个原则。内容如下: 1 大数据是什么 2 需要什么知识(除了scala,java和python都行) 3 spark可以做什么 4 spark整体结构图 5 spark中有什么 6 怎么学spark 7 学习spark的四个原则 8 学习spark需要的环境

    3116 人正在学习 去看看 老汤

spark.eventLog.dir是记录Spark事件的基本目录,如果spark.eventLog.enabled为true。 在此基本目录中,Spark为每个应用程序创建一个子目录,并在此目录中记录特定于应用程序的事件。 用户可能希望将其设置为统一位置,如HDFS目录,以便历史记录服务器可以读取历史记录文件。

spark.history.fs.logDirectory用于为历史记录程序提供文件系统,包含要加载的应用程序事件日志的目录URL。 这可以是本地文件路径file://路径,HDFS路径hdfs://namenode:port /shared/spark-logs或Hadoop API支持的备用文件系统的路径。

spark.eventLog.dir用于生成日志,spark.history.fs.logDirectory是Spark History Server发现日志事件的位置。

2019-10-20 21:30:06 liuge36 阅读数 208
  • [老汤]Spark 2.x实战应用系列一之怎样学习Spark

    系统的讲解了我们为什么需要去认识spark、spark有什么内容以及我们该怎么去学习spark。在学习spark过程中遵循的几个原则。内容如下: 1 大数据是什么 2 需要什么知识(除了scala,java和python都行) 3 spark可以做什么 4 spark整体结构图 5 spark中有什么 6 怎么学spark 7 学习spark的四个原则 8 学习spark需要的环境

    3116 人正在学习 去看看 老汤

第四部分-推荐系统-模型训练

  • 本模块基于第3节 数据加工得到的训练集和测试集数据 做模型训练,最后得到一系列的模型,进而做 预测。
  • 训练多个模型,取其中最好,即取RMSE(均方根误差)值最小的模型

说明几点

1.ALS 算法不需要自己实现,Spark MLlib 已经实现好了,可以自己 跟源码学习
花时间钻研,动手写,写代码 翻译论文 写博客 多下功夫
2. 最新http://spark.apache.org/docs/latest/ml-guide.html
3. spark1.6.3
spark.mllib contains the original API built on top of RDDs.
spark.ml provides higher-level API built on top of DataFrames for constructing ML pipelines.

==》 我们采用spark.mllib ,也就是基于RDD之上来构建

学习:http://spark.apache.org/docs/1.6.3/mllib-collaborative-filtering.html#collaborative-filtering

import org.apache.spark.mllib.recommendation.ALS
import org.apache.spark.mllib.recommendation.MatrixFactorizationModel
import org.apache.spark.mllib.recommendation.Rating

// Load and parse the data
val data = sc.textFile("data/mllib/als/test.data")
val ratings = data.map(_.split(',') match { case Array(user, item, rate) =>
  Rating(user.toInt, item.toInt, rate.toDouble)
})

// Build the recommendation model using ALS
val rank = 10
val numIterations = 10
val model = ALS.train(ratings, rank, numIterations, 0.01)

// Evaluate the model on rating data
val usersProducts = ratings.map { case Rating(user, product, rate) =>
  (user, product)
}
val predictions =
  model.predict(usersProducts).map { case Rating(user, product, rate) =>
    ((user, product), rate)
  }
val ratesAndPreds = ratings.map { case Rating(user, product, rate) =>
  ((user, product), rate)
}.join(predictions)
val MSE = ratesAndPreds.map { case ((user, product), (r1, r2)) =>
  val err = (r1 - r2)
  err * err
}.mean()
println("Mean Squared Error = " + MSE)

// Save and load model
model.save(sc, "target/tmp/myCollaborativeFilter")
val sameModel = MatrixFactorizationModel.load(sc, "target/tmp/myCollaborativeFilter")

看官方是怎么写代码的,参照着写

开始项目Coding

步骤一: 继续在前面的项目中,新建ml包,再新建ModelTraining

package com.csylh.recommend.ml

import org.apache.spark.mllib.recommendation.{ALS, Rating}
import org.apache.spark.rdd.RDD
import org.apache.spark.sql.SparkSession

/**
  * Description:
  * 训练多个模型,取其中最好,即取RMSE(均方根误差)值最小的模型
  *
  * @Author: 留歌36
  * @Date: 2019-07-17 16:56
  */
object ModelTraining {
  def main(args: Array[String]): Unit = {
    // 面向SparkSession编程
    val spark = SparkSession.builder()
      .enableHiveSupport() //开启访问Hive数据, 要将hive-site.xml等文件放入Spark的conf路径
      .getOrCreate()

    val sc = spark.sparkContext

    //  在生产环境中一定要注意设置spark.sql.shuffle.partitions,默认是200,及需要配置分区的数量
    val shuffleMinPartitions = "8"
    spark.sqlContext.setConf("spark.sql.shuffle.partitions",shuffleMinPartitions)

    // 训练集,总数据集的70%
    val trainingData = spark.sql("select * from trainingData")
    // 测试集,总数据集的30%
    val testData = spark.sql("select * from testData")


    //--------------------------
    // 训练集,转为Rating格式
    val ratingRDD = trainingData.rdd.map(x => Rating(x.getInt(0), x.getInt(1), x.getDouble(2)))
    // 用于计算模型的RMSE      Rating(userid, movieid, rating)   ==>转为tuple  (userid, movieid)
    val training2 :RDD[(Int,Int)]  = ratingRDD.map{ case Rating(userid, movieid, rating) => (userid, movieid)}

    // 测试集,转为Rating格式
    val testRDD = testData.rdd.map(x => Rating(x.getInt(0), x.getInt(1), x.getDouble(2)))
    val test2 :RDD[((Int,Int),Double)]= testRDD.map {case Rating(userid, movieid, rating) => ((userid, movieid), rating)}
    //--------------------------


    // 特征向量的个数
    val rank = 1
    // 正则因子
    // val lambda = List(0.001, 0.005, 0.01, 0.015)
    val lambda = List(0.001, 0.005, 0.01)
    // 迭代次数
    val iteration = List(10, 15, 18)
    var bestRMSE = Double.MaxValue

    var bestIteration = 0
    var bestLambda = 0.0

    // persist可以根据情况设置其缓存级别
    ratingRDD.persist() // 持久化放入内存,迭代中使用到的RDD都可以持久化
    training2.persist()

    test2.persist()

    for (l <- lambda; i <- iteration) {
      // 循环收敛这个模型
      //lambda 用于表示过拟合的这样一个参数,值越大,越不容易过拟合,但精确度就低
      val model = ALS.train(ratingRDD, rank, i, l)

      //---------这里是预测-----------------
      val predict = model.predict(training2).map {
        // 根据 (userid, movieid) 预测出相对应的rating
        case Rating(userid, movieid, rating) => ((userid, movieid), rating)
      }
      //-------这里是实际的predictAndFact-------------------

      // 根据(userid, movieid)为key,将提供的rating与预测的rating进行比较
      val predictAndFact = predict.join(test2)

      // 计算RMSE(均方根误差)
      val MSE = predictAndFact.map {
        case ((user, product), (r1, r2)) =>
          val err = r1 - r2
          err * err
      }.mean()  // 求平均

      val RMSE = math.sqrt(MSE) // 求平方根

      // RMSE越小,代表模型越精确
      if (RMSE < bestRMSE) {
        // 将模型存储下来
        model.save(sc, s"/tmp/BestModel/$RMSE")
        bestRMSE = RMSE
        bestIteration = i
        bestLambda = l
      }

      println(s"Best model is located in /tmp/BestModel/$RMSE")
      println(s"Best RMSE is $bestRMSE")
      println(s"Best Iteration is $bestIteration")
      println(s"Best Lambda is $bestLambda")
    }
  }
}


步骤二:将创建的项目进行打包上传到服务器
mvn clean package -Dmaven.test.skip=true

步骤三:编写shell 执行脚本

[root@hadoop001 ml]# vim model.sh 
export HADOOP_CONF_DIR=/root/app/hadoop-2.6.0-cdh5.7.0/etc/hadoop

$SPARK_HOME/bin/spark-submit \
--class com.csylh.recommend.ml.ModelTraining \
--master spark://hadoop001:7077 \
--name ModelTraining \
--driver-memory 10g \
--executor-memory 5g \
/root/data/ml/movie-recommend-1.0.jar

步骤四:执行 sh model.sh 即可

sh model.sh之前:

[root@hadoop001 ~]# hadoop fs -ls /tmp
19/10/20 20:53:59 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 10 items
drwx------   - root supergroup          0 2019-04-01 16:27 /tmp/hadoop-yarn
drwx-wx-wx   - root supergroup          0 2019-04-02 09:33 /tmp/hive
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /tmp/links
drwxr-xr-x   - root supergroup          0 2019-10-20 19:42 /tmp/movies
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /tmp/ratings
drwxr-xr-x   - root supergroup          0 2019-10-20 19:43 /tmp/tags
drwxr-xr-x   - root supergroup          0 2019-10-20 20:19 /tmp/testData
drwxr-xr-x   - root supergroup          0 2019-10-20 20:19 /tmp/trainingData
drwxr-xr-x   - root supergroup          0 2019-10-20 20:18 /tmp/trainingDataAsc
drwxr-xr-x   - root supergroup          0 2019-10-20 20:19 /tmp/trainingDataDesc
[root@hadoop001 ~]#

sh model.sh之后:
这里运行很长时间,而且很有可能出现OOM。耐心等待~~
在这里插入图片描述
这些点都是要关注的,再就是shuffle 很重要
在这里插入图片描述
等待中。。。
在这里插入图片描述

[root@hadoop001 ~]# hadoop fs -ls /tmp/BestModel
19/10/20 21:26:36 WARN util.NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
Found 2 items
drwxr-xr-x   - root supergroup          0 2019-10-20 21:00 /tmp/BestModel/0.8521581387523667
drwxr-xr-x   - root supergroup          0 2019-10-20 20:56 /tmp/BestModel/0.853805599360297
[root@hadoop001 ~]#

这里得到model /tmp/BestModel/0.8521581387523667 ,感觉不是很好。资源要是多一点的话,可以把迭代次数调大一点,估计模型可以更好。这里为了演示整个流程,模型差点就差点吧。思路搞懂就好。

有任何问题,欢迎留言一起交流~~
更多文章:基于Spark的电影推荐系统:https://blog.csdn.net/liuge36/column/info/29285

2019-10-20 18:55:07 liuge36 阅读数 774
  • [老汤]Spark 2.x实战应用系列一之怎样学习Spark

    系统的讲解了我们为什么需要去认识spark、spark有什么内容以及我们该怎么去学习spark。在学习spark过程中遵循的几个原则。内容如下: 1 大数据是什么 2 需要什么知识(除了scala,java和python都行) 3 spark可以做什么 4 spark整体结构图 5 spark中有什么 6 怎么学spark 7 学习spark的四个原则 8 学习spark需要的环境

    3116 人正在学习 去看看 老汤

第四部分-推荐系统-项目介绍

行业背景:

快速:Apache Spark以内存计算为核心
通用 :一站式解决各个问题,ADHOC SQL查询,流计算,数据挖掘,图计算
完整的生态圈
只要掌握Spark,就能够为大多数的企业的大数据应用场景提供明显的加速

“猜你喜欢”为代表的推荐系统,从吃穿住行等

项目背景介绍:

本项目是一个基于Apache Spark 的电影推荐系统,
技术路线:离线推荐+实时推荐

项目架构:

在这里插入图片描述

  • 存储层:HDFS作为底层存储,Hive作为数据仓库 (Hive Metastore:Hive管理数据的schema)

  • 离线数据处理:SparkSQL (做数据查询引擎<===> 数据ETL)

  • 实时数据处理:Kafka + Spark Streaming

  • 数据应用层:MLlib 产生一个模型 als算法

  • 数据展示和对接:Zeppelin

    选用考量:
    HDFS不管是在存储的性能,稳定性 吞吐量 都是在主流文件系统中很占有优势的
    如果感觉HDFS存储还是比较慢,可以采用SSD硬盘等方案

      数据处理层组件:
      Hive 在数据量不是很大或对实时性没有那么高要求的时候,可以选用作为计算引擎
      
      消息队列一般还是Kafka,消费者端也可以使用Flink,Storm等...
      同时,SparkStreaming的优势就是: 已经有与各个组件比较好的集成	
      这里写一个KafkaProducer作业实时将数据 放到Kafka 中 
      
      应用层:MLlib :Spark 对数据挖掘机器学习库的封装 ,ALS是其中一个算法 	
      http://spark.apache.org/docs/1.6.3/mllib-guide.html
      http://spark.apache.org/docs/latest/ml-guide.html
      TensorFlow 偏向于深度学习
      
      Zeppelin:包含各个图标表展示,而且组件集成性更多。作业调度略差
      HUE 数据展示+作业调度  
      	
      系统采用standaone模式,更加简单。
      只有SPARK 环境,就使用standalone 脱机运行模式
      Hadoop +Spark 就推荐:Spark On Yarn
      Spark On Docker : 任务封装为一个个的Docker,不依赖于你的物理机环境,每个Docker 的资源可以更好的分配
    

主要模块:

  • 存储模块:搭建和配置HDFS分布式存储系统,并Hbase和MySQL作为备用方案

  • ETL模块:加载原始数据,清洗,加工,为模型训练模块 和 推荐模块 准备所需的各种数据。

  • 模型训练模块:负责产生模型,以及寻找最佳的模型

  • 推荐模块:包含离线推荐和实时推荐,离线推荐负责把推荐结果存储到存储系统中
    实时推荐负责产生实时的消息队列,并且消费实时消息产生推荐结果,最后存储在存储模块中

  • 数据展示模块:负责展示项目中所用的数据

  • 数据流向:
    在这里插入图片描述

系统开发的重难点:

数据仓库的准备 :Spark + Hive 数据ETL  ,Zeppelin +Hive 数据展示 
数据处理:
实时数据处理 : 1.数据实时性,完整性 、一致性 ,
			    2.保证应用不会崩溃掉,or 崩掉之后及时启动起来 并 数据一致性处理

拓展:

1.数据仓库怎么理解?两种东西,其一是IBM微软数据产品为代表的,其二是Hadoop+Hive
Apache Hive™数据仓库软件有助于使用SQL读取,写入和管理驻留在分布式存储中的大型数据集。
可以将结构投影到已经存储的数据上。
提供了命令行工具和JDBC驱动程序以将用户连接到Hive。

2.数据源准备:
Data Source:Movielens Open Data
http://files.grouplens.org/datasets/movielens
http://files.grouplens.org/datasets/movielens/ml-latest.zip

[root@hadoop001 ml-latest]# pwd
/root/data/ml/ml-latest
[root@hadoop001 ml-latest]# ll -h
总用量 1.9G
-rw-r--r--. 1 root root 1.3M 10月 17 13:41 links.txt
-rw-r--r--. 1 root root 2.8M 10月 17 16:06 movies.txt
-rw-r--r--. 1 root root 725M 10月 17 16:07 ratings.txt
-rw-r--r--. 1 root root  38M 10月 17 16:08 tags.txt
[root@hadoop001 ml-latest]# 

接下来就是开始Coding…

有任何问题,欢迎留言一起交流~~
更多文章:基于Spark的电影推荐系统:https://blog.csdn.net/liuge36/column/info/29285

推荐系统(spark)

阅读数 974

Spark推荐系统实现

阅读数 758

没有更多推荐了,返回首页