How can I change the JSON structure in PySpark?

I have two JSON messages read from Kafka; here is the printSchema() of each:

JSON1 printSchema():

root
 |-- _id: string (nullable = true)
 |-- Data: string (nullable = true)
 |-- NomeAzienda: string (nullable = true)
 |-- Valori_Di_Borsa: struct (nullable = false)
 |    |-- PrezzoUltimoContratto: double (nullable = true)
 |    |-- Var%: double (nullable = true)
 |    |-- VarAssoluta: double (nullable = true)
 |    |-- OraUltimoContratto: string (nullable = true)
 |    |-- QuantitaUltimo: double (nullable = true)
 |    |-- QuantitaAcquisto: double (nullable = true)
 |    |-- QuantitaVendita: double (nullable = true)
 |    |-- QuantitaTotale: double (nullable = true)
 |    |-- NumeroContratti: double (nullable = true)
 |    |-- MaxOggi: double (nullable = true)
 |    |-- MinOggi: double (nullable = true)

JSON2 printSchema():

root
 |-- _id: string (nullable = true)
 |-- News: struct (nullable = false)
 |    |-- TitoloNews: string (nullable = true)
 |    |-- TestoNews: string (nullable = true)
 |    |-- DataNews: string (nullable = true)
 |    |-- OraNews: long (nullable = true)
 |    |-- SoggettoNews: string (nullable = true)

Joining the two JSONs, I get this printSchema():

root
 |-- _id: string (nullable = true)
 |-- Data: string (nullable = true)
 |-- NomeAzienda: string (nullable = true)
 |-- Valori_Di_Borsa: struct (nullable = false)
 |    |-- PrezzoUltimoContratto: double (nullable = true)
 |    |-- Var%: double (nullable = true)
 |    |-- VarAssoluta: double (nullable = true)
 |    |-- OraUltimoContratto: string (nullable = true)
 |    |-- QuantitaUltimo: double (nullable = true)
 |    |-- QuantitaAcquisto: double (nullable = true)
 |    |-- QuantitaVendita: double (nullable = true)
 |    |-- QuantitaTotale: double (nullable = true)
 |    |-- NumeroContratti: double (nullable = true)
 |    |-- MaxOggi: double (nullable = true)
 |    |-- MinOggi: double (nullable = true)
 |-- _id: string (nullable = true)
 |-- News: struct (nullable = false)
 |    |-- TitoloNews: string (nullable = true)
 |    |-- TestoNews: string (nullable = true)
 |    |-- DataNews: string (nullable = true)
 |    |-- OraNews: long (nullable = true)
 |    |-- SoggettoNews: string (nullable = true)

But the result I would like to get is this:

Updated root:

 -- _id: string (nullable = true)
 -- Data: string (nullable = true)
 -- NomeAzienda: string (nullable = true)
 -- Valori_Di_Borsa: struct (nullable = false)
     |-- PrezzoUltimoContratto: double (nullable = true)
     |-- Var%: double (nullable = true)
     |-- VarAssoluta: double (nullable = true)
     |-- OraUltimoContratto: string (nullable = true)
     |-- QuantitaUltimo: double (nullable = true)
     |-- QuantitaAcquisto: double (nullable = true)
     |-- QuantitaVendita: double (nullable = true)
     |-- QuantitaTotale: double (nullable = true)
     |-- NumeroContratti: double (nullable = true)
     |-- MaxOggi: double (nullable = true)
     |-- MinOggi: double (nullable = true)
     |-- News: struct (nullable = false)
                |-- id: string (nullable = true)
                |-- TitoloNews: string (nullable = true)
                |-- TestoNews: string (nullable = true)
                |-- DataNews: string (nullable = true)
                |-- OraNews: long (nullable = true)
                |-- SoggettoNews: string (nullable = true)

How can I do this with PySpark?

Here is my code:

    # assumes an existing SparkSession named `spark` and the broker address in `kafka_broker`
    from pyspark.sql import functions as F

    df_borsa = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", kafka_broker) \
        .option("startingOffsets", "latest") \
        .option("subscribe", "Be_borsa") \
        .load() \
        .selectExpr("CAST(value AS STRING)")

    df_news = spark.readStream.format("kafka") \
        .option("kafka.bootstrap.servers", kafka_broker) \
        .option("startingOffsets", "latest") \
        .option("subscribe", "Ita_news") \
        .load() \
        .selectExpr("CAST(value AS STRING)")

    df_borsa = df_borsa.withColumn("Valori_Di_Borsa", F.struct(
        F.col("PrezzoUltimoContratto"), F.col("Var%"), F.col("VarAssoluta"),
        F.col("OraUltimoContratto"), F.col("QuantitaUltimo"), F.col("QuantitaAcquisto"),
        F.col("QuantitaVendita"), F.col("QuantitaTotale"), F.col("NumeroContratti"),
        F.col("MaxOggi"), F.col("MinOggi")))

    df_borsa.printSchema()

    df_news = df_news.withColumn("News", F.struct(
        F.col("TitoloNews"), F.col("TestoNews"), F.col("DataNews"),
        F.col("OraNews"), F.col("SoggettoNews")))

    df_news.printSchema()

    df_join = df_borsa.join(df_news)

    df_join.printSchema()
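
(Note: the snippets above only cast the Kafka value to a string, so fields such as PrezzoUltimoContratto or TitoloNews would not yet exist when F.struct is called. Presumably the JSON payload is first parsed with from_json against an explicit schema; below is a minimal sketch of that assumed step, with a hypothetical and abbreviated schema.)

    from pyspark.sql.types import StructType, StructField, StringType, DoubleType

    # hypothetical schema for the Be_borsa payload (only a few fields shown)
    borsa_schema = StructType([
        StructField("_id", StringType()),
        StructField("Data", StringType()),
        StructField("NomeAzienda", StringType()),
        StructField("PrezzoUltimoContratto", DoubleType()),
        # ... remaining Valori_Di_Borsa fields ...
    ])

    # parse the JSON string and flatten the parsed struct into top-level columns
    df_borsa = df_borsa.select(F.from_json(F.col("value"), borsa_schema).alias("j")).select("j.*")
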
Answer

Check the code below.

Extract the fields of the Valori_Di_Borsa struct column, add the News column to them, and rebuild the struct.

df_join = df_borsa.join(df_news) \
    .withColumn("Valori_Di_Borsa", F.struct(F.col("Valori_Di_Borsa.*"), F.col("News")))
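
For reference, here is a minimal self-contained sketch of the same restructuring on plain batch DataFrames (the toy columns, values, and the explicit crossJoin standing in for the unconditioned join above are assumptions for illustration only), so the resulting schema can be checked with printSchema():

    from pyspark.sql import SparkSession, functions as F

    spark = SparkSession.builder.getOrCreate()

    # stand-in for the stock-quote side: one field wrapped into Valori_Di_Borsa
    df_borsa = spark.createDataFrame(
        [("1", "2020-01-01", "ACME", 10.5)],
        ["_id", "Data", "NomeAzienda", "PrezzoUltimoContratto"],
    ).withColumn("Valori_Di_Borsa", F.struct(F.col("PrezzoUltimoContratto")))

    # stand-in for the news side: its _id is renamed to id inside the News struct
    df_news = spark.createDataFrame(
        [("2", "Titolo", "Testo")],
        ["_id", "TitoloNews", "TestoNews"],
    ).withColumn("News", F.struct(F.col("_id").alias("id"), F.col("TitoloNews"), F.col("TestoNews")))

    # rebuild Valori_Di_Borsa so it also contains News, then drop the top-level News column
    df_join = (
        df_borsa.crossJoin(df_news.select("News"))
        .withColumn("Valori_Di_Borsa", F.struct(F.col("Valori_Di_Borsa.*"), F.col("News")))
        .drop("News")
    )

    df_join.printSchema()

The printed schema should show News nested inside Valori_Di_Borsa, matching the desired layout above.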
