Spark Streaming Scala 将不同结构的 json 组合成一个 DataFrame

Posted

技术标签:

【中文标题】Spark Streaming Scala 将不同结构的 json 组合成一个 DataFrame【英文标题】:Spark Streaming Scala combine json of different structure to form a DataFrame 【发布时间】:2017-07-14 10:31:28 【问题描述】:

我正在尝试处理来自 Kinesis 的 Json 字符串。 Json 字符串可以有几种不同的形式。从 Kinesis,我创建了一个 DStream:

val kinesisStream = KinesisUtils.createStream(
 ssc, appName, "Kinesis_Stream", "kinesis.ap-southeast-1.amazonaws.com",
 "region", InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)

 val lines = kinesisStream.map(x => new String(x))

 lines.foreachRDD((rdd, time) =>

   val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
   import sqlContext.implicits.StringToColumn

   if(rdd.count() > 0)
    // Process jsons here
    // Json strings here would have either one of the formats below
   
 )

RDD 字符串将具有这些 json 字符串之一。 收藏:

[
  
    "data": 
      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30024,
      "TargetId": "4138",
      "Timestamp": 0
    ,
    "host": "host1"
  ,
  
    "data": 
      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30025,
      "TargetId": "4139",
      "Timestamp": 0
    ,
    "host": "host1"
  
]

有些 Json 字符串是像这样的单个对象:


      "ApplicationVersion": "1.0.3 (65)",
      "ProjectId": 30026,
      "TargetId": "4140",
      "Timestamp": 0

我希望能够从“data”键中提取对象,如果它是第一种Json字符串并与第二种Json结合形成RDD/DataFrame,我该如何实现?

最终我希望我的数据框是这样的:

+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
|        1.0.3 (65)|    30024|    4138|        0|
|        1.0.3 (65)|    30025|    4139|        0|
|        1.0.3 (65)|    30026|    4140|        0|
+------------------+---------+--------+---------+

抱歉,Scala 和 Spark 的新手。我一直在查看现有示例,但遗憾的是没有找到解决方案。

非常感谢。

【问题讨论】:

【参考方案1】:

本例使用json4s

import org.json4s._
import org.json4s.jackson.JsonMethods._

implicit val format = DefaultFormats

case class jsonschema ( ApplicationVersion: String, ProjectId: String, TargetId: String, Timestamp:Int )

val string1 = """
[ 
  "data" : 
    "ApplicationVersion" : "1.0.3 (65)",
    "ProjectId" : 30024,
    "TargetId" : "4138",
    "Timestamp" : 0
  ,
  "host" : "host1"
, 
  "data" : 
    "ApplicationVersion" : "1.0.3 (65)",
    "ProjectId" : 30025,
    "TargetId" : "4139",
    "Timestamp" : 0
  ,
  "host" : "host1"
 ]

"""

val string2 = """
[ 
  "ApplicationVersion" : "1.0.3 (65)",
  "ProjectId" : 30025,
  "TargetId" : "4140",
  "Timestamp" : 0
, 
  "ApplicationVersion" : "1.0.3 (65)",
  "ProjectId" : 30025,
  "TargetId" : "4141",
  "Timestamp" : 0
 ]
"""

val json1 = (parse(string1) \ "data").extract[List[jsonschema]]

val json2 = parse(string2).extract[List[jsonschema]]

val jsonRDD = json1.union(json2)

val df = sqlContext.createDataFrame(jsonRDD)

df.show


+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
|        1.0.3 (65)|    30024|    4138|        0|
|        1.0.3 (65)|    30025|    4139|        0|
|        1.0.3 (65)|    30025|    4140|        0|
|        1.0.3 (65)|    30025|    4141|        0|
+------------------+---------+--------+---------+

【讨论】:

感谢您的快速响应!抱歉,我忘了提到我正在使用 Spark Streaming DStreams,我已经更新了我的问题。不过,您的回复仍然很有帮助! 如果您能够从 DStream 中提取字符串,那么代码应该或多或少可以工作。 谢谢!这通过使用 json4s 为我指明了正确的方向。这允许我在转换为 DF 之前先处理 json 字符串【参考方案2】:

您可以在从第一个Dataframe 中选择data.* 列后使用联合:

val spark = SparkSession.builder().master("local[*]").getOrCreate()    
val sc = spark.sparkContext

// Assuming you store your jsons in two separate strings `json1` and `json2`
val df1 = spark.read.json(sc.parallelize(Seq(json1)))
val df2 = spark.read.json(sc.parallelize(Seq(json2)))

import spark.implicits._
df1.select($"data.*") // Select only the data columns from first Dataframe
  .union(df2)         // Union the two Dataframes as they have the same structure
  .show()

编辑[其他解决方案链接]

在您编辑问题后,我了解到您在解析 JSON 文件时需要某种回退机制。使用任何 JSON 解析库还有更多方法可以做到这一点 - Play 有一个很好的解决方案 here,我认为它已经解释了如何以优雅的方式解决该问题。

一旦您有了 RDD[Data],其中 data 是您的“变体”类型,您就可以使用 rdd.toDF() 简单地将其转换为 Dataframe

希望对您有所帮助。

【讨论】:

感谢安德烈的快速响应,我很感激,而且非常有用!抱歉,我忘了提到我使用的是 Spark Streaming DStreams,我已经更新了我上面的问题。 我明白了。是否有一种简单的方法可以知道哪个对象何时到来? 很遗憾没有 我已经编辑了答案。这应该可以帮助您实现所需的目标,我只是想避免将所有细节都放在这里,因为这实际上是两个独立的问题。希望对你有帮助:)

以上是关于Spark Streaming Scala 将不同结构的 json 组合成一个 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 Scala Case Class 在 Spark Structured Streaming 中映射 Kafka 源

scala spark-streaming整合kafka (spark 2.3 kafka 0.10)

Spark 大数据 视频教程 安装 SQL Streaming Scala Hive Hadoop

大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池

013- Kafka应用之Kafka与Spark Streaming整合

如何在idea里面直接运行spark streaming程序