AI智能
改变未来

SpringCloud升级之路2020.0.x版-44.避免链路信息丢失做的设计(2)

本系列代码地址:https://github.com/JoJoTec/spring-cloud-parent

我们在这一节我们将继续讲解避免链路信息丢失做的设计,主要针对获取到现有 Span 之后,如何保证每个 GlobalFilter 都能保持链路信息。首先,我们自定义 Reactor 的核心 Publisher 即 Mono 和 Flux 的工厂,将链路信息封装进去,保证由这个工厂生成的 Mono 和 Flux,都是只要是这个工厂生成的 Mono 和 Flux 之间无论怎么拼接都会保持链路信息的:

自定义 Mono 和 Flux 的工厂

公共 Subscriber 封装,将 reactor Subscriber 的所有关键接口,都检查当前上下文是否有链路信息,即 Span,如果没有就包裹上,如果有则直接执行即可。

public class TracedCoreSubscriber<T> implements Subscriber<T>{private final Subscriber<T> delegate;private final Tracer tracer;private final CurrentTraceContext currentTraceContext;private final Span span;TracedCoreSubscriber(Subscriber<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {this.delegate = delegate;this.tracer = tracer;this.currentTraceContext = currentTraceContext;this.span = span;}@Overridepublic void onSubscribe(Subscription s) {executeWithinScope(() -> {delegate.onSubscribe(s);});}@Overridepublic void onError(Throwable t) {executeWithinScope(() -> {delegate.onError(t);});}@Overridepublic void onComplete() {executeWithinScope(() -> {delegate.onComplete();});}@Overridepublic void onNext(T o) {executeWithinScope(() -> {delegate.onNext(o);});}private void executeWithinScope(Runnable runnable) {//如果当前没有链路信息,强制包裹if (tracer.currentSpan() == null) {try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(this.span.context())) {runnable.run();}} else {//如果当前已有链路信息,则直接执行runnable.run();}}}

之后分别定义所有 Flux 的代理 TracedFlux,和所有 Mono 的代理 TracedMono,其实就是在 subscribe 的时候,用 TracedCoreSubscriber 包装传入的 CoreSubscriber:

public class TracedFlux<T> extends Flux<T> {private final Flux<T> delegate;private final Tracer tracer;private final CurrentTraceContext currentTraceContext;private final Span span;TracedFlux(Flux<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {this.delegate = delegate;this.tracer = tracer;this.currentTraceContext = currentTraceContext;this.span = span;}@Overridepublic void subscribe(CoreSubscriber<? super T> actual) {delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));}}public class TracedMono<T> extends Mono<T> {private final Mono<T> delegate;private final Tracer tracer;private final CurrentTraceContext currentTraceContext;private final Span span;TracedMono(Mono<T> delegate, Tracer tracer, CurrentTraceContext currentTraceContext, Span span) {this.delegate = delegate;this.tracer = tracer;this.currentTraceContext = currentTraceContext;this.span = span;}@Overridepublic void subscribe(CoreSubscriber<? super T> actual) {delegate.subscribe(new TracedCoreSubscriber(actual, tracer, currentTraceContext, span));}}

定义工厂类,使用请求 ServerWebExchange 和原始 Flux 创建 TracedFlux,以及使用请求 ServerWebExchange 和原始 Mono 创建 TracedMono,并且 Span 是通过 Attributes 获取的,根据前文的源码分析我们知道,这个 Attribute 是通过 TraceWebFilter 放入 Attributes 的。由于我们只在 GatewayFilter 中使用,一定在 TraceWebFilter 之后 所以这个 Attribute 一定存在。

@Componentpublic class TracedPublisherFactory {protected static final String TRACE_REQUEST_ATTR = Span.class.getName();@Autowiredprivate Tracer tracer;@Autowiredprivate CurrentTraceContext currentTraceContext;public <T> Flux<T> getTracedFlux(Flux<T> publisher, ServerWebExchange exchange) {return new TracedFlux<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));}public <T> Mono<T> getTracedMono(Mono<T> publisher, ServerWebExchange exchange) {return new TracedMono<>(publisher, tracer, currentTraceContext, (Span) exchange.getAttributes().get(TRACE_REQUEST_ATTR));}}

公共抽象 GlobalFilter – CommonTraceFilter

我们编写所有我们后面要实现的 GlobalFilter 的抽象类,这个抽象类的主要功能是:

  • 保证继承这个抽象类的 GlobalFilter 本身以及拼接的链路中,是有链路信息的,其实就是保证它 filter 返回的 Mono 是由我们上面实现的 Factory 生成的即可。
  • 不同 GlobalFilter 之间需要排序,有顺序的执行,这个通过实现 Ordered 接口即可
package com.github.jojotech.spring.cloud.apigateway.filter;import com.github.jojotech.spring.cloud.apigateway.common.TraceWebFilterUtil;import com.github.jojotech.spring.cloud.apigateway.common.TracedPublisherFactory;import reactor.core.publisher.Mono;import org.springframework.beans.factory.annotation.Autowired;import org.springframework.cloud.gateway.filter.GatewayFilterChain;import org.springframework.cloud.gateway.filter.GlobalFilter;import org.springframework.cloud.sleuth.CurrentTraceContext;import org.springframework.cloud.sleuth.Span;import org.springframework.cloud.sleuth.Tracer;import org.springframework.core.Ordered;import org.springframework.web.server.ServerWebExchange;/*** 所有 filter 的子类* 主要保证 span 的完整性,在某些情况下,span 会半途停止,导致日志中没有 traceId 和 spanId* 参考:https://github.com/spring-cloud/spring-cloud-sleuth/issues/2004*/public abstract class AbstractTracedFilter implements GlobalFilter, Ordered {@Autowiredprotected Tracer tracer;@Autowiredprotected TracedPublisherFactory tracedPublisherFactory;@Autowiredprotected CurrentTraceContext currentTraceContext;@Overridepublic Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {Mono<Void> traced;if (tracer.currentSpan() == null) {try (CurrentTraceContext.Scope scope = this.currentTraceContext.maybeScope(((Span) exchange.getAttributes().get(TraceWebFilterUtil.TRACE_REQUEST_ATTR)).context())) {traced = traced(exchange, chain);}}else {//如果当前已有链路信息,则直接执行traced = traced(exchange, chain);}return tracedPublisherFactory.getTracedMono(traced, exchange);}protected abstract Mono<Void> traced(ServerWebExchange exchange, GatewayFilterChain chain);}

这样,我们就可以基于这个抽象类去实现需要定制的 GlobalFilter 了

微信搜索“我的编程喵”关注公众号,每日一刷,轻松提升技术,斩获各种offer

赞(0) 打赏
未经允许不得转载:爱站程序员基地 » SpringCloud升级之路2020.0.x版-44.避免链路信息丢失做的设计(2)