如何解析 pyspark 的 DataStreamReader 中的 json 字符串列并创建数据框

Posted

技术标签:

【中文标题】如何解析 pyspark 的 DataStreamReader 中的 json 字符串列并创建数据框【英文标题】:How to parse a json string column in pyspark's DataStreamReader and create a Data Frame 【发布时间】:2019-02-15 01:21:48 【问题描述】:

我正在阅读来自 kafka 主题的消息

messageDFRaw = spark.readStream\
                    .format("kafka")\
                    .option("kafka.bootstrap.servers", "localhost:9092")\
                    .option("subscribe", "test-message")\
                    .load()

messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING) as dict")

当我从上述查询中打印数据框时,我得到以下控制台输出。

|key|dict|
|#badbunny |"channel": "#badbunny", "username": "mgat22", "message": "cool"|

如何从 DataStreamReader 创建一个数据框,以便我有一个列为 |key|channel| username| message| 的数据框

我尝试遵循How to read records in JSON format from Kafka using Structured Streaming? 中接受的答案

struct = StructType([
    StructField("channel", StringType()),
    StructField("username", StringType()),
    StructField("message", StringType()),
])

messageDFRaw.select(from_json("CAST(value AS STRING)", struct))

但是,我在from_json() 中得到Expected type 'StructField', got 'StructType' instead

【问题讨论】:

【参考方案1】:

我忽略了from_json() 中的警告Expected type 'StructField', got 'StructType' instead

但是,我最初必须从 kafka 消息中转换值,然后再解析 json 模式。

messageDF = messageDFRaw.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")

messageParsedDF = messageDF.select(from_json("value", struct_schema).alias("message"))

messageFlattenedDF = messageParsedDF.selectExpr("value.channel", "value.username", "value.message")

【讨论】:

以上是关于如何解析 pyspark 的 DataStreamReader 中的 json 字符串列并创建数据框的主要内容,如果未能解决你的问题,请参考以下文章

Flink第四篇之Flink的DataStream API(算子解析)

如何在 Pyspark 中以编程方式解析固定宽度的文本文件?

PySpark算子处理空间数据全解析: 如何在PySpark里面使用空间运算接口

如何解析sql语句插入以使用pyspark获取值

如何解析 pyspark 的 DataStreamReader 中的 json 字符串列并创建数据框

如何解析pyspark中的空格分隔数据?