精华内容
下载资源
问答
  • DataFrameWriter.csv

    2018-10-31 16:41:27
    def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None, header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None, timestampFormat=Non...
    def csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
            header=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
            timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
            charToEscapeQuoteEscaping=None):
        """Saves the content of the :class:`DataFrame` in CSV format at the specified path.
    param 
    

    path:

    hdfs路径

     

    mode:

    * ``append``: Append contents of this :class:`DataFrame` to existing data.
    * ``overwrite``: Overwrite existing data.
    * ``ignore``: Silently ignore this operation if data already exists.
    * ``error`` or ``errorifexists`` (default case): Throw an exception if data already \
        exists.

     

    compression:

     

    压缩编解码器,在保存到文件时使用。这可以是已知的不区分大小写的缩写名称之一(None、bzip 2、gzip、lz 4、snappy和unate)。

     

    sep:

    字段分隔符,默认","

     

    quote:

    设置一个字符,用于转义引用的值,其中分隔符可以是值的一部分。

    如果无设置,则使用默认值“"”。

    如果设置了空字符串,则使用‘u0000’(空字符)。

     

    escape:

    设置一个字符,用于在已引号的值中转义引号。

    如果无设置,则使用默认值``\‘

     

    escapeQuotes:

    一个标志,指示包含引号的值是否总是以引号括起来。

    如果无设置,则使用默认值`‘true’,转义包含引号字符的所有值。

     

    quoteAll:

     

    指示是否所有值都应以引号括起来的标志。

    如果没有设置,则使用默认值“false”,只转义包含引号字符的值。

     

    header:

    将列的名称写入第一行。如果没有设置,则使用默认值“false”。

     

    nullValue:

    设置空值的字符串表示形式。如果无设置,则使用默认值空字符串。

     

    dateFormat:

    设置指示日期格式的字符串。

    自定义日期格式遵循``java.texts.ImeDateFormat‘的格式。这适用于日期类型。如果无设置,则使用默认值‘yyyy-mm-dd’。

     

    timestampFormat:

    设置指示时间戳格式的字符串。

    自定义日期格式遵循``java.texts.ImeDateFormat‘的格式。这适用于时间戳类型。

    如果无设置,则使用默认值‘yyyy-MM-dd’T‘HH:mm:ss.SSSXXX’。

     

    ignoreLeadingWhiteSpace:

    指示是否应跳过从正在写入的值中引导空白空间的标志。

    如果没有设置,则使用默认值“true”。

     

    ignoreTrailingWhiteSpace:

    应跳过指示是否应跳过正在写入的值中的尾随空格的标志。

    如果没有设置,则使用默认值“true”。

     

    charToEscapeQuoteEscaping:

    设置单个字符,用于转义引号字符的转义。

    如果无设置,则当转义字符和引号字符不同时,默认值为转义字符。

     

    展开全文
  • ") 执行的时候,会爆错误: TypeError: ‘DataFrameWriter’ object is not callable 更本原因是,write 不是一个函数,不能写成write(); 正确代码如下: # 原始数据schame获取 # originTableRow = spark.table...

    pyspark 在写将rdd数据转成dataframe数据之后,写入指定路径(表中)
    样例指令:

       # 原始数据schame获取
        # originTableRow = spark.table(output_db + "." + tableName + "_temp")
        field_name_list = ["id","schema_table", "value", "unit",  "first_type", "second_type", "group_id"]
        fields = [StructField(field_name, StringType(), True) for field_name in field_name_list]
        schema = StructType(fields)
    
        print("result size: %d" % resultRdd.count())
        if resultRdd:
            spark.read.schema(schema).json(resultRdd).write().save(savaPath_backup, "orc")
            print("rdd result write success!")
        else:
            print("rdd result is empty!")
    

    执行的时候,会爆错误:
    TypeError: ‘DataFrameWriter’ object is not callable

    更本原因是,write 不是一个函数,不能写成write();
    正确代码如下:

       # 原始数据schame获取
        # originTableRow = spark.table(output_db + "." + tableName + "_temp")
        field_name_list = ["id","schema_table", "value", "unit",  "first_type", "second_type", "group_id"]
        fields = [StructField(field_name, StringType(), True) for field_name in field_name_list]
        schema = StructType(fields)
    
        print("result size: %d" % resultRdd.count())
        if resultRdd:
            spark.read.schema(schema).json(resultRdd).write.save(savaPath_backup, "orc")
            print("rdd result write success!")
        else:
            print("rdd result is empty!")
    
    展开全文
  • saveAsTable是DataFrameWriter的方法,DFW会有mode和option,mode统一有4种,但saveAsTable没有option,可以在上面的官文中查看某方法有哪些option 3. saveAsTable执行后,原来hive的表的元数据会变,TBLPROPERTIES...

    须知

    1. toplink

    2. saveAsTable是DataFrameWriter的方法,DFW会有mode和option,mode统一有4种,但saveAsTable没有option,可以在上面的官文中查看某方法有哪些option

    3. saveAsTable执行后,原来hive的表的元数据会变,TBLPROPERTIES会增加很多spark相关的属性。但分区字段会变成普通字段,需要使用DataFrameWriter的partitionBy方法重新指定下分区。

    4. 注意:sparksql中的INSERT OVERWRITE 后面必须加TABLE,否则报错

    5. 注意:调整DF中的字段顺序、筛选字段,可以使用select方法

    方案

    1. saveAsTable

    在这里插入图片描述

    在此方法中,如果数据源表存在于Spark-catalog中,则使用save方法参数中的模式来确定如何插入。如果表不在Spark-catalog中,我们总是会覆盖它(比如JDBC数据源中的一个表),因为spark可以整合hive,所以hive中的表在Spark-catalog中,但比如spark创建的临时表,就不在Spark-catalog中。如果表在Spark-catalog中,则追加。
    如果df根据一个没分区的 hive表创建,并且可以转换为hive内置的序列化器(比如ORC和Parquet),那么会以hive兼容的格式持久化到hdfs。否则,会以sparksql指定的格式持久化。

    问题:
    对象是分区表时,执行完分区表的元数据会变,分区字段变成普通字段,需要partitionBy方法指定分区字段
    Overwrite模式时,会把其他分区覆盖,暂时没有好的方法

    实验

    创建一个分区表t1,然后在t1基础上添加一个字段,创建表t2
    使用spark创建这个表的df1,然后添加字段转为df2,然后df2使用saveAsTable插入到t2的新分区。

    环境

    create table IF NOT EXISTS test1.parTable1(id string,name string) 
    partitioned by (etl_date string) stored as parquet;
    create table IF NOT EXISTS test1.parTable2(id string,name string,age string) 
    partitioned by (etl_date string) stored as parquet;
    

    1. saveAsTable

    发现overwrite状态下会将所有分区覆盖,并且使用show create table会发现分区字段变成普通字段
    在这里插入图片描述

    df3.write.mode(SaveMode.Overwrite).format("parquet").saveAsTable("test1.parTable2")
    

    添加.partitionBy(“etl_date”)方法后,分区字段还在,但还是会把其他所有分区删除

    df3.write.partitionBy("etl_date").mode(SaveMode.Overwrite).format("parquet").saveAsTable("test1.parTable2")
    

    网上没有找到解决方法,原因可能是因为表不是通过spark建的,所以会有问题,但也不能每次都通过spark建一次吧。网上说spark2.3已经解决这个问题。

    2. insert into

    不管有没有.partitionBy(“etl_date”),只要是Overwrite模式,都会覆盖所有分区。不加Overwrite默认append模式,不会删除其他分区但指定分区也不能覆盖

     df4.write
          .partitionBy("etl_date")
          .mode(SaveMode.Overwrite)
          .insertInto(tableName="test1.parTable2")
    

    3. 注册为spark临时表然后直接调用hql,能解决分区的覆盖问题,但不能指定插入的字段。只能是拼接INSERT INTO后面的SELECT列表

        val code :(String => String) = (arg: String) => {"1"}
        val addCol = udf(code)
        val df3 = df2.withColumn("age",addCol(col("id")))
        val df4 = df3.select("id","name","age")
        df4.show()
       /*
       目的就是为了把df4插入到指定表中
        */
        df4.createOrReplaceTempView("temp_df4")
        sparkSession.sql(
          """
            | INSERT OVERWRITE TABLE test1.parTable2 partition(ETL_DATE='20200916')
            | SELECT * FROM temp_df4
          """.stripMargin)
    

    拼接INSERT INTO后的SELECT 列表,没有的列设为null,因为select 后面如果不是列名而是值,则直接输出值。

    val mxjgDF = sparkSession.table(s"ADS.${tableNamePre}_${modelName}_MXJG")
    val fieldNames = mxjgDF.schema.fieldNames.dropRight(1) // 所有字段的按序数组
    .map(_.toLowerCase)
    Logl.error("CustomLog-MX-fieldNames:)
    for (i <- fieldNames)
    	logl.error(i)
    }
    Logl.error(s"CustomLog-MX-outputRule:outputRule")
    val outputcols = outputRule.split(",")
    	.map(s => if (s.startsWith("`") s.drop(1).dropRight(1) else s)
    	.map(_.toLowerCase)
    var colsStr = ""
    for (i <- fieldNames){
    	if (outputcois.contains(i)) colsStr += i + ","
    	else colsstrHnuii)
    colsstr = colsStr.dropRight(1)
    Logl.error(s"CustomLog-MX-outputcols:${outputcols.foneach(printLn(_))}")
    logl.error(s"CustomLog-MX-colsStr:$sStr")
    
    val temp_outputDF:DataFrame = transfomer.transform(wideDF)
    log1.error(s"预测结果DF:")
    temp_outputDF.show(2)
    
    展开全文
  • 返回DataFrameWriter对象,用于将df保存到其它文件、数据库或表中 writeStream 返回DataStreamWriter对象,用于将DataFrame流数据写入其他存储结果 schema 返回当前df的schema,格式为pyspark.sql....

    总结来自pyspark的官方文档:http://spark.apache.org/docs/latest/api/python/index.html

    pyspark中一共有以下几个包和子包:

    pyspark
    pyspark.sql
    pyspark.streaming
    pyspark.ml
    pyspark.mllib
    

    Spark最常用的几个核心类有如下:

    类名 功能
    pyspark.sql.SparkSession Spark的主要入口点,还包含了操作DataFrame和SQL的方法
    pyspark.SparkContext Spark功能的主要入口点
    pyspark.RDD 弹性分布式数据集(RDD),Spark中的基本数据集合
    pyspark.sql.DataFrame 分布式数据集合,类似与SQL数据库中的表
    pyspark.sql.Column DataFrame中的一列数据
    pyspark.sql.Row DataFrame中的一行数据
    pyspark.sql.DataFrameReader 类中包含读取文件为DataFrame的函数
    pyspark.sql.DataFrameWriter 类中包含将DataFrame写入其它文件或数据库的函数
    pyspark.sqlDataFrameNaFunctions 用于处理DataFrame中的缺失值
    pyspark.sqlDataFrameStatFunctions 用于对DataFrame做统计相关的功能
    pyspark.sql.functions 集成了可用于DataFrame的内置函数
    pyspark.streaming.StreamingContext Spark Streaming功能的主要入口点
    pyspark.streaming.DStream 离散流(DStream),Spark Streaming中的基本数据类型

    其中,SparkSession和SparkContext 是Spark的两个入口,目前官方推荐使用SparkSession作为入口,实际上SparkSession中包含了SparkContext中的大部分方法。

    RDD和DataFrame是Spark中最常用的两种数据集合。Column和Row分别是DataFrame类型中的一列和一行。

    1. RDD(Resilient Distributed Dataset),弹性分布式数据集,集合内对象分布在不同的节点之上,可以并行执行操作,是spark应用程序的基本数据结构。RDD中的元素类型是非结构化的,可以是任意JVM对象。

    2. DataFrame,类似于关系型数据库中的表,相比于RDD,其内部的每一条记录都是结构化的。一般而言,推荐优先使用这种数据结构,可以轻易的在其上使用sql类操作,与hive结合较好。

    1. Session类

    1.1 Session类的常用属性

    属性 返回值类型
    sparkContext 返回底层的SparkContext对象
    read 返回DataFrameReader对象,用于读取文件等
    version Spark的版本号,字符串
    conf Spark的配置,可用Session.conf.set()设置配置
    udf 返回一个UDFRegistration对象
    readStream 返回DataStreamReader对象,用于读取数据流
    streams 返回StreamingQueryManager,用于管理当前context下激活的所有StreamingQueries
    catalog 返回Catalog对象,用于创建、删除、更改或查询底层数据库/表/函数等

    1.2 Session类的常用方法

    1.2.1 获取Session对象,打开Spark入口

    注意:以下代码中出现的“spark”均为SparkSession对象

    from pyspark.sql import SparkSession
    spark = SparkSession.builder.appName("your_app_name").getOrCreate()
    

    1.2.2 配置Session

    spark.conf.set(“spark.sql.sources.partitionOverwriteMode”, “dynamic”)

    1.2.3 sql方法

    def sql(self, sqlQuery)

    传入SQL指令并执行,将获取到的数据存放到DataFrame中并返回

    >>> df2 = spark.sql("SELECT field1 AS f1, field2 as f2 from table1")
    >>> df2.collect()   # 将df的所有数据放到list中并返回,后面会介绍此方法
    [Row(f1=1, f2=u'row1'), Row(f1=2, f2=u'row2'), Row(f1=3, f2=u'row3')]
    

    1.2.4 createDataFrame方法

    def createDataFrame(self, data, schema=None, samplingRatio=None, verifySchema=True)

    通过RDD、list或pandas.DataFrame来创建一个pyspark的DataFrame对象。

    data: 用来创建df的原数据。传入类型:RDD、list、tuple、dict或pandas.DataFrame

    schema: 用来指定列名或列的数据类型。传入类型:pyspark.sql.types.DataType、datatype string 或 字符串列表表示列名。示例:

    1)指定列名:schema=[‘name’, ‘age’]
    2)同时指定列名和数据类型:“a: string, b: int”
    3)同时指定列名和数据类型

    schema = StructType([
        StructField("name", StringType(), True),
        StructField("age", IntegerType(), True)])
    

    返回值: DataFrame对象

    例程:

    # 用tuple创建一行两列的df
    >>> l = [('Alice', 1)]   # tuple
    >>> spark.createDataFrame(l).collect()	
    [Row(_1=u'Alice', _2=1)  
    # schema参数指定列名
    >>> spark.createDataFrame(l, ['name', 'age']).collect()
    [Row(name=u'Alice', age=1)]
    
    # 用dict创建一行两列的df
    >>> d = [{'name': 'Alice', 'age': 1}]   
    >>> spark.createDataFrame(d).collect()  
    [Row(age=1, name=u'Alice')]
    
    # 用rdd创建df
    >>> sc = SparkContext.getOrCreate()
    >>> rdd = sc.parallelize(l)     # 生成rdd的一种方法,会面会讲
    >>> spark.createDataFrame(rdd).collect()  
    [Row(_1=u'Alice', _2=1)]
    >>> df = spark.createDataFrame(rdd, ['name', 'age']) # 用rdd创建df,并指定列名
    >>> df.collect()
    [Row(name=u'Alice', age=1)]
    
    # 用schema参数指定列名和每列的数据类型
    >>> from pyspark.sql.types import *
    >>> schema = StructType([
    ...    StructField("name", StringType(), True),
    ...    StructField("age", IntegerType(), True)])
    >>> df3 = spark.createDataFrame(rdd, schema)
    >>> df3.collect()
    [Row(name=u'Alice', age=1)]
    
    # 用pandas.DataFrame创建Spark的df
    >>> spark.createDataFrame(df.toPandas()).collect()  # doctest: +SKIP
    [Row(name=u'Alice', age=1)]
    >>> spark.createDataFrame(pandas.DataFrame([[1, 2]])).collect()  # doctest: +SKIP
    [Row(0=1, 1=2)]
    
    # 用rdd创建df,并且用schema参数指定列名和数据类型
    >>> spark.createDataFrame(rdd, "a: string, b: int").collect()
    [Row(a=u'Alice', b=1)]
    >>> rdd = rdd.map(lambda row: row[1])
    >>> spark.createDataFrame(rdd, "int").collect()
    [Row(value=1)]
    >>> spark.createDataFrame(rdd, "boolean").collect() # doctest: +IGNORE_EXCEPTION_DETAIL
    Traceback (most recent call last):
        ...
    Py4JJavaError: ...
    

    1.2.5 range方法

    def range(self, start, end=None, step=1, numPartitions=None)
    Create a :class:DataFrame with single :class:pyspark.sql.types.LongType column named
    id, containing elements in a range from start to end (exclusive) with
    step value step.

    构建一个只有一列的DataFrame,内容是等差数列。
    start: 等差数列开始的数字;
    end: 等差数列结束的数字。数列中一定不包含end!
    step: 差值是多少;
    numPartitions: 从方法说明中没有搞懂什么意思;
    返回值: 只有1列的DataFrame,行数为数列的数字个数

    # 生成1-6的等差数列,不包含7.
    >>> ss.range(1,7,1).collect()
    [Row(id=1), Row(id=2), Row(id=3), Row(id=4), Row(id=5), Row(id=6)]
    
    >>> spark.range(1, 7, 2).collect()
    [Row(id=1), Row(id=3), Row(id=5)]
    
    # 只指定一个参数,该参数会被视为end
    >>> spark.range(3).collect()
    [Row(id=0), Row(id=1), Row(id=2)]
    

    1.2.6 stop方法

    def stop(self):
    停止底层的对象SparkContext。当SparkSession对象被创建时,底层是默认打开一个SparkContext对象的。

    2. DataFrame类

    2.1 DataFrame类的常用属性

    属性 功能
    dtypes 返回所有列名和列数据类型组成的元祖列表,例:[(‘col_1’, ‘int’), (‘col_2’, ‘string’)]
    columns 返回所有列名组成列表,例:[‘col_1’, ‘col_2’]
    rdd 将df的内容创建成rdd并返回,原df中的一行Row变为rdd中一个元素
    na 返回DataFrameNaFunctions的对象,用于处理缺失值
    stat 返回DataFrameStatFunctions的对象,用于做统计相关的功能
    write 返回DataFrameWriter对象,用于将df保存到其它文件、数据库或表中
    writeStream 返回DataStreamWriter对象,用于将DataFrame流数据写入其他存储结果
    schema 返回当前df的schema,格式为pyspark.sql.types.StructType
    isStreaming False 或 True, 返回当前df是否包含流数据源
    storageLevel 返回当前df的存储等级

    2.2 DataFrame类的常用方法

    2.2.1 创建或替换临时表的方法

    (1)createOrReplaceTempView(self, name):创建或替换临时表。
    使用当前df创建一个临时的表或替换当前有的表(相当于数据库中的表),表的声明周期与生成该df的SparkSession生命周期一致。 参数name为字符串,表示要创建的表名。
    例子:

    >>> df.createOrReplaceTempView("people")  # 用df创建临时表people
    >>> df2 = df.filter(df.age > 3)  
    >>> df2.createOrReplaceTempView("people") # 用df2替换刚才临时表people的内容
    >>> df3 = spark.sql("select * from people") # 用sql语句尝试读取临时表people的内容
    >>> sorted(df3.collect()) == sorted(df2.collect()) # 验证读取出来的数据与原df2是否一致
    True     # 一致!
    >>> spark.catalog.dropTempView("people")  # 删除临时表
    

    (2)createTempView(name):仅创建临时表,当表名已经存在时会报错。
    (3)createGlobalTempView(name):创建本地的临时表,当表名已经存在时会报错。
    (4)createOrReplaceGlobalTempView(name):创建或替换本地的临时表。

    2.2.2 打印DataFrame对象的相关信息方法

    (1) printSchema() : 打印df的schema信息

    以树的显示格式打印当前df的schema信息。
    例子:

    >>> df.printSchema()
    root
    	|-- age: integer (nullable = true)
    	|-- name: string (nullable = true)
    <BLANKLINE>
    
    (2) explain(extended=False) : 打印(逻辑和物理)计划

    打印(逻辑和物理)计划到控制台,以方便程序员进行debug。参数extended,是否打印逻辑计划,默认False,只打印物理计划。

    >>> df.explain()
    == Physical Plan ==
    Scan ExistingRDD[age#0,name#1]
    
    >>> df.explain(True)
    == Parsed Logical Plan ==
    ...
    == Analyzed Logical Plan ==
    ...
    == Optimized Logical Plan ==
    ...
    == Physical Plan ==
    ...
    
    (3) show(self, n=20, truncate=True, vertical=False):查看df的前几行

    打印DataFrame的前几行,相当于pandas里面的head()方法。
    n: 打印前几行,默认20.
    truncate: 是否截断长度超过20的字符串?默认True截断,也可设置为其它数字自定义截断长度。
    vertical: 是否将数据垂直打印(one line per column value)?默认False
    例:

    >>> df
    DataFrame[age: int, name: string]
    >>> df.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  2|Alice|
    |  5|  Bob|
    +---+-----+
    >>> df.show(truncate=3)
    +---+----+
    |age|name|
    +---+----+
    |  2| Ali|
    |  5| Bob|
    +---+----+
    >>> df.show(vertical=True)
    -RECORD 0-----
    age  | 2
    name | Alice
    -RECORD 1-----
    age  | 5
    name | Bob
    
    (4) head(n=None): 打印df的前几行

    返回DataFrame的前n行,这个方法是将结果放到本地机器上,所以要求n尽可能的小。
    n默认为1。

    >>> df.head()
    Row(age=2, name=u'Alice')
    >>> df.head(1)
    [Row(age=2, name=u'Alice')]
    
    (5) describe(*cols) : 查看df的统计信息

    查看df的统计信息,包含 count, mean, stddev, min, and max.
    如果不指定cols参数,默认统计所有数值和字符串列。
    (注意:此函数用于探索性数据分析,因为我们无法保证生成的DataFrame的模式的向后兼容性)

    >>> df.describe(['age']).show()
    +-------+------------------+
    |summary|               age|
    +-------+------------------+
    |  count|                 2|
    |   mean|               3.5|
    | stddev|2.1213203435596424|
    |    min|                 2|
    |    max|                 5|
    +-------+------------------+
    >>> df.describe().show()
    +-------+------------------+-----+
    |summary|               age| name|
    +-------+------------------+-----+
    |  count|                 2|    2|
    |   mean|               3.5| null|
    | stddev|2.1213203435596424| null|
    |    min|                 2|Alice|
    |    max|                 5|  Bob|
    +-------+------------------+-----+
    

    (6) summary(*statistics)

    计算df中的数字列和字符串列的特定的统计信息,可选有:
    - count
    - mean
    - stddev # 标准差
    - min
    - max
    - 任意近似百分位数,例: ‘75%’ , ‘10%’
    如果参数statistics不指定,则默认计算:count, mean, stddev, min, approximate quartiles (percentiles at 25%, 50%, and 75%), and max。
    如果只想对某些列进行统计,需要先用select选中某些列。

            >>> df.summary().show()
            +-------+------------------+-----+
            |summary|               age| name|
            +-------+------------------+-----+
            |  count|                 2|    2|
            |   mean|               3.5| null|
            | stddev|2.1213203435596424| null|
            |    min|                 2|Alice|
            |    25%|                 2| null|
            |    50%|                 2| null|
            |    75%|                 5| null|
            |    max|                 5|  Bob|
            +-------+------------------+-----+
    
            >>> df.summary("count", "min", "25%", "75%", "max").show()
            +-------+---+-----+
            |summary|age| name|
            +-------+---+-----+
            |  count|  2|    2|
            |    min|  2|Alice|
            |    25%|  2| null|
            |    75%|  5| null|
            |    max|  5|  Bob|
            +-------+---+-----+
    
            # 如果只想对某些列进行统计,需要先用select选中某些列
    
            >>> df.select("age", "name").summary("count").show()
            +-------+---+----+
            |summary|age|name|
            +-------+---+----+
            |  count|  2|   2|
            +-------+---+----+
    
    (7) count

    def count(self):
    返回df中的行数。

    >>> df.count()
    2
    
    (8) sample

    def sample(self, withReplacement=None, fraction=None, seed=None):
    相当于在SQL中看sample,返回对df采样的子df。

    withReplacement: Sample with replacement or not (default False).
    fraction: 采样几分之几的行,可取范围[0.0, 1.0],0.1代表取10%的样本。
    seed: 随机种子 (default a random seed).

    >>> df = spark.range(10)
    >>> df.sample(0.5, 3).count()
    4
    >>> df.sample(fraction=0.5, seed=3).count()
    4
    >>> df.sample(withReplacement=True, fraction=0.5, seed=3).count()
    1
    >>> df.sample(1.0).count()
    10
    >>> df.sample(fraction=1.0).count()
    10
    >>> df.sample(False, fraction=1.0).count()
    10
    
    (9) limit

    def limit(self, num):
    相当于SQL中的limit,限定df的前几列。

    >>> df.limit(1).collect()
    [Row(age=2, name=u'Alice')]
    >>> df.limit(0).collect()
    []	
    

    2.2.3 对df切片的方法:

    (1) first() : 返回df的第一行为Row对象

    返回df的第一行,返回值类型为Row对象。

    >>> df.first()
    Row(age=2, name=u'Alice')
    
    (2) filter(condition) 或 where(condition) : 筛选行

    使用给定的condition条件,选择满足条件的行,返回值为DataFrame对象。filter和where是同名同功能的。
    **condition:**可选类型两种:
    1)数据类型为BooleanType的Column类,如 df.age > 3;
    2)字符串:SQL表达式,如"age > 3";

    # 第1种类型的筛选条件
    >>> df.filter(df.age > 3).collect()
    [Row(age=5, name=u'Bob')]
    >>> df.where(df.age == 2).collect()
    [Row(age=2, name=u'Alice')]
    
    # 第2种类型的筛选条件
    >>> df.filter("age > 3").collect()
    [Row(age=5, name=u'Bob')]
    >>> df.where("age = 2").collect()
    [Row(age=2, name=u'Alice')]
    
    (3) *select(cols) : 用string筛选df的列

    根据表达式筛选特定的列出来,返回一个新的DataFrame。
    cols: list of column names (string) or expressions (:class:Column). 即list或string
    如果其中一个列名为’*’,则会把整个df都选中。

    >>> df.select('*').collect()
    [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
    # 对列名排序
    >>> df.select('name', 'age').collect()
    [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
    # 根据原先列做数据转换,还可以命名新的列名
    >>> df.select(df.name, (df.age + 10).alias('age')).collect()
    [Row(name=u'Alice', age=12), Row(name=u'Bob', age=15)]
    
    (4) selectExpr(*expr): 用SQL筛选df的列

    select方法的变种,输入参数是SQL表达式:
    例子:

    >>> df.selectExpr("age * 2", "abs(age)").collect()
    [Row((age * 2)=4, abs(age)=2), Row((age * 2)=10, abs(age)=5)]
    
    (5)drop : 删除列

    def drop(self, *cols):
    丢掉原df中的某些列,返回一个丢掉列后的DataFrame。

    cols: 列名字符串组成的列表[‘col_1’,‘col_2’],或类型为Column的对象,如df.name。

    >>> df.drop('age').collect()
    [Row(name=u'Alice'), Row(name=u'Bob')]
    
    >>> df.drop(df.age).collect()
    [Row(name=u'Alice'), Row(name=u'Bob')]
    
    >>> df.join(df2, df.name == df2.name, 'inner').drop(df.name).collect()
    [Row(age=5, height=85, name=u'Bob')]
    
    >>> df.join(df2, df.name == df2.name, 'inner').drop(df2.name).collect()
    [Row(age=5, name=u'Bob', height=85)]
    
    >>> df.join(df2, 'name', 'inner').drop('age', 'height').collect()
    [Row(name=u'Bob')]
    
    (6) colRegex : 用正则表达式选择列,返回Column对象

    def colRegex(self, colName):
    用正则表达式来选择df的某些列,返回Column对象,返回值可以输入到其它方法中要求输入列名的地方,如cols等参数。

    colName: string, column name specified as a regex.

    >>> df = spark.createDataFrame([("a", 1), ("b", 2), ("c",  3)], ["Col1", "Col2"])
    >>> df.select(df.colRegex("`(Col1)?+.+`")).show()
    +----+
    |Col2|
    +----+
    |   1|
    |   2|
    |   3|
    +----+
    

    2.2.4 分组和聚合操作

    (1) groupby(*cols) 或 groupBy

    同pandas里面的groupby,根据某些列将df分组,以方便做其它统计聚合操作。
    返回值为GroupedData对象,所有可以做的操作见类:GroupedData

    cols: list of columns to group by. Each element should be a column name (string) or an expression (:class:Column).

    >>> df.groupBy().avg().collect()
    [Row(avg(age)=3.5)]
    >>> sorted(df.groupBy('name').agg({'age': 'mean'}).collect())
    [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
    >>> sorted(df.groupBy(df.name).avg().collect())
    [Row(name=u'Alice', avg(age)=2.0), Row(name=u'Bob', avg(age)=5.0)]
    >>> sorted(df.groupBy(['name', df.age]).count().collect())
    [Row(name=u'Alice', age=2, count=1), Row(name=u'Bob', age=5, count=1)]
    
    (2) agg(self, *exprs)

    agg对应英文单词Aggregate,意思为聚合!可实现在df上直接进行聚合操作(最值、平均值等)。
    agg方法是快速实现df.groupBy.agg()功能的一种方法。

    >>> df.agg({"age": "max"}).collect()
    [Row(max(age)=5)]
    >>> from pyspark.sql import functions as F
    >>> df.agg(F.min(df.age)).collect()
    [Row(min(age)=2)]
    

    2.2.5 合并操作

    (1) join

    def join(self, other, on=None, how=None):
    用给定的join表达式合并两个df。

    other: 要与本df进行合并的df,本df为左表,other为右表。

    on: 根据哪些列进行join,可选:列字符串、列字符串列表、Column类型的对象。
    注意: on指定的列必须在两个表中都存在!!!

    how: str, 默认为 inner. Must be one of: inner, cross, outer,
    full, full_outer, left, left_outer, right, right_outer,
    left_semi, and left_anti.

    >>> df.join(df2, df.name == df2.name, 'outer').select(df.name, df2.height).collect()
    [Row(name=None, height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)]
    
    >>> df.join(df2, 'name', 'outer').select('name', 'height').collect()
    [Row(name=u'Tom', height=80), Row(name=u'Bob', height=85), Row(name=u'Alice', height=None)]
    
    >>> cond = [df.name == df3.name, df.age == df3.age]
    >>> df.join(df3, cond, 'outer').select(df.name, df3.age).collect()
    [Row(name=u'Alice', age=2), Row(name=u'Bob', age=5)]
    
    >>> df.join(df2, 'name').select(df.name, df2.height).collect()
    [Row(name=u'Bob', height=85)]
    
    >>> df.join(df4, ['name', 'age']).select(df.name, df.age).collect()
    [Row(name=u'Bob', age=5)]
    

    crossJoin(self, other): (一般不用!!!)

    返回本df与other两个df的笛卡尔积。

    (2) 行方向的合并,增加行

    union(self, other):
    合并本df和other这个df的所有行,返回一个新的DataFrame。
    本方法类似与SQL中的 UNION ALL,与SQL中相同,本方法是根据列的位置来合并两个df的(不是根据列名),也就是说两表中同为第1列的会合并为一列,不论这两列的列名是否相同。

    >>> df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
    >>> df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
    >>> df1.union(df2).show()  
    +----+----+----+
    |col0|col1|col2|
    +----+----+----+
    |   1|   2|   3|
    |   4|   5|   6|
    +----+----+----+
    

    unionByName(self, other): (推荐!)

    与SQL中的UNION ALL and UNION DISTINCT不同,本方法通过列名解析列(而不是位置)。
    例:两表中列名同为"col0"的会合并为新的一列。

    >>> df1 = spark.createDataFrame([[1, 2, 3]], ["col0", "col1", "col2"])
    >>> df2 = spark.createDataFrame([[4, 5, 6]], ["col1", "col2", "col0"])
    >>> df1.unionByName(df2).show()  
    +----+----+----+
    |col0|col1|col2|
    +----+----+----+
    |   1|   2|   3|
    |   6|   4|   5|
    +----+----+----+
    
    (3) intersect(oter): 两表取交集

    def intersect(self, other): 返回新的DataFrame,只包含同时在两表中出现的行。等同于SQL中的INTERSECT。、
    注意:如果df1中有两行完全一致,df2中也有这两行且完全一致,在最后的结果中,之后出现一次该行。

    def intersectAll(self, other):
    如果df1中有两行完全一致,df2中也有这两行且完全一致,在最后的结果中会出现两次该行。
    例:

    >>> df1 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3), ("c", 4)], ["C1", "C2"])
    >>> df2 = spark.createDataFrame([("a", 1), ("a", 1), ("b", 3)], ["C1", "C2"])
    
    >>> df1.intersectAll(df2).sort("C1", "C2").show()
    +---+---+
    | C1| C2|
    +---+---+
    |  a|  1|
    |  a|  1|
    |  b|  3|
    +---+---+
    
    (4) 排除另一个表中包含的行

    def subtract(self, other):返回一个新的DataFrame,会排除掉本df中有,且other中也有的行,即只留下本df中独有的行,相当于SQL中的 EXCEPT DISTINCT

    2.2.6 数据预处理

    (1)去除重复的行

    def dropDuplicates(self, subset=None):
    返回新的DataFrame,删除原表中重复的行,当subset=None时,除非两行的所有列的值均相同,才会被删除。也可以用subset参数指定检测的列,如只检测某一列,这一列有重复值,就会发生删除。

    >>> from pyspark.sql import Row
    >>> df = sc.parallelize([ \\
    ...     Row(name='Alice', age=5, height=80), \\
    ...     Row(name='Alice', age=5, height=80), \\
    ...     Row(name='Alice', age=10, height=80)]).toDF()
    >>> df.dropDuplicates().show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    |  5|    80|Alice|
    | 10|    80|Alice|
    +---+------+-----+
    
    >>> df.dropDuplicates(['name', 'height']).show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    |  5|    80|Alice|
    +---+------+-----+
    
    (2) dropna

    def dropna(self, how=‘any’, thresh=None, subset=None):
    删除原df中包含空值的行,返回一个新的DataFrame。
    DataFrame.dropna 与 DataFrameNaFunctions.drop 方法等同。

    how:
    ‘any’:只有某行包含一个空值,就删除该行;
    ‘all’:某行全部是空值才会删除该行。

    thresh: int, 默认为None。表示如果某行的非空值小于thresh个,就会删除该行。设置thresh后,how参数将会失效。

    subset: string列表,指定考虑那些列名。

    >>> df4.dropna().show()
    >>> df4.na.drop().show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    | 10|    80|Alice|
    +---+------+-----+
    
    (3) fillna

    def fillna(self, value, subset=None):
    将表中的所有空值填充为value参数指定的值。
    df.fillna()与df.na.fill()等同,相当于 DataFrame.fillna and DataFrameNaFunctions.fill

    value: 可选:int, long, float, string, bool or dict. 用于替换空值的值。
    如果value设定为字典dict,subset参数将会被忽略,表示某一列有缺失时为该列填充什么值,例:{‘col_1’ : 1, ‘col_2’ : 0}。

    subset: 考虑哪些列。list of string

    >>> df4.na.fill(50).show()
    +---+------+-----+
    |age|height| name|
    +---+------+-----+
    | 10|    80|Alice|
    |  5|    50|  Bob|
    | 50|    50|  Tom|
    | 50|    50| null|
    +---+------+-----+
    
    >>> df5.na.fill(False).show()
    +----+-------+-----+
    | age|   name|  spy|
    +----+-------+-----+
    |  10|  Alice|false|
    |   5|    Bob|false|
    |null|Mallory| true|
    +----+-------+-----+
    
    >>> df4.na.fill({'age': 50, 'name': 'unknown'}).show()
    +---+------+-------+
    |age|height|   name|
    +---+------+-------+
    | 10|    80|  Alice|
    |  5|  null|    Bob|
    | 50|  null|    Tom|
    | 50|  null|unknown|
    +---+------+-------+
    
    (4)replace方法

    def replace(self, to_replace, value=_NoValue, subset=None):
    将原df中的某些值替换成其它的值,返回替换后的DataFrame。
    DataFrame.replace 和 DataFrameNaFunctions.replace等价。

    to_replace: bool, int, long, float, string, list or dict,想被替代的值。
    若to_replace设置为dict,参数value将会被忽略,dict的键指定要被替换的值,dict的值指定要替换成什么值,如{‘100’: 0, ‘888’:999},将所有100都换成0,所有888换成999.

    value: bool, int, long, float, string, list or None,想要替换成的值。

    subset: list of string要考虑哪些列。

    >>> df4.na.replace(10, 20).show()
    +----+------+-----+
    | age|height| name|
    +----+------+-----+
    |  20|    80|Alice|
    |   5|  null|  Bob|
    |null|  null|  Tom|
    |null|  null| null|
    +----+------+-----+
    
    >>> df4.na.replace('Alice', None).show()
    +----+------+----+
    | age|height|name|
    +----+------+----+
    |  10|    80|null|
    |   5|  null| Bob|
    |null|  null| Tom|
    |null|  null|null|
    +----+------+----+
    
    >>> df4.na.replace({'Alice': None}).show()
    +----+------+----+
    | age|height|name|
    +----+------+----+
    |  10|    80|null|
    |   5|  null| Bob|
    |null|  null| Tom|
    |null|  null|null|
    +----+------+----+
    
    >>> df4.na.replace(['Alice', 'Bob'], ['A', 'B'], 'name').show()
    +----+------+----+
    | age|height|name|
    +----+------+----+
    |  10|    80|   A|
    |   5|  null|   B|
    |null|  null| Tom|
    |null|  null|null|
    +----+------+----+
    
    (5)distinct方法

    def distinct(self):
    删除df中重复的行,返回删除后的新DataFrame。

    >>> df.distinct().count()  # 计算不重复的行有几行
    2
    

    2.2.7 排序方法

    (1)sortWithinPartitions

    **def sortWithinPartitions(self, *cols, kwargs):
    df的每个分区分别在自己分区范围内排序,返回df排序的结果。

    cols: 要排序的列名,Column或字符串组成的list。
    ascending: 是否升序?默认True。可选:boolean 或list of boolean。
    如果ascending参数输入为list,则该list的长度必须与cols的长度相同,对应每一列按照什么顺序排序。

    >>> df.sortWithinPartitions("age", ascending=False).show()
    +---+-----+
    |age| name|
    +---+-----+
    |  2|Alice|
    |  5|  Bob|
    +---+-----+
    

    **def sort(self, *cols, kwargs):
    排序,忽略分区,所有列一起排序。

    cols: 要排序的列名,Column或字符串组成的list。
    ascending: 是否升序?默认True。可选:boolean 或list of boolean。

    >>> df.sort(df.age.desc()).collect()
    [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
    >>> df.sort("age", ascending=False).collect()
    [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
    >>> df.orderBy(df.age.desc()).collect()
    [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
    >>> from pyspark.sql.functions import *
    >>> df.sort(asc("age")).collect()
    [Row(age=2, name=u'Alice'), Row(age=5, name=u'Bob')]
    >>> df.orderBy(desc("age"), "name").collect()
    [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
    >>> df.orderBy(["age", "name"], ascending=[0, 1]).collect()
    [Row(age=5, name=u'Bob'), Row(age=2, name=u'Alice')]
    

    2.2.8 数据类型转换相关的方法

    (1) toPandas

    def toPandas(self):
    将本df转换为pandas的df并返回。

    注意:这个方法用的时候,df一定不能太大,因为所有的存储都是在本机的内存上。

    >>> df.toPandas()  # doctest: +SKIP
        age   name
    0    2  Alice
    1    5    Bob
    

    (2) toJSON

    def toJSON(self, use_unicode=True):
    将df转换为包含字符串的RDD类型,原df中的每一行会转变成一个JSON格式的字符串,存成RDD中的一个元素。

    >>> df.toJSON().first()
    u'{"age":2,"name":"Alice"}'
    

    2.2.9 统计类方法

    (1) corr

    def corr(self, col1, col2, method=None):
    计算df中两列的皮尔森相关系数,返回double值。DataFrame.corr 等同于 DataFrameStatFunctions.corr。

    col1: The name of the first column
    col2: The name of the second column
    method: 求相关系数的方法,目前只支持 “pearson”。

    (2) cov

    def cov(self, col1, col2):
    计算给定两列的样本协方差,返回double值。DataFrame.cov 等同于DataFrameStatFunctions.cov。

    col1: The name of the first column
    col2: The name of the second column

    (3) approxQuantile: 求某列的分位数

    def approxQuantile(self, col, probabilities, relativeError):

    2.2.10 其它方法

    (1)每行执行函数

    def foreach(self, f):
    对df的每一行执行函数f , 等同于df.rdd.foreach().

    >>> def f(person):
    ...     print(person.name)
    >>> df.foreach(f)
    

    def foreachPartition(self, f):
    对df的每一个分区执行函数f,等同于 df.rdd.foreachPartition().

    >>> def f(people):
    ...     for person in people:
    ...         print(person.name)
    >>> df.foreachPartition(f)
    
    (2)增加列、修改列名

    def withColumn(self, colName, col): 增加一列或修改现有列的内容。

    colName: string, 新增列的列名.
    col: 新增列的值如何设置,用Column 表达式设定如何设置新列的值。col表达式必须是在调用本方法的df上进行操作,如果调用了其它的df会报错。

    见示例。

    >>> df.withColumn('age2', df.age + 2).collect()
    [Row(age=2, name=u'Alice', age2=4), Row(age=5, name=u'Bob', age2=7)]
    

    如果此时df中已有一列名为’age2’,则上面的操作实际为修改’age2’列的内容。

    def withColumnRenamed(self, existing, new): 修改df中的某列的列名,返回新的df。
    如果existing不存在,本方法相当于空操作,不会报错。
    existing: string, name of the existing column to rename.
    new: string, new name of the column.

    >>> df.withColumnRenamed('age', 'age2').collect()
    [Row(age2=2, name=u'Alice'), Row(age2=5, name=u'Bob')]
    
    (3)重新设定分区

    def repartition(self, numPartitions, *cols):

    将df按照指定的分区个数重新分区。分区方式为哈希分区。

    numPartitions: 无默认参数,必须赋值,不赋值会报错。
    1) int值,指定分区个数;
    2) string,列名,指定用哪一列作为分区(类似与SQL中根据字段分区);

    查询DataFrame的分区个数的方法: df.rdd.getNumPartitions()

    >>> df.repartition(10).rdd.getNumPartitions()
    10
    >>> data = df.union(df).repartition("age")
    >>> data.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  5|  Bob|
    |  5|  Bob|
    |  2|Alice|
    |  2|Alice|
    +---+-----+
    >>> data = data.repartition(7, "age")  # 划分成7个分区,并将age作为分区的列
    >>> data.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  2|Alice|
    |  5|  Bob|
    |  2|Alice|
    |  5|  Bob|
    +---+-----+
    >>> data.rdd.getNumPartitions()
    7
    >>> data = data.repartition("name", "age")
    >>> data.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  5|  Bob|
    |  5|  Bob|
    |  2|Alice|
    |  2|Alice|
    +---+-----+
    

    def repartitionByRange(self, numPartitions, *cols):
    将df按照指定的分区个数重新分区。分区方式为范围分区,即根据某列值的大小,大致均匀的分成几份,以此将表分成几个区。默认顺序是升序排列。

    (不同分区方式的区别和介绍参考博客:https://blog.csdn.net/qq_33813365/article/details/78143492)
    

    numPartitions: 无默认参数,必须赋值,不赋值会报错。
    1) int值,指定分区个数;
    2) string,列名,指定用哪一列作为分区(类似与SQL中根据字段分区);

    注意:必须至少有一个分区表达式,即至少要指定数量或列名中的一个。

    When no explicit sort order is specified, “ascending nulls first” is assumed.

    >>> df.repartitionByRange(2, "age").rdd.getNumPartitions()
    2
    >>> df.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  2|Alice|
    |  5|  Bob|
    +---+-----+
    >>> df.repartitionByRange(1, "age").rdd.getNumPartitions()
    1
    >>> data = df.repartitionByRange("age")
    >>> df.show()
    +---+-----+
    |age| name|
    +---+-----+
    |  2|Alice|
    |  5|  Bob|
    +---+-----+
    

    3. DataFrameReader常用方法

    3.1 text方法

    作用:从文本文件中读入数据,返回DataFrame对象,列名为‘value’。
    定义: text(self, paths, wholetext=False, lineSep=None):

    ---------------------------------------------- 参数列表 ----------------------------------------------
    paths: string 或 list of strings,文件的路径;
    wholetext: 默认False,表示文件中的每一行当做DataFrame中的一行 ; True:将单个文件的所有内容设定为DataFrame的一行。
    lineSep: 设置可以识别的行分隔符,当默认为None时,可以识别的有:\r, \r\n and \n

    例子:

    >>> df = spark.read.text('python/test_support/sql/text-test.txt')
    >>> df.collect()
    [Row(value=u'hello'), Row(value=u'this')]
    >>> df = spark.read.text('python/test_support/sql/text-test.txt', wholetext=True)
    >>> df.collect()
    [Row(value=u'hello\\nthis')]
    

    3.2 csv方法

    从csv文件中读取数据,返回DataFrame对象。
    定义:

    csv(self, path, schema=None, sep=None, encoding=None, quote=None, escape=None,
        comment=None, header=None, inferSchema=None, ignoreLeadingWhiteSpace=None,
        ignoreTrailingWhiteSpace=None, nullValue=None, nanValue=None, positiveInf=None,
        negativeInf=None, dateFormat=None, timestampFormat=None, maxColumns=None,
        maxCharsPerColumn=None, maxMalformedLogPerPartition=None, mode=None,
        columnNameOfCorruptRecord=None, multiLine=None, charToEscapeQuoteEscaping=None,
        samplingRatio=None, enforceSchema=None, emptyValue=None):
    

    ---------------------------------------------- 参数列表 ----------------------------------------------
    path: string, or list of strings, or RDD of Strings storing CSV rows,文件所在路径。
    schema: 用来指定列名或列的数据类型。传入类型:pyspark.sql.types.DataType、datatype string 或 字符串列表表示列名。示例:
    1)指定列名:schema=[‘name’, ‘age’]
    2)同时指定列名和数据类型:“a: string, b: int”
    3)同时指定列名和数据类型

    sep: 指定数据之间的分隔符,默认为逗号’,’
    encoding: 指定CSV文件的解码方式,默认为’UTF-8’。
    header: 是否将CSV文件的第一行当做DataFrame的列名,默认False。
    ignoreLeadingWhiteSpace: 是否忽略数据开头的空格?默认False不忽略。
    ignoreTrailingWhiteSpace:是否忽略数据末尾的空格?默认False不忽略。
    nullValue: 设定表示空值的字符串,默认为空字符串’’。
    nanValue: 设定表示非数字型数据的字符串,默认为’NaN’。

    示例:

    >>> df = spark.read.csv('python/test_support/sql/ages.csv')
    >>> df.dtypes
    [('_c0', 'string'), ('_c1', 'string')]
    >>> sc = spark.sparkContext
    >>> rdd = sc.textFile('python/test_support/sql/ages.csv')
    >>> df2 = spark.read.csv(rdd)
    >>> df2.dtypes
    [('_c0', 'string'), ('_c1', 'string')]
    

    3.3 table方法

    将指定数据库的某个表中的数据读取为DataFrame对象并返回。
    定义: def table(self, tableName):

    tableName: string, 表的名称。
    例程:

    >>> df = spark.read.parquet('python/test_support/sql/parquet_partitioned')
    >>> df.createOrReplaceTempView('tmpTable')  # 创建一个临时的表
    >>> spark.read.table('tmpTable').dtypes
    [('name', 'string'), ('year', 'int'), ('month', 'int'), ('day', 'int')]
    

    4. DataFrameWriter

    4.1 常用方法

    4.1.1 csv

    定义:

    csv(self, path, mode=None, compression=None, sep=None, quote=None, escape=None,
        eader=None, nullValue=None, escapeQuotes=None, quoteAll=None, dateFormat=None,
        timestampFormat=None, ignoreLeadingWhiteSpace=None, ignoreTrailingWhiteSpace=None,
        charToEscapeQuoteEscaping=None, encoding=None, emptyValue=None)
    

    将DataFrame保存到csv文件中。
    path: 要存放的文件路径和文件名
    mode: 如果文件已经存在,选择哪种模式写入数据:

    参数可选值 效果
    ‘error’ or ‘errorifexists’ (默认) 抛出异常
    ‘append’ 在已有数据后面接着写入新数据
    ‘overwrite’ 覆盖原有数据
    ‘ignore’ 忽略此次操作,即不写入数据

    sep: 分隔符,默认是逗号’,’。
    header: boolean,是否将列名写入文件,默认False;

    4.1.2 saveAsTable

    定义:saveAsTable(self, name, format=None, mode=None, partitionBy=None, **options)

    将DataFrame保存到数据库的某个表中。注意当mode为overwrite覆盖时,不会自动选择覆盖某个分区,而是会把某个表的内容全部删除!!!如果需要覆盖某个分区的功能,必须要配置以下参数:

    ss.conf.set('hive.exec.dynamic.partition', 'true')
    ss.conf.set('hive.exec.dynamic.partition.mode', 'nonstrict')
    ss.conf.set('spark.sql.sources.partition', verwriteMode','dynamic')
    

    name : 表名
    format: the format used to save
    mode: 存放的模式,有以下4种模式:

    参数可选值 效果
    ‘error’ or ‘errorifexists’ (默认) 抛出异常
    ‘append’ 在已有数据后面接着写入新数据
    ‘overwrite’ 覆盖原有数据,且不要求表结构一样。(慎用!如果不设置参数,会删除全部原数据!)
    ‘ignore’ 忽略此次操作,即不写入数据

    partitionBy: 分区的列的名称

    4.1.3 text方法

    定义:text(self, path, compression=None, lineSep=None)
    将DataFrame的内容存放到一个文本文件中。
    path: 存放文件的路径,在HDFS中的路径。
    compression: 压缩方式,可选:bzip2, gzip, lz4, snappy and deflate
    lineSep: 行与行之间的分隔符,默认为 ’ \n ’

    展开全文

空空如也

空空如也

1 2 3 4 5 ... 7
收藏数 139
精华内容 55
关键字:

dataframewriter