如题,直接上代码:
1 import java.util.Iterator; 2 import java.util.concurrent.ConcurrentHashMap; 3 import java.util.concurrent.TimeUnit; 4 5 import org.slf4j.Logger; 6 import org.slf4j.LoggerFactory; 7 8 /** 9 * 实现延时过期MAP集合 支持自定义过期触发事件 10 * 11 * @ClassName: BaseExpireMap 12 * @Description: TODO 13 * @author: wangs 14 * @date: 2017-12-25 上午9:59:04 15 * @param <K> 16 * @param <V> 17 */ 18 public abstract class BaseExpireMap<K, V> { 19 protected static final Logger logger = LoggerFactory.getLogger(BaseExpireMap.class); 20 private long expTime = 0L; 21 private TimeUnit unit = null; 22 /** 23 * 线程安全的map容器 24 */ 25 ConcurrentHashMap<K, V> expireMap = null; 26 /** 27 * 控制过期时间 28 */ 29 ConcurrentHashMap<K, Long> delayMap = null; 30 31 /** 32 * 将map提供给外部程序操作 33 * @Title: getDataMap 34 * @Description: TODO 35 * @return 36 * @return: ConcurrentHashMap<K,V> 37 */ 38 public ConcurrentHashMap<K, V> getDataMap(){ 39 return this.expireMap; 40 } 41 42 public BaseExpireMap(long expTime, TimeUnit unit) { 43 expireMap = new ConcurrentHashMap<K, V>(); 44 delayMap = new ConcurrentHashMap<K, Long>(); 45 this.expTime = expTime; 46 this.unit = unit; 47 // 启动监听线程 48 BaseExpireCheckTask task = new BaseExpireCheckTask(expireMap, delayMap) { 49 @Override 50 protected void expireEvent(K key,V val) { 51 baseExpireEvent(key,val); 52 } 53 }; 54 task.setDaemon(false); 55 task.start(); 56 } 57 58 /** 59 * 过期事件 子类实现 60 * 61 * @Title: baseExpireEvent 62 * @Description: TODO 63 * @param key 64 * @return: void 65 */ 66 protected abstract void baseExpireEvent(K key,V val); 67 68 public V put(K key, V val) { 69 delayMap.put(key, getExpireTime()); 70 return expireMap.put(key, val); 71 } 72 73 public V remove(K key) { 74 return expireMap.remove(key); 75 } 76 77 public V get(K key){ 78 return expireMap.get(key); 79 } 80 81 private Long getExpireTime() { 82 return unit.toMillis(expTime) + System.currentTimeMillis(); 83 } 84 85 public static void main(String[] args) { 86 System.out.println(TimeUnit.SECONDS.toMinutes(120)); 87 System.out.println(TimeUnit.MICROSECONDS.toMillis(120)); 88 System.out.println(TimeUnit.MILLISECONDS.toMillis(120)); 89 } 90 91 /** 92 * 扫描线程 定期移除过期元素并触发过期事件 93 * 94 * @ClassName: BaseExpireCheckTask 95 * @Description: TODO 96 * @author: wangs 97 * @date: 2017-12-25 上午9:59:18 98 */ 99 private abstract class BaseExpireCheckTask extends Thread { 100 ConcurrentHashMap<K, Long> delayMap = null; 101 ConcurrentHashMap<K, V> expireMap = null; 102 103 public BaseExpireCheckTask(ConcurrentHashMap<K, V> expireMap, ConcurrentHashMap<K, Long> delayMap) { 104 this.delayMap = delayMap; 105 this.expireMap = expireMap; 106 } 107 108 protected abstract void expireEvent(K key,V val); 109 110 public void run() { 111 Iterator<K> it = null; 112 K key = null; 113 while (true) { 114 if (delayMap != null && !delayMap.isEmpty()) { 115 it = delayMap.keySet().iterator(); 116 while (it.hasNext()) { 117 key = it.next(); 118 if (delayMap.get(key) <= System.currentTimeMillis()) {// 元素超时 119 // 触发回调 120 expireEvent(key,expireMap.get(key)); 121 // 移除 122 it.remove(); 123 expireMap.remove(key); 124 delayMap.remove(key); 125 } 126 } 127 } 128 try { 129 TimeUnit.MILLISECONDS.sleep(200); 130 } catch (InterruptedException e) { 131 logger.error(e.getMessage()); 132 } 133 } 134 } 135 } 136 }
上面是一个通用的延迟过期MAP容器,由两个线程安全的map集合和一个扫描线程组成,该容器会定时移除超时的元素并在移除时触发指定事件expireEvent,该方法的两个参数Key和val分别代表过期元素的键值,定义了元素过期时的触发事件,等待子类实现。
下面是一个使用实例:
1 import java.util.concurrent.TimeUnit; 2 3 import com.montnets.kafka.Producer; 4 import com.montnets.smsverify.bean.VerifyAccountSeatBean; 5 import com.montnets.smsverify.common.StaticValues; 6 import com.montnets.smsverify.netty.utils.GsonUtil; 7 8 /** 9 * 进退坐席缓存 10 * 11 * @ClassName: SeatCache 12 * @Description: 单例 13 * @author: wangs 14 * @date: 2017-12-25 下午2:23:21 15 */ 16 public class SeatCache extends BaseExpireMap<String, VerifyAccountSeatBean> { 17 private static SeatCache instance = null; 18 private static Producer producer = null; 19 static long expTime = 0L; 20 static TimeUnit unit = null; 21 22 private SeatCache(long expTime, TimeUnit unit) { 23 super(expTime, unit); 24 } 25 26 public synchronized static void init(long expTime, TimeUnit unit) { 27 SeatCache.expTime = expTime; 28 SeatCache.unit = unit; 29 if (instance == null) { 30 instance = new SeatCache(expTime, unit); 31 producer = new Producer(); 32 } 33 } 34 35 public synchronized static SeatCache getInstance() { 36 if (instance == null) { 37 if (unit == null) 38 throw (new IllegalArgumentException("please call init at first")); 39 instance = new SeatCache(expTime, unit); 40 producer = new Producer(); 41 } 42 return instance; 43 } 44 45 46 /** 47 * 过期事件 48 */ 49 @Override 50 protected void baseExpireEvent(String key, VerifyAccountSeatBean bean) { 51 if(bean!=null) 52 bean.setIsManual(1); //非手动退坐席 53 //更新 54 updateOffOnlie(bean); 55 //写kafka 56 send2kafka(bean); 57 } 58 59 /** 60 * 退坐席之前更新时间标记 61 * @Title: updateOffOnlie 62 * @Description: TODO 63 * @param bean 64 * @return: void 65 */ 66 public static void updateOffOnlie(VerifyAccountSeatBean bean) { 67 if (bean != null) { 68 long now = System.currentTimeMillis(); 69 // 退坐席时间 70 bean.setOutSeatTime(now); 71 // 在线时长 72 bean.setOnlineTime(now - bean.getInSeatTime()); 73 } 74 } 75 76 /** 77 * 退坐席时将数据写入kafka 78 * 79 * @Title: send2kafka 80 * @Description: TODO 81 * @param topic 82 * @param bean 83 * @return: void 84 */ 85 public static void send2kafka(VerifyAccountSeatBean bean) { 86 if (bean == null) 87 return; 88 producer.send(StaticValues.SEAT_DATA, GsonUtil.toJson(bean)); 89 } 90 }
推荐一个很强大的过期缓存第三方工具包,com.google.common.cache.Cache ,它提供多种回收策略,如基于创建时间或最后一次访问时间计时回收、基于对象容量大小回收、按照命中率使用LRU算法回收等,并且一样可以自定义过期回收触发事件(写这个工具的时候我还不知道有这么个强大的玩意 - -);另外还提供命中统计的API,功能很全,可以用作数据库到前端页面中间的缓存模块,推荐使用。
世间之事莫强求,凡事太尽,缘分势必早尽。