RabbitMQ RabbitMQ高级整合应用

Posted niugang0920

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了RabbitMQ RabbitMQ高级整合应用相关的知识,希望对你有一定的参考价值。

RabbitMQ整合Spring AMQP实战

常用组件介绍

  • RabbitAdmin

  • Spring AMQP声明 通过@Bean注解进行声明

  • RabbitTemplate

  • SimpleMessageListenerContainer 对消息消费进行详细配置和优化

  • MessageListenerAdapter 消息监听适配器,建立在监听器基础之上

  • MessageConverter

RabbitAdmin

  • RabbitAdmin类可以很好的操作RabbitMQ,在Spring中直接进行注入即可

  • 注意:autoSatrtup必须设置为true,否则spring容器不会加载RabbitAdmin类

  • RabbitAdmin底层实现就是从Spring容器中获取Exchange、Bingding、RoutingKey以及Queue的@Bean声明;

  • 底层使用RabbitTemplate的execute方法执行对应的声明、修改、删除等一系列RabbitMQ基础功能操作;

RabbitMQ简单使用

pom.xml

<dependency>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>	

配置

@Configuration
public class RabbitMqConfig1 {
    /**
     * 设置连接
     * @return ConnectionFactory
     */
    @Bean
    public ConnectionFactory connectionFactory(){
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    /**
     * 创建RabbitAdmin
     * @return RabbitAdmin
     */
    @Bean
    public RabbitAdmin rabbitAdmin() {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory());
        //默认就是true
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }
}

测试

 @Autowired
    private RabbitAdmin rabbitAdmin;

    /**
     * RabbitAdmin api应用
     */
    @Test
    public void testAdmin() {

        rabbitAdmin.declareExchange(new DirectExchange("test.direct", false, false));

        rabbitAdmin.declareExchange(new TopicExchange("test.topic", false, false));

        rabbitAdmin.declareExchange(new FanoutExchange("test.fanout", false, false));

        rabbitAdmin.declareQueue(new Queue("test.direct.queue", false));

        rabbitAdmin.declareQueue(new Queue("test.topic.queue", false));

        rabbitAdmin.declareQueue(new Queue("test.fanout.queue", false));

        //绑定
        rabbitAdmin.declareBinding(new Binding("test.direct.queue",
                Binding.DestinationType.QUEUE,
                "test.direct", "direct", new HashMap<>()));

        //使用 BindingBuilder 创建绑定
        // https://docs.spring.io/spring-amqp/docs/2.1.16.BUILD-SNAPSHOT/reference/html/#builder-api
        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("test.topic.queue", false))        //直接创建队列
                        .to(new TopicExchange("test.topic", false, false))    //直接创建交换机 建立关联关系
                        .with("user.#"));    //指定路由Key


        //FanoutExchange 类型exchange不走路由键
        rabbitAdmin.declareBinding(
                BindingBuilder
                        .bind(new Queue("test.fanout.queue", false))
                        .to(new FanoutExchange("test.fanout", false, false)));

        //清空队列数据
        // rabbitAdmin.purgeQueue("test.topic.queue", false);
    }

SpringAMQP声明(Exchange、Queue、Binding)

在RabbitMQ基础AP里面声明一个Exchange、声明一个绑定、一个队列

//基础API声明一个exchange
channel.exchangeDeclare(String exchange,String type,boolean durable,boolean autoDelete,boolean internal,Map<String, Object> arguments)
//基础API 声明一个队列
channel.queueDeclare(String queue, boolean durable, boolean exclusive, boolean autoDelete,
                                 Map<String, Object> arguments) 
//基础API 声明binding
channel.queueBind(String queue, String exchange, String routingKey)

使用SpringAMQP去声明,就需要使用SpringAMQP的如下模式,即声明@Bean方式

   //声明Topic 类型的exchange 
   @Bean
    public TopicExchange topicExchange() {
        //exchange 持久化
        // Exchange springEchange = ExchangeBuilder.topicExchange("spring_amqp_test_echange").durable(true).build();
        return new TopicExchange("spring_amqp_test_echange", true, false);
    }

    //声明队列
    @Bean
    public Queue queue() {
        //   Queue spring_amqp_test_echange = QueueBuilder.durable("spring_amqp_test_echange").build();
        return new Queue("spring_amqp_test_queue");
    }

   //建立绑定
    @Bean
    public Binding binding(TopicExchange topicExchange, Queue queue) {
        return BindingBuilder.bind(queue).to(topicExchange).with("spring.*");
    }

消息模板 RabbitTemplate

  • RabbitTemplate,即消息模板。

  • 在与SpringAMQP整合的时候进行发送消息的关键类

  • 该类提供了丰富的发送消息的方法,包括可靠性投递消息方法、回调监听消息接口ConfirmCallback、返回值确认接口ReturnCallback等等。同样我们需要进入注入到Spring容器中,然后直接使用;

  • 在与Spring整合时需要实例化,但是在与SpringBoot整合时,在配置文件里添加配置即可;

RabbitTemplate简单使用

配置


@Configuration
public class RabbitMqConfig3 {
    /**
     * 设置连接
     *
     * @return ConnectionFactory
     */
    @Bean
    public ConnectionFactory connectionFactory() {
        CachingConnectionFactory connectionFactory = new CachingConnectionFactory();
        connectionFactory.setHost("localhost");
        connectionFactory.setPort(5672);
        connectionFactory.setUsername("guest");
        connectionFactory.setPassword("guest");
        connectionFactory.setVirtualHost("/");
        return connectionFactory;
    }

    /**
     * 创建RabbitAdmin
     *
     * @return RabbitAdmin
     */
    @Bean
    public RabbitAdmin rabbitAdmin(ConnectionFactory connectionFactory) {
        RabbitAdmin rabbitAdmin = new RabbitAdmin(connectionFactory);
        //默认就是true
        rabbitAdmin.setAutoStartup(true);
        return rabbitAdmin;
    }

    /**
     * 消息模板
     *
     * @param connectionFactory connectionFactory
     * @return RabbitTemplate
     */
    @Bean
    public RabbitTemplate rabbitTemplate(ConnectionFactory connectionFactory) {
        return new RabbitTemplate(connectionFactory);
    }

    /**
     * 针对消费者配置
     * 1. 设置交换机类型
     * 2. 将队列绑定到交换机
     * FanoutExchange: 将消息分发到所有的绑定队列,无routingkey的概念
     * HeadersExchange :通过添加属性key-value匹配
     * DirectExchange:按照routingkey分发到指定队列
     * TopicExchange:多关键字匹配
     */
    @Bean
    public TopicExchange exchange001() {
        return new TopicExchange("topic001", true, false);
    }

    @Bean
    public Queue queue001() {
        return new Queue("queue001", true); //队列持久
    }

    @Bean
    public Binding binding001(TopicExchange exchange001, Queue queue001) {
        return BindingBuilder.bind(queue001).to(exchange001).with("spring.*");
    }
}

测试

   @Test
    public void testSendMessage() {
        //1 创建消息
        //AMQP消息的消息属性
        //MessageBuilder(也可以构建Message) 使用流利的API从byte[]主体或其他消息构建Spring AMQP消息。
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.getHeaders().put("desc", "信息描述..");
        messageProperties.getHeaders().put("type", "自定义消息类型..");
        Message message = new Message("Hello RabbitMQ".getBytes(), messageProperties);

        rabbitTemplate.convertAndSend("topic001", "spring.amqp", message, new MessagePostProcessor() {
            @Override
            public Message postProcessMessage(Message message) throws AmqpException {
                System.err.println("------添加额外的设置---------");
                message.getMessageProperties().getHeaders().put("desc", "额外修改的信息描述");
                message.getMessageProperties().getHeaders().put("attr", "额外新加的属性");
                return message;
            }
        });
    }

队列queue001
技术图片

 @Test
public void testSendMessage2() throws Exception {
        //1 创建消息
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("text/plain");
        Message message = new Message("mq 消息1234".getBytes(), messageProperties);

        rabbitTemplate.send("topic001", "spring.abc", message);

        rabbitTemplate.convertAndSend("topic001", "spring.amqp", "hello object message send!");
        rabbitTemplate.convertAndSend("topic002", "rabbit.abc", "hello object message send!");
    }

队列queue001
技术图片
队列queue002
技术图片

简单消息监听容器:SimpleMessageListenerContainer

  • 这个类非常的强大,我们可以对他进行很多设置,对于消费者的配置项,这个类都可以满足
  • 监听队列(多个队列)、自动启动、自动声明功能
  • 设置事务特性、事务管理器、事务属性、事务容量(并发)、是否开启事务、回滚消息等
  • 设置消费者数量、最小最大数量、批量消费
  • 设置消息确认和自动确认模式、是否重回队列、异常捕获handler函数
  • 设置消费者标签生成策略、是否独占模式、消费者属性等
  • 设置具体的监听器、消息转换器等等

注意:SimpleMessageListenerContainer可以进行动态设置,比如在运行中的应用可以动态的修改其消费者数量的大小、接收消息的模式等。很多基于RabbitMQ的自制定化后端管控台在进行动态设置的时候,也是根据这一特性去实现的。所以可以看出SpringAMQP非常的强大;
思考一下:SimpleMessageListenerContainer为什么可以动态感知配置变更?
技术图片
技术图片

配置

 @Bean
    public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // container.setQueueNames(); 接收字符串的队列名
        //
        container.setQueues(queue001(), queue002(), queue003());
        //当前消费者数量
        container.setConcurrentConsumers(1);
        //最大消费者数量
        container.setMaxConcurrentConsumers(5);
        //是否使用重队列
        container.setDefaultRequeueRejected(false);
        //自动签收
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setExposeListenerChannel(true);

        //消费端的标签策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });

        //设置消息监听
        //必须设置消息监听 否则 报  No message listener specified - see property ‘messageListener‘
        container.setMessageListener(new ChannelAwareMessageListener() {
            @Override
            public void onMessage(Message message, Channel channel) throws Exception {

                String msg = new String(message.getBody());
                System.err.println("----------消费者: " + msg);
                //做消息处理....
            }
        });


        return container;
    }

消息监听适配器:MessageListenerAdapter

通过MessageListenerAdapter的代码我们可以看出如下核心属性:

  • defaultListenerMethod默认监听方法名称:用于设置监听方法名称
  • Delegate委托对象:实际真实的委托对象,用于处理消息、
  • queueOrTagToMethodName: 队列标识与方法名称组成的集合
  • 可以一一进行队列与方法名称的匹配;
  • 队列和方法名称绑定,即指定队列里的消息会被绑定的方法所接收处理;

配置

public class MessageDelegate1 {

    public void handleMessage(byte[] messageBody) {
        System.err.println("默认方法, 消息内容:" + new String(messageBody));
    }

    public void consumeMessage(byte[] messageBody) {
        System.err.println("字节数组方法, 消息内容:" + new String(messageBody));
    }

    public void consumeMessage(String messageBody) {
        System.err.println("字符串方法, 消息内容:" + messageBody);
    }

    public void method1(String messageBody) {
        System.err.println("method1 收到消息内容:" + new String(messageBody));
    }

    public void method2(String messageBody) {
        System.err.println("method2 收到消息内容:" + new String(messageBody));
    }


    public void consumeMessage(Map messageBody) {
        System.err.println("map方法, 消息内容:" + messageBody);
    }
}
  @Bean
  public SimpleMessageListenerContainer messageContainer(ConnectionFactory connectionFactory) {

        SimpleMessageListenerContainer container = new SimpleMessageListenerContainer(connectionFactory);
        // container.setQueueNames(); 接收字符串的队列名
        //
        container.setQueues(queue001(), queue002(), queue003());
        //当前消费者数量
        container.setConcurrentConsumers(1);
        //最大消费者数量
        container.setMaxConcurrentConsumers(5);
        //是否使用重队列
        container.setDefaultRequeueRejected(false);
        //自动签收
        container.setAcknowledgeMode(AcknowledgeMode.AUTO);
        container.setExposeListenerChannel(true);

        //消费端的标签策略
        container.setConsumerTagStrategy(new ConsumerTagStrategy() {
            @Override
            public String createConsumerTag(String queue) {
                return queue + "_" + UUID.randomUUID().toString();
            }
        });


        //1 适配器方式. 默认是有自己的方法名字的:handleMessage
        // 可以自己指定一个方法的名字: consumeMessage
        // 也可以添加一个转换器: 从字节数组转换为String
        //MessageDelegate1如何写 MessageListenerAdapter 源码里面也给出了一些建议
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate1());
        //默认的方法是 	public static final String ORIGINAL_DEFAULT_LISTENER_METHOD = "handleMessage";
        adapter.setDefaultListenerMethod("consumeMessage");
        //TextMessageConverter 自定义的消息转换器
        //new TextMessageConverter()-->consumeMessage(byte[] messageBody))->MessageProperties.setContentType("text/plian")
        //new Jackson2JsonMessageConverter()--->consumeMessage(Map messageBody))->MessageProperties.setContentType("application/json")
         // adapter.setMessageConverter(new Jackson2JsonMessageConverter());
        container.setMessageListener(adapter);
        return container;
    }

MessageConverter消息转换器

  • 我们在进行发送消息的时候,正常情况下消息体为二进制的数据方式进行传输,如果希望内部帮我们进行转换,或者指定自定义的转换器,就需要用到MessageConverter;

  • 自定义常用转换器:MessageConverter,一般来讲都需要实现这个接口

  • 重写下面两个方法:

    • toMessage:java对象转换为Message
    • fromMessage:Message对象转换为java对象
  • MessageConverter消息转换器:

  • Json转换器:Jackson2JsonMessageConverter:可以进行java对象的转换功能;

  • DefaultJackson2JavaTypeMapper映射器:可以进行java对象的映射关系;

  • 自定义二进制转换器:比如图片类型、PDF、PPT、流媒体等

使用转换器的目的是当传入不同的类型的数据(如json,类,PDF,图片等)时,在消息的接收方接收到时也总是以传入的类型接收结果对象;我们通过写入不同的转换器以达到此种效果。具体可百度。

JSON格式转换

默认监听方法的参数为Map

public class Order {

	private String id;
	
	private String name;
	
	private String content;

	public Order() {
	}
	
	public Order(String id, String name, String content) {
		this.id = id;
		this.name = name;
		this.content = content;
	}

	public String getId() {
		return id;
	}

	public void setId(String id) {
		this.id = id;
	}

	public String getName() {
		return name;
	}

	public void setName(String name) {
		this.name = name;
	}

	public String getContent() {
		return content;
	}

	public void setContent(String content) {
		this.content = content;
	}
	
}

配置

     // 1.1 支持json格式的转换器
        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        //  public void consumeMessage(Map messageBody) {
        //        System.err.println("map方法, 消息内容:" + messageBody);
        //    }
        //对应map参数方法
        adapter.setDefaultListenerMethod("consumeMessage");


        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
        adapter.setMessageConverter(jackson2JsonMessageConverter);

        container.setMessageListener(adapter);

测试

  @Test
    public void testSendJsonMessage() throws Exception {

        Order order = new Order();
        order.setId("001");
        order.setName("消息订单");
        order.setContent("描述信息");
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.err.println("order 4 json: " + json);

        MessageProperties messageProperties = new MessageProperties();
        //这里注意一定要修改contentType为 application/json
        messageProperties.setContentType("application/json");
        Message message = new Message(json.getBytes(), messageProperties);

        rabbitTemplate.send("topic001", "spring.order", message);
    }
JSON格式转换支持Java对象

默认监听方法的参数为Java对象

委托对象方法

public void consumeMessage(Order order) {
		System.err.println("order对象, 消息内容, id: " + order.getId() + 
				", name: " + order.getName() + 
				", content: "+ order.getContent());
	}

配置

     // 1.2 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象转换

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");

        Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();

        DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();

        //信任所有的包,否则会报 报不信任
        javaTypeMapper.setTrustedPackages("*");

        jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);

        adapter.setMessageConverter(jackson2JsonMessageConverter);
        container.setMessageListener(adapter);

测试

 @Test
    public void testSendJavaMessage() throws Exception {

        Order order = new Order();
        order.setId("001");
        order.setName("订单消息");
        order.setContent("订单描述信息");
        ObjectMapper mapper = new ObjectMapper();
        String json = mapper.writeValueAsString(order);
        System.err.println("order 4 json: " + json);

        MessageProperties messageProperties = new MessageProperties();
        //这里注意一定要修改contentType为 application/json
        messageProperties.setContentType("application/json");
        //__TypeId__ 这个是固定写法
        messageProperties.getHeaders().put("__TypeId__", "com.niugang.spring.entity.Order");
        Message message = new Message(json.getBytes(), messageProperties);

        rabbitTemplate.send("topic001", "spring.order", message);
    }

输出

order对象, 消息内容, id: 001, name: 订单消息, content: 订单描述信息
JSON格式转换支持Java对象(二)

委托对象方法

	public void consumeMessage(Order order) {
		System.err.println("order对象, 消息内容, id: " + order.getId() + 
				", name: " + order.getName() + 
				", content: "+ order.getContent());
	}
	
	public void consumeMessage(Packaged pack) {
		System.err.println("package对象, 消息内容, id: " + pack.getId() + 
				", name: " + pack.getName() + 
				", content: "+ pack.getDescription());
	}

配置

 //1.3 DefaultJackson2JavaTypeMapper & Jackson2JsonMessageConverter 支持java对象多映射转换

         MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
         adapter.setDefaultListenerMethod("consumeMessage");
         Jackson2JsonMessageConverter jackson2JsonMessageConverter = new Jackson2JsonMessageConverter();
         DefaultJackson2JavaTypeMapper javaTypeMapper = new DefaultJackson2JavaTypeMapper();

         Map<String, Class<?>> idClassMapping = new HashMap<String, Class<?>>();
         idClassMapping.put("order", com.niugang.spring.entity.Order.class);
         idClassMapping.put("packaged", com.niugang.spring.entity.Packaged.class);

         javaTypeMapper.setIdClassMapping(idClassMapping);

         jackson2JsonMessageConverter.setJavaTypeMapper(javaTypeMapper);
         adapter.setMessageConverter(jackson2JsonMessageConverter);
         container.setMessageListener(adapter);

测试

   @Test
    public void testSendMappingMessage() throws Exception {

        ObjectMapper mapper = new ObjectMapper();

        Order order = new Order();
        order.setId("001");
        order.setName("订单消息");
        order.setContent("订单描述信息");

        String json1 = mapper.writeValueAsString(order);
        System.err.println("order 4 json: " + json1);

        MessageProperties messageProperties1 = new MessageProperties();
        //这里注意一定要修改contentType为 application/json
        messageProperties1.setContentType("application/json");
        messageProperties1.getHeaders().put("__TypeId__", "order");
        Message message1 = new Message(json1.getBytes(), messageProperties1);
        rabbitTemplate.send("topic001", "spring.order", message1);

        Packaged pack = new Packaged();
        pack.setId("002");
        pack.setName("包裹消息");
        pack.setDescription("包裹描述信息");

        String json2 = mapper.writeValueAsString(pack);
        System.err.println("pack 4 json: " + json2);

        MessageProperties messageProperties2 = new MessageProperties();
        //这里注意一定要修改contentType为 application/json
        messageProperties2.setContentType("application/json");
        messageProperties2.getHeaders().put("__TypeId__", "packaged");
        Message message2 = new Message(json2.getBytes(), messageProperties2);
        rabbitTemplate.send("topic001", "spring.pack", message2);
    }
全局消息转化器与自定义转化器

自定义文本转化器

public class TextMessageConverter implements MessageConverter {

	@Override
	public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
		return new Message(object.toString().getBytes(), messageProperties);
	}

	@Override
	public Object fromMessage(Message message) throws MessageConversionException {
		String contentType = message.getMessageProperties().getContentType();
		if(null != contentType && contentType.contains("text")) {
			return new String(message.getBody());
		}
		return message.getBody();
	}

}

自定义图片转化器

/**
 * 图片转化器
 */
public class ImageMessageConverter implements MessageConverter {

    @Override
    public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
        throw new MessageConversionException(" convert error ! ");
    }

    @Override
    public Object fromMessage(Message message) throws MessageConversionException {
        System.err.println("-----------Image MessageConverter----------");

        Object _extName = message.getMessageProperties().getHeaders().get("extName");
        String extName = _extName == null ? "png" : _extName.toString();

        byte[] body = message.getBody();
        String fileName = UUID.randomUUID().toString();
        //目录必须存在
        String path = "d:/springbootlog/" + fileName + "." + extName;
        File f = new File(path);
        try {
            //拷贝到指定路径
            Files.copy(new ByteArrayInputStream(body), f.toPath());
        } catch (IOException e) {
            e.printStackTrace();
        }
        return f;
    }

}

自定义pdf转化器

public class PDFMessageConverter implements MessageConverter {

	@Override
	public Message toMessage(Object object, MessageProperties messageProperties) throws MessageConversionException {
		throw new MessageConversionException(" convert error ! ");
	}

	@Override
	public Object fromMessage(Message message) throws MessageConversionException {
		System.err.println("-----------PDF MessageConverter----------");
		
		byte[] body = message.getBody();
		String fileName = UUID.randomUUID().toString();
		String path = "d:/springbootlog/" + fileName + ".pdf";
		File f = new File(path);
		try {
			Files.copy(new ByteArrayInputStream(body), f.toPath());
		} catch (IOException e) {
			e.printStackTrace();
		}
		return f;
	}

}

委托对象

    public void consumeMessage(File file) {
        System.err.println("文件对象 方法, 消息内容:" + file.getName());
    }

配置

     //1.4 ext convert

        MessageListenerAdapter adapter = new MessageListenerAdapter(new MessageDelegate());
        adapter.setDefaultListenerMethod("consumeMessage");

        //全局的转换器:
        ContentTypeDelegatingMessageConverter convert = new ContentTypeDelegatingMessageConverter();

        TextMessageConverter textConvert = new TextMessageConverter();
        convert.addDelegate("text", textConvert);
        convert.addDelegate("html/text", textConvert);
        convert.addDelegate("xml/text", textConvert);
        convert.addDelegate("text/plain", textConvert);

        Jackson2JsonMessageConverter jsonConvert = new Jackson2JsonMessageConverter();
        convert.addDelegate("json", jsonConvert);
        convert.addDelegate("application/json", jsonConvert);

        ImageMessageConverter imageConverter = new ImageMessageConverter();
        convert.addDelegate("image/png", imageConverter);
        convert.addDelegate("image", imageConverter);

        PDFMessageConverter pdfConverter = new PDFMessageConverter();
        convert.addDelegate("application/pdf", pdfConverter);


        adapter.setMessageConverter(convert);
        container.setMessageListener(adapter);

测试

@Test
    public void testSendExtConverterMessage() throws Exception {
        byte[] body = Files.readAllBytes(Paths.get("C:\Users\Administrator\Desktop\公众号", "spring.png"));
        MessageProperties messageProperties = new MessageProperties();
        messageProperties.setContentType("image/png");
        messageProperties.getHeaders().put("extName", "png");
        Message message = new Message(body, messageProperties);
        rabbitTemplate.send("", "image_queue", message);

        byte[] body1 = Files.readAllBytes(Paths.get("D:\Documents\技术书籍", "Java huashan-2019-06-20.pdf"));
        MessageProperties messageProperties1 = new MessageProperties();
        messageProperties.setContentType("application/pdf");
        Message message1 = new Message(body1, messageProperties);
        rabbitTemplate.send("", "pdf_queue", message1);
    }

SpringBoot整合配置详解(生产端)

  • publisher-confirms,实现一个监听器用于监听Broker端给我们返回的确认请求:RabbitTemplate.ConfirmCallback
  • publisher-returns,保证消息对Broker端是可达的,如果出现路由键不可达的情况,则使用监听器对不可达的消息进行后续的处理,保证消息的路由成功: RabbitTemplate.ReturnCallback

注意一点,在发送消息的时候对template进行配置mandatory=true保证监听有效;生产端还可以配置其他属性,比如发送重试,超时时间、次数、间隔等。

生产端代码示例

application.properties

spring.rabbitmq.addresses=localhost:5672
#spring.rabbitmq.host=localhost
#spring.rabbitmq.port=5762
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000

# 消息确认模式
spring.rabbitmq.publisher-confirms=true
# 消息返回模式
spring.rabbitmq.publisher-returns=true
# 为true 消息返回模式才生效
spring.rabbitmq.template.mandatory=true

配置

/**
 * springboot  消息生产者
 *
 * @author niugang
 */
@Configuration
public class RabbitMqConfig {
       /**
     * 自动注入RabbitTemplate模板类
     */
    @Autowired
    private RabbitTemplate rabbitTemplate;

    /**
     * 回调函数: confirm确认
     */
    final ConfirmCallback confirmCallback = new RabbitTemplate.ConfirmCallback() {
        @Override
        public void confirm(CorrelationData correlationData, boolean ack, String cause) {
            System.err.println("correlationData: " + correlationData);
            System.err.println("ack: " + ack);
            if (!ack) {
                System.err.println("异常处理....");
            }
        }
    };

    /**
     * 回调函数: return返回
     */
    final ReturnCallback returnCallback = new RabbitTemplate.ReturnCallback() {
        @Override
        public void returnedMessage(org.springframework.amqp.core.Message message, int replyCode, String replyText,
                                    String exchange, String routingKey) {
            System.err.println("return exchange: " + exchange + ", routingKey: "
                    + routingKey + ", replyCode: " + replyCode + ", replyText: " + replyText);
        }
    };


   /*
    队列监听在消费者端配置,没有将会自动创建
    @Bean
    public TopicExchange topicExchange() {
        return new TopicExchange("exchange-1");
    }

    @Bean
    public Queue queue() {
        return new Queue("queue-1");
    }


    @Bean
    public Binding binding(Queue queue, TopicExchange topicExchange) {
        return BindingBuilder.bind(queue).to(topicExchange).with("springboot.#");
    }*/

     /**
     * 发送消息方法调用: 构建Message消息
     *
     * @param message    消息体
     * @param properties 消息属性
     */
    public void send(Object message, Map<String, Object> properties) {

        MessageProperties messageProperties = new MessageProperties();
        if (properties != null && properties.size() > 0) {
            Set<Map.Entry<String, Object>> entries = properties.entrySet();
            for (Map.Entry<String, Object> entry : entries) {
                String key = entry.getKey();
                Object value = entry.getValue();
                messageProperties.setHeader(key, value);
            }
        }

        //org.springframework.amqp.core
        Message msg = MessageBuilder.withBody(message.toString().getBytes()).andProperties(messageProperties).build();

        //id + 时间戳 全局唯一
        CorrelationData correlationData = new CorrelationData(UUID.randomUUID().toString());

        rabbitTemplate.setConfirmCallback(confirmCallback);
        rabbitTemplate.setReturnCallback(returnCallback);

        //routingKey修改 为 spring.abc 消息将走 returnCallback
        rabbitTemplate.convertAndSend("exchange-1", "springboot.abc", msg, correlationData);
    }
   
}

测试

在rabbitmq控制台新建,Exchange名为exchange-1,新建队列queue-1,并建立两者之间的绑定,routingKey为springboot.#

@RunWith(SpringRunner.class)
@SpringBootTest
public class ApplicationTests {

	@Test
	public void contextLoads() {
	}
	
    @Autowired
    private RabbitMqConfig rabbitMqConfig ;

    private static SimpleDateFormat simpleDateFormat = new SimpleDateFormat("yyyy-MM-dd HH:mm:ss.SSS");

    @Test
    public void testSender1() throws Exception {
        Map<String, Object> properties = new HashMap<>();
        properties.put("number", "12345");
        properties.put("send_time", simpleDateFormat.format(new Date()));
        rabbitMqConfig.send("Hello RabbitMQ For Spring Boot!"+System.currentTimeMillis(), properties);
    }
	
}

注意:进行单元测试,ack一直是false;改为url请求,ack就正常了

SpringBoot整合配置详解(消费端)

消费端核心配置

#    NONE, MANUAL, AUTO;  手工消息消息确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#监听器调用程序线程的最小数量。
spring.rabbitmq.listener.simple.concurrency=5
#监听器调用程序线程的最大数量。
spring.rabbitmq.listener.simple.max-concurrency=10
# spring.rabbitmq.listener.type=simple 默认为 SimpleContainer  模式对应  spring.rabbitmq.listener.simple 前缀相关的

注意点

  • 首先配置手工确认模式,用于ACK的手工处理,这样我们可以保证消息的可靠性送达,或者在消费端消费失败的时候可以做到重回队列、根据业务记录日志等处理
  • 可以设置消费端的监听个数和最大个数,用于控制消费端的并发情况。

@RabbitListener注解的使用

  • 消费端监听@RabbitMQListener注解,这个对于在实际工作中非常的好用。
  • @RabbitListener是一个组合注解,里面可以注解配置
  • @QueueBinding、@Queue、@Exchange直接通过这个组合注解一次性搞定消费端交换机、队列、绑定、路由、并且配置监听功能等。

消费者端代码示例

类配置写在代码里非常不友好,所以强烈建议大家使用配置文件配置。

properties

#spring.rabbitmq.addresses=localhost:5672
spring.rabbitmq.host=localhost
spring.rabbitmq.port=5672
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest
spring.rabbitmq.virtual-host=/
spring.rabbitmq.connection-timeout=15000
#    NONE, MANUAL, AUTO;  手工消息消息确认
spring.rabbitmq.listener.simple.acknowledge-mode=manual
#监听器调用程序线程的最小数量。
spring.rabbitmq.listener.simple.concurrency=5
#监听器调用程序线程的最大数量。
spring.rabbitmq.listener.simple.max-concurrency=10
# spring.rabbitmq.listener.type=simple 默认为 SimpleContainer  模式对应  spring.rabbitmq.listener.simple 前缀相关的
spring.rabbitmq.listener.order.queue.name=queue-2
spring.rabbitmq.listener.order.queue.durable=true
spring.rabbitmq.listener.order.exchange.name=exchange-2
spring.rabbitmq.listener.order.exchange.durable=true
spring.rabbitmq.listener.order.exchange.type=topic
spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
spring.rabbitmq.listener.order.key=springboot.*

配置

public class Order implements Serializable {

	private String id;
	private String name;
	
	public Order() {
	}
	public Order(String id, String name) {
		super();
		this.id = id;
		this.name = name;
	}
	public String getId() {
		return id;
	}
	public void setId(String id) {
		this.id = id;
	}
	public String getName() {
		return name;
	}
	public void setName(String name) {
		this.name = name;
	}
	
	
}
/**
 * 消费者类
 *
 * @author niugang
 */
@Configuration
public class RabbitMQReceiver {


    /**
     * 从1.5.0版开始,您可以在类级别指定@RabbitListener注释。
     * 与新的@RabbitHandler批注一起,这使单个侦听器可以根据传入消息的有效负载类型调用不同的方法。
     *
     * @RabbitListener(id="multi", queues = "someQueue")
     * @SendTo("my.reply.queue") public class MultiListenerBean {
     * @RabbitHandler public String thing2(Thing2 thing2) {
     * ...
     * }
     * @RabbitHandler public String cat(Cat cat) {
     * ...
     * }
     * @RabbitHandler public String hat(@Header("amqp_receivedRoutingKey") String rk, @Payload Hat hat) {
     * ...
     * }
     * @RabbitHandler(isDefault = true)
     * public String defaultMethod(Object object) {
     * ...
     * }
     * }
     * 在这种情况下,如果转换后的有效负载是Thing2,Cat或Hat,则会调用各个@RabbitHandler方法。
     * 您应该了解,系统必须能够根据有效负载类型识别唯一方法。
     * 检查该类型是否可分配给没有注释或带有@Payload注释的单个参数。
     * 请注意,如方法级别@RabbitListener(前面所述)中所述,应用了相同的方法签名。
     */
    //队列 exchange  绑定 没有 自动创建
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "queue-1",
                    durable = "true"),
            exchange = @Exchange(value = "exchange-1",
                    durable = "true",
                    type = ExchangeTypes.TOPIC,
                    ignoreDeclarationExceptions = "true"),
            key = "springboot.*" //routing key
    )
    )
    @RabbitHandler
    //@RabbitListener 提供了很多灵活的签名 如Message Channel  @Payload  @Header 等 具体可查看源码
    // org.springframework.amqp.core.Message
    // org.springframework.messaging.Message
    public void onMessage(Message message, Channel channel) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端Payload: " + new String(message.getBody()));
        System.err.println("消费端MessageProperties.: " + message.getMessageProperties());
        //AmqpHeaders header属性封装
        //手工ACK
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    }


    /**
     * spring.rabbitmq.listener.order.queue.name=queue-2
     * spring.rabbitmq.listener.order.queue.durable=true
     * spring.rabbitmq.listener.order.exchange.name=exchange-1
     * spring.rabbitmq.listener.order.exchange.durable=true
     * spring.rabbitmq.listener.order.exchange.type=topic
     * spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions=true
     * spring.rabbitmq.listener.order.key=springboot.*
     *
     * @param order   order
     * @param channel channel
     * @param headers headers
     * @throws Exception Exception
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "${spring.rabbitmq.listener.order.queue.name}",
                    durable = "${spring.rabbitmq.listener.order.queue.durable}"),
            exchange = @Exchange(value = "${spring.rabbitmq.listener.order.exchange.name}",
                    durable = "${spring.rabbitmq.listener.order.exchange.durable}",
                    type = "${spring.rabbitmq.listener.order.exchange.type}",
                    ignoreDeclarationExceptions = "${spring.rabbitmq.listener.order.exchange.ignoreDeclarationExceptions}"),
            key = "${spring.rabbitmq.listener.order.key}"
    )
    )
    @RabbitHandler
    //@Headers 必须通过Map接收
    //@Header("amqp_receivedRoutingKey") String rk 直接获取header中某一个key
    //默认前缀为amqp_
    /**
     * {amqp_receivedDeliveryMode=PERSISTENT,
     * amqp_receivedExchange=exchange-2,
     * amqp_deliveryTag=1,
     * amqp_consumerQueue=queue-2,
     * amqp_redelivered=false,
     amqp_receivedRoutingKey=springboot.def,
     spring_listener_return_correlation=175a21c4-ffd5-4a3e-ac3a-2f63d60c18a5,
     spring_returned_message_correlation=0987654321,
     id=53443ced-0b23-3079-71c2-09997897a553,
     amqp_consumerTag=amq.ctag-V0hqyVObrHXJeC60MwPSVQ,
     contentType=application/x-java-serialized-object,
     timestamp=1591240122842}
     */
    public void onOrderMessage(@Payload Order order,
                               Channel channel,
                               @Headers Map<String, Object> headers) throws Exception {
        System.err.println("--------------------------------------");
        System.err.println("消费端order: " + order.getId());
        System.err.println("消费端headers: " + headers);
        Long deliveryTag = (Long) headers.get(AmqpHeaders.DELIVERY_TAG);
        //手工ACK
        channel.basicAck(deliveryTag, false);
    }


}

技术图片








以上是关于RabbitMQ RabbitMQ高级整合应用的主要内容,如果未能解决你的问题,请参考以下文章

RabbitMQ RabbitMQ高级整合应用

Springboot 整合 RabbitMQ高级特性 & 真实业务应用

RabbitMQ---延迟队列,整合springboot

Spring Boot 整合 RabbitMQ

荧客技荐RabbitMQ 实战之 Spring Boot 整合 RabbitMQ

Spring Boot 整合 RabbitMQ