Pyspark 从 csv 文件中读取 delta/upsert 数据集
Posted
技术标签:
【中文标题】Pyspark 从 csv 文件中读取 delta/upsert 数据集【英文标题】:Pyspark read delta/upsert dataset from csv files 【发布时间】:2017-06-28 17:27:45 【问题描述】:我有一个定期更新的数据集,我收到的一系列 CSV 文件给出了更改。我想要一个只包含每行最新版本的数据框。有没有办法在 Spark/pyspark 中加载整个数据集以实现并行性?
例子:
文件 1(键、值)
1,ABC
2,DEF
3,GHI
文件 2(键、值)
2,XYZ
4,UVW
文件 3(键、值)
3,JKL
4,MNO
应该导致:
1,ABC
2,XYZ
3,JKL
4,MNO
我知道我可以通过顺序加载每个文件然后使用反连接(踢出被替换的旧值)和联合来做到这一点,但这不会让工作负载并行。
【问题讨论】:
为了更多的并行性你总是可以做一个repartition
spark.apache.org/docs/latest/…
问题是我必须按顺序读取每个文件。
【参考方案1】:
放大@pandaromeo 的回答,这似乎有效...
from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc, input_file_name
# load files, marking each with input file name
df = spark.read.csv(files).withColumn("_ifn", input_file_name())
# use a window function to order the rows for each ID by file name (most recent first)
w = Window.partitionBy(primaryKey).orderBy(desc('_ifn'))
df = df.withColumn("_rn", row_number().over(w))
# grab only the rows that were first (most recent) in each window
# clean up working columns
df = df.where(df._rn == 1).drop("_rn").drop("_ifn")
【讨论】:
【参考方案2】:你可以
from pyspark.sql.functions import *
alls = spark.read.csv("files/*").withColumn('filename', input_file_name())
这将加载目录中的所有文件并允许您对具有文件名的列进行操作。
我假设文件名具有某种时间戳或键,您可以使用 window 和 row_number 函数对其进行区分和排序。
【讨论】:
以上是关于Pyspark 从 csv 文件中读取 delta/upsert 数据集的主要内容,如果未能解决你的问题,请参考以下文章
PYSPARK - 如何读取 S3 中所有子文件夹中的所有 csv 文件?
Pyspark 的 sqlContext.read.csv() 函数读取的行数比实际 .csv 文件中的行数多
使用 pyspark 从 AWS s3 Bucket 读取 csv 时出错