如何将火花流输出转换为数据帧或存储在表中

Posted

技术标签:

【中文标题】如何将火花流输出转换为数据帧或存储在表中【英文标题】:How to convert spark streaming output into dataframe or storing in table 【发布时间】:2018-02-26 15:23:31 【问题描述】:

我的代码是:

val lines = KafkaUtils.createStream(ssc, "localhost:2181", "spark-streaming-consumer-group", Map("hello" -> 5))
val data=lines.map(_._2)
data.print()

我的输出有 50 个不同的值,格式如下

"id:st04","data:26-02-2018 20:30:40","temp:30", "press:20"

谁能帮我将这些数据以表格形式存储为

| id |date               |temp|press|   
|st01|26-02-2018 20:30:40| 30 |20   |  
|st01|26-02-2018 20:30:45| 80 |70   |  

我会很感激的。

【问题讨论】:

【参考方案1】:

您可以使用 foreachRDD 函数,以及普通的 Dataset API:

data.foreachRDD(rdd => 
    // rdd is RDD[String]
    // foreachRDD is executed on the  driver, so you can use SparkSession here; spark is SparkSession, for Spark 1.x use SQLContext
    val df = spark.read.json(rdd); // or sqlContext.read.json(rdd)
    df.show(); 
    df.write.saveAsTable("here some unique table ID");
);

但是,如果您使用 Spark 2.x,我建议您使用结构化流:

val stream = spark.readStream.format("kafka").load()
val data = stream
            .selectExpr("cast(value as string) as value")
            .select(from_json(col("value"), schema))
data.writeStream.format("console").start();

您必须手动指定架构,但这很简单:) 还要在任何处理之前导入org.apache.spark.sql.functions._

【讨论】:

我不能使用 spark.load.json。即使使用 SQLContext 也会给出错误。我试过这个 val sqlcontext=new SQLContext(sc) import sqlcontext.implicits._ data.foreachRDD rdd=> val dfi=sqlcontext.load.json(rdd) 但它对 sqlcontext 上的重载定义给出了模棱两可的参考。 @huny 您使用哪个 Spark 版本?错误是什么?一般来说,它应该工作。也许你有一些极端情况:) 我使用的是 spark 2.10 版本 1.5.1 @ T. Gawęda 我试过了,val res=rdd.toDF().registerTempTable("sensor") System.ln.print(res).. 没有错误,但显示为空() 非常感谢您的耐心和解答。终于解决了:)

以上是关于如何将火花流输出转换为数据帧或存储在表中的主要内容,如果未能解决你的问题,请参考以下文章

有啥方法可以检查我的流分析输入是不是已经在表中?

如何从单个 JSON(IOT HUB)将流分析输出转换为多行

Oozie - 从 Hive 操作中捕获输出

如何获取火花行的 value_counts?

Snowflake TIMESTAMP - 将毫秒值保留为 .000000,存储在表中

将文件输出流转换为字符串