Redis的发布订阅+线程池使用实践

Posted 恒奇恒毅

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis的发布订阅+线程池使用实践相关的知识,希望对你有一定的参考价值。

Redis支持发布订阅功能,Jedis也提供了较好的支持,但是使用的时候有些需要注意的地方:

  1. jedis的订阅是个死循环,并且会一直阻塞,所以不要将订阅的代码放在主线程,而是要另开线程。
  2. 另开线程建议不要使用new Thread()方式,而是使用线程池的方式。
  3. 订阅功能异常处理。
  4. 线程池的参数设置有讲究,因为这个线程池的特点是线程会一直阻塞,直到有异常,
    //1. 所以core size=1,最大要设置为Integer.MAX_VALUE,SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。
    //2. 线程空闲时间设置为60
    //3. 阻塞队列SynchronousQueue,不需要放在阻塞队列
    //4. 拒绝策略为抛异常
    //5. 建议不要监控太多channel,而是通过message来区分
    代码如下:
public class RedisSubService 
    private static final Logger logger = LoggerFactory.getLogger(RedisSubService.class);
    public static final RedisSubService service = new RedisSubService();
    private static final Executor SUBSCRIBE_EXECUTOR;
    /**
     * 因为subscribe也会占用Redis的连接,所以不宜过大
     */
    private static final int MAX_RETRY_COUNT = 2;
    /**
     * 最大线程数量,也就是最大监听频道数
     */
    private static final int MAX_THREAD_SIZE = 10;
    /**
     * 接口配置变化频道消息
     */
    public static final String CHANNEL_INTERFACE_CONFIG = "INTERFACE_CONFIG_CHANGE";
    private static final List<BiConsumer<String, String>> HANDLERS = new ArrayList<>();

    static 
        ThreadFactory threadFactory = ThreadFactoryBuilder.create()
                .setNameFormat("Redis-Subscribe-Thread-%d")
                .setDaemon(true)
                //.setThreadFactory(Thread::new)
                .build();
        //这个线程池的特点是线程会一直阻塞,直到有异常,
        //1. 所以core size=1,最大要设置为Integer.MAX_VALUE,SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。
        //2. 线程空闲时间设置为60
        //3. 阻塞队列SynchronousQueue,不需要放在阻塞队列
        //4. 拒绝策略为抛异常
        //5. 建议不要监控太多channel,而是通过message来区分
        ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
        SUBSCRIBE_EXECUTOR = new ThreadPoolExecutor(1,
                Integer.MAX_VALUE,
                60,
                TimeUnit.SECONDS,
                new SynchronousQueue<>(),
                threadFactory,
                abortPolicy);
    

    /**
     * 通用的监听方法,如果超过了系统限制会报RejectedExecutionException
     * @param channel 监听的频道
     * @param handler 消息处理器
     */
    public void subscribe(final String channel, final BiConsumer<String, String> handler)
        Objects.requireNonNull(handler);
        SUBSCRIBE_EXECUTOR.execute(()->
            for (int i = 0; i < MAX_RETRY_COUNT; i++) 
                try (Jedis jedis = Redis.use().getJedis()) 
                    try 
                        logger.info("redis subscribe  index-", channel, i);
                        //程序会阻塞在这里,理论上只会运行此代码一次,如果出现异常等相当于进行重试
                        subscribe(jedis, channel, handler);
                     catch (JedisConnectionException e) 
                        logger.info("redis subscribe  error index-", channel, i);
                        //如果发生了连接异常,则catch,然后重试
                        logger.error(e.getMessage(), e);
                    
                
            

            logger.warn("redis subscribe had retried , won't accept more published messages", MAX_RETRY_COUNT);
        );
    

    private void subscribe(Jedis jedis, String channel, BiConsumer<String, String> handler) 
        jedis.subscribe(new JedisPubSub() 
            @Override
            public void onMessage(String channel, String message) 
                logger.info(",", channel, message);
                handler.accept(channel, message);
            
        , channel);
    

    /**
     * 配合@this.registerHandler()
     * 监听的是CHANNEL_INTERFACE_CONFIG这个频道
     */
    public void subscribe()
        /*if(HANDLERS.isEmpty())
            return;
        */
        subscribe(CHANNEL_INTERFACE_CONFIG, (c,m)->
            for (BiConsumer<String, String> h : HANDLERS) 
                h.accept(c,m);
            
        );
    

    public void registerHandler(BiConsumer<String, String> handler)
        Objects.requireNonNull(handler);
        HANDLERS.add(handler);
    
    public void registerHandler(BiConsumer<String, String> handler, BiConsumer<String, String>... handlers)
        Objects.requireNonNull(handler);
        HANDLERS.add(handler);
        if(ArrayUtil.isNotEmpty(handlers))
            HANDLERS.addAll(Arrays.asList(handlers));
        
    

    public void publish(String channel,String message)

        try (Jedis jedis = Redis.use().getJedis()) 
            jedis.publish(channel, message);
        
    

当然,如果你还需要对JedisPubSub的其他方法进行扩展,那么你扩展BiConsumer就好了

以上是关于Redis的发布订阅+线程池使用实践的主要内容,如果未能解决你的问题,请参考以下文章

redis怎么解决订阅模式多节点重复问题

用阻塞队列和线程池简单实现生产者和消费者场景

线程池`基础`

Java 并发编程线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )

4种线程池和7种并发队列

面试官问:你做过什么Java线程池实践,我写了一篇博客给他看~