将spark结构化流数据帧转换为JSON

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了将spark结构化流数据帧转换为JSON相关的知识,希望对你有一定的参考价值。

我正在使用具有以下结构的spark结构化流读取流:

col1
col2
col3

在一些转换后,我想以json格式将数据帧写入控制台。我正在尝试以下方法:

df.select(to_json($"*"))
      .writeStream
      .outputMode("append")
      .format("console")
      .start()

但我得到了Invalid usage of '*' in expression 'structstojson';

有没有办法将所有行连接到同一列,以便能够使用to_json

预期输出是一个数据框,其中一列在每行上都有json数据:

{"col1":"val11","col2":"val12","col3":"val13"}
{"col1":"val21","col2":"val22","col3":"val23"}
答案

qazxsw poi有以下定义:

to_json

这是我们的数据框:

def to_json(e: org.apache.spark.sql.Column): org.apache.spark.sql.Column
def to_json(e: org.apache.spark.sql.Column,options: java.util.Map[String,String]): org.apache.spark.sql.Column
def to_json(e: org.apache.spark.sql.Column,options: Map[String,String]): org.apache.spark.sql.Column

你需要创建一个df.show +----+----+----+ |col1|col2|col3| +----+----+----+ | a| b| c| | d| e| f| +----+----+----+ ,然后在上面调用struct。就像是 :

to_json

以上是关于将spark结构化流数据帧转换为JSON的主要内容,如果未能解决你的问题,请参考以下文章

Spark 数据框将嵌套的 JSON 转换为单独的列

使用 pyspark 将 spark 数据帧转换为嵌套 JSON

使 Spark 结构化流中的 JSON 可以在 python (pyspark) 中作为没有 RDD 的数据帧访问

Spark:如何从 Spark 数据帧行解析和转换 json 字符串

如何将spark数据帧列名和行数据转换为json数据

通过将键作为列将 json 字典转换为 spark 数据帧