如何使用 Python 将 blob 数据移动到 Snowflake

Posted

技术标签:

【中文标题】如何使用 Python 将 blob 数据移动到 Snowflake【英文标题】:How to move a blob data to Snowflake thru Python 【发布时间】:2021-08-19 07:55:38 【问题描述】:

我正在尝试将数据从 ADLS blob 移动到 Snowflake 表。

我可以对 UI 做同样的事情。

用户界面遵循的步骤:

生成了以下 SAS 令牌:

sp=rl&st=2021-06-01T05:45:37Z&se=2021-06-01T13:45:37Z&spr=https&sv=2020-02-10&sr=c&sig=rYYY4o%2YY3jj%2XXXXXAB%2Bo8ygrtyAVCnPOxomlOc%3D

能够在 Snowflake Web UI 中使用上述令牌加载表格:

copy into FIRST_LEVEL.MOVIES
  from 'azure://adlsedmadifpoc.blob.core.windows.net/airflow-dif/raw-area/'
  credentials=(azure_sas_token='sp=rl&st=2021-06-01T05:45:37Z&se=2021-06-01T13:45:37Z&spr=https&sv=2020-02-10&sr=c&sig=rYYY4o%2YY3jj%2XXXXXAB%2Bo8ygrtyAVCnPOxomlOc%3D')
   FORCE = TRUE file_format = (TYPE = CSV);

我正在尝试用 Python 做同样的事情:

from azure.storage.blob import BlobServiceClient,generate_blob_sas,BlobSasPermissions
from datetime import datetime,timedelta
import snowflake.connector

def generate_sas_token(file_name):

    sas = generate_blob_sas(account_name="xxxx",
account_key="p5V2GELxxxxQ4tVgLdj9inKwwYWlAnYpKtGHAg==", container_name="airflow-dif",blob_name=file_name,permission=BlobSasPermissions(read=True),
expiry=datetime.utcnow() + timedelta(hours=2))
    print (sas)
    return sas

sas = generate_sas_token("raw-area/moviesDB.csv")

# Connectio string

conn = snowflake.connector.connect(user='xx',password='xx@123',account='xx.southeast-asia.azure',database='xx')

# Create cursor

cur = conn.cursor()
cur.execute(
            f"copy into FIRST_LEVEL.MOVIES FROM  'azure://xxx.blob.core.windows.net/airflow-dif/raw-area/moviesDB.csv'   credentials=(azure_sas_token='sas')  file_format = (TYPE = CSV) ;")
cur.execute(f" Commit  ;")
# Execute SQL statement
cur.close()
conn.close()

代码中生成的 SAS 令牌:

se=2021-06-01T07%3A42%3A11Z&sp=rt&sv=2020-06-12&sr=b&sig=ZhZMPSI%yyyyAPTqqE0%3D

通过 python 生成 sas 令牌时,我无法使用 List 权限。

我面临以下错误:

    cursor=cursor,
snowflake.connector.errors.ProgrammingError: 091003 (22000): Failure using stage area. Cause: [Server failed to authenticate the request. Make sure the value of Authorization header is formed correctly including the signature. (Status Code: 403; Error Code: AuthenticationFailed)]

我将来可能在那个文件夹中有 csv 文件的列表。

任何帮助表示赞赏。谢谢。

【问题讨论】:

如果要为 blob 创建 SAS 令牌,则不需要 List 权限。如果要为 blob 容器创建 SAS 令牌并且需要列出该 blob 容器中的 blob,则需要它。 Read 权限应该足以读取 blob 或获取其属性。 我没有获得任何添加列表权限的选项。 BlobSasPermissions(read=True, list=True) 不工作 正如我所说,为 blob 创建 SAS 令牌时,您不能拥有列表权限。您可以使用ContainerSasPermissions 为容器创建一个 SAS 令牌,该令牌将具有列表权限。 【参考方案1】:

以下代码有效:

from azure.storage.blob import generate_container_sas, ContainerSasPermissions
from datetime import datetime,timedelta
import snowflake.connector

def get_sas_token():
    container_sas_token = generate_container_sas(
        account_name = 'XX',
        account_key = 'p5V2GEL3AqGuPMMYXXXQ4tVgLdj9inKwwYWlAnYpKtGHAg==',
        container_name = 'airflow-dif',
        permission=ContainerSasPermissions(read=True,list=True),
        expiry=datetime.utcnow() + timedelta(hours=1)
    )
    print (container_sas_token)
    return container_sas_token

sas = get_sas_token()

# Connectio string

conn = snowflake.connector.connect(user='XX',password='XX@123',account='XX.southeast-asia.azure',database='XX')

# Create cursor

cur = conn.cursor()
cur.execute(
            f"copy into FIRST_LEVEL.MOVIES FROM  'azure://XX.blob.core.windows.net/airflow-dif/raw-area/'   credentials=(azure_sas_token='sas')  FORCE = TRUE file_format = (TYPE = CSV) ;")

    
print (cur.fetchone())
cur.execute(f" Commit  ;")

# Execute SQL statement

cur.close()
conn.close()

感谢 Gaurav 的投入。

【讨论】:

以上是关于如何使用 Python 将 blob 数据移动到 Snowflake的主要内容,如果未能解决你的问题,请参考以下文章

Azure 数据湖:将数据从 Blob 移动到 ADLS 时面临问题

如何有效地将大数据从数据中心移动到 Azure Blob 存储,以便以后通过 HDInsight 进行处理?

将数据从数据库移动到 Azure Blob 存储

如何通过 python 将 JSON 数据附加到存储在 Azure blob 存储中的现有 JSON 文件?

如何使用 AJAX 和 jQuery 将 blob 音频发布到 Python 服务器?

当文件上传到 azure 文件共享时,如何添加触发器以将文件从 azure 文件共享移动到 azure blob?