最近公司项目需要基于 ElasticSearch(以下简称 ES)集成 SQL 查询功能,并且能够根据请求参数动态切换目标数据源。同事找我帮忙实现该功能,我以前没做过,只好赶鸭子上架。
网上很多资料并不完整,自己琢磨了半天才终于完成。这里记录一下实现过程中踩过的坑,供大家参考。
我们测试环境部署的是 ElasticSearch 6.8.2,需要使用与之版本一致的 x-pack-sql-jdbc.jar,否则会报版本不一致的错误。
不过该功能的开通需要铂金会员或者自己破解,具体的破解方案可以看看其他文章。以下介绍代码的具体实现.
切换数据源部分参考了下方链接中的代码:
https://blog.csdn.net/hekf2010/article/details/81155778
1. application.properties配置
server.port=6666
#主数据源
spring.datasource.url=jdbc:es://http://10.0.75.20:9200/
#es 从数据源 es1,es2
slave.datasource.names=es1,es2
#es1
slave.datasource.es1.url=jdbc:es://http://10.0.75.21:9200/
#es2
slave.datasource.es2.url=jdbc:es://http://10.0.75.22:9200/
#mapper.xml文件
mybatis.mapper-locations=classpath:mapper/*.xml
#实体类包
mybatis.type-aliases-package=com.kunlun.es.vo
2. 注册动态数据源.
PS:一开始以为这里需要额外添加 ES 的数据库驱动,后来查看源码之后发现,根本不需要添加 EsDriver。
import org.apache.log4j.Logger;
import org.elasticsearch.xpack.sql.jdbc.EsDataSource;
import org.springframework.beans.MutablePropertyValues;
import org.springframework.beans.factory.support.BeanDefinitionRegistry;
import org.springframework.beans.factory.support.GenericBeanDefinition;
import org.springframework.context.EnvironmentAware;
import org.springframework.context.annotation.ImportBeanDefinitionRegistrar;
import org.springframework.core.env.Environment;
import org.springframework.core.type.AnnotationMetadata;
import javax.sql.DataSource;
import java.util.HashMap;
import java.util.Map;
import java.util.Properties;
import java.util.concurrent.ConcurrentHashMap;
/**
* @Author zhaozhiguo
* @Date 2020-11-04
* @Description 注册动态数据源
*/
public class DynamicDataSourceRegister implements ImportBeanDefinitionRegistrar, EnvironmentAware {
private Logger logger = Logger.getLogger(DynamicDataSourceRegister.class);
/***默认数据源***/
private DataSource deftDataSource;
/***自定义数据源***/
private Map<String, DataSource> slaveDataSources = new ConcurrentHashMap<>();
@Override
public void setEnvironment(Environment environment) {
initDefaultDataSource(environment);
initslaveDataSources(environment);
}
private void initDefaultDataSource(Environment env) {
// 读取主数据源
Properties properties = new Properties();
EsDataSource esDataSource = new EsDataSource();
esDataSource.setUrl( env.getProperty("spring.datasource.url"));
esDataSource.setProperties(properties);
deftDataSource = esDataSource;
}
private void initslaveDataSources(Environment env) {
// 读取配置文件获取更多数据源
String dsPrefixs = env.getProperty("slave.datasource.names");
for (String dsPrefix : dsPrefixs.split(",")) {
// 多个数据源
Properties properties = new Properties();
EsDataSource esDataSource = new EsDataSource();
esDataSource.setUrl(env.getProperty("slave.datasource." + dsPrefix + ".url"));
esDataSource.setProperties(properties);
slaveDataSources.put(dsPrefix, esDataSource);
}
}
@Override
public void registerBeanDefinitions(AnnotationMetadata annotationMetadata, BeanDefinitionRegistry beanDefinitionRegistry) {
Map<Object, Object> targetDataSources = new HashMap<Object, Object>();
//添加默认数据源
targetDataSources.put("dataSource", this.deftDataSource);
DynamicDataSourceContextHolder.dataSourceIds.add("dataSource");
//添加其他数据源
targetDataSources.putAll(slaveDataSources);
for (String key : slaveDataSources.keySet()) {
DynamicDataSourceContextHolder.dataSourceIds.add(key);
}
//创建DynamicDataSource
GenericBeanDefinition beanDefinition = new GenericBeanDefinition();
beanDefinition.setBeanClass(DynamicDataSource.class);
beanDefinition.setSynthetic(true);
MutablePropertyValues mpv = beanDefinition.getPropertyValues();
mpv.addPropertyValue("defaultTargetDataSource", deftDataSource);
mpv.addPropertyValue("targetDataSources", targetDataSources);
//注册 - BeanDefinitionRegistry
beanDefinitionRegistry.registerBeanDefinition("dataSource", beanDefinition);
logger.info("Dynamic DataSource Registry");
}
3. 自定义注解,用于拦截 mapper 执行sql 时切换数据源
import java.lang.annotation.*;
/**
 * Marker annotation for types or methods that need a data source switch.
 * It is retained at runtime and declares no attributes.
 *
 * @author zhaozhiguo
 * @since 2020-11-04
 */
@Documented
@Retention(RetentionPolicy.RUNTIME)
@Target({ElementType.METHOD, ElementType.TYPE})
public @interface TargetDataSource {
}
4. 请求参数
/**
 * Query parameters: the SQL text to run and the name of the data source to run it on.
 *
 * @author zhaozg
 * @since 2020-11-04
 */
public class SelectParam {

    /** SQL statement to execute. */
    private String sql;

    /**
     * Name of the data source the SQL is executed on; it has to match one of the names
     * configured under {@code slave.datasource.names} in the properties file.
     */
    private String dcName;

    /** @return the SQL statement to execute */
    public String getSql() {
        return this.sql;
    }

    /** @param sql the SQL statement to execute */
    public void setSql(String sql) {
        this.sql = sql;
    }

    /** @return the name of the target data source */
    public String getDcName() {
        return this.dcName;
    }

    /** @param dcName the name of the target data source */
    public void setDcName(String dcName) {
        this.dcName = dcName;
    }
}
5. AOP 监听动态切换数据源
import com.kunlun.es.vo.SelectParam;
import org.apache.log4j.Logger;
import org.aspectj.lang.JoinPoint;
import org.aspectj.lang.annotation.After;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Before;
import org.springframework.core.annotation.Order;
import org.springframework.stereotype.Component;
/**
 * Aspect that selects the data source for methods annotated with {@code @TargetDataSource}.
 *
 * <p>The data source name is taken from the first method argument when that argument is a
 * {@link SelectParam}. When no usable name is found, nothing is selected and an error is logged.
 * The selection is cleared again after the method has run.
 *
 * <p>NOTE(review): {@code @Order(-1)} presumably makes this advice run ahead of other aspects
 * such as transaction handling - confirm against the application's aspect ordering.
 *
 * @author zhaozhiguo
 * @since 2020-11-04
 */
@Aspect
@Order(-1)
@Component
public class DynamicDattaSourceAspect {

    private static final Logger LOGGER = Logger.getLogger(DynamicDattaSourceAspect.class);

    /**
     * Switches to the data source named by the first argument's {@code dcName}, if it is known.
     *
     * @param joinPoint        intercepted invocation; its first argument is inspected
     * @param targetDataSource annotation that triggered the advice
     */
    @Before("@annotation(targetDataSource)")
    public void changeDataSource(JoinPoint joinPoint, TargetDataSource targetDataSource) {
        SelectParam selectParam = firstSelectParam(joinPoint);
        String dcName = selectParam == null ? null : selectParam.getDcName();
        if (dcName == null || !DynamicDataSourceContextHolder.isContainsDataSource(dcName)) {
            LOGGER.error("数据源 " + dcName + " 不存在使用默认的数据源 -> " + joinPoint.getSignature());
        } else {
            LOGGER.debug("使用数据源:" + dcName);
            DynamicDataSourceContextHolder.setDataSourceType(dcName);
        }
    }

    /**
     * Clears the data source selection after the annotated method has run.
     *
     * @param joinPoint        intercepted invocation; only used for the log message
     * @param targetDataSource annotation that triggered the advice
     */
    @After("@annotation(targetDataSource)")
    public void clearDataSource(JoinPoint joinPoint, TargetDataSource targetDataSource) {
        try {
            SelectParam selectParam = firstSelectParam(joinPoint);
            String dcName = selectParam == null ? null : selectParam.getDcName();
            LOGGER.debug("清除数据源 " + dcName + " !");
        } finally {
            // The selection must be cleared even if building the log message fails.
            DynamicDataSourceContextHolder.clearDataSourceType();
        }
    }

    /** Returns the first argument of the invocation if it is a {@link SelectParam}, else null. */
    private static SelectParam firstSelectParam(JoinPoint joinPoint) {
        Object[] args = joinPoint.getArgs();
        if (args != null && args.length > 0 && args[0] instanceof SelectParam) {
            return (SelectParam) args[0];
        }
        return null;
    }
}
6. Mapper下方法 添加 TargetDataSource 注解
import com.kunlun.es.config.TargetDataSource;
import com.kunlun.es.vo.SelectParam;
import org.apache.ibatis.annotations.Mapper;
import org.apache.ibatis.annotations.Param;
import org.apache.ibatis.annotations.Select;
import java.util.List;
import java.util.Map;
/**
 * MyBatis mapper that runs the SQL text carried by a {@link SelectParam}.
 *
 * @author zhaozhiguo
 * @since 2020-11-04
 */
@Mapper
public interface SelectObjMapper {

    /**
     * Executes the statement held in {@code selectParam.sql} and returns the result rows.
     *
     * <p>NOTE(review): the statement is inserted through MyBatis {@code ${}} substitution,
     * i.e. as raw text without parameter binding, so callers must only pass trusted SQL.
     *
     * @param selectParam carries the SQL text; the method is also annotated with
     *                    {@code @TargetDataSource}
     * @return the rows produced by the statement, one {@code Map} per row
     */
    @Select("${selectParam.sql}")
    @TargetDataSource
    List<Map> selectObj(@Param("selectParam") SelectParam selectParam);
}
7. 启动类,需要添加@Import(DynamicDataSourceRegister.class)
import com.kunlun.es.config.DynamicDataSourceRegister;
import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.context.annotation.Import;
/**
 * Application entry point. Imports {@code DynamicDataSourceRegister} in addition to the
 * regular Spring Boot auto-configuration.
 *
 * @author zhaozhiguo
 * @since 2020-11-03
 */
@Import(DynamicDataSourceRegister.class)
@SpringBootApplication
public class EsSelectApplication {

    /**
     * Starts the Spring Boot application.
     *
     * @param args command-line arguments handed to {@code SpringApplication.run}
     */
    public static void main(String[] args) {
        SpringApplication.run(EsSelectApplication.class, args);
    }
}
8. 查询 接口暴露
import com.kunlun.es.service.SelectObjService;
import com.kunlun.es.vo.SelectParam;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.PostMapping;
import org.springframework.web.bind.annotation.RequestBody;
import org.springframework.web.bind.annotation.RestController;
import java.util.List;
import java.util.Map;
/**
 * REST endpoint for the query feature.
 *
 * @author zhaozhiguo
 * @since 2020-11-04
 */
@RestController
public class SelectObjController {

    /** Service the query is delegated to. */
    @Autowired
    private SelectObjService selectObjService;

    /**
     * Handles {@code POST /selectObj} by passing the request body on to the service.
     *
     * @param selectParam query parameters bound from the request body
     * @return whatever the service returns for these parameters
     */
    @PostMapping("/selectObj")
    public List<Map> selectObj(@RequestBody SelectParam selectParam) {
        List<Map> rows = selectObjService.selectObj(selectParam);
        return rows;
    }
}
9. 调用接口,大功告成!
源码就不上传了,整体实现思路还是比较清楚的,jar包是淘宝花了0.56 块大洋代下的 (此处吐槽CSDN 这个包要46积分)