在 apache spark 流中使用 foreachRDD 内的数据库连接

Posted

技术标签:

【中文标题】在 apache spark 流中使用 foreachRDD 内的数据库连接【英文标题】:Using a db connection inside foreachRDD in apache spark streaming 【发布时间】:2016-10-18 00:18:22 【问题描述】:

在火花流中,我想在处理每个批次之前查询数据库,将结果存储在可以序列化并通过网络发送到执行程序的哈希图中。

class ExecutingClass implements Serializable 
 init(DB db) 

   try(JavaStreamingContext jsc = new JavaStreamingContext(...)) 

   JavaPairInputDStream<String,String> kafkaStream = getKafkaStream(jsc);

   kafkaStream.foreachRDD(rdd -> 
   // this part is supposed to execute in the driver
  Map<String, String> indexMap = db.getIndexMap();// connects to a db, queries the results as a map

  JavaRDD<String> results = processRDD(rdd, indexMap);

  ...  

 


  
    JavaRDD<String> processRDD(JavaPairRDD<String, String> rdd,       Map<String,String> indexMap) 
 ... 
    
    

在上面的代码中,indexMap 应该在驱动程序中初始化,生成的映射用于处理 rdd。当我在 foreachRDD 闭包之外声明 indexMap 时我没有问题,但是当我在里面执行它时出现序列化错误。这是什么原因?

我想做这样的事情的原因是为了确保我从数据库中获得每个批次的最新值。我怀疑这是由于 foreachRDD 的关闭试图序列化关闭之外的所有内容。

【问题讨论】:

为什么不能为此目的使用累加器(读写)/广播(只读)?在这种情况下,因为它是读写累加器,所以有意义不是吗? 闭包内的代码将被序列化并发送给执行器。所以我假设db.getIndexMap() 不能为此目的进行序列化。 @LiMuBei 这就是问题所在。对于每一批数据,我们先查询数据库得到indexMap,然后只传递indexMap进行处理。 【参考方案1】:

您在 forEachRdd 内部使用 db 对象(这是 DB 的实例),因此 spark 尝试序列化 db ,为避免这种情况,我们需要在 forEachRdd 内部创建 DB 连接(或)您可以使用下面文章中讨论的对象池 http://www.michael-noll.com/blog/2014/10/01/kafka-spark-streaming-integration-example-tutorial/

【讨论】:

以上是关于在 apache spark 流中使用 foreachRDD 内的数据库连接的主要内容,如果未能解决你的问题,请参考以下文章

在 Spark 流中找不到 KafkaUtils 类

如何在 Spark 结构化流中获取书面记录的数量?

Spark 结构化流中的外部连接

带有自定义接收器的 Spark 结构化流中的输入行数

使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问

如何在 Spark 结构化流中保存通过水印丢弃的记录