Pyspark 从 csv 文件中读取 delta/upsert 数据集

Posted

技术标签:

【中文标题】Pyspark 从 csv 文件中读取 delta/upsert 数据集【英文标题】:Pyspark read delta/upsert dataset from csv files 【发布时间】:2017-06-28 17:27:45 【问题描述】:

我有一个定期更新的数据集,我收到的一系列 CSV 文件给出了更改。我想要一个只包含每行最新版本的数据框。有没有办法在 Spark/pyspark 中加载整个数据集以实现并行性?

例子:

文件 1(键、值) 1,ABC 2,DEF 3,GHI 文件 2(键、值) 2,XYZ 4,UVW 文件 3(键、值) 3,JKL 4,MNO

应该导致: 1,ABC 2,XYZ 3,JKL 4,MNO

我知道我可以通过顺序加载每个文件然后使用反连接(踢出被替换的旧值)和联合来做到这一点,但这不会让工作负载并行。

【问题讨论】:

为了更多的并行性你总是可以做一个repartitionspark.apache.org/docs/latest/… 问题是我必须按顺序读取每个文件。 【参考方案1】:

放大@pandaromeo 的回答,这似乎有效...

from pyspark.sql import Window
from pyspark.sql.functions import row_number, desc, input_file_name


# load files, marking each with input file name
df = spark.read.csv(files).withColumn("_ifn", input_file_name())

# use a window function to order the rows for each ID by file name (most recent first)
w = Window.partitionBy(primaryKey).orderBy(desc('_ifn'))
df = df.withColumn("_rn", row_number().over(w))

# grab only the rows that were first (most recent) in each window
# clean up working columns
df = df.where(df._rn == 1).drop("_rn").drop("_ifn")

【讨论】:

【参考方案2】:

你可以

from pyspark.sql.functions import * 
alls = spark.read.csv("files/*").withColumn('filename', input_file_name())

这将加载目录中的所有文件并允许您对具有文件名的列进行操作。

我假设文件名具有某种时间戳或键,您可以使用 window 和 row_number 函数对其进行区分和排序。

【讨论】:

以上是关于Pyspark 从 csv 文件中读取 delta/upsert 数据集的主要内容,如果未能解决你的问题,请参考以下文章

pyspark用正则表达式读取csv文件

PYSPARK - 如何读取 S3 中所有子文件夹中的所有 csv 文件?

Pyspark 的 sqlContext.read.csv() 函数读取的行数比实际 .csv 文件中的行数多

使用 pyspark 从 AWS s3 Bucket 读取 csv 时出错

PySpark 无法从 hdfs 读取 csv:HiveExternalCatalog 错误

无法从 S3 读取 csv 到 AWS 上 EC2 实例上的 pyspark 数据帧