foreach 内的 Spark Streaming 过滤条件 - NullPointerException
Posted
技术标签:
【中文标题】foreach 内的 Spark Streaming 过滤条件 - NullPointerException【英文标题】:Spark Streaming filter condition inside foreach - NullPointerException 【发布时间】:2016-08-02 14:17:00 【问题描述】:我们有一个流数据,我在 HBase 表中有一些主信息。对于每一行,我都需要查找 HBase 主表并获取一些配置文件信息。我的代码是这样的
val con = new setContext(hadoopHome,sparkMaster)
val l_sparkcontext = con.getSparkContext
val l_hivecontext = con.getHiveContext
val topicname = "events"
val ssc = new StreamingContext(l_sparkcontext, Seconds(30))
val eventsStream = KafkaUtils.createStream(ssc,"xxx.xxx.142.xxx:2181","receive_rest_events",Map(topicname.toString -> 10))
println("Kafka Stream for receiving Events.." )
val profile_data = l_hivecontext.sql("select gender, income, age, riid from hbase_customer_profile")
profile_data.foreach(println)
val tabBC = l_sparkcontext.broadcast(profile_data)
eventsStream.foreachRDD(rdd =>
rdd.foreach(record =>
val subs_profile_rows = tabBC.value
val Rows = record._2.split(rowDelim)
Rows.foreach(row =>
val values = row.split(colDelim)
val riid = values(1).toInt
val cond = "riid = " + riid
println("Condition : ", cond)
val enriched_events = subs_profile_rows.filter(cond)
) // End of Rows
) // End of RDD
) // End of Events Stream
不幸的是,我总是在过滤器上点击 NPE。我在这里遵循了几个问题和答案来跨工作节点广播值,但没有任何帮助。有人可以帮忙吗。
问候
巴拉
【问题讨论】:
检查是否使用了无法序列化的值。 我不确定 profile_data 是否应该在 foreach 中创建,那是不可序列化的。 【参考方案1】:您的上下文使用看起来有点可疑......对我来说,看起来您正在创建两个单独的上下文(一个火花,一个用于火花流),然后尝试在这些上下文之间共享一个广播变量(这不会工作)。
我们编写了一些类似的代码。如果您有兴趣,以下是展示我们如何在 Splice Machine(开源)中做到这一点的视频。我会尝试找到代码或让其他人为您发布。
http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-part/
http://community.splicemachine.com/splice-machine-tutorial-video-configuring-kafka-feed-splice-machine-ii/
祝你好运。
【讨论】:
谢谢约翰。我会查看视频。要求是从 HBase 表中读取来自 DStream 的数据的配置文件信息。我也得到了 forEachPartition(将更改的代码作为下一条评论发布),但这给了我不同的错误。如果你能得到它,我会等待代码。非常感谢您的帮助 由于空间限制,我将不得不将我的代码分成 2 个帖子。 - 开始类 setContext(argHadoopHome: String, argSparkMaster: String) System.setProperty("hadoop.home.dir",argHadoopHome) val conf = new SparkConf().setMaster(argSparkMaster); conf.setAppName("Evts"); private val l_valSparkContext = new SparkContext(conf) private val l_hiveContext = new HiveContext(l_valSparkContext) def getSparkContext = l_valSparkContext def getHiveContext = l_hiveContext def getconfContext = conf object receiveEvents def main(args: Array[String]) : Unit = var rD = "\r\n" var cD = "," var sM = "spark://nm2: 7077" var ip = "nm2:2181" var hadoopHome = "/home/.." val con = new setContext(ip,sM) val l_sparkcontext = con.getSparkContext val topicname = "evt" val ssc = new StreamingContext(l_sparkcontext, Seconds(9)) val eventsStream = KafkaUtils.createStream(ssc,"nm2:2181","rcv",Map(topicname.toString -> 2)) val profile_data = w_hivecontext.sql("从 hb_cust_pro 中选择性别、收入、年龄") eventsStream.foreachRDD(rdd => rdd.foreachPartition(records => val wc = new setContext(hH,sM) // worker 中的连接 val w_hivecontext = wc.getHiveContext // 序列化记录。 foreach(rec => // Recs val Rows = rec._2.split(rD) // as Rows Rows.foreach(row => // Row val values = row.split(cD) // 拆分为列 val riid = values(1).toInt // for filter val cond = "riid="+riid profile_data.filter(cond) // 延迟加载-过滤条件 profile_data.foreach(println) ) //Rows ) //RDD ) //Part ) //Str ssc.start() ssc.awaitTermination() 再次感谢,代码不会像上面粘贴的那样工作。由于空间限制,我不得不删除一些东西。但它会给出整个画面。谢谢!以上是关于foreach 内的 Spark Streaming 过滤条件 - NullPointerException的主要内容,如果未能解决你的问题,请参考以下文章
Apache Spark 2.2.0 中文文档 - Structured Streaming 编程指南 | ApacheCN
Spark Streaming性能优化: 如何在生产环境下应对流数据峰值巨变
Spark(21)——foreachPartition foreach
C++ MFC VC 6.0 到 VS2013 lStreamReturn = GetRichEditCtrl().StreamIn(SF_RTF, es);