使用天蓝色存储中的 pyspark 进行计数检查
Posted
技术标签:
【中文标题】使用天蓝色存储中的 pyspark 进行计数检查【英文标题】:count check using pyspark from azure storage 【发布时间】:2021-02-12 07:02:42 【问题描述】:我在 blob 存储中有文件
abc_1.csv
abc_1.ok
def_2.csv
def_2.ok
ghi_3.csv
ghi_3.ok
.csv 包含数据,.ok 包含数据计数,即所有 csv 都有数据,.ok 包含单个 csv 的文件计数。 .ok 文件中的数据为
abc_1_2
def_2_12
gh1_3_56
其中 2,12,56 是 .csv 文件中的文件数。
我的要求是单独计算 csv 文件中的行数,然后将其与最后提到计数的相应 .ok 文件进行比较,如上述语句中所述。
我正在使用以下代码 sn-p 但需要更多帮助:
import fnmatch
import os
basepath = '/mnt/demo/fold1'
count1 = len(fnmatch.filter(os.listdir(basepath), '*.ok'))
print(count1)
count2 = len(fnmatch.filter(os.listdir(basepath), '*.csv'))
print(count2)
【问题讨论】:
【参考方案1】:您可以将*.csv
文件读入数据帧,将*.ok
读入另一个数据帧。获取每个的计数,然后加入文件名。
类似这样的:
from pyspark.sql import functions as F
df_csv = spark.read.csv("path/fold1/*.csv") \
.withColumn("filename", F.input_file_name()) \
.groupby("filename") \
.agg(F.count("*").alias("count_csv"))
df_ok = spark.read.csv("path/fold1/*.ok").toDF("file_count") \
.withColumn("filename", F.regexp_replace(F.input_file_name(), "\\.ok", ".csv")) \
.select("filename", F.substring_index("file_count", "_", -1).alias("count_ok"))
df_compare = df_csv.join(df_ok, ["filename"])
df_compare.show()
【讨论】:
我想在这里我必须单独提供文件名,但是我需要有一个循环,其中将继续考虑 .csv 文件和 .ok 文件并记录计数检查跨度> df_csv = spark.read.csv("/dbfs/mnt/abc/20201111/*.csv") 当我这样做时,它会向我抛出一个错误,说明路径不存在,请您帮忙 @PradyotMohanty 尝试使用路径dbfs:/mnt/abc/20201111/*.csv
*.ok 中数据的真实格式是 abc_1_2,其中直到 abc_1 是与相应 csv 文件名相同的文件名,最后一部分是 _2,或者 _12 或 _56 是总数实际 csv 文件中的记录,即 abc_1.csv,def_2.csv
让我们continue this discussion in chat.以上是关于使用天蓝色存储中的 pyspark 进行计数检查的主要内容,如果未能解决你的问题,请参考以下文章