在 ADLS2 中合并通过 DataBricks 准备的 CSV 文件

Posted

技术标签:

【中文标题】在 ADLS2 中合并通过 DataBricks 准备的 CSV 文件【英文标题】:Merge CSV files in ADLS2 that are prepared through DataBricks 【发布时间】:2020-01-17 07:12:13 【问题描述】:

在运行 DataBricks 代码并准备 CSV 文件并将它们加载到 ADLS2 时,CSV 文件被拆分为许多 CSV 文件并正在加载到 ADLS2。

有没有办法通过 pyspark 将这些 CSV 文件合并到 ADLS2 中。

谢谢

【问题讨论】:

【参考方案1】:

有没有办法通过 pyspark 将这些 CSV 文件合并到 ADLS2 中。

据我所知,spark 数据框确实单独制作文件。理论上,您可以使用spark.csv method,它可以接受字符串列表作为参数。

>>> df = spark.read.csv('path')

然后使用df.toPandas().to_csv()方法将对象写入pandas数据帧。您可以参考这个案例的一些线索:Azure Data-bricks : How to read part files and save it as one file to blob?。

但是,我担心这个进程无法承受如此高的内存消耗。所以,我建议你直接使用os包来做合并工作。我测试了下面2个sn-p的代码供你参考。

第一个:

import os

path = '/dbfs/mnt/test/'
file_suffix = '.csv'
filtered_files = [file for file in files if file.endswith(file_suffix)]
print(filtered_files)

with open(path + 'final.csv', 'w') as final_file:
    for file in filtered_files:
        with open(file) as f:
            lines = f.readlines()
            final_file.writelines(lines[1:])

第二次:

import os

path = '/dbfs/mnt/test/'
file_suffix = '.csv'

filtered_files = [os.path.join(root, name) for root, dirs, files in os.walk(top=path , topdown=False) for name in files if name.endswith(file_suffix)]
print(filtered_files)

with open(path + 'final2.csv', 'w') as final_file:
    for file in filtered_files:
        with open(file) as f:
            lines = f.readlines()
            final_file.writelines(lines[1:])

第二个是兼容的层次结构。


另外,我在这里提供了一种方法,即使用 ADF 复制活动将多个 csv 文件传输到 ADLS gen2 中的一个文件中。

请参考此doc 并在 ADLS gen2 源数据集中配置文件夹路径。然后使用 copyBehavior 属性设置 MergeFiles。(此外,您可以使用 wildFileName 像 *.csv 来排除您不想要的文件在特定文件夹中触摸)

将源文件夹中的所有文件合并为一个文件。如果文件名 指定时,合并后的文件名就是指定的名称。除此以外, 这是一个自动生成的文件名。

【讨论】:

以上是关于在 ADLS2 中合并通过 DataBricks 准备的 CSV 文件的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Python / Pyspark 在 Databricks 笔记本中合并数据帧

如何使用 Pyspark 在 Databricks 中合并 Hive 表中的记录?

如何处理 Databricks 中不同数据类型的合并模式选项?

Databricks:将数据框合并到 Azure 突触表中

通过 Databricks 笔记本更改表表名 CONCATENATE 错误

Azure 数据湖一代。 2 (adls2), api 获取存储在数据湖中的数据的整体大小