spring boot集成用docker构建的rabbitmq

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了spring boot集成用docker构建的rabbitmq相关的知识,希望对你有一定的参考价值。


docker run -d --hostname my-rabbit --name rabbit -e RABBITMQ_DEFAULT_USER=admin -e RABBITMQ_DEFAULT_PASS=admin -p 15672:15672 -p 5672:5672 -p 25672:25672 -p 61613:61613 -p 1883:1883 rabbitmq:management

或者不设置密码

docker run -d --hostname my-rabbit --name rabbit -p 15672:15672 rabbitmq:management

使用 docker inspect rabbit查看容器的ip地址,

spring


这个留着待会儿用spring boot连接的时候进行配置。打开浏览器 localhost:15672 来到rabbitmq的web管理界面,刚刚我们的rabbit容器是以默认的方式打开的,所以账号密码都是 guest。

spring

登录进去之后就设计队列和交换机(类比计算机网络)
我设计了两个队列,分别是C.Queue 用来接收spider服务传过来给web端的,spider_queue spider组件来接收web传过来的项目。

编号

Exchange

RoutingKey

Queue

描述

1

C.Exchange

C.*

C.Queue

web服务端监听,消息来自于spider服务

2

topicExchange

spider.*

spider_queue

spider服务监听,消息来自于web服务端

前缀最好设置成一样的,这样一旦队列多了,方便维护,以免眼花

建好之后:

spring

spring

然后用spring boot来调用
在pom.xml加入:

<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
<version>2.0.0.RELEASE</version>
</dependency>

并在配置文件中加入配置:
这是web服务的配置

spring.rabbitmq.host=172.17.0.3
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

com.shengqian.demo.rabbitmq.ToSpiderQueueName=C.Queue
com.shengqian.demo.rabbitmq.exchangename=C.Exchange
com.shengqian.demo.rabbitmq.routekey=C.*
com.shengqian.demo.rabbitmq.recv=recvMessage

下面来看看spider服务的配置

spring.rabbitmq.host=172.17.0.3
spring.rabbitmq.port=5672
spring.rabbitmq.virtual-host=/
spring.rabbitmq.username=guest
spring.rabbitmq.password=guest

com.shengqian.demo.rabbitmq.ToSpiderQueueName=spider_queue
com.shengqian.demo.rabbitmq.exchangename=topicExchange
com.shengqian.demo.rabbitmq.routekey=spider.*
com.shengqian.demo.rabbitmq.recv=recvMessage

实际上没什么区别,唯一的区别就是监听的队列参数不一样,详细的都在那个表里面了。

这里的host就是刚刚我们查看的docker 容器的ip。之所以能这么做是因为我在我的路由表中加了一条路由,使得本机发出的172.17.0.0/16的ip能丢到docker的默认network中,有关详情请参见我的 用springboot连接redis集群一文,里面有较为详细的操作方法。

然后加入rabbitmq配置:

@Configuration
public class RabbitMQConfig
@Autowired
private QueueService queueService;

@Value("$com.shengqian.demo.rabbitmq.ToSpiderQueueName")
private String queueName;


@Value("$com.shengqian.demo.rabbitmq.exchangename")
private String exchangeName;


@Value("$com.shengqian.demo.rabbitmq.routekey")
private String routeKey;


@Value("$com.shengqian.demo.rabbitmq.recv")
private String messageHandler;


@Bean
Queue getQueue()
return new Queue(queueName);



@Bean
TopicExchange getTopicExchange()
return new TopicExchange(exchangeName);





@Bean
Binding getBinding(Queue queue, TopicExchange exchange)
return BindingBuilder.bind(queue).to(exchange).with(routeKey);



@Bean
MessageListenerAdapter listenerAdapter(AmqpMessageService receiver)
return new MessageListenerAdapter(receiver, messageHandler);



@Bean
SimpleMessageListenerContainer container(ConnectionFactory connectionFactory,
MessageListenerAdapter listenerAdapter)
SimpleMessageListenerContainer container = new SimpleMessageListenerContainer();
container.setConnectionFactory(connectionFactory);
container.setQueueNames(queueName);
container.setMessageListener(listenerAdapter);
container.setConcurrentConsumers(1);
container.setMaxConcurrentConsumers(1);
container.setPrefetchCount(1);
container.setExposeListenerChannel(true);
return container;

现在就可以使用了,实现一个简单的两个服务之间用消息队列进行对话的demo:
在web端

public interface AmqpMessageService 
/**
* 接收消息队列消息
*
* @param message 消息内容
*/
void recvMessage(String message);

/**
* 发送消息到消息队列
*
* @param message 消息内容
*/
void sendMessage(String routingKey, String message);


public interface QueueService extends AmqpMessageService
void receive(String message);



@Service
public class QueueServiceImpl implements QueueService, InitializingBean

@Autowired
public RabbitTemplate rabbitTemplate;

private final static Logger LOGGER = LoggerFactory.getLogger(QueueServiceImpl.class);

public void receive(String message)
LOGGER.info("接收到来自C.Queue队列的消息:" + message);


@Override
public void afterPropertiesSet() throws Exception
//执行一些初始化操作,如程序重新启动,需要加载扫描参数


/**
* 接收消息队列消息
*
* @param message 消息内容
*/
@Override
public void recvMessage(String message)
receive(message);


/**
* 发送消息到消息队列
*
* @param message 消息内容
*/
@Override
public void sendMessage(String routingKey,String message)
rabbitTemplate.convertAndSend("topicExchange", routingKey, JSON.toString(message));



//最后定义一个controller
@Controller
public class TestController

@Autowired
private QueueService queueService;

@ApiOperation(value = "测试rabbit", notes = "测试rabbitmq")
@RequestMapping(value = Path.TEST_RABBITMQ, method = RequestMethod.GET)
@ResponseBody
public Integer send()
Map<String, Object> ans = new HashMap<>();
ans.put("hah", "hello spider");
queueService.sendMessage("spider.*", JSON.toString(ans));
return 1;

然后在spider服务端:

public interface AmqpMessageService 
/**
* 接收消息队列消息
*
* @param message 消息内容
*/
void recvMessage(String message);

/**
* 发送消息到消息队列
*
* @param message 消息内容
*/
void sendMessage(String routingKey, String message);



public interface QueueService extends AmqpMessageService
void receive(String message);


@Service
public class QueueServiceImpl implements QueueService, InitializingBean

@Autowired
public RabbitTemplate rabbitTemplate;

private final static Logger LOGGER = LoggerFactory.getLogger(QueueServiceImpl.class);

public void receive(String message)
LOGGER.info("接收到来自spider_queue队列的消息:" + message);
//处理一段时间然后,返回
Map<String, Object> ans = new HashMap<String, Object>();
ans.put("hello",message);
sendMessage("C.*",JSON.toString(ans));


@Override
public void afterPropertiesSet() throws Exception
//执行一些初始化操作,如程序重新启动,需要加载扫描参数


/**
* 接收消息队列消息
*
* @param message 消息内容
*/
@Override
public void recvMessage(String message)
receive(message);


/**
* 发送消息到消息队列
*
* @param message 消息内容
*/
@Override
public void sendMessage(String routingKey, String message)
rabbitTemplate.convertAndSend("C.Exchange", routingKey, JSON.toString(message));


最后分别打开两个spring boot服务即可,我的spider项目也是基于spring boot构建的服务,当订阅了消息队列,有消息来的时候会驱动整个服务继续运行,所以说该服务是消息驱动的服务。

打开两个服务后,我在web服务中构建了swagger2,

spring


直接点开第三个接口运行就可以了,有关如何在springboot环境下集成swagger请看我前面的博客。运行结果如下:

spring

spring

可以正常使用。接下来计划完成spider项目与docker环境下的cockroachDB集群的交互问题,cockroachDB是新一代newSql。


以上是关于spring boot集成用docker构建的rabbitmq的主要内容,如果未能解决你的问题,请参考以下文章

docker与Spring boot的集成:docker-maven-plugin使用

spring-boot项目的docker集成化部署

Spring Boot教程 springboot程序构建一个docker镜像

用_DockerGradle_来构建运行发布一个_Spring_Boot_应用

Docker 容器整合 Spring Boot 应用

译见 | 用 Docker,Spring Boot/Cloud 和 Axon CQRS/ES(事件溯源)来构建微服务