将 JSON 加载到 Spark 数据框
Posted
技术标签:
【中文标题】将 JSON 加载到 Spark 数据框【英文标题】:Load JSON to Spark Dataframe 【发布时间】:2020-03-23 03:49:16 【问题描述】:如何使用 python 将 json 字符串加载到 Spark Dataframe 中
例子:
sampleJson = [
('"user":1, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]',),
('"user":2, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]',),
('"user":3, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]',),
('"user":4, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]',),
('"user":5, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]',),
('"user":6, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]',),
]
将其加载到具有架构的数据框中的最佳方法是什么
user - int
IP - String
任何建议都会有所帮助
【问题讨论】:
你想把每个 IP 放在单独的一行吗? @Shaido-ReinstateMonica - 是的,1 行应该有 1 个 IP。用户 1 将被重复 4 次,一次为 evbery IP 【参考方案1】:package spark
import org.apache.spark.sql.SparkSession
import org.apache.spark.sql.functions.explode, col
object LoadJsonToDataFrame extends App
val sampleJson = """[
| "user":1, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"],
| "user":2, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"],
| "user":3, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"],
| "user":4, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"],
| "user":5, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"],
| "user":6, "IP" :["10.0.0.1", "10.0.0.2", "10.0.0.3", "10.0.0.4"]
|]""".stripMargin
val spark = SparkSession.builder()
.master("local")
.appName("DataFrame-example")
.getOrCreate()
import spark.implicits._
val df1 = spark.sparkContext.parallelize(Seq(sampleJson)).toDF("value")
df1.show(false)
val rdd = df1.rdd.map(_.getString(0))
val ds = rdd.toDS
val df = spark.read.json(ds)
df.show(false)
// +----------------------------------------+----+
// |IP |user|
// +----------------------------------------+----+
// |[10.0.0.1, 10.0.0.2, 10.0.0.3, 10.0.0.4]|1 |
// |[10.0.0.1, 10.0.0.2, 10.0.0.3, 10.0.0.4]|2 |
// |[10.0.0.1, 10.0.0.2, 10.0.0.3, 10.0.0.4]|3 |
// |[10.0.0.1, 10.0.0.2, 10.0.0.3, 10.0.0.4]|4 |
// |[10.0.0.1, 10.0.0.2, 10.0.0.3, 10.0.0.4]|5 |
// |[10.0.0.1, 10.0.0.2, 10.0.0.3, 10.0.0.4]|6 |
// +----------------------------------------+----+
df.printSchema()
// root
// |-- IP: array (nullable = true)
// | |-- element: string (containsNull = true)
// |-- user: long (nullable = true)
// explode
val explodeDF = df
.withColumn("ipExplode", explode(col("IP")))
.select('user, 'ipExplode)
explodeDF.show(50, false)
// +----+---------+
// |user|ipExplode|
// +----+---------+
// |1 |10.0.0.1 |
// |1 |10.0.0.2 |
// |1 |10.0.0.3 |
// |1 |10.0.0.4 |
// |2 |10.0.0.1 |
// |2 |10.0.0.2 |
// |2 |10.0.0.3 |
// |2 |10.0.0.4 |
// |3 |10.0.0.1 |
// |3 |10.0.0.2 |
// |3 |10.0.0.3 |
// |3 |10.0.0.4 |
// |4 |10.0.0.1 |
// |4 |10.0.0.2 |
// |4 |10.0.0.3 |
// |4 |10.0.0.4 |
// |5 |10.0.0.1 |
// |5 |10.0.0.2 |
// |5 |10.0.0.3 |
// |5 |10.0.0.4 |
// |6 |10.0.0.1 |
// |6 |10.0.0.2 |
// |6 |10.0.0.3 |
// |6 |10.0.0.4 |
// +----+---------+
【讨论】:
能否将其转为每行 1 个 IP。每个用户将被重复 4 次,每个 IP 一次。 在我的回答中添加了带有爆炸性的示例。以上是关于将 JSON 加载到 Spark 数据框的主要内容,如果未能解决你的问题,请参考以下文章