Redis进阶学习09---缓存同步

Posted 大忽悠爱忽悠

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis进阶学习09---缓存同步相关的知识,希望对你有一定的参考价值。

Redis进阶学习09---缓存同步


Canal

认识Canal

Canal [kə’næl],译意为水道/管道/沟渠,canal是阿里巴巴旗下的一款开源项目,基于Java开发。基于数据库增量日志解析,提供增量数据订阅&消费。GitHub的地址:https://github.com/alibaba/canal

Canal是基于mysql的主从同步来实现的,MySQL主从同步的原理如下:

  • 1)MySQL master 将数据变更写入二进制日志( binary log),其中记录的数据叫做binary log events
  • 2)MySQL slave 将 master 的 binary log events拷贝到它的中继日志(relay log)
  • 3)MySQL slave 重放 relay log 中事件,将数据变更反映它自己的数据

而Canal就是把自己伪装成MySQL的一个slave节点,从而监听master的binary log变化。再把得到的变化信息通知给Canal的客户端,进而完成对其它数据库的同步。


安装和配置Canal

下面我们就开启mysql的主从同步机制,让Canal来模拟salve

开启MySQL主从

Canal是基于MySQL的主从同步功能,因此必须先开启MySQL的主从功能才可以。

这里以之前用Docker运行的mysql为例:

开启binlog

打开mysql容器挂载的日志文件,我的在/tmp/mysql/conf目录:

log-bin=/var/lib/mysql/mysql-bin
binlog-do-db=item

配置解读:

  • log-bin=/var/lib/mysql/mysql-bin:设置binary log文件的存放地址和文件名,叫做mysql-bin
  • binlog-do-db=heima:指定对哪个database记录binary log events,这里记录heima这个库

最终效果:

[mysqld]
#跳过域名解析
skip-name-resolve
#指定服务器级别的字符集
character_set_server=utf8
#指定数据存放目录
datadir=/var/lib/mysql
#MySQL服务的ID
server-id=1000
#mysql二进制文件
log-bin=/var/lib/mysql/mysql-bin
#需要复制的数据库
binlog-do-db=item

设置用户权限

接下来添加一个仅用于数据同步的账户,出于安全考虑,这里仅提供对item这个库的操作权限。

create user canal@'%' IDENTIFIED by 'canal';
GRANT SELECT, REPLICATION SLAVE, REPLICATION CLIENT,SUPER ON *.* TO 'canal'@'%' identified by 'canal';
FLUSH PRIVILEGES;

replication slave 的级别是global,所以不能只作用于某一数据库,而是全局

如果权限缩小为某一个数据库,那么会报错: Incorrect usage of DB GRANT and GLOBAL PRIVILEGES

重启mysql容器即可

docker restart mysql

测试设置是否成功:在mysql控制台,或者Navicat中,输入命令:

show master status;


安装Canal

创建网络

我们需要创建一个网络,将MySQL、Canal放到同一个Docker网络中:

docker network create item

让mysql加入这个网络:

docker network connect item mysql

查看当前网络细节:

docker network inspect item 

安装Canal

拉取docker镜像:

 docker pull canal/canal-server

然后运行命令创建Canal容器:

docker run -p 11111:11111 --name canal \\
-e canal.destinations=item \\
-e canal.instance.master.address=mysql:3306  \\
-e canal.instance.dbUsername=canal  \\
-e canal.instance.dbPassword=canal  \\
-e canal.instance.connectionCharset=UTF-8 \\
-e canal.instance.tsdb.enable=true \\
-e canal.instance.gtidon=false  \\
-e canal.instance.filter.regex=item\\\\..* \\
--network item \\
-d canal/canal-server

注意:mysql:3306,这里3306是容器中开放的端口,也就是容器中mysql启动时的端口,而不是映射到宿主机上的端口,这点必须注意,因为通过容器名直接访问,是容器与容器之间直接进行通信,不需要通过宿主机。

说明:

  • -p 11111:11111:这是canal的默认监听端口
  • -e canal.instance.master.address=mysql:3306:数据库地址和端口,如果不知道mysql容器名字,可以通过docker inspect 容器id来查看,因为这里是在同一个自定义网络下,因此可以用容器名字进行通信
  • -e canal.instance.dbUsername=canal:数据库用户名
  • -e canal.instance.dbPassword=canal :数据库密码
  • -e canal.instance.filter.regex=:要监听的表名称

表名称监听支持的语法:

mysql 数据解析关注的表,Perl正则表达式.
多个正则之间以逗号(,)分隔,转义符需要双斜杠(\\\\) 
常见例子:
1.  所有表:.*   or  .*\\\\..*
2.  canal schema下所有表: canal\\\\..*
3.  canal下的以canal打头的表:canal\\\\.canal.*
4.  canal schema下的一张表:canal.test1
5.  多个规则组合使用然后以逗号隔开:canal\\\\..*,mysql.test1,mysql.test2 

监听Canal

Canal提供了各种语言的客户端,当Canal监听到binlog变化时,会通知Canal的客户端。


我们可以利用Canal提供的Java客户端,监听Canal通知消息。当收到变化的消息时,完成对缓存的更新。

不过这里我们会使用GitHub上的第三方开源的canal-starter客户端。地址:https://github.com/NormanGyllenhaal/canal-client

与SpringBoot完美整合,自动装配,比官方客户端要简单好用很多。

将项目拉取下来,然后安装到本地仓库中


引入依赖:

  <dependency>
      <groupId>top.javatool</groupId>
      <artifactId>canal-spring-boot-starter</artifactId>
      <version>1.2.6-RELEASE</version>
  </dependency>

编写配置:

canal:
  destination: item # canal的集群名字,要与安装canal时设置的名称一致
  server: 192.168.150.101:11111 # canal服务地址

修改Item实体类

通过@Id、@Column、等注解完成Item与数据库表字段的映射:

@Data
@TableName("tb_item")
public class Item 
    @TableId(type = IdType.AUTO)
    @Id
    private Long id;//商品id
    @Column(name = "name")
    private String name;//商品名称
    private String title;//商品标题
    private Long price;//价格(分)
    private String image;//商品图片
    private String category;//分类名称
    private String brand;//品牌名称
    private String spec;//规格
    private Integer status;//商品状态 1-正常,2-下架
    private Date createTime;//创建时间
    private Date updateTime;//更新时间
    @TableField(exist = false)
    @Transient
    private Integer stock;
    @TableField(exist = false)
    @Transient
    private Integer sold;


编写监听器

通过实现EntryHandler<T>接口编写监听器,监听Canal消息。注意两点:

  • 实现类通过@CanalTable("tb_item")指定监听的表信息
  • EntryHandler的泛型是与表对应的实体类
@CanalTable("tb_item")
@Component
public class ItemHandler implements EntryHandler<Item> 

    @Autowired
    private RedisHandler redisHandler;
    @Autowired
    private Cache<Long, Item> itemCache;

    @Override
    public void insert(Item item) 
        // 写数据到JVM进程缓存
        itemCache.put(item.getId(), item);
        // 写数据到redis
        redisHandler.saveItem(item);
    

    @Override
    public void update(Item before, Item after) 
        // 写数据到JVM进程缓存
        itemCache.put(after.getId(), after);
        // 写数据到redis
        redisHandler.saveItem(after);
    

    @Override
    public void delete(Item item) 
        // 删除数据到JVM进程缓存
        itemCache.invalidate(item.getId());
        // 删除数据到redis
        redisHandler.deleteItemById(item.getId());
    

在这里对Redis的操作都封装到了RedisHandler这个对象中,是我们之前做缓存预热时编写的一个类,内容如下:

@Component
public class RedisHandler implements InitializingBean 

    @Autowired
    private StringRedisTemplate redisTemplate;

    @Autowired
    private IItemService itemService;
    @Autowired
    private IItemStockService stockService;

    private static final ObjectMapper MAPPER = new ObjectMapper();

    @Override
    public void afterPropertiesSet() throws Exception 
        // 初始化缓存
        // 1.查询商品信息
        List<Item> itemList = itemService.list();
        // 2.放入缓存
        for (Item item : itemList) 
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(item);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
        

        // 3.查询商品库存信息
        List<ItemStock> stockList = stockService.list();
        // 4.放入缓存
        for (ItemStock stock : stockList) 
            // 2.1.item序列化为JSON
            String json = MAPPER.writeValueAsString(stock);
            // 2.2.存入redis
            redisTemplate.opsForValue().set("item:stock:id:" + stock.getId(), json);
        
    

    public void saveItem(Item item) 
        try 
            String json = MAPPER.writeValueAsString(item);
            redisTemplate.opsForValue().set("item:id:" + item.getId(), json);
         catch (JsonProcessingException e) 
            throw new RuntimeException(e);
        
    

    public void deleteItemById(Long id) 
        redisTemplate.delete("item:id:" + id);
    


超详细canal入门,看这篇就够了


注意

如果canal出现了问题,通过进入canal容器内部,查看输出日志,来排查问题:

docker exec -it canal bash

查看日志:

进入容器内部canal-server/logs目录下,查看item目录下的,item.log日志

我在这里碰到一个问题是,无法识别mysql的主机地址:

原因: 一开始我运行canal容器的时候,标注的mysql地址为mysql:3307

我的mysql容器内端口为3306,宿主机映射端口为3307,我错误的将mysql宿主机映射端口写了上去,这是错误的,因为我们这里采用自定义网络进行通信,通过mysql容器名直接或容器内ip直接完成两个容器之间的通信,不需要经过宿主机,因此不能用宿主机映射端口

Docker重学系列之高级网络篇


多级缓存总结


对于不共享的本地缓存,通常采用的做法是,通过hash路由,让固定的uri去访问固定某台服务器,确保缓存命中,并且不至于缓存多分份


以上是关于Redis进阶学习09---缓存同步的主要内容,如果未能解决你的问题,请参考以下文章

Redis进阶学习总结(Docker搭建环境)

Redis进阶学习总结(Docker搭建环境)

Redis进阶学习02---Redis替代Session和Redis缓存

Redis进阶学习08--多级缓存

Redis进阶学习07--分布式缓存--下

redis学习小结一