SparkSql处理嵌套json数据

Posted 郭小白

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了SparkSql处理嵌套json数据相关的知识,希望对你有一定的参考价值。

一、数据准备:

{
"dc_id": "dc-101",
"source": {
    "sensor-igauge": {
      "id": 10,
      "ip": "68.28.91.22",
      "description": "Sensor attached to the container ceilings",
      "temp":35,
      "c02_level": 1475,
      "geo": {"lat":38.00, "long":97.00}                        
    },
    "sensor-ipad": {
      "id": 13,
      "ip": "67.185.72.1",
      "description": "Sensor ipad attached to carbon cylinders",
      "temp": 34,
      "c02_level": 1370,
      "geo": {"lat":47.41, "long":-122.00}
    },
    "sensor-inest": {
      "id": 8,
      "ip": "208.109.163.218",
      "description": "Sensor attached to the factory ceilings",
      "temp": 40,
      "c02_level": 1346,
      "geo": {"lat":33.61, "long":-111.89}
    },
    "sensor-istick": {
      "id": 5,
      "ip": "204.116.105.67",
      "description": "Sensor embedded in exhaust pipes in the ceilings",
      "temp": 40,
      "c02_level": 1574,
      "geo": {"lat":35.93, "long":-85.46}
    }
  }
}

代码示例:

package spark.project_1

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, sql}

/**
  * Author Mr. Guo
  * Create 2018/10/19 - 14:36
  */
/** One sensor reading flattened out of the nested data-center JSON document.
  *
  * @param dcId       data-center id (JSON `dc_id`)
  * @param deviceType sensor name, i.e. the key of the `source` map (e.g. "sensor-ipad")
  * @param ip         sensor IP address
  * @param deviceId   sensor numeric id (JSON `id`)
  * @param temp       temperature reading
  * @param c02_level  value of the JSON `c02_level` field
  * @param lat        latitude from `geo.lat`
  * @param lon        longitude from `geo.long`
  */
final case class DeviceAlert(dcId: String, deviceType: String, ip: String, deviceId: Long, temp: Long, c02_level: Long,
                             lat: Double, lon: Double)

object dispose_json {

  /** Demo: read one nested JSON document, explode its `source` map into one row per sensor,
    * and flatten each sensor's nested struct into a typed `Dataset[DeviceAlert]`.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    // builder() is the documented public factory for SparkSession.
    val ssc = sql.SparkSession
      .builder()
      .config(conf)
      .master("local[2]")
      .appName("dispose_json")
      .getOrCreate()

    // Stop the session even if parsing or projection throws.
    try {
      ssc.sparkContext.setLogLevel("error")
      println("--------------------------------------------------------------------")
      // Bring Spark implicits into scope: the $"col" interpolator, toDS() and case-class encoders.
      import ssc.implicits._
      // A single multi-line string: each Dataset element is parsed as one JSON record.
      val dataDS1 = Seq(
        """
          |{
          |"dc_id": "dc-101",
          |"source": {
          |    "sensor-igauge": {
          |      "id": 10,
          |      "ip": "68.28.91.22",
          |      "description": "Sensor attached to the container ceilings",
          |      "temp":35,
          |      "c02_level": 1475,
          |      "geo": {"lat":38.00, "long":97.00}
          |    },
          |    "sensor-ipad": {
          |      "id": 13,
          |      "ip": "67.185.72.1",
          |      "description": "Sensor ipad attached to carbon cylinders",
          |      "temp": 34,
          |      "c02_level": 1370,
          |      "geo": {"lat":47.41, "long":-122.00}
          |    },
          |    "sensor-inest": {
          |      "id": 8,
          |      "ip": "208.109.163.218",
          |      "description": "Sensor attached to the factory ceilings",
          |      "temp": 40,
          |      "c02_level": 1346,
          |      "geo": {"lat":33.61, "long":-111.89}
          |    },
          |    "sensor-istick": {
          |      "id": 5,
          |      "ip": "204.116.105.67",
          |      "description": "Sensor embedded in exhaust pipes in the ceilings",
          |      "temp": 40,
          |      "c02_level": 1574,
          |      "geo": {"lat":35.93, "long":-85.46}
          |    }
          |  }
          |}
        """.stripMargin).toDS()

      // Explicit schema. `source` is an object keyed by sensor name, so it is modelled as
      // MapType(String -> struct) so that explode() can turn each entry into a row.
      val schema1 = new StructType()
        .add("dc_id", StringType)
        .add("source",
          MapType(StringType,
            new StructType()
              .add("description", StringType)
              .add("ip", StringType)
              .add("id", LongType)
              .add("temp", LongType)
              .add("c02_level", LongType)
              .add("geo",
                new StructType()
                  .add("lat", DoubleType)
                  .add("long", DoubleType)
              )
          )
        )

      // json(Dataset[String]) supersedes the deprecated json(RDD[String]) overload.
      val df1 = ssc.read.schema(schema1).json(dataDS1)
      df1.printSchema()
      df1.show(false)
      println("=======================================")

      // Exploding a map column produces two columns named `key` and `value`.
      val explodeDF = df1.select($"dc_id", explode($"source"))
      explodeDF.printSchema()
      explodeDF.show(10, false)
      println("=======================================")

      // Output column names must match DeviceAlert's field names for .as[DeviceAlert] to bind.
      val notifydevicesDS = explodeDF.select(
        $"dc_id".as("dcId"),
        $"key".as("deviceType"),
        $"value".getItem("ip").as("ip"),
        $"value".getItem("id").as("deviceId"),
        $"value".getItem("c02_level").as("c02_level"),
        $"value".getItem("temp").as("temp"),
        $"value".getItem("geo").getItem("lat").as("lat"),
        $"value".getItem("geo").getItem("long").as("lon"))
        .as[DeviceAlert]
      notifydevicesDS.printSchema()
      notifydevicesDS.show(20, false)
    } finally {
      ssc.stop()
    }
  }
}

二、数据准备:

{
  "devices": {
     "thermostats": {
        "peyiJNo0IldT2YlIVtYaGQ": {
          "device_id": "peyiJNo0IldT2YlIVtYaGQ",
          "locale": "en-US",
          "software_version": "4.0",
          "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
          "where_name": "Hallway Upstairs",
          "last_connection": "2016-10-31T23:59:59.000Z",
          "is_online": true,
          "can_cool": true,
          "can_heat": true,
          "is_using_emergency_heat": true,
          "has_fan": true,
          "fan_timer_active": true,
          "fan_timer_timeout": "2016-10-31T23:59:59.000Z",
          "temperature_scale": "F",
          "target_temperature_f": 72,
          "target_temperature_high_f": 80,
          "target_temperature_low_f": 65,
          "eco_temperature_high_f": 80,
          "eco_temperature_low_f": 65,
          "away_temperature_high_f": 80,
          "away_temperature_low_f": 65,
          "hvac_mode": "heat",
          "humidity": 40,
          "hvac_state": "heating",
          "is_locked": true,
          "locked_temp_min_f": 65,
          "locked_temp_max_f": 80
          }
        },
        "smoke_co_alarms": {
          "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
            "device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
            "locale": "en-US",
            "software_version": "1.01",
            "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
            "where_name": "Jane's Room",
            "last_connection": "2016-10-31T23:59:59.000Z",
            "is_online": true,
            "battery_health": "ok",
            "co_alarm_state": "ok",
            "smoke_alarm_state": "ok",
            "is_manual_test_active": true,
            "last_manual_test_time": "2016-10-31T23:59:59.000Z",
            "ui_color_state": "gray"
            }
          },
       "cameras": {
        "awJo6rH0IldT2YlIVtYaGQ": {
          "device_id": "awJo6rH",
          "software_version": "4.0",
          "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
          "where_name": "Foyer",
          "is_online": true,
          "is_streaming": true,
          "is_audio_input_enabled": true,
          "last_is_online_change": "2016-12-29T18:42:00.000Z",
          "is_video_history_enabled": true,
          "web_url": "https://home.nest.com/cameras/device_id?auth=access_token",
          "app_url": "nestmobile://cameras/device_id?auth=access_token",
          "is_public_share_enabled": true,
          "activity_zones": { "name": "Walkway", "id": 244083 },
          "last_event": "2016-10-31T23:59:59.000Z"
          }
        }
      }
     }

代码示例:

package spark.project_1

import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.{SparkConf, sql}

/**
  * Author Mr. Guo
  * Create 2018/10/19 - 14:36
  */
/** One sensor reading flattened out of a nested data-center JSON document.
  *
  * NOTE(review): `dispose_json.main` in this listing does not reference this class.
  *
  * @param dcId       data-center id
  * @param deviceType sensor name / type
  * @param ip         sensor IP address
  * @param deviceId   sensor numeric id
  * @param temp       temperature reading
  * @param c02_level  value of the JSON `c02_level` field
  * @param lat        latitude
  * @param lon        longitude
  */
final case class DeviceAlert(dcId: String, deviceType: String, ip: String, deviceId: Long, temp: Long, c02_level: Long,
                             lat: Double, lon: Double)

object dispose_json {

  /** Demo: read a device document whose `devices` struct holds three maps (thermostats,
    * smoke_co_alarms, cameras), explode each map into rows, project selected fields, and
    * join thermostats with cameras on their software version.
    *
    * @param args unused
    */
  def main(args: Array[String]): Unit = {

    val conf = new SparkConf()
    // builder() is the documented public factory for SparkSession.
    val ssc = sql.SparkSession
      .builder()
      .config(conf)
      .master("local[2]")
      .appName("dispose_json")
      .getOrCreate()

    // Stop the session even if parsing, projection or the join throws.
    try {
      ssc.sparkContext.setLogLevel("error")
      println("--------------------------------------------------------------------")
      // Bring Spark implicits into scope: the $"col" interpolator, toDS() and encoders.
      import ssc.implicits._
      // A single multi-line string: each Dataset element is parsed as one JSON record.
      val dataDS2 = Seq(
        """
          |{
          |  "devices": {
          |     "thermostats": {
          |        "peyiJNo0IldT2YlIVtYaGQ": {
          |          "device_id": "peyiJNo0IldT2YlIVtYaGQ",
          |          "locale": "en-US",
          |          "software_version": "4.0",
          |          "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
          |          "where_name": "Hallway Upstairs",
          |          "last_connection": "2016-10-31T23:59:59.000Z",
          |          "is_online": true,
          |          "can_cool": true,
          |          "can_heat": true,
          |          "is_using_emergency_heat": true,
          |          "has_fan": true,
          |          "fan_timer_active": true,
          |          "fan_timer_timeout": "2016-10-31T23:59:59.000Z",
          |          "temperature_scale": "F",
          |          "target_temperature_f": 72,
          |          "target_temperature_high_f": 80,
          |          "target_temperature_low_f": 65,
          |          "eco_temperature_high_f": 80,
          |          "eco_temperature_low_f": 65,
          |          "away_temperature_high_f": 80,
          |          "away_temperature_low_f": 65,
          |          "hvac_mode": "heat",
          |          "humidity": 40,
          |          "hvac_state": "heating",
          |          "is_locked": true,
          |          "locked_temp_min_f": 65,
          |          "locked_temp_max_f": 80
          |          }
          |        },
          |        "smoke_co_alarms": {
          |          "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs": {
          |            "device_id": "RTMTKxsQTCxzVcsySOHPxKoF4OyCifrs",
          |            "locale": "en-US",
          |            "software_version": "1.01",
          |            "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
          |            "where_name": "Jane's Room",
          |            "last_connection": "2016-10-31T23:59:59.000Z",
          |            "is_online": true,
          |            "battery_health": "ok",
          |            "co_alarm_state": "ok",
          |            "smoke_alarm_state": "ok",
          |            "is_manual_test_active": true,
          |            "last_manual_test_time": "2016-10-31T23:59:59.000Z",
          |            "ui_color_state": "gray"
          |            }
          |          },
          |       "cameras": {
          |        "awJo6rH0IldT2YlIVtYaGQ": {
          |          "device_id": "awJo6rH",
          |          "software_version": "4.0",
          |          "structure_id": "VqFabWH21nwVyd4RWgJgNb292wa7hG_dUwo2i2SG7j3-BOLY0BA4sw",
          |          "where_name": "Foyer",
          |          "is_online": true,
          |          "is_streaming": true,
          |          "is_audio_input_enabled": true,
          |          "last_is_online_change": "2016-12-29T18:42:00.000Z",
          |          "is_video_history_enabled": true,
          |          "web_url": "https://home.nest.com/cameras/device_id?auth=access_token",
          |          "app_url": "nestmobile://cameras/device_id?auth=access_token",
          |          "is_public_share_enabled": true,
          |          "activity_zones": { "name": "Walkway", "id": 244083 },
          |          "last_event": "2016-10-31T23:59:59.000Z"
          |          }
          |        }
          |      }
          |     }
        """.stripMargin).toDS()

      // Explicit schema: each device category is an object keyed by an id string, so it is
      // modelled as MapType(String -> struct) so that explode() can turn entries into rows.
      val schema2 = new StructType()
        .add("devices",
          new StructType()
            .add("thermostats", MapType(StringType,
              new StructType()
                .add("device_id", StringType)
                .add("locale", StringType)
                .add("software_version", StringType)
                .add("structure_id", StringType)
                .add("where_name", StringType)
                .add("last_connection", StringType)
                .add("is_online", BooleanType)
                .add("can_cool", BooleanType)
                .add("can_heat", BooleanType)
                .add("is_using_emergency_heat", BooleanType)
                .add("has_fan", BooleanType)
                .add("fan_timer_active", BooleanType)
                .add("fan_timer_timeout", StringType)
                .add("temperature_scale", StringType)
                .add("target_temperature_f", DoubleType)
                .add("target_temperature_high_f", DoubleType)
                .add("target_temperature_low_f", DoubleType)
                .add("eco_temperature_high_f", DoubleType)
                .add("eco_temperature_low_f", DoubleType)
                .add("away_temperature_high_f", DoubleType)
                .add("away_temperature_low_f", DoubleType)
                .add("hvac_mode", StringType)
                .add("humidity", DoubleType)
                .add("hvac_state", StringType)
                .add("is_locked", BooleanType)
                .add("locked_temp_min_f", DoubleType)
                .add("locked_temp_max_f", DoubleType)))
            .add("smoke_co_alarms", MapType(StringType,
              new StructType()
                .add("device_id", StringType)
                .add("locale", StringType)
                .add("software_version", StringType)
                .add("structure_id", StringType)
                .add("where_name", StringType)
                .add("last_connection", StringType)
                .add("is_online", BooleanType)
                .add("battery_health", StringType)
                .add("co_alarm_state", StringType)
                .add("smoke_alarm_state", StringType)
                .add("is_manual_test_active", BooleanType)
                .add("last_manual_test_time", StringType)
                .add("ui_color_state", StringType)))
            .add("cameras", MapType(StringType,
              new StructType()
                .add("device_id", StringType)
                .add("software_version", StringType)
                .add("structure_id", StringType)
                .add("where_name", StringType)
                .add("is_online", BooleanType)
                .add("is_streaming", BooleanType)
                .add("is_audio_input_enabled", BooleanType)
                .add("last_is_online_change", StringType)
                .add("is_video_history_enabled", BooleanType)
                .add("web_url", StringType)
                .add("app_url", StringType)
                .add("is_public_share_enabled", BooleanType)
                .add("activity_zones",
                  new StructType()
                    .add("name", StringType)
                    .add("id", LongType))
                .add("last_event", StringType))))

      // json(Dataset[String]) supersedes the deprecated json(RDD[String]) overload.
      val df2 = ssc.read.schema(schema2).json(dataDS2)

      // Serialize all top-level columns back into one JSON string column.
      val stringJsonDF = df2.select(to_json(struct($"*"))).toDF("nestDevice")
      stringJsonDF.show(false)

      // Pull the three per-category maps out of the `devices` struct.
      val mapColumnsDF = df2.select(
        $"devices".getItem("smoke_co_alarms").alias("smoke_alarms"),
        $"devices".getItem("cameras").alias("cameras"),
        $"devices".getItem("thermostats").alias("thermostats"))

      // Exploding a map column produces one row per entry, with columns `key` and `value`.
      val explodeThermostatsDF = mapColumnsDF.select(explode($"thermostats"))
      val explodeCamerasDF = mapColumnsDF.select(explode($"cameras"))
      val explodeSmokeAlarmsDF = mapColumnsDF.select(explode($"smoke_alarms"))
      explodeSmokeAlarmsDF.printSchema()

      val thermostatDF =
        explodeThermostatsDF.select($"value".getItem("device_id").alias("device_id"),
          $"value".getItem("locale").alias("locale"),
          $"value".getItem("where_name").alias("location"),
          $"value".getItem("last_connection").alias("last_connection"),
          $"value".getItem("humidity").alias("humidity"),
          $"value".getItem("target_temperature_f").alias("target_temperature_f"),
          $"value".getItem("hvac_mode").alias("mode"),
          $"value".getItem("software_version").alias("version"))

      val cameraDF =
        explodeCamerasDF.select($"value".getItem("device_id").alias("device_id"),
          $"value".getItem("where_name").alias("location"),
          $"value".getItem("software_version").alias("version"),
          $"value".getItem("activity_zones").getItem("name").alias("name"),
          $"value".getItem("activity_zones").getItem("id").alias("id"))

      val smokeAlarmsDF =
        explodeSmokeAlarmsDF.select($"value".getItem("device_id").alias("device_id"),
          $"value".getItem("where_name").alias("location"),
          $"value".getItem("software_version").alias("version"),
          $"value".getItem("last_connection").alias("last_connected"),
          $"value".getItem("battery_health").alias("battery_health"))

      cameraDF.show()
      smokeAlarmsDF.show(false)

      // Inner join on `version`. Both sides also carry `device_id` and `location`, so the
      // joined result contains those column names twice.
      val joinedDF = thermostatDF.join(cameraDF, "version")
      joinedDF.show(10, false)
    } finally {
      ssc.stop()
    }
  }
}

  

以上是关于SparkSql处理嵌套json数据的主要内容,如果未能解决你的问题,请参考以下文章

Spark SQL 嵌套 JSON 错误“输入时没有可行的替代方案”

Spark高级操作之json复杂和嵌套数据结构的操作一

sparksql系列 Json转Map,多文件生成

Spark SQL大数据处理并写入Elasticsearch

sparksql仅包含英文字母的数据

如何在 Ios 中解析数组数据中的嵌套 Json 对象