akka 流 SubFlow 的隐式类(通用路径相关类型)

Posted

技术标签:

【中文标题】akka 流 SubFlow 的隐式类(通用路径相关类型)【英文标题】:Implicit class for akka stream SubFlow (generic path dependent types) 【发布时间】:2021-12-10 23:09:13 【问题描述】:

我很难获得 implicit class 来编译 akka.stream.scaladsl.SubFlow

我的测试代码:

val subFlow = Source(List("1", "2", "3"))
  .groupBy(1, f)

val richSubFlow = new SideEffectfulSubFlowOps(subFlow)

val got = richSubFlow
  .withSideEffect((elem: String) => recordedItems.add(elem))
  .mergeSubstreams
  .to(Sink.seq)

/* In the end I would like to write it like this:
val got = Source(List("1", "2", "3"))
  .groupBy(1, f)
  .withSideEffect((elem: String) => recordedItems.add(elem))
  .mergeSubstreams
  .to(Sink.seq)
*/ 

到目前为止我的隐式类。

  implicit class SideEffectfulSubFlowOps[+Out, +Mat, FOps <: FlowOps[Out, Mat], C](val enrichedSubFlow: SubFlow[Out, Mat, FOps#Repr, C]) extends AnyVal 

    /** Perform a side effect without mutating the stream's element.
     *  Unlike [[SubFlow.alsoTo]] and [[SubFlow.wireTap]], this operation has the same semantics as [[SubFlow.map]] regarding backpressure, and concurrency */
    def withSideEffect(f: Out => Unit): enrichedSubFlow.Repr[Out] = 
      enrichedSubFlow.map  o =>
        f(o)
        o
      
    
  

不幸的是,我无法确定为隐式类定义的正确泛型类型。

编译器错误:

[error] SubFlowExtensionsSpec.scala:21:43: type mismatch;
[error]  found   : akka.stream.scaladsl.SubFlow[String,akka.NotUsed,[+O]akka.stream.scaladsl.Source[O,akka.NotUsed],akka.stream.scaladsl.RunnableGraph[akka.NotUsed]]
[error]  required: akka.stream.scaladsl.SubFlow[?,?,?#Repr,?]
[error]       val x = new SideEffectfulSubFlowOps(subFlow)

看子流的定义:trait SubFlow[+Out, +Mat, +F[+_], C] extends FlowOps[Out, Mat] 我不明白我需要如何在我的隐式类上定义泛型类型,然后将其用于SubFlowFC 类型。

Scala 版本:2.12.12

【问题讨论】:

如果这不仅仅是一个示例,并且您想在子流中通过传递产生副作用,为什么不直接使用wireTapsubFlow.wireTap(recordedItems.add(_)).mergeSubstreams 您是否尝试过从ìmplicit class 中删除差异? @LeviRamsey 这是一个有意识的决定,因为我们想要与 map 相同的流语义(关于流控制)。 【参考方案1】:

尽量使用更高级的类型参数,如SubFlow的定义

implicit class SideEffectfulSubFlowOps[+Out, +Mat, +FOps[+_], C](val enrichedSubFlow: SubFlow[Out, Mat, FOps, C]) extends AnyVal

【讨论】:

我已经尝试过了,它也不起作用。编译器错误:argument expression's type is not compatible with formal parameter type @leozilla 无法复制。 scastie.scala-lang.org/RlTubBvcTQadwh9S7dW7Kw请提供未编译时出现此错误的整个代码。 如果您更改为 scala 2.12.12,它将不再编译。 scastie.scala-lang.org/IIeRtsG1SImlxacl2HzZIA @leozilla 问题似乎是没有推断出类型参数。第一种情况(显式调用)可以保存指定类型参数scastie.scala-lang.org/DmytroMitin/76yS2AG6SMGDrEZZDC9VdA 不能用扩展方法指定它们的问题。恐怕2.12这里能做的不多。

以上是关于akka 流 SubFlow 的隐式类(通用路径相关类型)的主要内容,如果未能解决你的问题,请参考以下文章

akka-http 错误:找不到参数 um 的隐式值:akka.http.scaladsl.unmarshalling.FromRequestUnmarshaller

akka-http:找不到参数解组的隐式值

Scala中的隐式转换|理解

Scala基础高阶函数隐式转换AKKA编程

scala中隐式转换之隐式类

隐式类类型转换以及如何避免隐式转换