缓存和持久化数据集

Posted

技术标签:

【中文标题】缓存和持久化数据集【英文标题】:cache and persist datasets 【发布时间】:2017-09-15 15:38:45 【问题描述】:

我想多次使用 org.apache.flink.api.scala.DataSet 对象:

使用 count() 打印行数, 写入 neo4j 数据库, 转换为 Gelly 图形对象, 等

对于这些动作中的每一个,Flink 都会完全重新计算 DataSet 的值,而不是缓存它。我在 Spark 中找不到任何 cache() 或 persist() 函数。

这确实对我的应用程序产生了巨大的影响,它有大约 1.000.000 个数据以及许多连接/coGroup 使用等:运行时间似乎增加了 3 倍,也就是几个小时!那么如何缓存或持久化数据集并显着减少运行时间呢?

我正在使用最新的 Flink 版本 1.3.2 和 Scala 2.11。

例子:

package dummy

import org.apache.flink.api.scala._
import org.apache.flink.graph.scala.Graph
import org.apache.flink.graph.Edge, Vertex
import org.apache.logging.log4j.scala.Logging

object Trials extends Logging 

  def main(args: Array[String]) 
    val env = ExecutionEnvironment.getExecutionEnvironment

    // some dataset which could be huge in reality
    val dataSet = env.fromElements((1, 436), (2, 235), (3, 67), (4, 51), (5, 15), (6, 62), (7, 155))

    // some complex joins, coGroup functions etc.
    val joined = dataSet.cross(dataSet).filter(tuple => (tuple._1._2 + tuple._2._2) % 7 == 0)

    // log the number of rows --> performs the join above
    logger.info(f"results contains $joined.count() rows")

    // convert to Gelly graph format
    val graph = Graph.fromDataSet(
      dataSet.map(nodeTuple => new Vertex[Long, Long](nodeTuple._1, nodeTuple._2)),
      joined.map(edgeTuple => new Edge[Long, String](edgeTuple._1._1, edgeTuple._2._1, "someValue")),
      env
    )

    // do something with the graph
    logger.info("get number of vertices")
    val numberOfVertices = graph.numberOfVertices()
    logger.info("get number of edges")
    val numberOfEdges = graph.numberOfEdges() // --> performs the join again!

    logger.info(f"the graph has $numberOfVertices vertices and $numberOfEdges edges")
  


所需库:log4j-core、log4j-api-scala_2.11、flink-core、flink-scala_2.11、flink-gelly-scala_2.10

【问题讨论】:

你能发布一个你想要缓存的动作和数据集的例子吗? 添加了我的代码的简化版本 你找到解决这个问题的方法了吗?我有同样的问题 一段时间后我们才找到ci.apache.org/projects/flink/flink-docs-release-1.6/dev/batch/…,但我们不再尝试了。据我所知,它仅适用于静态数据源,而不适用于连接/交叉/过滤等操作的结果。也许解决方案是在中间使用一些 NoSQL 数据库或队列,并在其他几个中读取Flink 进程......虽然由于读/写开销,我不太喜欢这个想法;-/ 当有人知道如何做时,我会很感激在这里找到一些指导! 【参考方案1】:

我认为,如果您需要对同一个流执行多个操作,则值得使用侧输出 - https://ci.apache.org/projects/flink/flink-docs-stable/dev/stream/side_output.html

一旦您执行了一些复杂的连接、coGroup 函数等并获得了joined 数据集,您就可以将这些值收集到不同的侧输出 - 一个稍后将计算计数,另一个将完成其他工作.

【讨论】:

听起来不错..无法尝试,因为我换了其他项目,但似乎解决了问题。

以上是关于缓存和持久化数据集的主要内容,如果未能解决你的问题,请参考以下文章

缓存和持久化有啥区别?

RDD缓存

(为啥)我们需要在 RDD 上调用缓存还是持久化

RDD的缓存,依赖,spark提交任务流程

Redis学习之数据持久化与数据恢复

面试系列16 redis的持久化