MongoDb 实时(或接近实时)流出插入的数据

Posted

技术标签:

【中文标题】MongoDb 实时(或接近实时)流出插入的数据【英文标题】:MongoDb Streaming Out Inserted Data in Real-time (or near real-time) 【发布时间】:2011-08-24 04:18:15 【问题描述】:

我有许多 MongoDB 集合,它们从各种流式源中获取许多 JSON 文档。换句话说,有许多进程不断地将数据插入到一组 MongoDB 集合中。

我需要一种将数据从 MongoDB 流式传输到下游应用程序的方法。所以我想要一个概念上看起来像这样的系统:

App Stream1 --> 
App Stream2 -->     MONGODB     --->  Aggregated Stream
App Stream3 -->

或者这个:

App Stream1 -->                 --->  MongoD Stream1
App Stream2 -->     MONGODB     --->  MongoD Stream2
App Stream3 -->                 --->  MongoD Stream3

问题是如何从 Mongo 流式传输数据而无需不断轮询/查询数据库?

显而易见的问题答案是“为什么不更改这些应用程序流式处理进程以将消息发送到 Rabbit、Zero 或 ActiveMQ 等队列,然后让它们像这样立即发送到您的 Mongo 流式处理进程和 Mongo”:

                 MONGODB
                   /|\  
                    |
App Stream1 -->     |          --->  MongoD Stream1
App Stream2 -->  SomeMQqueue   --->  MongoD Stream2
App Stream3 -->                --->  MongoD Stream3

在理想的世界中是的,这很好,但是我们需要 Mongo 来确保首先保存消息,避免重复并确保生成所有 ID 等。Mongo 必须位于中间作为持久层。

那么我如何将消息从 Mongo 集合(不使用 GridFS 等)流式传输到这些下游应用程序中。基本思想流派只是轮询新文档,收集的每个文档通过向存储在数据库中的 JSON 文档添加另一个字段来更新它,就像 SQL 表中存储已处理时间戳的进程标志一样。 IE。每 1 秒轮询已处理的文档 == null.... 添加已处理 = now().... 更新文档。

有没有更简洁/计算效率更高的方法?

仅供参考 - 这些都是 Java 进程。

干杯!

【问题讨论】:

【参考方案1】:

如果您正在写入一个有上限的集合(或多个集合),您可以使用tailablecursor 将新数据推送到流中,或者推送到可以从中流出的消息队列中。但是,这不适用于无上限的集合。

【讨论】:

感谢您的链接。遗憾的是,没有使用封顶集合,但对于消息服务来说,这并不是一个糟糕的功能。听起来像是已处理标志上的索引,轮询是唯一的选择...如果索引项为 null,它是否仍会在索引中引用,或者查询 null 仍然意味着集合扫描? 或者我认为我们可以让一个固定大小的封顶集合像一个缓存一样,然后将这些项目拉出一个买 1 并将它们放回普通集合中。那么问题就变成了我们如何在应用程序运行之间保存我们的位置光标?我假设我们只使用 Mongo 自动生成的 _id 字段并选择大于该 ID 字段的所有内容...是否所有 mongo 生成的 _ID 都按升序排列? 索引确实存储null 的条目。如果您正在跟踪一个上限集合,您确实需要存储您看到的最后一个条目(您可以根据需要存储它,使用另一个 mongo 集合可以正常工作),然后使用 $min 在该元素处开始您的可尾光标和skip(1) 继续。见mongodb.org/display/DOCS/… 另外请注意,您需要确保您的上限集合足够大,以容纳您在跟踪该集合的应用程序的合理停机时间窗口期间可能收到的所有文档。如果上限集合太小,从队列的角度来看,您可能会丢失文档。 有一些方法可以解释 oplog(这也是一个有上限的集合),但必须在 master 上完成,但我并没有真正深入研究它。也许有什么要看的。请让我们了解事情的进展情况,因为这是一个有趣的问题。

以上是关于MongoDb 实时(或接近实时)流出插入的数据的主要内容,如果未能解决你的问题,请参考以下文章

在实时 mongodb 机器上更新(或替换)整个数据库集合的最佳方法是啥?

实时同步MongoDB Oplog开发指南

MongoDB

如何实时计算中证1000指数的主买/主卖交易量

作业1-加热罐实时控制案例分析

删除实时数据库上超过 17 亿行的未索引表(SQL Admin Nightmare)