Gcloud PubSub Java 实现 - java.util.concurrent.RejectedExecutionException

Posted

技术标签:

【中文标题】Gcloud PubSub Java 实现 - java.util.concurrent.RejectedExecutionException【英文标题】:Gcloud PubSub Java implementation - java.util.concurrent.RejectedExecutionException 【发布时间】:2017-12-17 13:48:51 【问题描述】:

我使用 GCloud 文档中的示例 sn-p 作为订阅者接收 msg。我的 pubsub gcloud jar 版本是 0.19.0-alpha

问题是我可以接收带有属性映射的味精,但我一直有这个异常:

2017-07-12 16:52:25,219 [grpc-default-worker-ELG-1-16] WARN  io.netty.util.concurrent.DefaultPromise - An exception was thrown by io.grpc.netty.NettyClientHandler$3.operationComplete()
java.util.concurrent.RejectedExecutionException: Task java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@fbf4a6d rejected from java.util.concurrent.ScheduledThreadPoolExecutor@25cbe860[Terminated, pool size = 35, active threads = 0, queued tasks = 0, completed tasks = 2403]
    at java.util.concurrent.ThreadPoolExecutor$AbortPolicy.rejectedExecution(ThreadPoolExecutor.java:2047)
    at java.util.concurrent.ThreadPoolExecutor.reject(ThreadPoolExecutor.java:823)
    at java.util.concurrent.ScheduledThreadPoolExecutor.delayedExecute(ScheduledThreadPoolExecutor.java:326)
    at java.util.concurrent.ScheduledThreadPoolExecutor.schedule(ScheduledThreadPoolExecutor.java:533)
    at java.util.concurrent.ScheduledThreadPoolExecutor.execute(ScheduledThreadPoolExecutor.java:622)
    at java.util.concurrent.Executors$DelegatedExecutorService.execute(Executors.java:668)
    at io.grpc.internal.SerializingExecutor.execute(SerializingExecutor.java:110)
    at io.grpc.internal.ClientCallImpl$ClientStreamListenerImpl.onReady(ClientCallImpl.java:573)
    at io.grpc.internal.DelayedStream$DelayedStreamListener.onReady(DelayedStream.java:398)
    at io.grpc.internal.AbstractStream2$TransportState.notifyIfReady(AbstractStream2.java:305)
    at io.grpc.internal.AbstractStream2$TransportState.onStreamAllocated(AbstractStream2.java:248)
    at io.grpc.netty.NettyClientStream$TransportState.setHttp2Stream(NettyClientStream.java:227)
    at io.grpc.netty.NettyClientHandler$3.operationComplete(NettyClientHandler.java:429)
    at io.grpc.netty.NettyClientHandler$3.operationComplete(NettyClientHandler.java:417)
    at io.netty.util.concurrent.DefaultPromise.notifyListener0(DefaultPromise.java:507)
    at io.netty.util.concurrent.DefaultPromise.notifyListenersNow(DefaultPromise.java:481)
    at io.netty.util.concurrent.DefaultPromise.notifyListeners(DefaultPromise.java:420)
    at io.netty.util.concurrent.DefaultPromise.trySuccess(DefaultPromise.java:104)

之后,程序关闭并停止收听和获取消息。如何解决这种中断,我什至摆脱了 finally 具有subscriber.stopAsync() 的子句。

【问题讨论】:

你能具体说明你使用的是什么sn-p吗? https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-subscribe-java @Mikey 你解决了吗?我得到了完全相同的异常。 我确实解决了它。我遵循文档,显然有两种不同的方法可以做到这一点。 cloud.google.com/pubsub/docs/quickstart-client-libraries 您能否提及您需要什么版本来更新依赖项才能正常工作?是 1.99.0 版还是更早的版本? 【参考方案1】:

他们提供的 sn-p 中有一个错误。您需要在 messaegeIdFuture 上调用 get()。下面的代码解决了这个问题:

Publisher publisher = null;
String projectId = ServiceOptions.getDefaultProjectId();
ProjectTopicName topic = ProjectTopicName.of(projectId, "test");
ApiFuture<String> messageIdFuture = null;
try 
       publisher = Publisher.newBuilder(topic).build();
       ByteString data = ByteString.copyFromUtf8("my-message");
       PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
       messageIdFuture = publisher.publish(pubsubMessage);
     catch (IOException e) 
       e.printStackTrace();
     finally 
        messageIdFuture.get();    //This resolves this issue.
        // Wait on any pending requests
        if (publisher != null) 
            publisher.shutdown();
            //publisher.awaitTermination(1, TimeUnit.SECONDS);
        
    

【讨论】:

首先,我真的需要使用future吗?我需要始终如一地从订阅中收听,而不是主动拉取。不过还是谢谢。

以上是关于Gcloud PubSub Java 实现 - java.util.concurrent.RejectedExecutionException的主要内容,如果未能解决你的问题,请参考以下文章

JS 中的 Gcloud pubsub worker

gcloud alpha pubsub 订阅寻求方法未找到异常

gcloud pubsub 订阅 pull 经常报空消息列表

Google Cloud PubSub - 似乎无法获取主题

PubSub 死字

谷歌云跟踪 + Gcloud 登录日志查看器