如何监听 MongoDB 集合的变化?

Posted

技术标签:

【中文标题】如何监听 MongoDB 集合的变化?【英文标题】:How to listen for changes to a MongoDB collection? 【发布时间】:2012-03-30 06:48:39 【问题描述】:

我正在创建一种以 MongoDB 作为数据存储的后台作业队列系统。如何在生成工作人员处理作业之前“监听”对 MongoDB 集合的插入?

我是否需要每隔几秒轮询一次以查看是否与上次相比有任何更改,或者我的脚本是否可以等待插入发生?

这是我正在处理的一个 php 项目,但可以随意用 Ruby 或与语言无关的方式回答。

【问题讨论】:

在 MongoDB 3.6 中添加了更改流以解决您的场景。 docs.mongodb.com/manual/changeStreams 此外,如果您使用的是 MongoDB Atlas,您可以利用 Stitch Triggers,它允许您执行函数以响应插入/更新/删除/等。 docs.mongodb.com/stitch/triggers/overview 不再需要解析 oplog。 【参考方案1】:

您的想法听起来很像触发器。 MongoDB 不支持触​​发器,但是有些人使用一些技巧“自己动手”。这里的关键是oplog。

当您在副本集中运行 MongoDB 时,所有 MongoDB 操作都会记录到操作日志(称为 oplog)中。 oplog 基本上只是对数据所做修改的运行列表。副本集通过侦听此 oplog 上的更改然后在本地应用更改来发挥作用。

这听起来很熟悉吗?

这里我不能详细说明整个过程,它是几页文档,但你需要的工具是可用的。

先写一些关于 oplog 的文章 - Brief description - Layout of the local collection(包含 oplog)

您还需要利用tailable cursors。这些将为您提供一种侦听更改而不是轮询更改的方法。请注意,复制使用可尾游标,因此这是受支持的功能。

【讨论】:

嗯...不完全是我的想法。此时我只运行一个实例(没有从站)。那么也许是一个更基本的解决方案? 您可以使用--replSet 选项启动服务器,它将创建/填充oplog。即使没有二级。这绝对是“监听”数据库变化的唯一方法。 这是一个很好的描述如何设置 oplog 以在本地记录对 DB 的更改:loosexaml.wordpress.com/2012/09/03/… 太棒了!这真的是我想要的。我在 npm 上找到了一个名为“mongo-oplog”的库。好开心~ 我同意在撰写此答案时触发器可能不可用,但对所有登陆这里的人来说,现在有一个选项可用,查看 MongoDB Stitch (docs.mongodb.com/stitch/#stitch) 和 Stitch 触发器 (@ 987654326@)..【参考方案2】:

MongoDB 具有所谓的capped collectionstailable cursors 允许 MongoDB 将数据推送到侦听器。

capped collection 本质上是一个固定大小且只允许插入的集合。下面是创建一个的样子:

db.createCollection("messages",  capped: true, size: 100000000 )

MongoDB Tailable 游标 (original post by Jonathan H. Wage)

红宝石

coll = db.collection('my_collection')
cursor = Mongo::Cursor.new(coll, :tailable => true)
loop do
  if doc = cursor.next_document
    puts doc
  else
    sleep 1
  end
end

PHP

$mongo = new Mongo();
$db = $mongo->selectDB('my_db')
$coll = $db->selectCollection('my_collection');
$cursor = $coll->find()->tailable(true);
while (true) 
    if ($cursor->hasNext()) 
        $doc = $cursor->getNext();
        print_r($doc);
     else 
        sleep(1);
    

Python(Robert Stewart)

from pymongo import Connection
import time

db = Connection().my_db
coll = db.my_collection
cursor = coll.find(tailable=True)
while cursor.alive:
    try:
        doc = cursor.next()
        print doc
    except StopIteration:
        time.sleep(1)

Perl(Max)

use 5.010;

use strict;
use warnings;
use MongoDB;

my $db = MongoDB::Connection->new;
my $coll = $db->my_db->my_collection;
my $cursor = $coll->find->tailable(1);
for (;;)

    if (defined(my $doc = $cursor->next))
    
        say $doc;
    
    else
    
        sleep 1;
    

其他资源:

Ruby/Node.js Tutorial which walks you through creating an application that listens to inserts in a MongoDB capped collection.

An article talking about tailable cursors in more detail.

PHP, Ruby, Python, and Perl examples of using tailable cursors.

【讨论】:

睡眠1?真的吗?生产代码?怎么不是轮询? @rbp 哈哈,我从来没有说过这是生产代码,但你是对的,睡一秒钟不是一个好习惯。很确定我从其他地方得到了这个例子。不知道如何重构它。 @kroe 因为那些不相关的细节会被可能不明白为什么不好的新程序员放入生产代码中。 我理解你的意思,但是期待一些新程序员将“sleep 1”添加到生产环境中几乎是冒犯的!我的意思是,我不会感到惊讶......但如果有人把它投入生产,至少会永远学习艰苦的方式......哈哈哈 在生产环境中执行 time.sleep(1) 有什么问题?【参考方案3】:

或者,您可以使用标准的 Mongo FindAndUpdate 方法,并在回调中,在运行回调时触发 EventEmitter 事件(在 Node 中)。

应用程序或架构中侦听此事件的任何其他部分都将收到更新通知,并且还会向那里发送任何相关数据。这是实现 Mongo 通知的一种非常简单的方法。

【讨论】:

这非常低效..您为每个 FindAndUpdate 锁定数据库! 我的猜测是,亚历克斯回答了一个稍微不同(不是专门解决插入问题)但相关的问题,例如当我们假设的排队作业状态发生变化时如何向客户发出某种通知将需要在作业产生、成功完成或失败时发生。通过使用 websocket 连接到节点的客户端,它们都可以通过 FIndAndUpdate 回调上的广播事件通知更改,该回调可以在接收状态更改消息时调用。我会说这并不是低效的,因为需要进行更新。【参考方案4】:

here 有一个可用的 java 示例。

 MongoClient mongoClient = new MongoClient();
    DBCollection coll = mongoClient.getDatabase("local").getCollection("oplog.rs");

    DBCursor cur = coll.find().sort(BasicDBObjectBuilder.start("$natural", 1).get())
            .addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

    System.out.println("== open cursor ==");

    Runnable task = () -> 
        System.out.println("\tWaiting for events");
        while (cur.hasNext()) 
            DBObject obj = cur.next();
            System.out.println( obj );

        
    ;
    new Thread(task).start();

这里给出的密钥是QUERY OPTIONS。

如果您不需要每次都加载所有数据,您也可以更改查找查询。

BasicDBObject query= new BasicDBObject();
query.put("ts", new BasicDBObject("$gt", new BsonTimestamp(1471952088, 1))); //timestamp is within some range
query.put("op", "i"); //Only insert operation

DBCursor cur = coll.find(query).sort(BasicDBObjectBuilder.start("$natural", 1).get())
.addOption(Bytes.QUERYOPTION_TAILABLE | Bytes.QUERYOPTION_AWAITDATA);

【讨论】:

【参考方案5】:

实际上,为什么使用mongoose schema提供的中间件插入新内容时,您不会注意到输出,而不是注意

你可以捕捉插入新文档的事件,并在插入完成后做一些事情

【讨论】:

我的错。对不起先生。【参考方案6】:

其中许多答案只会为您提供新记录而不是更新和/或效率极低

唯一可靠、高效的方法是在本地 db: oplog.rs 集合上创建一个可尾游标,以获取对 MongoDB 的所有更改并按照您的意愿使用它。 (MongoDB 甚至在内部或多或少地支持复制!)

oplog 包含的内容说明: https://www.compose.com/articles/the-mongodb-oplog-and-node-js/

一个 Node.js 库的示例,它提供了一个 API,围绕着 oplog 可用的功能: https://github.com/cayasso/mongo-oplog

【讨论】:

【参考方案7】:

从 MongoDB 3.6 开始,将有一个名为 Change Streams 的新通知 API,您可以使用它。见this blog post for an example。示例:

cursor = client.my_db.my_collection.changes([
    '$match': 
        'operationType': '$in': ['insert', 'replace']
    ,
    '$match': 
        'newDocument.n': '$gte': 1
    
])

# Loops forever.
for change in cursor:
    print(change['newDocument'])

【讨论】:

为什么?你能详细说明吗?这是现在的标准方式吗? 怎么样?不要使用轮询 - 您需要事件方法而不是 while 循环等。 你在哪里看到投票? 我认为他/她指的是最后一个循环。但我认为 PyMongo 只支持这一点。 Motor 可能具有异步/事件侦听器样式的实现。【参考方案8】:

MongoDB 3.6 版现在包含更改流,它本质上是 OpLog 之上的 API,允许类似触发器/通知的用例。

这是一个 Java 示例的链接: http://mongodb.github.io/mongo-java-driver/3.6/driver/tutorials/change-streams/

NodeJS 示例可能类似于:

 var MongoClient = require('mongodb').MongoClient;
    MongoClient.connect("mongodb://localhost:22000/MyStore?readConcern=majority")
     .then(function(client)
       let db = client.db('MyStore')

       let change_streams = db.collection('products').watch()
          change_streams.on('change', function(change)
            console.log(JSON.stringify(change));
          );
      );

【讨论】:

JSON.stringify 对于在 android Studio (Android App) 中接收此数据非常重要..【参考方案9】:

看看这个:改变流

2018 年 1 月 10 日 - 3.6 版

*编辑:我写了一篇关于如何做到这一点的文章https://medium.com/riow/mongodb-data-collection-change-85b63d96ff76

https://docs.mongodb.com/v3.6/changeStreams/


mongodb 3.6 中的新功能 https://docs.mongodb.com/manual/release-notes/3.6/ 2018/01/10

$ mongod --version
db version v3.6.2

为了使用changeStreams,数据库必须是复制集

关于复制集的更多信息: https://docs.mongodb.com/manual/replication/

默认情况下,您的数据库将是“独立”。

如何将独立集转换为副本集:https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-set/


以下示例是一个实际应用程序,说明您如何使用它。 * Node 专用。

/* file.js */
'use strict'


module.exports = function (
    app,
    io,
    User // Collection Name
) 
    // SET WATCH ON COLLECTION 
    const changeStream = User.watch();  

    // Socket Connection  
    io.on('connection', function (socket) 
        console.log('Connection!');

        // USERS - Change
        changeStream.on('change', function(change) 
            console.log('COLLECTION CHANGED');

            User.find(, (err, data) => 
                if (err) throw err;

                if (data) 
                    // RESEND ALL USERS
                    socket.emit('users', data);
                
            );
        );
    );
;
/* END - file.js */

有用的链接:https://docs.mongodb.com/manual/tutorial/convert-standalone-to-replica-sethttps://docs.mongodb.com/manual/tutorial/change-streams-example

https://docs.mongodb.com/v3.6/tutorial/change-streams-examplehttp://plusnconsulting.com/post/MongoDB-Change-Streams

【讨论】:

抱歉所有的编辑,所以不喜欢我的“链接”(说它们是格式不正确的代码。) 你不应该查询数据库,我认为使用 watch() 或类似的,新数据可以发送到正在监听的服务器【参考方案10】:

有一组很棒的服务,称为MongoDB Stitch。查看stitch functions/triggers。请注意,这是一项基于云的付费 服务 (AWS)。在您的情况下,在插入时,您可以调用用 javascript 编写的自定义函数。

【讨论】:

***.com/users/486867/manish-jain - 你有一个例子说明如何使用缝合来通知 REACT 应用程序数据已插入到表中?【参考方案11】:

3.6以后允许使用数据库以下数据库触发器类型:

事件驱动触发器 - 用于自动更新相关文档、通知下游服务、传播数据以支持混合工作负载、数据完整性和审计 计划的触发器 - 适用于计划的数据检索、传播、归档和分析工作负载

登录你的Atlas账号,选择Triggers界面,添加新的触发器:

展开每个部分以获取更多设置或详细信息。

【讨论】:

以上是关于如何监听 MongoDB 集合的变化?的主要内容,如果未能解决你的问题,请参考以下文章

mongodb连接问题

MongoDB基础操作

如何使用 mongodb 显示我的用户集合

如何使用 mongodb 显示我的用户集合

使用 Gulp 管理打开和关闭 MongoDB

如何使用mongodb显示我的用户集合