具有多处理功能的 Python socketio
Posted
技术标签:
【中文标题】具有多处理功能的 Python socketio【英文标题】:Python socketio with multiprocessing 【发布时间】:2021-03-28 04:59:48 【问题描述】:所以我一直在努力解决这个让我发疯的泡菜错误。我有以下带有以下代码的主引擎类:
import eventlet
import socketio
import multiprocessing
from multiprocessing import Queue
from multi import Sioserever
class masterEngine:
if __name__ == '__main__':
serverObj = SIOSerever()
try:
receiveData = multiprocessing.Process(target=serverObj.run)
receiveData.start()
receiveProcess = multiprocessing.Process(target=serverObj.fetchFromQueue)
receiveProcess.start()
receiveData.join()
receiveProcess.join()
except Exception as error:
print(error)
我还有另一个名为 multi 的文件,其运行如下:
import multiprocessing
from multiprocessing import Queue
import eventlet
import socketio
class SIOSerever:
def __init__(self):
self.cycletimeQueue = Queue()
self.sio = socketio.Server(cors_allowed_origins='*',logger=False)
self.app = socketio.WSGIApp(self.sio, static_files='/': 'index.html',)
self.ws_server = eventlet.listen(('0.0.0.0', 5000))
@self.sio.on('production')
def p_message(sid, message):
self.cycletimeQueue.put(message)
print("I logged : "+str(message))
def run(self):
eventlet.wsgi.server(self.ws_server, self.app)
def fetchFromQueue(self):
while True:
cycle = self.cycletimeQueue.get()
print(cycle)
如您所见,我可以尝试创建两个我想独立运行的 def run 和 fetchFromQueue 进程。
我的 run 函数启动 python-socket 服务器,我从 html 网页发送一些数据到该服务器(无需多处理即可完美运行)。然后我尝试将接收到的数据推送到队列,以便我的其他函数可以检索它并使用接收到的数据。
我需要对从套接字接收到的数据执行一组耗时操作,这就是我将其全部推入队列的原因。
在运行主引擎类时,我收到以下信息:
Can't pickle <class 'threading.Thread'>: it's not the same object as threading.Thread
I ended!
[Finished in 0.5s]
你能帮我解决我做错了什么吗?
【问题讨论】:
【参考方案1】:来自多处理programming guidelines:
将资源显式传递给子进程
在使用 fork start 方法的 Unix 上,子进程可以使用在使用全局资源的父进程中创建的共享资源。但是,最好将对象作为参数传递给子进程的构造函数。
除了使代码(可能)与 Windows 和其他启动方法兼容之外,这还确保只要子进程仍然存在,该对象就不会在父进程中被垃圾收集。如果在父进程中对对象进行垃圾回收时释放了某些资源,这可能很重要。
因此,我稍微修改了您的示例,删除了所有不必要的内容,但展示了一种将共享队列显式传递给所有使用它的进程的方法:
import multiprocessing
MAX = 5
class SIOSerever:
def __init__(self, queue):
self.cycletimeQueue = queue
def run(self):
for i in range(MAX):
self.cycletimeQueue.put(i)
@staticmethod
def fetchFromQueue(cycletimeQueue):
while True:
cycle = cycletimeQueue.get()
print(cycle)
if cycle >= MAX - 1:
break
def start_server(queue):
server = SIOSerever(queue)
server.run()
if __name__ == '__main__':
try:
queue = multiprocessing.Queue()
receiveData = multiprocessing.Process(target=start_server, args=(queue,))
receiveData.start()
receiveProcess = multiprocessing.Process(target=SIOSerever.fetchFromQueue, args=(queue,))
receiveProcess.start()
receiveData.join()
receiveProcess.join()
except Exception as error:
print(error)
0
1
...
【讨论】:
那么在上面的例子中,Queue的最大尺寸定义为5对吧?如果我想要不定尺寸怎么办?另外,我想将从套接字接收到的数据放入队列中。您所做的只是添加数字。 队列是无限的,我使用 5 个项目来进行演示,以使程序终止。您可以将 int 放入队列 str 或 bytes 而不是。 你能帮我看看如何将我从套接字服务器接收到的数据推送到队列中吗?以上是关于具有多处理功能的 Python socketio的主要内容,如果未能解决你的问题,请参考以下文章
Python socketio 示例连接到 cryptocompare
用 Python flask_socketio 反应 socket.io-client 不处理事件