用 Source.fromGraph 替换已弃用的 Source.actorPublisher - 如何限制?

Posted

技术标签:

【中文标题】用 Source.fromGraph 替换已弃用的 Source.actorPublisher - 如何限制?【英文标题】:Replacing deprecated Source.actorPublisher by Source.fromGraph - how to limit? 【发布时间】:2017-07-01 21:54:55 【问题描述】:

现在Source.actorPublisher 已被弃用,我想找到一个合适的替代品。

警告:我仍然是 Akka 新手,正在努力寻找自己的方式!

基本上我有一个 websocket,服务器每秒推送一条新消息。

相关代码:

// OLD, deprecated way
//val source: Source[TextMessage.Strict, ActorRef] = Source.actorPublisher[String](Props[KeepAliveActor]).map(i => TextMessage(i))

// NEW way
val sourceGraph: Graph[SourceShape[TextMessage.Strict], NotUsed] = new KeepAliveSource
val source: Source[TextMessage.Strict, NotUsed] = Source.fromGraph(sourceGraph)

val requestHandler: HttpRequest => HttpResponse =

  case req @ HttpRequest(GET, Uri.Path("/ws"), _, _, _) =>
    req.header[UpgradeToWebSocket] match
    
      case Some(upgrade) => upgrade.handleMessagesWithSinkSource(Sink.ignore, source)
      case None => HttpResponse(400, entity = "Not a valid websocket request")
    
  case r: HttpRequest =>
    r.discardEntityBytes() // important to drain incoming HTTP Entity stream
    HttpResponse(404, entity = "Unknown resource!")

老演员:(基本摘自:Actorpublisher as source in handleMessagesWithSinkSource)

case class KeepAlive()

class KeepAliveActor extends ActorPublisher[String]

  import scala.concurrent.duration._
  implicit val ec = context.dispatcher

  val tick = context.system.scheduler.schedule(1 second, 1 second, self, KeepAlive())

  var cnt = 0
  var buffer = Vector.empty[String]

  override def receive: Receive =
  
    case KeepAlive() =>
    
      cnt = cnt + 1
      if (buffer.isEmpty && totalDemand > 0)
      
        onNext(s"$cntth Message from server!")
      
      else 
        buffer :+= cnt.toString
        if (totalDemand > 0) 
          val (use,keep) = buffer.splitAt(totalDemand.toInt)
          buffer = keep
          use foreach onNext
        
      
    
  

  override def postStop() = tick.cancel()

老方法很有效。

现在是基于 GraphStage 的新代码

class KeepAliveSource extends GraphStage[SourceShape[TextMessage.Strict]]

  import scala.concurrent.duration._

  override def createLogic(inheritedAttributes: Attributes): GraphStageLogic =
  
    new TimerGraphStageLogic(shape)
    
      // All state MUST be inside the GraphStageLogic,
      // never inside the enclosing GraphStage.
      // This state is safe to access and modify from all the
      // callbacks that are provided by GraphStageLogic and the
      // registered handlers.

      private var counter = 1
      setHandler(out, new OutHandler
      
        override def onPull(): Unit =
        
          push(out, TextMessage(counter.toString))
          counter += 1
          schedulePeriodically(None, 1 second)
        
      )
    
  

  val out: Outlet[TextMessage.Strict] = Outlet("KeepAliveSource")
  override def shape: SourceShape[TextMessage.Strict] = SourceShape(out)

无论出于何种原因,这仍然让我感到困惑,尽管我曾假设 schedulePeriodically(None, 1 second) 会在每条消息之间添加 1 秒的延迟。不过我显然错了。

增加这个值并不会改变我糟糕的浏览器无法处理请求和崩溃的事实(我可以在simple websocket client的日志中看到)

【问题讨论】:

【参考方案1】:

schedulePeriodically 调用不会影响舞台的行为。每当下游阶段请求消息时,都会调用onPull 处理程序,并且会立即发送消息push。这就是为什么您看不到任何节流的原因。

虽然GraphStage DSL(你选择的那个)非常灵活,但也更难做对。对于像这样的简单任务,最好利用 Akka 提供的更高级别的阶段。喜欢Source.tick (docs)。

  val tickingSource: Source[String, Cancellable] = 
    Source.tick(initialDelay = 1.second, interval = 5.seconds, tick = "hello")

在您的示例中,您需要发布一个递增的计数器,以便您可以将更多逻辑附加到滴答源,例如

  val tickingSource: Source[Strict, Cancellable] =
    Source.tick(initialDelay = 1.second, interval = 5.seconds, tick = NotUsed)
      .zipWithIndex
      .map case (_, counter) ⇒ TextMessage(counter.toString) 

如果您对底层GraphStage 的工作原理感兴趣,可以随时查看TickSource 代码本身(请参阅github)。 主要区别在于TickSourceonTimer 回调中调用push(来自TimerGraphStageLogic,您可以覆盖)。

【讨论】:

以及如何使用GraphStageTimerGraphStageLogic 在答案中添加了信息 对我不起作用,因为 TimerGraphStageLogic 它说 not yet initialized: only setHandler is allowed in GraphStageLogic constructor

以上是关于用 Source.fromGraph 替换已弃用的 Source.actorPublisher - 如何限制?的主要内容,如果未能解决你的问题,请参考以下文章

用 QuerydslJpaPredicateExecutor 替换已弃用的 QuerydslJpaRepository 失败

尝试替换已弃用的 loadnibnamed:owner

如何阻止 TinyMCE 用 span 替换已弃用的标签

如何替换已弃用的 imp.load_dynamic 的用法?

替换已弃用的函数 mysql_connect [重复]

Snapkit 常量替换已弃用的 .priorityMedium() .priorityHigh() .priorityLow()?