Analysis of Automatic Pub/Sub Reconnection in SpringData-Redis

Posted 一杯半盏


RedisMessageListenerContainer

Configuration

@Bean
@Autowired
RedisMessageListenerContainer redisContainer(JedisConnectionFactory redisConnectionFactory, RedisMessageListener a) {
    RedisMessageListenerContainer container
            = new RedisMessageListenerContainer();
    container.setConnectionFactory(redisConnectionFactory);
    List<Topic> topics = Lists.newArrayList(new ChannelTopic(
                    CHANNEL),
            new ChannelTopic(CHANNEL)
    );
    container.addMessageListener(new MessageListenerAdapter(a), topics);
    return container;
}

Startup analysis

Adding a channel listener

//RedisMessageListenerContainer.java

public void addMessageListener(MessageListener listener, Collection<? extends Topic> topics) {
    addListener(listener, topics);
    lazyListen();
}

addListener records the topics (in patternMapping and channelMapping, with de-duplication and so on), and then performs the key step:

//RedisMessageListenerContainer.java
//addListener
// check the current listening state
    if (listening) {
        subscriptionTask.subscribeChannel(channels.toArray(new byte[channels.size()][]));
        subscriptionTask.subscribePattern(patterns.toArray(new byte[patterns.size()][]));
    }
//RedisMessageListenerContainer.java

void subscribeChannel(byte[]... channels) {
    if (channels != null && channels.length > 0) {
        if (connection != null) {
            synchronized (localMonitor) {
                Subscription sub = connection.getSubscription();
                if (sub != null) {
                    sub.subscribe(channels);
                }
            }
        }
    }
}
//JedisSubscription.java
    protected void doSubscribe(byte[]... channels) {
        jedisPubSub.subscribe(channels);
    }

Before the container has started, however, listening is false, so this code has no effect. Now look at the lazyListen method:

//RedisMessageListenerContainer.java
private void lazyListen() {
        boolean debug = logger.isDebugEnabled();
        boolean started = false;

        if (isRunning()) {
            if (!listening) {
                synchronized (monitor) {
                    if (!listening) {
                        if (channelMapping.size() > 0 || patternMapping.size() > 0) {
                            subscriptionExecutor.execute(subscriptionTask);
                            listening = true;
                            started = true;
                        }
                    }
                }
                if (debug) {
                    if (started) {
                        logger.debug("Started listening for Redis messages");
                    } else {
                        logger.debug("Postpone listening for Redis messages until actual listeners are added");
                    }
                }
            }
        }
    }

When addMessageListener is called, isRunning() is still false, so lazyListen has no effect either.

Finally, once the @Bean has been constructed and the Lifecycle reaches start, the container starts:

//RedisMessageListenerContainer.java

    public void start() {
        if (!running) {
            running = true;
            // wait for the subscription to start before returning
            // technically speaking we can only be notified right before the subscription starts
            synchronized (monitor) {
                lazyListen();
                if (listening) {
                    try {
                        // wait up to 5 seconds for Subscription thread
                        monitor.wait(initWait);
                    } catch (InterruptedException e) {
                        // stop waiting
                    }
                }
            }

            if (logger.isDebugEnabled()) {
                logger.debug("Started RedisMessageListenerContainer");
            }
        }
    }

At this point running is true.
start() then calls lazyListen(), which really is lazy.
This time a worker thread is launched to perform the subscription and listen:

subscriptionExecutor.execute(subscriptionTask);
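
The interplay of the running and listening flags can be reproduced in isolation. The following is a minimal sketch, not the Spring Data Redis source: the class LazyListenSketch, its String channels, and the tasksLaunched counter (which stands in for subscriptionExecutor.execute) are assumptions made purely for illustration.

```java
import java.util.ArrayList;
import java.util.List;

// Simplified model of the container's lazy start; names are illustrative only.
class LazyListenSketch {
    private final List<String> channels = new ArrayList<>();
    private final Object monitor = new Object();
    private volatile boolean running = false;   // set by start()
    private volatile boolean listening = false; // set once the subscription task is launched
    private int tasksLaunched = 0;              // stands in for subscriptionExecutor.execute(...)

    void addMessageListener(String channel) {
        channels.add(channel);
        lazyListen(); // does nothing until start() has been called
    }

    void start() {
        if (!running) {
            running = true;
            lazyListen();
        }
    }

    private void lazyListen() {
        if (running && !listening) {
            synchronized (monitor) {
                // re-check under the lock so the task is launched at most once
                if (!listening && !channels.isEmpty()) {
                    tasksLaunched++;
                    listening = true;
                }
            }
        }
    }

    boolean isListening() { return listening; }
    int getTasksLaunched() { return tasksLaunched; }

    public static void main(String[] args) {
        LazyListenSketch container = new LazyListenSketch();
        container.addMessageListener("news");
        System.out.println(container.isListening()); // false
        container.start();
        System.out.println(container.isListening()); // true
    }
}
```

Registering listeners before start() only records them; the single subscription task is launched by whichever call first finds running set to true and at least one channel registered.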

The run() method of subscriptionTask is as follows:

//RedisMessageListenerContainer.java
public void run() {
    synchronized (localMonitor) {
        subscriptionTaskRunning = true;
    }
    try {
        connection = connectionFactory.getConnection();
        if (connection.isSubscribed()) {
            throw new IllegalStateException("Retrieved connection is already subscribed; aborting listening");
        }

        boolean asyncConnection = ConnectionUtils.isAsync(connectionFactory);

        // NB: async drivers' Xsubscribe calls block, so we notify the RDMLC before performing the actual subscription.
        if (!asyncConnection) {
            synchronized (monitor) {
                monitor.notify();
            }
        }

        SubscriptionPresentCondition subscriptionPresent = eventuallyPerformSubscription();

        if (asyncConnection) {
            SpinBarrier.waitFor(subscriptionPresent, getMaxSubscriptionRegistrationWaitingTime());

            synchronized (monitor) {
                monitor.notify();
            }
        }
    } catch (Throwable t) {
        handleSubscriptionException(t);
    } finally {
        // this block is executed once the subscription thread has ended, this may or may not mean
        // the connection has been unsubscribed, depending on driver
        synchronized (localMonitor) {
            subscriptionTaskRunning = false;
            localMonitor.notify();
        }
    }
}

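Here the connection is certainly not subscribed yet.
The task then checks the Redis client type to decide whether the subscribe call blocks.
For a synchronous driver, whose subscribe call blocks, it first wakes the container thread that is waiting in start() on monitor.wait(initWait); for an asynchronous driver it notifies only after the subscription has been registered.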
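
The handshake between start() and the subscription thread can be tried out on its own. This is a minimal sketch under stated assumptions: StartHandshakeSketch, taskReady and blockingSubscribe are hypothetical names, and the sleeping thread merely stands in for a driver whose subscribe call never returns.

```java
// Simplified model of the start()/subscription-thread handshake; names are illustrative only.
class StartHandshakeSketch {
    private final Object monitor = new Object();
    private boolean taskReady = false; // guarded by monitor

    // Returns true if the subscription thread signalled before initWaitMs elapsed.
    boolean startAndWait(long initWaitMs) {
        synchronized (monitor) {
            Thread task = new Thread(this::runSubscriptionTask, "subscription-task");
            task.setDaemon(true);
            task.start();
            long deadline = System.currentTimeMillis() + initWaitMs;
            long remaining = initWaitMs;
            try {
                // wait() releases the monitor, so the task can only notify once we are waiting
                while (!taskReady && remaining > 0) {
                    monitor.wait(remaining);
                    remaining = deadline - System.currentTimeMillis();
                }
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt(); // stop waiting
            }
            return taskReady;
        }
    }

    private void runSubscriptionTask() {
        // A blocking driver has to signal BEFORE subscribing,
        // because the subscribe call itself does not return.
        synchronized (monitor) {
            taskReady = true;
            monitor.notify();
        }
        blockingSubscribe();
    }

    private void blockingSubscribe() {
        try {
            Thread.sleep(Long.MAX_VALUE); // stands in for a subscribe call that never returns
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        System.out.println(new StartHandshakeSketch().startAndWait(5000)); // true
    }
}
```

Unlike the container code, the sketch guards the wait with a flag so that a spurious wakeup is not mistaken for the signal; that is a choice made for the example, not something taken from the library.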

The key call is eventuallyPerformSubscription(), the method that ultimately issues the subscription and then polls for messages.

//RDMLC

private SubscriptionPresentCondition eventuallyPerformSubscription() {

    SubscriptionPresentCondition condition = null;

    if (channelMapping.isEmpty()) {

        condition = new PatternSubscriptionPresentCondition();
        connection.pSubscribe(new DispatchMessageListener(), unwrap(patternMapping.keySet()));
    } else {

        if (patternMapping.isEmpty()) {
            condition = new SubscriptionPresentCondition();
        } else {
            // schedule the rest of the subscription
            subscriptionExecutor.execute(new PatternSubscriptionTask());
            condition = new PatternSubscriptionPresentCondition();
        }

        connection.subscribe(new DispatchMessageListener(), unwrap(channelMapping.keySet()));
    }

    return condition;
}

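Take connection.subscribe() as an example. This is where the subscription is about to be issued; note that DispatchMessageListener is used as the listener that dispatches incoming events.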

//JedisConnection.java

public void subscribe(MessageListener listener, byte[]... channels) {
    if (isSubscribed()) {
        throw new RedisSubscribedConnectionException(
                "Connection already subscribed; use the connection Subscription to cancel or add new channels");
    }

    if (isQueueing()) {
        throw new UnsupportedOperationException();
    }
    if (isPipelined()) {
        throw new UnsupportedOperationException();
    }

    try {
        BinaryJedisPubSub jedisPubSub = new JedisMessageListener(listener);

        subscription = new JedisSubscription(listener, jedisPubSub, channels, null);
        jedis.subscribe(jedisPubSub, channels);

    } catch (Exception ex) {
        throw convertJedisAccessException(ex);
    }
}
//BinaryJedis.java

public void subscribe(BinaryJedisPubSub jedisPubSub, byte[]... channels) {
    client.setTimeoutInfinite();
    try {
      jedisPubSub.proceed(client, channels);
    } finally {
      client.rollbackTimeout();
    }
}

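This calls proceed() on BinaryJedisPubSub.

Two questions arise here.
Does subscribing require sending a SUBSCRIBE command to Redis? And once the SUBSCRIBE channel command has been sent, what happens to the listener?

This path calls jedis.subscribe(jedisPubSub, channels), whereas the subscribeChannel implementation shown at the beginning works differently. Why?

Now look at jedisPubSub: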

  public void proceed(Client client, byte[]... channels) {
    this.client = client;
    client.subscribe(channels);
    client.flush();
    process(client);
  }

Here client.subscribe sends the subscription request, and process then loops, polling for incoming messages.
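
To make the "subscribe, then loop" pattern concrete, here is a minimal sketch of such a read loop. It is not the Jedis implementation: PubSubLoopSketch is a hypothetical class, and an in-memory Iterator of {kind, channel, payload} arrays stands in for replies read from the socket, following the shape of Redis pub/sub replies.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.Iterator;
import java.util.List;

// Simplified model of a blocking pub/sub read loop; not the Jedis source.
class PubSubLoopSketch {
    private int subscribedChannels = 0;
    final List<String> received = new ArrayList<>();

    // Each reply is {kind, channel, payload-or-count}.
    void process(Iterator<String[]> replies) {
        do {
            String[] reply = replies.next(); // a real client blocks here on the socket
            switch (reply[0]) {
                case "subscribe":
                case "unsubscribe":
                    // the server reports how many subscriptions remain
                    subscribedChannels = Integer.parseInt(reply[2]);
                    break;
                case "message":
                    received.add(reply[1] + ":" + reply[2]); // hand the payload to the listener
                    break;
                default:
                    throw new IllegalStateException("Unknown reply: " + reply[0]);
            }
        } while (subscribedChannels > 0); // the loop ends only when nothing is subscribed
    }

    public static void main(String[] args) {
        PubSubLoopSketch loop = new PubSubLoopSketch();
        loop.process(Arrays.asList(
                new String[] {"subscribe", "news", "1"},
                new String[] {"message", "news", "hello"},
                new String[] {"unsubscribe", "news", "0"}).iterator());
        System.out.println(loop.received); // [news:hello]
    }
}
```

Because the calling thread stays inside the loop for as long as at least one subscription is active, a broken connection can only surface as an exception thrown out of this loop.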

Exception handling

Now look at how the subscribe method of JedisConnection handles exceptions:

protected DataAccessException convertJedisAccessException(Exception ex) {

    if (ex instanceof NullPointerException) {
        // An NPE before flush will leave data in the OutputStream of a pooled connection
        broken = true;
    }

    DataAccessException exception = EXCEPTION_TRANSLATION.translate(ex);
    if (exception instanceof RedisConnectionFailureException) {
        broken = true;
    }

    return exception;
}

EXCEPTION_TRANSLATION.translate(ex) invokes the convert step of PassThroughExceptionTranslationStrategy, which ends up in this converter:

public class JedisExceptionConverter implements Converter<Exception, DataAccessException> {

    public DataAccessException convert(Exception ex) {

        if (ex instanceof DataAccessException) {
            return (DataAccessException) ex;
        }
        if (ex instanceof JedisDataException) {
            return new InvalidDataAccessApiUsageException(ex.getMessage(), ex);
        }
        if (ex instanceof JedisConnectionException) {
            return new RedisConnectionFailureException(ex.getMessage(), ex);
        }
        if (ex instanceof JedisException) {
            return new InvalidDataAccessApiUsageException(ex.getMessage(), ex);
        }
        if (ex instanceof UnknownHostException) {
            return new RedisConnectionFailureException("Unknown host " + ex.getMessage(), ex);
        }
        if (ex instanceof IOException) {
            return new RedisConnectionFailureException("Could not connect to Redis server", ex);
        }

        return null;
    }
}

So when Jedis throws a JedisConnectionException (the server appears to have closed the connection),
subscribe in turn throws a RedisConnectionFailureException.
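
The classification step can be illustrated with standard-library exceptions only. The sketch below is an assumption-laden model, not the Spring converter: ConnectionFailure and ApiUsageFailure are hypothetical stand-ins for the Spring exception types, and triggersReconnect is an illustrative helper.

```java
import java.io.IOException;
import java.net.UnknownHostException;

// Simplified model of exception translation; the nested exception types are stand-ins.
class ExceptionTranslationSketch {
    static class ConnectionFailure extends RuntimeException {
        ConnectionFailure(String message, Throwable cause) { super(message, cause); }
    }

    static class ApiUsageFailure extends RuntimeException {
        ApiUsageFailure(String message, Throwable cause) { super(message, cause); }
    }

    static RuntimeException translate(Exception ex) {
        if (ex instanceof IllegalArgumentException) {
            return new ApiUsageFailure(ex.getMessage(), ex);
        }
        // UnknownHostException is a subclass of IOException, so it is tested first
        if (ex instanceof UnknownHostException) {
            return new ConnectionFailure("Unknown host " + ex.getMessage(), ex);
        }
        if (ex instanceof IOException) {
            return new ConnectionFailure("Could not connect to Redis server", ex);
        }
        return null; // unmapped: the caller decides what to do
    }

    // Only connection-level failures are worth a reconnect attempt.
    static boolean triggersReconnect(Exception ex) {
        return translate(ex) instanceof ConnectionFailure;
    }

    public static void main(String[] args) {
        System.out.println(translate(new UnknownHostException("redis.local")).getMessage()); // Unknown host redis.local
        System.out.println(triggersReconnect(new IOException("reset")));            // true
        System.out.println(triggersReconnect(new IllegalArgumentException("bad"))); // false
    }
}
```

The order of the instanceof checks matters: testing the more specific type first is what keeps the "Unknown host" message from being swallowed by the generic IOException branch.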

Finally, look at the exception handling inside the run method of RedisMessageListenerContainer.
In this situation the following method is called:

protected void handleSubscriptionException(Throwable ex) {
    listening = false;
    subscriptionTask.closeConnection();
    if (ex instanceof RedisConnectionFailureException) {
        if (isRunning()) {
            logger.error("Connection failure occurred. Restarting subscription task after " + recoveryInterval + " ms");
            sleepBeforeRecoveryAttempt();
            lazyListen();
        }
    } else {
        logger.error("SubscriptionTask aborted with exception:", ex);
    }
}

At this point isRunning() is still true (it becomes false only when the Lifecycle stops the container), so after recoveryInterval ms lazyListen() is called again, and subscription and listening restart.
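
The whole recovery cycle (fail, sleep, resubmit) can be simulated without Redis. The following is a minimal sketch under stated assumptions: ReconnectSketch, ConnectionFailure and failuresBeforeSuccess are hypothetical, and a counter simulates an outage that ends after a fixed number of attempts.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;

// Simplified model of the reconnect cycle; class and method names are illustrative only.
class ReconnectSketch {
    static class ConnectionFailure extends RuntimeException {
        ConnectionFailure(String message) { super(message); }
    }

    private final ExecutorService executor = Executors.newSingleThreadExecutor();
    private final AtomicInteger attempts = new AtomicInteger();
    private final CountDownLatch subscribed = new CountDownLatch(1);
    private final long recoveryIntervalMs;
    private final int failuresBeforeSuccess; // simulated outage length
    private volatile boolean running = true;
    private volatile boolean listening = false;

    ReconnectSketch(long recoveryIntervalMs, int failuresBeforeSuccess) {
        this.recoveryIntervalMs = recoveryIntervalMs;
        this.failuresBeforeSuccess = failuresBeforeSuccess;
    }

    void lazyListen() {
        if (running && !listening) {
            listening = true;
            executor.execute(this::subscriptionTask);
        }
    }

    private void subscriptionTask() {
        try {
            if (attempts.incrementAndGet() <= failuresBeforeSuccess) {
                throw new ConnectionFailure("connection lost");
            }
            subscribed.countDown(); // a real task would now block in subscribe()
        } catch (Throwable t) {
            handleSubscriptionException(t);
        }
    }

    private void handleSubscriptionException(Throwable t) {
        listening = false;
        if (t instanceof ConnectionFailure && running) {
            try {
                Thread.sleep(recoveryIntervalMs); // back off before the next attempt
            } catch (InterruptedException e) {
                Thread.currentThread().interrupt();
                return;
            }
            lazyListen(); // queue a fresh subscription attempt
        }
    }

    // Returns the number of attempts it took to subscribe, or -1 on timeout.
    int awaitSubscribed(long timeoutMs) {
        try {
            return subscribed.await(timeoutMs, TimeUnit.MILLISECONDS) ? attempts.get() : -1;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return -1;
        } finally {
            running = false;
            executor.shutdownNow();
        }
    }

    public static void main(String[] args) {
        ReconnectSketch container = new ReconnectSketch(10, 2);
        container.lazyListen();
        System.out.println(container.awaitSubscribed(2000)); // 3
    }
}
```

Resetting listening to false before calling lazyListen() again is what allows the same guard that prevented duplicate startup to admit the retry.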

In practice

I did in fact see the following in the error log on my server:

Connection failure occurred. Restarting subscription task after 5000 ms

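Summary

SpringData-Redis solves a problem you would otherwise have to handle by hand: a Redis pub/sub subscription being dropped unexpectedly, after which the listener stops receiving messages.
It keeps the service listening: when an exception occurs, it resubscribes to the given channels and resumes listening.
So use the framework; it is more reliable than hand-written pub/sub code.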









