如何使用pyspark流计算csv文件中的条目数

Posted

技术标签:

【中文标题】如何使用pyspark流计算csv文件中的条目数【英文标题】:How to count the number of entries in a csv file using pyspark streaming 【发布时间】:2019-12-24 10:51:02 【问题描述】:

我有一个监视器目录包含多个.csv 文件。我需要在每个即将到来的.csv 文件中计算number of entries。我想在 pyspark 流式传输上下文中执行此操作。 这就是我所做的,

my_DStream = ssc.textFileStream(monitor_Dir)
test = my_DStream.flatMap(process_file)  # process_file function simply process my file. e.g line.split(";")
print(len(test.collect()))

这并没有给我想要的结果。例如 file1.csv 包含 10 条目,file2.csv 包含 18 条目等。所以我需要查看输出

10
18
..
..
etc

如果我有一个单独的静态文件并使用 rdd 操作,我可以执行相同的任务。

【问题讨论】:

【参考方案1】:

如果有人感兴趣,这就是我所做的。

my_DStream = ssc.textFileStream(monitor_Dir)
DStream1 = my_DStream.flatMap(process_file) 
DStream2 = DStream1.filter(lambda x: x[0])
lines_num = DStream2.count() 
lines_num.pprint()

这给了我想要的输出。

【讨论】:

以上是关于如何使用pyspark流计算csv文件中的条目数的主要内容,如果未能解决你的问题,请参考以下文章

如何对 Pyspark spark.sql 数据框中的数据进行同质化

如何在Pyspark中计算或管理流数据?

使用 pyspark 读取多个 csv 文件

PYSPARK - 如何读取 S3 中所有子文件夹中的所有 csv 文件?

如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?

如何从 pyspark 中的本地 jar 导入包?