Redis的发布订阅+线程池使用实践
Posted 恒奇恒毅
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Redis的发布订阅+线程池使用实践相关的知识,希望对你有一定的参考价值。
Redis支持发布订阅功能,Jedis也提供了较好的支持,但是使用的时候有些需要注意的地方:
- jedis的订阅是个死循环,并且会一直阻塞,所以不要将订阅的代码放在主线程,而是要另开线程。
- 另开线程建议不要使用new Thread()方式,而是使用线程池的方式。
- 订阅功能异常处理。
- 线程池的参数设置有讲究,因为这个线程池的特点是线程会一直阻塞,直到有异常,
//1. 所以core size=1,最大要设置为Integer.MAX_VALUE,SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。
//2. 线程空闲时间设置为60
//3. 阻塞队列SynchronousQueue,不需要放在阻塞队列
//4. 拒绝策略为抛异常
//5. 建议不要监控太多channel,而是通过message来区分
代码如下:
public class RedisSubService
private static final Logger logger = LoggerFactory.getLogger(RedisSubService.class);
public static final RedisSubService service = new RedisSubService();
private static final Executor SUBSCRIBE_EXECUTOR;
/**
* 因为subscribe也会占用Redis的连接,所以不宜过大
*/
private static final int MAX_RETRY_COUNT = 2;
/**
* 最大线程数量,也就是最大监听频道数
*/
private static final int MAX_THREAD_SIZE = 10;
/**
* 接口配置变化频道消息
*/
public static final String CHANNEL_INTERFACE_CONFIG = "INTERFACE_CONFIG_CHANGE";
private static final List<BiConsumer<String, String>> HANDLERS = new ArrayList<>();
static
ThreadFactory threadFactory = ThreadFactoryBuilder.create()
.setNameFormat("Redis-Subscribe-Thread-%d")
.setDaemon(true)
//.setThreadFactory(Thread::new)
.build();
//这个线程池的特点是线程会一直阻塞,直到有异常,
//1. 所以core size=1,最大要设置为Integer.MAX_VALUE,SynchronousQueue阻塞队列一般要求maximumPoolSizes为无界(Integer.MAX_VALUE),避免线程拒绝执行操作。
//2. 线程空闲时间设置为60
//3. 阻塞队列SynchronousQueue,不需要放在阻塞队列
//4. 拒绝策略为抛异常
//5. 建议不要监控太多channel,而是通过message来区分
ThreadPoolExecutor.AbortPolicy abortPolicy = new ThreadPoolExecutor.AbortPolicy();
SUBSCRIBE_EXECUTOR = new ThreadPoolExecutor(1,
Integer.MAX_VALUE,
60,
TimeUnit.SECONDS,
new SynchronousQueue<>(),
threadFactory,
abortPolicy);
/**
* 通用的监听方法,如果超过了系统限制会报RejectedExecutionException
* @param channel 监听的频道
* @param handler 消息处理器
*/
public void subscribe(final String channel, final BiConsumer<String, String> handler)
Objects.requireNonNull(handler);
SUBSCRIBE_EXECUTOR.execute(()->
for (int i = 0; i < MAX_RETRY_COUNT; i++)
try (Jedis jedis = Redis.use().getJedis())
try
logger.info("redis subscribe index-", channel, i);
//程序会阻塞在这里,理论上只会运行此代码一次,如果出现异常等相当于进行重试
subscribe(jedis, channel, handler);
catch (JedisConnectionException e)
logger.info("redis subscribe error index-", channel, i);
//如果发生了连接异常,则catch,然后重试
logger.error(e.getMessage(), e);
logger.warn("redis subscribe had retried , won't accept more published messages", MAX_RETRY_COUNT);
);
private void subscribe(Jedis jedis, String channel, BiConsumer<String, String> handler)
jedis.subscribe(new JedisPubSub()
@Override
public void onMessage(String channel, String message)
logger.info(",", channel, message);
handler.accept(channel, message);
, channel);
/**
* 配合@this.registerHandler()
* 监听的是CHANNEL_INTERFACE_CONFIG这个频道
*/
public void subscribe()
/*if(HANDLERS.isEmpty())
return;
*/
subscribe(CHANNEL_INTERFACE_CONFIG, (c,m)->
for (BiConsumer<String, String> h : HANDLERS)
h.accept(c,m);
);
public void registerHandler(BiConsumer<String, String> handler)
Objects.requireNonNull(handler);
HANDLERS.add(handler);
public void registerHandler(BiConsumer<String, String> handler, BiConsumer<String, String>... handlers)
Objects.requireNonNull(handler);
HANDLERS.add(handler);
if(ArrayUtil.isNotEmpty(handlers))
HANDLERS.addAll(Arrays.asList(handlers));
public void publish(String channel,String message)
try (Jedis jedis = Redis.use().getJedis())
jedis.publish(channel, message);
当然,如果你还需要对JedisPubSub的其他方法进行扩展,那么你扩展BiConsumer就好了
以上是关于Redis的发布订阅+线程池使用实践的主要内容,如果未能解决你的问题,请参考以下文章
Java 并发编程线程池机制 ( 线程池阻塞队列 | 线程池拒绝策略 | 使用 ThreadPoolExecutor 自定义线程池参数 )