nodejs中的RXJS PostgreSQL背压

Posted

技术标签:

【中文标题】nodejs中的RXJS PostgreSQL背压【英文标题】:RXJS PostgreSQL backpressure in nodejs 【发布时间】:2017-10-23 19:23:49 【问题描述】:

原标题:如何减慢javascript中的PgSQL结果行流?

我在使用 RXJS(5.4.0) 和 PostgreSQL(驱动程序“pg”:“6.1.4”)的 nodejs v4.5.0 中遇到内存不足问题。

我手动创建一个可观察的 PgSQL 行,如下所示:

return Rx.Observable.create((subscriber) => 
    pool.connect().then((client: pg.Client) => 
        const stream:any = client.query(query.toParam());
        stream.on('row', (row) => 
            subscriber.next(row);
        );

        stream.on('end', () => 
            subscriber.complete();
            client.release();
        );
    );
);

然后我将一些运算符附加到 rx observable 并进行一些处理。 请注意,从数据库返回的行有点重。

调查使我得出一个结论: 从数据库中返回的行比处理它们要快得多。必须为大量数据保留内存才能等待处理,这会导致内存不足问题:

致命错误:CALL_AND_RETRY_LAST 分配失败 - 进程内存不足

中止陷阱:6

我在 PostgreSQL 驱动程序上没有看到任何暂停流的选项。有什么想法可以解决这个问题吗?

【问题讨论】:

您的原始查询是否获得了不必要的列?有时“select *”在大型结果集上是个坏主意。 不幸的是,在 SQL 查询级别没有太多优化空间。一列包含基本数据,这是最重要的一列。 您必须分页读取数据,仅此而已。为此使用LIMITOFFSET 在获得下一部分之前使用 CURSOR 并获得少量结果。 CUROSR 是一个 PL/pgSQL 功能,但在我的情况下,我需要将数据检索到 javascript 上下文中。 【参考方案1】:

如果您使用同一作者的pg-cursor library,则应该相对简单:

return Rx.Observable.defer(() => pg.connect())
  .flatMap(client => 
    const cursor = client.query(new Cursor('SELECT * FROM some_table WHERE prop > $1', [100]))

    const observableCursor = Rx.Observable.bindNodeCallback(cursor.read.bind(cursor));

    // Get the first 100 items
    observableCursor(100)
      .map(processRows)
      // This will only emit after the first one completes
      // and will recursively call this for each result
      .expand(_ => 
        observableCursor(100)
          .map(processRows)
      )
      // Unsubscribes once we don't get any more results
      .takeWhile(rows => rows.length > 0)
  );

【讨论】:

【参考方案2】:

我试图尽可能多地编写@paulpdaniels 解决方案建议的可读性,但我不确定这是否真的是反应式风格。

我也有不同的处理函数,取决于上下文变量。我喜欢有一个架构,可以轻松地让使用依赖于上下文条件的不同 rxjs 运算符链并隐藏 cursor.read 逻辑,但我担心这是不可能的。

当我明确告诉数据库带来下一批数据(cursor.read)时必须有一个地方,并且它必须在我从大量数据中释放内存之后的地方。

以下解决方案在打字稿中:

const db = require('./libs/db.js');
import * as Cursor from 'pg-cursor';
import * as pg from 'pg';
import * as Rx from 'rxjs';

let cursor, readHandler;

const source = Rx.Observable.create((subscriber) => 
    db.getClient().then((client: pg.Client) => 

        cursor = client.query(new Cursor('SELECT * FROM some_table WHERE prop > $1', [100]));
        readHandler = function (err, rowsBulk) 
            if (err) 
                subscriber.error(err);
                return client.release();
            
            if (!rowsBulk.length) 
                subscriber.complete();
                return client.release();
            
            subscriber.next(rowsBulk);
        ;
    ).then(() => 
        // fetch first 100 records
        cursor.read(100, readHandler);
    );
);

source.flatMap((item) => 
    return new Promise((resolve) => 
        console.log('processing batch of items');
        setTimeout(() =>  // timeout simulates processing
            item = null;
            console.log('release memory');
            resolve();
        , 5000);
    );
).map(() => 
    // fetch every next 100 records
    console.log('allocate new memory');
    cursor.read(100, readHandler);
    return '.';
).subscribe(console.log, (err) => 
    console.log('Error occurred');
    console.error(err);
);

【讨论】:

可能会在您的问题中扩展“取决于上下文变量的不同处理功能”。我可以更新我的答案,但前提是您提供更具体的内容。 我已更新标题并标记了您的消息。我同意将上下文对象作为参数传递给处理函数。

以上是关于nodejs中的RXJS PostgreSQL背压的主要内容,如果未能解决你的问题,请参考以下文章

如何使用NodeJs中的rxjs中的forkjoin进行多次调用,使用NPM请求

Rxjs 可观察到承诺永远不会在 nodejs 中解决

解析 NodeJS CSS 子包?

Nodejs / PostgreSQL sequelize - 如何更新/更改数据库中的模型?

使用 NodeJS,如何使用 Spring Security OAuth2 制作的 JWT 检索存储在 PostgreSQL 中的会话信息?

如何处理 Kafka Connect Sink 中的背压?