自定义springboot组件--基于nacos和spring-cloud-loadbalancer实现灰度发布

Posted Instanceztt

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了自定义springboot组件--基于nacos和spring-cloud-loadbalancer实现灰度发布相关的知识,希望对你有一定的参考价值。

一 灰度发布

灰度发布也叫金丝雀发布,起源是,矿井工人发现,金丝雀对瓦斯气体很敏感,矿工会在下井之前,先放一只金丝雀到井中,如果金丝雀不叫了,就代表瓦斯浓度高.

在灰度发布开始后,先启动一个新版本应用,但是并不直接将流量切过来,而是测试人员对新版本进行线上测试,启动的这个新版本应用,就是我们的金丝雀。如果没有问题,那么可以将少量的用户流量导入到新版本上,然后再对新版本做运行状态观察,收集各种运行时数据,如果此时对新旧版本做各种数据对比,就是所谓的A/B测试。

当确认新版本运行良好后,再逐步将更多的流量导入到新版本上,在此期间,还可以不断地调整新旧两个版本的运行的服务器副本数量,以使得新版本能够承受越来越大的流量压力。直到将100%的流量都切换到新版本上,最后关闭剩下的老版本服务,完成灰度发布。

如果在灰度发布过程中(灰度期)发现了新版本有问题,就应该立即将流量切回老版本上,这样,就会将负面影响控制在最小范围内。

二 实现思路

2.1 基本思路

Spring Cloud LoadBalancer是一个客户端负载均衡器,类似于Ribbon,但是由于Ribbon已经进入维护模式,并且Ribbon 2并不与Ribbon 1相互兼容,所以Spring Cloud全家桶在Spring Cloud Commons项目中,添加了Spring cloud Loadbalancer作为新的负载均衡器,并且做了向前兼容;

本文主要是基于Spring Cloud LoadBalancer,重写Spring Cloud LoadBalancer默认的客户端负载均衡器,实现灰度均衡

我们可以看到Spring Cloud LoadBalancer提供的负载均衡策略比较少,内置轮询随机的负载均衡策略,默认轮询策略,本文主要覆写轮询策略的解析请求的请求头获得当前请求的版本后,从nacos匹配到相对应的版本实例实现灰度版本的负载均衡.

2.2 代码实现

2.2.1 自定义灰度负载均衡策略

重写轮询策略,实现自定义的灰度策略

自定义灰度策略继承轮询策略,重写choose方法,解析当前请求头中版本号与nacos元数据中的版本号比对,匹配后返回相应的服务实例.

/**
 *  自定义灰度校验策略
 * @author likun
 * @date 2022年06月23日 16:08
 */
@Slf4j
public class GrayRoundRobinLoadBalancer extends RoundRobinLoadBalancer 
    final String serviceId;
    ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider;
    public GrayRoundRobinLoadBalancer(ObjectProvider<ServiceInstanceListSupplier> serviceInstanceListSupplierProvider, String serviceId) 
        super(serviceInstanceListSupplierProvider, serviceId);
        this.serviceId=serviceId;
        this.serviceInstanceListSupplierProvider=serviceInstanceListSupplierProvider;
    


    @Override
    public Mono<Response<ServiceInstance>> choose(Request request) 
        //从当前请求头中获得请求的版本号
        ServiceInstanceListSupplier supplier = (ServiceInstanceListSupplier)this.serviceInstanceListSupplierProvider.getIfAvailable(NoopServiceInstanceListSupplier::new);
        return supplier.get(request).next().map((serviceInstances) -> this.getInstanceResponse(request, serviceInstances));
    

    public Response<ServiceInstance> getInstanceResponse(Request request, List<ServiceInstance> serviceInstances)
        // 注册中心没有可用的实例
        if (CollUtil.isEmpty(serviceInstances))
            log.warn("No servers available for service",serviceId);
            return new EmptyResponse();
        
        if (request==null||request.getContext()==null)
            return super.choose(request).block();
        
        DefaultRequestContext requestContext = (DefaultRequestContext) request.getContext();

        if (!(requestContext.getClientRequest() instanceof RequestData))
            return super.choose(request).block();
        

        RequestData requestData = (RequestData) requestContext.getClientRequest();

        String version = requestData.getHeaders().getFirst(CommonConstants.VERSION);

        if (StrUtil.isBlank(version))
            return super.choose(request).block();
        
        // 判断nacos中有没有相对应的版本号
        List<ServiceInstance> serviceInstanceList = serviceInstances.stream().filter(serviceInstance -> 
            NacosServiceInstance nacosServiceInstance = (NacosServiceInstance) serviceInstance;
            // 获得当前配置中的元数据信息
            Map<String, String> metadata = nacosServiceInstance.getMetadata();

            String targetVersion = MapUtil.getStr(metadata, CommonConstants.VERSION);
            return version.equalsIgnoreCase(targetVersion);
        ).collect(Collectors.toList());

        if (CollUtil.isNotEmpty(serviceInstanceList))
            // 从匹配到的结果中随机的返回一个
            ServiceInstance serviceInstance = RandomUtil.randomEle(serviceInstanceList);
            return new DefaultResponse(serviceInstance);
        else 
            return super.choose(request).block();
        
    

注册自定义灰度轮询策略

继承LoadBalancerClientConfiguration 配置覆盖掉原有的轮询策略的注册

@Configuration(
        proxyBeanMethods = false
)
@ConditionalOnDiscoveryEnabled
public class GrayLoadBalancerClientConfiguration extends LoadBalancerClientConfiguration 
    /**
     * 注入自定义的灰度策略
     * @param environment
     * @param loadBalancerClientFactory
     * @return
     */
    @Override
    @Bean
    @ConditionalOnMissingBean
    public ReactorLoadBalancer<ServiceInstance> reactorServiceInstanceLoadBalancer(Environment environment, LoadBalancerClientFactory loadBalancerClientFactory) 
        String name = environment.getProperty(LoadBalancerClientFactory.PROPERTY_NAME);
        return new GrayRoundRobinLoadBalancer(loadBalancerClientFactory.getLazyProvider(name, ServiceInstanceListSupplier.class), name);
    

客户端负载均衡策略配置

@Configuration
// 配置自定义的负载均衡策略
@LoadBalancerClients(defaultConfiguration = GrayLoadBalancerClientConfiguration.class)
@ConditionalOnProperty(name = "gray.rule.enabled",matchIfMissing = true,havingValue = "true")
public class XlcpGrayAutoConfiguration 


2.2.2 feign接口中版本号的传递

通过fegin的内置拦截器,保证版本号的传递

public class GrayFeigeInterceptor implements RequestInterceptor 

    @Override
    public void apply(RequestTemplate requestTemplate) 
        String version = WebUtils.getRequest().getHeader(CommonConstants.VERSION);
        if (StrUtil.isBlank(version))
            version= HeaderVersionHolder.getVersion();
        
        requestTemplate.header(CommonConstants.VERSION,version);
    

通过阿里的ttl保证子线程中version不丢失

/**
 * 解决线程之间version的传递
 * @author likun
 * @date 2022年06月23日 17:06
 */
public class HeaderVersionHolder 

    public static TransmittableThreadLocal<String> VERSION = new TransmittableThreadLocal<String>();

    public static void setVersion(String version)
        VERSION.set(version);
    

    public static String getVersion()
        return VERSION.get();
    

    public static void remove()
        VERSION.remove();
    



拦截器注册到spring容器中

@Configuration
@ConditionalOnWebApplication(type = ConditionalOnWebApplication.Type.SERVLET)
@ConditionalOnProperty(name = "gray.rule.enabled",matchIfMissing = true,havingValue = "true")
@AutoConfigureAfter(XlcpGrayAutoConfiguration.class)
public class GrayFeignAutoConfiguration 
    @Bean
    public GrayFeigeInterceptor grayFeigeInterceptor()
        return new GrayFeigeInterceptor();
    

2.2.3 定义spring.factories

三 客户端调用


idea开启多实例
本地虚拟集群


客户端调用

2.1 客户端直接调用

 @GetMapping("/testGray")
    @Inner(value = false)
    public R testGray()
        Environment environment = SpringContextHolder.getBean(Environment.class);
        String port = environment.getProperty("server.port");
        log.info("我被调用了,我的端口是:",port);
        
        return R.ok("我被调用了,我的端口是:"+port);
    

调用结果

2.2 内部feign调用


自定义springboot组件--基于模板模式对原生springboot的rabbitmq组件进行扩展

一 引入

在我们的日常开发中,消息中间件已经成为了java研发工程师的一项必备技能,本文主要是基于对springboot原生组件的扩展开发,基于模板设计模式和静态代理模式,简化了队列路由的绑定,交由公共模板进行统一的绑定,并在公用模板中保证了消息的幂等性和消息的可靠性投递,将这些类似的代码抽离出来,让开发者只专注于业务逻辑的开发.

整体实现思路:

  • 开发者申明路由交换机等基础元数据后,交由元数据解析器完成交换机,路由的申明及相应关系的绑定
  • 开发者申明消息监听器后,通过消息侦听容器完成队列与监听器的绑定
  • 开发者通过消息发射器,发布消息后,系统通过不通模板完成消息的发送

二 逻辑实现

2.1 元数据解析器的构建

申明基础信息接口

/**
 *  用于定义交换机队列等核心参数 便于队列交换机等初始化
 * @author likun
 * @date 2022/6/17 11:22
 */
public interface MessageMetaData 
    /**
     * 获取队列名称
     * @return
     */
    String getQueue();

    /**
     * 交换机类型
     * @return
     */
    ExchangeTypeEnum getExchangeType();

    /**
     * 队列配置
     * @return
     */
    default Map<String,Object> getQueueArgs()
        return null;
    ;

    /**
     * 交换机配置
     * @return
     */
    default Map<String,Object> getExchangeArgs()
        return null;
    ;

    /**
     * 消息扩展属性
     * @return
     */
    default MessageProperties getMessageProperties()
        return null;
    ;

    default void setMessageProperties(MessageProperties messageProperties)

    ;


定义不同的元数据模板

public abstract class FanoutMessageMetaData implements MessageMetaData 

    private MessageProperties messageProperties = null;

    @Override
    public ExchangeTypeEnum getExchangeType() 
        return ExchangeTypeEnum.FANOUT;
    

    /**
     * 交换机名称
     * @return
     */
    abstract public String getExchange();


    @Override
    public MessageProperties getMessageProperties() 
        return this.messageProperties;
    

    @Override
    public void setMessageProperties(MessageProperties messageProperties) 
        this.messageProperties=messageProperties;
    


public abstract class DirectMessageMetadata implements MessageMetaData
    private MessageProperties messageProperties;
    @Override
    public ExchangeTypeEnum getExchangeType() 
        return ExchangeTypeEnum.DIRECT;
    

    /**
     * 交换机名称
     * @return
     */
    abstract public String getExchange();

    @Override
    public MessageProperties getMessageProperties() 
        return this.messageProperties;
    

    @Override
    public void setMessageProperties(MessageProperties messageProperties) 
        this.messageProperties=messageProperties;
    

申明元数据解析接口,定义解析规范

/**
 * 判断是否支持当前交换机类型
 * @author likun
 * @date 2022/6/17 11:44
 */
public interface Support 
    Boolean support(ExchangeTypeEnum exchangeTypeEnum);


/**
 *  核心参数解析器 解析核心参数并完成队列交换机的绑定
 * @author likun
 * @date 2022/6/17 11:42
 */
public interface MessageMetaDataResolver extends Support 
    /**
     * 解析核心参数
     * @param messageMetaData
     */
    void resolve(MessageMetaData messageMetaData);

定义不同模板的解析器

@RequiredArgsConstructor
@Slf4j
public abstract class AbstractMessageMetadataResolver implements MessageMetaDataResolver 
    private final RabbitAdmin rabbitAdmin;


    @Override
    public void resolve(MessageMetaData messageMetaData) 
        if (support(messageMetaData.getExchangeType()))
            doResolve(messageMetaData);
        
    

    /**
     * 下游子类实现
     * @param messageMetaData
     */
    abstract void doResolve(MessageMetaData messageMetaData);

    /**
     * 申明队列
     */
    public void declareQueue(Queue queue)
        rabbitAdmin.declareQueue(queue);
        log.info("queue [] declared.",queue);
    

    public void declareExchange(Exchange exchange)
        rabbitAdmin.declareExchange(exchange);
        log.info("exchange [] declared.",exchange);
    

    public void declareBinding(Binding binding)
        rabbitAdmin.declareBinding(binding);
        log.info("binding [] declared.",binding);
    


public class DirectMessageMetadataResolver extends AbstractMessageMetadataResolver

    public DirectMessageMetadataResolver(RabbitAdmin rabbitAdmin) 
        super(rabbitAdmin);
    

    @Override
    public Boolean support(ExchangeTypeEnum exchangeTypeEnum) 
        return ExchangeTypeEnum.DIRECT.equals(exchangeTypeEnum);
    

    @Override
    void doResolve(MessageMetaData messageMetaData) 
        DirectMessageMetadata directMessageMetadata = (DirectMessageMetadata) messageMetaData;

        // 申明队列
        Queue queue = new Queue(directMessageMetadata.getQueue(), true, false, false, directMessageMetadata.getQueueArgs());
        declareQueue(queue);

        // 申明交换机
        DirectExchange exchange = new DirectExchange(directMessageMetadata.getExchange(), true, false, directMessageMetadata.getExchangeArgs());
        declareExchange(exchange);

        // 交换机绑定队列
        Binding binding = BindingBuilder.bind(queue).to(exchange).withQueueName();
        declareBinding(binding);
    


public class FanoutMessageMetaDateResolver extends AbstractMessageMetadataResolver
    public FanoutMessageMetaDateResolver(RabbitAdmin rabbitAdmin) 
        super(rabbitAdmin);
    

    @Override
    public Boolean support(ExchangeTypeEnum exchangeTypeEnum) 
        return ExchangeTypeEnum.FANOUT.equals(exchangeTypeEnum);
    

    @Override
    void doResolve(MessageMetaData messageMetaData) 
        FanoutMessageMetaData FanoutMessageMetaData = (FanoutMessageMetaData) messageMetaData;
        // 申明队列
        Queue queue = new Queue(FanoutMessageMetaData.getQueue(), true, false, false, FanoutMessageMetaData.getQueueArgs());
        declareQueue(queue);

        // 申明交换机
        FanoutExchange exchange = new FanoutExchange(FanoutMessageMetaData.getExchange(), true, false, FanoutMessageMetaData.getExchangeArgs());
        declareExchange(exchange);

        // 队列绑定交换机
        Binding binding = BindingBuilder.bind(queue).to(exchange);
        declareBinding(binding);

    


申明静态代理器

/**
 * 委派解析器
 * @author likun
 * @date 2022年06月17日 14:05
 */
@Slf4j
public class DelegatingMessageMetaDataResolver implements MessageMetaDataResolver 

    private final Map<ExchangeTypeEnum,MessageMetaDataResolver> messageMetaDataResolverMap = new ConcurrentHashMap<>();

    public DelegatingMessageMetaDataResolver(Map<ExchangeTypeEnum,MessageMetaDataResolver> messageMetaDataResolverMap)
       this.messageMetaDataResolverMap.putAll(messageMetaDataResolverMap);
    

    @Override
    public Boolean support(ExchangeTypeEnum exchangeTypeEnum) 
        return true;
    

    @Override
    public void resolve(MessageMetaData messageMetaData) 
        if (messageMetaData==null)
            log.error("metaDate resolve must have messageMetaData but is null");
            return;
        
        MessageMetaDataResolver messageMetaDataResolver = messageMetaDataResolverMap.get(messageMetaData.getExchangeType());
        if (messageMetaDataResolver==null)
            log.error("messageMetaDataResolver is null");
            return;
        
        messageMetaDataResolver.resolve(messageMetaData);
    

元数据初始化器

/**
 * 元数据核心参数初始化器
 * @author likun
 * @date 2022年06月17日 15:43
 */
@RequiredArgsConstructor
@Slf4j
public class MessageMetaDataInitalizer implements InitializingBean, Ordered 

    private final XlcpMqProperties xlcpMqProperties;

    private final MessageMetaDataResolver messageMetaDataResolver;

    @Override
    public void afterPropertiesSet() throws Exception 
        log.info("autoCreateMq configured with []",xlcpMqProperties.getAutoCreateMq());
        if (xlcpMqProperties.getAutoCreateMq())
            Map<String, MessageMetaData> messageMetaDataMap = SpringContextHolder.getBeansOfType(MessageMetaData.class);
            messageMetaDataMap.forEach((key,messageMetaData)->
                log.info("Mq auto declared with metaDate []",messageMetaData);
                messageMetaDataResolver.resolve(messageMetaData);
            );
        
    


    @Override
    public int getOrder() 
        return Ordered.LOWEST_PRECEDENCE;
    

将静态代理器和初始器交由spring容器管理

2.2 动态消息监听器构建

申明抽象消息监听器

@Slf4j
public abstract class DynamicMessageListener<T> extends AbstractAdaptableMessageListener 
    private final MessageMetaData messageMetaData;

    public static final String MQ_MESSAGE_ID_PREFIX = "mq:message:id";

    public DynamicMessageListener(MessageMetaData messageMetaData)
        this.messageMetaData=messageMetaData;
    

    public void doExcute(Message message, Channel channel, Consumer<Object> consumer) throws IOException 

        MessageProperties messageProperties=message.getMessageProperties();
        String messageId=messageProperties.getMessageId();

        // 校验当前消息是否被消费
        RedisTemplate redisTemplate=SpringContextHolder.getBean(RedisTemplate.class);

        redisTemplate.setKeySerializer(new StringRedisSerializer());

        String mqMessageMqKey = String.format("%s%s%s",MQ_MESSAGE_ID_PREFIX, StrPool.COLON,messageId);

        if (redisTemplate.hasKey(mqMessageMqKey))
            ackOk(message,channel);
            log.info("当前消息已经被消费了,消息id为:",messageId);
            return;
        
        MessageConverter messageConverter=getMessageConverter();
        Object convetMessag= messageConverter.fromMessage(message);
        Boolean ackFalg = Boolean.FALSE;
        try 
            consumer.accept(convetMessag);
            // 存入redis中确保消息已经被消费了 保存30分钟
            redisTemplate.opsForValue().set(mqMessageMqKey,"",30, TimeUnit.MINUTES);
         catch (Exception exception) 
            exception.printStackTrace();
            // 签收失败将消息重新投递到队列中
            ackFalg=onError(message,channel);
        finally 
            if (!ackFalg) 
                ackOk(message, channel);
            
        

    

    protected Boolean onError(Message message,Channel channel) throws IOException 
        channel.basicReject(message.getMessageProperties().getDeliveryTag(),true);
        return Boolean.TRUE;
    

    protected void ackOk(Message message,Channel channel) throws IOException 
        channel.basicAck(message.getMessageProperties().getDeliveryTag(), false);
    

    public MessageMetaData getMessageMetaData()
        return messageMetaData;
    

申明单消息监听器模板和批量消息监听模板

public abstract class SingleDynamicMessageListener<T> extends DynamicMessageListener<T>
    public SingleDynamicMessageListener(MessageMetaData messageMetaData) 
        super(messageMetaData);
    

    public abstract void onMessage(T t);

    @Override
    public void onMessage(Message message, Channel channel) throws Exception 
        doExcute(message,channel,object->
            onMessage((T)object);
        );
    




public abstract class BatchDynamicMessageListener<T> extends DynamicMessageListener<T> 

    public BatchDynamicMessageListener(MessageMetaData messageMetaData) 
        super(messageMetaData);
    

    @Override
    public void onMessage(Message message, Channel channel) throws Exception 
        MessageConverter messageConverter=getMessageConverter();
        doExcute(message,channel,(obj)->
            if (message instanceof Collection)
                onMessageBatch((Collection<T>) obj);
            else 
                ArrayList<T> list = new ArrayList<>();
                list.add((T)message);
                onMessageBatch(list);
            
        );
    

    /**
     * 批量消息
     * @param collection
     */
    abstract void onMessageBatch(Collection<T> collection);

申明动态容器监听器

@RequiredArgsConstructor
public class DynamicMessageListenerContainer extends SimpleRabbitListenerContainerFactory 
    private final int DEFAULT_PREFETCH_COUNT = 1;

    private final ConnectionFactory connectionFactory;

    /**
     * 后缀
     */
    public static final String LISTENER_CONTAINER_SUFFIX = "SimpleMessageListenerContainer";

    private int prefetchCount = DEFAULT_PREFETCH_COUNT;

    public SimpleMessageListenerContainer createListenerContainer(DynamicMessageListener dynamicMessageListener, MessageMetaData messageMetaData)
        SimpleMessageListenerContainer simpleMessageListenerContainer=new SimpleMessageListenerContainer();
        simpleMessageListenerContainer.setQueueNames(messageMetaData.getQueue());
        simpleMessageListenerContainer.setMessageListener(dynamicMessageListener);
        simpleMessageListenerContainer.setConnectionFactory(connectionFactory);
        simpleMessageListenerContainer.setPrefetchCount(prefetchCount);
        //设置当前消息为手动签收
        simpleMessageListenerContainer.setAcknowledgeMode(AcknowledgeMode.MANUAL);
        return simpleMessageListenerContainer;
    

    public void以上是关于自定义springboot组件--基于nacos和spring-cloud-loadbalancer实现灰度发布的主要内容,如果未能解决你的问题,请参考以下文章

springboot自动装配---实现一个自定义自动装配组件

nacos使用-服务注册中心和配置中心

SpringBoot2 整合Nacos组件,环境搭建和入门案例详解

SpringBoot SpringCloud Nacos等一些组件版本对应

SpringBoot2.x系列教程(七十)Spring Boot Actuator集成及自定义Endpoint详解

基于springBoot + VUE三 || 项目环境搭建-Nacos