RabbitMQ中交换机的消息分发机制

Posted fg123

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ中交换机的消息分发机制相关的知识,希望对你有一定的参考价值。

RabbitMQ是一个消息代理,它接受和转发消息,是一个由 Erlang 语言开发的遵循AMQP协议的开源实现。在RabbitMQ中生产者不会将消息直接发送到队列当中,而是将消息直接发送到交换机(exchange),交换机用来接受生产者发送的消息并将这些消息发送给绑定的队列,即:生产者-->交换机-->队列。

在RabbitMQ中最主要的三种交换机:1. fanout(广播交换机)  2. direct(直连交换机)  3. topic(话题交换机)

1. fanout(广播交换机)

 fanout会将接受到的所有消息广播到它所绑定的所有队列当中(每个消费者都会收到所有的消息),对于广播交换机,消息路由键routing_key和队列绑定键routing_key的作用都会被忽略。

fanout生产者:

 

技术分享图片
 1 import pika
 2 
 3 
 4 class RabbitProducer(object):
 5     """
 6     与RabbitMq服务器建立连接
 7     """
 8 
 9     def __init__(self):
10         self.conn = pika.BlockingConnection(
11             pika.ConnectionParameters(host=localhost, port=5672)
12         )
13         self.channel = self.conn.channel()
14 
15         # 声明一个exchange交换机,交换机的类型为fanout广播.
16         self.channel.exchange_declare(
17             exchange=fanout_exchange, exchange_type=fanout, durable=True
18         )
19 
20     def send_msg(self, message):
21         """
22         routing_key:绑定的key
23         :param message:
24         :return:
25         """
26         self.channel.basic_publish(
27             exchange=fanout_exchange,
28             routing_key=‘‘,  # 因为exchange的类型为fanout,所以routing_key的数值在这里将被忽略
29             body=message,
30             properties=pika.BasicProperties(
31                 delivery_mode=2,
32                 # 消息进行持久化(防止服务器挂掉.)===> 如果没有queue绑定到这个exchange交换机,这个参数是没有的.
33             ))
34 
35     def close(self):
36         self.conn.close()
37 
38 
39 if __name__ == "__main__":
40     rabbit_producer = RabbitProducer()
41     for i in range(10):
42         message = hello world {}!.format(i)
43         rabbit_producer.send_msg(message)
View Code

 

消费者consumer1:

 

技术分享图片
 1 import pika
 2 import uuid
 3 
 4 
 5 class RabbitConsumer(object):
 6     """
 7     fanout 消费者1
 8     """
 9 
10     def __init__(self):
11         self.conn = pika.BlockingConnection(
12             pika.ConnectionParameters(host=localhost, port=5672)
13         )
14         self.channel = self.conn.channel()
15 
16         # 声明一个队列queue_consumer1,并进行持久化(防止服务器挂掉),exclusive设置为false
17         self.channel.queue_declare(
18             exclusive=False, durable=True, queue=queue_consumer1
19         )
20 
21         # 声明一个exhange交换机,其类型为fanout广播类型  与生产者的交换机一致
22         self.channel.exchange_declare(
23             exchange=fanout_exchange, exchange_type=fanout, durable=True
24         )
25 
26         # 将队列queue_consumer1与该exchange交换机进行绑定
27         self.channel.queue_bind(exchange=fanout_exchange, queue=queue_consumer1)
28 
29     def call_back(self, method, body):
30         """
31         消费者对消息进行确认,防止消费者挂掉.
32         :param method:
33         :param body:
34         :return:
35         """
36         self.channel.basic_ack(delivery_tag=method.delivery_tag)
37         print(接收到的消息为:{}.format(str(body)))
38 
39     def receive_msg(self):
40         print(consumer1开始接受消息...)
41         # 当上一条消息未确认时,会告知RabbitMQ不要再发送消息给这个消费者了 可以控制流量
42         self.channel.basic_qos(prefetch_count=1)
43         self.channel.basic_consume(
44             consumer_callback=self.call_back,
45             queue=queue_consumer1,
46             no_ack=False,  # 消费者对消息进行确认,防止消费者挂掉
47             consumer_tag=str(uuid.uuid4())
48         )
49 
50     def consume(self):
51         self.receive_msg()
52         self.channel.start_consuming()
53 
54 
55 if __name__ == __main__:
56     rabbit_consumer = RabbitConsumer()
57     rabbit_consumer.consume()
View Code

 

消费者consumer2:

技术分享图片
 1 import pika
 2 import uuid
 3 
 4 
 5 class RabbitConsumer(object):
 6     def __init__(self):
 7         self.conn = pika.BlockingConnection(
 8             pika.ConnectionParameters(host=localhost, port=5672)
 9         )
10         self.channel = self.conn.channel()
11 
12         # 声明一个队列queue_consumer2,并进行持久化(防止服务器挂掉),exclusive设置为false
13         self.channel.queue_declare(
14             exclusive=False, durable=True, queue=queue_consumer2
15         )
16 
17         # T声明一个exhange交换机,其类型为fanout广播类型
18         self.channel.exchange_declare(
19             exchange=fanout_exchange, exchange_type=fanout, durable=True
20         )
21 
22         # 将队列queue_consumer2与该exchange交换机进行绑定
23         self.channel.queue_bind(exchange=fanout_exchange, queue=queue_consumer2)
24 
25     def call_back(self, method, body):
26         """
27         消费者对消息进行确认,防止消费者挂掉.
28         :param method:
29         :param body:
30         :return:
31         """
32         self.channel.basic_ack(delivery_tag=method.delivery_tag)
33         print(接收到的消息为:{}.format(str(body)))
34 
35     def receive_msg(self):
36         print(consumer2开始接受消息...)
37         self.channel.basic_consume(
38             consumer_callback=self.call_back,
39             queue=queue_consumer2,
40             no_ack=False,
41             consumer_tag=str(uuid.uuid4())
42         )
43 
44     def consume(self):
45         self.receive_msg()
46         self.channel.start_consuming()
47 
48 
49 if __name__ == __main__:
50     rabbit_consumer = RabbitConsumer()
51     rabbit_consumer.consume()
View Code

fanout会将接受到的所有消息广播到消费者consumer1和消费者consumer2,交换机的缺陷:它只能无意识的播放,不够灵活地控制消息广播给指定的消费者
 

 2. direct(直连交换机)

对于direct,根据绑定键判定应该将数据发送至哪个队列,消息进入队列,其绑定秘钥(routing_key)与消息的路由秘钥要完全匹配,当exchange使用相同的绑定秘钥(routing_key)去绑定多个队列也是合法的,在这种情况下direct exchange的效果等同于fanout exchange,交换机会将消息广播到所有匹配的队列当中。

direct生产者:
技术分享图片
 1 import pika
 2 
 3 
 4 class RabbitProducer(object):
 5     """
 6     与RabbitMq服务器建立连接
 7     """
 8 
 9     def __init__(self):
10         self.conn = pika.BlockingConnection(
11             pika.ConnectionParameters(host=localhost, port=5672)
12         )
13         self.channel = self.conn.channel()
14 
15         # 声明一个exchange交换机,交换机的类型为direct
16         self.channel.exchange_declare(
17             exchange=direct_exchange, exchange_type=direct, durable=True
18         )
19 
20     def send_msg(self, routing_key, message):
21         """
22         :param routing_key: 消息的路由键 本例中为routing_info
23         :param message: 生成者发送的消息
24         :return:
25         """
26         self.channel.basic_publish(
27             exchange=direct_exchange,
28             routing_key=routing_key,
29             body=message,
30             properties=pika.BasicProperties(
31                 delivery_mode=2,
32                 # 消息进行持久化(防止服务器挂掉.)===> 如果没有queue绑定到这个exchange交换机,这个参数是没有的.
33             ))
34 
35     def close(self):
36         self.conn.close()
37 
38 
39 if __name__ == "__main__":
40     rabbit_producer = RabbitProducer()
41     routing_key = routing_info
42     for i in range(10):
43         message = hello world {}!.format(i)
44         print(生产者发送的消息为:{}.format(message))
45         rabbit_producer.send_msg(routing_key, message)
View Code
direct消费者:
技术分享图片
 1 import pika
 2 import uuid
 3 
 4 
 5 class RabbitConsumer(object):
 6     """
 7     消费者(订阅者)
 8     """
 9 
10     def __init__(self):
11         self.conn = pika.BlockingConnection(
12             pika.ConnectionParameters(host=localhost, port=5672)
13         )
14         self.channel = self.conn.channel()
15 
16         # 消息持久化
17         self.channel.queue_declare(
18             exclusive=False, durable=True, queue=task_queue
19         )
20 
21         # 交换机类型为direct.
22         self.channel.exchange_declare(
23             exchange=direct_exchange, exchange_type=direct, durable=True
24         )
25 
26         # 将队列与该exchange交换机进行绑定
27         routing_keys = [routing_info, aaa]
28         for routing_key in routing_keys:
29             self.channel.queue_bind(
30                 exchange=direct_exchange, queue=task_queue, routing_key=routing_key
31             )  # 如果生产者发生消息的routing_key与消费者绑定队列的routing_key相同则成功发送
32 
33     def call_back(self, channel, method, properties, body):
34         """
35         消费者对消息进行确认,防止消费者挂掉
36         :param channel:
37         :param method:
38         :param properties:
39         :param body:
40         :return:
41         """
42         self.channel.basic_ack(delivery_tag=method.delivery_tag)
43         print(接收到的消息为:{}.format(str(body)))
44 
45     def receive_msg(self):
46         print(开始接受消息...)
47         self.channel.basic_qos(prefetch_count=1)  # TODO 告诉RabbitMQ,不要向我发送新的消息.
48         self.channel.basic_consume(
49             consumer_callback=self.call_back,
50             queue=task_queue,
51             no_ack=False,
52             consumer_tag=str(uuid.uuid4())
53         )
54 
55     def consume(self):
56         self.receive_msg()
57         self.channel.start_consuming()
58 
59 
60 if __name__ == __main__:
61     rabbit_consumer = RabbitConsumer()
62     rabbit_consumer.consume()
View Code

direct直连交换机相当于是fanout的升级版,当消费者的队列
绑定的秘钥routing_key与生产者的routing_key相同时,消费者就会收到消息;当所有消费者的队列所绑定的routing_key都一样且与生产者相同时,就相当于fanout交换机

  3. topic(话题交换机)

direct(直连交换机)虽然相当于fanout的升级版,但它仍然有局限性,它不能根据多个标准进行路由;topic(话题交换机)可以很好地解决这一问题:
(1) 如果消息的路由秘钥与队列的绑定秘钥符合匹配规则,topic就会将消息发送到相应的队列当中
(2) 对于绑定键(routing_key)有两个特殊的情况: * (星号)可以代替一个单词,#(散列)可以替代零个或多个单词
(3) 对于发送到topic交换机消息的routing_key如果包含特殊字符,只能是由"."分割的单词表,如("zhangsan.lisi")

topic 生产者:
技术分享图片
 1 import pika
 2 
 3 
 4 class RabbitProducer(object):
 5     def __init__(self):
 6         self.conn = pika.BlockingConnection(
 7             pika.ConnectionParameters(host=localhost, port=5672)
 8         )
 9         self.channel = self.conn.channel()
10 
11         # 声明交换机,交换机的类型为topic
12         self.channel.exchange_declare(
13             exchange=logs_topic, exchange_type=topic, durable=True
14         )
15 
16     def send_msg(self, routing_key, message):
17         """
18         :param routing_key: 消息的路由键
19         :param message: 生成者发送的消息
20         :return:
21         """
22         self.channel.basic_publish(
23             exchange=logs_topic,
24             routing_key=routing_key,
25             body=message,
26             properties=pika.BasicProperties(
27                 delivery_mode=2,
28                 # 消息进行持久化
29             ))
30 
31     def close(self):
32         self.conn.close()
33 
34 
35 if __name__ == "__main__":
36     rabbit_producer = RabbitProducer()
37     routing_keys = [info, "debug", "a.debug.b", "a.info.b"]
38     for routing_key in routing_keys:
39         message = hello world! {}.format(routing_key)
40         print(生产者发送的消息为:{}.format(message))
41         rabbit_producer.send_msg(routing_key, message)
View Code
topic 消费者1 --> 实现fanout交换机:
技术分享图片
 1 """
 2 当topic交换机使用#绑定键绑定队列时,此时topic交换机就会将消息广播到所有的队列当中,
 3 不管消息的路由秘钥如何,此时topic交换机的效果等同于fanout:发送所有消息都会接受到
 4 """
 5 import pika
 6 import uuid
 7 
 8 
 9 class RabbitConsumer(object):
10     def __init__(self):
11         self.conn = pika.BlockingConnection(
12             pika.ConnectionParameters(host=localhost, port=5672)
13         )
14         self.channel = self.conn.channel()
15 
16         # 消息持久化
17         self.channel.queue_declare(
18             exclusive=False, durable=True, queue=task_queue
19         )
20 
21         # 声明交换机,其类型为topic
22         self.channel.exchange_declare(
23             exchange=logs_topic, exchange_type=topic, durable=True
24         )
25 
26         # 将队列与该交换机进行绑定
27         routing_keys = [#]  # 使用#绑定键时,它将接受所有的消息,同fanout效果一样.
28         for routing_key in routing_keys:
29             self.channel.queue_bind(
30                 exchange=logs_topic, queue=task_queue, routing_key=routing_key
31             )
32 
33     def call_back(self, channel, method, properties, body):
34         """
35         消费者对消息进行确认,防止消费者挂掉
36         :param channel:
37         :param method:
38         :param properties:
39         :param body:
40         :return:
41         """
42         self.channel.basic_ack(delivery_tag=method.delivery_tag)
43         print(接收到的消息为:{}.format(str(body)))
44 
45     def receive_msg(self):
46         print(开始接受消息...)
47         self.channel.basic_qos(prefetch_count=1)
48         self.channel.basic_consume(
49             consumer_callback=self.call_back,
50             queue=task_queue,
51             no_ack=False,  # 消费者对消息进行确认
52             consumer_tag=str(uuid.uuid4())
53         )
54 
55     def consume(self):
56         self.receive_msg()
57         self.channel.start_consuming()
58 
59 
60 if __name__ == __main__:
61     rabbit_consumer = RabbitConsumer()
62     rabbit_consumer.consume()
View Code
topic 消费者2 --> 实现direct交换机:
技术分享图片
 1 """
 2 当topic交换机没有使用*和#匹配符绑定键绑定队列时,此时topic交换机的效果等同于direct,
 3 会收到key相匹配的消息  如:info debug
 4 """
 5 import pika
 6 import uuid
 7 
 8 
 9 class RabbitConsumer(object):
10     def __init__(self):
11         self.conn = pika.BlockingConnection(
12             pika.ConnectionParameters(host=localhost, port=5672)
13         )
14         self.channel = self.conn.channel()
15 
16         # 消息持久化
17         self.channel.queue_declare(
18             exclusive=False, durable=True, queue=work_queue
19         )
20 
21         #
22         # 声明交换机,其类型为topic
23         self.channel.exchange_declare(
24             exchange=logs_topic, exchange_type=topic, durable=True
25         )
26 
27         # 将队列与交换机进行绑定
28         routing_keys = [info, debug]
29         for routing_key in routing_keys:
30             self.channel.queue_bind(
31                 exchange=logs_topic, queue=work_queue, routing_key=routing_key
32             )
33 
34     def call_back(self, channel, method, properties, body):
35         """
36         消费者对消息进行确认,防止消费者挂掉
37         :param channel:
38         :param method:
39         :param properties:
40         :param body:
41         :return:
42         """
43         self.channel.basic_ack(delivery_tag=method.delivery_tag)
44         print(接收到的消息为:{}.format(str(body)))
45 
46     def receive_msg(self):
47         print(开始接受消息...)
48         self.channel.basic_qos(prefetch_count=1)
49         self.channel.basic_consume(
50             consumer_callback=self.call_back,
51             queue=work_queue,
52             no_ack=False,  # 消费者对消息进行确认
53             consumer_tag=str(uuid.uuid4())
54         )
55 
56     def consume(self):
57         self.receive_msg()
58         self.channel.start_consuming()
59 
60 
61 if __name__ == __main__:
62     rabbit_consumer = RabbitConsumer()
63     rabbit_consumer.consume()
View Code
topic 消费者3 --> 实现*.x.* 消息匹配:
技术分享图片
 1 """
 2 匹配任意点分割的单词 生产者发送的:a.debug.b 则匹配了‘*.debug.*‘
 3 生产者发送的:a.info.b 则匹配了‘*.info.*‘
 4 """
 5 import pika
 6 import uuid
 7 
 8 
 9 class RabbitConsumer(object):
10     def __init__(self):
11         self.conn = pika.BlockingConnection(
12             pika.ConnectionParameters(host=localhost, port=5672)
13         )
14         self.channel = self.conn.channel()
15 
16         # 消息持久化
17         self.channel.queue_declare(
18             exclusive=False, durable=True, queue=other_queue
19         )
20 
21         # 声明交换机,其类型为topic
22         self.channel.exchange_declare(
23             exchange=logs_topic, exchange_type=topic, durable=True
24         )
25 
26         # 将队列与交换机进行绑定
27         routing_keys = [*.info.*, *.debug.*, dfdf*]
28         for routing_key in routing_keys:
29             self.channel.queue_bind(
30                 exchange=logs_topic, queue=other_queue, routing_key=routing_key
31             )
32 
33     def call_back(self, channel, method, properties, body):
34         """
35         消费者对消息进行确认,防止消费者挂掉
36         :param channel:
37         :param method:
38         :param properties:
39         :param body:
40         :return:
41         """
42         self.channel.basic_ack(delivery_tag=method.delivery_tag)
43         print(接收到的消息为:{}.format(str(body)))
44 
45     def receive_msg(self):
46         print(开始接受消息...)
47         self.channel.basic_qos(prefetch_count=1)
48         self.channel.basic_consume(
49             consumer_callback=self.call_back,
50             queue=other_queue,
51             no_ack=False,  # 消费者对消息进行确认
52             consumer_tag=str(uuid.uuid4())
53         )
54 
55     def consume(self):
56         self.receive_msg()
57         self.channel.start_consuming()
58 
59 
60 if __name__ == __main__:
61     rabbit_consumer = RabbitConsumer()
62     rabbit_consumer.consume()
View Code

topic消费者执行结果:

消费者1:

技术分享图片

消费者2:

技术分享图片

消费者3:

技术分享图片














以上是关于RabbitMQ中交换机的消息分发机制的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ实战

Python-RabbitMQ消息分发机制

消息队列RabbitMq入门

RabbitMQ消息队列:任务分发机制

RabbitMQ基础总结

RabbitMQ基础总结