通过 Tomcat servlet 代理常规 HTTP 和 WebSocket
Posted
技术标签:
【中文标题】通过 Tomcat servlet 代理常规 HTTP 和 WebSocket【英文标题】:Proxy regular HTTP and WebSocket via Tomcat servlet 【发布时间】:2021-10-07 13:29:06 【问题描述】:我正在实现一个 Web 应用程序,除其他外,它必须显示代理到后端服务的网页并与之交互。为此,我使用了HTTP-Proxy-Servlet,它在大多数情况下都运行良好。
但是,某些后端服务的网页使用 websockets 和 doesn't support websockets 以上的代理 servlet。
我尝试通过向后端重构 websocket 调用然后在流之间复制来实现它,但这不起作用。浏览器报告“Invalid frame header”,Tomcat 失败
Error parsing HTTP request header
Invalid character found in method name. HTTP method names must be tokens
at org.apache.coyote.http11.Http11InputBuffer.parseRequestLine(Http11InputBuffer.java:414)
我的代码:
import java.io.IOException;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import javax.servlet.ServletException;
import javax.servlet.http.*;
import org.apache.http.HttpRequest;
import org.mitre.dsmiley.httpproxy.ProxyServlet;
public class ProxyWithWebSocket extends ProxyServlet
private static final long serialVersionUID = -2566573965489129976L;
protected ExecutorService exec;
@Override
public void init() throws ServletException
super.init();
exec = Executors.newCachedThreadPool();
@Override
public void destroy()
super.destroy();
exec.shutdown();
@Override
protected void service(HttpServletRequest servletRequest, HttpServletResponse servletResponse)
throws ServletException, IOException
var wsKey = servletRequest.getHeader("Sec-WebSocket-Key");
if (wsKey != null)
//initialize request attributes from caches if unset by a subclass by this point
if (servletRequest.getAttribute(ATTR_TARGET_URI) == null)
servletRequest.setAttribute(ATTR_TARGET_URI, targetUri);
if (servletRequest.getAttribute(ATTR_TARGET_HOST) == null)
servletRequest.setAttribute(ATTR_TARGET_HOST, targetHost);
String proxyRequestUri = rewriteUrlFromRequest(servletRequest);
URL u = new URL(proxyRequestUri);
var servletIn = servletRequest.getInputStream();
var servletOut = servletResponse.getOutputStream();
try (Socket sock = new Socket(u.getHost(), u.getPort()))
var sockIn = sock.getInputStream();
var sockOut = sock.getOutputStream();
StringBuilder req = new StringBuilder(512);
req.append("GET " + u.getFile()).append(" HTTP/1.1");
System.out.println(" > WS|" + req);
req.append("\r\n");
var en = servletRequest.getHeaderNames();
while (en.hasMoreElements())
var n = en.nextElement();
String header = servletRequest.getHeader(n);
System.out.println(" > WS| " + n + ": " + header);
req.append(n + ": " + header + "\r\n");
req.append("\r\n");
sockOut.write(req.toString().getBytes(StandardCharsets.UTF_8));
sockOut.flush();
StringBuilder responseBytes = new StringBuilder(512);
int b = 0;
while (b != -1)
b = sockIn.read();
if (b != -1)
responseBytes.append((char)b);
var len = responseBytes.length();
if (len >= 4
&& responseBytes.charAt(len - 4) == '\r'
&& responseBytes.charAt(len - 3) == '\n'
&& responseBytes.charAt(len - 2) == '\r'
&& responseBytes.charAt(len - 1) == '\n'
)
break;
String[] rows = responseBytes.toString().split("\r\n");
String response = rows[0];
System.out.println(" < WS|" + response);
int idx1 = response.indexOf(' ');
int idx2 = response.indexOf(' ', idx1 + 1);
for (int i = 1; i < rows.length; i++)
String line = rows[i];
int idx3 = line.indexOf(":");
var k = line.substring(0, idx3);
var headerField = line.substring(idx3 + 2);
System.out.println(" < WS| " + k + ": " + headerField);
servletResponse.setHeader(k, headerField);
servletResponse.setStatus(Integer.parseInt(response.substring(idx1 + 1, idx2)));
servletResponse.flushBuffer();
System.out.println(" < WS| Flush");
var f1 = exec.submit(() ->
var c = 0;
var bs = 0;
while ((bs = servletIn.read()) != -1)
sockOut.write(bs);
c++;
System.out.println(" > WS| Done: " + c);
return null;
);
var f2 = exec.submit(() ->
var c = 0;
var bs = 0;
while ((bs = sockIn.read()) != -1)
servletOut.write(bs);
servletOut.flush();
c++;
System.out.println(" < WS| Done: " + c);
return null;
);
try
f1.get();
catch (Exception ex)
f2.cancel(true);
return;
try
f2.get();
catch (Exception ex)
else
super.service(servletRequest, servletResponse);
典型的交换看起来像这样(通过那些 println):
> WS|GET /cellhub?id=NhWO8SnGyDb_Vrk23rmhVQ HTTP/1.1
> WS| host: localhost:8080
> WS| connection: Upgrade
> WS| pragma: no-cache
> WS| cache-control: no-cache
> WS| user-agent: Mozilla/5.0 (Windows NT 10.0; Win64; x64) AppleWebKit/537.36 (Khtml, like Gecko) Chrome/94.0.4606.71 Safari/537.36
> WS| upgrade: websocket
> WS| origin: http://localhost:8080
> WS| sec-websocket-version: 13
> WS| accept-encoding: gzip, deflate, br
> WS| accept-language: hu,hu-HU;q=0.9,en-US;q=0.8,en;q=0.7
> WS| cookie: JSESSIONID=57E4B30452BC3EB2657139DAF70E65AD; JSESSIONID=AD5E7BB5FE17B4072F3ABEE32B9479AC
> WS| sec-websocket-key: nrZWEb6Co4DKggUNwPeV8g==
> WS| sec-websocket-extensions: permessage-deflate; client_max_window_bits
< WS|HTTP/1.1 101 Switching Protocols
< WS| Connection: Upgrade
< WS| Date: Thu, 07 Oct 2021 13:18:41 GMT
< WS| Server: Kestrel
< WS| Upgrade: websocket
< WS| Sec-WebSocket-Accept: /9uN8ZF67WepGJQ3+DPBLMCBotc=
< WS| Flush
> WS| Done: 0
< WS| Done: 42
我怎样才能做到这一点?
编辑
我发现HttpServletRequest.upgrade
方法似乎用于更改协议。我在标题复制后更新了部分:
int respCode = Integer.parseInt(response.substring(idx1 + 1, idx2));
if (respCode != 101)
servletResponse.setStatus(respCode);
servletResponse.flushBuffer();
System.out.println(" < WS| Flush");
closeSocket = true;
else
var uh = servletRequest.upgrade(WsUpgradeHandler.class);
uh.preInit(exec, sockIn, sockOut, sock);
WsUpgradeHandler
在哪里
public static class WsUpgradeHandler implements HttpUpgradeHandler
ExecutorService exec;
InputStream sockIn;
OutputStream sockOut;
Socket sock;
Future<?> f1;
Future<?> f2;
public WsUpgradeHandler()
public void preInit(ExecutorService exec, InputStream sockIn, OutputStream sockOut, Socket sock)
this.exec = exec;
this.sockIn = sockIn;
this.sockOut = sockOut;
this.sock = sock;
@Override
public void init(WebConnection wc)
System.out.println(" * WS| Upgrade begin");
try
var servletIn = wc.getInputStream();
var servletOut = wc.getOutputStream();
f1 = exec.submit(() ->
System.out.println(" > WS| Client -> Backend");
var c = 0;
var bs = 0;
try
while ((bs = servletIn.read()) != -1)
sockOut.write(bs);
c++;
catch (Exception exc)
exc.printStackTrace();
finally
sockOut.close();
System.out.println(" > WS| Done: " + c);
return null;
);
f2 = exec.submit(() ->
System.out.println(" > WS| Backend -> Client");
var c = 0;
try
var bs = 0;
while ((bs = sockIn.read()) != -1)
servletOut.write(bs);
servletOut.flush();
c++;
catch (Exception exc)
exc.printStackTrace();
finally
servletOut.close();
System.out.println(" < WS| Done: " + c);
return null;
);
catch (IOException ex)
ex.printStackTrace();
@Override
public void destroy()
System.out.println(" * WS| Upgrade closing");
f1.cancel(true);
f2.cancel(true);
try
sock.close();
catch (IOException ex)
System.out.println(" * WS| Upgrade close");
这确实适用于传递消息,但如果来自浏览器的 websocket 连接结束,此时 Tomcat 的 CPU 利用率会非常高(不应发生其他活动)。似乎 Tomcat 的部分或全部 NIO 线程都在旋转,而我正在使用的线程池不再有线程。
【问题讨论】:
如果您在UpgradeHandler
中使用 Servlet 3.1 non-blocking IO 并使用 WebSocket API 连接到远程服务器(参见 this question),您可能会获得更好的结果。
我怀疑这些会起作用。这是一个代理,不知道它必须服务哪个 websocket 端点,因此无法预先命名处理程序。此外,我不认为 ws 处理程序可以使用与常规 HTTP 流量相同的 servlet 映射,因此上面的 switcheroo。
【参考方案1】:
我想我设法解决了这个问题。
上面的代码几乎是正确的,但有一个例外:显然init()
方法在使用阻塞模式时不应返回,如this Tomcat 测试示例所示。
第二个问题,即高 CPU 使用率被追踪到之前有 bugs 的 tomcat 中的轮询线程。我在 Tomcat 9.0.12 中运行我的代码,一旦升级到 Tomcat 9.0.54,CPU 使用问题就消失了。
因此,完整的工作代码如下所示:(我知道,我知道,字节显示和手动准备 HTML 请求并不是最优的,但这就是 Loom 的用途,对吧;)
import java.io.*;
import java.net.*;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.*;
import javax.servlet.ServletException;
import javax.servlet.http.*;
import org.apache.http.HttpRequest;
import org.mitre.dsmiley.httpproxy.ProxyServlet;
public class ProxyWithWebSocket extends ProxyServlet
private static final long serialVersionUID = -2566573965489129976L;
protected ExecutorService exec;
@Override
public void init() throws ServletException
super.init();
exec = Executors.newCachedThreadPool();
@Override
public void destroy()
super.destroy();
exec.shutdown();
@Override
protected void copyRequestHeaders(HttpServletRequest servletRequest, HttpRequest proxyRequest)
super.copyRequestHeaders(servletRequest, proxyRequest);
String userId = (String)servletRequest.getAttribute("UserID");
if (userId != null)
proxyRequest.addHeader("UserID", userId);
@Override
protected void service(HttpServletRequest servletRequest, HttpServletResponse servletResponse)
throws ServletException, IOException
var wsKey = servletRequest.getHeader("Sec-WebSocket-Key");
if (wsKey != null)
//initialize request attributes from caches if unset by a subclass by this point
if (servletRequest.getAttribute(ATTR_TARGET_URI) == null)
servletRequest.setAttribute(ATTR_TARGET_URI, targetUri);
if (servletRequest.getAttribute(ATTR_TARGET_HOST) == null)
servletRequest.setAttribute(ATTR_TARGET_HOST, targetHost);
String proxyRequestUri = rewriteUrlFromRequest(servletRequest);
URL u = new URL(proxyRequestUri);
Socket sock = new Socket(u.getHost(), u.getPort());
boolean closeSocket = false;
try
var sockIn = sock.getInputStream();
var sockOut = sock.getOutputStream();
StringBuilder req = new StringBuilder(512);
req.append("GET " + u.getFile()).append(" HTTP/1.1");
System.out.println(" > WS|" + req);
req.append("\r\n");
var en = servletRequest.getHeaderNames();
while (en.hasMoreElements())
var n = en.nextElement();
String header = servletRequest.getHeader(n);
System.out.println(" > WS| " + n + ": " + header);
req.append(n + ": " + header + "\r\n");
req.append("\r\n");
sockOut.write(req.toString().getBytes(StandardCharsets.UTF_8));
sockOut.flush();
StringBuilder responseBytes = new StringBuilder(512);
int b = 0;
while (b != -1)
b = sockIn.read();
if (b != -1)
responseBytes.append((char)b);
var len = responseBytes.length();
if (len >= 4
&& responseBytes.charAt(len - 4) == '\r'
&& responseBytes.charAt(len - 3) == '\n'
&& responseBytes.charAt(len - 2) == '\r'
&& responseBytes.charAt(len - 1) == '\n'
)
break;
String[] rows = responseBytes.toString().split("\r\n");
String response = rows[0];
System.out.println(" < WS|" + response);
int idx1 = response.indexOf(' ');
int idx2 = response.indexOf(' ', idx1 + 1);
for (int i = 1; i < rows.length; i++)
String line = rows[i];
int idx3 = line.indexOf(":");
var k = line.substring(0, idx3);
var headerField = line.substring(idx3 + 2);
System.out.println(" < WS| " + k + ": " + headerField);
servletResponse.setHeader(k, headerField);
int respCode = Integer.parseInt(response.substring(idx1 + 1, idx2));
if (respCode != 101)
servletResponse.setStatus(respCode);
servletResponse.flushBuffer();
System.out.println(" < WS| Flush");
closeSocket = true;
else
var uh = servletRequest.upgrade(WsUpgradeHandler.class);
uh.preInit(exec, sockIn, sockOut, sock);
finally
if (closeSocket)
sock.close();
else
super.service(servletRequest, servletResponse);
public static class WsUpgradeHandler implements HttpUpgradeHandler
ExecutorService exec;
InputStream sockIn;
OutputStream sockOut;
Socket sock;
Future<?> f2;
public WsUpgradeHandler()
public void preInit(ExecutorService exec, InputStream sockIn, OutputStream sockOut, Socket sock)
this.exec = exec;
this.sockIn = sockIn;
this.sockOut = sockOut;
this.sock = sock;
@Override
public void init(WebConnection wc)
System.out.println(" * WS| Upgrade begin");
try
var servletIn = wc.getInputStream();
var servletOut = wc.getOutputStream();
f2 = exec.submit(() ->
System.out.println(" > WS| Backend -> Client");
var c = 0;
try
var bs = 0;
while ((bs = sockIn.read()) != -1)
servletOut.write(bs);
servletOut.flush();
c++;
catch (SocketException | EOFException exc)
// this is fine
catch (Exception exc)
exc.printStackTrace();
finally
servletOut.close();
System.out.println(" < WS| Done: " + c);
return null;
);
System.out.println(" > WS| Client -> Backend");
var c = 0;
var bs = 0;
try
while ((bs = servletIn.read()) != -1)
sockOut.write(bs);
c++;
catch (SocketException | EOFException exc)
// this is fine
catch (Exception exc)
exc.printStackTrace();
finally
sockOut.close();
System.out.println(" > WS| Done: " + c);
f2.get();
catch (Exception ex)
ex.printStackTrace();
finally
if (f2 != null)
f2.cancel(true);
@Override
public void destroy()
System.out.println(" * WS| Upgrade closing");
if (f2 != null)
f2.cancel(true);
try
sock.close();
catch (IOException ex)
System.out.println(" * WS| Upgrade close");
【讨论】:
以上是关于通过 Tomcat servlet 代理常规 HTTP 和 WebSocket的主要内容,如果未能解决你的问题,请参考以下文章