重试机制: 在发生异常时,重新尝试请求,多次还是失败时,才会抛出异常。
应用场景: 可能由于网络抖动出现第一次调用失败,尝试几次就可以恢复正常。
比如Spring提供的声明式的重试类库Spring-Retry
。
Feign Core
也提供了自己的重试机制,基于Retryer
接口。
Retryer接口位于 feign-core
包中,声明了一个连续重试方法,及一个从不重试的实例对象。
public interface Retryer extends Cloneable {
// 不重试Retryer 示例对象,直接抛出异常
Retryer NEVER_RETRY = new Retryer() {
public void continueOrPropagate(RetryableException e) {
throw e;
}
public Retryer clone() {
return this;
}
};
// 连续重试
void continueOrPropagate(RetryableException var1);
}
Default
是Retryer 接口的默认实现类,也就是Feign 的默认重试策略。
public static class Default implements Retryer {
// 最大访问次数(包含了第一次和重试的次数)
private final int maxAttempts;
// 重试的间隔时间
private final long period;
// 最大重试间隔
private final long maxPeriod;
// 当前访问次数
int attempt;
// 总的重试间隔
long sleptForMillis;
// 默认配置(重试间隔100毫秒、最大重试间隔1S、最多访问5次)
public Default() {
this(100L, TimeUnit.SECONDS.toMillis(1L), 5);
}
public Default(long period, long maxPeriod, int maxAttempts) {
this.period = period;
this.maxPeriod = maxPeriod;
this.maxAttempts = maxAttempts;
this.attempt = 1;
}
protected long currentTimeMillis() {
return System.currentTimeMillis();
}
// 默认重试算法
public void continueOrPropagate(RetryableException e) {
// 如果重试的次数attempt大于最大重试次数,直接抛出异常
// attempt的初始值为1 ,i++是先进行运算,所以就表示当前重试次数,比较结束了之后再+1,
// 当第attempt 为5时,也就是第五次进入这里,条件为true,则会直接抛出异常,不再进行请求发送。
// 所以最终是,重试了四次,总共请求了5次。
if (this.attempt++ >= this.maxAttempts) {
throw e;
} else {
long interval;
// 响应数据是否包含了 Retry-After 头,这个头用来告诉用户代理需要等待多长时间之后才能继续发送请求
if (e.retryAfter() != null) {
interval = e.retryAfter().getTime() - this.currentTimeMillis();
if (interval > this.maxPeriod) {
interval = this.maxPeriod;
}
if (interval < 0L) {
return;
}
} else {
// 计算重试时间间隔
interval = this.nextMaxInterval();
}
try {
// 线程睡眠时间间隔 第一次为 150ms
Thread.sleep(interval);
} catch (InterruptedException var5) {
Thread.currentThread().interrupt();
throw e;
}
// 记录时间间隔总数
this.sleptForMillis += interval;
}
}
// 计算公式=》 配置的间隔时间(默认100ms)* (1.5 的(当前重试次数)次方)
// 接着判断 当前时间时间,是否超出了最大限制,超过了则返回最大时间间隔。
long nextMaxInterval() {
long interval = (long)((double)this.period * Math.pow(1.5D, (double)(this.attempt - 1)));
return interval > this.maxPeriod ? this.maxPeriod : interval;
}
public Retryer clone() {
return new Retryer.Default(this.period, this.maxPeriod, this.maxAttempts);
}
}
默认是不进行重试,所以需要配置。
@Bean
Retryer feignRetryer() {
return new Retryer.Default();
}
在订单服务配置一个超长的线程睡眠,我们手动触发一个读取超时,然后由这个超时异常,由最里层往上进行分析,因为之前分析过很多次执行流程源码了,这里只着重看下异常。
这个异常是有底层HTTP 框架抛出的,会被负载均衡客户端捕获到,这里catch 了Exception
,所以异常都会被捕获,捕获了之后,会转化为ClientException
。
static FeignException errorExecuting(Request request, IOException cause) {
return new RetryableException(-1, String.format("%s executing %s %s", cause.getMessage(), request.httpMethod(), request.url()), request.httpMethod(), cause, (Date)null, request);
}
最终,RetryableException
还是会被方法处理器的invoke
方法所捕获到,进行重试,直到重试到最大次数时,还是失败,就会抛出异常,结束循环,执行失败。
可以看到方法执行器,每次执行的时候,都会创建一个重试器,而且方法执行的时候,是一个死循环,只有抛出异常时或者正常返回时,才会结束。
public Object invoke(Object[] argv) throws Throwable {
RequestTemplate template = this.buildTemplateFromArgs.create(argv);
Options options = this.findOptions(argv);
// 每次请求都会创建一个重试处理器
Retryer retryer = this.retryer.clone();
while(true) {
try {
return this.executeAndDecode(template, options);
} catch (RetryableException var9) {
RetryableException e = var9;
// 捕获到RetryableException ,则会调用重试处理器,主要是进行执行时间计算,没有抛出异常,则继续循环
try {
retryer.continueOrPropagate(e);
} catch (RetryableException var8) {
Throwable cause = var8.getCause();
if (this.propagationPolicy == ExceptionPropagationPolicy.UNWRAP && cause != null) {
throw cause;
}
throw var8;
}
if (this.logLevel != Level.NONE) {
this.logger.logRetry(this.metadata.configKey(), this.logLevel);
}
}
}
}
默认的重试算法,会有一个间隔时间,线程会休眠,然后再重新执行请求(在Default 源码中已经分析过了)。
计算公式=》 配置的间隔时间(默认100ms)* (1.5 的(当前重试次数)次方)。
long interval = (long)((double)this.period * Math.pow(1.5D, (double)(this.attempt - 1)));
return interval > this.maxPeriod ? this.maxPeriod : interval;
这里可以举个例子,默认最大重试次数为5次,最大间隔时间为1秒,第一次休眠时间为100ms,由计算公式我们可以推倒出来如下结果:
1. 第一次重试,100*(1.5的1次方)间隔150 毫秒后再次请求
2. 第二次重试,100*(1.5的2次方)间隔225 毫秒后再次请求
3. 第三次重试,100*(1.5的3次方)间隔337 毫秒后再次请求
4. 第四次重试,100*(1.5的4次方)间隔506 毫秒后再次请求,
5. 第五次,这个时候,访问次数attempt 达到了最大值5次,不会再重试了,而是直接抛出异常,结束请求
通过以上源码分析,可知,只有IO 异常时,才会解析为可重试异常,进行重试操作。
和Feign 一样,Ribbon 也是有重试机制,接下来按照上面的套路分析下Ribbon 。
RxJava - JVM
响应式扩展Reactive Extensions 用于使用Java VM的可观察序列编写异步和基于事件的程序的库。
RxJS 是基于观察者模式和迭代器模式以函数式编程思维来实现的。RxJS 中含有两个基本概念:Observables
与 Observe
r。Observables
作为被观察者,是一个值或事件的流集合;而Observer
则作为观察者,根据 Observables
进行处理。
Observables 与 Observer 之间的订阅发布关系(观察者模式) 如下:
订阅:Observer 通过 Observable 提供的 subscribe() 方法订阅 Observable。
发布:Observable 通过回调 next 方法向 Observer 发布事件。
RetryHandler
接口是 Ribbon 的重试处理器,用来处理重试逻辑。
public interface RetryHandler {
RetryHandler DEFAULT = new DefaultLoadBalancerRetryHandler();
// 判断当前异常是否应该重试
boolean isRetriableException(Throwable var1, boolean var2);
// 是否是熔断类型异常;
boolean isCircuitTrippingException(Throwable var1);
// 获取当前节点最大重试次数
int getMaxRetriesOnSameServer();
// 获取调用不同节点的最大重试次数
int getMaxRetriesOnNextServer();
}
DefaultLoadBalancerRetryHandler
是RetryHandler
接口的默认实现类,重点是看它的属性和构造函数:
// 定义了可以重试的异常
private List<Class<? extends Throwable>> retriable = Lists.newArrayList(new Class[]{ConnectException.class, SocketTimeoutException.class});
// 定义了可以重试的异常
private List<Class<? extends Throwable>> circuitRelated = Lists.newArrayList(new Class[]{SocketException.class, SocketTimeoutException.class});
// 对应 MaxAutoRetries 配置
protected final int retrySameServer;
// 对应 MaxAutoRetriesNextServer 配置
protected final int retryNextServer;
// 是否开启重试
protected final boolean retryEnabled;
// 没有参数时,表示不重试
public DefaultLoadBalancerRetryHandler() {
this.retrySameServer = 0;
this.retryNextServer = 0;
this.retryEnabled = false;
}
RequestSpecificRetryHandler
也实现了RetryHandler
接口,从名字上看,是具有请求特征的重试处理器,每次请求时,Ribbon 都会创建单独的一个RequestSpecificRetryHandler
(这是实际使用的处理器) ,也就是会和当前客户端的配置(IClientConfig对象)绑定,实现不同请求,不同配置策略。
可以看到,RequestSpecificRetryHandler
代理了一个RetryHandler
,默认是DefaultLoadBalancerRetryHandler
,看他们的参数,可以知道,默认Ribbon 重试机制也是关闭的。
FeignLoadBalancer
的executeWithLoadBalancer
方法中调用buildLoadBalancerCommand
方法构造LoadBalancerCommand
对象。这个类是将Ribbon
将请求转为RxJava API调用的实现。
该类的selectServer
方法,会在注册中心中,根据负载均衡算法获取到一个健康的可用服务,然后返回一个Observable
对象,这是一个观察者对象,创建的时候传入了一个Subscriber
订阅者对象,当Observable
对象被订阅时,Subscriber
中的call 方法会执行。
// 定义一个事件源
// Create操作符是用来创建一个Observable
private Observable<Server> selectServer() {
return Observable.create(new OnSubscribe<Server>() {
// Observable 包含了一个OnSubscribe 对象
// 当Observable被订阅(subscribe)时,OnSubscribe接口的call方法会被执行
@Override
public void call(Subscriber<? super Server> next) {
try {
// 事件源被订阅时,执行call
// 负载均衡查询可用服务
Server server = loadBalancerContext.getServerFromLoadBalancer(loadBalancerURI, loadBalancerKey);
//
next.onNext(server);
next.onCompleted();
} catch (Exception e) {
next.onError(e);
}
}
});
}
在submit
方法中,会创建一个Observable
对象,用来观察请求执行状态,如果失败,则会重试,达到最大次数,抛出异常,结束重试。
public Observable<T> submit(final ServerOperation<T> operation) {
// 省略.....
// 获取重试器中 最大重试次数
final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer();
final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer();
// (server == null ? selectServer() : Observable.just(server))
// 获取可用服务的Observable 对象
Observable<T> o = (server == null ? selectServer() : Observable.just(server))
// concatMap发射的数据集是有序的
.concatMap(new Func1<Server, Observable<T>>() {
@Override
// 为选中的每台服务器执行调用
public Observable<T> call(Server server) {
context.setServer(server);
// 获取当前服务的监控记录
final ServerStats stats = loadBalancerContext.getServerStats(server);
// 为每次尝试和重试调用
Observable<T> o = Observable
.just(server)
.concatMap(new Func1<Server, Observable<T>>() {
@Override
public Observable<T> call(final Server server) {
// 重试次数计数
context.incAttemptCount();
loadBalancerContext.noteOpenConnection(stats);
if (listenerInvoker != null) {
try {
listenerInvoker.onStartWithServer(context.toExecutionInfo());
} catch (AbortExecutionException e) {
return Observable.error(e);
}
}
final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start();
//
return operation.call(server).doOnEach(new Observer<T>() {
private T entity;
@Override
public void onCompleted() {
recordStats(tracer, stats, entity, null);
// TODO: What to do if onNext or onError are never called?
}
@Override
public void onError(Throwable e) {
recordStats(tracer, stats, null, e);
logger.debug("Got error {} when executed on server {}", e, server);
if (listenerInvoker != null) {
listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo());
}
}
@Override
public void onNext(T entity) {
this.entity = entity;
if (listenerInvoker != null) {
listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo());
}
}
private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) {
tracer.stop();
loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler);
}
});
}
});
// 针对同一实例的重试回调
if (maxRetrysSame > 0)
o = o.retry(retryPolicy(maxRetrysSame, true));
return o;
}
});
// 重试下一个实例的回调
if (maxRetrysNext > 0 && server == null)
o = o.retry(retryPolicy(maxRetrysNext, false));
// 重试超过次数则终止调用并设置对应异常的回调
return o.onErrorResumeNext(new Func1<Throwable, Observable<T>>() {
// 封装异常信息并返回
@Override
public Observable<T> call(Throwable e) {
// 省略......
return Observable.error(e);
}
});
}
以下为全局配置,也可以添加名称前缀,指定客户端。
# Ribbon 配置
ribbon:
MaxAutoRetries: 2
MaxAutoRetriesNextServer: 3
OkToRetryOnAllOperations: false
各参数说明如下:
重试总次数计算公式:
MaxAutoRetries+(MaxAutoRetries+1)*(MaxAutoRetriesNextServer)
第一次请求时异常,重试2次,查询下一个节点,发送一次请求失败,进入重试,再重试2次失败,继续查询三次节点重试,所以以上配置会重试(2+3*3=11)次