使用由`fork`创建的多个C线程的回调函数时,Rust Mutex不起作用
Posted
技术标签:
【中文标题】使用由`fork`创建的多个C线程的回调函数时,Rust Mutex不起作用【英文标题】:Rust Mutex is not working when using a callback function from multiple C threads created by `fork` 【发布时间】:2019-03-26 09:35:16 【问题描述】:我正在使用 C 库 Cuba,它使用从 C 中创建的多个线程调用的回调函数。 Cuba 并行化基于 fork
/wait
POSIX 函数而不是 pthreads (arxiv.org/abs/1408.6373)。它在core
参数中给出了当前线程。
我正在尝试将此回调函数的结果记录到屏幕和文件中。如果我使用println!
,我会得到预期的输出,但如果我使用slog
,当我使用Mutex
排水管时,输出会被破坏。如果我使用async
漏极,我根本没有输出。
Mutex
是否没有锁定,因为它看不到该函数实际上是从另一个线程调用的?我试图用 Rust 线程重新创建问题,但不能。最好我想让async
排水管工作。
以下是给出问题行为的示例程序。回调将vegas
函数的最后一个参数作为其参数之一。这是记录器克隆的向量。这样,每个核心都应该有自己的记录器副本:
#[macro_use]
extern crate slog;
extern crate cuba;
extern crate slog_term;
use slog::Drain;
// this function is called from different c threads
// `core` indicates which thread
fn integrand(
_x: &[f64],
_f: &mut [f64],
loggers: &mut Vec<slog::Logger>,
_nvec: usize,
core: i32,
) -> Result<(), &'static str>
info!(loggers[core as usize], "A\nB\nC");
Ok(())
fn main()
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::CompactFormat::new(decorator).build();
let drain = std::sync::Mutex::new(drain).fuse();
let log = slog::Logger::root(drain, o!());
let mut integrator = cuba::CubaIntegrator::new(integrand);
integrator.set_cores(10, 1000); // set 10 cores
integrator.vegas(
1,
1,
cuba::CubaVerbosity::Progress,
0,
vec![log.clone(); 11],
);
输出:
C
INFO Mar 26A
B
C 10:27
:42.195 MarINFO 26 10:27A
B
C:42.195
MarINFO 26 10:27A
B
C:42.195
INFO A
B
C
Mar 26 10:27:42.196 INFO A
B
C
Mar 26 10:27:42.196 INFO A
B
C
【问题讨论】:
尝试将log.clone()
(克隆根记录器)替换为log.new(o!())
(构建子记录器)。 (Logger::new docs)
不幸的是,这无济于事。显式创建新的子记录器(vec![log.new(o!()), log.new(o!()),...]
也无济于事。
【参考方案1】:
古巴 C 库has this to say:
Windows 用户:Cuba 3 及更高版本使用
fork(2)
来并行化执行线程。但是,此 POSIX 函数不是 Windows API 的一部分,而且还以一种基本方式使用,因此不能简单地使用CreateProcess
等来解决它。唯一可行的仿真似乎可以通过 Cygwin 获得。
这是代码的复制品。我们fork
然后孩子和父母在打印东西时尝试持有互斥锁。插入sleep
以鼓励操作系统调度程序尝试其他线程:
use nix::unistd::fork, ForkResult; // 0.13.0
use std::sync::Mutex, thread, time::Duration;
fn main()
let shared = Mutex::new(10);
match fork()
Ok(ForkResult::Parent .. ) =>
let max = shared.lock().unwrap();
for _ in 0..*max
println!("Parent");
thread::sleep(Duration::from_millis(10));
Ok(ForkResult::Child) =>
let max = shared.lock().unwrap();
for _ in 0..*max
println!("Child");
thread::sleep(Duration::from_millis(10));
Err(e) =>
eprintln!("Error: ", e);
$ cargo run
Parent
Child
Parent
Child
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
Child
Parent
将fork
与线程一起使用真的很难处理;我清楚地记得以前寻找过与此相关的可怕问题。我发现有两个深入研究的资源:
后者说(强调我的):
我可以在分叉之前创建互斥锁吗?
是的 - 但是子进程和父进程将不共享虚拟内存,并且每个进程都有一个相互独立的互斥体。
(高级说明:如果使用正确的选项创建互斥体并使用共享内存段,则有一些使用共享内存的高级选项允许子级和父级共享互斥锁。请参阅procs, fork(), and mutexes)
如果我使用异步消耗,我根本没有输出。
另见:
Why doesn't a lazy_static slog::Logger print until a non-static logger is used?我会不相信 Cuba Rust 库。主要有两点:
如果正在创建线程,则用户数据泛型类型应绑定Sync
或Send
,仅限于可以安全地在线程之间共享/传输数据的类型。
传递给integrand
函数的用户数据应该不是&mut
。 Rust 的一个基本概念是,任何时候都只能有一个对任何数据的可变引用。古巴可以轻松地让您规避这一点。
这里是 Cubase Rust 和 C 库的尝试复制:
#[macro_use]
extern crate slog;
use slog::Drain;
fn integrand(loggers: &mut Vec<slog::Logger>, core: i32)
info!(loggers[core as usize], "A\nB\nC\n", core);
fn main()
let decorator = slog_term::TermDecorator::new().build();
let drain = slog_term::CompactFormat::new(decorator).build();
let drain = std::sync::Mutex::new(drain).fuse();
let log = slog::Logger::root(drain, o!());
let logs = vec![log.clone(); 11];
cuba_repro(logs, integrand);
use std::ffi::c_void, thread;
type Integrand<T> = fn(&mut T, i32);
fn cuba_repro<T>(mut user_data: T, mut integrand: Integrand<T>)
// From the `vegas` method
let user_data_ptr = &mut user_data as *mut _ as *mut c_void;
let integrand_ptr = &mut integrand as *mut _ as *mut c_void;
unsafe cuba_repro_ffi::<T>(user_data_ptr, integrand_ptr)
unsafe fn cuba_repro_ffi<T>(user_data: *const c_void, integrand: *const c_void)
let user_data = FfiDoesNotCareAboutSendOrSync(user_data);
let integrand = FfiDoesNotCareAboutSendOrSync(integrand);
let threads: Vec<_> = (0..4).map(move |i|
thread::spawn(move ||
// C doesn't care about this pedantry
let user_data = &mut *(user_data.0 as *const T as *mut T);
let integrand = &mut *(integrand.0 as *const Integrand<T> as *mut Integrand<T>);
// From the `c_integrand` function
let k: &mut T = &mut *(user_data as *mut _);
let _ignored = integrand(k, i);
)
).collect();
for t in threads t.join().unwrap()
#[derive(Copy, Clone)]
struct FfiDoesNotCareAboutSendOrSync<T>(T);
unsafe impl<T> Send for FfiDoesNotCareAboutSendOrSync<T>
unsafe impl<T> Sync for FfiDoesNotCareAboutSendOrSync<T>
为了让 Rust 编译器忽略 Cuba 库和相关 FFI 正在执行的大量不安全和违规行为,我必须进行 大量 更改。
这个代码示例确实实际上每个都按顺序打印出 4 个日志语句,所以这不是一个完整的答案。但是,我相当确定 Cuba 库正在触发未定义的行为,这意味着任何结果都是可能的,包括明显有效。
【讨论】:
我编写了 Rust cuba 绑定,但我知道它并不完美。除了让用户使用 C cuba 库移交的核心 id 之外,我没有找到更好的方法来获得某种形式的内存安全。您认为问题出在绑定还是 Cuba 处理其多线程的方式上?关于异步,我认为这个问题是并发内存访问问题,因为我的程序运行了几分钟并且应该产生相当多的输出。 @BenRuijl 我对底层代码了解不多,无法真正提供有效的指导。古巴的主题是如何开始的?如果它们不是通过 pthreads(或 Rust 用于thread::spawn
的等效 OS 机制),我完全可以看到 Mutex
不会按预期锁定,因为簿记数据没有在它的任何线程上正确设置正在使用。
@BenRuijl 我没有找到更好的方法来获得某种形式的内存安全 - 如果您将内存不安全暴露给您的用户,您需要将您的函数标记为不安全并且解释用户打算如何避免不安全。在不对所有人造成伤害的情况下,让代码看起来很安全。
显然 Cuba 并行化是基于 fork/wait POSIX 函数而不是 pthreads (arxiv.org/abs/1408.6373)。你知道如何让 Rust 明白回调函数是从不同的分支调用的吗?
感谢您的更新。我想记录到不同的文件是最简单的选择......以上是关于使用由`fork`创建的多个C线程的回调函数时,Rust Mutex不起作用的主要内容,如果未能解决你的问题,请参考以下文章