使用 Camel 并行处理大型 SQL 表

Posted

技术标签:

【中文标题】使用 Camel 并行处理大型 SQL 表【英文标题】:Parallel processing large SQL table with Camel 【发布时间】:2019-03-28 22:39:54 【问题描述】:

我每天尝试使用 Apache Camel 从 Informix 表中处理大约 700 万行,但我不知道如何完成。

我第一次尝试处理非常少的数据集(大约 50k 行)是使用 .split(body()).parallelProcessing(),如下所示:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryData").split(body()).parallelProcessing() // Essentially executes a query on my table and returns a list of MyTable.class
.bean(ProcessTable.class, "processData") // Converts each MyTable object into another type of object (NewData.class) for later processing, storing in them in a synchronized list
.end().to("direct:transform-data");

from("direct:transform-data")
.bean(ProcessNewData.class, "processNewData").split(body()).parallelProcessing() // Obtains list
.bean(AnalyzeData.class, "analyze") // Analyzes the data
.bean(PersistData.class, "persist") // Persists the new data on other tables
.end();

当我在.bean(QueryTable.class, "queryData").split(body()).parallelProcessing() 上尝试使用 500k 行时,这当然会导致“OutOfMemory”错误,因为它首先尝试在解析查询之前缓存查询中的所有数据。我尝试将 fetchSize 设置为 100 之类的值,但我得到了同样的错误,使用 maxRows 只会让我得到我指定的行数,而忽略其余的。

我的下一次尝试是使用 Camel 的组件之一,例如 sql-component 和 jdbc,并尝试使用 Splitter 在单独的线程中处理每一行,但我遇到了同样的问题。

sql:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryDataParams") // Gets the params for my query
.to("sql:SELECT * FROM my_table WHERE date_received BETWEEN :#startDate AND :#endDate?dataSource=dataSourceInformix").split(body()).parallelProcessing()
// The rest would be essentially the same

jdbc:

from("quartz2://transaccionesGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "queryString") // Gets the query to execute
.to("jdbc:dataSourceInformix").split(body()).parallelProcessing()

我的最后一次尝试是将maxMessagesPerPoll 用于sql,将outputType=StreamList 用于jdbc 组件,但不幸的是,前者一次只处理一行(而且它必须是消费者才能使用),并且后者给了我一个java.sql.SQLException: Cursor not open 异常。

sql:

from("sql:" + query +"?dataSource=dataSourceInformix&maxMessagesPerPoll=100") // I need to be able to use the quartz2 component

jdbc:

.to("jdbc:dataSourceInformix?outputType=StreamList").split(body()).streaming() // Throws exception

最终目标是能够在不消耗太多内存的情况下处理数百万行,从而防止“OutOfMemory”错误。如果可能的话,我的想法是执行以下操作:

    在石英 cron-trigger 上创建我的查询 获得并分组N个结果 发送一组结果以进行处理(在另一个线程中),同时获取另一组结果 重复直到处理完所有数据

我知道这个问题类似于this one,但答案并没有真正帮助我的情况。我还注意到,在 sql 组件的文档中,它有一个用于生产者的 outputType=StreamList 选项,但它是在 2.18 及更高版本上实现的,而我有 2.14.1 版本。

任何帮助和提示都会非常有帮助!

谢谢。

其他一些信息: 阿帕奇骆驼版本:2.14.1 数据库:Informix

【问题讨论】:

从文档 (camel.apache.org/sql-component.html) 看来,StreamList 工具可能是实现 Camel-native 的唯一方法。或者,您可以创建一个手动进行 SQL 查询的类,获取一个游标 (docs.oracle.com/javase/7/docs/api/java/sql/…),然后将批量(例如,一次 10k 行)交给另一个路由进行处理。您可以使用 seda 队列 (camel.apache.org/seda.html) 并设置最大大小并让您的自定义类休眠直到可用,这样您就不会阻塞下游。 @NotaJD 感谢您对 seda 队列的建议,我会调查的。至于光标,我尝试在 QueryTable 类中使用JdbcTemplate.fetchSize=100,但由于某种原因,该选项被忽略了。我还尝试通过查询限制结果,虽然它在前 2 次迭代中有效,但随后的迭代减慢了进程并出错。 是否为您的 JdbcTemplate 关闭了自动提交功能?我记得有些驱动程序在自动提交和流式传输结果方面存在问题。另外,您如何验证 fetchSize 被忽略?当您打开游标时,此参数会尝试修复返回的行数(例如,如果您的查询返回 1000 万行,它将一次从在线数据库中获取 100 行,无论如何这是理论上的)。 @NotaJD 当使用 jdbc 端点选项的resetAutoCommit=false 时,它不会给我一个错误。至于 fetchSize 的验证,我设置了一个非常小的大小并尝试使用它,但它从未通过执行并抛出错误。我尝试使用 RowCallbackHandler 并且这似乎有效,但不幸的是它给了我一个 GC overhead limit exceeded 异常,在我的列表中大约有 280k 条记录。 您能否使用“原始”JdbcTemplate 设置独立/隔离测试,执行查询并成功读取记录集中的每一行(即只需调用 while (rs.hasNext()) rs.下一个(); )。如果它是流式传输和游标(等),它不应该耗尽内存。 【参考方案1】:

经过大量研究、更多试验和错误以及来自 NotaJD 的 tips,我找到了一个可行的解决方案(仍在测试中)。实际上是做这 2 个解决方案,但它们只是执行类型不同。

信息:

为了便于解释,我将使用以下信息:

表有 700 万条记录(行) AggregationStrategyImpl 扩展 AggregationStrategy 具有以下内容: 在交换体中返回List<Object> Predicate 的聚合在 List<Object> >= 50000 时完成 聚合超时设置为30000毫秒 CustomThreadPool 是 Camel 的 ThreadPoolBuilder 类的伪实现: 池大小:100 MaxPoolSize: 50000 MaxQueueSize: 500 时间单位:毫秒 KeepAliveTime:30000 两种实现都在自动装配中

解决方案 1:

from("quartz2://myGroup/myTimerTransaction?cron=0+1+0+*+*+?")
.bean(QueryTable.class, "createQuery")

代码仍将在 Quartz cron-timer 上运行(每天 00:01),但这次我的 QueryTable.class 将获取要执行的正确查询(而不是 SELECT *,我现在指定了我需要的列)和将其设置为交换体。

.to("jdbc:dataSourceInformix?resetAutoCommit=false&outputType=StreamList").split(body()).streaming()
.bean(TransformRecord.class, "process")

Camel jdbc 组件将从交换体中获取查询,将 resetAutoCommit 设置为 false,这样它就不会抛出 Cursor not open 错误,将输出设置为流式传输并拆分流式执行,因此我不会一次查询所有记录,而是一一查询。然后将获取的每条记录通过TransformRecord.class 转换为适当的 POJO。

.aggregate(constant(true), aggregationStrategyImpl)
.completionPredicate(aggregationStrategyImpl.getCompletionPredicate())
.completionTimeout(aggregationStrategyImpl.getCompletionTimeout())
.to("direct:start-processing")
.end();

这次我使用aggregate 组件来创建记录列表。 aggregationStrategyImpl 包含聚合逻辑以及完成谓词和超时,因此当我达到一定数量的记录(或发生超时)时,列表将被发送到“direct:start-processing”。

有关此 Source Allies blog 和 Apache Camel Aggregate EIP 文档中的聚合实现的更多信息。

from("direct:start-processing")
.split(body()).executorService(customThreadPool.build(getContext()))
.bean(AnalyzeData.class, "analyze")
.bean(PersistData.class, "persist")
.end();

在这里,我拆分获得的列表并使用自定义 ThreadPool 创建 N 个线程来单独分析和处理每条记录。这样我就可以并行处理我的列表,而不是一个一个地处理。我本可以使用.split(body()).parallelProcessing(),但默认的 ThreadPool 设置以后可能不是最优的。

更多关于 Apache Camel Threading Model 文档、ThreadPool Configuration 注释和 Red Hat Threading Model 文档中的 ThreadPool 实现。

解决方案 2:

对于这个解决方案,它基本上是完全相同的执行,但有以下变化:

// .to("direct:start-processing")
.to("seda:start-processing?size=1&blockWhenFull=true")
.end();
// from("direct:start-processing")
from("seda:start-processing?size=1&blockWhenFull=true")
// continues normally

这样做的目的是将列表发送到异步进程,允许最多 1 个其他列表在内存中排队,如果队列已满,则暂停父线程。因此,父线程不会等待记录列表被处理,而是返回并收集另一批记录。这也意味着,如果处理路径没有完成,新记录不会被丢弃,父线程会一直等待,直到它可以将批处理发送到 SEDA 内存队列。

在GitHub 和他们的site 中的 Apache Camel SEDA 组件文档中了解有关 SEDA 组件的更多信息

结论:

使用解决方案 1,它应该需要更长的时间才能完成,因为它在从查询中收集更多记录之前首先处理所有数据,但是内存消耗应该更少,因为它是在聚合谓词中控制的。

使用解决方案 2,它应该更快,因为它在处理前一批时从查询中收集下一批记录,但是内存消耗会更大,因为它最多可以容纳 3 个列表:正在处理的列表, SEDA 队列中的一个和父线程收集的最新批次(队列满时暂停)。

我说我仍在测试这些解决方案,因为它可以使用 500k 记录,但我仍在为将要实现的服务器制定最佳 ThreadPool 设置。我研究过 Java 中的线程,但它似乎除了系统架构、RAM 和反复试验之外,真的没有什么可做的了。

【讨论】:

以上是关于使用 Camel 并行处理大型 SQL 表的主要内容,如果未能解决你的问题,请参考以下文章

并行处理nodejs中的多个文件

并行化大型动态程序

大型 Pandas Dataframe 并行处理

Sqoop 导入具有带 where 子句和并行处理的 SQL 查询

Sqoop导入具有带有where子句和并行处理的SQL查询

并行执行 oracle PL/SQL [重复]