如何同时从 grpc 流中读取和写入
Posted
技术标签:
【中文标题】如何同时从 grpc 流中读取和写入【英文标题】:How can I read and write from a grpc stream simultaneously 【发布时间】:2019-02-01 14:21:58 【问题描述】:我现在正在实现 Raft 算法,我想使用 gRPC 流来执行此操作。我的主要想法是为每个节点创建 3 个流到每个其他对等点,一个流将传输一种类型的 RPC,有AppendEntries
、RequestVote
和InstallSnapshot
。我在route_guide 的有限帮助下编写了一些代码,因为在其双向流演示RouteChat
中,客户端在开始读取之前发送了所有数据。
首先,我想随时写入流,所以我写了以下代码
void RaftMessagesStreamClientSync::AsyncRequestVote(const RequestVoteRequest& request)
std::string peer_name = this->peer_name;
debug("GRPC: Send RequestVoteRequest from %s to %s\n", request.name().c_str(), peer_name.c_str());
request_vote_stream->Write(request);
同时,我希望一个线程继续从流中读取,如以下代码,在构造 RaftMessagesStreamClientSync
后立即调用。
void RaftMessagesStreamClientSync::handle_response()
// strongThis is a must
auto strongThis = shared_from_this();
t1 = new std::thread([strongThis]()
RequestVoteResponse response;
while (strongThis->request_vote_stream->Read(&response))
debug("GRPC: Recv RequestVoteResponse from %s, me %s\n", response.name().c_str(), strongThis->raft_node->name.c_str());
...
);
...
为了初始化 3 个流,我必须这样写构造函数,我在这里使用 3 ClientContext
因为document says one ClientContext for one RPC
struct RaftMessagesStreamClientSync : std::enable_shared_from_this<RaftMessagesStreamClientSync>
typedef grpc::ClientReaderWriter<RequestVoteRequest, RequestVoteResponse> CR;
typedef grpc::ClientReaderWriter<AppendEntriesRequest, AppendEntriesResponse> CA;
typedef grpc::ClientReaderWriter<InstallSnapshotRequest, InstallSnapshotResponse> CI;
std::unique_ptr<CR> request_vote_stream;
std::unique_ptr<CA> append_entries_stream;
std::unique_ptr<CI> install_snapshot_stream;
ClientContext context_r;
ClientContext context_a;
ClientContext context_i;
std::thread * t1 = nullptr;
std::thread * t2 = nullptr;
std::thread * t3 = nullptr;
...
RaftMessagesStreamClientSync::RaftMessagesStreamClientSync(const char * addr, struct RaftNode * _raft_node) : raft_node(_raft_node), peer_name(addr)
std::shared_ptr<Channel> channel = grpc::CreateChannel(addr, grpc::InsecureChannelCredentials());
stub = raft_messages::RaftStreamMessages::NewStub(channel);
// 1
request_vote_stream = stub->RequestVote(&context_r);
// 2
append_entries_stream = stub->AppendEntries(&context_a);
// 3
install_snapshot_stream = stub->InstallSnapshot(&context_i);
~RaftMessagesStreamClientSync()
raft_node = nullptr;
t1->join();
t2->join();
t3->join();
delete t1;
delete t2;
delete t3;
然后我实现服务端
Status RaftMessagesStreamServiceImpl::RequestVote(ServerContext* context, ::grpc::ServerReaderWriter< ::raft_messages::RequestVoteResponse, RequestVoteRequest>* stream)
RequestVoteResponse response;
RequestVoteRequest request;
while (stream->Read(&request))
...
return Status::OK;
然后发生2个问题:
-
当我使用 3 个节点进行测试时,实际上为每个节点创建了 2 个
RaftMessagesStreamServiceImpl
,从 1 到 3 的语句执行时间很长。
没有从服务器端接收到 RPC。
使用Bidi Aysnc Server时也有类似的问题,但是我不知道这篇文章对我有什么帮助。
更新
经过一些调试,我发现request_vote_stream->Write(request)
返回0,根据document, means the stream is closed。可是为什么关了呢?
【问题讨论】:
您的代码是 C++。如果您根本不使用C语言,请不要添加C语言标签。 【参考方案1】:经过一番调试,我发现这两个问题都是由于我在创建服务器之前创建客户端的一个问题。
因为我最初使用一元RPC调用,所以之前来自客户端的调用只会导致gRPC错误代码14。程序继续,因为在服务器创建后发送的每个调用都可以正确处理。
但是,当涉及到流式调用时,stub->RequestVote(&context_r)
最终会调用一个阻塞函数ClientReaderWriter::ClientReaderWriter
,它会尝试连接到现在没有创建的服务器。
/// Block to create a stream and write the initial metadata and \a request
/// out. Note that \a context will be used to fill in custom initial metadata
/// used to send to the server when starting the call.
ClientReaderWriter(::grpc::ChannelInterface* channel,
const ::grpc::internal::RpcMethod& method,
ClientContext* context)
: context_(context),
cq_(grpc_completion_queue_attributes
GRPC_CQ_CURRENT_VERSION, GRPC_CQ_PLUCK,
GRPC_CQ_DEFAULT_POLLING), // Pluckable cq
call_(channel->CreateCall(method, context, &cq_))
if (!context_->initial_metadata_corked_)
::grpc::internal::CallOpSet<::grpc::internal::CallOpSendInitialMetadata>
ops;
ops.SendInitialMetadata(context->send_initial_metadata_,
context->initial_metadata_flags());
call_.PerformOps(&ops);
cq_.Pluck(&ops);
因此,连接尚未建立。
【讨论】:
以上是关于如何同时从 grpc 流中读取和写入的主要内容,如果未能解决你的问题,请参考以下文章