检查 gRPC 流何时为空或不是流式传输数据

Posted

技术标签:

【中文标题】检查 gRPC 流何时为空或不是流式传输数据【英文标题】:Checking to see when a gRPC stream is empty or isn't streaming data 【发布时间】:2018-10-26 15:16:17 【问题描述】:

我正在创建一个连接到服务器的存根,该服务器以特定间隔流式传输数据,然后将其上传到 TSDB。我已经实施了批处理来优化上传,但是如果在一个时间间隔内流出的数据量与批量大小不一致,那么一些数据将在下一个时间间隔之前不会被上传,这是我不想要的。 gRPC 存根上是否有办法检查流是否为空?

class DialInClient(object):
    def __init__(self, host, port, timeout=100000000, user='root', password='lablab'):
        self._host = host
        self._port = port
        self._timeout = float(timeout)
        self._channel = None
        self._cisco_ems_stub = None
        self._connected = False
        self._metadata = [('username', user), ('password', password)]

    def subscribe(self, sub_id):
        sub_args = CreateSubsArgs(ReqId=1, encode=3, subidstr=sub_id)
        stream = self._cisco_ems_stub.CreateSubs(sub_args, timeout=self._timeout, metadata=self._metadata)
        for segment in stream:
            yield segment 

    def connect(self):
        self._channel = grpc.insecure_channel(':'.join([self._host,self._port]))
        try:
            grpc.channel_ready_future(self._channel).result(timeout=10)
            self._connected = True
        except grpc.FutureTimeoutError as e:
            raise DeviceFailedToConnect from e
        else:
            self._cisco_ems_stub = gRPCConfigOperStub(self._channel)

如果我设置了一个低超时,整个通道断开连接,我想在 for 循环中添加某种超时以进行流式传输,看看我是否在 1 秒内没有得到另一个片段 yield None 告诉我的另一个这就是结束的部分,并且在没有完整批量大小的情况下上传。

【问题讨论】:

【参考方案1】:

GRPC 本身不存在这种机制,但threading 库应该允许您在批次满之前发送它们。我已经包含了 python GRPC hello world example 的修改版本,让您了解如何做到这一点。

from __future__ import print_function                                                                                                        

import grpc                                                                                                                                  

import helloworld_pb2
import helloworld_pb2_grpc                                                                                                                   

import threading
from six.moves import queue
import time 

# 10 second batches    
BATCH_PERIOD = 10.0

def collect_responses(resp_queue, finished):                                                                                                 
    with grpc.insecure_channel('localhost:50051') as channel:
        stub = helloworld_pb2_grpc.GreeterStub(channel)                                                                                      
        for i, response in enumerate(stub.SayHello(helloworld_pb2.HelloRequest(name='you', num_greetings="100"))):                           
            resp_queue.put(response)                                                                                                         
    finished.set()                                                                                                                           

def is_batch_end(batch_start):                                                                                                               
    return time.time() - batch_start < BATCH_PERIOD                                                                                          

def get_remaining_time(time_start):                                                                                                          
    return (time_start + BATCH_PERIOD) - time.time()

def batch_responses(resp_queue, finished):
    batch_num = 0
    while True:        
        batch_resps = []
        batch_start = time.time()
        remaining_time = get_remaining_time(batch_start)                                                                                     
        while remaining_time > 0.0 and not finished.is_set():
            try:       
                batch_resps.append(resp_queue.get())                                                                                         
            except queue.Empty:                                                                                                              
                pass                                                                                                                         
            finally:
                remaining_time = get_remaining_time(batch_start)
        print("Batch  ():".format(batch_num + 1, len(batch_resps)))                                                                      
        for resp in batch_resps:                                                                                                             
            print("  ''".format(resp.message))
        batch_num += 1

def run():                                                                                                                                   
    resp_queue = queue.Queue()
    finished = threading.Event()                                                                                                             
    client_thread = threading.Thread(target=collect_responses, args=(resp_queue, finished))                                                  
    client_thread.start()
    batch_responses(resp_queue, finished)                                                                                                    
    client_thread.join()

if __name__ == '__main__':                                                                                                                   
    run() 

【讨论】:

以上是关于检查 gRPC 流何时为空或不是流式传输数据的主要内容,如果未能解决你的问题,请参考以下文章

C# gRPC 文件流式传输,原始文件小于流式传输的文件

spark中的isNullOrEmpty函数检查数据框中的列是不是为空或空字符串

EXCEL VBA 检查条目是不是为空或不是“空格”

检查键盘输入是不是为空或不是“Y”或不是“X”

检查所有三列是不是不为空或为空

在我添加错误消息以检查 CapNum 是不是为空或已存在于数据库中之前,我的代码一直在工作