在线程之间共享引用的终身麻烦

Posted

技术标签:

【中文标题】在线程之间共享引用的终身麻烦【英文标题】:Lifetime troubles sharing references between threads 【发布时间】:2015-04-23 15:59:56 【问题描述】:

我有一个启动工作线程的线程,所有线程都有望永远存在。每个工作线程都维护自己的Sockets 列表。

某些操作要求我遍历当前活动的所有套接字,但我在尝试创建包含指向另一个列表拥有的套接字的指针的套接字主列表时遇到了生命周期问题。

use std::str, thread;
use std::thread::JoinHandle;
use std::io::Read, Write;
use std::net::TcpListener, TcpStream;
use std::sync::Arc, Mutex;
use std::ops::DerefMut;
use std::sync::mpsc::channel, Sender, Receiver, TryRecvError;
use self::socketlist::SocketList;
use self::mastersocketlist::MasterSocketList;

pub struct Socket 
    user: String,
    stream: TcpStream,


mod socketlist 
    use self::SocketList::Node, End;
    use super::Socket;

    pub enum SocketList 
        Node(Socket, Box<SocketList>),
        End,
    

    impl SocketList 
        pub fn new() -> SocketList 
            End
        

        pub fn add(self, socket: Socket) -> SocketList 
            Node(socket, Box::new(self))
        

        pub fn newest<'a>(&'a mut self) -> Result<&'a Socket, String> 
            match *self 
                Node(ref mut socket, ref mut next) => Ok(socket),
                End => Err("No socket available".to_string()),
            
        
    


mod mastersocketlist 
    use self::MasterSocketList::Node, End;
    use super::Socket;

    pub enum MasterSocketList<'a> 
        Node(Box<&'a Socket>, Box<MasterSocketList<'a>>),
        End,
    

    impl<'a> MasterSocketList<'a> 
        pub fn new() -> MasterSocketList<'a> 
            End
        

        pub fn add(self, socket: &'a Socket) -> MasterSocketList<'a> 
            MasterSocketList::Node(Box::new(&socket), Box::new(self))
        
    


pub struct SlotManager 
    prox: JoinHandle<()>,
    prox_tx: Sender<TcpStream>,


impl SlotManager 
    pub fn new() -> SlotManager 
        let (tx, rx): (Sender<TcpStream>, Receiver<TcpStream>) = channel();

        let tx_clone = tx.clone();
        let prox = thread::spawn(move || SlotManager::event_loop(tx, rx));

        SlotManager 
            prox: prox,
            prox_tx: tx_clone,
        
    

    pub fn sender(&self) -> Sender<TcpStream> 
        self.prox_tx.clone()
    

    fn event_loop(tx: Sender<TcpStream>, rx: Receiver<TcpStream>) 
        let socket_list = Arc::new(Mutex::new(MasterSocketList::new()));
        let mut slot = Slot::new(socket_list.clone());
        loop 
            match rx.try_recv() 
                Ok(stream) => slot.new_connection(stream),
                Err(e) => 
            
        
    


pub struct Slot 
    prox: JoinHandle<()>,
    prox_tx: Sender<TcpStream>,


impl Slot 
    pub fn new(master_socket_list: Arc<Mutex<MasterSocketList>>) -> Slot 
        let (tx, rx): (Sender<TcpStream>, Receiver<TcpStream>) = channel();

        let tx_clone = tx.clone();
        let prox = thread::spawn(move || Slot::event_loop(tx, rx, master_socket_list));

        Slot 
            prox: prox,
            prox_tx: tx_clone,
        
    

    pub fn new_connection(&self, stream: TcpStream) 
        self.prox_tx.send(stream);
    

    fn event_loop(tx: Sender<TcpStream>,
                  rx: Receiver<TcpStream>,
                  master_socket_list: Arc<Mutex<MasterSocketList>>) 

        let mut sockets = SocketList::new();
        loop 
            // Check for new connections
            match rx.try_recv() 
                Ok(stream) => 
                    let mut socket = Socket 
                        user: "default".to_string(),
                        stream: stream,
                    ;
                    sockets = sockets.add(socket);

                    let mut msl_guard = match master_socket_list.lock() 
                        Ok(guard) => guard,
                        Err(poisoned) => poisoned.into_inner(),
                    ;
                    let mut msl_handle = msl_guard.deref_mut();
                    *msl_handle = msl_handle.add(sockets.newest().unwrap());
                
                Err(e) => 
            
        
    


fn main() 
    let mut slot_manager = SlotManager::new();
    let listener = TcpListener::bind("127.0.0.1:1234").unwrap();
    for stream in listener.incoming() 
        match stream 
            Ok(stream) => 
                let sender = slot_manager.sender();
                thread::spawn(move || 
                    sender.send(stream);
                    //process_new_connection(stream, sender)
                );
            
            Err(e) => println!("Connection error: ", e),
        
    
    drop(listener);

我收到的错误...

error[E0477]: the type `[closure@src/main.rs:107:34: 107:86 tx:std::sync::mpsc::Sender<std::net::TcpStream>, rx:std::sync::mpsc::Receiver<std::net::TcpStream>, master_socket_list:std::sync::Arc<std::sync::Mutex<mastersocketlist::MasterSocketList<'_>>>]` does not fulfill the required lifetime
   --> src/main.rs:107:20
    |
107 |         let prox = thread::spawn(move || Slot::event_loop(tx, rx, master_socket_list));
    |                    ^^^^^^^^^^^^^
    |
    = note: type must outlive the static lifetime

我什至不知道我正在尝试的内容是否可以作为安全代码。

我希望mastersocketlist 包含一个指向套接字的指针,其中套接字的生命周期由创建它的线程定义。我相信这就是所有这些错误的含义,但我不知道如何提供适当的生命周期注释来修复它。

【问题讨论】:

【参考方案1】:

Rust 的一大优点是跨函数的类型检查仅由函数签名完成。这意味着您可以用unimplemented!() 替换大部分函数体并保留类型检查错误。

重复该过程几次,您最终不会调用很多函数 - 删除那些。内联模块和减少结构/枚举也有帮助。

在某些时候你的错误会消失 - 问题的第一条线索!坚持下去,你会得到一个微小的复制品:

use std::sync::Arc, Mutex;
use std::thread;

pub enum MasterSocketList<'a> 
    One(&'a u8),


pub struct Slot;

impl Slot 
    pub fn new<'a>(master_socket_list: Arc<Mutex<MasterSocketList<'a>>>) -> Slot 
        thread::spawn(move || 
            master_socket_list;
        );
        unimplemented!();
    


fn main() 

检查错误,它仍然匹配:

error[E0477]: the type `[closure@src/main.rs:12:23: 14:10 master_socket_list:std::sync::Arc<std::sync::Mutex<MasterSocketList<'a>>>]` does not fulfill the required lifetime
  --> src/main.rs:12:9
   |
12 |         thread::spawn(move || 
   |         ^^^^^^^^^^^^^
   |
   = note: type must satisfy the static lifetime

让我们检查文档中thread::spawn 的签名:

pub fn spawn<F, T>(f: F) -> JoinHandle<T>
where
    F: FnOnce() -> T,
    F: Send + 'static,
    T: Send + 'static, 

这里的关键点是F: Send + 'static - 你给spawn 的闭包必须只包含持续整个程序生命周期的引用。这是因为spawn 可以创建变得分离的线程。一旦分离,线程可以永远存在,所以所有引用必须至少存在那么长时间,否则你会得到悬空引用,这是一件坏事! Rust 再次拯救了这一天!

如果你想保证线程会在一个已知点终止,你可以使用作用域线程,比如scoped-threadpool或crossbeam提供的那些。

如果您的代码中没有包含生命周期的变量,则使用某种类型的共享所有权,如 Arc 与确保只有一个线程可以改变变量的东西配对,如 Mutex充足的。这允许每个线程拥有共享值,最终在最后一个线程退出时将其删除。详情请见How do I share a mutable object between threads?。

【讨论】:

哇,我不知道我可以在那里拍unimplemented()! 并使示例更清晰。非常感谢您的帮助!

以上是关于在线程之间共享引用的终身麻烦的主要内容,如果未能解决你的问题,请参考以下文章

多线程编程

多线程 threading

python__系统 : 线程

vuex 实现vue中多个组件之间数据同步以及数据共享。

java核心-多线程-线程类基础知识

Java并发编程:CallableFuture和FutureTask及在项目中的引用