Quarkus 阻塞 grpc vert.x 事件循环错误

Posted

技术标签:

【中文标题】Quarkus 阻塞 grpc vert.x 事件循环错误【英文标题】:Quarkus blocking grpc vert.x eventloop error 【发布时间】:2021-06-02 18:40:10 【问题描述】:

尝试使用 grpc 与 Quarkus 1.12.1.Final 进行微服务之间的通信。我正在尝试访问我的实体,但出现 vert.x-eventloop-thread-o 错误;

(vert.x-eventloop-thread-0) Exception while executing runnable io.grpc.internal.ServerImpl$JumpToApplicationThreadServerStreamListener$1HalfClosed@8e09d21: java.lang.IllegalStateException: You have attempted to perform a blocking operation on a IO thread. This is not allowed, as blocking the IO thread will cause major performance issues with your application. If you want to perform blocking EntityManager operations make sure you are doing it from a worker thread.

据说在我的方法中添加@Blocking 应该可以解决这个问题,但这似乎不起作用。

@Singleton
public class FillProductService extends SourceGrpc.SourceImplBase 

    @Override
    @Blocking
    public void fillProductsWithSourceInfo(FillProductsRequest request, StreamObserver<FillProductsReply> responseObserver) 
        List<SourceInfo> sourceInfo = request.getProductIdsList().stream().map(productId -> 
            List<SourceEntity> sources = SourceEntity.find("productId = ?1", productId, Sort.descending("upvotes")).list();
            double bestPrice = !sources.isEmpty() ? sources.get(0).price : 0.00;
            return SourceInfo.newBuilder().setSources(sources.size())
                        .setBestPrice(bestPrice).setProductId(productId).build();
        ).collect(Collectors.toList());

        responseObserver.onNext(FillProductsReply.newBuilder().addAllSourceInfo(sourceInfo).build());
        responseObserver.onCompleted();
    

我也尝试过使用 Mutiny,但它似乎给出了同样的错误

【问题讨论】:

请提供更多 IllegalStateException 堆栈跟踪。 在正常执行中是否会发生,例如java -jar,还是本地人?我们有一个错误导致@Blocking 在开发模式下无法正常工作。这已在 2.0.0.Alpha2 中修复 【参考方案1】:

对 runSubscritionOn 使用 mutiny 现在确实给了我一个回应;

@Singleton
public class FillProductService extends MutinySourceGrpc.SourceImplBase 

    @Override
    public Uni<FillProductsReply> fillProductsWithSourceInfo(FillProductsRequest request) 
        return Uni.createFrom().item(() -> 
            List<SourceInfo> sourceInfo = request.getProductIdsList().stream().map(productId -> 
                List<SourceEntity> sources = SourceEntity.find("productId = ?1", productId).list();
                double bestPrice = !sources.isEmpty() ? sources.get(0).price : 0.00;
                return SourceInfo.newBuilder().setSources(sources.size())
                            .setBestPrice(bestPrice).setProductId(productId).build();
            ).collect(Collectors.toList());
            return FillProductsReply.newBuilder().addAllSourceInfo(sourceInfo).build();
        ).runSubscriptionOn(Infrastructure.getDefaultWorkerPool())
        .onFailure().invoke(t -> System.out.println("Oh no! We received a failure: " + t.getMessage())
        );
    

【讨论】:

以上是关于Quarkus 阻塞 grpc vert.x 事件循环错误的主要内容,如果未能解决你的问题,请参考以下文章

Reactive SQL 客户端 (Quarkus/Vert.X) 中的 Kotlin 协程事务

Quarkus gRPC 构建问题

VERT.X介绍(续)

Vert.x初体验

在保持活动超时时为 Vert.x 邮件发送器重置连接

如何使用 Vert.x 实现非阻塞 REST Web 服务