利用redis简单实现消息订阅和发布
Posted SDingBa
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用redis简单实现消息订阅和发布相关的知识,希望对你有一定的参考价值。
好久没写博客了。最近在研究 redis 的消费者、生产者功能,看了很多资料。个人觉得很多开源 MQ 都很好用,而 redis 相对小巧、实现简单,以下先介绍 redis 的实现:
1, redis实现消息发布和订阅,
/**
* Created by SDingBa.xiong on 17-3-9.
*/
public class RedisMsgPubSubListener extends JedisPubSub
private static final Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class);
@Resource
private AdOriginalityMapper adOriginalityMapper;
@Override
public void unsubscribe()
super.unsubscribe();
@Override
public void unsubscribe(String... channels)
super.unsubscribe(channels);
@Override
public void subscribe(String... channels)
super.subscribe(channels);
@Override
public void psubscribe(String... patterns)
super.psubscribe(patterns);
@Override
public void punsubscribe()
super.punsubscribe();
@Override
public void punsubscribe(String... patterns)
super.punsubscribe(patterns);
@Override
public void onMessage(String channel, String message)
logger.info("channel=, receives message=", channel, message);
code...
@Override
public void onPMessage(String pattern, String channel, String message)
@Override
public void onSubscribe(String channel, int subscribedChannels)
logger.info("====start Sub ====== channel=, receives subscribedChannels=", channel, subscribedChannels);
@Override
public void onPUnsubscribe(String pattern, int subscribedChannels)
@Override
public void onPSubscribe(String pattern, int subscribedChannels)
@Override
public void onUnsubscribe(String channel, int subscribedChannels)
logger.info("====end Sub ======= channel=, receives subscribedChannels=", channel, subscribedChannels);
2, spring boot 初始化:
@Configuration
@Component
public class RedisMQSub implements ApplicationListener<ContextRefreshedEvent>
private final Logger logger = LoggerFactory.getLogger(RedisMQSub.class);
@Resource
private RedisService redisService;
// 运行 监听的 主机(123.56.86.1), 可以使用队列解决 多个服务器处理一个通知请求(先通过 MQ -> 处理队列)
// private static final String CRM_ACTIVE_HOSTNAME = "iZ25o9jw9hbZ";
private static final String CRM_ACTIVE_HOSTNAME = "su";
private void redisInit()
// 本地测试 ;
// Jedis jedis = new Jedis("localhost");
// System.out.println((redisService == null) + " redis server");
// RedisMsgPubSubListener listener = new RedisMsgPubSubListener();
// new Thread(() -> jedis.subscribe(listener, CreativeData.CREATIVE_ACTIVE_REDIS_MESSAGE_MQ)).start();
// listener.unsubscribe();//关闭监听
@Override
public void onApplicationEvent(ContextRefreshedEvent event)
redisInit();
注意:subscribe 会阻塞当前线程,因此需要放到单独的线程中执行。
jedis.publish(通道名, 消息体…) 即可发布消息。
a,上面的模式适合单一服务器使用。如果是分布式多服务器,很多其他的 MQ 都能很好地实现;而 redis 的消息发布和订阅是一对多的模式,每个订阅者都会收到同一条消息,因此不能仅仅依靠发布/订阅来处理数据,需要配合队列功能:只用消息发布来通知各个分布式节点,再由各节点抢占式地从队列里面取数据。
redis.lpush() 和 redis.rpop() 可实现上述队列。
工具类:
@Scope("singleton")
@Service
public class RedisService implements InitializingBean
private static final Logger LOGGER = LoggerFactory.getLogger(RedisService.class);
/**
* Redis操作接口
*
* @author 林计钦
* @version 1.0 2013-6-14 上午08:54:14
*/
@Resource
private SystemConfig systemConfig;
private JedisPool pool = null;
/**
* 返还到连接池
*
* @param redis
*/
public static void returnResource(Jedis redis)
if (redis != null)
redis.close();
public boolean setNX(String key, int expired)
Jedis jedis = null;
int result = 0;
try
jedis = pool.getResource();
result = jedis.setnx(key, "1").intValue();
jedis.expire(key, expired);
catch (CacheException e)
LOGGER.error("sedis_error.key:", key, e.getMessage());
finally
returnResource(jedis);
return result > 0;
public List<String> brpop(String key)
Jedis jedis = null;
List<String> listStr = Lists.newArrayList();
try
jedis = pool.getResource();
listStr = jedis.brpop(10, key);
catch (CacheException e)
LOGGER.error("sedis_error.key:", key, e.getMessage());
finally
returnResource(jedis);
return listStr;
/**
* 获取数据
*
* @param key
* @return
*/
public String getString(String key)
String value = null;
Jedis jedis = null;
try
jedis = pool.getResource();
value = jedis.get(key);
catch (Exception e)
// 释放redis对象
if (jedis != null)
jedis.close();
LOGGER.error("jedis_pool_err", e);
finally
// 返还到连接池
returnResource(jedis);
return value;
/**
* Strings结构:set command
*
* @param key
* @param object 存储object
* @param expired 过期时间(秒)
*/
public boolean setString(String key, Object object, int expired)
Jedis jedis = null;
try
jedis = pool.getResource();
jedis.setex(key, expired, String.valueOf(object));
return true;
catch (Exception e)
LOGGER.error("redis error:key:;value", key, object, e);
finally
returnResource(jedis);
return false;
/**
* Hash结构:hset command
*
* @param key cache key
* @param field second key
* @param obj 直接存对象
*/
public void hset(String key, String field, Object obj, int expired)
Jedis jedis = null;
try
jedis = pool.getResource();
jedis.hset(key, field, JSON.toJSONString(obj));
jedis.expire(key.getBytes(), expired);
catch (CacheException e)
LOGGER.error("sedis_error.key:", key, e.getMessage());
finally
returnResource(jedis);
@Override
public void afterPropertiesSet() throws Exception
JedisPoolConfig config = new JedisPoolConfig();
// 控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;
// 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
config.setMaxTotal(500);
// 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
config.setMaxIdle(5);
// 表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
config.setMaxWaitMillis(1000 * 100);
// 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
config.setTestOnBorrow(true);
String redisIp = systemConfig.getRedisIp();
String redisPassword = systemConfig.getRedisPassword();
int redisPort = systemConfig.getRedisPort();
pool = new JedisPool(config, redisIp, redisPort, 2000, redisPassword);
/**
* 获取数据
*
* @param key
* @return
*/
public Map<String, String> hgetAll(String key)
Map<String, String> value = null;
Jedis jedis = null;
try
jedis = pool.getResource();
value = jedis.hgetAll(key);
catch (Exception e)
// 释放redis对象
if (jedis != null)
jedis.close();
LOGGER.error("jedis_pool_err", e);
finally
// 返还到连接池
returnResource(jedis);
return value;
public String hget(String key, String field)
Jedis jedis = null;
try
jedis = pool.getResource();
return jedis.hget(key, field);
catch (CacheException e)
LOGGER.error("sedis_error.key:", key, e.getMessage());
finally
returnResource(jedis);
return "";
public Set<String> keys(String field)
Jedis jedis = null;
try
jedis = pool.getResource();
return jedis.hkeys(field);
catch (CacheException e)
LOGGER.error("redis_error.key:", field, e.getMessage());
finally
returnResource(jedis);
return null;
/**
* Hash结构:hset command
*
* @param key cache key
* @param field second key
* @param obj 直接存对象
*/
public void hset(String key, String field, Object obj)
Jedis jedis = null;
try
jedis = pool.getResource();
jedis.hset(key, field, JSON.toJSONString(obj));
catch (CacheException e)
LOGGER.error("sedis_error.key:", key, e.getMessage());
finally
returnResource(jedis);
/**
* 存储REDIS队列 顺序存储
*
* @param key reids键名
* @param value 键值
*/
public void lpush(String key, String value)
Jedis jedis = null;
try
jedis = pool.getResource();
jedis.lpush(key, value);
catch (Exception e)
LOGGER.error("sedis_error.key:", key, e);
finally
returnResource(jedis);
/**
* 获取队列数据
*
* @param key 键名
*/
public String rpop(String key)
String bytes = null;
Jedis jedis = null;
try
jedis = pool.getResource();
bytes = jedis.rpop(key);
catch (Exception e)
LOGGER.error("sedis_error.key:", key, e);
finally
returnResource(jedis);
return bytes;
/**
* 获取 队列 长度
*
* @param key 队列名
*/
public long llen(String key)
long bytes = 0;
Jedis jedis = null;
try
jedis = pool.getResource();
bytes = jedis.llen(key);
catch (Exception e)
LOGGER.error("sedis_error.key:", key, e);
finally
returnResource(jedis);
return bytes;
/**
* 接收消息。在main方法调用后,会一直执行下去。当有发布对应消息时,就会在jedisPubSub中接收到!
*
* 存在堵塞进程
* @param jedisPubSub
* @param channels
*/
public void subscribeMsg(JedisPubSub jedisPubSub, String channels)
Jedis jedis = null;
try
jedis = pool.getResource();
jedis.subscribe(jedisPubSub, channels);
LOGGER.debug("subscribeMsg = ", jedisPubSub, channels);
catch (Exception e)
LOGGER.error("subscribeMsg = ", jedisPubSub, channels, e);
finally
returnResource(jedis);
public void publishMsg(String channel, String message)
Jedis jedis = null;
try
jedis = pool.getResource();
jedis.publish(channel, message);
catch (Exception e)
LOGGER.error("publishMsg = ", channel, message, e);
finally
returnResource(jedis);
以上是关于利用redis简单实现消息订阅和发布的主要内容,如果未能解决你的问题,请参考以下文章