Spring Cloud Gateway源码剖析
通过前面的学习,我们知道SpringCloud Gateway是一个微服务网关,主要实现不同功能服务路由,关于SpringCloud Gateway的实战使用我们就告一段落,我们接下来深入学习SpringCloud Gateway源码。
2.1 Gateway工作流程源码剖析
2.1.1 Gateway工作流程分析
前面我们已经学习过Gateway的工作流程,如上工作流程图,我们回顾一下工作流程:
1:所有都将由ReactorHttpHandlerAdapter.apply()方法拦截处理,此时会封装请求对象和响应对象,并传递到HttpWebHandlerAdapter.handle()方法。
2:HttpWebHandlerAdapter.handle(),将request和response封装成上下文对象ServerWebExchange,方法通过getDelegate()获取全局异常处理器ExceptionHandlingWebHandler执行全局异常处理
3:ExceptionHandlingWebHandler执行完成后,调用DispatcherHandler.handle(),循环所有handlerMappings查找处理当前请求的Handler
4:找到Handler后调用DispatcherHandler.invokeHandler()执行找到的Handler,此时会调用FilteringWebHandler.handle()
5:DefaultGatewayFilterChain.filter()是关键流程,所有过滤器都会在这里执行,比如服务查找、负载均衡、远程调用等,都在这一块。
上面工作流程我们都是基于说的层面,接下来我们一层一层分析Gateway源码,深入学习Gateway。
2.1.2 Gateway工作流程源码
我们首先来看一下Gateway拦截处理所有请求的方法handle():
/****
*处理所有请求
****/
@Override
public Mono handle(ServerHttpRequest request, ServerHttpResponse response) {
if (this.forwardedHeaderTransformer != null) {
request = this.forwardedHeaderTransformer.apply(request);
}
//创建网关上下文对象
ServerWebExchange exchange = createExchange(request, response);
LogFormatUtils.traceDebug(logger, traceOn ->
exchange.getLogPrefix() + formatRequest(exchange.getRequest()) +
(traceOn ? ", headers=" + formatHeaders(exchange.getRequest().getHeaders()) : ""));
//getDelegate()获取当前的Handler
return getDelegate().handle(exchange)
.doOnSuccess(aVoid -> logResponse(exchange))
.onErrorResume(ex -> handleUnresolvedError(exchange, ex))
.then(Mono.defer(response::setComplete));
}
上面getDelegate()方法源码如下:
/**
* Return the wrapped delegate.
* 返回WebHandler:处理web请求的对象
*/
public WebHandler getDelegate() {
return this.delegate;
}
当前返回的WebHandler是ExceptionHandlingWebHandler
,而ExceptionHandlingWebHandler
的delegate是FilteringWebHandler
,而FilteringWebHandler
的delegate是delegate
是DispatcherHandler
,所有的delegate的handle()
方法都会依次执行,我们可以把断点放到DispatcherHandler.handler()
方法上:
handler()方法会调用所有handlerMappings的getHandler(exchange)
方法,而getHandler(exchange)
方法会调用getHandlerInternal(exchange)
方法:
getHandlerInternal(exchange)
该方法由各个HandlerMapping
自行实现,我们可以观察下断言处理的RoutePredicateHandlerMapping
的getHandlerInternal(exchange)
方法会调用lookupRoute方法,该方法用于返回对应的路由信息:
这里的路由匹配其实就是我们项目中对应路由配置的一个一个服务的信息,这些服务信息可以帮我们找到我们要调用的真实服务:
每个Route对象如下:
Route的DEBUG数据如下:
找到对应Route后会返回指定的FilterWebHandler,如下代码:
FilterWebHandler主要包含了所有的过滤器,过滤器按照一定顺序排序,主要是order值,越小越靠前排,过滤器中主要将请求交给指定真实服务处理了,debug测试如下:
这里有RouteToRequestUrlFilter
和ForwardRoutingFilter
以及LoadBalancerClientFilter
等多个过滤器。
2.1.3 请求处理
在上面FilterWebHandler中有2个过滤器,分别为RouteToRequestUrlFilter
和ForwardRoutingFilter
。
RouteToRequestUrlFilter
:用于根据匹配的 Route,计算请求地址得到 lb://hailtaxi-order/order/list
ForwardRoutingFilter
:转发路由网关过滤器。其根据 forward:// 前缀( Scheme )过滤处理,将请求转发到当前网关实例本地接口。
2.1.3.1 RouteToRequestUrlFilter真实服务查找
RouteToRequestUrlFilter源码如下:
/***
* 处理uri过滤器
* @param exchange
* @param chain
* @return
*/
@Override
public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//获取当前的route
Route route = exchange.getAttribute(GATEWAY_ROUTE_ATTR);
if (route == null) {
return chain.filter(exchange);
}
log.trace("RouteToRequestUrlFilter start");
//得到uri = http://localhost:8001/order/list?token=123
URI uri = exchange.getRequest().getURI();
boolean encoded = containsEncodedParts(uri);
URI routeUri = route.getUri();
if (hasAnotherScheme(routeUri)) {
// this is a special url, save scheme to special attribute
// replace routeUri with schemeSpecificPart
exchange.getAttributes().put(GATEWAY_SCHEME_PREFIX_ATTR,
routeUri.getScheme());
routeUri = URI.create(routeUri.getSchemeSpecificPart());
}
if ("lb".equalsIgnoreCase(routeUri.getScheme()) && routeUri.getHost() == null) {
// Load balanced URIs should always have a host. If the host is null it is
// most
// likely because the host name was invalid (for example included an
// underscore)
throw new IllegalStateException("Invalid host: " + routeUri.toString());
}
//将uri换成 lb://hailtaxi-order/order/list?token=123
URI mergedUrl = UriComponentsBuilder.fromUri(uri)
// .uri(routeUri)
.scheme(routeUri.getScheme()).host(routeUri.getHost())
.port(routeUri.getPort()).build(encoded).toUri();
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, mergedUrl);
return chain.filter(exchange);
}
debug调试结果如下:
从上面调试结果我们可以看到所选择的Route以及uri和routeUri和mergedUrl,该过滤器其实就是将用户请求的地址换成服务地址,换成服务地址可以用来做负载均衡。
2.1.3.2 NettyRoutingFilter远程调用
SpringCloud在实现对后端服务远程调用是基于Netty发送Http请求实现,核心代码在NettyRoutingFilter.filter()
中,其中核心代码为send()方法,代码如下:
Flux responseFlux = httpClientWithTimeoutFrom(route)
// 头信息处理
.headers(headers -> {
headers.add(httpHeaders);
// Will either be set below, or later by Netty
headers.remove(HttpHeaders.HOST);
if (preserveHost) {
String host = request.getHeaders().getFirst(HttpHeaders.HOST);
headers.add(HttpHeaders.HOST, host);
}
// 执行发送,基于HTTP协议
}).request(method).uri(url).send((req, nettyOutbound) -> {
if (log.isTraceEnabled()) {
nettyOutbound
.withConnection(connection -> log.trace("outbound route: "
+ connection.channel().id().asShortText()
+ ", inbound: " + exchange.getLogPrefix()));
}
return nettyOutbound.send(request.getBody()
.map(dataBuffer -> ((NettyDataBuffer) dataBuffer)
.getNativeBuffer()));
}).
// 响应结果
responseConnection((res, connection) -> {
// Defer committing the response until all route filters have run
// Put client response as ServerWebExchange attribute and write
// response later NettyWriteResponseFilter
exchange.getAttributes().put(CLIENT_RESPONSE_ATTR, res);
exchange.getAttributes().put(CLIENT_RESPONSE_CONN_ATTR, connection);
// 获取响应结果
ServerHttpResponse response = exchange.getResponse();
// put headers and status so filters can modify the response
HttpHeaders headers = new HttpHeaders();
res.responseHeaders().forEach(
entry -> headers.add(entry.getKey(), entry.getValue()));
String contentTypeValue = headers.getFirst(HttpHeaders.CONTENT_TYPE);
if (StringUtils.hasLength(contentTypeValue)) {
exchange.getAttributes().put(ORIGINAL_RESPONSE_CONTENT_TYPE_ATTR,
contentTypeValue);
}
setResponseStatus(res, response);
// make sure headers filters run after setting status so it is
// available in response
HttpHeaders filteredResponseHeaders = HttpHeadersFilter.filter(
getHeadersFilters(), headers, exchange, Type.RESPONSE);
if (!filteredResponseHeaders
.containsKey(HttpHeaders.TRANSFER_ENCODING)
&& filteredResponseHeaders
.containsKey(HttpHeaders.CONTENT_LENGTH)) {
// It is not valid to have both the transfer-encoding header and
// the content-length header.
// Remove the transfer-encoding header in the response if the
// content-length header is present.
response.getHeaders().remove(HttpHeaders.TRANSFER_ENCODING);
}
exchange.getAttributes().put(CLIENT_RESPONSE_HEADER_NAMES,
filteredResponseHeaders.keySet());
response.getHeaders().putAll(filteredResponseHeaders);
return Mono.just(res);
});
Duration responseTimeout = getResponseTimeout(route);
上面send方法最终会调用ChannelOperations>send()
方法,而该方法其实是基于了Netty实现数据发送,核心代码如下:
2.1.3.3 Netty特性
Netty是一款基于NIO(Nonblocking I/O,非阻塞IO)开发的网络通信框架,他的并发性能得到了很大提高,对比于BIO(Blocking I/O,阻塞IO),隐藏其背后的复杂性而提供一个易于使用的 API 的客户端/服务器框架。Netty 是一个广泛使用的 Java 网络编程框架。
传输极快
Netty的传输快其实也是依赖了NIO的一个特性——零拷贝。我们知道,Java的内存有堆内存、栈内存和字符串常量池等等,其中堆内存是占用内存空间最大的一块,也是Java对象存放的地方,一般我们的数据如果需要从IO读取到堆内存,中间需要经过Socket缓冲区,也就是说一个数据会被拷贝两次才能到达他的的终点,如果数据量大,就会造成不必要的资源浪费。
Netty针对这种情况,使用了NIO中的另一大特性——零拷贝,当他需要接收数据的时候,他会在堆内存之外开辟一块内存,数据就直接从IO读到了那块内存中去,在netty里面通过ByteBuf可以直接对这些数据进行直接操作,从而加快了传输速度。
良好的封装
Netty无论是性能还是封装性都远远超越传统Socket编程。
Channel:表示一个连接,可以理解为每一个请求,就是一个Channel。
ChannelHandler:核心处理业务就在这里,用于处理业务请求。
ChannelHandlerContext:用于传输业务数据。
ChannelPipeline:用于保存处理过程需要用到的ChannelHandler和ChannelHandlerContext。
ByteBuf是一个存储字节的容器,最大特点就是使用方便,它既有自己的读索引和写索引,方便你对整段字节缓存进行读写,也支持get/set,方便你对其中每一个字节进行读写,他的数据结构如下图所示:
2.2 Gateway负载均衡源码剖析
前面源码剖析主要剖析了Gateway的工作流程,我们接下来剖析Gateway的负载均衡流程。在最后的过滤器集合中有LoadBalancerClientFilter
过滤器,该过滤器是用于实现负载均衡。
2.2.1 地址转换
LoadBalancerClientFilter
过滤器首先会将用户请求地址转换成真实服务地址,也就是IP:端口号,源码如下:
/***
* 负载均衡过滤
* @param exchange
* @param chain
* @return
*/
public Mono filter(ServerWebExchange exchange, GatewayFilterChain chain) {
//负载均衡的URL = lb://hailtaxi-order/order/list?token=123
URI url = exchange.getAttribute(GATEWAY_REQUEST_URL_ATTR);
String schemePrefix = exchange.getAttribute(GATEWAY_SCHEME_PREFIX_ATTR);
if (url == null
|| (!"lb".equals(url.getScheme()) && !"lb".equals(schemePrefix))) {
return chain.filter(exchange);
}
// preserve the original url
addOriginalRequestUrl(exchange, url);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url before: " + url);
}
//服务选择
final ServiceInstance instance = choose(exchange);
if (instance == null) {
throw NotFoundException.create(properties.isUse404(),
"Unable to find instance for " + url.getHost());
}
//用户提交的URI = http://localhost:8001/order/list?token=123
URI uri = exchange.getRequest().getURI();
// if the `lb:` mechanism was used, use `` as the default,
// if the loadbalancer doesn't provide one.
String overrideScheme = instance.isSecure() ? "https" : "http";
if (schemePrefix != null) {
overrideScheme = url.getScheme();
}
//真实服务的URL =http://192.168.211.1:18182/order/list?token=123
URI requestUrl = loadBalancer.reconstructURI(
new DelegatingServiceInstance(instance, overrideScheme), uri);
if (log.isTraceEnabled()) {
log.trace("LoadBalancerClientFilter url chosen: " + requestUrl);
}
exchange.getAttributes().put(GATEWAY_REQUEST_URL_ATTR, requestUrl);
return chain.filter(exchange);
}
2.2.2 负载均衡服务选择
上面代码的关键是choose(exchange)
的调用,该方法调用其实就是选择指定服务,这里涉及到负载均衡服务轮询调用算法等,我们可以跟踪进去查看方法执行流程。
Gateway自身已经集成Ribbon,所以看到的对象是RibbonLoadBalancerClient,我们跟踪进去接着查看:
上面方法会依次调用到getInstance()方法,该方法会返回所有可用实例,有可能有多个实例,如果有多个实例就涉及到负载均衡算法,方法调用如下图:
此时调用getServer()方法,再调用BaseLoadBalancer.chooseServer()
,这里是根据指定算法获取对应实例,代码如下:
BaseLoadBalancer
是属于Ribbon的算法,我们可以通过如下依赖包了解,并且该算法默认用的是RoundRobinRule
,也就是随机算法,如下代码:
本文由传智教育博学谷 - 狂野架构师教研团队发布,转载请注明出处!
如果本文对您有帮助,欢迎关注和点赞;如果您有任何建议也可留言评论或私信,您的支持是我坚持创作的动力