使用 JdbcTemplate 和 BlockingQueue 同时提取和插入数据库

Posted

技术标签:

【中文标题】使用 JdbcTemplate 和 BlockingQueue 同时提取和插入数据库【英文标题】:Extract and Insert concurrently into database using JdbcTemplate and BlockingQueue 【发布时间】:2013-05-09 18:47:14 【问题描述】:

我正在从一个数据库表中提取数千行并插入到另一个数据库表中。我不想将所有记录加载到内存中,然后插入到另一个数据库中。

因此,我尝试使用 BlockingQueue 使用一个线程加载提取器结果,并使用另一个线程同时插入另一个数据库。我正在使用 Spring JdbcTemplate 来访问我的数据库。

这是我的计划

    public void performExtractionInsertion(JdbcTemplate inboundJdbcTemplate, JdbcTemplate outboundJdbcTemplate)

        final BlockingQueue queue = new LinkedBlockingQueue<Transaction>(50);
        ExecutorService executor = Executors.newFixedThreadPool(2);
        final String SELECT_QUERY = "SELECT acc_number, date, type  FROM transactions";
        final String INSERT_QUERY = "INSERT INTO analysis(col1, col2, col3) VALUES(?,?,?)";

        executor.execute(new Runnable() 
            @Override
            public void run() 
                    queue.put(/*IMPLEMENTATION OF EXTRACTOR USING inboundJdbcTemplate*/);
            
        );
        executor.execute(new Runnable() 
            @Override
            public void run() 
                queue.take(/*IMPLEMENTATION OF INSERTER USING outboundJdbcTemplate*/)
            
        );

有人可以告诉我如何实现 EXTRACTOR 和 INSERTER 以便它们使用相同的 BlockingQueue 来限制内存中的行数吗?

这是正确的方法吗?我还能使用 jdbcTemplate 吗? 最智能、最方便的方法是什么?

谢谢大家

顺便说一句,Transaction 是要保存要插入的提取元素的对象的类。

【问题讨论】:

【参考方案1】:

我也遇到过同样的情况(设置略有不同)。我不是为了 JdbcTemplate,而是为了 MappingSQLQuery。我认为,至少对于查询它更适合我的需要。如果您愿意更改,代码可能如下所示

MappingSQLQuery selector = ...;       
executor.execute(new Runnable() 
  public void run() 
    List<WrapObject> list = selector.execute();
    for (WrapObject object : list) 
      while (!queue.offer(object)) 
        Thread.sleep(100);
      
    
    while (!queue.offer(WrapObject.NULL_OBJECT)) 
      Thread.sleep(100);
    
  
);
executor.execute(new Runnable() 
  public void run() 
    WrapObject object;
    while ((object = queue.take) != WrapObject.NULL_OBJECT) 
      outboundJdbcTemplate.update(INSERT_QUERY, object.getParam1(),...)
    
  
);

给定一个合适的 WrapObject 定义,这应该可以解决问题。

通过大量使用性能关键型数据库系统,我发现了以下两点。

    通常,重新实现 Spring 映射器可以让您更好地控制数据库发生的事情(尤其是批量更新、PreparedStatements 的编译时间、设置批量大小)

    1234563显着提升您的应用程序

    根据您的数据库,您可能希望使用多个读取器/写入器。我曾使用 Oracle-Clusters,其中八个并行读取进程甚至不会开始给硬件带来很大的负载

【讨论】:

选择器不会在你执行的时候把所有的行都添加到内存中吗? 不,只有选择器;y 将 ResultSet 的 fetchSize(默认值 Oracle:10,mysql:1)元组提取到内存中。一旦到达预取列表的末尾,Fetch 会自动扩展。 所以你是说 WrapObject 的列表中充满了 fetchSize 而不是 WrapObjects?我认为列表中充满了从查询返回的所有对象。然后将列表中的每个对象放入队列中。如果不是这样,您能否澄清一下? 另外一点,BlockingQueue 不允许插入空对象。看起来您正试图防止队列下溢。但是,BlockingQueue 的实现保持自身防止上溢和下溢。如果不是这种情况,我希望得到澄清。再次感谢。 我已经修改了代码,使它可以处理 BlockingQueues。现在,您的内存消耗如下:您的ResultSet 中最多有fetchSize 元组和outboundJdbcTemplate 中的一些元素。

以上是关于使用 JdbcTemplate 和 BlockingQueue 同时提取和插入数据库的主要内容,如果未能解决你的问题,请参考以下文章

Spring基础:JDBCTemplate的使用

Spring的jdbcTemplate使用

Spring中使用JdbcTemplate和HibernateTemplate的数据库操作

Spring--JdbcTemplate

带有 TransactionTemplate 和 Connection Pool 的 JDBCTemplate,使用哪个数据源

Spring中使用JdbcTemplate