SpringCloud(Hoxton.SR3)基础篇:第五章Hystrix-request collapsing(请求合并)

Posted 圣痕道心

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SpringCloud(Hoxton.SR3)基础篇:第五章Hystrix-request collapsing(请求合并)相关的知识,希望对你有一定的参考价值。

介绍:

  Hystrix的请求合并就是把重复的请求批量的用一个HystrixCommand命令去执行,以减少通信消耗和线程数的占用。Hystrix的请求合并用到了HystrixCollapser这个抽象类,它在HystrixCommand之前前放置一个合并处理器,将处于一个很短的时间窗(默认10ms)内对同一依赖服务的多个请求进行整合并以批量方式发起请求的功能(服务提供方也需要提供相应的匹配实现接口)。下面我们通过一个例子来看看怎么使用。

示例:

一.首先我们需要一个EurekaServer来作为注册中心。可以参考代码示例搭建一个 springcloud-eureka-service工程

二.新建一个服务提供者工程provider-user

  (1)pom.xml相关依赖

    <parent>
        <groupId>org.springframework.boot</groupId>
        <artifactId>spring-boot-starter-parent</artifactId>
        <version>2.2.6.RELEASE</version>
        <relativePath /> <!-- lookup parent from repository -->
    </parent>
    <properties>
        <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
        <project.reporting.outputEncoding>UTF-8</project.reporting.outputEncoding>
        <!-- jdk版本 -->
        <java.version>1.8</java.version>
        <!-- SpringCloud版本号,官方最新稳定版本 -->
        <spring-cloud.version>Hoxton.SR3</spring-cloud.version>
    </properties>
    
    <!--依赖管理,用于管理spring-cloud的依赖 -->
    <dependencyManagement>
        <dependencies>
            <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>${spring-cloud.version}</version>
                <type>pom</type>
                <scope>import</scope>
            </dependency>
        </dependencies>
    </dependencyManagement>

     <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- eureka客户端依赖jar包 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!-- springBoot运维监控,打开eureka健康检查需要的jar依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
    </dependencies>

 

  (2)新建一个User.java。这个model必须要有一个无参的默认构造器,否则后面的实验会报错

package com.qxj.cloud.entity;

import java.math.BigDecimal;

import javax.persistence.Column;
import javax.persistence.Entity;
import javax.persistence.GeneratedValue;
import javax.persistence.GenerationType;
import javax.persistence.Id;

@Entity
public class User {

    public User(Long id, String username) {
        this.id = id;
        this.username = username;
    }

    public User() {
    }

    @Id
    @GeneratedValue(strategy = GenerationType.AUTO)
    private Long id;

    @Column
    private String username;

    @Column
    private String name;

    @Column
    private int age;

    @Column
    private BigDecimal balance;

    setters()&getters();
}

  (3)新建一个提供服务的controller

import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.stream.Collectors;

import org.apache.commons.lang.StringUtils;
import org.springframework.web.bind.annotation.PathVariable;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.qxj.cloud.entity.User;

//RestController =  @Controller + @ResponseBody
@RestController
public class UserController {

    @RequestMapping(value = "/users/{ids}", method = RequestMethod.GET,produces="application/json;charset=UTF-8")
    public List<User> batchUser(@PathVariable("ids") String ids) {
        System.out.println("ids===:" + ids);
        
        List<Long> idList = strArrayConvertToLongList(ids);
        List<User> lists = new ArrayList<User>();
        
        for(int i=0;i<idList.size();i++) {            
            User user = new User(Long.parseLong(i+""),"同学"+i);
            lists.add(user);
        }

        return lists;
    }
    
    /**
     * ids转List<Long>
     * @param ids
     * @return
     */
    private static List<Long> strArrayConvertToLongList(String ids){
        String[] idArray = StringUtils.split(ids,",");
        return Arrays.stream(idArray).map(s -> Long.parseLong(s.trim())).collect(Collectors.toList());
    }

}

  (4)springboot的启动入口类(注意:Controller等注解类都得在springboot入口类的相同包或子包下面才能被扫描)

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;

//Springboot启动入口类
@SpringBootApplication
//eureka客户端注解
@EnableEurekaClient
public class SimpleProviderUserApplication {
    
    public static void main(String[] args) {
        SpringApplication.run(SimpleProviderUserApplication.class, args);
    }
}

  (5)配置文件application.yml

server:
   port: 7900
spring:
   application:
      name: provider-user
#eureka客户端连接配置
eureka:
   client:
      #开启健康检查
      healthcheck:
         enabled: true
      service-url:
      #注册中心地址
         defaultZone: http://user:password123@localhost:8761/eureka
   instance:
      #将ip注册到eureka上
      prefer-ip-address: true
      #微服务向eureka注册实例名${spring.cloud.client.ip-address} 表示ip地址
      instance-id: ${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance_id:${server.port}}
 

 三.新建一个服务消费者工程consumer-movie-ribbon-with-hystrix-collapser,在这个工程里我们测试hystrix批量服务调用的请求合并功能

  继承HystrixCollapser实现合并请求

  (1)pom.xml相关配置

    <dependencies>
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-web</artifactId>
        </dependency>
        <!-- eureka客户端依赖jar包 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-eureka-client</artifactId>
        </dependency>
        <!-- springBoot运维监控依赖 -->
        <dependency>
            <groupId>org.springframework.boot</groupId>
            <artifactId>spring-boot-starter-actuator</artifactId>
        </dependency>
        <!-- hystrix依赖 -->
        <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-netflix-hystrix</artifactId>
        </dependency>
    </dependencies>

 

 

  (2)新建一个User.java。这个model一定要有无参的构造器

import java.math.BigDecimal;

public class User {
  private Long id;

  private String username;

  private String name;

  private Short age;

  private BigDecimal balance;
  
  //一定要有无参的构造器
  public User(){}
  
  public Long getId() {
    return this.id;
  }

  getter() & setter();

}

 

 

  (3) service负责调用服务

import java.util.List;

import com.qxj.cloud.entity.User;

public interface UserService {
    public User find(Long id);

    public List<User> findAll(List<Long> ids);
}
import java.util.Arrays;
import java.util.List;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import com.qxj.cloud.entity.User;

@Service("userService")
public class UserServiceImpl implements UserService{
    @Autowired
    private RestTemplate restTemplate;
    
    @Override
    public User find(Long id) {
        return restTemplate.getForObject("http://provider-user:7900/users/{1}", User.class,id);
    }

    @Override
    public List<User> findAll(List<Long> ids) {
        System.out.println("finaAll request:---------" + ids + "Thread.currentThread().getName():-------" + Thread.currentThread().getName());
        User[] users = restTemplate.getForObject("http://provider-user:7900/users/{1}", User[].class,StringUtils.join(ids,","));
        return Arrays.asList(users);
    }
    
}

  (4)HystrixCommand命令执行请求

import java.util.ArrayList;
import java.util.List;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import com.qxj.cloud.entity.User;
import com.qxj.cloud.service.UserService;

/**
 * 自定义HystrixCommand实现
 * @author computer
 *
 */
public class UserBatchCommand extends HystrixCommand<List<User>>{
    
    private UserService userService;
    private List<Long> ids;
    
    //自定义构造函数
    public UserBatchCommand(UserService userService,List<Long> userIds) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")).
                andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
        this.userService = userService;
        this.ids = userIds;
    }

    @Override
    protected List<User> run() throws Exception {
        return this.userService.findAll(ids);
    }

    @Override
    protected List<User> getFallback() {
        List<User> list = new ArrayList<User>();
        User user = new User();
        user.setId(0L);
        list.add(user);
        return list;
    }
    
}

  (5)HystrixCollapser命令来做请求合并

import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.stream.Collectors;

import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;
import com.qxj.cloud.entity.User;
import com.qxj.cloud.service.UserService;

/**
 * 通过看HystrixCollapser类的源码: public abstract class
 * HystrixCollapser<BatchReturnType, ResponseType, RequestArgumentType>
 * 我们可以知道List<User>表示:合并后批量请求的返回类型
 * User表示:单个请求返回的类型
 * Long表示:请求参数类型
 * 
 * @author computer
 *
 */
public class UserCollapseCommand extends HystrixCollapser<List<User>, User, Long>{
    
    private UserService userService;

    private Long id;
    

    public UserCollapseCommand(String collapserKey,UserService userService, Long id) {
        //接收Setter对象 设置key 和 合并请求时间100ms
        super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey(collapserKey)).
                andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter().withTimerDelayInMilliseconds(100)));
        this.userService = userService;
        this.id = id;
    }
    
    @Override
    public Long getRequestArgument() {
        // TODO Auto-generated method stub
        return id;
    }

    /**
     * @param collapsedRequests 保存了延迟时间窗中收集到的所有获取单个User的请求。通过获取这些请求的参数来组织
     *                          我们准备的批量请求命令UserBatchCommand实例
     */
    @Override
    protected HystrixCommand<List<User>> createCommand(Collection<CollapsedRequest<User, Long>> requests) {
        List<Long> ids = new ArrayList<Long>(requests.size());
        //CollapsedRequest::getArgument 知识点  lambda表达式的一种简写
        //将请求查询参数id收集到ids集合里
        ids.addAll(requests.stream().map(CollapsedRequest::getArgument).collect(Collectors.toList()));
        return new UserBatchCommand(userService,ids);
    }

    /**
     * 在批量请求命令UserBatchCommand实例被触发执行完成后,该方法开始执行,
     * 在这里我们通过批量结果batchResponse对象,为collapsedRequests中每个合并前的单个请求设置返回结果。来完成批量结果到单个请求结果的转换
     * @param batchResponse 保存了createCommand中组织的批量请求命令的返回结果
     * @param collapsedRequests 代表了每个合并的请求
     */
    @Override
    protected void mapResponseToRequests(List<User> batchResponse, Collection<CollapsedRequest<User, Long>> requests) {
        System.out.println("mapResponseToRequests========>");
        int count = 0;
        //合并请求成功
        if(batchResponse.size() == requests.size()) {
            for(CollapsedRequest<User,Long> collapsedRequest:requests) {
                User user = batchResponse.get(count++);
                collapsedRequest.setResponse(user);
            }
        }else {
            //合并请求失败
            User user = batchResponse.get(count);
            for(CollapsedRequest<User,Long> collapsedRequest:requests) {
                collapsedRequest.setResponse(user);
            }
        }
    }

}

  (6)写一个controller来辅助测试

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.qxj.cloud.command.UserCollapseCommand;
import com.qxj.cloud.entity.User;
import com.qxj.cloud.service.UserService;

@RestController
public class MovieController {

    @Autowired
    private UserService userService;
    
    @RequestMapping(value = "/collapseTest", method = RequestMethod.GET,produces = "application/json;charset=UTF-8")
    public List<User> requestCollapseTest() {
        List<Long> ids = new ArrayList<Long>();
        ids.add(1L);
        ids.add(2L);
        ids.add(3L);
        ids.add(4L);
        ids.add(5L);
        List<User> users = userService.findAll(ids);
        return users;
    }
    
    @RequestMapping(value = "/collapse", method = RequestMethod.GET,produces = "application/json;charset=UTF-8")
    public List<User> requestCollapse() {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        List<User> list = new ArrayList<User>();
        try {
            String collapserKey = "userCollapseCommand";
            Future<User> f1 = new UserCollapseCommand(collapserKey,userService, 1L).queue();
            Future<User> f2 = new UserCollapseCommand(collapserKey,userService, 2L).queue();
            Future<User> f3 = new UserCollapseCommand(collapserKey,userService, 3L).queue();
            
            Thread.sleep(3000);
            
            Future<User> f4 = new UserCollapseCommand(collapserKey,userService, 4L).queue();
            Future<User> f5 = new UserCollapseCommand(collapserKey,userService, 5L).queue();

            User u1 = f1.get();
            User u2 = f2.get();
            User u3 = f3.get();

            User u4 = f4.get();
            User u5 = f5.get();
            
            list.add(u1);
            list.add(u2);
            list.add(u3);
            list.add(u4);
            list.add(u5);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            context.close();
        }
        return list;
    }
}

  (7)springboot的启动入口类

import org.springframework.boot.SpringApplication;
import org.springframework.boot.autoconfigure.SpringBootApplication;
import org.springframework.cloud.client.circuitbreaker.EnableCircuitBreaker;
import org.springframework.cloud.client.loadbalancer.LoadBalanced;
import org.springframework.cloud.netflix.eureka.EnableEurekaClient;
import org.springframework.cloud.netflix.ribbon.RibbonClient;
import org.springframework.context.annotation.Bean;
import org.springframework.web.client.RestTemplate;

@SpringBootApplication
//该注解表明应用既作为eureka实例又为eureka client 可以发现注册的服务
@EnableEurekaClient
//在启动该微服务的时候就能去加载我们的自定义Ribbon配置类
@RibbonClient(name = "provider-user")
//Hystrix启动类注解,允许断路器
@EnableCircuitBreaker
public class ConsumerMovieRibbonApplication {
    
    @Bean
    //使用该注解可以用 http://provider-user:7900/simple/ 虚拟主机名  访问地址
    @LoadBalanced
    public RestTemplate restTemplate() {
        return new RestTemplate();
    }
    
    public static void main(String[] args) {
        SpringApplication.run(ConsumerMovieRibbonApplication.class, args);
    }
}

  (8)application.yml配置信息

server:
  port: 8010
spring:
  application:
      name: consumer-movie-ribbon-with-hystrix-collapser
#eureka客户端连接配置
eureka:
   client:
      healthcheck:
         enabled: true
      service-url:
      #注册中心地址
         defaultZone: http://user:password123@localhost:8761/eureka
   instance:
      #将ip注册到eureka上
      prefer-ip-address: true
      #微服务向eureka注册实例名${spring.cloud.client.ip-address} 表示ip地址 spring2.0以上为ip-address
      instance-id: ${spring.application.name}:${spring.cloud.client.ip-address}:${spring.application.instance_id:${server.port}}

  测试:

 

  1.运行springcloud-eureka-service启动注册中心

  2.运行provider-user启动服务提供者

  3.运行consumer-movie-ribbon-with-hystrix-collapser启动服务消费者

  4.在浏览器输入: http://localhost:8010/collapse

  输出结果:从结果中我们看到前3次请求合并为一个请求,后面2次请求合并为了一个请求

 

 

 

 

 

 

  注解方式实现合并请求

  (1)修改service实现类

import java.util.List;
import java.util.concurrent.Future;

import com.qxj.cloud.entity.User;

public interface PeopleService {
    public Future<User> find(Long id);

    public List<User> findAll(List<Long> ids);
}
import java.util.Arrays;
import java.util.List;
import java.util.concurrent.Future;

import org.apache.commons.lang.StringUtils;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Service;
import org.springframework.web.client.RestTemplate;

import com.netflix.hystrix.contrib.javanica.annotation.HystrixCollapser;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixCommand;
import com.netflix.hystrix.contrib.javanica.annotation.HystrixProperty;
import com.qxj.cloud.entity.User;

@Service("peopleService")
public class PeopleServiceImpl implements PeopleService {

    @Autowired
    private RestTemplate restTemplate;

    //批量调用方法findAll,合并请求时间200ms
    @HystrixCollapser(batchMethod = "findAll", collapserProperties = {
            @HystrixProperty(name = "timerDelayInMilliseconds", value = "200") })
    public Future<User> find(Long id) {
        throw new RuntimeException("This method body should not be executed");
    }

    @HystrixCommand
    public List<User> findAll(List<Long> ids) {
        System.out.println(
                "Annotation---------" + ids + "Thread.currentThread().getName():" + Thread.currentThread().getName());
        User[] users = restTemplate.getForObject("http://provider-user:7900/users/{1}", User[].class,
                StringUtils.join(ids, ","));
        return Arrays.asList(users);
    }
    
}

  (2)修改control层代码

 

import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.Future;

import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.web.bind.annotation.RequestMapping;
import org.springframework.web.bind.annotation.RequestMethod;
import org.springframework.web.bind.annotation.RestController;

import com.netflix.hystrix.strategy.concurrency.HystrixRequestContext;
import com.qxj.cloud.entity.User;
import com.qxj.cloud.service.PeopleService;

@RestController
public class MovieAnnotationController {

    @Autowired
    private PeopleService peopleService;
    
    @RequestMapping(value = "/AnnoCollapseTest", method = RequestMethod.GET,produces = "application/json;charset=UTF-8")
    public List<User> requestCollapseTest() {
        List<Long> ids = new ArrayList<Long>();
        ids.add(1L);
        ids.add(2L);
        ids.add(3L);
        ids.add(4L);
        ids.add(5L);
        List<User> users = peopleService.findAll(ids);
        return users;
    }
    
    @RequestMapping(value = "/AnnoCollapse", method = RequestMethod.GET,produces = "application/json;charset=UTF-8")
    public List<User> requestCollapse() {
        HystrixRequestContext context = HystrixRequestContext.initializeContext();
        List<User> list = new ArrayList<User>();
        try {
            Future<User> f1 = peopleService.find(1L);
            Future<User> f2 = peopleService.find(2L);
            Future<User> f3 = peopleService.find(3L);

            Thread.sleep(3000);

            Future<User> f4 = peopleService.find(4L);
            Future<User> f5 = peopleService.find(5L);
            
            User u1 = f1.get();
            User u2 = f2.get();
            User u3 = f3.get();

            User u4 = f4.get();
            User u5 = f5.get();
            
            list.add(u1);
            list.add(u2);
            list.add(u3);
            list.add(u4);
            list.add(u5);
        } catch (Exception e) {
            e.printStackTrace();
        }finally {
            context.close();
        }
        return list;
    }
}

 

  测试结果

 

 

 

 

 

   参考文献:https://www.cnblogs.com/happyflyingpig/p/8136450.html

 

以上是关于SpringCloud(Hoxton.SR3)基础篇:第五章Hystrix-request collapsing(请求合并)的主要内容,如果未能解决你的问题,请参考以下文章

SpringCloud(Hoxton.SR3)基础篇:第四章Hystrix请求熔断与服务降级

SpringCloud(Hoxton.SR3)基础篇:第五章Hystrix-request collapsing(请求合并)

SpringCloud(Hoxton.SR3)基础篇:第三章Eureka集群 高可用的认证服务实现与搭建

Hystrix熔断

seata分布式事务配置示例

4 年 46 个版本,一文读懂 Spring Cloud 发展历史