Ranger Admin Web Module Analysis: Policy Query Cache

Posted by 顧棟


The admin web module

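The public APIs live in the class PublicAPIsv2, and every method in that class queries the database in real time. The main API groups are:

  • ServiceDef Manipulation APIs
  • Service Manipulation APIs
  • Policy Manipulation APIs

Policy queries from the admin Web UI go through the getServicePolicies method in ServiceREST.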

    @GET
    @Path("/policies/service/{id}")
    @Produces({"application/json", "application/xml"})
    public RangerPolicyList getServicePolicies(@PathParam("id") Long serviceId,
                                               @Context HttpServletRequest request) {
   ...
   ...
   ...
}

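A SearchFilter object is built from the request parameters. If isAdminUserWithNoFilterParams(filter) is true, the policies are fetched page by page through svcStore.getPaginatedServicePolicies(serviceId, filter). That call uses getServicePolicies on the ServiceDBStore object to look up the policies under the XXService, and the policies themselves come from the getServicePolicies method of the cache class RangerServicePoliciesCache.

If the cache is not used, the getServicePolicies method of ServiceREST is called.

The member fields of SearchFilter are listed below. By default, results are sorted by id in ascending order.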

private Map<String, String> params;
private int                 startIndex; // start index
private int                 maxRows    = Integer.MAX_VALUE; // maximum number of rows
private boolean             getCount   = true; // whether to compute the total count
private String              sortBy; // sort field; only createTime, updateTime, policyId, policyName
private String              sortType; // sort direction: asc or desc
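
To make the role of these fields concrete, here is a minimal in-memory sketch of how startIndex, maxRows, sortBy and sortType could shape a result page. PolicyRow and SearchFilterSketch are hypothetical names invented for this illustration, and the real Ranger code applies the filter in its own store layer rather than like this.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical stand-in for a policy row; not a Ranger class.
final class PolicyRow {
    final long   policyId;
    final String policyName;

    PolicyRow(long policyId, String policyName) {
        this.policyId   = policyId;
        this.policyName = policyName;
    }
}

// Hypothetical helper showing how the SearchFilter fields interact.
final class SearchFilterSketch {
    static List<PolicyRow> page(List<PolicyRow> rows, String sortBy, String sortType,
                                int startIndex, int maxRows) {
        // Default sort field is the id; only policyName is handled as an alternative here.
        Comparator<PolicyRow> cmp = "policyName".equals(sortBy)
                ? Comparator.comparing((PolicyRow r) -> r.policyName)
                : Comparator.comparingLong((PolicyRow r) -> r.policyId);
        // Default direction is ascending.
        if ("desc".equalsIgnoreCase(sortType)) {
            cmp = cmp.reversed();
        }

        List<PolicyRow> sorted = new ArrayList<>(rows);
        sorted.sort(cmp);

        int from = Math.min(Math.max(startIndex, 0), sorted.size());
        // maxRows defaults to Integer.MAX_VALUE, so add in long arithmetic to avoid overflow.
        int to = (int) Math.min((long) from + Math.max(maxRows, 0), sorted.size());
        return new ArrayList<>(sorted.subList(from, to));
    }
}
```

With the default maxRows of Integer.MAX_VALUE the page simply runs from startIndex to the end of the list, which is why the end index is computed in long arithmetic.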

RangerServicePoliciesCache in detail

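This class is mainly used to query and cache all the policies of each service. RangerServicePoliciesCache is instantiated as a singleton, and its private constructor loads two configuration properties: ranger.admin.policy.download.usecache and ranger.admin.policy.download.cache.max.waittime.for.update.

The first controls whether the cache is used; it is enabled by default.

The second is how long getLatestOrCached waits for the lock when the cache is being updated; the default is 10 seconds.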
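
As a rough illustration of how these two properties and their defaults could be read, the sketch below uses java.util.Properties as a stand-in for Ranger's own configuration classes. CacheConfigSketch is a hypothetical name; only the two property keys and the defaults (enabled, 10 seconds) come from the text above.

```java
import java.util.Properties;

// Hypothetical holder for the two cache settings; java.util.Properties
// stands in for Ranger's configuration loader, which is not shown here.
final class CacheConfigSketch {
    static final String USE_CACHE = "ranger.admin.policy.download.usecache";
    static final String MAX_WAIT  = "ranger.admin.policy.download.cache.max.waittime.for.update";

    final boolean useServicePoliciesCache;
    final int     waitTimeInSeconds;

    CacheConfigSketch(Properties props) {
        // Cache is on unless explicitly disabled.
        useServicePoliciesCache = Boolean.parseBoolean(props.getProperty(USE_CACHE, "true"));
        // Lock wait time in seconds; 10 when the property is absent.
        waitTimeInSeconds = Integer.parseInt(props.getProperty(MAX_WAIT, "10"));
    }
}
```

Constructing it with an empty Properties object yields the defaults; setting the first key to false disables the cache path.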

The main method is getServicePolicies(String serviceName, Long serviceId, ServiceStore serviceStore):

	public ServicePolicies getServicePolicies(String serviceName, Long serviceId, ServiceStore serviceStore) throws Exception {

		if (LOG.isDebugEnabled()) {
			LOG.debug("==> RangerServicePoliciesCache.getServicePolicies(" + serviceName + ", " + serviceId + ")");
		}

		ServicePolicies ret = null;

		if (StringUtils.isNotBlank(serviceName) && serviceId != null) {

			if (LOG.isDebugEnabled()) {
				LOG.debug("useServicePoliciesCache=" + useServicePoliciesCache);
			}

			ServicePolicies servicePolicies = null;
			// is the cache enabled?
			if (!useServicePoliciesCache) {
				if (serviceStore != null) {
					try {
						// inside this call, the method that actually queries the policies is ServiceDBStore's List<RangerPolicy> getServicePoliciesFromDb(XXService service)
						servicePolicies = serviceStore.getServicePolicies(serviceName);
					} catch (Exception exception) {
						LOG.error("getServicePolicies(" + serviceName + "): failed to get latest policies from service-store", exception);
					}
				} else {
					LOG.error("getServicePolicies(" + serviceName + "): failed to get latest policies as service-store is null!");
				}
			} else {
				ServicePoliciesWrapper servicePoliciesWrapper = null;
				// synchronized block
				synchronized (this) {
					servicePoliciesWrapper = servicePoliciesMap.get(serviceName);

					if (servicePoliciesWrapper != null) {
						if (!serviceId.equals(servicePoliciesWrapper.getServiceId())) {
							if (LOG.isDebugEnabled()) {
								LOG.debug("Service [" + serviceName + "] changed service-id from " + servicePoliciesWrapper.getServiceId()
										+ " to " + serviceId);
								LOG.debug("Recreating servicePoliciesWrapper for serviceName [" + serviceName + "]");
							}
							servicePoliciesMap.remove(serviceName);
							servicePoliciesWrapper = null;
						}
					}

					if (servicePoliciesWrapper == null) {
						servicePoliciesWrapper = new ServicePoliciesWrapper(serviceId);
						servicePoliciesMap.put(serviceName, servicePoliciesWrapper);
					}
				}

				if (serviceStore != null) {
					// fetch the policies from the cache (refreshed first if stale)
					boolean refreshed = servicePoliciesWrapper.getLatestOrCached(serviceName, serviceStore);

					if(LOG.isDebugEnabled()) {
						LOG.debug("getLatestOrCached returned " + refreshed);
					}
				} else {
					LOG.error("getServicePolicies(" + serviceName + "): failed to get latest policies as service-store is null!");
				}

				servicePolicies = servicePoliciesWrapper.getServicePolicies();
			}

			ret = servicePolicies;

		} else {
			LOG.error("getServicePolicies() failed to get policies as serviceName is null or blank and/or serviceId is null!");
		}

		if (LOG.isDebugEnabled()) {
			LOG.debug("<== RangerServicePoliciesCache.getServicePolicies(" + serviceName + ", " + serviceId + "): count=" + ((ret == null || ret.getPolicies() == null) ? 0 : ret.getPolicies().size()));
		}

		return ret;
	}
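
The synchronized section of the method above can be reduced to the following sketch: one wrapper per service name, discarded and recreated when the same name shows up with a different service id. WrapperMapSketch and Wrapper are hypothetical names for this illustration; the actual wrapper also holds the cached policies and a lock.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical reduction of the per-service wrapper bookkeeping.
final class WrapperMapSketch {
    static final class Wrapper {
        final Long serviceId;
        Wrapper(Long serviceId) { this.serviceId = serviceId; }
    }

    private final Map<String, Wrapper> wrappers = new HashMap<>();

    // Returns the wrapper for serviceName, recreating it if the service id changed.
    synchronized Wrapper wrapperFor(String serviceName, Long serviceId) {
        Wrapper w = wrappers.get(serviceName);
        if (w != null && !serviceId.equals(w.serviceId)) {
            // Same name, different id: the service was recreated, so drop the stale entry.
            wrappers.remove(serviceName);
            w = null;
        }
        if (w == null) {
            w = new Wrapper(serviceId);
            wrappers.put(serviceName, w);
        }
        return w;
    }
}
```

Only the map lookup is guarded by the object-level monitor; the potentially slow database refresh happens afterwards, outside it, under the wrapper's own lock.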

ServiceDBStore's List<RangerPolicy> getServicePoliciesFromDb(XXService service)

private List<RangerPolicy> getServicePoliciesFromDb(XXService service) throws Exception {
    if (LOG.isDebugEnabled()) {
        LOG.debug("==> ServiceDBStore.getServicePoliciesFromDb(" + service.getName() + ")");
    }
    
    // instantiate a RangerPolicyRetriever from the DAO manager and the transaction manager, then query the RangerPolicy list through policyRetriever
    RangerPolicyRetriever policyRetriever = new RangerPolicyRetriever(daoMgr, txManager);

    List<RangerPolicy> ret = policyRetriever.getServicePolicies(service);

    if (LOG.isDebugEnabled()) {
        LOG.debug("<== ServiceDBStore.getServicePoliciesFromDb(" + service.getName() + "): count=" + ((ret == null) ? 0 : ret.size()));
    }

    return ret;
}

Body of RangerPolicyRetriever.getServicePolicies(final XXService xService)

	public List<RangerPolicy> getServicePolicies(final XXService xService) {
		String serviceName = xService == null ? null : xService.getName();
		Long   serviceId   = xService == null ? null : xService.getId();

		if(LOG.isDebugEnabled()) {
			LOG.debug("==> RangerPolicyRetriever.getServicePolicies(serviceName=" + serviceName + ", serviceId=" + serviceId + ")");
		}

		List<RangerPolicy> ret  = null;
		RangerPerfTracer   perf = null;

		if(RangerPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
			perf = RangerPerfTracer.getPerfTracer(PERF_LOG, "RangerPolicyRetriever.getServicePolicies(serviceName=" + serviceName + ",serviceId=" + serviceId + ")");
		}

		if(xService != null) {
			if (txTemplate == null) {
				if (LOG.isDebugEnabled()) {
					LOG.debug("Transaction Manager is null; Retrieving policies in the existing transaction");
				}
				// the RetrieverContext constructor has already queried the database and obtained xPolicies
				RetrieverContext ctx = new RetrieverContext(xService);
				ret = ctx.getAllPolicies();
			} else {
				if (LOG.isDebugEnabled()) {
					LOG.debug("Retrieving policies in a new, read-only transaction");
				}
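				// when a transaction manager is available, the query runs in a new thread that makes the transaction read-only; the query itself is still done by instantiating RetrieverContext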
				PolicyLoaderThread t = new PolicyLoaderThread(txTemplate, xService);
				t.setDaemon(true);
				t.start();
				try {
					// the calling thread blocks until thread t has finished
					t.join();
					ret = t.getPolicies();
				} catch (InterruptedException ie) {
					LOG.error("Failed to retrieve policies in a new, read-only thread.", ie);
				}
			}
		} else {
			if(LOG.isDebugEnabled()) {
				LOG.debug("RangerPolicyRetriever.getServicePolicies(xService=" + xService + "): invalid parameter");
			}
		}

		RangerPerfTracer.log(perf);

		if(LOG.isDebugEnabled()) {
			LOG.debug("<== RangerPolicyRetriever.getServicePolicies(serviceName=" + serviceName + ", serviceId=" + serviceId + "): policyCount=" + (ret == null ? 0 : ret.size()));
		}

		return ret;
	}
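
The start-then-join pattern used with PolicyLoaderThread can be sketched in isolation as follows. LoaderThreadSketch and its Supplier-based loader are assumptions made for this example; the real thread wraps the load in a read-only transaction, which is omitted here.

```java
import java.util.function.Supplier;

// Hypothetical reduction of the "load in a separate daemon thread, then wait" pattern.
final class LoaderThreadSketch {
    static final class LoaderThread<T> extends Thread {
        private final Supplier<T> loader;
        private volatile T result;

        LoaderThread(Supplier<T> loader) { this.loader = loader; }

        @Override
        public void run() { result = loader.get(); }

        T getResult() { return result; }
    }

    // Runs the loader on a daemon thread and blocks the caller until it completes.
    static <T> T loadAndWait(Supplier<T> loader) {
        LoaderThread<T> t = new LoaderThread<>(loader);
        t.setDaemon(true);
        t.start();
        try {
            t.join();
            return t.getResult();
        } catch (InterruptedException ie) {
            // Restore the interrupt flag and report "no result".
            Thread.currentThread().interrupt();
            return null;
        }
    }
}
```

Because the caller joins immediately, the extra thread adds no parallelism; its purpose in the original is to give the query its own transaction context.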

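RetrieverContext is an inner class of RangerPolicyRetriever. It has constructors based on serviceId and on policyId, and the database query is performed inside the constructor.

RangerServicePoliciesCache has a private inner class, ServicePoliciesWrapper; the cached policies are held in that class's ServicePolicies object.

The body of getLatestOrCached uses a ReentrantLock to serialize the query, and getLatest decides whether the cache is refreshed from the database.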

		boolean getLatestOrCached(String serviceName, ServiceStore serviceStore) throws Exception {
			boolean ret = false;

			try {
				ret = lock.tryLock(waitTimeInSeconds, TimeUnit.SECONDS);
				if (ret) {
					getLatest(serviceName, serviceStore);
				}
			} catch (InterruptedException exception) {
				LOG.error("getLatestOrCached:lock got interrupted..", exception);
			} finally {
				if (ret) {
					lock.unlock();
				}
			}

			return ret;
		}

		void getLatest(String serviceName, ServiceStore serviceStore) throws Exception {

			if (LOG.isDebugEnabled()) {
				LOG.debug("==> ServicePoliciesWrapper.getLatest(" + serviceName + ")");
			}

			if (LOG.isDebugEnabled()) {
				LOG.debug("Found ServicePolicies in-cache : " + (servicePolicies != null));
			}

			Long servicePolicyVersionInDb = serviceStore.getServicePolicyVersion(serviceName);
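			// servicePolicyVersionInDb is the policy version of serviceName in the database; reload from the DB when nothing is cached, when that version is null, or when it differs from the cached version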
			if (servicePolicies == null || servicePolicyVersionInDb == null || !servicePolicyVersionInDb.equals(servicePolicies.getPolicyVersion())) {
				if (LOG.isDebugEnabled()) {
					LOG.debug("loading servicePolicies from db ... cachedServicePoliciesVersion=" + (servicePolicies != null ? servicePolicies.getPolicyVersion() : null) + ", servicePolicyVersionInDb=" + servicePolicyVersionInDb);
				}

				long startTimeMs = System.currentTimeMillis();

				ServicePolicies servicePoliciesFromDb = serviceStore.getServicePolicies(serviceName);

				long dbLoadTime = System.currentTimeMillis() - startTimeMs;

				if (dbLoadTime > longestDbLoadTimeInMs) {
					longestDbLoadTimeInMs = dbLoadTime;
				}
				updateTime = new Date();

				if (servicePoliciesFromDb != null) {
					if (servicePoliciesFromDb.getPolicyVersion() == null) {
						servicePoliciesFromDb.setPolicyVersion(0L);
					}
					servicePolicies = servicePoliciesFromDb;
					// sets some unused policy attributes to null
					pruneUnusedAttributes();
				}
			}

			if (LOG.isDebugEnabled()) {
				LOG.debug("<== ServicePoliciesWrapper.getLatest(" + serviceName + ")");
			}
		}
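
Stripped of logging and Ranger types, the two methods above amount to a version-checked cache guarded by a timed tryLock. The sketch below is an illustration under that reading; VersionedCacheSketch, Snapshot and the dbLoads counter are hypothetical names, and the two suppliers stand in for serviceStore.getServicePolicyVersion and serviceStore.getServicePolicies.

```java
import java.util.concurrent.TimeUnit;
import java.util.concurrent.locks.ReentrantLock;
import java.util.function.LongSupplier;
import java.util.function.Supplier;

// Hypothetical version-checked cache guarded by a timed lock.
final class VersionedCacheSketch {
    static final class Snapshot {
        final long   version;
        final String payload;
        Snapshot(long version, String payload) { this.version = version; this.payload = payload; }
    }

    private final ReentrantLock lock = new ReentrantLock();
    private final long waitTimeInSeconds;
    private volatile Snapshot cached;
    int dbLoads; // counts full reloads, for illustration only

    VersionedCacheSketch(long waitTimeInSeconds) { this.waitTimeInSeconds = waitTimeInSeconds; }

    // Returns true if the lock was obtained (and the cache therefore checked),
    // false if the wait timed out and the caller will see the existing snapshot.
    boolean getLatestOrCached(LongSupplier versionInDb, Supplier<Snapshot> loadFromDb) {
        boolean locked = false;
        try {
            locked = lock.tryLock(waitTimeInSeconds, TimeUnit.SECONDS);
            if (locked) {
                long dbVersion = versionInDb.getAsLong(); // cheap version lookup
                if (cached == null || cached.version != dbVersion) {
                    cached = loadFromDb.get();            // expensive full reload
                    dbLoads++;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        } finally {
            if (locked) {
                lock.unlock();
            }
        }
        return locked;
    }

    Snapshot get() { return cached; }
}
```

The design choice worth noting is that a timed-out caller does not fail: it falls through and is served whatever snapshot is already cached, trading freshness for bounded latency.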

ServiceREST.getServicePoliciesIfUpdated in detail

Input parameters

@PathParam("serviceName") String serviceName, 
@QueryParam("lastKnownVersion") Long lastKnownVersion,
@DefaultValue("0") @QueryParam("lastActivationTime") Long lastActivationTime,
@QueryParam("pluginId") String pluginId,
@DefaultValue("") @QueryParam("clusterName") String clusterName,
@Context HttpServletRequest request

Return value

ServicePolicies

Logic

// verify that the request is valid
serviceUtil.isValidateHttpsAuthentication(serviceName, request);

...
...

if (isValid) {
            if (lastKnownVersion == null) {
                lastKnownVersion = Long.valueOf(-1);
            }

            try {
                if (RangerPerfTracer.isPerfTraceEnabled(PERF_LOG)) {
                    perf = RangerPerfTracer.getPerfTracer(PERF_LOG, "ServiceREST.getServicePoliciesIfUpdated(serviceName=" + serviceName + ",lastKnownVersion=" + lastKnownVersion + ",lastActivationTime=" + lastActivationTime + ")");
                }
                // query the policies
                ServicePolicies servicePolicies = svcStore.getServicePoliciesIfUpdated(serviceName, lastKnownVersion);

                if (servicePolicies == null) {
                    downloadedVersion = lastKnownVersion;
                    httpCode = HttpServletResponse.SC_NOT_MODIFIED;
                    logMsg = "No change since last update";
                } else {
                    downloadedVersion = servicePolicies.getPolicyVersion();
                    ret = filterServicePolicies(servicePolicies);
                    httpCode = HttpServletResponse.SC_OK;
                    logMsg = "Returning " + (ret.getPolicies() != null ? ret.getPolicies().size() : 0) + " policies. Policy version=" + ret.getPolicyVersion();
                }
            } catch (Throwable excp) {
                LOG.error("getServicePoliciesIfUpdated(" + serviceName + ", " + lastKnownVersion + ", " + lastActivationTime + ") failed");

                httpCode = HttpServletResponse.SC_BAD_REQUEST;
                logMsg = excp.getMessage();
            } finally {
                createPolicyDownloadAudit(serviceName, lastKnownVersion, pluginId, httpCode, clusterName, request);
                RangerPerfTracer.log(perf);
            }
        }
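
The branch on servicePolicies == null maps onto HTTP status codes roughly as in the sketch below. IfUpdatedSketch is a hypothetical name, and it assumes that the store returns null exactly when the caller's lastKnownVersion equals the current version, which the listing above does not show.

```java
import java.net.HttpURLConnection;

// Hypothetical mapping from "has the policy version changed?" to an HTTP status.
final class IfUpdatedSketch {
    static int httpCodeFor(Long lastKnownVersion, long versionOnServer) {
        // A client that sends no version is treated as knowing version -1,
        // so it always receives the full policy set.
        long known = (lastKnownVersion == null) ? -1L : lastKnownVersion.longValue();

        // Assumption: an equal version means "nothing to download".
        return (known == versionOnServer)
                ? HttpURLConnection.HTTP_NOT_MODIFIED // 304, empty body
                : HttpURLConnection.HTTP_OK;          // 200, policies returned
    }
}
```

A plugin polling with an unchanged version therefore gets a 304 and keeps its local policies.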

Body of isValidateHttpsAuthentication

// serviceName must have a value
if (serviceName == null || serviceName.isEmpty()) {
            LOG.error("ServiceName not provided");
            throw restErrorUtil.createRESTException("Unauthorized access.",
                    MessageEnums.OPER_NOT_ALLOWED_FOR_ENTITY);
}

RangerService service = null;
        try {
            // queries the tables "x_service" and "x_service_config_map"
            service = svcStore.getServiceByName(serviceName);
        } catch (Exception e) {
            LOG.error("Requested Service not found. serviceName=" + serviceName);
            throw restErrorUtil.createRESTException("Service:" + serviceName + " not found",
                    MessageEnums.DATA_NOT_FOUND);
}

Body of getServicePoliciesIfUpdated(String serviceName, Long lastKnownVersion)

public ServicePolicies getServicePoliciesIfUpdated(String serviceName, Long lastKnownVersion) throws Exception {
        if (LOG.isDebugEnabled()) {
            LOG.debug("==> ServiceDBStore.getServicePoliciesIfUpdated(" + serviceName + ", " + lastKnownVersion + ")");
        }

        ServicePolicies ret = null;
        ...
