微服务专题之.Net6下集成消息队列-RabbitMQ交换机模式代码演示(全)

Posted dotNET跨平台

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了微服务专题之.Net6下集成消息队列-RabbitMQ交换机模式代码演示(全)相关的知识,希望对你有一定的参考价值。

微信公众号:趣编程ACE
关注可了解更多的.NET日常实战开发技巧,如需源码 请公众号后台留言 源码;
[如果觉得本公众号对您有帮助,欢迎关注]

.Net中RabbitMQ中交换机模式的使用

前文回顾

【微服务专题之】.Net6下集成消息队列上-RabbitMQ

【微服务专题之】.Net6下集成消息队列2-RabbitMQ

【微服务专题之】.Net6中集成消息队列-RabbitMQ中直接路由模式

TopicExchange 交换机模式

如果我们需要将一条信息发送到多个队列上,假若利用直连模式,那么就会有很多的路由,而TopicExchange只需要配置定义好的路由规则,即可省略多余的路由指定。
PS:路由规则有一定的约束,比如需要采取* .#. * 的格式,用.号 分隔
其中

  • 1.*表示一个单词

  • 2. #表示任意数量(零个或多个)单词。

消费者程序演示

public static class TopicExchangeReceive
    
        public static void Receive(IModel channel)
        
            channel.ExchangeDeclare("hello-topic-exchange", ExchangeType.Topic);
            channel.QueueDeclare(queue: "hello-topic-queue",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);
            channel.QueueBind("hello-topic-queue", "hello-topic-exchange", "route.*");
            //channel.QueueBind("hello", "hello-direct-exchange", "route2");

            // 创建一个消费者基本事件
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received 0", message);
            ;
            channel.BasicConsume(queue: "hello-topic-queue",
                                 // 自动确认
                                 autoAck: true,
                                 consumer: consumer);

            //channel.BasicConsume(queue: "hello",
            //                     // 自动确认
            //                     autoAck: true,
            //                     consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        
    

生产者程序演示

public static class TopicExchangeSend
    
        public static void Send(IModel channel)
        
            channel.ExchangeDeclare("hello-topic-exchange", ExchangeType.Topic);
            var count = 0;
            while (true)
            
                Thread.Sleep(1000);
                // 发送的消息
                string message = $"Hello World count";
                var body = Encoding.UTF8.GetBytes(message);
                //var body2 = Encoding.UTF8.GetBytes(message + "body2");

                // 基本发布 不指定交换
                channel.BasicPublish(exchange: "hello-topic-exchange",
                                     // 路由键   就是队列名称
                                     routingKey: "route.2",
                                     // 基础属性
                                     basicProperties: null,
                                     // 传递的消息体
                                     body: body);

                //channel.BasicPublish(exchange: "hello-direct-exchange",
                //                     // 路由键   就是队列名称
                //                     routingKey: "route2",
                //                     // 基础属性
                //                     basicProperties: null,
                //                     // 传递的消息体
                //                     body: body2);
                count++;
                Console.WriteLine(" [x] sent 0", message);
            
        
    

效果展示


Headers Exchange 模式

其实这种模式类似与Http请求里面的headers,我们定义一个字典然后通过交换机携带,作为交换机与路由器交换媒介,相比于Topic Exchange交换模式,header定义的格式类型更加丰富。

消费者程序演示

public static class HeadersExchangeReceive
    
        public static void Receive(IModel channel)
        
            channel.ExchangeDeclare("hello-headers-exchange", ExchangeType.Headers);
            channel.QueueDeclare(queue: "hello-headers-queue",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

            var headers = new Dictionary<string, object>()
            
                "test","A" 
            ;
            channel.QueueBind("hello-headers-queue", "hello-headers-exchange", String.Empty, headers);
            //channel.QueueBind("hello", "hello-direct-exchange", "route2");

            // 创建一个消费者基本事件
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received 0", message);
            ;
            channel.BasicConsume(queue: "hello-headers-queue",
                                 // 自动确认
                                 autoAck: true,
                                 consumer: consumer);

            //channel.BasicConsume(queue: "hello",
            //                     // 自动确认
            //                     autoAck: true,
            //                     consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        
    

生产者程序演示

public static void Send(IModel channel)
        
            channel.ExchangeDeclare("hello-headers-exchange", ExchangeType.Headers);
            var count = 0;
            while (true)
            
                Thread.Sleep(1000);
                // 发送的消息
                string message = $"Hello World count";
                var body = Encoding.UTF8.GetBytes(message);
                //var body2 = Encoding.UTF8.GetBytes(message + "body2");


                var basicProperties = channel.CreateBasicProperties();
                basicProperties.Headers = new Dictionary<string, object>()
                
                    "test" ,"A"
                ;
                // 基本发布 不指定交换
                channel.BasicPublish(exchange: "hello-headers-exchange",
                                     // 路由键   就是队列名称
                                     routingKey: String.Empty,
                                     // 基础属性
                                     basicProperties,
                                     // 传递的消息体
                                     body: body);

                //channel.BasicPublish(exchange: "hello-direct-exchange",
                //                     // 路由键   就是队列名称
                //                     routingKey: "route2",
                //                     // 基础属性
                //                     basicProperties: null,
                //                     // 传递的消息体
                //                     body: body2);
                count++;
                Console.WriteLine(" [x] sent 0", message);
            
        

效果演示


FanoutExchange 模式

扇形交换机模式是最傻瓜的一种交换模式,总的而言只要将队列绑定到交换机上,就能做到消息互通了。当然了这种机制处理事件的速度也是所有交换机类型里面最快的。

消费者程序演示

public static class FanoutExchangeReceive
    
        public static void Receive(IModel channel)
        
            channel.ExchangeDeclare("hello-fanout-exchange", ExchangeType.Fanout);
            channel.QueueDeclare(queue: "hello-fanout-queue",
                        durable: true,
                        exclusive: false,
                        autoDelete: false,
                        arguments: null);

            //var headers = new Dictionary<string, object>()
            //
            //    "test","A" 
            //;
            channel.QueueBind("hello-fanout-queue", "hello-fanout-exchange", String.Empty);
            //channel.QueueBind("hello", "hello-direct-exchange", "route2");

            // 创建一个消费者基本事件
            var consumer = new EventingBasicConsumer(channel);
            consumer.Received += (model, ea) =>
            
                var body = ea.Body.ToArray();
                var message = Encoding.UTF8.GetString(body);
                Console.WriteLine(" [x] Received 0", message);
            ;
            channel.BasicConsume(queue: "hello-fanout-queue",
                                 // 自动确认
                                 autoAck: true,
                                 consumer: consumer);

            //channel.BasicConsume(queue: "hello",
            //                     // 自动确认
            //                     autoAck: true,
            //                     consumer: consumer);

            Console.WriteLine(" Press [enter] to exit.");
            Console.ReadLine();
        

生产者程序演示

public static class FanoutExchangeSend
    
        public static void Send(IModel channel)
        
            channel.ExchangeDeclare("hello-fanout-exchange", ExchangeType.Fanout);
            var count = 0;
            while (true)
            
                Thread.Sleep(1000);
                // 发送的消息
                string message = $"Hello World count";
                var body = Encoding.UTF8.GetBytes(message);
                //var body2 = Encoding.UTF8.GetBytes(message + "body2");


                //var basicProperties = channel.CreateBasicProperties();
                //basicProperties.Headers = new Dictionary<string, object>()
                //
                //    "test" ,"A"
                //;
                // 基本发布 不指定交换
                channel.BasicPublish(exchange: "hello-fanout-exchange",
                                     // 路由键   就是队列名称
                                     routingKey: String.Empty,
                                     // 基础属性
                                     null,
                                     // 传递的消息体
                                     body: body);

                //channel.BasicPublish(exchange: "hello-direct-exchange",
                //                     // 路由键   就是队列名称
                //                     routingKey: "route2",
                //                     // 基础属性
                //                     basicProperties: null,
                //                     // 传递的消息体
                //                     body: body2);
                count++;
                Console.WriteLine(" [x] sent 0", message);
            
        
    

效果演示


PS:具体的代码效果演示看视频哦~

以上是关于微服务专题之.Net6下集成消息队列-RabbitMQ交换机模式代码演示(全)的主要内容,如果未能解决你的问题,请参考以下文章

微服务专题之.Net6中集成消息队列-RabbitMQ中直接路由模式

微服务专题之.Net6中集成消息队列-RabbitMQ中直接路由模式

微服务专题之.Net6下集成微服务网关-Ocelot

微服务专题之.Net6下集成微服务网关-Ocelot

spring cloudgradle父子项目微服务框架搭建---rabbitMQ延时队列

flask 之 rabbit