Spring Cloud Hystrix 源码系列:工作原理
  JmMM2CFhUghs 2023年11月02日 58 0


Hystrix 译为 "豪猪",豪猪的棘刺能保护自己不受天敌伤害,代表了强大的防御能力。Hystrix 基于 RxJava 进行实现,RxJava 是一种基于观察者模式的响应式编程框架。Spring Cloud Hystrix 基于 Netflix Hystrix 实现,具备服务降级、服务熔断、线程与信号隔离、请求缓存、请求合并以及服务监控等强大功能。本文基于hystrix-core 1.5.18(近年来几乎很少更新,建议升级)。

目录

1. 由来

2. 功能介绍

2.1 HystrixCommand/HystrixObservableCommand

2.2 基本用法

3. 工作原理

3.1 构建命令

3.2 执行命令

3.3 检查缓存

3.4 断路器是否打开

3.5 检查线程池/信号量情况

3.6 执行任务

3.7 断路器健康检查

3.8 失败时执行 Fallback

3.9 返回执行结果


1. 由来

在单体应用中,一类服务、一个线程、一个Bug等局部因素压垮整个系统也是屡见不鲜。微服务中,服务间依赖重重,通过隔离,很好的控制住风险范围,再结合请求拒绝和超时控制,有效剔除 “老鼠屎”,避免坏了一锅粥。总之,隔离设计是绝妙的防护罩。Netflix 将该组件取名为 Hystrix,宣言为 "defend your app",寓意应该是:当系统受到伤害时,能够像豪猪的棘刺一样保护系统

Spring Cloud Hystrix 源码系列:工作原理_断路器

2. 功能介绍

Hystrix主要提供了以下功能点:

  1. 熔断器(Circuit Breaker)
  2. 隔离(Isolation),提供璧仓模式,实现了线程池隔离和信号量隔离
  3. 回退(fallback),Hystrix会在run()执行过程中出现错误、超时、线程池拒绝、断路器熔断等情况时进行降级处理,有default fallback、单级fallback、多级fallback。
  4. 请求合并(Request Collapsing),@HystrixCollapser,适用于请求的合并,通过指定时间窗口@HystrixProperty(name = "timerDelayInMilliseconds", value = "50")及@HystrixProperty(name = "maxRequestsInBatch", value = "200")来执行批量方法,暂时不展开讲。
  5. 请求缓存(Request Caching)
  6. 仪表盘

2.1 HystrixCommand/HystrixObservableCommand

Hystrix有两个请求命令 HystrixCommand、HystrixObservableCommand。

Spring Cloud Hystrix 源码系列:工作原理_线程池_02

HystrixCommand用在依赖服务返回单个操作结果的时候:

  • execute():同步执行。从依赖的服务返回一个单一的结果对象,或是在发生错误的时候抛出异常。
  • queue():异步执行。直接返回一个Future对象,其中包含了服务执行结束时要返回的单一结果对象。

HystrixObservableCommand用在依赖服务返回多个操作结果的时候:

  • observe():返回Obervable对象,他代表了操作的多个结果,它是一个Hot Observable
  • toObservable():同样返回Observable对象,也代表了操作多个结果,但它返回的是一个Cold Observable。

2.2 基本用法

有两种:手动自定义command和使用注解,手动自定义这里就不介绍了。这里介绍下注解需要两步(对破析源码有用):

第一步:隐式模式(用户不需要做什么,但你要知道),spirng boot会自动加载Feign的配置类HystrixAutoConfiguration(spring-cloud-netflix-core-1.4.4.RELEASE.jar/META-INF/spring.factories)

第二步:应用系统启动类中添加@EnableHystrix,它的作用是将spring.cloud.circuit.breaker.enabled设为true。

Hystrix默认配置都在HystrixCommandProperties类中,更多

###全集配置(设置熔断超时时间)
hystrix.command.default.execution.isolation.thread.timeoutInMilliseconds=10000
###局部配置
hystrix.command.hello.execution.isolation.thread.timeoutInMilliseconds=10000
public class CommandHelloWorld extends HystrixCommand<String> {
    private final String name;
    public CommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }
    @Override
    protected String run() throws Exception {
        int i = 1/0;
        return "Hello " + name + "!";
    }
    /**
     * 降级
     * 
     */
    @Override
    protected String getFallback() {
        return "faild";
    }
}
public class ObservableCommandHelloWorld extends HystrixObservableCommand<String> {
    private final String name;
    public ObservableCommandHelloWorld(String name) {
        super(HystrixCommandGroupKey.Factory.asKey("ExampleGroup"));
        this.name = name;
    }
    @Override
    protected Observable<String> construct() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    if(!subscriber.isUnsubscribed()) {
                        subscriber.onNext("Hello");
                        int i = 1 / 0; //模拟异常
                        subscriber.onNext(name + "!");
                        subscriber.onCompleted();
                    }
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        }).subscribeOn(Schedulers.io());
    }
    /**
     * 服务降级
     */
    @Override
    protected Observable<String> resumeWithFallback() {
        return Observable.create(new Observable.OnSubscribe<String>() {
            @Override
            public void call(Subscriber<? super String> subscriber) {
                try {
                    if (!subscriber.isUnsubscribed()) {
                        subscriber.onNext("失败了!");
                        subscriber.onNext("找大神来排查一下吧!");
                        subscriber.onCompleted();
                    }
                } catch (Exception e) {
                    subscriber.onError(e);
                }
            }
        }).subscribeOn(Schedulers.io());
    }
}
public class CommandHelloWorldTest {
    @Test
    public void testAll() {
        //同步
        new CommandHelloWorld("World").execute();
        //异步
        Future<String> fWorld = new CommandHelloWorld("World").queue();
        //代码1,Hot Observable不论 “事件源” 是否有“订阅者”都会在创建后对事件进行发布。每一个“订阅者”都有可能从“事件源”的中途开始的,并可能只是看到了整个操作的局部过程
        Observable<String> ho = new CommandHelloWorld("World").observe();
        ho.subscribe(new Action1<String>() {
            @Override
            public void call(String s) {
                System.out.println("call:" + s);
            }
        });
        //代码2,Cold Observable在没有 “订阅者” 的时候并不会发布,而是进行等待,知道有 “订阅者” 之后才发布事件
        Observable<String> co = new CommandHelloWorld("World").toObservable();
        System.out.println(co.toBlocking().single());       
        
        Observable<String> observable= new ObservableCommandHelloWorld("World").observe();
        Iterator<String> iterator = observable.toBlocking().getIterator();
         while(iterator.hasNext()) {
            System.out.println(iterator.next());
        }
    }
}

思考:HystrixCommand已具备了observe()和toObservable()的功能,和HystrixObservableCommand有和不同?

是的,但它的实现有一定的局限性,它返回的Observable只能发射一次数据,而HystrixObservableCommand实现的命令可以获取能发多次的Observable。

@HystrixCommand(fallbackMethod = "error")
public String hello() {
    return restTemplate.getForEntity("http://serviceName/hello", String.class).getBody();
}
public String error() {
    //多级降级
    return new FirstLevelFallbackCommand(tag).execute();
}
/**
 * LAZY参数表示使用toObservable()方式执行
 */
@HystrixCommand(observableExecutionMode = ObservableExecutionMode.LAZY, fallbackMethod = "toObserbableError") 
public Observable<String> getUserByName(final String name) {
    return Observable.create(new Observable.OnSubscribe<String>() {
        @Override
        public void call(Subscriber<? super String> subscriber) {
            try {
                if(!subscriber.isUnsubscribed()) {
                    subscriber.onNext("找到");
                    subscriber.onNext(name);
                    int i = 1/0; 抛异常,模拟服务降级
                    subscriber.onNext("了");
                    subscriber.onCompleted();
                }
            } catch (Exception e) {
                subscriber.onError(e);
            }
        }
    });
}
private static class FirstLevelFallbackCommand extends HystrixCommand<String> { 
    private String tag; 
    public FirstLevelFallbackCommand(String tag) { 
	super(Setter.withGroupKey(HystrixCommandGroupKey.Factory.asKey("FirstLevelFallbackCommand")));
	this.tag = tag;
    } 
    @Override
    protected String run() throws Exception {
      if ("error".equals(tag)) {
        throw new Exception("一级降级失败,二级降级处理");
      }
      return "成功";
    } 
    @Override
    protected String getFallback() {
      // 实际项目中,通常会在这里做残缺降级。
      System.out.println("二级降级执行成功");
      return "成功";
    } 
}

思考:多级降级的时候,为何将降级command单独做一个线程池?

如果主流程的command都失败了,可能线程池都已经被占满了,降级command必须用自己的独立的线程池。

3. 工作原理

 Hystrix 工作原理,当需要完成某项任务时,通过 Hystrix 将任务包裹起来,交由 Hystrix 来完成任务,从而享受 Hystrix 带来保护。这和古代镖局生意有点类似,将任务委托给镖局,以期安全完成任务。官方 Wiki 中对每一步都做了详细的描述,可以直接参考。下面 流程图 来源于 Hystrix Wiki

Spring Cloud Hystrix 源码系列:工作原理_Hystrix_03

3.1 构建命令

前面讲过Hystrix 提供了两个Command,可以使用这两个对象来包裹待执行的任务。 注解@HystrixCommand标记方法,Hystrix 将利用AOP自动将目标方法包装成HystrixCommand来执行,也可以继承他们来创建Command。任务委托给 Hystrix 后,Hystrix 可以应用自己的一系列保护机制,在执行用户任务的各节点(执行前、执行后、异常、超时等)做一系列的事情。

3.2 执行命令

有四种方式执行command:

  • R execute():同步执行,从依赖服务得到单一结果对象,实现为 queue().get()
  • Future queue():异步执行,返回一个 Future 以便获取执行结果,也是单一结果对象,实现为 toObservable().toBlocking().toFuture()
  • Observable observe():hot observable,创建Observable后会订阅Observable,可以返回多个结果
  • Observable toObservable():cold observable,返回一个Observable,只有订阅时才会执行,可以返回多个结果
public R execute() {
    return queue().get();// 利用queue()拿到Future, 执行 get()同步等待拿到执行结果
}
public Future<R> queue() {
    // 实现为 toObservable().toBlocking().toFuture()
    final Future<R> delegate = toObservable().toBlocking().toFuture();
    return delegate;
}
//利用toObservable()得到Observable并直接订阅它,立即执行命令
public Observable<R> observe() {
    ReplaySubject<R> subject = ReplaySubject.create();
    final Subscription sourceSubscription = toObservable().subscribe(subject);
    ...
}

3.3 检查缓存

第3到9步骤构成了 Hystrix 的保护能力,通过这一些列步骤来执行任务,从而起到保护作用。

如果启用了 Hystrix Cache,任务执行前将先判断是否有相同命令执行的缓存。如果有则直接返回缓存的结果;如果没有缓存的结果,但启动了缓存,将缓存本次执行结果以供后续使用。

3.4 断路器是否打开

断路器(circuit-breaker)和保险丝类似,保险丝在发生危险时将会烧断以保护电路,而断路器可以在达到我们设定的阀值时触发短路(比如请求失败率达到50%),拒绝执行任何请求。如果断路器被打开,Hystrix 将不会执行命令,直接进入Fallback处理逻辑。

3.5 检查线程池/信号量情况

Hystrix 隔离方式有线程池隔离和信号量隔离。当使用Hystrix线程池时,Hystrix 默认为每个依赖服务分配10个线程,当10个线程都繁忙时,将拒绝执行命令。信号量同理。

3.6 执行任务

通过HystrixObservableCommand.construct()或者 HystrixCommand.run()

3.7 断路器健康检查

每次开始执行command、结束执行command以及发生异常等情况时,都会记录执行情况,例如:成功、失败、拒绝以及超时等情况,会定期处理这些数据,再根据设定的条件来判断是否开启断路器。

3.8 失败时执行 Fallback

在命令失败时执行用户指定的 Fallback 逻辑。上图中的断路、线程池拒绝、信号量拒绝、执行执行、执行超时都会进入 Fallback 处理。

3.9 返回执行结果

原始结果将以Observable形式返回,在返回给用户之前,会根据调用方式的不同做一些处理。下面是 Hystrix Return flow

Spring Cloud Hystrix 源码系列:工作原理_Hystrix_04

如果你对Hystrix 的源码比较感兴趣,可以看下一篇“HystrixCommandAspect入口解析”。

【版权声明】本文内容来自摩杜云社区用户原创、第三方投稿、转载,内容版权归原作者所有。本网站的目的在于传递更多信息,不拥有版权,亦不承担相应法律责任。如果您发现本社区中有涉嫌抄袭的内容,欢迎发送邮件进行举报,并提供相关证据,一经查实,本社区将立刻删除涉嫌侵权内容,举报邮箱: cloudbbs@moduyun.com

  1. 分享:
最后一次编辑于 2023年11月08日 0

暂无评论

推荐阅读
JmMM2CFhUghs