抽象一个与其 Future 对应的同步函数
Posted
技术标签:
【中文标题】抽象一个与其 Future 对应的同步函数【英文标题】:Abstract a synchronous function with its Future counterpart 【发布时间】:2017-09-01 06:09:37 【问题描述】:我写了一个非常简单的机制,它只允许在给定数量的seconds
期间调用max
数量的函数。将其视为基本速率限制器。
它将执行限制作为参数并返回原始执行的返回值。
问题是执行可以是同步的(=> A
类型)或异步的(=> Future[A]
类型),这会导致两个极其相似的函数:
case class Limiter[A](max: Int, seconds: Int)
private val queue = Queue[Long]()
def limit(value: => A): Option[A] =
val now = System.currentTimeMillis()
if (queue.length == max)
val oldest = queue.head
if (now - oldest < seconds * 1000) return None
else queue.dequeue()
queue.enqueue(now)
Some(value)
def limitFuture(future: => Future[A]): Future[Option[A]] =
val now = System.currentTimeMillis()
if (queue.length == max)
val oldest = queue.head
if (now - oldest < seconds * 1000) return Future(None)
else queue.dequeue()
future.map x =>
queue.enqueue(now)
Some(x)
(我实际上并没有使用Option
,而是我定义的一组类型,为简单起见仅使用Option
)
执行示例:
// Prevent more than 5 runs/minute. Useful for example to prevent email spamming
val limit = Limit[Boolean](5, 60)
val result = limitFuture sendEmail(...) // `sendEmail` returns a future
// Prevent more than 1 run/hour. Useful for example to cache html response
val limit = Limit[String](1, 3600)
val limit getHTML(...) // `getHTML` returns the HTML as a string directly
我如何重构这些方法以避免重复?以后的需求可能包括其他参数类型,而不仅仅是直接类型 + Future
d 类型,所以我想保持我的选项打开如果有可能。
到目前为止,我能想到的唯一“解决方案”是替换 limit
:
def limit(value: => A): Option[A] =
Await.result(limitFuture(Future.successful(value)), 5.seconds)
嗯,它有效,但感觉倒退了。我宁愿让=> A
成为其他方法扩展的基本版本,或者更好的是limit
和limitFuture
都可以扩展的通用(私有)方法。
实际上,如果单个 limit
函数可以不管参数如何处理这个问题会更好,但我怀疑这是可能的。
【问题讨论】:
您打算在多线程环境中使用您的代码吗?如果是,那么你有竞争条件。如果不是,那么您可以将enqueue
移动到异步变体中的 future.map 之前。然后,您将能够在单独的通用函数中简单地提取所有常见逻辑。
我不打算在多线程环境中运行它,但由于这对我来说是一个学习练习,我很乐意改进并发现如何在我需要的时候不犯同样的错误to :) 我无法从future.map
中提取enqueue
,因为只有成功的运行应该被排入队列。在上面的示例中,如果 sendEmail
由于某种原因失败,则该尝试不应计入限制。有意义吗?
如果您只有一个线程,则根本不清楚为什么要使用异步 api。无论如何,目前您的方法在语义上是不同的。在第一种方法中,now
被排队之前计算惰性value
,而在第二种方法中,它是在未来成功之后,如您所述。
“如果你只有一个线程,那你根本就不清楚为什么要使用异步 api。” > 公平点。 2 个原因:sendEmail
正在使用外部库发送电子邮件,它返回 Future
,我决定只是传递它。虽然我可以阻止sendEmail
并完成它。其他原因:这最初是我学习如何做这些事情的练习。我可以四处走走,但这会在我真正需要的时候教我:D
“无论如何,目前你的方法在语义上是不同的。” > 非常正确,谢谢。我需要用val option = Some(value); queue.enqueue(now); option
替换queue.enqueue(now); Some(value)
。这样看起来更好吗?
【参考方案1】:
您可以将其浓缩为一种使用隐式参数处理差异的方法:
trait Limitable[A, B]
type Out
def none: Out
def some(b: B, f: () => Unit): Out
implicit def rawLimitable[A]: Limitable[A, A] = new Limitable[A, A]
type Out = Option[A]
def none = None
def some(a: A, f: () => Unit): Out =
f()
Some(a)
implicit def futureLimitable[A]: Limitable[A, Future[A]] = new Limitable[A, Future[A]]
type Out = Future[Option[A]]
def none = Future(None)
def some(future: Future[A], f: () => Unit): Out = future.map a =>
f()
Some(a)
case class Limiter[A](max: Int, seconds: Int)
private val queue = Queue[Long]()
def limit[B](in: => B)(implicit l: Limitable[A, B]): l.Out =
val now = System.currentTimeMillis()
if (queue.length == max)
val oldest = queue.head
if (now - oldest < seconds * 1000) return l.none
else queue.dequeue()
l.some(in, () => queue.enqueue(now))
并像这样使用它:
val limit = Limit[String](1, 3600)
limit.limit("foo")
limit.limit(Future("bar"))
【讨论】:
好东西,我要玩这个,谢谢! 不幸的是,我无法编译它。虽然上面的代码本身可以编译,但使用返回的值却不能。为了处理问题中给出的示例,我尝试将值与Option[A]
或Future[Option[A]]
(type Out
的实现)进行模式匹配,如下所示:limiter.limit(fetcher) match case None => ...; case Some(...)
。问题是,编译失败,pattern type is incompatible with expected type; found: None.type; required: Limitable[A,A]#Out
。我可以使用什么技巧来完成这项工作?【参考方案2】:
您可以使用来自cats
或scalaz
的Applicative
类型类。除其他外,Applicative 允许您将值提升到某个上下文 F
(使用 pure
)并且也是一个函子,因此您可以在 F[A]
上使用 map
。
目前您希望它用于Id
和Future
类型(您需要在Future 应用程序工作范围内的ExecutionContext)。它适用于 Vector
或 Validated
之类的东西,但添加自定义集合类型可能会遇到问题。
import cats._, implicits._
import scala.concurrent._
import scala.collection.mutable.Queue
case class Limiter[A](max: Int, seconds: Int)
private val queue = Queue[Long]()
def limitA[F[_]: Applicative](value: => F[A]): F[Option[A]] =
val now = System.currentTimeMillis()
if (queue.length == max)
val oldest = queue.head
if (now - oldest < seconds * 1000) return none[A].pure[F]
else queue.dequeue()
value.map x =>
queue.enqueue(now)
x.some
// or leave these e.g. for source compatibility
def limit(value: => A): Option[A] = limitA[Id](value)
def limitFuture(future: => Future[A])(implicit ec: ExecutionContext): Future[Option[A]] = limitA(future)
注意事项:
我使用none[A]
而不是None: Option[A]
和a.some
而不是Some(a): Option[A]
。这些帮助器在 cats
和 scalaz
中都可用,您需要它们,因为这里的 F[_]
未定义为协变。
您必须将Id
明确指定为类型,例如.limitA[Id](3)
。但是,Future
并非如此。
你map
的电话很奇怪。解析为:
future.map
queue.enqueue(now) // in current thread
x => Some(x)
与此相同
queue.enqueue(now) // in current thread
future.map
x => Some(x)
【讨论】:
哇,这里有很多好东西! 很多 您使用/描述的内容对我来说是未知的,所以我肯定会自学所有这些,非常感谢。与此同时,如果我可以在我的案例中应用@JoeK 的解决方案,我可能会选择它(我为 SO 问题简化了我的示例),但我真的希望我能接受这两个答案:) "你的 map 调用很奇怪。被解析为:[...] 和 [...] 是一样的" > 不过不太一样,因为如果未来失败,我不想将执行时间排入队列。每the API doc forFuture#map
:“通过将函数应用于这个未来的成功结果来创建一个新的未来。”
@astorije 在您的代码中,即使未来失败,该值也会被排队。您用于map
的函数不是整个...
块,而是该块的返回值(函数x => Some(x)
)。它上面的代码将在执行map
之前执行。与我上面的版本比较。
好吧,我该死!不知何故,到目前为止,我一直认为future.map queue.enqueue(now); Some(_)
扩展为future.map x => queue.enqueue(now); Some(x)
(我的意思),而不是future.map queue.enqueue(now); x => Some(x)
!很抱歉跑题了,但你能解释一下是怎么回事吗?我在网上花了很长时间试图找到有关该扩展细节的好读物,但一无所获,您能否指点一下?无论如何,感谢您的通知,我正在更新问题!以上是关于抽象一个与其 Future 对应的同步函数的主要内容,如果未能解决你的问题,请参考以下文章