使用 JdbcTemplate 和 BlockingQueue 同时提取和插入数据库
Posted
技术标签:
【中文标题】使用 JdbcTemplate 和 BlockingQueue 同时提取和插入数据库【英文标题】:Extract and Insert concurrently into database using JdbcTemplate and BlockingQueue 【发布时间】:2013-05-09 18:47:14 【问题描述】:我正在从一个数据库表中提取数千行并插入到另一个数据库表中。我不想将所有记录加载到内存中,然后插入到另一个数据库中。
因此,我尝试使用 BlockingQueue 使用一个线程加载提取器结果,并使用另一个线程同时插入另一个数据库。我正在使用 Spring JdbcTemplate 来访问我的数据库。
这是我的计划
public void performExtractionInsertion(JdbcTemplate inboundJdbcTemplate, JdbcTemplate outboundJdbcTemplate)
final BlockingQueue queue = new LinkedBlockingQueue<Transaction>(50);
ExecutorService executor = Executors.newFixedThreadPool(2);
final String SELECT_QUERY = "SELECT acc_number, date, type FROM transactions";
final String INSERT_QUERY = "INSERT INTO analysis(col1, col2, col3) VALUES(?,?,?)";
executor.execute(new Runnable()
@Override
public void run()
queue.put(/*IMPLEMENTATION OF EXTRACTOR USING inboundJdbcTemplate*/);
);
executor.execute(new Runnable()
@Override
public void run()
queue.take(/*IMPLEMENTATION OF INSERTER USING outboundJdbcTemplate*/)
);
有人可以告诉我如何实现 EXTRACTOR 和 INSERTER 以便它们使用相同的 BlockingQueue 来限制内存中的行数吗?
这是正确的方法吗?我还能使用 jdbcTemplate 吗? 最智能、最方便的方法是什么?
谢谢大家
顺便说一句,Transaction 是要保存要插入的提取元素的对象的类。
【问题讨论】:
【参考方案1】:我也遇到过同样的情况(设置略有不同)。我不是为了 JdbcTemplate,而是为了 MappingSQLQuery。我认为,至少对于查询它更适合我的需要。如果您愿意更改,代码可能如下所示
MappingSQLQuery selector = ...;
executor.execute(new Runnable()
public void run()
List<WrapObject> list = selector.execute();
for (WrapObject object : list)
while (!queue.offer(object))
Thread.sleep(100);
while (!queue.offer(WrapObject.NULL_OBJECT))
Thread.sleep(100);
);
executor.execute(new Runnable()
public void run()
WrapObject object;
while ((object = queue.take) != WrapObject.NULL_OBJECT)
outboundJdbcTemplate.update(INSERT_QUERY, object.getParam1(),...)
);
给定一个合适的 WrapObject 定义,这应该可以解决问题。
通过大量使用性能关键型数据库系统,我发现了以下两点。
通常,重新实现 Spring 映射器可以让您更好地控制数据库发生的事情(尤其是批量更新、PreparedStatements 的编译时间、设置批量大小)
1234563显着提升您的应用程序根据您的数据库,您可能希望使用多个读取器/写入器。我曾使用 Oracle-Clusters,其中八个并行读取进程甚至不会开始给硬件带来很大的负载
【讨论】:
选择器不会在你执行的时候把所有的行都添加到内存中吗? 不,只有选择器;y 将 ResultSet 的fetchSize
(默认值 Oracle:10,mysql:1)元组提取到内存中。一旦到达预取列表的末尾,Fetch 会自动扩展。
所以你是说 WrapObject 的列表中充满了 fetchSize 而不是 WrapObjects?我认为列表中充满了从查询返回的所有对象。然后将列表中的每个对象放入队列中。如果不是这样,您能否澄清一下?
另外一点,BlockingQueue 不允许插入空对象。看起来您正试图防止队列下溢。但是,BlockingQueue 的实现保持自身防止上溢和下溢。如果不是这种情况,我希望得到澄清。再次感谢。
我已经修改了代码,使它可以处理 BlockingQueues。现在,您的内存消耗如下:您的ResultSet
中最多有fetchSize
元组和outboundJdbcTemplate
中的一些元素。以上是关于使用 JdbcTemplate 和 BlockingQueue 同时提取和插入数据库的主要内容,如果未能解决你的问题,请参考以下文章
Spring中使用JdbcTemplate和HibernateTemplate的数据库操作
带有 TransactionTemplate 和 Connection Pool 的 JDBCTemplate,使用哪个数据源