我无法在 Spring Boot 中使用 google pubsub 模拟器发送消息
Posted
技术标签:
【中文标题】我无法在 Spring Boot 中使用 google pubsub 模拟器发送消息【英文标题】:I can't send message using google pubsub emulator in spring boot 【发布时间】:2020-04-22 12:15:36 【问题描述】:我正在尝试使用 pubsub 的模拟器发送推送消息,我也在使用 spring boot,这是我的配置:
依赖:
<dependency>
<groupId>org.springframework.cloud</groupId>
<artifactId>spring-cloud-gcp-starter-pubsub</artifactId>
</dependency>
我的豆子:
@Configuration
@AutoConfigureBefore(value= GcpPubSubAutoConfiguration.class)
@EnableConfigurationProperties(value= GcpPubSubProperties.class)
public class EmulatorPubSubConfiguration
@Value("$spring.gcp.pubsub.projectid")
private String projectId;
@Value("$spring.gcp.pubsub.subscriptorid")
private String subscriptorId;
@Value("$spring.gcp.pubsub.topicid")
private String topicId;
@Bean
public Publisher pubsubEmulator() throws IOException
String hostport = System.getenv("PUBSUB_EMULATOR_HOST");
ManagedChannel channel = ManagedChannelBuilder.forTarget(hostport).usePlaintext().build();
try
TransportChannelProvider channelProvider =
FixedTransportChannelProvider.create(GrpcTransportChannel.create(channel));
CredentialsProvider credentialsProvider = NoCredentialsProvider.create();
// Set the channel and credentials provider when creating a `TopicAdminClient`.
// Similarly for SubscriptionAdminClient
TopicAdminClient topicClient =
TopicAdminClient.create(
TopicAdminSettings.newBuilder()
.setTransportChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build());
ProjectTopicName topicName = ProjectTopicName.of(projectId, topicId);
// Set the channel and credentials provider when creating a `Publisher`.
// Similarly for Subscriber
return Publisher.newBuilder(topicName)
.setChannelProvider(channelProvider)
.setCredentialsProvider(credentialsProvider)
.build();
finally
channel.shutdown();
当然,我已经将PUBSUB_EMULATOR_HOST系统变量设置为localhost:8085,模拟器在哪里运行
我创建了一个测试控制器:
用于发送推送消息@Autowired
private Publisher pubsubPublisher;
@PostMapping("/send1")
public String publishMessage(@RequestParam("message") String message) throws InterruptedException, IOException
Publisher pubsubPublisher = this.getPublisher();
ByteString data = ByteString.copyFromUtf8(message);
PubsubMessage pubsubMessage = PubsubMessage.newBuilder().setData(data).build();
ApiFuture<String> future = pubsubPublisher.publish(pubsubMessage);
//pubsubPublisher.publishAllOutstanding();
try
// Add an asynchronous callback to handle success / failure
ApiFutures.addCallback(future,
new ApiFutureCallback<String>()
@Override
public void onFailure(Throwable throwable)
if (throwable instanceof ApiException)
ApiException apiException = ((ApiException) throwable);
// details on the API exception
System.out.println(apiException.getStatusCode().getCode());
System.out.println(apiException.isRetryable());
System.out.println("Error publishing message : " + message);
System.out.println("Error publishing error : " + throwable.getMessage());
System.out.println("Error publishing cause : " + throwable.getCause());
@Override
public void onSuccess(String messageId)
// Once published, returns server-assigned message ids (unique within the topic)
System.out.println(messageId);
,
MoreExecutors.directExecutor());
finally
if (pubsubPublisher != null)
// When finished with the publisher, shutdown to free up resources.
pubsubPublisher.shutdown();
pubsubPublisher.awaitTermination(1, TimeUnit.MINUTES);
return "ok";
获取消息:
@PostMapping("/pushtest")
public String pushTest(@RequestBody CloudPubSubPushMessage request)
System.out.println( "------> message received: " + decode(request.getMessage().getData()) );
return request.toString();
我已经在模拟器中创建了我的主题和订阅,我遵循了这个教程:
https://cloud.google.com/pubsub/docs/emulator
我在模拟器中设置了端点“pushtest”以获取推送消息,使用以下命令:
python subscriber.py PUBSUB_PROJECT_ID create-push TOPIC_ID SUBSCRIPTION_ID PUSH_ENDPOINT
但是当我运行测试时,没有到达“/pushtest”端点并且我收到了这个错误:
任务 java.util.concurrent.ScheduledThreadPoolExecutor$ScheduledFutureTask@265d5d05 【未完成,task = java.util.concurrent.Executors$RunnableAdapter@a8c8be3 [包装任务 = com.google.common.util.concurrent.TrustedListenableFutureTask@1a53c57c [状态=待处理,信息=[任务=[运行=[尚未开始],com.google.api.gax.rpc.AttemptCallable@3866e1d0]]]]] 被 java.util.concurrent.ScheduledThreadPoolExecutor@3f34809a 拒绝 [已终止,池大小 = 0,活动线程 = 0,排队任务 = 0,已完成任务 = 1]
为了确保模拟器运行正常,我使用以下命令在 python 中运行测试:
python publisher.py PUBSUB_PROJECT_ID publish TOPIC_ID
而且我在“pushtest”端点中正确接收消息。
我不知道为什么对我的欺侮感到抱歉。
感谢您的帮助。
【问题讨论】:
【参考方案1】:我发现了问题。
只在 bean 中注释这一行
channel.shutdown();
哈哈很简单。
【讨论】:
以上是关于我无法在 Spring Boot 中使用 google pubsub 模拟器发送消息的主要内容,如果未能解决你的问题,请参考以下文章
我无法在 Spring Boot 中使用 google pubsub 模拟器发送消息
无法使Spring Boot + JavaFX在Eclipise中运行
在 v2.0.X 中无法使用 Spring Cloud + boot 调用 /encrypt 端点
无法在 Spring Boot 中使用 Crud Repository 从 Redis 获取结果?