如何使用分隔符连接 PySpark 中的多列?

Posted

技术标签:

【中文标题】如何使用分隔符连接 PySpark 中的多列?【英文标题】:How to concatenate multiple columns in PySpark with a separator? 【发布时间】:2019-11-25 13:07:07 【问题描述】:

我有一个pyspark Dataframe,我想加入 3 个专栏。

id |  column_1   | column_2    | column_3
--------------------------------------------
1  |     12      |   34        |    67
--------------------------------------------
2  |     45      |   78        |    90
--------------------------------------------
3  |     23      |   93        |    56
--------------------------------------------

我想加入 3 列:column_1, column_2, column_3 仅在其中一个添加值 "-"

预期结果:

id |  column_1   | column_2    | column_3    |   column_join
-------------------------------------------------------------
1  |     12      |     34      |     67      |   12-34-67
-------------------------------------------------------------
2  |     45      |     78      |     90      |   45-78-90
-------------------------------------------------------------
3  |     23      |     93      |     56      |   23-93-56
-------------------------------------------------------------

如何在 pyspark 中做到这一点? 谢谢

【问题讨论】:

【参考方案1】:

很简单:

from pyspark.sql.functions import col, concat, lit

df = df.withColumn("column_join", concat(col("column_1"), lit("-"), col("column_2"), lit("-"), col("column_3")))

使用concat- 分隔符连接所有列,为此您需要使用lit

如果不能直接使用,可以使用cast将列类型改为字符串,col("column_1").cast("string")

更新

或者您可以使用更动态的方法,使用内置函数concat_ws

pyspark.sql.functions.concat_ws(sep, *cols)

Concatenates multiple input string columns together into a single string column, using the given separator.

>>> df = spark.createDataFrame([('abcd','123')], ['s', 'd'])
>>> df.select(concat_ws('-', df.s, df.d).alias('s')).collect()
[Row(s=u'abcd-123')]

代码:

from pyspark.sql.functions import col, concat_ws

concat_columns = ["column_1", "column_2", "column_3"]
df = df.withColumn("column_join", concat_ws("-", *[F.col(x) for x in concat_columns]))

【讨论】:

如果您对此问题有任何建议,我需要您的帮助。 ***.com/questions/59197109/… @verojoucla 请关注 pault 对该问题的评论。你的答案确实已经得到了回答 谢谢你的回答,我问了一个新问题,但是为了寻找我的解决方案,你能看看吗,我只想在每个数据框中创建一个包含 monotonically_increasing_id 的新列,请看***.com/questions/59211575/… 你对这个问题有什么想法吗? ***.com/questions/59931770/… 谢谢【参考方案2】:

这是一个generic/dynamic 的方法,而不是manually 连接它。我们只需要指定需要连接的列。

# Importing requisite functions.
from pyspark.sql.functions import col, udf

# Creating the DataFrame
df = spark.createDataFrame([(1,12,34,67),(2,45,78,90),(3,23,93,56)],['id','column_1','column_2','column_3'])

现在,指定我们要连接的列列表,以- 分隔。

list_of_columns_to_join = ['column_1','column_2','column_3']

最后,创建一个UDF。请注意,基于 UDF 的解决方案会明显变慢。

def concat_cols(*list_cols):
    return '-'.join(list([str(i) for i in list_cols]))

concat_cols = udf(concat_cols)
df = df.withColumn('column_join', concat_cols(*list_of_columns_to_join))
df.show()
+---+--------+--------+--------+-----------+
| id|column_1|column_2|column_3|column_join|
+---+--------+--------+--------+-----------+
|  1|      12|      34|      67|   12-34-67|
|  2|      45|      78|      90|   45-78-90|
|  3|      23|      93|      56|   23-93-56|
+---+--------+--------+--------+-----------+

【讨论】:

以上是关于如何使用分隔符连接 PySpark 中的多列?的主要内容,如果未能解决你的问题,请参考以下文章

如何在多列上旋转 PySpark 中的 DataFrame?

如何将 map_keys() 中的值拆分为 PySpark 中的多列

如何在 pyspark 中对 spark 数据框中的多列求和?

如何在 pyspark 中对 spark 数据框中的多列求和?

数据框在多列上连接,pyspark中的列有一些条件[重复]

在 pyspark 的 StructStreaming 中;如何将 DataFrame 中的每一行(json 格式的字符串)转换为多列