使用线程处理事件 RabbitMQ 消费者

Posted

技术标签:

【中文标题】使用线程处理事件 RabbitMQ 消费者【英文标题】:Working on events with threads RabbitMQ consumer 【发布时间】:2020-04-21 05:00:19 【问题描述】:

我在使用 RabbitMQ 点网库的 C# 中的不同队列上有两个消费者。

我想要什么:

由于某些业务逻辑,我不得不在一个消费者中等待一段时间 所以我为此使用了Thread.Sleep()

问题

如果我在一个事件中使用Thread.Sleep,第二个线程也不会暂停

我的代码:

consumer.Received += (model, ea) =>

    try
    
        DRModel drModel = JsonConvert.DeserializeObject<DRModel>(Encoding.UTF8.GetString(ea.Body));
        RMQReturnType type = ProcessSubmitSMS(drModel);
        if (type == RMQReturnType.ACK)
            channel.BasicAck(ea.DeliveryTag, false);
        else
         
            channel.BasicNack(ea.DeliveryTag, false, true);
            Thread.Sleep(300000); // <=== SLEEP
        
    
    catch (Exception ex)
    
        channel.BasicNack(ea.DeliveryTag, false, true);
        WriteLog(ControlChoice.ListError, "Exception: " + ex.Message + " | Stack Trace: " + ex.StackTrace.ToString() + " | [Consumer Event]");
    
;


【问题讨论】:

这个linke如何注册你的回调并返回task而不是使用Thread.sleep() 但是你为什么需要Sleep 呢? 一些业务逻辑 【参考方案1】:

这似乎是Mutex 类的好案例,您需要的是多线程中的条件睡眠。不确切知道您需要的逻辑,但您的代码类似于以下代码:

public class Consumer

    public event EventHandler Received;

    public virtual void OnReceived()
    
        Received?.Invoke(this, EventArgs.Empty);
    


class Program

    static void Main(string[] args)
    
        var mutex = new Mutex();

        var consumer =  new Consumer();

        consumer.Received += (model, ea) =>
        
            try
            
                mutex.WaitOne();
                var id = Guid.NewGuid().ToString();
                Console.WriteLine($"Start mutex id");
                Console.WriteLine($"Mutex finished id");
                Console.WriteLine($"Start sleep id");
                if ( new Random().Next(10000)  % 2 == 0) // randomly sleep, that your condition
                
                    Thread.Sleep(3000); // <=== SLEEP
                
                Console.WriteLine($"Sleep finished id");
            
            catch (Exception ex)
            
                mutex.ReleaseMutex(); // this is where you release, if something goes wrong
            
            finally
            
                mutex.ReleaseMutex();// always release it
            
        ;


        Parallel.For(0, 10, t =>   //running 10 threads in parallel and stops all if the condition is true
        
            consumer.OnReceived();
        );

        Console.ReadLine();
    


【讨论】:

根据我的理解 Thread.Sleep() 睡眠当前线程,在我的情况下,它应该是睡眠当前事件,为什么它会睡眠其他事件。 Mutex 可以工作,你只需要在你的代码顶部启动它,请查看我编辑的代码示例。如果条件为真,它将停止【参考方案2】:

我理解的代码中有一些逻辑错误。 在rabbitmq中 我在两个不同的渠道上创建了两个消费者事件,所以我认为它不会在这里共享我错了一个连接是共享的黑白通道,所以我明确定义了两个连接。 我认为 消费者阻塞通道和通道阻塞连接和连接在两个事件中是相同的。

【讨论】:

以上是关于使用线程处理事件 RabbitMQ 消费者的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 跨多个队列的多个消费者 - 消息延迟处理

rabbitmq消费者生产者实践

检查rabbitmq中是不是存在指定名称的Exchange

微服务之集成下

Netty事件监听和处理(上)

事件溯源和处理数据依赖关系