ShardingSphere Analysis

Posted smileice

1. Preface
=
ShardingSphere has become very popular recently, so this post takes a look at how it is implemented.

2. Common sharding strategies
=

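Implementation layer | Frameworks | Pros and cons
---|---|---
Data access layer (DAO layer) | Suited to in-house development at this layer | Not constrained by an ORM framework; relatively simple to implement; easy to tailor to the characteristics of the system; no SQL parsing or routing-rule matching is needed, so performance tends to be slightly better. Drawbacks: there is a technical barrier, the workload is larger than relying on a framework (although a framework has its own learning cost), and the result is not general-purpose and only works within one specific system. The sharding logic in the DAO layer could also be pulled out into external XML configuration or annotations to form a general framework, but no such framework has appeared so far.
ORM framework layer | Guzz, Hibernate Shards, MybatisSharding | Hibernate Shards is not yet satisfactory, mainly because it restricts how Hibernate can be used; for example, its HQL support is very limited. For MyBatis, table sharding can be done in the interceptor layer, and data source routing can be implemented by rewriting SqlSession. That approach can only route by MapperId, MapperNameSpace, parameters, and the like; it cannot route data sources by table, and it does not handle the simple and batch modes well.
JDBC API layer | dbShards, sharding-sphere | The JDBC API layer is the place many people think of first for implementing sharding, and it is an excellent one. The workload is heavy, but fortunately sharding-sphere has done that work for us.
Spring database access wrapper layer between DAO and JDBC | CobarClient, or in-house development at this layer | Much like the ORM framework layer; the sharding logic has to be built around the wrapper according to business needs.
Proxy layer between the application server and the database | MySQL Proxy, Amoeba, Mycat, sharding-sphere | Fully isolated from business code. Developers mostly do not need to care about database and table sharding, but they must know what is and is not supported, and when a problem occurs they need help from whoever maintains the proxy layer to investigate.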
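
To make the DAO-layer row concrete, here is a minimal sketch of hand-written routing. The class name `UserTableRouter`, the `t_user_` table prefix, and the modulo rule are all hypothetical; they are not taken from any of the frameworks listed above.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical DAO-layer router: picks a data source and a physical table from a user id.
final class UserTableRouter {

    private final List<String> dataSourceNames;
    private final int tablesPerDataSource;

    UserTableRouter(List<String> dataSourceNames, int tablesPerDataSource) {
        if (dataSourceNames.isEmpty() || tablesPerDataSource <= 0) {
            throw new IllegalArgumentException("at least one data source and one table are required");
        }
        this.dataSourceNames = new ArrayList<>(dataSourceNames);
        this.tablesPerDataSource = tablesPerDataSource;
    }

    // Data source index = userId mod (number of data sources).
    String dataSourceFor(long userId) {
        int index = (int) Math.floorMod(userId, (long) dataSourceNames.size());
        return dataSourceNames.get(index);
    }

    // Table index = (userId div number of data sources) mod (tables per data source),
    // so that ids landing in the same data source still spread across its tables.
    String tableFor(long userId) {
        long quotient = Math.floorDiv(userId, (long) dataSourceNames.size());
        return "t_user_" + Math.floorMod(quotient, (long) tablesPerDataSource);
    }
}
```

A DAO would call `dataSourceFor` and `tableFor` before building its SQL. This is the "simple but not general-purpose" trade-off the table describes: the rule is hard-wired to one entity and one key.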

3. The JDBC connection flow
=

```
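import java.sql.Connection;
import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.Statement;
import javax.sql.DataSource;

// Create the data source (JDBC URL, user, and password still have to be configured)
DataSource dataSource = new com.zaxxer.hikari.HikariDataSource();

// Obtain a connection
Connection conn = dataSource.getConnection();

// Create a Statement and run a static query
Statement stmt = conn.createStatement();
String querySql = "select * from users";
ResultSet rs = stmt.executeQuery(querySql);

// Create a PreparedStatement and bind its parameters
String preparedSql = "select * from user where userName=? and password=?";
PreparedStatement pstmt = conn.prepareStatement(preparedSql);
pstmt.setString(1, "admin");
pstmt.setString(2, "liubin");
// Run the parameterized query
ResultSet preparedRs = pstmt.executeQuery();

// For insert, update, and delete, define the SQL first (illustrative statement), then call executeUpdate
String updateSql = "update user set password='changed' where userName='admin'";
stmt.executeUpdate(updateSql);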
```
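The code above shows that DataSource, Connection, ResultSet, and PreparedStatement are the core JDBC types. sharding-sphere provides its own implementations of these interfaces, and that is where database and table sharding happens.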
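
The idea of putting routing behind the standard JDBC interfaces can be sketched in a few lines. The version below is only an illustration and not how sharding-sphere does it: it uses `java.lang.reflect.Proxy` to avoid writing out every `DataSource` method, and the class name `RoutingDataSources` and the `routeKey` supplier are hypothetical.

```java
import java.lang.reflect.InvocationHandler;
import java.lang.reflect.InvocationTargetException;
import java.lang.reflect.Proxy;
import java.util.Map;
import java.util.function.Supplier;
import javax.sql.DataSource;

// Hypothetical helper: returns a DataSource that forwards each call to a target chosen at call time.
final class RoutingDataSources {

    private RoutingDataSources() {
    }

    static DataSource routing(final Map<String, DataSource> targets, final Supplier<String> routeKey) {
        InvocationHandler handler = (proxy, method, args) -> {
            String key = routeKey.get();
            DataSource target = targets.get(key);
            if (target == null) {
                throw new IllegalStateException("No data source registered for key: " + key);
            }
            try {
                // Application code sees a plain DataSource; the routing stays hidden behind it.
                return method.invoke(target, args);
            } catch (InvocationTargetException ex) {
                // Rethrow the underlying exception (for example SQLException) unchanged.
                throw ex.getCause();
            }
        };
        return (DataSource) Proxy.newProxyInstance(
                RoutingDataSources.class.getClassLoader(), new Class<?>[] {DataSource.class}, handler);
    }
}
```

Because the caller only depends on `javax.sql.DataSource`, existing DAO or ORM code does not have to change when routing is added underneath it.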

4. JDBC core classes implemented by sharding-sphere
=

- MasterSlaveConnection
- ShardingConnection
- MasterSlaveDataSource
- ShardingDataSource
- MasterSlavePreparedStatement
- MasterSlaveStatement
- ShardingPreparedStatement
- ShardingStatement

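From the names we can tell:
1. MasterSlaveConnection, MasterSlaveDataSource, MasterSlavePreparedStatement, and MasterSlaveStatement are the classes for master-slave databases.
2. ShardingConnection, ShardingDataSource, ShardingPreparedStatement, and ShardingStatement are the classes for sharding across multiple databases.


Let's first look at the source of the master-slave classes.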

```
/*
* Copyright 2016-2018 shardingsphere.io.
* <p>
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
*
* http://www.apache.org/licenses/LICENSE-2.0
*
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
* </p>
*/

package io.shardingsphere.shardingjdbc.jdbc.core.connection;

import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractConnectionAdapter;
import io.shardingsphere.shardingjdbc.jdbc.core.datasource.MasterSlaveDataSource;
import io.shardingsphere.shardingjdbc.jdbc.core.statement.MasterSlavePreparedStatement;
import io.shardingsphere.shardingjdbc.jdbc.core.statement.MasterSlaveStatement;
import io.shardingsphere.transaction.api.TransactionType;
import lombok.Getter;

import javax.sql.DataSource;
import java.sql.DatabaseMetaData;
import java.sql.PreparedStatement;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Map;

/**
* Connection that support master-slave.
*
* @author zhangliang
* @author zhaojun
*/
@Getter
public final class MasterSlaveConnection extends AbstractConnectionAdapter {

    private final MasterSlaveDataSource masterSlaveDataSource;

    private final Map<String, DataSource> dataSourceMap;

    public MasterSlaveConnection(final MasterSlaveDataSource masterSlaveDataSource, final Map<String, DataSource> dataSourceMap) {
        this(masterSlaveDataSource, dataSourceMap, TransactionType.LOCAL);
    }

    public MasterSlaveConnection(final MasterSlaveDataSource masterSlaveDataSource, final Map<String, DataSource> dataSourceMap, final TransactionType transactionType) {
        super(transactionType);
        this.masterSlaveDataSource = masterSlaveDataSource;
        this.dataSourceMap = dataSourceMap;
    }

    @Override
    public DatabaseMetaData getMetaData() {
        return masterSlaveDataSource.getDatabaseMetaData();
    }

    // Create a master-slave Statement
    @Override
    public Statement createStatement() {
        return new MasterSlaveStatement(this);
    }

    // Create a master-slave Statement
    @Override
    public Statement createStatement(final int resultSetType, final int resultSetConcurrency) {
        return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency);
    }

    // Create a master-slave Statement
    @Override
    public Statement createStatement(final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
        return new MasterSlaveStatement(this, resultSetType, resultSetConcurrency, resultSetHoldability);
    }

    // Create a master-slave PreparedStatement
    @Override
    public PreparedStatement prepareStatement(final String sql) throws SQLException {
        return new MasterSlavePreparedStatement(this, sql);
    }

    // Create a master-slave PreparedStatement
    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
        return new MasterSlavePreparedStatement(this, sql, resultSetType, resultSetConcurrency);
    }

    // Create a master-slave PreparedStatement
    @Override
    public PreparedStatement prepareStatement(final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
        return new MasterSlavePreparedStatement(this, sql, resultSetType, resultSetConcurrency, resultSetHoldability);
    }

    // Create a master-slave PreparedStatement
    @Override
    public PreparedStatement prepareStatement(final String sql, final int autoGeneratedKeys) throws SQLException {
        return new MasterSlavePreparedStatement(this, sql, autoGeneratedKeys);
    }

    // Create a master-slave PreparedStatement
    @Override
    public PreparedStatement prepareStatement(final String sql, final int[] columnIndexes) throws SQLException {
        return new MasterSlavePreparedStatement(this, sql, columnIndexes);
    }

    // Create a master-slave PreparedStatement
    @Override
    public PreparedStatement prepareStatement(final String sql, final String[] columnNames) throws SQLException {
        return new MasterSlavePreparedStatement(this, sql, columnNames);
    }
}
```

 

```
package io.shardingsphere.shardingjdbc.jdbc.core.datasource;

import io.shardingsphere.api.ConfigMapContext;
import io.shardingsphere.api.config.rule.MasterSlaveRuleConfiguration;
import io.shardingsphere.core.constant.properties.ShardingProperties;
import io.shardingsphere.core.rule.MasterSlaveRule;
import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractDataSourceAdapter;
import io.shardingsphere.shardingjdbc.jdbc.core.connection.MasterSlaveConnection;
import io.shardingsphere.transaction.api.TransactionTypeHolder;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;

import javax.sql.DataSource;
import java.sql.Connection;
import java.sql.DatabaseMetaData;
import java.sql.SQLException;
import java.util.Map;
import java.util.Properties;

/**
* Master-slave data source.
*
* @author zhangliang
* @author panjuan
* @author zhaojun
*/
@Getter
@Slf4j
public class MasterSlaveDataSource extends AbstractDataSourceAdapter {

    private final DatabaseMetaData databaseMetaData;

    private final MasterSlaveRule masterSlaveRule;

    private final ShardingProperties shardingProperties;

    public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRuleConfiguration masterSlaveRuleConfig,
                                 final Map<String, Object> configMap, final Properties props) throws SQLException {
        super(dataSourceMap);
        databaseMetaData = getDatabaseMetaData(dataSourceMap);
        if (!configMap.isEmpty()) {
            ConfigMapContext.getInstance().getConfigMap().putAll(configMap);
        }
        this.masterSlaveRule = new MasterSlaveRule(masterSlaveRuleConfig);
        shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
    }

    public MasterSlaveDataSource(final Map<String, DataSource> dataSourceMap, final MasterSlaveRule masterSlaveRule, final Map<String, Object> configMap, final Properties props) throws SQLException {
        super(dataSourceMap);
        databaseMetaData = getDatabaseMetaData(dataSourceMap);
        if (!configMap.isEmpty()) {
            ConfigMapContext.getInstance().getConfigMap().putAll(configMap);
        }
        this.masterSlaveRule = masterSlaveRule;
        shardingProperties = new ShardingProperties(null == props ? new Properties() : props);
    }

    private DatabaseMetaData getDatabaseMetaData(final Map<String, DataSource> dataSourceMap) throws SQLException {
        try (Connection connection = dataSourceMap.values().iterator().next().getConnection()) {
            return connection.getMetaData();
        }
    }

    @Override
    public final MasterSlaveConnection getConnection() {
        return new MasterSlaveConnection(this, getShardingTransactionalDataSources().getDataSourceMap(), TransactionTypeHolder.get());
    }
}
```

 

```
package io.shardingsphere.shardingjdbc.jdbc.core.statement;

import com.google.common.base.Preconditions;
import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant;
import io.shardingsphere.core.routing.router.masterslave.MasterSlaveRouter;
import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractMasterSlavePreparedStatementAdapter;
import io.shardingsphere.shardingjdbc.jdbc.core.connection.MasterSlaveConnection;
import lombok.AccessLevel;
import lombok.Getter;

import java.sql.PreparedStatement;
import java.sql.ResultSet;
import java.sql.SQLException;
import java.util.Collection;
import java.util.LinkedList;

/**
* PreparedStatement that support master-slave.
*
* @author zhangliang
* @author panjuan
*/
@Getter
public final class MasterSlavePreparedStatement extends AbstractMasterSlavePreparedStatementAdapter {

    private final MasterSlaveConnection connection;

    @Getter(AccessLevel.NONE)
    private final MasterSlaveRouter masterSlaveRouter;

    private final Collection<PreparedStatement> routedStatements = new LinkedList<>();

    public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql) throws SQLException {
        this(connection, sql, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }

    public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency) throws SQLException {
        this(connection, sql, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }

    public MasterSlavePreparedStatement(
            final MasterSlaveConnection connection, final String sql, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) throws SQLException {
        this.connection = connection;
        masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(),
                connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
        for (String each : masterSlaveRouter.route(sql)) {
            PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(preparedStatement);
        }
    }

    public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int autoGeneratedKeys) throws SQLException {
        this.connection = connection;
        masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(),
                connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
        for (String each : masterSlaveRouter.route(sql)) {
            PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, autoGeneratedKeys);
            routedStatements.add(preparedStatement);
        }
    }

    public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final int[] columnIndexes) throws SQLException {
        this.connection = connection;
        masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(),
                connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
        for (String each : masterSlaveRouter.route(sql)) {
            PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, columnIndexes);
            routedStatements.add(preparedStatement);
        }
    }

    public MasterSlavePreparedStatement(final MasterSlaveConnection connection, final String sql, final String[] columnNames) throws SQLException {
        this.connection = connection;
        masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(),
                connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
        for (String each : masterSlaveRouter.route(sql)) {
            PreparedStatement preparedStatement = connection.getConnection(each).prepareStatement(sql, columnNames);
            routedStatements.add(preparedStatement);
        }
    }

    @Override
    public ResultSet executeQuery() throws SQLException {
        Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support executeQuery for DDL");
        return routedStatements.iterator().next().executeQuery();
    }

    @Override
    public int executeUpdate() throws SQLException {
        int result = 0;
        for (PreparedStatement each : routedStatements) {
            result += each.executeUpdate();
        }
        return result;
    }

    @Override
    public boolean execute() throws SQLException {
        boolean result = false;
        for (PreparedStatement each : routedStatements) {
            result = each.execute();
        }
        return result;
    }

    @Override
    public void clearBatch() throws SQLException {
        Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support clearBatch for DDL");
        routedStatements.iterator().next().clearBatch();
    }

    @Override
    public void addBatch() throws SQLException {
        Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support addBatch for DDL");
        routedStatements.iterator().next().addBatch();
    }

    @Override
    public int[] executeBatch() throws SQLException {
        Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support executeBatch for DDL");
        return routedStatements.iterator().next().executeBatch();
    }

    @Override
    public ResultSet getResultSet() throws SQLException {
        Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support getResultSet for DDL");
        return routedStatements.iterator().next().getResultSet();
    }

    @Override
    public ResultSet getGeneratedKeys() throws SQLException {
        Preconditions.checkArgument(1 == routedStatements.size(), "Cannot support getGeneratedKeys for DDL");
        return routedStatements.iterator().next().getGeneratedKeys();
    }

    @Override
    public int getResultSetHoldability() throws SQLException {
        return routedStatements.iterator().next().getResultSetHoldability();
    }

    @Override
    public int getResultSetConcurrency() throws SQLException {
        return routedStatements.iterator().next().getResultSetConcurrency();
    }

    @Override
    public int getResultSetType() throws SQLException {
        return routedStatements.iterator().next().getResultSetType();
    }
}
```


```
package io.shardingsphere.shardingjdbc.jdbc.core.statement;

import com.google.common.base.Preconditions;
import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant;
import io.shardingsphere.core.routing.router.masterslave.MasterSlaveRouter;
import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractStatementAdapter;
import io.shardingsphere.shardingjdbc.jdbc.core.connection.MasterSlaveConnection;
import lombok.AccessLevel;
import lombok.Getter;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.Collection;
import java.util.LinkedList;

/**
* Statement that support master-slave.
*
* @author zhangliang
* @author panjuan
*/
@Getter
public final class MasterSlaveStatement extends AbstractStatementAdapter {

    private final MasterSlaveConnection connection;

    @Getter(AccessLevel.NONE)
    private final MasterSlaveRouter masterSlaveRouter;

    private final int resultSetType;

    private final int resultSetConcurrency;

    private final int resultSetHoldability;

    private final Collection<Statement> routedStatements = new LinkedList<>();

    public MasterSlaveStatement(final MasterSlaveConnection connection) {
        this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }

    public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency) {
        this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }

    public MasterSlaveStatement(final MasterSlaveConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
        super(Statement.class);
        this.connection = connection;
        masterSlaveRouter = new MasterSlaveRouter(connection.getMasterSlaveDataSource().getMasterSlaveRule(),
                connection.getMasterSlaveDataSource().getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW));
        this.resultSetType = resultSetType;
        this.resultSetConcurrency = resultSetConcurrency;
        this.resultSetHoldability = resultSetHoldability;
    }

    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        clearPrevious();
        Collection<String> dataSourceNames = masterSlaveRouter.route(sql);
        Preconditions.checkState(1 == dataSourceNames.size(), "Cannot support executeQuery for DML or DDL");
        Statement statement = connection.getConnection(dataSourceNames.iterator().next()).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
        routedStatements.add(statement);
        return statement.executeQuery(sql);
    }

    @Override
    public int executeUpdate(final String sql) throws SQLException {
        clearPrevious();
        int result = 0;
        for (String each : masterSlaveRouter.route(sql)) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result += statement.executeUpdate(sql);
        }
        return result;
    }

    @Override
    public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
        clearPrevious();
        int result = 0;
        for (String each : masterSlaveRouter.route(sql)) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result += statement.executeUpdate(sql, autoGeneratedKeys);
        }
        return result;
    }

    @Override
    public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException {
        clearPrevious();
        int result = 0;
        for (String each : masterSlaveRouter.route(sql)) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result += statement.executeUpdate(sql, columnIndexes);
        }
        return result;
    }

    @Override
    public int executeUpdate(final String sql, final String[] columnNames) throws SQLException {
        clearPrevious();
        int result = 0;
        for (String each : masterSlaveRouter.route(sql)) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result += statement.executeUpdate(sql, columnNames);
        }
        return result;
    }

    @Override
    public boolean execute(final String sql) throws SQLException {
        clearPrevious();
        boolean result = false;
        for (String each : masterSlaveRouter.route(sql)) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result = statement.execute(sql);
        }
        return result;
    }

    @Override
    public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException {
        clearPrevious();
        boolean result = false;
        for (String each : masterSlaveRouter.route(sql)) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result = statement.execute(sql, autoGeneratedKeys);
        }
        return result;
    }

    @Override
    public boolean execute(final String sql, final int[] columnIndexes) throws SQLException {
        clearPrevious();
        boolean result = false;
        for (String each : masterSlaveRouter.route(sql)) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result = statement.execute(sql, columnIndexes);
        }
        return result;
    }

    @Override
    public boolean execute(final String sql, final String[] columnNames) throws SQLException {
        clearPrevious();
        boolean result = false;
        for (String each : masterSlaveRouter.route(sql)) {
            Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
            routedStatements.add(statement);
            result = statement.execute(sql, columnNames);
        }
        return result;
    }

    @Override
    public ResultSet getGeneratedKeys() throws SQLException {
        Preconditions.checkState(1 == routedStatements.size());
        return routedStatements.iterator().next().getGeneratedKeys();
    }

    @Override
    public ResultSet getResultSet() throws SQLException {
        Preconditions.checkState(1 == routedStatements.size());
        return routedStatements.iterator().next().getResultSet();
    }

    private void clearPrevious() throws SQLException {
        for (Statement each : routedStatements) {
            each.close();
        }
        routedStatements.clear();
    }
}
```

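From the source we can see that MasterSlaveConnection and MasterSlaveDataSource merely assemble data, while MasterSlavePreparedStatement and MasterSlaveStatement route in much the same way. We therefore focus on MasterSlaveStatement.

Let's look closely at the following method: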
```
@Override
public boolean execute(final String sql) throws SQLException {
    clearPrevious();
    boolean result = false;
    for (String each : masterSlaveRouter.route(sql)) {
        Statement statement = connection.getConnection(each).createStatement(resultSetType, resultSetConcurrency, resultSetHoldability);
        routedStatements.add(statement);
        result = statement.execute(sql);
    }
    return result;
}
```

Routing here is driven mainly by masterSlaveRouter and the SQL text.

```
package io.shardingsphere.core.routing.router.masterslave;

import io.shardingsphere.core.constant.SQLType;
import io.shardingsphere.core.hint.HintManagerHolder;
import io.shardingsphere.core.parsing.SQLJudgeEngine;
import io.shardingsphere.core.rule.MasterSlaveRule;
import io.shardingsphere.core.util.SQLLogger;
import lombok.RequiredArgsConstructor;

import java.util.ArrayList;
import java.util.Collection;
import java.util.Collections;

/**
* Master slave router interface.
*
* @author zhangliang
* @author panjuan
*/
@RequiredArgsConstructor
public final class MasterSlaveRouter {

    private final MasterSlaveRule masterSlaveRule;

    private final boolean showSQL;

    /**
     * Route Master slave.
     *
     * @param sql SQL
     * @return data source names
     */
    // TODO for multiple masters may return more than one data source
    public Collection<String> route(final String sql) {
        Collection<String> result = route(new SQLJudgeEngine(sql).judge().getType());
        if (showSQL) {
            SQLLogger.logSQL(sql, result);
        }
        return result;
    }

    private Collection<String> route(final SQLType sqlType) {
        if (isMasterRoute(sqlType)) {
            MasterVisitedManager.setMasterVisited();
            return Collections.singletonList(masterSlaveRule.getMasterDataSourceName());
        }
        return Collections.singletonList(masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
                masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames())));
    }

    private boolean isMasterRoute(final SQLType sqlType) {
        return SQLType.DQL != sqlType || MasterVisitedManager.isMasterVisited() || HintManagerHolder.isMasterRouteOnly();
    }
}
```
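
In `route(String)` above, the SQL text is first turned into a `SQLType` by `SQLJudgeEngine`, whose source is not shown in this post. As a rough illustration of what that step has to produce, the sketch below classifies a statement by its leading keyword. The class `NaiveSqlClassifier` and its `Kind` enum are hypothetical, and the real engine is likely more thorough (comments, hints, and vendor syntax are ignored here).

```java
import java.util.Locale;

// Hypothetical, deliberately naive classifier: looks only at the first keyword of the SQL text.
final class NaiveSqlClassifier {

    enum Kind { DQL, DML, DDL, OTHER }

    private NaiveSqlClassifier() {
    }

    static Kind classify(final String sql) {
        String trimmed = sql.trim();
        int end = 0;
        // Read the leading run of letters as the keyword.
        while (end < trimmed.length() && Character.isLetter(trimmed.charAt(end))) {
            end++;
        }
        String keyword = trimmed.substring(0, end).toUpperCase(Locale.ROOT);
        switch (keyword) {
            case "SELECT":
                return Kind.DQL;
            case "INSERT":
            case "UPDATE":
            case "DELETE":
                return Kind.DML;
            case "CREATE":
            case "ALTER":
            case "DROP":
            case "TRUNCATE":
                return Kind.DDL;
            default:
                return Kind.OTHER;
        }
    }
}
```

Only the resulting type matters to the router: the SQL itself is passed through to the chosen data source unchanged.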

Let's see how it decides whether a statement is routed to the master.

```
package io.shardingsphere.core.constant;

/**
* SQL Type.
*
* @author zhangliang
*/
public enum SQLType {

    /**
     * Data Query Language.
     *
     * <p>Such as {@code SELECT}.</p>
     */
    DQL,

    /**
     * Data Manipulation Language.
     *
     * <p>Such as {@code INSERT}, {@code UPDATE}, {@code DELETE}.</p>
     */
    DML,

    /**
     * Data Definition Language.
     *
     * <p>Such as {@code CREATE}, {@code ALTER}, {@code DROP}, {@code TRUNCATE}.</p>
     */
    DDL,

    /**
     * Transaction Control Language.
     *
     * <p>Such as {@code SET}, {@code COMMIT}, {@code ROLLBACK}, {@code SAVEPOINT}, {@code BEGIN}.</p>
     */
    TCL,

    /**
     * Database administrator Language.
     */
    DAL,

    /**
     * Database control Language.
     */
    DCL
}
```

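From the SQLType documentation and the check SQLType.DQL != sqlType, we know that every non-query statement is routed to the master.
For a query, the two conditions MasterVisitedManager.isMasterVisited() || HintManagerHolder.isMasterRouteOnly() decide whether it goes to the master; these two methods will be analyzed later.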
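
The three-part condition can be sketched as follows. The real `MasterVisitedManager` and `HintManagerHolder` are not shown in this post, so keeping both flags in `ThreadLocal` variables is an assumption made for illustration, and the class `StickyMasterRouter` is hypothetical. The sketch shows one consequence of the condition: once a thread has been routed to the master, its later queries go to the master as well until the flag is cleared.

```java
// Hypothetical router mirroring: DQL != type || masterVisited || masterRouteOnly.
final class StickyMasterRouter {

    enum Kind { DQL, DML, DDL }

    // Assumption: per-thread flags; the real managers may store their state differently.
    private static final ThreadLocal<Boolean> MASTER_VISITED = ThreadLocal.withInitial(() -> Boolean.FALSE);
    private static final ThreadLocal<Boolean> MASTER_ONLY_HINT = ThreadLocal.withInitial(() -> Boolean.FALSE);

    private final String masterName;
    private final String slaveName;

    StickyMasterRouter(final String masterName, final String slaveName) {
        this.masterName = masterName;
        this.slaveName = slaveName;
    }

    static void setMasterOnlyHint(final boolean masterOnly) {
        MASTER_ONLY_HINT.set(masterOnly);
    }

    // Reset both flags for the current thread, for example when a connection is closed.
    static void clear() {
        MASTER_VISITED.remove();
        MASTER_ONLY_HINT.remove();
    }

    String route(final Kind kind) {
        boolean masterRoute = kind != Kind.DQL || MASTER_VISITED.get() || MASTER_ONLY_HINT.get();
        if (masterRoute) {
            // Remember the visit so that following reads on this thread stay on the master.
            MASTER_VISITED.set(Boolean.TRUE);
            return masterName;
        }
        return slaveName;
    }
}
```

A plausible reason for the sticky flag is to let a thread read its own writes without depending on replication to the slaves; the source shown here does not state the motivation.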

 

```
private Collection<String> route(final SQLType sqlType) {
    if (isMasterRoute(sqlType)) {
        MasterVisitedManager.setMasterVisited();
        return Collections.singletonList(masterSlaveRule.getMasterDataSourceName());
    }
    return Collections.singletonList(masterSlaveRule.getLoadBalanceAlgorithm().getDataSource(
            masterSlaveRule.getName(), masterSlaveRule.getMasterDataSourceName(), new ArrayList<>(masterSlaveRule.getSlaveDataSourceNames())));
}
```
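This code is also very clear: a master-routed statement goes straight to the master database, and anything else is routed by the configured LoadBalanceAlgorithm, which we won't go into here.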
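
For readers who want a picture of what such an algorithm might do, here is a minimal round-robin sketch. It is only one possible strategy and is not taken from the sharding-sphere source. The class `RoundRobinSlaveSelector` is hypothetical, and it takes only the slave names, whereas the `getDataSource` call above also receives the rule name and the master name.

```java
import java.util.List;
import java.util.concurrent.atomic.AtomicInteger;

// Hypothetical round-robin selector: hands out slave names in turn, safely across threads.
final class RoundRobinSlaveSelector {

    private final AtomicInteger counter = new AtomicInteger();

    String select(final List<String> slaveNames) {
        if (slaveNames.isEmpty()) {
            throw new IllegalArgumentException("at least one slave data source is required");
        }
        // floorMod keeps the index non-negative even after the counter overflows.
        int index = Math.floorMod(counter.getAndIncrement(), slaveNames.size());
        return slaveNames.get(index);
    }
}
```

With two slaves, successive calls return the first, the second, then the first again.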


Now let's focus on how routing is implemented on the sharding side. As before, we only look at ShardingStatement.


```
package io.shardingsphere.shardingjdbc.jdbc.core.statement;

import com.google.common.base.Optional;
import io.shardingsphere.core.constant.properties.ShardingPropertiesConstant;
import io.shardingsphere.core.executor.sql.execute.result.StreamQueryResult;
import io.shardingsphere.core.merger.MergeEngine;
import io.shardingsphere.core.merger.MergeEngineFactory;
import io.shardingsphere.core.merger.QueryResult;
import io.shardingsphere.core.parsing.parser.sql.dal.DALStatement;
import io.shardingsphere.core.parsing.parser.sql.dml.insert.InsertStatement;
import io.shardingsphere.core.parsing.parser.sql.dql.DQLStatement;
import io.shardingsphere.core.parsing.parser.sql.dql.select.SelectStatement;
import io.shardingsphere.core.routing.SQLRouteResult;
import io.shardingsphere.core.routing.StatementRoutingEngine;
import io.shardingsphere.core.routing.router.sharding.GeneratedKey;
import io.shardingsphere.shardingjdbc.executor.StatementExecutor;
import io.shardingsphere.shardingjdbc.jdbc.adapter.AbstractStatementAdapter;
import io.shardingsphere.shardingjdbc.jdbc.core.ShardingContext;
import io.shardingsphere.shardingjdbc.jdbc.core.connection.ShardingConnection;
import io.shardingsphere.shardingjdbc.jdbc.core.resultset.GeneratedKeysResultSet;
import io.shardingsphere.shardingjdbc.jdbc.core.resultset.ShardingResultSet;
import lombok.Getter;

import java.sql.ResultSet;
import java.sql.SQLException;
import java.sql.Statement;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

/**
* Statement that support sharding.
*
* @author gaohongtao
* @author caohao
* @author zhangliang
* @author zhaojun
* @author panjuan
*/
public final class ShardingStatement extends AbstractStatementAdapter

@Getter
private final ShardingConnection connection;

private final StatementExecutor statementExecutor;

private boolean returnGeneratedKeys;

private SQLRouteResult routeResult;

private ResultSet currentResultSet;

    public ShardingStatement(final ShardingConnection connection) {
        this(connection, ResultSet.TYPE_FORWARD_ONLY, ResultSet.CONCUR_READ_ONLY, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }

    public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency) {
        this(connection, resultSetType, resultSetConcurrency, ResultSet.HOLD_CURSORS_OVER_COMMIT);
    }

    public ShardingStatement(final ShardingConnection connection, final int resultSetType, final int resultSetConcurrency, final int resultSetHoldability) {
        super(Statement.class);
        this.connection = connection;
        statementExecutor = new StatementExecutor(resultSetType, resultSetConcurrency, resultSetHoldability, connection);
    }

    @Override
    public ResultSet executeQuery(final String sql) throws SQLException {
        ResultSet result;
        try {
            clearPrevious();
            sqlRoute(sql);
            initStatementExecutor();
            MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getShardingContext().getDatabaseType(), connection.getShardingContext().getShardingRule(),
                    routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable(), statementExecutor.executeQuery());
            result = new ShardingResultSet(statementExecutor.getResultSets(), mergeEngine.merge(), this);
        } finally {
            currentResultSet = null;
        }
        currentResultSet = result;
        return result;
    }

    @Override
    public int executeUpdate(final String sql) throws SQLException {
        try {
            clearPrevious();
            sqlRoute(sql);
            initStatementExecutor();
            return statementExecutor.executeUpdate();
        } finally {
            refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement());
            currentResultSet = null;
        }
    }

    @Override
    public int executeUpdate(final String sql, final int autoGeneratedKeys) throws SQLException {
        if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
            returnGeneratedKeys = true;
        }
        try {
            clearPrevious();
            sqlRoute(sql);
            initStatementExecutor();
            return statementExecutor.executeUpdate(autoGeneratedKeys);
        } finally {
            currentResultSet = null;
        }
    }

    @Override
    public int executeUpdate(final String sql, final int[] columnIndexes) throws SQLException {
        returnGeneratedKeys = true;
        try {
            clearPrevious();
            sqlRoute(sql);
            initStatementExecutor();
            return statementExecutor.executeUpdate(columnIndexes);
        } finally {
            currentResultSet = null;
        }
    }

    @Override
    public int executeUpdate(final String sql, final String[] columnNames) throws SQLException {
        returnGeneratedKeys = true;
        try {
            clearPrevious();
            sqlRoute(sql);
            initStatementExecutor();
            return statementExecutor.executeUpdate(columnNames);
        } finally {
            currentResultSet = null;
        }
    }

    @Override
    public boolean execute(final String sql) throws SQLException {
        try {
            clearPrevious();
            sqlRoute(sql);
            initStatementExecutor();
            return statementExecutor.execute();
        } finally {
            refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement());
            currentResultSet = null;
        }
    }

    @Override
    public boolean execute(final String sql, final int autoGeneratedKeys) throws SQLException {
        if (RETURN_GENERATED_KEYS == autoGeneratedKeys) {
            returnGeneratedKeys = true;
        }
        try {
            clearPrevious();
            sqlRoute(sql);
            initStatementExecutor();
            return statementExecutor.execute(autoGeneratedKeys);
        } finally {
            currentResultSet = null;
        }
    }

    @Override
    public boolean execute(final String sql, final int[] columnIndexes) throws SQLException {
        returnGeneratedKeys = true;
        try {
            clearPrevious();
            sqlRoute(sql);
            initStatementExecutor();
            return statementExecutor.execute(columnIndexes);
        } finally {
            currentResultSet = null;
        }
    }

    @Override
    public boolean execute(final String sql, final String[] columnNames) throws SQLException {
        returnGeneratedKeys = true;
        try {
            clearPrevious();
            sqlRoute(sql);
            initStatementExecutor();
            return statementExecutor.execute(columnNames);
        } finally {
            currentResultSet = null;
        }
    }

@Override
public ResultSet getResultSet() throws SQLException
if (null != currentResultSet)
return currentResultSet;

if (1 == statementExecutor.getStatements().size() && routeResult.getSqlStatement() instanceof DQLStatement)
currentResultSet = statementExecutor.getStatements().iterator().next().getResultSet();
return currentResultSet;

List<ResultSet> resultSets = new ArrayList<>(statementExecutor.getStatements().size());
List<QueryResult> queryResults = new ArrayList<>(statementExecutor.getStatements().size());
for (Statement each : statementExecutor.getStatements())
ResultSet resultSet = each.getResultSet();
resultSets.add(resultSet);
queryResults.add(new StreamQueryResult(resultSet));

if (routeResult.getSqlStatement() instanceof SelectStatement || routeResult.getSqlStatement() instanceof DALStatement)
MergeEngine mergeEngine = MergeEngineFactory.newInstance(connection.getShardingContext().getDatabaseType(),
connection.getShardingContext().getShardingRule(), routeResult.getSqlStatement(), connection.getShardingContext().getMetaData().getTable(), queryResults);
currentResultSet = new ShardingResultSet(resultSets, mergeEngine.merge(), this);

return currentResultSet;


private void initStatementExecutor() throws SQLException
statementExecutor.init(routeResult);
replayMethodForStatements();


private void replayMethodForStatements()
for (Statement each : statementExecutor.getStatements())
replayMethodsInvocation(each);



private void sqlRoute(final String sql)
ShardingContext shardingContext = connection.getShardingContext();
routeResult = new StatementRoutingEngine(shardingContext.getShardingRule(),
shardingContext.getMetaData(), shardingContext.getDatabaseType(), shardingContext.getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW)).route(sql);


private void clearPrevious() throws SQLException
statementExecutor.clear();


@SuppressWarnings("MagicConstant")
@Override
public int getResultSetType()
return statementExecutor.getResultSetType();


@SuppressWarnings("MagicConstant")
@Override
public int getResultSetConcurrency()
return statementExecutor.getResultSetConcurrency();


@Override
public int getResultSetHoldability()
return statementExecutor.getResultSetHoldability();


@Override
public Collection<Statement> getRoutedStatements()
return statementExecutor.getStatements();


@Override
public ResultSet getGeneratedKeys() throws SQLException
Optional<GeneratedKey> generatedKey = getGeneratedKey();
if (returnGeneratedKeys && generatedKey.isPresent())
return new GeneratedKeysResultSet(routeResult.getGeneratedKey().getGeneratedKeys().iterator(), generatedKey.get().getColumn().getName(), this);

if (1 == getRoutedStatements().size())
return getRoutedStatements().iterator().next().getGeneratedKeys();

return new GeneratedKeysResultSet();


private Optional<GeneratedKey> getGeneratedKey()
if (null != routeResult && routeResult.getSqlStatement() instanceof InsertStatement)
return Optional.fromNullable(routeResult.getGeneratedKey());

return Optional.absent();


```

As before, we analyze only the execute method:

```
@Override
public boolean execute(final String sql) throws SQLException {
    try {
        // Clear the state left over from the previous execution
        clearPrevious();
        // Route the SQL
        sqlRoute(sql);
        // Initialize statementExecutor
        initStatementExecutor();
        // Execute
        return statementExecutor.execute();
    } finally {
        refreshTableMetaData(connection.getShardingContext(), routeResult.getSqlStatement());
        currentResultSet = null;
    }
}
```
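clearPrevious needs little explanation: it clears the data left over from the previous execution. The routing step, sqlRoute, looks like this: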
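The order of the four steps in execute (clear, route, initialize, execute) can be pictured with a small self-contained sketch. Everything in it (ExecuteFlowSketch, the log list, the routedSqls field) is hypothetical and only mirrors the ordering shown above; it does not use the real sharding-sphere classes.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch: mirrors only the order of steps in execute, not the real classes.
class ExecuteFlowSketch {

    // Records each step so the ordering can be inspected afterwards.
    final List<String> log = new ArrayList<>();

    // Stand-in for the per-execution state that clearPrevious resets.
    private final List<String> routedSqls = new ArrayList<>();

    boolean execute(final String sql) {
        try {
            clearPrevious();
            sqlRoute(sql);
            initExecutor();
            return doExecute();
        } finally {
            // Runs whether or not the steps above succeeded.
            log.add("finally");
        }
    }

    private void clearPrevious() {
        routedSqls.clear();
        log.add("clear");
    }

    private void sqlRoute(final String sql) {
        routedSqls.add(sql);
        log.add("route");
    }

    private void initExecutor() {
        log.add("init");
    }

    // Like java.sql.Statement.execute, returns true when the statement yields a result set
    // (approximated here by checking for a leading SELECT).
    private boolean doExecute() {
        log.add("execute");
        return routedSqls.get(0).trim().toUpperCase().startsWith("SELECT");
    }

    int routedCount() {
        return routedSqls.size();
    }

    public static void main(final String[] args) {
        ExecuteFlowSketch statement = new ExecuteFlowSketch();
        System.out.println(statement.execute("SELECT 1"));
        System.out.println(statement.log);
    }
}
```

Because clearPrevious runs first on every call, a second execute on the same object never sees the routed SQL of the first one.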


```
private void sqlRoute(final String sql) {
    ShardingContext shardingContext = connection.getShardingContext();
    routeResult = new StatementRoutingEngine(shardingContext.getShardingRule(), 
            shardingContext.getMetaData(), shardingContext.getDatabaseType(), shardingContext.getShardingProperties().<Boolean>getValue(ShardingPropertiesConstant.SQL_SHOW)).route(sql);
}
```


```
public final class StatementRoutingEngine {
    
    private final ShardingRouter shardingRouter;
    
    private final ShardingMasterSlaveRouter masterSlaveRouter;
    
    public StatementRoutingEngine(final ShardingRule shardingRule, final ShardingMetaData shardingMetaData, final DatabaseType databaseType, final boolean showSQL) {
        shardingRouter = ShardingRouterFactory.newInstance(shardingRule, shardingMetaData, databaseType, showSQL);
        masterSlaveRouter = new ShardingMasterSlaveRouter(shardingRule.getMasterSlaveRules());
    }
    
    /**
     * SQL route.
     *
     * @param logicSQL logic SQL
     * @return route result
     */
    public SQLRouteResult route(final String logicSQL) {
        SQLStatement sqlStatement = shardingRouter.parse(logicSQL, false);
        // masterSlaveRouter is applied on top of shardingRouter because a sharding data source may contain several MasterSlaveDataSource instances
        return masterSlaveRouter.route(shardingRouter.route(logicSQL, Collections.emptyList(), sqlStatement));
    }
}
```
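The route method above chains two routers: the sharding router picks a logical data source, and the master-slave router then picks a physical one inside it. A minimal sketch of that idea follows. The names (TwoStageRoutingSketch, ds_0_master and so on), the user_id % 2 rule, and the "SELECT goes to the slave" rule are all assumptions made for illustration, not the behavior of ShardingMasterSlaveRouter.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical sketch of two-stage routing: shard first, then pick master or slave.
class TwoStageRoutingSketch {

    // Each logical data source is itself a master-slave group: {master, slave}.
    static final Map<String, String[]> GROUPS = new HashMap<>();

    static {
        GROUPS.put("ds_0", new String[] {"ds_0_master", "ds_0_slave"});
        GROUPS.put("ds_1", new String[] {"ds_1_master", "ds_1_slave"});
    }

    // Stage 1 (assumed rule): user_id modulo 2 selects the logical data source.
    static String shardRoute(final long userId) {
        return "ds_" + Math.floorMod(userId, 2L);
    }

    // Stage 2 (assumed rule): reads go to the slave, everything else to the master.
    static String masterSlaveRoute(final String logicalDataSource, final String sql) {
        String[] group = GROUPS.get(logicalDataSource);
        boolean isRead = sql.trim().toUpperCase().startsWith("SELECT");
        return isRead ? group[1] : group[0];
    }

    // Mirrors the shape masterSlaveRouter.route(shardingRouter.route(...)).
    static String route(final long userId, final String sql) {
        return masterSlaveRoute(shardRoute(userId), sql);
    }

    public static void main(final String[] args) {
        System.out.println(route(7, "SELECT * FROM t_user WHERE user_id = 7"));
        System.out.println(route(7, "UPDATE t_user SET name = 'a' WHERE user_id = 7"));
    }
}
```

Keeping the two stages separate means the sharding rule never needs to know whether a data source is a single database or a master-slave group.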

 

```
public final class ShardingRouterFactory {
    
    /**
     * Create new instance of sharding router.
     * 
     * @param shardingRule sharding rule
     * @param shardingMetaData sharding meta data
     * @param databaseType database type
     * @param showSQL show SQL or not
     * @return sharding router instance
     */
    public static ShardingRouter newInstance(final ShardingRule shardingRule, final ShardingMetaData shardingMetaData, final DatabaseType databaseType, final boolean showSQL) {
        // If sharding is by database only (as reported by HintManagerHolder), route with DatabaseHintSQLRouter; otherwise use ParsingSQLRouter
        return HintManagerHolder.isDatabaseShardingOnly() ? new DatabaseHintSQLRouter(shardingRule, showSQL) : new ParsingSQLRouter(shardingRule, shardingMetaData, databaseType, showSQL);
    }
}
```
When DatabaseHintSQLRouter is chosen, routing goes through shardingRule.getDefaultDatabaseShardingStrategy().
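The factory's choice depends on a flag read from a holder class rather than on anything in the SQL. One common way to carry such a flag is a thread-local, sketched below. This is an assumption for illustration: RouterSelectionSketch, DATABASE_SHARDING_ONLY, HintRouter and ParsingRouter are hypothetical names, and the sketch does not claim to reproduce how HintManagerHolder is implemented.

```java
// Hypothetical sketch: choose a router from a thread-local hint flag.
class RouterSelectionSketch {

    interface Router {
        String route(String sql);
    }

    // Stand-in for a hint holder: the flag is visible only to the current thread.
    static final ThreadLocal<Boolean> DATABASE_SHARDING_ONLY = ThreadLocal.withInitial(() -> Boolean.FALSE);

    // In this sketch the hint router does not inspect the SQL at all.
    static final class HintRouter implements Router {
        @Override
        public String route(final String sql) {
            return "hint-routed: " + sql;
        }
    }

    // Placeholder for a router that would parse the SQL to find sharding values.
    static final class ParsingRouter implements Router {
        @Override
        public String route(final String sql) {
            return "parsed-and-routed: " + sql;
        }
    }

    // Same shape as the factory above: one boolean decides which router is created.
    static Router newInstance() {
        return DATABASE_SHARDING_ONLY.get() ? new HintRouter() : new ParsingRouter();
    }

    public static void main(final String[] args) {
        System.out.println(newInstance().route("SELECT 1"));
        DATABASE_SHARDING_ONLY.set(Boolean.TRUE);
        try {
            System.out.println(newInstance().route("SELECT 1"));
        } finally {
            // Always reset the flag so it does not leak into later work on this thread.
            DATABASE_SHARDING_ONLY.remove();
        }
    }
}
```

With a thread-local flag, callers must clear it when they are done, otherwise a pooled thread would keep routing by hint for unrelated requests.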



The route method of ParsingSQLRouter is shown below; it is worth reading through on your own.
```
@Override
public SQLRouteResult route(final String logicSQL, final List<Object> parameters, final SQLStatement sqlStatement) {
    Optional<GeneratedKey> generatedKey = sqlStatement instanceof InsertStatement ? getGenerateKey(shardingRule, (InsertStatement) sqlStatement, parameters) : Optional.<GeneratedKey>absent();
    SQLRouteResult result = new SQLRouteResult(sqlStatement, generatedKey.orNull());
    ShardingConditions shardingConditions = OptimizeEngineFactory.newInstance(shardingRule, sqlStatement, parameters, generatedKey.orNull()).optimize();
    if (generatedKey.isPresent()) {
        setGeneratedKeys(result, generatedKey.get());
    }
    if (sqlStatement instanceof SelectStatement && !sqlStatement.getTables().isEmpty() && !((SelectStatement) sqlStatement).getSubQueryConditions().isEmpty()) {
        mergeShardingValueForSubQuery(sqlStatement.getConditions(), shardingConditions);
    }
    RoutingResult routingResult = RoutingEngineFactory.newInstance(shardingRule, shardingMetaData.getDataSource(), sqlStatement, shardingConditions).route();
    SQLRewriteEngine rewriteEngine = new SQLRewriteEngine(shardingRule, logicSQL, databaseType, sqlStatement, shardingConditions, parameters);
    if (sqlStatement instanceof SelectStatement && null != ((SelectStatement) sqlStatement).getLimit()) {
        processLimit(parameters, (SelectStatement) sqlStatement);
    }
    SQLBuilder sqlBuilder = rewriteEngine.rewrite(routingResult.isSingleRouting());
    for (TableUnit each : routingResult.getTableUnits().getTableUnits()) {
        result.getRouteUnits().add(new RouteUnit(each.getDataSourceName(), rewriteEngine.generateSQL(each, sqlBuilder, shardingMetaData.getDataSource())));
    }
    if (showSQL) {
        SQLLogger.logSQL(logicSQL, sqlStatement, result.getRouteUnits());
    }
    return result;
}
```
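Reduced to its core, the method above does two things: it decides which data sources and tables a statement touches, then rewrites the logic SQL into one actual SQL per target and collects them as route units. The sketch below shows that shape only. The order_id % 2 rule, the names ds_0 and t_order_0, and the plain string replacement are assumptions for illustration; the real rewrite engine works on parsed tokens, not on String.replace.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical sketch of "route, then rewrite": one logic SQL becomes per-shard SQL.
class RouteAndRewriteSketch {

    // A routed unit: which data source to use and the actual SQL to run there.
    static final class RouteUnit {
        final String dataSourceName;
        final String actualSql;

        RouteUnit(final String dataSourceName, final String actualSql) {
            this.dataSourceName = dataSourceName;
            this.actualSql = actualSql;
        }

        @Override
        public String toString() {
            return dataSourceName + " -> " + actualSql;
        }
    }

    static final int SHARD_COUNT = 2;

    static final String LOGIC_TABLE = "t_order";

    // If the sharding value is known, route to one shard; if it is null, fan out to all shards.
    static List<RouteUnit> route(final String logicSql, final Long orderId) {
        List<RouteUnit> result = new ArrayList<>();
        for (int shard = 0; shard < SHARD_COUNT; shard++) {
            if (orderId != null && Math.floorMod(orderId.longValue(), (long) SHARD_COUNT) != shard) {
                continue;
            }
            // Naive rewrite: swap the logic table name for the actual table name.
            // It would also match names such as t_order_item, which a real rewriter must avoid.
            String actualSql = logicSql.replace(LOGIC_TABLE, LOGIC_TABLE + "_" + shard);
            result.add(new RouteUnit("ds_" + shard, actualSql));
        }
        return result;
    }

    public static void main(final String[] args) {
        for (RouteUnit each : route("SELECT * FROM t_order WHERE order_id = 11", 11L)) {
            System.out.println(each);
        }
        for (RouteUnit each : route("SELECT * FROM t_order", null)) {
            System.out.println(each);
        }
    }
}
```

A statement with a usable sharding value yields a single route unit, while one without it yields a unit per shard, which is why the results of the latter have to be merged afterwards.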



![image](https://upload-images.jianshu.io/upload_images/3397380-ddad6c16a57de9eb.png?imageMogr2/auto-orient/)

 
