Hystrix 译为 "豪猪",豪猪的棘刺能保护自己不受天敌伤害,代表了强大的防御能力。Hystrix 基于 RxJava 进行实现,RxJava 是一种基于观察者模式的响应式编程框架。Spring Cloud Hystrix 基于 Netflix Hystrix 实现,具备服务降级、服务熔断、线程与信号隔离、请求缓存、请求合并以及服务监控等强大功能。本文基于hystrix-core 1.5.18(近年来几乎很少更新,建议升级)。
目录
1. 由来
2. 功能介绍
2.1 HystrixCommand/HystrixObservableCommand
2.2 基本用法
3. 工作原理
3.1 构建命令
3.2 执行命令
3.3 检查缓存
3.4 断路器是否打开
3.5 检查线程池/信号量情况
3.6 执行任务
3.7 断路器健康检查
3.8 失败时执行 Fallback
3.9 返回执行结果
1. 由来
在单体应用中,一类服务、一个线程、一个Bug等局部因素压垮整个系统也是屡见不鲜。微服务中,服务间依赖重重,通过隔离,很好的控制住风险范围,再结合请求拒绝和超时控制,有效剔除 “老鼠屎”,避免坏了一锅粥。总之,隔离设计是绝妙的防护罩。Netflix 将该组件取名为 Hystrix,宣言为 "defend your app",寓意应该是:当系统受到伤害时,能够像豪猪的棘刺一样保护系统。
2. 功能介绍
Hystrix主要提供了以下功能点:
- 熔断器(Circuit Breaker)
- 隔离(Isolation),提供璧仓模式,实现了线程池隔离和信号量隔离
- 回退(fallback),Hystrix会在run()执行过程中出现错误、超时、线程池拒绝、断路器熔断等情况时进行降级处理,有default fallback、单级fallback、多级fallback。
- 请求合并(Request Collapsing),@HystrixCollapser,适用于请求的合并,通过指定时间窗口@HystrixProperty(name = "timerDelayInMilliseconds", value = "50")及@HystrixProperty(name = "maxRequestsInBatch", value = "200")来执行批量方法,暂时不展开讲。
- 请求缓存(Request Caching)
- 仪表盘
2.1 HystrixCommand/HystrixObservableCommand
Hystrix有两个请求命令 HystrixCommand、HystrixObservableCommand。
HystrixCommand用在依赖服务返回单个操作结果的时候:
- execute():同步执行。从依赖的服务返回一个单一的结果对象,或是在发生错误的时候抛出异常。
- queue():异步执行。直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象。
HystrixObservableCommand用在依赖服务返回多个操作结果的时候:
- observe():返回Obervable对象,他代表了操作的多个结果,它是一个Hot Observable
- toObservable():同样返回Observable对象,也代表了操作多个结果,但它返回的是一个Cold Observable。
2.2 基本用法
有两种:手动自定义command和使用注解,手动自定义这里就不介绍了。这里介绍下注解需要两步(对破析源码有用):
第一步:隐式模式(用户不需要做什么,但你要知道),spirng boot会自动加载Feign的配置类HystrixAutoConfiguration(spring-cloud-netflix-core-1.4.4.RELEASE.jar/META-INF/spring.factories)
第二步:应用系统启动类中添加@EnableHystrix,它的作用是将spring.cloud.circuit.breaker.enabled设为true。
Hystrix默认配置都在HystrixCommandProperties类中,更多
###全集配置(设置熔断超时时间)
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=10000
###局部配置
hystrix.command.hello.execution.isolation.thread.timeoutInMilliseconds=10000
public class CommandHelloWorld extends HystrixCommand<String> {
private final String name;
public CommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected String run() throws Exception {
int i = 1/0;
return "Hello " + name + "!";
}
/**
* 降级
*
*/
@Override
protected String getFallback() {
return "faild";
}
}
public class ObservableCommandHelloWorld extends HystrixObservableCommand<String> {
private final String name;
public ObservableCommandHelloWorld(String name) {
super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
this.name = name;
}
@Override
protected Observable<String> construct() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if(!subscriber.isUnsubscribed()) {
subscriber.onNext("Hello");
int i = 1 / 0; //模拟异常
subscriber.onNext(name + "!");
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
/**
* 服务降级
*/
@Override
protected Observable<String> resumeWithFallback() {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if (!subscriber.isUnsubscribed()) {
subscriber.onNext("失败了!");
subscriber.onNext("找大神来排查一下吧!");
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
}).subscribeOn(Schedulers.io());
}
}
public class CommandHelloWorldTest {
@Test
public void testAll() {
//同步
new CommandHelloWorld("World").execute();
//异步
Future<String> fWorld = new CommandHelloWorld("World").queue();
//代码1,Hot Observable不论 “事件源” 是否有“订阅者”都会在创建后对事件进行发布。每一个“订阅者”都有可能从“事件源”的中途开始的,并可能只是看到了整个操作的局部过程
Observable<String> ho = new CommandHelloWorld("World").observe();
ho.subscribe(new Action1<String>() {
@Override
public void call(String s) {
System.out.println("call:" + s);
}
});
//代码2,Cold Observable在没有 “订阅者” 的时候并不会发布,而是进行等待,知道有 “订阅者” 之后才发布事件
Observable<String> co = new CommandHelloWorld("World").toObservable();
System.out.println(co.toBlocking().single());
Observable<String> observable= new ObservableCommandHelloWorld("World").observe();
Iterator<String> iterator = observable.toBlocking().getIterator();
while(iterator.hasNext()) {
System.out.println(iterator.next());
}
}
}
思考:HystrixCommand已具备了observe()和toObservable()的功能,和HystrixObservableCommand有和不同?
是的,但它的实现有一定的局限性,它返回的Observable只能发射一次数据,而HystrixObservableCommand实现的命令可以获取能发多次的Observable。
@HystrixCommand(fallbackMethod = "error")
public String hello() {
return restTemplate.getForEntity("http://serviceName/hello", String.class).getBody();
}
public String error() {
//多级降级
return new FirstLevelFallbackCommand(tag).execute();
}
/**
* LAZY参数表示使用toObservable()方式执行
*/
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.LAZY, fallbackMethod = "toObserbableError")
public Observable<String> getUserByName(final String name) {
return Observable.create(new Observable.OnSubscribe<String>() {
@Override
public void call(Subscriber<? super String> subscriber) {
try {
if(!subscriber.isUnsubscribed()) {
subscriber.onNext("找到");
subscriber.onNext(name);
int i = 1/0; 抛异常,模拟服务降级
subscriber.onNext("了");
subscriber.onCompleted();
}
} catch (Exception e) {
subscriber.onError(e);
}
}
});
}
private static class FirstLevelFallbackCommand extends HystrixCommand<String> {
private String tag;
public FirstLevelFallbackCommand(String tag) {
super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("FirstLevelFallbackCommand")));
this.tag = tag;
}
@Override
protected String run() throws Exception {
if ("error".equals(tag)) {
throw new Exception("一级降级失败,二级降级处理");
}
return "成功";
}
@Override
protected String getFallback() {
// 实际项目中,通常会在这里做残缺降级。
System.out.println("二级降级执行成功");
return "成功";
}
}
思考:多级降级的时候,为何将降级command单独做一个线程池?
如果主流程的command都失败了,可能线程池都已经被占满了,降级command必须用自己的独立的线程池。
3. 工作原理
Hystrix 工作原理,当需要完成某项任务时,通过 Hystrix 将任务包裹起来,交由 Hystrix 来完成任务,从而享受 Hystrix 带来保护。这和古代镖局生意有点类似,将任务委托给镖局,以期安全完成任务。官方 Wiki 中对每一步都做了详细的描述,可以直接参考。下面 流程图 来源于 Hystrix Wiki
3.1 构建命令
前面讲过Hystrix 提供了两个Command,可以使用这两个对象来包裹待执行的任务。 注解@HystrixCommand标记方法,Hystrix 将利用AOP自动将目标方法包装成HystrixCommand来执行,也可以继承他们来创建Command。任务委托给 Hystrix 后,Hystrix 可以应用自己的一系列保护机制,在执行用户任务的各节点(执行前、执行后、异常、超时等)做一系列的事情。
3.2 执行命令
有四种方式执行command:
- R execute():同步执行,从依赖服务得到单一结果对象,实现为 queue().get()
- Future queue():异步执行,返回一个 Future 以便获取执行结果,也是单一结果对象,实现为 toObservable().toBlocking().toFuture()
- Observable observe():hot observable,创建Observable后会订阅Observable,可以返回多个结果
- Observable toObservable():cold observable,返回一个Observable,只有订阅时才会执行,可以返回多个结果
public R execute() {
return queue().get();// 利用queue()拿到Future, 执行 get()同步等待拿到执行结果
}
public Future<R> queue() {
// 实现为 toObservable().toBlocking().toFuture()
final Future<R> delegate = toObservable().toBlocking().toFuture();
return delegate;
}
//利用toObservable()得到Observable并直接订阅它,立即执行命令
public Observable<R> observe() {
ReplaySubject<R> subject = ReplaySubject.create();
final Subscription sourceSubscription = toObservable().subscribe(subject);
...
}
3.3 检查缓存
第3到9步骤构成了 Hystrix 的保护能力,通过这一些列步骤来执行任务,从而起到保护作用。
如果启用了 Hystrix Cache,任务执行前将先判断是否有相同命令执行的缓存。如果有则直接返回缓存的结果;如果没有缓存的结果,但启动了缓存,将缓存本次执行结果以供后续使用。
3.4 断路器是否打开
断路器(circuit-breaker)和保险丝类似,保险丝在发生危险时将会烧断以保护电路,而断路器可以在达到我们设定的阀值时触发短路(比如请求失败率达到50%),拒绝执行任何请求。如果断路器被打开,Hystrix 将不会执行命令,直接进入Fallback处理逻辑。
3.5 检查线程池/信号量情况
Hystrix 隔离方式有线程池隔离和信号量隔离。当使用Hystrix线程池时,Hystrix 默认为每个依赖服务分配10个线程,当10个线程都繁忙时,将拒绝执行命令。信号量同理。
3.6 执行任务
通过HystrixObservableCommand.construct()或者 HystrixCommand.run()
3.7 断路器健康检查
每次开始执行command、结束执行command以及发生异常等情况时,都会记录执行情况,例如:成功、失败、拒绝以及超时等情况,会定期处理这些数据,再根据设定的条件来判断是否开启断路器。
3.8 失败时执行 Fallback
在命令失败时执行用户指定的 Fallback 逻辑。上图中的断路、线程池拒绝、信号量拒绝、执行执行、执行超时都会进入 Fallback 处理。
3.9 返回执行结果
原始结果将以Observable形式返回,在返回给用户之前,会根据调用方式的不同做一些处理。下面是 Hystrix Return flow
如果你对Hystrix 的源码比较感兴趣,可以看下一篇“HystrixCommandAspect入口解析”。