使用 Python 脚本的 ADF 管道中的 Azure 函数

Posted

技术标签:

【中文标题】使用 Python 脚本的 ADF 管道中的 Azure 函数【英文标题】:Azure function in ADF pipeline using a Python script 【发布时间】:2021-12-24 15:48:06 【问题描述】:

我正在尝试在管道中的 Azure 数据工厂中运行我的以下脚本。我的 Python 代码从 Blob 存储中检索 2 个 CSV 文件,并根据密钥将它们合并到一个文件中,然后将其上传到数据湖存储中。我尝试过使用功能应用程序块,它给了我 InternalServerError,我还尝试了运行没有错误的 Web 活动。问题是我运行管道时没有创建文件,即使管道成功运行(使用 Web 块)。当我调用主函数并且在数据湖存储中创建文件时,该函数也会在本地运行。我在 VS Code 中也尝试过 http 触发器和持久函数,但它们都没有在 Azure 中创建“merged.csv”文件。

我的 Python 脚本(init.py):

import pandas as pd
import logging
from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient
import azure.functions as func


def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    STORAGEACCOUNTURL= 'https://storage.blob.core.windows.net/'
    STORAGEACCOUNTKEY= '****'
    LOCALFILENAME= ['file1.csv', 'file2.csv']
    CONTAINERNAME= 'inputblob'

    file1 = pd.DataFrame()
    file2 = pd.DataFrame()
    #download from blob

    blob_service_client_instance = BlobServiceClient(account_url=STORAGEACCOUNTURL, credential=STORAGEACCOUNTKEY)

    for i in LOCALFILENAME:
        with open(i, "wb") as my_blobs:
            blob_client_instance = blob_service_client_instance.get_blob_client(container=CONTAINERNAME, blob=i, snapshot=None)
            blob_data = blob_client_instance.download_blob()
            blob_data.readinto(my_blobs)
            if i == 'file1.csv':
                file1 = pd.read_csv(i)
            if i == 'file2.csv':
                file2 = pd.read_csv(i)
    
    # load

  
    summary = pd.merge(left=file1, right=file2, on='key', how='inner')
        
    summary.to_csv()

    global service_client
            
    service_client = DataLakeServiceClient(account_url="https://storage.dfs.core.windows.net/", credential='****')
        
    file_system_client = service_client.get_file_system_client(file_system="outputdatalake")

    directory_client = file_system_client.get_directory_client("functionapp") 

    file_client = directory_client.create_file("merged.csv") 

    file_contents = summary.to_csv()

    file_client.upload_data(file_contents, overwrite=True) 

    return("This HTTP triggered function executed successfully.")

我的 JSON 文件(function.json):


  "scriptFile": "__init__.py",
  "bindings": [
    
      "authLevel": "function",
      "type": "httpTrigger",
      "direction": "in",
      "name": "req",
      "methods": [
        "get",
        "post"
      ]
    ,
    
      "type": "http",
      "direction": "out",
      "name": "$return"
    
  ]

【问题讨论】:

既然你说它在本地运行,那么当它在 azure 上运行时存在一些权限或配置问题,请尝试在代码中添加 try...except 块以记录正确的错误消息。 您可以查看该功能的日志/应用洞察,看看您的代码/访问存储帐户是否有任何错误。您甚至可以仅使用数据工厂来合并 CSV 文件,例如 docs.microsoft.com/en-us/answers/questions/542994/… 【参考方案1】:

我能想到的有 2 个原因可能是您的问题的原因。

A - 检查您的 requirements.txt。你所有的python库都应该在那里。它应该看起来像这样。

azure-functions
pandas==1.3.4
azure-storage-blob==12.9.0
azure-storage-file-datalake==12.5.0

B - 接下来,您似乎正在将文件写入 Functions 工作内存。这是不允许的,也是完全没有必要的。这可以解释为什么它在您的本地机器上运行,但在 Azure 中却不行。你可以不这样做就实现你想要的。请参阅下面的代码部分,它应该可以满足您的目的。我们将 csv 从 blob 加载到数据帧的方式略有变化。

import pandas as pd
import logging
from azure.storage.blob import BlobServiceClient
from azure.storage.filedatalake import DataLakeServiceClient
import azure.functions as func
from io import StringIO

def main(req: func.HttpRequest) -> func.HttpResponse:
    logging.info('Python HTTP trigger function processed a request.')

    STORAGEACCOUNTURL= 'https://storage.blob.core.windows.net/'
    STORAGEACCOUNTKEY= 'xxxxxxxxxxxxxxxxxxxxxxxxxxxx'
    LOCALFILENAME= ['file1.csv', 'file2.csv']
    CONTAINERNAME= 'inputblob'

    file1 = pd.DataFrame()
    file2 = pd.DataFrame()
    #download from blob

    blob_service_client_instance = BlobServiceClient(account_url=STORAGEACCOUNTURL, credential=STORAGEACCOUNTKEY)
    for i in LOCALFILENAME:
            blob_client_instance = blob_service_client_instance.get_blob_client(container=CONTAINERNAME, blob=i, snapshot=None)
            blob_data = blob_client_instance.download_blob()
            if i == 'file1.csv':
                file1 = pd.read_csv(StringIO(blob_data.content_as_text()))
            if i == 'file2.csv':
                file2 = pd.read_csv(StringIO(blob_data.content_as_text()))

    
    # load
    summary = pd.merge(left=file1, right=file2, on='key', how='inner')
    summary.to_csv()

    service_client = DataLakeServiceClient(account_url="https://storage.dfs.core.windows.net/", credential=STORAGEACCOUNTKEY)
    file_system_client = service_client.get_file_system_client(file_system="outputdatalake")
    directory_client = file_system_client.get_directory_client("my-directory") 
    file_client = directory_client.create_file("merged.csv") 
    file_contents = summary.to_csv()
    file_client.upload_data(file_contents, overwrite=True) 

    return("This HTTP triggered function executed successfully.")

【讨论】:

感谢您的帮助!不幸的是,当我使用您的代码并更新 requirements.txt 时,我仍然遇到同样的错误。 您的输入文件的大小是多少?我希望您在下载整个文件时不会耗尽内存。我的代码适用于小型测试文件。您是否检查过应用洞察的实时指标有哪些错误? 问题是我的内存不足。我不得不使用高级功能应用计划。感谢您的帮助!【参考方案2】:

您的代码看起来不错,并且在另一个环境中运行良好。因此,Azure 函数中可能会由于以下原因导致内部服务器错误:

    确保将 Local.Settings.json 文件中的所有值添加到应用程序设置(FunctionApp -> 配置 -> 应用程序设置)

    检查 CORS。尝试添加“*”(启用 CORS 后对存储资源发出的任何请求都必须具有有效的授权标头或必须针对公共资源发出。)

【讨论】:

我已经添加了这些配置,但我仍然得到同样的错误。

以上是关于使用 Python 脚本的 ADF 管道中的 Azure 函数的主要内容,如果未能解决你的问题,请参考以下文章

将python模块导入databricks中的python脚本

Azure 数据工厂 (ADF) 中的选择性部署?

Azure ADF 中具有动态源的增量复制 cdc 记录

如何从 ADF 中的执行管道获取输出参数?

迁移 ADF - 与链接服务和管道链接到 Synapse Analytics 的数据集

在 ADF 中参数化 Azure Blob 存储链接服务