精华内容
下载资源
问答
  • In Scala Spark, I can easily add a column to an existing Dataframe writingval newDf = df.withColumn("date_min", anotherDf("date_min"))Doing so in PySpark results in an AnalysisException.Here is what I...

    In Scala Spark, I can easily add a column to an existing Dataframe writing

    val newDf = df.withColumn("date_min", anotherDf("date_min"))

    Doing so in PySpark results in an AnalysisException.

    Here is what I'm doing :

    minDf.show(5)

    maxDf.show(5)

    +--------------------+

    | date_min|

    +--------------------+

    |2016-11-01 10:50:...|

    |2016-11-01 11:46:...|

    |2016-11-01 19:23:...|

    |2016-11-01 17:01:...|

    |2016-11-01 09:00:...|

    +--------------------+

    only showing top 5 rows

    +--------------------+

    | date_max|

    +--------------------+

    |2016-11-01 10:50:...|

    |2016-11-01 11:46:...|

    |2016-11-01 19:23:...|

    |2016-11-01 17:01:...|

    |2016-11-01 09:00:...|

    +--------------------+

    only showing top 5 rows

    And then, what results in an error :

    newDf = minDf.withColumn("date_max", maxDf["date_max"])

    AnalysisExceptionTraceback (most recent call last)

    in ()

    2 maxDf.show(5)

    3

    ----> 4 newDf = minDf.withColumn("date_max", maxDf["date_max"])

    /opt/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)

    1491 """

    1492 assert isinstance(col, Column), "col should be Column"

    -> 1493 return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)

    1494

    1495 @ignore_unicode_prefix

    /opt/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)

    1131 answer = self.gateway_client.send_command(command)

    1132 return_value = get_return_value(

    -> 1133 answer, self.gateway_client, self.target_id, self.name)

    1134

    1135 for temp_arg in temp_args:

    /opt/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)

    67 e.java_exception.getStackTrace()))

    68 if s.startswith('org.apache.spark.sql.AnalysisException: '):

    ---> 69 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

    70 if s.startswith('org.apache.spark.sql.catalyst.analysis'):

    71 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

    AnalysisException: u'resolved attribute(s) date_max#67 missing from date_min#66 in operator !Project [date_min#66, date_max#67 AS date_max#106];;\n!Project [date_min#66, date_max#67 AS date_max#106]\n+- Project [date_min#66]\n +- Project [cast((cast(date_min#6L as double) / cast(1000 as double)) as timestamp) AS date_min#66, cast((cast(date_max#7L as double) / cast(1000 as double)) as timestamp) AS date_max#67]\n +- SubqueryAlias df, `df`\n +- LogicalRDD [idvisiteur#5, date_min#6L, date_max#7L, sales_sum#8, sales_count#9L]\n'

    解决方案

    Hope this helps!

    from pyspark.sql.functions import monotonically_increasing_id, row_number

    from pyspark.sql.window import Window

    minDf = sc.parallelize([['2016-11-01 10:50:00'],['2016-11-01 11:46:00']]).toDF(["date_min"])

    maxDf = sc.parallelize([['2016-11-01 10:50:00'],['2016-11-01 11:46:00']]).toDF(["date_max"])

    # since there is no common column between these two dataframes add row_index so that it can be joined

    minDf=minDf.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))

    maxDf=maxDf.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))

    minDf = minDf.join(maxDf, on=["row_index"]).drop("row_index")

    minDf.show()

    Output is:

    +-------------------+-------------------+

    | date_min| date_max|

    +-------------------+-------------------+

    |2016-11-01 10:50:00|2016-11-01 10:50:00|

    |2016-11-01 11:46:00|2016-11-01 11:46:00|

    +-------------------+-------------------+

    展开全文
  • 0.050927 8 0.917438 0.847941 0.034235 -0.448948 2.228131 0.006109 >>> 实际上,这是目前熊猫文档中描述的更有效的方法 编辑2017 如评论中所述,@ Alexander指出,当前最好将Series的值添加DataFrame的新的...

    小编典典

    使用原始的df1索引创建系列:

    df1['e'] = pd.Series(np.random.randn(sLength), index=df1.index)

    编辑2015年

    有些人报告SettingWithCopyWarning使用此代码。

    但是,该代码仍可以在当前的熊猫0.10.1版本中完美运行。

    >>> sLength = len(df1['a'])

    >>> df1

    a b c d

    6 -0.269221 -0.026476 0.997517 1.294385

    8 0.917438 0.847941 0.034235 -0.448948

    >>> df1['e'] = pd.Series(np.random.randn(sLength), index=df1.index)

    >>> df1

    a b c d e

    6 -0.269221 -0.026476 0.997517 1.294385 1.757167

    8 0.917438 0.847941 0.034235 -0.448948 2.228131

    >>> p.version.short_version

    '0.16.1'

    该SettingWithCopyWarning目标对数据帧的副本通知可能无效转让的。它不一定表示您做错了(它可能会触发误报),但从0.13.0起,它会让您知道有更多适当的方法可以实现相同的目的。然后,如果收到警告,请遵循其建议:尝试使用.loc [row_index,col_indexer] = value代替

    >>> df1.loc[:,'f'] = pd.Series(np.random.randn(sLength), index=df1.index)

    >>> df1

    a b c d e f

    6 -0.269221 -0.026476 0.997517 1.294385 1.757167 -0.050927

    8 0.917438 0.847941 0.034235 -0.448948 2.228131 0.006109

    >>>

    实际上,这是目前熊猫文档中描述的更有效的方法

    编辑2017

    如评论中所述,@ Alexander指出,当前最好将Series的值添加为DataFrame的新列的最佳方法是使用assign:

    df1 = df1.assign(e=pd.Series(np.random.randn(sLength)).values)

    2020-02-13

    展开全文
  • 我有一份清单l = [1, 2, 3]和一个DataFramedf = sc.parallelize([['p1', 'a'],['p2', 'b'],['p3', 'c'],]).toDF(('product', 'name'))我想获得一个新的DataFrame,其中列表l作为另一列添加,即+-------+----+---------...

    我已经阅读了类似的问题,但无法找到解决我具体问题的方法.

    我有一份清单

    l = [1, 2, 3]

    和一个DataFrame

    df = sc.parallelize([

    ['p1', 'a'],

    ['p2', 'b'],

    ['p3', 'c'],

    ]).toDF(('product', 'name'))

    我想获得一个新的DataFrame,其中列表l作为另一列添加,即

    +-------+----+---------+

    |product|name| new_col |

    +-------+----+---------+

    | p1| a| 1 |

    | p2| b| 2 |

    | p3| c| 3 |

    +-------+----+---------+

    JOIN的方法,我加入df的时候

    sc.parallelize([[1], [2], [3]])

    失败了.使用withColumn的方法,如

    new_df = df.withColumn('new_col', l)

    失败,因为列表不是Column对象.

    最佳答案 因此,通过阅读一些有趣的东西

    here,我已经确定你不能真正只是将随机/任意列附加到给定的DataFrame对象.看起来你想要的更多的是拉链而不是连接.我环顾四周找到了

    this ticket,这让我觉得如果你有DataFrame而不是RDD对象你将无法压缩.

    我能够解决你的问题的唯一方法就是离开DataFrame对象的世界并返回到RDD对象.我还需要为连接创建索引,这可能适用于您的用例,也可能不适用.

    l = sc.parallelize([1, 2, 3])

    index = sc.parallelize(range(0, l.count()))

    z = index.zip(l)

    rdd = sc.parallelize([['p1', 'a'], ['p2', 'b'], ['p3', 'c']])

    rdd_index = index.zip(rdd)

    # just in case!

    assert(rdd.count() == l.count())

    # perform an inner join on the index we generated above, then map it to look pretty.

    new_rdd = rdd_index.join(z).map(lambda (x, y): [y[0][0], y[0][1], y[1]])

    new_df = new_rdd.toDF(["product", 'name', 'new_col'])

    当我运行new_df.show()时,我得到:

    +-------+----+-------+

    |product|name|new_col|

    +-------+----+-------+

    | p1| a| 1|

    | p2| b| 2|

    | p3| c| 3|

    +-------+----+-------+

    旁注:我真的很惊讶这没用.看起来像外部联接?

    from pyspark.sql import Row

    l = sc.parallelize([1, 2, 3])

    new_row = Row("new_col_name")

    l_as_df = l.map(new_row).toDF()

    new_df = df.join(l_as_df)

    当我运行new_df.show()时,我得到:

    +-------+----+------------+

    |product|name|new_col_name|

    +-------+----+------------+

    | p1| a| 1|

    | p1| a| 2|

    | p1| a| 3|

    | p2| b| 1|

    | p3| c| 1|

    | p2| b| 2|

    | p2| b| 3|

    | p3| c| 2|

    | p3| c| 3|

    +-------+----+------------+

    展开全文
  • pyspark Dataframe添加一列常量列

    千次阅读 2020-06-23 13:22:49
    比如添加1 “0” 使用 from pyspark.sql.functions import lit dm.withColumn('Flag_last_entry',lit(0))\ .withColumn('Flag_2',lit(0))

    比如添加1列 “0”

    使用

     

    from pyspark.sql.functions import lit

    dm.withColumn('Flag_last_entry',lit(0))\

         .withColumn('Flag_2',lit(0))  

    展开全文
  • 刚学习pandas,想给个原有的excel表格上实现添加新数据,但是由于刚学,不熟悉dataframe的特性,本来想按照写入json转csv的方式对数据进行添加,那就意味着要对原先表格的数据进行提取再series合并,最后再写入...
  • 熟悉pandas的pythoner 应该知道给dataframe增加一列很容易,直接以字典形式指定就好了,pyspark中就不同了,摸索了一下,可以使用如下方式增加from pyspark import SparkContextfrom pyspark import SparkConffrom ...
  • dataframe添加一列索引

    千次阅读 2018-12-20 15:36:00
    需求:给现在df数据添加一列sid,要求这一列是和stock一一对应的整数 代码如下: import pandas as pd test_data = {'stock': ['AAPL', 'GOOG', 'AMZN', 'AAPL', 'GOOG', 'AMZN'], 'open': [100, 110, 120,...
  • 相信有很多人收这个问题的困扰,如果你想一次性在pandas.DataFrame里添加几列,或者在指定的位置添加一列,都会很苦恼找不到简便的方法;可以用到的函数有df.reindex, pd.concat我们来看一个例子:df 是一个...
  • Spark DataFrame添加一列单调递增的id列

    千次阅读 2018-09-07 16:11:51
    import org.apache.spark.sql.functions._ val newDataFrame = dataFrame.withColumn("...这样只能添加 id,不能单调递增 import org.apache.spark.sql.expressions.Window import org.apache...
  • 最近身体不太舒服,停更几天,万分抱歉,望tie子们见谅。...本节知识点:pd.DataFrame添加新column(s)常用两种途径直接 赋值操作通过assign method添加新column(s) — 直接赋值 比较加单,直接上codeimpor...
  • 尽管这篇文章解释了如何使用RDD和基本的Dataframe操作,但是我在使用PySpark Dataframes时错过了很多东西。只有当我需要更多功能时,我才阅读并提出多种解决方案来做件事情。如何在Spark中创建新?现在,...
  • pandas把分箱后作为标签透视后,再添加列时再次遇坑。...date = pd.DataFrame({'a':[1,1,3,3,5],'b':[7,8,9,10,11],'c':[101,201,301,401,501]})对c进行分箱,并把结果添加到date的dbins = [100,200,300,40...
  • 数据格式为:因为是部分数据,很多数据都是两个+号连接,所以是一列拆分三列语句的写法为:product_field['一级领域'],product_field['二级领域'],product_field['三级领域'] = product_field['v']...
  • 问题:统计部门的名称,以及对应的人数,添加到最后一列汇总 原始结果,没有汇总列:SELECT D.DNAME,COUNT(*) AS 人数 FROM DEPT D LEFT JOIN EMP E ON D.DEPTNO = E.DEPTNO GROUP BY D.DNAME 方法一原理:两...
  • 各位志同道合的朋友们大家好,我是个一直在一线互联网踩坑十余年的编码爱好者,现在将我们的各种经验以及架构实战分享出来,如果大家喜欢,就关注我,一起将技术学深学透,我会每篇分享结束都会预告下专题泛型...
  • 如果Paul同学排在最后个,我们可以用list的pop()方法删除:L = ['Adam', 'Lisa', 'Bart', 'Paul'] L.pop()12‘Paul’6print(L)1[‘Adam’, ‘Lisa’, ‘Bart’]pop()方法总是删掉list的最后个元素,并且它还返.....
  • 直接df.withColumn(“time”,“201905”) 会报错,说没有引用其他值 之前一直用的 df.withColumn(“time”,col(“age”)-col(“age”)+201905)的变种方式 或者 df.rdd.map(lambda x:(x[0],x[1],x[2],“201905”)...
  • 示例代码如下: 一列一行 一列二行二列一行 二列二行 页面效果如下: 今天的内容到此结束了,一下是今天示例的全部代码: 第一个网页 第一个网页 表格元素表格标题 姓名 年龄 一列一行 一列二行 二列一行 二列二行 ...
  • 生成器 概念在Java,Python等语言中都是具备的,ES6也添加到了JavaScript中。Iterator可以使我们 不需要初始化集合,以及索引的变量 ,而是使用迭代器对象的 next 方法,返回集合的下项的值, 偏向程序化 。迭代器...
  • 种复杂的方法包括定义个模型,将每个缺失的特征作为所有其他特征的函数进行预测,并多次重复这估计特征值的过程。重复允许在预测缺失值的后续迭代中使用其他特征的优化估计值作为输入。这通常被称为迭代插补.....
  • 种复杂的方法包括定义个模型,将每个缺失的特征作为所有其他特征的函数进行预测,并多次重复这估计特征值的过程。重复允许在预测缺失值的后续迭代中使用其他特征的优化估计值作为输入。这通常被称为迭代插补.....
  • Python在Dataframe中新添加一列

    万次阅读 多人点赞 2019-08-13 16:31:25
    在敲代码的过程中,老是会遇到在Dataframe中新添加一列的情况,每次都要重新google,这次做个记录。 其实在Dataframe中新添加一列很简单,直接指明列名,然后赋值就可以了。 import pandas as pd data = pd....
  • 我想在现有的数据框架中添加一个新的‘e’,并且不改变数据框架中的任何内容。(该系列的长度总是与dataframe相同。)中的索引值e匹配那些df1.创建一个名为e,并将系列中的值赋值给它。e:df['e']=e.values分配...
  • 如何添加一个新的到Spark DataFrame(使用PySpark) 方法一:不能将任意添加到Spark中的DataFrame。新只能使用literal创建。 from pyspark.sql.functions import lit df = sqlContext.createDataFrame( [(1,...
  • Spark DataFrame 添加索引的三种方法

    万次阅读 2018-10-25 21:39:22
    刚开始用Spark,操作dataframe不是很熟练,遇到的第个问题是给dataframe添加索引,查阅了网上的一些教程,大都是用Scala语言编写的代码,下面给出自己用python写的三种方法。 方法:先创建Pandas版本的...

空空如也

空空如也

1 2 3 4 5 ... 20
收藏数 439
精华内容 175
关键字:

dataframe添加一列