google cloud pub sub中两个主题之间的消息传输
Posted
技术标签:
【中文标题】google cloud pub sub中两个主题之间的消息传输【英文标题】:Message transfer in between two topics in google cloud pub sub 【发布时间】:2021-10-08 18:40:22 【问题描述】:我们有一个用例,在 UI 的任何操作上,我们需要同步读取来自 google pub/sub 主题 A 的消息并将这些消息移动到主题 B。
以下是为处理此行为而编写的代码,它来自 Google Pub Sub 文档,用于同步访问主题。
public static int subscribeSync(String projectId, String subscriptionId, Integer numOfMessages, int count, String acknowledgementTopic) throws IOException
SubscriberStubSettings subscriberStubSettings =
SubscriberStubSettings.newBuilder()
.setTransportChannelProvider(
SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
.setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size).
.build())
.build();
try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings))
String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
PullRequest pullRequest =
PullRequest.newBuilder()
.setMaxMessages(numOfMessages)
.setSubscription(subscriptionName)
.build();
// Use pullCallable().futureCall to asynchronously perform this operation.
PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
List<String> ackIds = new ArrayList<>();
for (ReceivedMessage message : pullResponse.getReceivedMessagesList())
// START - CODE TO PUBLISH MESSAGE TO TOPIC B
**publishMessage(message.getMessage(),acknowledgementTopic,projectId);**
// END - CODE TO PUBLISH MESSAGE TO TOPIC B
ackIds.add(message.getAckId());
// Acknowledge received messages.
AcknowledgeRequest acknowledgeRequest =
AcknowledgeRequest.newBuilder()
.setSubscription(subscriptionName)
.addAllAckIds(ackIds)
.build();
// Use acknowledgeCallable().futureCall to asynchronously perform this operation.
subscriber.acknowledgeCallable().call(acknowledgeRequest);
count=pullResponse.getReceivedMessagesList().size();
catch(Exception e)
log.error(e.getMessage());
return count;
以下是向主题 B 发布消息的示例代码
public static void publishMessage(PubsubMessage pubsubMessage,String Topic,String projectId)
Publisher publisher = null;
ProjectTopicName topicName =ProjectTopicName.newBuilder().setProject(projectId).setTopic(Topic).build();
try
// Publish the messages to normal topic.
publisher = Publisher.newBuilder(topicName).build();
catch (IOException e)
log.error(e.getMessage());
publisher.publish(pubsubMessage);
这是处理此用例的正确方法还是可以以其他方式处理。我们不想使用 Cloud Dataflow。有人可以让我们知道这是否正常或有问题。 该代码有效,但有时消息即使在同步消费之后仍保留在主题 A 上。 谢谢'
【问题讨论】:
您能否详细解释一下您的用例以及为什么需要从 A 获取消息然后发布到 B? @guillaume blaquiere 此用例要求消息必须根据按钮单击等 UI 操作从主题 A 移动到 B。 【参考方案1】:显示的代码存在一些问题。
你真的应该只在有特定原因需要这样做的情况下使用同步拉取。一般来说,最好使用asynchronous pull via the client libraries。它将更有效并减少将消息从一个主题移动到另一个主题的延迟。您没有展示如何调用subscribeSync
,但为了有效地处理消息并确保您实际处理所有消息,您需要连续多次并行调用它。如果您要坚持使用同步拉取,那么您应该重用 SubscriberStub
对象,因为每次调用都重新创建它是低效的。
您不会重复使用您的 Publisher
对象。因此,您无法利用发布者客户端可以执行的batching。您应该创建一次 Publisher
并在您的调用中重复使用它以发布到同一主题。如果传入的主题可能因消息而异,则保留从主题到发布者的映射,并从映射中检索正确的主题。
您无需等待调用publish
的结果。此调用可能会失败,但您不处理该失败。结果,您可以在第一个主题上确认消息而没有实际发布,从而导致消息丢失。
关于您关于重复的问题,Pub/Sub 提供至少一次交付保证,因此即使正确确认,仍然有可能再次接收消息(典型的重复率约为 0.1%)。重复可能有许多不同的原因。在您的情况下,由于您正在按顺序处理消息并为每次调用重新创建发布者,因此可能是在确认截止日期到期之前未确认后续消息,从而导致重新传递。
【讨论】:
以上是关于google cloud pub sub中两个主题之间的消息传输的主要内容,如果未能解决你的问题,请参考以下文章
Google Cloud Pub/Sub - 捕获发送到死信主题的消息的消息传递失败原因[关闭]
Google Pub/Sub + Cloud Run 生成多个容器
如何通过转发到死信主题来限制 Google Pub/Sub 交付尝试?