Jedis 连接池源码分析
Posted ausky技术交流
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Jedis 连接池源码分析相关的知识,希望对你有一定的参考价值。
连接池概述
“连接”是一种稀缺资源,建立连接是一个费时的活动,而且系统还需要分配内存资源。连接的创建和销毁对于非频繁的操作来说可能没什么影响,但是对于高性能高并发则影响很大;所以我们需要使用连接池。
一般创建连接池,我们使用 commons-pool 包,因为其已经帮我们封装好了需要创建池所需要的方法和功能;
2个核心类:
BasePoolableObjectFactory 用于创建池中资源对象
GenericObjectPool 池,主要存储资源,提供池中资源的获取和维护
先不考虑 commons-pool 的实现,如果我们来重复造轮子,怎么处理呢?
需要一个工厂类来对创建资源;
需要池类
类成员属性 工厂实例
数组保存资源
提供资源的获取方法
提供资源的检测
接下来我们来看看Jedis中连接池的实现
sentinel 说明
在看连接池之前,我们先看看 sentinel; sentinel(哨兵)主要用于监测 redis的状态,实现动态主从切换; 在sentinel中,会维护 redis的主从信息
我们可以看到其中列出了该 sentinel维护的所有master信息
小结:sentinel 帮我们维护着redis的主从信息,我们可以通过sentinel获取当前的主和从的信息;
redis.clients.util.Pool
jedis中连接池的抽象父类;
//通过连接池的配置 和 资源工厂类 进行创建 连接池
public Pool(final GenericObjectPool.Config poolConfig,
PoolableObjectFactory factory) {
initPool(poolConfig, factory); //初始化
}
public void initPool(final GenericObjectPool.Config poolConfig, PoolableObjectFactory factory) {
if (this.internalPool != null) {
try {
destroy();
} catch (Exception e) {
}
}
//这是 commons-pool提供的池类
this.internalPool = new GenericObjectPool(factory, poolConfig);
}
@SuppressWarnings("unchecked")
public T getResource() {
try {
//从池中获取资源
return (T) internalPool.borrowObject();
} catch (Exception e) {
throw new JedisConnectionException(
"Could not get a resource from the pool", e);
}
}
public void returnResourceObject(final Object resource) {
try {
internalPool.returnObject(resource);
} catch (Exception e) {
throw new JedisException(
"Could not return the resource to the pool", e);
}
}
Pool 主要有3个子实现类: JedisPool,JedisSentinelPool和ShardedJedisPool,接下来我们一个个分析
JedisPool
JedisPool:是直接根据ip和port创建连接池
public JedisPool(final Config poolConfig, final String host, int port, int timeout, final String password,
final int database) {
super(poolConfig, new JedisFactory(host, port, timeout, password, database));
}
可以看到资源工厂类为JedisFactory
public JedisFactory(final String host, final int port,
final int timeout, final String password, final int database) {
super();
this.host = host;
this.port = port;
this.timeout = timeout;
this.password = password;
this.database = database;
}
public Object makeObject() throws Exception {
final Jedis jedis = new Jedis(this.host, this.port, this.timeout);
jedis.connect();
if (null != this.password) {
jedis.auth(this.password);
}
if( database != 0 ) {
jedis.select(database);
}
return jedis;
}
可以看到 这个工厂方法比较简单,就是根据传入的ip和端口进行创建Jedis连接
JedisSentinelPool
JedisSentinelPool:是根据sentinel获取master的池(因为master会因为主从切换导致变化,所以里面应该有线程监听主的变化)
在看代码之前,我们先简单思考下需要做哪些事情呢?
获取master信息
根据master的ip和端口创建连接池
监听master的变化,当 matser变化之后我们重新更新连接池
具体实现我们来看看;
构造方法:
public JedisSentinelPool(String masterName, Set<String> sentinels,
final Config poolConfig, int timeout, final String password,
final int database) {
this.poolConfig = poolConfig;
this.timeout = timeout;
this.password = password;
this.database = database;
//获取 master信息和 监听 sentinel 去发现master的变化
HostAndPort master = initSentinels(sentinels, masterName);
//根据master信息调用父类的初始化连接池
initPool(master);
}
核心就在 initSentinels 中
获取当前的主
对sentinel 添加监听事件
方法内部片段:
循环获取主,直到获取成功才跳出循环:
outer: while (running) {
log.info("Trying to find master from available Sentinels...");
for (String sentinel : sentinels) {
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel
.split(":")));
log.fine("Connecting to Sentinel " + hap);
try {
//获取与 sentinel的连接,因为 sentinel本身也是 redis
Jedis jedis = new Jedis(hap.host, hap.port);
if (master == null) {
//从sentinel中根据master名称获取master的ip和端口
master = toHostAndPort(jedis
.sentinelGetMasterAddrByName(masterName));
log.fine("Found Redis master at " + master);
jedis.disconnect();
break outer;
}
} catch (JedisConnectionException e) {
log.warning("Cannot connect to sentinel running @ " + hap
+ ". Trying next one.");
}
}
try {
log.severe("All sentinels down, cannot determine where is "
+ masterName + " master is running... sleeping 1000ms.");
Thread.sleep(1000);
} catch (InterruptedException e) {
e.printStackTrace();
}
}
添加监听
for (String sentinel : sentinels) {
final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel
.split(":")));
MasterListener masterListener = new MasterListener(masterName,
hap.host, hap.port);
masterListeners.add(masterListener);
masterListener.start();
}
再看看 sentinel 的监听 MasterListener:
构造方法传入 matserName,sentinel的ip和port;
public MasterListener(String masterName, String host, int port) {
this.masterName = masterName;
this.host = host;
this.port = port;
}
线程中 while循环订阅sentinel的switch-master事件,接收到事件之后,重新调用initPool 初始化连接池;
public void run() {
running.set(true);
while (running.get()) {
//建立与sentinel的连接
j = new Jedis(host, port);
try {
//订阅+switch-master事件
j.subscribe(new JedisPubSubAdapter() {
@Override
public void onMessage(String channel, String message) {
log.fine("Sentinel " + host + ":" + port
+ " published: " + message + ".");
String[] switchMasterMsg = message.split(" ");
if (switchMasterMsg.length > 3) {
if (masterName.equals(switchMasterMsg[0])) {
//根据切换后的 master 信息重新初始化连接池
initPool(toHostAndPort(Arrays.asList(
switchMasterMsg[3],
switchMasterMsg[4])));
} else {
log.fine("Ignoring message on +switch-master for master name "
+ switchMasterMsg[0]
+ ", our master name is "
+ masterName);
}
} else {
log.severe("Invalid message received on Sentinel "
+ host
+ ":"
+ port
+ " on channel +switch-master: "
+ message);
}
}
}, "+switch-master");
} catch (JedisConnectionException e) {
if (running.get()) {
log.severe("Lost connection to Sentinel at " + host
+ ":" + port
+ ". Sleeping 5000ms and retrying.");
try {
Thread.sleep(subscribeRetryWaitTimeMillis);
} catch (InterruptedException e1) {
e1.printStackTrace();
}
} else {
log.fine("Unsubscribing from Sentinel at " + host + ":"
+ port);
}
}
}
}
ShardedJedisPool
ShardedJedisPool主要用于多机分片连接池(比如100万的key,希望接近均匀的分配到 5台机器中)
在这之前 我们先看看 JedisShardInfo jedis分片信息类:存储了分片机器的ip,端口,权重等信息
public class JedisShardInfo extends ShardInfo<Jedis>
ShardInfo 最重要的一个方法就是 createResource ,实现如下
//根据 分片机器信息创建 redis的Jedis连接
public Jedis createResource() {
return new Jedis(this);
}
接下来我们来看ShardedJedisPool
先看看构造方法 :
//默认 使用 哈希算法进行分片
public ShardedJedisPool(final GenericObjectPool.Config poolConfig,
List<JedisShardInfo> shards) {
this(poolConfig, shards, Hashing.MURMUR_HASH);
}
public ShardedJedisPool(final GenericObjectPool.Config poolConfig,
List<JedisShardInfo> shards, Hashing algo) {
this(poolConfig, shards, algo, null);
}
public ShardedJedisPool(final GenericObjectPool.Config poolConfig,
List<JedisShardInfo> shards, Pattern keyTagPattern) {
this(poolConfig, shards, Hashing.MURMUR_HASH, keyTagPattern);
}
//具体实现在此
public ShardedJedisPool(final GenericObjectPool.Config poolConfig,
List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
//调用父类的构造方法 初始化连接池
super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern));
}
通过构造方法我们可以看到 传入配置池信息,分片机器,负载均衡算法; 默认使用 哈希;
工厂类:ShardedJedisFactory
public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo,
Pattern keyTagPattern) {
this.shards = shards;
this.algo = algo;
this.keyTagPattern = keyTagPattern;
}
public Object makeObject() throws Exception {
ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
return jedis;
}
提供的资源类为 ShardedJedis
public class ShardedJedis extends BinaryShardedJedis implements JedisCommands
可以看到 ShardedJedis 实现了 JedisCommands 接口类(redis的命令操作),所以 ShardedJedis可以直接对redis进行操作
BinaryShardedJedis 继承 Sharded<Jedis, JedisShardInfo>
Sharded:进行分片操作的父类
//这里的S在ShardedJedis里面就是 JedisShardInfo
public Sharded(List<S> shards, Hashing algo) {
this.algo = algo;
initialize(shards);
}
//分片初始化
private void initialize(List<S> shards) {
nodes = new TreeMap<Long, S>();
for (int i = 0; i != shards.size(); ++i) {
final S shardInfo = shards.get(i);
if (shardInfo.getName() == null)
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
//这个为啥使用 TreeMap 以及 为啥 存储的是 hash 值?
nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
}
else
for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
nodes.put(this.algo.hash(shardInfo.getName() + "*" + shardInfo.getWeight() + n), shardInfo);
}
resources.put(shardInfo, shardInfo.createResource());
}
}
在初始化中 我们做了 2件事情:
在nodes中根据权重保存 160*权重次
resources中保存每个分片信息创建的Jedis连接
我们来看看应用
//ShardedJedis 中 的 set 方法
public String set(String key, String value) {
Jedis j = getShard(key);
return j.set(key, value);
}
getShard 是 Sharded 类中的方法
//获取 Jedis (R)
public R getShard(String key) {
return resources.get(getShardInfo(key));
}
//根据key获取分片信息
//根据key的hash,从 nodes中挑选
public S getShardInfo(byte[] key) {
//这里的 hash 和 TreeMap的tailMap也不是特别清楚 ?
SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
if (tail.isEmpty()) {
return nodes.get(nodes.firstKey());
}
return tail.get(tail.firstKey());
}
//根据key获取分片信息
public S getShardInfo(String key) {
return getShardInfo(SafeEncoder.encode(getKeyTag(key)));
}
小结 :ShardedJedisPool池中 存储的是 ShardedJedis对象,在ShardedJedis中存放了所有分片的Jedis连接
问题
对于分片的 TreeMap 和 put时候使用 hash 不是特别理解,后面如果搞清楚了再回来补起
以上是关于Jedis 连接池源码分析的主要内容,如果未能解决你的问题,请参考以下文章