如何在 node.js 断开连接期间缓冲 MongoDB 插入?
Posted
技术标签:
【中文标题】如何在 node.js 断开连接期间缓冲 MongoDB 插入?【英文标题】:How to buffer MongoDB inserts during disconnect in node.js? 【发布时间】:2017-07-07 06:42:29 【问题描述】:我们确实读取了一个包含大约 500k 元素的 XML 文件(使用 xml-stream
),并将它们插入到 MongoDB 中,如下所示:
xml.on(`endElement: product`, writeDataToDb.bind(this, "product"));
插入writeDataToDb(type, obj)
看起来像这样:
collection.insertOne(obj, w: 1, wtimeout: 15000).catch((e) => );
现在,当 Mongo 连接断开时,xml 流仍会读取,并且控制台会充斥着错误消息(无法插入、断开连接、EPIPE 损坏……)。
在docs 中写道:
当您关闭 mongod 进程时,驱动程序会停止处理操作并继续缓冲它们,因为 bufferMaxEntries 默认为 -1,这意味着缓冲所有操作。
这个缓冲区的实际作用是什么?
我们注意到,当我们插入数据并关闭 mongo 服务器时,这些东西被缓冲了,然后我们恢复了 mongo 服务器,本机驱动程序成功重新连接并且节点继续插入数据,但是缓冲的文档(在 mongo beeing 离线期间)做不再插入。
所以我质疑这个缓冲区及其用途。
目标:
我们正在寻找将插入保持在缓冲区中直到 mongo 返回(根据wtimeout
在 15000 毫秒内)的最佳方法,然后让插入缓冲的文档或使用我们尝试过的 xml.pause();
和 xml.resume()
成功。
基本上,我们需要一些帮助来处理断开连接而不会丢失数据或中断。
【问题讨论】:
无法复制这一点,文档中的示例和使用xml-stream
的测试在 mongo 服务器备份后插入缓冲对象。也许您可以发布更多代码/提供更多信息关于你的设置?
@cviejo 我无法分享我的脚本,因为它们与公司相关,但您介意将您尝试复制的脚本发送给我吗? Gist/pastebin 没问题。
【参考方案1】:
使用 insertOne() 插入 500K 元素是一个非常糟糕的主意。您应该改用bulk operations,它允许您在单个请求中插入许多文档。 (这里以 10000 为例,因此可以在 50 个单个请求中完成) 为避免缓冲问题,您可以手动处理:
-
使用
bufferMaxEntries: 0
禁用缓冲
设置重新连接属性:reconnectTries: 30, reconnectInterval: 1000
创建一个 bulkOperation 并为其提供 10000 个项目
暂停 xml 阅读器。尝试插入 10000 个项目。如果失败,则每 3000ms 重试一次,直到成功
如果批量操作在执行过程中中断,您可能会遇到一些重复 ID 问题,请忽略它们(错误代码:11000)
这是一个示例脚本:
var fs = require('fs')
var Xml = require('xml-stream')
var MongoClient = require('mongodb').MongoClient
var url = 'mongodb://localhost:27017/test'
MongoClient.connect(url,
reconnectTries: 30,
reconnectInterval: 1000,
bufferMaxEntries: 0
, function (err, db)
if (err != null)
console.log('connect error: ' + err)
else
var collection = db.collection('product')
var bulk = collection.initializeUnorderedBulkOp()
var totalSize = 500001
var size = 0
var fileStream = fs.createReadStream('data.xml')
var xml = new Xml(fileStream)
xml.on('endElement: product', function (product)
bulk.insert(product)
size++
// if we have enough product, save them using bulk insert
if (size % 10000 == 0)
xml.pause()
bulk.execute(function (err, result)
if (err == null)
bulk = collection.initializeUnorderedBulkOp()
console.log('doc ' + (size - 10000) + ' : ' + size + ' saved on first try')
xml.resume()
else
console.log('bulk insert failed: ' + err)
counter = 0
var retryInsert = setInterval(function ()
counter++
bulk.execute(function (err, result)
if (err == null)
clearInterval(retryInsert)
bulk = collection.initializeUnorderedBulkOp()
console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
xml.resume()
else if (err.code === 11000) // ignore duplicate ID error
clearInterval(retryInsert)
bulk = collection.initializeUnorderedBulkOp()
console.log('doc ' + (size - 10000) + ' : ' + size + ' saved after ' + counter + ' tries')
xml.resume()
else
console.log('failed after first try: ' + counter, 'error: ' + err)
)
, 3000) // retry every 3000ms until success
)
else if (size === totalSize)
bulk.execute(function (err, result)
if (err == null)
db.close()
else
console.log('bulk insert failed: ' + err)
)
)
)
示例日志输出:
doc 0 : 10000 saved on first try
doc 10000 : 20000 saved on first try
doc 20000 : 30000 saved on first try
[...]
bulk insert failed: MongoError: interrupted at shutdown // mongodb server shutdown
failed after first try: 1 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 2 error: MongoError: no connection available for operation and number of stored operation > 0
failed after first try: 3 error: MongoError: no connection available for operation and number of stored operation > 0
doc 130000 : 140000 saved after 4 tries
doc 140000 : 150000 saved on first try
[...]
【讨论】:
您的答案没有提供有关 mongo 写入缓冲区的信息,也没有解决如何插入所有文档的解决方案,即使在副本集更改或断开连接期间也是如此。关于批量插入的信息很有趣,我会研究一下,谢谢! @DanFromGermany 是的,因为在我看来,您正在尝试解决错误的问题:真正的问题是您的应用程序与数据库断开连接。对数据库的调用更少,自动重新连接会更容易,因此不需要写缓冲 我的应用程序不与数据库断开连接。我想编写万一断开副本集断言中的主开关或以重新连接并写入所有数据的应用程序。 @DanFromGermany 我的错我误解了这个问题,请参阅更新的答案! 我看到你忽略了重复 ID 错误。我们遇到过这种情况,当重新连接后,mongo驱动打印这些错误,你知道它们为什么会出现吗?【参考方案2】:我不具体了解 Mongodb 驱动程序和这个条目缓冲区。也许它只保留特定场景的数据。
所以我将用一种更通用的方法来回答这个问题,该方法可以与任何数据库一起使用。
总而言之,你有两个问题:
-
您没有从失败的尝试中恢复
XML 流发送数据太快
要处理第一个问题,您需要实现一个重试算法,以确保在放弃之前进行多次尝试。
要处理第二个问题,您需要对 xml 流实施背压。您可以使用pause
方法、resume
方法和输入缓冲区来实现。
var Promise = require('bluebird');
var fs = require('fs');
var Xml = require('xml-stream');
var fileStream = fs.createReadStream('myFile.xml');
var xml = new Xml(fileStream);
// simple exponential retry algorithm based on promises
function exponentialRetry(task, initialDelay, maxDelay, maxRetry)
var delay = initialDelay;
var retry = 0;
var closure = function()
return task().catch(function(error)
retry++;
if (retry > maxRetry)
throw error
var promise = Promise.delay(delay).then(closure);
delay = Math.min(delay * 2, maxDelay);
return promise;
)
;
return closure();
var maxPressure = 100;
var currentPressure = 0;
var suspended = false;
var stopped = false;
var buffer = [];
// handle back pressure by storing incoming tasks in the buffer
// pause the xml stream as soon as we have enough tasks to work on
// resume it when the buffer is empty
function writeXmlDataWithBackPressure(product)
// closure used to try to start a task
var tryStartTask = function()
// if we have enough tasks running, pause the xml stream
if (!stopped && !suspended && currentPressure >= maxPressure)
xml.pause();
suspended = true;
console.log("stream paused");
// if we have room to run tasks
if (currentPressure < maxPressure)
// if we have a buffered task, start it
// if not, resume the xml stream
if (buffer.length > 0)
buffer.shift()();
else if (!stopped)
try
xml.resume();
suspended = false;
console.log("stream resumed");
catch (e)
// the only way to know if you've reached the end of the stream
// xml.on('end') can be triggered BEFORE all handlers are called
// probably a bug of xml-stream
stopped = true;
console.log("stream end");
;
// push the task to the buffer
buffer.push(function()
currentPressure++;
// use exponential retry to ensure we will try this operation 100 times before giving up
exponentialRetry(function()
return writeDataToDb(product)
, 100, 2000, 100).finally(function()
currentPressure--;
// a task has just finished, let's try to run a new one
tryStartTask();
);
);
// we've just buffered a task, let's try to run it
tryStartTask();
// write the product to database here :)
function writeDataToDb(product)
// the following code is here to create random delays and random failures (just for testing)
var timeToWrite = Math.random() * 100;
var failure = Math.random() > 0.5;
return Promise.delay(timeToWrite).then(function()
if (failure)
throw new Error();
return null;
)
xml.on('endElement: product', writeXmlDataWithBackPressure);
玩它,输入一些console.log
以了解它的行为方式。
我希望这能帮助你解决你的问题:)
【讨论】:
这基本上是一个很好的实现,但我希望能够利用 mongo 的内部写入关注点/写入缓冲区 - 请查看this page 和关键字bufferMaxEntries
。跨度>
以上是关于如何在 node.js 断开连接期间缓冲 MongoDB 插入?的主要内容,如果未能解决你的问题,请参考以下文章
客户端网络套接字在建立安全 TLS 连接之前断开连接 Node.js v13.0.1