在 Python 中使用 Spark Streaming 解析 JSON 消息
Posted
技术标签:
【中文标题】在 Python 中使用 Spark Streaming 解析 JSON 消息【英文标题】:parsing JSON messages with Spark Streaming in Python 【发布时间】:2015-09-30 20:35:35 【问题描述】:我有一个 "UserID": "Xxxx", "Count": 000
形式的 JSON 消息 Dstream。我想找出解析它的最佳方法,以便创建数据框。
这种情况下1和2有什么区别:
parsed = kafkaStream.map(lambda x: json.loads(x))
parsed = kafkaStream.map(lambda x: json.loads(x[1])
【问题讨论】:
你试过了吗? 【参考方案1】:这是 KafkaStream 特定的问题。您正在从 Kafka DSstream 接收 PAIR RDD。一对 rdd 是两个元素元组(键,值)。这就是为什么您必须选择第二个元素来检索值的原因。我会写
parsed = kafkaStream.map(lambda (key, value): json.loads(value))
在 Python 中,建议对未使用的变量使用 _
,但在这种情况下,我会使用 key
来提醒我 lambda 正在接收对 RDD。
【讨论】:
【参考方案2】:当您执行json.loads(x)
时,字符串(您的消息)被解析为字典,不确定您要对json.loads(x[1])
做什么,但如果您想要字典第一个键的值,您应该去json.loads(x)["UserId"]
。不知道是不是你没看懂。
例子:
import json
raw = """
"UserId": "Xxx",
"Count": "0000"
"""
print(type(raw))
print(raw)
parsed = json.loads(raw)
print(type(parsed))
print(parsed)
parsed_partial = json.loads(raw)["UserId"]
print(type(parsed_partial))
print(parsed_partial)
输出:
<class 'str'>
"UserId": "Xxx",
"Count": "0000"
<class 'dict'>
'UserId': 'Xxx', 'Count': '0000'
Xxx
如需了解map()
,请阅读this。
【讨论】:
谢谢 jlnabais。我认为我的问题措辞/标记不正确,因为我对堆栈溢出、python、kafka 和 spark 流都很陌生。我想了解如何解析来自 kafka 的火花流作为 Dstream 消耗的 json 消息。在这种情况下, json.loads(x[0]) 是偏移量,而 json.loads(x[1]) 是包含 "UserID": "Xxxx", "Count": 000 的消息以上是关于在 Python 中使用 Spark Streaming 解析 JSON 消息的主要内容,如果未能解决你的问题,请参考以下文章
Spark Direct Stream 不会为每个 kafka 分区创建并行流
Spark Stream Kafka 在 JavaStreamingContext.start 处挂起,没有创建 Spark 作业
Java8函数式编程:类比Spark RDD算子的Stream流操作