在 GRPC 中拦截/记录请求和响应
Posted
技术标签:
【中文标题】在 GRPC 中拦截/记录请求和响应【英文标题】:Intercepting/Logging requests and Responses in GRPC 【发布时间】:2018-04-19 16:28:56 【问题描述】:我正在使用 GRPC 开发一个聊天应用程序,其中服务器从客户端接收信息并将其发送回与其连接的所有客户端。为此,我使用 Saturnism's chat-example 作为参考。我已经复制了代码,代码可以编译并运行,但服务器应该永远不会收到来自客户端的任何请求。
我的问题是:
-
有没有办法在 GRPC 中启用 verbos 服务器端和客户端日志记录,以查看进出哪些请求和响应以及哪些可能失败?
我将以下代码用于服务器和客户端。以下代码中可能缺少/错误的内容会导致客户端和服务器之间没有通信。
WingokuServer.java
public class WingokuServer
public static void main(String[] args) throws IOException, InterruptedException
Server server = ServerBuilder.forPort(8091)
.intercept(recordRequestHeadersInterceptor())
.addService(new WingokuServiceImpl())
.build();
System.out.println("Starting server...");
server.start();
System.out.println("Server started!");
server.awaitTermination();
WingokuServerSideServiceImplementation:
public class WingokuServiceImpl extends WingokuServiceGrpc.WingokuServiceImplBase
private static Set<StreamObserver<Response>> observers =
Collections.newSetFromMap(new ConcurrentHashMap<>());
public WingokuServiceImpl()
System.out.println("WingokuServiceImp");
@Override
public StreamObserver<Request> messages(StreamObserver<Response> responseObserver)
System.out.println("messages");
observers.add(responseObserver);
return new StreamObserver<Request>()
@Override
public void onNext(Request request)
System.out.println("Server onNext: ");
System.out.println("request from client is: "+ request.getRequestMessage());
Response response = Response.newBuilder().setResponseMessage("new Message From server at time: "+ System.nanoTime()).build();
for (StreamObserver<Response> observer : observers)
observer.onNext(response);
@Override
public void onError(Throwable throwable)
System.out.println("Server onError: ");
throwable.printStackTrace();
@Override
public void onCompleted()
observers.remove(responseObserver);
System.out.println("Server onCompleted ");
;
WingokuClient:
public class WingokuClient
public static void main(String[] args)
ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8091).usePlaintext(true).build();
WingokuServiceGrpc.WingokuServiceStub asyncStub = WingokuServiceGrpc.newStub(channel);
StreamObserver<Request> requestStreamObserver = asyncStub.messages(new StreamObserver<Response>()
@Override
public void onNext(Response response)
System.out.println("Client onNext");
System.out.println("REsponse from server is: "+ response.getResponseMessage());
@Override
public void onError(Throwable throwable)
System.out.println("Client onError");
throwable.printStackTrace();
@Override
public void onCompleted()
System.out.println("Client OnComplete");
);
requestStreamObserver.onNext(Request.newBuilder().setRequestMessage("Message From Client").build());
requestStreamObserver.onCompleted();
channel.shutdown();
System.out.println("exiting client");
编辑:
代码没有问题。有用。我只需要将 awaitTermination 添加到客户端的通道,因为没有它只会立即关闭客户端和服务器之间的连接,甚至可能在请求从客户端传出到网络之前。这就是服务器从未收到任何请求的原因。
但是,我关于启用详细日志记录和/或向服务器端添加某种拦截器的问题仍未得到解答。所以我很期待在这里得到专家的指点。
【问题讨论】:
【参考方案1】:多年后让我也回答这个问题(希望对谁会遇到同样的问题有用)。 我基本上以Shoohei的响应为例,尽量压缩它来解决。
服务器拦截器
public class ServerLogInterceptor implements ServerInterceptor
@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next)
ServerCall<ReqT, RespT> listener = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call)
@Override
public void sendMessage(RespT message)
log.debug("Sending message to cliens: ", message);
super.sendMessage(message);
;
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(listener, headers))
@Override
public void onMessage(ReqT message)
log.debug("Received message from cliens: ", message);
super.onMessage(message);
;
客户端拦截器
public class ClientLogInterceptor implements ClientInterceptor
@Override
public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
MethodDescriptor<ReqT, RespT> method,
CallOptions callOptions,
Channel next
)
return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions))
@Override
public void sendMessage(ReqT message)
log.debug("Sending message to modules: ", message);
super.sendMessage(message);
@Override
public void start(Listener<RespT> responseListener, Metadata headers)
super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener)
@Override
public void onMessage(RespT message)
log.debug("Received message from modules: ", message);
super.onMessage(message);
, headers);
;
(我不确定我是否正确粘贴了代码,以防只是添加或删除一些括号)
【讨论】:
【参考方案2】:我找到了一种使用拦截器在服务器端和客户端记录请求和响应的方法,它使代码更清晰。 也可以使用 sleuth 进行追踪。
请使用弹簧:
implementation 'io.github.lognet:grpc-spring-boot-starter'
服务器部分
然后您可以使用 GRpcGlobalInterceptor 注释
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import org.lognet.springboot.grpc.GRpcGlobalInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
@GRpcGlobalInterceptor
public class GrpcInterceptor implements ServerInterceptor
private Logger logger = LoggerFactory.getLogger(this.getClass());
public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);
@Override
public <M, R> ServerCall.Listener<M> interceptCall(
ServerCall<M, R> call, Metadata headers, ServerCallHandler<M, R> next)
String traceId = headers.get(TRACE_ID_KEY);
// TODO: Add traceId to sleuth
logger.warn("traceId from client: . TODO: Add traceId to sleuth", traceId);
GrpcServerCall grpcServerCall = new GrpcServerCall(call);
ServerCall.Listener listener = next.startCall(grpcServerCall, headers);
return new GrpcForwardingServerCallListener<M>(call.getMethodDescriptor(), listener)
@Override
public void onMessage(M message)
logger.info("Method: , Message: ", methodName, message);
super.onMessage(message);
;
private class GrpcServerCall<M, R> extends ServerCall<M, R>
ServerCall<M, R> serverCall;
protected GrpcServerCall(ServerCall<M, R> serverCall)
this.serverCall = serverCall;
@Override
public void request(int numMessages)
serverCall.request(numMessages);
@Override
public void sendHeaders(Metadata headers)
serverCall.sendHeaders(headers);
@Override
public void sendMessage(R message)
logger.info("Method: , Response: ", serverCall.getMethodDescriptor().getFullMethodName(), message);
serverCall.sendMessage(message);
@Override
public void close(Status status, Metadata trailers)
serverCall.close(status, trailers);
@Override
public boolean isCancelled()
return serverCall.isCancelled();
@Override
public MethodDescriptor<M, R> getMethodDescriptor()
return serverCall.getMethodDescriptor();
private class GrpcForwardingServerCallListener<M> extends io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener<M>
String methodName;
protected GrpcForwardingServerCallListener(MethodDescriptor method, ServerCall.Listener<M> listener)
super(listener);
methodName = method.getFullMethodName();
客户端部分
拦截器:
import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;
import java.util.concurrent.TimeUnit;
import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;
@Component
public class BackendInterceptor implements ClientInterceptor
private Logger logger = LoggerFactory.getLogger(this.getClass());
public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);
@Override
public <M, R> ClientCall<M, R> interceptCall(
final MethodDescriptor<M, R> method, CallOptions callOptions, Channel next)
return new BackendForwardingClientCall<M, R>(method,
next.newCall(method, callOptions.withDeadlineAfter(10000, TimeUnit.MILLISECONDS)))
@Override
public void sendMessage(M message)
logger.info("Method: , Message: ", methodName, message);
super.sendMessage(message);
@Override
public void start(Listener<R> responseListener, Metadata headers)
// TODO: Use the sleuth traceId instead of 999
headers.put(TRACE_ID_KEY, "999");
BackendListener<R> backendListener = new BackendListener<>(methodName, responseListener);
super.start(backendListener, headers);
;
private class BackendListener<R> extends ClientCall.Listener<R>
String methodName;
ClientCall.Listener<R> responseListener;
protected BackendListener(String methodName, ClientCall.Listener<R> responseListener)
super();
this.methodName = methodName;
this.responseListener = responseListener;
@Override
public void onMessage(R message)
logger.info("Method: , Response: ", methodName, message);
responseListener.onMessage(message);
@Override
public void onHeaders(Metadata headers)
responseListener.onHeaders(headers);
@Override
public void onClose(Status status, Metadata trailers)
responseListener.onClose(status, trailers);
@Override
public void onReady()
responseListener.onReady();
private class BackendForwardingClientCall<M, R> extends io.grpc.ForwardingClientCall.SimpleForwardingClientCall<M, R>
String methodName;
protected BackendForwardingClientCall(MethodDescriptor<M, R> method, ClientCall delegate)
super(delegate);
methodName = method.getFullMethodName();
将拦截器添加到通道中:
ManagedChannel managedChannel = ManagedChannelBuilder
.forAddress(_URL_, _PORT_).usePlaintext().intercept(backendInterceptor).build();
【讨论】:
【参考方案3】:另外,如果您想打印在服务器上看到的消息内容或标题,您可以创建一个 ServerInterceptor: https://grpc.io/grpc-java/javadoc/io/grpc/ServerInterceptor.html
您可以查看有关 ServerInterceptor 和 ClientInterceptor 工作原理的示例目录。不存在记录网络事件的预先存在的拦截器。
【讨论】:
【参考方案4】:您可以在 Netty 传输中打开帧记录。首先,创建一个名为logging.properties
的文件。在文件中放入以下内容:
handlers=java.util.logging.ConsoleHandler
io.grpc.netty.level=FINE
java.util.logging.ConsoleHandler.level=FINE
java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter
然后使用 jvm 标志启动 Java 二进制文件
-Djava.util.logging.config.file=logging.properties
【讨论】:
以上是关于在 GRPC 中拦截/记录请求和响应的主要内容,如果未能解决你的问题,请参考以下文章
C# 中 WCF 和 MVC Web API 的通用请求响应拦截器
NestJS:记录来自 HttpService 调用的请求/响应?