Rabbit简单队列模式
Posted fuguang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rabbit简单队列模式相关的知识,希望对你有一定的参考价值。
1 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd"> 2 <modelVersion>4.0.0</modelVersion> 3 <groupId>com.kf</groupId> 4 <artifactId>rabbitMQ.demo</artifactId> 5 <version>0.0.1-SNAPSHOT</version> 6 7 8 9 <dependencies> 10 <dependency> 11 <groupId>com.rabbitmq</groupId> 12 <artifactId>amqp-client</artifactId> 13 <version>3.6.5</version> 14 </dependency> 15 </dependencies> 16 17 18 19 </project>
1 package com.kf.utils; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.rabbitmq.client.Connection; 7 import com.rabbitmq.client.ConnectionFactory; 8 9 /** 10 * 获取rabbit 11 * @author kf 12 * 13 */ 14 public class RabbitConnectionUtils { 15 16 17 public static Connection getConnection() throws IOException, TimeoutException{ 18 ConnectionFactory factory = new ConnectionFactory(); 19 factory.setHost("127.0.0.1"); 20 factory.setUsername("admin"); 21 factory.setPassword("admin"); 22 //AMQP协议端口号 23 factory.setPort(5672); 24 factory.setVirtualHost("/kf"); 25 Connection newConnection = factory.newConnection(); 26 return newConnection; 27 } 28 29 }
1 package com.kf.queueDemo.simpleQueue; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.kf.utils.RabbitConnectionUtils; 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 10 /** 11 * 简单队列 12 * @author kf 13 * 14 */ 15 public class SimpleQueueProducer { 16 17 //队列名称 18 private static String QUEUENAME = "SIMPLEQUEUE"; 19 20 public static void main(String[] args) throws IOException, TimeoutException{ 21 Connection connection = RabbitConnectionUtils.getConnection(); 22 23 //创建通道 24 Channel channel = connection.createChannel(); 25 26 //通道里放入队列 27 /** 28 * 第一个参数是 队列名称 29 * 第二个参数指 要不要持久化 30 */ 31 channel.queueDeclare(QUEUENAME, false, false, false, null); 32 33 /* //消息体 34 String mes = "demo_message汉字"; 35 36 //发送消息 37 *//** 38 * 参数为 exchange, routingKey, props, body 39 * exchange 交换机 40 * routingKey 路由键 41 * 42 * body 消息体 43 *//* 44 channel.basicPublish("", QUEUENAME, null, mes.getBytes());*/ 45 46 /** 47 * 集群环境下,多个消费者情况下。消费者默认采用均摊 48 */ 49 for(int i=1; i<11; i++){ 50 String mes = "demo_message汉字"+i; 51 System.out.println("发送消息"+mes); 52 channel.basicPublish("", QUEUENAME, null, mes.getBytes()); 53 } 54 55 56 // System.out.println("发送消息"+mes); 57 58 channel.close(); 59 connection.close(); 60 } 61 62 }
1 package com.kf.queueDemo.simpleQueue; 2 3 import java.io.IOException; 4 import java.util.concurrent.TimeoutException; 5 6 import com.kf.utils.RabbitConnectionUtils; 7 import com.rabbitmq.client.Channel; 8 import com.rabbitmq.client.Connection; 9 import com.rabbitmq.client.DefaultConsumer; 10 import com.rabbitmq.client.Envelope; 11 import com.rabbitmq.client.AMQP.BasicProperties; 12 13 /** 14 * 简单队列消费者 15 * @author kf 16 * 17 */ 18 public class SimpleConsumer { 19 //队列名称 20 private static String QUEUENAME = "SIMPLEQUEUE"; 21 22 public static void main(String[] args) throws IOException, TimeoutException{ 23 System.out.println("01开始接收消息"); 24 Connection connection = RabbitConnectionUtils.getConnection(); 25 26 //创建通道 27 final Channel channel = connection.createChannel(); 28 29 //通道里放入队列 30 /** 31 * 第一个参数是 队列名称 32 * 第二个参数指 要不要持久化 33 */ 34 channel.queueDeclare(QUEUENAME, false, false, false, null); 35 36 DefaultConsumer consumer = new DefaultConsumer(channel){ 37 //监听队列 38 @Override 39 public void handleDelivery(String consumerTag, Envelope envelope, BasicProperties properties, 40 byte[] body) throws IOException { 41 System.out.println("------------进入监听---------"); 42 String s = new String(body, "utf-8"); 43 System.out.println("获取到的消息是:"+s); 44 //手动应答。 45 /** 46 * 当 channel.basicConsume(QUEUENAME, true, consumer);第二个参数为false时 是手动应答模式 47 */ 48 // channel.basicAck(envelope.getDeliveryTag(), false); 49 } 50 }; 51 52 //设置应答模式 53 /** 54 * 参数: 对列名,是否自动签收,监听的类 55 */ 56 System.out.println("获取消息的方法之前"); 57 channel.basicConsume(QUEUENAME, true, consumer); 58 System.out.println("获取消息的方法之后"); 59 60 } 61 62 63 }
以上是关于Rabbit简单队列模式的主要内容,如果未能解决你的问题,请参考以下文章