如何在 Azure 数据工厂的 DataFlow 中获取管道的元数据?想要创建调试管道

Posted

技术标签:

【中文标题】如何在 Azure 数据工厂的 DataFlow 中获取管道的元数据?想要创建调试管道【英文标题】:How to get meta data of the pipeline in DataFlow of Azure data factory? Want to create a debug pipeline 【发布时间】:2021-11-30 19:37:12 【问题描述】:

我有一项任务需要创建一个仪表板,该仪表板将提供在 azure datafactory 中运行的管道的详细信息。为此,我想将数据加载到 SQL 中。目的是测试任何数据流的性能。管道将是一个即插即用模型,我们只需拖放要测试的数据流,建立连接,并在每次管道运行时,从管道中获取以下数据。以下是我要填写的列名:

**Sr No:**               Auto Increment, Primary key
**pipeline ID:**         <ID of pipeline run>
**Dataflow Start Time:** ddmmyy HH:SS
**Dataflow end time:**   ddmmyy HH:SS
**Duration to acquire compute:** In seconds
**Data input:**           Size of data read from source
**Data output:**          Size of data written to sink
**Run type:**             Triggered / Manual(Debug) run
**DataFlow Status:** Failed / Success

我可以看到元数据刀片来获取源的元数据。但是我似乎无法找到资源来获取其他信息。如果您知道如何在 Azure 数据工厂中创建这样的管道,请提供帮助。

【问题讨论】:

除了Monitor Data Flows 和Mapping data flows performance and tuning guide 之外,您还在寻找吗?您可以在管道中使用REST API 获取信息或使用诊断设置从 ADF 资源日志中查询 我想要Monitor Data Flows 中提供的统计信息,但需要 SQL 中的这些值(如写入的行、成功或失败等)。在特定数据流运行后有什么方法可以提取这些统计信息?目的是将这些统计信息分享给不属于 Azure 的人员,以便他们进行分析。所以这些人无法登录来监控管道运行,他们需要将所有数据导出到 SQL 表中。 数据流活动的输出 JSON 将包含这些统计信息。您可以通过启用 Azure Monitor 集成让 ADF 将它们存储在日志中。或者您可以解析管道中活动的输出。 我的回答有帮助吗? 哦,是的。我花了一些时间来复制它,但几乎得到了我想要的。非常感谢 【参考方案1】:

您可以使用WebActivity 调用 API 并获取详细信息,然后将其复制到 SQL 接收器。

这是一个例子:

    Pipeline Runs - Query By Factory

https://management.azure.com/subscriptions/subscriptionId/resourceGroups/resourceGroupName/providers/Microsoft.DataFactory/factories/factoryName/queryPipelineRuns?api-version=2018-06-01

替换占位符中的实际值。

Output

    "value": [
        
            "id": "/SUBSCRIPTIONS/B83C1EE3-C5B6-44FB-B5BA-2A83A074C23F/RESOURCEGROUPS/myrg/PROVIDERS/MICROSOFT.DATAFACTORY/FACTORIES/KTESTADF/pipelineruns/c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8",
            "runId": "c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8",
            "debugRunId": null,
            "runGroupId": "c619e850-xxxx-xxxx-8d0c-4b73dcacb9d8",
            "pipelineName": "partiion_parquets",
            "parameters": ,
            "invokedBy": 
                "id": "216e5917354a41b983f82d8ceda53789",
                "name": "Manual",
                "invokedByType": "Manual"
            ,
            "runStart": "2021-10-12T13:00:12.9095561Z",
            "runEnd": "2021-10-12T13:03:17.9824334Z",
            "durationInMs": 185072,
            "status": "Succeeded",
            "message": "",
            "output": null,
            "lastUpdated": "2021-10-12T13:03:17.9824334Z",
            "annotations": [],
            "runDimension": ,
            "isLatest": true
        
    ],
    "ADFWebActivityResponseHeaders": 
        "Pragma": "no-cache",
        "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
        "X-Content-Type-Options": "nosniff",
        "x-ms-ratelimit-remaining-subscription-reads": "11999",
        "x-ms-request-id": "4373f002-b352-4824-857b-15f5325241ae",
        "x-ms-correlation-request-id": "4373f002-b352-4824-857b-15f5325241ae",
        "x-ms-routing-request-id": "CENTRALUS:20211012T131047Z:4373f002-b352-4824-857b-15f5325241ae",
        "Cache-Control": "no-cache",
        "Date": "Tue, 12 Oct 2021 13:10:46 GMT",
        "Server": "Microsoft-IIS/10.0",
        "X-Powered-By": "ASP.NET",
        "Content-Length": "717",
        "Content-Type": "application/json; charset=utf-8",
        "Expires": "-1"
    ,
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
    "executionDuration": 0,
    "durationInQueue": 
        "integrationRuntimeQueue": 0
    ,
    "billingReference": 
        "activityType": "ExternalActivity",
        "billableDuration": [
            
                "meterType": "AzureIR",
                "duration": 0.016666666666666666,
                "unit": "Hours"
            
        ]
    

接下来例如,使用前面输出中的管道运行 ID,调用 api 以获取数据流详细信息。

    Activity Runs - Query By Pipeline Run

https://management.azure.com/subscriptions/subscriptionId/resourceGroups/resourceGroupName/providers/Microsoft.DataFactory/factories/factoryName/pipelineruns/runId/queryActivityruns?api-version=2018-06-01

Output

    "value": [
        
            "activityRunEnd": "2021-10-12T13:03:17.1874105Z",
            "activityName": "Data flow1",
            "activityRunStart": "2021-10-12T13:00:14.7523815Z",
            "activityType": "ExecuteDataFlow",
            "durationInMs": 182435,
            "retryAttempt": null,
            "error": 
                "errorCode": "",
                "message": "",
                "failureType": "",
                "target": "Data flow1",
                "details": ""
            ,
            "activityRunId": "5e7aece7-5fb7-4ffd-939e-d01d3fcb494a",
            "iterationHash": "",
            "input": 
                "dataflow": 
                    "referenceName": "generateParquet",
                    "type": "DataFlowReference",
                    "parameters": ,
                    "datasetParameters": 
                        "source1": ,
                        "sink1": 
                    
                ,
                "staging": ,
                "compute": 
                    "coreCount": 8,
                    "computeType": "General"
                ,
                "traceLevel": "Fine",
                "dataFlowETag": "48019f9b-0000-0300-0000-616475e20000"
            ,
            "linkedServiceName": "",
            "output": 
                "runStatus": 
                    "computeAcquisitionDuration": 107335,
                    "version": "20211011.2",
                    "profile": 
                        "source1": 
                            "computed": [],
                            "lineage": ,
                            "dropped": 0,
                            "drifted": 0,
                            "newer": 16,
                            "total": 16,
                            "updated": 0
                        ,
                        "sink1": 
                            "computed": [],
                            "lineage": 
                                "CompanyName": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "CompanyName"
                                            ]
                                        
                                    ]
                                ,
                                "LastName": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "LastName"
                                            ]
                                        
                                    ]
                                ,
                                "MiddleName": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "MiddleName"
                                            ]
                                        
                                    ]
                                ,
                                "PasswordSalt": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "PasswordSalt"
                                            ]
                                        
                                    ]
                                ,
                                "ModifiedDate": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "ModifiedDate"
                                            ]
                                        
                                    ]
                                ,
                                "path": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "path"
                                            ]
                                        
                                    ]
                                ,
                                "rowguid": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "rowguid"
                                            ]
                                        
                                    ]
                                ,
                                "Title": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "Title"
                                            ]
                                        
                                    ]
                                ,
                                "NameStyle": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "NameStyle"
                                            ]
                                        
                                    ]
                                ,
                                "CustomerID": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "CustomerID"
                                            ]
                                        
                                    ]
                                ,
                                "FirstName": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "FirstName"
                                            ]
                                        
                                    ]
                                ,
                                "Suffix": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "Suffix"
                                            ]
                                        
                                    ]
                                ,
                                "Phone": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "Phone"
                                            ]
                                        
                                    ]
                                ,
                                "PasswordHash": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "PasswordHash"
                                            ]
                                        
                                    ]
                                ,
                                "SalesPerson": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "SalesPerson"
                                            ]
                                        
                                    ]
                                ,
                                "EmailAddress": 
                                    "mapped": true,
                                    "from": [
                                        
                                            "source": "source1",
                                            "columns": [
                                                "EmailAddress"
                                            ]
                                        
                                    ]
                                
                            ,
                            "dropped": 0,
                            "drifted": 0,
                            "newer": 0,
                            "total": 16,
                            "updated": 16
                        
                    ,
                    "metrics": 
                        "sink1": 
                            "format": "parquet",
                            "stages": [
                                
                                    "stage": 1,
                                    "partitionTimes": [
                                        525
                                    ],
                                    "lastUpdateTime": "2021-10-12 13:02:50.453",
                                    "bytesWritten": 0,
                                    "bytesRead": 199491,
                                    "partitionStatus": "Success",
                                    "streams": 
                                        "source1": 
                                            "count": 847,
                                            "cached": false,
                                            "totalPartitions": 1,
                                            "partitionStatus": "Success",
                                            "partitionCounts": [
                                                847
                                            ],
                                            "type": "source"
                                        
                                    ,
                                    "target": "sink1",
                                    "time": 640,
                                    "progressState": "Completed"
                                ,
                                
                                    "stage": 2,
                                    "partitionTimes": [
                                        3436,
                                        3373,
                                        3441,
                                        3475,
                                        2304,
                                        2321,
                                        2364,
                                        2291,
                                        1989,
                                        2435
                                    ],
                                    "lastUpdateTime": "2021-10-12 13:02:58.897",
                                    "bytesWritten": 177728,
                                    "bytesRead": 0,
                                    "partitionStatus": "Success",
                                    "streams": 
                                        "sink1": 
                                            "count": 847,
                                            "cached": false,
                                            "totalPartitions": 10,
                                            "partitionStatus": "Success",
                                            "partitionCounts": [
                                                84,
                                                85,
                                                85,
                                                85,
                                                85,
                                                85,
                                                85,
                                                85,
                                                84,
                                                84
                                            ],
                                            "type": "sink"
                                        
                                    ,
                                    "target": "sink1",
                                    "time": 8452,
                                    "progressState": "Completed"
                                
                            ],
                            "sinkPostProcessingTime": 0,
                            "data": [],
                            "store": "blob",
                            "rowsWritten": 847,
                            "details": 
                                "postCommandsDuration": [
                                    3
                                ],
                                "preCommandsDuration": [
                                    1
                                ]
                            ,
                            "sources": 
                                "source1": 
                                    "rowsRead": 847,
                                    "store": "blob",
                                    "details": 
                                        "fileSystemInitDuration": [
                                            7751
                                        ]
                                    ,
                                    "format": "delimited"
                                
                            ,
                            "sinkProcessingTime": 12380
                        
                    ,
                    "dsl": "\nsource() ~> source1\n\nsource1 sink() ~> sink1"
                ,
                "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
                "billingReference": 
                    "activityType": "executedataflow",
                    "billableDuration": [
                        
                            "meterType": "General",
                            "duration": 0.3691165737777778,
                            "unit": "coreHour",
                            "sessionType": "JobCluster"
                        
                    ]
                ,
                "reportLineageToPurview": 
                    "status": "NotReported"
                
            ,
            "userProperties": ,
            "pipelineName": "partiion_parquets",
            "pipelineRunId": "c619e850-c7de-4d1f-8d0c-4b73dcacb9d8",
            "status": "Succeeded",
            "recoveryStatus": "None",
            "integrationRuntimeNames": [
                "defaultintegrationruntime"
            ],
            "executionDetails": 
                "integrationRuntime": [
                    
                        "name": "AutoResolveIntegrationRuntime",
                        "type": "Managed",
                        "computeType": "General",
                        "coreCount": "8"
                    
                ]
            ,
            "id": "/SUBSCRIPTIONS/B83C1EE3-xxxx-xxxx-B5BA-2E83A074C23K/RESOURCEGROUPS/myrg/PROVIDERS/MICROSOFT.DATAFACTORY/FACTORIES/KTESTADF/pipelineruns/c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8/activityruns/5e7aece7-xxxx-4ffd-939e-d01d3fcb494a"
        
    ],
    "ADFWebActivityResponseHeaders": 
        "Pragma": "no-cache",
        "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
        "X-Content-Type-Options": "nosniff",
        "x-ms-ratelimit-remaining-subscription-reads": "11999",
        "x-ms-request-id": "e8fa08f2-594a-4464-b43e-d71104c1950c",
        "x-ms-correlation-request-id": "e8fa08f2-594a-4464-b43e-d71104c1950c",
        "x-ms-routing-request-id": "CENTRALUS:20211012T131050Z:e8fa08f2-594a-4464-b43e-d71104c1950c",
        "Cache-Control": "no-cache",
        "Date": "Tue, 12 Oct 2021 13:10:49 GMT",
        "Server": "Microsoft-IIS/10.0",
        "X-Powered-By": "ASP.NET",
        "Content-Length": "4307",
        "Content-Type": "application/json; charset=utf-8",
        "Expires": "-1"
    ,
    "effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
    "executionDuration": 0,
    "durationInQueue": 
        "integrationRuntimeQueue": 0
    ,
    "billingReference": 
        "activityType": "ExternalActivity",
        "billableDuration": [
            
                "meterType": "AzureIR",
                "duration": 0.016666666666666666,
                "unit": "Hours"
            
        ]
    

您可以从上面筛选出"activityType": "ExecuteDataFlow"durationInMsactivityRunStartactivityRunEndinvokedByTypeinputoutput 和其他指标。

只需按照Azure Data Factory version 2 (V2) REST API operations 进行操作,将选定的输出参数化并在必要时存储在变量中。适当地使用Add dynamic content 构建 URL 和 JSON 请求正文。

【讨论】:

以上是关于如何在 Azure 数据工厂的 DataFlow 中获取管道的元数据?想要创建调试管道的主要内容,如果未能解决你的问题,请参考以下文章

使用 Azure 数据工厂 (ADF) 数据流 (DF) 从/向 Azure Data Lake Store gen1 发送和接收数据

在 Azure 数据工厂中完成活动后,如何向 Azure 服务总线发送消息

如何从 Azure 数据工厂安全地调用 Azure 逻辑应用

如何使用 Azure 数据工厂管道创建容器?

如何使用 azure 数据工厂拆分列值

如何绕过 Azure 数据工厂 ARM 模板参数限制?