用于队列和 api 的嵌套公牛单独进程
Posted
技术标签:
【中文标题】用于队列和 api 的嵌套公牛单独进程【英文标题】:nest bull separate process for queues and api 【发布时间】:2020-03-27 03:46:35 【问题描述】:我有一个 nestjs
应用程序,它暴露了一些 REST API
s。其中一个API
s 触发了处理某些任务的作业。问题是当作业被触发时,应用程序停止服务 REST 请求,这导致负载均衡器的健康检查失败。我按照README末尾给出的方法启动了一个单独的子进程来处理作业。但是,该作业不会在子进程中启动,并且 API
请求会停止。
这是我的工作:
import
BullQueueEvents,
OnQueueActive,
OnQueueEvent,
Process,
Processor,
from 'nest-bull';
import Job from 'bull';
import Logger from '@nestjs/common';
import AService from './a-service';
import AJobInterface from '../AJobInterface';
@Processor( name: 'a_queue' )
export class AJob
private readonly logger = new Logger('AQueue');
constructor(private readonly service: AService)
@Process(
name: 'app',
concurrency: 1
)
processApp(job: Job<AJobInterface>)
console.log('CHILD: ', process.pid);
const jobId = job.data;
return this.service.process(jobId);
@OnQueueActive()
onActive(job: Job)
this.logger.log(
`Processing job $job.id of type $job.name with data $JSON.stringify(
job.data,
)...`,
);
@OnQueueEvent(BullQueueEvents.COMPLETED)
onCompleted(job: Job)
this.logger.log(
`Completed job $job.id of type $job.name with result $job.returnvalue`,
);
这是我的 app.module.ts:
import Module, OnModuleInit from '@nestjs/common';
import TypeOrmModule from '@nestjs/typeorm';
import AppController from './app.controller';
import AppService from './app.service';
import DatabaseModule from './db/module';
import BullModule from 'nest-bull';
import AJob from './worker/a-job';
import AService from './worker/a-service';
import join from 'path';
@Module(
imports: [
TypeOrmModule.forRoot(),
DatabaseModule,
BullModule.register(
name: 'a_queue',
processors: [ join(__dirname, 'worker/a-job.js') ],
options:
redis:
host: process.env.REDIS_URL || '127.0.0.1',
port: 6379,
showFriendlyErrorStack: true,
,
settings:
lockDuration: 300000,
stalledInterval: 300000
,
,
),
],
controllers: [AppController],
providers: [AppService, AJob, AService],
)
export class AppModule implements OnModuleInit
onModuleInit()
console.log('MAIN: ', process.pid);
我做错了什么吗?
【问题讨论】:
【参考方案1】:很抱歉这么晚才发布答案。事实证明,在子进程中设置工作人员是不可能的。我最终得到了一个单独的 worker.module.ts
和一个单独的 worker.ts
并为 API 和工作程序创建了两个单独的进程。
worker.module.ts
:
import Module, OnModuleInit from '@nestjs/common';
import TypeOrmModule from '@nestjs/typeorm';
import AppService from '../app.service';
import DatabaseModule from '../db/module';
import BullModule from 'nest-bull';
import AJob from './a-job';
import AService from './a-service';
import join from 'path';
import Job, DoneCallback from 'bull';
@Module(
imports: [
TypeOrmModule.forRoot(),
DatabaseModule,
BullModule.register(
name: 'a_queue',
processors: [ (job: Job, done: DoneCallback) => done(null, job.data); ],
options:
redis:
host: process.env.REDIS_URL || '127.0.0.1',
port: 6379,
password: process.env.REDIS_PWD,
showFriendlyErrorStack: true,
,
settings:
lockDuration: 300000,
stalledInterval: 300000
,
,
),
],
providers: [AppService, AJob, AService],
)
export class WorkerModule implements OnModuleInit
onModuleInit()
console.log('WORKER: ', process.pid);
worker.ts
:
import NestFactory from '@nestjs/core';
import WorkerModule from './worker/worker.module';
async function bootstrap()
const app = await NestFactory.create(WorkerModule);
app.init();
bootstrap();
虽然app.module.ts
现在看起来像这样:
//...imports
@Module(
imports: [
TypeOrmModule.forRoot(),
DatabaseModule,
BullModule.register(
name: 'a_queue',
processors: [ ],
options:
redis:
host: process.env.REDIS_URL || '127.0.0.1',
port: 6379,
showFriendlyErrorStack: true,
,
,
),
],
controllers: [AppController],
providers: [AppService],
)
export class AppModule implements OnModuleInit
onModuleInit()
console.log('MAIN: ', process.pid);
以及对应的app.ts
:
import NestFactory from '@nestjs/core';
import AppModule from './app.module';
import port from './config';
async function bootstrap()
const app = await NestFactory.create(AppModule);
app.enableCors();
await app.listen(port);
bootstrap();
【讨论】:
愚蠢的问题,但是:你如何从下一个 cli 开始worker.ts
和 app.ts
?我的意思是,如果我执行npm start
,则只执行 main.ts。我找不到如何将入口点传递给嵌套 cli。像npm start -entry-point worker.ts
之类的东西,或者您是否移至nest-cli
中的“monorepo”配置。谢谢
嗨@JesusMonzonLegido,您可以使用 pm2 或 supervisor 之类的东西来启动下一个流程
@ujwaldhakal 谢谢。是的,我已经将它与 pm2 集成。对于开发,我有 2 个 nest-cli.json
具有不同 "entryFile" 的文件。我以NODE_ENV=development nest start --watch --config nest-cli-worker.json
之类的开头以上是关于用于队列和 api 的嵌套公牛单独进程的主要内容,如果未能解决你的问题,请参考以下文章
laravel 队列 - 同步驱动程序如何工作?它是在单独的进程/线程还是主执行线程中执行?