Zookeeper源码阅读 Seesion

Posted gongcomeon

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Zookeeper源码阅读 Seesion相关的知识,希望对你有一定的参考价值。

前言

前面三篇主要从client的角度说了下client和server建立连接的过程,这一篇和后面一篇开始看下Zookeeper中非常重要的一个概念:Session,session是zookeeper client和server建立和维护连接的单位(我这个描述感觉有点奇怪 ?? )。

Session状态

Zookeeper的所有操作基本都是基于session的,如之前提到的wathcer的机制,客户端请求的顺序执行和临时节点的生命周期。

从我们使用API的角度,session的连接和保持就是客户端通过实例化Zookeeper对象来与Zookeeper server端创建并保持连接TCP连接的过程。在客户端与服务器端成功创建了一个连接后,一个会话就被创建了。而在一个会话的生命周期中,session的状态可能在几种不同的状态中切换,而这些状态可以分为connecting,connected,reconnecting,reconnected,close等。

状态切换

  1. 客户端尝试去连接服务器端(public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher) throws IOException),这时客户端回去尝试连接服务器,而session的状态就变成了connecting。这个过程之前在讲sengthread的部分有详细讲过,具体是client会从server(HostProvider)的列表里逐个尝试连接;
  2. 由于网络或程序等原因导致服务器和客户端断开连接,此时客户端会尝试去重新连接server,则session重新进入connecting状态;
  3. 重连成功后,session变为connected状态;
  4. 会话超时,权限检查失败或客户端主动发起断开连接请求后session变为close状态。

p.s. 这里要提一下第一步,在3.2.0版本中增加了chroot后缀(配置时加在后面,不是说这个chroot的功能是后缀,而且恰恰相反,功能是前缀)的设置,在zk的配置中类似配置 "127.0.0.1:4545/app/a" 或者 "127.0.0.1:3000,127.0.0.1:3001,127.0.0.1:3002/app/a"这样的设置,那么在zookeeper上所有的节点前都回家再/app/a的前缀。所以连接后在server上的根节点是/app/a。

作用:This feature is particularly useful in multi-tenant environments where each user of a particular ZooKeeper service could be rooted differently. This makes re-use much simpler as each user can code his/her application as if it were rooted at "/", while actual location (say /app/a) could be determined at deployment time. Zookeeper官方文档是这样描述的,就是说在一台机器或vm上有多个tenant安装zookeeper,通过在配置中增加这样的配置,这样根据节点本身就知道它的具体位置。

Zookeeper官方用下图来表示状态的变化:

技术分享图片

结合在sendthread介绍中说的,在startconnect方法中连接中会把状态设置为States.CONNECTING,连接成功后,在Onconnected方法里会把状态设置为CONNECTED。

在zk的create/exists/getchildren…等等接口内部最后回去submitRequest并把生成的packet放入queue中,在queuePacket方法的cnLossPacket方法中会根据状态去处理session超时,验证失败和连接丢失的问题。

private void conLossPacket(Packet p) {
    if (p.replyHeader == null) {
        return;
    }
    switch (state) {
    case AUTH_FAILED://验证失败
        p.replyHeader.setErr(KeeperException.Code.AUTHFAILED.intValue());
        break;
    case CLOSED://session超时导致close
        p.replyHeader.setErr(KeeperException.Code.SESSIONEXPIRED.intValue());
        break;
    default://其他原因导致连接丢失
        p.replyHeader.setErr(KeeperException.Code.CONNECTIONLOSS.intValue());
    }
    finishPacket(p);
}

在创建Session时,需要设置Session Timeout这个重要参数。这是Zookeeper服务允许一个Session在定义它失效之前的时间。如果服务在时间t内不能看到与一个Session关联的消息,它将定义这个Session失效。如果客户端在1/3 t时间内没有听到任何从服务器过来的消息,它将发送一个心跳消息给服务器。在(2/3)t时间, Zookeeper客户端开始寻找另一个Zookeeper服务器,并且它有另外的(1/3)t的时间寻找。

会话创建

实体

public interface SessionTracker {
public static interface Session {
    long getSessionId();, 
    int getTimeout();
    boolean isClosing();
}
public static interface SessionExpirer {
    void expire(Session session);

    long getServerId();
}

可以看到,在SessionTracker接口中有两个内部接口Session和SessionExpirer,可以看到分别和session与session过期有关系。

public static class SessionImpl implements Session {
    SessionImpl(long sessionId, int timeout, long expireTime) {
        this.sessionId = sessionId;
        this.timeout = timeout;
        this.tickTime = expireTime;
        isClosing = false;
    }

    final long sessionId;
    final int timeout;
    long tickTime;
    boolean isClosing;

    Object owner;

    public long getSessionId() { return sessionId; }
    public int getTimeout() { return timeout; }
    public boolean isClosing() { return isClosing; }
}

在SessionTrackerImpl类中有Session接口的实现类,此类也代表了一个真正的session对象。可以看到SessionImpl类中有几个变量:

sessionId:会话ID,用来标识一个唯一会话。每次客户端和server连接创建新会话时,zk会为其分别一个全局唯一的ID;

timeout:在创建zookeeper对象时传入的参数,客户端向server发送了这个参数后,服务器会根据timeout时间来判断session的状态;

ticktime:下次会话超时的时间点,大约为当前时间+timeout,具体之后详细解释;

isclosing:表明一个会话是否已经被关闭,如果一个会话已经被标记为closing,server便不会处理来自此session的请求。

SessionId生成策略

在ZookeeperServer的processConnectRequest方法中有对客户端建立连接请求的处理:

if (sessionId != 0) {//sessionId已经存在
    long clientSessionId = connReq.getSessionId();
    LOG.info("Client attempting to renew session 0x"
            + Long.toHexString(clientSessionId)
            + " at " + cnxn.getRemoteSocketAddress());
    serverCnxnFactory.closeSession(sessionId);
    cnxn.setSessionId(sessionId);
    reopenSession(cnxn, sessionId, passwd, sessionTimeout);//重新打开session
} else {
    LOG.info("Client attempting to establish new session at "
            + cnxn.getRemoteSocketAddress());
    createSession(cnxn, passwd, sessionTimeout);//新建session
}
long createSession(ServerCnxn cnxn, byte passwd[], int timeout) {
    long sessionId = sessionTracker.createSession(timeout);
synchronized public long createSession(int sessionTimeout) {
    addSession(nextSessionId, sessionTimeout);
    return nextSessionId++;//每次取过之后nextSessionId+1
}
synchronized public void addSession(long id, int sessionTimeout) {
    sessionsWithTimeout.put(id, sessionTimeout);
    if (sessionsById.get(id) == null) {
        SessionImpl s = new SessionImpl(id, sessionTimeout, 0);//新建sessionImpl对象

可以看到在每次新建session是建立在已经保存的nextSessionId的基础上的。然后看一下nextSessionId的初始化:

public static long initializeNextSession(long id) {
    long nextSid = 0;
    nextSid = (Time.currentElapsedTime() << 24) >>> 8;
    nextSid =  nextSid | (id <<56);
    return nextSid;
}

initializeNextSession方法在zookeeperserver启动时的startup方法中,startup方法会初始化SessionTrackerImpl变量,此时nextSessionId会被初始化。

这里用到了Time.currentElapsedTime()方法去获得当前的时间,是一个64位的值。但是在之前的版本中用的是System.currentTimeMillis() 方法。为什么要用新的方法替代原来的值,事实上在正常情况下都不会有问题,但是如果有人修改了系统的时间,那么原来的方法就可能有问题。

至于nextSid生成的算法:系统时间先左移24位然后无符号右移8位然后和myid文件中的唯一id值左移56位生成的值做或操作,这样可以生产一个64位的唯一ID,然后后面的session基于这个值递增获得。这也是为什么在myid文件中配置唯一id时必须要小于256的原因。

SessionTracker

sessiontracker的作用就是server用来管理会话的,它负责了session的创建,管理和删除,整个session的生命周期都在sessiontracker的管理之下。每个session在sessiontracker内都分成三份保存。

public class SessionTrackerImpl extends ZooKeeperCriticalThread implements SessionTracker {
    private static final Logger LOG = LoggerFactory.getLogger(SessionTrackerImpl.class);

    HashMap<Long, SessionImpl> sessionsById = new HashMap<Long, SessionImpl>(); 

    HashMap<Long, SessionSet> sessionSets = new HashMap<Long, SessionSet>();//

    ConcurrentHashMap<Long, Integer> sessionsWithTimeout;
    long nextSessionId = 0;//下一次session的id
    long nextExpirationTime;//最近的超时时间

    int expirationInterval;//超时检查间隔
static class SessionSet {
    HashSet<SessionImpl> sessions = new HashSet<SessionImpl>();
}

sessionsById是根据session的id来管理session实体的属性;而sessionSets则是根据下次超时时间来归档回话,便于会话管理和超时审查;sessionsWithTimeout是线程安全的,它也是按照id来保存session的超时时间,sessionsWithTimeout和zk的内存数据库相通,会定期同步到快照中。

思考

这一篇主要说了些宏观的概念和session id的生成机制,比较泛,但是是下一篇的基础。

参考

https://zookeeper.apache.org/doc/r3.3.6/zookeeperProgrammers.html

https://blog.csdn.net/jeff_fangji/article/details/43916359

https://www.jianshu.com/p/594129a44814

http://www.cnblogs.com/leesf456/p/6103870.html

https://xt00002003.iteye.com/blog/2302392

以上是关于Zookeeper源码阅读 Seesion的主要内容,如果未能解决你的问题,请参考以下文章

Zookeeper源码阅读 Server端Watcher

zookeeper 源码 选举和同步数据

ZooKeeper 源码阅读版本选择

ZooKeeper源码阅读心得分享+源码基本结构+源码环境搭建

ZooKeeper源码阅读心得分享+源码基本结构+源码环境搭建

ZooKeeper 源码阅读