Jedis 连接池源码分析

Posted ausky技术交流

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Jedis 连接池源码分析相关的知识,希望对你有一定的参考价值。

连接池概述

“连接”是一种稀缺资源,建立连接是一个费时的活动,而且系统还需要分配内存资源。连接的创建和销毁对于非频繁的操作来说可能没什么影响,但是对于高性能高并发则影响很大;所以我们需要使用连接池。

一般创建连接池,我们使用 commons-pool 包,因为其已经帮我们封装好了需要创建池所需要的方法和功能;

Jedis 连接池源码分析

2个核心类:

  • BasePoolableObjectFactory 用于创建池中资源对象

  • GenericObjectPool 池,主要存储资源,提供池中资源的获取和维护

先不考虑 commons-pool 的实现,如果我们来重复造轮子,怎么处理呢?

  1. 需要一个工厂类来对创建资源;

  2. 需要池类

    • 类成员属性 工厂实例

    • 数组保存资源 

    • 提供资源的获取方法 

    • 提供资源的检测 

接下来我们来看看Jedis中连接池的实现

sentinel 说明

在看连接池之前,我们先看看 sentinel; sentinel(哨兵)主要用于监测 redis的状态,实现动态主从切换; 在sentinel中,会维护 redis的主从信息

我们可以看到其中列出了该 sentinel维护的所有master信息

小结:sentinel 帮我们维护着redis的主从信息,我们可以通过sentinel获取当前的主和从的信息;

redis.clients.util.Pool

jedis中连接池的抽象父类;

//通过连接池的配置 和 资源工厂类 进行创建 连接池
public Pool(final GenericObjectPool.Config poolConfig,
            PoolableObjectFactory factory) {
    initPool(poolConfig, factory); //初始化
}
public void initPool(final GenericObjectPool.Config poolConfig, PoolableObjectFactory factory) {
    
    if (this.internalPool != null) {
        try {
            destroy();
        } catch (Exception e) {             
        }
    }
    
    //这是 commons-pool提供的池类
    this.internalPool = new GenericObjectPool(factory, poolConfig);
}
    
@SuppressWarnings("unchecked")
public T getResource() {
    try {
        //从池中获取资源
        return (T) internalPool.borrowObject();
    } catch (Exception e) {
        throw new JedisConnectionException(
                "Could not get a resource from the pool", e);
    }
}
    
public void returnResourceObject(final Object resource) {
    try {
        internalPool.returnObject(resource);
    } catch (Exception e) {
        throw new JedisException(
                "Could not return the resource to the pool", e);
    }
}

Pool 主要有3个子实现类: JedisPool,JedisSentinelPool和ShardedJedisPool,接下来我们一个个分析

JedisPool

JedisPool:是直接根据ip和port创建连接池

public JedisPool(final Config poolConfig, final String host, int port, int timeout, final String password,
                 final int database) {
super(poolConfig, new JedisFactory(host, port, timeout, password, database));
}

可以看到资源工厂类为JedisFactory 

public JedisFactory(final String host, final int port,
        final int timeout, final String password, final int database) {
    super();
    this.host = host;
    this.port = port;
    this.timeout = timeout;
    this.password = password;
    this.database = database;
}

public Object makeObject() throws Exception {
    final Jedis jedis = new Jedis(this.host, this.port, this.timeout);

    jedis.connect();
    if (null != this.password) {
        jedis.auth(this.password);
    }
    if( database != 0 ) {
        jedis.select(database);
    }
    
    return jedis;
}

可以看到 这个工厂方法比较简单,就是根据传入的ip和端口进行创建Jedis连接

JedisSentinelPool

JedisSentinelPool:是根据sentinel获取master的池(因为master会因为主从切换导致变化,所以里面应该有线程监听主的变化)

在看代码之前,我们先简单思考下需要做哪些事情呢?

  • 获取master信息

  • 根据master的ip和端口创建连接池

  • 监听master的变化,当 matser变化之后我们重新更新连接池

具体实现我们来看看;

构造方法:

public JedisSentinelPool(String masterName, Set<String> sentinels,
        final Config poolConfig, int timeout, final String password,
        final int database) {
    this.poolConfig = poolConfig;
    this.timeout = timeout;
    this.password = password;
    this.database = database;
    
    //获取 master信息和 监听 sentinel 去发现master的变化
    HostAndPort master = initSentinels(sentinels, masterName);
    //根据master信息调用父类的初始化连接池
    initPool(master);
}

核心就在 initSentinels 中

  • 获取当前的主

  • 对sentinel 添加监听事件

方法内部片段:

循环获取主,直到获取成功才跳出循环:

outer: while (running) {

        log.info("Trying to find master from available Sentinels...");

        for (String sentinel : sentinels) {

        final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel
            .split(":")));

        log.fine("Connecting to Sentinel " + hap);

        try {
            //获取与 sentinel的连接,因为 sentinel本身也是 redis
            Jedis jedis = new Jedis(hap.host, hap.port);

            if (master == null) {
            //从sentinel中根据master名称获取master的ip和端口
            master = toHostAndPort(jedis
                .sentinelGetMasterAddrByName(masterName));
            log.fine("Found Redis master at " + master);
            jedis.disconnect();
            break outer;
            }
        } catch (JedisConnectionException e) {
            log.warning("Cannot connect to sentinel running @ " + hap
                + ". Trying next one.");
        }
        }

        try {
        log.severe("All sentinels down, cannot determine where is "
            + masterName + " master is running... sleeping 1000ms.");
        Thread.sleep(1000);
        } catch (InterruptedException e) {
        e.printStackTrace();
        }
    }

添加监听

for (String sentinel : sentinels) {
        final HostAndPort hap = toHostAndPort(Arrays.asList(sentinel
            .split(":")));
        MasterListener masterListener = new MasterListener(masterName,
            hap.host, hap.port);
        masterListeners.add(masterListener);
        masterListener.start();
    }

再看看 sentinel 的监听 MasterListener: 

构造方法传入 matserName,sentinel的ip和port;

public MasterListener(String masterName, String host, int port) {
    this.masterName = masterName;
    this.host = host;
    this.port = port;
}

线程中 while循环订阅sentinel的switch-master事件,接收到事件之后,重新调用initPool 初始化连接池;

public void run() {

        running.set(true);

        while (running.get()) {
        
        //建立与sentinel的连接
        j = new Jedis(host, port);

        try {
            //订阅+switch-master事件
            j.subscribe(new JedisPubSubAdapter() {
            @Override
            public void onMessage(String channel, String message) {
                log.fine("Sentinel " + host + ":" + port
                    + " published: " + message + ".");

                String[] switchMasterMsg = message.split(" ");

                if (switchMasterMsg.length > 3) {

                if (masterName.equals(switchMasterMsg[0])) {
                    //根据切换后的 master 信息重新初始化连接池
                    initPool(toHostAndPort(Arrays.asList(
                        switchMasterMsg[3],
                        switchMasterMsg[4])));
                } else {
                    log.fine("Ignoring message on +switch-master for master name "
                        + switchMasterMsg[0]
                        + ", our master name is "
                        + masterName);
                }

                } else {
                log.severe("Invalid message received on Sentinel "
                    + host
                    + ":"
                    + port
                    + " on channel +switch-master: "
                    + message);
                }
            }
            }, "+switch-master");

        } catch (JedisConnectionException e) {

            if (running.get()) {
            log.severe("Lost connection to Sentinel at " + host
                + ":" + port
                + ". Sleeping 5000ms and retrying.");
            try {
                Thread.sleep(subscribeRetryWaitTimeMillis);
            } catch (InterruptedException e1) {
                e1.printStackTrace();
            }
            } else {
            log.fine("Unsubscribing from Sentinel at " + host + ":"
                + port);
            }
        }
        }
    }

ShardedJedisPool

ShardedJedisPool主要用于多机分片连接池(比如100万的key,希望接近均匀的分配到 5台机器中)

在这之前 我们先看看 JedisShardInfo jedis分片信息类:存储了分片机器的ip,端口,权重等信息

public class JedisShardInfo extends ShardInfo<Jedis> 

ShardInfo 最重要的一个方法就是 createResource ,实现如下

//根据 分片机器信息创建 redis的Jedis连接
public Jedis createResource() {
    return new Jedis(this);
}

接下来我们来看ShardedJedisPool 

先看看构造方法 :

//默认 使用 哈希算法进行分片
public ShardedJedisPool(final GenericObjectPool.Config poolConfig,
        List<JedisShardInfo> shards) {
    this(poolConfig, shards, Hashing.MURMUR_HASH);
}

public ShardedJedisPool(final GenericObjectPool.Config poolConfig,
        List<JedisShardInfo> shards, Hashing algo) {
    this(poolConfig, shards, algo, null);
}

public ShardedJedisPool(final GenericObjectPool.Config poolConfig,
        List<JedisShardInfo> shards, Pattern keyTagPattern) {
    this(poolConfig, shards, Hashing.MURMUR_HASH, keyTagPattern);
}

//具体实现在此
public ShardedJedisPool(final GenericObjectPool.Config poolConfig,
        List<JedisShardInfo> shards, Hashing algo, Pattern keyTagPattern) {
    //调用父类的构造方法 初始化连接池
    super(poolConfig, new ShardedJedisFactory(shards, algo, keyTagPattern));
}

通过构造方法我们可以看到 传入配置池信息,分片机器,负载均衡算法; 默认使用 哈希;

工厂类:ShardedJedisFactory

public ShardedJedisFactory(List<JedisShardInfo> shards, Hashing algo,
                Pattern keyTagPattern) {
    this.shards = shards;
    this.algo = algo;
    this.keyTagPattern = keyTagPattern;
}

public Object makeObject() throws Exception {
    ShardedJedis jedis = new ShardedJedis(shards, algo, keyTagPattern);
    return jedis;
}

提供的资源类为 ShardedJedis

public class ShardedJedis extends BinaryShardedJedis implements JedisCommands 
  1. 可以看到 ShardedJedis 实现了 JedisCommands 接口类(redis的命令操作),所以 ShardedJedis可以直接对redis进行操作

  2. BinaryShardedJedis 继承 Sharded<Jedis, JedisShardInfo>

Sharded:进行分片操作的父类

//这里的S在ShardedJedis里面就是 JedisShardInfo
public Sharded(List<S> shards, Hashing algo) {
    this.algo = algo;
    initialize(shards);
}

//分片初始化
private void initialize(List<S> shards) {
    nodes = new TreeMap<Long, S>();

    for (int i = 0; i != shards.size(); ++i) {
        final S shardInfo = shards.get(i);
        if (shardInfo.getName() == null)
            for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
                //这个为啥使用 TreeMap 以及 为啥 存储的是  hash 值?
                nodes.put(this.algo.hash("SHARD-" + i + "-NODE-" + n), shardInfo);
            }
        else
            for (int n = 0; n < 160 * shardInfo.getWeight(); n++) {
                nodes.put(this.algo.hash(shardInfo.getName() + "*" + shardInfo.getWeight() + n), shardInfo);
            }
        resources.put(shardInfo, shardInfo.createResource());
    }
}

在初始化中 我们做了 2件事情:

  • 在nodes中根据权重保存 160*权重次 

  • resources中保存每个分片信息创建的Jedis连接 

我们来看看应用 

//ShardedJedis 中 的 set 方法
public String set(String key, String value) {
    Jedis j = getShard(key);
    return j.set(key, value);
}

getShard 是 Sharded 类中的方法 

//获取 Jedis (R)
public R getShard(String key) {
    return resources.get(getShardInfo(key));
}

//根据key获取分片信息
//根据key的hash,从 nodes中挑选
public S getShardInfo(byte[] key) {
    //这里的 hash  和 TreeMap的tailMap也不是特别清楚 ?
    SortedMap<Long, S> tail = nodes.tailMap(algo.hash(key));
    if (tail.isEmpty()) {
        return nodes.get(nodes.firstKey());
    }
    return tail.get(tail.firstKey());
}
//根据key获取分片信息
public S getShardInfo(String key) {
    return getShardInfo(SafeEncoder.encode(getKeyTag(key)));
}

小结 :ShardedJedisPool池中 存储的是 ShardedJedis对象,在ShardedJedis中存放了所有分片的Jedis连接

问题

对于分片的 TreeMap 和 put时候使用 hash 不是特别理解,后面如果搞清楚了再回来补起


以上是关于Jedis 连接池源码分析的主要内容,如果未能解决你的问题,请参考以下文章

jedis连接redis

jedis连接池爆满导致的服务不可用

Jedis连接池:JedisPool及连接池工具类搭建

jedis的连接池

Redis java客户端 jedis 源码分析系列二:单实例 jedis

jedis的ShardedJedisPool链接池的扩容问题