ADF V2 - Parameterize a data copy pipeline based on a table column
Posted: 2019-02-22 06:16:48
Question:
Using Azure Data Factory V2 through the portal at https://adf.azure.com, I created a pipeline that incrementally copies data from multiple tables, from one Azure SQL database to another.
To build it, I adapted the following sample to my needs: Incrementally load data from multiple tables
Below is the JSON definition of the resulting pipeline:
{
  "name": "IncrementalCopyPipeline",
  "properties": {
    "activities": [
      {
        "name": "IterateSQLTables",
        "type": "ForEach",
        "typeProperties": {
          "items": {
            "value": "@pipeline().parameters.tableList",
            "type": "Expression"
          },
          "activities": [
            {
              "name": "LookupOldWaterMarkActivity",
              "type": "Lookup",
              "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
              },
              "typeProperties": {
                "source": {
                  "type": "SqlSource",
                  "sqlReaderQuery": {
                    "value": "select * \nfrom watermarktable \nwhere TableName = '@{item().TABLE_NAME}'",
                    "type": "Expression"
                  }
                },
                "dataset": {
                  "referenceName": "WatermarkDataset",
                  "type": "DatasetReference"
                }
              }
            },
            {
              "name": "LookupNewWaterMarkActivity",
              "type": "Lookup",
              "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
              },
              "typeProperties": {
                "source": {
                  "type": "SqlSource",
                  "sqlReaderQuery": {
                    "value": "select MAX(@{item().WaterMark_Column}) as NewWatermarkvalue \nfrom @{item().TABLE_NAME}",
                    "type": "Expression"
                  }
                },
                "dataset": {
                  "referenceName": "SourceDataset",
                  "type": "DatasetReference"
                }
              }
            },
            {
              "name": "IncrementalCopyActivity",
              "type": "Copy",
              "dependsOn": [
                {
                  "activity": "LookupNewWaterMarkActivity",
                  "dependencyConditions": [
                    "Succeeded"
                  ]
                },
                {
                  "activity": "LookupOldWaterMarkActivity",
                  "dependencyConditions": [
                    "Succeeded"
                  ]
                }
              ],
              "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
              },
              "typeProperties": {
                "source": {
                  "type": "SqlSource",
                  "sqlReaderQuery": {
                    "value": "select * from @{item().TABLE_NAME} \nwhere @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}'",
                    "type": "Expression"
                  }
                },
                "sink": {
                  "type": "SqlSink",
                  "writeBatchSize": 10000,
                  "sqlWriterStoredProcedureName": {
                    "value": "@item().StoredProcedureNameForMergeOperation",
                    "type": "Expression"
                  },
                  "sqlWriterTableType": {
                    "value": "@item().TableType",
                    "type": "Expression"
                  }
                },
                "enableStaging": false,
                "dataIntegrationUnits": 0
              },
              "inputs": [
                {
                  "referenceName": "SourceDataset",
                  "type": "DatasetReference"
                }
              ],
              "outputs": [
                {
                  "referenceName": "SinkDataset",
                  "type": "DatasetReference",
                  "parameters": {
                    "SinkTableName": "@item().TABLE_NAME"
                  }
                }
              ]
            },
            {
              "name": "StoredProceduretoWriteWatermarkActivity",
              "type": "SqlServerStoredProcedure",
              "dependsOn": [
                {
                  "activity": "IncrementalCopyActivity",
                  "dependencyConditions": [
                    "Succeeded"
                  ]
                }
              ],
              "policy": {
                "timeout": "7.00:00:00",
                "retry": 0,
                "retryIntervalInSeconds": 30,
                "secureOutput": false,
                "secureInput": false
              },
              "typeProperties": {
                "storedProcedureName": "[dbo].[sp_write_watermark]",
                "storedProcedureParameters": {
                  "LastModifiedtime": {
                    "value": {
                      "value": "@activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue",
                      "type": "Expression"
                    },
                    "type": "DateTime"
                  },
                  "TableName": {
                    "value": {
                      "value": "@activity('LookupOldWaterMarkActivity').output.firstRow.TableName",
                      "type": "Expression"
                    },
                    "type": "String"
                  }
                }
              },
              "linkedServiceName": {
                "referenceName": "SqlServerLinkedService_dest",
                "type": "LinkedServiceReference"
              }
            }
          ]
        }
      }
    ],
    "parameters": {
      "tableList": {
        "type": "Object",
        "defaultValue": [
          {
            "TABLE_NAME": "customer_table",
            "WaterMark_Column": "LastModifytime",
            "TableType": "DataTypeforCustomerTable",
            "StoredProcedureNameForMergeOperation": "sp_upsert_customer_table"
          },
          {
            "TABLE_NAME": "project_table",
            "WaterMark_Column": "Creationtime",
            "TableType": "DataTypeforProjectTable",
            "StoredProcedureNameForMergeOperation": "sp_upsert_project_table"
          }
        ]
      }
    }
  }
}
My tables have a column that distinguishes between companies, so I would like to add another parameter to this pipeline. I have a table like this:
NAME LASTMODIFY COMPANY
John 2015-01-01 00:00:00.000 1
Mike 2016-02-02 01:23:00.000 2
Andy 2017-03-04 05:16:00.000 3
Annie 2018-09-08 00:00:00.000 1
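Does anyone know how to add a parameter to the pipeline that specifies which companies to copy and which to skip?
Any suggestions? Thanks in advance, everyone!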
Answer 1: It's not entirely clear what you're asking, so apologies if I've missed the mark, but:
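The Copy activity lets you use a stored procedure on the sink, which could potentially solve your problem. Take a look at this example: https://docs.microsoft.com/en-us/azure/data-factory/connector-sql-server#invoking-stored-procedure-for-sql-sink
It uses a stored procedure to run a MERGE that performs an UPDATE or an INSERT depending on whether the JOIN finds a match. It also allows parameters to be passed.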
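As a rough sketch of what passing such a parameter might look like in the sink of the IncrementalCopyActivity from the question: the SQL sink accepts a storedProcedureParameters property alongside sqlWriterStoredProcedureName and sqlWriterTableType. The parameter name Company, its Int32 type, and the pipeline parameter companyId are hypothetical names chosen for illustration. The upsert procedures (sp_upsert_customer_table and so on) would also have to be changed to accept a matching @Company parameter and use it in their MERGE.

```json
{
  "sink": {
    "type": "SqlSink",
    "writeBatchSize": 10000,
    "sqlWriterStoredProcedureName": {
      "value": "@item().StoredProcedureNameForMergeOperation",
      "type": "Expression"
    },
    "sqlWriterTableType": {
      "value": "@item().TableType",
      "type": "Expression"
    },
    "storedProcedureParameters": {
      "Company": {
        "value": {
          "value": "@pipeline().parameters.companyId",
          "type": "Expression"
        },
        "type": "Int32"
      }
    }
  }
}
```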
So if you are trying to copy only certain cases based on a parameter, the MERGE join may help.
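If the goal is only to restrict which rows are read, another option (not part of the answer above, just a sketch under assumptions) is to filter at the source instead: declare an extra pipeline parameter and append a condition to the Copy activity's sqlReaderQuery. Here companyId is a hypothetical parameter name, COMPANY is the column from the sample table in the question and is assumed to be numeric, and the @{...} syntax is ADF string interpolation.

```json
{
  "parameters": {
    "companyId": {
      "type": "Int",
      "defaultValue": 1
    }
  },
  "source": {
    "type": "SqlSource",
    "sqlReaderQuery": {
      "value": "select * from @{item().TABLE_NAME} \nwhere @{item().WaterMark_Column} > '@{activity('LookupOldWaterMarkActivity').output.firstRow.WatermarkValue}' and @{item().WaterMark_Column} <= '@{activity('LookupNewWaterMarkActivity').output.firstRow.NewWatermarkvalue}' and COMPANY = @{pipeline().parameters.companyId}",
      "type": "Expression"
    }
  }
}
```

The "parameters" entry would sit next to tableList in the pipeline definition, and the "source" block would replace the one inside IncrementalCopyActivity. One caveat: watermarktable tracks a single watermark per table, so running the pipeline for different companies against the same watermark row could skip rows for a company that has not been copied yet.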