Rust中的channel

Posted mutourend

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rust中的channel相关的知识,希望对你有一定的参考价值。

1. std::sync::mpsc::channel

支持多Sender,仅支持1个Receiver,可保证接收消息的顺序与发送的顺序一致。

pub fn channel<T>() -> (Sender<T>, Receiver<T>)

会创建新的async channel,返回的是sender/receiver对。
所有经由Sender发送的数据顺序,与 在Receiver端收到的数据顺序是一致的。
没有任何send操作可阻塞线程,该channel可认为是具有“无限buffer”的,而recv操作将阻塞知道有消息过来。(而对于sync_channel,当其达到buffer limit时会阻塞)。

Sender可复制多次send到同一channel,但仅支持一个`Receiver``。

Receiver断开时,Sender的send操作会受到SendError。同理,当Sender断开时,Receiver的recv操作将受到RecvError。

示例:

use std::sync::mpsc::channel;
use std::thread;

let (sender, receiver) = channel();

// Spawn off an expensive computation
thread::spawn(move|| {
    sender.send(expensive_computation()).unwrap();
});

// Do some useful work for awhile

// Let's see what that answer was
println!("{:?}", receiver.recv().unwrap());

2. crossbeam_channel

为多生产者,多接收者的消息传输通道。
channel通道的创建方式有2种:

  • bounded:通道具有容量上限,即通道内同时容纳的消息数有限制。
use crossbeam_channel::bounded;

// Create a channel that can hold at most 5 messages at a time.
let (s, r) = bounded(5);

// Can send only 5 messages without blocking.
for i in 0..5 {
    s.send(i).unwrap();
}

// Another call to `send` would block because the channel is full.
// s.send(5).unwrap();

  • unbounded:通道的容量无上限,即通道内可同时容纳任意多的消息。
use crossbeam_channel::unbounded;

// Create an unbounded channel.
let (s, r) = unbounded();

// Can send any number of messages into the channel without blocking.
for i in 0..1000 {
    s.send(i).unwrap();
}

特例情况为0容量通道,即通道内无法容纳消息。对应的,发送和接收操作必须成对出现:

use std::thread;
use crossbeam_channel::bounded;

// Create a zero-capacity channel.
let (s, r) = bounded(0);

// Sending blocks until a receive operation appears on the other side.
thread::spawn(move || s.send("Hi!").unwrap());

// Receiving blocks until a send operation appears on the other side.
assert_eq!(r.recv(), Ok("Hi!"));

支持复制Sender和Receiver:

use crossbeam_channel::unbounded;

let (s1, r1) = unbounded();
let (s2, r2) = (s1.clone(), r1.clone());
let (s3, r3) = (s2.clone(), r2.clone());

s1.send(10).unwrap();
s2.send(20).unwrap();
s3.send(30).unwrap();

assert_eq!(r3.recv(), Ok(10));
assert_eq!(r1.recv(), Ok(20));
assert_eq!(r2.recv(), Ok(30));

当所有的Senders或Receivers 与通道连接断开,则通道处于disconnected状态,消息不再可发送成功,但是通道内剩余的消息仍然可被接收。对处于disconnected状态的通道进行发送或接收操作都不会阻塞:

use crossbeam_channel::{unbounded, RecvError};

let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();

// The only sender is dropped, disconnecting the channel.
drop(s);

// The remaining messages can be received.
assert_eq!(r.recv(), Ok(1));
assert_eq!(r.recv(), Ok(2));
assert_eq!(r.recv(), Ok(3));

// There are no more messages in the channel.
assert!(r.is_empty());

// Note that calling `r.recv()` does not block.
// Instead, `Err(RecvError)` is returned immediately.
assert_eq!(r.recv(), Err(RecvError));

发送和接收操作支持3种模式:

  • 非阻塞(立即返回成功或失败)
  • 阻塞(等待操作成功或通道disconnected)
  • 超时阻塞(仅阻塞一段时间)
use crossbeam_channel::{bounded, RecvError, TryRecvError};

let (s, r) = bounded(1);

// Send a message into the channel.
s.send("foo").unwrap();

// This call would block because the channel is full.
// s.send("bar").unwrap();

// Receive the message.
assert_eq!(r.recv(), Ok("foo"));

// This call would block because the channel is empty.
// r.recv();

// Try receiving a message without blocking.
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));

// Disconnect the channel.
drop(s);

// This call doesn't block because the channel is now disconnected.
assert_eq!(r.recv(), Err(RecvError));

借助try_iter可非阻塞的获取通道内的所有消息:

use crossbeam_channel::unbounded;

let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// No need to drop the sender.

// Receive all messages currently in the channel.
let v: Vec<_> = r.try_iter().collect();

assert_eq!(v, [1, 2, 3]);

参考资料

[1] https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html
[2] https://docs.rs/crossbeam-channel/0.5.1/crossbeam_channel/

以上是关于Rust中的channel的主要内容,如果未能解决你的问题,请参考以下文章

[Go] 通过 17 个简短代码片段,切底弄懂 channel 基础

从 rust 中的通道迭代器获取第一个接收到的值

如何可靠地清理执行阻塞 IO 的 Rust 线​​程?

使用通道在线程之间传递Rust pnet数据包

golang 片段7 for https://medium.com/@francesc/why-are-there-nil-channels-in-go-9877cc0b2308

透过 Rust 探索系统的本原:并发原语