精华内容
下载资源
问答
  • pyspark案例
    2022-06-16 09:02:10

    一. 进入官网选择对应的版本

    写Spark代码的时候经常会遇到一些问题,然后在网上找的例子不全而且偶尔还会出现一些问题,且Spark发展迅速,各个版本的语法都存在一定的差异。经常找到的demo都是不对的,拷贝执行会遇到意想不到的问题。

    于是转而查看Spark官网,接下来我们简单的看看如何查询官网。

    进入Spark官网:
    https://spark.apache.org/
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-MvxuduE1-1655341289963)(https://upload-images.jianshu.io/upload_images/2638478-23512012694d3d74.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-cZ3Aq5AE-1655341289965)(https://upload-images.jianshu.io/upload_images/2638478-cbbde1ed232cf453.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    二. 快速入门

    1. 选择 Programing Guides->Quick Start
      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bTE0fZKg-1655341289966)(https://upload-images.jianshu.io/upload_images/2638478-6cb920e1ab2c7e7b.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    2. 对应的编程语言我们选择Python
      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-JkzAgFyz-1655341289967)(https://upload-images.jianshu.io/upload_images/2638478-0ee06044bd0ff031.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    可以看到,如果使用Python操作Spark,我们一般是直接运行pyspark命令即可。

    三. Spark SQL, DataFrames 指导

    Python开发Spark的过程中,我们使用最多的就是Spark SQL和DataFrames。

    3.1 入门指南

    1. 选择 Programing Guides->SQL,DataFrames,and Datasets
      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pxVhgrZ4-1655341289967)(https://upload-images.jianshu.io/upload_images/2638478-50d6fe7859237f1f.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    2. 选择左边的 Getting Started
      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-bt2XaEI1-1655341289968)(https://upload-images.jianshu.io/upload_images/2638478-484de6fa3081c80e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    3.1.1 SparkSession

    SparkSession是所有Spark程序的开始
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-m6PDo1z9-1655341289969)(https://upload-images.jianshu.io/upload_images/2638478-30323dea60d49919.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    3.1.2 创建DataFrame

    创建DataFrame非常简单,Spark从支持的数据源读取的数据就是DataFrame类型。
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-x41W7JRM-1655341289970)(https://upload-images.jianshu.io/upload_images/2638478-ee4940685094f3d2.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    3.1.3 运行Spark SQL

    将上一步读取到的数据的DataFrame转换为一个临时视图后,我们就可以用SQL语句愉快的操作Spark程序了
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UaPKhRQy-1655341289971)(https://upload-images.jianshu.io/upload_images/2638478-6ac4407b7d3daec6.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    3.2 Spark支持的数据源

    1. 选择Data Sources
      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-jcKBSaie-1655341289971)(https://upload-images.jianshu.io/upload_images/2638478-863aa849e32b5e01.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    2. Spark支持的数据源
      我们从下图可以看到,Spark支持的数据源包括:

    3. Parquet Files

    4. Orc Files

    5. Json Files

    6. Hive Tables

    7. Jdbc

    8. Avro Files
      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-s7Ca3b83-1655341289971)(https://upload-images.jianshu.io/upload_images/2638478-8b7c0af82dd64b97.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    3.2.1 读写csv文件

    虽然上面没有写出读写CSV,其实Spark是支持读写CSV
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-UQ6nCNTa-1655341289972)(https://upload-images.jianshu.io/upload_images/2638478-f3d463492876be21.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    读csv文件:

    df = spark.read.load("examples/src/main/resources/people.csv",
                         format="csv", sep=":", inferSchema="true", header="true")
    

    写csv文件:
    coalesce(1)表示只写一个文件
    save 表示目标文件夹的位置

    1. hdfs格式: hdfs://hp1:8020/user/juzhen
    2. 本地格式: file:///tmp/
    df3.coalesce(1).write.format("csv").options(header='true', inferschema='true').save("hdfs://hp1:8020/user/juzhen")
    

    3.2.2 读写Hive table

    读写Hive表是我们实际开发过程中经常使用到的。
    一般集群的spark配置默认可以访问hive的元数据,所以spark读写hive是非常的简单。
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-TZL7mknn-1655341289972)(https://upload-images.jianshu.io/upload_images/2638478-5e94853719f1f9ae.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    读取hive表:

    df = spark.sql("SELECT * FROM src")
    

    ** 写入hive表:**

    spark.sql("CREATE TABLE IF NOT EXISTS src (key INT, value STRING) USING hive")
    spark.sql("LOAD DATA LOCAL INPATH 'examples/src/main/resources/kv1.txt' INTO TABLE src")
    

    3.2.3 读写MySQL

    Spark读写MySQL在实际开发过程中,使用的也表多
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-fmLPGGVk-1655341289976)(https://upload-images.jianshu.io/upload_images/2638478-e48c12db3f01e835.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    读MySQL数据:

    df1=spark.read.format("jdbc").options(url="jdbc:mysql://host:port/databasename",
                                           driver="com.mysql.jdbc.Driver",
                                           dbtable="(SELECT * FROM tablename) tmp",
                                           user="root",
                                           password="yourpassword").load()
    

    写MySQL数据

    df1.write.format("jdbc").options(url="jdbc:mysql://host:port/databasename", 
                                     driver="com.mysql.jdbc.Driver", 
                                     dbtable="tablename", 
                                     user="root", 
                                     password="yourpassword").mode('append').save()
    

    3.2.4 Save Modes

    保存操作可以选择使用SaveMode,该SaveMode指定如何处理现有数据(如果存在)。重要的是要认识到,这些保存模式不使用任何锁定,而且不是原子的。另外,当执行Overwrite时,数据将在写入新数据之前被删除。
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rTQitWSS-1655341289977)(https://upload-images.jianshu.io/upload_images/2638478-6d5a70965e306046.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    四. 性能调优

    https://spark.apache.org/docs/2.4.0/sql-performance-tuning.html
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-so1B1S45-1655341289977)(https://upload-images.jianshu.io/upload_images/2638478-85cd07b4dd0bb09e.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    4.1 在内存中缓存数据

    Spark SQL可以通过调用Spark.catalogs.cachetable (“tableName”)或dataFrame.cache()来使用内存中的柱状格式缓存表。然后,Spark SQL将只扫描所需的列,并自动调优压缩,以最小化内存使用和GC压力。你可以调用spark.catalog.uncacheTable(“tableName”)来从内存中删除这个表。

    内存缓存的配置可以通过SparkSession上的setConf方法或使用SQL运行SET key=value命令来完成。
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-YHsaE8N3-1655341289978)(https://upload-images.jianshu.io/upload_images/2638478-2b0a4aa1dbb9b2be.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    4.2 调优参数

    以下选项还可用于调优查询执行的性能。这些选项有可能在未来的版本中被弃用,因为会自动执行更多的优化。
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-rrqbYU6y-1655341289978)(https://upload-images.jianshu.io/upload_images/2638478-61bcf04c8c6a14b9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    4.3 BROADCAST Hint for SQL Queries

    BROADCAST提示指导Spark在将指定的表与其他表或视图连接时广播指定的表。当Spark决定连接方法时,广播哈希连接(即BHJ)是首选的,即使统计值高于配置Spark.sql.autobroadcastjointhreshold。当连接的两端都指定时,Spark会广播统计信息较低的一端。注意Spark不保证总是选择BHJ,因为不是所有情况(例如完全外连接)都支持BHJ。当选择广播嵌套循环连接时,我们仍然尊重提示。
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-HaAcV1Zu-1655341289979)(https://upload-images.jianshu.io/upload_images/2638478-958350099a6e2323.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    五. 分布式的SQL引擎

    https://spark.apache.org/docs/2.4.0/sql-distributed-sql-engine.html

    Spark SQL还可以使用它的JDBC/ODBC或命令行接口充当分布式查询引擎。在这种模式下,终端用户或应用程序可以直接与Spark SQL交互,运行SQL查询,而不需要编写任何代码。

    5.1 运行Thrift JDBC/ODBC服务器

    这里实现的Thrift JDBC/ODBC服务器对应于Hive 1.2.1中的HiveServer2。您可以使用Spark或Hive 1.2.1附带的beeline脚本来测试JDBC服务器。

    在Spark目录下执行如下命令启动JDBC/ODBC服务器。

    ./sbin/start-thriftserver.sh
    

    这个脚本接受所有bin/spark-submit命令行选项,外加一个——hiveconf选项来指定Hive属性。您可以运行./sbin/start-thriftserver.sh——help获取所有可用选项的完整列表。默认情况下,服务器监听localhost:10000。你可以通过环境变量重写这个行为,例如:

    export HIVE_SERVER2_THRIFT_PORT=<listening-port>
    export HIVE_SERVER2_THRIFT_BIND_HOST=<listening-host>
    ./sbin/start-thriftserver.sh \
      --master <master-uri> \
      ...
    

    或系统配置:

    ./sbin/start-thriftserver.sh \
      --hiveconf hive.server2.thrift.port=<listening-port> \
      --hiveconf hive.server2.thrift.bind.host=<listening-host> \
      --master <master-uri>
      ...
    

    现在你可以使用beeline来测试Thrift JDBC/ODBC服务器:

    ./bin/beeline
    

    直接连接JDBC/ODBC服务器:

    beeline> !connect jdbc:hive2://localhost:10000
    

    Beeline会要求你输入用户名和密码。在非安全模式下,只需在计算机上输入用户名和空白密码。为了安全模式,请遵循beeline文件中给出的说明。

    Hive的配置是通过把你的Hive -site.xml, core-site.xml和hdfs-site.xml文件放在conf/中完成的。

    你也可以使用Hive自带的直线脚本。

    Thrift JDBC服务器还支持通过HTTP传输发送Thrift RPC消息。在系统属性或conf/的hive-site.xml文件中使用如下设置启用HTTP模式:

    hive.server2.transport.mode - Set this to value: http
    hive.server2.thrift.http.port - HTTP port number to listen on; default is 10001
    hive.server2.http.endpoint - HTTP endpoint; default is cliservice
    

    为了测试,使用beeline以http模式连接JDBC/ODBC服务器:

    beeline> !connect jdbc:hive2://<host>:<port>/<database>?hive.server2.transport.mode=http;hive.server2.thrift.http.path=<http_endpoint>
    

    5.2 运行Spark SQL命令行

    Spark SQL CLI是一个方便的本地运行Hive metastore服务的工具,用于执行命令行输入的查询。注意Spark SQL CLI不能与Thrift JDBC服务器通信。

    在Spark目录下执行如下命令启动Spark SQL命令行:

    ./bin/spark-sql
    

    Hive的配置是通过把你的Hive -site.xml, core-site.xml和hdfs-site.xml文件放在conf/中完成的。您可以运行./bin/spark-sql——help获取所有可用选项的完整列表。

    更多相关内容
  • pyspark案例

    千次阅读 2018-09-25 11:33:08
    pyspark本地环境配置教程配置成功后,可以通过spark dataframe笔记练习pyspark的用法,不过最好是通过spark官网练习语法使用。下面写个小案例,供自己以后查阅: #!/usr/bin/python # -*- coding: utf-8 -*- "&...

    pyspark本地环境配置教程配置成功后,可以通过spark dataframe笔记练习pyspark的用法,不过最好是通过spark官网练习语法使用。下面写个小案例,供自己以后查阅:

    #!/usr/bin/python
    # -*- coding: utf-8 -*-
    """
    @author:
    @contact:
    @time:
    """
    from __future__ import print_function
    from pyspark.sql import SparkSession
    import os, time
    
    if __name__ == "__main__":
       # 设置spark_home环境变量,路径不能有中文、空格
       os.environ['SPARK_HOME'] = "E:/data_page/spark-2.0.2-bin-hadoop2.7"
       # 运行在本地(local),2个线程,一行写不完换行时用“\”
       spark = SparkSession.builder\
          .appName("test")\
          .master("local[2]")\
          .getOrCreate()
       # 如果想看函数源码,可以通过ctrl+点击函数的形式跳转到函数详情界面
       datas = ["hi I love you", "hello", "ni hao"]
       sc = spark.sparkContext
       rdd = sc.parallelize(datas)
       # 查看数据类型 type()
       print(type(datas))
       print(type(rdd))
       #获取总数,第一条数据
       print(rdd.count())
       print(rdd.first())
       # 每个spark运行会有一个监控界面(WEB UI4040),为了监控,让线程休眠一段时间,然后打开localhost:4040页面
       time.sleep(100)
       spark.stop()
    

    打印的结果如下:

    <type 'list'>
    <class 'pyspark.rdd.RDD'>
    3
    hi I love you
    

    localhost:4040界面如下:
    在这里插入图片描述

    展开全文
  • /usr/bin/env python # -*- coding: utf-8 -*- from pyspark.sql import SparkSession from pyspark.sql.functions import collect_list from pyspark.sql.functions import concat, concat_ws, lit import os # ...

    一. 需求

    今天接单了一个学生的课程左右辅导,辅导内容是通过Spark实现好友推荐。

    文件格式:
    image.png

    二. 解决方案

    我之前的博客例子里面有好友推荐相关的帖子

    2.1 代码实现

    python脚本

    #!/usr/bin/env python
    # -*- coding: utf-8 -*-
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import collect_list
    from pyspark.sql.functions import concat, concat_ws, lit
    import os
    
    # 创建一个连接
    spark = SparkSession. \
            Builder(). \
            appName('sql'). \
            master('local'). \
            getOrCreate()
    
    df = spark.read.format("csv"). \
         option("header", "false"). \
         option("delimiter"," "). \
         load("file:///home/software/20220126/pre.txt")
    
    
    
    df1 = df.withColumnRenamed('_c0','id1').withColumnRenamed('_c1','id2_1')
    df2 = df.withColumnRenamed('_c0','id1').withColumnRenamed('_c1','id2_2')
    df = df1.drop('id2_1').distinct()
    
    # 首先通过表连接  t1.id1 = t2.id1 and t1.id2 != t2.id2  构造一个需要的多行数据
    df3 = df1.join(df2,df1.id1 == df2.id1 ,'inner').select(df1.id1, df1.id2_1, df2.id2_2)
    df4 = df3.select("id1", "id2_1", "id2_2").where(" id2_1 != id2_2 ")
    
    # 其次剔除掉id1这个多余的列,id2_1可以有id2_2 这么多个间接好友,因为可能存在重复,进行去重操作
    df5 = df4.drop('id1')
    df6 = df5.distinct()
    
    # 因为上述的结果集是id2_1的间接好友集,但是可能也会含有id2_1的直接好友,需要剔除
    df7 = df6.select("id2_1", "id2_2").subtract(df1.select("id1", "id2_1"))
    
    # df8 = df7.groupby('id2_1').agg(collect_list(df7["id2_2"]).alias("id2_2_new"))
    
    df8 = df7.groupby('id2_1').agg(
    concat_ws(
                " | ", 
                collect_list(
                    concat(lit("("), concat_ws(", ", 'id2_2'), lit(")"))
                )).alias("id2_2_new"))
    
    df9 = df8.join(df, df.id1 == df8.id2_1,'inner').select(df8.id2_1, df8.id2_2_new)
    df9.show()
    
    
    #保留第一行,以逗号作为分隔符,#overwrite 清空后再写入
    file1=r"file:///home/software/20220126/output"
    df10 = df9.coalesce(numPartitions= 1) 
    df10.write.csv(file1)
    
    # 关闭spark会话
    spark.stop()
    

    运行截图:
    image.png

    2.2 一些问题

    2.2.1 读取txt文件

    txt文件当成csv,然后没有表头,默认的分隔符是 空格
    df = spark.read.format(“csv”).
    option(“header”, “false”).
    option(“delimiter”," ").
    load(“file:///home/software/20220126/pre.txt”)

    2.2.2 collect_list使用注意

    df8 = df7.groupby(‘id2_1’).agg(collect_list(df7[“id2_2”]).alias(“id2_2_new”))

    df8 = df7.groupby(‘id2_1’).agg(
    concat_ws(
    " | “,
    collect_list(
    concat(lit(”(“), concat_ws(”, “, ‘id2_2’), lit(”)"))
    )).alias(“id2_2_new”))

    注释的df8输出会报错:
    pyspark.sql.utils.AnalysisException: u’CSV data source does not support array data type.;’
    image.png

    2.2.3 getPythonAuthSocketTimeout does not exist in the JVM

    运行代码报错:
    py4j.protocol.Py4JError: org.apache.spark.api.python.PythonUtils.getPythonAuthSocketTimeout does not exist in the JVM

    网上搜索了一下,spark安装也没问题,python安装也没有问题,只是python找不到spark,此时需要安装findspark包

    pip install findspark
    

    然后在程序中添加一以下代码

    import findspark
    findspark.init()
    

    参考:

    1. https://www.cnblogs.com/widgetbox/p/13383567.html
    展开全文
  • from pyspark.ml.evaluation import RegressionEvaluator from pyspark.ml.recommendation import ALS from pyspark.sql import Row lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd ...

    一. 需求

    近期朋友问我spark的推荐算法相关的。

    二. 解决方案

    因为之前没有接触过推荐算法相关,所以我在spark的官网上找了下,结果找到一个非常nice的案例。
    https://spark.apache.org/docs/2.4.7/ml-collaborative-filtering.html

    2.1 Spark官网demo

    2.1.1 协同过滤

    协同过滤通常用于推荐系统。这些技术旨在填补用户-项目关联矩阵中缺失的条目。Spark.ML 目前支持基于模型的协同过滤,其中用户和产品由一组潜在因素描述,可用于预测缺失条目 Spark.ML 使用交替最小二乘(ALS)算法来学习这些潜在因子。spark中的实现。Ml有以下参数:

    1. numBlocks是将用户和条目划分为并行计算的块的数量(默认为10)。

    2. Rank是模型中潜在因子的数量(默认为10)。

    3. maxIter是要运行的最大迭代次数(默认为10)。

    4. regParam指定ALS中的正则化参数(默认为1.0)。

    5. implicitPrefs指定是使用显式反馈ALS变体还是适合隐式反馈数据的变体(默认为false,这意味着使用显式反馈)。

    6. alpha是一个适用于ALS隐式反馈变体的参数,它控制着偏好观察的基线置信度(默认为1.0)。

    7. Nonnegative指定是否对最小二乘使用非负约束(默认为false)。

    注意:
    ALS的基于数据框架的API目前只支持用户id和条目id的整数。用户和项目id列也支持其他数字类型,但是id必须在整数值范围内。

    2.1.2 显性和隐性反馈

    基于矩阵分解的协同过滤的标准方法将用户-物品矩阵中的条目视为用户给物品的显式偏好,例如,用户给电影打分。

    在许多现实世界的用例中,只有隐式反馈(如浏览、点击、购买、喜欢、分享等)是很常见的。spark中使用的方法。用于处理此类数据的ml取自隐式反馈数据集协同过滤。从本质上来说,这种方法并不是直接模拟评级矩阵,而是将数据视为代表用户行为观察强度的数字(如点击次数,或某人观看电影的累计时长)。然后,这些数字与观察到的用户偏好的信心水平相关,而不是明确的给商品评级。然后,该模型试图找到潜在的因素,可以用来预测用户对某项商品的预期偏好。

    2.1.3 正则化参数的缩放

    在解决每个最小二乘问题时,我们将正则化参数regParam缩放为用户在更新用户因子中生成的评级数量,或产品在更新产品因子中收到的评级数量。这种方法被命名为“ALS-WR”,并在《Netflix奖的大规模并行协同过滤》一文中进行了讨论。它减少了regParam对数据集规模的依赖,因此我们可以将从采样子集学到的最佳参数应用到整个数据集,并期望类似的性能。

    2.1.4 本身的策略

    当使用ALSModel进行预测时,通常会遇到测试数据集中在训练模型期间不存在的用户和/或项。这通常发生在两种情况下:

    1. 在生产中,对于新用户或没有评级历史且模型没有经过训练的项目(这就是“冷启动问题”)。

    2. 在交叉验证期间,数据在训练集和评估集之间分割。在Spark的CrossValidator或TrainValidationSplit中使用简单的随机分割时,经常会遇到评估集中的用户和/或项目不在训练集中的情况

    默认情况下,Spark在ALSModel期间分配NaN预测。当模型中不存在用户和/或项因素时进行转换。这在生产系统中可能很有用,因为它指示了一个新用户或项,因此系统可以决定使用某些回退作为预测。

    然而,这在交叉验证期间是不希望的,因为任何NaN预测值都会导致评估度量的NaN结果(例如使用RegressionEvaluator时)。这使得模型选择变得不可能。

    Spark允许用户将coldStartStrategy参数设置为“drop”,以便删除DataFrame中任何包含NaN值的预测行。评估指标将在非nan数据上计算,并且是有效的。下面的示例说明了该参数的用法。

    注意:
    目前支持的冷启动策略是" nan “(上面提到的默认行为)和” drop "。未来可能会支持进一步的策略。

    2.1.5 Python代码

    在下面的示例中,我们从MovieLens数据集中加载评级数据,每行包含一个用户、一个电影、一个评级和一个时间戳。然后我们训练一个ALS模型,该模型默认情况下假设评分是显式的(implicitPrefs为False)。我们通过测量评级预测的均方根误差来评估推荐模型。

    from pyspark.ml.evaluation import RegressionEvaluator
    from pyspark.ml.recommendation import ALS
    from pyspark.sql import Row
    
    lines = spark.read.text("data/mllib/als/sample_movielens_ratings.txt").rdd
    parts = lines.map(lambda row: row.value.split("::"))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]), movieId=int(p[1]),
                                         rating=float(p[2]), timestamp=long(p[3])))
    ratings = spark.createDataFrame(ratingsRDD)
    (training, test) = ratings.randomSplit([0.8, 0.2])
    
    # Build the recommendation model using ALS on the training data
    # Note we set cold start strategy to 'drop' to ensure we don't get NaN evaluation metrics
    als = ALS(maxIter=5, regParam=0.01, userCol="userId", itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop")
    model = als.fit(training)
    
    # Evaluate the model by computing the RMSE on the test data
    predictions = model.transform(test)
    evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                    predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    print("Root-mean-square error = " + str(rmse))
    
    # Generate top 10 movie recommendations for each user
    userRecs = model.recommendForAllUsers(10)
    # Generate top 10 user recommendations for each movie
    movieRecs = model.recommendForAllItems(10)
    
    # Generate top 10 movie recommendations for a specified set of users
    users = ratings.select(als.getUserCol()).distinct().limit(3)
    userSubsetRecs = model.recommendForUserSubset(users, 10)
    # Generate top 10 user recommendations for a specified set of movies
    movies = ratings.select(als.getItemCol()).distinct().limit(3)
    movieSubSetRecs = model.recommendForItemSubset(movies, 10)
    

    2.2 ALS算法简要解释

    ALS是交替最小二乘(alternating least squares)的简称。在机器学习的范畴中,ALS特指使用交替最小二乘求解的一个协同推荐算法。它通过观察到的所有用户给产品的打分,来推断每个用户的喜好并向用户推荐适合的产品。不过ALS无法准确评估新加入的用户或商品。这个问题也被称为Cold Start问题。

    2.2.1 举例

    ALS推荐算法是基于矩形分解的一种方法。先看看矩阵分解的含义。

    我们拿电影推荐作为例子。推荐所使用的数据可以抽象成一个[m,n]的矩阵R,R的每一行代表m个用户对所有电影的评分,n列代表每部电影对应的得分。R是个稀疏矩阵,一个用户只是对所有电影中的一小部分看过,有评分。通过矩阵分解方法,我可以把这个低秩的矩阵,分解成两个小矩阵的点乘。公式如下:
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-EU9FiUIR-1655341069689)(https://upload-images.jianshu.io/upload_images/2638478-744fb280b908bea5.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    矩阵R(评分为1、2、3、4、5):
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-GbPIw7y1-1655341069690)(https://upload-images.jianshu.io/upload_images/2638478-fb797e9caef03bb9.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    我把矩阵分解之后,就变成了下面两个小矩阵(F是隐藏特征):
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-m8colMDe-1655341069691)(https://upload-images.jianshu.io/upload_images/2638478-a55965488d03cbb7.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    用户的特征
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-qyH2Jtw0-1655341069691)(https://upload-images.jianshu.io/upload_images/2638478-93fcd0568c13b0fd.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    电影的特征:

    1. 分解之后的矩阵,变成了根据特征数决定维数的向量。这种求出的向量作为用户的特征,用在推荐上,被成为隐藏特征或者隐藏因子。

    2. 为什么进行矩阵分解呢?因为推荐使用的矩阵不仅是稀疏的而且往往是低秩的。矩阵分解相当于进行了特征提取或者数据的降维。

    3. 为了求出两个分解后的矩阵,我可以产生两个维度一样的随机矩阵U和V,点乘之后得到同样m行n列的矩阵R1. 这一步我已经得到两个[m,n]的矩阵,其中一个是反映用户的真实喜好的数据,矩阵R。另一份只是一个近似数据,矩阵R1。我可以找到一个公式来衡量,两个同阶的矩阵的相似程度:
      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-pbxD0fYA-1655341069692)(https://upload-images.jianshu.io/upload_images/2638478-f70ebb9d32100ba1.png?imageMogr2/auto-orient/strip%7CimageView2/2/w/1240)]

    这是一个损失函数,我的目的就是让这个函数的值最小化,使得我构造的矩阵能够最接近原始矩阵。

    2.2.2 ALS算法参数

    关键代码(Java):

    MatrixFactorizationModel model = ALS.train(ratings, rank, iterations,lambda);
    

    解释:

    1. ratings:训练集,数据格式:(用户id 物品id 评分 )

    2. rank:矩阵分解时对应的低维的维数,即特征向量维数或者说特征数。如果这个值太小拟合的就会不够,误差就很大;如果这个值很大,就会导致模型大泛化能力较差。这个值会影响矩阵分解的性能,越大则算法运行的时间和占用的内存可能会越多。通常需要进行调参,一般可以取10-200之间的数。

    3. iterations:在矩阵分解用交替最小二乘法求解时,进行迭代的最大次数。这个值取决于评分矩阵的维度,以及评分矩阵的系数程度。一般来说,不需要太大,比如5-20次即可。

    4. lambda:正则因子。lambda也是和rank一样的,如果设置很大就可以防止过拟合问题,如果设置很小,其实可以理解为直接设置为0,那么就不会有防止过拟合的功能了;怎么设置呢?可以从0.0001 ,0.0003,0.001,0.003,0.01,0.03,0.1,0.3,1,3,10这样每次大概3倍的设置,先大概看下哪个值效果比较好,然后在那个比较好的值(比如说0.01)前后再设置一个范围,比如(0.003,0.3)之间,间隔设置小点,即0.003,0.005,0.007,0.009,0.011等等等。

    调优:
    需要引入均方根误差(RMSE):均方根误差是用来衡量观测值同真值之间的偏差。

    用途:
    预测评分和推荐物品或者用户等。

    1) 预测用户对物品的评分

    predict(int user, int product)
    

    2)预测用户集对物品集的评分

    predict(JavaPairRDD<Integer, Integer> usersProducts)
    

    3)推荐用户k个物品

    recommendProducts(final int user, int num)
    

    4)对物品推荐k个用户

    recommendUsers(final int product, int num)
    

    5) 对所有用户推荐物品,物品数量取前k个

    recommendProductsForUsers(int num)
    

    6) 对所有物品推荐用户,用户数量取前k个

    recommendUsersForProducts(int num)
    

    参考:

    1. https://spark.apache.org/docs/2.4.7/ml-collaborative-filtering.html
    2. https://www.jianshu.com/p/cfcbf08900dd
    展开全文
  • Pyspark 案例实践 假新闻分类

    千次阅读 2022-01-17 18:32:50
    基于Pyspark的分类器训练(ML库,使用sql.DataFrame),处理文本数据。
  • /usr/bin/env python # -*- coding: utf-8 -*- from pyspark.sql import SparkSession from pyspark.sql import SQLContext # 创建一个连接 spark = SparkSession. \ Builder(). \ appName('local'). \ master('...
  • /usr/bin/env python # -*- coding: utf-8 -*- from pyspark.sql import SparkSession from pyspark.sql.functions import concat_ws spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example...
  • /usr/bin/env python # -*- coding: utf-8 -*- from pyspark.sql import SparkSession from pyspark.sql.functions import concat_ws spark = SparkSession \ .builder \ .appName("Python Spark SQL basic example...
  • 需求 我们使用pyspark的Dataframe的时候,经常会遇到求 差集、交集 、并集。 虽然这个需求可以通过Spark SQL来实现,但是如果列比较多,使用Spark SQL实现也是比较麻烦的。 二. 解决方案 2.1 数据准备 代码: from ...
  • /usr/bin/env python # -*- coding: utf-8 -*- from pyspark.sql import SparkSession from pyspark.sql import SQLContext # 创建一个连接 spark = SparkSession. \ Builder(). \ appName('local'). \ master('...
  • 文章目录一.环境介绍二.运行spark-sql的几种方式2.1 spark-shell的方式2.2 beeline的方式2.3 spark-sql的方式参考: 一.环境介绍 我本地的环境的CDH 6.3.1的环境,自己已经将spark软件安装成功了。...
  • 今天写pyspark遇到一个问题,要实现同mysql的GROUP_CONCAT函数的功能 数据1: col1 col2 1 a 1 b 1 c 2 d 2 f 想要的结果1: col1 new_col2 1 a,b,c 2 d,f 如果存在多列是否也可行 数据2: col1 col2 col3 1 a 100 1 ...
  • ❤️❤️❤️❤️❤️❤️❤️❤️❤️❤...使用spring+springmvc+mysql+bootstarp实现的学生就业案例~(教学需要大佬勿喷) 3、环境要求 anaconda python3.6 jdk1.8 spark-2.4.7-bin-hadoop2.7 scala-2.13....
  • spark 连接mysql报错
  • 今天写pyspark脚本的时候遇到一个问题,需要类似于关系型数据库group by再聚合的操作,尝试通过rdd来写,发现不好实现。 于是想到了使用DataFrame,通过类sql的group by直接进行实现。 二.解决方案 将rdd直接转为...
  • pyspark示例

    2022-08-01 22:11:34
    pyspark示例。

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 1,554
精华内容 621
关键字:

pyspark案例