控制衍生期货的数量以产生背压
Posted
技术标签:
【中文标题】控制衍生期货的数量以产生背压【英文标题】:Controlling the number of spawned futures to create backpressure 【发布时间】:2018-06-24 08:22:31 【问题描述】:我正在使用futures-rs powered version of the Rusoto AWS Kinesis library。我需要生成 AWS Kinesis 请求的深层管道以实现高吞吐量,因为 Kinesis 对每个 HTTP 请求的记录限制为 500 条。结合发送请求的 50 毫秒延迟,我需要开始生成许多并发请求。我希望在大约 100 个正在进行的请求中创建某个位置。
Rusoto put_records
函数签名如下所示:
fn put_records(
&self,
input: &PutRecordsInput,
) -> RusotoFuture<PutRecordsOutput, PutRecordsError>
RusotoFuture
是这样定义的包装器:
/// Future that is returned from all rusoto service APIs.
pub struct RusotoFuture<T, E>
inner: Box<Future<Item = T, Error = E> + 'static>,
内部Future
被包裹,但RusutoFuture
仍然实现Future::poll()
,所以我相信它与futures-rs
生态系统兼容。 RusotoFuture
提供同步调用:
impl<T, E> RusotoFuture<T, E>
/// Blocks the current thread until the future has resolved.
///
/// This is meant to provide a simple way for non-async consumers
/// to work with rusoto.
pub fn sync(self) -> Result<T, E>
self.wait()
我可以发出请求并sync()
它,从 AWS 获取结果。我想创建许多请求,将它们放入某种队列/列表中,然后收集完成的请求。如果请求出错,我需要重新发出请求(这在 Kinesis 中有些正常,尤其是在达到分片吞吐量限制时)。如果请求成功完成,我应该发出一个包含新数据的请求。我可以为每个请求生成一个线程并同步它,但是当我运行异步 IO 线程时,这似乎效率低下。
我曾尝试在我的应用程序线程中使用 futures::sync::mpsc::channel
(不是从 Tokio 反应器内部运行),但每当我克隆 tx
时,它都会生成自己的缓冲区,从而消除 send
上的任何类型的背压:
fn kinesis_pipeline(client: DefaultKinesisClient, stream_name: String, num_puts: usize, puts_size: usize)
use futures::sync::mpsc:: channel, spawn ;
use futures:: Sink, Future, Stream ;
use futures::stream::Sender;
use rusoto_core::reactor::DEFAULT_REACTOR;
let client = Arc::new(KinesisClient::simple(Region::UsWest2));
let data = FauxData::new(); // a data generator for testing
let (mut tx, mut rx) = channel(1);
for rec in data
tx.clone().send(rec);
没有克隆,我有错误:
error[E0382]: use of moved value: `tx`
--> src/main.rs:150:9
|
150 | tx.send(rec);
| ^^ value moved here in previous iteration of loop
|
= note: move occurs because `tx` has type `futures::sync::mpsc::Sender<rusoto_kinesis::PutRecordsRequestEntry>`, which does not implement the `Copy` trait
我还根据建议查看了futures::mpsc::sync::spawn
,但它拥有rx
(作为Stream
)的所有权,并且不能解决我对tx
的Copy
造成无限行为的问题。
我希望如果我能让channel
/spawn
使用正常,我将拥有一个系统,它需要RusotoFuture
s,等待它们完成,然后为我提供一个简单的方法来完成结果来自我的应用程序线程。
【问题讨论】:
你看过Stream::buffered
了吗?将它与(可能是unsync
)频道结合起来,也许它可以满足您的需求。无论如何,您可能需要通过(Rc<RefCell<..>>
或Arc<Mutex<..>>
)将句柄共享给mpsc::Sender
。
@Stefan 为什么需要Rc
/ Arc
?你不应该能够克隆Sender
吗?
@Shepmaster 对,它自己已经这样做了,clone
应该没问题。我猜“每当我 clone
tx 它生成自己的缓冲区时”让我感到困惑(这不应该是真的,缓冲区应该被共享)。
另外channel(1)
是一个非常小的缓冲区,考虑到您正在构建循环依赖关系:如果一个请求可以触发一个新请求,但必须等待推送一个新请求直到完成,它将阻塞永远。我会改用unbounded()
。
关于请求触发新请求导致死锁的要点。现在我只是想了解请求的背压管道的核心思想。我会注意到这是一个未来的问题。
【参考方案1】:
据我所知,channel
的问题不是Sender
的单个克隆将容量增加一,而是您为要发送的每个项目克隆了Sender
.
您在没有clone
的情况下看到的错误来自您对Sink::send
接口的错误使用。使用clone
,您实际上应该会看到警告:
warning: unused `futures::sink::Send` which must be used: futures do nothing unless polled
也就是说:您当前的代码实际上并没有发送任何东西!
为了应用背压,您需要链接那些 send
调用;每个都应该等到前一个完成(你也需要等待最后一个!);成功后,您将获得Sender
回复。最好的方法是使用 iter_ok
从迭代器生成 Stream
并将其传递给 send_all
。
现在你有了一个需要“开车”的未来SendAll
。如果您忽略结果并在错误时出现恐慌 (.then(|r| r.unwrap(); Ok::<(), ()>(()) )
),您可以将其作为单独的任务生成,但也许您希望将其集成到您的主应用程序中(即在 Box
中返回它)。
// this returns a `Box<Future<Item = (), Error = ()>>`. you may
// want to use a different error type
Box::new(tx.send_all(iter_ok(data)).map(|_| ()).map_err(|_| ()))
RusotoFuture::sync
和 Future::wait
不要使用Future::wait
:它已经在一个分支中被弃用,而且它通常不会做你真正想要的。我怀疑RusotoFuture
是否意识到这些问题,所以我建议避免使用RusotoFuture::sync
。
克隆Sender
增加通道容量
正如您正确指出的那样,克隆 Sender
会将容量增加一倍。
这样做似乎是为了提高性能:Sender
以未阻塞(“未停放”)状态开始;如果Sender
未被阻止,则它可以发送项目而不会被阻止。但是,如果Sender
发送项目时队列中的项目数量达到配置的限制,Sender
将被阻止(“停放”)。 (从队列中删除项目将在特定时间解除对Sender
的阻止。)
这意味着在内部队列达到限制后,每个 Sender
仍然可以发送一个项目,这会导致容量增加的记录效果,但前提是实际上所有 Sender
s 都在发送项目 - 未使用 @987654357 @s 不会增加观察到的容量。
性能提升来自于这样一个事实,即只要您没有达到限制,它就不需要停放和通知任务(这很繁重)。
mpsc
模块顶部的私人文档描述了更多细节。
【讨论】:
以上是关于控制衍生期货的数量以产生背压的主要内容,如果未能解决你的问题,请参考以下文章