使用由`fork`创建的多个C线程的回调函数时,Rust Mutex不起作用

Posted

技术标签:

【中文标题】使用由`fork`创建的多个C线程的回调函数时,Rust Mutex不起作用【英文标题】:Rust Mutex is not working when using a callback function from multiple C threads created by `fork` 【发布时间】:2019-03-26 09:35:16 【问题描述】:

我正在使用 C 库 Cuba,它使用从 C 中创建的多个线程调用的回调函数。 Cuba 并行化基于 fork/wait POSIX 函数而不是 pthreads (arxiv.org/abs/1408.6373)。它在core 参数中给出了当前线程。

我正在尝试将此回调函数的结果记录到屏幕和文件中。如果我使用println!,我会得到预期的输出,但如果我使用slog,当我使用Mutex 排水管时,输出会被破坏。如果我使用async 漏极,我根本没有输出。

Mutex 是否没有锁定,因为它看不到该函数实际上是从另一个线程调用的?我试图用 Rust 线​​程重新创建问题,但不能。最好我想让async 排水管工作。

以下是给出问题行为的示例程序。回调将vegas 函数的最后一个参数作为其参数之一。这是记录器克隆的向量。这样,每个核心都应该有自己的记录器副本:

#[macro_use]
extern crate slog;
extern crate cuba;
extern crate slog_term;

use slog::Drain;

// this function is called from different c threads
// `core` indicates which thread
fn integrand(
    _x: &[f64],
    _f: &mut [f64],
    loggers: &mut Vec<slog::Logger>,
    _nvec: usize,
    core: i32,
) -> Result<(), &'static str> 
    info!(loggers[core as usize], "A\nB\nC");

    Ok(())


fn main() 
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();

    let log = slog::Logger::root(drain, o!());

    let mut integrator = cuba::CubaIntegrator::new(integrand);
    integrator.set_cores(10, 1000); // set 10 cores

    integrator.vegas(
        1,
        1,
        cuba::CubaVerbosity::Progress,
        0,
        vec![log.clone(); 11],
    );

输出:

C 
INFO Mar 26A
B
C 10:27
:42.195 MarINFO 26  10:27A
B
C:42.195
 MarINFO 26  10:27A
B
C:42.195
 INFO A
B
C
Mar 26 10:27:42.196 INFO A
B
C
Mar 26 10:27:42.196 INFO A
B
C

【问题讨论】:

尝试将log.clone()(克隆根记录器)替换为log.new(o!())(构建子记录器)。 (Logger::new docs) 不幸的是,这无济于事。显式创建新的子记录器(vec![log.new(o!()), log.new(o!()),...] 也无济于事。 【参考方案1】:

古巴 C 库has this to say:

Windows 用户:Cuba 3 及更高版本使用fork(2) 来并行化执行线程。但是,此 POSIX 函数不是 Windows API 的一部分,而且还以一种基本方式使用,因此不能简单地使用 CreateProcess 等来解决它。唯一可行的仿真似乎可以通过 Cygwin 获得。

这是代码的复制品。我们fork 然后孩子和父母在打印东西时尝试持有互斥锁。插入sleep 以鼓励操作系统调度程序尝试其他线程:

use nix::unistd::fork, ForkResult; // 0.13.0
use std::sync::Mutex, thread, time::Duration;

fn main() 
    let shared = Mutex::new(10);

    match fork() 
        Ok(ForkResult::Parent  .. ) => 
            let max = shared.lock().unwrap();
            for _ in 0..*max 
                println!("Parent");
                thread::sleep(Duration::from_millis(10));
            
        
        Ok(ForkResult::Child) => 
            let max = shared.lock().unwrap();
            for _ in 0..*max 
                println!("Child");
                thread::sleep(Duration::from_millis(10));
            
        
        Err(e) => 
            eprintln!("Error: ", e);
        
    

$ cargo run

Parent
Child
Parent
Child
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent

fork 与线程一起使用真的很难处理;我清楚地记得以前寻找过与此相关的可怕问题。我发现有两个深入研究的资源:

Mutexes And fork()ing In Shared Libraries Synchronization, Part 1: Mutex Locks

后者说(强调我的):

我可以在分叉之前创建互斥锁吗?

是的 - 但是子进程和父进程将不共享虚拟内存,并且每个进程都有一个相互独立的互斥体

(高级说明:如果使用正确的选项创建互斥体并使用共享内存段,则有一些使用共享内存的高级选项允许子级和父级共享互斥锁。请参阅procs, fork(), and mutexes)


如果我使用异步消耗,我根本没有输出。

另见:

Why doesn't a lazy_static slog::Logger print until a non-static logger is used?

我会相信 Cuba Rust 库。主要有两点:

    如果正在创建线程,则用户数据泛型类型应绑定SyncSend,仅限于可以安全地在线程之间共享/传输数据的类型。

    传递给integrand 函数的用户数据应该&amp;mut。 Rust 的一个基本概念是,任何时候都只能有一个对任何数据的可变引用。古巴可以轻松地让您规避这一点。

这里是 Cubase Rust 和 C 库的尝试复制:

#[macro_use]
extern crate slog;

use slog::Drain;

fn integrand(loggers: &mut Vec<slog::Logger>, core: i32) 
    info!(loggers[core as usize], "A\nB\nC\n", core);


fn main() 
    let decorator = slog_term::TermDecorator::new().build();
    let drain = slog_term::CompactFormat::new(decorator).build();
    let drain = std::sync::Mutex::new(drain).fuse();

    let log = slog::Logger::root(drain, o!());

    let logs = vec![log.clone(); 11];

    cuba_repro(logs, integrand);


use std::ffi::c_void, thread;

type Integrand<T> = fn(&mut T, i32);

fn cuba_repro<T>(mut user_data: T, mut integrand: Integrand<T>) 
    // From the `vegas` method
    let user_data_ptr = &mut user_data as *mut _ as *mut c_void;
    let integrand_ptr = &mut integrand as *mut _ as *mut c_void;

    unsafe  cuba_repro_ffi::<T>(user_data_ptr, integrand_ptr) 


unsafe fn cuba_repro_ffi<T>(user_data: *const c_void, integrand: *const c_void) 
    let user_data = FfiDoesNotCareAboutSendOrSync(user_data);
    let integrand = FfiDoesNotCareAboutSendOrSync(integrand);

    let threads: Vec<_> = (0..4).map(move |i| 
        thread::spawn(move || 
            // C doesn't care about this pedantry
            let user_data = &mut *(user_data.0 as *const T as *mut T);
            let integrand = &mut *(integrand.0 as *const Integrand<T> as *mut Integrand<T>);

            // From the `c_integrand` function
            let k: &mut T = &mut *(user_data as *mut _);
            let _ignored = integrand(k, i);
        )
    ).collect();

    for t in threads  t.join().unwrap() 


#[derive(Copy, Clone)]
struct FfiDoesNotCareAboutSendOrSync<T>(T);
unsafe impl<T> Send for FfiDoesNotCareAboutSendOrSync<T> 
unsafe impl<T> Sync for FfiDoesNotCareAboutSendOrSync<T> 

为了让 Rust 编译器忽略 Cuba 库和相关 FFI 正在执行的大量不安全和违规行为,我必须进行 大量 更改。

这个代码示例确实实际上每个都按顺序打印出 4 个日志语句,所以这不是一个完整的答案。但是,我相当确定 Cuba 库正在触发未定义的行为,这意味着任何结果都是可能的,包括明显有效。

【讨论】:

我编写了 Rust cuba 绑定,但我知道它并不完美。除了让用户使用 C cuba 库移交的核心 id 之外,我没有找到更好的方法来获得某种形式的内存安全。您认为问题出在绑定还是 Cuba 处理其多线程的方式上?关于异步,我认为这个问题是并发内存访问问题,因为我的程序运行了几分钟并且应该产生相当多的输出。 @BenRuijl 我对底层代码了解不多,无法真正提供有效的指导。古巴的主题是如何开始的?如果它们不是通过 pthreads(或 Rust 用于 thread::spawn 的等效 OS 机制),我完全可以看到 Mutex 不会按预期锁定,因为簿记数据没有在它的任何线程上正确设置正在使用。 @BenRuijl 我没有找到更好的方法来获得某种形式的内存安全 - 如果您将内存不安全暴露给您的用户,您需要将您的函数标记为不安全并且解释用户打算如何避免不安全。在不对所有人造成伤害的情况下,让代码看起来很安全。 显然 Cuba 并行化是基于 fork/wait POSIX 函数而不是 pthreads (arxiv.org/abs/1408.6373)。你知道如何让 Rust 明白回调函数是从不同的分支调用的吗? 感谢您的更新。我想记录到不同的文件是最简单的选择......

以上是关于使用由`fork`创建的多个C线程的回调函数时,Rust Mutex不起作用的主要内容,如果未能解决你的问题,请参考以下文章

类内定义线程的回调函数问题

linux下多进程或者多线程编程的问题。新手,望指教!

main函数由哪个进程创建?

fork/Join 线程可以创建多个吗?

Unix编程-线程

基于LINUX下的多线程