Python 发布到 ASP.NET Core 服务使用的 RabbitMQ 交换/队列
Posted
技术标签:
【中文标题】Python 发布到 ASP.NET Core 服务使用的 RabbitMQ 交换/队列【英文标题】:Python publish to RabbitMQ exchange/queue consumed by ASP.NET Core Service 【发布时间】:2022-01-04 08:17:37 【问题描述】:我在 Docker 容器(rabbitmq:3-management 映像)中运行 RabbitMQ,作为 Docker Compose 应用程序的一部分。该应用程序包含一些 ASP.NET Core WebApi 微服务,它们通过此代理交换消息。效果很好,到目前为止没有给我任何问题。
现在我需要将消息从 Python 应用程序发布到从 ASP.NET Core 微服务之一创建的交换/队列。微服务包含此队列的消费者。对于从 python 发布,我使用 pika。问题是,我似乎无法正确发布。每当我执行我的 Python 脚本时,我都可以在 RabbitMQ 管理 UI 中看到创建了一个带有后缀“_skipped”的新交换和队列。好像我的消息是在那里发送的,而不是实际的队列。此外,当尝试直接从管理 UI 发布时,消息实际上会发送到微服务,但我会遇到一个异常,即消息无法反序列化为 MassTransit 信封对象,以及新的交换和队列带有“_error”后缀。
我不知道问题出在哪里。我认为交换/队列本身很好,因为在这个项目中用于微服务到微服务通信的其他队列/消费者/发布者工作。那么这可能是我试图从 Python 解决交换/队列的方式,或者我的消息正文不正确。
This page 提供了一些有关如何构建消息的信息,但不是太详细,here 我获得了有关如何使用 Python 发布的大部分信息。
您可以在下面看到有关微服务中主机/队列配置的相关代码,以及 Python 脚本。任何有关我如何使其工作的帮助/提示将不胜感激。
ASP.NET 核心:
// Declaring the host, queue "mappingQueue", consumer in Startup.ConfigureServices of microservice
...
services.AddMassTransit(x =>
x.AddConsumer<MappingUpdateConsumer>();
x.AddBus(provider => Bus.Factory.CreateUsingRabbitMq(config =>
config.Host(new Uri(RabbitMqConst.RabbitMqRootUri), h =>
h.Username(RabbitMqConst.RabbitMqUsername);
h.Password(RabbitMqConst.RabbitMqPassword);
);
config.ReceiveEndpoint("mappingQueue", e =>
e.ConfigureConsumer<MappingUpdateConsumer>(provider);
);
));
);
services.AddMassTransitHostedService();
...
// Consumer
public class MappingUpdateConsumer : IConsumer<MappingUpdateMessage>
...
public async Task Consume(ConsumeContext<MappingUpdateMessage> context)
await Task.Run(async () =>
if (context.Message == null)
return;
...
);
// Message class (will have more properties in the future, thus not just using a string consumer)
public class MappingUpdateMessage
public string Message get; set;
Python:
import pika
import json
connection = pika.BlockingConnection(pika.ConnectionParameters('localhost'))
channel = connection.channel()
channel.exchange_declare(exchange='mappingQueue', exchange_type='fanout', durable=True)
message =
"message" :
"message": "Hello World"
,
"messageType": [
"urn:message:MassTransit.Tests:ValueMessage"
]
channel.basic_publish(exchange='mappingQueue',
routing_key='mappingQueue',
body=json.dumps(message))
connection.close()
print("sent")
【问题讨论】:
【参考方案1】:对于那些有同样问题的人,我最终想通了:
..
config.ReceiveEndpoint("mappingQueue", e =>
e.ClearMessageDeserializers();
e.UseRawJsonSerializer();
e.ConfigureConsumer<MappingUpdateConsumer>(provider);
);
...
【讨论】:
以上是关于Python 发布到 ASP.NET Core 服务使用的 RabbitMQ 交换/队列的主要内容,如果未能解决你的问题,请参考以下文章
ASP.NET Core2.2+Quartz.Net 实现web定时任务
将文件从 ASP.NET Core Web api 发布到另一个 ASP.NET Core Web api
linux 布署Asp.net Core 6.0 应用 (宝塔面板)
asp.net core发布到iis后出现An error occurred while starting the application