使用 Python 或 Java 从本地将数据上传到 Azure ADLS Gen2

Posted

技术标签:

【中文标题】使用 Python 或 Java 从本地将数据上传到 Azure ADLS Gen2【英文标题】:Upload data to the Azure ADLS Gen2 from on-premise using Python or Java 【发布时间】:2019-12-09 02:32:18 【问题描述】:

我有一个使用 Data Lake Gen2 的 Azure 存储帐户。我想使用 Python(或 Java)将数据从本地上传到 Lake Gen2 文件系统。

我找到了examples,了解如何与存储帐户中的文件共享进行交互,但我还没有找到如何上传到 Lake(而不是文件共享)。我还发现了如何为 Gen1 Lakes here 做这件事,但除了为 Gen2 关闭 requests 之外什么都没有。

我的问题是,到今天为止,Python 是否也能做到这一点;或者,如何使用 Java 将文件上传到 Gen2 Lake?非常感谢您提供演示上传 API 调用的代码 sn-p。

【问题讨论】:

【参考方案1】:

根据官方教程Quickstart: Upload, download, and list blobs with Python,如下所示,如果您还没有注册multi-protocol access on Data Lake Storage的公共预览版,则不能直接使用Azure Storage SDK for Python在Azure Data Lake Store Gen 2中进行任何操作。

注意

只有在注册multi-protocol access on Data Lake Storage 的公共预览版后,具有分层命名空间的帐户才能使用本文中描述的功能。要查看限制,请参阅已知问题文章。

所以向 ADLS Gen2 上传数据的唯一解决方案是使用 ADLS Gen2 的 REST API,请参考其参考资料Azure Data Lake Store REST API

这是我在 Python 中将数据上传到 ADLS Gen2 的示例代码,它运行良好。

import requests
import json

def auth(tenant_id, client_id, client_secret):
    print('auth')
    auth_headers = 
        "Content-Type": "application/x-www-form-urlencoded"
    
    auth_body = 
        "client_id": client_id,
        "client_secret": client_secret,
        "scope" : "https://storage.azure.com/.default",
        "grant_type" : "client_credentials"
    
    resp = requests.post(f"https://login.microsoftonline.com/tenant_id/oauth2/v2.0/token", headers=auth_headers, data=auth_body)
    return (resp.status_code, json.loads(resp.text))

def mkfs(account_name, fs_name, access_token):
    print('mkfs')
    fs_headers = 
        "Authorization": f"Bearer access_token"
    
    resp = requests.put(f"https://account_name.dfs.core.windows.net/fs_name?resource=filesystem", headers=fs_headers)
    return (resp.status_code, resp.text)

def mkdir(account_name, fs_name, dir_name, access_token):
    print('mkdir')
    dir_headers = 
        "Authorization": f"Bearer access_token"
    
    resp = requests.put(f"https://account_name.dfs.core.windows.net/fs_name/dir_name?resource=directory", headers=dir_headers)
    return (resp.status_code, resp.text)
    
def touch_file(account_name, fs_name, dir_name, file_name, access_token):
    print('touch_file')
    touch_file_headers = 
        "Authorization": f"Bearer access_token"
    
    resp = requests.put(f"https://account_name.dfs.core.windows.net/fs_name/dir_name/file_name?resource=file", headers=touch_file_headers)
    return (resp.status_code, resp.text)

def append_file(account_name, fs_name, path, content, position, access_token):
    print('append_file')
    append_file_headers = 
        "Authorization": f"Bearer access_token",
        "Content-Type": "text/plain",
        "Content-Length": f"len(content)"
    
    resp = requests.patch(f"https://account_name.dfs.core.windows.net/fs_name/path?action=append&position=position", headers=append_file_headers, data=content)
    return (resp.status_code, resp.text)
    
def flush_file(account_name, fs_name, path, position, access_token):
    print('flush_file')
    flush_file_headers = 
        "Authorization": f"Bearer access_token"
    
    resp = requests.patch(f"https://account_name.dfs.core.windows.net/fs_name/path?action=flush&position=position", headers=flush_file_headers)
    return (resp.status_code, resp.text)

def mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token):
    print('mkfile')
    status_code, result = touch_file(account_name, fs_name, dir_name, file_name, access_token)
    if status_code == 201:
        with open(local_file_name, 'rb') as local_file:
            path = f"dir_name/file_name"
            content = local_file.read()
            position = 0
            append_file(account_name, fs_name, path, content, position, access_token)
            position = len(content)
            flush_file(account_name, fs_name, path, position, access_token)
    else:
        print(result)
        
    
if __name__ == '__main__':
    tenant_id = '<your tenant id>'
    client_id = '<your client id>'
    client_secret = '<your client secret>'
    
    account_name = '<your adls account name>'
    fs_name = '<your filesystem name>'
    dir_name = '<your directory name>'
    file_name = '<your file name>'
    local_file_name = '<your local file name>'
    
    # Acquire an Access token
    auth_status_code, auth_result = auth(tenant_id, client_id, client_secret)
    access_token = auth_status_code == 200 and auth_result['access_token'] or ''
    print(access_token)
    
    # Create a filesystem
    mkfs_status_code, mkfs_result = mkfs(account_name, fs_name, access_token)
    print(mkfs_status_code, mkfs_result)
    
    # Create a directory
    mkdir_status_code, mkdir_result = mkdir(account_name, fs_name, dir_name, access_token)
    print(mkdir_status_code, mkdir_result)
    
    # Create a file from local file
    mkfile(account_name, fs_name, dir_name, file_name, local_file_name, access_token)

希望对你有帮助。

【讨论】:

是否也可以进行批量上传?例如,如果我有一个必须上传的文件的整个目录,或者一个本地路径列表? 嗨,这也可以使用现有的 SAS 令牌吗?所以我不必将存储帐户的帐户密钥放在代码中

以上是关于使用 Python 或 Java 从本地将数据上传到 Azure ADLS Gen2的主要内容,如果未能解决你的问题,请参考以下文章

从请求将文件上传到云存储而不在本地保存

如何把python代码上传到服务器上

是否可以在通过表单上传之前预览本地图像?

如何使用 SQL 开发人员将任何文件从本地机器上传到数据库服务器文件系统

python - 使用 matplotlib 和 boto 将绘图从内存上传到 s3

使用 java 将文件从 Gdrive 下载到本地系统