Python 发布到 ASP.NET Core 服务使用的 RabbitMQ 交换/队列

Posted

技术标签:

【中文标题】Python 发布到 ASP.NET Core 服务使用的 RabbitMQ 交换/队列【英文标题】:Python publish to RabbitMQ exchange/queue consumed by ASP.NET Core Service 【发布时间】:2022-01-04 08:17:37 【问题描述】:

我在 Docker 容器(rabbitmq:3-management 映像)中运行 RabbitMQ,作为 Docker Compose 应用程序的一部分。该应用程序包含一些 ASP.NET Core WebApi 微服务,它们通过此代理交换消息。效果很好,到目前为止没有给我任何问题。

现在我需要将消息从 Python 应用程序发布到从 ASP.NET Core 微服务之一创建的交换/队列。微服务包含此队列的消费者。对于从 python 发布,我使用 pika。问题是,我似乎无法正确发布。每当我执行我的 Python 脚本时,我都可以在 RabbitMQ 管理 UI 中看到创建了一个带有后缀“_skipped”的新交换和队列。好像我的消息是在那里发送的,而不是实际的队列。此外,当尝试直接从管理 UI 发布时,消息实际上会发送到微服务,但我会遇到一个异常,即消息无法反序列化为 MassTransit 信封对象,以及新的交换和队列带有“_error”后缀。

我不知道问题出在哪里。我认为交换/队列本身很好,因为在这个项目中用于微服务到微服务通信的其他队列/消费者/发布者工作。那么这可能是我试图从 Python 解决交换/队列的方式,或者我的消息正文不正确。

This page 提供了一些有关如何构建消息的信息,但不是太详细,here 我获得了有关如何使用 Python 发布的大部分信息。

您可以在下面看到有关微服务中主机/队列配置的相关代码,以及 Python 脚本。任何有关我如何使其工作的帮助/提示将不胜感激。

ASP.NET 核心:

// Declaring the host, queue "mappingQueue", consumer in Startup.ConfigureServices of microservice
...
    services.AddMassTransit(x =>
    
        x.AddConsumer<MappingUpdateConsumer>();
        x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(config => 
         
            config.Host(new Uri(RabbitMqConst.RabbitMqRootUri), h =>
            
                h.Username(RabbitMqConst.RabbitMqUsername);
                h.Password(RabbitMqConst.RabbitMqPassword);
            );         
            config.ReceiveEndpoint("mappingQueue", e =>
            
                e.ConfigureConsumer<MappingUpdateConsumer>(provider);                        
            );
        ));
    );
    services.AddMassTransitHostedService();
...
// Consumer
public class MappingUpdateConsumer : IConsumer<MappingUpdateMessage>

    ...
    public async Task Consume(ConsumeContext<MappingUpdateMessage> context)
    
        await Task.Run(async () =>
        
            if (context.Message == null)
            
                return;
            
            ...
        );
    

// Message class (will have more properties in the future, thus not just using a string consumer)
public class MappingUpdateMessage

    public string Message  get; set; 

Python:

import pika
import json

connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()

channel.exchange_declare(exchange='mappingQueue', exchange_type='fanout', durable=True)

message = 
  "message"    : 
    "message": "Hello World"
  ,
  "messageType": [
    "urn:message:MassTransit.Tests:ValueMessage"
  ]


channel.basic_publish(exchange='mappingQueue',
                      routing_key='mappingQueue',
                      body=json.dumps(message))

connection.close()
print("sent")

【问题讨论】:

【参考方案1】:

对于那些有同样问题的人,我最终想通了:

..
config.ReceiveEndpoint("mappingQueue", e =>

    e.ClearMessageDeserializers();
    e.UseRawJsonSerializer();
    e.ConfigureConsumer<MappingUpdateConsumer>(provider);
);
...

【讨论】:

以上是关于Python 发布到 ASP.NET Core 服务使用的 RabbitMQ 交换/队列的主要内容,如果未能解决你的问题,请参考以下文章

ASP.NET Core2.2+Quartz.Net 实现web定时任务

将文件从 ASP.NET Core Web api 发布到另一个 ASP.NET Core Web api

linux 布署Asp.net Core 6.0 应用 (宝塔面板)

Asp.net core 通过grpc调用python

ASP.NET Core部署系列一:发布到IIS上

asp.net core发布到iis后出现An error occurred while starting the application