在 ADLS2 中合并通过 DataBricks 准备的 CSV 文件
Posted
技术标签:
【中文标题】在 ADLS2 中合并通过 DataBricks 准备的 CSV 文件【英文标题】:Merge CSV files in ADLS2 that are prepared through DataBricks 【发布时间】:2020-01-17 07:12:13 【问题描述】:在运行 DataBricks 代码并准备 CSV 文件并将它们加载到 ADLS2 时,CSV 文件被拆分为许多 CSV 文件并正在加载到 ADLS2。
有没有办法通过 pyspark 将这些 CSV 文件合并到 ADLS2 中。
谢谢
【问题讨论】:
【参考方案1】:有没有办法通过 pyspark 将这些 CSV 文件合并到 ADLS2 中。
据我所知,spark 数据框确实单独制作文件。理论上,您可以使用spark.csv method,它可以接受字符串列表作为参数。
>>> df = spark.read.csv('path')
然后使用df.toPandas().to_csv()方法将对象写入pandas
数据帧。您可以参考这个案例的一些线索:Azure Data-bricks : How to read part files and save it as one file to blob?。
但是,我担心这个进程无法承受如此高的内存消耗。所以,我建议你直接使用os
包来做合并工作。我测试了下面2个sn-p的代码供你参考。
第一个:
import os
path = '/dbfs/mnt/test/'
file_suffix = '.csv'
filtered_files = [file for file in files if file.endswith(file_suffix)]
print(filtered_files)
with open(path + 'final.csv', 'w') as final_file:
for file in filtered_files:
with open(file) as f:
lines = f.readlines()
final_file.writelines(lines[1:])
第二次:
import os
path = '/dbfs/mnt/test/'
file_suffix = '.csv'
filtered_files = [os.path.join(root, name) for root, dirs, files in os.walk(top=path , topdown=False) for name in files if name.endswith(file_suffix)]
print(filtered_files)
with open(path + 'final2.csv', 'w') as final_file:
for file in filtered_files:
with open(file) as f:
lines = f.readlines()
final_file.writelines(lines[1:])
第二个是兼容的层次结构。
另外,我在这里提供了一种方法,即使用 ADF 复制活动将多个 csv 文件传输到 ADLS gen2 中的一个文件中。
请参考此doc 并在 ADLS gen2 源数据集中配置文件夹路径。然后使用 copyBehavior 属性设置 MergeFiles。(此外,您可以使用 wildFileName 像 *.csv
来排除您不想要的文件在特定文件夹中触摸)
将源文件夹中的所有文件合并为一个文件。如果文件名 指定时,合并后的文件名就是指定的名称。除此以外, 这是一个自动生成的文件名。
【讨论】:
以上是关于在 ADLS2 中合并通过 DataBricks 准备的 CSV 文件的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Python / Pyspark 在 Databricks 笔记本中合并数据帧
如何使用 Pyspark 在 Databricks 中合并 Hive 表中的记录?
如何处理 Databricks 中不同数据类型的合并模式选项?