利用redis简单实现消息订阅和发布

Posted SDingBa

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了利用redis简单实现消息订阅和发布相关的知识,希望对你有一定的参考价值。

好久没写博客了,最近关于redis消费者,生产者的功能,看了很多的资料,个人觉得很多的MQ开源都很好用,redis相对小俏,简单实现,一下先介绍redis实现;

1, redis实现消息发布和订阅,

/**
 * Created by SDingBa.xiong on 17-3-9.
 */
public class RedisMsgPubSubListener extends JedisPubSub 

    private static final Logger logger = LoggerFactory.getLogger(RedisMsgPubSubListener.class);

    @Resource
    private AdOriginalityMapper adOriginalityMapper;

    @Override
    public void unsubscribe() 
        super.unsubscribe();
    

    @Override
    public void unsubscribe(String... channels) 
        super.unsubscribe(channels);
    

    @Override
    public void subscribe(String... channels) 
        super.subscribe(channels);
    

    @Override
    public void psubscribe(String... patterns) 
        super.psubscribe(patterns);
    

    @Override
    public void punsubscribe() 
        super.punsubscribe();
    

    @Override
    public void punsubscribe(String... patterns) 
        super.punsubscribe(patterns);
    

    @Override
    public void onMessage(String channel, String message) 
        logger.info("channel=, receives message=", channel, message);


     code...
    

    @Override
    public void onPMessage(String pattern, String channel, String message) 

    

    @Override
    public void onSubscribe(String channel, int subscribedChannels) 
        logger.info("====start Sub ====== channel=, receives subscribedChannels=", channel, subscribedChannels);
    

    @Override
    public void onPUnsubscribe(String pattern, int subscribedChannels) 

    

    @Override
    public void onPSubscribe(String pattern, int subscribedChannels) 

    

    @Override
    public void onUnsubscribe(String channel, int subscribedChannels) 
        logger.info("====end Sub ======= channel=, receives subscribedChannels=", channel, subscribedChannels);
    

2 spring boot初始化;

@Configuration
@Component
public class RedisMQSub implements ApplicationListener<ContextRefreshedEvent> 

    private final Logger logger = LoggerFactory.getLogger(RedisMQSub.class);

    @Resource
    private RedisService redisService;

    // 运行 监听的 主机(123.56.86.1), 可以使用队列解决 多个服务器处理一个通知请求(先通过 MQ -> 处理队列)
    // private static final String CRM_ACTIVE_HOSTNAME = "iZ25o9jw9hbZ";
    private static final String CRM_ACTIVE_HOSTNAME = "su";

    private void redisInit() 
        // 本地测试 ;
        // Jedis jedis = new Jedis("localhost");
        // System.out.println((redisService == null) + " redis server");
        // RedisMsgPubSubListener listener = new RedisMsgPubSubListener();
        // new Thread(() -> jedis.subscribe(listener, CreativeData.CREATIVE_ACTIVE_REDIS_MESSAGE_MQ)).start();

        // listener.unsubscribe();//关闭监听
    

    @Override
    public void onApplicationEvent(ContextRefreshedEvent event) 
        redisInit();
    

subscribe 会阻塞进程,,,,

jedis.public(通道名,消息体…) 既可以发布消息

a,上面模式适合 单一服务器使用,,如果是分布式多服务器,很多其他的MQ很好实现,redis需要使用到队列功能了(因为消息发布和订阅是一对多的模式(redis)),,,不能仅仅使用消息的订阅和发布处理数据了,只能使用消息的发布用来通知其他的分布式节点,进行抢占时向队列里面取数据,

e redis.lpush(); 和 redis.rpop() 可实现

o 工具类:

@Scope("singleton")
@Service
public class RedisService implements InitializingBean 

    private static final Logger LOGGER = LoggerFactory.getLogger(RedisService.class);
    /**
     * Redis操作接口
     *
     * @author 林计钦
     * @version 1.0 2013-6-14 上午08:54:14
     */

    @Resource
    private SystemConfig systemConfig;
    private JedisPool pool = null;

    /**
     * 返还到连接池
     *
     * @param redis
     */
    public static void returnResource(Jedis redis) 
        if (redis != null) 
            redis.close();
        
    

    public boolean setNX(String key, int expired) 
        Jedis jedis = null;
        int result = 0;
        try 
            jedis = pool.getResource();
            result = jedis.setnx(key, "1").intValue();
            jedis.expire(key, expired);
         catch (CacheException e) 
            LOGGER.error("sedis_error.key:", key, e.getMessage());
         finally 
            returnResource(jedis);
        
        return result > 0;
    

    public List<String> brpop(String key) 
        Jedis jedis = null;
        List<String> listStr = Lists.newArrayList();
        try 
            jedis = pool.getResource();
            listStr = jedis.brpop(10, key);
         catch (CacheException e) 
            LOGGER.error("sedis_error.key:", key, e.getMessage());
         finally 
            returnResource(jedis);
        
        return listStr;
    

    /**
     * 获取数据
     *
     * @param key
     * @return
     */
    public String getString(String key) 
        String value = null;

        Jedis jedis = null;
        try 
            jedis = pool.getResource();
            value = jedis.get(key);
         catch (Exception e) 
            // 释放redis对象
            if (jedis != null) 
                jedis.close();
            
            LOGGER.error("jedis_pool_err", e);
         finally 
            // 返还到连接池
            returnResource(jedis);
        
        return value;
    

    /**
     * Strings结构:set command
     *
     * @param key
     * @param object 存储object
     * @param expired 过期时间(秒)
     */
    public boolean setString(String key, Object object, int expired) 
        Jedis jedis = null;
        try 
            jedis = pool.getResource();
            jedis.setex(key, expired, String.valueOf(object));
            return true;
         catch (Exception e) 
            LOGGER.error("redis error:key:;value", key, object, e);
         finally 
            returnResource(jedis);
        
        return false;
    

    /**
     * Hash结构:hset command
     *
     * @param key cache key
     * @param field second key
     * @param obj 直接存对象
     */
    public void hset(String key, String field, Object obj, int expired) 
        Jedis jedis = null;
        try 
            jedis = pool.getResource();
            jedis.hset(key, field, JSON.toJSONString(obj));
            jedis.expire(key.getBytes(), expired);
         catch (CacheException e) 
            LOGGER.error("sedis_error.key:", key, e.getMessage());
         finally 
            returnResource(jedis);
        

    

    @Override
    public void afterPropertiesSet() throws Exception 
        JedisPoolConfig config = new JedisPoolConfig();
        // 控制一个pool可分配多少个jedis实例,通过pool.getResource()来获取;
        // 如果赋值为-1,则表示不限制;如果pool已经分配了maxActive个jedis实例,则此时pool的状态为exhausted(耗尽)。
        config.setMaxTotal(500);
        // 控制一个pool最多有多少个状态为idle(空闲的)的jedis实例。
        config.setMaxIdle(5);
        // 表示当borrow(引入)一个jedis实例时,最大的等待时间,如果超过等待时间,则直接抛出JedisConnectionException;
        config.setMaxWaitMillis(1000 * 100);
        // 在borrow一个jedis实例时,是否提前进行validate操作;如果为true,则得到的jedis实例均是可用的;
        config.setTestOnBorrow(true);
        String redisIp = systemConfig.getRedisIp();
        String redisPassword = systemConfig.getRedisPassword();
        int redisPort = systemConfig.getRedisPort();
        pool = new JedisPool(config, redisIp, redisPort, 2000, redisPassword);

    

    /**
     * 获取数据
     *
     * @param key
     * @return
     */
    public Map<String, String> hgetAll(String key) 
        Map<String, String> value = null;

        Jedis jedis = null;
        try 
            jedis = pool.getResource();
            value = jedis.hgetAll(key);
         catch (Exception e) 
            // 释放redis对象
            if (jedis != null) 
                jedis.close();
            
            LOGGER.error("jedis_pool_err", e);
         finally 
            // 返还到连接池
            returnResource(jedis);
        
        return value;
    

    public String hget(String key, String field) 
        Jedis jedis = null;
        try 
            jedis = pool.getResource();
            return jedis.hget(key, field);
         catch (CacheException e) 
            LOGGER.error("sedis_error.key:", key, e.getMessage());
         finally 
            returnResource(jedis);
        
        return "";
    

    public Set<String> keys(String field) 
        Jedis jedis = null;
        try 
            jedis = pool.getResource();
            return jedis.hkeys(field);
         catch (CacheException e) 
            LOGGER.error("redis_error.key:", field, e.getMessage());
         finally 
            returnResource(jedis);
        
        return null;
    

    /**
     * Hash结构:hset command
     *
     * @param key cache key
     * @param field second key
     * @param obj 直接存对象
     */
    public void hset(String key, String field, Object obj) 
        Jedis jedis = null;
        try 
            jedis = pool.getResource();
            jedis.hset(key, field, JSON.toJSONString(obj));
         catch (CacheException e) 
            LOGGER.error("sedis_error.key:", key, e.getMessage());
         finally 
            returnResource(jedis);
        

    

    /**
     * 存储REDIS队列 顺序存储
     *
     * @param key reids键名
     * @param value 键值
     */
    public void lpush(String key, String value) 

        Jedis jedis = null;
        try 
            jedis = pool.getResource();
            jedis.lpush(key, value);
         catch (Exception e) 
            LOGGER.error("sedis_error.key:", key, e);
         finally 
            returnResource(jedis);
        

    

    /**
     * 获取队列数据
     *
     * @param key 键名
     */
    public String rpop(String key) 
        String bytes = null;
        Jedis jedis = null;
        try 
            jedis = pool.getResource();
            bytes = jedis.rpop(key);
         catch (Exception e) 
            LOGGER.error("sedis_error.key:", key, e);
         finally 
            returnResource(jedis);
        
        return bytes;
    

    /**
     * 获取 队列 长度
     *
     * @param key 队列名
     */
    public long llen(String key) 
        long bytes = 0;
        Jedis jedis = null;
        try 
            jedis = pool.getResource();
            bytes = jedis.llen(key);
         catch (Exception e) 
            LOGGER.error("sedis_error.key:", key, e);
         finally 
            returnResource(jedis);
        
        return bytes;
    

    /**
     * 接收消息。在main方法调用后,会一直执行下去。当有发布对应消息时,就会在jedisPubSub中接收到!
     *
     * 存在堵塞进程
     * @param jedisPubSub
     * @param channels

     */
    public void subscribeMsg(JedisPubSub jedisPubSub, String channels) 
        Jedis jedis = null;
        try 
            jedis = pool.getResource();
            jedis.subscribe(jedisPubSub, channels);
            LOGGER.debug("subscribeMsg  = ", jedisPubSub, channels);
         catch (Exception e) 
            LOGGER.error("subscribeMsg  = ", jedisPubSub, channels, e);
         finally 
            returnResource(jedis);
        
    

    public void publishMsg(String channel, String message) 
        Jedis jedis = null;
        try 
            jedis = pool.getResource();

            jedis.publish(channel, message);
         catch (Exception e) 
            LOGGER.error("publishMsg  = ", channel, message, e);
         finally 
            returnResource(jedis);
        
    

以上是关于利用redis简单实现消息订阅和发布的主要内容,如果未能解决你的问题,请参考以下文章

利用Redis作消息队列,实现生产消费和发布订阅

Redis Pub/Sub 发布订阅模式的深度解析与实现消息队列

redis发布订阅模式

基于Redis发布订阅和websocket实现聊天室功能

python redis 实现简单的消息订阅

Redis实现消息队列(生产者/消费者发布订阅模式)