本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent
要想实现我们上一节中提到的:
- 需要在重试以及断路中加一些日志,便于日后的优化
- 需要定义重试的 Exception,并且与断路器相结合,将非 2xx 的响应码也封装成特定的异常
- 需要在断路器相关的 Operator 中增加类似于 FeignClient 中的负载均衡的数据更新,使得负载均衡更加智能
我们需要将 resilience4j 本身提供的粘合库做一些改造,其实主要就是对 resilience4j 实现的 project reactor 的 Operator 进行改造。
关于断路器的改造
首先,WebClient 的返回对象只可能是
ClientResponse
类型,所以我们这里改造出来的 Operator 不必带上形参,只需要针对 ClientResponse 即可,即:
public class ClientResponseCircuitBreakerOperator implements UnaryOperator<Publisher<ClientResponse>> {...}
在原有的断路器逻辑中,我们需要加入针对 GET 方法以及之前定义的可以重试的路径匹配配置可以重试的逻辑,这需要我们拿到原有请求的 URL 信息。但是 ClientResponse 中并没有暴露这些信息的接口,其默认实现 DefaultClientResponse(我们只要没有自己给 WebClient 加入特殊的改造逻辑,实现都是 DefaultClientResponse) 中的
request()
方法可以获取请求 HttpRequest,其中包含 url 信息。但是这个类还有方法都是 package-private 的,我们需要反射出来:
ClientResponseCircuitBreakerSubscriber
private static final Class<?> aClass;private static final Method request;static {try {aClass = Class.forName("org.springframework.web.reactive.function.client.DefaultClientResponse");request = ReflectionUtils.findMethod(aClass, "request");request.setAccessible(true);} catch (Exception e) {throw new RuntimeException(e);}}
之后,在获取到 ClientResponse 之后记录断路器的逻辑中,需要加入上面提到的关于重试的改造,以及负载均衡器的记录:
ClientResponseCircuitBreakerSubscriber
protected void hookOnNext(ClientResponse clientResponse) {if (!isDisposed()) {if (singleProducer && successSignaled.compareAndSet(false, true)) {int rawStatusCode = clientResponse.rawStatusCode();HttpStatus httpStatus = HttpStatus.resolve(rawStatusCode);try {HttpRequest httpRequest = (HttpRequest) request.invoke(clientResponse);//判断方法是否为 GET,以及是否在可重试路径配置中,从而得出是否可以重试if (httpRequest.getMethod() != HttpMethod.GET && !webClientProperties.retryablePathsMatch(httpRequest.getURI().getPath())) {//如果不能重试,则直接返回结果circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);} else {if (httpStatus != null && httpStatus.is2xxSuccessful()) {//如果成功,则直接返回结果circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);} else {/*** 如果异常,参考 DefaultClientResponse 的代码进行异常封装* @see org.springframework.web.reactive.function.client.DefaultClientResponse#createException*/Exception exception;if (httpStatus != null) {exception = WebClientResponseException.create(rawStatusCode, httpStatus.getReasonPhrase(), clientResponse.headers().asHttpHeaders(), EMPTY, null, null);} else {exception = new UnknownHttpStatusCodeException(rawStatusCode, clientResponse.headers().asHttpHeaders(), EMPTY, null, null);}circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), exception);downstreamSubscriber.onError(exception);return;}}} catch (Exception e) {log.fatal("judge request method in circuit breaker error! the resilience4j feature would not be enabled: {}", e.getMessage(), e);circuitBreaker.onResult(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), clientResponse);}}eventWasEmitted.set(true);downstreamSubscriber.onNext(clientResponse);}}
同样的,在原有的完成,取消还有失败的记录逻辑中,也加上记录负载均衡数据:
ClientResponseCircuitBreakerSubscriber
@Overrideprotected void hookOnComplete() {if (successSignaled.compareAndSet(false, true)) {serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());}downstreamSubscriber.onComplete();}@Overridepublic void hookOnCancel() {if (!successSignaled.get()) {serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, true);if (eventWasEmitted.get()) {circuitBreaker.onSuccess(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit());} else {circuitBreaker.releasePermission();}}}@Overrideprotected void hookOnError(Throwable e) {serviceInstanceMetrics.recordServiceInstanceCalled(serviceInstance, false);circuitBreaker.onError(circuitBreaker.getCurrentTimestamp() - start, circuitBreaker.getTimestampUnit(), e);downstreamSubscriber.onError(e);}
粘合 WebClient 与 resilience4j 的同时覆盖重试逻辑
由于前面的断路器中,我们针对可以重试的非 2XX 响应封装成为 WebClientResponseException。所以在重试器中,我们需要加上针对这个异常的重试。
同时,需要将重试器放在负载均衡器之前,因为每次重试,都要从负载均衡器中获取一个新的实例。同时,断路器需要放在负载均衡器之后,因为只有在这个之后,才能获取到本次调用的实例,我们的的断路器是针对实例方法级别的:
WebClientDefaultConfiguration.java
@Beanpublic WebClient getWebClient(ReactorLoadBalancerExchangeFilterFunction lbFunction,WebClientConfigurationProperties webClientConfigurationProperties,Environment environment,RetryRegistry retryRegistry,CircuitBreakerRegistry circuitBreakerRegistry,ServiceInstanceMetrics serviceInstanceMetrics) {String name = environment.getProperty(WebClientNamedContextFactory.PROPERTY_NAME);Map<String, WebClientConfigurationProperties.WebClientProperties> configs = webClientConfigurationProperties.getConfigs();if (configs == null || configs.size() == 0) {throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs");}WebClientConfigurationProperties.WebClientProperties webClientProperties = configs.get(name);if (webClientProperties == null) {throw new BeanCreationException("Failed to create webClient, please provide configurations under namespace: webclient.configs." + name);}String serviceName = webClientProperties.getServiceName();//如果没填写微服务名称,就使用配置 key 作为微服务名称if (StringUtils.isBlank(serviceName)) {serviceName = name;}String baseUrl = webClientProperties.getBaseUrl();//如果没填写 baseUrl,就使用微服务名称填充if (StringUtils.isBlank(baseUrl)) {baseUrl = "http://" + serviceName;}Retry retry = null;try {retry = retryRegistry.retry(serviceName, serviceName);} catch (ConfigurationNotFoundException e) {retry = retryRegistry.retry(serviceName);}//覆盖其中的异常判断retry = Retry.of(serviceName, RetryConfig.from(retry.getRetryConfig()).retryOnException(throwable -> {//WebClientResponseException 会重试,因为在这里能 catch 的 WebClientResponseException 只对可以重试的请求封装了 WebClientResponseException//参考 ClientResponseCircuitBreakerSubscriber 的代码if (throwable instanceof WebClientResponseException) {log.info("should retry on {}", throwable.toString());return true;}//断路器异常重试,因为请求没有发出去if (throwable instanceof CallNotPermittedException) {log.info("should retry on {}", throwable.toString());return true;}if (throwable instanceof WebClientRequestException) {WebClientRequestException webClientRequestException = (WebClientRequestException) throwable;HttpMethod method = webClientRequestException.getMethod();URI uri = webClientRequestException.getUri();//判断是否为响应超时,响应超时代表请求已经发出去了,对于非 GET 并且没有标注可以重试的请求则不能重试boolean isResponseTimeout = false;Throwable cause = throwable.getCause();//netty 的读取超时一般是 ReadTimeoutExceptionif (cause instanceof ReadTimeoutException) {log.info("Cause is a ReadTimeoutException which indicates it is a response time out");isResponseTimeout = true;} else {//对于其他一些框架,使用了 java 底层 nio 的一般是 SocketTimeoutException,message 为 read time out//还有一些其他异常,但是 message 都会有 read time out 字段,所以通过 message 判断String message = throwable.getMessage();if (StringUtils.isNotBlank(message) && StringUtils.containsIgnoreCase(message.replace(" ", ""), "readtimeout")) {log.info("Throwable message contains readtimeout which indicates it is a response time out");isResponseTimeout = true;}}//如果请求是 GET 或者标注了重试,则直接判断可以重试if (method == HttpMethod.GET || webClientProperties.retryablePathsMatch(uri.getPath())) {log.info("should retry on {}-{}, {}", method, uri, throwable.toString());return true;} else {//否则,只针对请求还没有发出去的异常进行重试if (isResponseTimeout) {log.info("should not retry on {}-{}, {}", method, uri, throwable.toString());} else {log.info("should retry on {}-{}, {}", method, uri, throwable.toString());return true;}}}return false;}).build());HttpClient httpClient = HttpClient.create().option(ChannelOption.CONNECT_TIMEOUT_MILLIS, (int) webClientProperties.getConnectTimeout().toMillis()).doOnConnected(connection ->connection.addHandlerLast(new ReadTimeoutHandler((int) webClientProperties.getResponseTimeout().toSeconds())).addHandlerLast(new WriteTimeoutHandler((int) webClientProperties.getResponseTimeout().toSeconds())));Retry finalRetry = retry;String finalServiceName = serviceName;return WebClient.builder().exchangeStrategies(ExchangeStrategies.builder().codecs(configurer -> configurer.defaultCodecs()//最大 body 占用 16m 内存.maxInMemorySize(16 * 1024 * 1024)).build()).clientConnector(new ReactorClientHttpConnector(httpClient))//Retry在负载均衡前.filter((clientRequest, exchangeFunction) -> {return exchangeFunction.exchange(clientRequest).transform(ClientResponseRetryOperator.of(finalRetry));})//负载均衡器,改写url.filter(lbFunction)//实例级别的断路器需要在负载均衡获取真正地址之后.filter((clientRequest, exchangeFunction) -> {ServiceInstance serviceInstance = getServiceInstance(clientRequest);serviceInstanceMetrics.recordServiceInstanceCall(serviceInstance);CircuitBreaker circuitBreaker;//这时候的url是经过负载均衡器的,是实例的url//需要注意的一点是,使用异步 client 的时候,最好不要带路径参数,否则这里的断路器效果不好//断路器是每个实例每个路径一个断路器String instancId = clientRequest.url().getHost() + ":" + clientRequest.url().getPort() + clientRequest.url().getPath();try {//使用实例id新建或者获取现有的CircuitBreaker,使用serviceName获取配置circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId, finalServiceName);} catch (ConfigurationNotFoundException e) {circuitBreaker = circuitBreakerRegistry.circuitBreaker(instancId);}log.info("webclient circuit breaker [{}-{}] status: {}, data: {}", finalServiceName, instancId, circuitBreaker.getState(), JSON.toJSONString(circuitBreaker.getMetrics()));return exchangeFunction.exchange(clientRequest).transform(ClientResponseCircuitBreakerOperator.of(circuitBreaker, serviceInstance, serviceInstanceMetrics, webClientProperties));}).baseUrl(baseUrl).build();}private ServiceInstance getServiceInstance(ClientRequest clientRequest) {URI url = clientRequest.url();DefaultServiceInstance defaultServiceInstance = new DefaultServiceInstance();defaultServiceInstance.setHost(url.getHost());defaultServiceInstance.setPort(url.getPort());return defaultServiceInstance;}
这样,我们就实现了我们封装的基于配置的 WebClient
微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer: