PubSub 模拟器 - ( 支持 Proto Buffer 发布/接收消息)

Posted

技术标签:

【中文标题】PubSub 模拟器 - ( 支持 Proto Buffer 发布/接收消息)【英文标题】:PubSub Emulator - ( Support Proto Buffer publish/receive msg) 【发布时间】:2021-12-13 11:37:16 【问题描述】:

我正在开发一个解决方案,使用一个通用的 Proto Buffer 库来发送和接收 msg,直接使用 proto 缓冲区序列化 (ByteString) 和从 (ByteString) 直接反序列化到同一个 Proto Buffer 类。到目前为止,我的解决方案无法正常工作。就在我使用真正的 PubSub 时。

基于The doc: Testing apps locally with the emulator 信息和knowing limitations 部分更具体:

模拟器不提供对协议缓冲区的架构支持。

虽然,我没有在主题/订阅中使用任何架构定义。只需以编程方式使用通用的原型缓冲区库。恐怕存在 Pubsub 仿真限制,因此我的解决方案不适用于仿真器。

欢迎向我的测试班提出任何澄清。

package com.example.pubsubgcpspringapplications;


import java.io.IOException;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.TimeoutException;

import com.alpian.common.pubsub.messages.OnfidoVerificationEvent;
import com.example.pubsubgcpspringapplications.config.PubSubTestConfig;
import com.example.pubsubgcpspringapplications.services.MessageRealGcpService;
import com.example.pubsubgcpspringapplications.util.DataGenerationUtils;
import com.google.api.core.ApiFuture;
import com.google.cloud.pubsub.v1.AckReplyConsumer;
import com.google.cloud.pubsub.v1.MessageReceiver;
import com.google.cloud.pubsub.v1.Publisher;
import com.google.cloud.pubsub.v1.Subscriber;
import com.google.protobuf.ByteString;
import com.google.protobuf.InvalidProtocolBufferException;
import com.google.protobuf.util.JsonFormat;
import com.google.pubsub.v1.ProjectSubscriptionName;
import com.google.pubsub.v1.PubsubMessage;
import lombok.SneakyThrows;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.Test;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.boot.test.context.SpringBootTest;
import org.springframework.test.context.ActiveProfiles;

//@ActiveProfiles("test")
public class EmulatorPubSubWithSpringTest 

  @BeforeAll
  static void startUpTests() throws IOException 
    PubSubTestConfig.setupPubSubEmulator();
  

  @SneakyThrows
  @Test
  void successfulTest() throws InterruptedException 

    var status = DataGenerationUtils.STATUS_COMPLETE;
    var result = DataGenerationUtils.RESULT_CLEAR;
    var subResult = DataGenerationUtils.SUB_RESULT_CLEAR;

    var documentReport = DataGenerationUtils.generateOnfidoDocumentReport(status, result, subResult);
    var facialSimilarityReport = DataGenerationUtils
        .generateOnfidoFacialSimiliratyVideoReport(status, result, subResult);

    OnfidoVerificationEvent.Builder builder = OnfidoVerificationEvent.newBuilder();
    builder.setCheckId(DataGenerationUtils.FAKE_CHECK_ID);
    builder.setApplicantId(DataGenerationUtils.FAKE_APPLICANT_ID);
    builder.setDocument(documentReport);
    builder.setFacialSimilarityVideo(facialSimilarityReport);
    OnfidoVerificationEvent onfidoVerificationEvent = builder.build();

    publishProtoMessageTest(onfidoVerificationEvent);

    MessageReceiver receiver =
        (PubsubMessage message, AckReplyConsumer consumer) -> 
          ByteString data = message.getData();

          // Get the schema encoding type.
          String encoding = message.getAttributesMap().get("googclient_schemaencoding");

          block:
          try 
            switch (encoding) 
              case "BINARY":
                // Obtain an object of the generated proto class.
                OnfidoVerificationEvent state = OnfidoVerificationEvent.parseFrom(data);
                System.out.println("Received a BINARY-formatted message: " + state);
                break;

              case "JSON":
                OnfidoVerificationEvent.Builder stateBuilder = OnfidoVerificationEvent.newBuilder();
                JsonFormat.parser().merge(data.toStringUtf8(), stateBuilder);
                System.out.println("Received a JSON-formatted message:" + stateBuilder.build());
                break;

              default:
                break block;
            
           catch (InvalidProtocolBufferException e) 
            e.printStackTrace();
          

          consumer.ack();
          System.out.println("Ack'ed the message");
        ;

    ProjectSubscriptionName subscriptionName =
        ProjectSubscriptionName.of(PubSubTestConfig.PROJECT_ID, PubSubTestConfig.SUBSCRIPTION_NAME);

    // Create subscriber client.
    Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver).build();

    try 
      subscriber.startAsync().awaitRunning();
      System.out.printf("Listening for messages on %s:\n", subscriptionName);
      subscriber.awaitTerminated(30, TimeUnit.SECONDS);
     catch (TimeoutException timeoutException) 
      subscriber.stopAsync();
    

    Thread.sleep(15000);

  

  public static void publishProtoMessageTest(OnfidoVerificationEvent onfidoVerificationEvent)
      throws IOException, ExecutionException, InterruptedException 

    Publisher publisher = null;

    block:
    try 
      publisher = Publisher.newBuilder("projects/my-project-id/topics/topic-one").build();
      PubsubMessage.Builder message = PubsubMessage.newBuilder();
      // Prepare an appropriately formatted message based on topic encoding.
      message.setData(onfidoVerificationEvent.toByteString());
      System.out.println("Publishing a BINARY-formatted message:\n" + message);

      // Publish the message.
      ApiFuture<String> future = publisher.publish(message.build());
      //System.out.println("Published message ID: " + future.get());

     finally 
      if (publisher != null) 
        publisher.shutdown();
        publisher.awaitTermination(1, TimeUnit.MINUTES);
      
    
  



注意:请,我只是从谷歌教程中复制了一些截图代码并对其进行了修改。我不想使用 JSON,只是使用 proto 文件发布和接收 msg。

提前非常感谢!

【问题讨论】:

当您说您的解决方案不起作用时,您是什么意思?你有错误吗? 是的,由于某种原因,我可以发布 ''' 已发布消息 ID:3372434705961298 ''' 但我无法收到消息 ''' 正在监听子二上的消息:2021-10 -28 14:12:57.210 错误 4570 --- [bscriber-SE-2-1] cgcpvStreamingSubscriberConnection:终止流式处理异常引起:com.google.api.gax.rpc.NotFoundException:io.grpc.StatusRuntimeException:NOT_FOUND : 找不到资源(资源=分二)。 ''' 但是,订阅已创建。我知道这一点,因为我可以在另一个测试中使用它。但是使用 Json 格式和 PubSubTemplate。 虽然,您的错误消息表明可能未创建订阅,可能是应用程序访问了错误的发布子实例。添加以下行来获取环境详细信息并打印它以检查您是否在正确的 PubSub 实例上:string emulatorHostAndPort = Environment.GetEnvironmentVariable("PUBSUB_EMULATOR_HOST"); 我所有的课程都使用相同的主机。我按照你的建议做了。谢谢! 【参考方案1】:

编辑:在 cmets 和另一个发布的答案中更好地说明模拟器。


正如您所指出的,PubSub 模拟器目前not support 使用 os protobuffer 消息,这就是您在代码中使用的(Snippets 来自 protobuf 模式类型的 Publish / Receive 消息),目前不支持。您可以尝试使用Avro schema type 在Google issue tracker 上打开feature request,以便在PubSub 模拟器中使用protobuffer 架构。

【讨论】:

是的,但我还是有点不舒服。因为当谷歌说。 “协议缓冲区的架构支持。”是一个限制。这意味着,PubSub Emulator 不允许在 Emulator Topic 创建中定义模式,或者我们无法使用 PubSub Emulator 测试 Protobuf msgs?因为就我而言,我只想使用 protobuf 发送和接收消息。但我不能使用 Protobuf 来做到这一点。我的代码只能使用真正的 PubSub。你对此有什么肯定吗? 从 sn-ps 中了解到,不支持使用 protobuf 模式创建过滤器和发布/接收消息的接缝。不推荐使用 PubSub 模拟器来测试生产环境。更好的方法是为此建立一个测试项目。 架构不适用于模拟器中的协议缓冲区的限制仅适用于您创建架构并将其附加到 Pub/Sub 主题的情况。没有什么可以阻止您将序列化的协议缓冲区作为消息中的数据实际发送。模拟器中的限制与 .proto 解析器仅在 C++ 中可用而模拟器是用 Java 编写的事实有关。 感谢@KamalAboul-Hosn 的澄清。 Alexandre,如果其他澄清对您有更好的帮助,请移动或删除接受。【参考方案2】:

“找不到资源”问题与 Pub/Sub 模拟器不支持协议缓冲区架构没有任何关系。如果您尝试以不受支持的方式使用协议缓冲区(这将创建一个使用PROTCOL_BUFFER 作为其typeSchema 对象),那么您将返回一个错误,特别是关于缺乏对协议缓冲区的支持模拟器中的模式。

您的问题看起来更像以下之一:

    订阅名称与您创建的订阅名称不匹配。 您实际上并未在模拟器中创建订阅,而是在实际的 Pub/Sub 服务中创建了它。 您没有通过设置 PUBSUB_EMULATOR_HOST 环境变量将订阅者指向模拟器。

您应该验证该订阅是否存在于模拟器中。您可以通过对它运行gcloud 工具来做到这一点。假设您使用以下命令启动了模拟器:

gcloud beta emulators pubsub start --project=my-test-project

如果这会在端口 8085 上启动您的模拟器,您可以通过运行以下命令检查您的订阅是否存在:

> CLOUDSDK_API_ENDPOINT_OVERRIDES_PUBSUB=http://localhost:8085/ gcloud --project my-test-topic pubsub subscriptions list

如果您在运行该命令时订阅不存在,则意味着您可能没有在模拟器中创建订阅,而是在实际服务中创建了它。如果您确实看到它,那么这可能意味着您的订阅者没有向模拟器发送请求,而是实际上向 Pub/Sub 服务本身发送请求。

【讨论】:

以上是关于PubSub 模拟器 - ( 支持 Proto Buffer 发布/接收消息)的主要内容,如果未能解决你的问题,请参考以下文章

本地 Pubsub 模拟器不适用于 Dataflow

是否可以从 Cloud Build 步骤启动 PubSub 模拟器

Firebase 模拟器:在函数中使用 PubSub

尝试在本地运行 PubSub 模拟器时出错

golang protobuf unknown字段透传

Firebase 函数不会看到 pubsub 模拟器在本地运行