RabbitMQ的简单使用
Posted haizhilangzi
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ的简单使用相关的知识,希望对你有一定的参考价值。
RabbitMQ的简单使用
RabbitMQ安装
安装步骤可以参考该网址进行安装,此处不再赘述
RabbitMQ的生产者消费者简单实例
创建pom工程,并引入下面依赖
<dependency> <groupId>com.rabbitmq</groupId> <artifactId>amqp-client</artifactId> <version>5.7.3</version> </dependency>
生产者代码
首先定义一个链接对象
public class ConnectionUtil { public static Connection getConnection(String host, int port, String vHost, String userName, String passWord) throws IOException, TimeoutException { //定义连接工厂 ConnectionFactory connectionFactory = new ConnectionFactory(); //设置服务器地址 connectionFactory.setHost(host); //设置端口号 connectionFactory.setPort(port); //设置主机,用户名,密码 connectionFactory.setVirtualHost(vHost); connectionFactory.setUsername(userName); connectionFactory.setPassword(passWord); //返回连接 return connectionFactory.newConnection(); } }
生产者代码
public class ProducerMessage { private final static String QUEUE_NAME = "MQ"; public static void main(String[] args) throws Exception { //1、获取连接 Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "guest", "guest"); //2、声明通道 Channel channel = connection.createChannel(); //3、创建队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、定义消息内容 String message = "hello MQ"; //发布消息 for (int i = 0; i < 100; i++) { channel.basicPublish("", QUEUE_NAME, null, (message+i).getBytes()); System.out.println("[x] Sent'" + message+i + "'"); } //6、关闭通道和连接 channel.close(); connection.close(); } }
消费者代码
由于本教程是参考其他人的博客进行,且由于MQ的版本不同,所以copy了一段消费者队列代码,如下:
QueueingConsumer需要实现DefaultConsumer
public class QueueingConsumer extends DefaultConsumer { //定义一个队列 private final BlockingQueue<QueueingConsumer.Delivery> _queue; // When this is non-null the queue is in shutdown mode and nextDelivery should // throw a shutdown signal exception. private volatile ShutdownSignalException _shutdown; private volatile ConsumerCancelledException _cancelled; // Marker object used to signal the queue is in shutdown mode. // It is only there to wake up consumers. The canonical representation // of shutting down is the presence of _shutdown. // Invariant: This is never on _queue unless _shutdown != null. private static final QueueingConsumer.Delivery POISON = new QueueingConsumer.Delivery(null, null, null); public QueueingConsumer(Channel ch) { this(ch, new LinkedBlockingQueue<QueueingConsumer.Delivery>()); } public QueueingConsumer(Channel ch, BlockingQueue<QueueingConsumer.Delivery> q) { super(ch); this._queue = q; } @Override public void handleShutdownSignal(String consumerTag, ShutdownSignalException sig) { _shutdown = sig; _queue.add(POISON); } @Override public void handleCancel(String consumerTag) throws IOException { _cancelled = new ConsumerCancelledException(); _queue.add(POISON); } @Override public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body) throws IOException { checkShutdown(); this._queue.add(new QueueingConsumer.Delivery(envelope, properties, body)); } /** * Encapsulates an arbitrary message - simple "bean" holder structure. */ public static class Delivery { private final Envelope _envelope; private final AMQP.BasicProperties _properties; private final byte[] _body; public Delivery(Envelope envelope, AMQP.BasicProperties properties, byte[] body) { _envelope = envelope; _properties = properties; _body = body; } /** * Retrieve the message envelope. * * @return the message envelope */ public Envelope getEnvelope() { return _envelope; } /** * Retrieve the message properties. * * @return the message properties */ public AMQP.BasicProperties getProperties() { return _properties; } /** * Retrieve the message body. * * @return the message body */ public byte[] getBody() { return _body; } } /** * Check if we are in shutdown mode and if so throw an exception. */ private void checkShutdown() { if (_shutdown != null) throw Utility.fixStackTrace(_shutdown); } /** * If delivery is not POISON nor null, return it. * <p/> * If delivery, _shutdown and _cancelled are all null, return null. * <p/> * If delivery is POISON re-insert POISON into the queue and * throw an exception if POISONed for no reason. * <p/> * Otherwise, if we are in shutdown mode or cancelled, * throw a corresponding exception. */ private QueueingConsumer.Delivery handle(QueueingConsumer.Delivery delivery) { if (delivery == POISON || delivery == null && (_shutdown != null || _cancelled != null)) { if (delivery == POISON) { _queue.add(POISON); if (_shutdown == null && _cancelled == null) { throw new IllegalStateException( "POISON in queue, but null _shutdown and null _cancelled. " + "This should never happen, please report as a BUG"); } } if (null != _shutdown) throw Utility.fixStackTrace(_shutdown); if (null != _cancelled) throw Utility.fixStackTrace(_cancelled); } return delivery; } /** * 等待消息投递,并将消息返回 * * @return the next message * @throws InterruptedException if an interrupt is received while waiting * @throws ShutdownSignalException if the connection is shut down while waiting * @throws ConsumerCancelledException if this consumer is cancelled while waiting */ public QueueingConsumer.Delivery nextDelivery() throws InterruptedException, ShutdownSignalException, ConsumerCancelledException { return handle(_queue.take()); } /** * Main application-side API: wait for the next message delivery and return it. * * @param timeout timeout in millisecond * @return the next message or null if timed out * @throws InterruptedException if an interrupt is received while waiting * @throws ShutdownSignalException if the connection is shut down while waiting * @throws ConsumerCancelledException if this consumer is cancelled while waiting */ public QueueingConsumer.Delivery nextDelivery(long timeout) throws InterruptedException, ShutdownSignalException, ConsumerCancelledException { return handle(_queue.poll(timeout, TimeUnit.MILLISECONDS)); } }
消费者代码
public class ConsumerMessage { private final static String QUEUE_NAME = "MQ"; //consumer.n QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } }public static void main(String[] args) throws Exception { //1、获取连接 Connection connection = ConnectionUtil.getConnection("127.0.0.1", 5672, "/", "guest", "guest"); //2、声明通道 Channel channel = connection.createChannel(); //3、声明队列 channel.queueDeclare(QUEUE_NAME, false, false, false, null); //4、定义队列的消费者 //Consumer consumer=new DefaultConsumer(channel); QueueingConsumer consumer = new QueueingConsumer(channel); //5、监听队列 channel.basicConsume(QUEUE_NAME, true, consumer); //6、获取消息 while (true) { //consumer.n QueueingConsumer.Delivery delivery = consumer.nextDelivery(); String message = new String(delivery.getBody()); System.out.println(" [x] Received '" + message + "'"); } } }
上面完成消费者和生产者代码,运行即可
RabbitMQ SpringBoot代码实现
应用启动类:
@SpringBootApplication @EnableScheduling public class RabbitAmqpTutorialsApplication { public static void main(String[] args) throws Exception { SpringApplication.run(RabbitAmqpTutorialsApplication.class, args); } }
生产者代码:
public class Tut1Sender { @Autowired private RabbitTemplate template; @Autowired private Queue queue; /** * 使用RabbitTemplate向队列中定时发送消息 */ @Scheduled(fixedDelay = 1000, initialDelay = 500) public void send() { String message = "Hello World!"; this.template.convertAndSend(queue.getName(), message); System.out.println(" [x] Sent '" + message + "'"); } }
消费者代码:
@RabbitListener可以标注在类上面,当使用在类上面的时候,需要配合@RabbitHandler注解一起使用,
@RabbitListener标注在类上面表示当有收到消息的时候,就交给带有@RabbitHandler的方法处理,具体找哪个方法处理,需要跟进MessageConverter转换后的java对象。//表示监听一个hello队列 @RabbitListener(queues = "hello") public class Tut1Receiver { @RabbitHandler public void receive(String message) { System.out.println(" [x] Received '" + message + "'"); } }
配置类:
@Configuration public class Tut1Config { //声明一个队列 @Bean public Queue hello() { return new Queue("hello"); } @Bean public Tut1Receiver receiver() { //实例化消费者 return new Tut1Receiver(); } //@Profile("sender") @Bean public Tut1Sender sender() { //实例化生产者 return new Tut1Sender(); } }
application.yml
#spring: # profiles: # active: usage_message logging: level: org: ERROR tutorial: client: duration: 10000 spring: rabbitmq: host: localhost username: guest password: guest
其实使用springboot还是比较简单的,这个主要是因为springboot对RabbitMQ的底层做了很多的封装,隐藏和很多的细节。如果想全面的了解RabbitMQ最好还是到官网上面查看资料,虽然都是英文资料,读起来比较生涩难懂
以上是关于RabbitMQ的简单使用的主要内容,如果未能解决你的问题,请参考以下文章