将 RabbitMQ 与 .NET Core WebAPI 一起使用时丢失消息

Posted

技术标签:

【中文标题】将 RabbitMQ 与 .NET Core WebAPI 一起使用时丢失消息【英文标题】:Losing messages when using RabbitMQ with .NET Core WebAPI 【发布时间】:2021-08-31 00:57:30 【问题描述】:

我正在尝试使用 RabbitMQ 从两个 Web API 接收和发送简单消息。现在这是一个非常简单的代码,我正在尝试查看两个 API 是否能够正确地相互通信。问题是我没有收到所有消息,并且无法在我丢失的消息和收到的消息之间建立模式。 下面是示例代码。

用于发送消息

public class QueueController : Controller
    
        [HttpGet]
        [Route("send")]
        public async Task<IActionResult> Send()
        
            QueueManager.Send();
            return Ok();
        
   

public class QueueManager
    
        public static string queueName = "test-queue";
        public static int count = 0;
        public static void Send()
        
            var factory = new ConnectionFactory()  HostName = "localhost" ;
            using (var connection = factory.CreateConnection())
            Using (var channel = connection.CreateModel())
            
            var queue = channel.QueueDeclare(queueName,
                                     durable: false,
                                     exclusive: false,
                                     autoDelete: false,
                                     arguments: null);
            count++;
            var message = new  Message = "Sent Message", count = count ;
            var body = JsonSerializer.Serialize(message);
            var queueMessage = Encoding.UTF8.GetBytes(body);
            channel.BasicPublish("", queueName, null, queueMessage);
            
        
    

用于接收消息

public class QueueController : Controller
    
        [HttpGet]
        [Route("receive")]
        public async Task<IActionResult> Receive()
        
            QueueManager.Receive();
            return Ok();
        
    

public class QueueManager
    
        public static string queueName = "test-queue";
        public static void Receive()
        
            var factory = new ConnectionFactory()  HostName = "localhost" ;
            using (var connection = factory.CreateConnection())
            using (var channel = connection.CreateModel())
            
                channel.QueueDeclare(queueName,
                                         durable: false,
                                         exclusive: false,
                                         autoDelete: false,
                                         arguments: null);
                var consumer = new EventingBasicConsumer(channel);
                consumer.Received += (model, ea) =>
                
                    var msg = ea.Body.ToArray();
                    var message = Encoding.UTF8.GetString(msg);
                    Console.WriteLine(message);
                ;
                channel.BasicConsume(queueName, true, consumer);
            
        
    

当我检查接收器 API 的控制台时,消息计数是随机的。例如,当我发送 7 条消息时,我收到的消息是数字 2,3 和 7。所以我丢失了 7 条消息中的 4 条。不知道这里有什么问题。 另外,当我检查管理控制台时,我可以看到只有当我在接收方 API 中调用端点时队列才被清空,但是消息仍然没有出现在控制台中。任何帮助将不胜感激。

【问题讨论】:

请注意,当我使用控制台应用程序接收消息时,这些不是问题,只有当我进行 API 调用以读取队列时。 【参考方案1】:

我创建了两个解决方案并在其中使用了您的代码,但它不起作用。我是这样改的。

你的生产者类:

public class QueueManager2

    public static string queueName = "test-queue";
    public static int count = 0;
    public static void Send()
    
        var factory = new ConnectionFactory()  HostName = "localhost" ;
        using (var connection = factory.CreateConnection())
        using (var channel = connection.CreateModel())
        
            channel.QueueDeclare(queue: queueName,
                                 durable: false,
                                 exclusive: false,
                                 autoDelete: false,
                                 arguments: null);

            count++;
            var message = new  Message = "Sent Message", count = count ;

            var body = JsonSerializer.Serialize(message);
            var queueMessage = Encoding.UTF8.GetBytes(body);
             
            channel.BasicPublish(exchange: "",
                                 routingKey: queueName,
                                 basicProperties: null,
                                 body: queueMessage);
        
    

和你的消费阶层:

public class QueueManager2

    public static string queueName = "test-queue";
    public static void Receive()
    
        var factory = new ConnectionFactory()  HostName = "localhost" ;
         var rabbitMqConnection = factory.CreateConnection();
        var rabbitMqChannel = rabbitMqConnection.CreateModel();

        rabbitMqChannel.QueueDeclare(queue: queueName,
                             durable: false,
                             exclusive: false,
                             autoDelete: false,
                             arguments: null);

        rabbitMqChannel.BasicQos(prefetchSize: 0, prefetchCount: 1, global: false);

        int messageCount = Convert.ToInt16(rabbitMqChannel.MessageCount(queueName));            

        var consumer = new EventingBasicConsumer(rabbitMqChannel);
        consumer.Received += (model, ea) =>
        
            var body = ea.Body;
            var message = Encoding.UTF8.GetString(body.ToArray());                
            rabbitMqChannel.BasicAck(deliveryTag: ea.DeliveryTag, multiple: false);
            
        ;
        rabbitMqChannel.BasicConsume(queue: queueName,
                             autoAck: false,
                             consumer: consumer); 
    
 

现在,如果您运行您的项目,您可以生成一条消息并使用它。实际上,对于消费者来说,最好有一个始终处于运行模式的托管服务。拥有一个获取消息的端点并不是一个好主意

【讨论】:

非常感谢。关于托管服务,我明白您的意思,我想做的是按需使用队列,但是似乎让消息留在队列中直到我真正想要它们可能不是最好的主意。我看到您检查了消息计数但从未真正使用它,有什么原因,还是您在发布解决方案时忘记删除它? 是的,很遗憾,我忘了提及消息数。而且我认为使用消息队列作为持久性并不是一个好主意。

以上是关于将 RabbitMQ 与 .NET Core WebAPI 一起使用时丢失消息的主要内容,如果未能解决你的问题,请参考以下文章

.Net Core&RabbitMQ限制循环消费

.Net Core&RabbitMQ消息转发可靠机制(上)

2021-07-03 .NET高级班 84-ASP.NET Core RabbitMQ群集安装

.Net Core&RabbitMQ基本使用

《ASP.NET Core 6框架揭秘》实例演示[25]:配置与承载环境的应用

.NET Core 微服务使用 RabbitMQ [关闭]