多租户改造

Posted 格格巫 MMQ!!

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了多租户改造相关的知识,希望对你有一定的参考价值。

一、多租户概念
1.多租户是什么意思?怎么理解?
​ 多租户是一种单个软件实例可以为多个不同用户组提供服务的软件架构。在云计算中,多租户也可以指共享主机,其服务器资源将在不同客户之间进行分配。与多租户相对应的是单租户,单租户是指软件实例或计算机系统中有 1 个最终用户或用户组。

2.多租户架构的优势
多租户可以节省成本。计算规模越大,成本就越低,并且多租户还允许对资源进行有效地整合和分配,最终节省运营成本。 对于个人用户而言,访问云服务或 SAAS 应用所需的费用通常要比运行单租户硬件和软件更具成本效益。
多租户可以提高灵活性。如果您选择自行购置硬件和软件,那么在需求旺盛时可能会难以满足需求,而在需求疲软时则可能会闲置不用。另一方面,多租户云却可以根据用户的需要来灵活地扩展和缩减资源池。作为公共云提供商的客户,您可以在需要时获得额外容量,而在不需要时则无需付费。
多租户可以提高效率。多租户消除了单个用户管理基础架构及处理更新和维护的必要。每个租户都可以依靠中央云提供商(而不用自己组建团队)来处理这些日常琐事。
2.多租户技术特点
多个租户共享平台。
租户之间数据隔离。
租户之间发布更新互不影响。
4.多租户数据隔离实现
目前saas多租户系统的数据隔离有三种解决方案,即为每个租户提供独立的数据库、独立的表空间、按字段区分租户,每种方案都有其各自的适用情况。

最终选用以字段隔离为主,表隔离为辅的方案,其好处就是成本低,改造小。

一、数据库隔离

1.方案
使用 MyBatis-Plus 中的 TenantLineInnerInterceptor(多租户)插件和 DynamicTableNameInnerInterceptor(动态表名)插件 实现租户间数据隔离

2.相关依赖

com.baomidou mybatis-plus-boot-starter 3.5.2 com.alibaba transmittable-thread-local 2.12.2

3.实现
(1)思路
(前提:前端发起请求时或者在网关转发时向header中添加租户id)

首先从过滤器中拿到 request header 中的 tenantId放到线程上下文中,后续获取

(2)线程上下文(用于存放租户id )
import com.alibaba.ttl.TransmittableThreadLocal;
import lombok.experimental.UtilityClass;

/**

  • 多租户上下文

  • @author cherf

  • @since 2022-9-1
    */
    @UtilityClass
    public class TenantContextHolder

    /**

    • 支持父子线程数据传递
      */
      private final ThreadLocal THREAD_LOCAL_TENANT = new TransmittableThreadLocal<>();

    /**

    • 设置租户ID
    • @param tenantId 租户ID
      */
      public void setTenantId(String tenantId)
      THREAD_LOCAL_TENANT.set(tenantId);

    /**

    • 获取租户ID
    • @return String
      */
      public String getTenantId()
      return THREAD_LOCAL_TENANT.get();

    /**

    • 清除tenantId
      */
      public void clear()
      THREAD_LOCAL_TENANT.remove();

(3)过滤器(用户向上下文中添加租户id)
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
import org.springframework.web.filter.GenericFilterBean;

import javax.servlet.FilterChain;
import javax.servlet.ServletException;
import javax.servlet.ServletRequest;
import javax.servlet.ServletResponse;
import javax.servlet.http.HttpServletRequest;
import javax.servlet.http.HttpServletResponse;
import java.io.IOException;

/**

  • 租户上下文过滤器
  • @author cherf
  • @date 2022-08-30 13:28:00
    */
    @Slf4j
    @Component
    @Order(Ordered.HIGHEST_PRECEDENCE)
    public class TenantContextHolderFilter extends GenericFilterBean
    @Override
    @SneakyThrows
    public void doFilter(ServletRequest servletRequest, ServletResponse servletResponse, FilterChain filterChain) throws IOException, ServletException
    HttpServletRequest request = (HttpServletRequest) servletRequest;
    HttpServletResponse response = (HttpServletResponse) servletResponse;
    try
    String tenantId = request.getHeader(Constant.TENANT_ID);
    if (StringUtil.isBlank(tenantId))
    tenantId = Constant.TENANT_ID_DEFAULT;

    log.info(“获取到的租户ID为:”, tenantId);
    if (StringUtil.isNotBlank(tenantId))
    TenantContextHolder.setTenantId(tenantId);
    else
    if (StringUtil.isBlank(TenantContextHolder.getTenantId()))
    TenantContextHolder.setTenantId(Constant.TENANT_ID_DEFAULT);


    filterChain.doFilter(request, response);
    finally
    TenantContextHolder.clear();


/**
* 租户相关静态常量
*/
class Constant

public static final String TENANT = "tenant";

/**
 * header 中租户ID
 */
public static final String TENANT_ID = "tenantId";

/**
 * 默认租户ID
 */
public static final String TENANT_ID_DEFAULT = "tenant_id_default";


(4)需要配置多租户表名配置
import lombok.Getter;
import lombok.Setter;
import org.springframework.boot.context.properties.ConfigurationProperties;
import org.springframework.cloud.context.config.annotation.RefreshScope;

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/**

  • @author cherf

  • @description: 租户配置属性(可以写在yml配置文件里,也可以在这儿写死)

  • @date 2022/08/30 13:57
    /
    @Getter
    @Setter
    @RefreshScope
    @ConfigurationProperties(prefix = “tenant”)
    public class TenantProperties
    /

    • 是否开启租户模式
      */
      private Boolean enable = true;

    /**

    • 需要排除的多租户的表(根据自己需要修改)
      /
      private List ignoreTables = Arrays.asList(“t_menu”, “t_oauth_client”,“t_tenant_info”,“t_server_info”);
      /
      *
    • 动态表名的表(根据自己需要修改)
      */
      private List dynamicTables = Arrays.asList(“t_opt_log”, “t_flow”);

    /**

    • 多租户字段名称(根据实际项目修改)
      */
      private String column = “tenant_id”;

    /**

    • 排除不进行租户隔离的sql
    • 样例全路径:com.cherf.system.sauth.mapper.AppAccessMapper.findList
      */
      private List ignoreSqls = new ArrayList<>();

(5)mybatis插件配置
import com.baomidou.mybatisplus.annotation.DbType;
import com.baomidou.mybatisplus.extension.plugins.MybatisPlusInterceptor;
import com.baomidou.mybatisplus.extension.plugins.handler.TenantLineHandler;
import com.baomidou.mybatisplus.extension.plugins.inner.DynamicTableNameInnerInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.PaginationInnerInterceptor;
import com.baomidou.mybatisplus.extension.plugins.inner.TenantLineInnerInterceptor;
import com.cherf.system.common.constant.StringPool;
import com.cherf.system.common.context.TenantContextHolder;
import com.cherf.system.common.util.StringUtil;
import lombok.AllArgsConstructor;
import net.sf.jsqlparser.expression.Expression;
import net.sf.jsqlparser.expression.NullValue;
import net.sf.jsqlparser.expression.StringValue;
import org.springframework.boot.autoconfigure.AutoConfigureBefore;
import org.springframework.boot.context.properties.EnableConfigurationProperties;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**

  • Mybatis-plus 插件,分页插件,多租户插件,动态表名插件

  • @author cherf

  • @date 2022年9月1日
    */
    @Configuration
    @AllArgsConstructor
    @AutoConfigureBefore(MybatisPlusConfig.class)
    @EnableConfigurationProperties(TenantProperties.class)
    public class MybatisPlusConfig

    private final TenantProperties tenantProperties;

    @Bean
    public MybatisPlusInterceptor paginationInterceptor()
    MybatisPlusInterceptor interceptor = new MybatisPlusInterceptor();

     //动态表名插件
     DynamicTableNameInnerInterceptor dynamicTableNameInnerInterceptor = new DynamicTableNameInnerInterceptor();
     dynamicTableNameInnerInterceptor.setTableNameHandler((sql, tableName) -> 
     	String tenantId = TenantContextHolder.getTenantId();
     	//符合的表名拼接租户号
     	if (tenantProperties.getDynamicTables().stream().anyMatch(
     			(t) -> t.equalsIgnoreCase(tableName))) 
     		return tableName + StringPool.UNDER_LINE + tenantId;
     	
     	return tableName;
     );
     interceptor.addInnerInterceptor(dynamicTableNameInnerInterceptor);
     
     
     // 新多租户插件配置,一缓和二缓遵循mybatis的规则,需要设置 MybatisConfiguration#useDeprecatedExecutor = false 避免缓存万一出现问题
     //租户拦截器
     interceptor.addInnerInterceptor(new TenantLineInnerInterceptor(new TenantLineHandler() 
    
     	/**
     	 * 获取租户ID
     	 * @return
     	 */
     	@Override
     	public Expression getTenantId() 
     		String tenantId = TenantContextHolder.getTenantId();
     		if (StringUtil.isNotEmpty(tenantId)) 
     			return new StringValue(tenantId);
     		
     		return new NullValue();
     	
    
     	/**
     	 * 获取多租户的字段名
     	 * @return String
     	 */
     	@Override
     	public String getTenantIdColumn() 
     		return tenantProperties.getColumn();
     	
    
     	/**
     	 * 过滤不需要根据租户隔离的表
     	 * 这是 default 方法,默认返回 false 表示所有表都需要拼多租户条件
     	 * @param tableName 表名
     	 */
     	@Override
     	public boolean ignoreTable(String tableName) 
     		return tenantProperties.getIgnoreTables().stream().anyMatch(
     				(t) -> tableName.startsWith(t) || tableName.equalsIgnoreCase(t)
     		);
     	
     ));
     // 如果用了分页插件注意先 add TenantLineInnerInterceptor 再 add PaginationInnerInterceptor
     interceptor.addInnerInterceptor(new PaginationInnerInterceptor(DbType.mysql));
     return interceptor;
    


数据库隔离到此完成!

然后测试sql后自动拼接 tenant_id 条件

注意:如果不想让 sql 拼接 tenant_id 可以在mapper 上添加 @InterceptorIgnore(tenantLine = “true”) 注解 ;

如图:

二、redis隔离

1.方案
原本思路是在 redis 每个操作的 key 前拼租户号,但是涉及的 太多 key ,挨个加太多,最终继承 RedisSerializer 类 重写方法,拼装租户id 实际实现如下:

package com.cherf.system.common.redis;

import com.cherf.system.common.constant.StringPool;
import com.cherf.system.common.context.TenantContextHolder;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.data.redis.serializer.RedisSerializer;
import org.springframework.data.redis.serializer.StringRedisSerializer;
import org.springframework.lang.Nullable;
import org.springframework.util.Assert;
import org.springframework.util.CollectionUtils;

import java.nio.charset.Charset;
import java.nio.charset.StandardCharsets;
import java.util.List;

/**

  • @author cherf

  • @Description 自定义序列化,用于保存租户号,隔离数据

  • @Date 2022/9/5
    */
    public class TenantStringRedisSerializer implements RedisSerializer

    private static final Logger logger = LoggerFactory.getLogger(TenantStringRedisSerializer.class);

    private final Charset charset;
    public static final StringRedisSerializer US_ASCII;
    public static final StringRedisSerializer ISO_8859_1;
    public static final StringRedisSerializer UTF_8;

    private static final String TENANT_PREFIX = “tid”;

    private static final Boolean IS_LOG = false;

    private RedisProperties redisProperties = new RedisProperties();

    public TenantStringRedisSerializer()
    this(StandardCharsets.UTF_8);

    public TenantStringRedisSerializer(Charset charset)
    Assert.notNull(charset, “Charset must not be null!”);
    this.charset = charset;

    public String deserialize(@Nullable byte[] bytes)

     return (bytes == null ? null : new String(bytes, charset).replaceFirst(TenantContextHolder.getTenantId()+":", ""));
    

    public byte[] serialize(@Nullable String string)

     if (StringUtils.isBlank(string))
         echoLog(string);
         return null;
     
    
     String tenantId = TenantContextHolder.getTenantId();
     if (StringUtils.isBlank(tenantId))
         echoLog(string);
         return string.getBytes(charset);
     
    
     // 本身带有多租户ID的不拼接
     if (string.indexOf(StringPool.COLON) > 0 && string.startsWith(TENANT_PREFIX))
         echoLog(string);
         return string.getBytes(charset);
     
    
     // true:拼接多租户ID
     Boolean flag = true;
     List<String> ignoreEqualsKeys = redisProperties.getIgnoreEqualsKeys();
     if (!CollectionUtils.isEmpty(ignoreEqualsKeys))
         for (String key: ignoreEqualsKeys
         ) 
             if (key.equals(string))
                 flag = false;
                 break;
             
         
     
    
     if (flag)
         List<String> ignoreContainsKeys = redisProperties.getIgnoreContainsKeys();
         if (!CollectionUtils.isEmpty(ignoreContainsKeys))
             for (String key: ignoreContainsKeys
             ) 
                 if (string.startsWith(key))
                     flag = false;
                     break;
                 
             
         
     
    
     if (flag)
         echoLog(tenantId + StringPool.COLON + string);
         return (tenantId + StringPool.COLON + string).getBytes(charset);
     
    
     echoLog(string);
     return string.getBytes(charset);
    

    private void echoLog(String key)

     if (IS_LOG)
         logger.info("redis的key:"+key);
     
    

    public Class<?> getTargetType()
    return String.class;

    static
    US_ASCII = new StringRedisSerializer(StandardCharsets.US_ASCII);
    ISO_8859_1 = new StringRedisSerializer(StandardCharsets.ISO_8859_1);
    UTF_8 = new StringRedisSerializer(StandardCharsets.UTF_8);

注:正常请求中是能从 header 获取到租户id ,但是一些非请求的代码无法实现,目前采用的方法比较笨,就是捞出所有的租户信息,然后去循环,每个循环里塞入到上下文,后续代码逻辑可以获取到,逻辑结束再 clear 掉(包括一些线程和启动逻辑代码)!

三、ES,logstash

1.方案
ES是最简单的,根据索引隔离,新增租户时新增相关索引,然后CRUD时查询时在索引上拼接租户id

四、文件相关

1.方案
文件也较为简单,将配置文件里文件目录配置添加变量,在 getPath 的时候将租户id拼上去,根据租户id进行文件隔离

五、Feign

1.方案
feign 调用也比较简单,feign 可以统一设置请求头 具体如下

2.实现
(1)feign 统一配置
package com.cherf.system.common.config;

import com.cherf.system.common.constant.Constant;
import com.cherf.system.common.context.TenantContextHolder;
import feign.RequestInterceptor;
import feign.RequestTemplate;

/**

  • @author cherf
  • @description: Feign调用时添加租户ID
  • @date 2022/09/03 14:46
    **/
    public class FeignConfig implements RequestInterceptor
    @Override
    public void apply(RequestTemplate requestTemplate)
    //TENANT_ID
    requestTemplate.header(Constant.TENANT_ID, TenantContextHolder.getTenantId());

(2)调用时添加配置,如图

六、定时任务(使用xxl-job,不一定适用其他)

1.方案
将定时任务改为区分多租户和不区分多租户的两种,不区分的不需要管,区分多租户的定时任务将 执行参数jobparam 设置成租户id ,然后在定时任务里获取后塞入上下文 TenantContextHolder ,后续mysql,redis,es,文件操作都能拿到tenanId。

由于定时任务获取租户id,塞入租户id,改的太多,我这边是将 xxl-job-core 执行器的 init 和 destory 改造后实现 ,具体如下:

2.实现
(1)执行器改造如图

(2)添加统一父类
package com.cherf.common.base;

import com.alibaba.fastjson.JSONObject;
import com.cherf.common.constant.TenantConstant;
import com.cherf.common.context.TenantContextHolder;
import com.cherf.common.util.StringUtil;
import com.xxl.job.core.context.XxlJobHelper;
import org.springframework.stereotype.Component;

/**

  • @author cherf
  • @description: 定时任务父类, 用来处理租户号(子类需重写init和destroy方法)
  • @description: (不进行租户隔离的job不用继承)
  • @date 2022/09/01 09:57
    **/
    @Component
    public class BaseSchedule

/**
* jobParam 格式 “tenantId”:“tidXXXXXXX”
* 获取 jobParam 中的 tenantId 塞入上下文
*/
public void init()
String jobParam = XxlJobHelper.getJobParam();
if (StringUtil.isNotEmpty(jobParam))
JSONObject jsonObject = JSONObject.parseObject(jobParam);
if (null != jsonObject && jsonObject.containsKey(TenantConstant.TENANT_ID))
TenantContextHolder.setTenantId(jsonObject.getString(TenantConstant.TENANT_ID));


/**
 * 清除上下文中的 tenantId
 */
public void destroy() 
    TenantContextHolder.clear();

(3)示例
package com.cherf.system.sys;

import com.cherf.system.common.base.BaseSchedule;
import com.cherf.system.common.context.TenantContextHolder;
import com.xxl.job.core.context.XxlJobHelper;
import com.xxl.job.core.handler.annotation.XxlJob;
import org.springframework.stereotype.Component;

/**

  • @author cherf

  • @description: test

  • @date 2022/09/01 10:25
    **/
    @Component
    public class TaskTest extends BaseSchedule
    @XxlJob(value = “TaskTest”, init = “init”, destroy = “destroy”)
    public void execute()
    XxlJobHelper.log(“start============================================”);
    XxlJobHelper.log(TenantContextHolder.getTenantId());
    XxlJobHelper.log(“end============================================”);
    XxlJobHelper.handleSuccess(“yunxingchenggong”);

    @Override
    public void init()
    super.init();

    @Override
    public void destroy()
    super.destroy();

3.注意
如图所示,由于执行器是通过反射获取 init 和 destroy 方法,子类必须重写父类方法,否则获取不到对应方法

七、nginx ,Gateway 动态 url 示例

1.Nginx示例
#user root;
worker_processes 1;

#error_log CHERF_BASE_HOME/cherf/logs/nginx/nginx_error.log crit;
#pid CHERF_BASE_HOME/cherf/conf/nginx/nginx.pid;

#error_log logs/error.log;
#error_log logs/error.log notice;
#error_log logs/error.log info;

#pid logs/nginx.pid;

worker_rlimit_nofile 65535;
events
use epoll;
worker_connections 65535;

http
include mime.types;
default_type application/octet-stream;
sendfile on;
keepalive_timeout 65;
client_max_body_size 500M;
server_tokens off;
autoindex off;

access_log CHERF_BASE_HOME/cherf/logs/nginx/nginx_access.log;
#sendfile        on;
#tcp_nopush     on;

#keepalive_timeout  0;
#keepalive_timeout  65;

#gzip  on;

##微服务网关负载均衡
upstream gatewayServer
server 127.0.0.1:9527 weight=1;

server 
        listen       9080;
        server_name  localhost;

        #charset koi8-r;

        #access_log  logs/host.access.log  main;

        location /ngx_status  
                stub_status on;
                access_log off;
                #allow 127.0.0.1;允许哪个ip可以访问
        
    

server 
    add_header Strict-Transport-Security "max-age=63072000; includeSubdomains; preload";
    add_header Content-Security-Policy "default-src 'self' * 'unsafe-inline' 'unsafe-eval' blob: data: ; upgrade-insecure-requests;";
    add_header X-Frame-Options SAMEORIGIN;
    add_header X-XSS-Protection "1; mode=block";
    add_header X-Content-Type-Options nosniff;

    listen                  443 ssl;
    #listen                 10080;
    server_name             _;
    #ssl                    on;
    ssl_certificate         CHERF_BASE_HOME/cherf/conf/nginx/ssl/server.crt; # 改成你的证书的名字
    ssl_certificate_key     CHERF_BASE_HOME/cherf/conf/nginx/ssl/server.key; # 你的证书的名字
    ssl_verify_depth        1;

    ssl_session_timeout 5m;
    ssl_protocols TLSv1.2;
    ssl_ciphers ECDHE-RSA-AES128-GCM-SHA256:ECDHE:ECDH:AES:HIGH:!NULL:!aNULL:!MD5:!ADH:!RC4:!DH:!DHE;
    ssl_prefer_server_ciphers on;

    #charset                koi8-r;

    #access_log  logs/host.access.log  main;
    error_page 404 403 500 502 503 504 /404.html;

    location = /404.html 
        root    CHERF_BASE_HOME/cherf/conf/nginx;
    

    ## IC模块【vue】
    location /ic 
       proxy_redirect off;
       proxy_set_header Host $host;
       proxy_set_header X-Real-IP $remote_addr;
       proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
       proxy_pass http://127.0.0.1:6770/;
    
    
 ## IC 接口)【java】
    location ~ ^/cherfApi/(.*)/ic/(.*) 
      proxy_redirect off;
      proxy_set_header Host $host;
      proxy_set_header X-Real-IP $remote_addr;
      proxy_set_header X-Forwarded-For $proxy_add_x_forwarded_for;
      proxy_pass http://gatewayServer/cherfApi/$1/ic/$2;
    
    #---------------------logstash--------------------------
    location = /lgs 
                    proxy_pass http://gatewayServer/lgs;
    

    location ~ ^/cherfApi/(.*)/lgs 
            proxy_pass http://gatewayServer/cherfApi/$1/lgs;
    

    # /ic/outLogin
    location ^~ /console/ 
            return 301  https://CHERF_CURRENT_HOST/ic/outLogin?$query_string;
    

   
    error_page   500 502 503 504  /50x.html;
    location = /50x.html 
        root   html;
    




##vue IC
server 
        listen       6770;
        server_name  _;
        root   CHERF_BASE_HOME/cherf/fronts/ic;
        index  index.html index.htm;
        #charset koi8-r;
        #access_log  logs/host.access.log  main;
        location / 
            root  CHERF_BASE_HOME/cherf/fronts/ic;
            try_files $uri $uri/ /index.html last;
            index  index.html index.htm;
        
        error_page   500 502 503 504  /50x.html;
        location = /50x.html 
            root   html;
        
    

2.Gateway 路由配置示例
jasypt:
encryptor:
password: 8e0a

server:
port: 9527
spring:
cloud:
nacos:
discovery:
# 不使用nacos的配置
enabled: false
server-addr: 127.0.0.1:8848
gateway:
discovery:
locator:
enabled: true #开启从注册中心动态创建路由的功能,利用微服务名进行路由
routes:

    # ************************************* IC 模块**********************************
    - id: i-cherf-tenant-ic-api #payment_route    #路由的ID,没有固定规则但要求唯一,建议配合服务名
      uri: lb://i-cherf-ic-api #匹配后提供服务的路由地址,lb为负载均衡的其中一种模式
      predicates:
        # 断言,路径相匹配的进行路由
      - Path= /cherf/t_tenantName/ic/**
      filters:
      # 过滤器去掉⼀个路径(租户参数)
      - StripPrefix=2

    - id: i-cherf-ic-api
      uri: lb://i-cherf-ic-api
      predicates:
      - Path= /ic/**

解决某些错误,启动覆盖

main:
allow-bean-definition-overriding: true

redis:
database: 0
port: 6379
host: 127.0.0.1
password: ENC(ygfK2l63KfPfFTfsswuXq3Sr+QPmyZYXpFCDnBLaJ2F8Hhi/Mbx3wE4ynHp8nmu/)

签名开关

signature:
base: true

接口访问权限

interface:
auth: false

oauth2:
cloud:
sys:
parameter:
ignoreUrls:

    # 原来校验TOKEN的白名单
    - /oauth/**


    # 加上多租户的校验TOKEN白名单
    - /isapi/t_tenantName/oauth/**

八、新增租户ddl,dml,es索引初始化示例

1.方案
ddl,dml初始化比较简单,将初始化 sql 写在文件中,再用流读出来,降替换符替换成对应的租户号再去执行sql;

es使用 elasticsearch-rest-high-level-client 包里自带的api去实现即可。

2.实现
(1) msysql 包括 ddl,dml
import cn.hutool.core.io.IoUtil;
import cn.hutool.core.io.resource.ClassPathResource;
import cn.hutool.core.io.resource.Resource;
@Service
public class AsyncExecutorIniTenant
/**
* 获取ini文件sql
*
* @param fileName
* @return
*/
private String getIniResourec(String fileName)
String sql = “”;
Resource resource = new ClassPathResource(fileName);
InputStream is = resource.getStream();
sql = IoUtil.readUtf8(is);
return sql;

//动态替换租户号和sql执行省略

(2) es索引
package com.cherf.sys.tanent.service.impl;

import cn.hutool.core.io.IoUtil;
import cn.hutool.core.io.resource.ClassPathResource;
import cn.hutool.core.io.resource.Resource;
import cn.hutool.log.Log;
import cn.hutool.log.LogFactory;
import com.cherf.common.enums.ElasticAliasEnum;
import com.cherf.common.enums.ElasticTypeFileEnum;
import com.cherf.common.exception.CherfException;
import org.elasticsearch.action.admin.indices.alias.Alias;
import org.elasticsearch.client.RequestOptions;
import org.elasticsearch.client.RestHighLevelClient;
import org.elasticsearch.client.indices.CreateIndexRequest;
import org.elasticsearch.client.indices.CreateIndexResponse;
import org.elasticsearch.client.indices.GetIndexRequest;
import org.elasticsearch.common.xcontent.XContentType;
import org.springframework.stereotype.Service;

import java.io.IOException;

@Service
public class AsyncExecutorIniTenant

protected Log log = LogFactory.get(getClass());

/**
 * 初始化elastic索引
 *
 * @param tenantId
 */
@Async("simpleTaskExecutor")
public Future<Boolean> initElastic(String tenantId) 

    try 
        long start = System.currentTimeMillis();
        RestHighLevelClient client = ElasticRestHignLevelClient.buildClient();
        createIndex(ElasticTypeFileEnum.DATA_ACCESS_INDEX.getCode(), tenantId, client);
        long end = System.currentTimeMillis();
        log.info("总共耗时" + (end - start) / 1000 + "秒");
     catch (Exception e) 
        log.error("租户管理:elastic索引初始化失败:" + e);
        return new AsyncResult<>(false);
    
    return new AsyncResult<>(true);



/**
 * 创建es索引
 */
private void createIndex(String indexName, String tenantId, RestHighLevelClient client) throws IOException 

    // 索引拼上租户号
    String tenantIndexName = tenantId + "_" + indexName;
    // 索引别名
    String tenantIndexAlias = tenantId + "_" + ElasticAliasEnum.getInfoByCode(indexName);
    // 读取分片配置文件
    String settings = ElasticTypeFileEnum.ELASTIC_SET.getInfo();
    // 读取索引json配置文件
    String indexMappingPath = ElasticTypeFileEnum.getInfoByCode(indexName);

    // 判断当前索引是否存在
    boolean exists = true;
    try 
        exists = client.indices().exists(new GetIndexRequest(tenantIndexName), RequestOptions.DEFAULT);
     catch (IOException e) 
        log.error("ES查询索引失败");
        throw new CherfException("ES查询索引失败");
    

    if (!exists) 
        // 索引名称
        CreateIndexRequest request = new CreateIndexRequest(tenantIndexName);
        // 索引别名
        request.alias(new Alias(tenantIndexAlias));

        //索引配置
        Resource resource = new ClassPathResource(settings);
        String settingJson = IoUtil.readUtf8(resource.getStream());
        request.settings(settingJson, XContentType.JSON);

        // 添加索引mapping
        //Map<String, Object> mapping = new HashMap<>();
        //mapping.put("t-job-index", properties);
        Resource indexMappingResource = new ClassPathResource(indexMappingPath);
        String mapping = IoUtil.readUtf8(indexMappingResource.getStream());
        request.mapping(mapping, XContentType.JSON);

        CreateIndexResponse response = client.indices().create(request, RequestOptions.DEFAULT);
    

ElasticTypeFileEnum示例:

package com.cherf.common.enums;

import java.io.File;
import java.util.Arrays;

public enum ElasticTypeFileEnum

ELASTIC_INI("elastic","elastic" + File.separator),
ELASTIC_SET("setting","elastic/settings.json" + "settings.json"),

DATA_ACCESS_INDEX("data-access-index-v3", ELASTIC_INI.getInfo() + "data-access-index.json");

private final String code;
private final String info;

ElasticTypeFileEnum(String code, String info) 
    this.code = code;
    this.info = info;


public String getCode() 
    return code;


public String getInfo() 
    return info;


/**
 * getByCode
 *
 * @param code key
 * @return AlertStatusEnum
 */
public static ElasticTypeFileEnum getByCode(String code) 
    return Arrays.stream(values()).filter(d -> d.getCode().equals(code)).findFirst().orElse(null);


/**
 * getInfoByCode
 *
 * @param code key
 * @return String
 */
public static String getInfoByCode(String code) 
    ElasticTypeFileEnum e = getByCode(code);
    return e != null ? e.getInfo() : "";

3.注意
​ 为了提高同步速度,在方法上添加 @Async(“threadPool”) 注解(需要在启动类或配置类加上@EnableAsync,才可生效),实现异步调用,threadPool为自定义线程池,可以用自己环境里公用的,也可以使用默认的 SimpleAsyncTaskExecutor 线程池

九、总结

注意 mybatis-plus 新多租户插件配置,一缓和二缓遵循mybatis的规则,需要设置 MybatisConfiguration #useDeprecatedExecutor = false 避免缓存万一出现问题;
如果是使用mybatis-plus3.4.1之前的版本,可以通过自定义一个TenantSqlParser解析器并重写processInsert方法,网上很多,可自行百度;
如果项目中使用了pagehelper需要注意版本应与mybatis-plus 对应,否则有可能启动报错 (我使用的版本:pagehelper 5.3.1,mybatis-plus 3.5.2)

以上是关于多租户改造的主要内容,如果未能解决你的问题,请参考以下文章

JeecgBoot开发多租户SAAS数据隔离,查询数据库,改造多租户后 Mybatis-plus 查询数据库的SQL语句tenant-id[租户Id] 一直为0

SpringBoot + BeetlSQL3 多租户改造

Jeecg-boot 多租户改造方案(涉及菜单部门角色等基础模块)

Jeecg-boot 2.4.6+ 多租户改造方案(涉及菜单部门角色等基础模块)

多租户SaaS的数据库设计模式

多租户系统中如何实现分别限流