Azure Data PlatformETL工具(13)——ADF并行加载多个文件

Posted 發糞塗牆

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Azure Data PlatformETL工具(13)——ADF并行加载多个文件相关的知识,希望对你有一定的参考价值。

本文属于【Azure Data Platform】系列。
接上文:【Azure Data Platform】ETL工具(12)——ADF 参数
本文介绍ADF 的并行导入多个文件

前言

在一个数据类的IT项目中,从多个位置(云和本地或者云中很多存储)中的多个文件导入到一个数据库,比如Azure SQL DB,是很常见的需求。各种可变因素都会导致手工操作变得很困难。因此,借助上文的参数化,还有之前演示的一些内容,来个相对贴近实战的演示。

相对于前面用txt文件,csv文件更加常见,所以本文使用csv来做演示,但是在实际项目中,ADF的数据集并不需要关注到这么精确。毕竟混用的常见的,过度确定也将带来很多不便。不仅文件类型,连列的描述,数据类型等,也不需要非常明确地定义。可以通过占位符来替代一下文件之间的不同处,比如分隔符。

另外使用For Each 这个活动,可以遍历元数据并同时加载多个文件。把这些信息组合成在一起之后,可以创建一个以元数据驱动的管道来动态地加载多个文件。

环境准备

这里准备三个csv文件,第一个文件放10行数据,以“;”为分隔符。 第二个文件存放5行数据,以“;”为分隔符。第三个文件存放5行数据,为了模拟来自于其他系统,所以以逗号“,”为分隔符。并且序号是连续的。
Demo1.csv:

Demo2.csv:

Demo3.csv:

这里新建一个ADLS, 并且创建两个文件夹到container下面,一个叫douhao, 一个叫fenhao。


把文件分别上传到对应的文件夹里面。


然后在目标源(SQL DB)中创建一个配置表来存储这些关系:

create table config
(
 	ID INT IDENTITY(1,1) NOT NULL,
    SourceType VARCHAR(50) NOT NULL,
    ObjectName VARCHAR(500) NOT NULL,
    ObjectValue VARCHAR(1000) NOT NULL
)

初始化数据:

INSERT INTO dbo.[config] (	[SourceType]	,[ObjectName]	,ObjectValue	)
VALUES 
--文件路径
	   ('Container','fenhaoData','fenhao')
	  ,('Container','douhaoData','douhao')
--	分隔符 
	  ,('Delimiter','fenhaoData',';')
	  ,('Delimiter','douhaoData',',')
--	目标表名 
	  ,('SQLTable','fenhaoData','azure_fenhao')
	  ,('SQLTable','douhaoData','azure_douhao')

创建目标表:


CREATE TABLE [dbo].[azure_fenhao](
   [Index] [BIGINT] NULL,
   [Name] varchar(500) NULL
);
 
CREATE TABLE [dbo].[azure_douhao](
   [Index] [BIGINT] NULL,
   [Name] varchar(500) NULL
);

搭建ADF

在环境准备好只有,就开始ADF方面的开发了。

step 1 创建数据集

如果不用参数化,那就要创建4个数据集,2个针对ADLS上的数据源,2个针对SQL DB上的表。但是参数化之后,可以只使用2个数据集即可,一个针对ADLS的数据源,另外一个针对SQL DB上的表。在运行的时候从参数表中获取所需的信息。

跳过页首,并且只指定container即可:

上一文说过,参数化之后,架构部分应该清空:

然后在参数选项中配置参数, 创建两个参数,一个指定路径(path),一个指定分隔符(delimiter)

这次对文件路径中的目录进行动态化:


选择前面定义好的变量“path”

然后配置列分隔符,注意要先点击【编辑】,才能出现“添加动态内容”,不然就只能是选择:

配置好之后如下:

源配置好了,接下来配置目的地的数据集,也就是这里的SQL DB。前面已经演示过很多次,这里就逐步演示。


在这个数据集中新建一个参数,参数名:Tablename


回到连接页,点击编辑后,对表名进行动态化,如下图,这里架构我们使用dbo这个默认架构,如果也需要动态,则还需要第三个变量。


结果如下:

现在,已经配置好了源和目标。接下来就是如何使用配置表了。前面提到过,要让这些一起运行,需要使用pipeline,所以接下来就是创建pipeline。

Step 2 配置管道

首先我们需要读取配置表,相应的ADF 活动就是lookup。 中文版叫“查找”

我们可以使用这个SQL查询信息:

SELECT
     b.[ObjectName]
    ,FolderName = b.[ObjectValue]
    ,SQLTable   = s.[ObjectValue]
    ,Delimiter  = d.[ObjectValue]
FROM [dbo].[config] b
JOIN [dbo].[config] s ON b.[ObjectName] = s.[ObjectName]
JOIN [dbo].[config] d ON b.[ObjectName] = d.[ObjectName]
WHERE   b.[SourceType] = 'Container'
    AND s.[SourceType] = 'SQLTable'
    AND d.[SourceType] = 'Delimiter';


因为我们设置了变量,虽然上面的SQL可以直接用,但是配置里面,一旦不传入点什么值,就会报错:


一旦设置了一个值,点击预览数据就可以看到数据,注意取消勾选【仅第一行】:

然后添加“ForEach”活动,并且通过拖动Lookup的绿色图标连到ForEach 活动中:


然后配置ForEach, 在下图中,因为这里的输入来自于Lookup的输出,所以可以直接获取。但是在最后,要手动添加“.value”:

对ForEach添加活动:

点了这个新增之后,会出现新的界面,如果想退回去前面的界面,点击下面红色长方形的部分即可。接下来我们拖动【复制数据】活动进去:


配置如下, 其中path参数和delimiter参数对应的@item().ObjectValue的ObjectValue是来源于Lookup的输出:

配置完后,进行调试:

实际上有两个并行操作:

1.ForEach 循环将并行分别处理每个 ADLS上的文件夹(分号文件与逗号文件)。
2.在 ForEach 循环的迭代中,CopyData 活动本身将并行处理在一个文件夹中找到的所有 ADLS文件(使用分号数据找到 2 个文件)。

总结

通过这种元数据驱动,可以基本上只更改配置表中的配置信息,即可应对同一行为的大量文件。

以上是关于Azure Data PlatformETL工具(13)——ADF并行加载多个文件的主要内容,如果未能解决你的问题,请参考以下文章

Azure Data PlatformETL工具(19)——Azure Databricks

Azure Data PlatformETL工具(20)——创建Azure Databricks

Azure Data PlatformETL工具(20)——创建Azure Databricks

Azure Data PlatformETL工具——重新认识Azure Data Factory

Azure Data PlatformETL工具(11)——ADF 数据流

Azure Data PlatformETL工具——使用Azure Data Factory数据流传输数据