How to get metadata of the pipeline in a Data Flow of Azure Data Factory? Want to create a debug pipeline
Posted: 2021-11-30 19:37:12

Question: I have a task to create a dashboard that reports details of the pipelines running in Azure Data Factory. For that, I want to load the data into SQL. The goal is to test the performance of any data flow. The pipeline will be a plug-and-play model: we simply drag and drop the data flow to be tested, make the connections, and on every pipeline run capture the following data from the pipeline. These are the columns I want to populate:
**Sr No:** Auto Increment, Primary key
**pipeline ID:** <ID of pipeline run>
**Dataflow Start Time:** ddmmyy HH:SS
**Dataflow end time:** ddmmyy HH:SS
**Duration to acquire compute:** In seconds
**Data input:** Size of data read from source
**Data output:** Size of data written to sink
**Run type:** Triggered / Manual(Debug) run
**DataFlow Status:** Failed / Success
I can see the Metadata blade for getting the metadata of a source, but I can't seem to find a way to get the other information. If you know how to create such a pipeline in Azure Data Factory, please help.
Comments:
- Besides Monitor Data Flows and the Mapping data flows performance and tuning guide, is there something else you are looking for? You can use the REST API in a pipeline to get the information, or use diagnostic settings to query the ADF resource logs.
- I want the statistics available in Monitor Data Flows, but I need those values in SQL (such as rows written, success or failure, etc.). Is there any way to extract these statistics after a particular data flow run? The goal is to share the statistics with people outside Azure so they can analyze them. These people cannot sign in to monitor the pipeline runs, so all the data needs to be exported to SQL tables.
- The output JSON of the Data Flow activity contains these statistics. You can have ADF store them in logs by enabling the Azure Monitor integration, or you can parse the activity's output within the pipeline.
- Did my answer help?
- Oh yes. It took me some time to reproduce it, but I got pretty much what I wanted. Thank you very much.

Answer 1:

You can use a WebActivity to call the API and get the details, then copy them to a SQL sink.
Here is an example:
Pipeline Runs - Query By Factory
https://management.azure.com/subscriptions/subscriptionId/resourceGroups/resourceGroupName/providers/Microsoft.DataFactory/factories/factoryName/queryPipelineRuns?api-version=2018-06-01
Replace the placeholders with the actual values.
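Before wiring the call into a Web activity, it can help to test the same request outside ADF. This is a minimal sketch using only the standard library; the helper names are illustrative, you must supply your own Azure AD bearer token, and the `lastUpdatedAfter`/`lastUpdatedBefore` window is the filter body this API expects:

```python
# Sketch: call the "Pipeline Runs - Query By Factory" REST API directly.
# Subscription, resource group, factory name, and token are placeholders.
import json
import urllib.request

API_VERSION = "2018-06-01"


def query_pipeline_runs_url(subscription_id: str, resource_group: str, factory: str) -> str:
    """Build the queryPipelineRuns endpoint URL for a factory."""
    return (
        "https://management.azure.com/subscriptions/{}/resourceGroups/{}"
        "/providers/Microsoft.DataFactory/factories/{}/queryPipelineRuns"
        "?api-version={}"
    ).format(subscription_id, resource_group, factory, API_VERSION)


def query_pipeline_runs(url: str, bearer_token: str,
                        last_updated_after: str, last_updated_before: str) -> dict:
    """POST the run-filter body and return the parsed JSON response."""
    body = json.dumps({
        "lastUpdatedAfter": last_updated_after,    # e.g. "2021-10-12T00:00:00Z"
        "lastUpdatedBefore": last_updated_before,  # e.g. "2021-10-13T00:00:00Z"
    }).encode()
    req = urllib.request.Request(
        url,
        data=body,
        headers={
            "Authorization": "Bearer " + bearer_token,
            "Content-Type": "application/json",
        },
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)
```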
Output
"value": [
"id": "/SUBSCRIPTIONS/B83C1EE3-C5B6-44FB-B5BA-2A83A074C23F/RESOURCEGROUPS/myrg/PROVIDERS/MICROSOFT.DATAFACTORY/FACTORIES/KTESTADF/pipelineruns/c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8",
"runId": "c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8",
"debugRunId": null,
"runGroupId": "c619e850-xxxx-xxxx-8d0c-4b73dcacb9d8",
"pipelineName": "partiion_parquets",
"parameters": ,
"invokedBy":
"id": "216e5917354a41b983f82d8ceda53789",
"name": "Manual",
"invokedByType": "Manual"
,
"runStart": "2021-10-12T13:00:12.9095561Z",
"runEnd": "2021-10-12T13:03:17.9824334Z",
"durationInMs": 185072,
"status": "Succeeded",
"message": "",
"output": null,
"lastUpdated": "2021-10-12T13:03:17.9824334Z",
"annotations": [],
"runDimension": ,
"isLatest": true
],
"ADFWebActivityResponseHeaders":
"Pragma": "no-cache",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"X-Content-Type-Options": "nosniff",
"x-ms-ratelimit-remaining-subscription-reads": "11999",
"x-ms-request-id": "4373f002-b352-4824-857b-15f5325241ae",
"x-ms-correlation-request-id": "4373f002-b352-4824-857b-15f5325241ae",
"x-ms-routing-request-id": "CENTRALUS:20211012T131047Z:4373f002-b352-4824-857b-15f5325241ae",
"Cache-Control": "no-cache",
"Date": "Tue, 12 Oct 2021 13:10:46 GMT",
"Server": "Microsoft-IIS/10.0",
"X-Powered-By": "ASP.NET",
"Content-Length": "717",
"Content-Type": "application/json; charset=utf-8",
"Expires": "-1"
,
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
"executionDuration": 0,
"durationInQueue":
"integrationRuntimeQueue": 0
,
"billingReference":
"activityType": "ExternalActivity",
"billableDuration": [
"meterType": "AzureIR",
"duration": 0.016666666666666666,
"unit": "Hours"
]
Next, for example, using the pipeline run ID from the previous output, call the API below to get the data flow details.
Activity Runs - Query By Pipeline Run
https://management.azure.com/subscriptions/subscriptionId/resourceGroups/resourceGroupName/providers/Microsoft.DataFactory/factories/factoryName/pipelineruns/runId/queryActivityruns?api-version=2018-06-01
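As before, the URL can be assembled programmatically from the `runId` returned by the first call. A small sketch (the function name is illustrative; the POST body and headers are the same as in the previous snippet):

```python
# Sketch: build the "Activity Runs - Query By Pipeline Run" endpoint URL
# from the runId of a pipeline run. All names are placeholders.
def query_activity_runs_url(subscription_id: str, resource_group: str,
                            factory: str, run_id: str) -> str:
    return (
        "https://management.azure.com/subscriptions/{}/resourceGroups/{}"
        "/providers/Microsoft.DataFactory/factories/{}/pipelineruns/{}"
        "/queryActivityruns?api-version=2018-06-01"
    ).format(subscription_id, resource_group, factory, run_id)
```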
Output
"value": [
"activityRunEnd": "2021-10-12T13:03:17.1874105Z",
"activityName": "Data flow1",
"activityRunStart": "2021-10-12T13:00:14.7523815Z",
"activityType": "ExecuteDataFlow",
"durationInMs": 182435,
"retryAttempt": null,
"error":
"errorCode": "",
"message": "",
"failureType": "",
"target": "Data flow1",
"details": ""
,
"activityRunId": "5e7aece7-5fb7-4ffd-939e-d01d3fcb494a",
"iterationHash": "",
"input":
"dataflow":
"referenceName": "generateParquet",
"type": "DataFlowReference",
"parameters": ,
"datasetParameters":
"source1": ,
"sink1":
,
"staging": ,
"compute":
"coreCount": 8,
"computeType": "General"
,
"traceLevel": "Fine",
"dataFlowETag": "48019f9b-0000-0300-0000-616475e20000"
,
"linkedServiceName": "",
"output":
"runStatus":
"computeAcquisitionDuration": 107335,
"version": "20211011.2",
"profile":
"source1":
"computed": [],
"lineage": ,
"dropped": 0,
"drifted": 0,
"newer": 16,
"total": 16,
"updated": 0
,
"sink1":
"computed": [],
"lineage":
"CompanyName":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"CompanyName"
]
]
,
"LastName":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"LastName"
]
]
,
"MiddleName":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"MiddleName"
]
]
,
"PasswordSalt":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"PasswordSalt"
]
]
,
"ModifiedDate":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"ModifiedDate"
]
]
,
"path":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"path"
]
]
,
"rowguid":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"rowguid"
]
]
,
"Title":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"Title"
]
]
,
"NameStyle":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"NameStyle"
]
]
,
"CustomerID":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"CustomerID"
]
]
,
"FirstName":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"FirstName"
]
]
,
"Suffix":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"Suffix"
]
]
,
"Phone":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"Phone"
]
]
,
"PasswordHash":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"PasswordHash"
]
]
,
"SalesPerson":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"SalesPerson"
]
]
,
"EmailAddress":
"mapped": true,
"from": [
"source": "source1",
"columns": [
"EmailAddress"
]
]
,
"dropped": 0,
"drifted": 0,
"newer": 0,
"total": 16,
"updated": 16
,
"metrics":
"sink1":
"format": "parquet",
"stages": [
"stage": 1,
"partitionTimes": [
525
],
"lastUpdateTime": "2021-10-12 13:02:50.453",
"bytesWritten": 0,
"bytesRead": 199491,
"partitionStatus": "Success",
"streams":
"source1":
"count": 847,
"cached": false,
"totalPartitions": 1,
"partitionStatus": "Success",
"partitionCounts": [
847
],
"type": "source"
,
"target": "sink1",
"time": 640,
"progressState": "Completed"
,
"stage": 2,
"partitionTimes": [
3436,
3373,
3441,
3475,
2304,
2321,
2364,
2291,
1989,
2435
],
"lastUpdateTime": "2021-10-12 13:02:58.897",
"bytesWritten": 177728,
"bytesRead": 0,
"partitionStatus": "Success",
"streams":
"sink1":
"count": 847,
"cached": false,
"totalPartitions": 10,
"partitionStatus": "Success",
"partitionCounts": [
84,
85,
85,
85,
85,
85,
85,
85,
84,
84
],
"type": "sink"
,
"target": "sink1",
"time": 8452,
"progressState": "Completed"
],
"sinkPostProcessingTime": 0,
"data": [],
"store": "blob",
"rowsWritten": 847,
"details":
"postCommandsDuration": [
3
],
"preCommandsDuration": [
1
]
,
"sources":
"source1":
"rowsRead": 847,
"store": "blob",
"details":
"fileSystemInitDuration": [
7751
]
,
"format": "delimited"
,
"sinkProcessingTime": 12380
,
"dsl": "\nsource() ~> source1\n\nsource1 sink() ~> sink1"
,
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
"billingReference":
"activityType": "executedataflow",
"billableDuration": [
"meterType": "General",
"duration": 0.3691165737777778,
"unit": "coreHour",
"sessionType": "JobCluster"
]
,
"reportLineageToPurview":
"status": "NotReported"
,
"userProperties": ,
"pipelineName": "partiion_parquets",
"pipelineRunId": "c619e850-c7de-4d1f-8d0c-4b73dcacb9d8",
"status": "Succeeded",
"recoveryStatus": "None",
"integrationRuntimeNames": [
"defaultintegrationruntime"
],
"executionDetails":
"integrationRuntime": [
"name": "AutoResolveIntegrationRuntime",
"type": "Managed",
"computeType": "General",
"coreCount": "8"
]
,
"id": "/SUBSCRIPTIONS/B83C1EE3-xxxx-xxxx-B5BA-2E83A074C23K/RESOURCEGROUPS/myrg/PROVIDERS/MICROSOFT.DATAFACTORY/FACTORIES/KTESTADF/pipelineruns/c619e850-xxxx-xxxx-xxxx-4b73dcacb9d8/activityruns/5e7aece7-xxxx-4ffd-939e-d01d3fcb494a"
],
"ADFWebActivityResponseHeaders":
"Pragma": "no-cache",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"X-Content-Type-Options": "nosniff",
"x-ms-ratelimit-remaining-subscription-reads": "11999",
"x-ms-request-id": "e8fa08f2-594a-4464-b43e-d71104c1950c",
"x-ms-correlation-request-id": "e8fa08f2-594a-4464-b43e-d71104c1950c",
"x-ms-routing-request-id": "CENTRALUS:20211012T131050Z:e8fa08f2-594a-4464-b43e-d71104c1950c",
"Cache-Control": "no-cache",
"Date": "Tue, 12 Oct 2021 13:10:49 GMT",
"Server": "Microsoft-IIS/10.0",
"X-Powered-By": "ASP.NET",
"Content-Length": "4307",
"Content-Type": "application/json; charset=utf-8",
"Expires": "-1"
,
"effectiveIntegrationRuntime": "DefaultIntegrationRuntime (Central US)",
"executionDuration": 0,
"durationInQueue":
"integrationRuntimeQueue": 0
,
"billingReference":
"activityType": "ExternalActivity",
"billableDuration": [
"meterType": "AzureIR",
"duration": 0.016666666666666666,
"unit": "Hours"
]
From the above you can filter out `"activityType": "ExecuteDataFlow"`, `durationInMs`, `activityRunStart`, `activityRunEnd`, `invokedByType`, `input`, `output`, and other metrics.
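As a sketch of the "parse the activity's output" approach mentioned in the comments, the hypothetical helper below flattens one activity-runs response into a row matching the dashboard columns from the question. The field paths follow the sample output above; note that the run type (Triggered vs Debug) comes from `invokedBy.invokedByType` in the pipeline-runs response, not from this one:

```python
# Sketch: map an activity-runs response to the dashboard columns.
# Lookups are defensive because failed runs may omit "runStatus".
def extract_dataflow_row(activity_runs: dict) -> dict:
    """Pick the first ExecuteDataFlow activity and flatten it to one row."""
    run = next(a for a in activity_runs["value"]
               if a["activityType"] == "ExecuteDataFlow")
    run_status = (run.get("output") or {}).get("runStatus") or {}
    metrics = run_status.get("metrics") or {}

    # Sum rowsRead over all sources; sum rowsWritten over all sink entries.
    sources = metrics.get("sources") or {}
    rows_read = sum(s.get("rowsRead", 0) for s in sources.values())
    rows_written = sum(v.get("rowsWritten", 0) for v in metrics.values()
                       if isinstance(v, dict) and "rowsWritten" in v)

    return {
        "pipeline_run_id": run["pipelineRunId"],
        "dataflow_start_time": run["activityRunStart"],
        "dataflow_end_time": run["activityRunEnd"],
        "compute_acquisition_ms": run_status.get("computeAcquisitionDuration"),
        "rows_read": rows_read,
        "rows_written": rows_written,
        "status": run["status"],
    }
```

A row like this can then be inserted into the SQL table with a parameterized INSERT, or handed to a Copy activity as its source.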
Just follow the Azure Data Factory version 2 (V2) REST API operations, parameterize the selected outputs, and store them in variables where necessary. Use Add dynamic content appropriately to build the URLs and the JSON request bodies.
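Inside the pipeline, the same values can be pulled out of the Web activity outputs with dynamic content expressions. For example (the activity names `GetPipelineRuns` and `GetActivityRuns` are hypothetical names for the two Web activities; the paths follow the sample outputs above):

```
@activity('GetPipelineRuns').output.value[0].invokedBy.invokedByType
@activity('GetActivityRuns').output.value[0].activityRunStart
@activity('GetActivityRuns').output.value[0].activityRunEnd
@activity('GetActivityRuns').output.value[0].status
@activity('GetActivityRuns').output.value[0].output.runStatus.metrics.sink1.rowsWritten
```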