如何有效地将大型 .tsv 文件上传到 pyspark 中具有拆分列的 Hive 表?

Posted

技术标签:

【中文标题】如何有效地将大型 .tsv 文件上传到 pyspark 中具有拆分列的 Hive 表?【英文标题】:How to efficiently upload a large .tsv file to a Hive table with split columns in pyspark? 【发布时间】:2019-08-08 11:45:06 【问题描述】:

我有一个大(约 1000 万行).tsv 文件,其中包含“id”和“group”两列。 'Group' 列实际上是某个 id 所属的所有组的列表,因此文件如下所示:

id1     group1,group2
id2     group2,group3,group4
id3     group1
...

我需要使用 pyspark 将其上传到 Hive 表,但是我想拆分组列,以便一行中只有一个组,因此生成的表如下所示:

id1    group1
id1    group2
id2    group2
id2    group3
id2    group4
id3    group1

我已尝试逐行阅读,只需使用 python split() 拆分列,然后为每一行创建 spark 数据框并将其与每次迭代合并。我的代码有效,但效率极低,因为处理 1000 行代码需要 2 分钟。我的代码如下:

fields = [StructField('user_id', StringType(), True),StructField('group_id', StringType(), True)] 
membership_schema = StructType(fields) 

result_df = sqlContext.createDataFrame(sc.emptyRDD(), membership_schema)

with open('file.tsv','r') as f:
    for line in f:
        parts = line.split()
        id_part = parts[0]
        audience_parts = parts[1].split(',')
        for item in audience_parts:
            newRow = sqlContext.createDataFrame([(id_part,item)], membership_schema)
            result_df = result_df.union(newRow)
df_writer = DataFrameWriter(result_df)
df_writer.insertInto("my_table_in_hive")

有没有一种更简单、更有效的方法可以将整个文件上传到表格中而无需遍历行?

感谢您的帮助。

【问题讨论】:

【参考方案1】:

我查看了上述代码的计划,它似乎扫描了很多,也没有为您提供与 spark 的并行性。 您可以使用 spark 原生方法将文件数据读入更多分区,并控制它们在分区之间均匀分布数据。

df = sc.textFile(file_path,10).map(lambda x: x.split()).map(lambda x :(x[0],x[1].split(","))).toDF(['id','group'])
from pyspark.sql.functions import explode
newdf = df.withColumn("group", explode(df.group))

newdf.write.format("orc").option("header", "true").mode("overwrite").saveAsTable('db.yourHivetable')

您还可以增加或减少要爆炸的分区的大小或控制您的随机分区。

spark.conf.set("spark.sql.files.maxPartitionBytes","30")
spark.conf.set("spark.sql.shuffle.partitions", "100")

【讨论】:

谢谢,这正是我需要的。我不知道爆炸功能,它做我想要的。我尝试了代码,它在不到 15 分钟的时间内处理了我的整个文件,再次感谢。 @Pixeluz。别客气。 :-) 您可以根据数据量增加或减少分区号。这也将帮助您实现更多的并行性。

以上是关于如何有效地将大型 .tsv 文件上传到 pyspark 中具有拆分列的 Hive 表?的主要内容,如果未能解决你的问题,请参考以下文章

如何有效地将大型数据框拆分为多个拼花文件?

有效地将许多大型 CSV 文件中的 XYZ 坐标排序到小图块中

如何有效且快速地将大型 (6 Gb) .csv 文件导入 R,而不会导致 R REPL 崩溃?

如何有效地将 hadoop 与大型 MySQL 数据库一起使用?

如何有效地将大文件加载到 IndexedDB 存储中?我的应用程序在超过 100,000 行时崩溃

如何有效地将 Postgres 数据从 Query 传输到 S3