Is it possible to create an "infinite" stream from a database table using Akka Stream
Posted: 2016-02-19 16:04:02

I'm using Akka Streams 2.4.2 and am wondering if it's possible to set up a stream that uses a database table as a source, so that whenever a record is added to the table, that record is materialized and pushed downstream?
Update: 2/23/16

I've implemented the solution from @PH88. Here's my table definition:
case class Record(id: Int, value: String)
class Records(tag: Tag) extends Table[Record](tag, "my_stream") {
  def id = column[Int]("id")
  def value = column[String]("value")
  def * = (id, value) <> (Record.tupled, Record.unapply)
}
Here is the implementation:
implicit val system = ActorSystem("Publisher")
implicit val materializer = ActorMaterializer()
val db = Database.forConfig("pg-postgres")
try {
  val newRecStream = Source.unfold((0, List[Record]())) { n =>
    try {
      val q = for (r <- TableQuery[Records].filter(row => row.id > n._1)) yield (r)
      val r = Source.fromPublisher(db.stream(q.result)).collect {
        case rec => println(s"${rec.id}, ${rec.value}"); rec
      }.runFold((n._1, List[Record]())) {
        case ((id, xs), current) => (current.id, current :: xs)
      }
      val answer: (Int, List[Record]) = Await.result(r, 5.seconds)
      Option(answer, None)
    } catch {
      case e: Exception => println(e); Option(n, e)
    }
  }
  Await.ready(newRecStream.throttle(1, 1.second, 1, ThrottleMode.shaping).runForeach(_ => ()), Duration.Inf)
} finally {
  system.shutdown
  db.close
}
But my problem is that when I try to call flatMapConcat, the type I get back is Serializable.
Update: 2/24/16

Updated to try the db.run suggestion from @PH88:
implicit val system = ActorSystem("Publisher")
implicit val materializer = ActorMaterializer()
val db = Database.forConfig("pg-postgres")
val disableAutoCommit = SimpleDBIO(_.connection.setAutoCommit(false))
val queryLimit = 1
try {
  val newRecStream = Source.unfoldAsync(0) { n =>
    val q = TableQuery[Records].filter(row => row.id > n).take(queryLimit)
    db.run(q.result).map { recs =>
      Some(recs.last.id, recs)
    }
  }
    .throttle(1, 1.second, 1, ThrottleMode.shaping)
    .flatMapConcat { recs =>
      Source.fromIterator(() => recs.iterator)
    }
    .runForeach { rec =>
      println(s"${rec.id}, ${rec.value}")
    }
  Await.ready(newRecStream, Duration.Inf)
} catch {
  case ex: Throwable => println(ex)
} finally {
  system.shutdown
  db.close
}
This works (I changed the query limit to 1 since there are currently only a few items in my database table) - except that once it prints the last row in the table, the program exits. Here is my log output:
17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback.groovy]
17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Could NOT find resource [logback-test.xml]
17:09:27,982 |-INFO in ch.qos.logback.classic.LoggerContext[default] - Found resource [logback.xml] at [file:/Users/xxxxxxx/dev/src/scratch/scala/fpp-in-scala/target/scala-2.11/classes/logback.xml]
17:09:28,062 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - About to instantiate appender of type [ch.qos.logback.core.ConsoleAppender]
17:09:28,064 |-INFO in ch.qos.logback.core.joran.action.AppenderAction - Naming appender as [STDOUT]
17:09:28,079 |-INFO in ch.qos.logback.core.joran.action.NestedComplexPropertyIA - Assuming default type [ch.qos.logback.classic.encoder.PatternLayoutEncoder] for [encoder] property
17:09:28,102 |-INFO in ch.qos.logback.classic.joran.action.LoggerAction - Setting level of logger [application] to DEBUG
17:09:28,103 |-INFO in ch.qos.logback.classic.joran.action.RootLoggerAction - Setting level of ROOT logger to INFO
17:09:28,103 |-INFO in ch.qos.logback.core.joran.action.AppenderRefAction - Attaching appender named [STDOUT] to Logger[ROOT]
17:09:28,103 |-INFO in ch.qos.logback.classic.joran.action.ConfigurationAction - End of configuration.
17:09:28,104 |-INFO in ch.qos.logback.classic.joran.JoranConfigurator@4278284b - Registering current configuration as safe fallback point
17:09:28.117 [main] INFO com.zaxxer.hikari.HikariDataSource - pg-postgres - is starting.
1, WASSSAAAAAAAP!
2, WHAAAAT?!?
3, booyah!
4, what!
5, This rocks!
6, Again!
7, Again!2
8, I love this!
9, Akka Streams rock
10, Tuning jdbc
17:09:39.000 [main] INFO com.zaxxer.hikari.pool.HikariPool - pg-postgres - is closing down.
Process finished with exit code 0
Found the missing piece - I needed to replace this:
Some(recs.last.id, recs)
with this:
val lastId = if(recs.isEmpty) n else recs.last.id
Some(lastId, recs)
The call to recs.last.id was throwing java.lang.UnsupportedOperationException: empty.last when the result set was empty.
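An equivalent, slightly more idiomatic guard (a sketch, not from the original post) avoids the partial recs.last call entirely by using lastOption:

// lastOption is None for an empty result set, so fall back to the
// previous offset n instead of throwing empty.last
val lastId = recs.lastOption.map(_.id).getOrElse(n)
Some(lastId, recs)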
Comments:
The closest thing I could find is slick.typesafe.com/doc/3.1.1/dbio.html#streaming. But that appears to stream results from a static ResultSet, which would not provide live updates...

@Mark, you can use db.run to simplify your code. See the updated answer below.

Answer 1:

Generally speaking, a SQL database is a "passive" construct and does not actively push changes the way you describe. You can only "simulate" the "push" with periodic polling, e.g.:
val newRecStream = Source
  // Query for table changes
  .unfold(initState) { lastState =>
    // query for new data since lastState and save the current state into newState...
    Some((newState, newRecords))
  }
  // Throttle to limit the poll frequency
  .throttle(...)
  // break down into individual records...
  .flatMapConcat { newRecords =>
    Source.unfold(newRecords) { pendingRecords =>
      if (pendingRecords.isEmpty)
        None
      else {
        // take one record from pendingRecords and save it to newRec.
        // Save the rest into remainingRecords.
        Some(remainingRecords, newRec)
      }
    }
  }
Update: 2/24/2016

Pseudocode example based on the question's 2/23/2016 update:
implicit val system = ActorSystem("Publisher")
implicit val materializer = ActorMaterializer()
val db = Database.forConfig("pg-postgres")
val queryLimit = 10
try {
  val completion = Source
    .unfoldAsync(0) { lastRowId =>
      val q = TableQuery[Records].filter(row => row.id > lastRowId).take(queryLimit)
      db.run(q.result).map { recs =>
        Some(recs.last.id, recs)
      }
    }
    .throttle(1, 1.second, 1, ThrottleMode.shaping)
    .flatMapConcat { recs =>
      Source.fromIterator(() => recs.iterator)
    }
    .runForeach { rec =>
      println(s"${rec.id}, ${rec.value}")
    }

  // Block forever
  Await.ready(completion, Duration.Inf)
} catch {
  case ex: Throwable => println(ex)
} finally {
  system.shutdown
  db.close
}
It will repeatedly execute the query in unfoldAsync against the DB, retrieving at most 10 (queryLimit) records at a time, and send the records downstream (-> throttle -> flatMapConcat -> runForeach). The Await at the end actually blocks forever.
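The infiniteness comes from the unfoldAsync contract: the stream completes only when the supplied function returns None, so always answering Some keeps the poll loop alive. A minimal self-contained sketch (no database; the names are invented for illustration):

import akka.actor.ActorSystem
import akka.stream.ActorMaterializer
import akka.stream.scaladsl.Source

import scala.concurrent.Future

object UnfoldAsyncDemo extends App {
  implicit val system = ActorSystem("demo")
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  Source
    .unfoldAsync(0) { lastId =>
      // Always Some(nextState, element) => the stream never completes.
      // Returning Future.successful(None) here would end it instead.
      Future.successful(Some((lastId + 1, s"polled with id > $lastId")))
    }
    .take(3) // bound the demo; remove this line for a truly infinite stream
    .runForeach(println)
    .onComplete(_ => system.terminate())
}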
Update: 2/25/2016

Executable "proof of concept" code:
import akka.actor.ActorSystem
import akka.stream.{ThrottleMode, ActorMaterializer}
import akka.stream.scaladsl.Source

import scala.concurrent.duration.Duration
import scala.concurrent.{Await, Future}
import scala.concurrent.duration._

object Infinite extends App {
  implicit val system = ActorSystem("Publisher")
  implicit val ec = system.dispatcher
  implicit val materializer = ActorMaterializer()

  case class Record(id: Int, value: String)

  try {
    val completion = Source
      .unfoldAsync(0) { lastRowId =>
        Future {
          val recs = (lastRowId to lastRowId + 10).map(i => Record(i, s"rec#$i"))
          Some(recs.last.id, recs)
        }
      }
      .throttle(1, 1.second, 1, ThrottleMode.Shaping)
      .flatMapConcat { recs =>
        Source.fromIterator(() => recs.iterator)
      }
      .runForeach { rec =>
        println(rec)
      }

    Await.ready(completion, Duration.Inf)
  } catch {
    case ex: Throwable => println(ex)
  } finally {
    system.shutdown
  }
}
Comments:
Tried to follow your pseudocode, but I'm still quite new at this. I got stuck when calling flatMapConcat because the parameter coming in is of type Serializable. Is this because I'm calling Await? What should I do?
Figured it out... added Await.ready(newRecStream.throttle(.... Thanks for your answer.
I like the simplified syntax, but the query is no longer infinite - it exits at the end of the stream. I want it to keep listening forever; as soon as there's a new record it should pick it up and push it out.
Mark, no, it is infinite. It will repeatedly execute the query against the database in unfoldAsync, retrieving at most 10 (queryLimit) records at a time and sending the records downstream (-> throttle -> flatMapConcat -> runForeach). The final Await... actually blocks forever.
It turns out recs.last.id was throwing java.lang.UnsupportedOperationException: empty.last, so the stream was terminating. Once I added a check for isEmpty it was fixed. I've updated the OP to demonstrate the fix.

Answer 2:
Here is working code for an infinite stream from a database. It has been tested with millions of records being inserted into a postgresql database while the streaming app was running -
package infinite.streams.db

import akka.NotUsed
import akka.actor.ActorSystem
import akka.stream.alpakka.slick.scaladsl.SlickSession
import akka.stream.scaladsl.{Flow, Sink, Source}
import akka.stream.{ActorMaterializer, ThrottleMode}
import org.slf4j.LoggerFactory
import slick.basic.DatabaseConfig
import slick.jdbc.JdbcProfile

import scala.concurrent.duration._
import scala.concurrent.{Await, ExecutionContextExecutor}

case class Record(id: Int, value: String) {
  val content = s"<ROW><ID>$id</ID><VALUE>$value</VALUE></ROW>"
}

object InfiniteStreamingApp extends App {
  println("Starting app...")

  implicit val system: ActorSystem = ActorSystem("Publisher")
  implicit val ec: ExecutionContextExecutor = system.dispatcher
  implicit val materializer: ActorMaterializer = ActorMaterializer()

  println("Initializing database configuration...")
  val databaseConfig: DatabaseConfig[JdbcProfile] = DatabaseConfig.forConfig[JdbcProfile]("postgres3")
  implicit val session: SlickSession = SlickSession.forConfig(databaseConfig)

  import databaseConfig.profile.api._

  class Records(tag: Tag) extends Table[Record](tag, "test2") {
    def id = column[Int]("c1")
    def value = column[String]("c2")
    def * = (id, value) <> (Record.tupled, Record.unapply)
  }

  val db = databaseConfig.db

  println("Prime for streaming...")
  val logic: Flow[(Int, String), (Int, String), NotUsed] = Flow[(Int, String)].map {
    case (id, value) => (id, value.toUpperCase)
  }

  val fetchSize = 5
  try {
    val done = Source
      .unfoldAsync(0) { lastId =>
        println(s"Fetching next: $fetchSize records with id > $lastId")
        val query = TableQuery[Records].filter(_.id > lastId).take(fetchSize)
        db.run(query.result.withPinnedSession)
          .map { recs => Some(recs.last.id, recs) }
      }
      .throttle(5, 1.second, 1, ThrottleMode.shaping)
      .flatMapConcat { recs =>
        Source.fromIterator(() => recs.iterator)
      }
      .map(x => (x.id, x.content))
      .via(logic)
      .log("*******Post Transformation******")
      // .runWith(Sink.foreach(r => println("SINK: " + r._2)))
      // Use runForeach or runWith(Sink)
      .runForeach(rec => println("REC: " + rec))

    println("Waiting for result....")
    Await.ready(done, Duration.Inf)
  } catch {
    case ex: Throwable => println(ex.getMessage)
  } finally {
    println("Streaming end successfully")
    db.close()
    system.terminate()
  }
}
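Since the app already creates an implicit SlickSession, the page fetch inside unfoldAsync could also be expressed with the Alpakka Slick connector's streaming source instead of db.run. A sketch under that assumption (this variant is not from the original answer; fetchPage is an invented helper name):

import akka.stream.alpakka.slick.scaladsl.Slick
import scala.concurrent.Future

// Streams one page of rows via Alpakka's Slick.source and collects it
// into a Seq, so the surrounding unfoldAsync state logic stays unchanged.
def fetchPage(lastId: Int): Future[Seq[Record]] =
  Slick
    .source(TableQuery[Records].filter(_.id > lastId).take(fetchSize).result)
    .runWith(Sink.seq)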
application.conf
akka {
  loggers = ["akka.event.slf4j.Slf4jLogger"]
  loglevel = "INFO"
}

# Load using SlickSession.forConfig("slick-postgres")
postgres3 {
  profile = "slick.jdbc.PostgresProfile$"
  db {
    dataSourceClass = "slick.jdbc.DriverDataSource"
    properties = {
      driver = "org.postgresql.Driver"
      url = "jdbc:postgresql://localhost/testdb"
      user = "postgres"
      password = "postgres"
    }
    numThreads = 2
  }
}
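For reference, this answer depends on the Alpakka Slick connector in addition to Akka Streams and the Postgres JDBC driver. A build.sbt sketch - the artifact names are the published ones, but the version numbers are assumptions, not taken from the original answer:

// build.sbt - versions are illustrative assumptions
libraryDependencies ++= Seq(
  "com.typesafe.akka"  %% "akka-stream"               % "2.5.23",
  "com.lightbend.akka" %% "akka-stream-alpakka-slick" % "1.1.0",
  "org.postgresql"     %  "postgresql"                % "42.2.5"
)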