通过 spring-data 迭代 MongoDB 中的大型集合

Posted

技术标签:

【中文标题】通过 spring-data 迭代 MongoDB 中的大型集合【英文标题】:Iterate over large collection in MongoDB via spring-data 【发布时间】:2012-06-18 06:05:43 【问题描述】:

朋友们!

我通过 spring-data 在 java 项目中使用 MongoDB。我使用 Repository 接口来访问集合中的数据。对于某些处理,我需要遍历集合的所有元素。我可以使用存储库的 fetchAll 方法,但它总是返回 ArrayList。

但是,假设其中一个集合会很大 - 最多 100 万条记录,每个记录至少有几千字节。我想我不应该在这种情况下使用 fetchAll,但是我找不到返回一些迭代器的方便方法(这可能允许部分获取集合),也找不到带有回调的方便方法。

我看到只支持在页面中检索此类集合。我想知道这是否是处理此类集合的唯一方法?

【问题讨论】:

您可以使用limit()函数来限制函数以块的形式检索数据 我不熟悉您使用的框架,但如果它没有 MongoDB 游标的一些包装器,我会觉得很奇怪。您确定您得到的是 ArrayList,而不是环绕光标的自定义 List 实现吗? 是的,当然 - 我刚刚记录了它的 findAll().getClass() 收集了大约 300 万个,我想我看到了 java.util.ArrayList... 从 Spring Boot 2 开始,您可以使用 streamAllBy() 一次只加载一个条目。 【参考方案1】:

由于这个问题最近被撞了,这个答案需要更多的爱!

如果你使用 Spring Data Repository 接口,你可以声明一个返回 Stream 的自定义方法,它将由 Spring Data 使用游标实现:

import java.util.Stream;

public interface AlarmRepository extends CrudRepository<Alarm, String> 

    Stream<Alarm> findAllBy();


因此,对于大量数据,您可以将它们流式传输并逐行处理,而不受内存限制。

见https://docs.spring.io/spring-data/mongodb/docs/current/reference/html/#mongodb.repositories.queries

【讨论】:

这个答案对我来说帮助很大。谢谢! 自春季启动 2 以来,您不能只使用 streamAllBy() 而没有任何附加注释吗?【参考方案2】:

此答案基于:https://***.com/a/22711715/5622596

这个答案需要更新一下,因为PageRequest 已经改变了它的构造方式。

说到这里是我修改后的回复:

int pageNumber = 1;

//Change value to whatever size you want the page to have
int pageLimit = 100;

Page<SomeClass> page;
List<SomeClass> compondList= new LinkedList<>();

do
    PageRequest pageRequest = PageRequest.of(pageNumber, pageLimit);
    
    page = repository.findAll(pageRequest);
    
    List<SomeClass> listFromPage = page.getContent();

    //Do something with this list example below
    compondList.addAll(listFromPage);

    pageNumber++;

  while (!page.isLast());

//Do something with the compondList: example below
return compondList;

【讨论】:

【参考方案3】:

对大型集合进行迭代的最佳方法是直接使用 Mongo API。我使用了下面的代码,它对我的​​用例来说就像一个魅力。 我必须迭代超过 15M 条记录,其中一些记录的文档大小很大。 以下代码在 Kotlin Spring Boot App(Spring Boot 版本:2.4.5)中

fun getAbcCursor(batchSize: Int, from: Long?, to: Long?): MongoCursor<Document> 

    val collection = xyzMongoTemplate.getCollection("abc")
    val query = Document("field1", "value1")
    if (from != null) 
        val fromDate = Date(from)
        val toDate = if (to != null)  Date(to)  else  Date() 
        query.append(
            "createTime",
            Document(
                "\$gte", fromDate
            ).append(
                "\$lte", toDate
            )
        )
    
    return collection.find(query).batchSize(batchSize).iterator()

然后,从服务层方法,您可以在返回的光标上继续调用 MongoCursor.next() 直到 MongoCursor.hasNext() 返回 true。重要观察:请不要错过在 'FindIterable' 上添加 batchSize(MongoCollection.find() 的返回类型)。如果您不提供批量大小,则游标将获取最初的 101 条记录,然后会挂起(它会尝试一次获取所有剩余的记录)。 对于我的场景,我使用了 2000 的批量大小,因为它在测试期间给出了最好的结果。这种优化的批量大小将受到记录平均大小的影响。 这是 Java 中的等效代码(从查询中删除 createTime,因为它特定于我的数据模型)。​​

    MongoCursor<Document> getAbcCursor(Int batchSize) 
        MongoCollection<Document> collection = xyzMongoTemplate.getCollection("your_collection_name");
        Document query = new Document("field1", "value1");// query --> "field1": "value1"
        return collection.find(query).batchSize(batchSize).iterator();
    

【讨论】:

【参考方案4】:

回复较晚,但将来可能会对某人有所帮助。 Spring data 不提供任何 API 来包装 Mongo DB Cursor 功能。它在find 方法中使用它,但总是返回完整的对象列表。选项是直接使用 Mongo API 或使用 Spring Data Paging API,类似这样:

        final int pageLimit = 300;
        int pageNumber = 0;
        Page<T> page = repository.findAll(new PageRequest(pageNumber, pageLimit));
        while (page.hasNextPage()) 
            processPageContent(page.getContent());
            page = repository.findAll(new PageRequest(++pageNumber, pageLimit));
        
        // process last page
        processPageContent(page.getContent());

UPD (!) 对于大型数据集,此方法不够(请参阅@Shawn Bush cmets)请直接使用 Mongo API 处理此类情况。

【讨论】:

我只是想在更晚的时候插话说,对于大型数据集,您也应该远离 Paging API,因为它必须在构建每个页面之前遍历整个集合。这很快就会变得昂贵。坚持直接使用 Mongo API。 @ShawnBush 你确定吗? 做 while();看起来会更好 出于对许多浪费时间的纯粹挫败感,希望能节省别人的时间,我想重复肖恩布什所说的话。不要将其用于更大的收藏!您最终会得到使用限制和跳过的查询。每个跳过的文档都会被检查,这使得页码大的请求非常慢。【参考方案5】:

您仍然可以使用 mongoTemplate 访问 Collection 并简单地使用 DBCursor:

     DBCollection collection = mongoTemplate.getCollection("boundary");
     DBCursor cursor = collection.find();        
     while(cursor.hasNext())
         DBObject obj = cursor.next();
         Object object =  obj.get("polygons");
         ..
      ...
     

【讨论】:

【参考方案6】:

您可能想尝试这样的 DBCursor 方式:

    DBObject query = new BasicDBObject(); //setup the query criteria
    query.put("method", method);
    query.put("ctime", (new BasicDBObject("$gte", bTime)).append("$lt", eTime));

    logger.debug("query: ", query);

    DBObject fields = new BasicDBObject(); //only get the needed fields.
    fields.put("_id", 0);
    fields.put("uId", 1);
    fields.put("ctime", 1);

    DBCursor dbCursor = mongoTemplate.getCollection("collectionName").find(query, fields);

    while (dbCursor.hasNext())
        DBObject object = dbCursor.next();
        logger.debug("object: ", object);
        //do something.
    

【讨论】:

【参考方案7】:

使用 MongoTemplate::stream() 可能是最适合 DBCursor 的 Java 包装器

【讨论】:

【参考方案8】:

检查新方法以根据文档处理结果。

http://docs.spring.io/spring-data/mongodb/docs/current/api/org/springframework/data/mongodb/core/MongoTemplate.html#executeQuery-org.springframework.data.mongodb.core.query.Query-java.lang.String-org.springframework.data.mongodb.core.DocumentCallbackHandler-

【讨论】:

如果不知道如何创建 Query 对象来查找集合中的所有内容,这在上下文中没有多大帮助。我比预期更难弄清楚这一点。【参考方案9】:

另一种方式:

do
  page = repository.findAll(new PageRequest(pageNumber, pageLimit));
  pageNumber++;

while (!page.isLastPage());

【讨论】:

以上是关于通过 spring-data 迭代 MongoDB 中的大型集合的主要内容,如果未能解决你的问题,请参考以下文章

在 MongoDB 的 spring-data 中配置 GridFS 模板

使用 Spring-Data 配置 MongoDb 时出现异常

如何在 Spring-data 中更改/定义 Mongodb 的默认数据库?

“没有找到类型的属性”......将 QueryDslPredicateExecutor 与 MongoDB 和 Spring-Data 一起使用时

SpringData,JPA,MongoDB,Solr,Elasticsearch底层逻辑关系

“迭代”通过 mongodb 中的所有文档字段