Rust中的channel
Posted mutourend
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了Rust中的channel相关的知识,希望对你有一定的参考价值。
1. std::sync::mpsc::channel
支持多Sender,仅支持1个Receiver,可保证接收消息的顺序与发送的顺序一致。
pub fn channel<T>() -> (Sender<T>, Receiver<T>)
会创建新的async channel,返回的是sender/receiver对。
所有经由Sender
发送的数据顺序,与 在Receiver
端收到的数据顺序是一致的。
没有任何send操作可阻塞线程,该channel可认为是具有“无限buffer”的,而recv操作将阻塞知道有消息过来。(而对于sync_channel,当其达到buffer limit时会阻塞)。
Sender
可复制多次send
到同一channel,但仅支持一个`Receiver``。
当Receiver
断开时,Sender
的send操作会受到SendError。同理,当Sender
断开时,Receiver
的recv操作将受到RecvError。
示例:
use std::sync::mpsc::channel;
use std::thread;
let (sender, receiver) = channel();
// Spawn off an expensive computation
thread::spawn(move|| {
sender.send(expensive_computation()).unwrap();
});
// Do some useful work for awhile
// Let's see what that answer was
println!("{:?}", receiver.recv().unwrap());
2. crossbeam_channel
为多生产者,多接收者的消息传输通道。
channel通道的创建方式有2种:
- bounded:通道具有容量上限,即通道内同时容纳的消息数有限制。
use crossbeam_channel::bounded;
// Create a channel that can hold at most 5 messages at a time.
let (s, r) = bounded(5);
// Can send only 5 messages without blocking.
for i in 0..5 {
s.send(i).unwrap();
}
// Another call to `send` would block because the channel is full.
// s.send(5).unwrap();
- unbounded:通道的容量无上限,即通道内可同时容纳任意多的消息。
use crossbeam_channel::unbounded;
// Create an unbounded channel.
let (s, r) = unbounded();
// Can send any number of messages into the channel without blocking.
for i in 0..1000 {
s.send(i).unwrap();
}
特例情况为0容量通道,即通道内无法容纳消息。对应的,发送和接收操作必须成对出现:
use std::thread;
use crossbeam_channel::bounded;
// Create a zero-capacity channel.
let (s, r) = bounded(0);
// Sending blocks until a receive operation appears on the other side.
thread::spawn(move || s.send("Hi!").unwrap());
// Receiving blocks until a send operation appears on the other side.
assert_eq!(r.recv(), Ok("Hi!"));
支持复制Sender和Receiver:
use crossbeam_channel::unbounded;
let (s1, r1) = unbounded();
let (s2, r2) = (s1.clone(), r1.clone());
let (s3, r3) = (s2.clone(), r2.clone());
s1.send(10).unwrap();
s2.send(20).unwrap();
s3.send(30).unwrap();
assert_eq!(r3.recv(), Ok(10));
assert_eq!(r1.recv(), Ok(20));
assert_eq!(r2.recv(), Ok(30));
当所有的Senders或Receivers 与通道连接断开,则通道处于disconnected状态,消息不再可发送成功,但是通道内剩余的消息仍然可被接收。对处于disconnected状态的通道进行发送或接收操作都不会阻塞:
use crossbeam_channel::{unbounded, RecvError};
let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// The only sender is dropped, disconnecting the channel.
drop(s);
// The remaining messages can be received.
assert_eq!(r.recv(), Ok(1));
assert_eq!(r.recv(), Ok(2));
assert_eq!(r.recv(), Ok(3));
// There are no more messages in the channel.
assert!(r.is_empty());
// Note that calling `r.recv()` does not block.
// Instead, `Err(RecvError)` is returned immediately.
assert_eq!(r.recv(), Err(RecvError));
发送和接收操作支持3种模式:
- 非阻塞(立即返回成功或失败)
- 阻塞(等待操作成功或通道disconnected)
- 超时阻塞(仅阻塞一段时间)
use crossbeam_channel::{bounded, RecvError, TryRecvError};
let (s, r) = bounded(1);
// Send a message into the channel.
s.send("foo").unwrap();
// This call would block because the channel is full.
// s.send("bar").unwrap();
// Receive the message.
assert_eq!(r.recv(), Ok("foo"));
// This call would block because the channel is empty.
// r.recv();
// Try receiving a message without blocking.
assert_eq!(r.try_recv(), Err(TryRecvError::Empty));
// Disconnect the channel.
drop(s);
// This call doesn't block because the channel is now disconnected.
assert_eq!(r.recv(), Err(RecvError));
借助try_iter
可非阻塞的获取通道内的所有消息:
use crossbeam_channel::unbounded;
let (s, r) = unbounded();
s.send(1).unwrap();
s.send(2).unwrap();
s.send(3).unwrap();
// No need to drop the sender.
// Receive all messages currently in the channel.
let v: Vec<_> = r.try_iter().collect();
assert_eq!(v, [1, 2, 3]);
参考资料
[1] https://doc.rust-lang.org/std/sync/mpsc/fn.channel.html
[2] https://docs.rs/crossbeam-channel/0.5.1/crossbeam_channel/
以上是关于Rust中的channel的主要内容,如果未能解决你的问题,请参考以下文章
[Go] 通过 17 个简短代码片段,切底弄懂 channel 基础
golang 片段7 for https://medium.com/@francesc/why-are-there-nil-channels-in-go-9877cc0b2308