Hystrix入门指南
Posted gaoyanqing
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Hystrix入门指南相关的知识,希望对你有一定的参考价值。
Introduction
1、Where does the name come from?
hystrix对应的中文名字是“豪猪”,豪猪周身长满了刺,能保护自己不受天敌的伤害,代表了一种防御机制,这与hystrix本身的功能不谋而合,因此Netflix团队将该框架命名为Hystrix,并使用了对应的卡通形象做作为logo。
2、What Is Hystrix?
在一个分布式系统里,许多依赖不可避免的会调用失败,比如超时、异常等,如何能够保证在一个依赖出问题的情况下,不会导致整体服务失败,这个就是Hystrix需要做的事情。Hystrix提供了熔断、隔离、Fallback、cache、监控等功能,能够在一个、或多个依赖同时出现问题时保证系统依然可用。
3、Hello Hystrix
1 public class CommandHelloWorld extends HystrixCommand<String> { 2 3 private final String name; 4 5 public CommandHelloWorld(String name) { 6 super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")); //必须 7 this.name = name; 8 } 9 10 @Override 11 protected String run() { 12 /* 13 网络调用 或者其他一些业务逻辑,可能会超时或者抛异常 14 */ 15 return "Hello " + name + "!"; 16 } 17 } 18 19 String s = new CommandHelloWorld("Bob").execute(); // 20 Future<String> s = new CommandHelloWorld("Bob").queue(); 21 Observable<String> s = new CommandHelloWorld("Bob").observe(); 22 Observable<String> s = new CommandHelloWorld("Bob").toObservable()
说明:
execute()
— blocks, then returns the single response received from the dependency (or throws an exception in case of an error)queue()
— returns aFuture
with which you can obtain the single response from the dependencyobserve()
— subscribes to theObservable
that represents the response(s) from the dependency and returns anObservable
that replicates that sourceObservable
toObservable()
— returns anObservable
that, when you subscribe to it, will execute the Hystrix command and emit its responses
4、Flow Chart
说明:
- Construct a HystrixCommand or HystrixObservableCommand Object
- Execute the Command
- Is the Response Cached?
- Is the Circuit Open?
- Is the Thread Pool/Queue/Semaphore Full?
- HystrixObservableCommand.construct() or HystrixCommand.run()
- Calculate Circuit Health
- Get the Fallback
- Return the Successful Response
常用功能介绍
依赖隔离
一个用户请求的成功执行,肯能依赖数十上百个外部服务,如果没有隔离,单个依赖的失败,可能会印象其他依赖的正常执行。如下图所示,为每个依赖配置了单独线程池
在下图中,当Dep I 出现问题时,DepA 和Dep M大以来可以正常执行
1 public class CommandHelloWorld extends HystrixCommand<String> { 2 3 private final String name; 4 5 public CommandHelloWorld(String name) { 6 super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) //必须 7 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool")) //可选,默认 使用 this.getClass().getSimpleName(); 8 .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4))); 9 10 this.name = name; 11 } 12 13 @Override 14 protected String run() throws InterruptedException { 15 System.out.println("running"); 16 TimeUnit.MILLISECONDS.sleep(1000); 17 return "Hello " + name + "!"; 18 } 19 20 }
线程池常用参数设置:
实现类:HystrixThreadPoolProperties
名称
|
类型
|
含义
|
默认值
|
---|---|---|---|
coreSize |
Integer |
线程池大小 | 10 |
maxQueueSize |
Integer |
队列大小,一经初始化后不能修改 | -1 |
queueSizeRejectionThreshold |
Integer |
队列reject阈值,可以动态修改
maxQueueSize>0是生效,一般设置为小于 maxQueueSizede 的数值 |
5 |
Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) //必须 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() .withExecutionTimeoutInMilliseconds(500)) //超时时间 .andThreadPoolKey(HystrixThreadPoolKey.Factory.asKey("ExampleGroup-pool")) //可选,默认 使用 this.getClass().getSimpleName(); .andThreadPoolPropertiesDefaults(HystrixThreadPoolProperties.Setter().withCoreSize(4) .withMaxQueueSize(10).withQueueSizeRejectionThreshold(7))
Q: 怎么设置线程池大小?
A:Qps* Tp99 +冗余线程
信号量隔离
线程池隔离中,发起请求的线程和真实执行的线程不是同一个线程,使用信号量隔离时,它们是同一个线程, 两种隔离的区别如下图:
1 public class CommandUsingSemaphoreIsolation extends HystrixCommand<String> { 2 3 private final int id; 4 private long start,end ; 5 6 public CommandUsingSemaphoreIsolation(int id) { 7 super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("ExampleGroup")) 8 // since we\'re doing an in-memory cache lookup we choose SEMAPHORE isolation 9 .andCommandPropertiesDefaults(HystrixCommandProperties.Setter() 10 .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.SEMAPHORE) //设置使用信号量隔离策略 11 .withExecutionIsolationSemaphoreMaxConcurrentRequests(3) //设置信号量隔离时的最大并发请求数 12 .withFallbackIsolationSemaphoreMaxConcurrentRequests(5) //设置fallback的最大并发数 13 .withExecutionTimeoutInMilliseconds(300))); //设置超时时间 14 this.id = id; 15 this.start = System.currentTimeMillis(); 16 } 17 18 @Override 19 protected String run() throws InterruptedException { 20 // a real implementation would retrieve data from in memory data structure 21 TimeUnit.MILLISECONDS.sleep(id*30); 22 System.out.println("running normal, id="+id); 23 return "ValueFromHashMap_" + id; 24 } 25 26 @Override 27 protected String getFallback(){ 28 System.out.println(" fallback, id="+id); 29 return "fallback:"+id; 30 } 31 32 } 33 34 @Test 35 public void maxCurrentRequst() throws InterruptedException { 36 int count =10; 37 while (count >0){ 38 int id = count--; 39 new Thread(() -> { 40 try { 41 new CommandUsingSemaphoreIsolation(id).execute(); 42 }catch (Exception ex){ 43 System.out.println("Exception:"+ex.getMessage()+" id="+id); 44 } 45 }).start(); 46 } 47 48 TimeUnit.SECONDS.sleep(100); 49 } 50 //注:使用信号量隔离,在同一个线程中即使循环调用new CommandUsingSemaphoreIsolation(id).queue(),run方法也是顺序执行;
//控制台输出
fallback, id=10
fallback, id=9
fallback, id=5
fallback, id=8
fallback, id=1
Exception:CommandUsingSemaphoreIsolation fallback execution rejected. id=4
Exception:CommandUsingSemaphoreIsolation fallback execution rejected. id=7
running normal, id=2
running normal, id=3
running normal, id=6
Q: 什么时候使用线程池隔离,什么使用信号量隔离?
A: 线程池隔离缺点是带来一定的开销,但不会阻塞请求线程,适合于于IO密集型的任务
信号量隔离使用用户请求线程,没有格外线程切换开销,使用与执行时间和执行逻辑都比较短的本地计算。比如CPU密集型的任务
Fallback
Q1: 为什么需要fallback?
简单来说,在依赖调用失败时,我们一般会需要提供降级方案,Hystrix对此提供了支持