Spark Streaming Scala 将不同结构的 json 组合成一个 DataFrame
Posted
技术标签:
【中文标题】Spark Streaming Scala 将不同结构的 json 组合成一个 DataFrame【英文标题】:Spark Streaming Scala combine json of different structure to form a DataFrame 【发布时间】:2017-07-14 10:31:28 【问题描述】:我正在尝试处理来自 Kinesis 的 Json 字符串。 Json 字符串可以有几种不同的形式。从 Kinesis,我创建了一个 DStream:
val kinesisStream = KinesisUtils.createStream(
ssc, appName, "Kinesis_Stream", "kinesis.ap-southeast-1.amazonaws.com",
"region", InitialPositionInStream.LATEST, kinesisCheckpointInterval, StorageLevel.MEMORY_AND_DISK_2)
val lines = kinesisStream.map(x => new String(x))
lines.foreachRDD((rdd, time) =>
val sqlContext = SQLContextSingleton.getInstance(rdd.sparkContext)
import sqlContext.implicits.StringToColumn
if(rdd.count() > 0)
// Process jsons here
// Json strings here would have either one of the formats below
)
RDD 字符串将具有这些 json 字符串之一。 收藏:
[
"data":
"ApplicationVersion": "1.0.3 (65)",
"ProjectId": 30024,
"TargetId": "4138",
"Timestamp": 0
,
"host": "host1"
,
"data":
"ApplicationVersion": "1.0.3 (65)",
"ProjectId": 30025,
"TargetId": "4139",
"Timestamp": 0
,
"host": "host1"
]
有些 Json 字符串是像这样的单个对象:
"ApplicationVersion": "1.0.3 (65)",
"ProjectId": 30026,
"TargetId": "4140",
"Timestamp": 0
我希望能够从“data”键中提取对象,如果它是第一种Json字符串并与第二种Json结合形成RDD/DataFrame,我该如何实现?
最终我希望我的数据框是这样的:
+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
| 1.0.3 (65)| 30024| 4138| 0|
| 1.0.3 (65)| 30025| 4139| 0|
| 1.0.3 (65)| 30026| 4140| 0|
+------------------+---------+--------+---------+
抱歉,Scala 和 Spark 的新手。我一直在查看现有示例,但遗憾的是没有找到解决方案。
非常感谢。
【问题讨论】:
【参考方案1】:本例使用json4s
:
import org.json4s._
import org.json4s.jackson.JsonMethods._
implicit val format = DefaultFormats
case class jsonschema ( ApplicationVersion: String, ProjectId: String, TargetId: String, Timestamp:Int )
val string1 = """
[
"data" :
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30024,
"TargetId" : "4138",
"Timestamp" : 0
,
"host" : "host1"
,
"data" :
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30025,
"TargetId" : "4139",
"Timestamp" : 0
,
"host" : "host1"
]
"""
val string2 = """
[
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30025,
"TargetId" : "4140",
"Timestamp" : 0
,
"ApplicationVersion" : "1.0.3 (65)",
"ProjectId" : 30025,
"TargetId" : "4141",
"Timestamp" : 0
]
"""
val json1 = (parse(string1) \ "data").extract[List[jsonschema]]
val json2 = parse(string2).extract[List[jsonschema]]
val jsonRDD = json1.union(json2)
val df = sqlContext.createDataFrame(jsonRDD)
df.show
+------------------+---------+--------+---------+
|ApplicationVersion|ProjectId|TargetId|Timestamp|
+------------------+---------+--------+---------+
| 1.0.3 (65)| 30024| 4138| 0|
| 1.0.3 (65)| 30025| 4139| 0|
| 1.0.3 (65)| 30025| 4140| 0|
| 1.0.3 (65)| 30025| 4141| 0|
+------------------+---------+--------+---------+
【讨论】:
感谢您的快速响应!抱歉,我忘了提到我正在使用 Spark Streaming DStreams,我已经更新了我的问题。不过,您的回复仍然很有帮助! 如果您能够从 DStream 中提取字符串,那么代码应该或多或少可以工作。 谢谢!这通过使用 json4s 为我指明了正确的方向。这允许我在转换为 DF 之前先处理 json 字符串【参考方案2】:您可以在从第一个Dataframe
中选择data.*
列后使用联合:
val spark = SparkSession.builder().master("local[*]").getOrCreate()
val sc = spark.sparkContext
// Assuming you store your jsons in two separate strings `json1` and `json2`
val df1 = spark.read.json(sc.parallelize(Seq(json1)))
val df2 = spark.read.json(sc.parallelize(Seq(json2)))
import spark.implicits._
df1.select($"data.*") // Select only the data columns from first Dataframe
.union(df2) // Union the two Dataframes as they have the same structure
.show()
编辑[其他解决方案链接]
在您编辑问题后,我了解到您在解析 JSON 文件时需要某种回退机制。使用任何 JSON 解析库还有更多方法可以做到这一点 - Play 有一个很好的解决方案 here,我认为它已经解释了如何以优雅的方式解决该问题。
一旦您有了 RDD[Data]
,其中 data 是您的“变体”类型,您就可以使用 rdd.toDF()
简单地将其转换为 Dataframe
。
希望对您有所帮助。
【讨论】:
感谢安德烈的快速响应,我很感激,而且非常有用!抱歉,我忘了提到我使用的是 Spark Streaming DStreams,我已经更新了我上面的问题。 我明白了。是否有一种简单的方法可以知道哪个对象何时到来? 很遗憾没有 我已经编辑了答案。这应该可以帮助您实现所需的目标,我只是想避免将所有细节都放在这里,因为这实际上是两个独立的问题。希望对你有帮助:)以上是关于Spark Streaming Scala 将不同结构的 json 组合成一个 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章
如何使用 Scala Case Class 在 Spark Structured Streaming 中映射 Kafka 源
scala spark-streaming整合kafka (spark 2.3 kafka 0.10)
Spark 大数据 视频教程 安装 SQL Streaming Scala Hive Hadoop
大数据技术之_27_电商平台数据分析项目_02_预备知识 + Scala + Spark Core + Spark SQL + Spark Streaming + Java 对象池