Zookeeper Session源码
Posted Dazzling
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper Session源码相关的知识,希望对你有一定的参考价值。
我们说客户端与服务端建立连接交互的时候会创建一个 Session 与之对应,那假设客户端请求来了,服务端是如何处理的?Session 又是如何创建出来的?
我们先来看第一个问题:服务端如何处理客户端发来的请求?
一、如何处理请求
所谓的请求全称是网络请求,涉及到网络就少不了 Socket 通信,ZooKeeper 采取的是 NIO 的方式,提供了一个 NIOServerCnxn
实例来维护每一个客户端的连接,也就是说客户端与服务端的通信都是靠 NIOServerCnxn
这个类来处理的,无非就干两件事:接收客户端的请求以及处理请求(将请求体从网络 I/O 中读出来做处理)。这个处理类的核心方法是doIO()
,也就是如下:
public class NIOServerCnxn extends ServerCnxn
void doIO(SelectionKey k) throws InterruptedException
// ...
核心处理类结构已经清晰了,那我们接下来就分析 doIO()
这个方法就好了,我上面说这个方法无非就干两件事:接收请求、处理请求。那如何接收请求呢?请求又分为两种:创建连接的请求和发送数据的请求。
我们逐个分析:
- 何为创建连接的请求?其实很好理解,就是第一次客户端和服务端通信的时候要建立连接,建立完连接才能发送数据进行真正请求。
- 何为发送数据的请求?上面创建完连接了才能真正发送数据给服务端。
一言以蔽之就是:客户端要想和服务端通信必须先建立连接才能发送数据,且连接只需要建立一次即可。所以我们会有一个变量: initialized
代表是否初始化完成,也就是代表是否已经建立过连接了。代码如下:
public void 接收请求()
if (!initialized)
// 还没初始化,那就建立连接
readConnectRequest();
else
// 初始化完了,那就发送数据
readRequest();
我们继续分析:readConnectRequest()
,也就是如何创建连接?首先我们能明确一点的是创建完连接后肯定要把 initialized
状态变为 true,代表已经建立完成。我们先把这块代码写下:
private void readConnectRequest() throws IOException, InterruptedException, ClientCnxnLimitException
// 省略真正建立连接代码
initialized = true;
在开始真正建立连接之前,我们肯定都知道一点:客户端会传输一些数据给服务端,但是网络传输都是靠字节数组,所以服务端接收到数据后第一件事就是拿到字节数组进行反序列化,反序列化成一个对象,我们叫这个对象为:ConnectRequest
。我们继续完善下代码:
public void 建立连接()
// 拿到客户端发来的字节数组
BinaryInputArchive bia = BinaryInputArchive.getArchive(new ByteBufferInputStream(incomingBuffer));
ConnectRequest connReq = new ConnectRequest();
// 反序列化成ConnectRequest对象
connReq.deserialize(bia, "connect");
我们在上一 Session 原理篇的时候说过 Session 是有生命周期的,带时效性的,也就是有过期时间的。那这个过期时间肯定是客户端和服务端建立连接的时候通过客户端发过去的,所以我们反序列化出来的对象里还会有 sessionTimeout
字段,如下:
// 基本验证
int sessionTimeout = connReq.getTimeOut();
int minSessionTimeout = getMinSessionTimeout();
if (sessionTimeout < minSessionTimeout)
sessionTimeout = minSessionTimeout;
int maxSessionTimeout = getMaxSessionTimeout();
if (sessionTimeout > maxSessionTimeout)
sessionTimeout = maxSessionTimeout;
很简单的一些验证,就好比我们业务代码中分页一样,都会判断不能小于 1、不能大于最大条数等验证。这个过期时间传给 Server 后,并不是真正的过期时间,因为我们在上一篇中也讲解过了,真实过期时间会被计算为 tickTime
的倍数。
有了上面这些参数后我们就可以真正创建 Session 了:
long id = createSession(cnxn, passwd, sessionTimeout);
在开始如何创建 Session 之前,我们先画个流程图:
现在创建 Session 的时机和前置条件都搞懂了,那创建 Session 都需要经过哪些步骤呢?
二、如何创建 Session
其实创建 Session 的原理我们在上一篇都讲解得很清楚了,按照大步骤来划分的话无非就是下面这四步:
- 按照一定规则为客户端生成 SessionId;
- Session 的创建以及过期机制;
- 每次正常 CRUD 以及定时心跳 Ping 都会重新刷新 Session 的过期时间,我们称这个过程为 Session 的激活;
- Session 到期后如何回收。
本篇不打算详细剖析上面这四步,那样会太占用篇幅,所以我把这四点放到下篇单独讲解,本篇讲解整个脉络,也可以称之为“框架”。比如我们上面讲解了服务端是如何处理请求的,然后现在我们假设 Session 已经创建完成了,“框架”如下:
public void proces ()
// 1. 接收请求
// 2. 处理请求
// 3. 创建Session
// 3.1 生成SessionId
// 3.2 Session过期机制
// 3.3 Session激活
// 3.4 Session回收
上面已经为我们提供好了如何激活 Session 的方法,但是什么时候触发这个方法呢?我们也说了是在正常 CRUD 或者心跳 Ping 的时候会进行激活,在剖析这块代码之前我们先时光倒流,回忆一下最开始我们讲如何处理请求的流程:
public void 接收请求()
if (!initialized)
// 还没初始化,那就建立连接
readConnectRequest();
else
// 初始化完了,那就发送数据
readRequest();
如果没建立过连接,那么就建立连接readConnectRequest()
;如果之前建立过连接,那就是正常地发送数据(可能是正常的 CRUD,也可能是心跳 Ping 请求)。所以大家肯定突然明白一个事情,那就是我们调用 Session 激活方法的入口就是这里——readRequest()
,接下来我们一起看下是如何调用的吧~
Zookeeper源码阅读 Seesion
前言
前面三篇主要从client的角度说了下client和server建立连接的过程,这一篇和后面一篇开始看下Zookeeper中非常重要的一个概念:Session,session是zookeeper client和server建立和维护连接的单位(我这个描述感觉有点奇怪 ?? )。
Session状态
Zookeeper的所有操作基本都是基于session的,如之前提到的wathcer的机制,客户端请求的顺序执行和临时节点的生命周期。
从我们使用API的角度,session的连接和保持就是客户端通过实例化Zookeeper对象来与Zookeeper server端创建并保持连接TCP连接的过程。在客户端与服务器端成功创建了一个连接后,一个会话就被创建了。而在一个会话的生命周期中,session的状态可能在几种不同的状态中切换,而这些状态可以分为connecting,connected,reconnecting,reconnected,close等。
状态切换
- 客户端尝试去连接服务器端(public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException),这时客户端回去尝试连接服务器,而session的状态就变成了connecting。这个过程之前在讲sengthread的部分有详细讲过,具体是client会从server(HostProvider)的列表里逐个尝试连接;
- 由于网络或程序等原因导致服务器和客户端断开连接,此时客户端会尝试去重新连接server,则session重新进入connecting状态;
- 重连成功后,session变为connected状态;
- 会话超时,权限检查失败或客户端主动发起断开连接请求后session变为close状态。
p.s. 这里要提一下第一步,在3.2.0版本中增加了chroot后缀(配置时加在后面,不是说这个chroot的功能是后缀,而且恰恰相反,功能是前缀)的设置,在zk的配置中类似配置 "127.0.0.1:4545/app/a" 或者 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"这样的设置,那么在zookeeper上所有的节点前都回家再/app/a的前缀。所以连接后在server上的根节点是/app/a。
作用:This feature is particularly useful in multi-tenant environments where each user of a particular ZooKeeper service could be rooted differently. This makes re-use much simpler as each user can code his/her application as if it were rooted at "/", while actual location (say /app/a) could be determined at deployment time.
Zookeeper官方文档是这样描述的,就是说在一台机器或vm上有多个tenant安装zookeeper,通过在配置中增加这样的配置,这样根据节点本身就知道它的具体位置。
Zookeeper官方用下图来表示状态的变化:
结合在sendthread介绍中说的,在startconnect方法中连接中会把状态设置为States.CONNECTING,连接成功后,在Onconnected方法里会把状态设置为CONNECTED。
在zk的create/exists/getchildren…等等接口内部最后回去submitRequest并把生成的packet放入queue中,在queuePacket方法的cnLossPacket方法中会根据状态去处理session超时,验证失败和连接丢失的问题。
private void conLossPacket(Packet p) {
if (p.replyHeader == null) {
return;
}
switch (state) {
case AUTH_FAILED://验证失败
p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
break;
case CLOSED://session超时导致close
p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
break;
default://其他原因导致连接丢失
p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
}
finishPacket(p);
}
在创建Session时,需要设置Session Timeout这个重要参数。这是Zookeeper服务允许一个Session在定义它失效之前的时间。如果服务在时间t内不能看到与一个Session关联的消息,它将定义这个Session失效。如果客户端在1/3 t时间内没有听到任何从服务器过来的消息,它将发送一个心跳消息给服务器。在(2/3)t时间, Zookeeper客户端开始寻找另一个Zookeeper服务器,并且它有另外的(1/3)t的时间寻找。
会话创建
实体
public interface SessionTracker {
public static interface Session {
long getSessionId();,
int getTimeout();
boolean isClosing();
}
public static interface SessionExpirer {
void expire(Session session);
long getServerId();
}
可以看到,在SessionTracker接口中有两个内部接口Session和SessionExpirer,可以看到分别和session与session过期有关系。
public static class SessionImpl implements Session {
SessionImpl(long sessionId, int timeout, long expireTime) {
this.sessionId = sessionId;
this.timeout = timeout;
this.tickTime = expireTime;
isClosing = false;
}
final long sessionId;
final int timeout;
long tickTime;
boolean isClosing;
Object owner;
public long getSessionId() { return sessionId; }
public int getTimeout() { return timeout; }
public boolean isClosing() { return isClosing; }
}
在SessionTrackerImpl类中有Session接口的实现类,此类也代表了一个真正的session对象。可以看到SessionImpl类中有几个变量:
sessionId:会话ID,用来标识一个唯一会话。每次客户端和server连接创建新会话时,zk会为其分别一个全局唯一的ID;
timeout:在创建zookeeper对象时传入的参数,客户端向server发送了这个参数后,服务器会根据timeout时间来判断session的状态;
ticktime:下次会话超时的时间点,大约为当前时间+timeout,具体之后详细解释;
isclosing:表明一个会话是否已经被关闭,如果一个会话已经被标记为closing,server便不会处理来自此session的请求。
SessionId生成策略
在ZookeeperServer的processConnectRequest方法中有对客户端建立连接请求的处理:
if (sessionId != 0) {//sessionId已经存在
long clientSessionId = connReq.getSessionId();
LOG.info("Client attempting to renew session 0x"
+ Long.toHexString(clientSessionId)
+ " at " + cnxn.getRemoteSocketAddress());
serverCnxnFactory.closeSession(sessionId);
cnxn.setSessionId(sessionId);
reopenSession(cnxn, sessionId, passwd, sessionTimeout);//重新打开session
} else {
LOG.info("Client attempting to establish new session at "
+ cnxn.getRemoteSocketAddress());
createSession(cnxn, passwd, sessionTimeout);//新建session
}
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
long sessionId = sessionTracker.createSession(timeout);
synchronized public long createSession(int sessionTimeout) {
addSession(nextSessionId, sessionTimeout);
return nextSessionId++;//每次取过之后nextSessionId+1
}
synchronized public void addSession(long id, int sessionTimeout) {
sessionsWithTimeout.put(id, sessionTimeout);
if (sessionsById.get(id) == null) {
SessionImpl s = new SessionImpl(id, sessionTimeout, 0);//新建sessionImpl对象
可以看到在每次新建session是建立在已经保存的nextSessionId的基础上的。然后看一下nextSessionId的初始化:
public static long initializeNextSession(long id) {
long nextSid = 0;
nextSid = (Time.currentElapsedTime() << 24) >>> 8;
nextSid = nextSid | (id <<56);
return nextSid;
}
initializeNextSession方法在zookeeperserver启动时的startup方法中,startup方法会初始化SessionTrackerImpl变量,此时nextSessionId会被初始化。
这里用到了Time.currentElapsedTime()方法去获得当前的时间,是一个64位的值。但是在之前的版本中用的是System.currentTimeMillis() 方法。为什么要用新的方法替代原来的值,事实上在正常情况下都不会有问题,但是如果有人修改了系统的时间,那么原来的方法就可能有问题。
至于nextSid生成的算法:系统时间先左移24位然后无符号右移8位然后和myid文件中的唯一id值左移56位生成的值做或操作,这样可以生产一个64位的唯一ID,然后后面的session基于这个值递增获得。这也是为什么在myid文件中配置唯一id时必须要小于256的原因。
SessionTracker
sessiontracker的作用就是server用来管理会话的,它负责了session的创建,管理和删除,整个session的生命周期都在sessiontracker的管理之下。每个session在sessiontracker内都分成三份保存。
public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
private static final Logger LOG = LoggerFactory.getLogger(SessionTrackerImpl.class);
HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>();
HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();//
ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
long nextSessionId = 0;//下一次session的id
long nextExpirationTime;//最近的超时时间
int expirationInterval;//超时检查间隔
static class SessionSet {
HashSet<SessionImpl> sessions = new HashSet<SessionImpl>();
}
sessionsById是根据session的id来管理session实体的属性;而sessionSets则是根据下次超时时间来归档回话,便于会话管理和超时审查;sessionsWithTimeout是线程安全的,它也是按照id来保存session的超时时间,sessionsWithTimeout和zk的内存数据库相通,会定期同步到快照中。
思考
这一篇主要说了些宏观的概念和session id的生成机制,比较泛,但是是下一篇的基础。
参考
https://zookeeper.apache.org/doc/r3.3.6/zookeeperProgrammers.html
https://blog.csdn.net/jeff_fangji/article/details/43916359
https://www.jianshu.com/p/594129a44814
http://www.cnblogs.com/leesf456/p/6103870.html
https://xt00002003.iteye.com/blog/2302392
以上是关于Zookeeper Session源码的主要内容,如果未能解决你的问题,请参考以下文章