缓存和持久化数据集
Posted
技术标签:
【中文标题】缓存和持久化数据集【英文标题】:cache and persist datasets 【发布时间】:2017-09-15 15:38:45 【问题描述】:我想多次使用 org.apache.flink.api.scala.DataSet 对象:
使用 count() 打印行数, 写入 neo4j 数据库, 转换为 Gelly 图形对象, 等对于这些动作中的每一个,Flink 都会完全重新计算 DataSet 的值,而不是缓存它。我在 Spark 中找不到任何 cache() 或 persist() 函数。
这确实对我的应用程序产生了巨大的影响,它有大约 1.000.000 个数据以及许多连接/coGroup 使用等:运行时间似乎增加了 3 倍,也就是几个小时!那么如何缓存或持久化数据集并显着减少运行时间呢?
我正在使用最新的 Flink 版本 1.3.2 和 Scala 2.11。
例子:
package dummy
import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph
import org.apache.flink.graph.Edge, Vertex
import org.apache.logging.log4j.scala.Logging
object Trials extends Logging
def main(args: Array[String])
val env = ExecutionEnvironment.getExecutionEnvironment
// some dataset which could be huge in reality
val dataSet = env.fromElements((1, 436), (2, 235), (3, 67), (4, 51), (5, 15), (6, 62), (7, 155))
// some complex joins, coGroup functions etc.
val joined = dataSet.cross(dataSet).filter(tuple => (tuple._1._2 + tuple._2._2) % 7 == 0)
// log the number of rows --> performs the join above
logger.info(f"results contains $joined.count() rows")
// convert to Gelly graph format
val graph = Graph.fromDataSet(
dataSet.map(nodeTuple => new Vertex[Long, Long](nodeTuple._1, nodeTuple._2)),
joined.map(edgeTuple => new Edge[Long, String](edgeTuple._1._1, edgeTuple._2._1, "someValue")),
env
)
// do something with the graph
logger.info("get number of vertices")
val numberOfVertices = graph.numberOfVertices()
logger.info("get number of edges")
val numberOfEdges = graph.numberOfEdges() // --> performs the join again!
logger.info(f"the graph has $numberOfVertices vertices and $numberOfEdges edges")
所需库:log4j-core、log4j-api-scala_2.11、flink-core、flink-scala_2.11、flink-gelly-scala_2.10
【问题讨论】:
你能发布一个你想要缓存的动作和数据集的例子吗? 添加了我的代码的简化版本 你找到解决这个问题的方法了吗?我有同样的问题 一段时间后我们才找到ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/…,但我们不再尝试了。据我所知,它仅适用于静态数据源,而不适用于连接/交叉/过滤等操作的结果。也许解决方案是在中间使用一些 NoSQL 数据库或队列,并在其他几个中读取Flink 进程......虽然由于读/写开销,我不太喜欢这个想法;-/ 当有人知道如何做时,我会很感激在这里找到一些指导! 【参考方案1】:我认为,如果您需要对同一个流执行多个操作,则值得使用侧输出 - https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html。
一旦您执行了一些复杂的连接、coGroup 函数等并获得了joined
数据集,您就可以将这些值收集到不同的侧输出 - 一个稍后将计算计数,另一个将完成其他工作.
【讨论】:
听起来不错..无法尝试,因为我换了其他项目,但似乎解决了问题。以上是关于缓存和持久化数据集的主要内容,如果未能解决你的问题,请参考以下文章