将 Spark Dataframe 中的每一行保存到不同的文件中
Posted
技术标签:
【中文标题】将 Spark Dataframe 中的每一行保存到不同的文件中【英文标题】:Save each row in Spark Dataframe into different file 【发布时间】:2018-07-29 00:22:38 【问题描述】:我使用以下结构构造了一个 spark DataFrame:
root
|-- tickers: string (nullable = true)
|-- name: string (nullable = true)
|-- price: array (nullable = true)
| |-- element: map (containsNull = true)
| | |-- key: string
| | |-- value: map (valueContainsNull = true)
| | | |-- key: string
| | | |-- value: string (valueContainsNull = true)
我想将price
中的每个对象保存到一个单独的 JSON 文件中,并让每个保存的文件都使用相应的 name
字符串作为文件名。有没有办法在 Python 环境中实现这一点?
我发现最相关的解决方案是将数据帧重新分区为数据帧中“行”数的分区,并使用.write.csv()
(请参阅https://***.com/a/49890590/6158414)。但这不符合我将“行”保存到具有不同文件名的单独文件中的需要。
提供更多上下文。我正在使用 spark 调用 API 并并行检索数据。 spark数据框中的每一行都是基于tickers
唯一值的数据查询。我过程的最后一步是分别保存每个查询结果。如果有人有更好的方法来做到这一点,也将不胜感激。
非常感谢!
【问题讨论】:
每个文件有多少行? 我想你可以从这里得到解决方案的第一部分:***.com/a/46738678/3389828(它是Java,我相信你可以在Python中找到类似的方法)然后使用***.com/a/49890590/6158414编写CSV。df.write.partitionBy("tickers").json(...)
,但是每行写一个文件,不会扩展(只是一些必需的文件描述符可能是个问题)。
@thebluephantom 每个price
元素要保存为一个文件,是一组地图;数组长度在 1000 左右,每个 map 有 4 个键值对。
【参考方案1】:
您可以编写 Spark UDF 将每个对象/元素保存到不同的 CSV 文件。
下面是一个示例,它将每一行写入一个单独的文件。在您的情况下,您只需要修改 UDF,遍历 Price 列的元素并将它们写入单独的文件。
>>> import csv
>>> from pyspark.sql.functions import udf, col
>>> from pyspark.sql.types import StringType
>>>
>>> list = [("1", "name1"), ("2", "name2"), ("3", "name3"), ("4", "name4")]
>>>
>>> df = spark.createDataFrame(list, ["id", "name"])
>>>
>>> df.show()
+---+-----+
| id| name|
+---+-----+
| 1|name1|
| 2|name2|
| 3|name3|
| 4|name4|
+---+-----+
>>> # UDF that takes 2 columns and return if the file is saved successfully.
>>> def writeToCsv(x, y):
... myData = [["id", "name"]]
... fileName = x + '.csv' # Modify the file name, add path if required.
... myFile = open(fileName, 'w')
... with myFile:
... writer = csv.writer(myFile)
... myData = myData + [[x, y]]
... writer.writerows(myData)
... return "SAVED"
...
>>> # Register UDF with StringType Return type.
>>> save_udf = udf(writeToCsv, StringType())
>>> # Invoke UDF for each row of the Dataframe.
... out_df = df.withColumn("processed", save_udf(df.id, df.name))
>>>
>>> # Check if all the rows are processed successfully.
>>> out_df.show()
+---+-----+---------+
| id| name|processed|
+---+-----+---------+
| 1|name1| SAVED|
| 2|name2| SAVED|
| 3|name3| SAVED|
| 4|name4| SAVED|
+---+-----+---------+
【讨论】:
以上是关于将 Spark Dataframe 中的每一行保存到不同的文件中的主要内容,如果未能解决你的问题,请参考以下文章
将 Spark Dataframe 中的多个列发送到外部 API 并将结果存储在单独的列中
如何将 DataFrame 中的每一行/单元格值转换为 pandas 中的字典列表?
将一个 pandas DataFrame 的副本合并到另一个 DataFrame 的每一行中?
pyspark dataframe foreach 填充列表
在 pyspark 的 StructStreaming 中;如何将 DataFrame 中的每一行(json 格式的字符串)转换为多列