zookeeper源码解析--会话管理--SessionTrackerImpl
Posted raindayinrain
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper源码解析--会话管理--SessionTrackerImpl相关的知识,希望对你有一定的参考价值。
SessionTrackerImpl
SessionImpl
/**
 * Creates a session record that starts out not closing.
 *
 * @param sessionId identifier of the session
 * @param timeout timeout value stored for the session
 */
SessionImpl(long sessionId, int timeout) {
    isClosing = false;
    this.timeout = timeout;
    this.sessionId = sessionId;
}
// Session id.
final long sessionId;
// Timeout value supplied at construction.
final int timeout;
// Closing flag; SessionTrackerImpl.setSessionClosing sets it to true.
boolean isClosing;
// Owner of the session; assigned by checkSession and setOwner.
Object owner;
public long getSessionId()
return sessionId;
public int getTimeout()
return timeout;
public boolean isClosing()
return isClosing;
public String toString()
return "0x" + Long.toHexString(sessionId);
SessionTrackerImpl
// 基于当前时间&传入id构造新的会话id
public static long initializeNextSessionId(long id)
long nextSid;
nextSid = (Time.currentElapsedTime() << 24) >>> 8;
nextSid = nextSid | (id << 56);
if (nextSid == EphemeralType.CONTAINER_EPHEMERAL_OWNER)
++nextSid;
return nextSid;
/**
 * Creates the tracker and starts tracking every session already present in
 * {@code sessionsWithTimeout}.
 *
 * @param expirer callback used by {@code run()} to expire sessions
 * @param sessionsWithTimeout session id to timeout map; stored by reference
 * @param tickTime passed to the {@code ExpiryQueue} constructor
 * @param serverId seeds the next session id; checked with
 *        {@code EphemeralType.validateServerId} as the last construction step
 * @param listener passed to the superclass constructor
 */
public SessionTrackerImpl(
        SessionExpirer expirer,
        ConcurrentMap<Long, Integer> sessionsWithTimeout,
        int tickTime,
        long serverId,
        ZooKeeperServerListener listener) {
    super("SessionTracker", listener);
    this.expirer = expirer;
    this.sessionsWithTimeout = sessionsWithTimeout;
    this.sessionExpiryQueue = new ExpiryQueue<SessionImpl>(tickTime);
    this.nextSessionId.set(initializeNextSessionId(serverId));

    // Re-register every session that is already known, together with its timeout.
    for (Entry<Long, Integer> known : sessionsWithTimeout.entrySet()) {
        trackSession(known.getKey(), known.getValue());
    }

    EphemeralType.validateServerId(serverId);
}
// Loop flag read by run(); shutdown() sets it to false.
volatile boolean running = true;
public void dumpSessions(PrintWriter pwriter)
pwriter.print("Session ");
sessionExpiryQueue.dump(pwriter);
// 时间点--在此时间点会超期的会话id集合
public synchronized Map<Long, Set<Long>> getSessionExpiryMap()
Map<Long, Set<SessionImpl>> expiryMap = sessionExpiryQueue.getExpiryMap();
Map<Long, Set<Long>> sessionExpiryMap = new TreeMap<Long, Set<Long>>();
for (Entry<Long, Set<SessionImpl>> e : expiryMap.entrySet())
Set<Long> ids = new HashSet<Long>();
sessionExpiryMap.put(e.getKey(), ids);
for (SessionImpl s : e.getValue())
ids.add(s.sessionId);
return sessionExpiryMap;
@Override
public String toString()
StringWriter sw = new StringWriter();
PrintWriter pwriter = new PrintWriter(sw);
dumpSessions(pwriter);
pwriter.flush();
pwriter.close();
return sw.toString();
@Override
public void run()
try
while (running)
// 获取下一超期时间点距离现在时长
long waitTime = sessionExpiryQueue.getWaitTime();
if (waitTime > 0)
Thread.sleep(waitTime);
continue;
// 对每个超期的会话,设置状态为关闭中
// 让会话超期
for (SessionImpl s : sessionExpiryQueue.poll())
ServerMetrics.getMetrics().STALE_SESSIONS_EXPIRED.add(1);
setSessionClosing(s.sessionId);
expirer.expire(s);
catch (InterruptedException e)
handleException(this.getName(), e);
LOG.info("SessionTrackerImpl exited loop!");
// 触碰会话
public synchronized boolean touchSession(long sessionId, int timeout)
// 会话对象
SessionImpl s = sessionsById.get(sessionId);
if (s == null)
logTraceTouchInvalidSession(sessionId, timeout);
return false;
if (s.isClosing())
logTraceTouchClosingSession(sessionId, timeout);
return false;
// 为会话设置新的超期时间点
updateSessionExpiry(s, timeout);
return true;
// 为会话设置新的超期时间点
private void updateSessionExpiry(SessionImpl s, int timeout)
logTraceTouchSession(s.sessionId, timeout, "");
sessionExpiryQueue.update(s, timeout);
private void logTraceTouchSession(long sessionId, int timeout, String sessionStatus)
if (LOG.isTraceEnabled())
String msg = MessageFormat.format(
"SessionTrackerImpl --- Touch 0session: 0x1 with timeout 2", sessionStatus,
Long.toHexString(sessionId), Integer.toString(timeout));
ZooTrace.logTraceMessage(LOG, ZooTrace.CLIENT_PING_TRACE_MASK, msg);
private void logTraceTouchInvalidSession(long sessionId, int timeout)
logTraceTouchSession(sessionId, timeout, "invalid ");
private void logTraceTouchClosingSession(long sessionId, int timeout)
logTraceTouchSession(sessionId, timeout, "closing ");
// 获得会话的超期时间点
public int getSessionTimeout(long sessionId)
return sessionsWithTimeout.get(sessionId);
// 设置会话对象关闭标志
public synchronized void setSessionClosing(long sessionId)
if (LOG.isTraceEnabled())
LOG.trace("Session closing: 0x", Long.toHexString(sessionId));
SessionImpl s = sessionsById.get(sessionId);
if (s == null)
return;
s.isClosing = true;
public synchronized void removeSession(long sessionId)
LOG.debug("Removing session 0x", Long.toHexString(sessionId));
// 容器1中移除会话
SessionImpl s = sessionsById.remove(sessionId);
// 容器2中移除会话
sessionsWithTimeout.remove(sessionId);
if (LOG.isTraceEnabled())
ZooTrace.logTraceMessage(LOG,
ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- Removing session 0x" +
Long.toHexString(sessionId));
if (s != null)
// 容器3中移除会话
sessionExpiryQueue.remove(s);
public void shutdown()
LOG.info("Shutting down");
running = false;
if (LOG.isTraceEnabled())
ZooTrace.logTraceMessage(LOG, ZooTrace.getTextTraceLevel(), "Shutdown SessionTrackerImpl!");
// 分配会话id,超期时间
public long createSession(int sessionTimeout)
long sessionId = nextSessionId.getAndIncrement();
trackSession(sessionId, sessionTimeout);
return sessionId;
@Override
public synchronized boolean trackSession(long id, int sessionTimeout)
boolean added = false;
// 会话对象
SessionImpl session = sessionsById.get(id);
if (session == null)
session = new SessionImpl(id, sessionTimeout);
// 会话容器1
SessionImpl existedSession = sessionsById.putIfAbsent(id, session);
if (existedSession != null)
session = existedSession;
else
added = true;
LOG.debug("Adding session 0x", Long.toHexString(id));
if (LOG.isTraceEnabled())
String actionStr = added ? "Adding" : "Existing";
ZooTrace.logTraceMessage(LOG, ZooTrace.SESSION_TRACE_MASK, "SessionTrackerImpl --- "
+ actionStr + " session 0x" + Long.toHexString(id) + " " + sessionTimeout);
// 更新会话下一超期时间点
updateSessionExpiry(session, sessionTimeout);
return added;
public synchronized boolean commitSession(long id, int sessionTimeout)
// 会话容器2
return sessionsWithTimeout.put(id, sessionTimeout) == null;
public boolean isTrackingSession(long sessionId)
return sessionsById.containsKey(sessionId);
public synchronized void checkSession(
long sessionId, Object owner)
throws KeeperException.SessionExpiredException,
KeeperException.SessionMovedException, KeeperException.UnknownSessionException
LOG.debug("Checking session 0x", Long.toHexString(sessionId));
// 获得会话对象
SessionImpl session = sessionsById.get(sessionId);
if (session == null)
throw new KeeperException.UnknownSessionException();
if (session.isClosing())
throw new KeeperException.SessionExpiredException();
// 设置会话拥有者
if (session.owner == null)
session.owner = owner;
else if (session.owner != owner)
throw new KeeperException.SessionMovedException();
public synchronized void setOwner(long id, Object owner) throws SessionExpiredException
SessionImpl session = sessionsById.get(id);
if (session == null || session.isClosing())
throw new KeeperException.SessionExpiredException();
session.owner = owner;
// 检查会话
public void checkGlobalSession(
long sessionId, Object owner)
throws KeeperException.SessionExpiredException, KeeperException.SessionMovedException
try
checkSession(sessionId, owner);
catch (KeeperException.UnknownSessionException e)
throw new KeeperException.SessionExpiredException();
// 本地会话个数
public long getLocalSessionCount()
return 0;
// 支持本地会话
@Override
public boolean isLocalSessionsEnabled()
return false;
// 全局会话id信息
public Set<Long> globalSessions()
return sessionsById.keySet();
// 本地会话
public Set<Long> localSessions()
return Collections.emptySet();
以上是关于zookeeper源码解析--会话管理--SessionTrackerImpl的主要内容,如果未能解决你的问题,请参考以下文章