精华内容
下载资源
问答
  • SparkProjects:针对不同项目的Markdown代码,涵盖了Pyspark项目的各个方面
  • Pyspark3模板 概括 该项目使用请求作为依赖项,基于Poetry创建了一个结构,并将应用程序与诗歌的构建一起打包,而依赖项与pex打包在一起。 要求 Python^ 3.9 诗歌^ 1.1(安装诗歌指南: : ) Apache Spark> = 3.1...
  • 在windows环境用pycharm开发pyspark首先我们得知道什么是pyspark。首先Apache Spark用Scala编程语言编写。为了支持带有Spark的Python,Apache Spark社区发布了一个工具PySpark。使用PySpark,你就可以使用Python编....

        最近家里新买了台电脑,环境什么的又得重新安装,干脆沉淀下一篇博客记录下整个过程,省得再去搜索。

        在windows环境用pycharm开发pyspark首先我们得知道什么是pyspark。首先Apache Spark用Scala编程语言编写。为了支持带有Spark的Python,Apache Spark社区发布了一个工具PySpark。使用PySpark,你就可以使用Python编程语言来处理RDD。正是由于有了一个名为Py4j的库,他们才能够实现这一目标。

    步骤一:配置环境

              1、第一因为spark需要在jvm中运行,所以jdk是必须安装得,我配置得java版本是1.8.0_144;

              2、第二安装spark版本2.44下载链接,下载后直接解压到D盘根目录下;

              3、第三安装python,我安装得是版本3.7.2

    步骤二:pycharm创建项目并添加

              1、打开pycharm-(左上角)file---settings--Project Interpreter--选择你的python环境

                   

              2、.打开解压后的spark文件---python---lib:D:\spark\spark-2.4.4-bin-hadoop2.7\python\lib   你会看到有两个压缩文件

     

              

    将其 粘贴 复制 解压到项目D:\code\pycharm_python\myFirstPySpark\venv\Lib\site-packages中如下并将其中文件包改成py4j

    然后就可以看到这两个包

    也可以通过在py文件尝试输入import pyspark 若不报错则证明操作成功,也可以在cmd敲命令直接pip3 install pyspark==版本号安装。

     

    下面就是项目代码

    ···python

    from pyspark import SparkContext
    from py4j import *
    import os
    os.environ['SPARK_HOME'] = 'D:\spark\spark-2.4.4-bin-hadoop2.7'
    sc =  SparkContext("local", "count app")
    words = sc.parallelize (
       ["scala",
       "java",
       "hadoop",
       "spark",
       "akka",
       "spark vs hadoop",
       "pyspark",
       "pyspark and spark"]
    )
    counts = words.count()
    print ("Number of elements in RDD -> %i" % (counts))

    ···

    直接运行就可以了。

    展开全文
  • pyspark 项目训练 配置spark环境 import json import datetime import pandas as pd from pyspark.sql import SparkSession import pyspark.sql.functions as F from pyspark.sql.functions import col from ...

    pyspark 项目训练

    配置spark环境

    import json
    import datetime
    import pandas as pd
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    from pyspark.sql.functions import col
    from pyspark.sql.types import StructType,StructField, StringType, IntegerType
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.ml.classification import RandomForestClassifier
    
    from datetime import datetime
    import matplotlib.pyplot as plt
    import os
    import numpy as np
    os.environ['PYSPARK_PYTHON']='/usr/local/anaconda3/bin/python3.6'
    import datetime
    import seaborn as sns
    import matplotlib.pyplot as plt
    from pyspark.sql.window import *
    from pyspark.sql import Window
    from pyspark.ml.feature import StringIndexer
    from pyspark.ml.feature import OneHotEncoder
    from pyspark.mllib.util import MLUtils
    from pyspark.mllib.evaluation import MulticlassMetrics
    spark = (SparkSession
     .builder
     .appName("gps-test-DockerLinuxContainer")
     .enableHiveSupport()
     .config("spark.executor.instances", "300")
     .config("spark.executor.memory","16g")
     .config("spark.executor.cores","4")
     .config("spark.driver.memory","8g")
     .config("spark.driver.maxResultSize", "8g")
     .config("spark.sql.shuffle.partitions","1200")
     .config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class","DockerLinuxContainer")
     .config("spark.executorEnv.yarn.nodemanager.container-executor.class","DockerLinuxContainer")
     .getOrCreate()) 

    数据预处理

    ####################################################### 函数封装
    # 读取数据集合
    def getDataFromHive(table):
        sql = "select * from "+table
        print(sql)
        df = spark.sql(sql)
        print('The number of training data', df.count())
        #df.printSchema()
        return df 
    
            
    # 查看数据中类的占比
    def getClassRatio(df, label):
        counts_level = df.groupBy(label).count().rdd.collectAsMap()
        count_total = df.count()
        for level, cnt in counts_level.items():
            print(level, cnt, round(cnt/count_total,2))
    
            
    # 数据采样使各类均衡
    def resampleClass(df, label, sample_ratio1, sample_ratio2, sample_ratio3):
        # level = 1 的数据按照1/3进行降采样
        df_level_1 = df.where(label+'=1').sample(True, sample_ratio1, seed = 2018)
        # level = 2 保留所有样本
        df_level_2 = df.where(label+'=2').sample(True, sample_ratio2, seed = 2018)
        # level = 3 复制5倍
        df_level_3 = df.where(label+'=3').sample(True, sample_ratio3, seed = 2018)
        df = df_level_1.union(df_level_2).union(df_level_3)
        getClassRatio(df, label)
        return df
        
            
    # 除去one-hot字段和pin,其余字段全部转为double类型
    def convertToDouble(df, no_double_cols):
        double_cols = list(set(df.columns).difference(set(no_double_cols)))    
        df = df.select(no_double_cols + [col(column).cast("double").alias(column) for column in double_cols])
        return df 
    
    
    # 字段缺失值填充
    def fillNone(df, cate_cols, num_cols, onehot_cols):
        # 对于转为double的类别型特征,用 -1
        df = df.fillna(-1.0, subset=cate_cols)
        # 对于数值型特征,用 0
        df = df.fillna(0.0, subset=num_cols)
        # 对于需要onehot的特征,用 '-1'
        df = df.fillna('-1', subset=onehot_cols)
        for col in df.columns:
            filter_condition = col+' is null'
            cnt = df.filter(filter_condition).count()
            if cnt > 0:
                print(col+" have null value")
        return df
    
    
    # 将数据集分为train_data和test_data
    def splitData(df, split_train_ratio):
        #将数据随机分为训练集和测试集, 并将它们存与内存中,加快后序运行速度 cache()
        train_df, test_df = df.randomSplit(weights=[split_train_ratio, 1-split_train_ratio], seed=666)#设立随机种子
        train_df.cache()
        print("-------------training data------------- ")
        counts_level = train_df.groupBy('label').count().rdd.collectAsMap()
        count_train_total = train_df.count()
        for level, cnt in counts_level.items():
            print(level, cnt, round(cnt/count_train_total,2))
        test_df.cache()
        print("-------------test data------------- ")
        count_test_total = test_df.count()
        counts_level = test_df.groupBy('label').count().rdd.collectAsMap()
        for level, cnt in counts_level.items():
            print(level, cnt, round(cnt/count_test_total,2))
        return train_df, test_df
    
    
    ####################################################### 流程使用
    # 变量定义
    table = 'tmp.tmp_zwm_user_by_phone_90_train_data_v2'
    label = 'label'
    sample_ratio1, sample_ratio2, sample_ratio3 = 0.33, 1.0, 5.0
    cate_cols = ['col1','col2']
    num_cols = ['col3','col4']
    onehot_cols = ['职业']
    no_double_cols = ['user_log_acct'] + onehot_cols
    split_train_ratio = 0.9
    
    # 流程使用
    df = getDataFromHive(table)
    getClassRatio(df, label)
    df = resampleClass(df, label, sample_ratio1, sample_ratio2, sample_ratio3)
    df = convertToDouble(df, no_double_cols)
    df = fillNone(df, cate_cols, num_cols, onehot_cols)
    train_df, test_df = splitData(df, split_train_ratio)
    

    特征工程和模型训练(结合pipeline)

    ############################################ 函数封装
    def pipeline(need_onehot_col,cate_cols,num_cols,cv_grid_search):
        # one-hot 处理
        stringIndex = StringIndexer(inputCol=need_onehot_col, outputCol=need_onehot_col+'_Index')
        onehoter = OneHotEncoder(dropLast= False, inputCol=need_onehot_col+'_Index', outputCol= need_onehot_col+'_vector')
    
        # 用vectorAssembler将所有特征合并为一列,将此列输入模型
        asseblerInputs = [need_onehot_col+'_vector'] + cate_cols + num_cols
        assembler = VectorAssembler(inputCols=asseblerInputs, outputCol="features")
    
        # 构建随机森林的模型
        # 不需要调参
        if not cv_grid_search:
            model = RandomForestClassifier(labelCol="level", featuresCol="features", numTrees=50)
        # 用grid search来调参    
        else:
            # 构建随机森林的模型
            rf = RandomForestClassifier(labelCol="label", featuresCol="features")
            # 使用评估器
            evaluator = MulticlassClassificationEvaluator()
            # evaluator = BinaryClassificationEvaluator(rawPredictionCol="rawPrediction",labelCol="buy")
            # evaluator.setMetricName('areaUnderROC')
            # 使用交叉验证, 使用 10 折 交叉验证获取最好的参数
            paramGrid = ParamGridBuilder(). \
                addGrid(rf.maxDepth, [10, 15]). \
                addGrid(rf.numTrees, [50, 60]). \
                build()
            model = CrossValidator(estimator=rf, estimatorParamMaps=paramGrid, evaluator=evaluator, numFolds=2)
            
        # 将以上过程都集合到pipeline中
        rfpipeline = Pipeline(stages = [stringIndex, onehoter, assembler, model])
        rfpipeline.getStages()
        return rfpipeline
    
    
    # 模型训练
    def train(train_df, rfPipeline):
        rfModel = rfPipeline.fit(train_df)
        return rfModel
    
    # 模型保存
    def saveModel(path, rfModel):
        rfModel.write().overwrite().save(path)
    
    # 测试集上验证    
    def test(test_df,rfModel):
        rfPrediction = rfModel.transform(test_df)
        return rfPrediction
    
    ############################################ 流程使用
    rfPipeline = pipeline(onehot_cols[0],cate_cols,num_cols,cv_grid_search=1)
    rfModel = train(train_df, rfPipeline)
    
    rfPrediction = test(test_df,rfModel)
    path = 'hdfs://ns4/rf_model'
    saveModel(path, rfModel)
    

    模型参数和特征重要性

    ####################################### 函数封装
    # 获取模型参数(只有在用grid search时才需要调用)
    def getBestParam(rfModel):
        param_dict = rfModel.extractParamMap()
        print(param_dict)
        sane_dict = {}
        print("------------------- 最佳参数 -------------------")
        for k, v in param_dict.items():
            sane_dict[k.name] = v
            print(k.name, v)
    
    
    # 获取特征重要性
    def getFeaImportance(rfModel, asseblerInputs):
        ff=rfModel.featureImportances
        importancesList=[float(col) for col in  ff]
        colList=asseblerInputs
        result=dict(zip(colList,importancesList))
        result_sorted = sorted(result.items(), key = lambda kv:(kv[1], kv[0]), reverse = True)
        for key,value in result_sorted:
            print(str(key)+'\t'+str(value))
    ####################################### 流程使用
    cv_grid_search = 1
    asseblerInputs = [onehot_cols[0]+'_vector'] + cate_cols + num_cols
    
    
    # 如果有用参数调优,那么需要首先选出最佳模型
    if (cv_grid_search):
        bestModel = rfModel.stages[3].bestModel
        getBestParam(rfModel)
        getFeaImportance(bestModel, asseblerInputs)

    模型预测效果评估

    #################################### 函数封装
    # 混淆矩阵
    def getConfusionMatrix(class_list,final_data):
        matrice = []
        for label_class in class_list:
            for pred_class in class_list:
                ans = final_data.where("label = '"+label_class+"' and prediction = '"+pred_class+"' ").count()
                matrice.append((label_class, pred_class, ans))
        schema = StructType([ \
            StructField("label_class",StringType(),True), \
            StructField("pred_class",StringType(),True), \
            StructField("num",IntegerType(),True) \
          ])
        confusion_df = spark.createDataFrame(data=matrice,schema=schema)
        print('---------------------- 混淆矩阵 ----------------------')
        confusion_df.show(20,False)
        return confusion_df
    
    
    # 获取Accuracy(准确度), Precision(精确率), Recall(召回率), F1
    # 返回:[(label,accuracy,precision,recall,F1)]
    def getAccPreRecallF1(num_class, confusion_df):
        indicators = []   
        for label in class_list:
            tp = confusion_df.where("label_class = "+label+" and  pred_class = "+label+"").select("num").groupBy().sum().collect()[0][0]
            fp = confusion_df.where("label_class != "+label+" and  pred_class = "+label+"").select("num").groupBy().sum().collect()[0][0]
            fn = confusion_df.where("label_class = "+label+" and  pred_class != "+label+"").select("num").groupBy().sum().collect()[0][0]
            tn = confusion_df.where("label_class != "+label+" and  pred_class != "+label+"").select("num").groupBy().sum().collect()[0][0]
            try:
                accuracy = float((tp + tn)/(tp + fp + tn + fn))
            except:
                accuracy = 0    
            try:
                precision = float((tp)/(tp + fp))
            except:
                precision = 0
            try:
                recall = float((tp)/(tp + fn))
            except:
                recall = 0
            try:
                F1 = float(2 * precision * recall / (precision + recall))
            except:
                F1 = 0           
            print("********************************  "+label+"accuracy + precision + recall +f1 指标  **************************************")
            print("Accuracy: " + str(accuracy))
            print("Precision: " + str(precision))
            print("Recall: " + str(recall))
            print("F1: " + str(F1))
            indicators.append((label,accuracy,precision,recall,F1))
        return indicators
    
    
    #################################### 流程使用
    class_list = ['1','2','3']
    class_num = len(class_list)
    data = rfPrediction.select(["probability",  "prediction", "label"])
    data.show(10,False)
    /*
    +-----------------------------------------------------------------+----------+-----+
    |probability                                                      |prediction|label|
    +-----------------------------------------------------------------+----------+-----+
    |[0.0,0.583591671704619,0.28108212650563363,0.13532620178974736]  |1.0       |1.0  |
    |[0.0,0.8774618812572824,0.10234430380603526,0.02019381493668242] |1.0       |1.0  |
    |[0.0,0.4782509343569565,0.33612438458777977,0.18562468105526372] |1.0       |1.0  |
    |[0.0,0.9098097538504624,0.07026420031359343,0.01992604583594417] |1.0       |1.0  |
    |[0.0,0.48297873481567066,0.33108429300070136,0.1859369721836279] |1.0       |1.0  |
    |[0.0,0.4968909309318865,0.28535594176292345,0.21775312730519]    |1.0       |1.0  |
    |[0.0,0.5934095936269977,0.2614107165549716,0.14517968981803062]  |1.0       |1.0  |
    |[0.0,0.8722150576870729,0.09922368779910376,0.028561254513823426]|1.0       |1.0  |
    |[0.0,0.9589919232485213,0.02980026788117229,0.011207808870306393]|1.0       |1.0  |
    |[0.0,0.8086883057529066,0.1586952135257052,0.032616480721388154] |1.0       |1.0  |
    +-----------------------------------------------------------------+----------+-----+
    */
    
    # 获取混淆矩阵
    confusion_df =  getConfusionMatrix(class_list,data)
    # 获取acc_precision_recall_f1指标
    acc_precision_recall_f1 = getAccPreRecallF1(class_num, confusion_df)
    

     

    pyspark 项目预测过程

    #############################################################################################################################
    # 配置saprk 环境
    #############################################################################################################################
    import json
    import datetime
    import pandas as pd
    from pyspark.sql import SparkSession
    import pyspark.sql.functions as F
    from pyspark.sql.functions import col
    from pyspark.sql.types import StructType,StructField, StringType, IntegerType
    from pyspark.mllib.evaluation import BinaryClassificationMetrics
    from pyspark.ml import Pipeline
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.ml.classification import RandomForestClassificationModel
    from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
    from pyspark.ml.evaluation import MulticlassClassificationEvaluator
    from pyspark.ml import Pipeline
    from pyspark.ml.feature import StringIndexer, OneHotEncoder, VectorAssembler
    from pyspark.ml.classification import RandomForestClassifier
    from pyspark.sql.functions import current_date
    from datetime import datetime
    from pyspark.ml import PipelineModel
    from pyspark.ml import Pipeline
    import matplotlib.pyplot as plt
    import os
    import numpy as np
    os.environ['PYSPARK_PYTHON']='/usr/local/anaconda3/bin/python3.6'
    import datetime
    import seaborn as sns
    import matplotlib.pyplot as plt
    from pyspark.sql.window import *
    from pyspark.sql import Window
    from pyspark.ml.feature import StringIndexer
    from pyspark.ml.feature import OneHotEncoder
    from pyspark.mllib.util import MLUtils
    from pyspark.mllib.evaluation import MulticlassMetrics
    spark = (SparkSession
     .builder
     .appName("phone_purchase")
     .enableHiveSupport()
     .config("spark.executor.instances", "300")
     .config("spark.executor.memory","16g")
     .config("spark.executor.cores","4")
     .config("spark.driver.memory","8g")
     .config("spark.driver.maxResultSize", "8g")
     .config("spark.sql.shuffle.partitions","1200")
     .config("spark.yarn.appMasterEnv.yarn.nodemanager.container-executor.class","DockerLinuxContainer")
     .config("spark.executorEnv.yarn.nodemanager.container-executor.class","DockerLinuxContainer")
     .getOrCreate()) 
    
    #############################################################################################################################
    # 帮助函数
    #############################################################################################################################
    # 读取数据集合
    def getDataFromHive(table,dt):
        sql = "select * from "+table+" where dt = '"+dt+"'"
        print(sql)
        df = spark.sql(sql)
        print('The number of data', df.count())
        #df.printSchema()
        return df 
        
            
    # 除去one-hot字段和pin,其余字段全部转为double类型
    def convertToDouble(df, no_double_cols):
        double_cols = list(set(df.columns).difference(set(no_double_cols)))    
        df = df.select(no_double_cols + [col(column).cast("double").alias(column) for column in double_cols])
        return df 
    
    
    # 字段缺失值填充
    def fillNone(df, cate_cols, num_cols, onehot_cols):
        # 对于转为double的类别型特征,用 -1
        df = df.fillna(-1.0, subset=cate_cols)
        # 对于数值型特征,用 0
        df = df.fillna(0.0, subset=num_cols)
        # 对于需要onehot的特征,用 '-1'
        df = df.fillna('-1', subset=onehot_cols)
        for col in df.columns:
            filter_condition = col+' is null'
            cnt = df.filter(filter_condition).count()
            if cnt > 0:
                print(col+" have null value")
        return df
    
    #############################################################################################################################
    # 加载数据并进行数据预处理
    #############################################################################################################################
    # 变量定义
    today=datetime.date.today() 
    oneday=datetime.timedelta(days=1) 
    dt=str(today-oneday)  
    
    table = '特征表名'
    cate_cols = ['col1','col2','col3']
    num_cols = ['col4','col5','col6']
    onehot_cols = ['col7']
    no_double_cols = ['user_log_acct'] + onehot_cols
    
    
    # 流程使用
    predict_df = getDataFromHive(table,dt)
    print("预测数据中共有",predict_df.count(),"条数据")
    predict_df = convertToDouble(predict_df, no_double_cols)
    predict_df = fillNone(predict_df, cate_cols, num_cols, onehot_cols)
    
    
    #############################################################################################################################
    # 加载模型并进行预测
    #############################################################################################################################
    from pyspark.ml import PipelineModel
    from pyspark.ml import Pipeline
    
    rfModel = PipelineModel.load('hdfs://ns4/rf_model')
    predict_result = rfModel.transform(predict_df)
    
    
    #############################################################################################################################
    # 查看预测结果并落表
    #############################################################################################################################
    data = predict_result.select(["user_log_acct", "probability",  "prediction"])
    data.show(10,False)
    
    
    # probability为 vector 类型,通过udf函数来抽取出其中每个标签值
    from pyspark.sql.functions import udf, col
    from pyspark.sql.types import ArrayType, DoubleType
    
    def to_array(col):
        def to_array_(v):
            return v.toArray().tolist()
        # Important: asNondeterministic requires Spark 2.3 or later
        # It can be safely removed i.e.
        # return udf(to_array_, ArrayType(DoubleType()))(col)
        # but at the cost of decreased performance
        return udf(to_array_, ArrayType(DoubleType())).asNondeterministic()(col)
    
    df = (data.withColumn("xs", to_array(col("probability"))).select(["user_log_acct","prediction","probability"] + [col("xs")[i] for i in range(4)]))
          
    df = df.withColumnRenamed("xs[1]","low_probability").withColumnRenamed("xs[2]","high_probability").withColumnRenamed("xs[3]","super_high_probability")
    df = df.select(["user_log_acct","probability","low_probability","high_probability","super_high_probability","prediction"])
    df.show(10)
    
    
    # df 转为临时表/临时视图
    df.createOrReplaceTempView("df_tmp_view")
    
    sql = """
    insert overwrite table 结果表
    select
        user_log_acct,
    	low_probability,
    	high_probability,
    	super_high_probability,
    	prediction
    from
    	df_tmp_view 
    """
    print(sql)
    # spark.sql 插入hive
    spark.sql(sql)
    
    
    # CREATE TABLE `结果表`(
    #   user_log_acct string comment '用户pin',
    #   low_probability double comment '低购买力概率',
    #   high_probability double comment '高购买力概率',
    #   super_high_probability double comment '超高购买力概率n',
    #   prediction double comment '模型预测的该用户的分类')
    # comment '预测表'
    # ROW FORMAT DELIMITED FIELDS TERMINATED BY '\t' LINES TERMINATED BY '\n' NULL DEFINED AS "" STORED AS ORC tblproperties
    # 	(
    # 		'orc.compress' = 'SNAPPY'
    # 	) ;

     

    展开全文
  • PySpark_Test:测试项目以练习pyspark
  • spark2-submit 提交 python(pyspark)项目

    千次阅读 2019-10-13 07:49:02
    假设一个pyspark项目下边有两个文件:main.py和utils.py local (1)目录结构 (2)提交命令 spark2-submit --master local --deploy-mode client main.py yarn (1)目录结构 其中main.py是项目的主入口...

    两种方式

    • local
    • yarn

    假设一个pyspark的项目下边有两个文件:main.pyutils.py。其中main.py是项目的主入口文件,utils.py中可能包含一些UDF。

    1. local
      (1)目录结构
      在这里插入图片描述
      (2)提交命令
    spark2-submit --master local --deploy-mode client main.py
    
    1. yarn
      (1)目录结构
      在这里插入图片描述
      其中project.zipmain.pyutils.py两个文件的zip压缩文件。
      (2)提交命令
    spark2-submit --master yarn --deploy-mode client --py-files project.zip main.py
    
    展开全文
  • 摘要PySpark作为工业界常用于处理大数据以及分布式计算的工具,特别是在算法建模时起到了非常大的作用。PySpark如何建模呢?这篇文章手把手带你入门PySpark,提前感受工...

    摘要  

     PySpark作为工业界常用于处理大数据以及分布式计算的工具,特别是在算法建模时起到了非常大的作用。PySpark如何建模呢?这篇文章手把手带你入门PySpark,提前感受工业界的建模过程!

    任务简介  

    电商中,了解用户在不同品类的各个产品的购买力是非常重要的!这将有助于他们为不同产品的客户创建个性化的产品。在这篇文章中,笔者在真实的数据集中手把手实现如何预测用户在不同品类的各个产品的购买行为

    如果有兴趣和笔者一步步实现项目,可以先根据上一篇文章的介绍中安装PySpark,并在网站中下载数据。

    https://datahack.analyticsvidhya.com/contest/black-friday/

    数据集简介  

    零售公司想要了解针对不同类别的各种产品的顾客购买行为(购买量)。他们为上个月选定的大批量产品分享了各种客户的购买汇总。该数据集还包含客户人口统计信息(age, gender, marital status, city_type, stay_in_current_city),产品详细信息(product_id and product category)以及上个月的purchase_amount总数。现在,他们希望建立一个模型来预测客户对各种产品的购买量,这将有助于他们为不同产品的客户创建个性化的产品。

    手把手实战项目  

    1. 导入数据

    这里我们使用PySpark的读数据接口read.csv读取数据,和pandas读取数据接口迷之相似。

    from pyspark.sql import SparkSession
    
    
    spark = SparkSession \
        .builder \
        .appName("test") \
        .config("spark.some.config.option", "setting") \
        .getOrCreate()
        
    train = spark.read.csv('./BlackFriday/train.csv', header=True, inferSchema=True)
    test = spark.read.csv('./BlackFriday/test.csv', header=True,  inferSchema=True
    

    2. 分析数据的类型

    要查看Dataframe中列的类型,可以使用printSchema()方法。让我们在train上应用printSchema(),它将以树格式打印模式。

    train.printSchema()
    """
    root
     |-- User_ID: integer (nullable = true)
     |-- Product_ID: string (nullable = true)
     |-- Gender: string (nullable = true)
     |-- Age: string (nullable = true)
     |-- Occupation: integer (nullable = true)
     |-- City_Category: string (nullable = true)
     |-- Stay_In_Current_City_Years: string (nullable = true)
     |-- Marital_Status: integer (nullable = true)
     |-- Product_Category_1: integer (nullable = true)
     |-- Product_Category_2: integer (nullable = true)
     |-- Product_Category_3: integer (nullable = true)
     |-- Purchase: integer (nullable = true)
    """
    

    3预览数据集

    在PySpark中,我们使用head()方法预览数据集以查看Dataframe的前n行,就像python中的pandas一样。我们需要在head方法中提供一个参数(行数)。让我们看一下train的前5行。

    train.head(5)
    """
    [Row(User_ID=1000001, Product_ID='P00069042', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=3, Product_Category_2=None, Product_Category_3=None, Purchase=8370), 
    Row(User_ID=1000001, Product_ID='P00248942', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=1, Product_Category_2=6, Product_Category_3=14, Purchase=15200),  
    Row(User_ID=1000001, Product_ID='P00087842', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=None, Product_Category_3=None, Purchase=1422), 
    Row(User_ID=1000001, Product_ID='P00085442', Gender='F', Age='0-17', Occupation=10, City_Category='A', Stay_In_Current_City_Years='2', Marital_Status=0, Product_Category_1=12, Product_Category_2=14, Product_Category_3=None, Purchase=1057), 
    Row(User_ID=1000002, Product_ID='P00285442', Gender='M', Age='55+', Occupation=16, City_Category='C', Stay_In_Current_City_Years='4+', Marital_Status=0, Product_Category_1=8, Product_Category_2=None, Product_Category_3=None, Purchase=7969)]
    """
    

    要查看数据框架中的行数,我们需要调用方法count()。让我们核对一下train上的行数。Pandas和Spark的count方法是不同的。

    4插补缺失值

    通过调用drop()方法,可以检查train上非空数值的个数,并进行测试。默认情况下,drop()方法将删除包含任何空值的行。我们还可以通过设置参数“all”,当且仅当该行所有参数都为null时以删除该行。这与pandas上的drop方法类似。

    train.na.drop('any').count(),test.na.drop('any').count()
    """
    (166821, 71037)
    """
    

    在这里,为了填充简单,我使用-1来填充train和test的null值。虽然这不是一个很好的填充方法,你可以选择其他的填充方式。

    train = train.fillna(-1)
    test = test.fillna(-1)
    

    5分析数值特征

    我们还可以使用describe()方法查看Dataframe列的各种汇总统计信息,它显示了数字变量的统计信息。要显示结果,我们需要调用show()方法。

    train.describe().show()
    """
    +-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
    |summary|           User_ID|Product_ID|Gender|   Age|        Occupation|City_Category|Stay_In_Current_City_Years|     Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|         Purchase|
    +-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
    |  count|            550068|    550068|550068|550068|            550068|       550068|                    550068|             550068|            550068|            550068|            550068|           550068|
    |   mean|1003028.8424013031|      null|  null|  null| 8.076706879876669|         null|         1.468494139793958|0.40965298835780306| 5.404270017525106| 6.419769919355425| 3.145214773446192|9263.968712959126|
    | stddev| 1727.591585530871|      null|  null|  null|6.5226604873418115|         null|         0.989086680757309| 0.4917701263173259| 3.936211369201324| 6.565109781181374| 6.681038828257864|5023.065393820593|
    |    min|           1000001| P00000142|     F|  0-17|                 0|            A|                         0|                  0|                 1|                -1|                -1|               12|
    |    max|           1006040|  P0099942|     M|   55+|                20|            C|                        4+|                  1|                20|                18|                18|            23961|
    +-------+------------------+----------+------+------+------------------+-------------+--------------------------+-------------------+------------------+------------------+------------------+-----------------+
    """
    

    上面看起来好像比较乱,这里我们选择某一列来看看

    让我们从一个列中选择一个名为“User_ID”的列,我们需要调用一个方法select并传递我们想要选择的列名。select方法将显示所选列的结果。我们还可以通过提供用逗号分隔的列名,从数据框架中选择多个列。

    train.select('User_ID','Age').show(5)
    """
    +-------+----+
    |User_ID| Age|
    +-------+----+
    |1000001|0-17|
    |1000001|0-17|
    |1000001|0-17|
    |1000001|0-17|
    |1000002| 55+|
    +-------+----+
    only showing top 5 rows
    """
    
    6. 分析categorical特征

    为了建立一个模型,我们需要在“train”和“test”中看到分类特征的分布。这里我只对Product_ID显示这个,但是我们也可以对任何分类特性执行相同的操作。让我们看看在“train”和“test”中Product_ID的不同类别的数量。这可以通过应用distinct()count()方法来实现。

    train.select('Product_ID').distinct().count(), test.select('Product_ID').distinct().count()
    """
    (3631, 3491)
    """
    

    在计算“train”和“test”的不同值的数量后,我们可以看到“train”和“test”有更多的类别。让我们使用相减方法检查Product_ID的类别,这些类别正在"test"中,但不在“train”中。我们也可以对所有的分类特征做同样的处理。

    diff_cat_in_train_test=test.select('Product_ID').subtract(train.select('Product_ID'))
    
    
    diff_cat_in_train_test.distinct().count()
    """
    (46, None)
    """
    
    
    diff_cat_in_train_test.distinct().show(5)
    """
    +----------+
    |Product_ID|
    +----------+
    | P00322642|
    | P00300142|
    | P00077642|
    | P00249942|
    | P00294942|
    +----------+
    only showing top 5 rows
    """
    

    以上你可以看到46个不同的类别是在"test"中,而不在"train"中。在这种情况下,我们要么收集更多关于它们的数据,要么跳过那些类别(无效类别)的“test”。

    7. 将分类变量转换为标签

    我们还需要通过在Product_ID上应用StringIndexer转换将分类列转换为标签,该转换将标签的Product_ID列编码为标签索引的列。

    from pyspark.ml.feature import StringIndexer
    plan_indexer = StringIndexer(inputCol = 'Product_ID', outputCol = 'product_id_trans')
    labeller = plan_indexer.fit(train)
    

    在上面,我们将fit()方法应用于“train”数据框架上,构建了一个标签。稍后我们将使用这个标签来转换我们的"train"和“test”。让我们在labeller的帮助下转换我们的train和test的Dataframe。我们需要调用transform方法。我们将把转换结果存储在Train1和Test1中.

    Train1 = labeller.transform(train)
    Test1 = labeller.transform(test)
    Train1.show(2)
    """
    +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+
    |User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|product_id_trans|
    +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+
    |1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|           766.0|
    |1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|           183.0|
    +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+
    only showing top 2 rows
    """
    
    
    Train1.select('product_id_trans').show(2)
    """
    +----------------+
    |product_id_trans|
    +----------------+
    |           766.0|
    |           183.0|
    +----------------+
    only showing top 2 rows
    """
    

    上面已经显示了我们在以前的"train" Dataframe中成功的添加了一个转化后的列“product_id_trans”,("Train1" Dataframe)。

    8选择特征来构建机器学习模型

    首先,我们需要从pyspark.ml.feature导入RFormula;然后,我们需要在这个公式中指定依赖和独立的列;我们还必须为为features列和label列指定名称

    from pyspark.ml.feature import RFormula
    formula = RFormula(formula="Purchase ~ Age+ Occupation +City_Category+Stay_In_Current_City_Years+Product_Category_1+Product_Category_2+ Gender",
                       featuresCol="features",labelCol="label")
    

    在创建了这个公式之后,我们需要将这个公式应用到我们的Train1上,并通过这个公式转换Train1,Test1。让我们看看如何做到这一点,在拟合变换train1之后,

    t1 = formula.fit(Train1)
    train1 = t1.transform(Train1)
    test1 = t1.transform(Test1)
    train1.show(2)
    """
    +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+--------------------+-------+
    |User_ID|Product_ID|Gender| Age|Occupation|City_Category|Stay_In_Current_City_Years|Marital_Status|Product_Category_1|Product_Category_2|Product_Category_3|Purchase|product_id_trans|            features|  label|
    +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+--------------------+-------+
    |1000001| P00069042|     F|0-17|        10|            A|                         2|             0|                 3|                -1|                -1|    8370|           766.0|(16,[6,10,13,14],...| 8370.0|
    |1000001| P00248942|     F|0-17|        10|            A|                         2|             0|                 1|                 6|                14|   15200|           183.0|(16,[6,10,13,14],...|15200.0|
    +-------+----------+------+----+----------+-------------+--------------------------+--------------+------------------+------------------+------------------+--------+----------------+--------------------+-------+
    only showing top 2 rows
    """
    

    在应用了这个公式之后,我们可以看到train1和test1有两个额外的列,称为features和label,并对我们在公式中指定的列进行标记(featuresCol= features和labelCol= label)。直观上,train1和test1中的features列中的所有分类变量都被转换为数值,数值变量与之前应用ML时相同。我们还可以查看train1和test1中的列特性和标签。

    train1.select('features').show(2)
    """
    +--------------------+
    |            features|
    +--------------------+
    |(16,[6,10,13,14],...|
    |(16,[6,10,13,14],...|
    +--------------------+
    only showing top 2 rows
    """
    
    
    train1.select('label').show(2)
    """
    +-------+
    |  label|
    +-------+
    | 8370.0|
    |15200.0|
    +-------+
    only showing top 2 rows
    """
    

    9建立机器学习模型

    在应用RFormula和转换Dataframe之后,我们现在需要根据这些数据开发机器学习模型。我想为这个任务应用一个随机森林回归。让我们导入一个在pyspark.ml中定义的随机森林回归器。然后建立一个叫做rf的模型。我将使用随机森林算法的默认参数。

    from pyspark.ml.regression import RandomForestRegressor
    rf = RandomForestRegressor()
    

    在创建一个模型rf之后,我们需要将train1数据划分为train_cv和test_cv进行交叉验证。这里,我们将train1数据区域划分为train_cv的70%和test_cv的30%。

    (train_cv, test_cv) = train1.randomSplit([0.7, 0.3])
    

    在train_cv上建立模型,在test_cv上进行预测。结果将保存在predictions中。

    model1 = rf.fit(train_cv)
    predictions = model1.transform(test_cv)
    

    10. 模型效果评估

    让我们评估对test_cv的预测,看看rmse和mse是多少。

    为了评估模型,我们需要从pyspark.ml.evaluation中导入RegressionEvaluator。我们必须为此创建一个对象。有一种方法叫 evaluate for evaluator ,它对模型求值。我们需要为此指定度量标准。

    from pyspark.ml.evaluation import RegressionEvaluator
    evaluator = RegressionEvaluator()
    mse = evaluator.evaluate(predictions,{evaluator.metricName:"mse" })
    import numpy as np
    np.sqrt(mse), mse
    """
    (3832.4796474051345, 14687900.247774584)
    """
    

    经过计算,我们可以看到我们的rmse是3827.767295494888。

    现在,我们将在所有的train1数据集上再次训练一个模型。

    model = rf.fit(train1)
    predictions1 = model.transform(test1)
    

    预测之后,我们得到测试集预测结果,并将其保存成csv文件。

    df = predictions1.selectExpr("User_ID as User_ID", "Product_ID as Product_ID", 'prediction as Purchase')
    df.toPandas().to_csv('./BlackFriday/submission.csv')
    

    写入csv文件后(submission.csv)。我们可以上传我们的第一个解决方案来查看分数,我得到的分数是3844.20920145983。

    总结  

    在本文中,我以一个真实案例介绍了PySpark建模流程。这只是本系列文章的开始。在接下来的几周,我将继续分享PySpark使用的教程。同时,如果你有任何问题,或者你想对我要讲的内容提出任何建议,欢迎留言。

    个人微信:加时请注明 (昵称+公司/学校+方向)

      历史精品文章推荐   

    知否?知否?一文看懂深度文本分类之DPCNN原理与代码

    机器学习入门方法和资料合集

    撩一发深度文本分类之RNN via Attention

    15分钟带你入门sklearn与机器学习——分类算法篇

    如何为你的回归问题选择最合适的机器学习方法?

    PySpark 安装、配置之使用初体验

    展开全文
  • pyspark

    2018-10-26 10:47:00
    通过win7使用spark的pyspark访问hive 1、安装spark软件包 2、复制mysql驱动 3、复制hadoop配置目录到spark的conf下 4、复制hadoop和hive的配置文件到conf下 5.1、在pyspark脚本中添加HADOOP_CONF_DIR...
  • PySpark

    2018-11-30 19:14:14
    PySpark 是 Spark 为 Python 开发者提供的 API ,位于 $SPARK_HOME/bin 目录,其依赖于 Py4J。 系统环境 Linux Ubuntu 14.04 jdk-7u75-linux-x64 hadoop-2.6.0-cdh5.4.5 scala-2.10.4 spark-1.6.0-bin-hadoop2.6 ...
  • PySpark示例项目 本文档旨在与pyspark-template-project存储库中的代码并行阅读。 这些共同构成了我们认为是使用Apache Spark及其Python('PySpark')API编写ETL作业的“最佳实践”方法。 该项目解决以下主题: ...
  • Python项目实战:使用PySpark分析日志文件 日志文件是用于记录系统操作事件的记录文件或文件集合,可分为事件日志和消息日志。具有处理历史数据、诊断问题的追踪以及理解系统的活动等重要作用。有了日志文件,就...
  • pyspark学习

    2019-10-16 20:49:33
    由于公司的项目需要用pyspark做数据清洗等工作,于是现学现用,也有很多不懂的地方,如果文章里面有什么总结得有问题的,欢迎大家指出。 更详细的介绍也可以参考PySpark教程:使用Python学习Apache Spark 一. ...
  • 作者 |hecongqing来源 | AI算法之心(ID:AIHeartForYou)【导读】PySpark作为工业界常用于处理大数据以及分布式计算的工具,特别是在算法建模时起到了非常大的作用。PySpark如何建模呢?这篇文章手把手带你入门...
  • Python项目实战:使用PySpark对大数据进行分析 大数据,顾名思义就是大量的数据,一般这些数据都是PB级以上。PB是数据存储容量的单位,它等于2的50次方个字节,或者在数值上大约等于1000个TB。这些数据的特点是种类...
  • PySpark在线图书的内部原理 该项目包含《 在线书籍的来源。 工具 该项目基于或使用以下工具: 致力于成为一个快速,简单,彻头彻尾的华丽静态站点生成器,旨在生成项目文档 主题的(具有“功能) 作为Markdown编辑...
  • 这个代码单独作为一个文件在pyspark项目代码中,只有一个功能即实现join相关的几个方法 # 这个方法是所有join动作的基础方法,实现了join的基本需求 # 通过mapValues的方法把所有的rdd value都变成一个(rddNumber,rdd....
  • 带有PySpark的Spark和Python用于大数据:Spark机器学习项目
  • i am trying to work with Pyspark in IntelliJ but i cannot figure out how to correctly install it/setup the project. I can work with Python in IntelliJ and I can use the pyspark shell but I cannot tell...
  • pyspark:GBDT

    千次阅读 2019-06-04 13:46:48
    from pyspark import SparkConf from pyspark.sql import SparkSession from pyspark.ml.linalg import Vectors from pyspark.ml.feature import StringIndexer from pyspark.ml.classification import GBTClassifi....
  • Pyspark实战

    2019-09-25 19:59:03
    本篇文章介绍pyspark的使用,参考书籍《pyspark实战》。 Apache Spark最初由Matei Zaharia开发,于2013年创建了Databricks公司并担任CTO。 1.spark介绍 spark提供MapReduce的灵活性和可扩展性,但是明显速度更快...
  • 序言 MARS的IPM项目后期要在Azure平台上搭算法系统,终归要把之前所有用pandas做数据处理的代码全部重构成PySpark,作为小白从前辈的口中得知PySpark是“世界上最烂的语言”后决定笨鸟先飞,既然早晚要承受痛苦,那...
  • Pyspark-源码

    2021-03-17 02:17:36
    大数据的Python和火花 适用于Python的课程笔记本和适用于大数据的Spark 课程大纲: 课程介绍 促销/介绍视频 课程大纲 Spark,RDD和Spark 2.0简介 课程设置 设置概述 EC2安装指南 ...带有PySpark示例
  • pyspark案例

    千次阅读 2018-09-25 11:33:08
    pyspark本地环境配置教程配置成功后,可以通过spark dataframe笔记练习pyspark的用法,不过最好是通过spark官网练习语法使用。下面写个小案例,供自己以后查阅: #!/usr/bin/python # -*- coding: utf-8 -*- "&...
  • 一个示例项目,旨在使用Apache Spark中的Pyspark和Spark SQL API演示ETL过程。 在这个项目中,我使用了Apache Sparks的Pyspark和Spark SQL API来对数据实施ETL过程,最后将转换后的数据加载到目标源。 我已经使用...
  • PySpark线性回归

    2020-09-06 22:44:20
    利用Python调用spark接口训练线性回归模型,详细介绍了PySpark的使用,包含:数据准备、数据探索、特征工程和模型训练

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,626
精华内容 1,450
关键字:

pyspark项目