在运行时动态创建 Akka 流
Posted
技术标签:
【中文标题】在运行时动态创建 Akka 流【英文标题】:Dynamically creating Akka Stream Flows at Runtime 【发布时间】:2017-09-24 04:32:26 【问题描述】:我目前正在尝试在运行时动态创建 Akka 流图定义。这个想法是用户将能够以交互方式定义流并将它们附加到现有/正在运行的BroadcastHubs
。这意味着我不知道在编译时将使用哪些流甚至多少流。
不幸的是,我正在为泛型/类型擦除而苦苦挣扎。坦率地说,我什至不确定我试图在 JVM 上做些什么。
我有一个函数将返回一个 Akka Streams Flow
代表两个连接的 Flows
。它使用 Scala 的TypeTags
来绕过类型擦除。如果第一个流的输出类型与第二个流的输入类型相同,则可以连接成功。这工作得很好。
import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.Flow, GraphDSL
import scala.reflect.runtime.universe._
import scala.util.Failure, Success, Try
def connect[A: TypeTag, B: TypeTag, C: TypeTag, D: TypeTag](a: Flow[A, B, NotUsed],
b: Flow[C, D, NotUsed]): Try[Flow[A, D, NotUsed]] =
Try
if (typeOf[B] =:= typeOf[C])
val c = b.asInstanceOf[Flow[B, D, NotUsed]]
Flow.fromGraph
GraphDSL.create(a, c)((m1, m2) => NotUsed.getInstance()) implicit b =>
(s1, s2) =>
s1 ~> s2
FlowShape(s1.in, s2.out)
else
throw new RuntimeException(s"Connection failed. Incompatible types: $typeOf[B] and $typeOf[C]")
所以如果我有Flow[A,B]
和Flow[C,D]
,则结果将是Flow[A,D]
,假设B 和C 是同一类型。
我还具有尝试将List
的Flows
合并/减少为单个Flow
的功能。让我们假设此列表源自文件或 Web 请求的流定义列表。
def merge(fcs: List[Flow[_, _, NotUsed]]): Try[Option[Flow[_, _, NotUsed]]] =
fcs match
case Nil => Success(None)
case h :: Nil => Success(Some(h))
case h :: t =>
val n = t.head
connect(h, n) match
case Success(fc) => merge(fc :: t)
case Failure(e) => Failure(e)
不幸的是,由于Flows
存储在List
中,由于标准Lists
上的类型擦除,我丢失了所有类型信息,因此无法在运行时连接Flows
。这是一个例子:
def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)
def flowI2S() = Flow.fromFunction[Int, String](_.toString)
val a = flowIdentity[Int]()
val b = flowIdentity[Int]()
val c = flowI2S()
val d = flowIdentity[String]()
val fcs: List[Flow[_, _, NotUsed]] = List(a, b, c, d)
val y = merge(fcs)
这会导致异常:
Failure(java.lang.RuntimeException: Connection failed. Incompatible types _$4 and _$3)
我一直在研究 Miles Sabin 的Shapeless,并认为我可以使用HLists
来保留类型信息。不幸的是,这似乎只有在我在编译时知道列表的各个类型和长度时才有效。如果我将特定的 HList
向上转换为 HList
,看起来我又丢失了类型信息。
val fcs: HList = a :: b :: c :: d :: HNil
所以我的问题是……这可能吗?有没有办法用无形泛型魔法来做到这一点(最好不需要使用特定的非存在类型提取器)?我希望找到尽可能通用的解决方案,我们将不胜感激。
谢谢!
【问题讨论】:
首先,在第一个代码示例中,所有类型在编译时都是已知的,因此可以将其简化为 3 种类型而不是 4 种:Flow[A, B]
和 Flow[B, C]
只是好奇:您是否考虑过编译用户创建的图表?即从一些 DSL 生成 scala 代码,然后编译,然后加载到运行时?
这实际上是一个好主意...我没想到。这将在有人创建图表时提供所需的类型安全性。现在我只需要处理类加载器魔法......谢谢!
【参考方案1】:
正如您已经注意到的,它不起作用的原因是该列表删除了您拥有的类型。因此这是不可能的。 如果您知道可以用作中间类型的所有类型,则可以通过添加解析函数来解决该问题。添加这样的功能还将简化您的连接方法。我将添加一个代码 sn-p。我希望它会很清楚。
def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)
def flowI2S() = Flow.fromFunction[Int, String](_.toString)
def main(args: Array[String]): Unit =
val idInt1 = flowIdentity[Int]()
val idInt2 = flowIdentity[Int]()
val int2String = flowI2S()
val idString = flowIdentity[String]()
val fcs = List(idInt1, idInt2, int2String, idString)
val source = Source(1 to 10)
val mergedGraph = merge(fcs).get.asInstanceOf[Flow[Int, String, NotUsed]]
source.via(mergedGraph).to(Sink.foreach(println)).run()
def merge(fcs: List[Flow[_, _, NotUsed]]): Option[Flow[_, _, NotUsed]] =
fcs match
case Nil => None
case h :: Nil => Some(h)
case h :: t =>
val n = t.head
val fc = resolveConnect(h, n)
merge(fc :: t.tail)
def resolveConnect(a: Flow[_, _, NotUsed], b: Flow[_, _, NotUsed]): Flow[_, _, NotUsed] =
if (a.isInstanceOf[Flow[_, Int, NotUsed]] && b.isInstanceOf[Flow[Int, _, NotUsed]])
connectInt(a.asInstanceOf[Flow[_, Int, NotUsed]], b.asInstanceOf[Flow[Int, _, NotUsed]])
else if (a.isInstanceOf[Flow[_, String, NotUsed]] && b.isInstanceOf[Flow[String, _, NotUsed]])
connectString(a.asInstanceOf[Flow[_, String, NotUsed]], b.asInstanceOf[Flow[String, _, NotUsed]])
else
throw new UnsupportedOperationException
def connectInt(a: Flow[_, Int, NotUsed], b: Flow[Int, _, NotUsed]): Flow[_, _, NotUsed] =
a.via(b)
def connectString(a: Flow[_, String, NotUsed], b: Flow[String, _, NotUsed]): Flow[_, _, NotUsed] =
a.via(b)
附言
那里隐藏着另一个错误,即无限循环。调用合并递归时,应该删除第一个元素,因为它已经合并到主流程中。
【讨论】:
你的connect(a,b)
已经存在于Akka流中,它被称为a.via(b)
。至于您提供的resolveConnect
函数,您提供的转换实际上都是毫无意义的,因为它们是在转换已擦除类型。换句话说:即使在运行时,这些演员表也对你没有帮助。为了使演员表有用,您必须映射 Flow
并调用 asInstanceOf
。
关于 via 方法。你说的对。谢谢。我会更新代码。关于“无意义的强制转换”,请在陈述错误之前尝试运行代码。
我想我解释了为什么强制转换没有意义:它们是在擦除类型上。下面是一个示例:val flowInt: Flow[_, _, NotUsed] = Flow[Int]; val flowString: Flow[_, _, NotUsed] = Flow[String]; resolveConnect(flowInt, flowString)
不会导致强制转换异常,尽管事实上 Flow[Int, Int, NotUsed]
绝对不能连接到 Flow[String, String, NotUsed]
。
您的解释如何与更新后的代码运行并打印数字 1 到 10 相一致?
演员没有检查/做任何事情。您甚至可以转换为虚拟类型(并跳过isInstanceOf
),您的代码将起作用:def resolveConnect(a: Flow[_, _, NotUsed], b: Flow[_, _, NotUsed]): Flow[_, _, NotUsed] = a.asInstanceOf[Flow[_, Nothing, NotUsed]].via(b.asInstanceOf[Flow[Nothing, _, NotUsed]])
。 instanceOf
检查带来了一种错误的安全感:它们没有进行任何运行时检查(因此我称它们为“无意义的”。以上是关于在运行时动态创建 Akka 流的主要内容,如果未能解决你的问题,请参考以下文章
我是不是需要重复使用相同的 Akka ActorSystem 或者我可以在每次需要时创建一个?
Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub