涓€涓畝鍗曠殑 鏀寔寤舵椂娑堟伅 ,鎸佷箙鍖栨秷鎭殑娑堟伅闃熷垪 鐨凧ava瀹炵幇
Posted
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了涓€涓畝鍗曠殑 鏀寔寤舵椂娑堟伅 ,鎸佷箙鍖栨秷鎭殑娑堟伅闃熷垪 鐨凧ava瀹炵幇相关的知识,希望对你有一定的参考价值。
鏍囩锛?a href='http://www.mamicode.com/so/1/zha' title='zha'>zha extends operation rds cti text resource str evel
涓昏鐨勬秷鎭鐞嗚€呭璞★細
package com.rynk.mugua.trading.biz.service.impl; import java.util.concurrent.DelayQueue; import org.springframework.beans.factory.annotation.Autowired; import org.springframework.stereotype.Component; import com.rynk.commons.entity.QueueMessage; import com.rynk.mugua.trading.biz.commons.RedisKeyResolver; import com.rynk.mugua.trading.biz.commons.lock.DistributedLockHandler; import com.rynk.mugua.trading.biz.eth.DelayedTack; import lombok.extern.slf4j.Slf4j; /** * 寤舵椂娑堟伅绠$悊鍛? * @author ZHANGYUKUNUP * */ @Component @Slf4j public class QueueManger { MessagePersistent messagePersistent; /** * 寤舵椂娑堟伅闃熷垪 */ private DelayQueue<DelayedTack> dQueue = new DelayQueue<>(); /** * 娑堟伅浠诲姟澶勭悊绾跨▼ */ private Thread taskThread; @Autowired DistributedLockHandler lock; public QueueManger() { taskThread = new TaskThread(); taskThread.start(); } /** * 浠诲姟绾跨▼ * @author ZHANGYUKUNUP * */ class TaskThread extends Thread{ @Override public void run() { while (true) { try { DelayedTack delayedTack = dQueue.take(); QueueMessage queueMessage = delayedTack.getQueueMessage(); if( queueMessage == null ) { return ; } //绠€鍗曠殑鍔犱釜閿佷繚璇佹秷鎭笉琚噸澶嶆秷璐癸紙闇€瑕佷繚璇佽В閿佸墠 鏁版嵁琚彁浜ゅ埌鏁版嵁搴擄紝鍚﹁€呬細鍑哄悓姝ラ棶棰?锛屼篃灏辨槸璇翠笉鑳芥湁鏇村姞澶х殑 浜嬪姟鑼冨洿 鍖呰9褰撳墠鏂规硶 锛? if( lock.tryLock( RedisKeyResolver.getMsgrKey( queueMessage.getId() ) ) ) { //濡傛灉杩欎釜娑堟伅琚甯告秷璐癸紝閭d箞涔呮爣璁版秷璐规垚鍔燂紝濡傛灉寮傚父娑堣垂锛岄偅涔堜箙閲嶈瘯杩欎釜娑堟伅 try { if( QueueManger.this.messageDispense(delayedTack.getQueueMessage()) ) { messagePersistent.succeed( queueMessage ); }else { QueueManger.this.reTry( queueMessage ); } }catch (Exception e) { e.printStackTrace(); QueueManger.this.reTry(queueMessage); }finally { lock.unLock( RedisKeyResolver.getMsgrKey( queueMessage.getId() ) ); } } } catch (Exception e) { e.printStackTrace(); } } } } /** * 閲嶈瘯 * @param queueMessage */ protected void reTry(QueueMessage queueMessage) { messagePersistent.reTry(queueMessage); } /** * 鍒嗗彂娑堟伅 * @param queueMessage */ protected boolean messageDispense(QueueMessage queueMessage) { return messagePersistent.consume(queueMessage); } /** * 娣诲姞涓€涓欢鏃舵秷鎭? * @param delayedTack */ public void put(DelayedTack delayedTack) { dQueue.put(delayedTack); } /** * 鏌ヨ鏈鐞嗙殑寤舵椂娑堟伅鏁伴噺 * @return */ public int size() { return dQueue.size(); } /** * 娑堟伅澶勭悊绾跨▼瀛樻椿鐘舵€? * @return */ public boolean isAlive() { return taskThread.isAlive(); } }
銆€
娑堟伅瀵硅薄锛?/p>
package com.rynk.commons.entity; import java.util.Date; import org.springframework.data.mongodb.core.mapping.Document; import com.rynk.commons.entity.em.QueueMessageType; import com.rynk.commons.entity.em.TransferRecordStatus; import com.rynk.commons.util.SnGeneratorUtil; import lombok.Data; @Data @Document(collection = "mg_queue_message") public class QueueMessage extends BaseEntity { /** * 鍞ら啋鏃堕棿 */ private Date awakenDate; /** * 澶勭悊鐘舵€? */ private TransferRecordStatus transferRecordStatus; /** * 娑堟伅浣? */ private String body; /** * 娑堟伅浣撶被鍨? */ private QueueMessageType type; /** * 閲嶈瘯娆℃暟 */ private Integer tryTimes; /** * 鏈€鍚庝竴娆℃牳瀵规椂闂? */ private Date lastCheckDate; /** * * @param body 娑堟伅浣撴潵婧愮被鍨? * @param type 娑堟伅绫诲瀷 * @param delayed 寤舵椂 * @return */ public static QueueMessage newInstance( String body , QueueMessageType type , long delayed ) { QueueMessage item = new QueueMessage(); item.setId( SnGeneratorUtil.getId().toString() ); item.setCreateDate( new Date() ); item.setDr(false); item.setTransferRecordStatus( TransferRecordStatus.WAIT ); item.setTryTimes(1); item.setBody(body); item.setType(type); item.setAwakenDate( new Date( System.currentTimeMillis()+delayed )); item.setLastCheckDate( item.getAwakenDate() ); return item; } }
銆€銆€
鍩轰簬redis 鐨?鍒嗗竷寮忛攣瀵硅薄:
package com.rynk.mugua.trading.biz.commons.lock; import org.slf4j.Logger; import org.slf4j.LoggerFactory; import org.springframework.data.redis.core.RedisTemplate; import org.springframework.data.redis.core.ValueOperations; import org.springframework.stereotype.Component; import org.springframework.util.StringUtils; import javax.annotation.Resource; import java.text.SimpleDateFormat; import java.util.Date; import java.util.concurrent.TimeUnit; /** * 鍒嗗竷寮忛攣 * * @author ZHANGYUKUN * */ @Component public class DistributedLockHandler { private static final Logger logger = LoggerFactory.getLogger(DistributedLockHandler.class); /** * 鏈€澶ф寔鏈夐攣鐨勬椂闂?姣) */ private final static long LOCK_EXPIRE = 30 * 1000L; /** * 灏濊瘯鑾峰彇閿佺殑鏃堕棿闂撮殧锛堟绉掞級 */ private final static long LOCK_TRY_INTERVAL = 30L; /** * 鑾峰彇閿佹渶澶х瓑寰呮椂闂? 姣 ) */ private final static long LOCK_TRY_TIMEOUT = 20 * 1000L; @Resource// (name = "customRedisTemplate") private RedisTemplate<String, String> template; /** * 灏濊瘯鑾峰彇 鍒嗗竷寮忛攣 * * @param lockKey * 閿佸悕 * @return true 寰楀埌浜嗛攣 锛宖alse 鑾峰彇閿佸け璐? */ public boolean tryLock(String lockKey) { return getLock(lockKey, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, LOCK_EXPIRE); } /** * 灏濊瘯鑾峰彇 鍒嗗竷寮忛攣锛堜笉鑷姩閲婃斁閿侊級 * * @param lockKey * 閿佸悕 * @return true 寰楀埌浜嗛攣 锛宖alse 鑾峰彇閿佸け璐? */ public boolean tryLockNotAutoRelease(String lockKey) { return getLock(lockKey, LOCK_TRY_TIMEOUT, LOCK_TRY_INTERVAL, -1); } /** * 灏濊瘯鑾峰彇 鍒嗗竷寮忛攣 * * @param lockKey * 閿佸悕 * @param timeout * 鑾峰彇閿佹渶澶х瓑寰呮椂闂? * @return true 寰楀埌浜嗛攣 锛宖alse 鑾峰彇閿佸け璐? */ public boolean tryLock(String lockKey, long timeout) { return getLock(lockKey, timeout, LOCK_TRY_INTERVAL, LOCK_EXPIRE); } /** * 灏濊瘯鑾峰彇 鍒嗗竷寮忛攣锛堜笉鑷姩閲婃斁閿侊級 * * @param lockKey * 閿佸悕 * @param timeout * 鑾峰彇閿佹渶澶х瓑寰呮椂闂? * @return true 寰楀埌浜嗛攣 锛宖alse 鑾峰彇閿佸け璐? */ public boolean tryLockNotAutoRelease(String lockKey, long timeout) { return getLock(lockKey, timeout, LOCK_TRY_INTERVAL, -1); } /** * 灏濊瘯鑾峰彇 鍒嗗竷寮忛攣 * * @param lockKey * 閿佸悕 * @param timeout * 鑾峰彇閿佹渶澶х瓑寰呮椂闂? * @param tryInterval * 鑾峰彇閿佸皾璇?鏃堕棿闂撮殧 * @return true 寰楀埌浜嗛攣 锛宖alse 鑾峰彇閿佸け璐? */ public boolean tryLock(String lockKey, long timeout, long tryInterval) { return getLock(lockKey, timeout, tryInterval, LOCK_EXPIRE); } /** * 灏濊瘯鑾峰彇 鍒嗗竷寮忛攣锛堜笉閲婃斁閿侊級 * * @param lockKey * 閿佸悕 * @param timeout * 鑾峰彇閿佹渶澶х瓑寰呮椂闂? * @param tryInterval * 鑾峰彇閿佸皾璇?鏃堕棿闂撮殧 * @return true 寰楀埌浜嗛攣 锛宖alse 鑾峰彇閿佸け璐? */ public boolean tryLockNotAutoRelease(String lockKey, long timeout, long tryInterval) { return getLock(lockKey, timeout, tryInterval, -1); } /** * 灏濊瘯鑾峰彇 鍒嗗竷寮忛攣 * * @param lockKey * 閿佸悕 * @param timeout * 鑾峰彇閿佹渶澶х瓑寰呮椂闂? * @param tryInterval * 鑾峰彇閿佸皾璇?鏃堕棿闂撮殧 * @param lockExpireTime * 閿佹渶澶ф寔鏈夋椂闂? * @return true 寰楀埌浜嗛攣 锛宖alse 鑾峰彇閿佸け璐? */ public boolean tryLock(String lockKey, long timeout, long tryInterval, long lockExpireTime) { return getLock(lockKey, timeout, tryInterval, lockExpireTime); } /** * 鑾峰彇鍒嗗竷寮忛攣 * * @param lockKey * 閿佸悕 * @param timeout * 鑾峰彇閿佹渶澶х瓑寰呮椂闂? * @param tryInterval * 鑾峰彇閿佸皾璇?鏃堕棿闂撮殧 * @param lockExpireTime * 閿佹渶澶ф寔鏈夋椂闂? * @return true 寰楀埌浜嗛攣 锛宖alse 鑾峰彇閿佸け璐? */ public boolean getLock(String lockKey, long timeout, long tryInterval, long lockExpireTime) { try { if (StringUtils.isEmpty(lockKey)) { return false; } long startTime = System.currentTimeMillis(); do { ValueOperations<String, String> ops = template.opsForValue(); SimpleDateFormat sd = new SimpleDateFormat("yyyy-mm-dd HH:mm:ss"); if (ops.setIfAbsent(lockKey, sd.format(new Date()) )) { if (lockExpireTime > 0) { template.expire(lockKey, lockExpireTime, TimeUnit.MILLISECONDS); } return true; } Thread.sleep(tryInterval); } while (System.currentTimeMillis() - startTime < timeout); } catch (InterruptedException e) { logger.error(e.getMessage()); return false; } return false; } /** * 閲婃斁閿? * * @param lockKey */ public void unLock(String lockKey) { if (!StringUtils.isEmpty(lockKey)) { template.delete(lockKey); } } }
銆€銆€
寤舵椂浠诲姟瀵硅薄: 鐢ㄦ潵鍒嗚寤舵椂娑堟伅鐨?/p>
package com.rynk.mugua.trading.biz.eth; import java.util.concurrent.Delayed; import java.util.concurrent.TimeUnit; import com.rynk.commons.entity.QueueMessage; import com.rynk.mugua.trading.biz.mqMessage.ChainMessageDelayTimeLevel; /** * 寤舵椂浠诲姟 * @author zhangyukun * */ public class DelayedTack implements Delayed{ /** * 鎵ц鐨勬椂闂? */ Long runTime; /** * 娑堟伅瀵硅薄 */ QueueMessage queueMessage; public QueueMessage getQueueMessage() { return queueMessage; } public void setQueueMessage(QueueMessage queueMessage) { this.queueMessage = queueMessage; } /** * * @param delay 寤舵椂姣鏁? * @param queueMessage 娑堟伅浣? */ public DelayedTack( QueueMessage queueMessage ) { if( queueMessage.getTryTimes() == 1 ) { this.runTime = queueMessage.getAwakenDate().getTime(); }else { this.runTime =System.currentTimeMillis() + ChainMessageDelayTimeLevel.getDelayTimeLevel( queueMessage.getTryTimes() )*1000 ; } this.queueMessage = queueMessage; } @Override public long getDelay(TimeUnit unit) { return unit.convert( runTime - System.currentTimeMillis() , TimeUnit.MILLISECONDS); } @Override public int compareTo(Delayed o) { return (int) (this.getDelay(TimeUnit.MILLISECONDS) -o.getDelay(TimeUnit.MILLISECONDS)); } }
銆€銆€
鎸佷箙鍖栨秷鎭殑瀵硅薄: 涓変釜 鏂瑰紡 鎸夌収鑷繁鐨勫疄鐜板仛灏辨槸浜?/p>
package com.rynk.mugua.trading.biz.service.impl; import com.rynk.commons.entity.QueueMessage; public class MessagePersistent { /** * 娑堣垂杩欎釜娑堟伅瑕佸鐞嗙殑涓氬姟閫昏緫 * @param queueMessage * @return */ public boolean consume(QueueMessage queueMessage) { return false; } /** * 鏍囪杩欎釜娑堟伅宸茬粡琚甯告秷璐? * @param queueMessage */ public void succeed(QueueMessage queueMessage) { } /** * 閲嶈瘯娑堟伅锛堟爣璁版暟鎹簱鐨勭姸鎬侊紝鐒跺悗鎶婂畠閲嶆柊鏀惧埌寤舵椂闃熷垪涓級 * @param queueMessage */ public void reTry(QueueMessage queueMessage) { } }
銆€銆€
以上是关于涓€涓畝鍗曠殑 鏀寔寤舵椂娑堟伅 ,鎸佷箙鍖栨秷鎭殑娑堟伅闃熷垪 鐨凧ava瀹炵幇的主要内容,如果未能解决你的问题,请参考以下文章