控制衍生期货的数量以产生背压

Posted

技术标签:

【中文标题】控制衍生期货的数量以产生背压【英文标题】:Controlling the number of spawned futures to create backpressure 【发布时间】:2018-06-24 08:22:31 【问题描述】:

我正在使用futures-rs powered version of the Rusoto AWS Kinesis library。我需要生成 AWS Kinesis 请求的深层管道以实现高吞吐量,因为 Kinesis 对每个 HTTP 请求的记录限制为 500 条。结合发送请求的 50 毫秒延迟,我需要开始生成许多并发请求。我希望在大约 100 个正在进行的请求中创建某个位置。

Rusoto put_records 函数签名如下所示:

fn put_records(
    &self,
    input: &PutRecordsInput,
) -> RusotoFuture<PutRecordsOutput, PutRecordsError>

RusotoFuture 是这样定义的包装器:

/// Future that is returned from all rusoto service APIs.
pub struct RusotoFuture<T, E> 
    inner: Box<Future<Item = T, Error = E> + 'static>,

内部Future 被包裹,但RusutoFuture 仍然实现Future::poll(),所以我相信它与futures-rs 生态系统兼容。 RusotoFuture 提供同步调用:

impl<T, E> RusotoFuture<T, E> 
    /// Blocks the current thread until the future has resolved.
    ///
    /// This is meant to provide a simple way for non-async consumers
    /// to work with rusoto.
    pub fn sync(self) -> Result<T, E> 
        self.wait()
    

我可以发出请求并sync() 它,从 AWS 获取结果。我想创建许多请求,将它们放入某种队列/列表中,然后收集完成的请求。如果请求出错,我需要重新发出请求(这在 Kinesis 中有些正常,尤其是在达到分片吞吐量限制时)。如果请求成功完成,我应该发出一个包含新数据的请求。我可以为每个请求生成一个线程并同步它,但是当我运行异步 IO 线程时,这似乎效率低下。

我曾尝试在我的应用程序线程中使用 futures::sync::mpsc::channel(不是从 Tokio 反应器内部运行),但每当我克隆 tx 时,它都会生成自己的缓冲区,从而消除 send 上的任何类型的背压:

fn kinesis_pipeline(client: DefaultKinesisClient, stream_name: String, num_puts: usize, puts_size: usize) 
    use futures::sync::mpsc:: channel, spawn ;
    use futures:: Sink, Future, Stream ;
    use futures::stream::Sender;
    use rusoto_core::reactor::DEFAULT_REACTOR;

    let client = Arc::new(KinesisClient::simple(Region::UsWest2));
    let data = FauxData::new(); // a data generator for testing

    let (mut tx, mut rx) = channel(1);

    for rec in data 
        tx.clone().send(rec);
    

没有克隆,我有错误:

error[E0382]: use of moved value: `tx`
   --> src/main.rs:150:9
    |
150 |         tx.send(rec);
    |         ^^ value moved here in previous iteration of loop
    |
    = note: move occurs because `tx` has type `futures::sync::mpsc::Sender<rusoto_kinesis::PutRecordsRequestEntry>`, which does not implement the `Copy` trait

我还根据建议查看了futures::mpsc::sync::spawn,但它拥有rx(作为Stream)的所有权,并且不能解决我对txCopy 造成无限行为的问题。

我希望如果我能让channel/spawn 使用正常,我将拥有一个系统,它需要RusotoFutures,等待它们完成,然后为我提供一个简单的方法来完成结果来自我的应用程序线程。

【问题讨论】:

你看过Stream::buffered了吗?将它与(可能是unsync)频道结合起来,也许它可以满足您的需求。无论如何,您可能需要通过(Rc&lt;RefCell&lt;..&gt;&gt;Arc&lt;Mutex&lt;..&gt;&gt;)将句柄共享给mpsc::Sender @Stefan 为什么需要Rc / Arc?你不应该能够克隆Sender吗? @Shepmaster 对,它自己已经这样做了,clone 应该没问题。我猜“每当我 clone tx 它生成自己的缓冲区时”让我感到困惑(这不应该是真的,缓冲区应该被共享)。 另外channel(1) 是一个非常小的缓冲区,考虑到您正在构建循环依赖关系:如果一个请求可以触发一个新请求,但必须等待推送一个新请求直到完成,它将阻塞永远。我会改用unbounded() 关于请求触发新请求导致死锁的要点。现在我只是想了解请求的背压管道的核心思想。我会注意到这是一个未来的问题。 【参考方案1】:

据我所知,channel 的问题不是Sender 的单个克隆将容量增加一,而是您为要发送的每个项目克隆了Sender .

您在没有clone 的情况下看到的错误来自您对Sink::send 接口的错误使用。使用clone,您实际上应该会看到警告:

warning: unused `futures::sink::Send` which must be used: futures do nothing unless polled

也就是说:您当前的代码实际上并没有发送任何东西!

为了应用背压,您需要链接那些 send 调用;每个都应该等到前一个完成(你也需要等待最后一个!);成功后,您将获得Sender 回复。最好的方法是使用 iter_ok 从迭代器生成 Stream 并将其传递给 send_all

现在你有了一个需要“开车”的未来SendAll。如果您忽略结果并在错误时出现恐慌 (.then(|r| r.unwrap(); Ok::&lt;(), ()&gt;(()) )),您可以将其作为单独的任务生成,但也许您希望将其集成到您的主应用程序中(即在 Box 中返回它)。

// this returns a `Box<Future<Item = (), Error = ()>>`. you may
// want to use a different error type
Box::new(tx.send_all(iter_ok(data)).map(|_| ()).map_err(|_| ()))

RusotoFuture::syncFuture::wait

不要使用Future::wait:它已经在一个分支中被弃用,而且它通常不会做你真正想要的。我怀疑RusotoFuture 是否意识到这些问题,所以我建议避免使用RusotoFuture::sync

克隆Sender 增加通道容量

正如您正确指出的那样,克隆 Sender 会将容量增加一倍。

这样做似乎是为了提高性能:Sender 以未阻塞(“未停放”)状态开始;如果Sender 未被阻止,则它可以发送项目而不会被阻止。但是,如果Sender 发送项目时队列中的项目数量达到配置的限制,Sender 将被阻止(“停放”)。 (从队列中删除项目将在特定时间解除对Sender 的阻止。)

这意味着在内部队列达到限制后,每个 Sender 仍然可以发送一个项目,这会导致容量增加的记录效果,但前提是实际上所有 Senders 都在发送项目 - 未使用 @987654357 @s 不会增加观察到的容量。

性能提升来自于这样一个事实,即只要您没有达到限制,它就不需要停放和通知任务(这很繁重)。

mpsc 模块顶部的私人文档描述了更多细节。

【讨论】:

以上是关于控制衍生期货的数量以产生背压的主要内容,如果未能解决你的问题,请参考以下文章

WebFlux控制器返回流量和背压

-远期和期货价格的确定--课后作业--金融衍生工具

flink的背压问题产生原因和解决方法

gRPC背压流控压缩及JSON通信知识笔记

案例研究丨宁证期货通过JumpServer有效实现安全运维控制

期货知识点第二章