Can't consume messages from Google PubSub emulator using .net client sdk

Posted: 2021-12-13 22:18:46

Question:

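After starting the Google PubSub emulator and publishing a few messages to it, I start my system under test. When I try to start the SubscriberClient, I get this exception: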

Grpc.Core.RpcException
  HResult=0x80131500
  Message=Status(StatusCode="NotFound", Detail="Subscription does not exist (resource=franchisingSalesReceipts-motorpromo-sub-dl)", DebugException="Grpc.Core.Internal.CoreErrorDetailException: {"created":"@1635433978.567000000","description":"Error received from peer ipv4:127.0.0.1:8085","file":"..\..\..\src\core\lib\surface\call.cc","file_line":1067,"grpc_message":"Subscription does not exist (resource=franchisingSalesReceipts-motorpromo-sub-dl)","grpc_status":5}")
  Source=Google.Cloud.PubSub.V1
  StackTrace:
   at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.HandleRpcFailure(Exception e)
   at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.HandlePullMessageData(Task`1 moveNextTask)
   at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.<>c__DisplayClass46_0.<HandlePullMoveNext>b__1()
   at Google.Cloud.PubSub.V1.SubscriberClientImpl.SingleChannel.<StartAsync>d__38.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at Google.Cloud.PubSub.V1.Tasks.ForwardingAwaiter.GetResult()
   at Google.Cloud.PubSub.V1.Tasks.Extensions.<>c__DisplayClass4_0.<<ConfigureAwaitHideErrors>g__Inner|0>d.MoveNext()
   at System.Runtime.ExceptionServices.ExceptionDispatchInfo.Throw()
   at System.Runtime.CompilerServices.TaskAwaiter.ThrowForNonSuccess(Task task)
   at System.Runtime.CompilerServices.TaskAwaiter.HandleNonSuccessAndDebuggerNotification(Task task)
   at System.Runtime.CompilerServices.ConfiguredTaskAwaitable.ConfiguredTaskAwaiter.GetResult()
   at PromotionEngine.OneToOne.Infrastructure.Services.GooglePubSubConsumerService.<RegisterEventHandler>d__6`2.MoveNext() in C:\Users\jmourao\workspace\MotorPromocao\PromotionEngine.OneToOne\src\PromotionEngine.OneToOne.Infrastructure\Services\GooglePubSubConsumerService.cs:line 57

  This exception was originally thrown at this call stack:
    [External Code]
    PromotionEngine.OneToOne.Infrastructure.Services.GooglePubSubConsumerService.RegisterEventHandler<TWorker, TMessage>(string) in GooglePubSubConsumerService.cs

Steps to reproduce

After starting the emulator with this command:

gcloud beta emulators pubsub start --project=integracao-apigee-dev

I connect the publisher like this:

public GcpEmulatorPublisherService(
    ILogger<GcpEmulatorPublisherService> logger,
    IConfiguration config)
{
    _logger = logger;

    string endpoint = config.GetValue<string>("GoogleCloudPubSub:host");
    string projectId = config.GetValue<string>("GoogleCloudPubSub:projectId");
    string[] topics = config.GetSection("GoogleCloudPubSub:topics").Get<string[]>();
    string currentTopicId = config.GetValue<string>("GoogleCloudPubSub:currentTopicId");

    // Admin client pointed at the emulator: explicit endpoint, no TLS.
    PublisherServiceApiClient publisherService = new PublisherServiceApiClientBuilder
    {
        Endpoint = endpoint,
        ChannelCredentials = ChannelCredentials.Insecure
    }.Build();

    // Create every configured topic, ignoring the ones that already exist.
    foreach (string topicId in topics)
    {
        TopicName topicName = new TopicName(projectId, topicId);
        try
        {
            publisherService.CreateTopic(topicName);
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists)
        {
            _logger.LogInformation($"Topic already exists: ProjectId {projectId} TopicId {topicId}");
        }
    }

    var settings = new PublisherClient.ClientCreationSettings(
        credentials: ChannelCredentials.Insecure,
        serviceEndpoint: endpoint
    );

    _publisherClient = PublisherClient.Create(new TopicName(projectId, currentTopicId), settings);
}

After that, I simply start my system under test with insecure credentials and a localhost ServiceEndpoint:

SubscriberClient.ClientCreationSettings clientCreationSettings = new(clientCount: 1, credentials: ChannelCredentials.Insecure, serviceEndpoint: "localhost:8085");

var subscriberClient = await SubscriberClient.CreateAsync(subscription, clientCreationSettings);

Everything works up to this point.

When I try to start the subscriber like this:

await subscriberClient.StartAsync(async (PubsubMessage message, CancellationToken cancellationToken) =>
{
    try
    {
        TMessage messageBody = ConvertMessageBody<TMessage>(message);
        TWorker worker = DependencyResolver.GetRequiredService<TWorker>();
        await worker.ProcessAsync(messageBody);
        return await Task.FromResult(SubscriberClient.Reply.Ack);
    }
    catch (Exception exception)
    {
        string exceptionMessage = string.Format(
            "Exception reading message from subscription from supplier {0}. See inner exception for details. Message={1}",
            topicName, exception.Message);
        Exception trackedException = new(exceptionMessage, exception);
        _telemetryWrapper.TrackException(trackedException);
        _logger.LogError(trackedException, "Error processing message");
        return await Task.FromResult(SubscriberClient.Reply.Nack);
    }
}).ConfigureAwait(false);

I get the exception shown above.

Comments:

Which line of code causes the error?
await subscriberClient.StartAsync(...)

Answer 1:

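I found the cause of the problem.

I had not created the subscription that my system under test (the consumer) was trying to use.

I did not know that you have to create both a topic and a subscription before a consumer application can use that subscription to receive messages.

So in my publisher test application I now create the required subscription, as shown in the documentation: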

// Subscribe to the topic.
SubscriberServiceApiClient subscriberService = await SubscriberServiceApiClient.CreateAsync();
SubscriptionName subscriptionName = new SubscriptionName(projectId, subscriptionId);
subscriberService.CreateSubscription(subscriptionName, topicName, pushConfig: null, ackDeadlineSeconds: 60);
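
Note that SubscriberServiceApiClient.CreateAsync() with default settings does not, as far as I can tell, target the emulator. Below is a minimal sketch of the same step pointed at the emulator, reusing the builder pattern (Endpoint plus ChannelCredentials.Insecure) from the publisher code in the question. The endpoint localhost:8085, the project ID and the subscription ID come from the question. The class name EmulatorSubscriptionSetup, the method EnsureSubscription and the topic ID "my-topic" are hypothetical placeholders, not part of the original code.

```csharp
using System;
using Google.Cloud.PubSub.V1;
using Grpc.Core;

public static class EmulatorSubscriptionSetup
{
    // Creates the subscription on the emulator unless it already exists.
    // Hypothetical helper; the topic is assumed to exist already.
    public static void EnsureSubscription(
        string endpoint, string projectId, string topicId, string subscriptionId)
    {
        // Same pattern as the publisher in the question:
        // explicit emulator endpoint and insecure channel credentials.
        SubscriberServiceApiClient subscriberService = new SubscriberServiceApiClientBuilder
        {
            Endpoint = endpoint,
            ChannelCredentials = ChannelCredentials.Insecure
        }.Build();

        TopicName topicName = new TopicName(projectId, topicId);
        SubscriptionName subscriptionName = new SubscriptionName(projectId, subscriptionId);

        try
        {
            subscriberService.CreateSubscription(
                subscriptionName, topicName, pushConfig: null, ackDeadlineSeconds: 60);
        }
        catch (RpcException ex) when (ex.StatusCode == StatusCode.AlreadyExists)
        {
            // Safe to ignore: a previous run against the same emulator created it.
            Console.WriteLine($"Subscription already exists: {subscriptionName}");
        }
    }

    public static void Main()
    {
        // "my-topic" is a placeholder; use the topic your publisher created.
        EnsureSubscription(
            "localhost:8085",
            "integracao-apigee-dev",
            "my-topic",
            "franchisingSalesReceipts-motorpromo-sub-dl");
    }
}
```

The call has to run after the topic has been created and before SubscriberClient.StartAsync. The emulator keeps its state in memory, so topics and subscriptions typically need to be recreated each time it is restarted.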

I also found that, in my setup, creating a subscription whose subscriptionId is the same as the topicId did not work.
