ThreadLocal 与 @Aspect 的组合使用(线程绑定)

Posted by 张志翔

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了ThreadLocal 与 @Aspect 的组合使用(线程绑定)相关的知识,希望对你有一定的参考价值。

        最近项目中看到了一种 ThreadLocal 与 @Aspect 的组合使用方式,也就是平常我们所说的线程绑定,我认为是很有意思的一种方式,特此记录便于日后查阅。

        1、RateLimitPolicyThreadLocal 层

package com.fenqile.platform.manage.apigateway.service.threadlocal;

/**
 * Thread-bound holder for the index of the rate-limit policy record that the
 * current thread has just inserted or updated.
 *
 * <p>The value is written with {@link #setConfigInfo(Integer)}, read with
 * {@link #getRecordIndex()} and must be released with {@link #clear()} once it
 * has been consumed, otherwise it stays attached to the thread.
 *
 * @author gingerchen
 * @date 2021/8/6 10:28
 */
public final class RateLimitPolicyThreadLocal {

    /** Per-thread storage for the record index. */
    private static final ThreadLocal<Integer> threadLocal = new ThreadLocal<>();

    /** Utility class; not instantiable. */
    private RateLimitPolicyThreadLocal() {
    }

    /**
     * Returns the record index bound to the current thread.
     *
     * @return the bound index, or {@code null} when nothing is bound
     */
    public static Integer getRecordIndex() {
        return threadLocal.get();
    }

    /** Removes the value bound to the current thread, if any. */
    public static void clear() {
        threadLocal.remove();
    }

    /**
     * Binds {@code id} to the current thread unless a value is already bound.
     *
     * <p>First write wins: a second call on the same thread is ignored until
     * {@link #clear()} has been called.
     *
     * @param id record index to bind; passing {@code null} leaves the slot empty
     */
    public static void setConfigInfo(Integer id) {
        if (threadLocal.get() == null) {
            threadLocal.set(id);
        }
    }
}

        2、RateLimitPolicyMqLogAspect 层

package com.fenqile.platform.manage.apigateway.service.aspect;

import com.fenqile.platform.manage.apigateway.dao.hystrixconfig.RateLimitPolicyLogMapper;
import com.fenqile.platform.manage.apigateway.dao.hystrixconfig.RateLimitPolicyMapper;
import com.fenqile.platform.manage.apigateway.domain.hystrixconfig.RateLimitPolicy;
import com.fenqile.platform.manage.apigateway.domain.hystrixconfig.RateLimitPolicyLog;
import com.fenqile.platform.manage.apigateway.manager.RateLimitPolicyManager;
import com.fenqile.platform.manage.apigateway.service.impl.GatewayConfigManager;
import com.fenqile.platform.manage.apigateway.service.threadlocal.RateLimitPolicyThreadLocal;
import com.fenqile.platform.manage.common.constant.GatewayConstant;
import com.google.common.collect.Maps;
import org.aspectj.lang.annotation.AfterReturning;
import org.aspectj.lang.annotation.Aspect;
import org.aspectj.lang.annotation.Pointcut;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Service;

import java.util.HashMap;

/***
 * 添加和修改限流规则促发日志保存
 * @author gingerchen
 * @date 2021/8/6 10:15
 */
@Aspect
@Service
public class RateLimitPolicyMqLogAspect 

    private static final Logger LOG = LoggerFactory.getLogger(RateLimitPolicyMqLogAspect.class);

    /**
     * gatewayConfig mq topic
     */
    @Value("$rocketmq_topic_fql_test")
    private String topic;

    private final RateLimitPolicyLogMapper rateLimitPolicyLogMapper;

    private final RateLimitPolicyManager rateLimitPolicyManager;

    private final GatewayConfigManager configManager;

    /**
     * 新增记录
     */
    private static final String ASPECT_INSERT_EXECUTION = "execution(* com.fenqile.platform.manage.apigateway.service.impl.RateLimitPolicyServiceImpl.insert*(..))";

    /**
     * 修改记录 / 修改记录状态
     */
    private static final String ASPECT_UPDATE_EXECUTION = "execution(* com.fenqile.platform.manage.apigateway.service.impl.RateLimitPolicyServiceImpl.update*(..))";

    @Autowired
    public RateLimitPolicyMqLogAspect(RateLimitPolicyLogMapper rateLimitPolicyLogMapper,
                                      RateLimitPolicyManager rateLimitPolicyManager,
                                      GatewayConfigManager configManager) 
        this.rateLimitPolicyLogMapper = rateLimitPolicyLogMapper;
        this.rateLimitPolicyManager = rateLimitPolicyManager;
        this.configManager = configManager;
    

    @Pointcut(ASPECT_INSERT_EXECUTION + "||" + ASPECT_UPDATE_EXECUTION)
    public void pointCut() 
        //切点
    

    @AfterReturning(value = "pointCut()")
    public void handleMqLog() 
        Integer index = RateLimitPolicyThreadLocal.getRecordIndex();
        RateLimitPolicyThreadLocal.clear();

        if (index == null) 
            return;
        

        RateLimitPolicyLog recordLog = new RateLimitPolicyLog();

        try 
            //查询原纪录
            RateLimitPolicy record = rateLimitPolicyManager.get(index);

            //保存操作流水记录
            BeanUtils.copyProperties(record, recordLog, "index");
            recordLog.setSourceIndex(record.getIndex());

            rateLimitPolicyLogMapper.insertSelective(recordLog);

            LOG.info("保存的限流规则配置流水记录为|recordLog=", recordLog);

            //推送mq
            HashMap<String, Object> body = Maps.newHashMap();
            body.put("type", GatewayConstant.CONFIG_MESSAGE_ID_TYPE_RATE_LIMIT_POLICY);
            body.put("id", record.getIndex());
            configManager.pushMq(body, record.getEnv(), topic);

         catch (Exception e) 
            LOG.error("限流规则配置流水记录或mq推送异常", e);
        
    

        3、RateLimitPolicyServiceImpl 层

package com.fenqile.platform.manage.apigateway.service.impl;

import com.alibaba.dubbo.common.api.LsfApi;
import com.alibaba.dubbo.rpc.service.BusinessException;
import com.alibaba.fastjson.JSONArray;
import com.alibaba.fastjson.JSONObject;
import com.fenqile.platManage.common.constant.ResultCode;
import com.fenqile.platform.manage.apigateway.domain.hystrixconfig.RateLimitPolicy;
import com.fenqile.platform.manage.apigateway.domain.urlinterfacerule.UrlInterfaceRule;
import com.fenqile.platform.manage.apigateway.manager.RateLimitPolicyManager;
import com.fenqile.platform.manage.apigateway.model.hystrixconfig.QueryRateLimitPolicyReq;
import com.fenqile.platform.manage.apigateway.model.hystrixconfig.RateLimitPolicyVO;
import com.fenqile.platform.manage.apigateway.model.urlinterfacerule.UrlInterfaceRuleVo;
import com.fenqile.platform.manage.apigateway.service.RateLimitPolicyService;
import com.fenqile.platform.manage.apigateway.service.threadlocal.RateLimitPolicyThreadLocal;
import com.fenqile.platform.manage.common.constant.GatewayConstant;
import com.fenqile.platform.manage.common.constant.ServiceConstant;
import com.fenqile.platform.manage.common.enumeration.BizErrorCodeEnum;
import com.fenqile.platform.manage.common.model.DataResp;
import com.fenqile.platform.manage.common.model.UpdateStatusReq;
import com.fenqile.platform.manage.common.utils.AprClient;
import com.fenqile.platform.manage.enums.BizTypeEnum;
import com.github.pagehelper.PageHelper;
import com.github.pagehelper.PageInfo;
import org.apache.commons.lang3.StringUtils;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.BeanUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import tk.mybatis.mapper.entity.Example;

import java.util.Date;
import java.util.List;

import static com.fenqile.platform.manage.common.constant.GatewayConstant.*;
import static com.fenqile.platform.manage.common.constant.GatewayConstant.AUTH_TYPE_TEXT;

/**
 * 限流规则服务实现类
 * @author gingerchen
 * @date 2021/8/6 10:35
 */
@Service("rateLimitPolicyService")
public class RateLimitPolicyServiceImpl implements RateLimitPolicyService 
    private static final Logger LOG = LoggerFactory.getLogger(RateLimitPolicyServiceImpl.class);

    private final RateLimitPolicyManager rateLimitPolicyManager;


    @Autowired
    public RateLimitPolicyServiceImpl(RateLimitPolicyManager rateLimitPolicyManager) 
        this.rateLimitPolicyManager = rateLimitPolicyManager;
    

    /**
     * 条件查询
     *
     * @param req 查询条件: status 和index
     * @return pageInfo
     */
    @Override
    public PageInfo<RateLimitPolicy> getRateLimitPolicyList(QueryRateLimitPolicyReq req) 
        LOG.info("查询rateLimitPolicy|入参req=", req);

        Example example = new Example(RateLimitPolicy.class);
        Example.Criteria criteria = example.createCriteria();

        if (StringUtils.isNotBlank(req.getRateId())) 
            criteria.andEqualTo(GatewayConstant.FIELD_ROUTE_ID, req.getRateId());
        

        if (req.getEnv() != null) 
            criteria.andEqualTo(GatewayConstant.FIELD_ENV, req.getEnv());
        

        if (req.getControlStatus() != null
                && req.getControlStatus() != GatewayConstant.GATEWAY_QUERY_STATUS_ALL) 
            criteria.andEqualTo(GatewayConstant.FIELD_CONTROL_STATUS, req.getControlStatus());
        

        //普通用户可以查询所有记录;可以按照接口管理人来查询
        if (StringUtils.isNotBlank(req.getMin())) 
            criteria.andLike(GatewayConstant.FIELD_MIN, "%" + req.getMin() + "%");
        
        if(req.getStatus() == null) 
            req.setStatus(1);
        
        //默认只查询有效记录
        criteria.andEqualTo(GatewayConstant.FIELD_STATUS, req.getStatus());
        example.setOrderByClause(ServiceConstant.SQL_MODIFY_TIME_DESC);

        List<RateLimitPolicy> pageList = null;
        PageHelper.startPage(req.getPageIndex(), req.getPageSize());
        try 
            pageList = rateLimitPolicyManager.selectByExample(example);

         catch (Exception e) 
            LOG.error("查询rateLimitPolicy异常|入参req=", req, e);
            throw new BusinessException("查询rateLimitPolicy异常");
        

        return new PageInfo<>(pageList);

    

    @Override
    public DataResp<RateLimitPolicy> getRateLimitPolicyByPrimaryKey(Integer index) 
        LOG.info("根据ID查询RateLimitPolicy|入参index=", index);
        DataResp<RateLimitPolicy> resp = new DataResp<>(ResultCode.NORMAL);

        RateLimitPolicy record;
        try 
            record = rateLimitPolicyManager.get(index);

         catch (Exception e) 
            LOG.error("根据ID查询RateLimitPolicy异常|入参index=", index, e);
            return resp.setCode(ResultCode.ERROR);
        

        return resp.setData(record);
    

    @Override
    public DataResp<RateLimitPolicy> insertRateLimitPolicy(RateLimitPolicyVO vo) 
        LOG.info("新增RateLimitPolicy数据记录|入参vo=", vo);
        DataResp<RateLimitPolicy> resp = new DataResp<>(ResultCode.INSERT_NORMAL);

        //参数校验
        if (verifyEmptyFields(vo)) 
            LOG.info("参数为空|vo=", vo);
            return resp.setCode(ResultCode.EMPTY_FIELD);
        
        // 2021-04-26,必须要有operator
        String operator = vo.getOperate();
        if (StringUtils.isBlank(operator)) 
            LOG.error(">>> no operator for params=", operator);
            throw new BusinessException(BizErrorCodeEnum.INVALID_PARAM_ERROR.getMsg());
        

        //校验3:url重复
        if (verifyDuplicateRecord(vo)) 
            LOG.info("新增限流路由规则:限流规则重复");
            return resp.setCode(ResultCode.DUPLICATE_URL);
        

        RateLimitPolicy record = new RateLimitPolicy();
        BeanUtils.copyProperties(vo, record);
        //设定默认值
        setDefaultVal(record);
        // 2021-08-17,新增应用默认status=-1,表示待审批
        String env = LsfApi.getCurEnv();
        boolean isStable = false;
        if (StringUtils.isNotBlank(env)) 
            if (env.contains("stable") || env.contains("dev")) 
                isStable = true;
            
        
        record.setStatus(isStable ? 1 : -1);
        try 
            int id = rateLimitPolicyManager.add(record);
            LOG.info("插入记录成功|record=", record);
            // 2021-08-17,提交审批,审批通过后修改状态为生效
            if (!isStable) 
                String logicId = LOP_APPROVAL_LOGIC_RATE_LIMIT_ID_PREFIX + operator + "-" + id;
                try 
                    JSONArray attachInfoArr = new JSONArray();
                    attachInfoArr.add(GATEWAY_APPROVAL_ATTACH_INFO + operator);
                    JSONObject conditionJson = new JSONObject();
                    conditionJson.put(AUTH_TYPE, AUTH_TYPE_TEXT);
                    AprClient.submitApprovalRequest(operator, logicId, attachInfoArr.toJSONString(),
                            conditionJson, "网关限流规则上线", BizTypeEnum.GATEWAY_URL);
                 catch (Exception e) 
                    LOG.error("error on submit approval info=", logicId);
                    throw new BusinessException(BizErrorCodeEnum.DEFAULT_ERROR_CODE.getMsg());
                
            
         catch (Exception e) 
            LOG.error("插入记录异常|record=", record, e);
            throw new BusinessException(ResultCode.INSERT_ERRO);
        

        //绑定操作记录到threadLocal
        RateLimitPolicyThreadLocal.setConfigInfo(record.getIndex());

        return resp.setData(record);
    

    private boolean verifyDuplicateRecord(RateLimitPolicyVO vo) 
        Example example = new Example(RateLimitPolicy.class);
        Example.Criteria criteria = example.createCriteria();
        criteria.andEqualTo(GatewayConstant.FIELD_ROUTE_ID, vo.getRouteId());
        criteria.andNotEqualTo(GatewayConstant.FIELD_STATUS, 0);
        criteria.andEqualTo(GatewayConstant.FIELD_ENV, vo.getEnv());
        if (vo.getIndex() != null) 
            criteria.andNotEqualTo(GatewayConstant.FIELD_INDEX, vo.getIndex());
        
        return rateLimitPolicyManager.selectCountByExample(example) > 0;
    


    private void setDefaultVal(RateLimitPolicy record) 
        record.setCreateTime(new Date());
        record.setModifyTime(new Date());
        if(record.getRouteId()!=null) 
            record.setRouteIdHash(record.getRouteId().hashCode());
        
    

    private boolean verifyEmptyFields(RateLimitPolicyVO vo) 
        return vo.getBurstCapacity() == null || vo.getReplenishRate() == null || vo.getOpen() == null
            || vo.getRouteId() == null;
    

    @Override
    public DataResp<RateLimitPolicy> updateRateLimitPolicy(RateLimitPolicyVO vo) 
        LOG.info("更新RateLimitPolicy数据记录|入参vo=", vo);
        DataResp<RateLimitPolicy> resp = new DataResp<>(ResultCode.UPDATE_NORMAL);

        //参数校验
        if (verifyEmptyFields(vo) || vo.getIndex() == null) 
            return resp.setCode(ResultCode.EMPTY_FIELD);
        
        //校验3:url重复
        if (verifyDuplicateRecord(vo)) 
            LOG.info("新增限流路由规则:限流规则重复");
            return resp.setCode(ResultCode.DUPLICATE_URL);
        
        RateLimitPolicy record = new RateLimitPolicy();
        BeanUtils.copyProperties(vo, record, "createTime", "env");
        record.setModifyTime(new Date());
        if(record.getRouteId()!=null) 
            record.setRouteIdHash(record.getRouteId().hashCode());
        

        try 
            rateLimitPolicyManager.update(record);
            LOG.info("更新记录成功|record=", record);

         catch (Exception e) 
            LOG.error("更新记录失败|入参req=", vo,e);
            throw new BusinessException(ResultCode.UPDATE_ERRO);
        

        //绑定操作记录到threadLocal
        RateLimitPolicyThreadLocal.setConfigInfo(record.getIndex());

        return resp.setData(record);
    

    @Override
    public DataResp<RateLimitPolicy> updateRateLimitPolicyStatus(UpdateStatusReq req) 
        LOG.info("修改记录状态|入参req=", req);
        DataResp<RateLimitPolicy> resp = new DataResp<>(ResultCode.UPDATE_NORMAL);

        if (req.getIndex() == null || req.getStatus() == null) 
            return resp.setCode(ResultCode.INVALID_PARAM);
        

        RateLimitPolicy model = new RateLimitPolicy();
        model.setModifyTime(new Date());
        model.setIndex(req.getIndex());
        model.setStatus(req.getStatus());

        try 
            rateLimitPolicyManager.update(model);
            LOG.info("修改记录状态|model=", model);

         catch (Exception e) 
            LOG.error("修改记录状态异常|入参req=", req, e);
            throw new BusinessException(ResultCode.UPDATE_ERRO);
        

        RateLimitPolicy record = rateLimitPolicyManager.get(req.getIndex());

        //绑定操作记录到threadLocal
        RateLimitPolicyThreadLocal.setConfigInfo(record.getIndex());

        return resp;
    


        到此 ThreadLocal 与 @Aspect 的组合使用(线程绑定)介绍完成。

以上是关于 ThreadLocal 与 @Aspect 的组合使用(线程绑定)的主要内容。如果未能解决你的问题,请参考以下文章:

java中的引用与ThreadLocal

ThreadLocal 应用原理解析与常见问题

ThreadLocal的设计理念与作用

类ThreadLocal的使用与源码分析

ThreadLocal的理解与应用场景分析

分析Threadlocal内部实现原理,并解决Threadlocal的ThreadLocalMap的hash冲突与内存泄露