精华内容
下载资源
问答
  • pyspark Spark SQL

    2021-02-28 23:02:52
    它是spark中用于处理结构化数据的一个模块 Spark SQL历史 Hive是目前大数据领域,事实上的数据仓库标准。 [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mPnE8yAy-1614524559940)...

    1、Spark SQL 概述

    Spark SQL概念

    • Spark SQL is Apache Spark’s module for working with structured data.
      • 它是spark中用于处理结构化数据的一个模块

    Spark SQL历史

    • Hive是目前大数据领域,事实上的数据仓库标准。

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-mPnE8yAy-1614524559940)(pics/s9.png)]

    • Shark:shark底层使用spark的基于内存的计算模型,从而让性能比Hive提升了数倍到上百倍。
    • 底层很多东西还是依赖于Hive,修改了内存管理、物理计划、执行三个模块
    • 2014年6月1日的时候,Spark宣布了不再开发Shark,全面转向Spark SQL的开发

    Spark SQL优势

    • Write Less Code

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Pirk8mbM-1614524559946)(pics/s10.png)]

    • Performance

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-FHxLHZaG-1614524559951)(pics/s11.png)]

    python操作RDD,转换为可执行代码,运行在java虚拟机,涉及两个不同语言引擎之间的切换,进行进程间 通信很耗费性能。

    DataFrame

    • 是RDD为基础的分布式数据集,类似于传统关系型数据库的二维表,dataframe记录了对应列的名称和类型
    • dataFrame引入schema和off-heap(使用操作系统层面上的内存)
      • 1、解决了RDD的缺点
      • 序列化和反序列化开销大
      • 频繁的创建和销毁对象造成大量的GC
      • 2、丢失了RDD的优点
      • RDD编译时进行类型检查
      • RDD具有面向对象编程的特性

    用scala/python编写的RDD比Spark SQL编写转换的RDD慢,涉及到执行计划

    • CatalystOptimizer:Catalyst优化器
    • ProjectTungsten:钨丝计划,为了提高RDD的效率而制定的计划
    • Code gen:代码生成器

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-vIEcghkH-1614524559955)(pics/s12.png)]

    直接编写RDD也可以自实现优化代码,但是远不及SparkSQL前面的优化操作后转换的RDD效率高,快1倍左右

    优化引擎:类似mysql等关系型数据库基于成本的优化器

    首先执行逻辑执行计划,然后转换为物理执行计划(选择成本最小的),通过Code Generation最终生成为RDD

    • Language-independent API

      用任何语言编写生成的RDD都一样,而使用spark-core编写的RDD,不同的语言生成不同的RDD

    • Schema

      结构化数据,可以直接看出数据的详情

      在RDD中无法看出,解释性不强,无法告诉引擎信息,没法详细优化。

    **为什么要学习sparksql **

    sparksql特性

    • 1、易整合
    • 2、统一的数据源访问
    • 3、兼容hive
    • 4、提供了标准的数据库连接(jdbc/odbc)

    2、DataFrame

    2.1 介绍

    在Spark语义中,DataFrame是一个分布式的行集合,可以想象为一个关系型数据库的表,或者一个带有列名的Excel表格。它和RDD一样,有这样一些特点:

    • Immuatable:一旦RDD、DataFrame被创建,就不能更改,只能通过transformation生成新的RDD、DataFrame
    • Lazy Evaluations:只有action才会触发Transformation的执行
    • Distributed:DataFrame和RDD一样都是分布式的
    • dataframe和dataset统一,dataframe只是dataset[ROW]的类型别名。由于Python是弱类型语言,只能使用DataFrame

    DataFrame vs RDD

    • RDD:分布式的对象的集合,Spark并不知道对象的详细模式信息
    • DataFrame:分布式的Row对象的集合,其提供了由列组成的详细模式信息,使得Spark SQL可以进行某些形式的执行优化。
    • DataFrame和普通的RDD的逻辑框架区别如下所示:

    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-iH4bKl8j-1614524573708)(pics/s13.png)]

    • 左侧的RDD Spark框架本身不了解 Person类的内部结构。

    • 右侧的DataFrame提供了详细的结构信息(schema——每列的名称,类型)

    • DataFrame还配套了新的操作数据的方法,DataFrame API(如df.select())和SQL(select id, name from xx_table where …)。

    • DataFrame还引入了off-heap,意味着JVM堆以外的内存, 这些内存直接受操作系统管理(而不是JVM)。

    • RDD是分布式的Java对象的集合。DataFrame是分布式的Row对象的集合。DataFrame除了提供了比RDD更丰富的算子以外,更重要的特点是提升执行效率、减少数据读取以及执行计划的优化。

    • DataFrame的抽象后,我们处理数据更加简单了,甚至可以用SQL来处理数据了

    • 通过DataFrame API或SQL处理数据,会自动经过Spark 优化器(Catalyst)的优化,即使你写的程序或SQL不仅高效,也可以运行的很快。

    • DataFrame相当于是一个带着schema的RDD

    Pandas DataFrame vs Spark DataFrame

    • Cluster Parallel:集群并行执行
    • Lazy Evaluations: 只有action才会触发Transformation的执行
    • Immutable:不可更改
    • Pandas rich API:比Spark SQL api丰富

    2.2 创建DataFrame

    1,创建dataFrame的步骤

    ​ 调用方法例如:spark.read.xxx方法

    2,其他方式创建dataframe

    • createDataFrame:pandas dataframe、list、RDD

    • 数据源:RDD、csv、json、parquet、orc、jdbc

      jsonDF = spark.read.json("xxx.json")
      
      jsonDF = spark.read.format('json').load('xxx.json')
      
      parquetDF = spark.read.parquet("xxx.parquet")
      
      jdbcDF = spark.read.format("jdbc").option("url","jdbc:mysql://localhost:3306/db_name").option("dbtable","table_name").option("user","xxx").option("password","xxx").load()
      
    • Transformation:延迟性操作

    • action:立即操作

      [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-akR1o85j-1614524573710)(pics/s14.png)]

    2.3 DataFrame API实现

    基于RDD创建

    from pyspark.sql import SparkSession
    from pyspark.sql import Row
    
    spark = SparkSession.builder.appName('test').getOrCreate()
    sc = spark.sparkContext
    # spark.conf.set("spark.sql.shuffle.partitions", 6)
    # ================直接创建==========================
    l = [('Ankit',25),('Jalfaizy',22),('saurabh',20),('Bala',26)]
    rdd = sc.parallelize(l)
    #为数据添加列名
    people = rdd.map(lambda x: Row(name=x[0], age=int(x[1])))
    #创建DataFrame
    schemaPeople = spark.createDataFrame(people)
    

    从csv中读取数据

    # ==================从csv读取======================
    #加载csv类型的数据并转换为DataFrame
    df = spark.read.format("csv"). \
        option("header", "true") \
        .load("iris.csv")
    #显示数据结构
    df.printSchema()
    #显示前10条数据
    df.show(10)
    #统计总量
    df.count()
    #列名
    df.columns
    

    增加一列

    # ===============增加一列(或者替换) withColumn===========
    #定义一个新的列,数据为其他某列数据的两倍
    #如果操作的是原有列,可以替换原有列的数据
    df.withColumn('newWidth',df.SepalWidth * 2).show()
    

    删除一列

    # ==========删除一列  drop=========================
    #删除一列
    df.drop('cls').show()
    

    统计信息

    #================ 统计信息 describe================
    df.describe().show()
    #计算某一列的描述信息
    df.describe('cls').show()   
    

    提取部分列

    # ===============提取部分列 select==============
    df.select('SepalLength','SepalWidth').show()
    

    基本统计功能

    # ==================基本统计功能 distinct count=====
    df.select('cls').distinct().count()
    

    分组统计

    # 分组统计 groupby(colname).agg({'col':'fun','col2':'fun2'})
    df.groupby('cls').agg({'SepalWidth':'mean','SepalLength':'max'}).show()
    
    # avg(), count(), countDistinct(), first(), kurtosis(),
    # max(), mean(), min(), skewness(), stddev(), stddev_pop(),
    # stddev_samp(), sum(), sumDistinct(), var_pop(), var_samp() and variance()
    

    自定义的汇总方法

    # 自定义的汇总方法
    import pyspark.sql.functions as fn
    #调用函数并起一个别名
    df.agg(fn.count('SepalWidth').alias('width_count'),fn.countDistinct('cls').alias('distinct_cls_count')).show()
    

    拆分数据集

    #====================数据集拆成两部分 randomSplit ===========
    #设置数据比例将数据划分为两部分
    trainDF, testDF = df.randomSplit([0.6, 0.4])
    

    采样数据

    # ================采样数据 sample===========
    #withReplacement:是否有放回的采样
    #fraction:采样比例
    #seed:随机种子
    sdf = df.sample(False,0.2,100)
    

    查看两个数据集在类别上的差异

    #查看两个数据集在类别上的差异 subtract,确保训练数据集覆盖了所有分类
    diff_in_train_test = testDF.select('cls').subtract(trainDF.select('cls'))
    diff_in_train_test.distinct().count()
    

    交叉表

    # ================交叉表 crosstab=============
    df.crosstab('cls','SepalLength').show()
    

    udf

    udf:自定义函数

    #================== 综合案例 + udf================
    # 测试数据集中有些类别在训练集中是不存在的,找到这些数据集做后续处理
    trainDF,testDF = df.randomSplit([0.99,0.01])
    
    diff_in_train_test = trainDF.select('cls').subtract(testDF.select('cls')).distinct().show()
    
    #首先找到这些类,整理到一个列表
    not_exist_cls = trainDF.select('cls').subtract(testDF.select('cls')).distinct().rdd.map(lambda x :x[0]).collect()
    
    #定义一个方法,用于检测
    def should_remove(x):
        if x in not_exist_cls:
            return -1
        else :
            return x
    
    #创建udf,udf函数需要两个参数:
    # Function
    # Return type (in my case StringType())
    
    #在RDD中可以直接定义函数,交给rdd的transformatioins方法进行执行
    #在DataFrame中需要通过udf将自定义函数封装成udf函数再交给DataFrame进行调用执行
    
    from pyspark.sql.types import StringType
    from pyspark.sql.functions import udf
    
    
    check = udf(should_remove,StringType())
    
    resultDF = trainDF.withColumn('New_cls',check(trainDF['cls'])).filter('New_cls <> -1')
    
    resultDF.show()
    

    3、JSON数据的处理

    3.1 介绍

    JSON数据

    • Spark SQL can automatically infer the schema of a JSON dataset and load it as a DataFrame

      Spark SQL能够自动将JSON数据集以结构化的形式加载为一个DataFrame

    • This conversion can be done using SparkSession.read.json on a JSON file

      读取一个JSON文件可以用SparkSession.read.json方法

    从JSON到DataFrame

    • 指定DataFrame的schema

      1,通过反射自动推断,适合静态数据

      2,程序指定,适合程序运行中动态生成的数据

    加载json数据

    #使用内部的schema
    jsonDF = spark.read.json("xxx.json")
    jsonDF = spark.read.format('json').load('xxx.json')
    
    #指定schema
    jsonDF = spark.read.schema(jsonSchema).json('xxx.json')
    

    嵌套结构的JSON

    • 重要的方法

      1,get_json_object

      2,get_json

      3,explode

    3.2 实践

    3.1 静态json数据的读取和操作

    无嵌套结构的json数据

    from pyspark.sql import SparkSession
    spark =  SparkSession.builder.appName('json_demo').getOrCreate()
    sc = spark.sparkContext
    
    # ==========================================
    #                无嵌套结构的json
    # ==========================================
    jsonString = [
    """{ "id" : "01001", "city" : "AGAWAM",  "pop" : 15338, "state" : "MA" }""",
    """{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
    ]
    

    从json字符串数组得到DataFrame

    # 从json字符串数组得到rdd有两种方法
    # 1. 转换为rdd,再从rdd到DataFrame
    # 2. 直接利用spark.createDataFrame(),见后面例子
    
    jsonRDD = sc.parallelize(jsonString)   # stringJSONRDD
    jsonDF =  spark.read.json(jsonRDD)  # convert RDD into DataFrame
    jsonDF.printSchema()
    jsonDF.show()
    

    直接从文件生成DataFrame

    # -- 直接从文件生成DataFrame
    #只有被压缩后的json文件内容,才能被spark-sql正确读取,否则格式化后的数据读取会出现问题
    jsonDF = spark.read.json("xxx.json")
    # or
    # jsonDF = spark.read.format('json').load('xxx.json')
    
    jsonDF.printSchema()
    jsonDF.show()
    
    jsonDF.filter(jsonDF.pop>4000).show(10)
    #依照已有的DataFrame,创建一个临时的表(相当于mysql数据库中的一个表),这样就可以用纯sql语句进行数据操作
    jsonDF.createOrReplaceTempView("tmp_table")
    
    resultDF = spark.sql("select * from tmp_table where pop>4000")
    resultDF.show(10)
    

    3.2 动态json数据的读取和操作

    指定DataFrame的Schema

    3.1节中的例子为通过反射自动推断schema,适合静态数据

    下面我们来讲解如何进行程序指定schema

    没有嵌套结构的json

    jsonString = [
    """{ "id" : "01001", "city" : "AGAWAM",  "pop" : 15338, "state" : "MA" }""",
    """{ "id" : "01002", "city" : "CUSHMAN", "pop" : 36963, "state" : "MA" }"""
    ]
    
    jsonRDD = sc.parallelize(jsonString)
    
    from pyspark.sql.types import *
    
    #定义结构类型
    #StructType:schema的整体结构,表示JSON的对象结构
    #XXXStype:指的是某一列的数据类型
    jsonSchema = StructType() \
      .add("id", StringType(),True) \
      .add("city", StringType()) \
      .add("pop" , LongType()) \
      .add("state",StringType())
    
    jsonSchema = StructType() \
      .add("id", LongType(),True) \
      .add("city", StringType()) \
      .add("pop" , DoubleType()) \
      .add("state",StringType())
    
    reader = spark.read.schema(jsonSchema)
    
    jsonDF = reader.json(jsonRDD)
    jsonDF.printSchema()
    jsonDF.show()
    

    带有嵌套结构的json

    from pyspark.sql.types import *
    jsonSchema = StructType([
        StructField("id", StringType(), True),
        StructField("city", StringType(), True),
        StructField("loc" , ArrayType(DoubleType())),
        StructField("pop", LongType(), True),
        StructField("state", StringType(), True)
    ])
    
    reader = spark.read.schema(jsonSchema)
    jsonDF = reader.json('data/nest.json')
    jsonDF.printSchema()
    jsonDF.show(2)
    jsonDF.filter(jsonDF.pop>4000).show(10)
    

    4、数据清洗

    前面我们处理的数据实际上都是已经被处理好的规整数据,但是在大数据整个生产过程中,需要先对数据进行数据清洗,将杂乱无章的数据整理为符合后面处理要求的规整数据。

    数据去重

    '''
    1.删除重复数据
    
    groupby().count():可以看到数据的重复情况
    '''
    df = spark.createDataFrame([
      (1, 144.5, 5.9, 33, 'M'),
      (2, 167.2, 5.4, 45, 'M'),
      (3, 124.1, 5.2, 23, 'F'),
      (4, 144.5, 5.9, 33, 'M'),
      (5, 133.2, 5.7, 54, 'F'),
      (3, 124.1, 5.2, 23, 'F'),
      (5, 129.2, 5.3, 42, 'M'),
    ], ['id', 'weight', 'height', 'age', 'gender'])
    
    # 查看重复记录
    #无意义重复数据去重:数据中行与行完全重复
    # 1.首先删除完全一样的记录
    df2 = df.dropDuplicates()
    
    #有意义去重:删除除去无意义字段之外的完全重复的行数据
    # 2.其次,关键字段值完全一模一样的记录(在这个例子中,是指除了id之外的列一模一样)
    # 删除某些字段值完全一样的重复记录,subset参数定义这些字段
    df3 = df2.dropDuplicates(subset = [c for c in df2.columns if c!='id'])
    # 3.有意义的重复记录去重之后,再看某个无意义字段的值是否有重复(在这个例子中,是看id是否重复)
    # 查看某一列是否有重复值
    import pyspark.sql.functions as fn
    df3.agg(fn.count('id').alias('id_count'),fn.countDistinct('id').alias('distinct_id_count')).collect()
    # 4.对于id这种无意义的列重复,添加另外一列自增id
    
    df3.withColumn('new_id',fn.monotonically_increasing_id()).show()
    

    缺失值处理

    '''
    2.处理缺失值
    2.1 对缺失值进行删除操作(行,列)
    2.2 对缺失值进行填充操作(列的均值)
    2.3 对缺失值对应的行或列进行标记
    '''
    df_miss = spark.createDataFrame([
    (1, 143.5, 5.6, 28,'M', 100000),
    (2, 167.2, 5.4, 45,'M', None),
    (3, None , 5.2, None, None, None),
    (4, 144.5, 5.9, 33, 'M', None),
    (5, 133.2, 5.7, 54, 'F', None),
    (6, 124.1, 5.2, None, 'F', None),
    (7, 129.2, 5.3, 42, 'M', 76000),],
     ['id', 'weight', 'height', 'age', 'gender', 'income'])
    
    # 1.计算每条记录的缺失值情况
    
    df_miss.rdd.map(lambda row:(row['id'],sum([c==None for c in row]))).collect()
    [(1, 0), (2, 1), (3, 4), (4, 1), (5, 1), (6, 2), (7, 0)]
    
    # 2.计算各列的缺失情况百分比
    df_miss.agg(*[(1 - (fn.count(c) / fn.count('*'))).alias(c + '_missing') for c in df_miss.columns]).show()
    
    # 3、删除缺失值过于严重的列
    # 其实是先建一个DF,不要缺失值的列
    df_miss_no_income = df_miss.select([
    c for c in df_miss.columns if c != 'income'
    ])
    
    # 4、按照缺失值删除行(threshold是根据一行记录中,缺失字段的百分比的定义)
    df_miss_no_income.dropna(thresh=3).show()
    
    # 5、填充缺失值,可以用fillna来填充缺失值,
    # 对于bool类型、或者分类类型,可以为缺失值单独设置一个类型,missing
    # 对于数值类型,可以用均值或者中位数等填充
    
    # fillna可以接收两种类型的参数:
    # 一个数字、字符串,这时整个DataSet中所有的缺失值都会被填充为相同的值。
    # 也可以接收一个字典{列名:值}这样
    
    # 先计算均值,并组织成一个字典
    means = df_miss_no_income.agg( *[fn.mean(c).alias(c) for c in df_miss_no_income.columns if c != 'gender']).toPandas().to_dict('records')[0]
    # 然后添加其它的列
    means['gender'] = 'missing'
    
    df_miss_no_income.fillna(means).show()
    

    异常值处理

    '''
    3、异常值处理
    异常值:不属于正常的值 包含:缺失值,超过正常范围内的较大值或较小值
    分位数去极值
    中位数绝对偏差去极值
    正态分布去极值
    上述三种操作的核心都是:通过原始数据设定一个正常的范围,超过此范围的就是一个异常值
    '''
    df_outliers = spark.createDataFrame([
    (1, 143.5, 5.3, 28),
    (2, 154.2, 5.5, 45),
    (3, 342.3, 5.1, 99),
    (4, 144.5, 5.5, 33),
    (5, 133.2, 5.4, 54),
    (6, 124.1, 5.1, 21),
    (7, 129.2, 5.3, 42),
    ], ['id', 'weight', 'height', 'age'])
    # 设定范围 超出这个范围的 用边界值替换
    
    # approxQuantile方法接收三个参数:参数1,列名;参数2:想要计算的分位点,可以是一个点,也可以是一个列表(0和1之间的小数),第三个参数是能容忍的误差,如果是0,代表百分百精确计算。
    
    cols = ['weight', 'height', 'age']
    
    bounds = {}
    for col in cols:
        quantiles = df_outliers.approxQuantile(col, [0.25, 0.75], 0.05)
        IQR = quantiles[1] - quantiles[0]
        bounds[col] = [
            quantiles[0] - 1.5 * IQR,
            quantiles[1] + 1.5 * IQR
            ]
    
    >>> bounds
    {'age': [-11.0, 93.0], 'height': [4.499999999999999, 6.1000000000000005], 'weight': [91.69999999999999, 191.7]}
    
    # 为异常值字段打标志
    outliers = df_outliers.select(*['id'] + [( (df_outliers[c] < bounds[c][0]) | (df_outliers[c] > bounds[c][1]) ).alias(c + '_o') for c in cols ])
    outliers.show()
    #
    # +---+--------+--------+-----+
    # | id|weight_o|height_o|age_o|
    # +---+--------+--------+-----+
    # |  1|   false|   false|false|
    # |  2|   false|   false|false|
    # |  3|    true|   false| true|
    # |  4|   false|   false|false|
    # |  5|   false|   false|false|
    # |  6|   false|   false|false|
    # |  7|   false|   false|false|
    # +---+--------+--------+-----+
    
    # 再回头看看这些异常值的值,重新和原始数据关联
    
    df_outliers = df_outliers.join(outliers, on='id')
    df_outliers.filter('weight_o').select('id', 'weight').show()
    # +---+------+
    # | id|weight|
    # +---+------+
    # |  3| 342.3|
    # +---+------+
    
    df_outliers.filter('age_o').select('id', 'age').show()
    # +---+---+
    # | id|age|
    # +---+---+
    # |  3| 99|
    # +---+---+
    
    展开全文
  • pyspark.sql.DataFrameNaFunctions() DataFrame中处理缺失值的函数 drop(how=‘any’, thresh=None, subset=None) 返回删除含有空行的DataFrame,DataFrame.dropna()和DataFrameNaFunctions.drop()是彼此的别名。 ...

    pyspark.sql.DataFrameNaFunctions()

    DataFrame中处理缺失值的函数

    drop(how=‘any’, thresh=None, subset=None)

    返回删除含有空行的DataFrame,DataFrame.dropna()和DataFrameNaFunctions.drop()是彼此的别名。

    • how:'any’or ‘all’. 'any’删除包含空值的行,'all’一行中全部为空则删除改行.
    • thresh: int,默认值无如果指定,则删除小于thresh非空值的行。这将覆盖how参数
    • subset:要考虑的可选列名称列表

    fill(value, subset=None)

    填充空值,na.fill(). DataFrame.fillna()和DataFrameNaFunctions.fill()是彼此的别名。

    • value: int,long,float,string,bool或dict。用于替换空值的值。如果值是dict,则子集将被忽略,并且值必须是从列名(字符串)到替换值的映射。替换值必须是int,long,float,boolean或string
    • subset:要考虑的可选列名称列表。子集中指定的不具有匹配数据类型的列将被忽略。

    replace(to_replace, value=no value, subset=None)

    返回替换后的新的DataFrame。DataFrame.replace()和DataFrameNaFunctions.replace()是彼此的别名。to_replace的value和value必须具有相同的类型,只能是numerics, booleans, or strings。value可以为空,替换时,新值将强制转换为现有列的类型

    • to_replace – bool,int,long,float,string,list或dict。要替换的值,如果值是dict,则将忽略value或将其忽略,并且to_replace 必须是值和替换之间的映射
    • value: bool,int,long,float,string,list或None。替换值必须是bool,int,long,float,string或None。如果value是一个列表,则value的长度和类型应与to_replace相同。如果value是一个标量,并且to_replace是一个序列,则使用value替换to_replace中的每个项目。
    • subset:要考虑的可选列名称列表。子集中指定的不具有匹配数据类型的列将被忽略
    展开全文
  • 本节来学习pyspark.sql.functions中的pandas_udf函数。博客案例中用到的数据可以点击此处下载(提取码:2bd5) pyspark.sql.functions.pandas_udf(f=None, returnType=None, functionType=None) pandas_udf是用户...

    本节来学习pyspark.sql.functions中的pandas_udf函数。博客中代码基于spark 2.4.4版本。不同版本函数会有不同,详细请参考官方文档。博客案例中用到的数据可以点击此处下载(提取码:2bd5)

    pyspark.sql.functions.pandas_udf(f=None, returnType=None, functionType=None)

    pandas_udf是用户定义的函数,由Spark使用Arrow来传输数据,并使用Pandas来处理数据,从而实现矢量化操作。使用pandas_udf,可以方便的在PySpark和Pandas之间进行互操作,并且保证性能;其核心思想是将Apache Arrow作为序列化格式。

    Pandas UDF通常表现为常规的PySpark函数API。

    • f: 用户定义的函数;
    • returnType: 用户自定义函数的返回值类型,该值可以是pyspark.sql.types.DataType对象或DDL格式的类型字符串
    • functionType: pyspark.sql.functions.PandasUDFType中的枚举值。 默认值:SCALAR.存在此参数是为了兼容性。 鼓励使用Python类型。

    Pandas_UDF类型

    目前,有三种类型的Pandas_UDF,分别是Scalar(标量映射),GROUPED_MAP(分组映射),GROUPED_AGG(分组聚合),对应的输入输出格式如下表

    pandas_udf类型 输入 输出
    SCALAR 一个或多个pd.Series 一个pd.Series
    GROUPED_MAP pd.DataFrame pd.DataFrame
    GROUPED_AGG 一个或多个pd.Series SCALAR
    # 在学习之前先导入必要的包和数据
    from pyspark.sql import SparkSession
    from pyspark.sql.types import IntegerType, FloatType
    import pandas as pd
    from pyspark.sql.functions import pandas_udf, udf
    
    spark = SparkSession.Builder().master('local').appName('GroupedData').getOrCreate()
    df = spark.read.csv('../data/data.csv', header=True)
    
    df = df.withColumn('投篮数', df['投篮数'].astype(IntegerType()))
    df = df.withColumn('命中', df['命中'].astype(IntegerType()))
    df = df.withColumn('投篮命中率', df['投篮命中率'].astype(FloatType()))
    df = df.withColumn('3分命中率', df['3分命中率'].astype(FloatType()))
    df = df.withColumn('篮板', df['篮板'].astype(IntegerType()))
    df = df.withColumn('助攻', df['助攻'].astype(IntegerType()))
    df = df.withColumn('得分', df['得分'].astype(IntegerType()))
    

    使用方式

    使用pandas_udf作为装饰器来定义Pandas UDF

    注意:在spark 3.0之前,类型提示都应使用functionType指定pandas UDF类型。

    1. Scalar(标量类型)

    Scalar UDF定义了一个转换,函数输入一个或多个pd.Series,输出一个pd.Series,函数的输出和输入有相同的长度

    Scalar Pandas UDF用于向量化标量操作。常常与select和withColumn等函数一起使用。其中调用的Python函数需要使用pandas.Series作为输入并返回一个具有相同长度的pandas.Series。具体执行流程是,Spark将列分成批(每批进行向量化计算的数据量由 spark.sql.execution.arrow.maxRecordsPerBatch 参数控制,默认为10000条),并将每个批作为数据的子集进行函数的调用,进而执行panda UDF,最后将结果连接在一起。

    from pyspark.sql.functions import PandasUDFType
    
    @pandas_udf(returnType=IntegerType(), functionType=PandasUDFType.SCALAR)
    def total_shoot(x, y):
        return x + y
    
    df.withColumn("篮板+助攻", total_shoot("篮板", "助攻")).show(2)
    
    +---+----+----+------+----+------+----------+---------+----+----+----+---------+
    |_c0|对手|胜负|主客场|命中|投篮数|投篮命中率|3分命中率|篮板|助攻|得分|篮板+助攻|
    +---+----+----+------+----+------+----------+---------+----+----+----+---------+
    |  0|勇士|  胜|    客|  10|    23|     0.435|    0.444|   6|  11|  27|       17|
    |  1|国王|  胜|    客|   8|    21|     0.381|    0.286|   3|   9|  27|       12|
    +---+----+----+------+----+------+----------+---------+----+----+----+---------+
    only showing top 2 rows
    

    2. Grouped Map(分组映射)

    GROUPED_MAP UDF定义了转换:pd.DataFrame -> pd.DataFrame,returnType使用StructType描述返回值的pd.DataFrame的模式。
    Grouped Map(分组映射)panda_udf与groupBy().apply()一起使用,后者实现了“split-apply-combine”模式。
    “split-apply-combine”包括三个步骤:

    • 使用DataFrame.groupBy将数据分成多个组。
    • 对每个分组应用一个函数。函数的输入和输出都是pandas.DataFrame。输入数据包含每个组的所有行和列。
    • 将结果合并到一个新的DataFrame中。

    要使用groupBy().apply(),需要定义以下内容:

    • 定义每个分组的Python计算函数,这里可以使用pandas包或者Python自带方法。
    • 一个StructType对象或字符串,它定义输出DataFrame的格式,包括输出特征以及特征类型。
      需要注意的是,StructType对象中的Dataframe特征顺序需要与分组中的Python计算函数返回特征顺序保持一致。

    此外,在应用该函数之前,分组中的所有数据都会加载到内存,这可能导致内存不足抛出异常。

    from pyspark.sql import functions as F
    
    @pandas_udf('win_flag int, score double', functionType=PandasUDFType.GROUPED_MAP)
    def normalize(pdf):
        v = pdf['score']
        return pdf.assign(score=(v - v.mean() / v.std()))
    
    # 实验中中文列名会报错,所以尝试英文列名
    df = df.withColumn('win_flag', F.when(df['胜负'] == '胜', 1).otherwise(0))
    df = df.withColumn('score', df['得分'])
    df.select('win_flag', 'score').groupby('win_flag').apply(normalize).show(3)
    
    +--------+-----------------+
    |win_flag|            score|
    +--------+-----------------+
    |       1|23.21879940059463|
    |       1|23.21879940059463|
    |       1|25.21879940059463|
    +--------+-----------------+
    only showing top 3 rows
    

    3. GROUPED_AGG

    GROUPED_AGG定义了一个或多个pandas.Series -> 一个scalar,scalar的返回值类型(returnType)应该是原始数据类型

    Grouped aggregate Panda UDF类似于Spark聚合函数。Grouped aggregate Panda UDF常常与groupBy().agg()和pyspark.sql.window一起使用。 需要注意的是,这种类型的UDF不支持部分聚合,组或窗口的所有数据都将加载到内存中。此外,目前只支持Grouped aggregate Pandas UDFs的无界窗口。 下面的例子展示了如何使用这种类型的UDF来计算groupBy和窗口操作的平均值

    # 统计 胜 和 负 的平均分
    @pandas_udf('int', PandasUDFType.GROUPED_AGG)
    def count_num(v):
        return v.mean()
    
    df.groupby("胜负").agg(count_num(df['得分']).alias('avg_score')).show(2)
    
    +----+---------+
    |胜负|avg_score|
    +----+---------+
    |  负|       27|
    |  胜|       32|
    +----+---------+
    

    文末致敬官方文档:http://spark.apache.org/docs/latest/api/python/pyspark.sql.html#pyspark.sql.functions.pandas_udf

    展开全文
  • pyspark.sql.DataFrame

    2018-09-04 10:24:47
    method of DataFrame 缺失值&...重复值 处理 drop_duplicates eg: 1. df.drop_duplicates() 2.df.drop_duplicates(columns_lst) df.fillna(value),df.na.fill(dict{col_name:fill_value})

    method of DataFrame

    缺失值&重复值 处理

    • drop_duplicates
      eg: 1. df.drop_duplicates() 2.df.drop_duplicates(columns_lst)

    • df.fillna(value),df.na.fill(dict{col_name:fill_value})

    • df1.intersect (df2) 两个DF的交集

    • df1.subtract(df2) 属于df1 但不属于df2 的ROW

    • orderBy(col,ascending)

    from pyspark.sql.functions import *
    df.orderBy(colname,ascending=0)
    df.orderBy(asc(col_name))
    • randomSplit

    • getNunPartitions()

    -spark.range() 生成的是id ,本身就定义了列名”id”

    • sampleBy(col,fraction)
      按照某列采样,可对样本进行下采样

    • 基于自己的理解,DF中的select 与rdd 中的map 相同

    • unpersist(blocking=False) 释放内存

    • withColumn(col_name,exp) 用来新增加一列值,一般在rdd中增加一列写起来比较麻烦,DF中使用withColumn 该方法简便.

    展开全文
  • Spark SQL 简介及参考链接 Spark 是一个基于内存的用于处理大数据的集群计算框架。它提供了一套简单的编程接口,从而使得应用程序开发者方便使用集群节点的CPU,内存,存储资源来处理大数据。 Spark API提供了Scala,...
  • Mocule Context Spark SQL 和DF重要的类 类名 说明 SparkSession DF以及SQL的入口 DataFrame 分布式数据集 Column ...处理缺失数据的方法 DataFrameStatFu...
  • 目录pyspark创建DataFrameRDD和DataFrame使用二元组创建DataFramepyspark连接mysqlpyspark SQL常用语法输出schema预览表统计数量输出列名称和字段类型选择列为选择的列赋予新名称按条件过滤构造新列增加行删除重复...
  • Spark SQL 简介及参考链接 Spark 是一个基于内存的用于处理大数据的集群计算框架。它提供了一套简单的编程接口,从而使得应用程序开发者方便使用集群节点的CPU,内存,存储资源来处理大数据。 Spark API提供了Scala,...
  • PySpark - Spark SQL基础

    2020-06-26 00:22:31
    Spark SQL 是 Apache Spark 处理结构化数据的模块。 一、初始化 SparkSession SparkSession 用于创建数据框,将数据框注册为表,执行 SQL 查询,缓存表及读取 Parquet 文件。 from pyspark.sql import ...
  • from pyspark.sql import SparkSession from pyspark.sql import Row from pyspark.sql.types import * import pyspark.sql.functions as fn import pyspark.sql.types as typ spark = SparkSession.builder....
  • 数据体积(Volume)指定要处理的数据量。对于大量数据,我们需要大型机器或分布式系统。计算时间随数据量的增加而增加。所以如果我们能并行化计算,最好使用分布式系统。数据可以是结构化数据、非结构化数据或介于两者...

空空如也

空空如也

1 2 3 4 5 ... 8
收藏数 151
精华内容 60
关键字:

pyspark处理sql