使用 PySpark 将每一行的每一列作为单独的文件写入 S3
Posted
技术标签:
【中文标题】使用 PySpark 将每一行的每一列作为单独的文件写入 S3【英文标题】:Write each column of each row as a separate file to S3 using PySpark 【发布时间】:2020-07-03 20:36:43 【问题描述】:我有一个用例,程序需要将数据帧中的每一列作为单独的文件写入 S3 或 EMR 上的 HDFS。我正在对原始数据进行一些处理,输出数据框如下所示;
+------+--------------------+--------------------+--------------------+--------------------+
| id| processed_1| processed_2| processed_3| error|
+------+--------------------+--------------------+--------------------+--------------------+
|324650|some processed data |some processed data | some processed data| null|
+------+--------------------+--------------------+--------------------+--------------------+
对于 3 列 processed_1
, processed_2
, processed_3
,
我想将每一行的每一列存储在一个单独的文件中。我有 10 万行已处理的数据。我尝试使用 UDF 和 Python 来实现;
def writeToDisk(doc_id,error, processed_1, processed_2, processed_3):
try:
if error is None:
with open(r'hdfs://processed_1.json'.format(doc_id),'w',encoding='utf-8') as f:
f.write(processed_1)
with open(r'hdfs://processed_2.json'.format(doc_id),'w') as f:
f.write(processed_2)
with open(r'hdfs://processed_3.json'.format(doc_id),'w') as f:
f.write(processed_3)
return "SUCCESS"
else:
error_prefix=' - root - ERROR - '.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))
final_error_msg=''.format(error_prefix,error)
with open(r'hdfs://error.log'.format(doc_id),'w') as f:
f.write(unprocessed_html)
return "SUCCESS"
except Exception as e:
with open(r'hdfs://error.log','w') as f:
f.write("Failed : ".format(str(e)))
return "FAILED"
并将上述函数注册为udf并在as中使用;
store_data_udf = udf(writeToDisk, StringType())
stored_data = final_data.withColumn("store_results",store_data_udf("id","error","processed_1","processed_2","processed_3"))
上述方法不起作用。我不确定我在这里缺少什么。
任何关于如何完成这项任务的想法都将受到高度赞赏。
【问题讨论】:
【参考方案1】:您无法使用 python 写入文件函数写入 HDFS。相反,您可以创建 3 个具有所需列的单独数据帧并将其写入 hdfs/s3。
from pyspark.sql import SparkSession
from pyspark.sql.functions import monotonically_increasing_id
spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext
file_1 = "id": 1, "error": 20, 'processed_1': "test", 'processed_2': "test2", 'processed_3': "test3"
file_2 = "id": 2, "error": 30, 'processed_1': "test5", 'processed_2': "test6", 'processed_3': "test7"
final_data = spark.read.json(sc.parallelize([file_1,file_2]))
df1=final_data.select("id","error","processed_1").withColumn("num", monotonically_increasing_id())
df2=final_data.select("id","error","processed_2").withColumn("num", monotonically_increasing_id())
df3=final_data.select("id","error","processed_3").withColumn("num", monotonically_increasing_id())
df1.coalesce(1).write.partitionBy("num").parquet("df1/")
df2.coalesce(1).write.partitionBy("num").parquet("df2/")
df3.coalesce(1).write.partitionBy("num").parquet("df3/")
【讨论】:
但它会存储整个列。我想将每一行的每一列存储在一个单独的文件中。虽然,这是一个开始...... 您可以为每条记录添加一个行号并使用partitionBy
那个新列,因此每条记录都将存储在每个文件夹中。你可以试试,或者我可以更新答案。
让我试试这个。
@srikanthholur,为什么 monotonically_increasing_id()
而不是 lit(1)
、lit(2)
和 lit(3)
将这些存储在 3 个不同的目录中?
他需要将每条记录放在单独的文件中。 Lit 将为所有记录添加相同的值。以上是关于使用 PySpark 将每一行的每一列作为单独的文件写入 S3的主要内容,如果未能解决你的问题,请参考以下文章
可以单独对多个列进行 GROUP BY 并使用 django ORM 将它们中的每一列聚合到其他列?