精华内容
下载资源
问答
  • 使用pyspark做机器学习,实例化模型对象时,需要指定输入featuresCol的名称。其中,featuresCol是由数据的X构成的“单列”,aka 'vector'。 否则会报错: Traceback (most recent call last): File "<stdin&...

    使用pyspark做机器学习,实例化模型对象时,需要指定输入featuresCol的名称。其中,featuresCol是由数据的X构成的“单列”,aka 'vector'。

    否则会报错:

    Traceback (most recent call last):
      File "<stdin>", line 1, in <module>
      File "/data/spark/spark-2.4.4/python/pyspark/ml/base.py", line 132, in fit
        return self._fit(dataset)
      File "/data/spark/spark-2.4.4/python/pyspark/ml/wrapper.py", line 295, in _fit
        java_model = self._fit_java(dataset)
      File "/data/spark/spark-2.4.4/python/pyspark/ml/wrapper.py", line 292, in _fit_java
        return self._java_obj.fit(dataset._jdf)
      File "/data/spark/spark-2.4.4/python/lib/py4j-0.10.7-src.zip/py4j/java_gateway.py", line 1257, in __call__
      File "/data/spark/spark-2.4.4/python/pyspark/sql/utils.py", line 79, in deco
        raise IllegalArgumentException(s.split(': ', 1)[1], stackTrace)
    pyspark.sql.utils.IllegalArgumentException: 'Field "features" does not exist.

    Stack Overflow上,desertnaut :

    Spark dataframes are not used like that in Spark ML; all your features need to be vectors in a single column, usually named features. Here is how you can do it using the 5 rows you have provided as an example:

    spark.version
    # u'2.2.0'
    
    from pyspark.sql import Row
    from pyspark.ml.linalg import Vectors
    
    # your sample data:
    temp_df = spark.createDataFrame([Row(V4366=0.0, V4460=0.232, V4916=-0.017, V1495=-0.104, V1639=0.005, V1967=-0.008, V3049=0.177, V3746=-0.675, V3869=-3.451, V524=0.004, V5409=0), Row(V4366=0.0, V4460=0.111, V4916=-0.003, V1495=-0.137, V1639=0.001, V1967=-0.01, V3049=0.01, V3746=-0.867, V3869=-2.759, V524=0.0, V5409=0), Row(V4366=0.0, V4460=-0.391, V4916=-0.003, V1495=-0.155, V1639=-0.006, V1967=-0.019, V3049=-0.706, V3746=0.166, V3869=0.189, V524=0.001, V5409=0), Row(V4366=0.0, V4460=0.098, V4916=-0.012, V1495=-0.108, V1639=0.005, V1967=-0.002, V3049=0.033, V3746=-0.787, V3869=-0.926, V524=0.002, V5409=0), Row(V4366=0.0, V4460=0.026, V4916=-0.004, V1495=-0.139, V1639=0.003, V1967=-0.006, V3049=-0.045, V3746=-0.208, V3869=-0.782, V524=0.001, V5409=0)])
    
    trainingData=temp_df.rdd.map(lambda x:(Vectors.dense(x[0:-1]), x[-1])).toDF(["features", "label"])
    trainingData.show()
    # +--------------------+-----+ 
    # |            features|label|
    # +--------------------+-----+
    # |[-0.104,0.005,-0....|    0| 
    # |[-0.137,0.001,-0....|    0|
    # |[-0.155,-0.006,-0...|    0|
    # |[-0.108,0.005,-0....|    0|
    # |[-0.139,0.003,-0....|    0|
    # +--------------------+-----+

     

    也就是说,需要把全部输入特征转化为一个‘vector’,使用的方法可以是

    from pyspark.ml.linalg import Vectors
    trainingData=temp_df.rdd.map(lambda x:(Vectors.dense(x[0:-1]), x[-1])).toDF(["features", "label"])

     

    Stack Overflow 上,  100+award回答

    Personally I would go with Python UDF and wouldn't bother with anything else:

    展开全文
  • 现在,您需要将C盘/用户/.ivy2/jars中出现的所有jar都复制到spark的jars目录中: 第二次启动pysparkpyspark --packages graphframes:graphframes:0.5.0-spark2.1-s_2.11 --jars graphframes-0.5.0-spark2.1-s_...

    如标题所言,创建GraphFrame,会出现
    Py4JJavaError: An error occurred while calling o138.loadClass.
    : java.lang.ClassNotFoundException: org.graphframes.GraphFramePythonAPI
    这个问题
    在这里插入图片描述
    在国内搜了半天没见有用的解决方案,最后在stackoverflow上找到了

    首先在cmd上启动pyspark
    这里有一个小度量,第一次使用参数启动pyspark,以便它下载所有graphframe的jar依赖项,很多教程启动的时候并没有指定依赖包,这可能会发生错误: (根据你的spark版本去graphframe官网找到对应的下载命令)

    pyspark --packages graphframes:graphframes:0.5.0-spark2.1-s_2.11 --jars graphframes-0.5.0-spark2.1-s_2.11.jar
    

    这应该出现:
    在这里插入图片描述
    意味着它已经下载了所需的所有依赖项。此处重要的是将Ivy Default Cache设置为:C盘/用户/.ivy2/cache,恰好是存储在C盘/用户/.ivy2/jars中的jar。

    现在,需要将C盘/用户/.ivy2/jars中出现的所有jar都复制到spark的jars目录中:
    在这里插入图片描述
    第二次启动pyspark:

    pyspark --packages graphframes:graphframes:0.5.0-spark2.1-s_2.11 --jars graphframes-0.5.0-spark2.1-s_2.11.jar
    

    这应该出现:
    在这里插入图片描述
    现在可以享受GraphFrame了:

    WDNMD,大家都是第一次玩GraphFrame,凭什么我就要承受这么多
    在这里插入图片描述

    展开全文
  • 文章大纲pyspark 实例scala 实例参考文献 pyspark 实例 import pyspark from pyspark.ml.feature import CountVectorizer from pyspark.context import SparkContext from pyspark.sql.session import SparkSession...


    pyspark 实例

    import pyspark
    from pyspark.ml.feature import CountVectorizer
    from pyspark.context import SparkContext
    from pyspark.sql.session import SparkSession
    from pyspark import SparkConf
    展开全文
  • 通过实例学习 PySpark

    2020-06-03 23:27:49
    通过实例学习 PySpark 原始数据获取 start_time = [ ['user1', '2020-05-13 10:46:43'], ['user2', '2020-05-22 08:26:42'], ['user3', '2020-05-17 02:42:31'], ['user4', '2020-05-23 18:25:23'], ['user5', ...

    通过实例学习 PySpark

    最近学习了一下 PySpark, 目标是在工作中能将其用上. 在实践过程中发现, 通过一个个具体的问题来进行学习, 很多内容掌握起来更为容易. 因此后面如果写相关的文章, 也会采用实例的方式来介绍.

    下面要解决的问题是:

    假设用户购买商品, 其点击时间记录在点击表中, 其下单时间记录在下单表中, 另外还有一张表记录用户的特征. 现在的目标是, 获取每个用户从点击时间到下单时间的时间间隔, 并和特征进行拼接. 比如用户 A:

    用户    点击时间     下单时间      时间间隔    特征
    user1  2020-05-13  2020-05-14   24*3600s  sex:male age:28
    

    了解问题的目标之后, 首先是获取原始数据.

    原始数据获取

    ## data.py
    click_time = [
        ['user1', '2020-05-13 10:46:43'],
        ['user2', '2020-05-22 08:26:42'],
        ['user3', '2020-05-17 02:42:31'],
        ['user4', '2020-05-23 18:25:23'],
        ['user5', '2020-05-19 13:29:05'],
        ['user6', '2020-05-16 19:48:23'],
        ['user7', '2020-05-20 16:56:13'],
    ]
    
    order_time = [
        ['user3', '2020-05-18 10:46:43'],
        ['user1', '2020-05-22 08:26:42'],
        ['user5', '2020-05-27 02:42:31'],
        ['user7', '2020-05-23 18:25:23'],
        ['user4', '2020-05-29 13:29:05'],
        ['user2', '2020-05-26 19:48:23'],
        ['user6', '2020-05-20 16:56:13'],
    ]
    
    features = [
        ['age:26', 'weight:70', 'sex:male', 'id:user2'],
        ['weight:50', 'age:22', 'sex:female', 'id:user1'],
        ['weight:70', 'sex:male'],
        ['age:16', 'weight:63', 'sex:male', 'id:user7'],
        ['age:22', 'sex:male'],
        ['age:33', 'weight:72', 'sex:female', 'id:user5'],
        ['weight:63', 'age:46', 'sex:female', 'id:user4'],
        ['age:45', 'weight:73'],
    ]
    
    with open('click_time.txt', 'w') as f:
        content = '\n'.join(['\t'.join(item) for item in click_time])
        f.write('{}\n'.format(content))
    
    with open('order_time.txt', 'w') as f:
        content = '\n'.join(['\t'.join(item) for item in order_time])
        f.write('{}\n'.format(content))
    
    with open('features.txt', 'w') as f:
        content = '\n'.join(['\t'.join(item) for item in features])
        f.write('{}\n'.format(content))
    

    SparkSession

    下一步, 创建 SparkSession.

    from pyspark.sql import SparkSession, Row
    
    spark = SparkSession.builder \
            .appName('test') \
            .master('local') \
            .enableHiveSupport() \
            .getOrCreate()
    

    加载数据

    数据文件生成后, 现在使用 PySpark 读入文件生成 DataFrame. 目前我发现有三种读入文件的方法.

    加载点击时间表

    1. 使用 HiveQL 语句读入文件:
    click_table = 'click_table'
    click_file = 'click_time.txt'
    spark.sql("""
        create table if not exists `{}` (
                id STRING,
                click_time STRING
        )
        using hive options (fileFormat 'textfile', fieldDelim '\t')
    """.format(click_table))
    spark.sql("""
        load data local inpath '{click_file}' into table `{click_table}`
    """.format(click_file=click_file, click_table=click_table))
    click_df = spark.sql("select * from `{}`".format(click_table))
    

    其中 using hive options (fileFormat 'textfile', fieldDelim '\t') 也可以用 row format delimited fields terminated by '\t' 替换.

    1. 使用 spark.read 读入文件:
    click_file = 'click_time.txt'
    df = spark.read.text(click_file).toDF('info')
    
    1. 使用 sc.textFile 读入文件:
    click_file = 'click_time.txt'
    sc = spark.sparkContext
    df = sc.textFile(click_file).map(lambda x: Row(info=x)).toDF()
    

    需要注意的是, 使用第 2 以及第 3 种方法, 为了获得 id 以及 click_time 两个 field, 还需要额外的处理:

    def time_split(row):
        id, time = row.split('\t')
        return (id, time)
    
    click_df = spark.createDataFrame(
        df.rdd.map(lambda x: time_split(x[0])),
        ['id', 'click_time']
    )
    

    注意 dfDataFrame 对象, 使用 .rdd 转换为 RDD 对象, 之后使用 .map 方法处理 RDD 中的每个 Row 对象, 在 How to get a value from the Row object in Spark Dataframe?
    中谈到, Row 继承于 namedtuple, 因此代码中的 x[0] (通过索引访问) 含义是取出 Row 中的值, 当然, 可以使用 x.info (通过 field 访问) 获取 Row 中的值.

    生成 click_df 后, 可以显示部分数据看看是否符合预期:

    click_df.limit(3).show()
    ## 或者
    click_df.show(3)
    

    效果如下:

    注意: 后面为了数据处理的一致性, 我一律采用第二种方法来读入数据.

    加载下单时间表

    基本和加载点击时间表逻辑相同.

    order_file = 'order_time.txt'
    df = spark.read.text(order_file).toDF('info')
    order_df = spark.createDataFrame(
        df.rdd.map(lambda x: time_split(x[0])),
        ['id', 'order_time']
    )
    

    加载 user 特征表

    注意对特征表的过滤, 因为有些记录存在内容缺失, 比如找不到 id 或者 sex.

    def feature_split(row):
    	row = row.split('\t')
    	feature_dict = {item.split(':')[0]: item.split(':')[1] for item in row}
    	return feature_dict
    
    df = spark.read.text('features.txt').toDF('info')
    feature_df = spark.createDataFrame(
    	df.rdd.map(lambda x: feature_split(x[0])),
    	['id', 'sex', 'age', 'weight']
    )
    feature_df = feature_df.filter((feature_df.id.isNotNull()) & \
    							   (feature_df.sex.isNotNull())
                                   )
    

    schema

    注意在加载 user 特征表时, 对于 sex, age, weight 之类的特征, 没有指定它们的类型, 可能默认就是字符串类型了. 为了显式指定对应的类型, 需要自定义 schema, 参考: pyspark: ValueError: Some of types cannot be determined after inferring. (我原来遇到过一个错误: ValueError: Some of types cannot be determined by the first 100 rows, please try again with sampling 也可以通过自定义 Schema 解决).

    代码如下:

    StructField 中的第三个参数含义为: Boolean nullable, 即是否可以被设置为 null. (具体参见: spark.sql.types.StructField)

    之所以下面的 schema 中全部设置为 StringType, 是因为我在 feature_split 函数中将结果均表示成字符串的形式, 比如 age 中的结果不是 int 而是字符串. 如果希望设置 ageIntegerType, 那么应该修改 feature_split 中的代码, 将 age 对应的结果用 int() 方法做转换.

    from pyspark.sql.types import (StructType, 
                                   StructField,
                                   StringType,
                                   IntegerType, 
                                   DoubleType,
                                  )
    
    df = spark.read.text('features.txt').toDF('info')
    schema = StructType([StructField("id", StringType(), True), 
                         StructField("sex", StringType(), True),
                         StructField("age", StringType(), True),
                         StructField("weight", StringType(), True),
                    ])
    feature_df = spark.createDataFrame(
        df.rdd.map(lambda x: feature_split(x[0])),
        schema=schema
    )
    feature_df = feature_df.filter((feature_df.id.isNotNull()) & \
                                   (feature_df.sex.isNotNull()) & \
                                   (feature_df.age.isNotNull()) & \
                                   (feature_df.weight.isNotNull())
                                   )
    

    获取下单和点击的时间间隔

    为了获取下单和点击的时间间隔, 需要将点击时间表和下单时间表进行 Left Outer Join, 以获取每个 user 对应的点击时间以及下单时间. 然而, 由于 user 可能只进行点击而未下单, 因此要对结果过滤.

    join_df = click_df.join(order_df, click_df.id == order_df.id, how='left') \
                      .select(click_df.id, click_df.click_time, order_df.order_time) \
                      .filter(order_df.order_time.isNotNull())
    

    上面代码获取了每个 user 对应的点击时间以及下单时间, 为了获得时间间隔, 需要额外的函数进行处理:

    from datetime import datetime
    
    def convert2datetime(s, format='%Y-%m-%d %H:%M:%S'):
        return datetime.strptime(s, format)
    
    def convert2str(s, format='%Y-%m-%d %H:%M:%S'):
        return datetime.strftime(s, format)
    
    def convert(row):
        id, click_time, order_time = row
        click_time, order_time = list(map(convert2datetime, [click_time, order_time]))
        diff = (order_time - click_time).total_seconds()
        return (id, diff)
    
    join_df = spark.createDataFrame(join_df.rdd.map(convert), ['id', 'diff'])
    

    Join 特征表

    最后只需要 Join 特征表就能达到我们最终的目的:

    final_df = join_df.join(feature_df, join_df.id == feature_df.id, how='inner') \
                      .select(join_df.id, feature_df.sex, feature_df.age, feature_df.weight)
    

    可以考虑使用 .show() 输出结果看看是否符合预期. 此外, 如果想将结果保存在目录中, 可以使用如下方式完成:

    def create(row):
        row = map(str, row)
        line = '\t'.join(row)
        return line
    
    output_dir = 'output'
    final_df.rdd.map(create).repartition(2).saveAsTextFile(output_dir)
    

    另外注意, 如果 output_dir 已经存在, 需要提前删除, 否则程序会报错.

    观察 output 的文件:

    可以发现结果保存在两个分区中, 比如 part-00001 中保存着:

    终曲 & 尾声

    不要忘记

    spark.stop()
    

    完整代码

    以上完整代码如下, 运行起来, 去感受 Spark 的强大 😂😂😂

    (发表完博客后补充: 文章在草稿中保存了几天, 发出来后发现, 结果好像跟一开始设置的目标不太一样啊 🤣🤣🤣 忘了把时间间隔加到结果中了, 不过这无伤大雅~ 果然写博客还是得一气呵成! )

    from datetime import datetime
    from pyspark.sql import SparkSession, Row
    from pyspark.sql.types import (StructType, 
                                   StructField,
                                   StringType,
                                   IntegerType, 
                                   DoubleType,
                                  )
    
    def time_split(row):
        id, time = row.split('\t')
        return (id, time)
    
    def feature_split(row):
        row = row.split('\t')
        feature_dict = {item.split(':')[0]: item.split(':')[1] for item in row}
        return feature_dict
    
    def convert2datetime(s, format='%Y-%m-%d %H:%M:%S'):
        return datetime.strptime(s, format)
    
    def convert2str(s, format='%Y-%m-%d %H:%M:%S'):
        return datetime.strftime(s, format)
    
    def convert(row):
        id, click_time, order_time = row
        click_time, order_time = list(map(convert2datetime, [click_time, order_time]))
        diff = (order_time - click_time).total_seconds()
        return (id, diff)
    
    def create(row):
        row = map(str, row)
        line = '\t'.join(row)
        return line
    
    spark = SparkSession.builder \
            .appName('test') \
            .master('local') \
            .enableHiveSupport() \
            .getOrCreate()
    
    click_file = 'click_time.txt'
    df = spark.read.text(click_file).toDF('info')
    click_df = spark.createDataFrame(
        df.rdd.map(lambda x: time_split(x[0])),
        ['id', 'click_time']
    )
    
    click_df.show(3)
    
    order_file = 'order_time.txt'
    df = spark.read.text(order_file).toDF('info')
    order_df = spark.createDataFrame(
        df.rdd.map(lambda x: time_split(x[0])),
        ['id', 'order_time']
    )
    
    df = spark.read.text('features.txt').toDF('info')
    schema = StructType([StructField("id", StringType(), True), 
                         StructField("sex", StringType(), True),
                         StructField("age", StringType(), True),
                         StructField("weight", StringType(), True),
                    ])
    feature_df = spark.createDataFrame(
        df.rdd.map(lambda x: feature_split(x[0])),
        schema=schema
    )
    feature_df = feature_df.filter((feature_df.id.isNotNull()) & \
                                   (feature_df.sex.isNotNull()) & \
                                   (feature_df.age.isNotNull()) & \
                                   (feature_df.weight.isNotNull())
                                   )
    
    join_df = click_df.join(order_df, click_df.id == order_df.id, how='left') \
                      .select(click_df.id, click_df.click_time, order_df.order_time) \
                      .filter(order_df.order_time.isNotNull())
    
    join_df = spark.createDataFrame(join_df.rdd.map(convert), ['id', 'diff'])
    
    feature_df.show(3)
    
    final_df = join_df.join(feature_df, join_df.id == feature_df.id, how='inner') \
                      .select(join_df.id, feature_df.sex, feature_df.age, feature_df.weight)
    
    output_dir = 'output'
    final_df.rdd.map(create).repartition(2).saveAsTextFile(output_dir)
    
    spark.stop()
    
    展开全文
  • pyspark读取hive数据实例

    千次阅读 2020-01-07 17:48:01
    使用pyspark读取hive中的数据,测试代码: vi test.py #!-*- coding:utf-8 -*- from pyspark import SparkConf, SparkContext from pyspark.sql import HiveContext conf = (SparkConf().setMaster("yarn")....
  • 对于PyCharm,需要作如下设置: 1、安装pyspark,它会自动安装py4j 2、在edit configuration中,add content root,选择spark下载包的python/pyspark/lib下的... pyspark程序实例 Python from py...
  • 主要介绍了Pyspark获取并处理RDD数据代码实例,文中通过示例代码介绍的非常详细,对大家的学习或者工作具有一定的参考学习价值,需要的朋友可以参考下
  • 基于pyspark图计算的算法实例

    千次阅读 2020-03-18 15:04:58
    基于pyspark的图计算实例引入广度优先搜索连通分量强连通分量标签传播PageRank最短路径算法三角形计数 引入 图算法指利用特制的线条算图求得答案的一种简便算法。无向图、有向图和网络能运用很多常用的图算法,这些...
  • PySpark详细安装教程+实例

    千次阅读 2020-03-30 09:21:11
    0.10.4-src.zip:$PYTHONPATH export PYSPARK_PYTHON=python3 三、启动Spark cd /usr/local/hadoop ./sbin/start-dfs.sh cd /usr/local/spark ./bin/pyspark #进入pyspark交互环境,单机模式 四、常见报错 No module ...
  • pyspark OneHotEncoder用法实例

    千次阅读 2019-10-10 21:29:43
    def encode_columns(df, col_list): indexers = [ StringIndexer(inputCol=c, outputCol=f'{c}_indexed').setHandleInvalid("keep") for c in col_list ] encoder = OneHotEncoderEstimator( ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 2,428
精华内容 971
关键字:

pyspark实例