如何在 databricks 工作区中使用 python 获取 azure datalake 存储中存在的每个文件的最后修改时间?

Posted

技术标签:

【中文标题】如何在 databricks 工作区中使用 python 获取 azure datalake 存储中存在的每个文件的最后修改时间?【英文标题】:How to get the last modification time of each files present in azure datalake storage using python in databricks workspace? 【发布时间】:2019-10-04 13:24:07 【问题描述】:

我正在尝试获取 azure 数据湖中存在的每个文件的最后修改时间。

files = dbutils.fs.ls('/mnt/blob')

对于文件中的 fi: 打印(fi)

输出:-FileInfo(path='dbfs:/mnt/blob/rule_sheet_recon.xlsx', name='rule_sheet_recon.xlsx', size=10843)

这里我无法获取文件的最后修改时间。有没有办法获得该属性。

我尝试在 shell 命令下面查看属性,但无法将其存储在 python 对象中。

%sh ls -ls /dbfs/mnt/blob/

输出:- 共 0

0 -rw-r--r-- 1 根根 13577 Sep 20 10:50 a.txt

0 -rw-r--r-- 1 根根 10843 Sep 20 10:50 b.txt

【问题讨论】:

【参考方案1】:

我们没有直接的方法来获取这些细节。但是我们根据以下简单的 python 代码得到了这些细节。

示例:考虑一下,您想获取 adls 路径中的所有子目录和文件 container_name/container-Second --- 您可以使用以下代码

from pyspark.sql.functions import col
from azure.storage.blob import BlockBlobService
from datetime import datetime
import os.path

block_blob_service = BlockBlobService(account_name='account-name', account_key='account-key')
container_name ='container-firstname'
second_conatainer_name ='container-Second'
#block_blob_service.create_container(container_name)
generator = block_blob_service.list_blobs(container_name,prefix="Recovery/")
report_time = datetime.now().strftime('%Y-%m-%d %H:%M:%S')


for blob in generator:
    length = BlockBlobService.get_blob_properties(block_blob_service,container_name,blob.name).properties.content_length
    last_modified = BlockBlobService.get_blob_properties(block_blob_service,container_name,blob.name).properties.last_modified
    file_size = BlockBlobService.get_blob_properties(block_blob_service,container_name,blob.name).properties.content_length
    line = container_name+'|'+second_conatainer_name+'|'+blob.name+'|'+ str(file_size) +'|'+str(last_modified)+'|'+str(report_time)
    print(line)

【讨论】:

【参考方案2】:

我们可以使用os 包来获取信息。例如在 pyspark 中

import os

def get_filemtime(filename):
  return os.path.getmtime(filename)

您可以传递文件名的绝对路径,例如dbfs:/mnt/adls/logs/ehub/app/0/2021/07/21/15/05/40.avro

【讨论】:

【参考方案3】:

如果您已经配置了 DataBricks 集群,则可以避免使用帐户密钥和存储 blob 编码等。

import time

path = spark._jvm.org.apache.hadoop.fs.Path
sc = spark.SparkContext
fs = path('abfss://yourbucket@yourstorageacct.dfs.core.windows.net/').getFileSystem(sc._jsc.hadoopConfiguration())

res = fs.listFiles(path('abfss://yourbucket@yourstorageacct.dfs.core.windows.net/your/storage/path'), True)

while res.hasNext():
  file = res.next()
  localTime = time.strftime("%Y-%m-%d %H:%M:%S", time.localtime(file.getModificationTime() / 1000))
  print(f"file.getPath(): localTime")

【讨论】:

以上是关于如何在 databricks 工作区中使用 python 获取 azure datalake 存储中存在的每个文件的最后修改时间?的主要内容,如果未能解决你的问题,请参考以下文章

如何强制 Azure 数据工厂数据流使用 Databricks

如何使用 Azure databricks 通过 ADLS gen 2 中的多个工作表读取和写入 excel 数据

如何在 Spark Databricks 中注册 SQL 函数

Databricks 在没有进展的情况下激发工作

通过 Python 中的 Databricks api 读取 Databricks 表?

如何提高 Databricks 性能?