抽象一个与其 Future 对应的同步函数

Posted

技术标签:

【中文标题】抽象一个与其 Future 对应的同步函数【英文标题】:Abstract a synchronous function with its Future counterpart 【发布时间】:2017-09-01 06:09:37 【问题描述】:

我写了一个非常简单的机制,它只允许在给定数量的seconds 期间调用max 数量的函数。将其视为基本速率限制器。

它将执行限制作为参数并返回原始执行的返回值。

问题是执行可以是同步的(=> A 类型)或异步的(=> Future[A] 类型),这会导致两个极其相似的函数:

case class Limiter[A](max: Int, seconds: Int) 
  private val queue = Queue[Long]()

  def limit(value: => A): Option[A] = 
    val now = System.currentTimeMillis()
    if (queue.length == max) 
      val oldest = queue.head
      if (now - oldest < seconds * 1000) return None
      else queue.dequeue()
    
    queue.enqueue(now)
    Some(value)
  

  def limitFuture(future: => Future[A]): Future[Option[A]] = 
    val now = System.currentTimeMillis()
    if (queue.length == max) 
      val oldest = queue.head
      if (now - oldest < seconds * 1000) return Future(None)
      else queue.dequeue()
    
    future.map  x =>
      queue.enqueue(now)
      Some(x)
    
  

(我实际上并没有使用Option,而是我定义的一组类型,为简单起见仅使用Option

执行示例:

// Prevent more than 5 runs/minute. Useful for example to prevent email spamming
val limit = Limit[Boolean](5, 60)
val result = limitFuture  sendEmail(...)  // `sendEmail` returns a future

// Prevent more than 1 run/hour. Useful for example to cache html response
val limit = Limit[String](1, 3600)
val limit  getHTML(...)  // `getHTML` returns the HTML as a string directly

我如何重构这些方法以避免重复?以后的需求可能包括其他参数类型,而不仅仅是直接类型 + Futured 类型,所以我想保持我的选项打开如果有可能。

到目前为止,我能想到的唯一“解决方案”是替换 limit

def limit(value: => A): Option[A] = 
  Await.result(limitFuture(Future.successful(value)), 5.seconds)

嗯,它有效,但感觉倒退了。我宁愿让=&gt; A 成为其他方法扩展的基本版本,或者更好的是limitlimitFuture 都可以扩展的通用(私有)方法。 实际上,如果单个 limit 函数可以不管参数如何处理这个问题会更好,但我怀疑这是可能的。

【问题讨论】:

您打算在多线程环境中使用您的代码吗?如果是,那么你有竞争条件。如果不是,那么您可以将 enqueue 移动到异步变体中的 future.map 之前。然后,您将能够在单独的通用函数中简单地提取所有常见逻辑。 我不打算在多线程环境中运行它,但由于这对我来说是一个学习练习,我很乐意改进并发现如何在我需要的时候不犯同样的错误to :) 我无法从future.map 中提取enqueue,因为只有成功的运行应该被排入队列。在上面的示例中,如果 sendEmail 由于某种原因失败,则该尝试不应计入限制。有意义吗? 如果您只有一个线程,则根本不清楚为什么要使用异步 api。无论如何,目前您的方法在语义上是不同的。在第一种方法中,now 被排队之前计算惰性value,而在第二种方法中,它是在未来成功之后,如您所述。 “如果你只有一个线程,那你根本就不清楚为什么要使用异步 api。” > 公平点。 2 个原因:sendEmail 正在使用外部库发送电子邮件,它返回 Future,我决定只是传递它。虽然我可以阻止sendEmail 并完成它。其他原因:这最初是我学习如何做这些事情的练习。我可以四处走走,但这会在我真正需要的时候教我:D “无论如何,目前你的方法在语义上是不同的。” > 非常正确,谢谢。我需要用val option = Some(value); queue.enqueue(now); option 替换queue.enqueue(now); Some(value)。这样看起来更好吗? 【参考方案1】:

您可以将其浓缩为一种使用隐式参数处理差异的方法:

trait Limitable[A, B] 
  type Out
  def none: Out
  def some(b: B, f: () => Unit): Out


implicit def rawLimitable[A]: Limitable[A, A] = new Limitable[A, A] 
  type Out = Option[A]
  def none = None
  def some(a: A, f: () => Unit): Out = 
    f()
    Some(a)
  

implicit def futureLimitable[A]: Limitable[A, Future[A]] = new Limitable[A, Future[A]] 
  type Out = Future[Option[A]]
  def none = Future(None)
  def some(future: Future[A], f: () => Unit): Out = future.map  a =>
    f()
    Some(a)
  


case class Limiter[A](max: Int, seconds: Int) 
  private val queue = Queue[Long]()

  def limit[B](in: => B)(implicit l: Limitable[A, B]): l.Out = 
    val now = System.currentTimeMillis()
    if (queue.length == max) 
      val oldest = queue.head
      if (now - oldest < seconds * 1000) return l.none
      else queue.dequeue()
    
    l.some(in, () => queue.enqueue(now))
  

并像这样使用它:

val limit = Limit[String](1, 3600)
limit.limit("foo")
limit.limit(Future("bar"))

【讨论】:

好东西,我要玩这个,谢谢! 不幸的是,我无法编译它。虽然上面的代码本身可以编译,但使用返回的值却不能。为了处理问题中给出的示例,我尝试将值与Option[A]Future[Option[A]]type Out 的实现)进行模式匹配,如下所示:limiter.limit(fetcher) match case None =&gt; ...; case Some(...) 。问题是,编译失败,pattern type is incompatible with expected type; found: None.type; required: Limitable[A,A]#Out。我可以使用什么技巧来完成这项工作?【参考方案2】:

您可以使用来自catsscalazApplicative 类型类。除其他外,Applicative 允许您将值提升到某个上下文 F(使用 pure)并且也是一个函子,因此您可以在 F[A] 上使用 map

目前您希望它用于IdFuture 类型(您需要在Future 应用程序工作范围内的ExecutionContext)。它适用于 VectorValidated 之类的东西,但添加自定义集合类型可能会遇到问题。

import cats._, implicits._
import scala.concurrent._
import scala.collection.mutable.Queue

case class Limiter[A](max: Int, seconds: Int) 
  private val queue = Queue[Long]()

  def limitA[F[_]: Applicative](value: => F[A]): F[Option[A]] = 
    val now = System.currentTimeMillis()
    if (queue.length == max) 
      val oldest = queue.head
      if (now - oldest < seconds * 1000) return none[A].pure[F]
      else queue.dequeue()
    
    value.map  x =>
      queue.enqueue(now)
      x.some
    
  

  // or leave these e.g. for source compatibility
  def limit(value: => A): Option[A] = limitA[Id](value)
  def limitFuture(future: => Future[A])(implicit ec: ExecutionContext): Future[Option[A]] = limitA(future)

注意事项: 我使用none[A] 而不是None: Option[A]a.some 而不是Some(a): Option[A]。这些帮助器在 catsscalaz 中都可用,您需要它们,因为这里的 F[_] 未定义为协变。


您必须将Id 明确指定为类型,例如.limitA[Id](3)。但是,Future 并非如此。


map的电话很奇怪。解析为:

future.map 
  queue.enqueue(now) // in current thread
  x => Some(x)

与此相同

queue.enqueue(now) // in current thread
future.map 
  x => Some(x)

【讨论】:

哇,这里有很多好东西! 很多 您使用/描述的内容对我来说是未知的,所以我肯定会自学所有这些,非常感谢。与此同时,如果我可以在我的案例中应用@JoeK 的解决方案,我可能会选择它(我为 SO 问题简化了我的示例),但我真的希望我能接受这两个答案:) "你的 map 调用很奇怪。被解析为:[...] 和 [...] 是一样的" > 不过不太一样,因为如果未来失败,我不想将执行时间排入队列。每the API doc for Future#map:“通过将函数应用于这个未来的成功结果来创建一个新的未来。” @astorije 在您的代码中,即使未来失败,该值也会被排队。您用于map 的函数不是整个... 块,而是该块的返回值(函数x =&gt; Some(x))。它上面的代码将在执行map 之前执行。与我上面的版本比较。 好吧,我该死!不知何故,到目前为止,我一直认为future.map queue.enqueue(now); Some(_) 扩展为future.map x =&gt; queue.enqueue(now); Some(x) (我的意思),而不是future.map queue.enqueue(now); x =&gt; Some(x) !很抱歉跑题了,但你能解释一下是怎么回事吗?我在网上花了很长时间试图找到有关该扩展细节的好读物,但一无所获,您能否指点一下?无论如何,感谢您的通知,我正在更新问题!

以上是关于抽象一个与其 Future 对应的同步函数的主要内容,如果未能解决你的问题,请参考以下文章

boost-同步-futures

如何在颤动中交织异步函数和同步函数

异步编程之同步异步生成器函数

Future模式详细讲解及实例分析

并发编程Future模式及JDK中的实现

颤动异步到同步编程