Python进程不能腌制

Posted

技术标签:

【中文标题】Python进程不能腌制【英文标题】:Python Process cannot pickle 【发布时间】:2021-11-01 00:51:08 【问题描述】:

代码:

from aiohttp import web
from aiortc.mediastreams import MediaStreamTrack
from aiortc import RTCPeerConnection, RTCSessionDescription
from aiortc.contrib.media import MediaPlayer
import asyncio
import json
import os
from multiprocessing import Process, freeze_support
from queue import Queue
import sys
import threading
from time import sleep
import fractions
import time

class Radioserver(Process):
    def __init__(self,q):
        super().__init__()
        self.q = q
        self.ROOT = os.path.dirname(__file__)
        self.pcs = []
        self.channels = []
        self.stream_offers = []
        self.requests = []
    
    def run(self):
        self.app = web.Application()
        self.app.on_shutdown.append(self.on_shutdown)
        self.app.router.add_get("/", self.index)
        self.app.router.add_get("/radio.js", self.javascript)
        self.app.router.add_get("/jquery-3.5.1.min.js", self.jquery)
        self.app.router.add_post("/offer", self.offer)
        
        threading.Thread(target=self.fill_the_queues).start()
        web.run_app(self.app, access_log=None, host="192.168.1.20", port="8080", ssl_context=None)

    def fill_the_queues(self):
        while(True):
            frame = self.q.get()
            for stream_offer in self.stream_offers:
                stream_offer.q.put(frame)

    async def index(self,request):
        content = open(os.path.join(self.ROOT, "index.html"), encoding="utf8").read()
        return web.Response(content_type="text/html", text=content)


    async def javascript(self,request):
        content = open(os.path.join(self.ROOT, "radio.js"), encoding="utf8").read()
        return web.Response(content_type="application/javascript", text=content)

    async def jquery(self,request):
        content = open(os.path.join(self.ROOT, "jquery-3.5.1.min.js"), encoding="utf8").read()
        return web.Response(content_type="application/javascript", text=content)

    async def offer(self,request):  
        params = await request.json()
        offer = RTCSessionDescription(sdp=params["sdp"], type=params["type"])

        pc = RTCPeerConnection()
        self.pcs.append(pc)
        self.requests.append(request)
        
        # prepare epalxeis media
        self.stream_offers.append(CustomRadioStream())
        pc.addTrack(self.stream_offers[-1])


        @pc.on("iceconnectionstatechange")
        async def on_iceconnectionstatechange():
            if pc.iceConnectionState == "failed":
                self.pcs.remove(pc)
                self.requests.remove(request)
                print(str(request.remote)+" disconnected from radio server")
                print("Current peer connections:"+str(len(self.pcs)))

        # handle offer
        await pc.setRemoteDescription(offer)

        # send answer
        answer = await pc.createAnswer()
        await pc.setLocalDescription(answer)
        
            

        return web.Response(content_type="application/json",text=json.dumps("sdp": pc.localDescription.sdp, "type": pc.localDescription.type))

    async def on_shutdown(self,app):
        # close peer connections
        if self.pcs:
            coros = [pc.close() for pc in self.pcs]
            await asyncio.gather(*coros)
            self.pcs = []
            self.channels = []
            self.stream_offers = []
            
"""
some other classes here such as CustomRadioStream and RadioOutputStream
"""


if __name__ == "__main__":
    freeze_support()
    q = Queue()
    custom_server_child_process = RadioServer(q)
    custom_server_child_process.start()

错误

Traceback (most recent call last):
  File "123.py", line 106, in <module>
    custom_server_child_process.start()
  File "C:/msys64/mingw64/lib/python3.8/multiprocessing/process.py", line 121, i
n start
    self._popen = self._Popen(self)
  File "C:/msys64/mingw64/lib/python3.8/multiprocessing/context.py", line 224, i
n _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:/msys64/mingw64/lib/python3.8/multiprocessing/context.py", line 327, i
n _Popen
    return Popen(process_obj)
  File "C:/msys64/mingw64/lib/python3.8/multiprocessing/popen_spawn_win32.py", l
ine 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:/msys64/mingw64/lib/python3.8/multiprocessing/reduction.py", line 60,
in dump
    ForkingPickler(file, protocol).dump(obj)
TypeError: cannot pickle '_thread.lock' object

我做错了什么? 如果我直接调用run函数(而不是start),那么没有问题,但是我想对这个类使用处理。

编辑: 使用 multiprocessing.Queue 可以正常工作,但现在使用类似代码时出现此错误:

$ python "Papinhio_player.py"
Traceback (most recent call last):
  File "Papinhio_player.py", line 3078, in <module>
    program = PapinhioPlayerCode()
  File "Papinhio_player.py", line 250, in __init__
    self.manage_decks_instance = Manage_Decks(self)
  File "C:\python\scripts\Papinhio player\src\main\python_files/manage_decks.py"
, line 356, in __init__
    self.custom_server_child_process.start()
  File "C:/msys64/mingw64/lib/python3.8/multiprocessing/process.py", line 121, i
n start
    self._popen = self._Popen(self)
  File "C:/msys64/mingw64/lib/python3.8/multiprocessing/context.py", line 224, i
n _Popen
    return _default_context.get_context().Process._Popen(process_obj)
  File "C:/msys64/mingw64/lib/python3.8/multiprocessing/context.py", line 327, i
n _Popen
    return Popen(process_obj)
  File "C:/msys64/mingw64/lib/python3.8/multiprocessing/popen_spawn_win32.py", l
ine 93, in __init__
    reduction.dump(process_obj, to_child)
  File "C:/msys64/mingw64/lib/python3.8/multiprocessing/reduction.py", line 60,
in dump
    ForkingPickler(file, protocol).dump(obj)
  File "stringsource", line 2, in av.audio.codeccontext.AudioCodecContext.__redu
ce_cython__
TypeError: self.parser,self.ptr cannot be converted to a Python object for pickl
ing
Traceback (most recent call last):
  File "<string>", line 1, in <module>
  File "C:/msys64/mingw64/lib/python3.8/multiprocessing/spawn.py", line 116, in
spawn_main
    exitcode = _main(fd, parent_sentinel)
  File "C:/msys64/mingw64/lib/python3.8/multiprocessing/spawn.py", line 126, in
_main
    self = reduction.pickle.load(from_parent)
EOFError: Ran out of input

【问题讨论】:

我认为你需要使用multiprocessing.Queue,来获取可以转移到不同进程的对象,而不是queue.Queue 尝试dill 而不是pickle dill.readthedocs.io/en/latest 【参考方案1】:

有些对象不能序列化然后反序列化。

您发布的堆栈跟踪提到:

TypeError: cannot pickle '_thread.lock' object

一个锁,它持有一个状态在内存中,并保证没有其他进程可以拥有相同的锁在同一时刻,通常是一个非常糟糕的候选者这个操作——反序列化​​时应该创建什么?


要解决此问题:选择一种方法来选择要序列化的对象的相关字段,然后腌制/取消腌制该部分。

【讨论】:

以上是关于Python进程不能腌制的主要内容,如果未能解决你的问题,请参考以下文章

为啥我不能腌制一个 typing.NamedTuple 而我可以腌制一个 collections.namedtuple?

悲怆多处理不能腌制

在类中使用 ProcessPoolExecutor 时无法腌制协程对象

准确确定在 Python 多处理期间腌制的内容

python 属性错误:无法腌制本地对象。使用多处理

复制的套接字没有被腌制