如何根据行的内容拆分pyspark数据框

Posted

技术标签:

【中文标题】如何根据行的内容拆分pyspark数据框【英文标题】:How to split the pyspark dataframe based on the content of the line 【发布时间】:2019-10-10 06:58:35 【问题描述】:

我想根据 Pyspark DataFrame 中行的第一个字符来拆分文件。

原始数据有一列,数据包括

    文件名(如'DATE20191009')

    文件内容(如'1'、'2'、'3')

输入示例文件(Pyspark DataFrame):

column1

Date20191009

1

2

3

Date20191010

1

4

5

我想获得一个 Pyspark DataFrame,其文件名作为数据的拆分。

文件名放在DataFrame的column1,文件的内容放在DataFrame的column2。

预期输出(Pyspark 数据帧)

column1  column2
Date20191009 [1,2,3]
Date20191010 [1,4,5]

我尝试了 Pandas DataFramePyspark DataFrame.collect(),但都因数据量过大(超过 900 万行)而失败。

【问题讨论】:

请提供拆分信息,例如您将考虑第 1 列的哪个值和第 2 列的哪个值以及第一次出现的含义 感谢您的建议,我更详细地描述了我的问题。 谢谢,但我想要更多细节。您的最终名称将具有任何模式,以便我可以在文件名和文件内容中区分它。对于您的示例,我可以看到文件名的名称中始终包含 DATE 并且内容具有数值。是这种情况,请确认。 原始数据中,文件名以DATE开头加具体日期,文件内容为中文,这里我用NUMBER代替。 【参考方案1】:
>>> from pyspark.sql.window import Window
>>> from pyspark.sql.functions import *
>>> w = Window.rowsBetween(Window.unboundedPreceding, 0)

   #Input DataFrame

>>> df.show()
+------------+
|     column1|
+------------+
|Date20191009|
|           1|
|           2|
|           3|
|Date20191010|
|           1|
|           4|
|           5|
+------------+

>>> df1 = df.withColumn('tmp', when(df.column1.startswith('Date'), df.column1).otherwise(None)).withColumn('temp', last('tmp', True).over(w)).drop('tmp')
>>> df1.show()

+------------+------------+
|     column1|        temp|
+------------+------------+
|Date20191009|Date20191009|
|           1|Date20191009|
|           2|Date20191009|
|           3|Date20191009|
|Date20191010|Date20191010|
|           1|Date20191010|
|           4|Date20191010|
|           5|Date20191010|
+------------+------------+

>>> df1.filter(df1.column1 != df1.temp).groupBy(df1.temp).agg(concat_ws(',',collect_list(df1.column1)).alias('column2')).withColumnRenamed("temp", "column1").show()

+------------+-------+
|     column1|column2|
+------------+-------+
|Date20191009|  1,2,3|
|Date20191010|  1,4,5|
+------------+-------+

【讨论】:

以上是关于如何根据行的内容拆分pyspark数据框的主要内容,如果未能解决你的问题,请参考以下文章

如何在限制行数的同时拆分 Pyspark 数据帧?

为 pyspark 数据帧的每一行评估多个 if elif 条件

如何拆分pyspark数据框并创建新列

如何将存储在 HDFS 中包含行的文本文件转换为 Pyspark 中的数据框?

如何在 PySpark 中拆分数据框列

使用 sql 或 pandas 数据框获取前 5 行的 pyspark 数据框