RabbitMQ 和 MassTransit 之间的连接

Posted

技术标签:

【中文标题】RabbitMQ 和 MassTransit 之间的连接【英文标题】:Connection between RabbitMQ and MassTransit 【发布时间】:2019-01-29 20:30:09 【问题描述】:

好的,我有以下问题,这只是一个基本的测试程序,但我已经在苦苦挣扎了。我对此很陌生,还没有找到类似的场景。

基本上,我需要将消息发布到特定队列(更进一步,但关键是我们需要一个固定的拓扑结构),据我了解,我无法使用 MassTransit 做到这一点。

我现在的想法是使用裸 RabbitMQ 将消息发布到队列,然后使用 MassTransit 从该队列中读取。

所以我现在有 2 个非常基本的控制台应用程序,其中 TestPublisher 是仅使用 RabbitMQ 的发布者,而 TestSubscriber 是使用 MassTransit 的消费者。

TestPublisher:

Program.cs

using System;
using RabbitMQ.Client;
using System.Text;
using TestPublisher;

class Program

    public static void Main()
    
        var factory = new ConnectionFactory()  HostName = "localhost", VirtualHost = "testvhost" ;
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        


            var message = new TestMessage() message = "Test";
            var body = Encoding.UTF8.GetBytes(message.message);

            channel.BasicPublish(exchange: "MTQueue", routingKey: "MTQueue", basicProperties: null, body: body);
            Console.WriteLine(" [x] Sent 0", message);
        

        Console.WriteLine(" Press [enter] to exit.");
        Console.ReadLine();
    

TestMessage.cs

using Contracts;
using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;

namespace TestPublisher

    class TestMessage : MessageDelivery
    
        public string message  get; set; 

    

测试订阅者

Program.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using MassTransit;
using MassTransit.Log4NetIntegration.Logging;

namespace TestSubscriber

    class Program
    
        static void Main(string[] args)
        
            Log4NetLogger.Use();
            var bus = Bus.Factory.CreateUsingRabbitMq(x =>
            
                var host = x.Host(new Uri("rabbitmq://localhost/testvhost/"), h =>  );
                x.ReceiveEndpoint(host, "MTQueue", e => e.Consumer<TestMessageConsumer>());
            );
            bus.Start();
            Console.ReadKey();
            bus.Stop();

        
    

TestMessageConsumer.cs

using System;
using System.Collections.Generic;
using System.Linq;
using System.Text;
using System.Threading.Tasks;
using Contracts;
using MassTransit;

namespace TestSubscriber

    class TestMessageConsumer : IConsumer<MessageDelivery>
    
        public Task Consume(ConsumeContext<MessageDelivery> context)
        
            Console.Write("TEXT: " + context.Message);
            Console.Write(" VERARBEITET: " + DateTime.Now);
            Console.WriteLine(" (" + System.Threading.Thread.CurrentThread.ManagedThreadId + ")");
            return Task.FromResult(0);
        
    

合同

Contracts.cs

using System;

namespace Contracts

    public interface MessageDelivery
    
        string message  get; 
    

我可以毫无问题地用TestPublisher发布到队列,消息会在正确的队列中,使用RabbitMQ管理接口来验证。

我现在的问题是,因为 RabbitMQ 只允许我使用 BasicPublish 发布字节数组,所以我无法使用 TestSubscriber 从队列中获取消息,因为消费者期望 MessageDelivery 类型的消息并且不会让我将 ConsumeContext 或 IConsumer 配置为侦听 byte[]

我真的被困在这里,无法找到解决方案,无论是在 RabbitMQ 方面还是在 MassTransit 方面。

【问题讨论】:

基本上,我们需要知道你做了什么,失败了什么,你想要做什么(成功是什么样子)。如果您真的不知道自己在做什么,您应该查看很多教程,然后发布更具体的问题。 正如我所说,我发布了“Hello World!”仅使用 RabbitMQ 通过 TestPublisher 将消息放入队列。使用 RabbitMQ 管理 UI 检查队列,它显示消息确实已发布到队列。现在失败的是大众运输的消费部分。如您所见,这是一个非常简单的程序,基本上 TestSubscriber 在启动时应该做的所有事情,就是从队列中获取消息并将其写入命令行。但是,当我启动它时,没有显示任何消息并使用管理 ui 检查,MTQueue 队列中不再有消息,而是在 MTQueue_error 队列,输出如下:MT-Fault-ExceptionType:Newtonsoft.Json.JsonReaderException MT-Fault-Message:解析值时遇到意外字符:T. Path '',第 0 行,第 0 位。而且我无法让 TestSubscriber 使用我的消息。 【参考方案1】:

MassTransit 使用 JSON 序列化,它需要预定义格式的消息。您描述的用例不是 MassTransit 的用途但是,如果您想这样做,请查看文档中的互操作性部分:http://masstransit-project.com/MassTransit/advanced/interoperability.html

这是来自那里的示例消息(JSON 有效负载):


    "destinationAddress": "rabbitmq://localhost/input_queue",
    "headers": ,
    "message": 
        "value": "Some Value",
        "customerId": 27
    ,
    "messageType": [
        "urn:message:MassTransit.Tests:ValueMessage"
    ]

【讨论】:

我正要发布这个,是的。消息必须是属性序列化的,ContentType(在基本属性中)也要设置(但默认是尽量使用JSON)。 我明白了,感谢您澄清这一点。但是,RabbitMQ 中的 BasicPublish 只会让我发布类型为 byte[] 的数据,有什么解决方法吗? @nico 你只需要使用Encoding.UTF8.GetBytes(JsonConvert.SerializeObject(message)); 您还可以通过以下来源查看 MassTransit 的功能:github.com/MassTransit/MassTransit/blob/develop/src/MassTransit/…

以上是关于RabbitMQ 和 MassTransit 之间的连接的主要内容,如果未能解决你的问题,请参考以下文章

我们可以通过 masstransit 一起使用 RabbitMQ 和 Mediatr 吗?

是否可以使用 MassTransit 为 RabbitMQ 队列注册多个消费者?

MassTransit / RabbitMQ - 为啥跳过这么多消息?

.Net Core RabbitMQ/Masstransit 同一应用程序中每个可配置线程数一个消费者

MassTransit | .NET 分布式应用框架

MassTransit一个优秀的.NET消息(事件)总线框架