将 Spark Dataframe 中的每一行保存到不同的文件中

Posted

技术标签:

【中文标题】将 Spark Dataframe 中的每一行保存到不同的文件中【英文标题】:Save each row in Spark Dataframe into different file 【发布时间】:2018-07-29 00:22:38 【问题描述】:

我使用以下结构构造了一个 spark DataFrame:

root
 |-- tickers: string (nullable = true)
 |-- name: string (nullable = true)
 |-- price: array (nullable = true)
 |    |-- element: map (containsNull = true)
 |    |    |-- key: string
 |    |    |-- value: map (valueContainsNull = true)
 |    |    |    |-- key: string
 |    |    |    |-- value: string (valueContainsNull = true)

我想将price 中的每个对象保存到一个单独的 JSON 文件中,并让每个保存的文件都使用相应的 name 字符串作为文件名。有没有办法在 Python 环境中实现这一点?

我发现最相关的解决方案是将数据帧重新分区为数据帧中“行”数的分区,并使用.write.csv()(请参阅https://***.com/a/49890590/6158414)。但这不符合我将“行”保存到具有不同文件名的单独文件中的需要。

提供更多上下文。我正在使用 spark 调用 API 并并行检索数据。 spark数据框中的每一行都是基于tickers唯一值的数据查询。我过程的最后一步是分别保存每个查询结果。如果有人有更好的方法来做到这一点,也将不胜感激。

非常感谢!

【问题讨论】:

每个文件有多少行? 我想你可以从这里得到解决方案的第一部分:***.com/a/46738678/3389828(它是Java,我相信你可以在Python中找到类似的方法)然后使用***.com/a/49890590/6158414编写CSV。 df.write.partitionBy("tickers").json(...),但是每行写一个文件,不会扩展(只是一些必需的文件描述符可能是个问题)。 @thebluephantom 每个price 元素要保存为一个文件,是一组地图;数组长度在 1000 左右,每个 map 有 4 个键值对。 【参考方案1】:

您可以编写 Spark UDF 将每个对象/元素保存到不同的 CSV 文件。

下面是一个示例,它将每一行写入一个单独的文件。在您的情况下,您只需要修改 UDF,遍历 Price 列的元素并将它们写入单独的文件。

>>> import csv
>>> from pyspark.sql.functions import udf, col
>>> from pyspark.sql.types import StringType
>>>
>>> list = [("1", "name1"), ("2", "name2"), ("3", "name3"), ("4", "name4")]
>>>
>>> df = spark.createDataFrame(list, ["id", "name"])
>>>
>>> df.show()
+---+-----+
| id| name|
+---+-----+
|  1|name1|
|  2|name2|
|  3|name3|
|  4|name4|
+---+-----+

>>> # UDF that takes 2 columns and return if the file is saved successfully.
>>> def writeToCsv(x, y):
...   myData = [["id", "name"]]
...   fileName = x + '.csv'   # Modify the file name, add path if required.
...   myFile = open(fileName, 'w')
...   with myFile:
...     writer = csv.writer(myFile)
...     myData = myData + [[x, y]]
...     writer.writerows(myData)
...   return "SAVED"
...
>>>  # Register UDF with StringType Return type.
>>> save_udf = udf(writeToCsv, StringType())
>>>  # Invoke UDF for each row of the Dataframe.
... out_df = df.withColumn("processed", save_udf(df.id, df.name))
>>>
>>> # Check if all the rows are processed successfully.
>>> out_df.show() 
+---+-----+---------+
| id| name|processed|
+---+-----+---------+
|  1|name1|    SAVED|
|  2|name2|    SAVED|
|  3|name3|    SAVED|
|  4|name4|    SAVED|
+---+-----+---------+ 

【讨论】:

以上是关于将 Spark Dataframe 中的每一行保存到不同的文件中的主要内容,如果未能解决你的问题,请参考以下文章

spark下dataframe转为rdd格式

将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中

如何将 DataFrame 中的每一行/单元格值转换为 pandas 中的字典列表?

将一个 pandas DataFrame 的副本合并到另一个 DataFrame 的每一行中?

pyspark dataframe foreach 填充列表

在 pyspark 的 StructStreaming 中;如何将 DataFrame 中的每一行(json 格式的字符串)转换为多列