Akka(17): Stream: Basic stream components - an introduction to Source, Flow, and Sink

Posted 雪川大虫


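Now that big-data programs are commonplace, many programs face the same problem: input data that is effectively unbounded and arrives at unpredictable times. The usual solution relies on callback functions, but this easily leads to "callback hell", a form of "goto hell": control jumps around and is hard to follow, and variables modified inside callbacks can produce unpredictable results. Streams are an effective programming approach to this problem. A stream is an abstraction that hides how data arrives, along with other details, and lets the data-processing steps be described declaratively, which makes the overall program logic easier to understand and trace. The price is less control over some low-level details of the computation. We covered scalaz-stream earlier; its main differences from akka-stream are: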





  /**
   * Helper to create [[Source]] from `Iterable`.
   * Example usage: `Source(Seq(1,2,3))`
   *
   * Starts a new `Source` from the given `Iterable`. This is like starting from an
   * Iterator, but every Subscriber directly attached to the Publisher of this
   * stream will see an individual flow of elements (always starting from the
   * beginning) regardless of when they subscribed.
   */
  def apply[T](iterable: immutable.Iterable[T]): Source[T, NotUsed] =
    // ...

  /**
   * Create a `Source` with one element.
   * Every connected `Sink` of this stream will see an individual stream consisting of one element.
   */
  def single[T](element: T): Source[T, NotUsed] =
    fromGraph(new GraphStages.SingleSource(element))

  /**
   * Helper to create [[Source]] from `Iterator`.
   * Example usage: `Source.fromIterator(() => Iterator.from(0))`
   *
   * Start a new `Source` from the given function that produces an Iterator.
   * The produced stream of elements will continue until the iterator runs empty
   * or fails during evaluation of the `next()` method.
   * Elements are pulled out of the iterator in accordance with the demand coming
   * from the downstream transformation steps.
   */
  def fromIterator[T](f: () ⇒ Iterator[T]): Source[T, NotUsed] =
    apply(new immutable.Iterable[T] {
      override def iterator: Iterator[T] = f()
      override def toString: String = "() => Iterator"
    })

  /**
   * Starts a new `Source` from the given `Future`. The stream will consist of
   * one element when the `Future` is completed with a successful value, which
   * may happen before or after materializing the `Flow`.
   * The stream terminates with a failure if the `Future` is completed with a failure.
   */
  def fromFuture[T](future: Future[T]): Source[T, NotUsed] =
    fromGraph(new FutureSource(future))

  /**
   * Helper to create [[Source]] from `Publisher`.
   *
   * Construct a transformation starting with given publisher. The transformation steps
   * are executed by a series of [[org.reactivestreams.Processor]] instances
   * that mediate the flow of elements downstream and the propagation of
   * back-pressure upstream.
   */
  def fromPublisher[T](publisher: Publisher[T]): Source[T, NotUsed] =
    fromGraph(new PublisherSource(publisher, DefaultAttributes.publisherSource, shape("PublisherSource")))

  /**
   * A graph with the shape of a source logically is a source, this method makes
   * it so also in type.
   */
  def fromGraph[T, M](g: Graph[SourceShape[T], M]): Source[T, M] = g match {
    case s: Source[T, M]         ⇒ s
    case s: javadsl.Source[T, M] ⇒ s.asScala
    case other ⇒ new Source(
      LinearTraversalBuilder.fromBuilder(other.traversalBuilder, other.shape, Keep.right),
      // ...
    )
  }

  /**
   * A `Source` with no elements, i.e. an empty stream that is completed immediately for every connected `Sink`.
   */
  def empty[T]: Source[T, NotUsed] = _empty
  private[this] val _empty: Source[Nothing, NotUsed] =
    // ...

  /**
   * Create a `Source` that will continually emit the given element.
   */
  def repeat[T](element: T): Source[T, NotUsed] = {
    val next = Some((element, element))
    unfold(element)(_ ⇒ next).withAttributes(DefaultAttributes.repeat)
  }

  /**
   * Creates [[Source]] that will continually produce given elements in specified order.
   *
   * Starts a new 'cycled' `Source` from the given elements. The producer stream of elements
   * will continue infinitely by repeating the sequence of elements provided by function parameter.
   */
  def cycle[T](f: () ⇒ Iterator[T]): Source[T, NotUsed] = {
    val iterator = Iterator.continually { val i = f(); if (i.isEmpty) throw new IllegalArgumentException("empty iterator") else i }.flatten
    fromIterator(() ⇒ iterator).withAttributes(DefaultAttributes.cycledSource)
  }
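
The constructors listed above can be tried out with a small sketch like the one below. It assumes the same Akka 2.5.x API used throughout this post (`ActorMaterializer`, `Source.fromFuture`); the object name `SourceConstructorsSketch`, the helper `drain`, and the 5-second timeout are made up for illustration. Infinite sources (`fromIterator` over `Iterator.from(0)`, `repeat`, `cycle`) are cut short with `take` so the stream can complete.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka.
object SourceConstructorsSketch {
  // Runs one small stream per constructor and collects what each source emits.
  def collectAll(): Map[String, Seq[Int]] = {
    implicit val sys: ActorSystem = ActorSystem("source-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    // Materialize a source into a Seq and block until the stream completes.
    def drain(src: Source[Int, _]): Seq[Int] =
      Await.result(src.runWith(Sink.seq[Int]), 5.seconds)

    try Map(
      "apply"        -> drain(Source(List(1, 2, 3))),
      "single"       -> drain(Source.single(42)),
      "fromIterator" -> drain(Source.fromIterator(() => Iterator.from(0)).take(3)),
      "fromFuture"   -> drain(Source.fromFuture(Future.successful(7))),
      "empty"        -> drain(Source.empty[Int]),
      "repeat"       -> drain(Source.repeat(1).take(3)),
      "cycle"        -> drain(Source.cycle(() => Iterator(1, 2)).take(5))
    )
    finally Await.result(sys.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit =
    collectAll().foreach { case (name, elems) => println(s"$name -> $elems") }
}
```

Blocking with `Await` is only there to keep the sketch short and deterministic; real code would normally compose the returned futures instead.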


  /**
   * A `Sink` that will consume the stream and discard the elements.
   */
  def ignore: Sink[Any, Future[Done]] = fromGraph(GraphStages.IgnoreSink)

  /**
   * A `Sink` that will invoke the given procedure for each received element. The sink is materialized
   * into a [[scala.concurrent.Future]] which will be completed with `Success` when reaching the
   * normal end of the stream, or completed with `Failure` if there is a failure signaled in
   * the stream.
   */
  def foreach[T](f: T ⇒ Unit): Sink[T, Future[Done]] =
    // ...
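
As a rough illustration of these two sinks, the sketch below runs the same three-element source into `Sink.ignore` and into `Sink.foreach`. Both materialize a `Future[Done]` that completes when the stream ends. It assumes the Akka 2.5.x API used in this post; `SinkSketch`, the `AtomicInteger` accumulator, and the timeout are illustrative choices only.

```scala
import akka.Done
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import java.util.concurrent.atomic.AtomicInteger
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka.
object SinkSketch {
  // Returns the Done marker produced by Sink.ignore and the sum gathered via Sink.foreach.
  def run(): (Done, Int) = {
    implicit val sys: ActorSystem = ActorSystem("sink-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    val sum = new AtomicInteger(0) // thread-safe: the procedure runs on a stream thread
    try {
      // Elements are consumed and thrown away; only completion is reported.
      val ignored: Future[Done] = Source(1 to 3).runWith(Sink.ignore)
      // The procedure is invoked once per element as a side effect.
      val summed: Future[Done] =
        Source(1 to 3).runWith(Sink.foreach[Int] { x => sum.addAndGet(x); () })
      val done = Await.result(ignored, 5.seconds)
      Await.result(summed, 5.seconds)
      (done, sum.get)
    } finally Await.result(sys.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```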





  override def via[T, Mat2](flow: Graph[FlowShape[Out, T], Mat2]): Repr[T] = viaMat(flow)(Keep.left)

  override def viaMat[T, Mat2, Mat3](flow: Graph[FlowShape[Out, T], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): Source[T, Mat3] = {
    val toAppend =
      if (flow.traversalBuilder eq Flow.identityTraversalBuilder)
        // ...

    new Source[T, Mat3](
      traversalBuilder.append(toAppend, flow.shape, combine),
      // ...
    )
  }

  /**
   * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]],
   * concatenating the processing steps of both.
   */
  def to[Mat2](sink: Graph[SinkShape[Out], Mat2]): RunnableGraph[Mat] = toMat(sink)(Keep.left)

  /**
   * Connect this [[akka.stream.scaladsl.Source]] to a [[akka.stream.scaladsl.Sink]],
   * concatenating the processing steps of both.
   */
  def toMat[Mat2, Mat3](sink: Graph[SinkShape[Out], Mat2])(combine: (Mat, Mat2) ⇒ Mat3): RunnableGraph[Mat3] = {
    RunnableGraph(traversalBuilder.append(sink.traversalBuilder, sink.shape, combine))
  }

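Note that via and to are shorthand for viaMat and toMat respectively, each with the combiner fixed to Keep.left, which means keeping the computation result (the materialized value) of the left-hand graph. As mentioned above, akka-stream processes stream elements inside an actor system, and along the way the actors' internal state can be used to produce a computation result. via and to connect a left and a right graph and keep the result of the left one. With viaMat and toMat we can choose the result of the right graph instead. The choice is made through the function combine: (Mat, Mat2) => Mat3. akka-stream provides a Keep object to express these choices: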

/**
 * Convenience functions for often-encountered purposes like keeping only the
 * left (first) or only the right (second) of two input values.
 */
object Keep {
  private val _left = (l: Any, r: Any) ⇒ l
  private val _right = (l: Any, r: Any) ⇒ r
  private val _both = (l: Any, r: Any) ⇒ (l, r)
  private val _none = (l: Any, r: Any) ⇒ NotUsed

  def left[L, R]: (L, R) ⇒ L = _left.asInstanceOf[(L, R) ⇒ L]
  def right[L, R]: (L, R) ⇒ R = _right.asInstanceOf[(L, R) ⇒ R]
  def both[L, R]: (L, R) ⇒ (L, R) = _both.asInstanceOf[(L, R) ⇒ (L, R)]
  def none[L, R]: (L, R) ⇒ NotUsed = _none.asInstanceOf[(L, R) ⇒ NotUsed]
}
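
To make the effect of each `Keep` combiner concrete, here is a minimal sketch that connects one source to one folding sink four times and only varies the combiner passed to `toMat`. It assumes the Akka 2.5.x API used in this post; `KeepSketch`, `sumSink`, and the timeout are names and values invented for the example.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Keep, Sink, Source}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka.
object KeepSketch {
  // Returns: left value, awaited right value, awaited second half of both, none value.
  def run(): (NotUsed, Int, Int, NotUsed) = {
    implicit val sys: ActorSystem = ActorSystem("keep-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()

    val source: Source[Int, NotUsed] = Source(1 to 4)                    // materializes NotUsed
    val sumSink: Sink[Int, Future[Int]] = Sink.fold[Int, Int](0)(_ + _)  // materializes Future[Int]

    try {
      // Keep.left: the source's value survives, the sink's Future is dropped.
      val left: NotUsed = source.toMat(sumSink)(Keep.left).run()
      // Keep.right: the sink's Future survives.
      val right: Future[Int] = source.toMat(sumSink)(Keep.right).run()
      // Keep.both: a tuple of both values.
      val both: (NotUsed, Future[Int]) = source.toMat(sumSink)(Keep.both).run()
      // Keep.none: both values are discarded.
      val none: NotUsed = source.toMat(sumSink)(Keep.none).run()
      (left, Await.result(right, 5.seconds), Await.result(both._2, 5.seconds), none)
    } finally Await.result(sys.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

With `Keep.left` or `Keep.none` there is no handle on the sink's result, which is why the demo further down uses `toMat(sink)(Keep.right)` whenever it needs to know when the stream has finished.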


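Source[+Out, +Mat]       // Out is the element type; Mat is the type of the computation result (materialized value)
Flow[-In, +Out, +Mat]    // In and Out are the stream element types; Mat is the type of the computation result
Sink[-In, +Mat]          // In is the element type; Mat is the type of the computation result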
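
The variance annotations in these signatures are what allow, for example, a `Sink[Any, ...]` to be attached to a `Source[Int, ...]`. The sketch below only demonstrates the type relationships and runs no stream; `VarianceSketch` and its value names are made up for illustration.

```scala
import akka.{Done, NotUsed}
import akka.stream.scaladsl.{Flow, Sink, Source}
import scala.concurrent.Future

// Hypothetical demo object, not part of Akka.
object VarianceSketch {
  // -In: a sink that accepts Any can be used wherever a sink of Int is expected.
  val anySink: Sink[Any, Future[Done]] = Sink.foreach[Any](x => println(x))
  val intSink: Sink[Int, Future[Done]] = anySink

  // +Out: a source of Int can be viewed as a source of the wider type AnyVal.
  val ints: Source[Int, NotUsed] = Source(1 to 3)
  val anyVals: Source[AnyVal, NotUsed] = ints

  // Flow combines both: contravariant in In, covariant in Out.
  val render: Flow[Any, String, NotUsed] = Flow[Any].map(_.toString)
  val narrowed: Flow[Int, CharSequence, NotUsed] = render
}
```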


/**
 * Flow with attached input and output, can be executed.
 */
final case class RunnableGraph[+Mat](override val traversalBuilder: TraversalBuilder) extends Graph[ClosedShape, Mat] {
  override def shape = ClosedShape

  /**
   * Transform only the materialized value of this RunnableGraph, leaving all other properties as they were.
   */
  def mapMaterializedValue[Mat2](f: Mat ⇒ Mat2): RunnableGraph[Mat2] =
    copy(traversalBuilder.transformMat(f.asInstanceOf[Any ⇒ Any]))

  /**
   * Run this flow and return the materialized instance from the flow.
   */
  def run()(implicit materializer: Materializer): Mat = materializer.materialize(this)
}

Above, shape = ClosedShape means that the shape of a RunnableGraph is closed: every input and output port of a runnable graph must be connected.
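
The closed-shape requirement can be sketched as follows: a `Source` joined to a `Flow` still has an open output and therefore remains a `Source`; only once a `Sink` is attached do we get a `RunnableGraph` with a `run()` method. The sketch assumes the Akka 2.5.x API used in this post; `ClosedShapeSketch` and its value names are hypothetical.

```scala
import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Flow, Keep, RunnableGraph, Sink, Source}
import scala.collection.immutable
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

// Hypothetical demo object, not part of Akka.
object ClosedShapeSketch {
  // Open building blocks: each one has at least one unconnected port.
  val numbers: Source[Int, NotUsed] = Source(1 to 3)
  val doubler: Flow[Int, Int, NotUsed] = Flow[Int].map(_ * 2)
  val collectSink: Sink[Int, Future[immutable.Seq[Int]]] = Sink.seq[Int]

  // Source + Flow: the output port is still open, so the result is again a Source.
  val stillOpen: Source[Int, NotUsed] = numbers.via(doubler)

  // Attaching the Sink connects the last open port and yields a RunnableGraph.
  val closed: RunnableGraph[Future[immutable.Seq[Int]]] =
    stillOpen.toMat(collectSink)(Keep.right)

  // A RunnableGraph is a reusable blueprint: every run() materializes a fresh stream.
  def runTwice(): (Seq[Int], Seq[Int]) = {
    implicit val sys: ActorSystem = ActorSystem("closed-shape-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    try (Await.result(closed.run(), 5.seconds), Await.result(closed.run(), 5.seconds))
    finally Await.result(sys.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(runTwice())
}
```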


import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka._
import scala.concurrent._

object SourceDemo extends App {
  implicit val sys=ActorSystem("demo")
  implicit val mat=ActorMaterializer()
  implicit val ec=sys.dispatcher

  val s1: Source[Int,NotUsed] = Source(1 to 10)
  val sink: Sink[Any,Future[Done]] = Sink.foreach(println)
  val rg1: RunnableGraph[NotUsed] = s1.to(sink)
  val rg2: RunnableGraph[Future[Done]]  = s1.toMat(sink)(Keep.right)
  val res1: NotUsed = rg1.run()


  val res2: Future[Done] = rg2.run()
  res2.andThen {
    case _ => sys.terminate()
  }
}

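  // Continues the body of SourceDemo: s1, sink and sys are the values defined above.
  val seq = Seq[Int](1,2,3)
  def toIterator() = seq.iterator
  val flow1: Flow[Int,Int,NotUsed] = Flow[Int].map(_ + 2)   // add 2 to every element
  val flow2: Flow[Int,Int,NotUsed] = Flow[Int].map(_ * 3)   // multiply every element by 3
  val s2 = Source.fromIterator(toIterator)
  val s3 = s1 ++ s2   // concatenation: all elements of s1, then all elements of s2

  val s4: Source[Int,NotUsed] = s3.viaMat(flow1)(Keep.right)
  // .async places an asynchronous boundary between flow1 and flow2
  val s5: Source[Int,NotUsed] = s3.via(flow1).async.viaMat(flow2)(Keep.right)
  val s6: Source[Int,NotUsed] = s4.async.viaMat(flow2)(Keep.right)
  (s5.toMat(sink)(Keep.right).run()).andThen {case _ => sys.terminate()}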



In fact, we can use the syntactic-sugar methods that akka-stream's Source provides to run a stream directly, as follows:

  val fres = s6.runFold(0)(_ + _)
  fres.foreach(a => println(a))   // replaces onSuccess, which is deprecated since Scala 2.12
  fres.andThen{case _ => sys.terminate()}


  /**
   * Shortcut for running this `Source` with a fold function.
   * The given function is invoked for every received element, giving it its previous
   * output (or the given `zero` value) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   */
  def runFold[U](zero: U)(f: (U, Out) ⇒ U)(implicit materializer: Materializer): Future[U] = runWith(Sink.fold(zero)(f))

  /**
   * Shortcut for running this `Source` with a foldAsync function.
   * The given function is invoked for every received element, giving it its previous
   * output (or the given `zero` value) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   */
  def runFoldAsync[U](zero: U)(f: (U, Out) ⇒ Future[U])(implicit materializer: Materializer): Future[U] = runWith(Sink.foldAsync(zero)(f))

  /**
   * Shortcut for running this `Source` with a reduce function.
   * The given function is invoked for every received element, giving it its previous
   * output (from the second element) and the element as input.
   * The returned [[scala.concurrent.Future]] will be completed with value of the final
   * function evaluation when the input stream ends, or completed with `Failure`
   * if there is a failure signaled in the stream.
   *
   * If the stream is empty (i.e. completes before signalling any elements),
   * the reduce stage will fail its downstream with a [[NoSuchElementException]],
   * which is semantically in-line with that Scala's standard library collections
   * do in such situations.
   */
  def runReduce[U >: Out](f: (U, U) ⇒ U)(implicit materializer: Materializer): Future[U] =
    // ...

  /**
   * Shortcut for running this `Source` with a foreach procedure. The given procedure is invoked
   * for each received element.
   * The returned [[scala.concurrent.Future]] will be completed with `Success` when reaching the
   * normal end of the stream, or completed with `Failure` if there is a failure signaled in
   * the stream.
   */
  // FIXME: Out => Unit should stay, right??
  def runForeach(f: Out ⇒ Unit)(implicit materializer: Materializer): Future[Done] = runWith(Sink.foreach(f))

  /**
   * Connect this `Source` to a `Sink` and run it. The returned value is the materialized value
   * of the `Sink`, e.g. the `Publisher` of a [[akka.stream.scaladsl.Sink#publisher]].
   */
  def runWith[Mat2](sink: Graph[SinkShape[Out], Mat2])(implicit materializer: Materializer): Mat2 = toMat(sink)(Keep.right).run()
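
A short sketch of these shortcuts in use is given below, including the empty-stream case of `runReduce` described in the Scaladoc above. It assumes the Akka 2.5.x API used in this post; `RunShortcutsSketch`, the sample data, and the timeout are invented for the example.

```scala
import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.{Sink, Source}
import scala.concurrent.Await
import scala.concurrent.duration._
import scala.util.Try

// Hypothetical demo object, not part of Akka.
object RunShortcutsSketch {
  // Returns: fold result, reduce result, elements gathered via runWith,
  // and whether reducing an empty stream failed with NoSuchElementException.
  def run(): (Int, Int, Seq[Int], Boolean) = {
    implicit val sys: ActorSystem = ActorSystem("run-shortcuts-sketch")
    implicit val mat: ActorMaterializer = ActorMaterializer()
    val src = Source(1 to 10)
    try {
      // runFold: sum of all elements, starting from the explicit zero value 0.
      val folded = Await.result(src.runFold(0)(_ + _), 5.seconds)
      // runReduce: no zero value; the first element seeds the computation.
      val reduced = Await.result(src.runReduce[Int](_ max _), 5.seconds)
      // runWith: attach any sink and get back that sink's materialized value.
      val viaSink = Await.result(src.take(3).runWith(Sink.seq[Int]), 5.seconds)
      // Reducing an empty stream fails the returned Future.
      val emptyFails =
        Try(Await.result(Source.empty[Int].runReduce[Int](_ + _), 5.seconds))
          .failed.toOption.exists(_.isInstanceOf[NoSuchElementException])
      (folded, reduced, viaSink, emptyFails)
    } finally Await.result(sys.terminate(), 5.seconds)
  }

  def main(args: Array[String]): Unit = println(run())
}
```

All of these shortcuts go through `runWith`, so each of them keeps the sink's materialized value, exactly as `toMat(sink)(Keep.right).run()` would.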



import akka.actor._
import akka.stream._
import akka.stream.scaladsl._
import akka._
import scala.concurrent._

object SourceDemo extends App {
  implicit val sys=ActorSystem("demo")
  implicit val mat=ActorMaterializer()
  implicit val ec=sys.dispatcher

  val s1: Source[Int,NotUsed] = Source(1 to 10)
  val sink: Sink[Any,Future[Done]] = Sink.foreach(println)
  val rg1: RunnableGraph[NotUsed] = s1.to(sink)
  val rg2: RunnableGraph[Future[Done]]  = s1.toMat(sink)(Keep.right)
  val res1: NotUsed = rg1.run()


  val res2: Future[Done] = rg2.run()
  res2.andThen {
    case _ =>   //sys.terminate()
  }

  val seq = Seq[Int](1,2,3)
  def toIterator() = seq.iterator
  val flow1: Flow[Int,Int,NotUsed] = Flow[Int].map(_ + 2)
  val flow2: Flow[Int,Int,NotUsed] = Flow[Int].map(_ * 3)
  val s2 = Source.fromIterator(toIterator)
  val s3 = s1 ++ s2

  val s4: Source[Int,NotUsed] = s3.viaMat(flow1)(Keep.right)
  val s5: Source[Int,NotUsed] = s3.via(flow1).async.viaMat(flow2)(Keep.right)
  val s6: Source[Int,NotUsed] = s4.async.viaMat(flow2)(Keep.right)
  (s5.toMat(sink)(Keep.right).run()).andThen {case _ => } //sys.terminate()}

  val fres = s6.runFold(0)(_ + _)
  fres.foreach(a => println(a))   // replaces onSuccess, which is deprecated since Scala 2.12
  fres.andThen{case _ => sys.terminate()}
}
















