运行产生线程的可中断 Rust 程序

Posted

技术标签:

【中文标题】运行产生线程的可中断 Rust 程序【英文标题】:Running interruptible Rust program that spawns threads 【发布时间】:2017-02-22 22:50:11 【问题描述】:

我正在尝试编写一个生成一堆线程然后在最后加入线程的程序。我希望它是可中断的,因为我的计划是让它成为一个在 UNIX 服务中不断运行的程序。

这个想法是worker_pool 将包含所有已生成的线程,因此可以随时调用terminate 来收集它们。

我似乎找不到使用 chan_select 板条箱的方法来执行此操作,因为这需要我首先生成一个线程来生成我的子线程,一旦我这样做了,我就不能再使用 worker_pool 变量在中断时加入线程时,因为它必须被移出主循环。如果您注释掉中断中终止工作人员的行,它会编译。

我有点沮丧,因为这在 C 中真的很容易做到。我可以设置一个静态指针,但是当我尝试在 Rust 中这样做时,我得到了一个错误,因为我使用了一个向量作为我的线程,并且我无法在静态中初始化为空向量。我知道在中断代码中加入工作人员是安全的,因为执行会在此处停止等待信号。

也许有更好的方法来处理信号,或者我错过了一些我可以做的事情。

错误和代码如下:

MacBook8088:video_ingest pjohnson$ cargo run
   Compiling video_ingest v0.1.0 (file:///Users/pjohnson/projects/video_ingest)
error[E0382]: use of moved value: `worker_pool`
  --> src/main.rs:30:13
   |
24 |     thread::spawn(move || run(sdone, &mut worker_pool));
   |                   ------- value moved (into closure) here
...
30 |             worker_pool.terminate();
   |             ^^^^^^^^^^^ value used here after move
<chan macros>:42:47: 43:23 note: in this expansion of chan_select! (defined in <chan macros>)
src/main.rs:27:5: 35:6 note: in this expansion of chan_select! (defined in <chan macros>)
   |
   = note: move occurs because `worker_pool` has type `video_ingest::WorkerPool`, which does not implement the `Copy` trait

main.rs

#[macro_use]
extern crate chan;
extern crate chan_signal;
extern crate video_ingest;

use chan_signal::Signal;
use video_ingest::WorkerPool;
use std::thread;
use std::ptr;

///
/// Starts processing
/// 
fn main() 
    let mut worker_pool = WorkerPool  join_handles: vec![] ;

    // Signal gets a value when the OS sent a INT or TERM signal.
    let signal = chan_signal::notify(&[Signal::INT, Signal::TERM]);

    // When our work is complete, send a sentinel value on `sdone`.
    let (sdone, rdone) = chan::sync(0);

    // Run work.
    thread::spawn(move || run(sdone, &mut worker_pool));

    // Wait for a signal or for work to be done.
    chan_select! 
        signal.recv() -> signal => 
            println!("received signal: :?", signal);
            worker_pool.terminate(); // <-- Comment out to compile
        ,
        rdone.recv() => 
            println!("Program completed normally.");
        
    


fn run(sdone: chan::Sender<()>, worker_pool: &mut WorkerPool)  
    loop 
        worker_pool.ingest();
        worker_pool.terminate();
    

lib.rs

extern crate libc;

use std::thread;
use std::thread::JoinHandle;
use std::os::unix::thread::JoinHandleExt;
use libc::pthread_join;
use libc::c_void;
use std::ptr;
use std::time::Duration;

pub struct WorkerPool 
    pub join_handles: Vec<JoinHandle<()>>


impl WorkerPool 

    ///
    /// Does the actual ingestion
    ///
    pub fn ingest(&mut self) 

        // Use 9 threads for an example.
        for i in 0..10 
            self.join_handles.push(
                thread::spawn(move || 

                    // Get the videos
                    println!("Getting videos for thread ", i);
                    thread::sleep(Duration::new(5, 0));
                )
            );
        
    

    ///
    /// Joins all threads
    ///
    pub fn terminate(&mut self) 
        println!("Total handles: ", self.join_handles.len());

        for handle in &self.join_handles 
            println!("Joining thread...");

            unsafe 
                let mut state_ptr: *mut *mut c_void = 0 as *mut *mut c_void;
                pthread_join(handle.as_pthread_t(), state_ptr);
            
        

        self.join_handles = vec![];
    

【问题讨论】:

欢迎来到 Stack Overflow!您是否已经理解为什么停止任意线程是Very Bad Idea(不是特定于语言的问题)?除此之外,您需要提供minimal reproducible example。现在,呈现的代码似乎更像是一个愿望清单和一个隐含的要求社区为您编写实现的请求。表面可见的问题似乎WorkerPool 没有实现Copy,因此将其移动transfers ownership。 您还应该包含您收到的错误消息并显示research and attempts at fixing it you've already performed的内容。 感谢您的快速回复。我已经包含了 WorkerPool 结构的完整代码以及我在编译时收到的错误。我不想停止线程;我想通过加入来收集它们。我同意阻止他们不是一个好主意。 我能够使用here 的指导删除第一个错误。谢谢你的提示。不过,我希望我不必让它变得不安全。 此外,即使编译,连接也不起作用。看起来我正在获取 pthread_t 的内存地址,但连接从未完成。 【参考方案1】:

terminate可以随时调用来领取。

我不想停止线程;我想用join 收集它们。我同意阻止他们不是一个好主意。

这两个陈述对我来说没有意义。您只能在线程完成时加入。 “可中断”和“随时”这两个词意味着您可以尝试停止线程在它仍在进行某些处理时。你想要哪种行为?

如果您希望能够停止部分完成的线程,您必须增强您的代码以检查它是否应该提前退出。这通常会因为您正在执行一些您无法控制的大型计算而变得复杂。理想情况下,你把它分成几块并经常检查你的退出标志。例如,对于视频工作,您可以检查每一帧。那么响应延迟大致就是处理一帧的时间。

这在 C 中真的很容易做到。

这真的很容易做错。例如,当前呈现的代码尝试从两个不同的线程对池执行突变,而无需任何同步。这是制作损坏、难以调试的代码的必经之路。

// 以 9 个线程为例。

0..10 创建 10 个线程。


无论如何,似乎缺少的知识是ArcMutexArc 允许在线程之间共享单个项目的所有权,Mutex 允许在线程之间进行运行时可变借用。

#[macro_use]
extern crate chan;
extern crate chan_signal;

use chan_signal::Signal;
use std::thread::self, JoinHandle;
use std::sync::Arc, Mutex;

fn main() 
    let worker_pool = Arc::new(Mutex::new(WorkerPool::new()));

    let signal = chan_signal::notify(&[Signal::INT, Signal::TERM]);

    let (work_done_tx, work_done_rx) = chan::sync(0);

    let worker_pool_clone = worker_pool.clone();
    thread::spawn(move || run(work_done_tx, worker_pool_clone));

    // Wait for a signal or for work to be done.
    chan_select! 
        signal.recv() -> signal => 
            println!("received signal: :?", signal);
            let mut pool = worker_pool.lock().expect("Unable to lock the pool");
            pool.terminate();
        ,
        work_done_rx.recv() => 
            println!("Program completed normally.");
        
    


fn run(_work_done_tx: chan::Sender<()>, worker_pool: Arc<Mutex<WorkerPool>>)  
    loop 
        let mut worker_pool = worker_pool.lock().expect("Unable to lock the pool");
        worker_pool.ingest();
        worker_pool.terminate();
    


pub struct WorkerPool 
    join_handles: Vec<JoinHandle<()>>,


impl WorkerPool 
    pub fn new() -> Self 
        WorkerPool 
            join_handles: vec![],
        
    

    pub fn ingest(&mut self) 
        self.join_handles.extend(
            (0..10).map(|i| 
                thread::spawn(move || 
                    println!("Getting videos for thread ", i);
                )
            )
        )
    

    pub fn terminate(&mut self) 
        for handle in self.join_handles.drain(..) 
            handle.join().expect("Unable to join thread")
        
    

注意程序逻辑本身还是很差的;即使发送了中断,run 中的loop 仍会继续执行。主线程将锁定互斥锁,加入所有当前线程1,解锁互斥锁并退出程序。但是,循环可以在主线程退出并开始处理一些新数据之前锁定互斥锁!然后程序在处理过程中退出。就好像你根本没有处理中断一样。

1:哈哈,骗你! 此时没有正在运行的线程。由于互斥锁对整个loop 都已锁定,因此唯一可以进行另一个锁定的时间是在循环重置时。但是,由于循环中的最后一条指令是加入所有线程,因此不会再运行了。

我不想让程序在所有线程完成之前终止。

也许这是归约问题的产物,但我看不出无限循环如何退出,所以“我完成了”频道似乎是多余的。

我可能会在收到中断时添加一个“请停止”的标志。然后我会检查它而不是无限循环,并在退出程序之前等待正在运行的线程完成。

use std::sync::atomic::AtomicBool, Ordering;

fn main() 
    let worker_pool = WorkerPool::new();

    let signal = chan_signal::notify(&[Signal::INT, Signal::TERM]);
    let please_stop = Arc::new(AtomicBool::new(false));

    let threads_please_stop = please_stop.clone();
    let runner = thread::spawn(|| run(threads_please_stop, worker_pool));

    // Wait for a signal
    chan_select! 
        signal.recv() -> signal => 
            println!("received signal: :?", signal);
            please_stop.store(true, Ordering::SeqCst);
        ,
    

    runner.join().expect("Unable to join runner thread");


fn run(please_stop: Arc<AtomicBool>, mut worker_pool: WorkerPool)  
    while !please_stop.load(Ordering::SeqCst) 
        worker_pool.ingest();
        worker_pool.terminate();
    

【讨论】:

我非常感谢详细的答案和编辑。当您说程序逻辑“差”时,您的意思是上面的解决方案没有以最佳方式解决程序,还是您脚注中的那部分玩笑?你会有什么不同的做法? 关于您关于中断线程的第一条评论,我确实想确保它们完成,这就是我使用 join 的原因。我不想让程序在所有线程完成之前终止。你还发现那里有冲突吗? @PaulHowardJohnson “差”,我的意思是“可能不是预期的行为”。代码完成所有这些工作以接受信号并做出响应,但程序可以在工作发生时仍然退出。如果在工作过程中退出是可以的,那么我不会做任何特别的事情来处理中断并让它杀死程序;默认行为。 另外,本身,脚注并不是一个玩笑。这只是指出代码中似乎存在不匹配的期望的另一种方式。如果我们只在所有线程都停止运行时退出循环,就不可能有任何线程在运行...... 但这就是问题的症结所在:在所有工作完成之前,程序不能停止运行,因为它可能会破坏从获取视频对象的调用中接收到的数据。如果数据库插入发生在程序终止时,那将是一个严重的问题。您的please_stop 代码正是我在看到您的回复之前所想的。我们只需要确保循环以有序的方式退出,然后我们就可以保证所有线程都已加入,我们可以开始了。

以上是关于运行产生线程的可中断 Rust 程序的主要内容,如果未能解决你的问题,请参考以下文章

Rust编程语言入门之无畏并发

线程中断以及线程中断引发的那些问题

探究wait与waitpid之间的那些事

我们必须在ondestroy方法中中断后台线程吗?

在Windows或Linux等高级系统里,驱动程序是以线程的形式独立运行,还是做为过程被线程调用运行?

线程 'main' 在 Rust 中溢出了它的堆栈