使用通道时Rust中的内存分配

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用通道时Rust中的内存分配相关的知识,希望对你有一定的参考价值。

最近,我开始和Rust一起玩。试图了解它是如何工作的。在Kotlin,Typescript和Go之后,我感到头疼。)我编写了一个小应用程序,该程序从通道读取消息并将其写入文件。我收到意外的内存使用情况。下面的代码。如果有人能解释我做错了什么,我将非常感谢。

use std::sync::mpsc::{Sender, Receiver};
use std::sync::mpsc;
use std::thread;
use std::io::{Write};
use std::fs::File;
use std::fs::OpenOptions;
use jemalloc_ctl::{stats, epoch};

const MSG_NUM: usize = 10000000;
const BUFFER_SIZE: usize = 1000;

#[global_allocator]
static ALLOC: jemallocator::Jemalloc = jemallocator::Jemalloc;

fn main() {

    let e = epoch::mib().unwrap();
    let allocated = stats::allocated::mib().unwrap();
    let resident = stats::resident::mib().unwrap();

    let (sender, receiver): (Sender<Vec<u8>>, Receiver<Vec<u8>>) = mpsc::channel();
    let (buffered_sender, buffered_receiver): (Sender<Vec<Message>>, Receiver<Vec<Message>>) = mpsc::channel();

    {
        for _ in 0..MSG_NUM {
            match sender.send(String::from("Hello World!").into_bytes()) {
                Ok(_) => continue,
                Err(err) => {
                    println!("Error {}", err);
                    continue
                },
            }
        }
        drop(sender)
    }

    e.advance().unwrap();
    println!("Step 1. {} MB allocated. {} MB resident", allocated.read().unwrap() / 1000000, resident.read().unwrap() / 1000000);

    {
        let mut buffer: Vec<Message> = Vec::new();

        loop {

            let next_msg = match receiver.recv() {
                Ok(msg) => msg,
                Err(_) => {
                    println!("Channel closed for \"receiver\".");
                    break;
                }
            };

            buffer.push(Message {bytes: next_msg});

            if buffer.len() == BUFFER_SIZE {
                match buffered_sender.send(buffer.clone()) {
                    Ok(_) => {},
                    Err(err) => {
                        println!("Error: {}", err);
                        continue;
                    }
                }
                buffer.clear()
            }
        }

        drop(buffered_sender);
    };

    e.advance().unwrap();
    println!("Step 2. Excpected to see same amount of memory like in Step 1, but was: {} MB allocated. {} MB resident", allocated.read().unwrap() / 1000000, resident.read().unwrap() / 1000000);

    thread::spawn(move || {
        let mut file = OpenOptions::new().create(true).append(true).open("foo.txt").unwrap();

        loop {
            match buffered_receiver.recv() {
                Ok(messages) => {
                    on_msg(messages, &mut file);
                },
                Err(_) => {
                    println!("Channel closed for \"buffered_receiver\".");
                    break;
                }
            };
        }

        e.advance().unwrap();
        println!("Step 3. Excpected to see around 0 MB allocated, but was: {} MB allocated. {} MB resident", allocated.read().unwrap() / 1000000, resident.read().unwrap() / 1000000);
    });

    loop {

    }
}

fn on_msg(buffer: Vec<Message>, file: &mut File) {
    let mut bytes: Vec<u8> = Vec::new();
    for msg in buffer.iter() {
        bytes.extend(msg.bytes.iter());
    }
    let _ = file.write(&*bytes); 
}

#[derive(Clone)]
struct Message {
    bytes: Vec<u8>
}

执行结果:

Step 1. 640 MB allocated. 653 MB resident
Channel closed for "receiver".
Step 2. Excpected to see same amount of memory like in Step 1, but was: 886 MB allocated. 942 MB resident
Channel closed for "buffered_receiver".
Step 3. Excpected to see around 0 MB allocated, but was: 480 MB allocated. 880 MB resident
答案

您只删除通道的“发送者”端,我希望大多数缓冲都发生在接收器端,因为发送永不失败,并且您可以在关闭通道后再使用recv()。

运行您的原始脚本,我明白了:

Step 1. 640 MB allocated. 652 MB resident
Channel closed for "receiver".
Step 2. Excpected to see same amount of memory like in Step 1, but was: 886 MB allocated. 943 MB resident
Channel closed for "buffered_receiver".
Step 3. Excpected to see around 0 MB allocated, but was: 480 MB allocated. 943 MB resident

修改脚本以删除接收器(在读取线程的末尾但在推进时期之前,与receiverbuffered_sender同时[buffered_receiver),我得到:

Step 1. 640 MB allocated. 652 MB resident
Channel closed for "receiver".
Step 2. Excpected to see same amount of memory like in Step 1, but was: 406 MB allocated. 943 MB resident
Channel closed for "buffered_receiver".
Step 3. Excpected to see around 0 MB allocated, but was: 0 MB allocated. 943 MB resident

偶然地crossbeam channel(通常认为在每个点上都优于stdlib)的行为似乎与您期望的一样,将脚本转换为它们(这也可以简化它,因为可以迭代crossbeam通道) :

Step 1. 490 MB allocated. 508 MB resident
Channel closed for "receiver".
Step 2. Excpected to see same amount of memory like in Step 1, but was: 406 MB allocated. 790 MB resident
Channel closed for "buffered_receiver".
Step 3. Excpected to see around 0 MB allocated, but was: 0 MB allocated. 790 MB resident

以上是关于使用通道时Rust中的内存分配的主要内容,如果未能解决你的问题,请参考以下文章

Rust - 使用 Rayon 进行排列 - 向量内存分配错误

Rust 内存管理

怎样才能强制 Rust 获得分配的内存的所有权,而不是通过其安全方法分配的内存?

如何确定 Rust 中的 new() 何时在堆栈或堆上分配

golang 转到片段以观察运行时行为和内存分配

Rust 是不是将添加到向量中的单个项目装箱?