将 JSON 加载到 Spark 数据框

Posted

技术标签:

【中文标题】将 JSON 加载到 Spark 数据框【英文标题】:Load JSON to Spark Dataframe 【发布时间】:2020-03-23 03:49:16 【问题描述】:

如何使用 python 将 json 字符串加载到 Spark Dataframe 中

例子:

sampleJson = [
 ('"user":1, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]',), 
 ('"user":2, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]',), 
 ('"user":3, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]',), 
 ('"user":4, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]',), 
 ('"user":5, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]',), 
 ('"user":6, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]',), 
]

将其加载到具有架构的数据框中的最佳方法是什么

user - int 
IP - String

任何建议都会有所帮助

【问题讨论】:

你想把每个 IP 放在单独的一行吗? @Shaido-ReinstateMonica - 是的,1 行应该有 1 个 IP。用户 1 将被重复 4 次,一次为 evbery IP 【参考方案1】:

package spark

import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode, col

object LoadJsonToDataFrame extends App 


  val sampleJson = """[
                     |  "user":1, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"],
                     |  "user":2, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"],
                     |  "user":3, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"],
                     |  "user":4, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"],
                     |  "user":5, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"],
                     |  "user":6, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]
                     |]""".stripMargin


  val spark = SparkSession.builder()
    .master("local")
    .appName("DataFrame-example")
    .getOrCreate()

  import spark.implicits._

  val df1 = spark.sparkContext.parallelize(Seq(sampleJson)).toDF("value")
  df1.show(false)
  val rdd = df1.rdd.map(_.getString(0))

  val ds = rdd.toDS
  val df = spark.read.json(ds)

  df.show(false)
//  +----------------------------------------+----+
//  |IP                                      |user|
//  +----------------------------------------+----+
//  |[10.0.0.1, 10.0.0.2, 10.0.0.3, 10.0.0.4]|1   |
//  |[10.0.0.1, 10.0.0.2, 10.0.0.3, 10.0.0.4]|2   |
//  |[10.0.0.1, 10.0.0.2, 10.0.0.3, 10.0.0.4]|3   |
//  |[10.0.0.1, 10.0.0.2, 10.0.0.3, 10.0.0.4]|4   |
//  |[10.0.0.1, 10.0.0.2, 10.0.0.3, 10.0.0.4]|5   |
//  |[10.0.0.1, 10.0.0.2, 10.0.0.3, 10.0.0.4]|6   |
//  +----------------------------------------+----+
  df.printSchema()
//  root
//  |-- IP: array (nullable = true)
//  |    |-- element: string (containsNull = true)
//  |-- user: long (nullable = true)

 // explode
      val explodeDF = df
        .withColumn("ipExplode", explode(col("IP")))
          .select('user, 'ipExplode)

      explodeDF.show(50, false)
//      +----+---------+
//      |user|ipExplode|
//      +----+---------+
//      |1   |10.0.0.1 |
//      |1   |10.0.0.2 |
//      |1   |10.0.0.3 |
//      |1   |10.0.0.4 |
//      |2   |10.0.0.1 |
//      |2   |10.0.0.2 |
//      |2   |10.0.0.3 |
//      |2   |10.0.0.4 |
//      |3   |10.0.0.1 |
//      |3   |10.0.0.2 |
//      |3   |10.0.0.3 |
//      |3   |10.0.0.4 |
//      |4   |10.0.0.1 |
//      |4   |10.0.0.2 |
//      |4   |10.0.0.3 |
//      |4   |10.0.0.4 |
//      |5   |10.0.0.1 |
//      |5   |10.0.0.2 |
//      |5   |10.0.0.3 |
//      |5   |10.0.0.4 |
//      |6   |10.0.0.1 |
//      |6   |10.0.0.2 |
//      |6   |10.0.0.3 |
//      |6   |10.0.0.4 |
//      +----+---------+


【讨论】:

能否将其转为每行 1 个 IP。每个用户将被重复 4 次,每个 IP 一次。 在我的回答中添加了带有爆炸性的示例。

以上是关于将 JSON 加载到 Spark 数据框的主要内容,如果未能解决你的问题,请参考以下文章

将 JSON 多行文件加载到 pyspark 数据框中

如何将 JSON 格式的单行 Spark 数据框分解为多行?

加快将 json 数据加载到数据框中

加载 json 文件以触发数据框

在 spark 数据框中使用 where 子句加载数据

将json查询结果加载到数据框