定义Mybatis拦截器动态切换postgre数据库schema
Posted Neo Yang
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了定义Mybatis拦截器动态切换postgre数据库schema相关的知识,希望对你有一定的参考价值。
背景
随着业务的发展和合规要求,产品数据库将切换到Postgres。此前不同技术域、不同交付工程的数据采用分库管理,切换到PG数据库后将改为按schema划分管理。
ORM继续使用Mybatis,为使迁移工作量尽可能小,现有的SQL代码不做大的修改。动态数据源的实现方案是在Mybatis执行过程中做拦截,替换sql中的schema标识。
提取请求参数中的schema
约定rest接口请求Header参数中增加schema信息。通过切面技术从请求头中提取schema后保存到线程变量。
1. 提取schema
package com.postgres.manager;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.aspectj.lang.annotation.Pointcut;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.context.request.RequestContextHolder;
import org.springframework.web.context.request.ServletRequestAttributes;
import javax.servlet.http.HttpServletRequest;
/**
* Schema切面, 提取header头中的schema保存到SchemaHolder中
*
* @author elon
* @since 2022-03-20
*/
@Aspect
@Component
@Order(9999)
public class SchemaAspect
@Pointcut("@annotation(org.springframework.web.bind.annotation.GetMapping) "
+ "|| @annotation(org.springframework.web.bind.annotation.PostMapping) "
+ "|| @annotation(org.springframework.web.bind.annotation.DeleteMapping) "
+ "|| @annotation(org.springframework.web.bind.annotation.RequestMapping)")
void schema()
/**
* 从请求头提取
*
* @param joinPoint
*/
@Before("schema()")
public void setSchema(JoinPoint joinPoint)
String schema = getSchemaFromHeader();
SchemaHolder.set(schema);
@After("schema()")
public void clearSchema(JoinPoint joinPoint)
SchemaHolder.clear();
/**
* 从请求头中后去schema信息
*
* @return schema
*/
private String getSchemaFromHeader()
HttpServletRequest request = ((ServletRequestAttributes) RequestContextHolder.getRequestAttributes()).getRequest();
String schema = request.getHeader("schema");
return schema;
2. 保存schema的线程变量类
package com.postgres.manager;
/**
 * Per-thread holder for the current schema name, so it can be read across several method
 * calls without being passed as a parameter.
 *
 * <p>The value is kept in a plain {@link ThreadLocal}: it is visible only to the thread that
 * called {@link #set(String)}. Any other thread has to call {@code set} itself.
 *
 * @author elon
 * @since 2022-03-19
 */
public class SchemaHolder {
    private static final ThreadLocal<String> SCHEMA = new ThreadLocal<>();

    /**
     * Stores the schema name for the current thread.
     *
     * @param sch schema name; may be {@code null}
     */
    public static void set(String sch) {
        SCHEMA.set(sch);
    }

    /**
     * Returns the schema name stored for the current thread.
     *
     * @return the stored schema name, or {@code null} when none has been set
     */
    public static String get() {
        return SCHEMA.get();
    }

    /**
     * Removes the schema name stored for the current thread.
     */
    public static void clear() {
        SCHEMA.remove();
    }
}
定义Mybatis拦截器
1. 定义拦截器注解,用于修饰DAO层级接口
package com.postgres.manager;
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;
/**
 * Marker for MyBatis mapper interfaces that tells the schema interceptor which kind of
 * PostgreSQL schema the mapper's statements should run against.
 *
 * @author elon
 * @since 2022-03-20
 */
@Target(ElementType.TYPE)
@Retention(RetentionPolicy.RUNTIME)
public @interface SchemaInterceptAnnotation {
    /**
     * Schema type. Expected values: {@code "business"} or {@code "common"}.
     *
     * @return the schema type; the empty string when not specified
     */
    String schemaType() default "";
}
在DAO层接口类加上该注解,拦截器会动态切换schema.
package com.postgres.mapper;
import com.postgres.manager.SchemaInterceptAnnotation;
import com.postgres.model.ExamResult;
import com.postgres.model.User;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import java.util.List;
@Mapper
@SchemaInterceptAnnotation(schemaType = "business")
public interface UserMapper
/**
* 从schema获取user数据
*
* @return user列表
*/
List<User> getUserFromSchema(@Param("name") String name);
/**
* 插入用户数据到schema
*
* @param userList 用户列表
*/
void insertUser2Schema(@Param("list") List<User> userList);
/**
* 获取测试成绩.
*
* @return 测试成绩列表
*/
List<ExamResult> getExamResult();
2. 拦截器替换sql中的表名为schema.表名
package com.postgres.manager;
import org.apache.ibatis.executor.statement.StatementHandler;
import org.apache.ibatis.mapping.BoundSql;
import org.apache.ibatis.mapping.MappedStatement;
import org.apache.ibatis.plugin.*;
import org.apache.ibatis.reflection.DefaultReflectorFactory;
import org.apache.ibatis.reflection.MetaObject;
import org.apache.ibatis.reflection.SystemMetaObject;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.lang.reflect.Field;
import java.sql.Connection;
import java.util.Properties;
/**
* StatementHandler拦截器. 在prepare方法执行前拦截,修改sql语句,增加schema.
*
* @author elon
* @since 2022-03-20
*/
@Component
@Intercepts(@Signature(type = StatementHandler.class, method = "prepare", args = Connection.class, Integer.class))
public class StatementHandlerInterceptor implements Interceptor
private static final Logger LOGGER = LoggerFactory.getLogger(StatementHandlerInterceptor.class);
/**
* 业务数据分schema存储
*/
private static final String BUSINESS_SCHEMA = "business";
/**
* 公共的配置数据(不分schema), 固定库
*/
private static final String COMMON_SCHEMA = "common";
@Override
public Object intercept(Invocation invocation) throws Throwable
StatementHandler statementHandler = (StatementHandler) invocation.getTarget();
MetaObject metaObject = MetaObject.forObject(statementHandler, SystemMetaObject.DEFAULT_OBJECT_FACTORY,
SystemMetaObject.DEFAULT_OBJECT_WRAPPER_FACTORY, new DefaultReflectorFactory());
MappedStatement mappedStatement = (MappedStatement) metaObject.getValue("delegate.mappedStatement");
String mapperMethod = mappedStatement.getId();
BoundSql boundSql = statementHandler.getBoundSql();
String sql = boundSql.getSql();
String mapperClass = mapperMethod.substring(0, mappedStatement.getId().lastIndexOf("."));
Class<?> classType = Class.forName(mapperClass);
SchemaInterceptAnnotation interceptAnnotation = classType.getAnnotation(SchemaInterceptAnnotation.class);
String schemaType = interceptAnnotation.schemaType();
String newSql = replaceSqlWithSchema(schemaType, sql, mapperMethod);
//通过反射修改sql语句
Field field = boundSql.getClass().getDeclaredField("sql");
field.setAccessible(true);
field.set(boundSql, newSql);
return invocation.proceed();
@Override
public Object plugin(Object object)
if (object instanceof StatementHandler)
return Plugin.wrap(object, this);
else
return object;
@Override
public void setProperties(Properties properties)
private String replaceSqlWithSchema(String schemaType, String originalSql, String mapperMethod)
// 替换sql中的表名,加上schema
if (BUSINESS_SCHEMA.equals(schemaType))
String schema = SchemaHolder.get();
return originalSql.replaceAll(" t_", " " + schema + ".t_");
else if (COMMON_SCHEMA.equals(schemaType))
return originalSql.replaceAll(" t_", " " + COMMON_SCHEMA + ".t_");
else
LOGGER.error("Invalid SchemaInterceptAnnotation. mapperMethod:", mapperMethod);
throw new IllegalArgumentException("Invalid SchemaInterceptAnnotation.");
3. 添加拦截器
加上如下处理, 拦截器才会生效
package com.postgres.config;
import com.postgres.manager.StatementHandlerInterceptor;
import org.apache.ibatis.session.SqlSessionFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Configuration;
import javax.annotation.PostConstruct;
import java.util.List;
@Configuration
public class InterceptorConfig
@Autowired
private List<SqlSessionFactory> sqlSessionFactoryList;
@PostConstruct
public void addSqlInterceptor()
StatementHandlerInterceptor interceptor = new StatementHandlerInterceptor();
for (SqlSessionFactory sqlSessionFactory : sqlSessionFactoryList)
sqlSessionFactory.getConfiguration().addInterceptor(interceptor);
完整的Demo代码还包括DataSource配置和XML中SQL,这些和普通的Spring Boot项目无异。参考github上的完整实现代码:https://github.com/ylforever/elon-postgres
以上是关于定义Mybatis拦截器动态切换postgre数据库schema的主要内容,如果未能解决你的问题,请参考以下文章
定义Mybatis拦截器动态切换postgre数据库schema
mybatis plus自定义的mapper如何动态切换数据源