rabbitmq的channel的connection的区别
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rabbitmq的channel的connection的区别相关的知识,希望对你有一定的参考价值。
参考技术A Connection 可以用来创建多个 Channel 实例,但是 Channel 实例不能在线程问共享,应用程序应该为每一个线程开辟一个 Channel 。某些情况下 Channel 的操作可以并发运行,但
是在其他情况下会导致在网络上出现错误的通信帧交错,同时也会影响友送方确认( publisher
confirm )机制的运行(详细可以参考 4.8 节),所以多线程问共享 Channel 实例是非线程安全的
对于第一种设置队列 TTL 属性的方法,一旦消息过期,就会从队列中抹去,而在第二种方
法中,即使消息过期,也不会马上从队列中抹去,因为每条消息是否过期是在即将投递到消费
者之前判定的。
为什么这两种方法处理的方式不一样?因为第一种方法里,队列中己过期的消息肯定在队
列头部, bbitM 只要定期从队头开始扫描是否有过期的消息即可。而第二种方法里,每条消
息的过期时间不同,如果要删除所有过期消息势必要扫描整个队列,所以不如等到此消息即将
被消费时再判定是否过期,如果过期再进行删除即可。
惰性队列和普通队列相比,只有很小的内存开销。这里很难对每种情况给出 个具体的数
值,但是我们可以类比一下:发送 千万条消息,每条消息的大小为 IKB ,并且此时没有任何
的消费者,那么普通队列会消耗 l.2GB 的内存,而惰性队列只消耗 1.5MB 的内存。
据官方测试数据显示,对于普通队列,如果要发送 千万条消息,需要耗费 801 秒,平均
发送速度约为 13000 条/秒。如果使用惰性队列,那么发送同样多的消息时,耗时是 421 秒,平
均发送速度约为 24000 条/秒。出现性能偏差的原因是普通队列会由于内存不足而不得不将消息
换页至磁盘。如果有消费者消费时,惰性队列会耗费将近 401'. 的空间来发送消息,对于一个
消费者的情况,平均的消费速度约为 14000 条/秒。
.Net RabbitMQ实战指南——客户端开发
开发中关键的Class和Interface有Channel、Connection、ConnectionFactory、Consumer等,与RabbitMQ相关的开发工作,基本上是围绕Connection和Channel这两个类展开的。
连接RabbitMQ
一个Connection可以创建多个Channel实例,但Channel实例不能在线程间共享,应用程序应该为每一个线程开辟一个Channel。
Channel或者Connection中有个isOpen方法可以用来检测其是否已处于开启状态。但并不推荐使用,这个方法的返回值依赖于shutdownCause的存在,有可能会产生竞争。更多的是捕获ShutdownSignalException,IOException或SocketException等异常判断RabbitMq的连接状态。
实际操作过程中遇到BrokerUnreachableException异常
因为我使用的账号是guest,guest账号默认是不支持远程连接,需要在http://localhost:15672(前提是安装了web插件)的Admin选项卡中添加一个新用户(或者使用命令行添加)。
安装web插件
rabbitmq-plugins enable rabbitmq_management
添加新用户:
sudo rabbitmqctl add_user user_admin passwd_admin
如上图所示,新添加的用户没有任何权限,需要点击用户名设置权限。
示例代码:
var factory = new ConnectionFactory { HostName = "localhost", //主机名 UserName = "mymq", //默认用户名 Password = "123456", //默认密码 RequestedHeartbeat = TimeSpan.FromSeconds(30) }; using (var connection = factory.CreateConnection())//连接服务器 { //创建一个通道 using (var channel = connection.CreateModel()) { channel.QueueDeclare("stacking", false, false, false, null);//创建消息队列 var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 1; string message = "RabbitMQ Test"; //传递的消息内容 channel.BasicPublish("", "stacking", properties, Encoding.UTF8.GetBytes(message)); //生产消息 Console.WriteLine($"Send:{message}"); } }
在管理界面处看到消息插入成功
使用新加的账号链接MQ还会提示BrokerUnreachableException异常,很纳闷。折腾了半天把WSL升级到WSL2就链接成功。
交换器和队列
交换器和队列是应用层面的构建模块,使用前应对其进行声明确保其存在。
var exchangeName = "exchange_name"; channel.ExchangeDeclare(exchangeName, "direct", true);//创建一个持久化的、非自动删除的、绑定类型为direct的交换器 var queueName = channel.QueueDeclare().QueueName; //创建一个非持久化的、排他的、自动删除的队列(队列名由RabbitMQ自动生成) channel.QueueBind(queueName, exchangeName, "routing_key"); //使用路由键(routing_key)将队列和交换器绑定 channel.QueueDeclare("queue_name", true); // QueueDeclare拥有多个重载
ExchangeDeclare方法详解
各个参数详细说明如下:
exchange:交换器的名称。
type:交换器的类型,常见的如fanout、direct、topic。
durable:设置是否持久化。durable设置为true表示持久化,反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失相关信息。
autoDelete:设置是否自动删除。autoDelete设置为true则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑才会删除。
internal:设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
argument:其他一些结构化参数
QueueDeclareNoWait方法实现设置了一个nowait参数(AMQP中Exchange.Declare命令的参数),意思是不需要等待服务区返回结果。
ExchangeDeclarePassive方法用来检测相应的交换器是否存在。如果存在则正常返回;如果不存在则抛出异常。
QueueDeclare方法详解
方法的参数详细说明如下:
queue:队列的名称。
durable:设置是否持久化。为true则设置队列为持久化。持久化的队列会存盘,在服务器重启的时候可以保证不丢失相关信息。
exclusive:设置是否排他。为true则设置队列为排他的。
autoDelete:设置是否自动删除。为true则设置队列为自动删除。自动删除的前提是:至少有一个消费者连接到这个队列,之后所有与这个队列连接的消费者都断开时,才会自动删除。
arguments:设置队列的其他一些参数,如x-message-ttl、x-expires、x-max-length、x-max-length-bytes、x-dead-letter-exchange、x-dead-letter-routing-key、x-max-priority等。
如果一个队列被声明为排他队列,则该队列仅对首次声明它的连接可见,并在连接断开时自动删除。需要注意:排他队列是基于连接(Connection)可见的,同一个连接的不同信道(Channel)可以访问同一连接创建的排他队列;“首次”是指如果一个连接已经声明了一个排他队列,其他连接不允许再建立同名的排他队列;即使该队列是持久化的,一旦连接关闭或客户端退出,该排他队列都会被自动删除,这种队列适用于一个客户端同时发送和读取消息的应用场景。
QueueDeclareNoWait方法实现设置了一个nowait参数,意思是不需要等待服务区返回结果。
QueueDeclarePassive方法用来检测相应的队列是否存在。如果存在则正常返回;如果不存在则抛出异常。
QueueBind方法详解
方法中涉及的参数:
queue:队列名称;
exchange:交换器的名称;
routingKey:用来绑定队列和交换器的路由键;
argument:定义绑定的一些参数。
ExchangeBind方法详解
不仅可以将交换器与队列绑定,也可以将交换器与交换器绑定。绑定之后,消息从source交换器转发到destination交换器
方法中涉及的参数:
destination:目的交换器名;
source:源交换器的名称;
routingKey:用来绑定队列和交换器的路由键;
argument:定义绑定的一些参数。
交换器的使用并不会真正耗费服务器的性能,而队列会。要衡量RabbitMQ当前的QPS只需看队列的即可。
发送消息
BasicPublish方法用来发送一条消息到。为了更好地控制发送,可以使用mandatory这个参数
对应的具体参数解释如下:
exchange:交换器的名称,指明消息需要发送到哪个交换器中。如果设置为空字符串,则消息会被发送到RabbitMQ默认的交换器中。
routingKey:路由键,交换器根据路由键将消息存储到相应的队列之中。
basicProperties:消息的基本属性集,其包含14个属性成员,分别有contentType、contentEncoding、headers(Map<;String,Object>;)、deliveryMode、priority、correlationId、replyTo、expiration、messageId、timestamp、type、userId、appId、clusterId。
byte[] body:消息体(payload),真正需要发送的消息。
mandatory:设置为true时,如果exchange根据自身类型和消息routeKey无法找到一个符合条件的queue,会调用basic.return方法将消息返还给生产者;设为false时,出现上述情形broker会直接将消息扔掉。
丰富了一下第一部分的代码:
var properties = channel.CreateBasicProperties(); properties.DeliveryMode = 1; properties.Priority = 2; properties.ContentType = "text/plain"; properties.Expiration = "60000"; string message = "RabbitMQ Test"; //传递的消息内容 channel.BasicPublish("", "stacking", properties, Encoding.UTF8.GetBytes(message)); //生产消息
消费消息
RabbitMQ的消费模式分两种:推(Push)模式和拉(Pull)模式。推模式采用Basic.Consume进行消费,而拉模式则是调用Basic.Get进行消费。
推模式
推模式接收消息需要实例化一个EventingBasicConsumer类,订阅Received事件来接收消息。EventingBasicConsumer实现了DefaultBasicConsumer类,实际使用中如果不满足需求可以继承该类。
示例代码:
var consumer = new EventingBasicConsumer(channel); consumer.Received += (ch, ea) => { var body = ea.Body.ToArray(); Console.WriteLine($"Received:{Encoding.UTF8.GetString(body)}"); channel.BasicAck(ea.DeliveryTag, false); }; var consumerTag = channel.BasicConsume("stacking", false, consumer);
BasicConsume方法对应的参数说明如下:
queue:队列的名称;
autoAck:设置是否自动确认。建议设成false,即不自动确认;
consumerTag:消费者标签,用来区分多个消费者;
arguments:设置消费者的其他参数;
callback:设置消费者的回调函数。
BasicConsume返回字符串类型consumerTag,可以通过调用channel.BasicCancel(consumerTag)显式地取消一个消费者的订阅。BasicCancel方法会首先触发HandleConsumerOk方法,之后触发HandleDelivery方法,最后才触发HandleCancelOk方法.
拉模式
拉模式通过channel.BasicGet方法可以单条地获取消息。
示例代码:
var result = channel.BasicGet("stacking",false); Console.WriteLine($"Received:{Encoding.UTF8.GetString(result.Body.ToArray())}"); channel.BasicAck(result.DeliveryTag, false);
Basic.Consume将信道(Channel)置为接收模式,直到取消队列的订阅,RabbitMQ会不断地推送消息给消费者,当然推送消息的个数还是会受到Basic.Qos的限制。如果只想从队列获得单条消息而不是持续订阅,建议使用Basic.Get进行消费。但是不能将Basic.Get放在一个循环里来代替Basic.Consume,这样做会严重影响RabbitMQ的性能。
消费端的确认与拒绝
为了保证消息从队列可靠地达到消费者,RabbitMQ提供了消息确认机制(message acknowledgement)。消费者在订阅队列时指定autoAck参数,当autoAck为false时,RabbitMQ会等待消费者显式地回复确认信号后才从内存(或者磁盘)中移去消息(实质上是先打上删除标记,之后再删除)。当autoAck为true时,RabbitMQ会自动把发送出去的消息置为确认,然后从内存(或者磁盘)中删除,而不管消费者是否真正地消费到了这些消息。
当autoAck参数设置为false,消费者就有足够的时间处理消息(任务),不用担心处理消息过程中消费者进程挂掉后消息丢失的问题。
当autoAck参数设置为false,对于RabbitMQ服务端而言,队列中的消息分为两部分:一部分是等待投递给消费者的消息;一部分是已投递给消费者,但是还没有收到消费者确认信号的消息。如果RabbitMQ一直没有收到消费者的确认信号,并且此消息的消费者断开连接,则RabbitMQ会安排该消息重新进入队列,等待投递给下一个消费者(可能还是原来的那个消费者 ),并且RabbitMQ不会为未确认的消息设置过期时间。
消费消息时autoAck参数设置为false需要主动调用channel.BasicAck对消息进行确认,以便RabbitMQ删除消息,对应的也可以调用channel.BasicReject方法拒绝消息,由其他消费端处理或者丢弃。
deliveryTag可以看作消息的编号。如果requeue参数设置为true,则RabbitMQ会重新将这条消息存入队列;如果requeue参数设置为false,则RabbitMQ立即会把消息从队列中移除,不会把它发送给新的消费者。
Basic.Reject命令一次只能拒绝一条消息,如果想要批量拒绝消息,则可以使用Basic.Nack这个命令,对应的实现方法为channel.BasicNack.
其中deliveryTag和requeue的含义可以参考BasicReject方法。multiple参数设置为false则表示仅拒绝编号为deliveryTag的单条消息;multiple参数设置为true则表示拒绝deliveryTag编号之前所有未被当前消费者确认的消息。
channel.BasicReject或者channel.BasicNack中的requeue设置为false,可以启用“死信队列”的功能。死信队列可以通过检测被拒绝或者未送达的消息来追踪问题。
关闭连接
可以显示的调用Connection和Channel的Close方法来关闭连接,也可以借助using来管理连接。
Connection和Channel所具备的生命周期如下:
Open:开启状态,代表当前对象可以使用。
Closing:正在关闭状态。当前对象被显式地通知调用关闭方法(shutdown),这样就产生了一个关闭请求让其内部对象进行相应的操作,并等待这些关闭操作的完成。
Closed:已经关闭状态。当前对象已经接收到所有的内部对象已完成关闭动作的通知,并且其也关闭了自身。
在Connection和Channel中都定义了对应实现监听状态的改变。
Connection
Channel
Github
示例代码地址:https://github.com/MayueCif/RabbitMQ
以上是关于rabbitmq的channel的connection的区别的主要内容,如果未能解决你的问题,请参考以下文章
rabbitmq的channel的connection的区别
RabbitMQ-ConnectionFactory、Connection、Channel