在运行时动态创建 Akka 流

Posted

技术标签:

【中文标题】在运行时动态创建 Akka 流【英文标题】:Dynamically creating Akka Stream Flows at Runtime 【发布时间】:2017-09-24 04:32:26 【问题描述】:

我目前正在尝试在运行时动态创建 Akka 流图定义。这个想法是用户将能够以交互方式定义流并将它们附加到现有/正在运行的BroadcastHubs。这意味着我不知道在编译时将使用哪些流甚至多少流。

不幸的是,我正在为泛型/类型擦除而苦苦挣扎。坦率地说,我什至不确定我试图在 JVM 上做些什么。

我有一个函数将返回一个 Akka Streams Flow 代表两个连接的 Flows。它使用 Scala 的TypeTags 来绕过类型擦除。如果第一个流的输出类型与第二个流的输入类型相同,则可以连接成功。这工作得很好。

import akka.NotUsed
import akka.stream.FlowShape
import akka.stream.scaladsl.GraphDSL.Implicits._
import akka.stream.scaladsl.Flow, GraphDSL

import scala.reflect.runtime.universe._
import scala.util.Failure, Success, Try

def connect[A: TypeTag, B: TypeTag, C: TypeTag, D: TypeTag](a: Flow[A, B, NotUsed],
                                                            b: Flow[C, D, NotUsed]): Try[Flow[A, D, NotUsed]] = 
  Try 
    if (typeOf[B] =:= typeOf[C]) 
      val c = b.asInstanceOf[Flow[B, D, NotUsed]]

      Flow.fromGraph 
        GraphDSL.create(a, c)((m1, m2) => NotUsed.getInstance())  implicit b =>
          (s1, s2) =>
            s1 ~> s2
            FlowShape(s1.in, s2.out)
        
      
    
    else
      throw new RuntimeException(s"Connection failed. Incompatible types: $typeOf[B] and $typeOf[C]")
  

所以如果我有Flow[A,B]Flow[C,D],则结果将是Flow[A,D],假设B 和C 是同一类型。

我还具有尝试将ListFlows 合并/减少为单个Flow 的功能。让我们假设此列表源自文件或 Web 请求的流定义列表。

def merge(fcs: List[Flow[_, _, NotUsed]]): Try[Option[Flow[_, _, NotUsed]]] = 
  fcs match 
    case Nil => Success(None)
    case h :: Nil => Success(Some(h))
    case h :: t =>
      val n = t.head

      connect(h, n) match 
        case Success(fc) => merge(fc :: t)
        case Failure(e) => Failure(e)
      
  

不幸的是,由于Flows 存储在List 中,由于标准Lists 上的类型擦除,我丢失了所有类型信息,因此无法在运行时连接Flows。这是一个例子:

def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)

def flowI2S() = Flow.fromFunction[Int, String](_.toString)

val a = flowIdentity[Int]()
val b = flowIdentity[Int]()
val c = flowI2S()
val d = flowIdentity[String]()

val fcs: List[Flow[_, _, NotUsed]] = List(a, b, c, d)

val y = merge(fcs)

这会导致异常:

Failure(java.lang.RuntimeException: Connection failed. Incompatible types _$4 and _$3)

我一直在研究 Miles Sabin 的Shapeless,并认为我可以使用HLists 来保留类型信息。不幸的是,这似乎只有在我在编译时知道列表的各个类型和长度时才有效。如果我将特定的 HList 向上转换为 HList,看起来我又丢失了类型信息。

val fcs: HList = a :: b :: c :: d :: HNil

所以我的问题是……这可能吗?有没有办法用无形泛型魔法来做到这一点(最好不需要使用特定的非存在类型提取器)?我希望找到尽可能通用的解决方案,我们将不胜感激。

谢谢!

【问题讨论】:

首先,在第一个代码示例中,所有类型在编译时都是已知的,因此可以将其简化为 3 种类型而不是 4 种:Flow[A, B]Flow[B, C] 只是好奇:您是否考虑过编译用户创建的图表?即从一些 DSL 生成 scala 代码,然后编译,然后加载到运行时? 这实际上是一个好主意...我没想到。这将在有人创建图表时提供所需的类型安全性。现在我只需要处理类加载器魔法......谢谢! 【参考方案1】:

正如您已经注意到的,它不起作用的原因是该列表删除了您拥有的类型。因此这是不可能的。 如果您知道可以用作中间类型的所有类型,则可以通过添加解析函数来解决该问题。添加这样的功能还将简化您的连接方法。我将添加一个代码 sn-p。我希望它会很清楚。

def flowIdentity[A]() = Flow.fromFunction[A, A](x => x)
def flowI2S() = Flow.fromFunction[Int, String](_.toString)

def main(args: Array[String]): Unit = 
    val idInt1 = flowIdentity[Int]()
    val idInt2 = flowIdentity[Int]()
    val int2String = flowI2S()
    val idString = flowIdentity[String]()
    val fcs = List(idInt1, idInt2, int2String, idString)

    val source = Source(1 to 10)
    val mergedGraph = merge(fcs).get.asInstanceOf[Flow[Int, String, NotUsed]]
    source.via(mergedGraph).to(Sink.foreach(println)).run()


def merge(fcs: List[Flow[_, _, NotUsed]]): Option[Flow[_, _, NotUsed]] = 
    fcs match 
      case Nil => None
      case h :: Nil => Some(h)
      case h :: t =>
        val n = t.head

        val fc = resolveConnect(h, n)
        merge(fc :: t.tail)
    


def resolveConnect(a: Flow[_, _, NotUsed], b: Flow[_, _, NotUsed]): Flow[_, _, NotUsed] = 
    if (a.isInstanceOf[Flow[_, Int, NotUsed]] && b.isInstanceOf[Flow[Int, _, NotUsed]]) 
      connectInt(a.asInstanceOf[Flow[_, Int, NotUsed]], b.asInstanceOf[Flow[Int, _, NotUsed]])
     else if (a.isInstanceOf[Flow[_, String, NotUsed]] && b.isInstanceOf[Flow[String, _, NotUsed]]) 
      connectString(a.asInstanceOf[Flow[_, String, NotUsed]], b.asInstanceOf[Flow[String, _, NotUsed]])
     else 
      throw new UnsupportedOperationException
    


def connectInt(a: Flow[_, Int, NotUsed], b: Flow[Int, _, NotUsed]): Flow[_, _, NotUsed] = 
    a.via(b)


def connectString(a: Flow[_, String, NotUsed], b: Flow[String, _, NotUsed]): Flow[_, _, NotUsed] = 
   a.via(b)

附言

那里隐藏着另一个错误,即无限循环。调用合并递归时,应该删除第一个元素,因为它已经合并到主流程中。

【讨论】:

你的connect(a,b)已经存在于Akka流中,它被称为a.via(b)。至于您提供的resolveConnect 函数,您提供的转换实际上都是毫无意义的,因为它们是在转换已擦除类型。换句话说:即使在运行时,这些演员表也对你没有帮助。为了使演员表有用,您必须映射 Flow 并调用 asInstanceOf 关于 via 方法。你说的对。谢谢。我会更新代码。关于“无意义的强制转换”,请在陈述错误之前尝试运行代码。 我想我解释了为什么强制转换没有意义:它们是在擦除类型上。下面是一个示例:val flowInt: Flow[_, _, NotUsed] = Flow[Int]; val flowString: Flow[_, _, NotUsed] = Flow[String]; resolveConnect(flowInt, flowString) 不会导致强制转换异常,尽管事实上 Flow[Int, Int, NotUsed] 绝对不能连接到 Flow[String, String, NotUsed] 您的解释如何与更新后的代码运行并打印数字 1 到 10 相一致? 演员没有检查/做任何事情。您甚至可以转换为虚拟类型(并跳过isInstanceOf),您的代码将起作用:def resolveConnect(a: Flow[_, _, NotUsed], b: Flow[_, _, NotUsed]): Flow[_, _, NotUsed] = a.asInstanceOf[Flow[_, Nothing, NotUsed]].via(b.asInstanceOf[Flow[Nothing, _, NotUsed]])instanceOf 检查带来了一种错误的安全感:它们没有进行任何运行时检查(因此我称它们为“无意义的”。

以上是关于在运行时动态创建 Akka 流的主要内容,如果未能解决你的问题,请参考以下文章

卡夫卡火花流动态模式

004_FreeRTOS创建与删除任务

无法使用 sbt 运行简单的 akka 示例

akka系统中停止运行任务的问题

我是不是需要重复使用相同的 Akka ActorSystem 或者我可以在每次需要时创建一个?

Akka(22): Stream:实时操控:动态管道连接-MergeHub,BroadcastHub and PartitionHub