复制的套接字没有被腌制

Posted

技术标签:

【中文标题】复制的套接字没有被腌制【英文标题】:A copied socket is not being pickled 【发布时间】:2021-12-15 16:09:45 【问题描述】:

我正在尝试复制一个套接字并将其发送到 Python 中的不同进程。

套接字是在 rust 中创建的,并通过 PyO3 作为 Python 对象共享。

这是共享套接字代码


use pyo3::prelude::*;

use socket2::Domain, Protocol, Socket, Type;
use std::net::SocketAddr;

#[pyclass]
#[derive(Debug)]
pub struct SocketHeld 
    pub socket: Socket,


#[pymethods]
impl SocketHeld 
    #[new]
    pub fn new(address: String, port: i32) -> PyResult<SocketHeld> 
        let socket = Socket::new(Domain::IPV4, Type::STREAM, Some(Protocol::TCP))?;
        println!("", address);
        let address: SocketAddr = address.parse()?;
        socket.set_reuse_address(true)?;
        //socket.set_reuse_port(true)?;
        socket.bind(&address.into())?;
        socket.listen(1024)?;

        Ok(SocketHeld  socket )
    

    pub fn try_clone(&self) -> PyResult<SocketHeld> 
        let copied = self.socket.try_clone()?;
        Ok(SocketHeld  socket: copied )
    


impl SocketHeld 
    pub fn get_socket(&self) -> Socket 
        self.socket.try_clone().unwrap()
    



下面是 python 代码,我试图在其中启动两个不同的进程。我尝试使用原生多进程库、多进程库的分支甚至是 pathos 库。



    def start(self, url="127.0.0.1", port=5000):
        """
        [Starts the server]

        :param port [int]: [reperesents the port number at which the server is listening]
        """
        socket = SocketHeld(f"0.0.0.0:port", port)
        if not self.dev:
            from pathos.pools import ProcessPool
            pool = ProcessPool(nodes=2)
            # spawned_process(url, port, self.routes, socket.try_clone(), f"Process 1")
            pool.map(spawned_process, [(url, port, self.routes, socket.try_clone(), f"Process 1"), (url, port, self.routes, socket.try_clone(), f"Process 2")])
            # for i in range(2):
            #     copied = socket.try_clone()
            #     p = Pool().map(
            #         spawned_process,
            #         args=(self.routes, copied, f"Process i"),
            #     )
            #     p.start()

            # input("Press Cntrl + C to stop \n")
            # self.server.start(url, port)
        else:
            ...

但是,我仍然收到无法序列化对象的错误。

我收到以下错误:


Traceback (most recent call last):
  File "integration_tests/base_routes.py", line 75, in <module>
    app.start(port=5000, url='0.0.0.0')
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/robyn/__init__.py", line 95, in start
    pool.map(spawned_process, [(url, port, self.routes, socket.try_clone(), f"Process 1"), (url, port, self.routes, socket.try_clone(), f"Process 2")])
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/pathos/multiprocessing.py", line 139, in map
    return _pool.map(star(f), zip(*args)) # chunksize
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/pool.py", line 364, in map
    return self._map_async(func, iterable, mapstar, chunksize).get()
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/pool.py", line 771, in get
    raise self._value
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/pool.py", line 537, in _handle_tasks
    put(task)
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/connection.py", line 209, in send
    self._send_bytes(_ForkingPickler.dumps(obj))
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/multiprocess/reduction.py", line 54, in dumps
    cls(buf, protocol, *args, **kwds).dump(obj)
  File "/Users/bruhh/.pyenv/versions/maturin/lib/python3.8/site-packages/dill/_dill.py", line 498, in dump
    StockPickler.dump(self, obj)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 485, in dump
    self.save(obj)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 899, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 884, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 558, in save
    f(self, obj)  # Call unbound method with explicit self
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 899, in save_tuple
    save(element)
  File "/Users/bruhh/.pyenv/versions/3.8.5/lib/python3.8/pickle.py", line 576, in save
    rv = reduce(self.proto)
TypeError: cannot pickle 'builtins.SocketHeld' object


在概念上这里的某个地方出错了吗?解决办法是什么?

PS:

我正在尝试在进程中启动服务器运行时。


def spawned_process(url, port, handlers, socket, name):
    import asyncio
    import uvloop

    uvloop.install()
    loop = uvloop.new_event_loop()
    asyncio.set_event_loop(loop)

    print(handlers)
    server = Server()


    for i in handlers:
        route_type, endpoint, handler, is_async, number_of_params = i
        print(i)
        server.add_route(route_type, endpoint, handler, is_async, number_of_params)

    print(socket, name)
    server.start(url, port, socket, name)
    asyncio.get_event_loop().run_forever()

【问题讨论】:

我认为您不能将套接字发送到另一个进程(可能是另一个线程)。你真的尝试在 python 中对套接字库本身做同样的事情吗? @Netwave ,我还没有。因为大部分代码库只是生锈的。我试图坚持下去。另外,我不明白它在概念上会有什么不同。因为,套接字被克隆得很好。我认为这应该可以正常工作。 【参考方案1】:

套接字基本上只是对某些操作系统内核结构的进程相关引用。由于酸洗仅涉及此参考的用户空间部分而不涉及内核结构,因此不能简单地在其他进程、不同机器等处酸洗和恢复套接字。

在 UNIX 系统中,文件描述符可以通过 UNIX 域套接字在进程之间传递,这将在另一个进程中创建对相同内核结构的另一个引用。但这不适用于 SSL 套接字,因为有一些与此套接字相关的用户空间状态,它既不是文件描述符进程的一部分,也不是 Python 特定的酸洗的一部分。

【讨论】:

我应该采取什么方法? @SanskarJethi,一些生产者/消费者模式可能会做。但我们缺乏关于您要做什么的背景信息。 @SanskarJethi:你只问XY problem的问题Y。这个问题 Y 不能以一般的方式解决。因此,您可能需要重新考虑解决问题 X 的方法。由于我们不知道 X,因此我们无能为力。 抱歉信息有限。我正在尝试在每个进程中运行一个执行运行时。基本上,我正在尝试从多个进程中读取 TCP 套接字。我已经更新了描述中的代码sn-p,也许会更有帮助。 再想一想,也许尝试队列会起作用。因为我无论如何都在主进程中复制套接字,我不需要在进程之间共享单个套接字。

以上是关于复制的套接字没有被腌制的主要内容,如果未能解决你的问题,请参考以下文章

C++:如何测量非阻塞套接字的实际上传速率

阻塞和非阻塞同步和异步

关闭 PHP HTTP 请求然后运行函数? [复制]

套接字描述符是唯一的吗? [复制]

如何检测远程侧套接字关闭? [复制]

套接字连接被中止 - CommunicationException