ThreadLocal 与 @Aspect 的组合使用(线程绑定)
Posted 张志翔
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadLocal 与 @Aspect 的组合使用(线程绑定)相关的知识,希望对你有一定的参考价值。
最近项目中看到了一种 ThreadLocal 与 @Aspect 的组合使用方式,也就是平常我们所说的线程绑定,我认为是很有意思的一种方式,特此记录便于日后查阅。
1、RateLimitPolicyThreadLocal 层
package com.fenqile.platform.manage.apigateway.service.threadlocal;
/**
 * Thread-bound holder for the id of a rate-limit policy record.
 *
 * <p>The id is kept in a {@link ThreadLocal}, so every thread only sees the value it stored
 * itself. A caller binds an id with {@link #setConfigInfo(Integer)}, a later step on the same
 * thread reads it with {@link #getRecordIndex()} and must release it with {@link #clear()} so
 * that the value cannot leak into the next task executed by a pooled thread.
 *
 * @author gingerchen
 * @date 2021/8/6 10:28
 */
public final class RateLimitPolicyThreadLocal {

    /** Per-thread slot holding the record id; {@code null} when nothing is bound. */
    private static final ThreadLocal<Integer> RECORD_INDEX = new ThreadLocal<>();

    /** Static utility class; not instantiable. */
    private RateLimitPolicyThreadLocal() {
    }

    /**
     * Returns the record id bound to the current thread.
     *
     * @return the bound id, or {@code null} if none has been set (or it was cleared)
     */
    public static Integer getRecordIndex() {
        return RECORD_INDEX.get();
    }

    /** Removes the value bound to the current thread, if any. */
    public static void clear() {
        RECORD_INDEX.remove();
    }

    /**
     * Binds {@code id} to the current thread unless an id is already bound.
     *
     * <p>First write wins: while a non-null value is present, later calls are ignored until
     * {@link #clear()} is invoked.
     *
     * @param id the record id to bind; passing {@code null} leaves the slot empty
     */
    public static void setConfigInfo(Integer id) {
        if (RECORD_INDEX.get() == null) {
            RECORD_INDEX.set(id);
        }
    }
}
2、RateLimitPolicyMqLogAspect 层
package com.fenqile.platform.manage.apigateway.service.aspect;
import com.fenqile.platform.manage.apigateway.dao.hystrixconfig.RateLimitPolicyLogMapper;
import com.fenqile.platform.manage.apigateway.dao.hystrixconfig.RateLimitPolicyMapper;
import com.fenqile.platform.manage.apigateway.domain.hystrixconfig.RateLimitPolicy;
import com.fenqile.platform.manage.apigateway.domain.hystrixconfig.RateLimitPolicyLog;
import com.fenqile.platform.manage.apigateway.manager.RateLimitPolicyManager;
import com.fenqile.platform.manage.apigateway.service.impl.GatewayConfigManager;
import com.fenqile.platform.manage.apigateway.service.threadlocal.RateLimitPolicyThreadLocal;
import com.fenqile.platform.manage.common.constant.GatewayConstant;
import com.google.common.collect.Maps;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;
import java.util.HashMap;
/***
* 添加和修改限流规则促发日志保存
* @author gingerchen
* @date 2021/8/6 10:15
*/
@Aspect
@Service
public class RateLimitPolicyMqLogAspect
private static final Logger LOG = LoggerFactory.getLogger(RateLimitPolicyMqLogAspect.class);
/**
* gatewayConfig mq topic
*/
@Value("$rocketmq_topic_fql_test")
private String topic;
private final RateLimitPolicyLogMapper rateLimitPolicyLogMapper;
private final RateLimitPolicyManager rateLimitPolicyManager;
private final GatewayConfigManager configManager;
/**
* 新增记录
*/
private static final String ASPECT_INSERT_EXECUTION = "execution(* com.fenqile.platform.manage.apigateway.service.impl.RateLimitPolicyServiceImpl.insert*(..))";
/**
* 修改记录 / 修改记录状态
*/
private static final String ASPECT_UPDATE_EXECUTION = "execution(* com.fenqile.platform.manage.apigateway.service.impl.RateLimitPolicyServiceImpl.update*(..))";
@Autowired
public RateLimitPolicyMqLogAspect(RateLimitPolicyLogMapper rateLimitPolicyLogMapper,
RateLimitPolicyManager rateLimitPolicyManager,
GatewayConfigManager configManager)
this.rateLimitPolicyLogMapper = rateLimitPolicyLogMapper;
this.rateLimitPolicyManager = rateLimitPolicyManager;
this.configManager = configManager;
@Pointcut(ASPECT_INSERT_EXECUTION + "||" + ASPECT_UPDATE_EXECUTION)
public void pointCut()
//切点
@AfterReturning(value = "pointCut()")
public void handleMqLog()
Integer index = RateLimitPolicyThreadLocal.getRecordIndex();
RateLimitPolicyThreadLocal.clear();
if (index == null)
return;
RateLimitPolicyLog recordLog = new RateLimitPolicyLog();
try
//查询原纪录
RateLimitPolicy record = rateLimitPolicyManager.get(index);
//保存操作流水记录
BeanUtils.copyProperties(record, recordLog, "index");
recordLog.setSourceIndex(record.getIndex());
rateLimitPolicyLogMapper.insertSelective(recordLog);
LOG.info("保存的限流规则配置流水记录为|recordLog=", recordLog);
//推送mq
HashMap<String, Object> body = Maps.newHashMap();
body.put("type", GatewayConstant.CONFIG_MESSAGE_ID_TYPE_RATE_LIMIT_POLICY);
body.put("id", record.getIndex());
configManager.pushMq(body, record.getEnv(), topic);
catch (Exception e)
LOG.error("限流规则配置流水记录或mq推送异常", e);
3、RateLimitPolicyServiceImpl 层
package com.fenqile.platform.manage.apigateway.service.impl;
import com.alibaba.dubbo.common.api.LsfApi;
import com.alibaba.dubbo.rpc.service.BusinessException;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.fenqile.platManage.common.constant.ResultCode;
import com.fenqile.platform.manage.apigateway.domain.hystrixconfig.RateLimitPolicy;
import com.fenqile.platform.manage.apigateway.domain.urlinterfacerule.UrlInterfaceRule;
import com.fenqile.platform.manage.apigateway.manager.RateLimitPolicyManager;
import com.fenqile.platform.manage.apigateway.model.hystrixconfig.QueryRateLimitPolicyReq;
import com.fenqile.platform.manage.apigateway.model.hystrixconfig.RateLimitPolicyVO;
import com.fenqile.platform.manage.apigateway.model.urlinterfacerule.UrlInterfaceRuleVo;
import com.fenqile.platform.manage.apigateway.service.RateLimitPolicyService;
import com.fenqile.platform.manage.apigateway.service.threadlocal.RateLimitPolicyThreadLocal;
import com.fenqile.platform.manage.common.constant.GatewayConstant;
import com.fenqile.platform.manage.common.constant.ServiceConstant;
import com.fenqile.platform.manage.common.enumeration.BizErrorCodeEnum;
import com.fenqile.platform.manage.common.model.DataResp;
import com.fenqile.platform.manage.common.model.UpdateStatusReq;
import com.fenqile.platform.manage.common.utils.AprClient;
import com.fenqile.platform.manage.enums.BizTypeEnum;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tk.mybatis.mapper.entity.Example;
import java.util.Date;
import java.util.List;
import static com.fenqile.platform.manage.common.constant.GatewayConstant.*;
import static com.fenqile.platform.manage.common.constant.GatewayConstant.AUTH_TYPE_TEXT;
/**
* 限流规则服务实现类
* @author gingerchen
* @date 2021/8/6 10:35
*/
@Service("rateLimitPolicyService")
public class RateLimitPolicyServiceImpl implements RateLimitPolicyService
private static final Logger LOG = LoggerFactory.getLogger(RateLimitPolicyServiceImpl.class);
private final RateLimitPolicyManager rateLimitPolicyManager;
@Autowired
public RateLimitPolicyServiceImpl(RateLimitPolicyManager rateLimitPolicyManager)
this.rateLimitPolicyManager = rateLimitPolicyManager;
/**
* 条件查询
*
* @param req 查询条件: status 和index
* @return pageInfo
*/
@Override
public PageInfo<RateLimitPolicy> getRateLimitPolicyList(QueryRateLimitPolicyReq req)
LOG.info("查询rateLimitPolicy|入参req=", req);
Example example = new Example(RateLimitPolicy.class);
Example.Criteria criteria = example.createCriteria();
if (StringUtils.isNotBlank(req.getRateId()))
criteria.andEqualTo(GatewayConstant.FIELD_ROUTE_ID, req.getRateId());
if (req.getEnv() != null)
criteria.andEqualTo(GatewayConstant.FIELD_ENV, req.getEnv());
if (req.getControlStatus() != null
&& req.getControlStatus() != GatewayConstant.GATEWAY_QUERY_STATUS_ALL)
criteria.andEqualTo(GatewayConstant.FIELD_CONTROL_STATUS, req.getControlStatus());
//普通用户可以查询所有记录;可以按照接口管理人来查询
if (StringUtils.isNotBlank(req.getMin()))
criteria.andLike(GatewayConstant.FIELD_MIN, "%" + req.getMin() + "%");
if(req.getStatus() == null)
req.setStatus(1);
//默认只查询有效记录
criteria.andEqualTo(GatewayConstant.FIELD_STATUS, req.getStatus());
example.setOrderByClause(ServiceConstant.SQL_MODIFY_TIME_DESC);
List<RateLimitPolicy> pageList = null;
PageHelper.startPage(req.getPageIndex(), req.getPageSize());
try
pageList = rateLimitPolicyManager.selectByExample(example);
catch (Exception e)
LOG.error("查询rateLimitPolicy异常|入参req=", req, e);
throw new BusinessException("查询rateLimitPolicy异常");
return new PageInfo<>(pageList);
@Override
public DataResp<RateLimitPolicy> getRateLimitPolicyByPrimaryKey(Integer index)
LOG.info("根据ID查询RateLimitPolicy|入参index=", index);
DataResp<RateLimitPolicy> resp = new DataResp<>(ResultCode.NORMAL);
RateLimitPolicy record;
try
record = rateLimitPolicyManager.get(index);
catch (Exception e)
LOG.error("根据ID查询RateLimitPolicy异常|入参index=", index, e);
return resp.setCode(ResultCode.ERROR);
return resp.setData(record);
@Override
public DataResp<RateLimitPolicy> insertRateLimitPolicy(RateLimitPolicyVO vo)
LOG.info("新增RateLimitPolicy数据记录|入参vo=", vo);
DataResp<RateLimitPolicy> resp = new DataResp<>(ResultCode.INSERT_NORMAL);
//参数校验
if (verifyEmptyFields(vo))
LOG.info("参数为空|vo=", vo);
return resp.setCode(ResultCode.EMPTY_FIELD);
// 2021-04-26,必须要有operator
String operator = vo.getOperate();
if (StringUtils.isBlank(operator))
LOG.error(">>> no operator for params=", operator);
throw new BusinessException(BizErrorCodeEnum.INVALID_PARAM_ERROR.getMsg());
//校验3:url重复
if (verifyDuplicateRecord(vo))
LOG.info("新增限流路由规则:限流规则重复");
return resp.setCode(ResultCode.DUPLICATE_URL);
RateLimitPolicy record = new RateLimitPolicy();
BeanUtils.copyProperties(vo, record);
//设定默认值
setDefaultVal(record);
// 2021-08-17,新增应用默认status=-1,表示待审批
String env = LsfApi.getCurEnv();
boolean isStable = false;
if (StringUtils.isNotBlank(env))
if (env.contains("stable") || env.contains("dev"))
isStable = true;
record.setStatus(isStable ? 1 : -1);
try
int id = rateLimitPolicyManager.add(record);
LOG.info("插入记录成功|record=", record);
// 2021-08-17,提交审批,审批通过后修改状态为生效
if (!isStable)
String logicId = LOP_APPROVAL_LOGIC_RATE_LIMIT_ID_PREFIX + operator + "-" + id;
try
JSONArray attachInfoArr = new JSONArray();
attachInfoArr.add(GATEWAY_APPROVAL_ATTACH_INFO + operator);
JSONObject conditionJson = new JSONObject();
conditionJson.put(AUTH_TYPE, AUTH_TYPE_TEXT);
AprClient.submitApprovalRequest(operator, logicId, attachInfoArr.toJSONString(),
conditionJson, "网关限流规则上线", BizTypeEnum.GATEWAY_URL);
catch (Exception e)
LOG.error("error on submit approval info=", logicId);
throw new BusinessException(BizErrorCodeEnum.DEFAULT_ERROR_CODE.getMsg());
catch (Exception e)
LOG.error("插入记录异常|record=", record, e);
throw new BusinessException(ResultCode.INSERT_ERRO);
//绑定操作记录到threadLocal
RateLimitPolicyThreadLocal.setConfigInfo(record.getIndex());
return resp.setData(record);
private boolean verifyDuplicateRecord(RateLimitPolicyVO vo)
Example example = new Example(RateLimitPolicy.class);
Example.Criteria criteria = example.createCriteria();
criteria.andEqualTo(GatewayConstant.FIELD_ROUTE_ID, vo.getRouteId());
criteria.andNotEqualTo(GatewayConstant.FIELD_STATUS, 0);
criteria.andEqualTo(GatewayConstant.FIELD_ENV, vo.getEnv());
if (vo.getIndex() != null)
criteria.andNotEqualTo(GatewayConstant.FIELD_INDEX, vo.getIndex());
return rateLimitPolicyManager.selectCountByExample(example) > 0;
private void setDefaultVal(RateLimitPolicy record)
record.setCreateTime(new Date());
record.setModifyTime(new Date());
if(record.getRouteId()!=null)
record.setRouteIdHash(record.getRouteId().hashCode());
private boolean verifyEmptyFields(RateLimitPolicyVO vo)
return vo.getBurstCapacity() == null || vo.getReplenishRate() == null || vo.getOpen() == null
|| vo.getRouteId() == null;
@Override
public DataResp<RateLimitPolicy> updateRateLimitPolicy(RateLimitPolicyVO vo)
LOG.info("更新RateLimitPolicy数据记录|入参vo=", vo);
DataResp<RateLimitPolicy> resp = new DataResp<>(ResultCode.UPDATE_NORMAL);
//参数校验
if (verifyEmptyFields(vo) || vo.getIndex() == null)
return resp.setCode(ResultCode.EMPTY_FIELD);
//校验3:url重复
if (verifyDuplicateRecord(vo))
LOG.info("新增限流路由规则:限流规则重复");
return resp.setCode(ResultCode.DUPLICATE_URL);
RateLimitPolicy record = new RateLimitPolicy();
BeanUtils.copyProperties(vo, record, "createTime", "env");
record.setModifyTime(new Date());
if(record.getRouteId()!=null)
record.setRouteIdHash(record.getRouteId().hashCode());
try
rateLimitPolicyManager.update(record);
LOG.info("更新记录成功|record=", record);
catch (Exception e)
LOG.error("更新记录失败|入参req=", vo,e);
throw new BusinessException(ResultCode.UPDATE_ERRO);
//绑定操作记录到threadLocal
RateLimitPolicyThreadLocal.setConfigInfo(record.getIndex());
return resp.setData(record);
@Override
public DataResp<RateLimitPolicy> updateRateLimitPolicyStatus(UpdateStatusReq req)
LOG.info("修改记录状态|入参req=", req);
DataResp<RateLimitPolicy> resp = new DataResp<>(ResultCode.UPDATE_NORMAL);
if (req.getIndex() == null || req.getStatus() == null)
return resp.setCode(ResultCode.INVALID_PARAM);
RateLimitPolicy model = new RateLimitPolicy();
model.setModifyTime(new Date());
model.setIndex(req.getIndex());
model.setStatus(req.getStatus());
try
rateLimitPolicyManager.update(model);
LOG.info("修改记录状态|model=", model);
catch (Exception e)
LOG.error("修改记录状态异常|入参req=", req, e);
throw new BusinessException(ResultCode.UPDATE_ERRO);
RateLimitPolicy record = rateLimitPolicyManager.get(req.getIndex());
//绑定操作记录到threadLocal
RateLimitPolicyThreadLocal.setConfigInfo(record.getIndex());
return resp;
到此 ThreadLocal 与 @Aspect 的组合使用(线程绑定)介绍完成。
以上是关于ThreadLocal 与 @Aspect 的组合使用(线程绑定)的主要内容,如果未能解决你的问题,请参考以下文章
分析Threadlocal内部实现原理,并解决Threadlocal的ThreadLocalMap的hash冲突与内存泄露