从另一个 DataFrame 添加一列

Posted

技术标签:

【中文标题】从另一个 DataFrame 添加一列【英文标题】:Add a column from another DataFrame 【发布时间】:2017-03-17 09:32:19 【问题描述】:

在 Scala Spark 中,我可以轻松地将列添加到现有的 Dataframe 写入

val newDf = df.withColumn("date_min", anotherDf("date_min"))

在 PySpark 中这样做会产生 AnalysisException

这就是我正在做的事情:

minDf.show(5)
maxDf.show(5)
+--------------------+
|            date_min|
+--------------------+
|2016-11-01 10:50:...|
|2016-11-01 11:46:...|
|2016-11-01 19:23:...|
|2016-11-01 17:01:...|
|2016-11-01 09:00:...|
+--------------------+
only showing top 5 rows

+--------------------+
|            date_max|
+--------------------+
|2016-11-01 10:50:...|
|2016-11-01 11:46:...|
|2016-11-01 19:23:...|
|2016-11-01 17:01:...|
|2016-11-01 09:00:...|
+--------------------+
only showing top 5 rows

然后,是什么导致了错误:

newDf = minDf.withColumn("date_max", maxDf["date_max"])

AnalysisExceptionTraceback (most recent call last)
<ipython-input-13-7e19c841fa51> in <module>()
      2 maxDf.show(5)
      3 
----> 4 newDf = minDf.withColumn("date_max", maxDf["date_max"])

/opt/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/dataframe.pyc in withColumn(self, colName, col)
   1491         """
   1492         assert isinstance(col, Column), "col should be Column"
-> 1493         return DataFrame(self._jdf.withColumn(colName, col._jc), self.sql_ctx)
   1494 
   1495     @ignore_unicode_prefix

/opt/spark-2.1.0-bin-hadoop2.7/python/lib/py4j-0.10.4-src.zip/py4j/java_gateway.py in __call__(self, *args)
   1131         answer = self.gateway_client.send_command(command)
   1132         return_value = get_return_value(
-> 1133             answer, self.gateway_client, self.target_id, self.name)
   1134 
   1135         for temp_arg in temp_args:

/opt/spark-2.1.0-bin-hadoop2.7/python/pyspark/sql/utils.pyc in deco(*a, **kw)
     67                                              e.java_exception.getStackTrace()))
     68             if s.startswith('org.apache.spark.sql.AnalysisException: '):
---> 69                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)
     70             if s.startswith('org.apache.spark.sql.catalyst.analysis'):
     71                 raise AnalysisException(s.split(': ', 1)[1], stackTrace)

AnalysisException: u'resolved attribute(s) date_max#67 missing from date_min#66 in operator !Project [date_min#66, date_max#67 AS date_max#106];;\n!Project [date_min#66, date_max#67 AS date_max#106]\n+- Project [date_min#66]\n   +- Project [cast((cast(date_min#6L as double) / cast(1000 as double)) as timestamp) AS date_min#66, cast((cast(date_max#7L as double) / cast(1000 as double)) as timestamp) AS date_max#67]\n      +- SubqueryAlias df, `df`\n         +- LogicalRDD [idvisiteur#5, date_min#6L, date_max#7L, sales_sum#8, sales_count#9L]\n'

【问题讨论】:

如果df的长度相同,你可以尝试加入吗 【参考方案1】:
from pyspark.sql.functions import monotonically_increasing_id, row_number
from pyspark.sql.window import Window

minDf = sc.parallelize([['2016-11-01 10:50:00'],['2016-11-01 11:46:00']]).toDF(["date_min"])
maxDf = sc.parallelize([['2016-11-01 10:50:00'],['2016-11-01 11:46:00']]).toDF(["date_max"])

# since there is no common column between these two dataframes add row_index so that it can be joined
minDf=minDf.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))
maxDf=maxDf.withColumn('row_index', row_number().over(Window.orderBy(monotonically_increasing_id())))

minDf = minDf.join(maxDf, on=["row_index"]).drop("row_index")
minDf.show()

输出是:

+-------------------+-------------------+
|           date_min|           date_max|
+-------------------+-------------------+
|2016-11-01 10:50:00|2016-11-01 10:50:00|
|2016-11-01 11:46:00|2016-11-01 11:46:00|
+-------------------+-------------------+

【讨论】:

这是不正确的,因为monotonically_increasing_id() 依赖于分区,所以原则上你不能保证两个数据帧最终会得到相同的索引。 这非常适合我的用例,只需省略排序:D=new.join(y_test, on=["row_inedx]); D.show()【参考方案2】:

简短的回答是 Spark DataFrame API 不支持此功能,至少在 Spark 2.x 中不支持。但是,您可以编写一个辅助函数来实现类似的功能。

首先让我们创建一些测试数据:

minDf = sc.parallelize(['2016-11-01','2016-11-02','2016-11-03']).map(lambda x: (x, )).toDF(['date_min'])
maxDf = sc.parallelize(['2016-12-01','2016-12-02','2016-12-03']).map(lambda x: (x, )).toDF(['date_max'])

然后,您可以使用zip 组合两个数据帧,前提是数据帧的分区相同:

from pyspark.sql.types import StructType

def zip_df(l, r):
    return l.rdd.zip(r.rdd).map(lambda x: (x[0][0],x[1][0])).toDF(StructType([l.schema[0],r.schema[0]]))

combined = zip_df(minDf, maxDf.select('date_max'))
combined.show()

【讨论】:

记录在案 - Spark 不支持此功能,尤其是 PySpark。【参考方案3】:

您可以为这两个数据框创建一个索引,然后加入它们;像这样:

from pyspark.sql.window import Window
from pyspark.sql.functions import monotonically_increasing_id, row_number
w = Window.orderBy(monotonically_increasing_id())

minDf=  minDf.withColumn("columnindex", row_number().over(w))
maxDf=  maxDf.withColumn("columnindex", row_number().over(w))

minDf= minDf.join(maxDf, df1_in_indexed.columnindex == maxDf.columnindex, 'inner').drop(maxDf.columnindex)

【讨论】:

以上是关于从另一个 DataFrame 添加一列的主要内容,如果未能解决你的问题,请参考以下文章

向 DataFrame 添加一列,其值为 1,其中预测大于自定义阈值

在Spark Dataframe中的列列表中添加一列rowums

向熊猫数据框添加一列

Spark / Scala如何编写一个复杂的查询来遍历数据框并添加一列

使用 Python 在日期末尾添加一年

将列添加到包含其他列值列表的 pandas DataFrame