ElasticJob源码部分解读-Zookeeper建立连接

Posted 低调的洋仔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElasticJob源码部分解读-Zookeeper建立连接相关的知识,希望对你有一定的参考价值。

    private static CoordinatorRegistryCenter createRegistryCenter() 
        CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(SERVERLIST, "movie_data"));
        regCenter.init();
        return regCenter;
    

与注册中心简历连接的代码

ZookeeperConfig只用于存储配置的数据的,返回一个ZookeeperConfiguration的实例。

然后ZookeeperRegistryCenter这个类先封装了zkConfig(上面的configuration简写),然后再调用init来建立连接的。

 

 public void init() 
        log.debug("Elastic job: zookeeper registry center init, server lists is: .", this.zkConfig.getServerLists());
        Builder builder = CuratorFrameworkFactory.builder().connectString(this.zkConfig.getServerLists()).retryPolicy(new ExponentialBackoffRetry(this.zkConfig.getBaseSleepTimeMilliseconds(), this.zkConfig.getMaxRetries(), this.zkConfig.getMaxSleepTimeMilliseconds())).namespace(this.zkConfig.getNamespace());
        if (0 != this.zkConfig.getSessionTimeoutMilliseconds()) 
            builder.sessionTimeoutMs(this.zkConfig.getSessionTimeoutMilliseconds());
        

        if (0 != this.zkConfig.getConnectionTimeoutMilliseconds()) 
            builder.connectionTimeoutMs(this.zkConfig.getConnectionTimeoutMilliseconds());
        

        if (!Strings.isNullOrEmpty(this.zkConfig.getDigest())) 
            builder.authorization("digest", this.zkConfig.getDigest().getBytes(Charsets.UTF_8)).aclProvider(new ACLProvider() 
                public List<ACL> getDefaultAcl() 
                    return Ids.CREATOR_ALL_ACL;
                

                public List<ACL> getAclForPath(String path) 
                    return Ids.CREATOR_ALL_ACL;
                
            );
        

        this.client = builder.build();
        this.client.start();

        try 
            if (!this.client.blockUntilConnected(this.zkConfig.getMaxSleepTimeMilliseconds() * this.zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS)) 
                this.client.close();
                throw new OperationTimeoutException();
            
         catch (Exception var3) 
            RegExceptionHandler.handleException(var3);
        

    


这里创建了一个Builder对象,然后调用build方法。

 

 

    public static class Builder 
        private EnsembleProvider ensembleProvider;
        private int sessionTimeoutMs;
        private int connectionTimeoutMs;
        private int maxCloseWaitMs;
        private RetryPolicy retryPolicy;
        private ThreadFactory threadFactory;
        private String namespace;
        private List<AuthInfo> authInfos;
        private byte[] defaultData;
        private CompressionProvider compressionProvider;
        private ZookeeperFactory zookeeperFactory;
        private ACLProvider aclProvider;
        private boolean canBeReadOnly;
        private boolean useContainerParentsIfAvailable;

        public CuratorFramework build() 
            return new CuratorFrameworkImpl(this);
        

build方法又调用了CuratorFrameworkImpl构造方法。

 

 

    public CuratorFrameworkImpl(Builder builder) 
        ZookeeperFactory localZookeeperFactory = this.makeZookeeperFactory(builder.getZookeeperFactory());
        this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher() 
            public void process(WatchedEvent watchedEvent) 
                CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), CuratorFrameworkImpl.this.unfixForNamespace(watchedEvent.getPath()), (String)null, (Object)null, (Stat)null, (byte[])null, (List)null, watchedEvent, (List)null);
                CuratorFrameworkImpl.this.processEvent(event);
            
        , builder.getRetryPolicy(), builder.canBeReadOnly());
        this.listeners = new ListenerContainer();
        this.unhandledErrorListeners = new ListenerContainer();
        this.backgroundOperations = new DelayQueue();
        this.namespace = new NamespaceImpl(this, builder.getNamespace());
        this.threadFactory = this.getThreadFactory(builder);
        this.maxCloseWaitMs = builder.getMaxCloseWaitMs();
        this.connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
        this.compressionProvider = builder.getCompressionProvider();
        this.aclProvider = builder.getAclProvider();
        this.state = new AtomicReference(CuratorFrameworkState.LATENT);
        this.useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
        byte[] builderDefaultData = builder.getDefaultData();
        this.defaultData = builderDefaultData != null ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
        this.authInfos = this.buildAuths(builder);
        this.failedDeleteManager = new FailedDeleteManager(this);
        this.namespaceFacadeCache = new NamespaceFacadeCache(this);
    

然后调用了makeZookeeperFactory方法。

 

 

    private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory) 
        return new ZookeeperFactory() 
            public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception 
                ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
                Iterator var6 = CuratorFrameworkImpl.this.authInfos.iterator();

                while(var6.hasNext()) 
                    AuthInfo auth = (AuthInfo)var6.next();
                    zooKeeper.addAuthInfo(auth.getScheme(), auth.getAuth());
                

                return zooKeeper;
            
        ;
    

然后调用了actualZookeeperFactory中的newZookeeper方法。其实是调用了DefaultZookeeperFactory中的newZookeeper方法,

    public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception 
        return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
    

然后调用里面的new Zookeeper方法。

 

 

   public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException 
        this.watchManager = new ZooKeeper.ZKWatchManager((ZooKeeper.SyntheticClass_1)null);
        LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
        this.watchManager.defaultWatcher = watcher;
        ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
        HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
        this.cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, this.watchManager, getClientCnxnSocket(), canBeReadOnly);
        this.cnxn.start();
    

注意其中ConnnectionStringParser类是用于转换你传递进来的zookeeper的连接信息的,会按照逗号进行切分。

 

 

public ConnectStringParser(String connectString) 
        int off = connectString.indexOf(47);
        if (off >= 0) 
            String chrootPath = connectString.substring(off);
            if (chrootPath.length() == 1) 
                this.chrootPath = null;
             else 
                PathUtils.validatePath(chrootPath);
                this.chrootPath = chrootPath;
            

            connectString = connectString.substring(0, off);
         else 
            this.chrootPath = null;
        

        String[] hostsList = connectString.split(",");
        String[] arr$ = hostsList;
        int len$ = hostsList.length;

        for(int i$ = 0; i$ < len$; ++i$) 
            String host = arr$[i$];
            int port = 2181;
            int pidx = host.lastIndexOf(58);
            if (pidx >= 0) 
                if (pidx < host.length() - 1) 
                    port = Integer.parseInt(host.substring(pidx + 1));
                

                host = host.substring(0, pidx);
            

            this.serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
        

    

这里面serverAddresses里面封装了InetSocketAddress的信息。

 

ClientCnxn这个类中实际上是建立了连接

 

    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException 
        this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, 0L, new byte[16], canBeReadOnly);
    

    public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly) 
        this.authInfo = new CopyOnWriteArraySet();
        this.pendingQueue = new LinkedList();
        this.outgoingQueue = new LinkedList();
        this.sessionPasswd = new byte[16];
        this.closing = false;
        this.seenRwServerBefore = false;
        this.eventOfDeath = new Object();
        this.xid = 1;
        this.state = States.NOT_CONNECTED;
        this.zooKeeper = zooKeeper;
        this.watcher = watcher;
        this.sessionId = sessionId;
        this.sessionPasswd = sessionPasswd;
        this.sessionTimeout = sessionTimeout;
        this.hostProvider = hostProvider;
        this.chrootPath = chrootPath;
        this.connectTimeout = sessionTimeout / hostProvider.size();
        this.readTimeout = sessionTimeout * 2 / 3;
        this.readOnly = canBeReadOnly;
        this.sendThread = new ClientCnxn.SendThread(clientCnxnSocket);
        this.eventThread = new ClientCnxn.EventThread();
    


这里创建了Thread线程类。

 

然后在调用start的时候,实际上调用了里面的方法。

 

    public void start() 
        this.sendThread.start();
        this.eventThread.start();
    

然后调用的start方法实际上会调用run方法。

 

 

public void run() 
            this.clientCnxnSocket.introduce(this, ClientCnxn.this.sessionId);
            this.clientCnxnSocket.updateNow();
            this.clientCnxnSocket.updateLastSendAndHeard();
            long lastPingRwServer = System.currentTimeMillis();
            boolean var4 = true;

            while(ClientCnxn.this.state.isAlive()) 
                try 
                    if (!this.clientCnxnSocket.isConnected()) 
                        if (!this.isFirstConnect) 
                            try 
                                Thread.sleep((long)this.r.nextInt(1000));
                             catch (InterruptedException var9) 
                                ClientCnxn.LOG.warn("Unexpected exception", var9);
                            
                        

                        if (ClientCnxn.this.closing || !ClientCnxn.this.state.isAlive()) 
                            break;
                        

                        this.startConnect();
                        this.clientCnxnSocket.updateLastSendAndHeard();
                    


而run方法里面,会进行判断,如果没初始化这个连接的话,那么就进行初始化。

 

 

 

 

private void startConnect() throws IOException 
            ClientCnxn.this.state = States.CONNECTING;
            InetSocketAddress addr;
            if (this.rwServerAddress != null) 
                addr = this.rwServerAddress;
                this.rwServerAddress = null;
             else 
                addr = ClientCnxn.this.hostProvider.next(1000L);
            

            this.setName(this.getName().replaceAll("\\\\(.*\\\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
            if (ZooKeeperSaslClient.isEnabled()) 
                try 
                    String principalUserName = System.getProperty("zookeeper.sasl.client.username", "zookeeper");
                    ClientCnxn.this.zooKeeperSaslClient = new ZooKeeperSaslClient(principalUserName + "/" + addr.getHostName());
                 catch (LoginException var3) 
                    ClientCnxn.LOG.warn("SASL configuration failed: " + var3 + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it.");
                    ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.AuthFailed, (String)null));
                    this.saslLoginFailed = true;
                
            

            this.logStartConnect(addr);
            this.clientCnxnSocket.connect(addr);
        

 

 

 

以上是关于ElasticJob源码部分解读-Zookeeper建立连接的主要内容,如果未能解决你的问题,请参考以下文章

源码分析ElasticJob任务错过机制(misfire)与幂等性

ElasticJob源码分析--定时任务执行JobScheduler类分析

GitHub上持续冲榜,ElasticJob重启

ZooKeeper 源码阅读

ElasticJob和SpringBoot

SpringBoot2 整合ElasticJob框架,定制化管理流程