Hystrix Getting Started Guide

Posted by gaoyanqing


Introduction

 

1、Where does the name come from?

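Hystrix is the genus name of the porcupine. A porcupine is covered in quills that protect it from predators, a defense mechanism that matches what the framework does. The Netflix team therefore named the framework Hystrix and adopted a cartoon porcupine as its logo.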

2、What Is Hystrix?

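In a distributed system, calls to many dependencies will inevitably fail, for example through timeouts or exceptions. Hystrix's job is to make sure that a problem in one dependency does not bring down the whole service. It provides circuit breaking, isolation, fallback, caching, and monitoring, so that the system stays available when one or several dependencies fail at the same time.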

 

   

3、Hello Hystrix

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import java.util.concurrent.Future;
import rx.Observable;

public class CommandHelloWorld extends HystrixCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); // required
        this.name = name;
    }

    @Override
    protected String run() {
        // A network call or other business logic that may time out or throw
        return "Hello " + name + "!";
    }
}

// Four ways to invoke the command
String s = new CommandHelloWorld("Bob").execute();                      // synchronous
Future<String> future = new CommandHelloWorld("Bob").queue();           // asynchronous
Observable<String> hot = new CommandHelloWorld("Bob").observe();        // hot Observable
Observable<String> cold = new CommandHelloWorld("Bob").toObservable();  // cold Observable

    Notes:

    • execute() — blocks, then returns the single response received from the dependency (or throws an exception in case of an error)
    • queue() — returns a Future with which you can obtain the single response from the dependency
    • observe() — subscribes to the Observable that represents the response(s) from the dependency and returns an Observable that replicates that source Observable
    • toObservable() — returns an Observable that, when you subscribe to it, will execute the Hystrix command and emit its responses
 

4、Flow Chart

 

 The flow chart consists of the following steps:

  1. Construct a HystrixCommand or HystrixObservableCommand Object
  2. Execute the Command
  3. Is the Response Cached?
  4. Is the Circuit Open?
  5. Is the Thread Pool/Queue/Semaphore Full?
  6. HystrixObservableCommand.construct() or HystrixCommand.run()
  7. Calculate Circuit Health
  8. Get the Fallback
  9. Return the Successful Response
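
The order of these checks can be sketched in plain Java. This is a toy model, not Hystrix code: the class CommandFlowSketch, its fields, and the String-keyed cache are assumptions made purely for illustration, and step 7 is reduced to two counters that only track run() outcomes.

```java
import java.util.HashMap;
import java.util.Map;
import java.util.function.Supplier;

// Toy model of the numbered flow above. All names are hypothetical; this is not Hystrix code.
final class CommandFlowSketch {
    private final Map<String, String> cache = new HashMap<>();
    boolean circuitOpen = false; // step 4 input, set by hand in this sketch
    boolean poolFull = false;    // step 5 input, set by hand in this sketch
    int successes = 0;           // step 7 bookkeeping
    int failures = 0;

    String execute(String cacheKey, Supplier<String> run, Supplier<String> fallback) {
        String cached = cache.get(cacheKey);
        if (cached != null) {
            return cached;             // step 3: cached response, nothing else runs
        }
        if (circuitOpen || poolFull) {
            return fallback.get();     // steps 4 and 5: short-circuited or rejected, go to step 8
        }
        try {
            String result = run.get(); // step 6: the actual dependency call
            successes++;               // step 7: record a success
            cache.put(cacheKey, result);
            return result;             // step 9
        } catch (RuntimeException e) {
            failures++;                // step 7: record a failure
            return fallback.get();     // step 8
        }
    }
}
```

The point of the ordering is that the cheap checks (cache, circuit state, capacity) come first, so a failing dependency is not called at all once the circuit is open.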

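 Common Features

Dependency Isolation

Serving a single user request may depend on dozens or even hundreds of external services. Without isolation, the failure of one dependency can affect the normal execution of the others. As the figure below shows, each dependency is given its own thread pool.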

   

In the figure below, when Dep I has a problem, dependencies such as Dep A and Dep M can still execute normally.

 Example of thread pool isolation

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import java.util.concurrent.TimeUnit;

public class CommandHelloWorld extends HystrixCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  // required
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  // optional; defaults to the command group key
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4)));

        this.name = name;
    }

    @Override
    protected String run() throws InterruptedException {
        System.out.println("running");
        TimeUnit.MILLISECONDS.sleep(1000); // simulate a slow dependency
        return "Hello " + name + "!";
    }

}

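Common thread pool parameters:

Implementing class: HystrixThreadPoolProperties

Name | Type | Meaning | Default
coreSize | Integer | Thread pool size | 10
maxQueueSize | Integer | Queue size; cannot be changed after initialization | -1
queueSizeRejectionThreshold | Integer | Queue rejection threshold; can be changed dynamically. Takes effect only when maxQueueSize > 0 and is usually set to a value below maxQueueSize | 5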

Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  // required
 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                .withExecutionTimeoutInMilliseconds(500))  // execution timeout in ms
 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  // optional; defaults to the command group key
 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4)
        .withMaxQueueSize(10).withQueueSizeRejectionThreshold(7))

 

Q: How should the thread pool be sized?

  A: Peak QPS × TP99 latency (in seconds) + a few spare threads
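
As a worked example of this rule of thumb, here is a minimal sketch. The class name ThreadPoolSizing, the method estimate, and the sample figures are assumptions chosen for illustration; they are not part of Hystrix.

```java
// Hypothetical helper for the sizing rule above; not part of Hystrix.
final class ThreadPoolSizing {

    // peakQps:      peak requests per second sent to the dependency
    // p99LatencyMs: 99th percentile latency of one call, in milliseconds
    // spareThreads: extra headroom on top of the computed minimum
    static int estimate(double peakQps, double p99LatencyMs, int spareThreads) {
        // Threads kept busy at peak = requests per second x seconds per request
        double busyThreads = peakQps * p99LatencyMs / 1000.0;
        return (int) Math.ceil(busyThreads) + spareThreads;
    }
}
```

With 30 requests per second at peak and a TP99 of 200 ms, 30 × 0.2 = 6 threads are busy at peak, so `ThreadPoolSizing.estimate(30, 200, 4)` gives 10, which happens to match the default coreSize.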

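Semaphore Isolation

With thread pool isolation, the thread that issues the request and the thread that actually executes it are different threads. With semaphore isolation, they are the same thread. The difference between the two kinds of isolation is shown in the figure below: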

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> {

    private final int id;

    public CommandUsingSemaphoreIsolation(int id) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                // since we're doing an in-memory cache lookup we choose SEMAPHORE isolation
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) // use semaphore isolation
                        .withExecutionIsolationSemaphoreMaxConcurrentRequests(3)  // max concurrent run() calls
                        .withFallbackIsolationSemaphoreMaxConcurrentRequests(5)   // max concurrent getFallback() calls
                        .withExecutionTimeoutInMilliseconds(300)));               // execution timeout in ms
        this.id = id;
    }

    @Override
    protected String run() throws InterruptedException {
        // a real implementation would retrieve data from in memory data structure
        TimeUnit.MILLISECONDS.sleep(id * 30);
        System.out.println("running normal, id=" + id);
        return "ValueFromHashMap_" + id;
    }

    @Override
    protected String getFallback() {
        System.out.println(" fallback, id=" + id);
        return "fallback:" + id;
    }

}

@Test
public void maxConcurrentRequests() throws InterruptedException {
    int count = 10;
    while (count > 0) {
        int id = count--;
        // Each command runs on its own caller thread so that the semaphore limits are exercised
        new Thread(() -> {
            try {
                new CommandUsingSemaphoreIsolation(id).execute();
            } catch (Exception ex) {
                System.out.println("Exception:" + ex.getMessage() + " id=" + id);
            }
        }).start();
    }

    TimeUnit.SECONDS.sleep(100); // keep the test alive until all threads have finished
}
// Note: with semaphore isolation, even if a single thread calls new CommandUsingSemaphoreIsolation(id).queue() in a loop, the run() methods still execute sequentially.

// Console output

fallback, id=10
fallback, id=9
fallback, id=5
fallback, id=8
fallback, id=1
Exception:CommandUsingSemaphoreIsolation fallback execution rejected. id=4
Exception:CommandUsingSemaphoreIsolation fallback execution rejected. id=7
running normal, id=2
running normal, id=3
running normal, id=6

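Q: When should thread pool isolation be used, and when semaphore isolation?

A: Thread pool isolation adds some overhead but does not block the request thread, so it suits I/O-bound tasks.

Semaphore isolation runs on the user's request thread and has no extra thread-switching overhead, so it suits local computation whose execution time and logic are both short, for example CPU-bound tasks.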
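
The mechanism behind semaphore isolation can be illustrated with the JDK's own java.util.concurrent.Semaphore. The following is a minimal sketch under stated assumptions, not Hystrix's implementation: SemaphoreGuard and its call method are hypothetical names introduced here.

```java
import java.util.concurrent.Semaphore;
import java.util.function.Supplier;

// Hypothetical helper illustrating semaphore isolation; not part of Hystrix.
final class SemaphoreGuard {
    private final Semaphore permits;

    SemaphoreGuard(int maxConcurrent) {
        this.permits = new Semaphore(maxConcurrent);
    }

    // Runs work on the calling thread if a permit is free; otherwise returns the fallback at once.
    <T> T call(Supplier<T> work, Supplier<T> fallback) {
        if (!permits.tryAcquire()) {
            return fallback.get(); // rejected: no queueing and no waiting
        }
        try {
            return work.get();     // same thread as the caller, so no context switch
        } finally {
            permits.release();
        }
    }
}
```

Because the work runs on the caller's thread, the guard can limit concurrency but cannot abandon a call that hangs, which is one reason this style fits short local computation better than network calls.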

Fallback

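Q1: Why is a fallback needed?

Put simply, when a dependency call fails we usually need to provide a degraded alternative, and Hystrix has built-in support for this.

Fallback example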
import static org.junit.Assert.assertEquals;

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import java.util.concurrent.TimeUnit;
import org.junit.Test;

public class CommandHelloWorld extends HystrixCommand<String> {

    private final String name;

    public CommandHelloWorld(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  // required
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionTimeoutInMilliseconds(500))  // execution timeout in ms
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  // optional; defaults to the command group key
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4)));

        this.name = name;
    }

    @Override
    protected String run() throws InterruptedException {
        System.out.println("running");
        TimeUnit.MILLISECONDS.sleep(1000); // longer than the 500 ms timeout, so the fallback is used
        return "Hello " + name + "!";
    }

    @Override
    protected String getFallback() {
        return "Hello " + "Fallback";
    }
}

@Test
public void fallbackTest() {
    assertEquals("Hello Fallback", new CommandHelloWorld("World").execute());
}

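 Q2: When is the fallback triggered?

Put simply: when run() throws an exception or times out, when the thread pool or semaphore rejects the command, or when the circuit is short-circuited.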

Failure Type | Exception class | Exception.cause | Subject to fallback
FAILURE | HystrixRuntimeException | underlying exception (user-controlled) | YES
TIMEOUT | HystrixRuntimeException | j.u.c.TimeoutException | YES
SHORT_CIRCUITED | HystrixRuntimeException | j.l.RuntimeException | YES
THREAD_POOL_REJECTED | HystrixRuntimeException | j.u.c.RejectedExecutionException | YES
SEMAPHORE_REJECTED | HystrixRuntimeException | j.l.RuntimeException | YES
BAD_REQUEST | HystrixBadRequestException | underlying exception (user-controlled) | NO

The main class used in the tests below:

import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandProperties;
import com.netflix.hystrix.HystrixThreadPoolKey;
import com.netflix.hystrix.HystrixThreadPoolProperties;
import java.util.concurrent.TimeUnit;

public class CommandHelloFailure extends HystrixCommand<String> {

    private final String name;

    public CommandHelloFailure(String name) {
        super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  // required
                .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                        .withExecutionTimeoutInMilliseconds(1000))  // execution timeout in ms
                .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))
                .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(3)));

        this.name = name;
    }

    @Override
    protected String run() throws InterruptedException {
        String threadPoolName = this.getThreadPoolKey().name();
        // The "cmdKey" label also prints the thread pool key; getCommandKey().name() would give the command key
        String cmdKey = this.getThreadPoolKey().name();
        System.out.println("running begin , threadPool=" + threadPoolName + " cmdKey=" + cmdKey + " name=" + name);

        if ("Exception".equals(name)) {
            throw new RuntimeException("this command always fails");
        } else if ("Timeout".equals(name)) {
            TimeUnit.SECONDS.sleep(2);          // longer than the 1000 ms timeout
        } else if ("Reject".equals(name)) {
            TimeUnit.MILLISECONDS.sleep(800);   // holds a pool thread long enough to fill the pool
        }
        System.out.println(" run end");

        return "Hello " + name + "!";
    }

    @Override
    protected String getFallback() {
        // Report which failure type caused the fallback
        StringBuilder sb = new StringBuilder("running fallback");
        boolean isRejected = isResponseRejected();
        boolean isException = isFailedExecution();
        boolean isTimeout = isResponseTimedOut();
        boolean isCircuitOpen = isCircuitBreakerOpen();

        sb.append(", isRejected:").append(isRejected);
        sb.append(", isException:").append(isException);
        if (isException) {
            sb.append(" msg=").append(getExecutionException().getMessage());
        }
        sb.append(", isTimeout: ").append(isTimeout);
        sb.append(", isCircut:").append(isCircuitOpen);

        sb.append(", group:").append(this.getCommandGroup().name());
        sb.append(", threadpool:").append(getThreadPoolKey().name());
        System.out.println(sb.toString());

        return "Hello Failure " + name + "!";
    }
}

 

FAILURE 

Testing a fallback caused by an exception

@Test
public void expTest() {
    assertEquals("Hello Failure Exception!", new CommandHelloFailure("Exception").execute());
}

// Console output

running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Exception
running fallback, isRejected:false, isException:true msg=this command always fails, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool

 

TIMEOUT

Testing a fallback caused by a timeout

@Test
public void timeOutTest() {
    assertEquals("Hello Failure Timeout!", new CommandHelloFailure("Timeout").execute());
}

// Console output

running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Timeout
running fallback, isRejected:false, isException:false, isTimeout: true, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool

 

THREAD_POOL_REJECTED

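When the number of concurrently executing tasks exceeds the thread pool size plus the queue size, further tasks are rejected, which triggers the fallback.

@Test
public void rejectTest() throws InterruptedException {
    int count = 5;
    while (count-- > 0) {
        // Pool size is 3 and each task holds a thread for 800 ms, so the 4th and 5th are rejected
        new CommandHelloFailure("Reject").queue();
        TimeUnit.MILLISECONDS.sleep(100);
    }
}

// Console output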

running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject
running begin , threadPool=ExampleGroup-pool cmdKey=ExampleGroup-pool name=Reject
running fallback, isRejected:true, isException:false, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool
running fallback, isRejected:true, isException:false, isTimeout: false, isCircut:false, group:ExampleGroup, threadpool:ExampleGroup-pool

SEMAPHORE_REJECTED behaves much like THREAD_POOL_REJECTED, so it is not demonstrated separately.
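
The rejection itself can be reproduced with the JDK alone. The sketch below assumes a pool with no queue, built on a SynchronousQueue, which is what Hystrix uses when maxQueueSize is -1. The class PoolRejectionSketch and its method are hypothetical names, and in place of a fallback the sketch simply counts the rejections.

```java
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.SynchronousQueue;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;

// Hypothetical demo of thread pool rejection using only the JDK; not Hystrix code.
final class PoolRejectionSketch {

    // Submits `tasks` blocking jobs to a pool of `poolSize` threads with no queue
    // and returns how many submissions were rejected.
    static int countRejected(int poolSize, int tasks) {
        ThreadPoolExecutor pool = new ThreadPoolExecutor(
                poolSize, poolSize, 0L, TimeUnit.SECONDS, new SynchronousQueue<>());
        CountDownLatch release = new CountDownLatch(1);
        int rejected = 0;
        try {
            for (int i = 0; i < tasks; i++) {
                try {
                    pool.execute(() -> {
                        try {
                            release.await(); // hold the worker thread, like a slow dependency
                        } catch (InterruptedException e) {
                            Thread.currentThread().interrupt();
                        }
                    });
                } catch (RejectedExecutionException e) {
                    rejected++; // the point at which a Hystrix command would run getFallback()
                }
            }
        } finally {
            release.countDown(); // let the blocked workers finish
            pool.shutdown();
        }
        return rejected;
    }
}
```

With the same numbers as rejectTest, `PoolRejectionSketch.countRejected(3, 5)` returns 2: three tasks occupy the three threads and the remaining two have nowhere to go.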

 

SHORT_CIRCUITED

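When, within a given time window, the share of failed requests (timeout, failure, rejection) reaches a set percentage, the circuit breaker opens. Once it is open, all requests go straight to the fallback.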

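Parameters

Name | Type | Meaning | Default
circuitBreakerEnabled | Boolean | Whether the circuit breaker is enabled | true
circuitBreakerErrorThresholdPercentage | Integer | Error percentage at or above which the circuit breaker opens | 50
circuitBreakerForceClosed | Boolean | Forces the circuit breaker closed, so requests are allowed regardless of the error percentage | false
circuitBreakerForceOpen | Boolean | Forces the circuit breaker open, so all requests are rejected | false
circuitBreakerRequestVolumeThreshold | Integer | Minimum number of requests within the 10 s window before the circuit breaker can open | 20
circuitBreakerSleepWindowInMilliseconds | Integer | How long after opening before a trial request is let through (half-open) | 5000 (5 s)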

A typical configuration looks like this:

Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))  // required
        .andCommandPropertiesDefaults(HystrixCommandProperties.Setter()
                .withExecutionTimeoutInMilliseconds(50)  // execution timeout in ms
                .withCircuitBreakerRequestVolumeThreshold(5)
                .withCircuitBreakerSleepWindowInMilliseconds(1000)
                .withCircuitBreakerErrorThresholdPercentage(50))
        .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool"))  // optional; defaults to the command group key
        .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4));

This configuration means: if at least 5 requests arrive within 10 s and 50% or more of them fail, the circuit breaker opens; 1000 ms after opening, it tries to close again.

How the circuit breaker works, quoted from the official documentation:

The precise way that the circuit opening and closing occurs is as follows:
  1. Assuming the volume across a circuit meets a certain threshold (HystrixCommandProperties.circuitBreakerRequestVolumeThreshold())...
  2. And assuming that the error percentage exceeds the threshold error percentage (HystrixCommandProperties.circuitBreakerErrorThresholdPercentage())...
  3. Then the circuit-breaker transitions from CLOSED to OPEN.
  4. While it is open, it short-circuits all requests made against that circuit-breaker.
  5. After some amount of time (HystrixCommandProperties.circuitBreakerSleepWindowInMilliseconds()), the next single request is let through (this is the HALF-OPEN state). If the request fails, the circuit-breaker returns to the OPEN state for the duration of the sleep window. If the request succeeds, the circuit-breaker transitions to CLOSED and the logic in 1. takes over again.
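
The state transitions described in this quote can be sketched as a small state machine in plain Java. This is a simplified illustration, not Hystrix's implementation: CircuitBreakerSketch and its method names are hypothetical, the counters run from the last reset instead of over a rolling window, and the clock is injected so the behavior can be checked without sleeping.

```java
import java.util.function.LongSupplier;

// Simplified circuit breaker: counts since the last reset instead of a rolling window.
final class CircuitBreakerSketch {
    enum State { CLOSED, OPEN, HALF_OPEN }

    private final int requestVolumeThreshold;
    private final int errorThresholdPercentage;
    private final long sleepWindowMs;
    private final LongSupplier clock; // current time in ms, injected for testability

    private State state = State.CLOSED;
    private int total = 0;
    private int errors = 0;
    private long openedAt = 0L;

    CircuitBreakerSketch(int requestVolumeThreshold, int errorThresholdPercentage,
                         long sleepWindowMs, LongSupplier clock) {
        this.requestVolumeThreshold = requestVolumeThreshold;
        this.errorThresholdPercentage = errorThresholdPercentage;
        this.sleepWindowMs = sleepWindowMs;
        this.clock = clock;
    }

    // Returns true if the caller may attempt the request.
    synchronized boolean allowRequest() {
        if (state == State.CLOSED) {
            return true;
        }
        if (state == State.OPEN && clock.getAsLong() - openedAt >= sleepWindowMs) {
            state = State.HALF_OPEN; // sleep window elapsed: let a single trial request through
            return true;
        }
        return false; // still inside the sleep window, or a trial request is already in flight
    }

    synchronized void recordSuccess() {
        if (state == State.HALF_OPEN) {
            reset(); // trial succeeded: back to CLOSED with fresh counters
            return;
        }
        total++;
    }

    synchronized void recordFailure() {
        if (state == State.HALF_OPEN) {
            open(); // trial failed: OPEN again for another sleep window
            return;
        }
        total++;
        errors++;
        if (state == State.CLOSED
                && total >= requestVolumeThreshold
                && errors * 100 >= errorThresholdPercentage * total) {
            open();
        }
    }

    synchronized State state() {
        return state;
    }

    private void open() {
        state = State.OPEN;
        openedAt = clock.getAsLong();
    }

    private void reset() {
        state = State.CLOSED;
        total = 0;
        errors = 0;
    }
}
```

Constructed as `new CircuitBreakerSketch(5, 50, 1000L, System::currentTimeMillis)`, the sketch mirrors the configuration shown earlier: five requests with at least half of them failing open the circuit, and one trial request is allowed 1000 ms later.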

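Q3: What should we do in a fallback?

There are generally a few strategies:

1、Do not implement getFallback: an exception is thrown directly when the dependency call fails

2、Implement getFallback and return a default value: this is a common strategy

3、Implement getFallback and switch to a degraded alternative

In addition, in production a fallback should normally be logged or recorded as a metric.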

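Request Collapsing

Put simply, multiple requests made within a short period are merged into a single request. This is commonly used for network I/O, where it reduces the number of I/O operations; the drawback is higher average latency.

The main class of the test code: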

import com.netflix.hystrix.HystrixCollapser;
import com.netflix.hystrix.HystrixCollapser.CollapsedRequest;
import com.netflix.hystrix.HystrixCollapserKey;
import com.netflix.hystrix.HystrixCollapserProperties;
import com.netflix.hystrix.HystrixCommand;
import com.netflix.hystrix.HystrixCommandGroupKey;
import com.netflix.hystrix.HystrixCommandKey;
import java.util.ArrayList;
import java.util.Collection;
import java.util.List;

public class CommandCollapserGetValueForKey extends HystrixCollapser<List<String>, String, Integer> {

    private final Integer key;

    public CommandCollapserGetValueForKey(Integer key) {
        super(Setter.withCollapserKey(HystrixCollapserKey.Factory.asKey("Collapser"))
                .andCollapserPropertiesDefaults(HystrixCollapserProperties.Setter()
                        .withMaxRequestsInBatch(3)            // at most 3 requests per batch
                        .withTimerDelayInMilliseconds(10)));  // collect requests for 10 ms before sending a batch
        this.key = key;
    }

    @Override
    public Integer getRequestArgument() {
        return key;
    }

    @Override
    protected HystrixCommand<List<String>> createCommand(final Collection<CollapsedRequest<String, Integer>> requests) {
        return new BatchCommand(requests);
    }

    @Override
    protected void mapResponseToRequests(List<String> batchResponse, Collection<CollapsedRequest<String, Integer>> requests) {
        // Responses are in the same order as the requests, so map them by position
        int count = 0;
        for (CollapsedRequest<String, Integer> request : requests) {
            request.setResponse(batchResponse.get(count++));
        }
    }

    private static final class BatchCommand extends HystrixCommand<List<String>> {
        private final Collection<CollapsedRequest<String, Integer>> requests;

        private BatchCommand(Collection<CollapsedRequest<String, Integer>> requests) {
            super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"))
                    .andCommandKey(HystrixCommandKey.Factory.asKey("GetValueForKey")));
            this.requests = requests;
        }

        @Override
        protected List<String> run() {
            System.out.println("BatchCommand run  " + requests.size());
            List<String> response = new ArrayList<>();
            for (CollapsedRequest<String, Integer> request : requests) {
                // artificial response for each argument received in the batch
                response.add("ValueForKey: " + request.getArgument());
            }
            return response;
        }
    }
}

@Test