精华内容
下载资源
问答
  • Spark withColumn 陷阱

    千次阅读 2020-04-27 23:43:41
    withColumn / withColumnRenamed 是 spark 中常用的 API,可以用于添加新字段 / 字段重命名 / 修改字段类型,但是当列的数量增加时,会出现严重的性能下降现象,本文将分析出现该现象的原因以及该如何解决

    withColumn / withColumnRenamed 是 spark 中常用的 API,可以用于添加新字段 / 字段重命名 / 修改字段类型,但是当列的数量增加时,会出现严重的性能下降现象,本文将分析出现该现象的原因以及该如何解决它。

    背景

    在日常工作中,有时候会有建模或分析的同学问我,为什么用 withColumn / withColumnRenamed 会这么慢,明明数据量也不大,应该怎么解决。初步分析会发现,出现这种情况的时往往伴随着大量的列,难道是 spark 处理不了大宽表的场景吗?

    现象及探究

    对真实场景做了一个简化,下面是对一个10行的数据增加500列的一个操作,从代码上看好像没有什么问题,执行一下,却发现耗时14秒。

    var df = spark.range(10).toDF()
    for (i <- 1 to 500) {
    	df = df.withColumn("id_" + i, col("id") + i)
    }
    

    同样的逻辑使用 select 来实现,只需要0.1秒。

    var df = spark.range(10).toDF()
    df = df.select((1 to 500).map { i =>
      (col("id") + i).as("id_" + i)
    }: _*)
    

    是什么导致了这么大差距,withColumn 时间花到哪去了?查看 withColumn 源码,每次执行完返回一个新的 DataFrame,好像也没有什么问题 。

    def withColumn(colName: String, col: Column): DataFrame = withColumns(Seq(colName), Seq(col))
    
    private[spark] def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = {
      require(colNames.size == cols.size,
              s"The size of column names: ${colNames.size} isn't equal to " +
              s"the size of columns: ${cols.size}")
      SchemaUtils.checkColumnNameDuplication(
        colNames,
        "in given column names",
        sparkSession.sessionState.conf.caseSensitiveAnalysis)
    
      val resolver = sparkSession.sessionState.analyzer.resolver
      val output = queryExecution.analyzed.output
    
      val columnMap = colNames.zip(cols).toMap
    
      val replacedAndExistingColumns = output.map { field =>
        columnMap.find { case (colName, _) =>
          resolver(field.name, colName)
        } match {
          case Some((colName: String, col: Column)) => col.as(colName)
          case _ => Column(field)
        }
      }
    
      val newColumns = columnMap.filter { case (colName, col) =>
        !output.exists(f => resolver(f.name, colName))
      }.map { case (colName, col) => col.as(colName) }
    
      select(replacedAndExistingColumns ++ newColumns : _*)
    }
    

    使用 df.explain(true) 就能发现一些端倪,虽然他们最终生成的物理计划是一致的,但是逻辑计划存在着巨大的差异,使用 withColumn 方式的逻辑计划存在 500个 Project ,而 select 只有1个。

    再用 RuleExecutor 查看 catalyst analysis 的统计信息,会发现 withColumn 中调用了 500 次 analyse,情况逐渐开始明朗了。

    import org.apache.spark.sql.catalyst.rules.RuleExecutor
    var df = spark.range(10).toDF()
    RuleExecutor.resetMetrics()
    for (i <- 1 to 500) {
    	df = df.withColumn("id_" + i, col("id") + i)
    }
    println(RuleExecutor.dumpTimeSpent())
    

    在这里插入图片描述
    而使用 select 的方式只会调用一次
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Gj1TL4T1-1588001866153)(/Users/liangshiwei/Library/Application Support/typora-user-images/image-20200427180552048.png)]
    进一步做了一个迭代次数和时间的关系测试,发现耗时并不是随着次数线性增长,这是因为每次迭代生成的逻辑计划中会多增加一个 Project ,因此下一次的 analyse 时间会比上一次要长。

    次数analyse 耗时(s)
    10.4
    100.4
    1000.9
    50014
    100065

    总结

    1. 多次执行 withColumn / withColumnRenamed 时,大部分时间都花费在 catalyse analyse 的反复调用上,且随着迭代次数的增加,逻辑计划的 Project 会增加,耗时会呈指数上升,具体的耗时还会随原表字段数进行一些变化。
    2. 完全可以使用 select 取代多次调用 withColumn / withColumnRenamed 的方式。
    展开全文
  • PySpark withColumn更新或添加列

    千次阅读 2021-02-09 14:24:16
    在本文中,我将使用withColumn()示例向您介绍常用的PySpark DataFrame列操作。 PySpark withColumn –更改列的数据类型 转换/更改现有列的值 从现有列派生新列 添加具有文字值的列 重命名列名 删除D

    原文:https://sparkbyexamples.com/pyspark/pyspark-withcolumn/

    PySparkwithColumn()是DataFrame的转换函数,用于更改或更新值,转换现有DataFrame列的数据类型,添加/创建新列以及多核。在本文中,我将使用withColumn()示例向您介绍常用的PySpark DataFrame列操作。

    首先,让我们创建一个要使用的DataFrame

    data = [('James','','Smith','1991-04-01','M',3000),
      ('Michael','Rose','','2000-05-19','M',4000),
      ('Robert','','Williams','1978-09-05','M',4000),
      ('Maria','Anne','Jones','1967-12-01','F',4000),
      ('Jen','Mary','Brown','1980-02-17','F',-1)
    ]
    
    columns = ["firstname","middlename","lastname","dob","gender","salary"]
    df = spark.createDataFrame(data=data, schema = columns)
    

    1.使用带有列的PySpark更改列DataType

    通过在DataFramewithColumn()上使用PySpark,我们可以强制转换或更改列的数据类型。为了更改数据类型,您还需要将cast()函数与withColumn()一起使用。下面的语句将“工资”列的数据类型从String更改Integer为。

     df2 = df.withColumn("salary",col("salary").cast("Integer"))
    df2.printSchema()
    

    2.更新现有列的值

    DataFrame的PySparkwithColumn()函数也可以用于更改现有列的值。为了更改值,将现有的列名作为第一个参数传递,并将要分配的值作为第二个参数传递给withColumn()函数。请注意,第二个参数应为Columntype。

    df3 = df.withColumn("salary",col("salary")*100)
    df3.printSchema()
    

    此代码段将“ salary”的值乘以100,并将其值更新回“ salary”列。

    3.从现有的创建新列

    要添加/创建新列,请使用您希望新列成为的名称指定第一个参数,并通过对现有列执行操作来使用第二个参数来分配值。

    df4 = df.withColumn("CopiedColumn",col("salary")* -1)
    df3.printSchema()
    

    此代码段通过将“工资”列乘以值-1来创建新列“ CopiedColumn”。

    4.使用withColumn()添加一个新列

    为了创建新列,请将所需的列名传递给withColumn()转换函数的第一个参数。确保此新列尚未出现在DataFrame上(如果显示的话)会更新该列的值。

    在下面的代码片段中,使用lit()函数将常量值添加到DataFrame列。我们还可以链接以添加多个列。

    df5 = df.withColumn("Country", lit("USA"))
    df5.printSchema()
    
    df6 = df.withColumn("Country", lit("USA")) \
       .withColumn("anotherColumn",lit("anotherValue"))
    df6.printSchema()
    

    5.重命名列名

    尽管您不能使用withColumn重命名列,但我还是想介绍一下,因为重命名是我们在DataFrame上执行的常见操作之一。要重命名现有列,请使用withColumnRenamed()DataFrame上的函数。

    df.withColumnRenamed("gender","sex") \
      .show(truncate=False) 
    

    6.从PySpark DataFrame删除一列

    使用“放置”功能从DataFrame放置特定的列

    df4.drop("CopiedColumn") \
    .show(truncate=False) 
    

    **注意:**请注意,所有这些函数在应用函数后都将返回新的DataFrame,而不是更新DataFrame。

    PySpark withColumn完整示例

    import pyspark
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit
    from pyspark.sql.types import StructType, StructField, StringType,IntegerType
    
    spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
    
    data = [('James','','Smith','1991-04-01','M',3000),
      ('Michael','Rose','','2000-05-19','M',4000),
      ('Robert','','Williams','1978-09-05','M',4000),
      ('Maria','Anne','Jones','1967-12-01','F',4000),
      ('Jen','Mary','Brown','1980-02-17','F',-1)
    ]
    
    columns = ["firstname","middlename","lastname","dob","gender","salary"]
    df = spark.createDataFrame(data=data, schema = columns)
    df.printSchema()
    df.show(truncate=False)
    
    df2 = df.withColumn("salary",col("salary").cast("Integer"))
    df2.printSchema()
    df2.show(truncate=False)
    
    df3 = df.withColumn("salary",col("salary")*100)
    df3.printSchema()
    df3.show(truncate=False) 
    
    df4 = df.withColumn("CopiedColumn",col("salary")* -1)
    df4.printSchema()
    
    df5 = df.withColumn("Country", lit("USA"))
    df5.printSchema()
    
    df6 = df.withColumn("Country", lit("USA")) \
       .withColumn("anotherColumn",lit("anotherValue"))
    df6.printSchema()
    
    df.withColumnRenamed("gender","sex") \
      .show(truncate=False) 
      
    df4.drop("CopiedColumn") \
    .show(truncate=False) 
    

    完整的代码可以从PySpark withColumn GitHub project下载

    学习愉快!

    展开全文
  • DataFram列操作_withColumn()

    千次阅读 2020-08-10 14:40:52
    2、withColumn的第二个参数要传入已有列的Column对象,否则会报错; 3、sql.functions.lit()函数,返回的也是列对象,可以传入任意参数值; 二、实例 1、testData 张三,23 李四,24 王五,25 赵六,26 2、代码 ...

    一、说明

    1、常用列对象:' 、$ 、col 、column;

    2、withColumn的第二个参数要传入已有列的Column对象,否则会报错;

    3、sql.functions.lit()函数,返回的也是列对象,可以传入任意参数值;

    二、实例

    1、testData

    张三,23
    李四,24
    王五,25
    赵六,26

    2、代码 

    package com.cn.dataFram
    
    import org.apache.spark.sql.{DataFrame, Dataset, SparkSession}
    import org.apache.spark.sql.functions._
    object DataFram {
      def main(args: Array[String]): Unit = {
        val spark = SparkSession.builder()
          .appName("")
          .master("local[1]")
          .getOrCreate()
    
        spark.sparkContext.setLogLevel("WARN")
    
        import spark.implicits._
        val ds: Dataset[String] = spark.read.textFile("testData")
        val df: DataFrame = ds.map(x => x.split(","))
          .map(x => (x(0), x(1)))
          .toDF("name","age")
          .cache()
    
        /**
          * withColumn的第二个参数要传入已有列的Column对象,否则会报错;
          * column的表达式只能引用此数据集提供的属性。 添加引用其他数据集的列是错误的
          */
        //df.withColumn("sno","22")//报错
    
        //展示name,age列
        df.show()
    
        /**
          * def lit(literal : scala.Any) : org.apache.spark.sql.Column
          * sql.functions.lit()函数,返回的也是列对象,可以传入任意参数值;
          */
        df.withColumn("sno",lit("22"))
          //为列进行重命名
          .withColumnRenamed("name","newName")
          .show()
    
        /**
          * 列对象:' 、$ 、col 、column
          */
        df.withColumn("sno1",'age+1).show()
    
        df.withColumn("sno2",$"age").show()
    
        df.withColumn("sno3",col("age")+2).show()
    
        df.withColumn("sno4",column("age")+3).show()
    
      }
    }
    

    3、输出结果

    +----+---+
    |name|age|
    +----+---+
    |  张三| 23|
    |  李四| 24|
    |  王五| 25|
    |  赵六| 26|
    +----+---+
    
    +-------+---+---+
    |newName|age|sno|
    +-------+---+---+
    |     张三| 23| 22|
    |     李四| 24| 22|
    |     王五| 25| 22|
    |     赵六| 26| 22|
    +-------+---+---+
    
    +----+---+----+
    |name|age|sno1|
    +----+---+----+
    |  张三| 23|24.0|
    |  李四| 24|25.0|
    |  王五| 25|26.0|
    |  赵六| 26|27.0|
    +----+---+----+
    
    +----+---+----+
    |name|age|sno2|
    +----+---+----+
    |  张三| 23|  23|
    |  李四| 24|  24|
    |  王五| 25|  25|
    |  赵六| 26|  26|
    +----+---+----+
    
    +----+---+----+
    |name|age|sno3|
    +----+---+----+
    |  张三| 23|25.0|
    |  李四| 24|26.0|
    |  王五| 25|27.0|
    |  赵六| 26|28.0|
    +----+---+----+
    
    +----+---+----+
    |name|age|sno4|
    +----+---+----+
    |  张三| 23|26.0|
    |  李四| 24|27.0|
    |  王五| 25|28.0|
    |  赵六| 26|29.0|
    +----+---+----+
    

     

    展开全文
  • withcolumn方法 待补充

    2021-06-23 11:18:13
    source.withColumn(“dnvj”,"id").show()source.withColumn("dnvj",lit(null)).show()source.withColumn("dnvj",udftolower(("id").show() source.withColumn("dnvj",lit(null)).show() source.withColumn("dnvj...

    source.withColumn(“dnvj”, " i d " ) . s h o w ( ) s o u r c e . w i t h C o l u m n ( " d n v j " , l i t ( n u l l ) ) . s h o w ( ) s o u r c e . w i t h C o l u m n ( " d n v j " , u d f t o l o w e r ( ( "id").show() source.withColumn("dnvj",lit(null)).show() source.withColumn("dnvj",udftolower(( "id").show()source.withColumn("dnvj",lit(null)).show()source.withColumn("dnvj",udftolower((“category”))).show()
    source.withColumn(“dnvj”,udftolower((col(“category”)))).show()

    展开全文
  • scala 使用withColumn方法

    千次阅读 2019-08-06 17:11:01
    scala – 使用withColumn将两列添加到现有DataFrame 现在我想再向现有的DataFrame添加两列. 目前我正在使用DataFrame中的withColumn方法执行此操作. withColumn()方法: withColumn(colName, col)[source] ...
  •   在开发spark应用过程中需要往hive表中造测试数据,同时造多列数据,部分列之间存在逻辑计算关系,正常情况下使用.withColumn(“col_name”,conditions),此时conditions可以直接是类似于 col(“column_a”) * ...
  • spark dataFrame withColumn

    万次阅读 2018-06-25 19:17:00
    说明:withColumn用于在原有DF新增一列1. 初始化sqlContextval sqlContext = new org.apache.spark.sql.SQLContext(sc) 2.导入sqlContext隐式转换import sqlContext.implicits._ 3. 创建DataFrames val df = ...
  • df.withColumn

    千次阅读 2018-05-16 21:58:18
    df = df.withColumn(columnName, lowerCaseUDF(df[columnName])) df.select( "Tipo_unidad" ).distinct().show() def recent_six_months (clrq) : try : time.strptime(clrq, "%Y-%m-%d" ) clrq_date_...
  • spark dataFrame 新增一列函数withColumn

    万次阅读 2017-05-30 11:34:02
    往一个dataframe新增某个列是很常见的事情。 ...然而这个资料还是不多,很多都需要很多变换。...不过由于这回需要增加的列非常简单,倒也没有必要再用UDF函数去修改列。...利用withColumn函数就能实现对da
  • }) val data = rdd2.toDF("id1", "id2", "id3").withColumn("aa", lag("id3", 1).over(win)) data.show() val df_difftime = data.withColumn("diff", when(isnull(col("id3") - col("aa")), 0) .otherwise((col("id...
  • val asd = df.withColumn("isp",col("response")*3) asd.createOrReplaceTempView("infos2") session.sql("select ip,domain,sum(response) as responseSize,province(ip) as province ,sum(isp) from infos2 ...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 3,371
精华内容 1,348
关键字:

withcolumn