ADF V2 - Parameterize a data copy pipeline based on a table column

Posted: 2019-02-22 06:16:48

Question:

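Using Azure Data Factory V2, through the portal

https://adf.azure.com

I created a pipeline that incrementally copies data from multiple tables, from one Azure SQL database to another.

To create it, I adapted the following sample to my needs: Incrementally load data from multiple tables

Here is the JSON definition of the resulting pipeline: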

{
"name": "IncrementalCopyPipeline",
"properties": {
    "activities": [
        {
            "name": "IterateSQLTables",
            "type": "ForEach",
            "typeProperties": {
                "items": {
                    "value": "@pipeline().parameters.tableList",
                    "type": "Expression"
                },
                "activities": [
                    {
                        "name": "LookupOldWaterMarkActivity",
                        "type": "Lookup",
                        "policy": {
                            "timeout": "7.00:00:00",
                            "retry": 0,
                            "retryIntervalInSeconds": 30,
                            "secureOutput": false,
                            "secureInput": false
                        },
                        "typeProperties": {
                            "source": {
                                "type": "SqlSource",
                                "sqlReaderQuery": {
                                    "value": "select * \nfrom watermarktable \nwhere TableName  =  '@{item().TABLE_NAME}'",
                                    "type": "Expression"
                                }
                            },
                            "dataset": {
                                "referenceName": "WatermarkDataset",
                                "type": "DatasetReference"
                            }
                        }
                    },
                    {
                        "name": "LookupNewWaterMarkActivity",
                        "type": "Lookup",
                        "policy": {
                            "timeout": "7.00:00:00",
                            "retry": 0,
                            "retryIntervalInSeconds": 30,
                            "secureOutput": false,
                            "secureInput": false
                        },
                        "typeProperties": {
                            "source": {
                                "type": "SqlSource",
                                "sqlReaderQuery": {
                                    "value": "select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue \nfrom @{item().TABLE_NAME}",
                                    "type": "Expression"
                                }
                            },
                            "dataset": {
                                "referenceName": "SourceDataset",
                                "type": "DatasetReference"
                            }
                        }
                    },
                    {
                        "name": "IncrementalCopyActivity",
                        "type": "Copy",
                        "dependsOn": [
                            {
                                "activity": "LookupNewWaterMarkActivity",
                                "dependencyConditions": [
                                    "Succeeded"
                                ]
                            },
                            {
                                "activity": "LookupOldWaterMarkActivity",
                                "dependencyConditions": [
                                    "Succeeded"
                                ]
                            }
                        ],
                        "policy": {
                            "timeout": "7.00:00:00",
                            "retry": 0,
                            "retryIntervalInSeconds": 30,
                            "secureOutput": false,
                            "secureInput": false
                        },
                        "typeProperties": {
                            "source": {
                                "type": "SqlSource",
                                "sqlReaderQuery": {
                                    "value": "select * from @{item().TABLE_NAME} \nwhere @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                                    "type": "Expression"
                                }
                            },
                            "sink": {
                                "type": "SqlSink",
                                "writeBatchSize": 10000,
                                "sqlWriterStoredProcedureName": {
                                    "value": "@item().StoredProcedureNameForMergeOperation",
                                    "type": "Expression"
                                },
                                "sqlWriterTableType": {
                                    "value": "@item().TableType",
                                    "type": "Expression"
                                }
                            },
                            "enableStaging": false,
                            "dataIntegrationUnits": 0
                        },
                        "inputs": [
                            {
                                "referenceName": "SourceDataset",
                                "type": "DatasetReference"
                            }
                        ],
                        "outputs": [
                            {
                                "referenceName": "SinkDataset",
                                "type": "DatasetReference",
                                "parameters": {
                                    "SinkTableName": "@item().TABLE_NAME"
                                }
                            }
                        ]
                    },
                    {
                        "name": "StoredProceduretoWriteWatermarkActivity",
                        "type": "SqlServerStoredProcedure",
                        "dependsOn": [
                            {
                                "activity": "IncrementalCopyActivity",
                                "dependencyConditions": [
                                    "Succeeded"
                                ]
                            }
                        ],
                        "policy": {
                            "timeout": "7.00:00:00",
                            "retry": 0,
                            "retryIntervalInSeconds": 30,
                            "secureOutput": false,
                            "secureInput": false
                        },
                        "typeProperties": {
                            "storedProcedureName": "[dbo].[sp_write_watermark]",
                            "storedProcedureParameters": {
                                "LastModifiedtime": {
                                    "value": {
                                        "value": "@activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue",
                                        "type": "Expression"
                                    },
                                    "type": "DateTime"
                                },
                                "TableName": {
                                    "value": {
                                        "value": "@activity('LookupOldWaterMarkActivity').output.firstRow.TableName",
                                        "type": "Expression"
                                    },
                                    "type": "String"
                                }
                            }
                        },
                        "linkedServiceName": {
                            "referenceName": "SqlServerLinkedService_dest",
                            "type": "LinkedServiceReference"
                        }
                    }
                ]
            }
        }
    ],
    "parameters": {
        "tableList": {
            "type": "Object",
            "defaultValue": [
                {
                    "TABLE_NAME": "customer_table",
                    "WaterMark_Column": "LastModifytime",
                    "TableType": "DataTypeforCustomerTable",
                    "StoredProcedureNameForMergeOperation": "sp_upsert_customer_table"
                },
                {
                    "TABLE_NAME": "project_table",
                    "WaterMark_Column": "Creationtime",
                    "TableType": "DataTypeforProjectTable",
                    "StoredProcedureNameForMergeOperation": "sp_upsert_project_table"
                }
            ]
        }
    }
}
}

My tables have a column that distinguishes between companies, so I would like to add another parameter to this pipeline. I have a table like this:

NAME    LASTMODIFY                 COMPANY
John    2015-01-01 00:00:00.000    1
Mike    2016-02-02 01:23:00.000    2
Andy    2017-03-04 05:16:00.000    3
Annie   2018-09-08 00:00:00.000    1

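Does anyone know how to add a parameter to the pipeline that specifies which companies to copy and which to skip?

Any suggestions? Thanks in advance, everyone!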

Answer 1:

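It's not entirely clear what you're asking, so apologies if I've missed the mark, but:

The Copy activity lets you use a stored procedure on the sink, which could potentially solve your problem. Take a look at this example: https://docs.microsoft.com/en-us/azure/data-factory/connector-sql-server#invoking-stored-procedure-for-sql-sink

It uses a stored procedure to run a MERGE that performs either an UPDATE or an INSERT, depending on whether the JOIN finds a match. It also allows parameters to be passed in.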
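
As a rough sketch of what passing a parameter could look like in the pipeline from the question: the SqlSink of the Copy activity accepts a `storedProcedureParameters` block next to the stored procedure name. The fragment below assumes a hypothetical pipeline parameter named `companyId` and a hypothetical `company` parameter on the merge procedure. Neither exists in the original pipeline, and the procedure itself would have to be changed to accept and use that parameter.

```json
{
    "sink": {
        "type": "SqlSink",
        "writeBatchSize": 10000,
        "sqlWriterStoredProcedureName": {
            "value": "@item().StoredProcedureNameForMergeOperation",
            "type": "Expression"
        },
        "sqlWriterTableType": {
            "value": "@item().TableType",
            "type": "Expression"
        },
        "storedProcedureParameters": {
            "company": {
                "value": {
                    "value": "@pipeline().parameters.companyId",
                    "type": "Expression"
                },
                "type": "Int32"
            }
        }
    }
}
```

With this approach every row in the watermark window still travels from source to sink, and the procedure decides which rows to merge.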

So if you are trying to copy only certain rows based on a parameter, the MERGE join may help.
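
A different option, not the stored-procedure approach described above, would be to filter on the source side, so that rows for other companies are never read. The sketch below is an assumption rather than a tested configuration. It declares a hypothetical `companyId` pipeline parameter and adds a condition on the `COMPANY` column from the sample table to the copy query, using the `@{...}` string interpolation syntax. The two top-level keys are separate excerpts: `parameters` would sit next to `tableList`, and `source` would sit inside the `typeProperties` of `IncrementalCopyActivity`.

```json
{
    "parameters": {
        "companyId": {
            "type": "Int",
            "defaultValue": 1
        }
    },
    "source": {
        "type": "SqlSource",
        "sqlReaderQuery": {
            "value": "select * from @{item().TABLE_NAME} \nwhere COMPANY = @{pipeline().parameters.companyId} and @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
            "type": "Expression"
        }
    }
}
```

This only works if every table in `tableList` has a `COMPANY` column. If the column name varies per table, it could be carried as an extra field on each `tableList` entry instead. Note also that the watermark table stores one value per table, so copying different companies in separate runs would need a watermark per company.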
