在 Python 中使用 Spark Streaming 解析 JSON 消息

Posted

技术标签:

【中文标题】在 Python 中使用 Spark Streaming 解析 JSON 消息【英文标题】:parsing JSON messages with Spark Streaming in Python 【发布时间】:2015-09-30 20:35:35 【问题描述】:

我有一个 "UserID": "Xxxx", "Count": 000 形式的 JSON 消息 Dstream。我想找出解析它的最佳方法,以便创建数据框。

这种情况下1和2有什么区别:

    parsed = kafkaStream.map(lambda x: json.loads(x)) parsed = kafkaStream.map(lambda x: json.loads(x[1])

【问题讨论】:

你试过了吗? 【参考方案1】:

这是 KafkaStream 特定的问题。您正在从 Kafka DSstream 接收 PAIR RDD。一对 rdd 是两个元素元组(键,值)。这就是为什么您必须选择第二个元素来检索值的原因。我会写

parsed = kafkaStream.map(lambda (key, value): json.loads(value))

在 Python 中,建议对未使用的变量使用 _,但在这种情况下,我会使用 key 来提醒我 lambda 正在接收对 RDD。

【讨论】:

【参考方案2】:

当您执行json.loads(x) 时,字符串(您的消息)被解析为字典,不确定您要对json.loads(x[1]) 做什么,但如果您想要字典第一个键的值,您应该去json.loads(x)["UserId"]。不知道是不是你没看懂。

例子:

import json

raw = """
    "UserId": "Xxx", 
    "Count": "0000"
"""

print(type(raw))
print(raw)

parsed = json.loads(raw)

print(type(parsed))
print(parsed)

parsed_partial = json.loads(raw)["UserId"]

print(type(parsed_partial))
print(parsed_partial)

输出:

<class 'str'>

    "UserId": "Xxx", 
    "Count": "0000"

<class 'dict'>
'UserId': 'Xxx', 'Count': '0000'
Xxx

如需了解map(),请阅读this。

【讨论】:

谢谢 jlnabais。我认为我的问题措辞/标记不正确,因为我对堆栈溢出、python、kafka 和 spark 流都很陌生。我想了解如何解析来自 kafka 的火花流作为 Dstream 消耗的 json 消息。在这种情况下, json.loads(x[0]) 是偏移量,而 json.loads(x[1]) 是包含 "UserID": "Xxxx", "Count": 000 的消息

以上是关于在 Python 中使用 Spark Streaming 解析 JSON 消息的主要内容,如果未能解决你的问题,请参考以下文章

Spark Direct Stream 不会为每个 kafka 分区创建并行流

Spark Stream Kafka 在 JavaStreamingContext.start 处挂起,没有创建 Spark 作业

Java8函数式编程:类比Spark RDD算子的Stream流操作

当我们按键加入两个 Stream 组时,内部会发生啥?

如何将 Spark 结构化流与 Kafka Direct Stream 结合使用?

Spark2.3(三十七):Stream join Stream(res文件每天更新一份)