RabbitMQ学习笔记二:rabbitmq发送接收消息Helloworld(Java版)

Posted RunningFan

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ学习笔记二:rabbitmq发送接收消息Helloworld(Java版)相关的知识,希望对你有一定的参考价值。

前面我们已经了解了Windows下RabbitMQ相关服务安装,参见http://blog.csdn.net/u010416588/article/details/54599341

一 引入rabbitmq java client

前面我们已经在本地(windows下)安装配置好了RabbitMQ server。现在我们引入rabbitmq Java client。
在eclipse中创建一个maven项目,在pom.xml文件中加入依赖

<dependency>
  <groupId>com.rabbitmq</groupId>
  <artifactId>amqp-client</artifactId>
  <version>4.0.1</version>
</dependency>

Alternatively, if you’re using Gradle:

dependencies 
  compile 'com.rabbitmq:amqp-client:4.0.1'

jar包下载地址http://www.rabbitmq.com/java-client.html

二 rabbitmq发送消息

2.1 创建一个消息发送类

package com.gta.goldnock.mq;

import java.io.IOException;
import java.util.concurrent.TimeoutException;

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
/**
 * 
* @ClassName: Send
* @Description: TODO(The sender will connect to RabbitMQ, send a single message, then exit.)
* @author yuhuan.gao
* @date 2017年1月19日 下午1:37:19
*
 */
public class Send 
    /*
     * 定义一个队列“hello”
     */
    private final static String QUEUE_NAME = "hello";

    public static void main(String[] argv) throws IOException, TimeoutException
        //创建一个连接
        ConnectionFactory factory = new ConnectionFactory();
        //连接本地,如果需要指定到服务,需在这里指定IP
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        //申明通道发送消息的队列,把消息发送至消息队列‘hello’
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        //Declaring a queue is idempotent - 如果队列不存在则会创建一个队列 
        //消息内容为byte array, so可以自己随意编码
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
        System.out.println(" [x] Sent '" + message + "'");

        //消息发送完成后,关闭通道和连接
        channel.close();
        connection.close();
    

2.2 运行main,成功,但是不确定消息是否发送成功了。查看rabbit-server的日志

=INFO REPORT==== 19-Jan-2017::14:27:46 ===
accepting AMQP connection <0.19748.19> ([::1]:60445 -> [::1]:5672)

=INFO REPORT==== 19-Jan-2017::14:27:46 ===
closing AMQP connection <0.19748.19> ([::1]:60445 -> [::1]:5672)

2.3 通过插件查看服务管理


message从3变成4了,消息是发送成功了。

Sending doesn't work!

If this is your first time using RabbitMQ and you don't see the "Sent" message then you may be left scratching your head wondering what could be wrong. Maybe the broker was started without enough free disk space (by default it needs at least 1Gb free) and is therefore refusing to accept messages. Check the broker logfile to confirm and reduce the limit if necessary. The configuration file documentation will show you how to set disk_free_limit.

三 rabbitmq消息接收

不同于sender,reciever是从RabbitMQ中推出消息。sender每次发送一条单一的消息,而reciever一直运行并监控着服务,一有消息就接收。

3.1 创建消息接收类

package com.gta.goldnock.mq;


import java.io.IOException;

import com.rabbitmq.client.AMQP;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
import com.rabbitmq.client.Envelope;

/**
 * 
* @ClassName: Recv
* @Description: TODO(接收消息类)
* @author yuhuan.gao
* @date 2017年1月19日 下午2:33:27
*
 */
public class Recv 

    private final static String QUEUE_NAME = "hello";

    public static void main(String[] args) throws Exception
        //创建一个连接
        ConnectionFactory factory = new ConnectionFactory();
        //连接本地,如果需要指定到服务,需在这里指定IP
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        //创建一个通道
        Channel channel = connection.createChannel();
        //申明接收消息的队列,与发送消息队列"hello"对应
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
        //The extra DefaultConsumer is a class implementing the Consumer interface 
        //we'll use to buffer the messages pushed to us by the server.
        Consumer consumer = new DefaultConsumer(channel)
            //重写DefaultConsumer中handleDelivery方法,在方法中获取消息
            @Override
            public void handleDelivery(String consumerTag, Envelope envelope, 
                    AMQP.BasicProperties properties, byte[] body) throws IOException
                String message = new String(body, "UTF-8");
                System.out.println(" [x] Received '" + message + "'");
            
        ;
        channel.basicConsume(QUEUE_NAME, true,consumer);
    
We're about to tell the server to deliver us the messages from the queue. Since it will push us messages asynchronously, we provide a callback in the form of an object that will buffer the messages until we're ready to use them. That is what a DefaultConsumer subclass does.

3.2 运行

运行main,运行成功,并且控制台打印如下

 [*] Waiting for messages. To exit press CTRL+C
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'
 [x] Received 'Hello World!'

上面结果正如预期的,当订阅‘hello’队列消息后,收到了之前Sender发的x消息。继续运行Sender的main,Recv在运行状态下会自动收到消息。

以上是关于RabbitMQ学习笔记二:rabbitmq发送接收消息Helloworld(Java版)的主要内容,如果未能解决你的问题,请参考以下文章

消息中间件系列三:使用RabbitMq原生Java客户端进行消息通信(消费者(接收方)自动确认模式消费者(接收方)自行确认模式生产者(发送方)确认模式)

RabbitMQ学习笔记

RabbitMQ学习笔记4-使用fanout交换器

RabbitMQ常见面试题

springboot学习笔记-6 springboot整合RabbitMQ

RabbitMQ学习笔记(自用)