NestJS MQTT 微服务的有效@MessagePattern 是啥?

Posted

技术标签:

【中文标题】NestJS MQTT 微服务的有效@MessagePattern 是啥?【英文标题】:What's a valid @MessagePattern for NestJS MQTT microservice?NestJS MQTT 微服务的有效@MessagePattern 是什么? 【发布时间】:2019-03-28 13:53:15 【问题描述】:

我正在尝试根据the docs 使用 NestJS 设置 MQTT 微服务。

我已经使用 Docker 启动了一个有效的 Mosquitto Broker,并使用各种 MQTT 客户端验证了它的可操作性。现在,当我启动 NestJS 服务时,它似乎连接正确(mqqt.fx 显示新客户端),但我无法在我的控制器中接收任何消息。 这是我的引导,就像在文档中一样:

ma​​in.ts

async function bootstrap() 
    const app = await NestFactory.createMicroservice(AppModule, 
        transport: Transport.MQTT,
        options: 
            host: 'localhost',
            port: 1883,
            protocol: 'tcp'
        
    );
    app.listen(() => console.log('Microservice is listening'));

bootstrap();

app.controller.ts

@Controller()
export class AppController 

    @MessagePattern('mytopic') // tried cmd:'mytopic' or topic:'mytopic'
    root(msg: Buffer) 
        console.log('received: ', msg)
    

我是错误地使用了消息模式装饰器,还是我对 NestJS MQTT 微服务应该做什么的概念有误?我认为它可能会订阅我传递给装饰器的主题。我唯一的其他信息来源是相应的unit tests

【问题讨论】:

这种奇怪的行为已在Nest 7.0 中得到修复。您可以直接订阅主题,无需任何后缀。 【参考方案1】:

nest.js 模式处理程序

在 nest.js 方面,我们有以下模式处理程序:

@MessagePattern('sum')
sum(data: number[]): number 
  return data.reduce((a, b) => a + b, 0);

正如@Alexandre 解释的那样,这实际上会听sum_ack


非nest.js 客户端

非nest.js 客户端可能如下所示(只需另存为client.js,运行npm install mqtt 并使用node client.js 运行程序):

var mqtt = require('mqtt')
var client  = mqtt.connect('mqtt://localhost:1883')

client.on('connect', function () 
  client.subscribe('sum_res', function (err) 
    if (!err) 
      client.publish('sum_ack', '"data": [2, 3]');
    
  )
)

client.on('message', function (topic, message) 
  console.log(message.toString())
  client.end()
)

它发送关于主题sum_ack 的消息并监听sum_res 上的消息。当它在sum_res 上收到一条消息时,它会记录该消息并结束程序。 nest.js 期望消息格式为data: myData,然后调用参数处理程序sum(myData)

// Log:
"err":null,"response":5 // This is the response from sum()
"isDisposed":true // Internal "complete event" (according to unit test)

当然,这样不太方便……


nest.js 客户端

这是因为这意味着要与另一个 nest.js 客户端一起使用,而不是与普通的 mqtt 客户端一起使用。 nest.js 客户端将所有内部逻辑抽象出来。见this answer,描述了redis的客户端(mqtt只需要改两行)。

async onModuleInit() 
  await this.client.connect();
  // no 'sum_ack' or data: [0, 2, 3] needed
  this.client.send('sum', [0, 2, 3]).toPromise();

【讨论】:

正如你所说,这不是很方便而且有点混乱。一开始我很高兴看到这个 MQTT 或 AMQP 的内置接口,但后来我明白它使用请求-响应范式,这对于这种协议来说有点奇怪。 非常感谢您的澄清,就像 Alexandre 指出的那样,当与特定客户端范例存在这种耦合时,它有点违背了目的。很高兴我现在知道它应该如何使用 有人使用clientMqtt 类进行发布吗?我有一个问题,那就是不仅发布数据 - 还发布模式和 ID。但我只需要数据。我使用 ClientMqtt.send 函数。【参考方案2】:

文档不是很清楚,但似乎对于 mqtt,如果您有 @MessagePattern('mytopic'),您可以在主题 mytopic_ack 上发布命令,您将在 mytopic_res 上得到响应。我仍在尝试找出如何从服务发布到 mqtt 代理。

见https://github.com/nestjs/nest/blob/e019afa472c432ffe9e7330dc786539221652412/packages/microservices/server/server-mqtt.ts#L99

  public getAckQueueName(pattern: string): string 
    return `$pattern_ack`;
  

  public getResQueueName(pattern: string): string 
    return `$pattern_res`;
  

【讨论】:

我在这个答案中创建了一个关于如何作为客户端发布消息的示例:***.com/a/54293468/4694994 这是您要找的吗?【参考方案3】:

@Tanas 是对的。 Nestjs/Microservice 现在监听你的 $[topic] 并回答 $[topic]/reply。后缀 _ack 和 _res 已弃用。

例如:

  @MessagePattern('helloWorld')
  getHello(): string 
    console.log("hello world")
    return this.appService.getHello();
  

现在收听主题:helloWorld 现在回复主题 helloWorld/reply

关于身份证

应该还在有效负载中提供一个 id(请参阅@Hakier),Nestjs 会回复一个包含您的 id 的答案。 如果你没有任何id,仍然不会有任何回复,但会触发相应的逻辑。

例如(使用上面的截图): 你的消息:

"data":"foo","id":"bar"

Nestjs 回复:

"response":"Hello World!","isDisposed":true,"id":"bar"

无 ID:

您的留言:

"data":"foo" or 

没有回复,但在终端中Hello World

【讨论】:

【参考方案4】:

我今天正在与 MQTT 战斗,这对我有一点帮助,但我遇到了更多问题,您可以在下面看到我的发现:

配置代理 URL 的方式错误

在我使用非本地 MQTT 服务器的情况下,我从以下开始:

  const app = await NestFactory.createMicroservice(AppModule, 
    transport: Transport.MQTT,
    options: 
      host: 'test.mosquitto.org',
      port: 1883,
      protocol: 'tcp',
    ,
  );
  await app.listenAsync();

但就像您可以在constructor of ServerMqtt 中阅读一样,他们仅使用url 选项(如果未提供,则回退到'mqtt://localhost:1883'。虽然我没有本地MQTT,但它永远不会解析app.listenAsync(),仅在connect 也不会运行任何处理程序。

当我调整代码以使用url 选项时它开始工作。

  const app = await NestFactory.createMicroservice(AppModule, 
    transport: Transport.MQTT,
    options: 
      url: 'mqtt://test.mosquitto.org:1883',
    ,
  );
  await app.listenAsync();

消息需要id 属性

第二个非常奇怪的问题是,当我使用 @KimKern 的 Non-nest.js Client 脚本时,我必须注册两个 MessagePatterns:sumsum_ack

  @MessagePattern('sum')
  sum(data: number[]): number 
    return data.reduce((a, b) => a + b, 0);
  

  @MessagePattern('sum_ack')
  sumAck(data: number[]): number 
    return data.reduce((a, b) => a + b, 0);
  

当我使用console.log 时,我发现后者正在运行,但只有在第一个存在时才运行。您可以使用 mqtt cli 工具将相同的消息推送到代理进行检查:

mqtt pub -t 'sum_ack' -h 'test.mosquitto.org' -m '"data":[1,2]'

但最大的问题是它没有回复(publish sum_res)

解决方案是在发送消息时也提供id

mqtt pub -t 'sum_ack' -h 'test.mosquitto.org' -m '"data":[1,2], "id":"any-id"'

然后我们可以删除 'sum_ack' MessagePattern 并只留下以下代码:

  @MessagePattern('sum')
  sum(data: number[]): number 
    return data.reduce((a, b) => a + b, 0);
  

原因隐藏在 ServerMqtt 的 handleMessage 方法中,如果消息没有 id,则处理程序不会 publish 响应。

TL/DR 仅使用 url 选项指定消息代理的 url,并始终为消息提供 id

我希望这会为其他人节省一些时间。

黑客愉快!

【讨论】:

以上是关于NestJS MQTT 微服务的有效@MessagePattern 是啥?的主要内容,如果未能解决你的问题,请参考以下文章

无法容器化 NestJS 微服务

NestJS - 在微服务中结合 HTTP 和 RabbitMQ

在 NestJS 微服务中公开普通的 http 端点

NestJS:有没有办法从外部调用微服务rabbitmq

如何将“typeorm”模型转换为 graphql 有效负载?

如何使用nestjs redis 微服务?