如何使用 Python 将 blob 数据移动到 Snowflake
Posted
技术标签:
【中文标题】如何使用 Python 将 blob 数据移动到 Snowflake【英文标题】:How to move a blob data to Snowflake thru Python 【发布时间】:2021-08-19 07:55:38 【问题描述】:我正在尝试将数据从 ADLS blob 移动到 Snowflake 表。
我可以对 UI 做同样的事情。
用户界面遵循的步骤:
生成了以下 SAS 令牌:
sp=rl&st=2021-06-01T05:45:37Z&se=2021-06-01T13:45:37Z&spr=https&sv=2020-02-10&sr=c&sig=rYYY4o%2YY3jj%2XXXXXAB%2Bo8ygrtyAVCnPOxomlOc%3D
能够在 Snowflake Web UI 中使用上述令牌加载表格:
copy into FIRST_LEVEL.MOVIES
from 'azure://adlsedmadifpoc.blob.core.windows.net/airflow-dif/raw-area/'
credentials=(azure_sas_token='sp=rl&st=2021-06-01T05:45:37Z&se=2021-06-01T13:45:37Z&spr=https&sv=2020-02-10&sr=c&sig=rYYY4o%2YY3jj%2XXXXXAB%2Bo8ygrtyAVCnPOxomlOc%3D')
FORCE = TRUE file_format = (TYPE = CSV);
我正在尝试用 Python 做同样的事情:
from azure.storage.blob import BlobServiceClient,generate_blob_sas,BlobSasPermissions
from datetime import datetime,timedelta
import snowflake.connector
def generate_sas_token(file_name):
sas = generate_blob_sas(account_name="xxxx",
account_key="p5V2GELxxxxQ4tVgLdj9inKwwYWlAnYpKtGHAg==", container_name="airflow-dif",blob_name=file_name,permission=BlobSasPermissions(read=True),
expiry=datetime.utcnow() + timedelta(hours=2))
print (sas)
return sas
sas = generate_sas_token("raw-area/moviesDB.csv")
# Connectio string
conn = snowflake.connector.connect(user='xx',password='xx@123',account='xx.southeast-asia.azure',database='xx')
# Create cursor
cur = conn.cursor()
cur.execute(
f"copy into FIRST_LEVEL.MOVIES FROM 'azure://xxx.blob.core.windows.net/airflow-dif/raw-area/moviesDB.csv' credentials=(azure_sas_token='sas') file_format = (TYPE = CSV) ;")
cur.execute(f" Commit ;")
# Execute SQL statement
cur.close()
conn.close()
代码中生成的 SAS 令牌:
se=2021-06-01T07%3A42%3A11Z&sp=rt&sv=2020-06-12&sr=b&sig=ZhZMPSI%yyyyAPTqqE0%3D
通过 python 生成 sas 令牌时,我无法使用 List 权限。
我面临以下错误:
cursor=cursor,
snowflake.connector.errors.ProgrammingError: 091003 (22000): Failure using stage area. Cause: [Server failed to authenticate the request. Make sure the value of Authorization header is formed correctly including the signature. (Status Code: 403; Error Code: AuthenticationFailed)]
我将来可能在那个文件夹中有 csv 文件的列表。
任何帮助表示赞赏。谢谢。
【问题讨论】:
如果要为 blob 创建 SAS 令牌,则不需要List
权限。如果要为 blob 容器创建 SAS 令牌并且需要列出该 blob 容器中的 blob,则需要它。 Read
权限应该足以读取 blob 或获取其属性。
我没有获得任何添加列表权限的选项。 BlobSasPermissions(read=True, list=True) 不工作
正如我所说,为 blob 创建 SAS 令牌时,您不能拥有列表权限。您可以使用ContainerSasPermissions
为容器创建一个 SAS 令牌,该令牌将具有列表权限。
【参考方案1】:
以下代码有效:
from azure.storage.blob import generate_container_sas, ContainerSasPermissions
from datetime import datetime,timedelta
import snowflake.connector
def get_sas_token():
container_sas_token = generate_container_sas(
account_name = 'XX',
account_key = 'p5V2GEL3AqGuPMMYXXXQ4tVgLdj9inKwwYWlAnYpKtGHAg==',
container_name = 'airflow-dif',
permission=ContainerSasPermissions(read=True,list=True),
expiry=datetime.utcnow() + timedelta(hours=1)
)
print (container_sas_token)
return container_sas_token
sas = get_sas_token()
# Connectio string
conn = snowflake.connector.connect(user='XX',password='XX@123',account='XX.southeast-asia.azure',database='XX')
# Create cursor
cur = conn.cursor()
cur.execute(
f"copy into FIRST_LEVEL.MOVIES FROM 'azure://XX.blob.core.windows.net/airflow-dif/raw-area/' credentials=(azure_sas_token='sas') FORCE = TRUE file_format = (TYPE = CSV) ;")
print (cur.fetchone())
cur.execute(f" Commit ;")
# Execute SQL statement
cur.close()
conn.close()
感谢 Gaurav 的投入。
【讨论】:
以上是关于如何使用 Python 将 blob 数据移动到 Snowflake的主要内容,如果未能解决你的问题,请参考以下文章
Azure 数据湖:将数据从 Blob 移动到 ADLS 时面临问题
如何有效地将大数据从数据中心移动到 Azure Blob 存储,以便以后通过 HDInsight 进行处理?
如何通过 python 将 JSON 数据附加到存储在 Azure blob 存储中的现有 JSON 文件?