Structured Streaming in IntelliJ not showing DataFrame to console

Posted: 2018-09-08 00:15:48

【Question】

I am trying to read a streaming DataFrame with Spark Structured Streaming, but I cannot see any output in the console when I run the application from IntelliJ IDEA.

My code:

import org.apache.spark.sql._

object SparkConsumerTest {

  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")

    val spark = SparkSession
      .builder
      .appName("test_local")
      .config("spark.master", "local")
      .getOrCreate()

    val data_stream = spark.readStream.text("src/main/resources/data_string.txt")

    val result = data_stream.writeStream.format("console").start()
  }
}

What is in my data_string.txt file:

structured streaming

This is the console/Run window in IntelliJ IDEA after I run the application:

Using Spark's default log4j profile: org/apache/spark/log4j-defaults.properties
18/09/07 19:03:33 INFO SparkContext: Running Spark version 2.1.0
18/09/07 19:03:33 WARN NativeCodeLoader: Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
18/09/07 19:03:33 INFO SecurityManager: Changing view acls to: userID
18/09/07 19:03:33 INFO SecurityManager: Changing modify acls to: userID
18/09/07 19:03:33 INFO SecurityManager: Changing view acls groups to:
18/09/07 19:03:33 INFO SecurityManager: Changing modify acls groups to:
18/09/07 19:03:33 INFO SecurityManager: SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(userID); groups with view permissions: Set(); users with modify permissions: Set(userID); groups with modify permissions: Set()
18/09/07 19:03:34 INFO Utils: Successfully started service 'sparkDriver' on port 60845.
18/09/07 19:03:34 INFO SparkEnv: Registering MapOutputTracker
18/09/07 19:03:34 INFO SparkEnv: Registering BlockManagerMaster
18/09/07 19:03:34 INFO BlockManagerMasterEndpoint: Using org.apache.spark.storage.DefaultTopologyMapper for getting topology information
18/09/07 19:03:34 INFO BlockManagerMasterEndpoint: BlockManagerMasterEndpoint up
18/09/07 19:03:34 INFO DiskBlockManager: Created local directory at C:\Users\userID\AppData\Local\Temp\etc...
18/09/07 19:03:34 INFO MemoryStore: MemoryStore started with capacity 1983.3 MB
18/09/07 19:03:34 INFO SparkEnv: Registering OutputCommitCoordinator
18/09/07 19:03:35 INFO Utils: Successfully started service 'SparkUI' on port 4040.
18/09/07 19:03:35 INFO SparkUI: Bound SparkUI to 0.0.0.0, and started at "http address"
18/09/07 19:03:35 INFO Executor: Starting executor ID driver on host localhost
18/09/07 19:03:35 INFO Utils: Successfully started service 'org.apache.spark.network.netty.NettyBlockTransferService' on port 60855.
18/09/07 19:03:35 INFO NettyBlockTransferService: Server created on "server address"
18/09/07 19:03:35 INFO BlockManager: Using org.apache.spark.storage.RandomBlockReplicationPolicy for block replication policy
18/09/07 19:03:35 INFO BlockManagerMaster: Registering BlockManager BlockManagerId(driver, server address, 60855, None)
18/09/07 19:03:35 INFO BlockManagerMasterEndpoint: Registering block manager server address with 1983.3 MB RAM, BlockManagerId(driver, server address, 60855, None)
18/09/07 19:03:35 INFO BlockManagerMaster: Registered BlockManager BlockManagerId(driver, server address, 60855, None)
18/09/07 19:03:35 INFO BlockManager: Initialized BlockManager: BlockManagerId(driver, server address, 60855, None)
18/09/07 19:03:35 INFO SharedState: Warehouse path is 'file:/C:/Users/userid/Documents//SparkTestLocal/spark-warehouse/'.

Process finished with exit code 0


【Answer 1】

This is because you never call awaitTermination. start() is non-blocking: it returns a StreamingQuery that runs in the background, so main returns and the driver JVM exits before the first micro-batch is processed (hence the "Process finished with exit code 0" at the end of your log). You need to add the following:

result.awaitTermination()

after starting the query:

val result = data_stream.writeStream.format("console").start()
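
For completeness, here is a minimal sketch of the corrected program, using the same names as in the question (the hadoop.home.dir setting and the file path are assumed unchanged from the original):

import org.apache.spark.sql._

object SparkConsumerTest {

  def main(args: Array[String]): Unit = {

    System.setProperty("hadoop.home.dir", "C:\\hadoop\\")

    val spark = SparkSession
      .builder
      .appName("test_local")
      .config("spark.master", "local")
      .getOrCreate()

    // The file source monitors the given path and picks up text files as they appear.
    val data_stream = spark.readStream.text("src/main/resources/data_string.txt")

    // start() is non-blocking and returns a StreamingQuery handle.
    val result = data_stream.writeStream.format("console").start()

    // Block the main thread until the query terminates (or fails);
    // without this, the driver exits before any batch is printed.
    result.awaitTermination()
  }
}

If you run several queries in one application, spark.streams.awaitAnyTermination() can be used instead to block until any one of them finishes.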

Hope this helps.

【Discussion】

That did the trick, thank you! I wonder why this isn't spelled out more clearly in the examples in the Structured Streaming guide.
