redis之pub/sub(发布与订阅)

Posted 码农_heying

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了redis之pub/sub(发布与订阅)相关的知识,希望对你有一定的参考价值。

文章目录

一、概念

Redis发布/订阅(Pub/Sub)是一种通信机制,将数据推到某个信息管道中,其他客户端可通过订阅这些管道来获取推送信息,以此用于消息的传输。

由三部分组成:发布者(Publisher)、频道(Channel)、订阅者(Subscriber)。
发布者发布的消息分到不同的频道,不需要知道什么样的订阅者订阅。订阅者对一个或多个频道感兴趣,只需要接收感兴趣的消息,不需要知道什么样的发布者发布。主要目的是解除消息的发布者与订阅者之间的耦合关系。
发布者和订阅者都是Redis客户端,频道则是服务器端。

二、原理

Redis通过SUBSCRIBE,PSUBSCRIBE,UNSUBSCRIBE和PUNSUBSCRIBE等命令实现发布和订阅功能。

在Redis底层结构中,客户端和频道的订阅关系是通过一个字典加链表的结构保存的,如下图:

在 Redis 的底层结构中,redis服务器中定义了一个pubsub_channels字典,用于保存所有频道的订阅关系,在这个字典中,key为所有频道名称,value结构是一个链表,其中存放的是所有订阅这个频道的订阅者客户端。subscribe命令的实质即为在key中添加value的订阅链。

若频道首次被订阅说明在字典中并不存在该渠道的信息,那么程序首先要新建一个对应的 key,并且要赋值一个空链表,然后将对应的客户端加入到链表中。此时链表只有一个元素。若该渠道已经被其他客户端订阅过:这个时候就直接找到key值对应的value客户端信息添加到链表的末尾即可。

三、推送的消息格式

所有订阅接收的消息均为由三个元素组成的多块响应。
有三种类型:

subscribe:

该类型表示成功订阅到频道响应,
第一个元素是消息类型,第二个元素为订阅的频道名称,第三个元素为已订阅的频道数量,例:

redis:6379> SUBSCRIBE myChannel
    Reading messages... (press Ctrl-C to quit)
    1) "subscribe"
    2) "myChannel"
    3) (integer) 1
unsubscribe:

该类型表示成功取消订阅到的频道响应,此时第二个元素为订阅的频道名称,第三个元素为已订阅的频道数量,例:

redis:6379> UNSUBSCRIBE myChannel
    1) "unsubscribe" 
    2) "myChannel"  
    3) (integer) 0 
message:

该类型表示订阅者客户端接收到其他客户端发出的发布命令结果,此时第二个元素表示来源频道的名称,第三个元素是实际的消息内容,例:

redis:6379> SUBSCRIBE myChannel  myChannel2
    1) "subscribe" 
    2) "myChannel"  
    3) (integer) 1
    1) "subscribe"
    2) "myChannel2"
    3) (integer) 2
    1) "message"
    2) "myChannel1"
    3) "Hello World"

注:以上三种消息类型均有对应的按模式订阅指令psubscribe,punsubscribe和pmessage,消息格式的第一个元素变为对应类型。

四、发布订阅命令

1.发送消息

PUBLISH命令为发送信息,返回值为接收到该消息的订阅者数量。

redis:4379> PUBLISH mychannel "Hello"
(integer) 1
redis:4379> PUBLISH mychannel "World"
(integer) 1

2.订阅频道

SUBSCRIBE命令为订阅频道,返回值消息格式如上所述。

redis:6379> SUBSCRIBE myChannel
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "myChannel"
3) (integer) 1
1) "message"
2) "myChannel"
3) "Hello"
1) "message"
2) "myChannel"
3) "World"

3.模式匹配

PSUBSCRIBE 命令订阅一个或多个符合给定模式的频道。

每个模式以 * 作为匹配符,例如news.* 匹配所有以 news. 开头的频道( news.one、news.global.today 等等)。

redis:6379> PSUBSCRIBE new.*
Reading messages... (press Ctrl-C to quit)
1) "psubscribe"
2) "new.*"
3) (integer) 1
1) "pmessage"
2) "new.*"      # 消息匹配的模式
3) "new.one"    # 消息本身的目标频道
4) "Hi"

注:当订阅客户端同时订阅某种模式和符合该模式的具体某个频道时,那么会接收到发布者推送的信息两次,两次接收的信息格式不同,一个为message类型,另一个为pmessage类型,消息内容一致。
4.取消订阅

UNSUBSCRIBE命令为取消订阅频道,与订阅频道命令相同,也有对应匹配模式PUNSUBSCRIBE。

如果没有填写指定频道,即一个无参数的UNSUBSCRIBE被调用执行,那么该客户端订阅的所有频道都会被退订。

由于Redis的订阅操作是阻塞式的,因此一旦客户端订阅了某个频道或模式,就将会一直处于订阅状态直到退出。在SUBSCRIBE,PSUBSCRIBE,UNSUBSCRIBE和PUNSUBSCRIBE命令中,其返回值都包含了该客户端当前订阅的频道和模式的数量,当这个数量变为0时,该客户端会自动退出订阅状态。

五、使用Jedis发布订阅命令

要使用Jedis的Publish/Subscribe功能,必须编写对JedisPubSub的自己的实现。

public class PubSubListener extends JedisPubSub
 
    // 取得订阅的消息后的处理
    public void onMessage(String channel, String message) 
        //TODO:接收订阅频道消息后,业务处理逻辑
        System.out.println(channel + "=" + message);
    
 
    // 初始化订阅时候的处理
    public void onSubscribe(String channel, int subscribedChannels) 
         System.out.println(channel + "=" + subscribedChannels);
    
 
    // 取消订阅时候的处理
    public void onUnsubscribe(String channel, int subscribedChannels) 
         System.out.println(channel + "=" + subscribedChannels);
    
 
    // 初始化按表达式的方式订阅时候的处理
    public void onPSubscribe(String pattern, int subscribedChannels) 
         System.out.println(pattern + "=" + subscribedChannels);
    
 
    // 取消按表达式的方式订阅时候的处理
    public void onPUnsubscribe(String pattern, int subscribedChannels) 
         System.out.println(pattern + "=" + subscribedChannels);
    
 
    // 取得按表达式的方式订阅的消息后的处理
    public void onPMessage(String pattern, String channel, String message) 
        System.out.println(pattern + "=" + channel + "=" + message);
    
 

Jedis有两种订阅模式:subsribe(一般模式设置频道)和psubsribe(使用模式匹配来设置频道)。不管是那种模式都可以设置个数不定的频道。订阅得到信息在将会lister的onMessage(…)方法或者onPMessage(…)中进行进行处理,这里我们只是做了简单的输出。 具体代码见GitHup:https://github.com/granett/Redis/tree/master/src/main/java/com/redis/pubsub

public class Subscribe 
 
    private Jedis jedis = new Jedis("192.168.1.207",6379);
 
    /**
     * SUBSCRIBE channel [channel ...]
     * 订阅给定的一个或多个频道的信息
     */
    @Test
    public void subscribe()
        final PubSubListener listener = new PubSubListener();
        jedis.subscribe(listener, "channel");
    
 
    /**
     * UNSUBSCRIBE [channel [channel ...]]
     * 指示客户端退订给定的频道
     * 如果没有频道被指定,即一个无参数的 UNSUBSCRIBE 调用被执行,
     * 那么客户端使用 SUBSCRIBE 命令订阅的所有频道都会被退订。
     * 在这种情况下,命令会返回一个信息,告知客户端所有被退订的频道。
     */
    @Test
    public void unsubscribe()
        final PubSubListener listener = new PubSubListener();
        listener.unsubscribe("channel");
    
 
    /**
     * PSUBSCRIBE pattern [pattern ...]
     * 订阅一个或多个符合给定模式的频道
     * 每个模式以 * 作为匹配符,比如 it* 匹配所有以 it 开头的频道( it.news 、 it.blog 、 it.tweets 等等),
     * news.* 匹配所有以 news. 开头的频道( news.it 、 news.global.today 等等),诸如此类。
     */
    @Test
    public void psubscribe()
        final PubSubListener listener = new PubSubListener();
        jedis.psubscribe(listener, "ch*");
    
 
    /**
     * PUNSUBSCRIBE [pattern [pattern ...]]
     * 指示客户端退订所有给定模式
     * 如果没有模式被指定,即一个无参数的 PUNSUBSCRIBE 调用被执行,
     * 那么客户端使用 PSUBSCRIBE 命令订阅的所有模式都会被退订。
     * 在这种情况下,命令会返回一个信息,告知客户端所有被退订的模式。
     */
    @Test
    public void punsubscribe()
        final PubSubListener listener = new PubSubListener();
        listener.punsubscribe("ch*");
    
 
    /**
     * PUBLISH channel message
     * 将信息 message 发送到指定的频道 channel
     * 返回值:接收到信息 message 的订阅者数量
     */
    @Test
    public void publish()
        jedis.publish("channel", "bar123");
        System.out.println("发布消息");
    
 
    /**
     * PUBSUB CHANNELS [pattern]
     * 列出当前的活跃频道。 活跃频道指的是那些至少有一个订阅者的频道, 订阅模式的客户端不计算在内。
     * pattern 参数是可选的:
     *    如果不给出 pattern 参数,那么列出订阅与发布系统中的所有活跃频道。
     *    如果给出 pattern 参数,那么只列出和给定模式 pattern 相匹配的那些活跃频道。
     */
    @Test
    public void PUBSUB()
        List<String> list = jedis.pubsubChannels("*");
        System.out.println(list);
    
 
    /**
     * PUBSUB NUMSUB [channel-1 ... channel-N]
     * 返回给定频道的订阅者数量, 订阅模式的客户端不计算在内。
     */
    @Test
    public void pubsubNumSub()
        Map<String,String> map = jedis.pubsubNumSub();
        System.out.println(map);
    
 
    /**
     * PUBSUB NUMPAT
     * 返回订阅模式的数量。
     * 注意, 这个命令返回的不是订阅模式的客户端的数量, 而是客户端订阅的所有模式的数量总和。
     */
    @Test
    public void pubsubNumPat()
        Long count = jedis.pubsubNumPat();
        System.out.println(count);
    
 

六、缺点

Redis可以提供基本的发布订阅功能,但毕竟不像消息队列那种专业级别,所以会存在以下缺点:

redis无法对消息持久化存储,消息一旦被发送,如果没有订阅者接收,数据会丢失
消息队列提供了消息传输保障,当客户端连接超时或事物回滚的等情况发生时,消息会重新发布给订阅者,redis没有该保障,导致的结果就是在订阅者断线超时或其他异常情况时,将会丢失所有发布者发布的信息
若订阅者订阅了频道,但自己读取消息的速度很慢的话,那么不断积压的消息会使redis输出缓冲区的体积变得越来越大,这可能使得redis本身的速度变慢,甚至直接崩溃

七、参考文档

官网文章: http://www.redis.cn/topics/pubsub.html

redis源码阅读-发布与订阅pub/sub

redis的发布订阅(pub/sub)是一种消息通信模式,由发布者(pub)发布消息,订阅者订阅(sub)消息。redis通过publish和subscribe等命令实现了发布与订阅模式。

我们先通过一张图了解下工作机制:

我们看下案列

# 客户端1
127.0.0.1:6379> subscribe notice
Reading messages... (press Ctrl-C to quit)
1) "subscribe"
2) "notice"
3) (integer) 1
# publish
127.0.0.1:6379> publish notice 'update version=1.1'
(integer) 1

# 客户端1
1) "message"
2) "notice"
3) "update version=1.1"

再看下消息通道的数据结构

struct redisServer   
    /**订阅客户端
     * key 为正在订阅的频道
     * val 为订阅这个频道的客户端
     * */
    dict *pubsub_channels;  /* Map channels to list of subscribed clients */
    //模式匹配
    list *pubsub_patterns;  /* A list of pubsub_patterns */


typedef struct client 
    dict *pubsub_channels;  /* channels a client is interested in (SUBSCRIBE) */
    list *pubsub_patterns;  /* patterns a client is interested in (SUBSCRIBE) */
 client;

typedef struct pubsubPattern 
    client *client;
    robj *pattern;
 pubsubPattern;

看下消费的源码

void subscribeCommand(client *c) 
    int j;
    //订阅多个通道的时候
    for (j = 1; j < c->argc; j++)
        pubsubSubscribeChannel(c,c->argv[j]);
    c->flags |= CLIENT_PUBSUB;

int pubsubSubscribeChannel(client *c, robj *channel) 
    dictEntry *de;
    list *clients = NULL;
    int retval = 0;

    /* Add the channel to the client -> channels hash table */
    //将客户端订阅的频道都放入c->pubsub_channels的hash表中
    if (dictAdd(c->pubsub_channels,channel,NULL) == DICT_OK) 
        retval = 1;
        incrRefCount(channel);
        /* Add the client to the channel -> list of clients hash table */
        /**
         * 将客户端添加到hash表中,以channel为key
         */
        de = dictFind(server.pubsub_channels,channel);
        //不存在客户端,创建列表
        if (de == NULL) 
            clients = listCreate();
            dictAdd(server.pubsub_channels,channel,clients);
            incrRefCount(channel);
         else 
            clients = dictGetVal(de);
        
        //追加
        listAddNodeTail(clients,c);
    
    /* Notify the client */
    addReply(c,shared.mbulkhdr[3]);
    addReply(c,shared.subscribebulk);
    addReplyBulk(c,channel);
    addReplyLongLong(c,clientSubscriptionsCount(c));
    return retval;

再看下发送的源码

void publishCommand(client *c) 
    int receivers = pubsubPublishMessage(c->argv[1],c->argv[2]);
   
/**
 * 发布广播消息
 * @param channel
 * @param message
 * @return
 */
int pubsubPublishMessage(robj *channel, robj *message) 
    int receivers = 0;
    dictEntry *de;
    listNode *ln;
    listIter li;

    /* Send to clients listening for that channel */
    /**
     * 从server.pubsub_channels中获取订阅channel的客户端链表
     */
    de = dictFind(server.pubsub_channels,channel);
    if (de) 
        list *list = dictGetVal(de);
        listNode *ln;
        listIter li;

        listRewind(list,&li);
        //遍历所有的客户端,一个个的通知
        while ((ln = listNext(&li)) != NULL) 

            client *c = ln->value;
            //将消息以固定格式回复,message,通道,消息内容
            addReply(c,shared.mbulkhdr[3]);
            addReply(c,shared.messagebulk);
            addReplyBulk(c,channel);
            addReplyBulk(c,message);
            receivers++;
        
    
    /* Send to clients listening to matching channels */
    /**
     * 从server.pubsub_patterns 获取所有的客户端,
     */
    if (listLength(server.pubsub_patterns)) 
        listRewind(server.pubsub_patterns,&li);
        channel = getDecodedObject(channel);
        while ((ln = listNext(&li)) != NULL) 
            pubsubPattern *pat = ln->value;
            //按模式匹配所有的客户端,只要pat->pattern->ptr能匹配到对应的channel->ptr,就发
            if (stringmatchlen((char*)pat->pattern->ptr,
                                sdslen(pat->pattern->ptr),
                                (char*)channel->ptr,
                                sdslen(channel->ptr),0)) 
                addReply(pat->client,shared.mbulkhdr[4]);
                addReply(pat->client,shared.pmessagebulk);
                addReplyBulk(pat->client,pat->pattern);
                addReplyBulk(pat->client,channel);
                addReplyBulk(pat->client,message);
                receivers++;
            
        
        decrRefCount(channel);
    
    return receivers;

  • 可以看到发布订阅通过redis服务中的server.pubsub_channels来服务的
  • 当客户端发起subscribe的命令的时候,会把客户端订阅的channels放入到server.pubsub_channels中,对应的channel通过hash路由到对应的桶,然后找到对应的channel,channel对应的value是一个链表,维护所有订阅的客户端
  • 当publish的时候,根据channel找到对应的value,一个个的遍历所有的客户端,并将publish的内容通知给订阅的客户端,除了具体的channel,还有个模式匹配,原理差不多
  • 从原来来看,发布时,订阅的服务必须在线,要不然消息就没了,redis不会存储这些消息
  • 这个场景比较适合版本的更新,比如规则包发布,所有的在线服务都需要更新(得注意点,没法做到强一致,一致性得自己根据业务需求考虑)

redis系列文章

redis源码阅读-入门篇

redis源码阅读二-终于把redis的启动流程搞明白了

redis源码阅读三-终于把主线任务执行搞明白了

redis源码阅读四-我把redis6里的io多线程执行流程梳理明白了

redis源码阅读五-为什么大量过期key会阻塞redis?

redis源码六-redis中的缓存淘汰策略处理分析

redis源码阅读-之哨兵流程

redis源码阅读-持久化之RDB

redis源码阅读-持久化之aof

redis源码阅读-rehash详解

阅读redis源码的时候一些c知识

阅读redis持久化RDB源码的时候一些c知识

linux中的文件描述符与套接字socket

redis中的IO多路复用select和epoll

Reactor模式详解及redis如何使用

redis的key过期了还能取出来?

本文是Redis源码剖析系列博文,有想深入学习Redis的同学,欢迎star和关注;
Redis中文注解版:https://github.com/yxkong/redis/tree/5.0
如果觉得本文对你有用,欢迎一键三连;
同时可以关注微信公众号5ycode获取第一时间的更新哦;

以上是关于redis之pub/sub(发布与订阅)的主要内容,如果未能解决你的问题,请参考以下文章

redis源码阅读-发布与订阅pub/sub

php redis pub/sub(Publish/Subscribe,发布/订阅的信息系统)之基本使用

Redis Pub/Sub 发布订阅模式的深度解析与实现消息队列

#yyds干货盘点#Redis之Pub/Sub

使用redis pub/sub 时间订阅与发布消息

大数据之Redis:Redis的发布和订阅