源码阅读——Sharding-JDBC分库分表中间件

Posted MyClass社区

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了源码阅读——Sharding-JDBC分库分表中间件相关的知识,希望对你有一定的参考价值。

(sharding-jdbc架构设计-图来自官网文档)

        最近在研究分库分表,之前业务也有过分库分表,依赖Mybatis插件来实现的,在查询Mapper上拦截处理路由字段,并且赋值ThreadLocal上下文来实现路由的库和表管理,感觉很简便,但是有很多功能没有支持到位,如果没有ES的支持,很多查询都不能实现,但是现在ES的流行,对分库分表的要求可能更加随意,这样一来,分库分表只是开始,接下来更多的工作可能就是数据同步和ES查询。

        当前已经很多公司已经在用Sharding-JDBC,很多功能也支持的很好,我觉得是一个可取的分库分表的方案。接下来从SJ的文档一个简单的demo入手,看一下简单的查询分库分表是如何实现的,和阅读Sharding-JDBC源码阅读的一些笔记

DEMO示例

 
   
   
 
  1. /**

  2. * 当当分库分表组件实践

  3. *

  4. * @author zhangzuizui

  5. * @date 2018/1/9

  6. */

  7. public class ShardingJDBC {

  8.    public static void main(String[] args) throws SQLException {

  9.        Map<String, DataSource> dataSourceMap = new HashMap<>();

  10.        // 配置第一个数据源

  11.        dataSourceMap.put("my_shard_01",createDataSource("my_shard_01"));

  12.        // 配置第二个数据源

  13.        dataSourceMap.put("my_shard_02", createDataSource("my_shard_02"));

  14.        // 配置Order表规则

  15.        TableRuleConfiguration orderTableRuleConfig = new TableRuleConfiguration();

  16.        orderTableRuleConfig.setLogicTable("my_order");

  17.        orderTableRuleConfig.setActualDataNodes("my_shard_0${1..2}.my_order_00${1..2}");

  18.        // 配置分库策略

  19.        orderTableRuleConfig.setDatabaseShardingStrategyConfig(new InlineShardingStrategyConfiguration("order_id", "my_shard_0${order_id.hashCode()%2+1}"));

  20.        // 配置分表策略

  21.        orderTableRuleConfig.setTableShardingStrategyConfig(new InlineShardingStrategyConfiguration("order_id", "my_order_00${order_id.hashCode()%2+1}"));

  22.        // 配置分片规则

  23.        ShardingRuleConfiguration shardingRuleConfig = new ShardingRuleConfiguration();

  24.        shardingRuleConfig.getTableRuleConfigs().add(orderTableRuleConfig);

  25.        // 省略配置order_item表规则...

  26.        Properties properties = new Properties();

  27.        properties.setProperty("sql.show","true");

  28.        // 获取数据源对象

  29.        DataSource dataSource = ShardingDataSourceFactory.createDataSource(dataSourceMap, shardingRuleConfig, new ConcurrentHashMap(), properties);

  30.        String sql = "SELECT * FROM my_order WHERE order_id = ?";

  31.        //2.获取连接

  32.        Connection conn = dataSource.getConnection();

  33.        /**        

  34.         * sql预处理

  35.         */

  36.        PreparedStatement pstmt = conn.prepareStatement(sql);

  37.        pstmt.setString(1, "000001");

  38.        //4.SQL执行和结果归并

  39.        ResultSet rs = pstmt.executeQuery();

  40.        //5.获取结果

  41.        while(rs.next()) {

  42.            System.out.println("id="+rs.getInt(1)+",order_id="+rs.getString(2));

  43.        }

  44.    }

  45.    /**

  46.     * 创建数据源

  47.     * @param dataSourceName

  48.     * @return

  49.     */

  50.    private static DataSource createDataSource(String dataSourceName) {

  51.        BasicDataSource result = new BasicDataSource();

  52.        result.setDriverClassName(com.mysql.jdbc.Driver.class.getName());

  53.        result.setUrl(String.format("jdbc:mysql://localhost:3306/%s", dataSourceName));

  54.        result.setUsername("root");

  55.        result.setPassword("123456");

  56.        return result;

  57.    }

  58. }

Sharding-JDBC源码简单的流程梳理

一.初始化配置文件

 
   
   
 
  1. 1.创建数据源:通过ShardingDataSourceFactory创建datasource,会初始化下面几个重要的对象:

  2.    a.执行引擎:executorEngine,这里会初始化一个线程池;

  3.    b.一些配置信息:是否打印sqlshowSQL=true,和一些shardingProperties;

  4.    c.ShardingContextshardingRule规则,数据库类型:getDatabaseType,和上面的执行引擎等参数;

  5. 2.获取连接:getConnect:返回一个new ShardingConnection(this.shardingContext);

二.sql预处理

 
   
   
 
  1. 1.构造PreparedStatementRoutingEngine();


  1. public PreparedStatementRoutingEngine(String logicSQL, ShardingContext shardingContext) {

  2.        this.logicSQL = logicSQL;

  3.        this.sqlRouter = SQLRouterFactory.createSQLRouter(shardingContext);

  4.    }

  5. 2.根据isDatabaseShardingOnly判断构造那个sqlRouter

  6. (SQLRouter)(HintManagerHolder.isDatabaseShardingOnly() ? new DatabaseHintSQLRouter(shardingContext) : new ParsingSQLRouter(shardingContext));

  7.    a.提示sqlRouter:DatabaseHintSQLRouter

  8.    b.解释sqlRouter:ParsingSQLRouter

三.Sharding-JDBC执行

        里边干了很多事,首先是sql解析和路由,然后再executeQuery,最后处理结果集

 
   
   
 
  1. public ResultSet executeQuery() throws SQLException {

  2.        ShardingResultSet result;

  3.        try {

  4.            Collection<PreparedStatementUnit> preparedStatementUnits = this.route();

  5.            List<ResultSet> resultSets = (new PreparedStatementExecutor(this.getConnection().getShardingContext().getExecutorEngine(), this.routeResult.getSqlStatement().getType(), preparedStatementUnits, this.getParameters())).executeQuery();

  6.            result = new ShardingResultSet(resultSets, (new MergeEngine(resultSets, (SelectStatement)this.routeResult.getSqlStatement())).merge(), this);

  7.        } finally {

  8.            this.clearBatch();

  9.        }

  10.        this.currentResultSet = result;

  11.        return result;

  12.    }

1.sql解析

 
   
   
 
  1.  //首先加载Mysql的字典目录,里边都是定义好的Mysql相关的关键字枚举;

  2.  //再根据lexerEngine.nextToken()获取不同的token(就是一些关键字)然后走到不同的parse流程,

  3.  //最后处理获取结果SQLStatement->SelectStatement

  4.  public SQLRouteResult route(List<Object> parameters) {

  5.        if (null == this.sqlStatement) {

  6.            this.sqlStatement = this.sqlRouter.parse(this.logicSQL, parameters.size());

  7.        }

  8.        return this.sqlRouter.route(this.logicSQL, parameters, this.sqlStatement);

  9.    }

  10. //解析是基于LexerEngine, lexerEngine.nextToken();下面是select的解析过程:

  11. protected void parseInternal(SelectStatement selectStatement) {

  12.        this.parseDistinct();

  13.        this.parseSelectOption();

  14.        this.parseSelectList(selectStatement, this.getItems());

  15.        this.parseFrom(selectStatement);

  16.        this.parseWhere(this.getShardingRule(), selectStatement, this.getItems());

  17.        this.parseGroupBy(selectStatement);

  18.        this.parseHaving();

  19.        this.parseOrderBy(selectStatement);

  20.        this.parseLimit(selectStatement);

  21.        this.parseSelectRest();

  22.    }

2.获取路由库和表结果,这里将路由库的流程贴一下,首先获取配置的分库策略:

 
   
   
 
  1. private Collection<String> routeDataSources(TableRule tableRule, List<ShardingValue> databaseShardingValues) {

  2.        Collection<String> availableTargetDatabases = tableRule.getActualDatasourceNames();

  3.        if (databaseShardingValues.isEmpty()) {

  4.            return availableTargetDatabases;

  5.        } else {

  6.            Collection<String> result = this.shardingRule.getDatabaseShardingStrategy(tableRule).doSharding(availableTargetDatabases, databaseShardingValues);

  7.            Preconditions.checkState(!result.isEmpty(), "no database route info");

  8.            return result;

  9.        }

  10.    }

3.然后进行分库操作,将一个表达式和路由字段value交给groovy代理对象执行类调用call(),然后将路由结果返回:

 
   
   
 
  1. public Collection<String> doSharding(Collection<String> availableTargetNames, Collection<ShardingValue> shardingValues) {

  2.        ShardingValue shardingValue = (ShardingValue)shardingValues.iterator().next();

  3.        Preconditions.checkState(shardingValue instanceof ListShardingValue, "Inline strategy cannot support range sharding.");

  4.        Collection<String> shardingResult = this.doSharding((ListShardingValue)shardingValue);

  5.        Collection<String> result = new TreeSet(String.CASE_INSENSITIVE_ORDER);

  6.        result.addAll(shardingResult);

  7.        return result;

  8.    }

  9.    private Collection<String> doSharding(ListShardingValue shardingValue) {

  10.        Collection<String> result = new LinkedList();

  11.        Iterator i$ = this.transferToPreciseShardingValues(shardingValue).iterator();

  12.        while(i$.hasNext()) {

  13.            PreciseShardingValue<?> each = (PreciseShardingValue)i$.next();

  14.            result.add(this.execute(each));

  15.        }

  16.        return result;

  17.    }

  18. private String execute(PreciseShardingValue shardingValue) {

  19.        Closure<?> result = this.closure.rehydrate(new Expando(), (Object)null, (Object)null);

  20.        result.setResolveStrategy(3);

  21.        result.setProperty(shardingValue.getColumnName(), shardingValue.getValue());

  22.        return result.call().toString();

  23.    }

4.执行路由sql,进行数据库操作:

1.JDBC常规流程:

 
   
   
 
  1. List<ResultSet> resultSets = new PreparedStatementExecutor(

  2.                    getConnection().getShardingContext().getExecutorEngine(), routeResult.getSqlStatement().getType(), preparedStatementUnits, getParameters()).executeQuery();

2.核心执行逻辑

 
   
   
 
  1. private  <T> List<T> execute(

  2.            final SQLType sqlType, final Collection<? extends BaseStatementUnit> baseStatementUnits,

  3.            final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback) throws SQLException {

  4.        if (baseStatementUnits.isEmpty()) {

  5.            return Collections.emptyList();

  6.        }

  7.        OverallExecutionEvent event = new OverallExecutionEvent(sqlType, baseStatementUnits.size());

  8.        EventBusInstance.getInstance().post(event);

  9.        Iterator<? extends BaseStatementUnit> iterator = baseStatementUnits.iterator();

  10.        BaseStatementUnit firstInput = iterator.next();

  11.        //首先异步线程池执行

  12.        ListenableFuture<List<T>> restFutures = asyncExecute(sqlType, Lists.newArrayList(iterator), parameterSets, executeCallback);

  13.        T firstOutput;

  14.        List<T> restOutputs;

  15.        try {

  16.            //这里将第一个Statement同步执行,就不再开线程进行多线程处理了,避免多开线程,和线程切换,带来资源浪费

  17.            firstOutput = syncExecute(sqlType, firstInput, parameterSets, executeCallback);

  18.            restOutputs = restFutures.get();

  19.            //CHECKSTYLE:OFF

  20.        } catch (final Exception ex) {

  21.            //CHECKSTYLE:ON

  22.            event.setException(ex);

  23.            event.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);

  24.            EventBusInstance.getInstance().post(event);

  25.            ExecutorExceptionHandler.handleException(ex);

  26.            return null;

  27.        }

  28.        event.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);

  29.        EventBusInstance.getInstance().post(event);

  30.        List<T> result = Lists.newLinkedList(restOutputs);

  31.        //最后合并结果,将同步的和异步的合并返回

  32.        result.add(0, firstOutput);

  33.        return result;

  34.    }

3.内部执行器

 
   
   
 
  1. //executeInternal,这里通过回调函数,sqlStatement执行调用JDBC的PreparedStatement,并将结果返回

  2. private <T> T executeInternal(final SQLType sqlType, final BaseStatementUnit baseStatementUnit, final List<List<Object>> parameterSets, final ExecuteCallback<T> executeCallback,

  3.                          final boolean isExceptionThrown, final Map<String, Object> dataMap) throws Exception {

  4.        synchronized (baseStatementUnit.getStatement().getConnection()) {

  5.            T result;

  6.            ExecutorExceptionHandler.setExceptionThrown(isExceptionThrown);

  7.            ExecutorDataMap.setDataMap(dataMap);

  8.            List<AbstractExecutionEvent> events = new LinkedList<>();

  9.            if (parameterSets.isEmpty()) {

  10.                events.add(getExecutionEvent(sqlType, baseStatementUnit, Collections.emptyList()));

  11.            }

  12.            for (List<Object> each : parameterSets) {

  13.                events.add(getExecutionEvent(sqlType, baseStatementUnit, each));

  14.            }

  15.            for (AbstractExecutionEvent event : events) {

  16.                EventBusInstance.getInstance().post(event);

  17.            }

  18.            try {

  19.                //这里通过回调函数,执行sqlStatement

  20.                result = executeCallback.execute(baseStatementUnit);

  21.            } catch (final SQLException ex) {

  22.                for (AbstractExecutionEvent each : events) {

  23.                    each.setEventExecutionType(EventExecutionType.EXECUTE_FAILURE);

  24.                    each.setException(ex);

  25.                    EventBusInstance.getInstance().post(each);

  26.                    ExecutorExceptionHandler.handleException(ex);

  27.                }

  28.                return null;

  29.            }

  30.            for (AbstractExecutionEvent each : events) {

  31.                each.setEventExecutionType(EventExecutionType.EXECUTE_SUCCESS);

  32.                EventBusInstance.getInstance().post(each);

  33.            }

  34.            return result;

  35.        }

  36.    }

4.这里简单的说一下集成Mybaitis时,在queryFromDatabase处,从SharingConnect创建PreparedStatement->ShardingPreparedStatement 所有ps.execute();这里就会走到Sharding-jdbc中来,进行分库分表的数据库操作,如果没有,Mybatis就直接会调用JDBC的executor。

 
   
   
 
  1. public <E> List<E> query(Statement statement, ResultHandler resultHandler) throws SQLException {

  2.        PreparedStatement ps = (PreparedStatement)statement;

  3.        ps.execute();

  4.        return this.resultSetHandler.handleResultSets(ps);

  5.    }

四.结果并归:

 
   
   
 
  1. result = new ShardingResultSet(resultSets, (new MergeEngine(resultSets, (SelectStatement)this.routeResult.getSqlStatement())).merge(), this);

         大家如果有相关的经验,可以简单的评论一下当前的分库分表中间件(SJ,Mycat,阿里的TDDL等)的优缺点,性能上,系统复杂度上,还有一些建议,都可以。

以上是关于源码阅读——Sharding-JDBC分库分表中间件的主要内容,如果未能解决你的问题,请参考以下文章

数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 路由之分库分表路由

数据库分库分表中间件 Sharding-JDBC 源码分析 —— 分布式主键

数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 执行

数据库分库分表中间件 Sharding-JDBC 源码分析 —— 分布式主键

数据库分库分表中间件 Sharding-JDBC 源码分析 —— SQL 解析之删除SQL

SpringBoot 2.0 整合sharding-jdbc中间件,实现数据分库分表