使用天蓝色存储中的 pyspark 进行计数检查

Posted

技术标签:

【中文标题】使用天蓝色存储中的 pyspark 进行计数检查【英文标题】:count check using pyspark from azure storage 【发布时间】:2021-02-12 07:02:42 【问题描述】:

我在 blob 存储中有文件

abc_1.csv
abc_1.ok
def_2.csv
def_2.ok
ghi_3.csv
ghi_3.ok  

.csv 包含数据,.ok 包含数据计数,即所有 csv 都有数据,.ok 包含单个 csv 的文件计数。 .ok 文件中的数据为

abc_1_2
def_2_12
gh1_3_56

其中 2,12,56 是 .csv 文件中的文件数。

我的要求是单独计算 csv 文件中的行数,然后将其与最后提到计数的相应 .ok 文件进行比较,如上述语句中所述。

我正在使用以下代码 sn-p 但需要更多帮助:

import fnmatch
import os
basepath = '/mnt/demo/fold1'
count1 = len(fnmatch.filter(os.listdir(basepath), '*.ok'))
print(count1)
count2 = len(fnmatch.filter(os.listdir(basepath), '*.csv'))
print(count2)

【问题讨论】:

【参考方案1】:

您可以将*.csv 文件读入数据帧,将*.ok 读入另一个数据帧。获取每个的计数,然后加入文件名。

类似这样的:

from pyspark.sql import functions as F

df_csv = spark.read.csv("path/fold1/*.csv") \
    .withColumn("filename", F.input_file_name()) \
    .groupby("filename") \
    .agg(F.count("*").alias("count_csv"))

df_ok = spark.read.csv("path/fold1/*.ok").toDF("file_count") \
    .withColumn("filename", F.regexp_replace(F.input_file_name(), "\\.ok", ".csv")) \
    .select("filename", F.substring_index("file_count", "_", -1).alias("count_ok"))


df_compare = df_csv.join(df_ok, ["filename"])
df_compare.show()

【讨论】:

我想在这里我必须单独提供文件名,但是我需要有一个循环,其中将继续考虑 .csv 文件和 .ok 文件并记录计数检查跨度> df_csv = spark.read.csv("/dbfs/mnt/abc/20201111/*.csv") 当我这样做时,它会向我抛出一个错误,说明路径不存在,请您帮忙 @PradyotMohanty 尝试使用路径 dbfs:/mnt/abc/20201111/*.csv *.ok 中数据的真实格式是 abc_1_2,其中直到 abc_1 是与相应 csv 文件名相同的文件名,最后一部分是 _2,或者 _12 或 _56 是总数实际 csv 文件中的记录,即 abc_1.csv,def_2.csv 让我们continue this discussion in chat.

以上是关于使用天蓝色存储中的 pyspark 进行计数检查的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark:通过检查值是不是存在来聚合数据(不是计数或总和)

天蓝色存储队列中的最大出队数

从集群将整数/字符串写入 pyspark 中的文本文件

PySpark 中的二元计数

如何在 Pyspark 中以编程方式使用“计数”?

LeetCode——分类颜色