在 GRPC 中拦截/记录请求和响应

Posted

技术标签:

【中文标题】在 GRPC 中拦截/记录请求和响应【英文标题】:Intercepting/Logging requests and Responses in GRPC 【发布时间】:2018-04-19 16:28:56 【问题描述】:

我正在使用 GRPC 开发一个聊天应用程序,其中服务器从客户端接收信息并将其发送回与其连接的所有客户端。为此,我使用 Saturnism's chat-example 作为参考。我已经复制了代码,代码可以编译并运行,但服务器应该永远不会收到来自客户端的任何请求。

我的问题是:

    有没有办法在 GRPC 中启用 verbos 服务器端和客户端日志记录,以查看进出哪些请求和响应以及哪些可能失败? 我将以下代码用于服务器和客户端。以下代码中可能缺少/错误的内容会导致客户端和服务器之间没有通信。

WingokuServer.java

public class WingokuServer 
    public static void main(String[] args) throws IOException, InterruptedException 
        Server server = ServerBuilder.forPort(8091)
                .intercept(recordRequestHeadersInterceptor())
                .addService(new WingokuServiceImpl())
                .build();

        System.out.println("Starting server...");
        server.start();
        System.out.println("Server started!");
        server.awaitTermination();
    

WingokuServerSideServiceImplementation:

public class WingokuServiceImpl extends WingokuServiceGrpc.WingokuServiceImplBase 
    private static Set<StreamObserver<Response>> observers =
            Collections.newSetFromMap(new ConcurrentHashMap<>());

    public WingokuServiceImpl() 
        System.out.println("WingokuServiceImp");
    

    @Override
    public StreamObserver<Request> messages(StreamObserver<Response> responseObserver) 
        System.out.println("messages");
        observers.add(responseObserver);
        return new StreamObserver<Request>() 
            @Override
            public void onNext(Request request) 
                System.out.println("Server onNext: ");
                System.out.println("request from client is: "+ request.getRequestMessage());
                Response response = Response.newBuilder().setResponseMessage("new Message From server at time: "+ System.nanoTime()).build();
                for (StreamObserver<Response> observer : observers) 
                    observer.onNext(response);
                
            

            @Override
            public void onError(Throwable throwable) 
                System.out.println("Server onError: ");
                throwable.printStackTrace();
            

            @Override
            public void onCompleted() 
                observers.remove(responseObserver);
                System.out.println("Server onCompleted ");
            
        ;
    

WingokuClient:

public class WingokuClient 
    public static void main(String[] args) 
        ManagedChannel channel = ManagedChannelBuilder.forAddress("localhost", 8091).usePlaintext(true).build();
        WingokuServiceGrpc.WingokuServiceStub asyncStub = WingokuServiceGrpc.newStub(channel);
        StreamObserver<Request> requestStreamObserver = asyncStub.messages(new StreamObserver<Response>() 
            @Override
            public void onNext(Response response) 
                System.out.println("Client onNext");
                System.out.println("REsponse from server is: "+ response.getResponseMessage());
            

            @Override
            public void onError(Throwable throwable) 
                System.out.println("Client onError");
                throwable.printStackTrace();
            

            @Override
            public void onCompleted() 
                System.out.println("Client OnComplete");
            
        );

        requestStreamObserver.onNext(Request.newBuilder().setRequestMessage("Message From Client").build());
        requestStreamObserver.onCompleted();
        channel.shutdown();
        System.out.println("exiting client");
    

编辑:

代码没有问题。有用。我只需要将 awaitTermination 添加到客户端的通道,因为没有它只会立即关闭客户端和服务器之间的连接,甚至可能在请求从客户端传出到网络之前。这就是服务器从未收到任何请求的原因。

但是,我关于启用详细日志记录和/或向服务器端添加某种拦截器的问题仍未得到解答。所以我很期待在这里得到专家的指点。

【问题讨论】:

【参考方案1】:

多年后让我也回答这个问题(希望对谁会遇到同样的问题有用)。 我基本上以Shoohei的响应为例,尽量压缩它来解决。

服务器拦截器

public class ServerLogInterceptor implements ServerInterceptor 

@Override
public <ReqT, RespT> Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call, Metadata headers, ServerCallHandler<ReqT, RespT> next) 

    ServerCall<ReqT, RespT> listener = new ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(call) 

        @Override
        public void sendMessage(RespT message) 
            log.debug("Sending message to cliens: ",  message);
            super.sendMessage(message);
        
    ;

    return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(next.startCall(listener, headers)) 

        @Override
        public void onMessage(ReqT message) 
            log.debug("Received message from cliens: ", message);
            super.onMessage(message);
        

    ;

客户端拦截器

    public class ClientLogInterceptor  implements ClientInterceptor 

    @Override
    public <ReqT, RespT> ClientCall<ReqT, RespT> interceptCall(
        MethodDescriptor<ReqT, RespT> method,
        CallOptions callOptions,
        Channel next
    ) 
        return new SimpleForwardingClientCall<ReqT, RespT>(next.newCall(method, callOptions)) 
            @Override
            public void sendMessage(ReqT message) 
                log.debug("Sending message to modules: ", message);
                super.sendMessage(message);
            

            @Override
            public void start(Listener<RespT> responseListener, Metadata headers) 
                super.start(new ForwardingClientCallListener.SimpleForwardingClientCallListener<RespT>(responseListener) 

                    @Override
                    public void onMessage(RespT message) 
                        log.debug("Received message from modules: ", message);
                        super.onMessage(message);
                    

                , headers);
            

        ;
    


(我不确定我是否正确粘贴了代码,以防只是添加或删除一些括号)

【讨论】:

【参考方案2】:

我找到了一种使用拦截器在服务器端和客户端记录请求和响应的方法,它使代码更清晰。 也可以使用 sleuth 进行追踪。

请使用弹簧:

implementation 'io.github.lognet:grpc-spring-boot-starter'

服务器部分

然后您可以使用 GRpcGlobalInterceptor 注释

import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
import org.lognet.springboot.grpc.GRpcGlobalInterceptor;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

@GRpcGlobalInterceptor
public class GrpcInterceptor implements ServerInterceptor 
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);

    @Override
    public <M, R> ServerCall.Listener<M> interceptCall(
            ServerCall<M, R> call, Metadata headers, ServerCallHandler<M, R> next) 
        String traceId = headers.get(TRACE_ID_KEY);
        // TODO: Add traceId to sleuth
        logger.warn("traceId from client: . TODO: Add traceId to sleuth", traceId);

        GrpcServerCall grpcServerCall = new GrpcServerCall(call);

        ServerCall.Listener listener = next.startCall(grpcServerCall, headers);

        return new GrpcForwardingServerCallListener<M>(call.getMethodDescriptor(), listener) 
            @Override
            public void onMessage(M message) 
                logger.info("Method: , Message: ", methodName, message);
                super.onMessage(message);
            
        ;
    

    private class GrpcServerCall<M, R> extends ServerCall<M, R> 

        ServerCall<M, R> serverCall;

        protected GrpcServerCall(ServerCall<M, R> serverCall) 
            this.serverCall = serverCall;
        

        @Override
        public void request(int numMessages) 
            serverCall.request(numMessages);
        

        @Override
        public void sendHeaders(Metadata headers) 
            serverCall.sendHeaders(headers);
        

        @Override
        public void sendMessage(R message) 
            logger.info("Method: , Response: ", serverCall.getMethodDescriptor().getFullMethodName(), message);
            serverCall.sendMessage(message);
        

        @Override
        public void close(Status status, Metadata trailers) 
            serverCall.close(status, trailers);
        

        @Override
        public boolean isCancelled() 
            return serverCall.isCancelled();
        

        @Override
        public MethodDescriptor<M, R> getMethodDescriptor() 
            return serverCall.getMethodDescriptor();
        
    

    private class GrpcForwardingServerCallListener<M> extends io.grpc.ForwardingServerCallListener.SimpleForwardingServerCallListener<M> 

        String methodName;

        protected GrpcForwardingServerCallListener(MethodDescriptor method, ServerCall.Listener<M> listener) 
            super(listener);
            methodName = method.getFullMethodName();
        
    

客户端部分

拦截器:

import io.grpc.CallOptions;
import io.grpc.Channel;
import io.grpc.ClientCall;
import io.grpc.ClientInterceptor;
import io.grpc.Metadata;
import io.grpc.MethodDescriptor;
import io.grpc.Status;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.stereotype.Component;

import java.util.concurrent.TimeUnit;

import static io.grpc.Metadata.ASCII_STRING_MARSHALLER;

@Component
public class BackendInterceptor implements ClientInterceptor 
    private Logger logger = LoggerFactory.getLogger(this.getClass());

    public static final Metadata.Key<String> TRACE_ID_KEY = Metadata.Key.of("traceId", ASCII_STRING_MARSHALLER);

    @Override
    public <M, R> ClientCall<M, R> interceptCall(
            final MethodDescriptor<M, R> method, CallOptions callOptions, Channel next) 
        return new BackendForwardingClientCall<M, R>(method,
                next.newCall(method, callOptions.withDeadlineAfter(10000, TimeUnit.MILLISECONDS))) 

            @Override
            public void sendMessage(M message) 
                logger.info("Method: , Message: ", methodName, message);
                super.sendMessage(message);
            

            @Override
            public void start(Listener<R> responseListener, Metadata headers) 
                // TODO: Use the sleuth traceId instead of 999
                headers.put(TRACE_ID_KEY, "999");

                BackendListener<R> backendListener = new BackendListener<>(methodName, responseListener);
                super.start(backendListener, headers);
            
        ;
    

    private class BackendListener<R> extends ClientCall.Listener<R> 

        String methodName;
        ClientCall.Listener<R> responseListener;

        protected BackendListener(String methodName, ClientCall.Listener<R> responseListener) 
            super();
            this.methodName = methodName;
            this.responseListener = responseListener;
        

        @Override
        public void onMessage(R message) 
            logger.info("Method: , Response: ", methodName, message);
            responseListener.onMessage(message);
        

        @Override
        public void onHeaders(Metadata headers) 
            responseListener.onHeaders(headers);
        

        @Override
        public void onClose(Status status, Metadata trailers) 
            responseListener.onClose(status, trailers);
        

        @Override
        public void onReady() 
            responseListener.onReady();
        
    

    private class BackendForwardingClientCall<M, R> extends io.grpc.ForwardingClientCall.SimpleForwardingClientCall<M, R> 

        String methodName;

        protected BackendForwardingClientCall(MethodDescriptor<M, R> method, ClientCall delegate) 
            super(delegate);
            methodName = method.getFullMethodName();
        
    

将拦截器添加到通道中:

ManagedChannel managedChannel = ManagedChannelBuilder
                .forAddress(_URL_, _PORT_).usePlaintext().intercept(backendInterceptor).build();

【讨论】:

【参考方案3】:

另外,如果您想打印在服务器上看到的消息内容或标题,您可以创建一个 ServerInterceptor: https://grpc.io/grpc-java/javadoc/io/grpc/ServerInterceptor.html

您可以查看有关 ServerInterceptor 和 ClientInterceptor 工作原理的示例目录。不存在记录网络事件的预先存在的拦截器。

【讨论】:

【参考方案4】:

您可以在 Netty 传输中打开帧记录。首先,创建一个名为logging.properties 的文件。在文件中放入以下内容:

handlers=java.util.logging.ConsoleHandler
io.grpc.netty.level=FINE
java.util.logging.ConsoleHandler.level=FINE
java.util.logging.ConsoleHandler.formatter=java.util.logging.SimpleFormatter

然后使用 jvm 标志启动 Java 二进制文件 -Djava.util.logging.config.file=logging.properties

【讨论】:

以上是关于在 GRPC 中拦截/记录请求和响应的主要内容,如果未能解决你的问题,请参考以下文章

grpc swift客户端拦截器提议

C# 中 WCF 和 MVC Web API 的通用请求响应拦截器

在c#中等待grpc拦截器中的延续方法

NestJS:记录来自 HttpService 调用的请求/响应?

在 JavaScript 中拦截 fetch() API 请求和响应

grpc-源码解析