微服务Ⅲ Docker&SpringAMQP
Posted \Qinsiyang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了微服务Ⅲ Docker&SpringAMQP相关的知识,希望对你有一定的参考价值。
目录
Docker
初识Docker
项目部署时依赖关系复杂容易出现兼容问题,开发测试生产环境差异
- 一并打包,任意迁移
- 沙箱机制,互不干扰
Docker如何解决大型项目依赖复杂、兼容性问题?
开发中可以将应用、依赖、函数库、配置一起打包,行程可移植镜像;沙箱机制使应用相互隔离
Docker如何解决开发、测试、生产环境差异问题?
Docker镜像中包含完整运行环境(系统函数库)
Docker架构
- 镜像:将应用程序及依赖、环境、配置打包;镜像只能读
- 容器:运行镜像程序就要新建容器,将镜像中应用拷贝到容器中运行,相互隔离
SpringAMQP
spring-amqp是基础抽象
spring-rabbit是RabbitMQ的实现
- 用于发布和接收消息
RabbitTemplate
- 监听器容器,用于异步处理入栈消息
RabbitAdmin
自动声明队列,交换和绑定
简单模式
发送消息
在父pom中引入依赖
<!--AMQP依赖,包含RabbitMQ-->
<dependency>
<groupId>org.springframework.boot</groupId>
<artifactId>spring-boot-starter-amqp</artifactId>
</dependency>
更改配置
spring:
rabbitmq:
host: 192.168.32.129
port: 5672
virtual-host: /
username: admin
password: '000'
注入RabbitTemplate
并发送消息
@Autowired
RabbitTemplate rabbitTemplate;
@Test
public void send()
String queueName = "hello";
String message = "hello spring amqp";
rabbitTemplate.convertAndSend(queueName, message);
接收消息
消息一旦接收,就会从mq删除,RabbitMQ没有回溯功能
同样加配置
/**
* 接收消息
*/
@Component
public class SpringRabbitListener
//指定队列名称直接可以接收
@RabbitListener(queues = "hello")
public void listenHello(String message)
System.out.println("消费者接收到:\\t" + message);
工作模式
发送
@Test
public void send2() throws InterruptedException
String queueName = "hello";
String message = "hello spring amqp _";
for (int i = 1; i <= 50; i++)
rabbitTemplate.convertAndSend(queueName, message + i);
Thread.sleep(20);
接收
默认,消息预取机制,多个消费者会先将队列中的消息分配,然后才消费,这样不能按照消费者的能力分配。所以进行配置,以便消息按能力分配
spring:
rabbitmq:
listener:
simple:
prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息
接收消息
@RabbitListener(queues = "hello")
public void listenHello1(String message) throws InterruptedException
System.out.println("消费者1 接收到:\\t" + message + "\\t" + LocalTime.now());
Thread.sleep(20);
@RabbitListener(queues = "hello")
public void listenHello2(String message) throws InterruptedException
System.out.println("消费者2 接收到:\\t" + message + "\\t" + LocalTime.now());
Thread.sleep(200);
广播模式
生产者向交换机发送消息
@Test
public void send3() throws InterruptedException
String exchangeName = "fanout1";
String message = "hello spring amqp fanout";
rabbitTemplate.convertAndSend(exchangeName, "", message);
Thread.sleep(20);
消费者配置绑定关系
声明交换机队列并进行绑定
/**
* 广播模式
*/
@Configuration
public class FanoutConfig
/**
* 声明交换机
*/
@Bean
public FanoutExchange fanoutExchange()
return new FanoutExchange("fanout1");
/**
* 声明队列
*/
@Bean
public Queue fanoutQueue1()
return new Queue("fanout.queue1");
@Bean
public Queue fanoutQueue2()
return new Queue("fanout.queue2");
/**
* 绑定队列和交换机
*/
@Bean
public Binding bindingQueue1(FanoutExchange fanoutExchange, Queue fanoutQueue1)
return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
@Bean
public Binding bindingQueue2(FanoutExchange fanoutExchange, Queue fanoutQueue2)
return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
消费者接收消息
//广播模式
@RabbitListener(queues = "fanout.queue1")
public void listenHello1(String message) throws InterruptedException
System.out.println("消费者1 接收到:\\t" + message + "\\t" + LocalTime.now());
Thread.sleep(20);
@RabbitListener(queues = "fanout.queue2")
public void listenHello2(String message) throws InterruptedException
System.out.println("消费者2 接收到:\\t" + message + "\\t" + LocalTime.now());
Thread.sleep(200);
路由模式
接收消息
//路由模式,直接声明 交换机 队列 bindingkey
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue1"),
exchange = @Exchange(name = "direct1", type = ExchangeTypes.DIRECT),
key = "red", "blue"))
public void listenDirectQueue1(String message)
System.out.println("消费者1接收到:\\t" + message + "\\t" + LocalTime.now());
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "direct.queue2"),
exchange = @Exchange(name = "direct1", type = ExchangeTypes.DIRECT),
key = "red", "yellow"))
public void listenDirectQueue2(String message)
System.out.println("消费者2接收到:\\t" + message + "\\t" + LocalTime.now());
发送消息
写入bindingkey属性
/**
* 路由模式
*/
@Test
public void send4() throws InterruptedException
String exchangeName = "direct1";
String message = "hello spring amqp direct red";
rabbitTemplate.convertAndSend(exchangeName, "red", message);
主题模式
接收消息
//主题模式
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue1"),
exchange = @Exchange(name = "topic1", type = ExchangeTypes.TOPIC),
key = "china.#"
))
public void listenTopicQueue1(String message)
System.out.println("消费者1接收到:\\t" + message);
@RabbitListener(bindings = @QueueBinding(
value = @Queue(name = "topic.queue2"),
exchange = @Exchange(name = "topic1", type = ExchangeTypes.TOPIC),
key = "#.news"
))
public void listenTopicQueue2(String message)
System.out.println("消费者2接收到:\\t" + message);
发送消息
/**
* 主题模式
*/
@Test
public void send5()
String exchangeName = "topic1";
String message = "hello spring amqp topic";
rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
发送Object消息
我们可以直接发送object类型消息,spring默认使用jdk序列化方式,非常复杂而且容易注入,安全性不好。
所有使用Json方式
使用
- 在父工程pom中引入依赖
<!--jackson-->
<dependency>
<groupId>com.fasterxml.jackson.core</groupId>
<artifactId>jackson-databind</artifactId>
</dependency>
- 接 / 发 消息的启动类中加入bean
@Bean
public MessageConverter jsonMessageConverter()
return new Jackson2JsonMessageConverter();
以上是关于微服务Ⅲ Docker&SpringAMQP的主要内容,如果未能解决你的问题,请参考以下文章
天啦噜!看国外大神如何用Docker+Jenkins&CI/CD打造微服务架构?