多线程 & 信号量 & 事件

Posted

技术标签:

【中文标题】多线程 & 信号量 & 事件【英文标题】:Multi Thread & Semaphore & Events 【发布时间】:2018-10-28 04:27:48 【问题描述】:

我正在尝试执行一些来自 RabbitMQ 的命令。它大约 5 条消息/秒。所以msg太多了,我必须发送到一个线程执行,但是我没有那么多线程,所以我限制了10个。

所以我们的想法是消息会到达工作线程,放入队列中,10 个线程中的任何一个都会达到峰值并执行。所有这些都使用信号量。

经过一些实验,我不知道为什么,但是我的线程只执行了 3 或 4 个项目,之后它就停止了,没有错误......

我认为的问题是事件调用方法执行时的逻辑,无法更好地思考......

为什么只处理前 4 个消息??

什么模式或更好的方法来做到这一点?

这是我的代码的一些部分:

const int MaxThreads = 10;
private static Semaphore sem = new Semaphore(MaxThreads, MaxThreads);
private static Queue<BasicDeliverEventArgs> queue = new Queue<BasicDeliverEventArgs>();

static void Main(string[] args)

consumer.Received += (sender, ea) =>
               
                var m = JsonConvert.DeserializeObject<Mail>(ea.Body.GetString());
                Console.WriteLine($"Sub-> m.Subject");
                queue.Enqueue(ea);
                RUN();
              ;

            channel.BasicConsume(queueName, false, consumer);

            Console.Read();


private static void RUN()

            while (queue.Count > 0)
            
                sem.WaitOne();
                var item = queue.Dequeue();
                ThreadPool.QueueUserWorkItem(sendmail, item);
            


private static void sendmail(Object item)


//.....soem processing stuff....

//tell rabbitMq that everything was OK
channel.BasicAck(deliveryTag: x.DeliveryTag, multiple: true);

//release thread
sem.Release();


【问题讨论】:

【参考方案1】:

我认为您可以在这里使用阻塞集合。它将简化代码。 所以你的电子邮件发件人看起来像这样:

public class ParallelEmailSender : IDisposable

    private readonly BlockingCollection<string> blockingCollection;

    public ParallelEmailSender(int threadsCount)
    
        blockingCollection = new BlockingCollection<string>(new ConcurrentQueue<string>());
        for (int i = 0; i < threadsCount; i++)
        
            Task.Factory.StartNew(SendInternal);
        
    

    public void Send(string message)
    
        blockingCollection.Add(message);
    

    private void SendInternal()
    
        foreach (string message in blockingCollection.GetConsumingEnumerable())
        
            // send method
        
    

    public void Dispose()
    
        blockingCollection.CompleteAdding();
    

当然,您需要添加错误捕获逻辑,您还可以通过使用取消令牌来改进应用关闭过程。

我强烈建议阅读约瑟夫·阿尔巴哈里 (Joseph Albahari) 撰写的精彩 e-book about multithreading programming。

【讨论】:

谢谢!但是一个愚蠢的问题...... SendInternal 中的消息是线程安全的吗? 您可能会使用一些代表消息的对象,但只有一个线程会收到该消息。发送代码当然应该是线程安全的,因为它将被多个线程调用。 另一点...我应该在 RabbitMQ 发送的事件中实例化类 ParallelEmailSender 吗?如果是这样就没有意义,因为当事件被触发时它只发送一条消息,所以每条消息都会使类内部有 XX 个线程,并且想法是将每条消息放在不同的线程/并行集合中...... 。 我对么?看看我做了什么gist.github.com/PtkFerraro/14385ec8426d5b0bfd9cdee84dd30989 你应该有一个ParallelEmailSender 的实例,当你收到来自RabbitMQ 的消息时,你只需调用Send 方法。当然,您需要稍微修改ParallelEmailSender,以便它知道如何实际发送电子邮件。

以上是关于多线程 & 信号量 & 事件的主要内容,如果未能解决你的问题,请参考以下文章

Python学习笔记——进阶篇第八周———Socket编程进阶&多线程多进程

C++1函数重载,类和对象,引用,/string类,vector容器,类继承,类多态,/socket,进程&信号,多线程&线程同步

python开发线程:死锁和递归锁&信号量&定时器&线程queue&事件evevt

day18-多线程&线程同步&死锁

day18-多线程&线程同步&死锁

day18-多线程&线程同步&死锁