本文基于Spring Cloud Greenwich.SR2、Spring Boot 2.1.6.RELEASE
feign:
  client:
    config:
      default:
        connect-timeout: 2000
        read-timeout: 2000
Spring Cloud默认关闭了Feign的重试机制
//org.springframework.cloud.openfeign.FeignClientsConfiguration @Bean @ConditionalOnMissingBean public Retryer feignRetryer() { return Retryer.NEVER_RETRY; } //feign.Retryer /** * Implementation that never retries request. It propagates the RetryableException. */ Retryer NEVER_RETRY = new Retryer() { @Override public void continueOrPropagate(RetryableException e) { throw e; } @Override public Retryer clone() { return this; } };
/**
 * Registers Feign's {@code Retryer.Default} as the application's {@code Retryer} bean,
 * which turns Feign-level retries on.
 *
 * NOTE(review): the no-arg constructor presumably applies Feign's built-in back-off
 * defaults (100 ms period, 1 s max period, 5 attempts) - confirm against the Feign
 * version in use.
 *
 * @return a new {@code Retryer.Default} instance
 */
@Bean
public Retryer feignRetryer() {
    final Retryer defaultRetryer = new Retryer.Default();
    return defaultRetryer;
}
# ribbon的超时时间
# 如果ribbon和feign的超时时间都配置了,ribbon的配置会被覆盖
ribbon:
  ReadTimeout: 3000
  ConnectTimeout: 3000
ribbon:
  MaxAutoRetries: 1 # 同一台实例最大重试次数,不包括首次调用
  MaxAutoRetriesNextServer: 1 # 重试负载均衡其他的实例最大重试次数,不包括首次调用
  OkToRetryOnAllOperations: false # 是否所有操作都重试
hystrix:
  command:
    default:
      execution:
        timeout:
          enabled: true
        isolation:
          thread:
            timeoutInMilliseconds: 1000
protected static int getRibbonTimeout(IClientConfig config, String commandKey) { int ribbonTimeout; // 这是比较异常的情况,不说 if (config == null) { ribbonTimeout = RibbonClientConfiguration.DEFAULT_READ_TIMEOUT + RibbonClientConfiguration.DEFAULT_CONNECT_TIMEOUT; } else { // 这里获取了四个参数,ReadTimeout,ConnectTimeout,MaxAutoRetries, MaxAutoRetriesNextServer int ribbonReadTimeout = getTimeout(config, commandKey, "ReadTimeout", IClientConfigKey.Keys.ReadTimeout, RibbonClientConfiguration.DEFAULT_READ_TIMEOUT); int ribbonConnectTimeout = getTimeout(config, commandKey, "ConnectTimeout", IClientConfigKey.Keys.ConnectTimeout, RibbonClientConfiguration.DEFAULT_CONNECT_TIMEOUT); int maxAutoRetries = getTimeout(config, commandKey, "MaxAutoRetries", IClientConfigKey.Keys.MaxAutoRetries, DefaultClientConfigImpl.DEFAULT_MAX_AUTO_RETRIES); int maxAutoRetriesNextServer = getTimeout(config, commandKey, "MaxAutoRetriesNextServer", IClientConfigKey.Keys.MaxAutoRetriesNextServer, DefaultClientConfigImpl.DEFAULT_MAX_AUTO_RETRIES_NEXT_SERVER); // ribbonTimeout的计算方法在这里,以上文的设置为例 // ribbonTimeout = (3000 + 3000) * (1 + 1) * (1 + 1) = 24000(毫秒) ribbonTimeout = (ribbonReadTimeout + ribbonConnectTimeout) * (maxAutoRetries + 1) * (maxAutoRetriesNextServer + 1); } return ribbonTimeout; }
/**
 * Builds the primary, load-balanced {@code RestTemplate}, backed by a
 * {@code SimpleClientHttpRequestFactory} whose connect and read timeouts are both set to
 * 1000 (milliseconds, per the Spring setter contract).
 *
 * @return a {@code RestTemplate} using the configured request factory
 */
@Bean
@Primary
@LoadBalanced
public RestTemplate lbRestTemplate() {
    final SimpleClientHttpRequestFactory requestFactory = new SimpleClientHttpRequestFactory();
    requestFactory.setConnectTimeout(1000);
    requestFactory.setReadTimeout(1000);

    final RestTemplate restTemplate = new RestTemplate(requestFactory);
    return restTemplate;
}
//AbstractLoadBalancerAwareClient.java public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { LoadBalancerCommandcommand = buildLoadBalancerCommand(request, requestConfig); //省略... } protected LoadBalancerCommand buildLoadBalancerCommand(final S request, final IClientConfig config) { RequestSpecificRetryHandler handler = getRequestSpecificRetryHandler(request, config); //省略... } //FeignLoadBalancer.java //FeignLoadBalancer是AbstractLoadBalancerAwareClient的子类 @Override public RequestSpecificRetryHandler getRequestSpecificRetryHandler( RibbonRequest request, IClientConfig requestConfig) { //如果isOkToRetryOnAllOperations参数为true if (this.ribbon.isOkToRetryOnAllOperations()) { return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig); } if (!request.toRequest().httpMethod().name().equals("GET")) { return new RequestSpecificRetryHandler(true, false, this.getRetryHandler(), requestConfig); } else { return new RequestSpecificRetryHandler(true, true, this.getRetryHandler(), requestConfig); } } //RequestSpecificRetryHandler.java /** * okToRetryOnConnectErrors:只对连接错误发起重试 * okToRetryOnAllErrors:对于所有错误都会发起重试 */ public RequestSpecificRetryHandler(boolean okToRetryOnConnectErrors, boolean okToRetryOnAllErrors, RetryHandler baseRetryHandler, @Nullable IClientConfig requestConfig) { Preconditions.checkNotNull(baseRetryHandler); this.okToRetryOnConnectErrors = okToRetryOnConnectErrors; this.okToRetryOnAllErrors = okToRetryOnAllErrors; this.fallback = baseRetryHandler; if (requestConfig != null) { if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetries)) { retrySameServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetries); } if (requestConfig.containsProperty(CommonClientConfigKey.MaxAutoRetriesNextServer)) { retryNextServer = requestConfig.get(CommonClientConfigKey.MaxAutoRetriesNextServer); } } }
//RequestSpecificRetryHandler.java @Override public boolean isRetriableException(Throwable e, boolean sameServer) { if (okToRetryOnAllErrors) { return true; } else if (e instanceof ClientException) { ClientException ce = (ClientException) e; if (ce.getErrorType() == ClientException.ErrorType.SERVER_THROTTLED) { return !sameServer; } else { return false; } } else { return okToRetryOnConnectErrors && isConnectionException(e); } }
//SynchronousMethodHandler.java @Override public Object invoke(Object[] argv) throws Throwable { RequestTemplate template = buildTemplateFromArgs.create(argv); //这里我们假设你开启了Feign的重试,并且使用的是Retryer.Default这个类 Retryer retryer = this.retryer.clone(); while (true) { try { //这里会调用到5.3节executeWithLoadBalancer()方法 return executeAndDecode(template); } catch (RetryableException e) { try { //在重试次数之内,会等待一段时间返回,继续while循环,否则会抛出异常跳出循环 retryer.continueOrPropagate(e); } catch (RetryableException th) { Throwable cause = th.getCause(); if (propagationPolicy == UNWRAP && cause != null) { throw cause; } else { throw th; } } if (logLevel != Logger.Level.NONE) { logger.logRetry(metadata.configKey(), logLevel); } continue; } } }
//AbstractLoadBalancerAwareClient.java public T executeWithLoadBalancer(final S request, final IClientConfig requestConfig) throws ClientException { LoadBalancerCommandcommand = buildLoadBalancerCommand(request, requestConfig); try { return command.submit( new ServerOperation () { @Override public Observable call(Server server) { URI finalUri = reconstructURIWithServer(server, request.getUri()); S requestForServer = (S) request.replaceUri(finalUri); try { return Observable.just(AbstractLoadBalancerAwareClient.this.execute(requestForServer, requestConfig)); } catch (Exception e) { return Observable.error(e); } } }) .toBlocking() .single(); } catch (Exception e) { Throwable t = e.getCause(); if (t instanceof ClientException) { throw (ClientException) t; } else { throw new ClientException(e); } } }
//LoadBalancerCommand.java public Observablesubmit(final ServerOperation operation) { final ExecutionInfoContext context = new ExecutionInfoContext(); if (listenerInvoker != null) { try { listenerInvoker.onExecutionStart(); } catch (AbortExecutionException e) { return Observable.error(e); } } final int maxRetrysSame = retryHandler.getMaxRetriesOnSameServer(); final int maxRetrysNext = retryHandler.getMaxRetriesOnNextServer(); // Use the load balancer Observable o = (server == null ? selectServer() : Observable.just(server)) .concatMap(new Func1 >() { @Override // Called for each server being selected public Observable call(Server server) { context.setServer(server); final ServerStats stats = loadBalancerContext.getServerStats(server); // Called for each attempt and retry Observable o = Observable .just(server) .concatMap(new Func1 >() { @Override public Observable call(final Server server) { context.incAttemptCount(); loadBalancerContext.noteOpenConnection(stats); if (listenerInvoker != null) { try { listenerInvoker.onStartWithServer(context.toExecutionInfo()); } catch (AbortExecutionException e) { return Observable.error(e); } } final Stopwatch tracer = loadBalancerContext.getExecuteTracer().start(); return operation.call(server).doOnEach(new Observer () { private T entity; @Override public void onCompleted() { recordStats(tracer, stats, entity, null); // TODO: What to do if onNext or onError are never called? 
} @Override public void onError(Throwable e) { recordStats(tracer, stats, null, e); logger.debug("Got error {} when executed on server {}", e, server); if (listenerInvoker != null) { listenerInvoker.onExceptionWithServer(e, context.toExecutionInfo()); } } @Override public void onNext(T entity) { this.entity = entity; if (listenerInvoker != null) { listenerInvoker.onExecutionSuccess(entity, context.toExecutionInfo()); } } private void recordStats(Stopwatch tracer, ServerStats stats, Object entity, Throwable exception) { tracer.stop(); loadBalancerContext.noteRequestCompletion(stats, entity, exception, tracer.getDuration(TimeUnit.MILLISECONDS), retryHandler); } }); } }); if (maxRetrysSame > 0) o = o.retry(retryPolicy(maxRetrysSame, true)); return o; } }); if (maxRetrysNext > 0 && server == null) o = o.retry(retryPolicy(maxRetrysNext, false)); return o.onErrorResumeNext(new Func1 >() { @Override public Observable call(Throwable e) { if (context.getAttemptCount() > 0) { if (maxRetrysNext > 0 && context.getServerAttemptCount() == (maxRetrysNext + 1)) { e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_NEXTSERVER_EXCEEDED, "Number of retries on next server exceeded max " + maxRetrysNext + " retries, while making a call for: " + context.getServer(), e); } else if (maxRetrysSame > 0 && context.getAttemptCount() == (maxRetrysSame + 1)) { e = new ClientException(ClientException.ErrorType.NUMBEROF_RETRIES_EXEEDED, "Number of retries exceeded max " + maxRetrysSame + " retries, while making a call for: " + context.getServer(), e); } } if (listenerInvoker != null) { listenerInvoker.onExecutionFailed(e, context.toFinalExecutionInfo()); } return Observable.error(e); } }); }
