zookeeper源码解析--会话管理--SessionTrackerImpl

Posted raindayinrain

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper源码解析--会话管理--SessionTrackerImpl相关的知识,希望对你有一定的参考价值。

SessionTrackerImpl

SessionImpl

/**
 * Creates a session record that is not yet marked as closing.
 *
 * @param sessionId id of the session
 * @param timeout   timeout of the session
 */
SessionImpl(long sessionId, int timeout) {
    this.sessionId = sessionId;
    this.timeout = timeout;
    isClosing = false;
}

// Session id
final long sessionId;
// Session timeout
final int timeout;
// Closing flag; set to true once the session is being closed
boolean isClosing;
// Owner of the session
Object owner;
public long getSessionId() 

    return sessionId;


public int getTimeout() 

    return timeout;


public boolean isClosing() 

    return isClosing;


public String toString() 

    return "0x" + Long.toHexString(sessionId);

SessionTrackerImpl

// 基于当前时间&传入id构造新的会话id
public static long initializeNextSessionId(long id) 

    long nextSid;
    nextSid = (Time.currentElapsedTime() << 24) >>> 8;
    nextSid = nextSid | (id << 56);
    if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER) 
    
        ++nextSid;  
    
    
    return nextSid;


/**
 * Creates the tracker and starts tracking every session already present in
 * {@code sessionsWithTimeout}.
 *
 * @param expirer             callback used by {@code run()} to expire sessions
 * @param sessionsWithTimeout map of session id to timeout; kept by reference
 * @param tickTime            value passed to the {@code ExpiryQueue} constructor
 * @param serverId            id used to seed the next session id
 * @param listener            listener handed to the superclass constructor
 */
public SessionTrackerImpl(
        SessionExpirer expirer,
        ConcurrentMap<Long, Integer> sessionsWithTimeout,
        int tickTime,
        long serverId,
        ZooKeeperServerListener listener) {
    super("SessionTracker", listener);
    this.expirer = expirer;
    this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
    this.sessionsWithTimeout = sessionsWithTimeout;
    this.nextSessionId.set(initializeNextSessionId(serverId));
    for (Entry<Long, Integer> e : sessionsWithTimeout.entrySet()) {
        trackSession(e.getKey(), e.getValue());
    }

    EphemeralType.validateServerId(serverId);
}

// Loop condition of run(); shutdown() sets it to false.
volatile boolean running = true;
public void dumpSessions(PrintWriter pwriter) 

    pwriter.print("Session ");
    sessionExpiryQueue.dump(pwriter);


// 时间点--在此时间点会超期的会话id集合
public synchronized Map<Long, Set<Long>> getSessionExpiryMap() 

    Map<Long, Set<SessionImpl>> expiryMap = sessionExpiryQueue.getExpiryMap();
    Map<Long, Set<Long>> sessionExpiryMap = new TreeMap<Long, Set<Long>>();
    for (Entry<Long, Set<SessionImpl>> e : expiryMap.entrySet()) 
    
        Set<Long> ids = new HashSet<Long>();
        sessionExpiryMap.put(e.getKey(), ids);
        for (SessionImpl s : e.getValue()) 
        
            ids.add(s.sessionId);
        
    

    return sessionExpiryMap;


@Override
public String toString() 

    StringWriter sw = new StringWriter();
    PrintWriter pwriter = new PrintWriter(sw);
    dumpSessions(pwriter);
    pwriter.flush();
    pwriter.close();
    return sw.toString();


@Override
public void run() 

    try 
    
        while (running) 
        
        	// 获取下一超期时间点距离现在时长
            long waitTime = sessionExpiryQueue.getWaitTime();
            if (waitTime > 0) 
            
                Thread.sleep(waitTime);
                continue;
            
			
			// 对每个超期的会话,设置状态为关闭中
			// 让会话超期
            for (SessionImpl s : sessionExpiryQueue.poll()) 
            
                ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
                setSessionClosing(s.sessionId);
                expirer.expire(s);
            
        
     
    catch (InterruptedException e) 
    
        handleException(this.getName(), e);
    
    
    LOG.info("SessionTrackerImpl exited loop!");


// 触碰会话
public synchronized boolean touchSession(long sessionId, int timeout) 

	// 会话对象
    SessionImpl s = sessionsById.get(sessionId);
    if (s == null) 
    
        logTraceTouchInvalidSession(sessionId, timeout);
        return false;
    

    if (s.isClosing()) 
    
        logTraceTouchClosingSession(sessionId, timeout);
        return false;
    
	
	// 为会话设置新的超期时间点
    updateSessionExpiry(s, timeout);
    return true;


// 为会话设置新的超期时间点
private void updateSessionExpiry(SessionImpl s, int timeout) 

    logTraceTouchSession(s.sessionId, timeout, "");
    sessionExpiryQueue.update(s, timeout);


private void logTraceTouchSession(long sessionId, int timeout, String sessionStatus) 

    if (LOG.isTraceEnabled()) 
    
        String msg = MessageFormat.format(
        	"SessionTrackerImpl --- Touch 0session: 0x1 with timeout 2", sessionStatus, 
        	Long.toHexString(sessionId), Integer.toString(timeout));
        ZooTrace.logTraceMessage(LOG, ZooTrace.CLIENT_PING_TRACE_MASK, msg);
    


private void logTraceTouchInvalidSession(long sessionId, int timeout) 

    logTraceTouchSession(sessionId, timeout, "invalid ");


private void logTraceTouchClosingSession(long sessionId, int timeout) 

    logTraceTouchSession(sessionId, timeout, "closing ");


// 获得会话的超期时间点
public int getSessionTimeout(long sessionId) 

    return sessionsWithTimeout.get(sessionId);


// 设置会话对象关闭标志
public synchronized void setSessionClosing(long sessionId) 

    if (LOG.isTraceEnabled()) 
    
        LOG.trace("Session closing: 0x", Long.toHexString(sessionId));
    

    SessionImpl s = sessionsById.get(sessionId);
    if (s == null) 
    
        return;
    
    
    s.isClosing = true;


public synchronized void removeSession(long sessionId) 

    LOG.debug("Removing session 0x", Long.toHexString(sessionId));
    // 容器1中移除会话
    SessionImpl s = sessionsById.remove(sessionId);
    // 容器2中移除会话
    sessionsWithTimeout.remove(sessionId);
    if (LOG.isTraceEnabled()) 
    
        ZooTrace.logTraceMessage(LOG, 
        	ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- Removing session 0x" + 	
        	Long.toHexString(sessionId));
    
    
    if (s != null) 
    
    	// 容器3中移除会话
        sessionExpiryQueue.remove(s);
    


public void shutdown() 

    LOG.info("Shutting down");
    running = false;
    if (LOG.isTraceEnabled()) 
    
        ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "Shutdown SessionTrackerImpl!");
    


// 分配会话id,超期时间
public long createSession(int sessionTimeout) 

    long sessionId = nextSessionId.getAndIncrement();
    trackSession(sessionId, sessionTimeout);
    return sessionId;


@Override
public synchronized boolean trackSession(long id, int sessionTimeout) 

    boolean added = false;
    // 会话对象
    SessionImpl session = sessionsById.get(id);
    if (session == null) 
    
        session = new SessionImpl(id, sessionTimeout);
    
	
	// 会话容器1
    SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
    if (existedSession != null) 
    
        session = existedSession;
     
    else 
    
        added = true;
        LOG.debug("Adding session 0x", Long.toHexString(id));
    

    if (LOG.isTraceEnabled()) 
    
        String actionStr = added ? "Adding" : "Existing";
        ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- " 
        	+ actionStr + " session 0x" + Long.toHexString(id) + " " + sessionTimeout);
    
	
	// 更新会话下一超期时间点
    updateSessionExpiry(session, sessionTimeout);
    return added;


public synchronized boolean commitSession(long id, int sessionTimeout) 

	// 会话容器2
    return sessionsWithTimeout.put(id, sessionTimeout) == null;


public boolean isTrackingSession(long sessionId) 

    return sessionsById.containsKey(sessionId);


public synchronized void checkSession(
	long sessionId, Object owner) 
	throws KeeperException.SessionExpiredException, 
	KeeperException.SessionMovedException, KeeperException.UnknownSessionException 

    LOG.debug("Checking session 0x", Long.toHexString(sessionId));
    // 获得会话对象
    SessionImpl session = sessionsById.get(sessionId);
    if (session == null) 
    
        throw new KeeperException.UnknownSessionException();
    

    if (session.isClosing()) 
    
        throw new KeeperException.SessionExpiredException();
    
	
	// 设置会话拥有者
    if (session.owner == null) 
    
        session.owner = owner;
     
    else if (session.owner != owner) 
    
        throw new KeeperException.SessionMovedException();
    


public synchronized void setOwner(long id, Object owner) throws SessionExpiredException 

    SessionImpl session = sessionsById.get(id);
    if (session == null || session.isClosing())
    
        throw new KeeperException.SessionExpiredException();
    
    
    session.owner = owner;


// 检查会话
public void checkGlobalSession(
	long sessionId, Object owner) 
	throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException 

    try 
    
        checkSession(sessionId, owner);
     
    catch (KeeperException.UnknownSessionException e) 
    
        throw new KeeperException.SessionExpiredException();
    


// 本地会话个数
public long getLocalSessionCount() 

    return 0;


// 支持本地会话
@Override
public boolean isLocalSessionsEnabled() 

    return false;


// 全局会话id信息
public Set<Long> globalSessions() 

    return sessionsById.keySet();


// 本地会话
public Set<Long> localSessions() 

    return Collections.emptySet();

以上是关于zookeeper源码解析--会话管理--SessionTrackerImpl的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper会话管理源码深入探讨

Zookeeper会话管理源码深入探讨

zookeeper源码之临时节点管理

TensorFlow 运行模型--会话(Session)

zookeeper的Leader选举源码解析

CodeIgniter,使用 Redis 作为会话管理器