体验Rabbitmq强大的优先级队列之轻松面对现实业务场景

Posted dotNET跨平台

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了体验Rabbitmq强大的优先级队列之轻松面对现实业务场景相关的知识,希望对你有一定的参考价值。

        说到队列的话,大家一定不会陌生,但是扯到优先级队列的话,还是有一部分同学是不清楚的,可能是不知道怎么去实现吧,其实呢,,,这东西已经烂大街了。。。很简单,用“堆”去实现的,在我们系统中有一个订单催付的场景,我们客户的客户在tmall,taobao下的订单,taobao会及时将订单推送给我们,如果在用户设定的时间内未付款那么就会给用户推送一条短信提醒,很简单的一个功能对吧,但是,tmall商家对我们来说,肯定是要分大客户和小客户的对吧,比如像施华蔻,百雀林这样大商家一年起码能够给我们贡献几百万,所以理应当然,他们的订单必须得到优先处理,而曾经我们的后端系统是使用redis来存放的定时轮询,大家都知道redis只能用List做一个简简单单的消息队列,并不能实现一个优先级的场景,所以订单量大了后采用rabbitmq进行改造和优化,如果发现是大客户的订单给一个相对比较高的优先级,否则就是默认优先级,好了,废话不多说,我们来看看如何去设置。 

一:优先级队列

  既然是优先级队列,那么必然要在Queue上开一个口子贴上一个优先级的标签,为了看怎么设置,我们用一下rabbitmq的监控UI,看看这个里面是如何

手工的创建优先级队列。

      从这个图中可以看到在Arguments栏中有特别多的小属性,其中有一项就是"Maximum priority",这项的意思就是说可以定义优先级的最大值,其实

想想也是,不可能我们定义的优先级是一个非常大的数字,比如int.MaxValue,大多情况下都是10以内的数字就可以了,再或者我们曾今接触过的 MSMQ,

它的优先级只是一些枚举值,什么High,Normal,Low,不知道大家可否记得? 下面来看下代码中该如何实现呢??? 

1. 在Queue上附加优先级属性

Dictionary<string, object> dic = new Dictionary<string, object>();
dic.Add("x-max-priority", 20);
channel.QueueDeclare(queue: "hello",
                                durable: true,
                                exclusive: false,
                                autoDelete: false,
                                arguments: dic);

 上面的代码做了一个简单的队列声明,queuename="hello",持久化,排外。。。然后把"x-max-priority"塞入到字典中作为arguments参数,看起来还

是非常简单吧~~~ 

2. 在Message上指定优先级属性

 var properties = channel.CreateBasicProperties();
 properties.Priority = 1;
 channel.BasicPublish(exchange: "",
                      routingKey: "hello",
                      basicProperties: null,
                      body: body);

体验Rabbitmq强大的【优先级队列】之轻松面对现实业务场景

 

通过上面的代码可以看到,在Message上设置优先级,我是通过在channel通道上设置Priority属性,之后塞到basicProperties中就可以了,好了,有上面这两

个基础之后,下面就可以开始测试了,准备向rabbitmq推送10条记录,其中第5条的优先级最高,所以应该首先就print出来,如下图:

static void Main(string[] args)
{    var sb = new StringBuilder();    
   
for (int i = 0; i < 11; i++)    {        sb.Append(i);    }  
   
var factory = new ConnectionFactory() { HostName = "192.168.23.136", UserName = "datamip", Password = "datamip" };  
   
using (var connection = factory.CreateConnection())    {      
       
using (var channel = connection.CreateModel())        {            channel.ExchangeDeclare(exchange: "mydirect", type: ExchangeType.Direct, durable: true);            Dictionary<string, object> dic = new Dictionary<string, object>();            dic.Add("x-max-priority", 20);      
           
for (int i = 0; i < 10; i++)            {                channel.QueueDeclare(queue: "hello",                                            durable: true,                                            exclusive: false,                                            autoDelete: false,                                            arguments: dic);              
       
string message = string.Format("{0} {1}", i, sb.ToString());                

var body = Encoding.UTF8.GetBytes(message);                var properties = channel.CreateBasicProperties();                properties.Priority = (i == 5) ? (byte)10 : (byte)i;                channel.BasicPublish(exchange: "",                                     routingKey: "hello",                                     basicProperties: properties,                                     body: body);                Console.WriteLine(" [x] Sent {0}", i);            }        }    }    Console.WriteLine(" Press [enter] to exit.");    Console.ReadLine(); }

体验Rabbitmq强大的【优先级队列】之轻松面对现实业务场景

 

图中可以看到10条消息我都送到rabbitmq中去了,接下来打开consume端,来看看所谓的index=5 是否第一个送出来??

static void Main(string[] args)
{    
   for (int m = 0; m < int.MaxValue; m++)    {    
       
var factory = new ConnectionFactory() { HostName = "192.168.23.136", UserName = "datamip", Password = "datamip" };      
     
using (var connection = factory.CreateConnection())       using (var channel = connection.CreateModel())        {        
       
var result = channel.BasicGet("hello", true);             if (result != null)            {          
         
var str = Encoding.UTF8.GetString(result.Body);                Console.WriteLine("{0}  消息内容 {1}", m, str);                System.Threading.Thread.Sleep(1);            }        }    }    Console.WriteLine(" Press [enter] to exit.");    Console.ReadLine(); }

体验Rabbitmq强大的【优先级队列】之轻松面对现实业务场景 

一切都是这么的完美,接下来为了进行可视化验证,你可以在WebUI中观察观察,可以发现在Queue上面多了一个 Pri 标记,有意思吧。

 

 

好了,这么重要的功能,是不是已经让你足够兴奋啦, 希望大家能够好好的在实际场景中运用吧

 
   
   
 

.NET社区新闻,深度好文,微信中搜索dotNET跨平台或扫描二维码关注

以上是关于体验Rabbitmq强大的优先级队列之轻松面对现实业务场景的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ学习笔记五:RabbitMQ之优先级消息队列

RabbitMq之优先队列

Rabbitmq之发布确认高级回退消息备份交换机幂等性优先级队列惰性队列

RabbitMQ 初体验

分布式消息通信之RabbitMQ_01

Rabbitmq业务难点