如何通过多线程使用socket和pyqt避免数据丢失

Posted

技术标签:

【中文标题】如何通过多线程使用socket和pyqt避免数据丢失【英文标题】:How to avoid data loss using socket and pyqt with multithread 【发布时间】:2020-02-13 09:34:50 【问题描述】:

我不断地同时从多个客户端接收图像到单个服务器。我已经使用 pyqt5 和 QRunnable 来实现这一点。该代码将接受客户端连接将其传递给线程并开始将图像和数据连同它一起流式传输。但是我观察到每个线程中没有接收/丢弃一些图像。这是我的服务器端线程代码,它从客户端流式传输图像:

class threading(QRunnable):
    def __init__(self,t1,t2,index,ttype,ptype,test):
        super(threading,self).__init__()

        self.t1=t1
        self.t2=t2

        self.ch=index
        self.ttype=ttype
        self.ptype=ptype
        self.test=test
        self.top1=""
        self.bot1=""
        self.img1=np.zeros((161,182,3),dtype=np.uint8)
        self.signals=WorkerSignals()
        self.top_process=top_data()
        self.bot_process=bot_data()

    def recvall(self,sock, count):
        buf = b''
        while count:
            if sock:
                newbuf = sock.recv(count)
            if not newbuf: return None
            buf += newbuf
            count -= len(newbuf)
        return buf

    def stream(self,conn):
        length = '0'
        length = (self.recvall(conn,5))
        print ('data len',length)
        if(length[3]==35):
            stringData = self.recvall(conn, int(length[:3]))

        elif(length[4]==35):
            stringData = self.recvall(conn, int(length[:4]))

        else:
            stringData = self.recvall(conn, int(length))
            print('act len',int(length))
            #print('sting 3',stringData)

        data = np.frombuffer(stringData, dtype='uint8')
        decimg = cv2.imdecode(data,1)
        #conn.close()
        return decimg

    def run(self):

        c=1
        print('thread',self.ch,self.test)
        global b1,b2,b3,b4,b5,b6
        if self.test==2:
            for fille in os.listdir(b1[self.ch-1]):
                if fille.endswith('.jpg'):
                    os.remove(b1[self.ch-1]+fille)
            for fille in os.listdir(b2[self.ch-1]):
                if fille.endswith('.jpg'):
                    os.remove(b2[self.ch-1]+fille)
            for fille in os.listdir(b3[self.ch-1]):
                if fille.endswith('.jpg'):
                    os.remove(b3[self.ch-1]+fille)
            for fille in os.listdir(b4[self.ch-1]):
                if fille.endswith('.jpg'):
                    os.remove(b4[self.ch-1]+fille)
            for fille in os.listdir(b5[self.ch-1]):
                if fille.endswith('.jpg'):
                    os.remove(b5[self.ch-1]+fille)
            for fille in os.listdir(b6[self.ch-1]):
                if fille.endswith('.jpg'):
                    os.remove(b6[self.ch-1]+fille)
            for fille in os.listdir(b7[self.ch-1]):
                if fille.endswith('.jpg'):
                    os.remove(b7[self.ch-1]+fille)
        while not stopEvent.is_set():

            img1=self.stream(self.t1)
            d1=convertimg(img1)
            if self.ch>=7:
                self.bot1=self.recvall(self.t2,30)
            if self.ch<=6:
                self.top1=self.recvall(self.t2,40)
            if self.test>0:
                if self.ch<=6:
                    self.top_process.taskvar(self.top1,img1,self.ttype,self.ptype,self.test,self.ch)
                    self.top_process.start()
                    self.top_process.signals.finish.connect(self.sendata,Qt.DirectConnection)
                if self.ch>=7:
                    self.bot_process.taskvar(self.bot1,img1,self.ttype,self.ptype,self.test,self.ch)
                    self.bot_process.start()


            if self.ch==1:
                self.signals.i1.emit(d1)

            if self.ch==2:
                self.signals.i2.emit(d1)

            if self.ch==3:
                self.signals.i3.emit(d1)

            if self.ch==4:
                self.signals.i4.emit(d1)

            if self.ch==5:
                self.signals.i5.emit(d1)

            if self.ch==6:
                self.signals.i6.emit(d1)

            if self.ch==7:
                self.signals.i7.emit(d1)

            if self.ch==8:
                self.signals.i8.emit(d1)

            if self.ch==9:
                self.signals.i9.emit(d1)

            if self.ch==10:
                self.signals.i10.emit(d1)

            if self.ch==11:
                self.signals.i11.emit(d1)

            if self.ch==12:
                self.signals.i12.emit(d1)
            c=c+1

有 12 个客户端同时向服务器发送一个图像和一个字符串。有人可以告诉我为什么会丢失数据吗?是竞争条件还是接收代码时出错。

【问题讨论】:

为什么要指出数据丢失了?你怎么确定它是客户端还是服务器?如果您需要帮助,请清楚地解释您的真正问题并证明您所说的内容,提供您的客户端和服务器的minimal reproducible example 【参考方案1】:

解决此问题的一种方法是添加互斥体。

from PyQt5.QtCore import QMutex

你把它放在你的初始化中:

self.mutex = QMutex()

然后你锁定接收图片的那部分代码:

if self.mutex.tryLock():
    # receive picture

self.mutex.unlock()

有关更多信息和示例,您可以查看官方文档 QMutex

让我知道它是否有效!

【讨论】:

不。流式传输变慢,每个客户端的数据丢失都是随机的

以上是关于如何通过多线程使用socket和pyqt避免数据丢失的主要内容,如果未能解决你的问题,请参考以下文章

如何通过多线程代码python提高Webscraping代码速度

是否有可能以某种方式同时在 ZMQ 中使用 Send/Recv(通过多线程)?

如何通过多线程 Java 编程最大限度地利用资源(RAM 和 CPU)?

如何通过多线程概念下载单个文件

在 Table Storage azure 中使用 Etag 通过多线程更新属性

JNI 通过多线程从 C++ 调用 Java