gRPC 服务器端流式传输:如何无限期地继续流式传输?

Posted

技术标签:

【中文标题】gRPC 服务器端流式传输:如何无限期地继续流式传输?【英文标题】:gRPC Server-Side Streaming: How to Continue Stream Indefinitely? 【发布时间】:2020-01-16 16:48:20 【问题描述】:

我在使用 NodeJS 编写的轻量级 gRPC 服务器时遇到问题。我正在参考文档here。我已经能够编译代表消息和服务的 proto 文件,并且已经成功地使用服务器端流方法建立了 gRPC 服务器,我可以通过 BloomRPC 触发。

我有一个名为parcel 的原始消息,它有一个字段:parcel_id。我希望这种方法每秒传输一个数据包。我在这方面的第一个基本步骤是每秒执行一分钟的循环,并通过call.write(parcel) 应用一个新包裹。我已经包含了下面的方法,当我通过 gRPC 调用它时它执行没有错误。

/**
 * Implements the updateParcel RPC method.
 * Feeds new parcel to the passed in "call" param
 * until the simulation is stopped.
 */
function updateParcels(call) 
  console.log("Parcels requested...");

  // Continuously stream parcel protos to requester
  let i = 0;
  let id = 0;
  while(i < 60)
    // Create dummy parcel
    let parcel = new messages.Parcel();
    parcel.setParcelId(id);
    id++;// Increment id

    // Write parcel to call object
    console.log("Sending parcel...");
    call.write(parcel);

    // Sleep for a second (1000 millis) before repeating
    sleep(1000);
  
  call.end();

我的问题是,虽然我能够调用我的方法并接收结果,但行为是我在客户端立即收到第一个结果(对于 NodeJS 客户端代码 BloomRPC 调用),但只有在服务器执行 call.end() 后才能一次性收到最后 59 个结果。没有错误,并且我在客户端收到的包裹对象是准确的并且格式正确,它们只是按照描述进行了批处理。

我怎样才能实现我的包裹实时源源不断?这可能吗?我看过但不能确定 - gRPC 服务器端流默认情况下是否具有批处理行为? 我已尽力理解 gRPC 文档,但我无法确定我是否' m 只是试图强制 gRPC 服务器端流做一些他们不打算做的事情。感谢您的帮助,如果我可以提供更多信息,请告诉我,因为这是我的第一个与 gRPC 相关的 SO 问题,我可能错过了一些相关信息。

【问题讨论】:

它可能与 gRPC 无关,你的 sleep 在那里实现是什么? node 提供的默认值是一个 promise,因此您可能必须将该函数声明为 async 并调用 await sleep(1000) 才能使其工作。 @mrbm 就是这样。很高兴你指出了这一点。我只是使用另一个 javascript 项目的实用程序文件夹中的睡眠功能 - 在这种情况下碰巧不起作用。采纳您的建议后完全解决。 @mrbm 如果您将该评论作为答案发布,我将很乐意接受。 很高兴为您提供帮助! 【参考方案1】:

它可能与 gRPC 无关,但与那里使用的 sleep 实现有关。

node 提供的默认值是一个 Promise,因此要使其工作,您可能必须将函数声明为 async 并调用 await sleep(1000);

【讨论】:

以上是关于gRPC 服务器端流式传输:如何无限期地继续流式传输?的主要内容,如果未能解决你的问题,请参考以下文章

gRPC 流式传输极简入门指南

gRPC 流式传输极简入门指南

gRPC 客户端不使用服务器端流

gRPC - Firestore 如何实现服务器-> 客户端实时流式传输

gRPC之流式调用原理http2协议分析

检查 gRPC 流何时为空或不是流式传输数据