单应用下RabbitMQ如何保证线程安全,及多应用下抢数据问题

Posted molashaonian

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了单应用下RabbitMQ如何保证线程安全,及多应用下抢数据问题相关的知识,希望对你有一定的参考价值。

消费RabbitMQ时的注意事项,如何禁止大量的消息涌到Consumer,保证线程安全:

按照官网提供的订阅型写法( Retrieving Messages By Subscription ("push API") ) 我发现,RabbitMQ服务器会在短时间内发送大量的消息给Consumer,然后,如果你没有来得及Ack的话,那么服务端会积压大量的UnAcked消息,而Consumer如果来不急处理也会处于假死(也可能引起程序崩溃)。


仅有两个Channel,结果积压了大量的UnAcked消息。

这明显是与我们的目的不一致,我们不能保证Consumer一 定会及时快速的处理消息。所以这种方式带来的后果就是Consmer崩溃后,UnAcked消息又ReQueue,这肯定会消耗MQ的宝贵资源。

我试图在官网上找到一种方法,让每条消息明确的Ack后再接受下一条。但是没有。好在在 gitbooks.io/rabbitmq-quick/  这儿找到了,通过设置Channel的QOS即可

var channel = Connect.CreateModel();
channel.BasicQos(0,1,false); //RabbitMQ客户端接受消息最大数量

设置后的结果:

单应用下RabbitMQ如何保证线程安全,及多应用下抢数据问题

在开启4个Consumer的情况下,每条消息处理要耗时2秒。然后问题解决了。Unacked的消息只有4个。

让每条消息明确的Ack后再接受下一条,通过这种方式不仅可以解决积压大量消息的问题,还可以防止多条消息同时涌入而导致的多线程不安全问题。

多应用:

当然在单应用下上面的方案已经可以解决大部分问题,但是在多应用下似乎不太理想。

刚开始是单应用服务,但是后来用户量并发量提高了,渐渐地单应用服务扛不住了,就通过nginx负载均衡分担到另外一台服务机,此时就是双应用了。这样一来有两个应用同时监听同一个消息队列,有两个消费者。

消息队列的处理机制:一个消息队列里面的一条消息只会仅且发送到一个消费者里面,等待该消费者Ack,一条消息就完成它的使命了。

本以为这样子多应用下也不会出现什么问题,但是问题却出在别的地方了。

(注:应用A与应用B同库)当第一条消息分发到 应用A,第二条消息分发到 应用B,他们几乎是同时进行,消息A在应用A处理事务,消息B在应用B处理事务,他们都需要获取数据库中的某条数据并且更新其状态,由于A没处理完数据状态未改,B因此也获得了相同的一条数据,导致A,B抢了同一条数据做处理了。由于应用A,应用B都是独立的服务,所以单应用下的在代码里面加同步锁这些对他们毫无作用。

A,B同库,试着利用数据库本身的读写锁机制,进一步优化

在insert之前在做一次判断,如果更新成功则insert,否则表明该数据已经被更新了,放弃insert。

加了这种判断方式确实减少了很多抢数据问题,但并不能完全解决这个问题,毕竟数据库锁只是在读写的时候对一条数据的加锁,操作完了释放,但由于同步性太高,并不能解决现在的问题。

最后考虑到两个应用都是完全一样的,所以就干掉一个应用的消息消费者,只保留一个消费者,这样就可以回归到单应用的情景了。


总结:

这也是为什么在分布式,微服务下分布式事务的必要性和重要性,目前分布式事务主要通过 MQ+事件表、业务补偿、TCC、对账等方式实现,但都不好做,所以尽量避免。在使用分布式,微服务带来的方便同时,也得为事务的四个特性(原子性,一致性,隔离性,持久性)付出代价。

关注公众号,分享干货,讨论技术


以上是关于单应用下RabbitMQ如何保证线程安全,及多应用下抢数据问题的主要内容,如果未能解决你的问题,请参考以下文章

queue非线程安全及多线程解决的方法

Java线程及多线程技术及应用

NodeJS 单线程 如何保证其安全,稳定性?

如何创建线程?如何保证线程安全?

三单例模式详解

select2插件单选及多选应用