如何有效地将大型 .tsv 文件上传到 pyspark 中具有拆分列的 Hive 表?
Posted
技术标签:
【中文标题】如何有效地将大型 .tsv 文件上传到 pyspark 中具有拆分列的 Hive 表?【英文标题】:How to efficiently upload a large .tsv file to a Hive table with split columns in pyspark? 【发布时间】:2019-08-08 11:45:06 【问题描述】:我有一个大(约 1000 万行).tsv 文件,其中包含“id”和“group”两列。 'Group' 列实际上是某个 id 所属的所有组的列表,因此文件如下所示:
id1 group1,group2
id2 group2,group3,group4
id3 group1
...
我需要使用 pyspark 将其上传到 Hive 表,但是我想拆分组列,以便一行中只有一个组,因此生成的表如下所示:
id1 group1
id1 group2
id2 group2
id2 group3
id2 group4
id3 group1
我已尝试逐行阅读,只需使用 python split() 拆分列,然后为每一行创建 spark 数据框并将其与每次迭代合并。我的代码有效,但效率极低,因为处理 1000 行代码需要 2 分钟。我的代码如下:
fields = [StructField('user_id', StringType(), True),StructField('group_id', StringType(), True)]
membership_schema = StructType(fields)
result_df = sqlContext.createDataFrame(sc.emptyRDD(), membership_schema)
with open('file.tsv','r') as f:
for line in f:
parts = line.split()
id_part = parts[0]
audience_parts = parts[1].split(',')
for item in audience_parts:
newRow = sqlContext.createDataFrame([(id_part,item)], membership_schema)
result_df = result_df.union(newRow)
df_writer = DataFrameWriter(result_df)
df_writer.insertInto("my_table_in_hive")
有没有一种更简单、更有效的方法可以将整个文件上传到表格中而无需遍历行?
感谢您的帮助。
【问题讨论】:
【参考方案1】:我查看了上述代码的计划,它似乎扫描了很多,也没有为您提供与 spark 的并行性。 您可以使用 spark 原生方法将文件数据读入更多分区,并控制它们在分区之间均匀分布数据。
df = sc.textFile(file_path,10).map(lambda x: x.split()).map(lambda x :(x[0],x[1].split(","))).toDF(['id','group'])
from pyspark.sql.functions import explode
newdf = df.withColumn("group", explode(df.group))
newdf.write.format("orc").option("header", "true").mode("overwrite").saveAsTable('db.yourHivetable')
您还可以增加或减少要爆炸的分区的大小或控制您的随机分区。
spark.conf.set("spark.sql.files.maxPartitionBytes","30")
spark.conf.set("spark.sql.shuffle.partitions", "100")
【讨论】:
谢谢,这正是我需要的。我不知道爆炸功能,它做我想要的。我尝试了代码,它在不到 15 分钟的时间内处理了我的整个文件,再次感谢。 @Pixeluz。别客气。 :-) 您可以根据数据量增加或减少分区号。这也将帮助您实现更多的并行性。以上是关于如何有效地将大型 .tsv 文件上传到 pyspark 中具有拆分列的 Hive 表?的主要内容,如果未能解决你的问题,请参考以下文章
有效地将许多大型 CSV 文件中的 XYZ 坐标排序到小图块中
如何有效且快速地将大型 (6 Gb) .csv 文件导入 R,而不会导致 R REPL 崩溃?
如何有效地将 hadoop 与大型 MySQL 数据库一起使用?