使用Hystrix实现降级熔断

之前有说到过,分布式系统降级的方式可以通过配置中心手动降级。今天介绍一下通过Hystrix实现自动降级。

降级Demo

话不多说,下面先来一个Demo(对于Hystrix的依赖,这里就不再介绍了)。

GetStockServiceCommand

public class GetStockServiceCommand extends HystrixCommand<String> {

    private StockService stockService;

    public GetStockServiceCommand (StockService stockService) {
        super(setter());
        this.stockService = stockService;
    }

    private static Setter setter() {
        // 服务分组
        HystrixCommandGroupKey groupKey = HystrixCommandGroupKey.Factory.asKey("stock");
        // 命令配置
        HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
                .withExecutionIsolationStrategy(HystrixCommandProperties.ExecutionIsolationStrategy.THREAD)
                .withFallbackEnabled(true)
                .withFallbackIsolationSemaphoreMaxConcurrentRequests(100)
                .withExecutionIsolationThreadInterruptOnFutureCancel(true)
                .withExecutionIsolationThreadInterruptOnTimeout(true)
                .withExecutionTimeoutEnabled(true)
                .withExecutionTimeoutInMilliseconds(1000);
        return HystrixCommand.Setter.withGroupKey(groupKey).andCommandPropertiesDefaults(commandProperties);
    }

    @Override
    protected String run() throws Exception {
        // 可以通过异常/Thread.sleep()模拟超时
        return stockService.getStock();
    }

    @Override
    protected String getFallback() {
        return "有货";
    }
}

StockService

public class StockService {

    public String getStock() {
        throw new RuntimeException("出现异常了!");
    }
}

GetStockTest

public class GetStockTest {

    public static void main(String[] args) {
        StockService stockService = new StockService();
        GetStockServiceCommand stockServiceCommand = new GetStockServiceCommand(stockService);
        String result = stockServiceCommand.execute();// 同步执行
        System.out.println(result);

        // 异步调用, 可自由控制获取结果时机
        // Future<String> future = helloworldCommand.queue();
        // get操作不能超过command定义的超时时间, 默认1秒
        // result = future.get(100, TimeUnit.MILLISECONDS);
    }
}

GetStockTest为测试入口,为了对stockService服务做自动降级。对stockService做了一层Command包装,然后调用execute()去执行GetStockServiceCommand里面的run()方法。因为stockService.getStock()方法抛出了异常,所以会执行降级操作,也就是getFallback()方法会被执行并返回。

降级参数

使用HystrixCommandProperties配置和getFallback()方法可以实现降级处理。下面详细介绍一下配置参数:

withFallbackEnabled:是否启用降级,若启用,则在超时或异常时调用getFallback进行降级。(默认开启)

withFallbackIsolationSemaphoreMaxConcurrentRequests:配置了fallback()请求并发的信号量,当调用fallback()的并发超过阀值(默认10),则会进入快速失败。

withExecutionIsolationThreadInterruptOnFutureCancel:当隔离策略为THREAD时,当线程执行超时,是否进行中断处理,即异步的Future#cancel()。(默认为false)

withExecutionIsolationThreadInterruptOnTimeout:当隔离策略为THREAD时,当线程执行超时,是否进行中断处理。(默认为true)这里指的是同步调用:execute()

withExecutionTimeoutEnabled:是否启用超时机制,默认为true。

withExecutionTimeoutInMilliseconds:执行超时时间,默认1000毫秒。1、配置线程隔离,则执行中断处理;2、配置信号量隔离,则进行终止操作。因为信号量隔离和主线程是在一个线程中执行,其不会中断线程处理。所以要根据实际情况选择类型。

除了上面的部分参数,对于getFallback()还需要注意以下的几点:

1、最大并发数受fallbackIsolationSemaphoreMaxConcurrentRequests控制,如果失败率非常高,则需要重新配置该参数。如果并发数超过了该配置,则不会再执行getFallback(),而是快速失败。如抛出HystrixRuntimeException的异常。

2、该方法不能进行网络调用,应该只是返回兜底的数据。

3、如果必须要走一个网络调用,则就需要调用另外一个Command。

4、Command可以有降级和熔断机制,而getFallback只有fallbackIsolationSemaphoreMaxConcurrentRequest参数控制最大并发数。

熔断

Command首先调用HystrixCircuitBreaker#allowRequest判断是否熔断了,如果没有则执行Command#run方法;若熔断了则直接调用Command#getFallback方法降级处理。

通过circuitBreakerSleepWindowInMilliseconds可以控制一个时间窗口内,可进行一次请求测试。若测试成功,则闭合熔断开关,否则还是打开状态,从而实现了快速失败和恢复。关于熔断有以下几个概念需要了解一下:

概念

闭合(Closed):如果配置了熔断开关强制闭合,或者当前请求失败率没有超过阀值,则熔断开关处于闭合状态,此时不会启动熔断机制,即不进行降级处理。

打开(Open):如果配置了熔断开关强制打开,或者当前失败率超过了阀值,则熔断开关打开,此时会调用getFallback()方法进行降级处理。

半打开(Half-Open):当熔断处于打开状态后,不能一直熔断下去,需要在一个时间窗口之后进行重试,这就是半打开状态。Hystrix允许在circuitBreakerSleepWindowInMilliseconds的时间窗口内进行一次重试。重试成功后闭合熔断开关,否则熔断开关还是处于打开状态。

上面所指的失败包含:异常、超时、线程池拒绝、信号量拒绝的总和。

配置示例

HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
    .withCircuitBreakerEnabled(true)// 默认为true
    .withCircuitBreakerForceClosed(false)// 默认为false
    .withCircuitBreakerForceOpen(false)// 默认为false
    .withCircuitBreakerErrorThresholdPercentage(50)// 默认50%
    .withCircuitBreakerRequestVolumeThreshold(20)// 默认为20
    .withCircuitBreakerSleepWindowInMilliseconds(5000)// 默认5秒

withCircuitBreakerEnabled:是否开启熔断机制,默认为true。

withCircuitBreakerForceClosed:是否强制关闭熔断开关,如果强制关闭了熔断开关,则请求不会被降级,一些场景可以动态设置该开关,默认为false。

withCircuitBreakerForceOpen:是否强制打开熔断开关,如果打开了,则请求强制降级调用getFallback处理,可以通过动态配置来打开开关实现一些特殊需求,默认为false。

withCircuitBreakerErrorThresholdPercentage:如果在一个采样时间窗口内,失败率超过该配置,则自动打开熔断开关,快速失败。默认采样周期为10秒,失败率为50%。

withCircuitBreakerRequestVolumeThreshold:在熔断开关闭合的情况下,在进行失败率判断之前,一个采样周期内必须进行至少N个请求才能进行采样统计。目的是有足够的采样使得失败率计算的比较接近真实值,默认为20.

withCircuirBreakerSleepWindowInMilliseconds:熔断后的重试时间窗口,在窗口内只允许一次重试。在熔断开关打开后,若重试成功,则重试Health采样统计,并闭合熔断开关实现快速恢复。否则熔断开关还是打开状态,会进行快速失败。

通过下面的方法可以获取熔断器的状态:

isCircuitBreakerOpen:熔断开关是否打开了,通过 circuitBreakerForceOpen().get() || (!circuitBreakerForceClosed().get() && circuitBreaker.isOpen()) 判断。

isResponseShortCircuited:isCircuitBreakerOpen=true,且调用getFallback()时返回true。

采样统计

Hystrix在内存中存储采样数据,支持如下3种采样:

BucketedCounterStream:计数统计。记录一定时间窗口内的失败、超时、线程池拒绝、信号量拒绝数量。写入第N组时,用前N-1组统计,然后基于时间窗口平滑后移统计。

RollingConcurrencyStream:最大并发数统计。如Command/ThreadPool的最大并发数。

RollingDistributionStream:延迟百分比统计,和HystrixRollingNumber类似,差别在于其是百分位数的统计。比如每组记录P(如100)个数值,统计时用前N-1组数据,将分组数据按从小到大排序,然后累加,处于p%位置的就是p百分位数,通过它可以实现P50、P99、P999,Hystrix用来统计延时的分布情况。

1、Command、ThreadPool计数/最大并发采样统计

HystrixThreadPoolProperties.Setter threadPoolProperties = HystrixThreadPoolProperties.Setter()
    .withMetricsRollingStatisticalWindowInMilliseconds(1000)
    .withMetricsRollingStatisticalWindowBuckets(10);
HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
    .withMetricsRollingStatisticalWindowInMilliseconds(10000)
    .withMetricsRollingStatisticalWindowBuckets(10);

withMetricsRollingStatisticalWindowInMilliseconds:配置采样统计滚转之间窗口,默认为10秒。

withMetricsRollingStatisticalWindowBuckets:配置采用统计滚转时间窗口内的桶的总数量,默认为10,比如时间窗口为10000,桶数量为10,则采用统计间隔为每秒一个桶统计。

2、Command健康度采样统计

HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
    .withMetricsRollingStatisticalWindowInMilliseconds(10000)
    .withMetricsHealthSnapshotIntervalInMilliseconds(500);

withMetricsRollingStatisticalWindowInMilliseconds:同上。

withMetricsHealthSnapshotIntervalInMilliseconds:记录健康度采用统计的快照频率,默认为500ms,即500ms一个采样统计间隔,那么桶的数量为10000/500=20个。

该统计在熔断机制中使用时,如果计算熔断的频率非常高,则需要控制好采样的频率。如果太频繁,就有可能造成CPU计算密集。所以选择Hystrix要注意此处的性能消耗和调优,如果此处是瓶颈,则可以费除掉统计。

3、Command时延分布采样统计

HystrixCommandProperties.Setter commandProperties = HystrixCommandProperties.Setter()
    .withMetricsRollingPercentileWindowInMilliseconds(60000)
    .withMetricsRollingPercentileWindowBuckets(6);

上面默认采样滚转时间窗口为60S,有6个桶,即每10S一个桶统计。

其他说明

Hystrix流程结构

流程说明

1、每次调用创建一个新的HystrixCommand,把依赖调用封装在run()方法中。

2、执行execute/queue做同步或异步调用。

3、判断熔断器(circuit-breaker)是否打开,如果打开跳到步骤8,进行降级策略,如果关闭进入步骤4。

4、判断线程池/队列/信号量是否跑满,如果跑满进入降级步骤8,否则继续后续步骤。

5、调用HystrixCommand的run方法,运行依赖逻辑。

5a、依赖逻辑调用超时,进入步骤8。

6、判断逻辑是否调用成功。

6a、返回成功调用结果。

6b、调用出错,进入步骤8。

7、计算熔断器状态,所有的运行状态(成功、失败、拒绝、超时)上报给熔断器,用于统计从而判断熔断器状态。

8、getFallback()降级逻辑。

以下四种情况将触发getFallback调用:

(1) run()方法抛出非HystrixBadRequestException异常

(2) run()方法调用超时

(3) 熔断器开启拦截调用 

(4) 线程池/队列/信号量是否跑满

8a、没有实现getFallback的Command将直接抛出异常。

8b、fallback降级逻辑调用成功直接返回。

8c、降级逻辑调用失败抛出异常。

9、返回执行成功结果。

线程/信号量隔离

线程隔离

把执行依赖代码的线程与请求线程分离,请求线程可以自由控制离开的时间(异步过程)。通过线程池大小可以控制并发量,当线程池饱和时可以提前拒绝服务,防止依赖问题扩散。线上建议线程池不要设置过大,否则大量堵塞线程有可能会拖慢服务器。

信号量隔离

信号隔离也可以用于限制并发访问,防止阻塞扩散,与线程隔离最大不同在于执行依赖代码的线程依然是请求线程(该线程需要通过信号申请),如果客户端是可信的且可以快速返回,可以使用信号隔离替换线程隔离,降低开销。


参考:《亿级流量网站架构核心技术》、https://github.com/Netflix/Hystrix/wiki​​​​​​​