在 Azure Synapse 中并行作业运行期间避免重复
Posted
技术标签:
【中文标题】在 Azure Synapse 中并行作业运行期间避免重复【英文标题】:Avoid duplicates during parallel job run in Azure Synapse 【发布时间】:2020-10-20 12:06:05 【问题描述】:在 Azure Synapse 中开发代码时需要您的建议。
我们有一个要求,我们的作业将同时并行运行并将数据插入到同一个表中。 在此插入过程中,重复条目将被插入到同一个表中。 例如:如果 Job A 和 Job B 同时运行,并且具有相同的值,则“不存在”或“不在”将无法工作。在这种情况下,我将从这两个工作中得到重复。主键或唯一约束允许 Azure 突触中的重复项。在数据插入期间有没有最好的方法来锁定表。就像作业 A 正在运行一样,作业 B 不应该将数据插入到同一个表中。请提出您的建议,因为我是新手。注意:我们使用存储过程通过 ADF V2 加载数据
谢谢, 南迪尼
【问题讨论】:
【参考方案1】:在将数据插入 Azure Synapse 之前,必须在作业中处理重复项。如果两个作业之间存在重复,则在两个作业完成后执行。这实际上取决于您如何加载数据。您可以通过创建临时表而不是直接将数据加载到最终表来轻松管理。请确保临时表的结构应与最终表相同(分布、分区、约束、列的可空性)您可以使用 SQL BCP/INSERT TO/CTAS/CTAS 进行分区切换,将阶段表转换为最终表。
如果您可以分享特定场景,提供与您的用例相关的建议会很有帮助。
【讨论】:
【参考方案2】:我刚遇到同样的情况,我用 Pipeline Runs - Query By Factory
解决了它-
在
DataFlow
活动之前使用Until
活动,使用此表达式@equals(activity('pingPL').output.value[0].runId, pipeline().RunId)
将值写入表中,如下所示:
-
在
Until
活动中放入一个网络活动和一个等待时间:
一个。 Web
活动主体-关注docs:
"lastUpdatedAfter": "@addminutes(utcnow(), -30)",
"lastUpdatedBefore": "@utcnow()",
"filters": [
"operand": "PipelineName",
"operator": "Equals",
"values": [
"pipeline_name_where_writeInSynapse_is_located"
]
,
"operand": "Status",
"operator": "Equals",
"values": [
"InProgress"
]
]
b. Wait
活动 30 秒或任何有意义的事情
发生的情况是,如果您同时触发多次相同的管道,Web 活动将过滤每个 PL 状态InProgress
。它看起来像这样:
"value": [
"id": "...",
"runId": "52004775-5ef5-493b-8a44-ee3fff6bff7b",
"debugRunId": null,
"runGroupId": "52004775-5ef5-493b-8a44-ee3fff6bff7b",
"pipelineName": "synapse_writting",
"parameters":
"region": "NW",
"unique_item": "a"
,
"invokedBy":
"id": "80efce4dbda74636878bc99472978ccf",
"name": "Manual",
"invokedByType": "Manual"
,
"runStart": "2021-10-13T17:24:01.0210945Z",
"runEnd": "2021-10-13T17:25:06.9692394Z",
"durationInMs": 65948,
"status": "InProgress",
"message": "",
"output": null,
"lastUpdated": "2021-10-13T17:25:06.9704432Z",
"annotations": [],
"runDimension": ,
"isLatest": true
,
"id": "...",
"runId": "cf3f5038-ba10-44c3-b8f5-df8ad4c85819",
"debugRunId": null,
"runGroupId": "cf3f5038-ba10-44c3-b8f5-df8ad4c85819",
"pipelineName": "synapse_writting",
"parameters":
"region": "NW",
"unique_item": "a"
,
"invokedBy":
"id": "08205e0eda0b41f6b5a90a8dda06a7f6",
"name": "Manual",
"invokedByType": "Manual"
,
"runStart": "2021-10-13T17:28:58.219611Z",
"runEnd": null,
"durationInMs": null,
"status": "InProgress",
"message": "",
"output": null,
"lastUpdated": "2021-10-13T17:29:00.9860175Z",
"annotations": [],
"runDimension": ,
"isLatest": true
],
"ADFWebActivityResponseHeaders":
"Pragma": "no-cache",
"Strict-Transport-Security": "max-age=31536000; includeSubDomains",
"X-Content-Type-Options": "nosniff",
"x-ms-ratelimit-remaining-subscription-reads": "11999",
"x-ms-request-id": "188508ef-8897-4c21-8c37-ccdd4adc6d81",
"x-ms-correlation-request-id": "188508ef-8897-4c21-8c37-ccdd4adc6d81",
"x-ms-routing-request-id": "WESTUS2:20211013T172902Z:188508ef-8897-4c21-8c37-ccdd4adc6d81",
"Cache-Control": "no-cache",
"Date": "Wed, 13 Oct 2021 17:29:02 GMT",
"Server": "Microsoft-IIS/10.0",
"X-Powered-By": "ASP.NET",
"Content-Length": "1492",
"Content-Type": "application/json; charset=utf-8",
"Expires": "-1"
,
"effectiveIntegrationRuntime": "NCAP-Simple-DataMovement (West US 2)",
"executionDuration": 0,
"durationInQueue":
"integrationRuntimeQueue": 0
,
"billingReference":
"activityType": "ExternalActivity",
"billableDuration": [
"meterType": "AzureIR",
"duration": 0.016666666666666666,
"unit": "Hours"
]
然后Until
表达式将评估第一个value[0]
是否具有runId == pipeline_runid
以停止直到活动并运行在 Synapse 中写入的dataflow
。一旦 PL 结束,状态将为 Succeeded
,另一个作业中的 Web
活动将获得下一个 value[0]
,状态为 InProgress
,并继续下一次写入。这会创建对并行作业的依赖,以等待数据流验证并在需要时写入表。
【讨论】:
以上是关于在 Azure Synapse 中并行作业运行期间避免重复的主要内容,如果未能解决你的问题,请参考以下文章
Azure Synapse:Spark 作业定义中指定的目标 Spark 池未处于成功状态。当前状态:供应
是否可以使用带有魔术命令的 Azure Synapse 在 Apache Spark 中运行 Bash 命令
文档中显示的 sql 代码未在 azure synapse 专用 sql 池上运行
在 Azure Synapse 笔记本 ValueError 中运行 nltk.download:对关闭的文件进行 I/O 操作