改进此pyspark连接的最佳方法

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了改进此pyspark连接的最佳方法相关的知识,希望对你有一定的参考价值。

我需要合并250个csv文件,但是这种方法真的很慢,还有其他方法吗?

df1 = spark.read.csv("/Users/mac/Desktop/A.csv", header=True, sep=",", inferSchema = True)
df2 = spark.read.csv("/Users/mac/Desktop/B.csv", header=True, sep=",", inferSchema = True)
df3 = spark.read.csv("/Users/mac/Desktop/C.csv", header=True, sep=",", inferSchema = True)
df4 = spark.read.csv("/Users/mac/Desktop/D.csv", header=True, sep=",", inferSchema = True)
df5 = spark.read.csv("/Users/mac/Desktop/E.csv", header=True, sep=",", inferSchema = True)
df6 = spark.read.csv("/Users/mac/Desktop/F.csv", header=True, sep=",", inferSchema = True)
df7 = spark.read.csv("/Users/mac/Desktop/G.csv", header=True, sep=",", inferSchema = True)


dfs = [df1,df2,df3,df4, df5,df6,df7]
df = reduce(DataFrame.unionAll, dfs)

我尝试过:

dfs = []
for i in os.listdir('/Users/mac/Desktop/'):
    if i != ".DS_Store":
        dfs.append(f"spark.read.csv(i, header=True, sep=',', inferSchema=True")



此代码以字符串形式返回:

["spark.read.csv(A.csv, header=True, sep=',', inferSchema=True",
 "spark.read.csv(B.csv, header=True, sep=',', inferSchema=True",
 "spark.read.csv(C.csv, header=True, sep=',', inferSchema=True",
 "spark.read.csv(D.csv, header=True, sep=',', inferSchema=True",

我正在寻找此输出:

[DataFrame[identifier: string, identifier_type: string, timestamp: string, time_zone_name: string, device_lat: double, device_lon: double, country_short: string, province_short: string, ip_address: string, device_horizontal_accuracy: double, source_id: string, record_id: string],
 DataFrame[3660506e-fbe6-4575-8e04-7ac9e09006c3: string, gaid: string, 2020-01-29 05:56:29 UTC: string, America/Matamoros: string, 25.531531: double, -103.39389: double, MX: string, MX.07: string, _c8: string, 4382: double, _c10: string, 93733528-93d4-4774-8676-529b8af32646: string],
答案

我们可以一次从目录中读取所有csv个文件。

尝试使用

df=spark.read.csv("/Users/mac/Desktop/*.csv", header=True, sep=",", inferSchema = True)

以上是关于改进此pyspark连接的最佳方法的主要内容,如果未能解决你的问题,请参考以下文章

避免在 pyspark 代码中使用 collect() 函数的最佳方法是啥?编写优化pyspark代码的最佳方法?

在 pyspark 中转换或处理日期数据类型的最佳方法是啥

在 pyspark 中显示数据框不同值的最佳方法是啥?

使用 pyspark 检查 dbfs 中镶木地板表长度的最佳方法?

在 Python/PySpark 中 Spark 复制数据框列的最佳实践?

在pyspark中使用基于DataFrame的API在2个sparseVectors列表之间进行矩阵乘法的最佳方法是啥?