(转)java redis使用之利用jedis实现redis消息队列

Posted yewg

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了(转)java redis使用之利用jedis实现redis消息队列相关的知识,希望对你有一定的参考价值。

应用场景

最近在公司做项目,需要对聊天内容进行存储,考虑到数据库查询的IO连接数高、连接频繁的因素,决定利用缓存做。

从网上了解到redis可以对所有的内容进行二进制的存储,而java是可以对所有对象进行序列化的,序列化的方法会在下面的代码中提供实现。

序列化

这里我编写了一个java序列化的工具,主要是对对象转换成byte[],和根据byte[]数组反序列化成java对象;

主要是用到了ByteArrayOutputStream和ByteArrayInputStream;

需要注意的是每个自定义的需要序列化的对象都要实现Serializable接口;

其代码如下:

package com.bean.util;

import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
public class ObjectUtil {
	/**对象转byte[]
	 * @param obj
	 * @return
	 * @throws IOException
	 */
	public static byte[] objectToBytes(Object obj) throws Exception{
		ByteArrayOutputStream bo = new ByteArrayOutputStream();
		ObjectOutputStream oo = new ObjectOutputStream(bo);
		oo.writeObject(obj);
		byte[] bytes = bo.toByteArray();
		bo.close();
		oo.close();
		return bytes;
	}
	/**byte[]转对象
	 * @param bytes
	 * @return
	 * @throws Exception
	 */
	public static Object bytesToObject(byte[] bytes) throws Exception{
		ByteArrayInputStream in = new ByteArrayInputStream(bytes);
		ObjectInputStream sIn = new ObjectInputStream(in);
		return sIn.readObject();
	}
}

定义一个消息类,主要用于接收消息内容和消息下表的设置。

package com.bean;

import java.io.Serializable;

/**定义消息类接收消息内容和设置消息的下标
 * @author lenovo
 *
 */
public class Message implements Serializable{
	private static final long serialVersionUID = 7792729L;
	private int id;
	private String content;
	public int getId() {
		return id;
	}
	public void setId(int id) {
		this.id = id;
	}
	public String getContent() {
		return content;
	}
	public void setContent(String content) {
		this.content = content;
	}
}

利用redis做队列,我们采用的是redis中list的push和pop操作;

结合队列的特点:

  • 只允许在一端插入
  • 新元素只能在队列的尾部
  • FIFO:先进先出原则

    redis中lpush(rpop)或rpush(lpop)可以满足要求,而redis中list 里要push或pop的对象仅需要转换成byte[]即可

    java采用Jedis进行redis的存储和redis的连接池设置

    package com.redis.util;
    
    import java.util.List;
    import java.util.Map;
    import java.util.Set;
    
    import redis.clients.jedis.Jedis;
    import redis.clients.jedis.JedisPool;
    import redis.clients.jedis.JedisPoolConfig;
    
    public class JedisUtil {
    
    	private static String JEDIS_IP;
    	private static int JEDIS_PORT;
    	private static String JEDIS_PASSWORD;
    	//private static String JEDIS_SLAVE;
    
    	private static JedisPool jedisPool;
    
    	static {
    		Configuration conf = Configuration.getInstance();
    		JEDIS_IP = conf.getString("jedis.ip", "127.0.0.1");
    		JEDIS_PORT = conf.getInt("jedis.port", 6379);
    		JEDIS_PASSWORD = conf.getString("jedis.password", null);
    		JedisPoolConfig config = new JedisPoolConfig();
    		config.setMaxActive(5000);
    		config.setMaxIdle(256);//20
    		config.setMaxWait(5000L);
    		config.setTestOnBorrow(true);
    		config.setTestOnReturn(true);
    		config.setTestWhileIdle(true);
    		config.setMinEvictableIdleTimeMillis(60000l);
    		config.setTimeBetweenEvictionRunsMillis(3000l);
    		config.setNumTestsPerEvictionRun(-1);
    		jedisPool = new JedisPool(config, JEDIS_IP, JEDIS_PORT, 60000);
    	}
    
    	/**
    	 * 获取数据
    	 * @param key
    	 * @return
    	 */
    	public static String get(String key) {
    
    		String value = null;
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			value = jedis.get(key);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    		} finally {
    			//返还到连接池
    			close(jedis);
    		}
    
    		return value;
    	}
    
    	public static void close(Jedis jedis) {
    		try {
    			jedisPool.returnResource(jedis);
    
    		} catch (Exception e) {
    			if (jedis.isConnected()) {
    				jedis.quit();
    				jedis.disconnect();
    			}
    		}
    	}
    
    	/**
    	 * 获取数据
    	 * 
    	 * @param key
    	 * @return
    	 */
    	public static byte[] get(byte[] key) {
    
    		byte[] value = null;
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			value = jedis.get(key);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    		} finally {
    			//返还到连接池
    			close(jedis);
    		}
    
    		return value;
    	}
    
    	public static void set(byte[] key, byte[] value) {
    
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			jedis.set(key, value);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    		} finally {
    			//返还到连接池
    			close(jedis);
    		}
    	}
    
    	public static void set(byte[] key, byte[] value, int time) {
    
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			jedis.set(key, value);
    			jedis.expire(key, time);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    		} finally {
    			//返还到连接池
    			close(jedis);
    		}
    	}
    
    	public static void hset(byte[] key, byte[] field, byte[] value) {
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			jedis.hset(key, field, value);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    		} finally {
    			//返还到连接池
    			close(jedis);
    		}
    	}
    
    	public static void hset(String key, String field, String value) {
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			jedis.hset(key, field, value);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    		} finally {
    			//返还到连接池
    			close(jedis);
    		}
    	}
    
    	/**
    	 * 获取数据
    	 * 
    	 * @param key
    	 * @return
    	 */
    	public static String hget(String key, String field) {
    
    		String value = null;
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			value = jedis.hget(key, field);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    		} finally {
    			//返还到连接池
    			close(jedis);
    		}
    
    		return value;
    	}
    
    	/**
    	 * 获取数据
    	 * 
    	 * @param key
    	 * @return
    	 */
    	public static byte[] hget(byte[] key, byte[] field) {
    
    		byte[] value = null;
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			value = jedis.hget(key, field);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    		} finally {
    			//返还到连接池
    			close(jedis);
    		}
    
    		return value;
    	}
    
    	public static void hdel(byte[] key, byte[] field) {
    
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			jedis.hdel(key, field);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    		} finally {
    			//返还到连接池
    			close(jedis);
    		}
    	}
    
    	/**
    	 * 存储REDIS队列 顺序存储
    	 * @param byte[] key reids键名
    	 * @param byte[] value 键值
    	 */
    	public static void lpush(byte[] key, byte[] value) {
    
    		Jedis jedis = null;
    		try {
    
    			jedis = jedisPool.getResource();
    			jedis.lpush(key, value);
    
    		} catch (Exception e) {
    
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    
    		} finally {
    
    			//返还到连接池
    			close(jedis);
    
    		}
    	}
    
    	/**
    	 * 存储REDIS队列 反向存储
    	 * @param byte[] key reids键名
    	 * @param byte[] value 键值
    	 */
    	public static void rpush(byte[] key, byte[] value) {
    
    		Jedis jedis = null;
    		try {
    
    			jedis = jedisPool.getResource();
    			jedis.rpush(key, value);
    
    		} catch (Exception e) {
    
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    
    		} finally {
    
    			//返还到连接池
    			close(jedis);
    
    		}
    	}
    
    	/**
    	 * 将列表 source 中的最后一个元素(尾元素)弹出,并返回给客户端
    	 * @param byte[] key reids键名
    	 * @param byte[] value 键值
    	 */
    	public static void rpoplpush(byte[] key, byte[] destination) {
    
    		Jedis jedis = null;
    		try {
    
    			jedis = jedisPool.getResource();
    			jedis.rpoplpush(key, destination);
    
    		} catch (Exception e) {
    
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    
    		} finally {
    
    			//返还到连接池
    			close(jedis);
    
    		}
    	}
    
    	/**
    	 * 获取队列数据
    	 * @param byte[] key 键名
    	 * @return
    	 */
    	public static List<byte[]> lpopList(byte[] key) {
    
    		List<byte[]> list = null;
    		Jedis jedis = null;
    		try {
    
    			jedis = jedisPool.getResource();
    			list = jedis.lrange(key, 0, -1);
    
    		} catch (Exception e) {
    
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    
    		} finally {
    
    			//返还到连接池
    			close(jedis);
    
    		}
    		return list;
    	}
    
    	/**
    	 * 获取队列数据
    	 * @param byte[] key 键名
    	 * @return
    	 */
    	public static byte[] rpop(byte[] key) {
    
    		byte[] bytes = null;
    		Jedis jedis = null;
    		try {
    
    			jedis = jedisPool.getResource();
    			bytes = jedis.rpop(key);
    
    		} catch (Exception e) {
    
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    
    		} finally {
    
    			//返还到连接池
    			close(jedis);
    
    		}
    		return bytes;
    	}
    
    	public static void hmset(Object key, Map<String, String> hash) {
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			jedis.hmset(key.toString(), hash);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    
    		} finally {
    			//返还到连接池
    			close(jedis);
    
    		}
    	}
    
    	public static void hmset(Object key, Map<String, String> hash, int time) {
    		Jedis jedis = null;
    		try {
    
    			jedis = jedisPool.getResource();
    			jedis.hmset(key.toString(), hash);
    			jedis.expire(key.toString(), time);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    
    		} finally {
    			//返还到连接池
    			close(jedis);
    
    		}
    	}
    
    	public static List<String> hmget(Object key, String... fields) {
    		List<String> result = null;
    		Jedis jedis = null;
    		try {
    
    			jedis = jedisPool.getResource();
    			result = jedis.hmget(key.toString(), fields);
    
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    
    		} finally {
    			//返还到连接池
    			close(jedis);
    
    		}
    		return result;
    	}
    
    	public static Set<String> hkeys(String key) {
    		Set<String> result = null;
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			result = jedis.hkeys(key);
    
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    
    		} finally {
    			//返还到连接池
    			close(jedis);
    
    		}
    		return result;
    	}
    
    	public static List<byte[]> lrange(byte[] key, int from, int to) {
    		List<byte[]> result = null;
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			result = jedis.lrange(key, from, to);
    
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    
    		} finally {
    			//返还到连接池
    			close(jedis);
    
    		}
    		return result;
    	}
    
    	public static Map<byte[], byte[]> hgetAll(byte[] key) {
    		Map<byte[], byte[]> result = null;
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			result = jedis.hgetAll(key);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    
    		} finally {
    			//返还到连接池
    			close(jedis);
    		}
    		return result;
    	}
    
    	public static void del(byte[] key) {
    
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			jedis.del(key);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    		} finally {
    			//返还到连接池
    			close(jedis);
    		}
    	}
    
    	public static long llen(byte[] key) {
    
    		long len = 0;
    		Jedis jedis = null;
    		try {
    			jedis = jedisPool.getResource();
    			jedis.llen(key);
    		} catch (Exception e) {
    			//释放redis对象
    			jedisPool.returnBrokenResource(jedis);
    			e.printStackTrace();
    		} finally {
    			//返还到连接池
    			close(jedis);
    		}
    		return len;
    	}
    
    }
    

    Configuration主要用于读取redis配置信息

    package com.redis.util;
    
    import java.io.IOException;
    import java.io.InputStream;
    import java.util.Properties;
    
    public class Configuration extends Properties {
    
    	private static final long serialVersionUID = 50440463580273222L;
    
    	private static Configuration instance = null;
    
    	public static synchronized Configuration getInstance() {
    		if (instance == null) {
    			instance = new Configuration();
    		}
    		return instance;
    	}
    
    	public String getProperty(String key, String defaultValue) {
    		String val = getProperty(key);
    		return (val == null || val.isEmpty()) ? defaultValue : val;
    	}
    
    	public String getString(String name, String defaultValue) {
    		return this.getProperty(name, defaultValue);
    	}
    
    	public int getInt(String name, int defaultValue) {
    		String val = this.getProperty(name);
    		return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
    	}
    
    	public long getLong(String name, long defaultValue) {
    		String val = this.getProperty(name);
    		return (val == null || val.isEmpty()) ? defaultValue : Integer.parseInt(val);
    	}
    
    	public float getFloat(String name, float defaultValue) {
    		String val = this.getProperty(name);
    		return (val == null || val.isEmpty()) ? defaultValue : Float.parseFloat(val);
    	}
    
    	public double getDouble(String name, double defaultValue) {
    		String val = this.getProperty(name);
    		return (val == null || val.isEmpty()) ? defaultValue : Double.parseDouble(val);
    	}
    
    	public byte getByte(String name, byte defaultValue) {
    		String val = this.getProperty(name);
    		return (val == null || val.isEmpty()) ? defaultValue : Byte.parseByte(val);
    	}
    
    	public Configuration() {
    		InputStream in = ClassLoader.getSystemClassLoader().getResourceAsStream("config.xml");
    		try {
    			this.loadFromXML(in);
    			in.close();
    		} catch (IOException e) {
    		}
    	}
    }
    


    测试redis队列

    package com.quene.test;
    
    import com.bean.Message;
    import com.bean.util.ObjectUtil;
    import com.redis.util.JedisUtil;
    
    public class TestRedisQuene {
    	public static byte[] redisKey = "key".getBytes();
    	static{
    		init();
    	}
    	public static void main(String[] args) {
    		pop();
    	}
    
    	private static void pop() {
    		byte[] bytes = JedisUtil.rpop(redisKey);
    		Message msg = (Message) ObjectUtil.bytesToObject(bytes);
    		if(msg != null){
    			System.out.println(msg.getId()+"   "+msg.getContent());
    		}
    	}
    
    	private static void init() {
    		Message msg1 = new Message(1, "内容1");
    		JedisUtil.lpush(redisKey, ObjectUtil.objectToBytes(msg1));
    		Message msg2 = new Message(2, "内容2");
    		JedisUtil.lpush(redisKey, ObjectUtil.objectToBytes(msg2));
    		Message msg3 = new Message(3, "内容3");
    		JedisUtil.lpush(redisKey, ObjectUtil.objectToBytes(msg3));
    	}
    
    }
    测试结果如下:
    1   内容1
    2   内容2

    3   内容3

    转自http://www.itnose.net/detail/6284422.html

以上是关于(转)java redis使用之利用jedis实现redis消息队列的主要内容,如果未能解决你的问题,请参考以下文章

Redis的Java客户端Jedis的八种调用方式(事务管道分布式…)介绍(转)

Spring Data Redis与Jedis的选择(转)

SpringBoot 操作 Redis的各种实现(Jedis、Redisson的区别比较)

redis入门到精通系列:Jedis--使用java操作redis详解

redis入门到精通系列:Jedis--使用java操作redis详解

redis入门到精通系列:Jedis--使用java操作redis详解