Azure Synapse Analytics Unboxing Blog - Part 5 -- Data Factory Data Pipeline Automation

Posted by wekang


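        In the previous post we used Data Warehouse T-SQL scripts to run the ETL and update for the CDC data. In this post we use Data Factory to automate that data pipeline. The general idea is to wrap the earlier Data Warehouse ETL and update logic in a stored procedure in the DW, then create a pipeline in Data Factory that calls the stored procedure. The whole pipeline can be triggered by an event: the arrival of new CDC data in the Data Lake.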

         Let's get started:

1. Create a stored procedure that implements the ETL and update T-SQL script from the previous post:

CREATE PROCEDURE CdcdemoUpsert
AS
BEGIN
    CREATE TABLE dbo.[demotable_upsert]
    WITH
    (   DISTRIBUTION = HASH(ID)
    ,   CLUSTERED COLUMNSTORE INDEX
    )
    AS
    -- New rows and new versions of rows
    SELECT      s.[ID]
    ,           s.[PRICE]
    ,           s.[QUANTITY]
    FROM      dbo.[stg_demotable] AS s
    UNION ALL  
    -- Keep rows that are not being touched
    SELECT      p.[ID]
    ,           p.[PRICE]
    ,           p.[QUANTITY]
    FROM      dbo.[demotable] AS p
    WHERE NOT EXISTS
    (   SELECT  *
        FROM    [dbo].[stg_demotable] s
        WHERE   s.[ID] = p.[ID]
    );

    RENAME OBJECT dbo.[demotable]          TO [demotable_old];
    RENAME OBJECT dbo.[demotable_upsert]  TO [demotable];
    DROP TABLE [dbo].[demotable_old];
    DROP TABLE [dbo].[stg_demotable];
END
;
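
The set logic of the CTAS statement above can be illustrated outside the database. The following is a minimal Python sketch, not the author's code: the `upsert` function and the sample rows are hypothetical, and rows are modeled as plain `(ID, PRICE, QUANTITY)` tuples. It takes every staging row and then keeps only those production rows whose ID does not appear in staging, which is what the `UNION ALL` plus `WHERE NOT EXISTS` expresses.

```python
from typing import List, Tuple

# A row is modeled as (ID, PRICE, QUANTITY); this layout is an assumption for illustration.
Row = Tuple[str, int, int]


def upsert(production: List[Row], staging: List[Row]) -> List[Row]:
    """Return all staging rows plus the production rows whose ID is not staged."""
    staged_ids = {row[0] for row in staging}
    # Equivalent of: SELECT ... FROM demotable p WHERE NOT EXISTS (matching staged ID)
    untouched = [row for row in production if row[0] not in staged_ids]
    # Equivalent of: SELECT ... FROM stg_demotable UNION ALL <untouched rows>
    return list(staging) + untouched


# Hypothetical sample data: ID "2" is updated, ID "3" is new, ID "1" is untouched.
production = [("1", 10, 5), ("2", 20, 1)]
staging = [("2", 25, 3), ("3", 30, 7)]
result = upsert(production, staging)
```

As in the stored procedure, this approach does not remove rows that were deleted at the source, and duplicate IDs inside the staging data would all be carried over.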

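2. Create the Data Factory pipeline. A Copy Activity first copies the CDC data from the Data Lake into the staging table in the Data Warehouse, then a stored procedure call updates the production table in the DW. For this step you can import the Data Factory pipeline JSON definition below into Data Factory and adjust the SQL Pool and Data Lake connection parameters to match your own environment.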

{
    "name": "CDC_Pipeline",
    "properties": {
        "activities": [
            {
                "name": "Copy_CDC_Data",
                "type": "Copy",
                "dependsOn": [],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "typeProperties": {
                    "source": {
                        "type": "DelimitedTextSource",
                        "storeSettings": {
                            "type": "AzureBlobFSReadSettings",
                            "recursive": true,
                            "enablePartitionDiscovery": false
                        },
                        "formatSettings": {
                            "type": "DelimitedTextReadSettings"
                        }
                    },
                    "sink": {
                        "type": "SqlDWSink",
                        "preCopyScript": "IF OBJECT_ID('[dbo].[stg_demotable]') IS NOT NULL\nBEGIN\n    DROP TABLE [dbo].[stg_demotable]\nEND;\nCREATE TABLE [dbo].[stg_demotable]\n(\n    [ID] VARCHAR(50) COLLATE SQL_Latin1_General_CP1_CI_AS NOT NULL,\n    [PRICE] INT NOT NULL,\n    [QUANTITY] INT NOT NULL\n)\nWITH\n(\n    DISTRIBUTION = ROUND_ROBIN,\n    CLUSTERED COLUMNSTORE INDEX\n);",
                        "allowCopyCommand": true,
                        "disableMetricsCollection": false
                    },
                    "enableStaging": false,
                    "translator": {
                        "type": "TabularTranslator",
                        "mappings": [
                            {
                                "source": {
                                    "name": "id",
                                    "type": "Int16"
                                },
                                "sink": {
                                    "name": "ID",
                                    "type": "String"
                                }
                            },
                            {
                                "source": {
                                    "name": "price",
                                    "type": "Int16"
                                },
                                "sink": {
                                    "name": "PRICE",
                                    "type": "Int32"
                                }
                            },
                            {
                                "source": {
                                    "name": "quantity",
                                    "type": "String"
                                },
                                "sink": {
                                    "name": "QUANTITY",
                                    "type": "Int32"
                                }
                            }
                        ]
                    }
                },
                "inputs": [
                    {
                        "referenceName": "CDC_Data",
                        "type": "DatasetReference",
                        "parameters": {
                            "DataSetFileName": {
                                "value": "@pipeline().parameters.CDCFileName",
                                "type": "Expression"
                            }
                        }
                    }
                ],
                "outputs": [
                    {
                        "referenceName": "AzureSynapseAnalyticsTable1",
                        "type": "DatasetReference"
                    }
                ]
            },
            {
                "name": "CDC_Upsert_StoredProcedure",
                "type": "SqlPoolStoredProcedure",
                "dependsOn": [
                    {
                        "activity": "Copy_CDC_Data",
                        "dependencyConditions": [
                            "Succeeded"
                        ]
                    }
                ],
                "policy": {
                    "timeout": "7.00:00:00",
                    "retry": 0,
                    "retryIntervalInSeconds": 30,
                    "secureOutput": false,
                    "secureInput": false
                },
                "userProperties": [],
                "sqlPool": {
                    "referenceName": "cdcdemo",
                    "type": "SqlPoolReference"
                },
                "typeProperties": {
                    "storedProcedureName": "[dbo].[CdcdemoUpsert]"
                }
            }
        ],
        "parameters": {
            "CDCFileName": {
                "type": "string"
            }
        },
        "annotations": []
    },
    "type": "Microsoft.Synapse/workspaces/pipelines"
}
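
A JSON string value cannot contain raw line breaks, so a multi-line T-SQL script such as the one in `preCopyScript` has to be escaped before it is embedded in a pipeline definition. The sketch below shows one way to do that with Python's standard `json` module. It is only an illustration under stated assumptions: the shortened script and the `sink` dictionary are hypothetical stand-ins, not the full definition from this post.

```python
import json

# A shortened, hypothetical version of the pre-copy script, written as normal multi-line SQL.
pre_copy_script = """IF OBJECT_ID('[dbo].[stg_demotable]') IS NOT NULL
BEGIN
    DROP TABLE [dbo].[stg_demotable]
END;"""

# A stand-in for the sink section of the Copy Activity.
sink = {
    "type": "SqlDWSink",
    "preCopyScript": pre_copy_script,
    "allowCopyCommand": True,
}

# json.dumps escapes the line breaks as \n, producing a single-line, valid JSON string.
encoded = json.dumps(sink)

# Decoding restores the original multi-line script unchanged.
decoded = json.loads(encoded)
```

Generating the fragment this way also keeps the single quotes in the SQL as plain ASCII quotes, which avoids typographic quotes slipping into the script through copy and paste.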

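3. Create the trigger for the Data Factory pipeline, using the creation of a CDC file in the Data Lake as the trigger condition. Replace the blobPathBeginsWith and scope parameters with the values for your own Data Lake storage.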

{
    "name": "CDCDemoTrigger",
    "properties": {
        "annotations": [],
        "runtimeState": "Stopped",
        "pipelines": [
            {
                "pipelineReference": {
                    "referenceName": "CDC_Pipeline",
                    "type": "PipelineReference"
                },
                "parameters": {
                    "CDCFileName": "@trigger().outputs.body.fileName"
                }
            }
        ],
        "type": "BlobEventsTrigger",
        "typeProperties": {
            "blobPathBeginsWith": "<datalakecdcfilepath>",
            "blobPathEndsWith": ".csv",
            "ignoreEmptyBlobs": true,
            "scope": "<datalakestorageaccountresourceid>",
            "events": [
                "Microsoft.Storage.BlobCreated"
            ]
        }
    }
}
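
The intent of the filter settings in the trigger definition above can be pictured as a simple prefix and suffix check. The following Python sketch is a simplified model and an assumption on my part, not the service's actual matching code: the `matches_trigger` function, the example paths, and the byte sizes are all hypothetical, and the real service may differ in details such as case sensitivity.

```python
def matches_trigger(
    blob_path: str,
    size_bytes: int,
    begins_with: str,
    ends_with: str,
    ignore_empty: bool = True,
) -> bool:
    """Simplified model of blobPathBeginsWith / blobPathEndsWith / ignoreEmptyBlobs."""
    # ignoreEmptyBlobs: zero-byte blobs do not start a pipeline run.
    if ignore_empty and size_bytes == 0:
        return False
    # Both the prefix filter and the suffix filter have to match.
    return blob_path.startswith(begins_with) and blob_path.endswith(ends_with)


# Hypothetical container/folder prefix, used only for illustration.
prefix = "/cdc/blobs/changes/"

fires = matches_trigger("/cdc/blobs/changes/batch1.csv", 2048, prefix, ".csv")
wrong_suffix = matches_trigger("/cdc/blobs/changes/batch1.json", 2048, prefix, ".csv")
empty_file = matches_trigger("/cdc/blobs/changes/batch2.csv", 0, prefix, ".csv")
```

In this model only the first call would start a run; the file name of the matching blob is what the trigger passes to the pipeline as `CDCFileName`.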

4. Simulate data changes in Cosmos DB and check the run logs of the whole pipeline.

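        With the configuration above, Data Factory automatically loads the CDC data from the data lake into the Data Warehouse and updates the Data Warehouse table. Azure Synapse Analytics is currently in Preview, so its built-in Data Factory does not yet support connecting to a SQL Pool with Managed Identity, nor triggering a pipeline with a Blob Event Trigger. For the Managed Identity issue you can use a Service Principal as a workaround. Blob Event Trigger support is expected by the end of July; while testing, you can trigger the pipeline manually or implement the same logic with a Data Factory that is not the one built into Synapse Analytics. This completes the end-to-end processing flow for Cosmos DB Change Feed data. Looking back, getting the whole flow working takes a fair amount of effort. The next post, the last in this series, introduces Synapse Link, a direct mode that connects Cosmos DB to the Data Warehouse in a single hop.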
