Integrating spring-cloud-stream with RabbitMQ

Posted by 被遗忘的优雅

1, Dependencies and configuration

1, pom.xml

<dependency>
    <groupId>org.springframework.cloud</groupId>
    <artifactId>spring-cloud-starter-stream-rabbit</artifactId>
</dependency>

2, Configuration file. This example uses the two default channels, output and input, which correspond to the Source and Sink interfaces respectively

# spring.cloud.stream.bindings.[output].destination:         name of the exchange
# spring.cloud.stream.bindings.[output].group:               consumer group; it only takes effect on inbound bindings, so it is ignored here
# spring.cloud.stream.bindings.[input].destination:          name of the exchange
# spring.cloud.stream.bindings.[input].group:                consumer group, used to build the queue name; instances with the same group share one queue, so each message is handled by only one of them
# spring.cloud.stream.bindings.[input].consumer.concurrency: number of concurrent consumers
# spring.rabbitmq.addresses:                                 server address
# spring.rabbitmq.username:                                  user name
# spring.rabbitmq.password:                                  password
# spring.rabbitmq.virtual-host:                              virtual host
spring:
  cloud:
    stream:
      default-binder: rabbit
      bindings:
        output:
          destination: order.exchange
          group: order.queue
        input:
          destination: order.exchange
          group: order.queue
          consumer:
            concurrency: 3
  rabbitmq:
    addresses: 192.168.200.100:5672
    username: rabbit
    password: 123456
    virtual-host: /
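
To make the effect of destination and group more concrete: with its default settings (no prefix configured), the RabbitMQ binder declares a queue named after the destination and the group joined by a dot, and binds it to the exchange named by destination. The sketch below only reproduces that naming rule in plain Java. The class QueueNameSketch and the method queueName are hypothetical helpers for illustration and are not part of Spring Cloud Stream.

```java
public class QueueNameSketch {

    // Hypothetical helper: mirrors the binder's default "<destination>.<group>" queue naming.
    public static String queueName(String destination, String group) {
        return destination + "." + group;
    }

    public static void main(String[] args) {
        // Values taken from the configuration above.
        System.out.println(queueName("order.exchange", "order.queue"));
        // prints "order.exchange.order.queue"
    }
}
```

With the configuration above you would therefore expect to find an exchange called order.exchange and a queue called order.exchange.order.queue in the RabbitMQ management console.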

2, Code

1, The entity class that carries the data; note that it needs to implement the Serializable interface

package com.hwq.rabbitmq.entity;

import lombok.Getter;
import lombok.Setter;
import lombok.ToString;

import java.io.Serializable;

@Getter
@Setter
@ToString
public class Order implements Serializable {
    private String id;
    private String name;
}

2, The consumer listener

package com.hwq.rabbitmq.service;

import com.hwq.rabbitmq.entity.Order;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.annotation.StreamListener;
import org.springframework.cloud.stream.messaging.Sink;
import org.springframework.stereotype.Service;

@EnableBinding(Sink.class)
@Service
public class InputService {

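    // Called for every message that arrives on the "input" channel; the payload is converted to an Order
    @StreamListener(Sink.INPUT)
    public void receiveOrder(Order order) throws InterruptedException {
        Thread.sleep(1000); // simulate one second of processing per message
        System.out.println("Message received: " + order);
    }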

}
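
The listener sleeps for one second per message, and the input binding is configured with concurrency: 3, so up to three messages can be processed at the same time. The following plain-JDK sketch is not the binder itself. It imitates three concurrent consumers with a fixed thread pool, under the assumption that messages are handed to whichever consumer is free. The class ConcurrencySketch and its process method are hypothetical names used only for this illustration.

```java
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

public class ConcurrencySketch {

    // Simulates `concurrency` consumers each taking `millisPerMessage` per message.
    // Returns how many messages were fully handled.
    public static int process(int messages, int concurrency, long millisPerMessage)
            throws InterruptedException {
        ExecutorService consumers = Executors.newFixedThreadPool(concurrency);
        AtomicInteger handled = new AtomicInteger();
        for (int i = 0; i < messages; i++) {
            consumers.execute(() -> {
                try {
                    Thread.sleep(millisPerMessage); // stands in for receiveOrder's work
                    handled.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                }
            });
        }
        consumers.shutdown();
        consumers.awaitTermination(1, TimeUnit.MINUTES);
        return handled.get();
    }

    public static void main(String[] args) throws InterruptedException {
        long start = System.currentTimeMillis();
        int handled = process(20, 3, 1000); // 20 messages, 3 consumers, 1 s each
        long elapsed = System.currentTimeMillis() - start;
        System.out.println("handled " + handled + " messages in about " + elapsed + " ms");
    }
}
```

In this simulation 20 messages take about seven rounds of one second, compared with roughly twenty seconds for a single consumer. Real timings against a broker also depend on settings such as prefetch, so treat the numbers as a rough guide only.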

3, A producer service that wraps message sending

package com.hwq.rabbitmq.service;

import com.hwq.rabbitmq.entity.Order;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.cloud.stream.annotation.EnableBinding;
import org.springframework.cloud.stream.messaging.Source;
import org.springframework.messaging.Message;
import org.springframework.messaging.MessageHeaders;
import org.springframework.messaging.support.MessageBuilder;
import org.springframework.stereotype.Service;

import java.util.Map;

@EnableBinding(Source.class)
@Service
public class OutputService {

    @Autowired
    private Source source;

    // Wraps the order and the given headers in a Message and sends it to the "output" channel
    public void sendOrder(Order order, Map<String, Object> properties) {
        MessageHeaders headers = new MessageHeaders(properties);
        Message<Order> message = MessageBuilder.createMessage(order, headers);
        boolean result = source.output().send(message);
        System.out.println("Message sent successfully: " + result);
    }

}

4, A controller for testing

package com.hwq.rabbitmq.controller;

import com.hwq.rabbitmq.entity.Order;
import com.hwq.rabbitmq.service.OutputService;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RestController;

import java.util.HashMap;

@RequestMapping("queue")
@RestController
public class QueueController {

    @Autowired
    private OutputService outputService;

    /**
     * Sends data to the message queue
     */
    @RequestMapping("send")
    public String send() {
        Order order = new Order();
        order.setId("123456789123456798");
        order.setName("Your order");
        for (int i = 0; i < 20; i++) {
            outputService.sendOrder(order, new HashMap<>());
        }
        return "ok";
    }
        return "ok";
    }

}

3, Start the project and visit http://ip:port/queue/send
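
If you prefer to trigger the endpoint from code rather than a browser, a minimal sketch with the JDK's built-in HTTP client (Java 11 or later) could look like the following. The host localhost and port 8080 are assumptions; replace them with the ip and port your application actually uses. The class TriggerSend and the method sendUri are hypothetical names for this example.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;

public class TriggerSend {

    // Builds the URL of the test endpoint from a host and port.
    public static URI sendUri(String host, int port) {
        return URI.create("http://" + host + ":" + port + "/queue/send");
    }

    public static void main(String[] args) throws Exception {
        // Assumption: the application is running locally on port 8080.
        HttpRequest request = HttpRequest.newBuilder(sendUri("localhost", 8080)).GET().build();
        HttpResponse<String> response = HttpClient.newHttpClient()
                .send(request, HttpResponse.BodyHandlers.ofString());
        // The controller returns the string "ok" once all 20 messages have been handed to the channel.
        System.out.println(response.statusCode() + " " + response.body());
    }
}
```

After the request, the application console should show the producer's "sent" lines followed by the consumer's "received" lines as the messages are processed.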
