检查 gRPC 流何时为空或不是流式传输数据
Posted
技术标签:
【中文标题】检查 gRPC 流何时为空或不是流式传输数据【英文标题】:Checking to see when a gRPC stream is empty or isn't streaming data 【发布时间】:2018-10-26 15:16:17 【问题描述】:我正在创建一个连接到服务器的存根,该服务器以特定间隔流式传输数据,然后将其上传到 TSDB。我已经实施了批处理来优化上传,但是如果在一个时间间隔内流出的数据量与批量大小不一致,那么一些数据将在下一个时间间隔之前不会被上传,这是我不想要的。 gRPC 存根上是否有办法检查流是否为空?
class DialInClient(object):
def __init__(self, host, port, timeout=100000000, user='root', password='lablab'):
self._host = host
self._port = port
self._timeout = float(timeout)
self._channel = None
self._cisco_ems_stub = None
self._connected = False
self._metadata = [('username', user), ('password', password)]
def subscribe(self, sub_id):
sub_args = CreateSubsArgs(ReqId=1, encode=3, subidstr=sub_id)
stream = self._cisco_ems_stub.CreateSubs(sub_args, timeout=self._timeout, metadata=self._metadata)
for segment in stream:
yield segment
def connect(self):
self._channel = grpc.insecure_channel(':'.join([self._host,self._port]))
try:
grpc.channel_ready_future(self._channel).result(timeout=10)
self._connected = True
except grpc.FutureTimeoutError as e:
raise DeviceFailedToConnect from e
else:
self._cisco_ems_stub = gRPCConfigOperStub(self._channel)
如果我设置了一个低超时,整个通道断开连接,我想在 for 循环中添加某种超时以进行流式传输,看看我是否在 1 秒内没有得到另一个片段 yield None
告诉我的另一个这就是结束的部分,并且在没有完整批量大小的情况下上传。
【问题讨论】:
【参考方案1】:GRPC 本身不存在这种机制,但threading
库应该允许您在批次满之前发送它们。我已经包含了 python GRPC hello world example 的修改版本,让您了解如何做到这一点。
from __future__ import print_function
import grpc
import helloworld_pb2
import helloworld_pb2_grpc
import threading
from six.moves import queue
import time
# 10 second batches
BATCH_PERIOD = 10.0
def collect_responses(resp_queue, finished):
with grpc.insecure_channel('localhost:50051') as channel:
stub = helloworld_pb2_grpc.GreeterStub(channel)
for i, response in enumerate(stub.SayHello(helloworld_pb2.HelloRequest(name='you', num_greetings="100"))):
resp_queue.put(response)
finished.set()
def is_batch_end(batch_start):
return time.time() - batch_start < BATCH_PERIOD
def get_remaining_time(time_start):
return (time_start + BATCH_PERIOD) - time.time()
def batch_responses(resp_queue, finished):
batch_num = 0
while True:
batch_resps = []
batch_start = time.time()
remaining_time = get_remaining_time(batch_start)
while remaining_time > 0.0 and not finished.is_set():
try:
batch_resps.append(resp_queue.get())
except queue.Empty:
pass
finally:
remaining_time = get_remaining_time(batch_start)
print("Batch ():".format(batch_num + 1, len(batch_resps)))
for resp in batch_resps:
print(" ''".format(resp.message))
batch_num += 1
def run():
resp_queue = queue.Queue()
finished = threading.Event()
client_thread = threading.Thread(target=collect_responses, args=(resp_queue, finished))
client_thread.start()
batch_responses(resp_queue, finished)
client_thread.join()
if __name__ == '__main__':
run()
【讨论】:
以上是关于检查 gRPC 流何时为空或不是流式传输数据的主要内容,如果未能解决你的问题,请参考以下文章