java 实现利用 RabbitMQ 发送和消费消息

Posted wang_dong_yang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了java 实现利用 RabbitMQ 发送和消费消息相关的知识,希望对你有一定的参考价值。

1.简介

     RabbitMQ 是一个消息代理。从本质上说,它从生产者接收消息,然后把这些消息传递给消费者。在这期间,它能根据你制定的规则发送,缓存,或者持久化存储这些消息。

RabbitMQ 使用到的专业术语。

1).Producing 的意思不仅仅是发送消息。发送消息的程序叫做producer。我们像下图一样描绘它。

2).Queue 是一个消息盒子的名称。它存活在 RabbitMQ 里。虽然消息流经 RabbitMQ 和你的应用程序,但是他们只能在 Queue 里才能被保存。Queue 没有任何边界的限制,你想存多少消息都可以,它本质上是一个无限的缓存。许多生产者都可以向一个 Queue 里发送消息,许多消费者都可以从一个 Queue 里接收消息。我们下图一样描绘它。


3).Consuming 的意思和接收类似。一个消费者主要是指等待接收消息的程序。我们下图一样描绘它。

   

注意:生产者,消费者和代理不一定非要在同一台机器上,在大多数应用中的确也是这样的。


2."Hello World"

在这部分的使用指南中,我们要用 java 写两个程序;一个是生产者,他发送一个消息,另一个是消费者,它接收消息,并且把消息打印出来。我们将会忽略一些Java API 的细节,而是将注意力主要放在我们将要做的这件事上,这件事就是发送一个 "Hello World" 消息。

在下面的图中,"P" 代表生产者,而 "C" 代表消费者。中间的就是一个 Queue,一个消息缓存区。 

  

java 客户端类库

AMQP 是一个开源的,通用的消息协议。有几个用不同语言编写的 AMQP 的客户端。我们将要使用 RabbitMQ 提供的 java 版本的客户端。

下载这个客户端 (http://www.rabbitmq.com/java-client.html) ,把它解压到你的工作目录下,并且找到jar文件。

$ unzip rabbitmq-java-client-bin-*.zip
$ cp rabbitmq-java-client-bin-*/*.jar ./

这个客户端在 Maven 的中心仓库中也有。

groupId:com.rabbitmq

artifactId:amqp-client

现在我们有了客户端和依赖,我们开始写代码。

Sending

 

我们把消息发送者叫 Send,消息接收者叫 Recv。消息发送者连接 RabbitMQ ,发送一个消息,然后退出。

创建 Send.java 文件,并导入我们需要的类。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;

创建给 Queue 命名

public class Send 
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException 
      ...
  

然后我们创建一个连接。

ConnectionFactory factory = new ConnectionFactory();
factory.setHost("localhost");
Connection connection = factory.newConnection();
Channel channel = connection.createChannel();

到这,我们就和本机的代理建立了连接。如果我们想要连接到不同机器上的代理,我们只需要制定那台机器的域名或 IP 地址即可。

接下来,我们创建一个管道。为了发送消息,我们还需要声明一个 Queue ,然后我们就能发布消息到 Queue 上了。

channel.queueDeclare(QUEUE_NAME, false, false, false, null);
String message = "Hello World!";
channel.basicPublish("", QUEUE_NAME, null, message.getBytes());
System.out.println(" [x] Sent '" + message + "'");

Queue 声明的过程是幂等的,只有在它不存在的情况下才会被创建出来。消息的内容是字节数组,这意味着你能够使用任何你想使用的编码。

最后,我们关闭这个管道和连接。

下面是我们完整的 Send.java 文件

import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.ConnectionFactory;

public class Send
private final static String QUEUE_NAME = "hello";
  public static void main(String[] argv) throws Exception
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
        Connection connection = factory.newConnection();
        Channel channel = connection.createChannel();
        channel.queueDeclare(QUEUE_NAME, false, false, false, null);
        String message = "Hello World!";
        channel.basicPublish("", QUEUE_NAME, null, message.getBytes("UTF-8"));
        System.out.println(" [x] Sent '" + message + "'");
        channel.close();
        connection.close();
       

Receiving

不像消息的发送者只是发送一个消息,我们的接收者需要不断的监听消息,并把它们打印出来。



在 Recv.java 这个文件中需要导入的类和 Send.java 中类似。

import com.rabbitmq.client.ConnectionFactory;
import com.rabbitmq.client.Connection;
import com.rabbitmq.client.Channel;
import com.rabbitmq.client.Consumer;
import com.rabbitmq.client.DefaultConsumer;
DefaultConsumer 是一个实现了 Consumer 接口的类,我们将会用它来缓存从服务器推送过来的消息。

创建 Recv.java 文件,打开一个连接和一个管道,并且声明一个我们将要消费消息的 Queue。注意:这个 Queue 的名字要和 Send.java 文件中的一致。

public class Recv 
  private final static String QUEUE_NAME = "hello";

  public static void main(String[] argv)
      throws java.io.IOException,
             java.lang.InterruptedException 

    ConnectionFactory factory = new ConnectionFactory();
    factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();

    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    ...
    

注意到,我们在这又声明了一个 Queue,这是因为我们可能在运行 Send.java 文件之前运行 Recv.java 文件,所以我们要确保在从 Queue 消费消息之前,

它是已经存在的。

我们告诉服务器从 Queue 中向我们传递消息,由于它是以异步的方式向我们传递消息,所以我们采用从消息的缓存对象回调的方式,直到我们已经消费了这些消

息。这就是 DefaultConsumer 子类所做的事。

 Consumer consumer = new DefaultConsumer(channel) 
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
          throws IOException 
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
      
    ;
    channel.basicConsume(QUEUE_NAME, true, consumer);

下面是完整的 Recv.java 文件。

import com.rabbitmq.client.*;
import java.io.IOException;

public class Recv
private final static String QUEUE_NAME = "hello";
public static void main(String[] argv) throws Exception
        ConnectionFactory factory = new ConnectionFactory();
        factory.setHost("localhost");
    Connection connection = factory.newConnection();
    Channel channel = connection.createChannel();
    channel.queueDeclare(QUEUE_NAME, false, false, false, null);
    System.out.println(" [*] Waiting for messages. To exit press CTRL+C");
    Consumer consumer = new DefaultConsumer(channel)
      @Override
      public void handleDelivery(String consumerTag, Envelope envelope, AMQP.BasicProperties properties, byte[] body)
          throws IOException
        String message = new String(body, "UTF-8");
        System.out.println(" [x] Received '" + message + "'");
     
    ;
    channel.basicConsume(QUEUE_NAME, true, consumer);


Putting it all together

编译文件:

$ javac -cp rabbitmq-client.jar Send.java Recv.java

运行时,我们需要 rabbitmq-client.jar,打开终端,运行 Send 文件:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Send

然后运行 Recv 文件:

$ java -cp .:commons-io-1.2.jar:commons-cli-1.1.jar:rabbitmq-client.jar Recv

消息的接收者将会打印出发送者发送的消息,接收者将保持运行的状态,并且等待消息,使用 Ctrl+C 可以结束运行。因此,尽可能从另一个终端运行发送者程序。

原文地址:https://www.rabbitmq.com/tutorials/tutorial-one-java.html







以上是关于java 实现利用 RabbitMQ 发送和消费消息的主要内容,如果未能解决你的问题,请参考以下文章

rabbitmq--路由模式

python使用rabbitMQ介绍三(发布订阅模式)

rabbitmq 学习

Java SpringBoot集成RabbitMq实战和总结

Java SpringBoot集成RabbitMq实战和总结

RabbitMQ第二篇:java简单的实现RabbitMQ