无证约束?发布到主题 *from* pubsub 触发器

Posted

技术标签:

【中文标题】无证约束?发布到主题 *from* pubsub 触发器【英文标题】:Undocumented Constraint? publishing to topic *from* pubsub trigger 【发布时间】:2021-09-20 05:12:07 【问题描述】:

我不知道我是不是疯了,或者这是一个没有记录的限制(我已经搜索了 GCP API 文档):

是否可以在“主题 A”上有一个带有 pubsub 触发器的云函数,并在该云函数中向“主题 B”发布一条消息。

我已经尝试了所有其他运行相同代码的触发器(云函数作为 HTTP 触发器、云存储触发器、Firebase 触发器),它们都成功发布到主题。 但是,当我(几乎是字面意思)将我的代码复制粘贴到 pubsub 触发器中时,在使用消息之后,当它尝试将它自己的消息发布到下一个主题时,它只是挂起。尝试发布时,该函数只是超时

所以回顾一下,在 GCP 中是否可以执行以下操作?

PubSub Topic A --> Cloud Function --> Pubsub Topic B

提前感谢您的澄清!这一切都在 Java 11 中。这是代码:

...<bunch of imports>

public class SignedURLGenerator implements BackgroundFunction<PubSubMessage> 
  private static final String PROJECT_ID = System.getenv("GOOGLE_CLOUD_PROJECT");
  private static final Logger logger = Logger.getLogger(SignedURLGenerator.class.getName());

  /**
  * Handle the incoming PubsubMessage
  **/
 @Override
  public void accept(PubSubMessage message, Context context) throws IOException, InterruptedException 
    String data = new String(Base64.getDecoder().decode(message.data));
    System.out.println("The input message is: " + data.toString());

    //Do a bunch of other stuff not relevant to the issue at hand...

    publishSignedURL(url.toString());
  

  //Here's the interesting part
  public static void publishSignedURL(String message) throws IOException, InterruptedException 
    String topicName = "url-ready-notifier";
    String responseMessage;
    Publisher publisher = null;
    

    try 
      // Create the PubsubMessage object
      ByteString byteStr = ByteString.copyFrom(message, StandardCharsets.UTF_8);
      PubsubMessage pubsubApiMessage = PubsubMessage.newBuilder().setData(byteStr).build();
      System.out.println("Message Constructed:" + message); 
      //This part works fine, the message gets constructed

      publisher = Publisher.newBuilder(ProjectTopicName.of(PROJECT_ID, topicName)).build();
      System.out.println("Publisher Created.");
      //This part also works fine, the publisher gets created

      publisher.publish(pubsubApiMessage).get();
      responseMessage = "Message published.";
      //The code NEVER GETS HERE.  The message is never published.  And eventually the cloud function time's out :(    

     catch (InterruptedException | ExecutionException e) 
        System.out.println("Something went wrong with publishing: " + e.getMessage());
      

    System.out.println("Everything wrapped up.");

  

编辑 根据要求,这是我当前的 POM

    <?xml version="1.0" encoding="UTF-8"?>
    <project xmlns="http://maven.apache.org/POM/4.0.0"
             xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
             xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
      <modelVersion>4.0.0</modelVersion>
      <groupId>cloudfunctions</groupId>
      <artifactId>pubsub-function</artifactId>
      <version>1.0-SNAPSHOT</version>
    
      <properties>
        <maven.compiler.target>11</maven.compiler.target>
        <maven.compiler.source>11</maven.compiler.source>
      </properties>
    
      <dependencies>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>libraries-bom</artifactId>
          <version>20.6.0</version>
          <type>pom</type>
          <scope>import</scope>
      </dependency>
        <dependency>
          <groupId>com.google.cloud.functions</groupId>
          <artifactId>functions-framework-api</artifactId>
          <version>1.0.1</version>
          <type>jar</type>
        </dependency>
        <dependency>
          <groupId>com.google.cloud</groupId>
          <artifactId>google-cloud-storage</artifactId>
          <version>1.117.1</version>
        </dependency>
        <dependency>
         <groupId>com.google.cloud</groupId>
          <artifactId>google-cloud-pubsub</artifactId>
          <version>1.113.4</version>
        </dependency>
        <dependency>
          <groupId>com.google.api</groupId>
          <artifactId>gax</artifactId>
          <version>1.66.0</version>
        </dependency>
        <dependency>
          <groupId>com.google.api</groupId>
          <artifactId>gax-grpc</artifactId>
          <version>1.66.0</version>
        </dependency>
        <dependency>
          <groupId>org.threeten</groupId>
          <artifactId>threetenbp</artifactId>
          <version>0.7.2</version>
        </dependency>    
      </dependencies>
    </project>

【问题讨论】:

【参考方案1】:

您可以尝试在发布者客户端中显式设置流控制参数吗?像这样

       publisher = Publisher.newBuilder(ProjectTopicName.of(PROJECT_ID, topicName)).setBatchingSettings(BatchingSettings.newBuilder()
                .setDelayThreshold(Duration.of(10, ChronoUnit.SECONDS))
                .setElementCountThreshold(1L)
                .setIsEnabled(true)
                .build()).build();

我不知道会发生什么,可能是 PubSub 的默认和全局配置。如果不是这样,我会删除这个答案。


编辑 1

这里是 Publisher 父类上的构建器类的屏幕截图

您拥有该库的所有默认值。但是,您观察到的行为是不正常的。即使您在 PubSub 触发器中,默认值也必须保持默认值。我将打开一个问题并直接将其转发给团队。

【讨论】:

嘿!有效!太棒了。但也很混乱。想到两个问题: 1. 为什么这个特定的代码需要流控制参数?为什么其他触发器都不需要它来发布? 2. 默认的 GCP 批量大小是多少?在发表此评论之前,我查看了多个 GCP 文档以查看默认值,但谷歌所说的只是“有默认值”,而不是默认值是什么......有什么想法吗? 我分享了一个屏幕截图。这个类很有趣,了解底层机制,但是你在 Cloud Functions 上看到的不正常! 信息:issuetracker.google.com/issues/193263948 谢谢 guillaume,我会关注这个问题并希望它得到解决。此外,通读 Publisher 类的 javadoc [googleapis.dev/java/gax/latest/com/google/api/gax/batching/… 显示默认的 Batch Size 为 1。 你能分享你的函数框架和 PubSub 的依赖版本(或你的完整 pom.xml 文件)。另外,您可以尝试使用所有库的最新版本吗?并基于此示例:github.com/GoogleCloudPlatform/java-docs-samples/tree/…

以上是关于无证约束?发布到主题 *from* pubsub 触发器的主要内容,如果未能解决你的问题,请参考以下文章

将消息发布到 GCP pubSub 主题失败

如何从 PubSub 主题读取数据并将其解析到光束管道中并打印

PubsubFileInjector 无法提交到 PubSub 主题

Google Dataflow:根据条件仅将消息输出到 PubSub 主题之一

如何从 Cloudflare 工作人员内部发布到 GCP PubSub 主题

如何将 GCP Pubsub 订阅的消息转发到另一个主题?