flea-frame-cache使用之Redis分片模式接入

Posted Huazie

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了flea-frame-cache使用之Redis分片模式接入相关的知识,希望对你有一定的参考价值。

Redis分片模式接入

1. 参考

flea-frame-cache使用之Redis分片模式接入 源代码v1.1.0

2. 依赖

jedis-3.0.1.jar

<!-- Java redis -->
<dependency>
     <groupId>redis.clients</groupId>
     <artifactId>jedis</artifactId>
     <version>3.0.1</version>
</dependency>

spring-context-4.3.18.RELEASE.jar

<!-- Spring相关 -->
<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context</artifactId>
    <version>4.3.18.RELEASE</version>
</dependency>

spring-context-support-4.3.18.RELEASE.jar

<dependency>
    <groupId>org.springframework</groupId>
    <artifactId>spring-context-support</artifactId>
    <version>4.3.18.RELEASE</version>
</dependency>

3. 基础接入

3.1 定义Flea缓存接口 — IFleaCache

可参考笔者的这篇博文 Memcached接入,不再赘述。

3.2 定义抽象Flea缓存类 — AbstractFleaCache

可参考笔者的这篇博文 Memcached接入,不再赘述。

3.3 定义Redis客户端接口类 — RedisClient

注意该版,相比《flea-frame-cache使用之Redis接入》博文中,废弃如下与 ShardedJedis 有关的方法:

	ShardedJedisPool getJedisPool();
	
    void setShardedJedis(ShardedJedis shardedJedis);
    
    ShardedJedis getShardedJedis();

flea-frame-cache使用之Redis接入》博文中 提到了使用 Redis客户端代理方式 访问 RedisClient, 在这版为了实现Redis访问异常后的重试机制,废弃了代理模式,采用了命令行模式,可参考下面的 RedisClientCommand

3.4 定义Redis客户端命令行 — RedisClientCommand

/**
 * Redis客户端命令行,封装了使用ShardedJedis操作Redis缓存的公共逻辑,
 * 如果出现异常可以重试@code maxAttempts 次。
 *
 * <p> 抽象方法 @code execute,由子类或匿名类实现。在实际调用前,
 * 需要从分布式Jedis连接池中获取分布式Jedis对象;调用结束后,
 * 关闭分布式Jedis对象,归还给分布式Jedis连接池。
 *
 * @author huazie
 * @version 1.1.0
 * @since 1.1.0
 */
public abstract class RedisClientCommand<T> 

    private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(RedisClientCommand.class);

    private final ShardedJedisPool shardedJedisPool; // 分布式Jedis连接池

    private final int maxAttempts; // Redis客户端操作最大尝试次数【包含第一次操作】

    public RedisClientCommand(ShardedJedisPool shardedJedisPool, int maxAttempts) 
        this.shardedJedisPool = shardedJedisPool;
        this.maxAttempts = maxAttempts;
    

    public abstract T execute(ShardedJedis connection);

    /**
     * 执行分布式Jedis操作
     *
     * @return 分布式Jedis对象操作的结果
     * @since 1.0.0
     */
    public T run() 
        return runWithRetries(this.maxAttempts);
    

    /**
     * 执行分布式Jedis操作,如果出现异常,包含第一次操作,可最多尝试maxAttempts次。
     *
     * @param attempts 重试次数
     * @return 分布式Jedis对象操作的结果
     * @since 1.0.0
     */
    private T runWithRetries(int attempts) 
        if (attempts <= 0) 
            throw new FleaCacheMaxAttemptsException("No more attempts left.");
        
        ShardedJedis connection = null;
        try 
            connection = shardedJedisPool.getResource();
            Object obj = null;
            if (LOGGER.isDebugEnabled()) 
                obj = new Object() ;
                LOGGER.debug1(obj, "Get ShardedJedis = ", connection);
            
            T result = execute(connection);
            if (LOGGER.isDebugEnabled()) 
                LOGGER.debug1(obj, "Result = ", result);
            
            return result;
         catch (JedisConnectionException e) 
            // 在开始下一次尝试前,释放当前分布式Jedis的连接,将分布式Jedis对象归还给分布式Jedis连接池
            releaseConnection(connection);
            connection = null; // 这里置空是为了最后finally不重复操作
            if (LOGGER.isErrorEnabled()) 
                Object obj = new Object() ;
                LOGGER.error1(obj, "Redis连接异常:", e);
                int currAttempts = this.maxAttempts - attempts + 1;
                LOGGER.error1(obj, "第  次尝试失败,开始第  次尝试...", currAttempts, currAttempts + 1);
            
            return runWithRetries(attempts - 1);
         finally 
            releaseConnection(connection);
        
    

    /**
     * 释放指定分布式Jedis的连接,将分布式Jedis对象归还给分布式Jedis连接池
     *
     * @param connection 分布式Jedis实例
     * @since 1.0.0
     */
    private void releaseConnection(ShardedJedis connection) 
        if (ObjectUtils.isNotEmpty(connection)) 
            if (LOGGER.isDebugEnabled()) 
                LOGGER.debug1(new Object() , "Close ShardedJedis");
            
            connection.close();
        
    


3.5 定义分片模式Redis客户端实现类 — FleaRedisShardedClient

分片模式 Redis 客户端 主要使用 ShardedJedis 来操作 Redis 数据。

/**
 * Flea分片模式Redis客户端实现类,封装了Flea框架操作Redis缓存的基本操作。
 *
 * <p> 它内部具体操作Redis缓存的功能,由分布式Jedis对象完成,
 * 包含读、写、删除Redis缓存的基本操作方法。
 * 
 * 详见笔者 https://github.com/Huazie/flea-frame,欢迎 Star
 *
 * @author huazie
 * @version 1.1.0
 * @since 1.0.0
 */
public class FleaRedisShardedClient extends FleaRedisClient 

    private ShardedJedisPool shardedJedisPool; // 分布式Jedis连接池

    private int maxAttempts; // Redis客户端操作最大尝试次数【包含第一次操作】

    /**
     * <p> Redis客户端构造方法 (默认连接池名) </p>
     *
     * @since 1.0.0
     */
    private FleaRedisShardedClient() 
        this(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME);
    

    /**
     * <p> Redis客户端构造方法(指定连接池名) </p>
     *
     * @param poolName 连接池名
     * @since 1.0.0
     */
    private FleaRedisShardedClient(String poolName) 
        super(poolName);
        init();
    

    /**
     * <p> 初始化分布式Jedis连接池 </p>
     *
     * @since 1.0.0
     */
    private void init() 
        if (CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME.equals(getPoolName())) 
            shardedJedisPool = RedisShardedPool.getInstance().getJedisPool();
            maxAttempts = RedisShardedConfig.getConfig().getMaxAttempts();
         else 
            shardedJedisPool = RedisShardedPool.getInstance(getPoolName()).getJedisPool();
            maxAttempts = CacheConfigUtils.getMaxAttempts();
        
    

    @Override
    public String set(final String key, final Object value) 
        return new RedisClientCommand<String>(this.shardedJedisPool, this.maxAttempts) 
            @Override
            public String execute(ShardedJedis connection) 
                if (value instanceof String)
                    return connection.set(key, (String) value);
                else
                    return connection.set(SafeEncoder.encode(key), ObjectUtils.serialize(value));
            
        .run();
    

    @Override
    public String set(final byte[] key, final byte[] value) 
        // 省略。。。。。。
    

    @Override
    public String set(final String key, final Object value, final int expiry) 
        // 省略。。。。。。
    

    @Override
    public String set(final byte[] key, final byte[] value, final int expiry) 
        // 省略。。。。。。
    

    @Override
    public String set(final String key, final Object value, final long expiry) 
        // 省略。。。。。。
    

    @Override
    public String set(final byte[] key, final byte[] value, final long expiry) 
        // 省略。。。。。。
    

    @Override
    public String set(final String key, final Object value, final SetParams params) 
        // 省略。。。。。。
    

    @Override
    public String set(final byte[] key, final byte[] value, final SetParams params) 
        // 省略。。。。。。
    

    @Override
    public byte[] get(final byte[] key) 
        // 省略。。。。。。
    

    @Override
    public Long del(final String key) 
        // 省略。。。。。。
    

    /**
     * <p> 获取客户端类 </p>
     *
     * @param key 数据键
     * @return 客户端类
     * @since 1.0.0
     */
    @Override
    protected Client getClientByKey(final Object key) 
        // 省略。。。。。。
    

    /**
     * <p> 内部建造者类 </p>
     */
    public static class Builder 
		// 省略。。。。。。
    


该类的构造函数初始化逻辑,可以看出我们使用了 RedisShardedPool, 下面来介绍一下。

3.6 定义Redis分片连接池 — RedisShardedPool

上个版本我们使用 RedisPool 初始化Redis相关配置信息,为了体现Redis分片模式,这个版本里面,我们使用 RedisShardedPool 用于Redis相关配置信息的初始化,其中重点是获取分布式Jedis连接池 ShardedJedisPool ,该类其中一个构造方法如下:

/**
 * @param poolConfig 连接池配置信息
 * @param shards Jedis分布式服务器列表
 * @param algo 分布式算法
 */
public ShardedJedisPool(final GenericObjectPoolConfig poolConfig, List<JedisShardInfo> shards,
      Hashing algo) 
/**
 * Redis分片连接池,用于初始化分布式 Jedis 连接池。
 *
 * <p> 针对单独缓存接入场景,采用默认连接池初始化的方式;<br/>
 * 可参考如下:
 * <pre>
 *   // 初始化默认连接池
 *   RedisShardedPool.getInstance().initialize(); </pre>
 *
 * <p> 针对整合缓存接入场景,采用指定连接池初始化的方式;<br/>
 * 可参考如下:
 * <pre>
 *   // 初始化指定连接池
 *   RedisShardedPool.getInstance(group).initialize(cacheServerList); </pre>
 *
 * @author huazie
 * @version 1.1.0
 * @since 1.0.0
 */
public class RedisShardedPool 

    private static final ConcurrentMap<String, RedisShardedPool> redisPools = new ConcurrentHashMap<>();

    private String poolName; // 连接池名

    private ShardedJedisPool shardedJedisPool; // 分布式Jedis连接池

    private RedisShardedPool(String poolName) 
        this.poolName = poolName;
    

    /**
     * <p> 获取Redis连接池实例 (默认连接池) </p>
     *
     * @return Redis连接池实例对象
     * @since 1.0.0
     */
    public static RedisShardedPool getInstance() 
        return getInstance(CommonConstants.FleaPoolConstants.DEFAULT_POOL_NAME);
    

    /**
     * <p> 获取Redis连接池实例 (指定连接池名) </p>
     *
     * @param poolName 连接池名
     * @return Redis连接池实例对象
     * @since 1.0.0
     */
    public static RedisShardedPool getInstance(String poolName) 
        if (!redisPools.containsKey(poolName)) 
            synchronized (redisPools) 
                if (!redisPools.containsKey(poolName)) 
                    RedisShardedPool redisShardedPool = new RedisShardedPool(poolName);
                    redisPools.putIfAbsent(poolName, redisShardedPool);
                
            
        
        return redisPools.get(poolName);
    

    /**
     * <p> 默认初始化 </p>
     *
     * @since 1.0.0
     */
    public void initialize() 
        // 省略。。。。。。
    

    /**
     * <p> 初始化 (非默认连接池) </p>
     *
     * @param cacheServerList 缓存服务器集
     * @since 1.0.0
     */
    public void initialize(List<CacheServer> cacheServerList) 
        // 省略。。。。。。
    

    /**
     * <p> 获取当前连接池名 </p>
     *
     * @return 连接池名
     * @since 1.0.0
     */
    public String getPoolName() 
        return poolName;
    

    /**
     * <p> 分布式Jedis连接池 </p>
     *
     * @return 分布式Jedis连接池
     * @since 1.0.0
     */
    public ShardedJedisPool getJedisPool() 
        if (ObjectUtils.isEmpty(shardedJedisPool)) 
            throw new FleaCacheConfigException("获取分布式Jedis连接池失败:请先调用initialize初始化");
        
        return shardedJedisPool;
    

3.7 Redis配置文件

flea-frame-cache读取 redis.properties(Redis配置文件),用作初始化 RedisShardedPool

# Redis配置
# Redis缓存所属系统名
redis.systemName=FleaFrame

# Redis服务器地址
redis.server=127.0.0.1:10001,127.0.0.1:10002,127.0.0.1:10003

# Redis服务登录密码
redis.password=huazie123,huazie123,huazie123

# Redis服务器权重分配
redis.weight=1,1,1

# Redis客户端socket连接超时时间(单位:ms)
redis.connectionTimeout=2000

# Redis客户端socket读写超时时间(单位:ms)
redis.soTimeout=2000

# Redis分布式hash算法
# 1 : MURMUR_HASH
# 2 : MD5
redis.hashingAlg=1

# Redis客户端连接池配置
# Jedis连接池最大连接数
redis.pool.maxTotal=100

# Jedis连接池最大空闲连接数
redis.pool.maxIdle=10

# Jedis连接池最小空闲连接数
redis.pool.minIdle=0

# Jedis连接池获取连接时的最大等待时间(单位:ms)
redis.pool.maxWaitMillis=2000

# Redis客户端操作最大尝试次数【包含第一次操作】
redis.maxAttempts=5

# 空缓存数据有效期(单位:s)
redis.nullCacheExpiry=10

3.8 定义Redis Flea缓存类 — RedisFleaCache

该类继承抽象Flea缓存类 AbstractFleaCache ,其构造方法可见如需要传入Redis客户端 RedisClient ,相关使用下面介绍:

/**
 * Redis Flea缓存类,实现了以Flea框架操作Redis缓存的基本操作方法。
 *
 * <p> 在上述基本操作方法中,实际使用Redis客户端【@code redisClient】
 * 读、写和删除Redis缓存。其中写缓存方法【@code putNativeValue】在
 * 添加的数据值为【@code null】时,默认添加空缓存数据【@code NullCache】
 * 到Redis中,有效期取初始化参数【@code nullCacheExpiry】。
 *
 * <p> 单个缓存接入场景,有效期配置可查看【redis.properties】中的配置参数
 * 【redis.nullCacheExpiry】
 *
 * <p> 整合缓存接入场景,有效期配置可查看【flea-cache-config.xml】中的缓存参数
 * 【@code <cache-param key="fleacore.nullCacheExpiry"
 * desc="空缓存数据有效期(单位:s)">300</cache-param>】
 *
 * @author huazie
 * @version 1.1.0
 * @since 1.0.0
 */
public class RedisFleaCache extends AbstractFleaCache 

    private static final FleaLogger LOGGER = FleaLoggerProxy.getProxyInstance(RedisFleaCache.class);

    private RedisClient redisClient; // Redis客户端

    private CacheModeEnum cacheMode; // 缓存模式【分片模式和集群模式】

    /**
     * <p> 带参数的构造方法,初始化Redis Flea缓存类 </p>
     *
     * @param name            缓存数据主关键字
     * @param expiry          缓存数据有效期(单位:s)
     * @param nullCacheExpiry 空缓存数据有效期(单位:s)
     * @param cacheMode       缓存模式【分分片模式和集群模式】
     * @param redisClient     Redis客户端
     * @since 1.0.0
     */
    public RedisFleaCache(String name, int expiry, int nullCacheExpiry, CacheModeEnum cacheMode, RedisClient redisClient) 
        super(name, expiry, nullCacheExpiry);
        this.cacheMode = cacheMode;
        this.redisClient = redisClient;
        if (CacheUtils.isClusterMode(cacheMode))
            cache = CacheEnum.RedisCluster; // 缓存实现之Redis集群模式
        else
            cache = CacheEnum.RedisSharded; // 缓存实现之Redis分片模式
    

    @Override
    public Object getNativeValue(String key) 
        if (LOGGER.isDebugEnabled()) 
            LOGGER.debug1(new Object() , "KEY = ", key);
        
        return redisClient.get(key);
    

    @Override
    public Object putNativeValue(String key, Object value, int expiry) 
        if (LOGGER.isDebugEnabled()) 
            Object obj = new Object() ;
            LOGGER.debug1(obj, "REDIS FLEA CACHE, KEY = ", key);
            LOGGER.Redis之Zset

第一部分之简单字符串SDS(第二章)

Redis基本使用 之——发布/订阅

Redis之Pipeline使用注意事项

Redis实战之好友关注功能

Redis实战之好友关注功能