Spring Boot实现Redis同数据源动态切换DB | Spring Cloud 31

Posted gmHappy

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Spring Boot实现Redis同数据源动态切换DB | Spring Cloud 31相关的知识,希望对你有一定的参考价值。

一、前言

在某些业务场景下,需要多访问同一Redis数据源下的不同DB

Redis中默认提供了16个数据库(序号0-15),默认Redis使用的是db 0

此章节基于spring-boot-starter-data-redis模块,实现了Redis同数据源动态切换DB,具体功能如下:

  • 突破一个项目只能连接Redis一个DB的限制
  • 提供多种操作切换Redis DB的方式(@Autowired方式 和RedisSelectSupport方式)
  • 提供完善的代码使用示例

二、项目结构

  • RedisSelect:自定义注解,用于切换同一redis数据源下的不同DB Index

  • RedisSelectSupport:自定义切换DB操作,用于线程间传递DB Index

  • RedisIndexSelectAspect:自定义AOP切面,对RedisSelect注解方法进行拦截,调用RedisSelectSupport,利用RedisSelectSupport设置DB Index

  • RedisSelectTemplate:对原RedisTemplate进行封装,利用RedisSelectSupport获取DB Index值,在对Redis操作前实现DB的实际切换

  • RedisIndexSelectConfig:实现RedisSelectTemplate定义,修改Lettuce连接池属性支持动态切换DB

下文对各类的功能描述不进行重复介绍,请详细了解本章节。

本示例源码位于项目的redis/redis-multi-index模块下。

三、动态切换Redis DB实现

3.1 所需依赖

redis/redis-multi-index/pom.xml

<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>redis</artifactId>
        <groupId>com.gm</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>

    <artifactId>redis-multi-index</artifactId>

    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
    </dependencies>

</project>

3.2 配置文件

src/main/resources/application.yml

server:
  port: 6008

spring:
  application:
    name: @artifactId@
  redis:
    database: 1
    host: 
    password: 
    port: 6379
    timeout: 60000
    lettuce:
      pool:
        max-active: -1
        max-idle: -1
        max-wait: -1
        min-idle: -1

logging:
  level:
    org:
      springframework:
        boot:
          autoconfigure:
            logging: info

3.3 自定义注解类@RedisSelect

com/gm/multi/redis/config/select/annotation/RedisSelect.java

import java.lang.annotation.*;

/**
 * 注解,用于切换同一redis数据源下的不同db index
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisSelect 
    /**
     * redis库   0 - 15  库
     *
     * @return
     */
    int index() default 0;

3.4 自定义切换支持类RedisSelectSupport

com/gm/multi/redis/config/select/RedisSelectSupport.java

/**
 * Redis 切换DB操作
 */
public class RedisSelectSupport 

    /**
     * 定义一个静态变量,用于线程间传递值
     */
    private static final ThreadLocal<Integer> DB_SELECT_CONTEXT = new ThreadLocal<>();

    public static void selectIndex(int db) 
        DB_SELECT_CONTEXT.set(db);
    

    public static Integer getSelectIndex() 
        return DB_SELECT_CONTEXT.get();
    

3.5 自定义切面类RedisIndexSelectAspect

com/gm/multi/redis/config/select/aspect/RedisIndexSelectAspect.java

import com.gm.multi.redis.config.select.RedisSelectSupport;
import com.gm.multi.redis.config.select.annotation.RedisSelect;
import com.gm.multi.redis.config.select.RedisSelectTemplate;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;

@Slf4j
@Aspect
@Component
public class RedisIndexSelectAspect 

    @Value("$spring.redis.database:0")
    private int defaultIndex;

    /**
     * 创建RedisSelect对应的切面,来对标有注解的方法拦截
     *
     * @param point
     * @return
     * @throws Throwable
     */
    @Around("@annotation(com.gm.multi.redis.config.select.annotation.RedisSelect)")
    @ConditionalOnBean(RedisSelectTemplate.class)
    public Object configRedis(ProceedingJoinPoint point) throws Throwable 
        int index = defaultIndex;
        try 
            MethodSignature signature = (MethodSignature) point.getSignature();
            Method method = signature.getMethod();
            RedisSelect config = method.getAnnotation(RedisSelect.class);
            if (config != null) 
                index = config.index();
            
            RedisSelectSupport.selectIndex(index);
            return point.proceed();
         finally 
            RedisSelectSupport.selectIndex(defaultIndex);
            log.debug("redis index reset  to ", index, defaultIndex);
        
    

3.6 封装支持切换DB的RedisSelectTemplate

com/gm/multi/redis/config/select/RedisSelectTemplate.java:.

import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;

public class RedisSelectTemplate<K, V> extends RedisTemplate<K, V> 

    @Override
    protected RedisConnection createRedisConnectionProxy(RedisConnection pm) 
        return super.createRedisConnectionProxy(pm);
    

    /**
     * 在连接Redis之前做一些配置
     *
     * @param connection
     * @param existingConnection
     * @return
     */
    @Override
    protected RedisConnection preProcessConnection(RedisConnection connection, boolean existingConnection) 
        Integer index = RedisSelectSupport.getSelectIndex();
        if (index != null) 
            //切换 redis db 到 其他的库
            connection.select(index);
        
        return super.preProcessConnection(connection, existingConnection);
    

3.7 自定义配置类RedisIndexSelectConfig

com/gm/multi/redis/config/select/index/RedisIndexSelectConfig.java

import com.gm.multi.redis.config.select.RedisSelectTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Slf4j
@Configuration(proxyBeanMethods = false)
public class RedisIndexSelectConfig 

    @Autowired(required = false)
    private LettuceConnectionFactory factory;

    @Bean
    @ConditionalOnMissingBean
    public RedisSelectTemplate<String, Object> redisIndexSelectTemplate() 

        /**
         * 使用默认注入的RedisConnectionFactory factory时,切换db时出现以下异常:
         *
         * java.lang.UnsupportedOperationException:Selecting a new database not supported due to shared connection.
         * Use separate ConnectionFactorys to work with multiple databases
         * 从默认RedisConnectionFactory factory注入,改为LettuceConnectionFactory factory,
         * 并通过factory.setShareNativeConnection(false)关闭共享链接
         */
        RedisSelectTemplate<String, Object> redisTemplate = new RedisSelectTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new GenericToStringSerializer(Object.class));
        redisTemplate.setValueSerializer(new StringRedisSerializer());

        // 关闭共享链接
        factory.setShareNativeConnection(false);
        redisTemplate.setConnectionFactory(factory);
        redisTemplate.afterPropertiesSet();
        log.info("实例化 SelectableRedisTemplate 对象完成");
        return redisTemplate;
    

LettuceConnectionFactory默认创建的LettuceConnection共享同一个线程安全的本地连接处理非阻塞和非事物的操作。
shareNativeConnection 设置为 false可以让LettuceConnectionFactory每次创建一个专用连接。

四、测试业务搭建

4.1 多种操作切换Redis DB的方式

通过以下方式获取支持Redis同数据源可切换DB的操作类RedisSelectTemplate

@Autowired
private RedisSelectTemplate<String, Object> redisTemplate;

4.1.1 注解@RedisSelect方式

com/gm/multi/redis/controller/RedisSelectIndexontroller.java

    @RequestMapping("/one")
    @RedisSelect(index = 1)         //选择db1库
    public String selectOne() 
        redisTemplate.opsForValue().set("one", "one_" + System.currentTimeMillis());
        String one = (String) redisTemplate.opsForValue().get("one");
        return one;
    

    @RequestMapping("/two")
    @RedisSelect(index = 2)         //选择db2库
    public String selectTwo() 
        redisTemplate.opsForValue().set("two", "two_" + System.currentTimeMillis());
        String two = (String) redisTemplate.opsForValue().get("two");
        return two;
    

4.1.2 设置RedisSelectSupport方式

com/gm/multi/redis/controller/RedisSelectIndexontroller.java

    /**
     * 同一个方法中切换不同的redis库
     *
     * @return
     */
    @RequestMapping("/three")
    @RedisSelect(index = 2)         //选择db2库
    public String selectThree() 
        redisTemplate.opsForValue().set("two", "two_" + System.currentTimeMillis());
        String two = (String) redisTemplate.opsForValue().get("two");
        log.info(two);
        RedisSelectSupport.selectIndex(3);//此处切换到db3库
        redisTemplate.opsForValue().set("three", "three_" + System.currentTimeMillis());
        String three = (String) redisTemplate.opsForValue().get("three");
        log.info(three);
        return three;
    

4.2 多线程测试

com/gm/multi/redis/controller/RedisSelectIndexontroller.java

    @Autowired
    RedisMultiIndexService redisMultiIndexService;

    @RequestMapping("/testMultiIndex")
    public String testMultiIndex() 
        Thread thread[] = new Thread[500];
        AtomicBoolean result = new AtomicBoolean(true);
        for (int i = 0; i < thread.length; i++) 
            int finalI = i;
            thread[i] = new Thread(() -> 
                try 
                    redisMultiIndexService.testMultiIndex("Thread-" + finalI);
                 catch (Exception e) 
                    e.printStackTrace();
                    result.set(false);
                
            );
            thread[i].setName("Thread-" + i);

        
        for (int i = 0; i < thread.length; i++) 
            thread[i].start();
        
        return "";
    

com/gm/multi/redis/service/RedisMultiIndexService.java

public interface RedisMultiIndexService 
    void testMultiIndex(String suffix);

com/gm/multi/redis/service/impl/RedisMultiIndexServiceImpl.java

import com.gm.multi.redis.config.select.RedisSelectSupport;
import com.gm.multi.redis.config.select.RedisSelectTemplate;
import com.gm.multi.redis.service.RedisMultiIndexService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class RedisMultiIndexServiceImpl implements RedisMultiIndexService 

    @Autowired
    private RedisSelectTemplate<String, Object> redisSelectTemplate;

    public void testMultiIndex(String suffix) 
        String defaultkv = "default-" + suffix + "-index";
        String twokv = "two-" + suffix + "-index"

最近公司项目中 有需要用ElasticSearch (后续简称ES) 集成 SQL 查询功能,并可以按照请求参数动态切换目标数据源,同事找到我帮忙实现该功能,以前没做过,只好赶鸭子上架,
网上很多资料不全,瞎琢磨半天终于完成,记录了一些实现过程中踩过的坑,便于大家借鉴。

我们测试环境部署的是 ElasticSearch6.8.2 ,对应需要使用的jar需要是同版本的x-pack-sql-jdbc.jar 否则会报版本不一致错误.
不过该功能的开通需要铂金会员或者自己破解,具体的破解方案可以看看其他文章。以下介绍代码的具体实现.

切换数据源部分有参考下方链接代码,
https://blog.csdn.net/hekf2010/article/details/81155778

1. application.properties配置

server.port=6666
#主数据源
spring.datasource.url=jdbc:es://http://10.0.75.20:9200/
#es 从数据源 es1,es2
slave.datasource.names=es1,es2 
#es1
slave.datasource.es1.url=jdbc:es://http://10.0.75.21:9200/
#es2
slave.datasource.es2.url=jdbc:es://http://10.0.75.22:9200/
#mapper.xml文件
mybatis.mapper-locations=classpath:mapper/*.xml 
#实体类包
mybatis.type-aliases-package=com.kunlun.es.vo  

2. 注册动态数据源.
PS:这个地方一开始以为要添加ES db驱动的,后面查看源码之后发现,这货压根就不需要添加EsDriver

import org.apache.log4j.Logger;
import org.elasticsearch.xpack.sql.jdbc.EsDataSource;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;

import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;



/**
 * @Author zhaozhiguo
 * @Date 2020-11-04
 * @Description 注册动态数据源
 */
public class DynamicDataSourceRegister implements ImportBeanDefinitionRegistrar, EnvironmentAware {

    private Logger logger = Logger.getLogger(DynamicDataSourceRegister.class);

    /***默认数据源***/
    private DataSource deftDataSource;
    /***自定义数据源***/
    private Map<String, DataSource> slaveDataSources = new ConcurrentHashMap<>();

    @Override
    public void setEnvironment(Environment environment) {
        initDefaultDataSource(environment);
        initslaveDataSources(environment);
    }

    private void initDefaultDataSource(Environment env) {
        // 读取主数据源
        Properties properties = new Properties();
        EsDataSource esDataSource = new EsDataSource();
        esDataSource.setUrl( env.getProperty("spring.datasource.url"));
        esDataSource.setProperties(properties);
        deftDataSource = esDataSource;
    }


    private void initslaveDataSources(Environment env) {
        // 读取配置文件获取更多数据源
        String dsPrefixs = env.getProperty("slave.datasource.names");
        for (String dsPrefix : dsPrefixs.split(",")) {
            // 多个数据源
            Properties properties = new Properties();
            EsDataSource esDataSource = new EsDataSource();
            esDataSource.setUrl(env.getProperty("slave.datasource." + dsPrefix + ".url"));
            esDataSource.setProperties(properties);
            slaveDataSources.put(dsPrefix, esDataSource);
        }
    }

    @Override
    public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
        Map<Object, Object> targetDataSources = new HashMap<Object, Object>();
        //添加默认数据源
        targetDataSources.put("dataSource", this.deftDataSource);
        DynamicDataSourceContextHolder.dataSourceIds.add("dataSource");
        //添加其他数据源
        targetDataSources.putAll(slaveDataSources);
        for (String key : slaveDataSources.keySet()) {
            DynamicDataSourceContextHolder.dataSourceIds.add(key);
        }

        //创建DynamicDataSource
        GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
        beanDefinition.setBeanClass(DynamicDataSource.class);
        beanDefinition.setSynthetic(true);
        MutablePropertyValues mpv = beanDefinition.getPropertyValues();
        mpv.addPropertyValue("defaultTargetDataSource", deftDataSource);
        mpv.addPropertyValue("targetDataSources", targetDataSources);
        //注册 - BeanDefinitionRegistry
        beanDefinitionRegistry.registerBeanDefinition("dataSource", beanDefinition);

        logger.info("Dynamic DataSource Registry");
    }

3. 自定义注解,用于拦截 mapper 执行sql 时切换数据源

import java.lang.annotation.*;

/**
 * @Author zhaozhiguo
 * @Date 2020-11-04
 * @Description 需要切换数据源注解
 */

@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TargetDataSource {

}

4. 请求参数

/**
 * @Author zhaozg
 * @Date 2020-11-04
 * @Description SelectParam 查询参数
 */
public class SelectParam {

    /**需要执行的SQL*/
    private String sql;

    /**执行SQL的数据源名称,需要和properties slave.datasource.names 匹配*/
    private String dcName;

    public String getSql() {
        return sql;
    }

    public void setSql(String sql) {
        this.sql = sql;
    }

    public String getDcName() {
        return dcName;
    }

    public void setDcName(String dcName) {
        this.dcName = dcName;
    }
}

5. AOP 监听动态切换数据源

import com.kunlun.es.vo.SelectParam;
import org.apache.log4j.Logger;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

/**
 * @Author zhaozhiguo
 * @Date 2020-11-04
 * @Description 动态数据源通知
 */

@Aspect
@Order(-1)
@Component
public class DynamicDattaSourceAspect {

    private Logger logger = Logger.getLogger(DynamicDattaSourceAspect.class);

    //改变数据源
    @Before("@annotation(targetDataSource)")
    public void changeDataSource(JoinPoint joinPoint, TargetDataSource targetDataSource) {
        Object[] str= joinPoint.getArgs();
        SelectParam selectParams = (SelectParam) str[0];
        if (!DynamicDataSourceContextHolder.isContainsDataSource(selectParams.getDcName())) {
            logger.error("数据源 " + selectParams.getDcName() + " 不存在使用默认的数据源 -> " + joinPoint.getSignature());
        } else {
            logger.debug("使用数据源:" + selectParams.getDcName());
            DynamicDataSourceContextHolder.setDataSourceType(selectParams.getDcName());
        }
    }

    @After("@annotation(targetDataSource)")
    public void clearDataSource(JoinPoint joinPoint, TargetDataSource targetDataSource) {
        Object[] str= joinPoint.getArgs();
        SelectParam selectParams = (SelectParam) str[0];
        logger.debug("清除数据源 " + selectParams.getDcName()+ " !");
        DynamicDataSourceContextHolder.clearDataSourceType();
    }
}

6. Mapper下方法 添加 TargetDataSource 注解

import com.kunlun.es.config.TargetDataSource;
import com.kunlun.es.vo.SelectParam;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;

import java.util.List;
import java.util.Map;

/**
 * @Author zhaozhiguo
 * @Date 2020-11-04
 * @Description 动态数据源通知
 */

@Mapper
public interface SelectObjMapper {

    @TargetDataSource
    @Select("${selectParam.sql}")
    List<Map> selectObj(@Param("selectParam") SelectParam selectParam);
}

7. 启动类,需要添加@Import(DynamicDataSourceRegister.class)

import com.kunlun.es.config.DynamicDataSourceRegister;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

/**
 * @Author zhaozhiguo
 * @Date 2020-11-03
 * @Description 启动类
 */
@SpringBootApplication
@Import(DynamicDataSourceRegister.class)
public class EsSelectApplication {

    public static void main(String[] args) {
        SpringApplication.run(EsSelectApplication.class, args);
    }
}

8. 查询 接口暴露

import com.kunlun.es.service.SelectObjService;
import com.kunlun.es.vo.SelectParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;

import java.util.List;
import java.util.Map;

/**
 * @Author zhaozhiguo
 * @Date 2020-11-04
 * @Description 查询接口
 */
@RestController
public class SelectObjController {

    @Autowired
    private SelectObjService selectObjService;


    @PostMapping("/selectObj")
    public List<Map> selectObj(@RequestBody SelectParam selectParam) {
        return selectObjService.selectObj(selectParam);
    }
}

9. 调用接口,大工告成!

源码就不上传了,整体实现思路还是比较清楚的,jar包是淘宝花了0.56 块大洋代下的 (此处吐槽CSDN 这个包要46积分)

以上是关于Spring Boot实现Redis同数据源动态切换DB | Spring Cloud 31的主要内容,如果未能解决你的问题,请参考以下文章

聊聊spring-boot-starter-data-redis的配置变更

Spring Boot 如何热加载jar实现动态插件?

Spring Boot集成Redis实现缓存

Spring Boot集成Redis实现缓存

基于Spring Boot+Security+Redis权限管理系统,权限控制采用RBAC

Spring Boot如何整合Redis