如何在 gRPC 服务器中添加全局异常拦截器?
Posted
技术标签:
【中文标题】如何在 gRPC 服务器中添加全局异常拦截器?【英文标题】:How to add global exception interceptor in gRPC server? 【发布时间】:2017-02-09 08:43:52 【问题描述】:在gRPC中,如何添加一个全局异常拦截器,拦截任意RuntimeException
并将有意义的信息传播给客户端?
例如,divide
方法可能会抛出 ArithmeticException
和 / by zero
消息。在服务器端,我可以写:
@Override
public void divide(DivideRequest request, StreamObserver<DivideResponse> responseObserver)
int dom = request.getDenominator();
int num = request.getNumerator();
double result = num / dom;
responseObserver.onNext(DivideResponse.newBuilder().setValue(result).build());
responseObserver.onCompleted();
如果客户端通过 denominator = 0 ,它将得到:
Exception in thread "main" io.grpc.StatusRuntimeException: UNKNOWN
服务器输出
Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$2@62e95ade
java.lang.ArithmeticException: / by zero
客户不知道发生了什么。
如果我想将/ by zero
消息传递给客户端,我必须将服务器修改为:
(如question 中所述)
try
double result = num / dom;
responseObserver.onNext(DivideResponse.newBuilder().setValue(result).build());
responseObserver.onCompleted();
catch (Exception e)
logger.error("onError : " , e.getMessage());
responseObserver.onError(new StatusRuntimeException(Status.INTERNAL.withDescription(e.getMessage())));
如果客户端发送 denominator = 0 ,它将得到:
Exception in thread "main" io.grpc.StatusRuntimeException: INTERNAL: / by zero
好的,/ by zero
已传递给客户端。
但问题是,在真正的企业环境中,会有很多RuntimeException
s,如果我想将这些异常的消息传递给客户端,我必须尝试捕获每个方法,这很麻烦.
是否有任何全局拦截器可以拦截每个方法,捕获RuntimeException
并触发onError
并将错误消息传播给客户端?这样我就不必在我的服务器代码中处理RuntimeException
s。
非常感谢!
注意:
<grpc.version>1.0.1</grpc.version>
com.google.protobuf:proton:3.1.0
io.grpc:protoc-gen-grpc-java:1.0.1
【问题讨论】:
【参考方案1】:public class GrpcExceptionHandler implements ServerInterceptor
private final Logger logger = LoggerFactory.getLogger (GrpcExceptionHandler.class);
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall (ServerCall<ReqT, RespT> call,
Metadata headers,
ServerCallHandler<ReqT, RespT> next)
logger.info ("GRPC call at: ", Instant.now ());
ServerCall.Listener<ReqT> listener;
try
listener = next.startCall (call, headers);
catch (Throwable ex)
logger.error ("Uncaught exception from grpc service");
call.close (Status.INTERNAL
.withCause (ex)
.withDescription ("Uncaught exception from grpc service"), null);
return new ServerCall.Listener<ReqT>() ;
return listener;
上面的示例拦截器。
当然,你需要先引导它,然后再期待它的任何东西;
serverBuilder.addService (ServerInterceptors.intercept (bindableService, interceptor));
更新
public interface ServerCallHandler<RequestT, ResponseT>
/**
* Produce a non-@code null listener for the incoming call. Implementations are free to call
* methods on @code call before this method has returned.
*
* <p>If the implementation throws an exception, @code call will be closed with an error.
* Implementations must not throw an exception if they started processing that may use @code
* call on another thread.
*
* @param call object for responding to the remote client.
* @return listener for processing incoming request messages for @code call
*/
ServerCall.Listener<RequestT> startCall(
ServerCall<RequestT, ResponseT> call,
Metadata headers);
可悲的是,不同的线程上下文意味着没有异常处理范围,所以我的答案不是您正在寻找的解决方案..
【讨论】:
对不起,它不起作用。这条线logger.error ("Uncaught exception from grpc service");
没有到达!我也觉得奇怪。
好吧,拦截确实发生了,但正如文档所述,next.startCall(call, headers)
立即返回并在另一个线程中执行,最终我们失去了异常的堆栈范围。可悲的是,我不知道目前是否有任何解决方法。【参考方案2】:
你读过 grpc java examples 的拦截器吗?
所以在我的例子中,我们使用代码和消息作为标准来定义服务器发送到客户端的错误类型。
示例:服务器发送响应,如
code: 409,
message: 'Id xxx aldready exist'
因此,您可以在客户端设置客户端拦截器,以获取该代码并使用反射进行响应。仅供参考,我们使用Lognet Spring Boot starter for grpc 作为服务器,使用 Spring boot 作为客户端。
【讨论】:
【参考方案3】:我已经使用 AOP 来处理全局的 rpc 错误,我觉得它很方便。我在guice中使用AOP,spring的使用方式应该类似
-
定义方法拦截器
```
public class ServerExceptionInterceptor implements MethodInterceptor
final static Logger logger = LoggerFactory.getLogger(ServerExceptionInterceptor.class);
public Object invoke(MethodInvocation invocation) throws Throwable
try
return invocation.proceed();
catch (Exception ex)
String stackTrace = Throwables.getStackTraceAsString(ex);
logger.error("##grpc server side error, ", stackTrace);
Object[] args = invocation.getArguments();
StreamObserver<?> responseObserver = (StreamObserver<?>)args[1];
responseObserver.onError(Status.INTERNAL
.withDescription(stackTrace)
.withCause(ex)
.asRuntimeException());
return null;
@Target(ElementType.METHOD, ElementType.TYPE) @Retention(RUNTIME)
public @interface WrapError
String value() default "";
```
在所有 rpc 方法中添加 @WrapError
@Override @WrapError
public void sayHello(HelloRequest req, StreamObserver<HelloReply> responseObserver)
HelloReply reply = HelloReply.newBuilder().setMessage("Hello " + req.getName()).build();
logger.info("#rpc server, sayHello, planId: ", req.getName());
if(true) throw new RuntimeException("testing-rpc-error"); //simulate an exception
responseObserver.onNext(reply);
responseObserver.onCompleted();
-
在guice模块中绑定拦截器
ServerExceptionInterceptor interceptor = new ServerExceptionInterceptor();
requestInjection(interceptor);
bindInterceptor(Matchers.any(), Matchers.annotatedWith(WrapError.class), interceptor);
4.测试
【讨论】:
【参考方案4】:以下代码将捕获所有运行时异常,另请参考链接https://github.com/grpc/grpc-java/issues/1552
public class GlobalGrpcExceptionHandler implements ServerInterceptor
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> call,
Metadata requestHeaders, ServerCallHandler<ReqT, RespT> next)
ServerCall.Listener<ReqT> delegate = next.startCall(call, requestHeaders);
return new SimpleForwardingServerCallListener<ReqT>(delegate)
@Override
public void onHalfClose()
try
super.onHalfClose();
catch (Exception e)
call.close(Status.INTERNAL
.withCause (e)
.withDescription("error message"), new Metadata());
;
【讨论】:
【参考方案5】:TransmitStatusRuntimeExceptionInterceptor 与您想要的非常相似,只是它只捕获StatusRuntimeException
。你可以 fork 它并让它捕获所有异常。
要为服务器上的所有服务安装拦截器,可以使用 gRPC 1.5.0 中添加的ServerBuilder.intercept()
【讨论】:
【参考方案6】:在 Kotlin 中,您必须 structure your ServerInterceptor differently。我在 Micronaut 中使用了 grpc-kotlin,异常从未出现在 SimpleForwardingServerCallListener
onHalfClose
或其他处理程序中。
【讨论】:
【参考方案7】:如果您想在所有 gRPC 端点(包括处理客户端流的那些)和拦截器中捕获异常,您可能需要类似的东西到以下:
import io.grpc.ForwardingServerCallListener;
import io.grpc.Metadata;
import io.grpc.ServerCall;
import io.grpc.ServerCallHandler;
import io.grpc.ServerInterceptor;
import io.grpc.Status;
public class GlobalGrpcExceptionHandler implements ServerInterceptor
@Override
public <ReqT, RespT> ServerCall.Listener<ReqT> interceptCall(ServerCall<ReqT, RespT> serverCall,
Metadata requestHeaders,
ServerCallHandler<ReqT, RespT> serverCallHandler)
try
ServerCall.Listener<ReqT> delegate = serverCallHandler.startCall(serverCall, requestHeaders);
return new ForwardingServerCallListener.SimpleForwardingServerCallListener<ReqT>(delegate)
@Override
public void onMessage(ReqT message)
try
super.onMessage(message); // Here onNext is called (in case of client streams)
catch (Throwable e)
handleEndpointException(e, serverCall);
@Override
public void onHalfClose()
try
super.onHalfClose(); // Here onCompleted is called (in case of client streams)
catch (Throwable e)
handleEndpointException(e, serverCall);
;
catch (Throwable t)
return handleInterceptorException(t, serverCall);
private <ReqT, RespT> void handleEndpointException(Throwable t, ServerCall<ReqT, RespT> serverCall)
serverCall.close(Status.INTERNAL
.withCause(t)
.withDescription("An exception occurred in the endpoint implementation"), new Metadata());
private <ReqT, RespT> ServerCall.Listener<ReqT> handleInterceptorException(Throwable t, ServerCall<ReqT, RespT> serverCall)
serverCall.close(Status.INTERNAL
.withCause(t)
.withDescription("An exception occurred in a **subsequent** interceptor"), new Metadata());
return new ServerCall.Listener<ReqT>()
// no-op
;
免责声明:我通过检查实现收集了这一点,我没有在文档中阅读它,我不确定它是否会改变。作为参考,我指的是io.grpc
版本1.30
。
【讨论】:
【参考方案8】:如果您可以使用yidongnan/grpc-spring-boot-starter将您的gRPC服务器应用程序转换为spring boot,那么您可以编写@GrpcAdvice,类似于Spring Boot @ControllerAdvice为
@GrpcAdvice
public class ExceptionHandler
@GrpcExceptionHandler(ValidationErrorException.class)
public StatusRuntimeException handleValidationError(ValidationErrorException cause)
Status.INVALID_ARGUMENT.withDescription("Invalid Argument")
.asRuntimeException()
【讨论】:
【参考方案9】:在 Kotlin 上,在侦听器的方法上添加 try/catch 不起作用,由于某种原因,异常被吞没了。
按照@markficket 发布的链接,我实现了一个解决方案,创建了SimpleForwardingServerCall
的实现。
class ErrorHandlerServerInterceptor : ServerInterceptor
private inner class StatusExceptionHandlingServerCall<ReqT, RespT>(delegate: ServerCall<ReqT, RespT>)
: ForwardingServerCall.SimpleForwardingServerCall<ReqT, RespT>(delegate)
override fun close(status: Status, trailers: Metadata)
status.run
when
isOk -> status
cause is MyException -> myExceptionHandler(cause as MyException)
else -> defaultExceptionHandler(cause)
.let super.close(it, trailers)
private fun myExceptionHandler(cause: MyException): Status = cause.run ...
private fun defaultExceptionHandler(cause: Throwable?): Status = cause?.run ...
override fun <ReqT : Any, RespT : Any> interceptCall(
call: ServerCall<ReqT, RespT>,
metadata: Metadata,
next: ServerCallHandler<ReqT, RespT>
): ServerCall.Listener<ReqT> =
next.startCall(StatusExceptionHandlingServerCall(call), metadata)
那当然需要在服务器创建时添加拦截器
ServerBuilder
.forPort(port)
.executor(Dispatchers.IO.asExecutor())
.addService(service)
.intercept(ErrorHandlerServerInterceptor())
.build()
然后你可以简单地在你的 gRPC 方法上抛出异常
override suspend fun someGrpcCall(request: Request): Response
... code ...
throw NotFoundMyException("Cannot find entity")
【讨论】:
以上是关于如何在 gRPC 服务器中添加全局异常拦截器?的主要内容,如果未能解决你的问题,请参考以下文章