zookeeper源码解析--请求处理--PrepRequestProcessor
Posted raindayinrain
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了zookeeper源码解析--请求处理--PrepRequestProcessor相关的知识,希望对你有一定的参考价值。
PrepRequestProcessor
1.从待处理请求队列取出一个请求
2.如果是requestOfDeath
退出
3.置空请求对象的事务头,事务体。
4.请求未被过滤下,执行pRequestHelper
5.将请求交给下一级处理
pRequestHelper
对写请求设置
对请求进行合法性检查
按请求内容预先更新节点状态信息
为请求对象设置事务头,事务体,事务摘要
对请求处理中遭遇错误的,采用抛出异常方式
对错误的写请求,其事务头,事务体包含错误及错误信息,
对错误的请求,请求本身包含抛出异常信息。
如果请求类型为OpCode.createContainer/OpCode.create/OpCode.create2
动态创建CreateRequest对象
//基于原始请求对象,
//对请求进行合法性检查,
//预先对按请求进行节点更新&数据访问,
//设置好请求对象的事务头,事务体,事务摘要
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
如果请求类型为OpCode.createTTL
CreateTTLRequest createTtlRequest = new CreateTTLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, createTtlRequest, true);
如果请求类型为OpCode.deleteContainer/OpCode.delete
DeleteRequest deleteRequest = new DeleteRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, deleteRequest, true);
如果请求类型为OpCode.setData
SetDataRequest setDataRequest = new SetDataRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setDataRequest, true);
如果请求类型为OpCode.reconfig
ReconfigRequest reconfigRequest = new ReconfigRequest();
ByteBufferInputStream.byteBuffer2Record(request.request, reconfigRequest);
pRequest2Txn(request.type, zks.getNextZxid(), request, reconfigRequest, true);
如果请求类型为OpCode.setACL
SetACLRequest setAclRequest = new SetACLRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, setAclRequest, true);
如果请求类型为OpCode.check
CheckVersionRequest checkRequest = new CheckVersionRequest();
pRequest2Txn(request.type, zks.getNextZxid(), request, checkRequest, true);
如果请求类型为OpCode.multi
// 动态构造多操作请求,请求实体
MultiOperationRecord multiRequest = new MultiOperationRecord();
// 反向序列化得到数据实体
try
ByteBufferInputStream.byteBuffer2Record(request.request, multiRequest);
catch (IOException e)
// 抛出异常时,仍然设置事务头
request.setHdr(
new TxnHeader(
request.sessionId, request.cxid,
zks.getNextZxid(), Time.currentWallTime(), OpCode.multi));
// 再次抛出
throw e;
// 事务容器
List<Txn> txns = new ArrayList<Txn>();
// 获取下一zxid
long zxid = zks.getNextZxid();
KeeperException ke = null;
// 获取每个子操作&最新状态信息,子操作父节点&最新状态信息
Map<String, ChangeRecord> pendingChanges = getPendingChanges(multiRequest);
// 设置请求的事务头
request.setHdr(
new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), request.type));
// 对每个子操作逐个处理
for (Op op : multiRequest)
// 子操作的,请求实体
Record subrequest = op.toRequestRecord();
int type;
Record txn;
// 表示之前某个子操作遭遇异常
if (ke != null)
// 设置类别
type = OpCode.error;
// 设置事务体
txn = new ErrorTxn(Code.RUNTIMEINCONSISTENCY.intValue());
else
try
//基于原始请求对象,
//对请求进行合法性检查,
//预先对按请求进行节点更新&数据访问,
//设置好请求对象的事务头,事务体,事务摘要
pRequest2Txn(op.getType(), zxid, request, subrequest, false);
// 子操作类别
type = op.getType();
// 获得事务体
txn = request.getTxn();
// 如果子操作在于异常
catch (KeeperException e)
// 异常记录
ke = e;
// 类别记录
type = OpCode.error;
// 用于记录失败的事务体
txn = new ErrorTxn(e.code().intValue());
if (e.code().intValue() > Code.APIERROR.intValue())
LOG.info(
"Got user-level KeeperException when processing aborting"
+ " remaining multi ops. Error Path: Error:",
+ request.toString(), e.getPath(), e.getMessage());
// 将异常信息设置到请求对象
request.setException(e);
// 一旦一个子操作遭遇异常,执行回滚
// 回滚需要保证复合操作每个子操作对ChangeRecord的影响撤销。
// 把ChangeRecord恢复到执行复合操作前
rollbackPendingChanges(zxid, pendingChanges);
try (ByteArrayOutputStream baos = new ByteArrayOutputStream())
BinaryOutputArchive boa = BinaryOutputArchive.getArchive(baos);
// 事务体对象序列化
txn.serialize(boa, "request");
ByteBuffer bb = ByteBuffer.wrap(baos.toByteArray());
// 存储到Txn对象集合
txns.add(new Txn(type, bb.array()));
// 复合请求的事务体,也包含多个Txn分别对应每个子操作
request.setTxn(new MultiTxn(txns));
if (digestEnabled)
// 设置事务摘要
setTxnDigest(request);
如果请求类型为OpCode.createSession/OpCode.closeSession
// 本地会话不需追踪?
if (!request.isLocalSession())
pRequest2Txn(request.type, zks.getNextZxid(), request, null, true);
如果请求类型为OpCode.sync/OpCode.exists/OpCode.getData/OpCode.getACL
OpCode.getChildren/OpCode.getAllChildrenNumber/OpCode.getChildren2/OpCode.ping
OpCode.setWatches/OpCode.setWatches2/OpCode.checkWatches/OpCode.removeWatches
OpCode.getEphemerals/OpCode.multiRead/OpCode.addWatch/OpCode.addWatch
// 会话检查
// 只读请求不需要
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
pRequest2TxnCreate(type, request, record, deserialize);
// 做新建节点前的合法性,权限,配额检查
// 一切合法下,更新父节点,新建节点状态信息。
// 设置好本请求对象的事务头,事务体,事务摘要信息
if (deserialize)
// 从字节流得到数据对象
ByteBufferInputStream.byteBuffer2Record(request.request, record);
// 从数据对象提取信息
if (type == OpCode.createTTL)
CreateTTLRequest createTtlRequest = (CreateTTLRequest) record;
flags = createTtlRequest.getFlags();
path = createTtlRequest.getPath();
acl = createTtlRequest.getAcl();
data = createTtlRequest.getData();
ttl = createTtlRequest.getTtl();
else
CreateRequest createRequest = (CreateRequest) record;
flags = createRequest.getFlags();
path = createRequest.getPath();
acl = createRequest.getAcl();
data = createRequest.getData();
ttl = -1;
CreateMode createMode = CreateMode.fromFlag(flags);
// 合法性验证,不合法通过抛出特定类型包含特定信息异常来层层回退
validateCreateRequest(path, createMode, request, ttl);
// 父路径
String parentPath = validatePathForCreate(path, request.sessionId);
// 权限
List<ACL> listACL = fixupACL(path, request.authInfo, acl);
// 父亲路径最新状态
ChangeRecord parentRecord = getRecordForPath(parentPath);
// 权限检查
zks.checkACL(
request.cnxn, parentRecord.acl,
ZooDefs.Perms.CREATE, request.authInfo, path, listACL);
// 父亲路径下遭遇改变次数
int parentCVersion = parentRecord.stat.getCversion();
// 新节点完整路径
if (createMode.isSequential())
path = path + String.format(Locale.ENGLISH, "%010d", parentCVersion);
// 验证路径合法性
validatePath(path, request.sessionId);
try
// 如果要创建的路径已经存在了
if (getRecordForPath(path) != null)
// 抛出特定类型,包含特定信息的异常
throw new KeeperException.NodeExistsException(path);
catch (KeeperException.NoNodeException e)
boolean ephemeralParent = EphemeralType.get(
parentRecord.stat.getEphemeralOwner()) == EphemeralType.NORMAL;
// 这类父路径不可包含子节点
if (ephemeralParent)
throw new KeeperException.NoChildrenForEphemeralsException(path);
// 更新父节点遭遇变化次数
int newCversion = parentRecord.stat.getCversion() + 1;
// 检查配额限制,配额限制超出时,通过抛出特定类型,包含特定信息的异常来层层退出
zks.checkQuota(path, null, data, OpCode.create);
// 设置事务体
if (type == OpCode.createContainer)
request.setTxn(new CreateContainerTxn(path, data, listACL, newCversion));
else if (type == OpCode.createTTL)
request.setTxn(new CreateTTLTxn(path, data, listACL, newCversion, ttl));
else
request.setTxn(
new CreateTxn(path, data, listACL, createMode.isEphemeral(), newCversion));
TxnHeader hdr = request.getHdr();
long ephemeralOwner = 0;
// 创建模式--创建容器节点
if (createMode.isContainer())
ephemeralOwner = EphemeralType.CONTAINER_EPHEMERAL_OWNER;
// 创建模式--创建自带生命期节点
else if (createMode.isTTL())
ephemeralOwner = EphemeralType.TTL.toEphemeralOwner(ttl);
// 创建模式--创建临时节点【会话关闭,会话所属临时节点自动销毁】
else if (createMode.isEphemeral())
ephemeralOwner = request.sessionId;
// 创建节点状态
StatPersisted s
= DataTree.createStat(hdr.getZxid(), hdr.getTime(), ephemeralOwner);
// 基于原有父节点的parentRecord得到新的
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount++;
// 记录了此节点下孩子累计新增孩子数量-变化为节点孩子变化
parentRecord.stat.setCversion(newCversion);
// 记录了最后修改此节点状态的事务id
parentRecord.stat.setPzxid(request.getHdr().getZxid());
// 更新整体和节点自身摘要信息
parentRecord.precalculatedDigest
= precalculateDigest(DigestOpCode.UPDATE, parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord);
// 一个节点可能包含多个关联项
zks.outstandingChanges.add(c);
// 一个节点,只有一个路径,也就只记录一个关联项,反映此节点最新状态
zks.outstandingChangesForPath.put(c.path, c);
// 反映新建节点
ChangeRecord nodeRecord
= new ChangeRecord(request.getHdr().getZxid(), path, s, 0, listACL);
nodeRecord.data = data;
nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.ADD, path, nodeRecord.data, s);
setTxnDigest(request, nodeRecord.precalculatedDigest);
request.setTxnDigest(new TxnDigest(digestCalculator.getDigestVersion(), preCalculatedDigest.treeDigest));
addChangeRecord(nodeRecord);
pRequest2Txn(request.type, zks.getNextZxid(), request, create2Request, true);
基于原始请求对象,
对请求进行合法性检查,
预先对按请求进行节点更新&数据访问,
设置好请求对象的事务头,事务体,事务摘要
如果请求对象事务头为null
request.setHdr(
new TxnHeader(request.sessionId, request.cxid, zxid, Time.currentWallTime(), type));
如果类别为OpCode.create/OpCode.create2/OpCode.createTTL/OpCode.createContainer
// 做新建节点前的合法性,权限,配额检查
// 一切合法下,更新父节点,新建节点状态信息。
// 设置好本请求对象的事务头,事务体,事务摘要信息
pRequest2TxnCreate(type, request, record, deserialize);
如果类别为OpCode.deleteContainer
String path = new String(request.request.array(), UTF_8);
// 获取父路径,及验证路径
String parentPath = getParentPathAndValidate(path);
// 获取删除路径最新信息
ChangeRecord nodeRecord = getRecordForPath(path);
// 如果删除节点包含子节点,不可删除
if (nodeRecord.childCount > 0)
throw new KeeperException.NotEmptyException(path);
// 要删除节点类别非法
if (EphemeralType.get(nodeRecord.stat.getEphemeralOwner())
== EphemeralType.NORMAL)
throw new KeeperException.BadVersionException(path);
// 获取父节点最新状态
ChangeRecord parentRecord = getRecordForPath(parentPath);
// 设置改请求的事务体
request.setTxn(new DeleteTxn(path));
// 基于parentRecord得到新的
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
parentRecord.childCount--;
parentRecord.stat.setPzxid(request.getHdr().getZxid());
parentRecord.precalculatedDigest
= precalculateDigest(DigestOpCode.UPDATE,
parentPath, parentRecord.data, parentRecord.stat);
// 完成对父节点最新状态的更新
addChangeRecord(parentRecord);
// 对删除节点,设置改节点ChangeRecord
// 通过改ChangeRecord可辨别节点已被删除
nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
// 更新摘要信息
nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
// 设置事务摘要
setTxnDigest(request, nodeRecord.precalculatedDigest);
// 完成对删除节点的状态更新
addChangeRecord(nodeRecord);
如果类别为OpCode.delete
// 会话检查
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
DeleteRequest deleteRequest = (DeleteRequest) record;
// 反向序列化得到数据实体
if (deserialize)
ByteBufferInputStream.byteBuffer2Record(request.request, deleteRequest);
// 要删除路径
String path = deleteRequest.getPath();
// 要删除路径父路径
String parentPath = getParentPathAndValidate(path);
// 父路径最新状态
ChangeRecord parentRecord = getRecordForPath(parentPath);
// 权限检查
zks.checkACL(
request.cnxn, parentRecord.acl, ZooDefs.Perms.DELETE, request.authInfo, path, null);
// 要删除路径自身
// 路径不存在时,通过设置异常类型,异常信息,层层退出
ChangeRecord nodeRecord = getRecordForPath(path);
// 版本检查
checkAndIncVersion(nodeRecord.stat.getVersion(), deleteRequest.getVersion(), path);
// 有孩子的节点不可被删除
if (nodeRecord.childCount > 0)
throw new KeeperException.NotEmptyException(path);
// 设置事务体
request.setTxn(new DeleteTxn(path));
// 基于parentRecord得到新的
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
// 更新父节点最新状态信息
// 状态--孩子数量
parentRecord.childCount--;
// 状态--最后对其修改的事务的id
parentRecord.stat.setPzxid(request.getHdr().getZxid());
// 更新摘要
parentRecord.precalculatedDigest
= precalculateDigest(DigestOpCode.UPDATE,
parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord);
// 为要删除节点,动态构造ChangeRecord
nodeRecord = new ChangeRecord(request.getHdr().getZxid(), path, null, -1, null);
// 更新摘要
nodeRecord.precalculatedDigest = precalculateDigest(DigestOpCode.REMOVE, path);
// 设置事务摘要
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord);
如果类别为OpCode.setData
// 检查会话
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
// 事务体
SetDataRequest setDataRequest = (SetDataRequest) record;
// 反向序列化得到数据对象
if (deserialize)
ByteBufferInputStream.byteBuffer2Record(request.request, setDataRequest);
// 路径
path = setDataRequest.getPath();
// 验证路径合法性
validatePath(path, request.sessionId);
// 获取此路径最新信息
nodeRecord = getRecordForPath(path);
// 权限检查
zks.checkACL(
request.cnxn, nodeRecord.acl, ZooDefs.Perms.WRITE, request.authInfo, path, null);
// 配额检查
zks.checkQuota(path, nodeRecord.data, setDataRequest.getData(), OpCode.setData);
// 版本检查
int newVersion =
checkAndIncVersion(nodeRecord.stat.getVersion(), setDataRequest.getVersion(), path);
// 设置事务体
request.setTxn(new SetDataTxn(path, setDataRequest.getData(), newVersion));
// 基于nodeRecode得到新的
nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
// 新的相比旧的,版本变了
nodeRecord.stat.setVersion(newVersion);
// 修改时间变了
nodeRecord.stat.setMtime(request.getHdr().getTime());
// 最后修改的事务id变了
nodeRecord.stat.setMzxid(zxid);
// 节点数据内容变了
nodeRecord.data = setDataRequest.getData();
// 更新摘要
nodeRecord.precalculatedDigest
= precalculateDigest(DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
// 设置事务摘要
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord);
如果类别为OpCode.setACL
// 检查会话
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
// 事务体
SetACLRequest setAclRequest = (SetACLRequest) record;
// 反向序列化得到数据对象
if (deserialize)
ByteBufferInputStream.byteBuffer2Record(request.request, setAclRequest);
// 路径
path = setAclRequest.getPath();
// 验证路径
validatePath(path, request.sessionId);
// 权限列表
List<ACL> listACL = fixupACL(path, request.authInfo, setAclRequest.getAcl());
// 路径最新状态
nodeRecord = getRecordForPath(path);
// 权限检查
zks.checkACL(
request.cnxn, nodeRecord.acl, ZooDefs.Perms.ADMIN, request.authInfo, path, listACL);
// 版本检查
newVersion
= checkAndIncVersion(nodeRecord.stat.getAversion(), setAclRequest.getVersion(), path);
// 设置事务体
request.setTxn(new SetACLTxn(path, listACL, newVersion));
// 基于nodeRecord得到新的
nodeRecord = nodeRecord.duplicate(request.getHdr().getZxid());
// 设置新的版本
nodeRecord.stat.setAversion(newVersion);
// 更新摘要信息
nodeRecord.precalculatedDigest
= precalculateDigest(DigestOpCode.UPDATE, path, nodeRecord.data, nodeRecord.stat);
// 设置事务摘要
setTxnDigest(request, nodeRecord.precalculatedDigest);
addChangeRecord(nodeRecord);
如果类型为OpCode.createSession
request.request.rewind();
// 获取超时时间?
int to = request.request.getInt();
// 设置事务体
request.setTxn(new CreateSessionTxn(to));
request.request.rewind();
// 会话追踪
zks.sessionTracker.trackSession(request.sessionId, to);
// 会话拥有者,为请求拥有者
zks.setOwner(request.sessionId, request.getOwner());
如果类型为OpCode.closeSession
// 当前时间
long startTime = Time.currentElapsedTime();
synchronized (zks.outstandingChanges)
// 获取属于此会话的所有临时节点
Set<String> es = zks.getZKDatabase().getEphemerals(request.sessionId);
// 对所有排队中ChangeRecord
for (ChangeRecord c : zks.outstandingChanges)
// 表示待实施请求中,已经包含删除某路径请求
if (c.stat == null)
// 将此会话下已经规划为删除的路径从这里的临时节点中去掉,避免重复删除
es.remove(c.path);
// 表示待实施请求中,包含某个路径属于此临时会话
else if (c.stat.getEphemeralOwner() == request.sessionId)
// 将改路径加入这里的临时节点。放置漏掉属于此会话的待实施的临时节点。
es.add(c.path);
// 对此会话的每个临时节点,对其父节点进行更新
for (String path2Delete : es)
if (digestEnabled)
// 获取删除节点,父节点&验证
parentPath = getParentPathAndValidate(path2Delete);
// 获取父节点最新状态
parentRecord = getRecordForPath(parentPath);
// 基于parentRecord得到新的
parentRecord = parentRecord.duplicate(request.getHdr().getZxid());
// 更新修改父节点孩子的事务id
parentRecord.stat.setPzxid(request.getHdr().getZxid());
// 更新摘要
parentRecord.precalculatedDigest
= precalculateDigest(DigestOpCode.UPDATE,
parentPath, parentRecord.data, parentRecord.stat);
addChangeRecord(parentRecord);
// 动态构造要删除节点的nodeRecord
nodeRecord =
new ChangeRecord(request.getHdr().getZxid(), path2Delete, null, 0, null);
// 更新摘要
nodeRecord.precalculatedDigest
= precalculateDigest(DigestOpCode.REMOVE, path2Delete);
addChangeRecord(nodeRecord);
if (ZooKeeperServer.isCloseSessionTxnEnabled())
// 设置事务体
request.setTxn(new CloseSessionTxn(new ArrayList<String>(es)));
// 设置会话状态为关闭中
zks.sessionTracker.setSessionClosing(request.sessionId);
ServerMetrics.getMetrics()
.CLOSE_SESSION_PREP_TIME.add(Time.currentElapsedTime() - startTime);
如果类型为OpCode.check
// 会话检查
zks.sessionTracker.checkSession(request.sessionId, request.getOwner());
CheckVersionRequest checkVersionRequest = (CheckVersionRequest) record;
// 反向序列化得到数据实体
if (deserialize)
ByteBufferInputStream.byteBuffer2Record(request.request, checkVersionRequest);
// 获取路径
path = checkVersionRequest.getPath();
// 验证路径
validatePath(path, request.sessionId);
// 获取路径最新信息
nodeRecord = getRecordForPath(path);
// 权限检查
zks.checkACL(
request.cnxn, nodeRecord.acl, ZooDefs.Perms.READ, request.authInfo, path, null);
// 设置事务体
request.setTxn(
new CheckVersionTxn(path,
checkAndIncVersion(nodeRecord.stat.getVersion(),
checkVersionRequest.getVersion(), null));
以上是关于zookeeper源码解析--请求处理--PrepRequestProcessor的主要内容,如果未能解决你的问题,请参考以下文章