java实现rabbitMQ

Posted java交流空间

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java实现rabbitMQ相关的知识,希望对你有一定的参考价值。

首先创建maven项目,在pom.xml中配置rabbitMQ

实现代码:

    producer:

package com.rabbit.demo;



import com.rabbitmq.client.Channel;  

import com.rabbitmq.client.Connection;  

import com.rabbitmq.client.ConnectionFactory;  

import com.rabbitmq.client.MessageProperties;  


/**

 * @author xbyangd

 * 生产者

 * producer

 */

public class DemoProducer {



private static String queueName = "rabbit";  

public static void main(String[] args) throws Exception{  


ConnectionFactory factory = new ConnectionFactory();  

factory.setHost("127.0.0.1");  

factory.setUsername("rabbit");  

factory.setPassword("123456");  

Connection connection = factory.newConnection();  


Channel channel = connection.createChannel();  


channel.queueDeclare(queueName, true, false, false, null);    



for (int i = 0; i < 100; i++) {  

//发送的消息    

String message = "hello world!"+i;    

//往队列中发出一条消息    

channel.basicPublish("", queueName, MessageProperties.PERSISTENT_TEXT_PLAIN, message.getBytes());    

System.out.println(" [x] Sent '" + message + "'");    

//            Thread.sleep(1000);  

}  



//关闭频道和连接    

channel.close();    

connection.close();   


}  


}  


consumer:

package com.rabbit.demo;  

import com.rabbitmq.client.Channel;

import com.rabbitmq.client.Connection;

import com.rabbitmq.client.ConnectionFactory;

import com.rabbitmq.client.QueueingConsumer;


/**

 * @author xbyangd

 * 消费者

 * consumer

 */

public class DemoConsumer {


private static String queueName = "rabbit";  


public static void main(String[] args) throws Exception {  


ConnectionFactory factory = new ConnectionFactory();  

factory.setHost("127.0.0.1");  

factory.setUsername("guest");  

factory.setPassword("guest");  

Connection connection = factory.newConnection();  

Channel channel = connection.createChannel();  


// 声明队列,主要为了防止消息接收者先运行此程序,队列还不存在时创建队列。  

channel.queueDeclare(queueName, true, false, false, null);  


// 创建队列消费者  

QueueingConsumer consumer = new QueueingConsumer(channel);  


// 设置最大服务消息接收数量  

int prefetchCount = 1;  

channel.basicQos(prefetchCount);  


boolean ack = false; // 是否自动确认消息被成功消费  

channel.basicConsume(queueName, ack, consumer); // 指定消费队列  


while (true) {  

// nextDelivery是一个阻塞方法(内部实现其实是阻塞队列的take方法)  

QueueingConsumer.Delivery delivery = consumer.nextDelivery();  

String message = new String(delivery.getBody());  

System.out.println(" [x] Received '" + message + "'");  


channel.basicAck(delivery.getEnvelope().getDeliveryTag(), false);  

Thread.sleep(2000);  

}  


}  


}  


以上是关于java实现rabbitMQ的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ 客户端开发向导

RabbitMQ java 客户端 RpcClient/RpcServer 示例

RabbitMQ Java 客户端 - Java 依赖项(标准库?)

列出与rabbitmq java客户端API交换的绑定

一个队列上的多个消费者 RabbitMQ - Java

RabbitMQ - 关闭空闲/悬空通道