foreach 内的 Spark Streaming 过滤条件 - NullPointerException

Posted

技术标签:

【中文标题】foreach 内的 Spark Streaming 过滤条件 - NullPointerException【英文标题】:Spark Streaming filter condition inside foreach - NullPointerException 【发布时间】:2016-08-02 14:17:00 【问题描述】:

我们有一个流数据,我在 HBase 表中有一些主信息。对于每一行,我都需要查找 HBase 主表并获取一些配置文件信息。我的代码是这样的

val con             = new setContext(hadoopHome,sparkMaster)
val l_sparkcontext  = con.getSparkContext
val l_hivecontext   = con.getHiveContext

val topicname       = "events"
val ssc             = new StreamingContext(l_sparkcontext, Seconds(30))
val eventsStream = KafkaUtils.createStream(ssc,"xxx.xxx.142.xxx:2181","receive_rest_events",Map(topicname.toString -> 10))
println("Kafka Stream for receiving Events.." )

val profile_data = l_hivecontext.sql("select gender, income, age, riid from hbase_customer_profile")
profile_data.foreach(println)
val tabBC = l_sparkcontext.broadcast(profile_data)

eventsStream.foreachRDD(rdd => 
    rdd.foreach(record => 
    val subs_profile_rows = tabBC.value
    val Rows = record._2.split(rowDelim)
    Rows.foreach(row => 
      val values = row.split(colDelim)
      val riid = values(1).toInt
      val cond = "riid = " + riid
      println("Condition : ", cond)
      val enriched_events = subs_profile_rows.filter(cond)
    ) // End of Rows
  ) // End of RDD
) // End of Events Stream

不幸的是,我总是在过滤器上点击 NPE。我在这里遵循了几个问题和答案来跨工作节点广播值,但没有任何帮助。有人可以帮忙吗。

问候

巴拉

【问题讨论】:

检查是否使用了无法序列化的值。 我不确定 profile_data 是否应该在 foreach 中创建,那是不可序列化的。 【参考方案1】:

您的上下文使用看起来有点可疑......对我来说,看起来您正在创建两个单独的上下文(一个火花,一个用于火花流),然后尝试在这些上下文之间共享一个广播变量(这不会工作)。

我们编写了一些类似的代码。如果您有兴趣,以下是展示我们如何在 Splice Machine(开源)中做到这一点的视频。我会尝试找到代码或让其他人为您发布。

http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-part/

http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-ii/

祝你好运。

【讨论】:

谢谢约翰。我会查看视频。要求是从 HBase 表中读取来自 DStream 的数据的配置文件信息。我也得到了 forEachPartition(将更改的代码作为下一条评论发布),但这给了我不同的错误。如果你能得到它,我会等待代码。非常感谢您的帮助 由于空间限制,我将不得不将我的代码分成 2 个帖子。 - 开始类 setContext(argHadoopHome: String, argSparkMaster: String) System.setProperty("hadoop.home.dir",argHadoopHome) val conf = new SparkConf().setMaster(argSparkMaster); conf.setAppName("Evts"); private val l_valSparkContext = new SparkContext(conf) private val l_hiveContext = new HiveContext(l_valSparkContext) def getSparkContext = l_valSparkContext def getHiveContext = l_hiveContext def getconfContext = conf object receiveEvents def main(args: Array[String]) : Unit = var rD = "\r\n" var cD = "," var sM = "spark://nm2: 7077" var ip = "nm2:2181" var hadoopHome = "/home/.." val con = new setContext(ip,sM) val l_sparkcontext = con.getSparkContext val topicname = "evt" val ssc = new StreamingContext(l_sparkcontext, Seconds(9)) val eventsStream = KafkaUtils.createStream(ssc,"nm2:2181","rcv",Map(topicname.toString -> 2)) val profile_data = w_hivecontext.sql("从 hb_cust_pro 中选择性别、收入、年龄") eventsStream.foreachRDD(rdd => rdd.foreachPartition(records => val wc = new setContext(hH,sM) // worker 中的连接 val w_hivecontext = wc.getHiveContext // 序列化记录。 foreach(rec => // Recs val Rows = rec._2.split(rD) // as Rows Rows.foreach(row => // Row val values = row.split(cD) // 拆分为列 val riid = values(1).toInt // for filter val cond = "riid="+riid profile_data.filter(cond) // 延迟加载-过滤条件 profile_data.foreach(println) ) //Rows ) //RDD ) //Part ) //Str ssc.start() ssc.awaitTermination() 再次感谢,代码不会像上面粘贴的那样工作。由于空间限制,我不得不删除一些东西。但它会给出整个画面。谢谢!

以上是关于foreach 内的 Spark Streaming 过滤条件 - NullPointerException的主要内容,如果未能解决你的问题,请参考以下文章

sparkstreaming+kafka

Spark 结构化流文件源起始偏移量

Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN

Spark Streaming性能优化: 如何在生产环境下应对流数据峰值巨变

Spark(21)——foreachPartition foreach

C++ MFC VC 6.0 到 VS2013 lStreamReturn = GetRichEditCtrl().StreamIn(SF_RTF, es);