Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.6) 之 WebSocketRoutingFilter

扫码关注公众号:Java 技术驿站

发送:vip
将链接复制到本浏览器,永久解锁本站全部文章

【公众号:Java 技术驿站】 【加作者微信交流技术,拉技术群】
免费领取10G资料包与项目实战视频资料

摘要: 原创出处 http://www.iocoder.cn/Spring-Cloud-Gateway/filter-websocket-routing/ 「芋道源码」欢迎转载,保留摘要,谢谢!

本文主要基于 Spring-Cloud-Gateway 2.0.X M4


1. 概述

本文主要分享 WebsocketRoutingFilter 的代码实现

WebsocketRoutingFilter ,Websocket 路由网关过滤器。其根据 ws:// / wss:// 前缀( Scheme )过滤处理,代理后端 Websocket 服务,提供给客户端连接。如下图 :

  • 目前一个 RouteDefinition 只能指定一个后端 WebSocket 服务。官方正在计划在 LoadBalancerClientFilter 上实现 Websocket 的负载均衡功能。也就说,未来一个 RouteDefinition 能够指定多个后端 WebSocket 服务。

Websocket 的 RouteDefinition 配置如下 :

cloud:
    gateway:
      routes:
      - id: websocket_test
        uri: ws://localhost:9000
        order: 8000
        predicates:
        - Path=/echo
  • uri 使用 ws:// 或者 wss:// 为前缀。

推荐 Spring Cloud 书籍

推荐 Spring Cloud 视频

2. 环境搭建

在解析源码之前,我们先以 wscat 搭建一个 WebSocket 服务。

第一步,安装 wscat 。

npm install -g wscat

第二步,启动 wscat 。

wscat --listen 9000

第三步,连接 wscat 。

 wscat --listen 9000

第四步,配置 RouteDefinition ,并启动 Spring Cloud Gateway 。

cloud:
    gateway:
      routes:
      - id: websocket_test
        uri: ws://localhost:9000
        order: 8000
        predicates:
        - Path=/echo

第五步,通过 Gateway 连接 wscat 。

wscat --connect ws://localhost:8080/echo

大功告成。

注意,wscat 同一时间仅允许一个客户端连接。

3. WebsocketRoutingFilter

org.springframework.cloud.gateway.filter.WebsocketRoutingFilter ,Websocket 路由网关过滤器。

构造方法,代码如下 :

public class WebsocketRoutingFilter implements GlobalFilter, Ordered {
    public static final String SEC_WEBSOCKET_PROTOCOL = "Sec-WebSocket-Protocol";

    private final WebSocketClient webSocketClient;
    private final WebSocketService webSocketService;

    public WebsocketRoutingFilter(WebSocketClient webSocketClient) {
        this(webSocketClient, new HandshakeWebSocketService());
    }

    public WebsocketRoutingFilter(WebSocketClient webSocketClient,
            WebSocketService webSocketService) {
        this.webSocketClient = webSocketClient;
        this.webSocketService = webSocketService;
    }

}

#getOrder() 方法,代码如下 :

@Override
public int getOrder() {
    return Ordered.LOWEST_PRECEDENCE;
}

#filter(ServerWebExchange, GatewayFilterChain) 方法,代码如下 :

  1: @Override
  2: public Mono<Void> filter(ServerWebExchange exchange, GatewayFilterChain chain) {
  3:    // 获得 requestUrl
  4:    URI requestUrl = exchange.getRequiredAttribute(GATEWAY_REQUEST_URL_ATTR);
  5:
  6:    // 判断是否能够处理
  7:    String scheme = requestUrl.getScheme();
  8:    if (isAlreadyRouted(exchange) || (!scheme.equals("ws") && !scheme.equals("wss"))) {
  9:        return chain.filter(exchange);
 10:    }
 11:
 12:    // 设置已经路由
 13:    setAlreadyRouted(exchange);
 14:
 15:    // 处理连接请求
 16:    return this.webSocketService.handleRequest(exchange,
 17:            new ProxyWebSocketHandler(requestUrl, this.webSocketClient, exchange.getRequest().getHeaders()));
 18: }
  • 第 4 行 :获得 requestUrl
  • 第 7 至 10 行 :判断 ForwardRoutingFilter 是否能够处理该请求,需要满足两个条件 :

    • ws:// 或者 wss:// 前缀( Scheme ) 。
    • 调用 ServerWebExchangeUtils#isAlreadyRouted(ServerWebExchange) 方法,判断该请求暂未被其他 Routing 网关处理。代码如下 :

      public static boolean isAlreadyRouted(ServerWebExchange exchange) {
          return exchange.getAttributeOrDefault(GATEWAY_ALREADY_ROUTED_ATTR, false);
      }
      
      • x
  • 第 13 行 :设置该请求已经被处理。代码如下 :

    public static void setAlreadyRouted(ServerWebExchange exchange) {
        exchange.getAttributes().put(GATEWAY_ALREADY_ROUTED_ATTR, true);
    }
    
  • 第 15 至 16 行 :调用 WebSocketService#hanldeRequest(ServerWebExchange, WebSocketHandler) 方法,处理客户端发起的连接请求( Handshake Request ) 。这个方法的实现不在本文范围内,但是良心如笔者,大概讲下涉及到的类 :

3.1 ProxyWebSocketHandler

org.springframework.cloud.gateway.filter.WebsocketRoutingFilter.ProxyWebSocketHandler代理后端 WebSocket 服务处理器。

构造方法,代码如下 :

  1: private static class ProxyWebSocketHandler implements WebSocketHandler {
  2:
  3:    private final WebSocketClient client;
  4:    private final URI url;
  5:    private final HttpHeaders headers;
  6:    private final List<String> subProtocols;
  7:
  8:    public ProxyWebSocketHandler(URI url, WebSocketClient client, HttpHeaders headers) {
  9:        this.client = client;
 10:        this.url = url;
 11:        this.headers = new HttpHeaders();//headers;
 12:        //TODO: better strategy to filter these headers?
 13:        headers.entrySet().forEach(header -> {
 14:            if (!header.getKey().toLowerCase().startsWith("sec-websocket")
 15:                    && !header.getKey().equalsIgnoreCase("upgrade")
 16:                    && !header.getKey().equalsIgnoreCase("connection")) {
 17:                this.headers.addAll(header.getKey(), header.getValue());
 18:            }
 19:        });
 20:        List<String> protocols = headers.get(SEC_WEBSOCKET_PROTOCOL);
 21:        if (protocols != null) {
 22:            this.subProtocols = protocols;
 23:        } else {
 24:            this.subProtocols = Collections.emptyList();
 25:        }
 26:    }
 27: }

#handle(WebSocketSession) 方法,代码如下 :

  1: @Override
  2: public Mono<Void> handle(WebSocketSession session) {
  3:    // pass headers along so custom headers can be sent through
  4:    return client.execute(url, this.headers, new WebSocketHandler() {
  5:        @Override
  6:        public Mono<Void> handle(WebSocketSession proxySession) {
  7:            // Use retain() for Reactor Netty
  8:            // 转发消息 客户端 =》后端服务
  9:            Mono<Void> proxySessionSend = proxySession
 10:                    .send(session.receive().doOnNext(WebSocketMessage::retain));
 11:            // 转发消息 后端服务=》客户端
 12:            // .log("proxySessionSend", Level.FINE);
 13:            Mono<Void> serverSessionSend = session
 14:                    .send(proxySession.receive().doOnNext(WebSocketMessage::retain));
 15:                    // .log("sessionSend", Level.FINE);
 16:
 17:            //
 18:            return Mono.when(proxySessionSend, serverSessionSend).then();
 19:        }
 20:
 21:        /**
 22:         * Copy subProtocols so they are available downstream.
 23:         * @return
 24:         */
 25:        @Override
 26:        public List<String> getSubProtocols() {
 27:            return ProxyWebSocketHandler.this.subProtocols;
 28:        }
 29:    });
 30: }
  • 第 6 行 :调用 WebSocketClient#execute(URI, HttpHeaders, WebSocketHandler) 方法,连接后端【被代理】的 WebSocket 服务。连接成功后,回调 WebSocketHandler 实现的内部类的 #handle(WebSocketSession) 方法。
  • WebSocketHandler 实现的内部类
    • 第 9 至 10 行 :转发消息,客户端 => 后端服务。
    • 第 13 至 14 行 :转发消息,后端服务 => 客户端。
    • 第 18 行 :调用 Mono#when() 方法,合并 proxySessionSend / serverSessionSend 两个 Mono 。调用 Mono#then() 方法,参数为空,合并的 Mono 不发射数据出来。RxJava 和 Reactor 类似,可以参考 《ReactiveX文档中文翻译 —— And/Then/When》 学习下 when / and / then 操作符。
    • 下图可以帮助理解下这个类的用途 :
赞(0) 打赏
版权归原创作者所有,任何形式的转载请联系博主:daming_90:Java 技术驿站 » Spring-Cloud-Gateway 源码解析 —— 过滤器 (4.6) 之 WebSocketRoutingFilter

评论 抢沙发

  • 昵称 (必填)
  • 邮箱 (必填)
  • 网址

觉得文章有用就打赏一下文章作者

支付宝扫一扫打赏

微信扫一扫打赏