2年前 (2022-11-14)  相关技术 |   抢沙发  1060 
文章评分 0 次,平均分 0.0

在本文中,我们将讨论Resilience4j库【https://github.com/resilience4j/resilience4j】。

该库通过管理远程通信的容错能力来帮助实现弹性系统。

该库受到Hystrix的启发,但提供了更方便的API和许多其他功能,如Rate Limiter(阻止太频繁的请求)、Bulkhead(避免太多并发请求)等。

Maven设置

首先,我们需要将目标模块添加到pom.xml(例如,我们在这里添加断路器):

<dependency>
    <groupId>io.github.resilience4j</groupId>
    <artifactId>resilience4j-circuitbreaker</artifactId>
    <version>0.12.1</version>
</dependency>

断路器

请注意,对于这个模块,我们需要上面所示的resilience4j断路器依赖关系。

断路器模式有助于我们在远程服务关闭时防止级联故障。

在多次尝试失败后,我们可以认为服务不可用/过载,并急切地拒绝所有后续的请求。这样,我们可以为可能失败的调用节省系统资源。

让我们看看如何通过Resilience4j实现这一点。

首先,我们需要定义要使用的设置。最简单的方法是使用默认设置:

CircuitBreakerRegistry circuitBreakerRegistry
  = CircuitBreakerRegistry.ofDefaults();

还可以使用自定义参数:

CircuitBreakerConfig config = CircuitBreakerConfig.custom()
  .failureRateThreshold(20)
  .ringBufferSizeInClosedState(5)
  .build();

在这里,我们将速率阈值设置为20%,并且最少尝试5次调用。

然后,我们创建一个断路器对象并通过它调用远程服务:

interface RemoteService {
    int process(int i);
}

CircuitBreakerRegistry registry = CircuitBreakerRegistry.of(config);
CircuitBreaker circuitBreaker = registry.circuitBreaker("my");
Function<Integer, Integer> decorated = CircuitBreaker
  .decorateFunction(circuitBreaker, service::process);

最后,让我们通过JUnit测试来看看这是如何工作的。

我们将尝试调用服务10次。我们应该能够验证调用至少尝试了5次,然后在20%的调用失败后立即停止:

when(service.process(any(Integer.class))).thenThrow(new RuntimeException());

for (int i = 0; i < 10; i++) {
    try {
        decorated.apply(i);
    } catch (Exception ignore) {}
}

verify(service, times(5)).process(any(Integer.class));

断路器的状态和设置

断路器可以处于以下三种状态之一:

  • CLOSED – 一切正常,无短路
  • OPEN – 远程服务器关闭,所有对它的请求都短路
  • HALF_OPEN – 进入OPEN状态后经过的配置时间,断路器允许请求检查远程服务是否恢复在线

我们可以配置以下设置:

  • 断路器打开并开始短路调用的故障率阈值
  • 等待时间,定义断路器在切换到半开之前应保持断开的时间
  • 断路器半开或闭合时环形缓冲区的大小
  • 处理电路断路器事件的自定义电路断路器EventListener
  • 一个自定义谓词,用于评估异常是否应算作失败,从而提高失败率

Rate Limiter

与上一节类似,此功能需要resilience4j Rate Limiter速率限制器依赖性。

顾名思义,此功能允许限制对某些服务的访问。它的API与断路器非常相似——有注册表、配置和限制器类。

下面是它的示例:

RateLimiterConfig config = RateLimiterConfig.custom().limitForPeriod(2).build();
RateLimiterRegistry registry = RateLimiterRegistry.of(config);
RateLimiter rateLimiter = registry.rateLimiter("my");
Function<Integer, Integer> decorated
  = RateLimiter.decorateFunction(rateLimiter, service::process);

现在,如果需要符合速率限制器配置,则所有调用都将调用修饰的服务块。

我们可以配置如下参数:

  • 限额刷新周期
  • 刷新期的权限限制
  • 默认等待权限持续时间

Bulkhead

这里,我们首先需要resilience4j-bulkhead依赖性。

可以限制对特定服务的并发调用数。

让我们看一个使用Bulkhead API配置最大并发调用数的示例:

BulkheadConfig config = BulkheadConfig.custom().maxConcurrentCalls(1).build();
BulkheadRegistry registry = BulkheadRegistry.of(config);
Bulkhead bulkhead = registry.bulkhead("my");
Function<Integer, Integer> decorated
  = Bulkhead.decorateFunction(bulkhead, service::process);

为了测试此配置,我们将调用模拟服务的方法。

然后,我们确保隔板不允许任何其他呼叫:

CountDownLatch latch = new CountDownLatch(1);
when(service.process(anyInt())).thenAnswer(invocation -> {
    latch.countDown();
    Thread.currentThread().join();
    return null;
});

ForkJoinTask<?> task = ForkJoinPool.commonPool().submit(() -> {
    try {
        decorated.apply(1);
    } finally {
        bulkhead.onComplete();
    }
});
latch.await();
assertThat(bulkhead.isCallPermitted()).isFalse();

我们可以配置以下设置:

  • bulkhead允许的最大并行执行量
  • 线程尝试进入饱和bulkhead时等待的最大时间

Retry

对于这个特性,我们需要将resilience4j-retry库添加到项目中。

我们可以使用retry API自动重试失败的调用:

RetryConfig config = RetryConfig.custom().maxAttempts(2).build();
RetryRegistry registry = RetryRegistry.of(config);
Retry retry = registry.retry("my");
Function<Integer, Void> decorated
  = Retry.decorateFunction(retry, (Integer s) -> {
        service.process(s);
        return null;
    });

现在让我们模拟在远程服务调用期间抛出异常的情况,并确保库自动重试失败的调用:

when(service.process(anyInt())).thenThrow(new RuntimeException());
try {
    decorated.apply(1);
    fail("Expected an exception to be thrown if all retries failed");
} catch (Exception e) {
    verify(service, times(2)).process(any(Integer.class));
}

我们还可以配置以下内容:

  • 最大尝试次数
  • 重试前的等待时间
  • 一个自定义函数,用于修改故障后的等待间隔
  • 一个自定义谓词,用于评估异常是否会导致重试调用

Cache

缓存模块需要resilience4j-cache依赖项。

初始化看起来与其他模块略有不同:

这里的缓存是由使用的JSR-107缓存实现完成的,Resilience4j提供了一种应用它的方法。

请注意,没有用于修饰函数的API(如Cache.decorateFunction(Function)),该API仅支持SupplierCallable类型。

TimeLimiter

对于这个模块,我们必须添加resilience4j-timelimiter依赖项。

使用TimeLimiter可以限制调用远程服务的时间。

为了演示,让我们设置一个配置超时为1毫秒的TimeLimiter:

long ttl = 1;
TimeLimiterConfig config
  = TimeLimiterConfig.custom().timeoutDuration(Duration.ofMillis(ttl)).build();
TimeLimiter timeLimiter = TimeLimiter.of(config);

接下来,让我们验证Resilience4j是否调用Future.get()

Future futureMock = mock(Future.class);
Callable restrictedCall
  = TimeLimiter.decorateFutureSupplier(timeLimiter, () -> futureMock);
restrictedCall.call();

verify(futureMock).get(ttl, TimeUnit.MILLISECONDS);

我们还可以将其与断路器相结合:

Callable chainedCallable
  = CircuitBreaker.decorateCallable(circuitBreaker, restrictedCall);

附加模块

Resilience4j还提供了许多附加模块,便于与流行的框架和库集成。

一些更知名的集成包括:

  • Spring Boot – resilience4j-spring-boot module
  • Ratpack – resilience4j-ratpack module
  • Retrofit – resilience4j-retrofit module
  • Vertx – resilience4j-vertx module
  • Dropwizard – resilience4j-metrics module
  • Prometheus – resilience4j-prometheus module

结论

在本文中,我们介绍了Resilience4j库的不同方面,并了解了如何使用它来解决服务器间通信中的各种容错问题。

源码地址:https://github.com/eugenp/tutorials/tree/master/libraries-6

 

除特别注明外,本站所有文章均为老K的Java博客原创,转载请注明出处来自https://javakk.com/2763.html

关于

发表评论

表情 格式

暂无评论

登录

忘记密码 ?

切换登录

注册