Ranger admin web 模块分析之policy查询缓存
Posted 顧棟
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Ranger admin web 模块分析之policy查询缓存相关的知识,希望对你有一定的参考价值。
文章目录
ranger admin web 模块分析之policy查询缓存
admin web模块
开放的api 在类PublicAPIsv2中,这个类中的方法都是实时查询的。主要API涉及
- ServiceDef Manipulation APIs
- Service Manipulation APIs
- Policy Manipulation APIs。
管理端WebUI的查询policy走ServiceREST中的getServicePolicies方法。
@GET
@Path("/policies/service/{id}")
@Produces({"application/json", "application/xml"})
public RangerPolicyList getServicePolicies(@PathParam("id") Long serviceId,
@Context HttpServletRequest request) {
...
...
...
}
根据 传入的条件参数构造SearchFilter对象,如果 isAdminUserWithNoFilterParams(filter)
,则通过svcStore.getPaginatedServicePolicies(serviceId, filter)
分页查询,通过ServiceDBStore对象的getServicePolicies
查询XXService下的policies,其中通过缓存类RangerServicePoliciesCache下的getServicePolicies
方法获取policy。
如果不采用缓存 会调用ServiceREST的getServicePolicies
方法。
SearchFilter中的成员属性,默认排序是以id的asc进行
private Map<String, String> params;
private int startIndex;// 开始下标
private int maxRows = Integer.MAX_VALUE; // 最大行数
private boolean getCount = true; // 是否计算总数
private String sortBy; // 排序字段 只有(createTime,updateTime,policyId,policyName)
private String sortType;//排序类型 asc desc
RangerServicePoliciesCache类详解
该类主要用于查询和缓存各Service的所有Policies。RangerServicePoliciesCache采用单例模式进行实例化,它的私有构造函数中会加载ranger.admin.policy.download.usecache
和ranger.admin.policy.download.cache.max.waittime.for.update
两个配置数据
第一个代表是否采用缓存 默认开启
第二个是缓存getLast的锁等待时间Update的时间默认10s。
主要方法getServicePolicies(String serviceName, Long serviceId, ServiceStore serviceStore)
public ServicePolicies getServicePolicies(String serviceName, Long serviceId, ServiceStore serviceStore) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("==> RangerServicePoliciesCache.getServicePolicies(" + serviceName + ", " + serviceId + ")");
}
ServicePolicies ret = null;
if (StringUtils.isNotBlank(serviceName) && serviceId != null) {
if (LOG.isDebugEnabled()) {
LOG.debug("useServicePoliciesCache=" + useServicePoliciesCache);
}
ServicePolicies servicePolicies = null;
// 是否开启缓存的使用
if (!useServicePoliciesCache) {
if (serviceStore != null) {
try {
//方法体中正在去查询police的是ServiceDBStore的List<RangerPolicy> getServicePoliciesFromDb(XXService service)
servicePolicies = serviceStore.getServicePolicies(serviceName);
} catch (Exception exception) {
LOG.error("getServicePolicies(" + serviceName + "): failed to get latest policies from service-store", exception);
}
} else {
LOG.error("getServicePolicies(" + serviceName + "): failed to get latest policies as service-store is null!");
}
} else {
ServicePoliciesWrapper servicePoliciesWrapper = null;
// 代码块加锁
synchronized (this) {
servicePoliciesWrapper = servicePoliciesMap.get(serviceName);
if (servicePoliciesWrapper != null) {
if (!serviceId.equals(servicePoliciesWrapper.getServiceId())) {
if (LOG.isDebugEnabled()) {
LOG.debug("Service [" + serviceName + "] changed service-id from " + servicePoliciesWrapper.getServiceId()
+ " to " + serviceId);
LOG.debug("Recreating servicePoliciesWrapper for serviceName [" + serviceName + "]");
}
servicePoliciesMap.remove(serviceName);
servicePoliciesWrapper = null;
}
}
if (servicePoliciesWrapper == null) {
servicePoliciesWrapper = new ServicePoliciesWrapper(serviceId);
servicePoliciesMap.put(serviceName, servicePoliciesWrapper);
}
}
if (serviceStore != null) {
// 从缓存中获取policy
boolean refreshed = servicePoliciesWrapper.getLatestOrCached(serviceName, serviceStore);
if(LOG.isDebugEnabled()) {
LOG.debug("getLatestOrCached returned " + refreshed);
}
} else {
LOG.error("getServicePolicies(" + serviceName + "): failed to get latest policies as service-store is null!");
}
servicePolicies = servicePoliciesWrapper.getServicePolicies();
}
ret = servicePolicies;
} else {
LOG.error("getServicePolicies() failed to get policies as serviceName is null or blank and/or serviceId is null!");
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== RangerServicePoliciesCache.getServicePolicies(" + serviceName + ", " + serviceId + "): count=" + ((ret == null || ret.getPolicies() == null) ? 0 : ret.getPolicies().size()));
}
return ret;
}
ServiceDBStore的List getServicePoliciesFromDb(XXService service)
private List<RangerPolicy> getServicePoliciesFromDb(XXService service) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("==> ServiceDBStore.getServicePoliciesFromDb(" + service.getName() + ")");
}
// dao管理器和事务管理器去实例化一个RangerPolicyRetriever 通过policyRetriever查询RangerPolicy
RangerPolicyRetriever policyRetriever = new RangerPolicyRetriever(daoMgr, txManager);
List<RangerPolicy> ret = policyRetriever.getServicePolicies(service);
if (LOG.isDebugEnabled()) {
LOG.debug("<== ServiceDBStore.getServicePoliciesFromDb(" + service.getName() + "): count=" + ((ret == null) ? 0 : ret.size()));
}
return ret;
}
RangerPolicyRetriever.getServicePolicies(final XXService xService)方法体
public List<RangerPolicy> getServicePolicies(final XXService xService) {
String serviceName = xService == null ? null : xService.getName();
Long serviceId = xService == null ? null : xService.getId();
if(LOG.isDebugEnabled()) {
LOG.debug("==> RangerPolicyRetriever.getServicePolicies(serviceName=" + serviceName + ", serviceId=" + serviceId + ")");
}
List<RangerPolicy> ret = null;
RangerPerfTracer perf = null;
if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = RangerPerfTracer.getPerfTracer(PERF_LOG, "RangerPolicyRetriever.getServicePolicies(serviceName=" + serviceName + ",serviceId=" + serviceId + ")");
}
if(xService != null) {
if (txTemplate == null) {
if (LOG.isDebugEnabled()) {
LOG.debug("Transaction Manager is null; Retrieving policies in the existing transaction");
}
// RetrieverContext执行构造函数的时候就已经去查数据库得到xPolicies了
RetrieverContext ctx = new RetrieverContext(xService);
ret = ctx.getAllPolicies();
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Retrieving policies in a new, read-only transaction");
}
// 存在事务则 通过新建一个线程来做 在线程中将事务变为只读事务去查询 还是通过实例化RetrieverContext实现查询
PolicyLoaderThread t = new PolicyLoaderThread(txTemplate, xService);
t.setDaemon(true);
t.start();
try {
// 将调用线程需要等线程t执行完才能继续执行 调用线程阻塞
t.join();
ret = t.getPolicies();
} catch (InterruptedException ie) {
LOG.error("Failed to retrieve policies in a new, read-only thread.", ie);
}
}
} else {
if(LOG.isDebugEnabled()) {
LOG.debug("RangerPolicyRetriever.getServicePolicies(xService=" + xService + "): invalid parameter");
}
}
RangerPerfTracer.log(perf);
if(LOG.isDebugEnabled()) {
LOG.debug("<== RangerPolicyRetriever.getServicePolicies(serviceName=" + serviceName + ", serviceId=" + serviceId + "): policyCount=" + (ret == null ? 0 : ret.size()));
}
return ret;
}
RetrieverContext类是RangerPolicyRetriever的内部类,有关于serviceId,policyId的构造方法,查询数据库中的数据时在构造方法中实现的。
RangerServicePoliciesCache有一个私有内部类ServicePoliciesWrapper 缓存的policy是存在这个类的ServicePolicies对象中的。
getLatestOrCached方法体中用了ReentrantLock锁来控制控制查询。getLatest来控制缓存是否从数据库刷新
boolean getLatestOrCached(String serviceName, ServiceStore serviceStore) throws Exception {
boolean ret = false;
try {
ret = lock.tryLock(waitTimeInSeconds, TimeUnit.SECONDS);
if (ret) {
getLatest(serviceName, serviceStore);
}
} catch (InterruptedException exception) {
LOG.error("getLatestOrCached:lock got interrupted..", exception);
} finally {
if (ret) {
lock.unlock();
}
}
return ret;
}
void getLatest(String serviceName, ServiceStore serviceStore) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("==> ServicePoliciesWrapper.getLatest(" + serviceName + ")");
}
if (LOG.isDebugEnabled()) {
LOG.debug("Found ServicePolicies in-cache : " + (servicePolicies != null));
}
Long servicePolicyVersionInDb = serviceStore.getServicePolicyVersion(serviceName);
// servicePolicyVersionInDb数据库中serviceName的policy版本号 在为空和传入的版本号不一致时进行从DB刷新
if (servicePolicies == null || servicePolicyVersionInDb == null || !servicePolicyVersionInDb.equals(servicePolicies.getPolicyVersion())) {
if (LOG.isDebugEnabled()) {
LOG.debug("loading servicePolicies from db ... cachedServicePoliciesVersion=" + (servicePolicies != null ? servicePolicies.getPolicyVersion() : null) + ", servicePolicyVersionInDb=" + servicePolicyVersionInDb);
}
long startTimeMs = System.currentTimeMillis();
ServicePolicies servicePoliciesFromDb = serviceStore.getServicePolicies(serviceName);
long dbLoadTime = System.currentTimeMillis() - startTimeMs;
if (dbLoadTime > longestDbLoadTimeInMs) {
longestDbLoadTimeInMs = dbLoadTime;
}
updateTime = new Date();
if (servicePoliciesFromDb != null) {
if (servicePoliciesFromDb.getPolicyVersion() == null) {
servicePoliciesFromDb.setPolicyVersion(0L);
}
servicePolicies = servicePoliciesFromDb;
// 会去清空policy中的一些属性置null
pruneUnusedAttributes();
}
}
if (LOG.isDebugEnabled()) {
LOG.debug("<== ServicePoliciesWrapper.getLatest(" + serviceName + ")");
}
}
ServiceREST.getServicePoliciesIfUpdated详解
入参
@PathParam("serviceName") String serviceName,
@QueryParam("lastKnownVersion") Long lastKnownVersion,
@DefaultValue("0") @QueryParam("lastActivationTime") Long lastActivationTime,
@QueryParam("pluginId") String pluginId,
@DefaultValue("") @QueryParam("clusterName") String clusterName,
@Context HttpServletRequest request
出参
ServicePolicies
逻辑
// 验证请求是否合法
serviceUtil.isValidateHttpsAuthentication(serviceName, request);
...
...
if (isValid) {
if (lastKnownVersion == null) {
lastKnownVersion = Long.valueOf(-1);
}
try {
if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
perf = RangerPerfTracer.getPerfTracer(PERF_LOG, "ServiceREST.getServicePoliciesIfUpdated(serviceName=" + serviceName + ",lastKnownVersion=" + lastKnownVersion + ",lastActivationTime=" + lastActivationTime + ")");
}
// 去查询policy
ServicePolicies servicePolicies = svcStore.getServicePoliciesIfUpdated(serviceName, lastKnownVersion);
if (servicePolicies == null) {
downloadedVersion = lastKnownVersion;
httpCode = HttpServletResponse.SC_NOT_MODIFIED;
logMsg = "No change since last update";
} else {
downloadedVersion = servicePolicies.getPolicyVersion();
ret = filterServicePolicies(servicePolicies);
httpCode = HttpServletResponse.SC_OK;
logMsg = "Returning " + (ret.getPolicies() != null ? ret.getPolicies().size() : 0) + " policies. Policy version=" + ret.getPolicyVersion();
}
} catch (Throwable excp) {
LOG.error("getServicePoliciesIfUpdated(" + serviceName + ", " + lastKnownVersion + ", " + lastActivationTime + ") failed");
httpCode = HttpServletResponse.SC_BAD_REQUEST;
logMsg = excp.getMessage();
} finally {
createPolicyDownloadAudit(serviceName, lastKnownVersion, pluginId, httpCode, clusterName, request);
RangerPerfTracer.log(perf);
}
}
isValidateHttpsAuthentication方法体
// 说明 serviceName 必须有值
if (serviceName == null || serviceName.isEmpty()) {
LOG.error("ServiceName not provided");
throw restErrorUtil.createRESTException("Unauthorized access.",
MessageEnums.OPER_NOT_ALLOWED_FOR_ENTITY);
}
RangerService service = null;
try {
// 查询表"x_service"和"x_service_config_map"
service = svcStore.getServiceByName(serviceName);
} catch (Exception e) {
LOG.error("Requested Service not found. serviceName=" + serviceName);
throw restErrorUtil.createRESTException("Service:" + serviceName + " not found",
MessageEnums.DATA_NOT_FOUND);
}
getServicePoliciesIfUpdated(String serviceName, Long lastKnownVersion)方法体
public ServicePolicies getServicePoliciesIfUpdated(String serviceName, Long lastKnownVersion) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("==> ServiceDBStore.getServicePoliciesIfUpdated(" + serviceName + ", " + lastKnownVersion + ")");
}
ServicePolicies ret = null<以上是关于Ranger admin web 模块分析之policy查询缓存的主要内容,如果未能解决你的问题,请参考以下文章