Azure Data PlatformETL工具(13)——ADF并行加载多个文件
Posted 發糞塗牆
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Azure Data PlatformETL工具(13)——ADF并行加载多个文件相关的知识,希望对你有一定的参考价值。
本文属于【Azure Data Platform】系列。
接上文:【Azure Data Platform】ETL工具(12)——ADF 参数
本文介绍ADF 的并行导入多个文件
前言
在一个数据类的IT项目中,从多个位置(云和本地或者云中很多存储)中的多个文件导入到一个数据库,比如Azure SQL DB,是很常见的需求。各种可变因素都会导致手工操作变得很困难。因此,借助上文的参数化,还有之前演示的一些内容,来个相对贴近实战的演示。
相对于前面用txt文件,csv文件更加常见,所以本文使用csv来做演示,但是在实际项目中,ADF的数据集并不需要关注到这么精确。毕竟混用的常见的,过度确定也将带来很多不便。不仅文件类型,连列的描述,数据类型等,也不需要非常明确地定义。可以通过占位符来替代一下文件之间的不同处,比如分隔符。
另外使用For Each 这个活动,可以遍历元数据并同时加载多个文件。把这些信息组合成在一起之后,可以创建一个以元数据驱动的管道来动态地加载多个文件。
环境准备
这里准备三个csv文件,第一个文件放10行数据,以“;”为分隔符。 第二个文件存放5行数据,以“;”为分隔符。第三个文件存放5行数据,为了模拟来自于其他系统,所以以逗号“,”为分隔符。并且序号是连续的。
Demo1.csv:
Demo2.csv:
Demo3.csv:
这里新建一个ADLS, 并且创建两个文件夹到container下面,一个叫douhao, 一个叫fenhao。
把文件分别上传到对应的文件夹里面。
然后在目标源(SQL DB)中创建一个配置表来存储这些关系:
create table config
(
ID INT IDENTITY(1,1) NOT NULL,
SourceType VARCHAR(50) NOT NULL,
ObjectName VARCHAR(500) NOT NULL,
ObjectValue VARCHAR(1000) NOT NULL
)
初始化数据:
INSERT INTO dbo.[config] ( [SourceType] ,[ObjectName] ,ObjectValue )
VALUES
--文件路径
('Container','fenhaoData','fenhao')
,('Container','douhaoData','douhao')
-- 分隔符
,('Delimiter','fenhaoData',';')
,('Delimiter','douhaoData',',')
-- 目标表名
,('SQLTable','fenhaoData','azure_fenhao')
,('SQLTable','douhaoData','azure_douhao')
创建目标表:
CREATE TABLE [dbo].[azure_fenhao](
[Index] [BIGINT] NULL,
[Name] varchar(500) NULL
);
CREATE TABLE [dbo].[azure_douhao](
[Index] [BIGINT] NULL,
[Name] varchar(500) NULL
);
搭建ADF
在环境准备好只有,就开始ADF方面的开发了。
step 1 创建数据集
如果不用参数化,那就要创建4个数据集,2个针对ADLS上的数据源,2个针对SQL DB上的表。但是参数化之后,可以只使用2个数据集即可,一个针对ADLS的数据源,另外一个针对SQL DB上的表。在运行的时候从参数表中获取所需的信息。
跳过页首,并且只指定container即可:
上一文说过,参数化之后,架构部分应该清空:
然后在参数选项中配置参数, 创建两个参数,一个指定路径(path),一个指定分隔符(delimiter)
这次对文件路径中的目录进行动态化:
选择前面定义好的变量“path”
然后配置列分隔符,注意要先点击【编辑】,才能出现“添加动态内容”,不然就只能是选择:
配置好之后如下:
源配置好了,接下来配置目的地的数据集,也就是这里的SQL DB。前面已经演示过很多次,这里就逐步演示。
在这个数据集中新建一个参数,参数名:Tablename
回到连接页,点击编辑后,对表名进行动态化,如下图,这里架构我们使用dbo这个默认架构,如果也需要动态,则还需要第三个变量。
结果如下:
现在,已经配置好了源和目标。接下来就是如何使用配置表了。前面提到过,要让这些一起运行,需要使用pipeline,所以接下来就是创建pipeline。
Step 2 配置管道
首先我们需要读取配置表,相应的ADF 活动就是lookup。 中文版叫“查找”
我们可以使用这个SQL查询信息:
SELECT
b.[ObjectName]
,FolderName = b.[ObjectValue]
,SQLTable = s.[ObjectValue]
,Delimiter = d.[ObjectValue]
FROM [dbo].[config] b
JOIN [dbo].[config] s ON b.[ObjectName] = s.[ObjectName]
JOIN [dbo].[config] d ON b.[ObjectName] = d.[ObjectName]
WHERE b.[SourceType] = 'Container'
AND s.[SourceType] = 'SQLTable'
AND d.[SourceType] = 'Delimiter';
因为我们设置了变量,虽然上面的SQL可以直接用,但是配置里面,一旦不传入点什么值,就会报错:
一旦设置了一个值,点击预览数据就可以看到数据,注意取消勾选【仅第一行】:
然后添加“ForEach”活动,并且通过拖动Lookup的绿色图标连到ForEach 活动中:
然后配置ForEach, 在下图中,因为这里的输入来自于Lookup的输出,所以可以直接获取。但是在最后,要手动添加“.value”:
对ForEach添加活动:
点了这个新增之后,会出现新的界面,如果想退回去前面的界面,点击下面红色长方形的部分即可。接下来我们拖动【复制数据】活动进去:
配置如下, 其中path参数和delimiter参数对应的@item().ObjectValue的ObjectValue是来源于Lookup的输出:
配置完后,进行调试:
实际上有两个并行操作:
1.ForEach 循环将并行分别处理每个 ADLS上的文件夹(分号文件与逗号文件)。
2.在 ForEach 循环的迭代中,CopyData 活动本身将并行处理在一个文件夹中找到的所有 ADLS文件(使用分号数据找到 2 个文件)。
总结
通过这种元数据驱动,可以基本上只更改配置表中的配置信息,即可应对同一行为的大量文件。
以上是关于Azure Data PlatformETL工具(13)——ADF并行加载多个文件的主要内容,如果未能解决你的问题,请参考以下文章
Azure Data PlatformETL工具(19)——Azure Databricks
Azure Data PlatformETL工具(20)——创建Azure Databricks
Azure Data PlatformETL工具(20)——创建Azure Databricks
Azure Data PlatformETL工具——重新认识Azure Data Factory