并发9借助redis 实现生产消费,消息订阅发布模式队列

Posted cutter-point

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了并发9借助redis 实现生产消费,消息订阅发布模式队列相关的知识,希望对你有一定的参考价值。

这个就是一个消息可以被多次消费的范例了

其实这个实现的方式可以参考我之前的设计模式,观察者模式

https://www.cnblogs.com/cutter-point/p/5249780.html

 

不过有一点需要注意一下啊,这个消息发布的时候,好像是不支持字节数据的,里面好像会对字节进行转换,这样的结果就是导致我最后无法吧相应的字节转换成我之前序列化的对象

不知道是不是ObjectInputStream和ObjectOutputStream实现不是很好的原因,还是什么,反正反序列化的时候,有些不可见的字符应该是被截掉了

 

消息发布者

package queue.redisQueue;

import queue.fqueue.vo.TempVo;
import redis.clients.jedis.Jedis;

import java.io.ByteArrayOutputStream;
import java.io.ObjectOutputStream;
import java.util.UUID;

/**
 * @ProjectName: cutter-point
 * @Package: queue.redisQueue
 * @ClassName: RedisQueueProducter3
 * @Author: xiaof
 * @Description: 订阅,发布模式 发布消息
 * @Date: 2019/6/12 16:47
 * @Version: 1.0
 */
public class RedisQueueProducter3 implements Runnable 

    private Jedis jedis;
    private String queueKey;

    public RedisQueueProducter3(Jedis jedis, String queueKey) 
        this.jedis = jedis;
        this.queueKey = queueKey;
    

    public void putMessage() 
        try 
            Thread.sleep((long) (Math.random() * 1000));

            //不存在则创建,存在则直接插入
            //向redis队列中存放数据
            //生成数据
            TempVo tempVo = new TempVo();
            tempVo.setName(Thread.currentThread().getName() + ",time is:" + UUID.randomUUID());

            try 
                int i = 0;
                while(i < 10) 
                    //反馈订阅的数量
                    long num = jedis.publish(queueKey.getBytes(), tempVo.toString().getBytes());
                    if(num > 0) 
                        System.out.println("成功!num:" + num);
                        break;
                    
                    ++i;
                
             catch (Exception e) 
                System.out.println("失败!");
            

         catch (Exception e) 
            e.printStackTrace();
        
    

    @Override
    public void run() 

        while(true) 
            putMessage();

        

    

 

消息消费者

package queue.redisQueue;

import queue.fqueue.vo.EventVo;
import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPubSub;
import redis.clients.util.SafeEncoder;

import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.UnsupportedEncodingException;

/**
 * @ProjectName: cutter-point
 * @Package: queue.redisQueue
 * @ClassName: RedisQueueConsume3
 * @Author: xiaof
 * @Description: 发布订阅消息,订阅线程
 * @Date: 2019/6/12 16:53
 * @Version: 1.0
 */
public class RedisQueueConsume3 implements Runnable 

    private Jedis jedis;
    private String queueKey;

    class myJedisPubSub extends JedisPubSub 
        /** JedisPubSub类是一个没有抽象方法的抽象类,里面方法都是一些空实现
         * 所以可以选择需要的方法覆盖,这儿使用的是SUBSCRIBE指令,所以覆盖了onMessage
         * 如果使用PSUBSCRIBE指令,则覆盖onPMessage方法
         * 当然也可以选择BinaryJedisPubSub,同样是抽象类,但方法参数为byte[]
         **/
        @Override
        public void onMessage(String channel, String message) 
            System.out.println(Thread.currentThread().getName()+"-接收到消息:channel=" + channel + ",message=" + message);
            //接收到exit消息后退出
            System.out.println(message);
        
    

    public RedisQueueConsume3(Jedis jedis, String queueKey) 
        this.jedis = jedis;
        this.queueKey = queueKey;
    

    public void consumerMessage() 
        jedis.subscribe(new myJedisPubSub(), queueKey);
    

    @Override
    public void run() 
        while (true) 
            consumerMessage();
        
    

 

测试代码:

@Test
    public void test4() throws InterruptedException 

        //读写取数据
        for(int i = 0; i < 2; ++i) 
            System.out.println("输出测试" + i);
            RedisQueueProducter3 producter = new RedisQueueProducter3(jedisPool.getResource(), "xiaof");
            Thread t = new Thread(producter);
            t.start();
        

        while(true) 
            Thread.sleep(1000);
        
    

    @Test
    public void test5() throws InterruptedException 

        //读写取数据
        for(int i = 0; i < 5; ++i) 
            System.out.println("输出测试" + i);
            //切记一定要重新获取Resource,不然无法并发操作
            RedisQueueConsume3 fqueueConsume = new RedisQueueConsume3(jedisPool.getResource(), "xiaof");
            Thread t = new Thread(fqueueConsume);
            t.setDaemon(true);
            t.start();
        

        while(true) 
            Thread.sleep(1000);
        
    

 

效果展示

 

同一消息被多个订阅者同步消费

技术图片

 

以上是关于并发9借助redis 实现生产消费,消息订阅发布模式队列的主要内容,如果未能解决你的问题,请参考以下文章

利用Redis作消息队列,实现生产消费和发布订阅

并发8借助redis 实现多线程生产消费阻塞队列

利用redis简单实现消息订阅和发布

利用redis简单实现消息订阅和发布

Redis实现消息队列之发布订阅模式

Redis消息队列