如何同时从 grpc 流中读取和写入

Posted

技术标签:

【中文标题】如何同时从 grpc 流中读取和写入【英文标题】:How can I read and write from a grpc stream simultaneously 【发布时间】:2019-02-01 14:21:58 【问题描述】:

我现在正在实现 Raft 算法,我想使用 gRPC 流来执行此操作。我的主要想法是为每个节点创建 3 个流到每个其他对等点,一个流将传输一种类型的 RPC,有AppendEntriesRequestVoteInstallSnapshot。我在route_guide 的有限帮助下编写了一些代码,因为在其双向流演示RouteChat 中,客户端在开始读取之前发送了所有数据。

首先,我想随时写入流,所以我写了以下代码

void RaftMessagesStreamClientSync::AsyncRequestVote(const RequestVoteRequest& request)
    std::string peer_name = this->peer_name;
    debug("GRPC: Send RequestVoteRequest from %s to %s\n", request.name().c_str(), peer_name.c_str());
    request_vote_stream->Write(request);

同时,我希望一个线程继续从流中读取,如以下代码,在构造 RaftMessagesStreamClientSync 后立即调用。

void RaftMessagesStreamClientSync::handle_response()
    // strongThis is a must 
    auto strongThis = shared_from_this();
    t1 = new std::thread([strongThis]()
        RequestVoteResponse response;
        while (strongThis->request_vote_stream->Read(&response)) 
            debug("GRPC: Recv RequestVoteResponse from %s, me %s\n", response.name().c_str(), strongThis->raft_node->name.c_str());
            ...
        
    );
    ...

为了初始化 3 个流,我必须这样写构造函数,我在这里使用 3 ClientContext 因为document says one ClientContext for one RPC

struct RaftMessagesStreamClientSync : std::enable_shared_from_this<RaftMessagesStreamClientSync>
    typedef grpc::ClientReaderWriter<RequestVoteRequest, RequestVoteResponse> CR;
    typedef grpc::ClientReaderWriter<AppendEntriesRequest, AppendEntriesResponse> CA;
    typedef grpc::ClientReaderWriter<InstallSnapshotRequest, InstallSnapshotResponse> CI;

    std::unique_ptr<CR> request_vote_stream;
    std::unique_ptr<CA> append_entries_stream;
    std::unique_ptr<CI> install_snapshot_stream;
    ClientContext context_r;
    ClientContext context_a;
    ClientContext context_i;
    std::thread * t1 = nullptr;
    std::thread * t2 = nullptr;
    std::thread * t3 = nullptr;
    ...

RaftMessagesStreamClientSync::RaftMessagesStreamClientSync(const char * addr, struct RaftNode * _raft_node) : raft_node(_raft_node), peer_name(addr) 
    std::shared_ptr<Channel> channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
    stub = raft_messages::RaftStreamMessages::NewStub(channel);
    // 1
    request_vote_stream = stub->RequestVote(&context_r);
    // 2
    append_entries_stream = stub->AppendEntries(&context_a);
    // 3
    install_snapshot_stream = stub->InstallSnapshot(&context_i);    

~RaftMessagesStreamClientSync() 
    raft_node = nullptr;
    t1->join();
    t2->join();
    t3->join();
    delete t1;
    delete t2;
    delete t3;

然后我实现服务端

Status RaftMessagesStreamServiceImpl::RequestVote(ServerContext* context, ::grpc::ServerReaderWriter< ::raft_messages::RequestVoteResponse, RequestVoteRequest>* stream)
    RequestVoteResponse response;
    RequestVoteRequest request;
    while (stream->Read(&request)) 
        ...
    

    return Status::OK;

然后发生2个问题:

    当我使用 3 个节点进行测试时,实际上为每个节点创建了 2 个RaftMessagesStreamServiceImpl,从 1 到 3 的语句执行时间很长。 没有从服务器端接收到 RPC。 使用Bidi Aysnc Server时也有类似的问题,但是我不知道这篇文章对我有什么帮助。

更新

经过一些调试,我发现request_vote_stream-&gt;Write(request)返回0,根据document, means the stream is closed。可是为什么关了呢?

【问题讨论】:

您的代码是 C++。如果您根本不使用C语言,请不要添加C语言标签。 【参考方案1】:

经过一番调试,我发现这两个问题都是由于我在创建服务器之前创建客户端的一个问题。

因为我最初使用一元RPC调用,所以之前来自客户端的调用只会导致gRPC错误代码14。程序继续,因为在服务器创建后发送的每个调用都可以正确处理。

但是,当涉及到流式调用时,stub-&gt;RequestVote(&amp;context_r) 最终会调用一个阻塞函数ClientReaderWriter::ClientReaderWriter,它会尝试连接到现在没有创建的服务器。

/// Block to create a stream and write the initial metadata and \a request
/// out. Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call.
ClientReaderWriter(::grpc::ChannelInterface* channel,
                 const ::grpc::internal::RpcMethod& method,
                 ClientContext* context)
  : context_(context),
    cq_(grpc_completion_queue_attributes
        GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
        GRPC_CQ_DEFAULT_POLLING),  // Pluckable cq
    call_(channel->CreateCall(method, context, &cq_)) 
if (!context_->initial_metadata_corked_) 
  ::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
      ops;
  ops.SendInitialMetadata(context->send_initial_metadata_,
                          context->initial_metadata_flags());
  call_.PerformOps(&ops);
  cq_.Pluck(&ops);


因此,连接尚未建立。

【讨论】:

以上是关于如何同时从 grpc 流中读取和写入的主要内容,如果未能解决你的问题,请参考以下文章

如何从 BluetoothChat 的输入/输出流中读取/写入原始十六进制字节?

使用 IPC C# 时如何有效地从管道流中读取

如何在从 NodeJS 中的多个输入流中读取时写入单个文件

如何从/向流写入/读取位? (C#)

当多个进程尝试同时写入然后从文件中读取时,如何防止竞争条件

如何从 Spark 结构化流中的 Cassandra 等外部存储读取 Kafka 和查询?