040 RabbitMq及数据同步02

Posted luckyplj

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了040 RabbitMq及数据同步02相关的知识,希望对你有一定的参考价值。

1.Spring AMQP

(1)简介

Spring有很多不同的项目,其中就有对AMQP的支持:

技术图片

 

Spring AMQP的页面:http://spring.io/projects/spring-amqp

技术图片

 

注意:Spring-amqp是对AMQP协议的抽象实现,而spring-rabbit 是对协议的具体实现,也是目前的唯一实现。底层使用的就是RabbitMQ。

(2)依赖和配置

添加AMQP的启动器:

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

application.yml中添加RabbitMQ地址:

spring:
  rabbitmq:
    host: 127.0.0.1
    username: guest
    password: guest
    virtual-host: /

(3)监听者

在SpringAmqp中,对消息的消费者进行了封装和抽象,一个普通的JavaBean中的普通方法,只要通过简单的注解,就可以成为一个消费者。

@Component
public class Listener 

    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "spring.test.queue", durable = "true"),
            exchange = @Exchange(
                    value = "spring.test.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC
            ),
            key = "#.#"))
    public void listen(String msg)
        System.out.println("接收到消息:" + msg);
    
  • @Componet:类上的注解,注册到Spring容器

  • @RabbitListener:方法上的注解,声明这个方法是一个消费者方法,需要指定下面的属性:

    • bindings:指定绑定关系,可以有多个。值是@QueueBinding的数组。@QueueBinding包含下面属性:

      • value:这个消费者关联的队列。值是@Queue,代表一个队列

      • exchange:队列所绑定的交换机,值是@Exchange类型

      • key:队列和交换机绑定的RoutingKey

类似listen这样的方法在一个类中可以写多个,就代表多个消费者。

(4)AmqpTemplate

Spring最擅长的事情就是封装,把他人的框架进行封装和整合。

Spring为AMQP提供了统一的消息处理模板:AmqpTemplate,非常方便的发送消息,其发送方法:

技术图片

 

红框圈起来的是比较常用的3个方法,分别是:

  • 指定交换机、RoutingKey和消息体

  • 指定消息

  • 指定RoutingKey和消息,会向默认的交换机发送消息

(5)测试代码

@RunWith(SpringRunner.class)
@SpringBootTest(classes = Application.class)
public class MqDemo 

    @Autowired
    private AmqpTemplate amqpTemplate;

    @Test
    public void testSend() throws InterruptedException 
        String msg = "hello, Spring boot amqp";
        this.amqpTemplate.convertAndSend("spring.test.exchange","a.b", msg);
        // 等待10秒后再结束
        Thread.sleep(10000);
    

运行后查看日志:

技术图片

2.搜索服务、商品静态页的数据同步

(1)思路分析

<1>发送方:商品微服务

 

  • 什么时候发?

    当商品服务对商品进行写操作:增、删、改的时候,需要发送一条消息,通知其它服务。

  • 发送什么内容?

    对商品的增删改时其它服务可能需要新的商品数据,但是如果消息内容中包含全部商品信息,数据量太大,而且并不是每个服务都需要全部的信息。因此我们只发送商品id,其它服务可以根据id查询自己需要的信息。

<2>接收方:搜索微服务、静态页微服务

接收消息后如何处理?

  • 搜索微服务:

    • 增/改:添加新的数据到索引库

    • 删:删除索引库数据

  • 静态页微服务:

    • 增/改:创建新的静态页

    • 删:删除原来的静态页

(2)商品服务发送消息

 我们先在商品微服务leyou-item-service中实现发送消息

<1>引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<2>配置文件

我们在application.yml中添加一些有关RabbitMQ的配置:

spring:
  rabbitmq:
      host: 127.0.0.1
      username: guest
      password: guest
      virtual-host: /
      template:
        exchange: leyou.item.exchange
      publisher-confirms: true
  • template:有关AmqpTemplate的配置

    • exchange:缺省的交换机名称,此处配置后,发送消息如果不指定交换机就会使用这个

  • publisher-confirms:生产者确认机制,确保消息会正确发送,如果发送失败会有错误回执,从而触发重试

<3>改造GoodsService

在GoodsService中封装一个发送消息到mq的方法:(需要注入AmqpTemplate模板)


@Autowired
private AmqpTemplate amqpTemplate;
/**
     * 利用rabbitmq发送消息
     * @param id
     * @param type
     */
    private void sendMessage(Long id, String type)
        // 发送消息
        try 
            this.amqpTemplate.convertAndSend("item." + type, id);
         catch (Exception e) 
            //logger.error("商品消息发送异常,商品id:", type, id, e);
        
    

这里没有指定交换机,因此默认发送到了配置中的:leyou.item.exchange

注意:这里要把所有异常都try起来,不能让消息的发送影响到正常的业务逻辑

然后在新增的时候调用:

/**
     * 新增商品
     * @param spuBo
     */
    @Override
    @Transactional  //添加事务
    public void saveGoods(SpuBo spuBo) 
        // 01 新增spu
        // 设置默认字段
        spuBo.setId(null);
        spuBo.setSaleable(true);  //设置是否可售
        spuBo.setValid(true);
        spuBo.setCreateTime(new Date());  //设置创建时间
        spuBo.setLastUpdateTime(spuBo.getCreateTime()); //设置更新时间
        this.spuMapper.insertSelective(spuBo);

        // 02 新增spuDetail
        SpuDetail spuDetail = spuBo.getSpuDetail();
        spuDetail.setSpuId(spuBo.getId());
        this.spuDetailMapper.insertSelective(spuDetail);

        saveSkuAndStock(spuBo);

        //发送rabbitmq消息
        sendMessage(spuBo.getId(),"insert");
    

修改的时候调用:

/**
     * 更新商品
     * @param spu
     */
    @Override
    @Transactional
    public void updateGoods(SpuBo spu) 
        // 查询以前sku
        Sku sku=new Sku();
        sku.setSpuId(spu.getId());
        List<Sku> skus = this.skuMapper.select(sku);

        // 如果以前存在,则删除
        if(!CollectionUtils.isEmpty(skus)) 
            List<Long> ids = skus.stream().map(s -> s.getId()).collect(Collectors.toList());
            // 删除以前库存
            Example example = new Example(Stock.class);
            example.createCriteria().andIn("skuId", ids);
            this.stockMapper.deleteByExample(example);

            // 删除以前的sku
            Sku record = new Sku();
            record.setSpuId(spu.getId());
            this.skuMapper.delete(record);

        
        // 新增sku和库存
        saveSkuAndStock(spu);

        // 更新spu
        spu.setLastUpdateTime(new Date());
        spu.setCreateTime(null);   //不能更新的内容,设置为null
        spu.setValid(null);
        spu.setSaleable(null);
        this.spuMapper.updateByPrimaryKeySelective(spu);

        // 更新spu详情
        this.spuDetailMapper.updateByPrimaryKeySelective(spu.getSpuDetail());

        //发送rabbitmq消息
        sendMessage(spu.getId(),"insert");

    

(3)搜索服务接收消息

搜索服务接收到消息后要做的事情:

  • 增:添加新的数据到索引库

  • 删:删除索引库数据

  • 改:修改索引库数据

因为索引库的新增和修改方法是合二为一的,因此我们可以将这两类消息一同处理,删除另外处理。

技术图片

<1>引入依赖

<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-amqp</artifactId>
</dependency>

<2>添加配置

spring:
  rabbitmq:
      host: 127.0.0.1
      username: guest
      password: guest
      virtual-host: /

这里只是接收消息而不发送,所以不用配置template相关内容。

<3>编写监听器

package lucky.leyou.listener;

import lucky.leyou.service.SearchService;
import org.springframework.amqp.core.ExchangeTypes;
import org.springframework.amqp.rabbit.annotation.Exchange;
import org.springframework.amqp.rabbit.annotation.Queue;
import org.springframework.amqp.rabbit.annotation.QueueBinding;
import org.springframework.amqp.rabbit.annotation.RabbitListener;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

@Component
public class GoodsListener 

    @Autowired
    private SearchService searchService;

    /**
     * 处理insert和update的消息
     *
     * @param id
     * @throws Exception
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "leyou.create.index.queue", durable = "true"),
            exchange = @Exchange(
                    value = "leyou.item.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC),
            key = "item.insert", "item.update"))
    public void listenCreate(Long id) throws Exception 
        if (id == null) 
            return;
        
        // 创建或更新索引
        this.searchService.createIndex(id);
    

    /**
     * 处理delete的消息
     *
     * @param id
     */
    @RabbitListener(bindings = @QueueBinding(
            value = @Queue(value = "leyou.delete.index.queue", durable = "true"),
            exchange = @Exchange(
                    value = "leyou.item.exchange",
                    ignoreDeclarationExceptions = "true",
                    type = ExchangeTypes.TOPIC),
            key = "item.delete"))
    public void listenDelete(Long id) 
        if (id == null) 
            return;
        
        // 删除索引
        this.searchService.deleteIndex(id);
    

<4>编写创建和删除索引方法

这里因为要创建和删除索引,我们需要在SearchService中拓展两个方法,创建和删除索引:

public void createIndex(Long id) throws IOException 

    Spu spu = this.goodsClient.querySpuById(id);
    // 构建商品
    Goods goods = this.buildGoods(spu);

    // 保存数据到索引库
    this.goodsRepository.save(goods);


public void deleteIndex(Long id) 
    this.goodsRepository.deleteById(id);

创建索引的方法可以从之前导入数据的测试类中拷贝和改造。

 

以上是关于040 RabbitMq及数据同步02的主要内容,如果未能解决你的问题,请参考以下文章

学习笔记TF040:多GPU并行

高可用RabbitMQ安装及使用

实时同步服务知识梳理

【rabbitMQ】rabbitMQ集群节点重新加入集群相关操作

RabbitMQ 高可用集群搭建及电商平台使用经验总结

RabbitMQ 高可用集群搭建及电商平台使用经验总结