如何解析 pyspark 的 DataStreamReader 中的 json 字符串列并创建数据框
Posted
技术标签:
【中文标题】如何解析 pyspark 的 DataStreamReader 中的 json 字符串列并创建数据框【英文标题】:How to parse a json string column in pyspark's DataStreamReader and create a Data Frame 【发布时间】:2019-02-15 01:21:48 【问题描述】:我正在阅读来自 kafka 主题的消息
messageDFRaw = spark.readStream\
.format("kafka")\
.option("kafka.bootstrap.servers", "localhost:9092")\
.option("subscribe", "test-message")\
.load()
messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as dict")
当我从上述查询中打印数据框时,我得到以下控制台输出。
|key|dict|
|#badbunny |"channel": "#badbunny", "username": "mgat22", "message": "cool"|
如何从 DataStreamReader 创建一个数据框,以便我有一个列为 |key|channel| username| message|
的数据框
我尝试遵循How to read records in JSON format from Kafka using Structured Streaming? 中接受的答案
struct = StructType([
StructField("channel", StringType()),
StructField("username", StringType()),
StructField("message", StringType()),
])
messageDFRaw.select(from_json("CAST(value AS STRING)", struct))
但是,我在from_json()
中得到Expected type 'StructField', got 'StructType' instead
【问题讨论】:
【参考方案1】:我忽略了from_json()
中的警告Expected type 'StructField', got 'StructType' instead
。
但是,我最初必须从 kafka 消息中转换值,然后再解析 json 模式。
messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
messageParsedDF = messageDF.select(from_json("value", struct_schema).alias("message"))
messageFlattenedDF = messageParsedDF.selectExpr("value.channel", "value.username", "value.message")
【讨论】:
以上是关于如何解析 pyspark 的 DataStreamReader 中的 json 字符串列并创建数据框的主要内容,如果未能解决你的问题,请参考以下文章
Flink第四篇之Flink的DataStream API(算子解析)
如何在 Pyspark 中以编程方式解析固定宽度的文本文件?
PySpark算子处理空间数据全解析: 如何在PySpark里面使用空间运算接口