Lettuce之RedisClusterClient使用以及源码分析
Posted wei_zw
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Lettuce之RedisClusterClient使用以及源码分析相关的知识,希望对你有一定的参考价值。
Redis 集群的数据分片
redis集群并没有使用一致性hash算法而引入了哈希槽概念,Redis 集群有16384个哈希槽,每个key通过CRC16校验后对16384取模来决定放置哪个槽.集群的每个节点负责一部分hash槽.也就是说如果key是不变的对应的slot也是不变的
可以通过cluster info 命名查看
cluster info cluster_state:ok cluster_slots_assigned:16384 cluster_slots_ok:16384 cluster_slots_pfail:0 cluster_slots_fail:0 cluster_known_nodes:12
通过cluster nodes命令查看当前节点以及该节点分配的slot,如下图可以发现当前redis集群有12个节点,每个节点大约管理1365个slot
xx.xxx.xxx.xx:6959> cluster nodes 45abb8663c0cdb25ed17c29521bf6fda98e913ea xx.xxx.xxx.xx:6961 master - 0 1529229636724 11 connected 13653-15018 e40080f32a3fb89e34b7622038ce490682428fdf xx.xxx.xxx.xx:6960 master - 0 1529229633723 10 connected 12288-13652 a749bba5614680dea9f47e3c8fe595aa8be71a2c xx.xxx.xxx.xx:6954 master - 0 1529229639230 4 connected 4096-5460 1096e2a8737401b66c7d4ee0addcb10d7ff14088 xx.xxx.xxx.xx:6952 master - 0 1529229636224 2 connected 1365-2730 fbc76f3481271241c1a89fabeb5139905e1ec2a6 xx.xxx.xxx.xx:6962 master - 0 1529229638230 12 connected 15019-16383 85601fa67820a5af0de0cc21d102d72575709ec6 xx.xxx.xxx.xx:6959 myself,master - 0 0 9 connected 10923-12287 c00d86999c98f97d697f3a2b33ba26fbf50e46eb xx.xxx.xxx.xx:6955 master - 0 1529229634724 5 connected 5461-6826 0b09a5c4c9e9158520389dd2672bd711d55085c6 xx.xxx.xxx.xx:6953 master - 0 1529229637227 3 connected 2731-4095 9f26d208fa8772449d5c322eb63786a1cf9937e0 xx.xxx.xxx.xx:6958 master - 0 1529229635224 8 connected 9557-10922 274294a88758fcb674e1a0292db0e36a66a0bf48 xx.xxx.xxx.xx:6951 master - 0 1529229634223 1 connected 0-1364 369780bdf56d483a0f0a92cb2baab786844051f3 xx.xxx.xxx.xx:6957 master - 0 1529229640232 7 connected 8192-9556 71ed0215356c664cc56d4579684e86a83dba3a92 xx.xxx.xxx.xx:6956 master - 0 1529229635724 6 connected 6827-8191
请求重定向
由于每个节点只负责部分slot,以及slot可能从一个节点迁移到另一节点,造成客户端有可能会向错误的节点发起请求。因此需要有一种机制来对其进行发现和修正,这就是请求重定向。有两种不同的重定向场景:
- MOVED
声明的是slot所有权的转移,收到的客户端需要更新其key-node映射关系
- ASK
申明的是一种临时的状态,所有权还并没有转移,客户端并不更新其映射关系。前面的加的ASKING命令也是申明其理解当前的这种临时状态
通过集群查询数据key为test的值
xx.xxx.xxx.xx:6959> get test (error) MOVED 6918 xx.xxx.xx.xxx:6956
此时返回的结果表示该key在6956这个实例上,通过这个实例可以获取到缓存值
xx.xxx.xx.xxx:6956> get test "cluster"
通过上文的示例可以发现获取缓存值的过程需要访问cluster两次,既然key到slot值的算法是已知的,如果可以通过key直接计算slot,在通过每个节点的管理的slot范围就可以知道这个key对应哪个节点了,这样不就可以一次获取到了吗?其实lettuce中就是这样处理的.
Lettuce使用
@Bean(name="clusterRedisURI") RedisURI clusterRedisURI(){ return RedisURI.builder().withHost("xx.xx.xxx.xx").withPort(6954).build(); } @Bean ClusterClientOptions clusterClientOptions(){ return ClusterClientOptions.builder().autoReconnect(true).maxRedirects(1).build(); } @Bean RedisClusterClient redisClusterClient(ClientResources clientResources, ClusterClientOptions clusterClientOptions, RedisURI clusterRedisURI){ RedisClusterClient redisClusterClient= RedisClusterClient.create(clientResources,clusterRedisURI); redisClusterClient.setOptions(clusterClientOptions); return redisClusterClient; } /** * 集群模式 */ @Bean(destroyMethod = "close") StatefulRedisClusterConnection<String,String> statefulRedisClusterConnection(RedisClusterClient redisClusterClient){ return redisClusterClient.connect(); }
Lettuce相关源码
在创建连接时就会主动发现集群图谱信息
<K, V> StatefulRedisClusterConnectionImpl<K, V> connectClusterImpl(RedisCodec<K, V> codec) { //如果分区信息为null则初始化分区信息 if (partitions == null) { initializePartitions(); } //如果需要就激活拓扑刷新 activateTopologyRefreshIfNeeded();
protected void initializePartitions() { this.partitions = loadPartitions(); }
protected Partitions loadPartitions() { //获取拓扑刷新信息, Iterable<RedisURI> topologyRefreshSource = getTopologyRefreshSource(); String message = "Cannot retrieve initial cluster partitions from initial URIs " + topologyRefreshSource; try { //加载拓扑信息 Map<RedisURI, Partitions> partitions = refresh.loadViews(topologyRefreshSource, useDynamicRefreshSources());
public Map<RedisURI, Partitions> loadViews(Iterable<RedisURI> seed, boolean discovery) { //获取超时时间,默认60秒 long commandTimeoutNs = getCommandTimeoutNs(seed); Connections connections = null; try { //获取所有种子连接 connections = getConnections(seed).get(commandTimeoutNs, TimeUnit.NANOSECONDS); Requests requestedTopology = connections.requestTopology(); Requests requestedClients = connections.requestClients(); //获取节点拓扑视图 NodeTopologyViews nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs); if (discovery) {//是否查找额外节点 //获取集群节点 Set<RedisURI> allKnownUris = nodeSpecificViews.getClusterNodes(); //排除种子节点,得到需要发现节点 Set<RedisURI> discoveredNodes = difference(allKnownUris, toSet(seed)); //如果需要发现节点不为空 if (!discoveredNodes.isEmpty()) { //需要发现节点连接 Connections discoveredConnections = getConnections(discoveredNodes).optionalGet(commandTimeoutNs, TimeUnit.NANOSECONDS); //合并连接 connections = connections.mergeWith(discoveredConnections); //合并请求 requestedTopology = requestedTopology.mergeWith(discoveredConnections.requestTopology()); requestedClients = requestedClients.mergeWith(discoveredConnections.requestClients()); //获取节点视图 nodeSpecificViews = getNodeSpecificViews(requestedTopology, requestedClients, commandTimeoutNs); //返回uri对应分区信息 return nodeSpecificViews.toMap(); } } return nodeSpecificViews.toMap(); } catch (InterruptedException e) { Thread.currentThread().interrupt(); throw new RedisCommandInterruptedException(e); } finally { if (connections != null) { connections.close(); } } }
这样在创建connection的时候就已经知道集群中的所有有效节点.根据之前的文章可以知道对于集群命令的处理是在ClusterDistributionChannelWriter中处理的.其中有一些信息在初始化writer的时候就初始化了
class ClusterDistributionChannelWriter implements RedisChannelWriter { //默认写入器 private final RedisChannelWriter defaultWriter; //集群事件监听器 private final ClusterEventListener clusterEventListener; private final int executionLimit; //集群连接提供器 private ClusterConnectionProvider clusterConnectionProvider; //异步集群连接提供器 private AsyncClusterConnectionProvider asyncClusterConnectionProvider; //是否关闭 private boolean closed = false; //分区信息 private volatile Partitions partitions;
写命令的处理如下,会根据key计算出slot,进而找到这个slot对应的node,直接访问这个node,这样可以有效减少访问cluster次数
public <K, V, T> RedisCommand<K, V, T> write(RedisCommand<K, V, T> command) { LettuceAssert.notNull(command, "Command must not be null"); //如果连接已经关闭则抛出异常 if (closed) { throw new RedisException("Connection is closed"); } //如果是集群命令且命令没有处理完毕 if (command instanceof ClusterCommand && !command.isDone()) { //类型转换, 转换为ClusterCommand ClusterCommand<K, V, T> clusterCommand = (ClusterCommand<K, V, T>) command; if (clusterCommand.isMoved() || clusterCommand.isAsk()) { HostAndPort target; boolean asking; //如果集群命令已经迁移,此时通过ClusterCommand中到重试操作进行到此 if (clusterCommand.isMoved()) { //获取命令迁移目标节点 target = getMoveTarget(clusterCommand.getError()); //触发迁移事件 clusterEventListener.onMovedRedirection(); asking = false; } else {//如果是ask target = getAskTarget(clusterCommand.getError()); asking = true; clusterEventListener.onAskRedirection(); } command.getOutput().setError((String) null); //连接迁移后的目标节点 CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = asyncClusterConnectionProvider .getConnectionAsync(ClusterConnectionProvider.Intent.WRITE, target.getHostText(), target.getPort()); //成功建立连接,则向该节点发送命令 if (isSuccessfullyCompleted(connectFuture)) { writeCommand(command, asking, connectFuture.join(), null); } else { connectFuture.whenComplete((connection, throwable) -> writeCommand(command, asking, connection, throwable)); } return command; } } //不是集群命令就是RedisCommand,第一个请求命令就是非ClusterCommand //将当前命令包装为集群命令 ClusterCommand<K, V, T> commandToSend = getCommandToSend(command); //获取命令参数 CommandArgs<K, V> args = command.getArgs(); //排除集群路由的cluster命令 if (args != null && !CommandType.CLIENT.equals(commandToSend.getType())) { //获取第一个编码后的key ByteBuffer encodedKey = args.getFirstEncodedKey(); //如果encodedKey不为null if (encodedKey != null) { //获取slot值 int hash = getSlot(encodedKey); //根据命令类型获取命令意图 是读还是写 ClusterConnectionProvider.Intent intent = getIntent(command.getType()); //根据意图和slot获取连接 CompletableFuture<StatefulRedisConnection<K, V>> connectFuture = ((AsyncClusterConnectionProvider) clusterConnectionProvider) .getConnectionAsync(intent, hash); //如果成功获取连接 if (isSuccessfullyCompleted(connectFuture)) { writeCommand(commandToSend, false, connectFuture.join(), null); } else {//如果连接尚未处理完,或有异常,则添加完成处理器 connectFuture.whenComplete((connection, throwable) -> writeCommand(commandToSend, false, connection, throwable)); } return commandToSend; } } writeCommand(commandToSend, defaultWriter); return commandToSend; }
但是如果计算出的slot因为集群扩展导致这个slot已经不在这个节点上lettuce是如何处理的呢?通过查阅ClusterCommand源码可以发现在complete方法中对于该问题进行了处理;如果响应是MOVED则会继续访问MOVED目标节点,这个重定向的此时可以指定的,默认为5次,通过上文的配置可以发现,在配置中只允许一次重定向
@Override public void complete() { //如果响应是MOVED或ASK if (isMoved() || isAsk()) { //如果最大重定向次数大于当前重定向次数则可以进行重定向 boolean retryCommand = maxRedirections > redirections; //重定向次数自增 redirections++; if (retryCommand) { try { //重定向 retry.write(this); } catch (Exception e) { completeExceptionally(e); } return; } } super.complete(); completed = true; }
如果是ask向重定向目标发送命令前需要同步发送asking
private static <K, V> void writeCommand(RedisCommand<K, V, ?> command, boolean asking, StatefulRedisConnection<K, V> connection, Throwable throwable) { if (throwable != null) { command.completeExceptionally(throwable); return; } try { //如果需要asking则发送asking if (asking) { connection.async().asking(); } //发送命令 writeCommand(command, ((RedisChannelHandler<K, V>) connection).getChannelWriter()); } catch (Exception e) { command.completeExceptionally(e); } }
以上是关于Lettuce之RedisClusterClient使用以及源码分析的主要内容,如果未能解决你的问题,请参考以下文章
Lettuce之RedisClusterClient使用以及源码分析
SpringBoot整合Redis 之 lettuce #私藏项目实操分享#