035 spring amqp

Posted 最爱五仁月饼

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了035 spring amqp相关的知识,希望对你有一定的参考价值。

一 .概述

  本部分的主要的内容都是从spring amqp的官文档之中摘录过来的.


 二 .spring amqp的抽象

[1] 消息 : 

  在spring amqp之中,使用Message来抽象消息的内容.

public class Message {

    private final MessageProperties messageProperties;

    private final byte[] body;

    public Message(byte[] body, MessageProperties messageProperties) {
        this.body = body;
        this.messageProperties = messageProperties;
    }

    public byte[] getBody() {
        return this.body;
    }

    public MessageProperties getMessageProperties() {
        return this.messageProperties;
    }
}

从上面的定义之中,我们可以看到,一个消息被抽象称为两个部分,一个就是消息的主体,被抽象为是一个字节数组,另外一个部分就是MessageProperties .

[2]交换机Exchange

public interface Exchange {

    String getName();

    String getExchangeType();

    boolean isDurable();

    boolean isAutoDelete();

    Map<String, Object> getArguments();

}

常见的,我们可以使用下面的实现类:

public abstract class ExchangeTypes {

    public static final String DIRECT = "direct";

    public static final String TOPIC = "topic";

    public static final String FANOUT = "fanout";

    public static final String HEADERS = "headers";

    public static final String SYSTEM = "system";

    /**
     * The constant to represent {@code x-delayed-message} exchange mode.
     * @deprecated since 1.6.4, it\'s not a user-available exchange type,
     * the delayed {@code boolean} is used for that.
     */
    @Deprecated
    public static final String DELAYED = "x-delayed-message";
}

我们可以看到最常用的direct类型, topic类型,fanout类型.

[3]队列

public class Queue  {

    private final String name;

    private volatile boolean durable;

    private volatile boolean exclusive;

    private volatile boolean autoDelete;

    private volatile Map<String, Object> arguments;

    /**
     * 队列是持久的,非排他的和非自动删除的。
     *
     * @param name 队列名
     */
    public Queue(String name) {
        this(name, true, false, false);
    }

    // Getters and Setters omitted for brevity

}

我们可以通过上面的队列完成一个队列的创建.

[4]绑定

public class Binding extends AbstractDeclarable {

    public enum DestinationType {
        QUEUE, EXCHANGE;
    }

另外,我们也可以通过构建者模式来创建一个队列

Binding b = BindingBuilder.bind(someQueue).to(someTopicExchange).with("foo.*");

上面的代码表示的就是 绑定 一个队列到一个交换机,然后使用的一个绑定建.


三 .连接

  在spring amqp之中使用ConnectionFactory来抽象Rabbitmq客户端之中的ConnectionFacory对象,我们一般会使用CachingConnectionFacotory对象来作为实现.

如何声明一个ConnectionFactory对象.

[1]xml的方式:

<rabbit:connection-factory
    id="connectionFactory" channel-cache-size="50"/>

上面的代码会默认向容器之中添加一个ConnectionFactory对象.

另外,我们通过这种方式进行ConnetionFactory的一些属性的设置.


 

 

四 .配置RabbitTemplate对象

[1]return机制

  我们使用return机制非常的简单.

        template.setMandatory(true);
        template.setReturnCallback(new ReturnCallback() {
            @Override
            public void returnedMessage(Message message, int replyCode, String replyText, String exchange,
                    String routingKey) {
                // 当消息没有被送回到的时候,会回调这个方法.
                // 其中基本的信息都已经有了.
            }
        });

在上面,我们需要注意的就是两点内容:

[1]设置属性mandatory属性为true

[2]设置一个回调的函数

[2]Confirm机制

我们使用确认机制也非常的简单,如下:  

template.setConfirmCallback(new ConfirmCallback() {
            @Override
            public void confirm(CorrelationData correlationData, boolean ack, String cause) {
                System.out.println("确认消息");
            }
        });

五 . 消息的发送  

void send(Message message) throws AmqpException;

void send(String routingKey, Message message) throws AmqpException;

void send(String exchange, String routingKey, Message message) throws AmqpException;

在发送过程之中,我们需要创建一个消息.

Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();
Message message = MessageBuilder.withBody("foo".getBytes())
    .setContentType(MessageProperties.CONTENT_TYPE_TEXT_PLAIN)
    .setMessageId("123")
    .setHeader("bar", "baz")
    .build();

上面的两种方式都可以使用构建者模式创建一个消息对象.


 

六 .消息的接收

  我们一般情况下都会使用注解驱动的方式完成消息的消费.

    <rabbit:annotation-driven/>
@Component
public class MyService {

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue(value = "myQueue", durable = "true"),
        exchange = @Exchange(value = "auto.exch", ignoreDeclarationExceptions = "true"),
        key = "orderRoutingKey")
  )
  public void processOrder(String data) {
    ...
  }

  @RabbitListener(bindings = @QueueBinding(
        value = @Queue,
        exchange = @Exchange(value = "auto.exch"),
        key = "invoiceRoutingKey")
  )
  public void processInvoice(String data) {
    ...
  }

}

在上面,我们使用@RabbitListener来完成一个消息的监听,每当消息到达的时候,都会回调对应的方法.

更常见的:

我们仅仅只需要监听一个队列就可以了.  

   @RabbitListener(queues = "myQueue")
    public void processOrder(String data) {
        ...
    }

其中,我们最为关系的就是方法的参数,在spring amqp之中,变得更加的灵活了.

原始的org.springframework.amqp.core.Message。
接收消息的com.rabbitmq.client.Channel

另外,我们可以使用下面的注解完成一些参数的映射.

(1)@payload 将消息的body进行转换

(2)使用@Header 叫消息的头进行转换

更一般的,我们喜欢使用下面的这种方式进行.

@RabbitListener(queues = "myQueue")
public void processOrder(Message<Order> order) { ...
}

注意:这个接口是下面的这种类型:

public interface Message<T> {
    T getPayload();
    MessageHeaders getHeaders();
}

 

以上是关于035 spring amqp的主要内容,如果未能解决你的问题,请参考以下文章

Spring AMQP 源码分析 01 - Impatient

Spring amqp批处理接收消息

Spring AMQP 源码分析 08 - XML 配置

Spring AMQP杂记之Spring实现简述

Spring AMQP杂记之AMQP基本概念

spring-cloud-sleuth 与 spring-amqp 集成