微服务Ⅲ Docker&SpringAMQP

Posted \Qinsiyang

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了微服务Ⅲ Docker&SpringAMQP相关的知识,希望对你有一定的参考价值。

目录

Docker

初识Docker

项目部署时依赖关系复杂容易出现兼容问题,开发测试生产环境差异

  • 一并打包,任意迁移
  • 沙箱机制,互不干扰

Docker如何解决大型项目依赖复杂、兼容性问题?

开发中可以将应用、依赖、函数库、配置一起打包,行程可移植镜像;沙箱机制使应用相互隔离

Docker如何解决开发、测试、生产环境差异问题?

Docker镜像中包含完整运行环境(系统函数库)

Docker架构

  • 镜像:将应用程序及依赖、环境、配置打包;镜像只能读
  • 容器:运行镜像程序就要新建容器,将镜像中应用拷贝到容器中运行,相互隔离

SpringAMQP

spring-amqp是基础抽象
spring-rabbit是RabbitMQ的实现

  • 用于发布和接收消息RabbitTemplate
  • 监听器容器,用于异步处理入栈消息
  • RabbitAdmin自动声明队列,交换和绑定

简单模式

发送消息

在父pom中引入依赖

<!--AMQP依赖,包含RabbitMQ-->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-amqp</artifactId>
        </dependency>

更改配置

spring:
  rabbitmq:
    host: 192.168.32.129
    port: 5672
    virtual-host: /
    username: admin
    password: '000'

注入RabbitTemplate并发送消息

    @Autowired
    RabbitTemplate rabbitTemplate;

    @Test
    public void send() 
        String queueName = "hello";
        String message = "hello spring amqp";
        rabbitTemplate.convertAndSend(queueName, message);
    
接收消息

消息一旦接收,就会从mq删除,RabbitMQ没有回溯功能

同样加配置

/**
 * 接收消息
 */
@Component
public class SpringRabbitListener 

	//指定队列名称直接可以接收
    @RabbitListener(queues = "hello")
    public void listenHello(String message) 
        System.out.println("消费者接收到:\\t" + message);
    

工作模式

发送
@Test
public void send2() throws InterruptedException 
    String queueName = "hello";
    String message = "hello spring amqp _";
    for (int i = 1; i <= 50; i++) 
        rabbitTemplate.convertAndSend(queueName, message + i);
        Thread.sleep(20);
    

接收

默认,消息预取机制,多个消费者会先将队列中的消息分配,然后才消费,这样不能按照消费者的能力分配。所以进行配置,以便消息按能力分配

spring:
  rabbitmq:
    listener:
      simple:
        prefetch: 1 # 每次只能获取一条消息,处理完成才能获取下一个消息

接收消息

@RabbitListener(queues = "hello")
public void listenHello1(String message) throws InterruptedException 
   System.out.println("消费者1 接收到:\\t" + message + "\\t" + LocalTime.now());
   Thread.sleep(20);


@RabbitListener(queues = "hello")
public void listenHello2(String message) throws InterruptedException 
   System.out.println("消费者2 接收到:\\t" + message + "\\t" + LocalTime.now());
   Thread.sleep(200);

广播模式

生产者向交换机发送消息
    @Test
    public void send3() throws InterruptedException 
        String exchangeName = "fanout1";
        String message = "hello spring amqp fanout";
        rabbitTemplate.convertAndSend(exchangeName, "", message);
        Thread.sleep(20);
    
消费者配置绑定关系

声明交换机队列并进行绑定

/**
 * 广播模式
 */
@Configuration
public class FanoutConfig 

    /**
     * 声明交换机
     */
    @Bean
    public FanoutExchange fanoutExchange() 
        return new FanoutExchange("fanout1");
    

    /**
     * 声明队列
     */
    @Bean
    public Queue fanoutQueue1() 
        return new Queue("fanout.queue1");
    

    @Bean
    public Queue fanoutQueue2() 
        return new Queue("fanout.queue2");
    

    /**
     * 绑定队列和交换机
     */
    @Bean
    public Binding bindingQueue1(FanoutExchange fanoutExchange, Queue fanoutQueue1) 
        return BindingBuilder.bind(fanoutQueue1).to(fanoutExchange);
    

    @Bean
    public Binding bindingQueue2(FanoutExchange fanoutExchange, Queue fanoutQueue2) 
        return BindingBuilder.bind(fanoutQueue2).to(fanoutExchange);
    

消费者接收消息
//广播模式
    @RabbitListener(queues = "fanout.queue1")
    public void listenHello1(String message) throws InterruptedException 
        System.out.println("消费者1 接收到:\\t" + message + "\\t" + LocalTime.now());
        Thread.sleep(20);
    

    @RabbitListener(queues = "fanout.queue2")
    public void listenHello2(String message) throws InterruptedException 
        System.out.println("消费者2 接收到:\\t" + message + "\\t" + LocalTime.now());
        Thread.sleep(200);
    

路由模式

接收消息
//路由模式,直接声明 交换机 队列 bindingkey
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue1"),
            exchange = @Exchange(name = "direct1", type = ExchangeTypes.DIRECT),
            key = "red", "blue"))
    public void listenDirectQueue1(String message) 
        System.out.println("消费者1接收到:\\t" + message + "\\t" + LocalTime.now());
    

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "direct.queue2"),
            exchange = @Exchange(name = "direct1", type = ExchangeTypes.DIRECT),
            key = "red", "yellow"))
    public void listenDirectQueue2(String message) 
        System.out.println("消费者2接收到:\\t" + message + "\\t" + LocalTime.now());
    
发送消息

写入bindingkey属性

/**
     * 路由模式
     */
    @Test
    public void send4() throws InterruptedException 
        String exchangeName = "direct1";
        String message = "hello spring amqp direct red";
        rabbitTemplate.convertAndSend(exchangeName, "red", message);
    

主题模式

接收消息
//主题模式
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue1"),
            exchange = @Exchange(name = "topic1", type = ExchangeTypes.TOPIC),
            key = "china.#"
    ))
    public void listenTopicQueue1(String message) 
        System.out.println("消费者1接收到:\\t" + message);
    

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(name = "topic.queue2"),
            exchange = @Exchange(name = "topic1", type = ExchangeTypes.TOPIC),
            key = "#.news"
    ))
    public void listenTopicQueue2(String message) 
        System.out.println("消费者2接收到:\\t" + message);
    
发送消息
/**
     * 主题模式
     */
    @Test
    public void send5() 
        String exchangeName = "topic1";
        String message = "hello spring amqp topic";
        rabbitTemplate.convertAndSend(exchangeName, "china.news", message);
    

发送Object消息

我们可以直接发送object类型消息,spring默认使用jdk序列化方式,非常复杂而且容易注入,安全性不好。

所有使用Json方式

使用

  1. 在父工程pom中引入依赖
<!--jackson-->
        <dependency>
            <groupId>com.fasterxml.jackson.core</groupId>
            <artifactId>jackson-databind</artifactId>
        </dependency>
  1. 接 / 发 消息的启动类中加入bean
@Bean
public MessageConverter jsonMessageConverter() 
    return new Jackson2JsonMessageConverter();

以上是关于微服务Ⅲ Docker&SpringAMQP的主要内容,如果未能解决你的问题,请参考以下文章

黑马四Docker服务编排 & Docker私有仓库

docker build 在nestjs微服务构建上失败

天啦噜!看国外大神如何用Docker+Jenkins&CI/CD打造微服务架构?

Docker & ASP.NET Core 2.0 微服务跨平台实践

docker 基本操作Ⅲ

微服务架构 & service mesh