拉取请求上的 GCP Pub/Sub 过滤

Posted

技术标签:

【中文标题】拉取请求上的 GCP Pub/Sub 过滤【英文标题】:GCP Pub/Sub filtering on the pull request 【发布时间】:2021-11-26 10:46:52 【问题描述】:

我想利用与 GCP CLI 类似的拉取请求来进行 Pub/Sub 订阅:

gcloud pubsub 订阅 pull --filter

我希望在 Java 客户端库中利用相同的功能。

有没有办法做到这一点?

谢谢。

【问题讨论】:

【参考方案1】:

如果您正在寻找执行 pubsub 的 java 客户端库,请查找以下文档。如果您需要特定的东西,请正确更新您的问题

https://cloud.google.com/pubsub/docs/quickstart-client-libraries#pubsub-client-libraries-java

【讨论】:

【参考方案2】:

gcloud 中的--filter 选项不是 Pub/Sub 或服务所固有的,而是gcloud 命令基础架构本身提供的实用程序。过滤完全在客户端完成。另请注意,这仅影响消息列表的显示,而不影响实际返回的消息。如果你运行gcloud topic filters,你可以看到更多关于这个功能的细节:

大多数 gcloud 命令会在成功时返回资源列表。默认 它们在标准输出上打印得很漂亮。这 --format=NAMEATTRIBUTES 和 --filter=EXPRESSION 标志以及投影可用于格式化和更改默认值 输出更有意义的结果。

因此,如果您想在 Java 中执行此操作,则需要编写代码以在接收消息时应用过滤器。根据Java asynchronous pull sample,您需要将消息接收者更改为:

  private boolean shouldProcessMessage(PubsubMessage message) 
    // Change to perform whatever filtering you want on messages
    // to determine if they should be processed.
    return true;
  

  private void processMessage(PubsubMessage message) 
    // Put logic here to handle the message.
  

  ...
    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> 
          if (shouldProcessMessage(message)) 
            processMessage(message);
          
          consumer.ack();
        ;

这是假设您不希望再次递送与您的过滤器不匹配的邮件。如果您确实希望再次发送它们,则需要在这些消息上调用 consumer.nack() 而不是 consumer.ack()

如果您要进行的所有过滤都针对消息属性,那么您可以利用Pub/Sub's built-in filtering。此功能允许您检查属性是否存在,检查属性值是否相等,以及检查属性值的前缀。这种类型的过滤器被声明为订阅创建的一部分,因此除非您以编程方式创建订阅,否则您不会有任何与之关联的 Java 代码。如果您使用这种类型的过滤,与过滤器不匹配的消息不会传递给您的订阅者,因此您的MessageReceiver 不需要检查它是否应该处理此类消息;它可以假设它收到的唯一消息是与过滤器匹配的消息。

【讨论】:

以上是关于拉取请求上的 GCP Pub/Sub 过滤的主要内容,如果未能解决你的问题,请参考以下文章

使用 Tibco 和 GCP Pub/Sub 将旧的 On-Prem .Net 应用程序集成到 GCP

验证数据已发送到 GCP Pub/Sub

确认后 GCP 消息保留在 Pub/Sub 中

如何将 blob 文件发布到 GCP Pub/Sub?

python Python:订阅GCP Pub / Sub

python Python:订阅GCP Pub / Sub