RabbitMQ 和 MassTransit 之间的连接
Posted
技术标签:
【中文标题】RabbitMQ 和 MassTransit 之间的连接【英文标题】:Connection between RabbitMQ and MassTransit 【发布时间】:2019-01-29 20:30:09 【问题描述】:好的,我有以下问题,这只是一个基本的测试程序,但我已经在苦苦挣扎了。我对此很陌生,还没有找到类似的场景。
基本上,我需要将消息发布到特定队列(更进一步,但关键是我们需要一个固定的拓扑结构),据我了解,我无法使用 MassTransit 做到这一点。
我现在的想法是使用裸 RabbitMQ 将消息发布到队列,然后使用 MassTransit 从该队列中读取。
所以我现在有 2 个非常基本的控制台应用程序,其中 TestPublisher 是仅使用 RabbitMQ 的发布者,而 TestSubscriber 是使用 MassTransit 的消费者。
TestPublisher:
Program.cs
using System;
using RabbitMQ.Client;
using System.Text;
using TestPublisher;
class Program
public static void Main()
var factory = new ConnectionFactory() HostName = "localhost", VirtualHost = "testvhost" ;
using (var connection = factory.CreateConnection())
using (var channel = connection.CreateModel())
var message = new TestMessage() message = "Test";
var body = Encoding.UTF8.GetBytes(message.message);
channel.BasicPublish(exchange: "MTQueue", routingKey: "MTQueue", basicProperties: null, body: body);
Console.WriteLine(" [x] Sent 0", message);
Console.WriteLine(" Press [enter] to exit.");
Console.ReadLine();
TestMessage.cs
using Contracts;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
namespace TestPublisher
class TestMessage : MessageDelivery
public string message get; set;
测试订阅者
Program.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Log4NetIntegration.Logging;
namespace TestSubscriber
class Program
static void Main(string[] args)
Log4NetLogger.Use();
var bus = Bus.Factory.CreateUsingRabbitMq(x =>
var host = x.Host(new Uri("rabbitmq://localhost/testvhost/"), h => );
x.ReceiveEndpoint(host, "MTQueue", e => e.Consumer<TestMessageConsumer>());
);
bus.Start();
Console.ReadKey();
bus.Stop();
TestMessageConsumer.cs
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Contracts;
using MassTransit;
namespace TestSubscriber
class TestMessageConsumer : IConsumer<MessageDelivery>
public Task Consume(ConsumeContext<MessageDelivery> context)
Console.Write("TEXT: " + context.Message);
Console.Write(" VERARBEITET: " + DateTime.Now);
Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")");
return Task.FromResult(0);
合同
Contracts.cs
using System;
namespace Contracts
public interface MessageDelivery
string message get;
我可以毫无问题地用TestPublisher发布到队列,消息会在正确的队列中,使用RabbitMQ管理接口来验证。
我现在的问题是,因为 RabbitMQ 只允许我使用 BasicPublish 发布字节数组,所以我无法使用 TestSubscriber 从队列中获取消息,因为消费者期望 MessageDelivery 类型的消息并且不会让我将 ConsumeContext 或 IConsumer 配置为侦听 byte[]
我真的被困在这里,无法找到解决方案,无论是在 RabbitMQ 方面还是在 MassTransit 方面。
【问题讨论】:
基本上,我们需要知道你做了什么,失败了什么,你想要做什么(成功是什么样子)。如果您真的不知道自己在做什么,您应该查看很多教程,然后发布更具体的问题。 正如我所说,我发布了“Hello World!”仅使用 RabbitMQ 通过 TestPublisher 将消息放入队列。使用 RabbitMQ 管理 UI 检查队列,它显示消息确实已发布到队列。现在失败的是大众运输的消费部分。如您所见,这是一个非常简单的程序,基本上 TestSubscriber 在启动时应该做的所有事情,就是从队列中获取消息并将其写入命令行。但是,当我启动它时,没有显示任何消息并使用管理 ui 检查,MTQueue 队列中不再有消息,而是在 MTQueue_error 队列,输出如下:MT-Fault-ExceptionType:Newtonsoft.Json.JsonReaderException MT-Fault-Message:解析值时遇到意外字符:T. Path '',第 0 行,第 0 位。而且我无法让 TestSubscriber 使用我的消息。 【参考方案1】:MassTransit 使用 JSON 序列化,它需要预定义格式的消息。您描述的用例不是 MassTransit 的用途但是,如果您想这样做,请查看文档中的互操作性部分:http://masstransit-project.com/MassTransit/advanced/interoperability.html
这是来自那里的示例消息(JSON 有效负载):
"destinationAddress": "rabbitmq://localhost/input_queue",
"headers": ,
"message":
"value": "Some Value",
"customerId": 27
,
"messageType": [
"urn:message:MassTransit.Tests:ValueMessage"
]
【讨论】:
我正要发布这个,是的。消息必须是属性序列化的,ContentType(在基本属性中)也要设置(但默认是尽量使用JSON)。 我明白了,感谢您澄清这一点。但是,RabbitMQ 中的 BasicPublish 只会让我发布类型为 byte[] 的数据,有什么解决方法吗? @nico 你只需要使用Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message));
您还可以通过以下来源查看 MassTransit 的功能:github.com/MassTransit/MassTransit/blob/develop/src/MassTransit/…以上是关于RabbitMQ 和 MassTransit 之间的连接的主要内容,如果未能解决你的问题,请参考以下文章
我们可以通过 masstransit 一起使用 RabbitMQ 和 Mediatr 吗?
是否可以使用 MassTransit 为 RabbitMQ 队列注册多个消费者?
MassTransit / RabbitMQ - 为啥跳过这么多消息?