如何将数据框中的连接值插入到 Pyspark 中的另一个数据框中?

Posted

技术标签:

【中文标题】如何将数据框中的连接值插入到 Pyspark 中的另一个数据框中?【英文标题】:How to insert concatenated values from a data-frame into another data-frame in Pyspark? 【发布时间】:2019-05-30 16:13:25 【问题描述】:

我正在创建一个 time_interval 列并将其添加到现有的 Pyspark 中的数据框。理想情况下,time_interval 将采用“HHmm”格式,分钟向下舍入到最接近的 15 分钟标记(815、830、845、900 等)。

我有为我执行逻辑的 spark sql 代码,但我如何获取连接为字符串列的值并将其插入现有数据帧?

time_interval = sqlContext.sql("select extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15")

time_interval.show()

+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|concat(CAST(hour(current_timestamp()) AS STRING), CAST((FLOOR((CAST(minute(current_timestamp()) AS DOUBLE) / CAST(15 AS DOUBLE))) * CAST(15 AS BIGINT)) AS STRING))|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|                                                                                                                                                               1045|
+-------------------------------------------------------------------------------------------------------------------------------------------------------------------+

baseDF = sqlContext.sql("select * from test_table")
newBase = baseDF.withColumn("time_interval", lit(str(time_interval)))

newBase.select("time_interval").show()

+--------------------+
|       time_interval|
+--------------------+
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
|DataFrame[concat(...|
+--------------------+
only showing top 20 rows

因此,实际的预期结果应该只是在我正在创建的新列中显示实际的字符串值,而不是来自数据帧的连接值。如下所示:

newBase.select("time_interval").show(1)
+-------------+
|time_interval|
+-------------+
|    1045     |                                                                                                                                           
+-------------+

【问题讨论】:

试试这个:newBase = baseDF.selectExpr("*, extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15 AS time_interval") 感谢 pault,“selectExpr” 的作用就像一个魅力! 【参考方案1】:

由于time_interval是一种数据帧类型,对于这种情况需要collectextract the required value out from dataframe

试试这个方法:

newBase = baseDF.withColumn("time_interval", lit(str(time_interval.collect()[0][0])))
newBase.show()

(或)

通过使用select(expr())函数:

newBase = baseDF.select("*",expr("string(extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15) AS time_interval"))

正如cmets中提到的pault,使用selectExpr()函数:

newBase = baseDF.selectExpr("*","string(extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15) AS time_interval")

示例:

>>> from pyspark.sql.functions import *
>>> from pyspark.sql.types import IntegerType
>>> time_interval = spark.sql("select extract(hour from current_timestamp())||floor(extract(minute from current_timestamp())/15)*15")
>>> baseDF=spark.createDataFrame([1,2,3,4],IntegerType())
>>> newBase = baseDF.withColumn("time_interval", lit(str(time_interval.collect()[0][0])))
>>> newBase.show()
+-----+-------------+
|value|time_interval|
+-----+-------------+
|    1|         1245|
|    2|         1245|
|    3|         1245|
|    4|         1245|
+-----+-------------+

【讨论】:

以上是关于如何将数据框中的连接值插入到 Pyspark 中的另一个数据框中?的主要内容,如果未能解决你的问题,请参考以下文章

如何将 CSV 值与 pyspark 数据框中的单元格中的值分别分隔为新列及其值

Pyspark:如何将现有非空列的元组列表作为数据框中的列值之一返回

如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列

如何将每一列映射到pyspark数据框中的其他列?

如何通过 Pyspark 中同一数据框中另一列的正则表达式值过滤数据框中的一列

如何拆分对象列表以分隔pyspark数据框中的列