用于队列和 api 的嵌套公牛单独进程

Posted

技术标签:

【中文标题】用于队列和 api 的嵌套公牛单独进程【英文标题】:nest bull separate process for queues and api 【发布时间】:2020-03-27 03:46:35 【问题描述】:

我有一个 nestjs 应用程序,它暴露了一些 REST APIs。其中一个APIs 触发了处理某些任务的作业。问题是当作业被触发时,应用程序停止服务 REST 请求,这导致负载均衡器的健康检查失败。我按照README末尾给出的方法启动了一个单独的子进程来处理作业。但是,该作业不会在子进程中启动,并且 API 请求会停止。

这是我的工作:

import 
  BullQueueEvents,
  OnQueueActive,
  OnQueueEvent,
  Process,
  Processor,
 from 'nest-bull';
import  Job  from 'bull';
import  Logger  from '@nestjs/common';
import  AService  from './a-service';
import  AJobInterface  from '../AJobInterface';

@Processor( name: 'a_queue' )
export class AJob 
  private readonly logger = new Logger('AQueue');

  constructor(private readonly service: AService) 

  @Process(
    name: 'app',
    concurrency: 1
  )
  processApp(job: Job<AJobInterface>) 
    console.log('CHILD: ', process.pid);
    const  jobId  = job.data;
    return this.service.process(jobId);
  

  @OnQueueActive()
  onActive(job: Job) 
    this.logger.log(
      `Processing job $job.id of type $job.name with data $JSON.stringify(
        job.data,
      )...`,
    );
  

  @OnQueueEvent(BullQueueEvents.COMPLETED)
  onCompleted(job: Job) 
    this.logger.log(
      `Completed job $job.id of type $job.name with result $job.returnvalue`,
    );
  

这是我的 app.module.ts:

import  Module, OnModuleInit  from '@nestjs/common';
import  TypeOrmModule  from '@nestjs/typeorm';
import  AppController  from './app.controller';
import  AppService  from './app.service';
import  DatabaseModule  from './db/module';
import  BullModule  from 'nest-bull';
import  AJob  from './worker/a-job';
import  AService  from './worker/a-service';
import  join  from 'path';

@Module(
  imports: [
    TypeOrmModule.forRoot(),
    DatabaseModule,
    BullModule.register(
      name: 'a_queue',
      processors: [ join(__dirname, 'worker/a-job.js') ],
      options: 
        redis: 
          host: process.env.REDIS_URL || '127.0.0.1',
          port: 6379,
          showFriendlyErrorStack: true,
        ,
        settings: 
          lockDuration: 300000,
          stalledInterval: 300000
        ,
      ,
    ),
  ],
  controllers: [AppController],
  providers: [AppService, AJob, AService],
)
export class AppModule implements OnModuleInit 
  onModuleInit() 
    console.log('MAIN: ', process.pid);
  

我做错了什么吗?

【问题讨论】:

【参考方案1】:

很抱歉这么晚才发布答案。事实证明,在子进程中设置工作人员是不可能的。我最终得到了一个单独的 worker.module.ts 和一个单独的 worker.ts 并为 API 和工作程序创建了两个单独的进程。

worker.module.ts:

import  Module, OnModuleInit  from '@nestjs/common';
import  TypeOrmModule  from '@nestjs/typeorm';
import  AppService  from '../app.service';
import  DatabaseModule  from '../db/module';
import  BullModule  from 'nest-bull';
import  AJob  from './a-job';
import  AService  from './a-service';
import  join  from 'path';
import  Job, DoneCallback  from 'bull';

@Module(
  imports: [
    TypeOrmModule.forRoot(),
    DatabaseModule,
    BullModule.register(
      name: 'a_queue',
      processors: [ (job: Job, done: DoneCallback) =>  done(null, job.data);  ],
      options: 
        redis: 
          host: process.env.REDIS_URL || '127.0.0.1',
          port: 6379,
          password: process.env.REDIS_PWD,
          showFriendlyErrorStack: true,
        ,
        settings: 
          lockDuration: 300000,
          stalledInterval: 300000
        ,
      ,
    ),
  ],
  providers: [AppService, AJob, AService],
)
export class WorkerModule implements OnModuleInit 
  onModuleInit() 
    console.log('WORKER: ', process.pid);
  

worker.ts:

import  NestFactory  from '@nestjs/core';
import  WorkerModule  from './worker/worker.module';

async function bootstrap() 
  const app = await NestFactory.create(WorkerModule);
  app.init();


bootstrap();

虽然app.module.ts 现在看起来像这样:

//...imports
@Module(
  imports: [
    TypeOrmModule.forRoot(),
    DatabaseModule,
    BullModule.register(
      name: 'a_queue',
      processors: [ ],
      options: 
        redis: 
          host: process.env.REDIS_URL || '127.0.0.1',
          port: 6379,
          showFriendlyErrorStack: true,
        ,
      ,
    ),
  ],
  controllers: [AppController],
  providers: [AppService],
)
export class AppModule implements OnModuleInit 
  onModuleInit() 
    console.log('MAIN: ', process.pid);
  

以及对应的app.ts

import  NestFactory  from '@nestjs/core';
import  AppModule  from './app.module';
import  port  from './config';

async function bootstrap() 
  const app = await NestFactory.create(AppModule);
  app.enableCors();
  await app.listen(port);


bootstrap();

【讨论】:

愚蠢的问题,但是:你如何从下一个 cli 开始 worker.tsapp.ts?我的意思是,如果我执行npm start,则只执行 main.ts。我找不到如何将入口点传递给嵌套 cli。像npm start -entry-point worker.ts 之类的东西,或者您是否移至nest-cli 中的“monorepo”配置。谢谢 嗨@JesusMonzonLegido,您可以使用 pm2 或 supervisor 之类的东西来启动下一个流程 @ujwaldhakal 谢谢。是的,我已经将它与 pm2 集成。对于开发,我有 2 个 nest-cli.json 具有不同 "entryFile" 的文件。我以NODE_ENV=development nest start --watch --config nest-cli-worker.json之类的开头

以上是关于用于队列和 api 的嵌套公牛单独进程的主要内容,如果未能解决你的问题,请参考以下文章

linux中的进程间通信应该使用哪个时钟?

进程间通信方式

laravel 队列 - 同步驱动程序如何工作?它是在单独的进程/线程还是主执行线程中执行?

多线程下的队列问题

Python:将 wx.py.shell.Shell 插入单独的进程

限制子进程访问共享内存和消息队列