无法在 Spark 流应用程序中打印
Posted
技术标签:
【中文标题】无法在 Spark 流应用程序中打印【英文标题】:Not able to print in Spark streaming application 【发布时间】:2017-07-17 18:49:56 【问题描述】:火花流应用程序不会将简单的语句打印到driver's stdout
,这里我试图在转换 dstream_2 之后打印一些语句,但它只打印第一批。我希望它会在每次批处理执行时打印出来。
val sparkConf = new SparkConf().setMaster("yarn-cluster")
.setAppName("SparkJob")
.set("spark.executor.memory","2G")
.set("spark.dynamicAllocation.executorIdleTimeout","5")
val streamingContext = new StreamingContext(sparkConf, Minutes(1))
var historyRdd: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD
var historyRdd_2: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD
val stream_1 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams , Set(inputTopic_1))
val dstream_2 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams , Set(inputTopic_2))
val dstream_2 = stream_2.map((r: Tuple2[String, GenericData.Record]) =>
//some mapping
//Not Working
print("Printing Test")
val historyDStream = dstream_1.transform(rdd => rdd.union(historyRdd))
dstream_2.foreachRDD(r => r.repartition(500))
val historyDStream_2 = dstream_2.transform(rdd => rdd.union(historyRdd_2))
val fullJoinResult = historyDStream.fullOuterJoin(historyDStream_2)
val filtered = fullJoinResult.filter(r => r._2._1.isEmpty)
filtered.foreachRDDrdd =>
val formatted = rdd.map(r => (r._1 , r._2._2.get))
historyRdd_2.unpersist(false) // unpersist the 'old' history RDD
historyRdd_2 = formatted // assign the new history
historyRdd_2.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
val filteredStream = fullJoinResult.filter(r => r._2._2.isEmpty)
filteredStream.foreachRDDrdd =>
val formatted = rdd.map(r => (r._1 , r._2._1.get))
historyRdd.unpersist(false) // unpersist the 'old' history RDD
historyRdd = formatted // assign the new history
historyRdd.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation
streamingContext.start()
streamingContext.awaitTermination()
【问题讨论】:
【参考方案1】:print("Printing Test")
在该位置上只会在第一次评估程序时打印一次。
要在每个批处理间隔上添加一些控制台输出,我们需要将 I/O 操作放在输出操作的范围内:
每次都会打印出来:
dstream2.foreachRDD _ -> print("Printing Test")
【讨论】:
日志记录(slf4j)也一样吗? 您能否提供这种打印行为的原因。 日志语句应该一样。重新行为:火花流在 dstreams 上运行。 dstream 操作范围之外的任何内容都将被评估为程序中的任何正常代码。要理解的重要一点是,对 dstream 的操作仅在程序中声明。实际执行发生在火花流调度程序中。 因此,如果我必须在脚本中进行一些计算,然后将其转储到 HDFS ,我怎么能做到这一点。并且还调用foreachRDD
可能会产生开销。
如果需要,我建议你尝试提出一个新问题以上是关于无法在 Spark 流应用程序中打印的主要内容,如果未能解决你的问题,请参考以下文章
与硬编码输入相比,使用 fgets 从用户获取密钥时无法打印密钥流