使用 PySpark 将每一行的每一列作为单独的文件写入 S3

Posted

技术标签:

【中文标题】使用 PySpark 将每一行的每一列作为单独的文件写入 S3【英文标题】:Write each column of each row as a separate file to S3 using PySpark 【发布时间】:2020-07-03 20:36:43 【问题描述】:

我有一个用例,程序需要将数据帧中的每一列作为单独的文件写入 S3 或 EMR 上的 HDFS。我正在对原始数据进行一些处理,输出数据框如下所示;

+------+--------------------+--------------------+--------------------+--------------------+
|    id|         processed_1|         processed_2|         processed_3|               error|
+------+--------------------+--------------------+--------------------+--------------------+
|324650|some processed data |some processed data | some processed data|                null|
+------+--------------------+--------------------+--------------------+--------------------+

对于 3 列 processed_1, processed_2, processed_3, 我想将每一行的每一列存储在一个单独的文件中。我有 10 万行已处理的数据。我尝试使用 UDF 和 Python 来实现;

def writeToDisk(doc_id,error, processed_1, processed_2, processed_3):
    
    try:
        if error is None:
            with open(r'hdfs://processed_1.json'.format(doc_id),'w',encoding='utf-8') as f:
                f.write(processed_1)

            with open(r'hdfs://processed_2.json'.format(doc_id),'w') as f:
                f.write(processed_2)
                
            with open(r'hdfs://processed_3.json'.format(doc_id),'w') as f:
                f.write(processed_3)

            return "SUCCESS"
        
        else:
            error_prefix=' - root - ERROR - '.format(datetime.now().strftime("%Y-%m-%d %H:%M:%S"))

            final_error_msg=''.format(error_prefix,error)

            with open(r'hdfs://error.log'.format(doc_id),'w') as f:
                f.write(unprocessed_html)

            
            return "SUCCESS"
    
    except Exception as e:

        with open(r'hdfs://error.log','w') as f:
            f.write("Failed : ".format(str(e)))

        return "FAILED"

并将上述函数注册为udf并在as中使用;

store_data_udf = udf(writeToDisk, StringType())

stored_data = final_data.withColumn("store_results",store_data_udf("id","error","processed_1","processed_2","processed_3"))

上述方法不起作用。我不确定我在这里缺少什么。

任何关于如何完成这项任务的想法都将受到高度赞赏。

【问题讨论】:

【参考方案1】:

您无法使用 python 写入文件函数写入 HDFS。相反,您可以创建 3 个具有所需列的单独数据帧并将其写入 hdfs/s3。

from pyspark.sql import SparkSession
from pyspark.sql.functions import  monotonically_increasing_id

spark = SparkSession.builder.getOrCreate()
sc = spark.sparkContext

file_1 = "id": 1, "error": 20, 'processed_1': "test", 'processed_2': "test2", 'processed_3': "test3"

file_2 = "id": 2, "error": 30, 'processed_1': "test5", 'processed_2': "test6", 'processed_3': "test7"

final_data = spark.read.json(sc.parallelize([file_1,file_2]))

df1=final_data.select("id","error","processed_1").withColumn("num", monotonically_increasing_id())
df2=final_data.select("id","error","processed_2").withColumn("num", monotonically_increasing_id())
df3=final_data.select("id","error","processed_3").withColumn("num", monotonically_increasing_id())

df1.coalesce(1).write.partitionBy("num").parquet("df1/")
df2.coalesce(1).write.partitionBy("num").parquet("df2/")
df3.coalesce(1).write.partitionBy("num").parquet("df3/")


【讨论】:

但它会存储整个列。我想将每一行的每一列存储在一个单独的文件中。虽然,这是一个开始...... 您可以为每条记录添加一个行号并使用partitionBy 那个新列,因此每条记录都将存储在每个文件夹中。你可以试试,或者我可以更新答案。 让我试试这个。 @srikanthholur,为什么 monotonically_increasing_id() 而不是 lit(1)lit(2)lit(3) 将这些存储在 3 个不同的目录中? 他需要将每条记录放在单独的文件中。 Lit 将为所有记录添加相同的值。

以上是关于使用 PySpark 将每一行的每一列作为单独的文件写入 S3的主要内容,如果未能解决你的问题,请参考以下文章

PySpark 传递列表到用户定义函数

pyspark:在数据框的每一列中获取唯一项目

可以单独对多个列进行 GROUP BY 并使用 django ORM 将它们中的每一列聚合到其他列?

rollapplyr()仅创建一个新列,而不为.SDcols中的每一列创建多个列

在没有数据重复的情况下爆炸数据框的每一列

如何使用python访问csv文件中的每一列