在 Azure Synapse 中并行作业运行期间避免重复

Posted

技术标签:

【中文标题】在 Azure Synapse 中并行作业运行期间避免重复【英文标题】:Avoid duplicates during parallel job run in Azure Synapse 【发布时间】:2020-10-20 12:06:05 【问题描述】:

在 Azure Synapse 中开发代码时需要您的建议。

我们有一个要求,我们的作业将同时并行运行并将数据插入到同一个表中。 在此插入过程中,重复条目将被插入到同一个表中。 例如:如果 Job A 和 Job B 同时运行,并且具有相同的值,则“不存在”或“不在”将无法工作。在这种情况下,我将从这两个工作中得到重复。主键或唯一约束允许 Azure 突触中的重复项。在数据插入期间有没有最好的方法来锁定表。就像作业 A 正在运行一样,作业 B 不应该将数据插入到同一个表中。请提出您的建议,因为我是新手。注意:我们使用存储过程通过 ADF V2 加载数据

谢谢, 南迪尼

【问题讨论】:

【参考方案1】:

在将数据插入 Azure Synapse 之前,必须在作业中处理重复项。如果两个作业之间存在重复,则在两个作业完成后执行。这实际上取决于您如何加载数据。您可以通过创建临时表而不是直接将数据加载到最终表来轻松管理。请确保临时表的结构应与最终表相同(分布、分区、约束、列的可空性)您可以使用 SQL BCP/INSERT TO/CTAS/CTAS 进行分区切换,将阶段表转换为最终表。

如果您可以分享特定场景,提供与您的用例相关的建议会很有帮助。

【讨论】:

【参考方案2】:

我刚遇到同样的情况,我用 Pipeline Runs - Query By Factory

解决了它
    DataFlow 活动之前使用Until 活动,使用此表达式@equals(activity('pingPL').output.value[0].runId, pipeline().RunId) 将值写入表中,如下所示:

    Until活动中放入一个网络活动和一个等待时间:

一个。 Web 活动主体-关注docs:


  "lastUpdatedAfter": "@addminutes(utcnow(), -30)",
  "lastUpdatedBefore": "@utcnow()",
  "filters": [
    
      "operand": "PipelineName",
      "operator": "Equals",
      "values": [
        "pipeline_name_where_writeInSynapse_is_located"
      ]
    ,
    
      "operand": "Status",
      "operator": "Equals",
      "values": [
        "InProgress"
      ]
    
  ]

b. Wait 活动 30 秒或任何有意义的事情

发生的情况是,如果您同时触发多次相同的管道,Web 活动将过滤每个 PL 状态InProgress。它看起来像这样:


    "value": [
        
            "id": "...",
            "runId": "52004775-5ef5-493b-8a44-ee3fff6bff7b",
            "debugRunId": null,
            "runGroupId": "52004775-5ef5-493b-8a44-ee3fff6bff7b",
            "pipelineName": "synapse_writting",
            "parameters": 
                "region": "NW",
                "unique_item": "a"
            ,
            "invokedBy": 
                "id": "80efce4dbda74636878bc99472978ccf",
                "name": "Manual",
                "invokedByType": "Manual"
            ,
            "runStart": "2021-10-13T17:24:01.0210945Z",
            "runEnd": "2021-10-13T17:25:06.9692394Z",
            "durationInMs": 65948,
            "status": "InProgress",
            "message": "",
            "output": null,
            "lastUpdated": "2021-10-13T17:25:06.9704432Z",
            "annotations": [],
            "runDimension": ,
            "isLatest": true
        ,
        
            "id": "...",
            "runId": "cf3f5038-ba10-44c3-b8f5-df8ad4c85819",
            "debugRunId": null,
            "runGroupId": "cf3f5038-ba10-44c3-b8f5-df8ad4c85819",
            "pipelineName": "synapse_writting",
            "parameters": 
                "region": "NW",
                "unique_item": "a"
            ,
            "invokedBy": 
                "id": "08205e0eda0b41f6b5a90a8dda06a7f6",
                "name": "Manual",
                "invokedByType": "Manual"
            ,
            "runStart": "2021-10-13T17:28:58.219611Z",
            "runEnd": null,
            "durationInMs": null,
            "status": "InProgress",
            "message": "",
            "output": null,
            "lastUpdated": "2021-10-13T17:29:00.9860175Z",
            "annotations": [],
            "runDimension": ,
            "isLatest": true
        
    ],
    "ADFWebActivityResponseHeaders": 
        "Pragma": "no-cache",
        "Strict-Transport-Security": "max-age=31536000; includeSubDomains",
        "X-Content-Type-Options": "nosniff",
        "x-ms-ratelimit-remaining-subscription-reads": "11999",
        "x-ms-request-id": "188508ef-8897-4c21-8c37-ccdd4adc6d81",
        "x-ms-correlation-request-id": "188508ef-8897-4c21-8c37-ccdd4adc6d81",
        "x-ms-routing-request-id": "WESTUS2:20211013T172902Z:188508ef-8897-4c21-8c37-ccdd4adc6d81",
        "Cache-Control": "no-cache",
        "Date": "Wed, 13 Oct 2021 17:29:02 GMT",
        "Server": "Microsoft-IIS/10.0",
        "X-Powered-By": "ASP.NET",
        "Content-Length": "1492",
        "Content-Type": "application/json; charset=utf-8",
        "Expires": "-1"
    ,
    "effectiveIntegrationRuntime": "NCAP-Simple-DataMovement (West US 2)",
    "executionDuration": 0,
    "durationInQueue": 
        "integrationRuntimeQueue": 0
    ,
    "billingReference": 
        "activityType": "ExternalActivity",
        "billableDuration": [
            
                "meterType": "AzureIR",
                "duration": 0.016666666666666666,
                "unit": "Hours"
            
        ]
    

然后Until 表达式将评估第一个value[0] 是否具有runId == pipeline_runid 以停止直到活动并运行在 Synapse 中写入的dataflow。一旦 PL 结束,状态将为 Succeeded,另一个作业中的 Web 活动将获得下一个 value[0],状态为 InProgress,并继续下一次写入。这会创建对并行作业的依赖,以等待数据流验证并在需要时写入表。

【讨论】:

以上是关于在 Azure Synapse 中并行作业运行期间避免重复的主要内容,如果未能解决你的问题,请参考以下文章

Azure Synapse:Spark 作业定义中指定的目标 Spark 池未处于成功状态。当前状态:供应

是否可以使用带有魔术命令的 Azure Synapse 在 Apache Spark 中运行 Bash 命令

文档中显示的 sql 代码未在 azure synapse 专用 sql 池上运行

Azure Synapse 中的 Hive

在 Azure Synapse 笔记本 ValueError 中运行 nltk.download:对关闭的文件进行 I/O 操作

Azure Synapse - 增量数据加载