JPA:迭代大型结果集的正确模式是啥?

Posted

技术标签:

【中文标题】JPA:迭代大型结果集的正确模式是啥?【英文标题】:JPA: what is the proper pattern for iterating over large result sets?JPA:迭代大型结果集的正确模式是什么? 【发布时间】:2011-07-01 08:34:25 【问题描述】:

假设我有一个包含数百万行的表。使用 JPA,迭代针对该表的查询的正确方法是什么,以便 我没有包含数百万个对象的所有内存列表

例如,如果表很大,我怀疑以下内容会爆炸:

List<Model> models = entityManager().createQuery("from Model m", Model.class).getResultList();

for (Model model : models)

     System.out.println(model.getId());

分页(循环和手动更新setFirstResult()/setMaxResult())真的是最好的解决方案吗?

编辑:我定位的主要用例是一种批处理作业。如果需要很长时间运行,那很好。不涉及网络客户端;我只需要为每一行“做一些事情”,一次一个(或一些小的 N)。我只是想避免同时将它们全部存储在内存中。

【问题讨论】:

您使用的是什么数据库和 JDBC 驱动程序? 【参考方案1】:

Java Persistence with Hibernate 的第 537 页给出了使用 ScrollableResults 的解决方案,但可惜它仅适用于 Hibernate。

所以看来使用setFirstResult/setMaxResults 和手动迭代确实是必要的。这是我使用 JPA 的解决方案:

private List<Model> getAllModelsIterable(int offset, int max)

    return entityManager.createQuery("from Model m", Model.class).setFirstResult(offset).setMaxResults(max).getResultList();

然后,像这样使用它:

private void iterateAll()

    int offset = 0;

    List<Model> models;
    while ((models = Model.getAllModelsIterable(offset, 100)).size() > 0)
    
        entityManager.getTransaction().begin();
        for (Model model : models)
        
            log.info("do something with model: " + model.getId());
        

        entityManager.flush();
        entityManager.clear();
        em.getTransaction().commit();
        offset += models.size();
    

【讨论】:

我认为如果在批处理过程中有新的插入,这个例子是不安全的。用户必须根据确定新插入的数据位于结果列表末尾的列进行排序。 当当前页面是最后一页并且少于 100 个元素时,检查 size() == 100 会跳过一个返回空列表的额外查询【参考方案2】:

我尝试了此处提供的答案,但 JBoss 5.1 + mysql Connector/J 5.1.15 + Hibernate 3.3.2 不适用于这些答案。我们刚刚从 JBoss 4.x 迁移到 JBoss 5.1,所以我们暂时坚持使用它,因此我们可以使用的最新 Hibernate 是 3.3.2。

添加几个额外的参数就可以了,这样的代码在没有 OOME 的情况下运行:

        StatelessSession session = ((Session) entityManager.getDelegate()).getSessionFactory().openStatelessSession();

        Query query = session
                .createQuery("SELECT a FROM Address a WHERE .... ORDER BY a.id");
        query.setFetchSize(Integer.valueOf(1000));
        query.setReadOnly(true);
        query.setLockMode("a", LockMode.NONE);
        ScrollableResults results = query.scroll(ScrollMode.FORWARD_ONLY);
        while (results.next()) 
            Address addr = (Address) results.get(0);
            // Do stuff
        
        results.close();
        session.close();

关键行是createQuery 和scroll 之间的查询参数。没有它们,“滚动”调用会尝试将所有内容加载到内存中,并且永远不会完成或运行到 OutOfMemoryError。

【讨论】:

您好 Zds,您扫描数百万行的用例对我来说当然很常见,感谢您发布最终代码。在我的情况下,我将记录推入 Solr,以索引它们以进行全文搜索。而且,由于我不会涉及的业务规则,我需要通过 Hibernate,而不是仅使用 JDBC 或 Solr 的内置模块。 乐于助人:-)。我们也在处理大型数据集,在这种情况下,允许用户查询同一城市/县甚至州内的所有街道名称,因此创建索引需要读取大量数据。 出现在 MySQL 中,您确实必须经历所有这些障碍:***.com/a/20900045/32453(我想其他 DB 可能不那么严格......)【参考方案3】:

你不能在直接的 JPA 中真正做到这一点,但是 Hibernate 支持无状态会话和可滚动的结果集。

在它的帮助下,我们经常处理 数十亿 行。

这里是文档链接:http://docs.jboss.org/hibernate/core/3.3/reference/en/html/batch.html#batch-statelesssession

【讨论】:

谢谢。很高兴知道有人正在通过 Hibernate 执行数十亿行。这里的一些人声称这是不可能的。 :-) 可以在这里添加一个例子吗?我认为它类似于 Zds 的示例?【参考方案4】:

说实话,我建议离开 JPA 并坚持使用 JDBC(但肯定使用JdbcTemplate 支持类或类似的东西)。 JPA(和其他 ORM 提供程序/规范)并非旨在对一个事务中的多个对象进行操作,因为它们假定加载的所有内容都应保留在一级缓存中(因此在 JPA 中需要 clear())。

此外,我推荐更底层的解决方案,因为 ORM 的开销(反射只是冰山一角)可能非常重要,以至于在普通的 ResultSet 上进行迭代,即使使用像提到的 JdbcTemplate 这样的一些轻量级支持也会更快。

JPA 根本不是为对大量实体执行操作而设计的。您可以使用flush()/clear() 来避免OutOfMemoryError,但请再次考虑这一点。付出巨大资源消耗的代价,您获得的收益微乎其微。

【讨论】:

JPA 的优势不仅在于与数据库无关,而且甚至可以不使用传统数据库 (NoSQL)。不时进行刷新/清除并不难,通常批处理操作很少进行。 您好 Thomasz。我有很多理由抱怨 JPA/Hibernate,但恭敬地,我真的怀疑它们“不是为在许多对象上运行而设计的”。我怀疑我只需要为这个用例学习正确的模式。 好吧,我只能想到两种模式:分页(多次提到)和flush()/clear()。第一个是恕我直言,不是为批处理目的而设计的,而使用 flush()/clear() 序列闻起来像 leaky abstraction 是的,它是您提到的分页和刷新/清除的组合。谢谢!【参考方案5】:

如果你使用 EclipseLink,我会使用这种方法来获得 Iterable 的结果

private static <T> Iterable<T> getResult(TypedQuery<T> query)

  //eclipseLink
  if(query instanceof JpaQuery) 
    JpaQuery<T> jQuery = (JpaQuery<T>) query;
    jQuery.setHint(QueryHints.RESULT_SET_TYPE, ResultSetType.ForwardOnly)
       .setHint(QueryHints.SCROLLABLE_CURSOR, true);

    final Cursor cursor = jQuery.getResultCursor();
    return new Iterable<T>()
         
      @SuppressWarnings("unchecked")
      @Override
      public Iterator<T> iterator()
      
        return cursor;
      
    ; 
   
  return query.getResultList();  
  

关闭方法

static void closeCursor(Iterable<?> list)

  if (list.iterator() instanceof Cursor)
    
      ((Cursor) list.iterator()).close();
    

【讨论】:

我尝试了您的代码,但仍然出现 OOM - 似乎所有 T 对象(以及从 T 引用的所有连接表对象)都不是 GC。分析显示它们是从 org.eclipse.persistence.internal.sessions.RepeatableWriteUnitOfWork 中的“表”以及 org.eclipse.persistence.internal.identitymaps.CacheKey 引用的。我查看了缓存,我的设置都是默认的(禁用选择性,软子缓存弱,缓存大小 100,丢弃无效)。我将研究禁用会话,看看它是否有帮助。顺便说一句,我只是使用“for (T o: results)”遍历返回光标。 Badum tssssssss【参考方案6】:

这取决于您必须执行的操作类型。你为什么要循环超过一百万行?您是否以批处理模式更新某些内容?您要向客户显示所有记录吗?您是否正在计算检索到的实体的一些统计数据?

如果您要向客户显示一百万条记录,请重新考虑您的用户界面。在这种情况下,适当的解决方案是将结果分页并使用setFirstResult()setMaxResult()

如果您已经启动了大量记录的更新,您最好保持更新简单并使用Query.executeUpdate()。或者,您可以使用消息驱动 Bean 或工作管理器以异步模式执行更新。

如果您要根据检索到的实体计算一些统计信息,则可以利用 JPA 规范定义的分组函数。

对于任何其他情况,请更具体:)

【讨论】:

很简单,我需要“为每一行”做一些事情。当然,这是一个常见的用例。在我现在正在处理的特定情况下,我需要使用每一行的 id(PK)查询完全在我的数据库之外的外部 Web 服务。结果不会显示回任何客户端 Web 浏览器,因此没有用户界面可言。换句话说,这是一个批处理作业。 如果您“需要”为每一行打印 id,没有其他方法可以获取每一行,获取 id 并打印。最佳解决方案取决于您需要做什么。 @Caffeine Coma,如果您只需要每行的 id,那么最大的改进可能来自仅获取该列,如 SELECT m.id FROM Model m,然后遍历 List。跨度> @Jörn Horstmann- 如果有数百万行,这真的很重要吗?我的观点是,一个包含数百万个对象(无论多么小)的 ArrayList 对 JVM 堆没有好处。 @Dainius:我的问题真的是:“如何在没有整个 ArrayList 内存的情况下迭代每一行?”换句话说,我想要一个一次拉 N 的接口,其中 N 明显小于 100 万。 :-)【参考方案7】:

没有“正确”的做法,这不是 JPA 或 JDO 或任何其他 ORM 打算做的事情,直接 JDBC 将是您的最佳选择,因为您可以将其配置为带回少量一次行并在使用时刷新它们,这就是存在服务器端游标的原因。

ORM 工具不是为批量处理而设计的,它们旨在让您操作对象并尝试使存储数据的 RDBMS 尽可能透明,但大多数在透明部分至少在某种程度上失败了。在这种规模下,由于对象实例化开销简单明了,因此无法处理数十万行(对象),更不用说使用任何 ORM 处理数百万行并让它在任何合理的时间内执行。

使用适当的工具。纯 JDBC 和存储过程在 2011 年肯定占有一席之地,尤其是在与这些 ORM 框架相比它们更擅长的方面。

无论你如何做,将一百万个任何东西,即使是一个简单的List&lt;Integer&gt;,都不会非常有效。执行您所要求的正确方法是一个简单的SELECT id FROM table,设置为SERVER SIDE(取决于供应商)并将光标设置为FORWARD_ONLY READ-ONLY 并对其进行迭代。

如果您真的通过调用一些 Web 服务器来处理数百万个 id,那么您还必须进行一些并发处理才能在任何合理的时间内运行。使用 JDBC 游标拉动并一次将其中的几个放置在 ConcurrentLinkedQueue 中并拥有一个小线程池 (# CPU/Cores + 1) 拉动并处理它们是在具有以下功能的机器上完成任务的唯一方法任何“正常”数量的 RAM,因为您的内存已经用完了。

也可以查看answer。

【讨论】:

所以您是说没有公司需要访问其用户表的每一行?当需要这样做时,他们的程序员只是将 Hibernate 扔出窗外? “没有办法处理数十万行”- 在我的问题中,我指出了 setFirstResult/setMaxResult,所以很明显 一种方法。我在问有没有更好的。 “不管你怎么做,把一百万个东西拉到一个简单的 List 中都不会很有效。” 这正是我的观点.我在问如何创建巨型列表,而是迭代结果集。 按照我在回答中的建议,使用带有 FORWARD_ONLY READ_ONLY 和 SERVER_SIDE 游标的简单直接 JDBC 选择语句。如何使 JDBC 使用 SERVER_SIDE 游标取决于数据库驱动程序。 我完全同意这个答案。最佳解决方案取决于问题。如果问题是轻松加载一些实体,JPA 很好。如果问题是有效地使用大量数据,直接 JDBC 会更好。 扫描数以百万计的记录很常见,原因有很多,例如将它们索引到搜索引擎中。尽管我同意 JDBC 通常是更直接的途径,但有时您会走进一个项目,该项目已经将非常复杂的业务逻辑捆绑在 Hibernate 层中。如果您绕过它并使用 JDBC,那么您就绕过了业务逻辑,这有时对于重新实现和维护来说并非易事。当人们发布有关非典型用例的问题时,他们通常知道这有点奇怪,但可能是继承某些东西而不是从头开始构建,并且可能无法透露细节。【参考方案8】:

您可以使用另一个“技巧”。仅加载您感兴趣的实体的标识符集合。假设标识符的类型为 long=8bytes,那么 10^6 的此类标识符列表大约为 8Mb。如果它是一个批处理过程(一次一个实例),那么它是可以忍受的。然后只需迭代并完成工作。

另外一句话——无论如何你都应该分块做——尤其是当你修改记录时,否则数据库中的回滚段会增长。

在设置 firstResult/maxRows 策略时 - 对于远离顶部的结果,它会非常非常缓慢。

还要考虑到数据库可能在读取提交隔离中运行,因此要避免幻读加载标识符,然后一个接一个地加载实体(或 10 乘 10 或其他)。 p>

【讨论】:

嗨@Marcin,您或其他人能否提供一个示例代码的链接,该示例代码应用这种分块和id-first 逐步方法,最好使用Java8 流?【参考方案9】:

我很惊讶地发现存储过程的使用在此处的答案中并不突出。过去,当我不得不做这样的事情时,我创建了一个存储过程,它以小块的形式处理数据,然后休眠一会儿,然后继续。休眠的原因是为了不使数据库不堪重负,该数据库可能也用于更实时的查询类型,例如连接到网站。如果没有其他人在使用数据库,那么您可以省略睡眠。如果您需要确保每条记录只处理一次,那么您将需要创建一个额外的表(或字段)来存储您已处理的记录,以便在重新启动时保持弹性。

这里的性能节省是显着的,可能比您在 JPA/Hibernate/AppServer 领域所做的任何事情都要快几个数量级,并且您的数据库服务器很可能有自己的服务器端游标类型的机制来有效地处理大型结果集.性能节省来自不必将数据从数据库服务器传送到应用程序服务器,您在应用程序服务器处理数据,然后将其传送回来。

使用存储过程有一些明显的缺点,可能会完全为您排除这种情况,但如果您在个人工具箱中拥有该技能并且可以在这种情况下使用它,那么您可以淘汰这些类型的事情很快。

【讨论】:

-2 反对票 - 下一个反对票的人会为你的反对票辩护吗? 我在阅读这些内容时也有同样的想法。该问题表明没有 UI 的大批量批处理作业。假设您不需要应用服务器特定的资源,为什么要使用应用服务器呢?存储过程会更有效率。 @jdessey 根据情况,假设我们有一个导入工具,在导入时它应该对系统的其他部分做一些事情,例如根据已经编码为 EJB 的一些业务规则将行添加到另一个表。然后在应用服务器中运行会更有意义,除非您可以让 EJB 以嵌入式模式运行。【参考方案10】:

扩展@Tomasz Nurkiewicz 的答案。您可以访问DataSource,这反过来又可以为您提供连接

@Resource(name = "myDataSource",
    lookup = "java:comp/DefaultDataSource")
private DataSource myDataSource;

在你的代码中你有

try (Connection connection = myDataSource.getConnection()) 
    // raw jdbc operations

这将允许您绕过 JPA 进行某些特定的大批量操作(例如导入/导出),但是如果需要,您仍然可以访问实体管理器以进行其他 JPA 操作。

【讨论】:

【参考方案11】:

这是一个简单、直接的 JPA 示例(在 Kotlin 中),它展示了如何在不使用游标的情况下对任意大的结果集进行分页,一次读取 100 个项目的块(每个游标都消耗数据库上的资源)。它使用键集分页。

请参阅https://use-the-index-luke.com/no-offset 了解键集分页的概念,并参阅https://www.citusdata.com/blog/2016/03/30/five-ways-to-paginate/ 了解不同分页方式及其缺点的比较。

/*
create table my_table(
  id int primary key, -- index will be created
  my_column varchar
)
*/

fun keysetPaginationExample() 
    var lastId = Integer.MIN_VALUE
    do 

        val someItems =
        myRepository.findTop100ByMyTableIdAfterOrderByMyTableId(lastId)

        if (someItems.isEmpty()) break

        lastId = someItems.last().myTableId

        for (item in someItems) 
          process(item)
        

     while (true)

【讨论】:

【参考方案12】:

JPA 和 NativeQuery 每次使用偏移量获取大小元素的示例

public List<X> getXByFetching(int fetchSize) 
        int totalX = getTotalRows(Entity);
        List<X> result = new ArrayList<>();
        for (int offset = 0; offset < totalX; offset = offset + fetchSize) 
            EntityManager entityManager = getEntityManager();
            String sql = getSqlSelect(Entity) + " OFFSET " + offset + " ROWS";
            Query query = entityManager.createNativeQuery(sql, X.class);
            query.setMaxResults(fetchSize);
            result.addAll(query.getResultList());
            entityManager.flush();
            entityManager.clear();
        return result;
    

【讨论】:

【参考方案13】:

使用Pagination 概念检索结果

【讨论】:

分页对 GUI 非常有用。但是为了处理大量数据,ScrollableResultSet 是很久以前发明的。它只是不在 JPA 中。【参考方案14】:

我自己也想过这个问题。这似乎很重要:

您的数据集有多大(行) 您正在使用什么 JPA 实现 您正在对每一行进行何种处理。

我编写了一个迭代器,以便轻松交换两种方法(findAll 与 findEntries)。

我建议你两个都试试。

Long count = entityManager().createQuery("select count(o) from Model o", Long.class).getSingleResult();
ChunkIterator<Model> it1 = new ChunkIterator<Model>(count, 2) 

    @Override
    public Iterator<Model> getChunk(long index, long chunkSize) 
        //Do your setFirst and setMax here and return an iterator.
    

;

Iterator<Model> it2 = List<Model> models = entityManager().createQuery("from Model m", Model.class).getResultList().iterator();


public static abstract class ChunkIterator<T> 
    extends AbstractIterator<T> implements Iterable<T>
    private Iterator<T> chunk;
    private Long count;
    private long index = 0;
    private long chunkSize = 100;

    public ChunkIterator(Long count, long chunkSize) 
        super();
        this.count = count;
        this.chunkSize = chunkSize;
    

    public abstract Iterator<T> getChunk(long index, long chunkSize);

    @Override
    public Iterator<T> iterator() 
        return this;
    

    @Override
    protected T computeNext() 
        if (count == 0) return endOfData();
        if (chunk != null && chunk.hasNext() == false && index >= count) 
            return endOfData();
        if (chunk == null || chunk.hasNext() == false) 
            chunk = getChunk(index, chunkSize);
            index += chunkSize;
        
        if (chunk == null || chunk.hasNext() == false) 
            return endOfData();
        return chunk.next();
    


我最终没有使用我的块迭代器(所以它可能没有经过测试)。顺便说一句,如果你想使用它,你将需要谷歌收藏。

【讨论】:

关于“你对每一行进行什么样的处理”——如果行数以百万计,我怀疑即使是一个只有 id 列的简单对象也会导致问题.我也想过编写自己的迭代器来包装 setFirstResult/setMaxResult,但我认为这一定是一个常见的(希望已解决!)问题。 @Caffeine Coma 我发布了我的迭代器,你可能可以做更多的 JPA 来适应它。告诉我它是否有帮助。我最终没有使用(做了一个 findAll)。【参考方案15】:

使用 hibernate 有 4 种不同的方式来实现您想要的。每个都有设计权衡、限制和后果。我建议探索每一个,然后决定哪一个适合你的情况。

    通过 scroll() 使用无状态会话 在每次迭代后使用 session.clear()。当需要附加其他实体时,将它们加载到单独的会话中。实际上,第一个会话是模拟无状态会话,但保留有状态会话的所有功能,直到对象被分离。 使用 iterate() 或 list() 但仅在第一个查询中获取 id,然后在每次迭代的单独会话中,执行 session.load 并在迭代结束时关闭会话。 使用 Query.iterate() 和 EntityManager.detach() aka Session.evict();

【讨论】:

【参考方案16】:

JPA 2.2 和 Hibernate(至少在 v5.4.30 中)终于找到了您想要的答案,它使用了上面提到的 Scrollable 实现。

您的代码现在可以如下所示:

entityManager().createQuery("from Model m", Model.class)
                  .getResultStream();
                  .forEach(model -> System.out.println(model.getId());

【讨论】:

以上是关于JPA:迭代大型结果集的正确模式是啥?的主要内容,如果未能解决你的问题,请参考以下文章

基于查询结果集的PHP跳过for循环迭代

使用 JDBC 可调用 stmt 存储 proc 时返回的结果集的顺序是啥?

pandas.concat 和 numpy.append 的大型数据集的内存错误

pandas.concat 和 numpy.append 的大型数据集的内存错误

使用具有“order by”组件的窗口函数时,结果集的顺序是啥?

如何在不使用大型结果集的情况下检查一个字符串是不是是另一个字符串的子字符串?