精华内容
下载资源
问答
  • Spark withColumn 陷阱

    千次阅读 2020-04-27 23:43:41
    withColumn / withColumnRenamed 是 spark 中常用的 API,可以用于添加新字段 / 字段重命名 / 修改字段类型,但是当列的数量增加时,会出现严重的性能下降现象,本文将分析出现该现象的原因以及该如何解决

    withColumn / withColumnRenamed 是 spark 中常用的 API,可以用于添加新字段 / 字段重命名 / 修改字段类型,但是当列的数量增加时,会出现严重的性能下降现象,本文将分析出现该现象的原因以及该如何解决它。

    背景

    在日常工作中,有时候会有建模或分析的同学问我,为什么用 withColumn / withColumnRenamed 会这么慢,明明数据量也不大,应该怎么解决。初步分析会发现,出现这种情况的时往往伴随着大量的列,难道是 spark 处理不了大宽表的场景吗?

    现象及探究

    对真实场景做了一个简化,下面是对一个10行的数据增加500列的一个操作,从代码上看好像没有什么问题,执行一下,却发现耗时14秒。

    var df = spark.range(10).toDF()
    for (i <- 1 to 500) {
    	df = df.withColumn("id_" + i, col("id") + i)
    }
    

    同样的逻辑使用 select 来实现,只需要0.1秒。

    var df = spark.range(10).toDF()
    df = df.select((1 to 500).map { i =>
      (col("id") + i).as("id_" + i)
    }: _*)
    

    是什么导致了这么大差距,withColumn 时间花到哪去了?查看 withColumn 源码,每次执行完返回一个新的 DataFrame,好像也没有什么问题 。

    def withColumn(colName: String, col: Column): DataFrame = withColumns(Seq(colName), Seq(col))
    
    private[spark] def withColumns(colNames: Seq[String], cols: Seq[Column]): DataFrame = {
      require(colNames.size == cols.size,
              s"The size of column names: ${colNames.size} isn't equal to " +
              s"the size of columns: ${cols.size}")
      SchemaUtils.checkColumnNameDuplication(
        colNames,
        "in given column names",
        sparkSession.sessionState.conf.caseSensitiveAnalysis)
    
      val resolver = sparkSession.sessionState.analyzer.resolver
      val output = queryExecution.analyzed.output
    
      val columnMap = colNames.zip(cols).toMap
    
      val replacedAndExistingColumns = output.map { field =>
        columnMap.find { case (colName, _) =>
          resolver(field.name, colName)
        } match {
          case Some((colName: String, col: Column)) => col.as(colName)
          case _ => Column(field)
        }
      }
    
      val newColumns = columnMap.filter { case (colName, col) =>
        !output.exists(f => resolver(f.name, colName))
      }.map { case (colName, col) => col.as(colName) }
    
      select(replacedAndExistingColumns ++ newColumns : _*)
    }
    

    使用 df.explain(true) 就能发现一些端倪,虽然他们最终生成的物理计划是一致的,但是逻辑计划存在着巨大的差异,使用 withColumn 方式的逻辑计划存在 500个 Project ,而 select 只有1个。

    再用 RuleExecutor 查看 catalyst analysis 的统计信息,会发现 withColumn 中调用了 500 次 analyse,情况逐渐开始明朗了。

    import org.apache.spark.sql.catalyst.rules.RuleExecutor
    var df = spark.range(10).toDF()
    RuleExecutor.resetMetrics()
    for (i <- 1 to 500) {
    	df = df.withColumn("id_" + i, col("id") + i)
    }
    println(RuleExecutor.dumpTimeSpent())
    

    在这里插入图片描述
    而使用 select 的方式只会调用一次
    [外链图片转存失败,源站可能有防盗链机制,建议将图片保存下来直接上传(img-Gj1TL4T1-1588001866153)(/Users/liangshiwei/Library/Application Support/typora-user-images/image-20200427180552048.png)]
    进一步做了一个迭代次数和时间的关系测试,发现耗时并不是随着次数线性增长,这是因为每次迭代生成的逻辑计划中会多增加一个 Project ,因此下一次的 analyse 时间会比上一次要长。

    次数 analyse 耗时(s)
    1 0.4
    10 0.4
    100 0.9
    500 14
    1000 65

    总结

    1. 多次执行 withColumn / withColumnRenamed 时,大部分时间都花费在 catalyse analyse 的反复调用上,且随着迭代次数的增加,逻辑计划的 Project 会增加,耗时会呈指数上升,具体的耗时还会随原表字段数进行一些变化。
    2. 完全可以使用 select 取代多次调用 withColumn / withColumnRenamed 的方式。
    展开全文
  • scala 使用withColumn方法

    千次阅读 2019-08-06 17:11:01
    scala – 使用withColumn将两列添加到现有DataFrame 现在我想再向现有的DataFrame添加两列. 目前我正在使用DataFrame中的withColumn方法执行此操作. withColumn()方法: withColumn(colName, col)[source] ...

    scala – 使用withColumn将两列添加到现有DataFrame
    现在我想再向现有的DataFrame添加两列.

    目前我正在使用DataFrame中的withColumn方法执行此操作.
    withColumn()方法:

     withColumn(colName, col)[source]
    
        Returns a new DataFrame by adding a column or replacing the existing column that has the same name.
    
        The column expression must be an expression over this DataFrame; attempting to add a column from some other dataframe will raise an error.
    
        Parameters
    
                colName – string, name of the new column.
    
                col – a Column expression for the new column.
    
        >>> df.withColumn('age2', df.age + 2).collect()
        [Row(age=2, name='Alice', age2=4), Row(age=5, name='Bob', age2=7)]
    
    
    

    例如:

    df.withColumn("newColumn1", udf(col("somecolumn")))
      .withColumn("newColumn2", udf(col("somecolumn")))
    

    这种方法需要两次调用AFAIk(每个新列一次).但是如果你的udf计算量很大,你可以避免在将“复杂”结果存储到临时列中然后“解压缩”结果时将其调用两次

    使用案例类或元组作为udf的结果

    编辑:

    使用UDF返回元组,解压缩将如下所示:

    val newDf = df
        .withColumn("udfResult",myUDf(col("name")))
        .withColumn("lowercaseColumn", col("udfResult._1"))
        .withColumn("uppercaseColumn", col("udfResult._2"))
        .drop("udfResult")
      
    

    文章参考:https://codeday.me/bug/20180824/228440.html

    展开全文
  • spark dataFrame withColumn

    千次阅读 2018-06-25 19:17:00
    说明:withColumn用于在原有DF新增一列1. 初始化sqlContextval sqlContext = new org.apache.spark.sql.SQLContext(sc) 2.导入sqlContext隐式转换import sqlContext.implicits._ 3. 创建DataFrames val df = ...

    说明:withColumn用于在原有DF新增一列

    1. 初始化sqlContext

    val sqlContext = new org.apache.spark.sql.SQLContext(sc) 

    2.导入sqlContext隐式转换

    import sqlContext.implicits._ 

    3.  创建DataFrames

     

    val df = sqlContext.read.json("file:///usr/local/spark-2.3.0/examples/src/main/resources/people.json")

    4. 显示内容

    df.show()   

    | age|   name| 
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|

    5. 为原有df新加一列

    df.withColumn("id2", monotonically_increasing_id()+1) 

    6. 显示添加列后的内容

     res6.show() 

    +----+-------+---+
    | age|   name|id2|
    +----+-------+---+
    |null|Michael|  1|
    |  30|   Andy|  2|
    |  19| Justin|  3|
    +----+-------+---+

     

    完成的过程如下:

    scala> val sqlContext = new org.apache.spark.sql.SQLContext(sc) 
    warning: there was one deprecation warning; re-run with -deprecation for details
    sqlContext: org.apache.spark.sql.SQLContext = org.apache.spark.sql.SQLContext@2513155a
    scala> import sqlContext.implicits._
    import sqlContext.implicits._
    scala> val df = sqlContext.read.json("file:///usr/local/spark-2.3.0/examples/src/main/resources/people.json")
    2018-06-25 18:55:30 WARN  ObjectStore:6666 - Version information not found in metastore. hive.metastore.schema.verification is not enabled so recording the schema version 1.2.0
    2018-06-25 18:55:30 WARN  ObjectStore:568 - Failed to get database default, returning NoSuchObjectException
    2018-06-25 18:55:32 WARN  ObjectStore:568 - Failed to get database global_temp, returning NoSuchObjectException
    df: org.apache.spark.sql.DataFrame = [age: bigint, name: string]
    scala> df.show()
    +----+-------+
    | age|   name|
    +----+-------+
    |null|Michael|
    |  30|   Andy|
    |  19| Justin|
    +----+-------+

     

    scala> df.withColumn("id2", monotonically_increasing_id()+1)
    res6: org.apache.spark.sql.DataFrame = [age: bigint, name: string ... 1 more field]
    scala> res6.show()
    +----+-------+---+
    | age|   name|id2|
    +----+-------+---+
    |null|Michael|  1|
    |  30|   Andy|  2|
    |  19| Justin|  3|
    +----+-------+---+

     

    展开全文
  • <div><p>It seems like withColumn method strips alias part from ALIAS.COLUMN clause and replaces it with table name. Eg: <pre><code> $result = BookQuery::create() ->leftJoin('Book.Category ...
  • 在本文中,我将使用withColumn()示例向您介绍常用的PySpark DataFrame列操作。 PySpark withColumn –更改列的数据类型 转换/更改现有列的值 从现有列派生新列 添加具有文字值的列 重命名列名 删除D

    原文:https://sparkbyexamples.com/pyspark/pyspark-withcolumn/

    PySparkwithColumn()是DataFrame的转换函数,用于更改或更新值,转换现有DataFrame列的数据类型,添加/创建新列以及多核。在本文中,我将使用withColumn()示例向您介绍常用的PySpark DataFrame列操作。

    首先,让我们创建一个要使用的DataFrame

    data = [('James','','Smith','1991-04-01','M',3000),
      ('Michael','Rose','','2000-05-19','M',4000),
      ('Robert','','Williams','1978-09-05','M',4000),
      ('Maria','Anne','Jones','1967-12-01','F',4000),
      ('Jen','Mary','Brown','1980-02-17','F',-1)
    ]
    
    columns = ["firstname","middlename","lastname","dob","gender","salary"]
    df = spark.createDataFrame(data=data, schema = columns)
    

    1.使用带有列的PySpark更改列DataType

    通过在DataFramewithColumn()上使用PySpark,我们可以强制转换或更改列的数据类型。为了更改数据类型,您还需要将cast()函数与withColumn()一起使用。下面的语句将“工资”列的数据类型从String更改Integer为。

     df2 = df.withColumn("salary",col("salary").cast("Integer"))
    df2.printSchema()
    

    2.更新现有列的值

    DataFrame的PySparkwithColumn()函数也可以用于更改现有列的值。为了更改值,将现有的列名作为第一个参数传递,并将要分配的值作为第二个参数传递给withColumn()函数。请注意,第二个参数应为Columntype。

    df3 = df.withColumn("salary",col("salary")*100)
    df3.printSchema()
    

    此代码段将“ salary”的值乘以100,并将其值更新回“ salary”列。

    3.从现有的创建新列

    要添加/创建新列,请使用您希望新列成为的名称指定第一个参数,并通过对现有列执行操作来使用第二个参数来分配值。

    df4 = df.withColumn("CopiedColumn",col("salary")* -1)
    df3.printSchema()
    

    此代码段通过将“工资”列乘以值-1来创建新列“ CopiedColumn”。

    4.使用withColumn()添加一个新列

    为了创建新列,请将所需的列名传递给withColumn()转换函数的第一个参数。确保此新列尚未出现在DataFrame上(如果显示的话)会更新该列的值。

    在下面的代码片段中,使用lit()函数将常量值添加到DataFrame列。我们还可以链接以添加多个列。

    df5 = df.withColumn("Country", lit("USA"))
    df5.printSchema()
    
    df6 = df.withColumn("Country", lit("USA")) \
       .withColumn("anotherColumn",lit("anotherValue"))
    df6.printSchema()
    

    5.重命名列名

    尽管您不能使用withColumn重命名列,但我还是想介绍一下,因为重命名是我们在DataFrame上执行的常见操作之一。要重命名现有列,请使用withColumnRenamed()DataFrame上的函数。

    df.withColumnRenamed("gender","sex") \
      .show(truncate=False) 
    

    6.从PySpark DataFrame删除一列

    使用“放置”功能从DataFrame放置特定的列

    df4.drop("CopiedColumn") \
    .show(truncate=False) 
    

    **注意:**请注意,所有这些函数在应用函数后都将返回新的DataFrame,而不是更新DataFrame。

    PySpark withColumn完整示例

    import pyspark
    from pyspark.sql import SparkSession
    from pyspark.sql.functions import col, lit
    from pyspark.sql.types import StructType, StructField, StringType,IntegerType
    
    spark = SparkSession.builder.appName('SparkByExamples.com').getOrCreate()
    
    data = [('James','','Smith','1991-04-01','M',3000),
      ('Michael','Rose','','2000-05-19','M',4000),
      ('Robert','','Williams','1978-09-05','M',4000),
      ('Maria','Anne','Jones','1967-12-01','F',4000),
      ('Jen','Mary','Brown','1980-02-17','F',-1)
    ]
    
    columns = ["firstname","middlename","lastname","dob","gender","salary"]
    df = spark.createDataFrame(data=data, schema = columns)
    df.printSchema()
    df.show(truncate=False)
    
    df2 = df.withColumn("salary",col("salary").cast("Integer"))
    df2.printSchema()
    df2.show(truncate=False)
    
    df3 = df.withColumn("salary",col("salary")*100)
    df3.printSchema()
    df3.show(truncate=False) 
    
    df4 = df.withColumn("CopiedColumn",col("salary")* -1)
    df4.printSchema()
    
    df5 = df.withColumn("Country", lit("USA"))
    df5.printSchema()
    
    df6 = df.withColumn("Country", lit("USA")) \
       .withColumn("anotherColumn",lit("anotherValue"))
    df6.printSchema()
    
    df.withColumnRenamed("gender","sex") \
      .show(truncate=False) 
      
    df4.drop("CopiedColumn") \
    .show(truncate=False) 
    

    完整的代码可以从PySpark withColumn GitHub project下载

    学习愉快!

    展开全文
  • DataFram列操作_withColumn()

    千次阅读 2020-08-10 14:40:52
    2、withColumn的第二个参数要传入已有列的Column对象,否则会报错; 3、sql.functions.lit()函数,返回的也是列对象,可以传入任意参数值; 二、实例 1、testData 张三,23 李四,24 王五,25 赵六,26 2、代码 ...
  • withColumn('MONTH(Expense.Date)', 'Month') ->withColumn('SUM(Expense.Amount)', 'Total') ->groupBy('Month') ->orderBy('Month') ->find(); ...
  • spark dataFrame 新增一列函数withColumn

    万次阅读 2017-05-30 11:34:02
    往一个dataframe新增某个列是很常见的事情。 ...然而这个资料还是不多,很多都需要很多变换。...不过由于这回需要增加的列非常简单,倒也没有必要再用UDF函数去修改列。...利用withColumn函数就能实现对da
  • I am working with Spark and ... I am trying to achieve the result equivalent to the following pseudocode:df = df.withColumn('new_column',IF fruit1 == fruit2 THEN 1, ELSE 0. IF fruit1 IS NULL OR f...
  • df.withColumn

    千次阅读 2018-05-16 21:58:18
    df = df.withColumn(columnName, lowerCaseUDF(df[columnName])) df.select( "Tipo_unidad" ).distinct().show() def recent_six_months (clrq) : try : time.strptime(clrq, "%Y-%m-%d" ) clrq_date_...
  • salesIndexDf = salesIndexDf.withColumn("revenue_by_inh", salesIndexDf.col("revenue") .divide(salesIndexDf.col("pop"))); salesIndexDf = salesIndexDf.orderBy(col("revenue_by_inh").desc()); Row bestRow ...
  • 说明:withColumn用于在原有DF新增一列1. 初始化sqlContextval sqlContext = new org.apache.spark.sql.SQLContext(sc)2.导入sqlContext隐式转换import sqlContext.implicits._3. 创建DataFramesval df = sqlContext...
  • val asd = df.withColumn("isp",col("response")*3) asd.createOrReplaceTempView("infos2") session.sql("select ip,domain,sum(response) as responseSize,province(ip) as province ,sum(isp) from infos2 ...

空空如也

空空如也

1 2 3 4 5 ... 11
收藏数 216
精华内容 86
关键字:

withcolumn