从 20 个相关表中获取数据(通过 id),将它们组合到一个 json 文件并为此利用 spring 批处理
Posted
技术标签:
【中文标题】从 20 个相关表中获取数据(通过 id),将它们组合到一个 json 文件并为此利用 spring 批处理【英文标题】:Fetch data from 20 related tables (through id), combine them to a json File and leverage spring batch for this 【发布时间】:2015-09-22 15:43:13 【问题描述】:我在 SQL Server 中有一个 Person 数据库,其中包含地址、许可证、亲属等表,其中大约 20 个。所有表格都有每个人唯一的 id 参数。这些表中有数百万条记录。我需要使用他们的公共 id 参数组合该人的这些记录,并转换为带有一些列名更改的 json 表文件。然后这个 json 文件通过生产者被推送到 kafka。如果我可以将 kafka 生产者作为项目编写器的示例 - 很好,但真正的问题是了解如何利用 spring batch 项目阅读器、处理器和项目编写器创建复合材料的策略和细节json 文件。这是我的第一个 Spring 批处理应用程序,所以我对此比较陌生。
我希望对使用复合阅读器或处理器的实施策略提出建议,以使用人员ID作为光标,并使用每个表的ID查询每个表,将结果记录转换为json并将其聚合为复合,带有根表 PersonData 的关系 json 文件,该文件提供给 kafka 集群。
基本上我有一个数据源,相同的数据库供读者使用。我计划使用 Person 表来获取该人唯一的 id 和其他记录,并使用 id 作为其他 19 个表的 where 子句。将表中的每个结果集转换为json,最后合成json对象写入kafka。
【问题讨论】:
你试过你的选择了吗?您的具体问题是什么? 我猜似乎不鼓励存储过程选项,并且巨大的连接查询选项也出来了,所以我编辑了我的问题,具体问题是如何创建查询所有表和构建的复合项目阅读器为 PersonData 生成一个复杂的 json 文件 【参考方案1】:我们在一个项目中有这样一个需求,用下面的方法解决了。
在并行运行的 Splitflow 中,我们有一个步骤 for ever 表,它加载文件中表的数据,按公共 id 排序(这是可选的,但如果您有文件中的数据)。
然后我们实现了我们自己的“MergeReader”。 这个合并阅读器对每个文件/表都有 FlatFileItemReaders(我们称它们为 dataReaders)。所有这些 FlatFileItemReader 都用 SingleItemPeekableItemReader 包装。 MergeReader的read方法的逻辑如下:
public MyContainerPerId read()
// you need a container to store the items, that belong together
MyContainerPerId container = new MyContainerPerId();
// peek through all "dataReaders" to find the lowest actual key
int lowestId = searchLowestKey();
for (Reader dataReader : dataReaders)
// I assume, that more than one entry in a table can belong to
// the same person id
wihile (dataReader.peek().getId() == lowestId)
container.add(dataReader.read());
// the container contains all entries from all tables
// belonging to the same person id
return container;
如果您需要重新启动功能,您可以以某种方式实现 ItemStream,它会跟踪每个 dataReader 的当前读取位置。
【讨论】:
【参考方案2】:我使用here 中描述的Driving Query Based ItemReaders 使用模式来解决这个问题。
Reader:只是 JdbcCursoritemReader 的默认实现,带有 sql 来获取 唯一的关系 id(例如 select id from person -)
处理器:使用这个长 id 作为输入和我实现的一个 dao 来自 spring 的 jdbcTemplate 通过针对每个的查询来获取数据 特定 id 的表(例如 select * from license where id=)并将结果以列表格式映射到 POJO Person - 然后转换为 json 对象(使用 Jackson),然后转换为 字符串
Writer:要么用 json 字符串写出文件,要么将 json 字符串发布到 使用kafka时的主题
【讨论】:
【参考方案3】:我们进行了类似的练习,将多个表中的 1 亿多行迁移为 JSON 格式,以便我们可以将其发布到消息总线。
这个想法是创建一个视图,对数据进行反规范化并使用 JdbcPagingItemReader 从该视图读取。从一个源读取开销更少。
当您对数据进行非规范化时,请确保您没有为主表获得多行。
示例 - SQL 服务器 -
create or alter view viewName as
select master.col1 , master.col2,
(select dep1.col1,
dep1.col2
from dependent1 dep1
where dep1.col3 = master.col3 for json path
) as dep1
from master master;
上面将为您提供一个json字符串中的依赖表数据,每个主表数据一行。检索数据后,您可以使用 GSON 或 Jackson 将其转换为 POJO。
我们试图避免使用 JdbcCursoritemReader,因为它会拉取内存中的所有数据并从中一一读取。它不支持分页。
【讨论】:
以上是关于从 20 个相关表中获取数据(通过 id),将它们组合到一个 json 文件并为此利用 spring 批处理的主要内容,如果未能解决你的问题,请参考以下文章