google cloud pub sub中两个主题之间的消息传输

Posted

技术标签:

【中文标题】google cloud pub sub中两个主题之间的消息传输【英文标题】:Message transfer in between two topics in google cloud pub sub 【发布时间】:2021-10-08 18:40:22 【问题描述】:

我们有一个用例,在 UI 的任何操作上,我们需要同步读取来自 google pub/sub 主题 A 的消息并将这些消息移动到主题 B。

以下是为处理此行为而编写的代码,它来自 Google Pub Sub 文档,用于同步访问主题。

   public static int subscribeSync(String projectId, String subscriptionId, Integer numOfMessages, int count, String acknowledgementTopic) throws IOException 
    SubscriberStubSettings subscriberStubSettings =
            SubscriberStubSettings.newBuilder()
            .setTransportChannelProvider(
                    SubscriberStubSettings.defaultGrpcTransportProviderBuilder()
                    .setMaxInboundMessageSize(20 * 1024 * 1024) // 20MB (maximum message size).
                    .build())
            .build(); 

    try (SubscriberStub subscriber = GrpcSubscriberStub.create(subscriberStubSettings)) 
        String subscriptionName = ProjectSubscriptionName.format(projectId, subscriptionId);
        PullRequest pullRequest =
                PullRequest.newBuilder()
                .setMaxMessages(numOfMessages)
                .setSubscription(subscriptionName)
                .build(); 

        // Use pullCallable().futureCall to asynchronously perform this operation.
        PullResponse pullResponse = subscriber.pullCallable().call(pullRequest);
        List<String> ackIds = new ArrayList<>();
        for (ReceivedMessage message : pullResponse.getReceivedMessagesList()) 
            // START - CODE TO PUBLISH MESSAGE TO TOPIC B
            **publishMessage(message.getMessage(),acknowledgementTopic,projectId);**
            // END - CODE TO PUBLISH MESSAGE TO TOPIC B
            ackIds.add(message.getAckId());
        
        // Acknowledge received messages.
        AcknowledgeRequest acknowledgeRequest =
                AcknowledgeRequest.newBuilder()
                .setSubscription(subscriptionName)
                .addAllAckIds(ackIds)
                .build();
        
        // Use acknowledgeCallable().futureCall to asynchronously perform this operation.
        subscriber.acknowledgeCallable().call(acknowledgeRequest);
        count=pullResponse.getReceivedMessagesList().size();
    catch(Exception e) 
        log.error(e.getMessage());
    
    return count;

以下是向主题 B 发布消息的示例代码

public static void publishMessage(PubsubMessage pubsubMessage,String Topic,String projectId) 
    Publisher publisher = null;
    ProjectTopicName topicName =ProjectTopicName.newBuilder().setProject(projectId).setTopic(Topic).build();
    try 
        // Publish the messages to normal topic.
        publisher = Publisher.newBuilder(topicName).build();
     catch (IOException e) 
        log.error(e.getMessage());
    

    publisher.publish(pubsubMessage);


这是处理此用例的正确方法还是可以以其他方式处理。我们不想使用 Cloud Dataflow。有人可以让我们知道这是否正常或有问题。 该代码有效,但有时消息即使在同步消费之后仍保留在主题 A 上。 谢谢'

【问题讨论】:

您能否详细解释一下您的用例以及为什么需要从 A 获取消息然后发布到 B? @guillaume blaquiere 此用例要求消息必须根据按钮单击等 UI 操作从主题 A 移动到 B。 【参考方案1】:

显示的代码存在一些问题。

    你真的应该只在有特定原因需要这样做的情况下使用同步拉取。一般来说,最好使用asynchronous pull via the client libraries。它将更有效并减少将消息从一个主题移动到另一个主题的延迟。您没有展示如何调用subscribeSync,但为了有效地处理消息并确保您实际处理所有消息,您需要连续多次并行调用它。如果您要坚持使用同步拉取,那么您应该重用 SubscriberStub 对象,因为每次调用都重新创建它是低效的。

    您不会重复使用您的 Publisher 对象。因此,您无法利用发布者客户端可以执行的batching。您应该创建一次 Publisher 并在您的调用中重复使用它以发布到同一主题。如果传入的主题可能因消息而异,则保留从主题到发布者的映射,并从映射中检索正确的主题。

    您无需等待调用publish 的结果。此调用可能会失败,但您不处理该失败。结果,您可以在第一个主题上确认消息而没有实际发布,从而导致消息丢失。

关于您关于重复的问题,Pub/Sub 提供至少一次交付保证,因此即使正确确认,仍然有可能再次接收消息(典型的重复率约为 0.1%)。重复可能有许多不同的原因。在您的情况下,由于您正在按顺序处理消息并为每次调用重新创建发布者,因此可能是在确认截止日期到期之前未确认后续消息,从而导致重新传递。

【讨论】:

以上是关于google cloud pub sub中两个主题之间的消息传输的主要内容,如果未能解决你的问题,请参考以下文章

Google Cloud Pub/Sub - 捕获发送到死信主题的消息的消息传递失败原因[关闭]

Google Cloud Pub/Sub 发布者生命周期

Google Pub/Sub + Cloud Run 生成多个容器

如何通过转发到死信主题来限制 Google Pub/Sub 交付尝试?

从 Compute Engine 发布到 Cloud Pub/Sub 主题时的 DEADLINE_EXCEEDED

Google Cloud Pub/Sub,使用 HTTP PUSH 请求发布