ElasticJob源码部分解读-Zookeeper建立连接
Posted 低调的洋仔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ElasticJob源码部分解读-Zookeeper建立连接相关的知识,希望对你有一定的参考价值。
private static CoordinatorRegistryCenter createRegistryCenter()
CoordinatorRegistryCenter regCenter = new ZookeeperRegistryCenter(new ZookeeperConfiguration(SERVERLIST, "movie_data"));
regCenter.init();
return regCenter;
与注册中心简历连接的代码
ZookeeperConfig只用于存储配置的数据的,返回一个ZookeeperConfiguration的实例。
然后ZookeeperRegistryCenter这个类先封装了zkConfig(上面的configuration简写),然后再调用init来建立连接的。
public void init()
log.debug("Elastic job: zookeeper registry center init, server lists is: .", this.zkConfig.getServerLists());
Builder builder = CuratorFrameworkFactory.builder().connectString(this.zkConfig.getServerLists()).retryPolicy(new ExponentialBackoffRetry(this.zkConfig.getBaseSleepTimeMilliseconds(), this.zkConfig.getMaxRetries(), this.zkConfig.getMaxSleepTimeMilliseconds())).namespace(this.zkConfig.getNamespace());
if (0 != this.zkConfig.getSessionTimeoutMilliseconds())
builder.sessionTimeoutMs(this.zkConfig.getSessionTimeoutMilliseconds());
if (0 != this.zkConfig.getConnectionTimeoutMilliseconds())
builder.connectionTimeoutMs(this.zkConfig.getConnectionTimeoutMilliseconds());
if (!Strings.isNullOrEmpty(this.zkConfig.getDigest()))
builder.authorization("digest", this.zkConfig.getDigest().getBytes(Charsets.UTF_8)).aclProvider(new ACLProvider()
public List<ACL> getDefaultAcl()
return Ids.CREATOR_ALL_ACL;
public List<ACL> getAclForPath(String path)
return Ids.CREATOR_ALL_ACL;
);
this.client = builder.build();
this.client.start();
try
if (!this.client.blockUntilConnected(this.zkConfig.getMaxSleepTimeMilliseconds() * this.zkConfig.getMaxRetries(), TimeUnit.MILLISECONDS))
this.client.close();
throw new OperationTimeoutException();
catch (Exception var3)
RegExceptionHandler.handleException(var3);
这里创建了一个Builder对象,然后调用build方法。
public static class Builder
private EnsembleProvider ensembleProvider;
private int sessionTimeoutMs;
private int connectionTimeoutMs;
private int maxCloseWaitMs;
private RetryPolicy retryPolicy;
private ThreadFactory threadFactory;
private String namespace;
private List<AuthInfo> authInfos;
private byte[] defaultData;
private CompressionProvider compressionProvider;
private ZookeeperFactory zookeeperFactory;
private ACLProvider aclProvider;
private boolean canBeReadOnly;
private boolean useContainerParentsIfAvailable;
public CuratorFramework build()
return new CuratorFrameworkImpl(this);
build方法又调用了CuratorFrameworkImpl构造方法。
public CuratorFrameworkImpl(Builder builder)
ZookeeperFactory localZookeeperFactory = this.makeZookeeperFactory(builder.getZookeeperFactory());
this.client = new CuratorZookeeperClient(localZookeeperFactory, builder.getEnsembleProvider(), builder.getSessionTimeoutMs(), builder.getConnectionTimeoutMs(), new Watcher()
public void process(WatchedEvent watchedEvent)
CuratorEvent event = new CuratorEventImpl(CuratorFrameworkImpl.this, CuratorEventType.WATCHED, watchedEvent.getState().getIntValue(), CuratorFrameworkImpl.this.unfixForNamespace(watchedEvent.getPath()), (String)null, (Object)null, (Stat)null, (byte[])null, (List)null, watchedEvent, (List)null);
CuratorFrameworkImpl.this.processEvent(event);
, builder.getRetryPolicy(), builder.canBeReadOnly());
this.listeners = new ListenerContainer();
this.unhandledErrorListeners = new ListenerContainer();
this.backgroundOperations = new DelayQueue();
this.namespace = new NamespaceImpl(this, builder.getNamespace());
this.threadFactory = this.getThreadFactory(builder);
this.maxCloseWaitMs = builder.getMaxCloseWaitMs();
this.connectionStateManager = new ConnectionStateManager(this, builder.getThreadFactory());
this.compressionProvider = builder.getCompressionProvider();
this.aclProvider = builder.getAclProvider();
this.state = new AtomicReference(CuratorFrameworkState.LATENT);
this.useContainerParentsIfAvailable = builder.useContainerParentsIfAvailable();
byte[] builderDefaultData = builder.getDefaultData();
this.defaultData = builderDefaultData != null ? Arrays.copyOf(builderDefaultData, builderDefaultData.length) : new byte[0];
this.authInfos = this.buildAuths(builder);
this.failedDeleteManager = new FailedDeleteManager(this);
this.namespaceFacadeCache = new NamespaceFacadeCache(this);
然后调用了makeZookeeperFactory方法。
private ZookeeperFactory makeZookeeperFactory(final ZookeeperFactory actualZookeeperFactory)
return new ZookeeperFactory()
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
ZooKeeper zooKeeper = actualZookeeperFactory.newZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
Iterator var6 = CuratorFrameworkImpl.this.authInfos.iterator();
while(var6.hasNext())
AuthInfo auth = (AuthInfo)var6.next();
zooKeeper.addAuthInfo(auth.getScheme(), auth.getAuth());
return zooKeeper;
;
然后调用了actualZookeeperFactory中的newZookeeper方法。其实是调用了DefaultZookeeperFactory中的newZookeeper方法,
public ZooKeeper newZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws Exception
return new ZooKeeper(connectString, sessionTimeout, watcher, canBeReadOnly);
然后调用里面的new Zookeeper方法。
public ZooKeeper(String connectString, int sessionTimeout, Watcher watcher, boolean canBeReadOnly) throws IOException
this.watchManager = new ZooKeeper.ZKWatchManager((ZooKeeper.SyntheticClass_1)null);
LOG.info("Initiating client connection, connectString=" + connectString + " sessionTimeout=" + sessionTimeout + " watcher=" + watcher);
this.watchManager.defaultWatcher = watcher;
ConnectStringParser connectStringParser = new ConnectStringParser(connectString);
HostProvider hostProvider = new StaticHostProvider(connectStringParser.getServerAddresses());
this.cnxn = new ClientCnxn(connectStringParser.getChrootPath(), hostProvider, sessionTimeout, this, this.watchManager, getClientCnxnSocket(), canBeReadOnly);
this.cnxn.start();
注意其中ConnnectionStringParser类是用于转换你传递进来的zookeeper的连接信息的,会按照逗号进行切分。
public ConnectStringParser(String connectString)
int off = connectString.indexOf(47);
if (off >= 0)
String chrootPath = connectString.substring(off);
if (chrootPath.length() == 1)
this.chrootPath = null;
else
PathUtils.validatePath(chrootPath);
this.chrootPath = chrootPath;
connectString = connectString.substring(0, off);
else
this.chrootPath = null;
String[] hostsList = connectString.split(",");
String[] arr$ = hostsList;
int len$ = hostsList.length;
for(int i$ = 0; i$ < len$; ++i$)
String host = arr$[i$];
int port = 2181;
int pidx = host.lastIndexOf(58);
if (pidx >= 0)
if (pidx < host.length() - 1)
port = Integer.parseInt(host.substring(pidx + 1));
host = host.substring(0, pidx);
this.serverAddresses.add(InetSocketAddress.createUnresolved(host, port));
这里面serverAddresses里面封装了InetSocketAddress的信息。
ClientCnxn这个类中实际上是建立了连接
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, boolean canBeReadOnly) throws IOException
this(chrootPath, hostProvider, sessionTimeout, zooKeeper, watcher, clientCnxnSocket, 0L, new byte[16], canBeReadOnly);
public ClientCnxn(String chrootPath, HostProvider hostProvider, int sessionTimeout, ZooKeeper zooKeeper, ClientWatchManager watcher, ClientCnxnSocket clientCnxnSocket, long sessionId, byte[] sessionPasswd, boolean canBeReadOnly)
this.authInfo = new CopyOnWriteArraySet();
this.pendingQueue = new LinkedList();
this.outgoingQueue = new LinkedList();
this.sessionPasswd = new byte[16];
this.closing = false;
this.seenRwServerBefore = false;
this.eventOfDeath = new Object();
this.xid = 1;
this.state = States.NOT_CONNECTED;
this.zooKeeper = zooKeeper;
this.watcher = watcher;
this.sessionId = sessionId;
this.sessionPasswd = sessionPasswd;
this.sessionTimeout = sessionTimeout;
this.hostProvider = hostProvider;
this.chrootPath = chrootPath;
this.connectTimeout = sessionTimeout / hostProvider.size();
this.readTimeout = sessionTimeout * 2 / 3;
this.readOnly = canBeReadOnly;
this.sendThread = new ClientCnxn.SendThread(clientCnxnSocket);
this.eventThread = new ClientCnxn.EventThread();
这里创建了Thread线程类。
然后在调用start的时候,实际上调用了里面的方法。
public void start()
this.sendThread.start();
this.eventThread.start();
然后调用的start方法实际上会调用run方法。
public void run()
this.clientCnxnSocket.introduce(this, ClientCnxn.this.sessionId);
this.clientCnxnSocket.updateNow();
this.clientCnxnSocket.updateLastSendAndHeard();
long lastPingRwServer = System.currentTimeMillis();
boolean var4 = true;
while(ClientCnxn.this.state.isAlive())
try
if (!this.clientCnxnSocket.isConnected())
if (!this.isFirstConnect)
try
Thread.sleep((long)this.r.nextInt(1000));
catch (InterruptedException var9)
ClientCnxn.LOG.warn("Unexpected exception", var9);
if (ClientCnxn.this.closing || !ClientCnxn.this.state.isAlive())
break;
this.startConnect();
this.clientCnxnSocket.updateLastSendAndHeard();
而run方法里面,会进行判断,如果没初始化这个连接的话,那么就进行初始化。
private void startConnect() throws IOException
ClientCnxn.this.state = States.CONNECTING;
InetSocketAddress addr;
if (this.rwServerAddress != null)
addr = this.rwServerAddress;
this.rwServerAddress = null;
else
addr = ClientCnxn.this.hostProvider.next(1000L);
this.setName(this.getName().replaceAll("\\\\(.*\\\\)", "(" + addr.getHostName() + ":" + addr.getPort() + ")"));
if (ZooKeeperSaslClient.isEnabled())
try
String principalUserName = System.getProperty("zookeeper.sasl.client.username", "zookeeper");
ClientCnxn.this.zooKeeperSaslClient = new ZooKeeperSaslClient(principalUserName + "/" + addr.getHostName());
catch (LoginException var3)
ClientCnxn.LOG.warn("SASL configuration failed: " + var3 + " Will continue connection to Zookeeper server without " + "SASL authentication, if Zookeeper server allows it.");
ClientCnxn.this.eventThread.queueEvent(new WatchedEvent(EventType.None, KeeperState.AuthFailed, (String)null));
this.saslLoginFailed = true;
this.logStartConnect(addr);
this.clientCnxnSocket.connect(addr);
以上是关于ElasticJob源码部分解读-Zookeeper建立连接的主要内容,如果未能解决你的问题,请参考以下文章
源码分析ElasticJob任务错过机制(misfire)与幂等性