Dynamically Switching DBs on a Single Redis Data Source with Spring Boot | Spring Cloud 31
Posted gmHappy
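1. Preface
In some business scenarios, an application needs to access several different DBs under the same Redis data source.
Redis provides 16 databases by default (indexes 0-15) and uses db 0 unless told otherwise.
Based on the spring-boot-starter-data-redis module, this chapter implements dynamic DB switching within a single Redis data source. Specifically, it:
- removes the limitation that one project can connect to only one Redis DB
- offers several ways to switch the Redis DB (the @Autowired way and the RedisSelectSupport way)
- provides complete usage examples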
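2. Project Structure
- RedisSelect: custom annotation used to switch between DB indexes under the same Redis data source
- RedisSelectSupport: custom DB-switching helper that holds the DB index for the current thread in a ThreadLocal
- RedisIndexSelectAspect: custom AOP aspect that intercepts methods annotated with RedisSelect and sets the DB index through RedisSelectSupport
- RedisSelectTemplate: wrapper around the original RedisTemplate that reads the DB index from RedisSelectSupport and performs the actual DB switch before each Redis operation
- RedisIndexSelectConfig: defines the RedisSelectTemplate bean and adjusts the Lettuce connection settings so that dynamic DB switching is supported
The sections below do not repeat these descriptions, so read this section carefully.
The source code for this example is in the redis/redis-multi-index module of the project.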
3. Implementing Dynamic Redis DB Switching
3.1 Required dependencies
redis/redis-multi-index/pom.xml:
<?xml version="1.0" encoding="UTF-8"?>
<project xmlns="http://maven.apache.org/POM/4.0.0"
         xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
    <parent>
        <artifactId>redis</artifactId>
        <groupId>com.gm</groupId>
        <version>0.0.1-SNAPSHOT</version>
    </parent>
    <modelVersion>4.0.0</modelVersion>
    <artifactId>redis-multi-index</artifactId>
    <properties>
        <maven.compiler.source>17</maven.compiler.source>
        <maven.compiler.target>17</maven.compiler.target>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
    </properties>
    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-data-redis</artifactId>
        </dependency>
        <dependency>
            <groupId>com.alibaba</groupId>
            <artifactId>fastjson</artifactId>
            <version>1.2.83</version>
        </dependency>
        <dependency>
            <groupId>org.aspectj</groupId>
            <artifactId>aspectjweaver</artifactId>
        </dependency>
    </dependencies>
</project>
3.2 Configuration file
src/main/resources/application.yml:
server:
  port: 6008
spring:
  application:
    name: @artifactId@
  redis:
    database: 1
    host:
    password:
    port: 6379
    timeout: 60000
    lettuce:
      pool:
        max-active: -1
        max-idle: -1
        max-wait: -1
        min-idle: -1
logging:
  level:
    org:
      springframework:
        boot:
          autoconfigure:
            logging: info
3.3 Custom annotation @RedisSelect
com/gm/multi/redis/config/select/annotation/RedisSelect.java:
import java.lang.annotation.*;

/**
 * Annotation used to switch between DB indexes under the same Redis data source.
 */
@Target(ElementType.METHOD)
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface RedisSelect {
    /**
     * Redis DB index, 0 - 15.
     *
     * @return the DB index to select
     */
    int index() default 0;
}
3.4 Custom switching helper RedisSelectSupport
com/gm/multi/redis/config/select/RedisSelectSupport.java:
/**
 * Redis DB switching helper.
 */
public class RedisSelectSupport {
    /**
     * Static ThreadLocal that carries the selected DB index for the current thread.
     */
    private static final ThreadLocal<Integer> DB_SELECT_CONTEXT = new ThreadLocal<>();

    public static void selectIndex(int db) {
        DB_SELECT_CONTEXT.set(db);
    }

    public static Integer getSelectIndex() {
        return DB_SELECT_CONTEXT.get();
    }
}
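Because the index lives in a ThreadLocal, a value set on one thread is not visible to any other thread, including threads started from it. The JDK-only sketch below illustrates that behaviour. The class DbIndexHolder and its indexSeenByNewThread method are hypothetical stand-ins for RedisSelectSupport and are not part of the project.

```java
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical stand-in for RedisSelectSupport, using only the JDK.
class DbIndexHolder {
    private static final ThreadLocal<Integer> CONTEXT = new ThreadLocal<>();

    static void selectIndex(int db) {
        CONTEXT.set(db);
    }

    static Integer getSelectIndex() {
        return CONTEXT.get();
    }

    // Starts a new thread, waits for it, and returns the index that thread saw.
    static Integer indexSeenByNewThread() {
        AtomicReference<Integer> seen = new AtomicReference<>();
        Thread worker = new Thread(() -> seen.set(getSelectIndex()));
        worker.start();
        try {
            worker.join();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("interrupted while waiting for worker", e);
        }
        return seen.get();
    }

    public static void main(String[] args) {
        selectIndex(2);
        System.out.println("current thread: " + getSelectIndex());       // prints "current thread: 2"
        System.out.println("new thread: " + indexSeenByNewThread());     // prints "new thread: null"
    }
}
```

In practice this means that code which hands work to another thread or a thread pool has to select the index again on that thread.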
3.5 Custom aspect RedisIndexSelectAspect
com/gm/multi/redis/config/select/aspect/RedisIndexSelectAspect.java:
import com.gm.multi.redis.config.select.RedisSelectSupport;
import com.gm.multi.redis.config.select.annotation.RedisSelect;
import com.gm.multi.redis.config.select.RedisSelectTemplate;
import lombok.extern.slf4j.Slf4j;
import org.aspectj.lang.ProceedingJoinPoint;
import org.aspectj.lang.annotation.Around;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.reflect.MethodSignature;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.boot.autoconfigure.condition.ConditionalOnBean;
import org.springframework.stereotype.Component;
import java.lang.reflect.Method;

@Slf4j
@Aspect
@Component
public class RedisIndexSelectAspect {

    // Default DB index, read from spring.redis.database (falls back to 0).
    @Value("${spring.redis.database:0}")
    private int defaultIndex;

    /**
     * Around advice for @RedisSelect: intercepts annotated methods,
     * selects the requested DB index, and resets it afterwards.
     *
     * @param point the intercepted join point
     * @return the result of the intercepted method
     * @throws Throwable whatever the intercepted method throws
     */
    @Around("@annotation(com.gm.multi.redis.config.select.annotation.RedisSelect)")
    @ConditionalOnBean(RedisSelectTemplate.class)
    public Object configRedis(ProceedingJoinPoint point) throws Throwable {
        int index = defaultIndex;
        try {
            MethodSignature signature = (MethodSignature) point.getSignature();
            Method method = signature.getMethod();
            RedisSelect config = method.getAnnotation(RedisSelect.class);
            if (config != null) {
                index = config.index();
            }
            RedisSelectSupport.selectIndex(index);
            return point.proceed();
        } finally {
            // Always fall back to the default index, even if the method throws.
            RedisSelectSupport.selectIndex(defaultIndex);
            log.debug("redis index {} reset to {}", index, defaultIndex);
        }
    }
}
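The aspect always resets the index to the configured default once the annotated method returns. If annotated methods on different beans call each other, one possible variation is to restore whatever index was active before the call instead. The JDK-only sketch below shows that set-then-restore pattern. ScopedDbIndex and its methods are hypothetical names, and this is not what the aspect above does.

```java
import java.util.function.Supplier;

// Hypothetical helper: run a task with a DB index, then restore the previous one.
class ScopedDbIndex {
    private static final ThreadLocal<Integer> CONTEXT = new ThreadLocal<>();

    static Integer current() {
        return CONTEXT.get();
    }

    static <T> T withIndex(int db, Supplier<T> task) {
        Integer previous = CONTEXT.get();
        CONTEXT.set(db);
        try {
            return task.get();
        } finally {
            if (previous == null) {
                CONTEXT.remove();      // nothing was selected before: clear the slot
            } else {
                CONTEXT.set(previous); // restore the caller's index
            }
        }
    }

    public static void main(String[] args) {
        Integer inner = withIndex(2, () -> withIndex(3, ScopedDbIndex::current));
        System.out.println("inner index: " + inner);        // prints "inner index: 3"
        System.out.println("after the call: " + current()); // prints "after the call: null"
    }
}
```

Clearing the ThreadLocal when there was no previous value also avoids leaving a stale index on pooled request threads.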
3.6 RedisSelectTemplate: a wrapper that supports DB switching
com/gm/multi/redis/config/select/RedisSelectTemplate.java:
import org.springframework.data.redis.connection.RedisConnection;
import org.springframework.data.redis.core.RedisTemplate;

public class RedisSelectTemplate<K, V> extends RedisTemplate<K, V> {

    @Override
    protected RedisConnection createRedisConnectionProxy(RedisConnection pm) {
        return super.createRedisConnectionProxy(pm);
    }

    /**
     * Prepares the connection before each Redis operation.
     *
     * @param connection         the connection about to be used
     * @param existingConnection whether the connection already existed
     * @return the processed connection
     */
    @Override
    protected RedisConnection preProcessConnection(RedisConnection connection, boolean existingConnection) {
        Integer index = RedisSelectSupport.getSelectIndex();
        if (index != null) {
            // Switch this connection to the selected Redis DB.
            connection.select(index);
        }
        return super.preProcessConnection(connection, existingConnection);
    }
}
3.7 Custom configuration class RedisIndexSelectConfig
com/gm/multi/redis/config/select/index/RedisIndexSelectConfig.java:
import com.gm.multi.redis.config.select.RedisSelectTemplate;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.autoconfigure.condition.ConditionalOnMissingBean;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.data.redis.connection.lettuce.LettuceConnectionFactory;
import org.springframework.data.redis.serializer.GenericToStringSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;

@Slf4j
@Configuration(proxyBeanMethods = false)
public class RedisIndexSelectConfig {

    @Autowired(required = false)
    private LettuceConnectionFactory factory;

    @Bean
    @ConditionalOnMissingBean
    public RedisSelectTemplate<String, Object> redisIndexSelectTemplate() {
        /*
         * With the default injected RedisConnectionFactory, switching the DB fails with:
         *
         * java.lang.UnsupportedOperationException: Selecting a new database not supported due to shared connection.
         * Use separate ConnectionFactorys to work with multiple databases
         *
         * So inject LettuceConnectionFactory instead of RedisConnectionFactory and
         * disable the shared connection with factory.setShareNativeConnection(false).
         */
        RedisSelectTemplate<String, Object> redisTemplate = new RedisSelectTemplate<>();
        redisTemplate.setKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashKeySerializer(new StringRedisSerializer());
        redisTemplate.setHashValueSerializer(new GenericToStringSerializer<>(Object.class));
        redisTemplate.setValueSerializer(new StringRedisSerializer());
        // Disable the shared native connection.
        factory.setShareNativeConnection(false);
        redisTemplate.setConnectionFactory(factory);
        redisTemplate.afterPropertiesSet();
        log.info("RedisSelectTemplate instance created");
        return redisTemplate;
    }
}
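By default, all LettuceConnection instances created by LettuceConnectionFactory share one thread-safe native connection for non-blocking, non-transactional operations.
Setting shareNativeConnection to false makes LettuceConnectionFactory create a dedicated connection each time.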
4. Building the Test Code
4.1 Ways to switch the Redis DB
Obtain RedisSelectTemplate, the operation class that supports switching DBs on the same Redis data source, as follows:
@Autowired
private RedisSelectTemplate<String, Object> redisTemplate;
4.1.1 Using the @RedisSelect annotation
com/gm/multi/redis/controller/RedisSelectIndexontroller.java:
@RequestMapping("/one")
@RedisSelect(index = 1) // select db 1
public String selectOne() {
    redisTemplate.opsForValue().set("one", "one_" + System.currentTimeMillis());
    String one = (String) redisTemplate.opsForValue().get("one");
    return one;
}

@RequestMapping("/two")
@RedisSelect(index = 2) // select db 2
public String selectTwo() {
    redisTemplate.opsForValue().set("two", "two_" + System.currentTimeMillis());
    String two = (String) redisTemplate.opsForValue().get("two");
    return two;
}
4.1.2 Using RedisSelectSupport directly
com/gm/multi/redis/controller/RedisSelectIndexontroller.java:
/**
 * Switches between different Redis DBs inside a single method.
 *
 * @return the value read from db 3
 */
@RequestMapping("/three")
@RedisSelect(index = 2) // select db 2
public String selectThree() {
    redisTemplate.opsForValue().set("two", "two_" + System.currentTimeMillis());
    String two = (String) redisTemplate.opsForValue().get("two");
    log.info(two);
    RedisSelectSupport.selectIndex(3); // switch to db 3 from here on
    redisTemplate.opsForValue().set("three", "three_" + System.currentTimeMillis());
    String three = (String) redisTemplate.opsForValue().get("three");
    log.info(three);
    return three;
}
4.2 Multi-threaded test
com/gm/multi/redis/controller/RedisSelectIndexontroller.java:
@Autowired
RedisMultiIndexService redisMultiIndexService;

@RequestMapping("/testMultiIndex")
public String testMultiIndex() {
    Thread[] thread = new Thread[500];
    AtomicBoolean result = new AtomicBoolean(true);
    for (int i = 0; i < thread.length; i++) {
        int finalI = i;
        thread[i] = new Thread(() -> {
            try {
                redisMultiIndexService.testMultiIndex("Thread-" + finalI);
            } catch (Exception e) {
                e.printStackTrace();
                result.set(false);
            }
        });
        thread[i].setName("Thread-" + i);
    }
    for (int i = 0; i < thread.length; i++) {
        thread[i].start();
    }
    return "";
}
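The endpoint above starts its threads and returns immediately, so the result flag it collects is never reported. One way to wait for every thread and return the outcome is a CountDownLatch. The following is a minimal JDK-only sketch; ConcurrentRunner and runAll are hypothetical names, and the Consumer stands in for the call to redisMultiIndexService.testMultiIndex.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.function.Consumer;

// Hypothetical helper: run a task on N threads and report whether every run succeeded.
class ConcurrentRunner {
    static boolean runAll(int threads, Consumer<String> task) {
        AtomicBoolean ok = new AtomicBoolean(true);
        CountDownLatch done = new CountDownLatch(threads);
        for (int i = 0; i < threads; i++) {
            String name = "Thread-" + i;
            Thread t = new Thread(() -> {
                try {
                    task.accept(name);
                } catch (Exception e) {
                    ok.set(false);       // remember that at least one run failed
                } finally {
                    done.countDown();    // signal completion in every case
                }
            }, name);
            t.start();
        }
        try {
            done.await();                // block until all threads have finished
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return false;
        }
        return ok.get();
    }

    public static void main(String[] args) {
        System.out.println(runAll(8, suffix -> { }));
    }
}
```

With such a helper the controller could return the boolean as text instead of an empty string.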
com/gm/multi/redis/service/RedisMultiIndexService.java:
public interface RedisMultiIndexService {
    void testMultiIndex(String suffix);
}
com/gm/multi/redis/service/impl/RedisMultiIndexServiceImpl.java:
import com.gm.multi.redis.config.select.RedisSelectSupport;
import com.gm.multi.redis.config.select.RedisSelectTemplate;
import com.gm.multi.redis.service.RedisMultiIndexService;
import lombok.extern.slf4j.Slf4j;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;

@Slf4j
@Service
public class RedisMultiIndexServiceImpl implements RedisMultiIndexService {

    @Autowired
    private RedisSelectTemplate<String, Object> redisSelectTemplate;

    public void testMultiIndex(String suffix) {
        String defaultkv = "default-" + suffix + "-index";
        String twokv = "two-" + suffix + "-index";
        // ... (the rest of this method is truncated in the source)
    }
}
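A recent project at my company needed SQL query support on top of ElasticSearch (ES below), with the target data source switched dynamically according to a request parameter. A colleague asked me to help implement it. I had never done this before, so I had to learn on the job.
Much of the material online is incomplete, and it took a fair amount of trial and error to get it working. I am recording the pitfalls I hit along the way for others' reference.
Our test environment runs ElasticSearch 6.8.2, so the x-pack-sql-jdbc.jar in use must be the same version; otherwise a version-mismatch error is reported.
Note that enabling this feature requires a Platinum subscription or cracking it yourself; see other articles for the cracking approach. The code is described below.
The data-source switching part draws on the code at the link below:
https://blog.csdn.net/hekf2010/article/details/81155778
1. application.properties configuration
server.port=6666
# Primary data source
spring.datasource.url=jdbc:es://http://10.0.75.20:9200/
# ES secondary data sources: es1, es2
slave.datasource.names=es1,es2
# es1
slave.datasource.es1.url=jdbc:es://http://10.0.75.21:9200/
# es2
slave.datasource.es2.url=jdbc:es://http://10.0.75.22:9200/
# mapper.xml files
mybatis.mapper-locations=classpath:mapper/*.xml
# Entity class package
mybatis.type-aliases-package=com.kunlun.es.vo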
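The properties above follow a simple convention: slave.datasource.names lists the names, and each name has its own slave.datasource.<name>.url key. As a rough illustration of how that convention resolves, here is a JDK-only sketch. SlaveUrlResolver is a hypothetical helper, not a class from the article, and it reads from a plain java.util.Properties rather than the Spring Environment.

```java
import java.util.LinkedHashMap;
import java.util.Map;
import java.util.Properties;

// Hypothetical helper that resolves the naming convention used in the properties file.
class SlaveUrlResolver {
    static Map<String, String> resolve(Properties props) {
        Map<String, String> urls = new LinkedHashMap<>();
        String names = props.getProperty("slave.datasource.names", "");
        for (String name : names.split(",")) {
            String trimmed = name.trim();
            if (trimmed.isEmpty()) {
                continue; // tolerate a missing or empty list
            }
            String url = props.getProperty("slave.datasource." + trimmed + ".url");
            if (url == null) {
                throw new IllegalStateException("No URL configured for data source " + trimmed);
            }
            urls.put(trimmed, url);
        }
        return urls;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("slave.datasource.names", "es1,es2");
        props.setProperty("slave.datasource.es1.url", "jdbc:es://http://10.0.75.21:9200/");
        props.setProperty("slave.datasource.es2.url", "jdbc:es://http://10.0.75.22:9200/");
        System.out.println(resolve(props));
    }
}
```

Failing fast on a name without a URL is a design choice of this sketch; it surfaces configuration typos at startup rather than at query time.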
2. Register the dynamic data source.
PS: I initially assumed the ES DB driver had to be added here, but after reading the source code it turns out EsDriver does not need to be added at all.
import org.apache.log4j.Logger;
import org.elasticsearch.xpack.sql.jdbc.EsDataSource;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;

/**
 * @Author zhaozhiguo
 * @Date 2020-11-04
 * @Description Registers the dynamic data source
 */
public class DynamicDataSourceRegister implements ImportBeanDefinitionRegistrar, EnvironmentAware {

    private Logger logger = Logger.getLogger(DynamicDataSourceRegister.class);

    /*** Default data source ***/
    private DataSource deftDataSource;

    /*** Custom (secondary) data sources ***/
    private Map<String, DataSource> slaveDataSources = new ConcurrentHashMap<>();

    @Override
    public void setEnvironment(Environment environment) {
        initDefaultDataSource(environment);
        initslaveDataSources(environment);
    }

    private void initDefaultDataSource(Environment env) {
        // Read the primary data source
        Properties properties = new Properties();
        EsDataSource esDataSource = new EsDataSource();
        esDataSource.setUrl(env.getProperty("spring.datasource.url"));
        esDataSource.setProperties(properties);
        deftDataSource = esDataSource;
    }

    private void initslaveDataSources(Environment env) {
        // Read the configuration to obtain the additional data sources
        String dsPrefixs = env.getProperty("slave.datasource.names");
        for (String dsPrefix : dsPrefixs.split(",")) {
            // One entry per data source
            Properties properties = new Properties();
            EsDataSource esDataSource = new EsDataSource();
            esDataSource.setUrl(env.getProperty("slave.datasource." + dsPrefix + ".url"));
            esDataSource.setProperties(properties);
            slaveDataSources.put(dsPrefix, esDataSource);
        }
    }

    @Override
    public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
        Map<Object, Object> targetDataSources = new HashMap<Object, Object>();
        // Add the default data source
        targetDataSources.put("dataSource", this.deftDataSource);
        DynamicDataSourceContextHolder.dataSourceIds.add("dataSource");
        // Add the other data sources
        targetDataSources.putAll(slaveDataSources);
        for (String key : slaveDataSources.keySet()) {
            DynamicDataSourceContextHolder.dataSourceIds.add(key);
        }
        // Create the DynamicDataSource bean definition
        GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
        beanDefinition.setBeanClass(DynamicDataSource.class);
        beanDefinition.setSynthetic(true);
        MutablePropertyValues mpv = beanDefinition.getPropertyValues();
        mpv.addPropertyValue("defaultTargetDataSource", deftDataSource);
        mpv.addPropertyValue("targetDataSources", targetDataSources);
        // Register it with the BeanDefinitionRegistry
        beanDefinitionRegistry.registerBeanDefinition("dataSource", beanDefinition);
        logger.info("Dynamic DataSource Registry");
    }
}
3. Custom annotation, used to intercept mapper SQL execution and switch the data source
import java.lang.annotation.*;

/**
 * @Author zhaozhiguo
 * @Date 2020-11-04
 * @Description Marks methods that need a data source switch
 */
@Target({ElementType.TYPE, ElementType.METHOD})
@Retention(RetentionPolicy.RUNTIME)
@Documented
public @interface TargetDataSource {
}
4. Request parameters
/**
 * @Author zhaozg
 * @Date 2020-11-04
 * @Description SelectParam query parameters
 */
public class SelectParam {

    /** SQL to execute */
    private String sql;

    /** Name of the data source to run the SQL against; must match an entry in slave.datasource.names */
    private String dcName;

    public String getSql() {
        return sql;
    }

    public void setSql(String sql) {
        this.sql = sql;
    }

    public String getDcName() {
        return dcName;
    }

    public void setDcName(String dcName) {
        this.dcName = dcName;
    }
}
5. AOP advice that switches the data source dynamically
import com.kunlun.es.vo.SelectParam;
import org.apache.log4j.Logger;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;

/**
 * @Author zhaozhiguo
 * @Date 2020-11-04
 * @Description Dynamic data source advice
 */
@Aspect
@Order(-1)
@Component
public class DynamicDattaSourceAspect {

    private Logger logger = Logger.getLogger(DynamicDattaSourceAspect.class);

    // Switch the data source before the annotated method runs
    @Before("@annotation(targetDataSource)")
    public void changeDataSource(JoinPoint joinPoint, TargetDataSource targetDataSource) {
        Object[] str = joinPoint.getArgs();
        SelectParam selectParams = (SelectParam) str[0];
        if (!DynamicDataSourceContextHolder.isContainsDataSource(selectParams.getDcName())) {
            logger.error("Data source " + selectParams.getDcName() + " does not exist, using the default data source -> " + joinPoint.getSignature());
        } else {
            logger.debug("Using data source: " + selectParams.getDcName());
            DynamicDataSourceContextHolder.setDataSourceType(selectParams.getDcName());
        }
    }

    // Clear the selection after the annotated method finishes
    @After("@annotation(targetDataSource)")
    public void clearDataSource(JoinPoint joinPoint, TargetDataSource targetDataSource) {
        Object[] str = joinPoint.getArgs();
        SelectParam selectParams = (SelectParam) str[0];
        logger.debug("Clearing data source " + selectParams.getDcName() + "!");
        DynamicDataSourceContextHolder.clearDataSourceType();
    }
}
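The article does not show DynamicDataSourceContextHolder, which both the registrar and the aspect depend on. A minimal sketch consistent with the calls made above might look like this. The dataSourceIds field and the isContainsDataSource, setDataSourceType and clearDataSourceType methods are taken from the call sites; getDataSourceType and the choice of CopyOnWriteArrayList are assumptions.

```java
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;

// Sketch of the holder used by the registrar and the aspect; the article does not show it.
class DynamicDataSourceContextHolder {
    // Name of the data source selected for the current thread.
    private static final ThreadLocal<String> CONTEXT = new ThreadLocal<>();

    // Names added by DynamicDataSourceRegister at startup.
    public static final List<String> dataSourceIds = new CopyOnWriteArrayList<>();

    public static void setDataSourceType(String name) {
        CONTEXT.set(name);
    }

    // Assumed accessor: the routing data source would read the current name here.
    public static String getDataSourceType() {
        return CONTEXT.get();
    }

    public static void clearDataSourceType() {
        CONTEXT.remove();
    }

    public static boolean isContainsDataSource(String name) {
        return dataSourceIds.contains(name);
    }

    public static void main(String[] args) {
        dataSourceIds.add("es1");
        setDataSourceType("es1");
        System.out.println(getDataSourceType()); // prints "es1"
        clearDataSourceType();
        System.out.println(getDataSourceType()); // prints "null"
    }
}
```

Since the registrar sets the defaultTargetDataSource and targetDataSources properties, DynamicDataSource presumably extends Spring's AbstractRoutingDataSource and returns the holder's current name from determineCurrentLookupKey(); a null key then falls back to the default data source.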
6. Add the TargetDataSource annotation to the Mapper method
import com.kunlun.es.config.TargetDataSource;
import com.kunlun.es.vo.SelectParam;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
import java.util.Map;

/**
 * @Author zhaozhiguo
 * @Date 2020-11-04
 * @Description Mapper that runs the requested SQL on the selected data source
 */
@Mapper
public interface SelectObjMapper {

    // ${...} substitutes the SQL text verbatim, so the request body is executed as-is.
    @TargetDataSource
    @Select("${selectParam.sql}")
    List<Map> selectObj(@Param("selectParam") SelectParam selectParam);
}
7. Application class; @Import(DynamicDataSourceRegister.class) must be added
import com.kunlun.es.config.DynamicDataSourceRegister;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;

/**
 * @Author zhaozhiguo
 * @Date 2020-11-03
 * @Description Application entry point
 */
@SpringBootApplication
@Import(DynamicDataSourceRegister.class)
public class EsSelectApplication {

    public static void main(String[] args) {
        SpringApplication.run(EsSelectApplication.class, args);
    }
}
8. Expose the query endpoint
import com.kunlun.es.service.SelectObjService;
import com.kunlun.es.vo.SelectParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Map;

/**
 * @Author zhaozhiguo
 * @Date 2020-11-04
 * @Description Query endpoint
 */
@RestController
public class SelectObjController {

    @Autowired
    private SelectObjService selectObjService;

    @PostMapping("/selectObj")
    public List<Map> selectObj(@RequestBody SelectParam selectParam) {
        return selectObjService.selectObj(selectParam);
    }
}
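9. Call the endpoint, and the job is done!
I won't upload the source code, since the overall approach is fairly clear. I paid 0.56 yuan on Taobao to have someone download the jar for me (a gripe: CSDN charges 46 points for this package).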