无法在 Spark 流应用程序中打印

Posted

技术标签:

【中文标题】无法在 Spark 流应用程序中打印【英文标题】:Not able to print in Spark streaming application 【发布时间】:2017-07-17 18:49:56 【问题描述】:

火花流应用程序不会将简单的语句打印到driver's stdout,这里我试图在转换 dstream_2 之后打印一些语句,但它只打印第一批。我希望它会在每次批处理执行时打印出来。

val sparkConf = new SparkConf().setMaster("yarn-cluster")
                               .setAppName("SparkJob")
                               .set("spark.executor.memory","2G")
                               .set("spark.dynamicAllocation.executorIdleTimeout","5")


val streamingContext = new StreamingContext(sparkConf, Minutes(1))

var historyRdd: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD

var historyRdd_2: RDD[(String, ArrayList[String])] = streamingContext.sparkContext.emptyRDD


val stream_1 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams ,  Set(inputTopic_1))
val dstream_2 = KafkaUtils.createDirectStream[String, GenericData.Record, StringDecoder, GenericDataRecordDecoder](streamingContext, kafkaParams ,  Set(inputTopic_2))


val dstream_2 = stream_2.map((r: Tuple2[String, GenericData.Record]) => 

  //some mapping

//Not Working
print("Printing Test")
val historyDStream = dstream_1.transform(rdd => rdd.union(historyRdd))
dstream_2.foreachRDD(r => r.repartition(500))
val historyDStream_2 = dstream_2.transform(rdd => rdd.union(historyRdd_2))
val fullJoinResult = historyDStream.fullOuterJoin(historyDStream_2)

 val filtered = fullJoinResult.filter(r => r._2._1.isEmpty)


filtered.foreachRDDrdd =>

  val formatted = rdd.map(r  => (r._1 , r._2._2.get)) 

  historyRdd_2.unpersist(false) // unpersist the 'old' history RDD
  historyRdd_2 = formatted // assign the new history
  historyRdd_2.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation



val filteredStream = fullJoinResult.filter(r => r._2._2.isEmpty)


filteredStream.foreachRDDrdd =>
  val formatted = rdd.map(r => (r._1 , r._2._1.get)) 
  historyRdd.unpersist(false) // unpersist the 'old' history RDD
  historyRdd = formatted // assign the new history
  historyRdd.persist(StorageLevel.MEMORY_AND_DISK) // cache the computation

streamingContext.start()
streamingContext.awaitTermination()

【问题讨论】:

【参考方案1】:

print("Printing Test") 在该位置上只会在第一次评估程序时打印一次。 要在每个批处理间隔上添加一些控制台输出,我们需要将 I/O 操作放在输出操作的范围内:

每次都会打印出来:

dstream2.foreachRDD _ -> print("Printing Test") 

【讨论】:

日志记录(slf4j)也一样吗? 您能否提供这种打印行为的原因。 日志语句应该一样。重新行为:火花流在 dstreams 上运行。 dstream 操作范围之外的任何内容都将被评估为程序中的任何正常代码。要理解的重要一点是,对 dstream 的操作仅在程序中声明。实际执行发生在火花流调度程序中。 因此,如果我必须在脚本中进行一些计算,然后将其转储到 HDFS ,我怎么能做到这一点。并且还调用 foreachRDD 可能会产生开销。 如果需要,我建议你尝试提出一个新问题

以上是关于无法在 Spark 流应用程序中打印的主要内容,如果未能解决你的问题,请参考以下文章

与硬编码输入相比,使用 fgets 从用户获取密钥时无法打印密钥流

标准流打印无关字符

Spark结构化流内存绑定

如何在 Spark 流数据框中获取列的滞后?

地铁译:Spark for python developers ---构建Spark批处理和流处理应用前的数据准备

如何在已应用于 Spark RDD 的函数中打印语句?