RabbitMQ研究Java-API使用
Posted wu6660563
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ研究Java-API使用相关的知识,希望对你有一定的参考价值。
链接RabbitMQ
ConnectionFactory factory = new ConnectionFactory();
factory.setHost(IP_ADDRESS);
factory.setPort(PORT);
factory.setUsername("guest");
factory.setPassword("guest");
Connection connection = factory.newConnection(); //创建链接
也可以将setHost
、setPort
、setUsername
、setPassword
直接改成使用URI的方式链接
factory.setUri( "amqp:guest : guest@localhost:5672/");
Connection可以用来创建多个Channel实例,但是Channel实例不能再线程间共享,应用程序应该为每个线程开辟一个Channel。某些情况下Channle的操作可以并发运行,但是在其他情况下会导致在网络上出现错误的通信交错,同时也会影响发送方确认机制的运行,多线程间共享Channel实例是非线程安全的。
使用交换器和队列
交换器和队列在使用的时候,必须先声明
//创建一个 type = direct 持久化交换器
channel.exchangeDeclare(exchangeName, "direct", true);
String queueName = channel.queueDeclare().getQueue();
channel.queueBind(queueName, exchangeName, routingKey);
上面是创建了一个持久化、非自动删除的、绑定类型为direct
的交换器,同时也创建了一个非持久化的、排他的、自动删除的队列(队列名称是由RabbitMQ自动生成的)
如果要在应用中共享一个队列,可以如下:
channel.exchangeDeclare (exchangeName , "direct" , true) ;
channel.queueDeclare(queueName , true , false , false , null);
channel.queueBind(queueName , exchangeName, routingKey) ;
exchangeDeclare方法详解
exchangeDeclare有多个重载方法,这些重载方法都是由下面这个方法中缺省的某些参数构成
Exchange.DeclareOk exchangeDeclare(String exchange ,
String type , boolean durable ,
boolean autoDelete , boolean internal ,
Map<String, Object> arguments) throws IOException ;
这个方法的返回值是Exchange.DeclareOK
,用来标识成功声明了一个交换器。
各个参数详细说明如下:
- exchange:交换器的名称
- type:交换器的类型,常见的如fanout、direct、topic。
- durable:设置是否持久化。durable设置为true表示持久化,反之是非持久化。持久化可以将交换器存盘,在服务器重启的时候不会丢失交换器的相关信息。
- autoDelete:设置是否自动删除。autoDelete设置为true则表示自动删除。自动删除的前提是至少有一个队列或者交换器与这个交换器绑定,之后所有与这个交换器绑定的队列或者交换器都与此解绑。
- internal:设置是否是内置的。如果设置为true,则表示是内置的交换器,客户端程序无法直接发送消息到这个交换器中,只能通过交换器路由到交换器这种方式。
- argument:其他一些参数
Exchange.DeclareOk exchangeDeclare(String exchange , String type) throws
IOExcept 工on ;
Exchange.DeclareOk exchangeDeclare(String exchange, String type , boolean
durab1e) throws IOExcept 工on ;
Exchange.DeclareOk exchangeDeclare(String exchange, String type , boolean
durable , boolean autoDelete , Map<String , Object> arguments) throws IOException ;
有声明创建交换器的方法,当然也有删除交换器的方法。相应的方法如下:
Exchange.DeleteOk exchangeDelete(String exchange) throws IOException ;
void exchangeDeleteNoWait(String exchange, boolean ifUnused) throws
IOException ;
Exchange.DeleteOk exchangeDelete(String exchange, boo1ean ifUnused) throws
IOException;
其中exchange 表示交换器的名称,而ifUnused 用来设置是否在交换器没有被使用的情
况下删除。如果isUnused 设置为true ,则只有在此交换器没有被使用的情况下才会被删除:
如果设置false ,则无论如何这个交换器都要被删除。
queueBind方法详解
将队列和交换器绑定的方法如下
Queue.BindOK.queueBind(String queue, String exchange, String routingKey) throws IOException;
Queue.BindOK.queueBind(String queue, String exchange, String routingKey, Map<String, Object> argumenets) throws IOException;
Queue.BindOK.queueBindNoWait(String queue, String exchange, String routingKey, Map<String, Object> argumenets) throws IOException;
方法中涉及的参数详解:
- queue:队列名称
- exchange:交换器的名称
- routingKey:用来绑定队列和交换器的路由键
- argument:定义绑定的一些参数
nowait这个里面是有个nowait参数,这个nowait参数指的是不需要服务器返回,注意这个方法的返回值是void,如果没有特殊的缘由和应用场景,不建议使用这个方法
生产者和消费者都能够使用queueDeclare
来声明一个队列,但是如果消费者在同一个信道上订阅了另外一个队列,就无法再次声明队列了。必须先取消订阅,然后将信道设置为传输模式,才能声明队列
不仅可以将队列和交换器绑定起来,也可以将已经被绑定的队列和交换器进行解绑。具体有下:
Queue.UnbindOK.queueUnbind(String queue, String exchange, String routingKey) throws IOException;
Queue.UnbindOK.queueUnbind(String queue, String exchange, String routingKey, Map<String, Object> arguments) throws IOException;
exchangeBind方法
不仅可以将交换器和队列绑定,也可以将交换器和交换器绑定,下面表示从source
交换器转发到destination
交换器,某种程度上来说是destination
交换器可以看作一个队列
Exchange.BindOK exchangeBind(String destination, String source, String routingKey) throws IOException;
Exchange.BindOK exchangeBind(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
//表示不等待服务器回执信息.可以提高访问速度
void exchangeBindNoWait(String destination, String source, String routingKey, Map<String, Object> arguments) throws IOException;
何时创建
RabbitMQ的消息存储在队列中,交换器的使用并不真正的耗费服务器的性能,而队列会。如果要衡量RabbitMQ当前的QPS只需要看队列即可。在实际业务中,需要对创建的队列的流量、内存占用及网卡占用有一个清晰的认知,预估其平均值和峰值。
按照RabbitMQ官方建议,生产者和消费者都应该去创建队列(避免手工创建)。但是如果在本身设计的时候,就通过后台管理界面已经创建好了,那么可以免去声明的过程,直接使用即可。
预先创建资源有个优点:确保交换器和队列正确绑定。实际情况下,由于开发人员水平参吃不齐,可能发送消息的交换器并没有绑定任何队列,那么消息就会丢失;或者交换器绑定了队列,但是RoutingKey和BindKey没有正确匹配,也会丢失。
发送消息
简单发送消息
byte[] message = "Hello,Nick".getBytes();
channel.basicPublish(exchangeName, routingKey, null, message);
//带Headers消息
Map<String, Object> headers = new HashMap();
headers.put("token", "xxxx");
channel.basicPublish(exchnageName, routingKey,
new AMQP.BasicProperties.Builder()
.contentType("text/plain")
.deliveryMode(2) //投递模式为2表示会持久化
.expiration("60000") //带过期时间
.priority(1) //优先级
.userId("hidden")
.build()), message);
消息持久化
,需要将投递模式(delivery mode)设置为2,消息会持久化(刷盘)到服务器硬盘中。同时这条消息的优先级(priority)设置为1,content-type为text/plain
详细API如下:
void basicPublish(String exchange, String routingKey, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, BasicProperties props, byte[] body) throws IOException;
void basicPublish(String exchange, String routingKey, boolean mandatory, boolean immediate, BasicProperties props, byte[] body) throws IOException;
具体参数解释:
- exchange:交换器的名称,指明消息需要发送到哪个交换器。如果设置为空字符串,则消息会发送到模式的交换器中
- routingKey:路由键,交换器根据路由键将消息存储到相应的队列中
- props:消息的基本属性集:
contentType
、contentEncoding
、headers
、deliveryMode
、priority
、correlationId
、replyTo
、expiration
、messageId
、timestamp
、type
、userId
、appId
、clusterId
- byte[] body:消息体(payload),真正的消息,在SpringBoot中整理Rabbit中有直接使用
@Payload
注解
消费消息
RabbitMQ的消费模式有两种:推式(Push)和拉式(Pull)。推式采用Basic.Consume进行消费,而拉式采用Basic.Get消费。
推式
推式主要是
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
不同的订阅采用不同的消费者标签(consumerTag)区分,同一个Channel的消费者也需要通过唯一的消费者标签区分
boolean autoAck = false;
channel.basicQos(64);
channel.basicConsume(queueName, autoAck, "myConsumerTag",
new DefaultConsumer(channel)
@Override
public void headleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException
String routingKey = envelope.getRoutingKey();
String contentType = properties.getContentType();
long deliveryTag = envelope.getDeliveryTag();
channel.basicAck(deliveryTag, false);
);
注意显示的设置autoAck
为false,可以防止消息不必要的丢失
详细API如下:
String basicConsume(String queue , Consumer callback) throws IOException ;
String basicConsume(String 守ueue , boolean autoAck, Consumer callback) throws IOException;
String bas 工cConsume(String queue , boolean autoAck, Map<String, Object> arguments,
Consumer callback) throws IOException ;
String basicConsume(String queue , bool ean autoAck, String consumerTag,
Consumer callback) throws IOException ;
String basicConsume(String queue , boolean autoAck, String consumerTag,
boolean noLocal, boolean exclusive, Map<Str 工ng , Object> arguments,
Consumer callback) throws IOException;
参数讲解:
- queue:队列的名称
- autoAck:是否自动确认。建议设置为false,不自动确认
- consumerTag:消费者标签,用来区分多个消费者
- noLocal:设置为false,则表示不能讲同一个Connection中生产者发送的消息传送给这个Conection中的消费者
- exclusive:是否排他
- arguments:其他参数
- callback:消费者的毁掉函数
如果要重写回调,直接重写handleDelivery
十分方便
消费者消费需要考虑线程安全的问题。消费者客户端的这些callback会被分配到不同的线程,意味着消费者客户端可以安全的调用阻塞方法
每个Channel都有自己独立的线程,最好的方式是一个Channel对应一个消费者,每个消费者之间没有联系。
拉式
拉式是使用basicGet的方法单条获取消息,其返回值是GetResponse。Channel类的basicGet没有其他重载方法,只有一个:
GetResponse basicGet(String queue, boolean autoAck) throws IOException;
Basic.Consume将信道Channel设置为接收模式,直到取消订阅。接收模式的时候,RabbitMQ会不断推送消息给消费者,当然推送消息的个数是受到Basic.Qos的限制,如果只想从队列中获取单条信息而不是持续订阅,建议还是使用Basic.Get进行消费。但是不能讲Basic.Get放在一个循环里面来代替Basic.Consume,这样会严重影响RabbitMQ的性能。所以为了实现高吞吐,大部分场景都是用Basic.Consume推式模式的。
消费者的确认和拒绝
设置autoAck参数,当autoAck等于false的时候,RabbitMQ会等待消费者显式的回复确认信号后才从内存(或者磁盘)中移除消息(实质上是先打上记号,再删除)。当autoAck=true的时候,RabbitMQ会自动发送出去消息为确认,然后从内存(或磁盘)删除,而不管消费者是否真正的消费到了这些消息。
采用消息确认以后,只要设置为false,就有足够的时间处理消息,不用担心处理消息过程中消息进程挂掉导致消息丢失,因为会不断的推送,直到消费者显式的调用Basic.Ack。队列中分为两部分:第一个是等待投递给消费者的消息;第二个是已经投递给消费者,但是还没有确认的消息。如果消费者断开连接,RabbitMQ会重新安排进入队列,等待投递。
通过以下命令查看队列消息数量:
# queuename就是你实际的队列名称
rabbitmqctl list_queues queuename message_ready
显式的调用拒绝的方法
//拒绝一条消息
void basicReject(long deliveryTag, boolean requeue) throws IOException;
//拒绝多条消息
void basicNack(long deliveryTag, boolean mutiple, boolean requeue) throws IOException;
如果requeue=true,则RabbitMQ会重新将这条消息存入队列,以便可以发送给下一个订阅的消费者;如果requeue=false,会RabbitMQ会立即从队列中删除,以后也不会再次推送。
mutiple=false,则表示拒绝编号为deliveryTag的消息,这时候basicNack和basicReject一样,如果mutiple=true,则表示拒绝deliveryTag编号之前所有未被当前消费的消息
requeue设置为false,其实是启用了
死信队列
。死信队列可以通过检测被拒绝或者未发送成功的消息来跟踪问题
可冲入队列:
Basic.RecoverOK basicRecover() throws IOException;
Basic.RecoverOK basicRecover(boolean requeue) throws IOException
这个chaneel.basicRecover方法用来请求RabbitMQ重新发送未被确认的消息。如果requeue设置为true,则未被确认的消息会被重新加入到队列中,这样对于同一个消息来说,可能会被分配给与之不同的消费者。如果requeue参数设置为false,那么同一条消息一定会被分配给与之相同的消费者,默认是true
关闭连接
通过使用如下代码关闭连接,释放资源。其实原则上关闭Connection的时候,Channel也会自动关闭
channel.close();
conn.close();
Connection和Channel有以下生命周期:
- Open:开启状态,代表当前对象可以使用。
- Closing:正在关闭状态
- Closed:已经关闭
调用了close、程序正常关闭、异常关闭、网络异常断开,最终都会成为Closed状态
关闭方法的监听事件有addShutdownListener(ShutdownListener listener)
、removeShutdownListener(ShutdownListener listener)
connection.addShutdownListener(new ShutdownListener()
public void shutdownCompleted(ShutdownSignalException cause)
//业务代码
if(cause.isHardError())
Connection conn = (Connection)cause.getReference();
if(!cause.isInitiatedByApplication())
Method reason = cause.getReason(); //得到关闭的信息
);
以上是关于RabbitMQ研究Java-API使用的主要内容,如果未能解决你的问题,请参考以下文章