How to transform input CSV data containing JSON into a spark Dataset?

Posted: 2019-01-29 22:41:56

Question:

I have a CSV with the following data:

dept|emp_json|location
finance|{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}|OH


I am able to read the file, create a Dataset, and extract the JSON column:

Dataset<Row> empDetlsDS = sparkSession.read().option("header", "true").option("delimiter", "|").schema(mySchema).csv(inputCSVPath);
Dataset<Row> empDetlsJsonDocDS = empDetlsDS.select("emp_json");

I want to flatten the JSON and produce an output Dataset with one row per element of the employee array, in the following format:

dept    |emp_name   |emp_address              |emp_city|location
-----------------------------------------------------------------
finance |John Doe   |1234 West Broad St 8505  |Columbus|OH
finance |Alex Messi |4321 North Meecham Rd 300|Salinas |OH
-----------------------------------------------------------------


If anyone has a suggestion using Java and Spark, please help. Thanks in advance.

Comments:

How many rows of data are there? If it is just one row, then after reading the csv you can select the json column, convert it to an rdd, and feed it to spark.read.json, then add a couple of literal columns with dept and location. If there are many, you may need a key/value rdd.

Hi @sramalingam24, thanks for the reply. I have 1 million records to process, and I want to do it with Java and spark.

I am not a java person, but I assume it should be easy to translate from scala. Here are some examples in scala you can look at: docs.databricks.com/spark/latest/dataframes-datasets/…

Answer 1:

@tkkman This is the scala way I was talking about. The rdd way has been deprecated and the DataSet way is now recommended, so it should be straightforward in Java.

import spark.implicits._
import org.apache.spark.sql.functions._

val df = spark.read.option("delimiter","|").option("header","true").csv("/FileStore/tables/test.txt")

val jdf = spark.read.json(df.select("emp_json").rdd.map(_.getString(0))).select(explode($"employee").alias("emp"))
  .select($"emp.name.firstName",$"emp.name.lasteName",$"emp.address.street",$"emp.address.unit",$"emp.address.city")

jdf.printSchema

jdf.withColumn("dept", lit("finance")).withColumn("location", lit("OH")).show(false)

+---------+---------+---------------------+----+--------+-------+--------+
|firstName|lasteName|street               |unit|city    |dept   |location|
+---------+---------+---------------------+----+--------+-------+--------+
|John     |Doe      |1234 West Broad St   |8505|Columbus|finance|OH      |
|Alex     |Messi    |4321 North Meecham Rd|300 |Salinas |finance|OH      |
+---------+---------+---------------------+----+--------+-------+--------+
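Since the question asks for Java, here is a minimal sketch of the same Dataset-style approach, assuming Spark 2.2+ (for spark.read().json(Dataset<String>)) and that spark and inputCSVPath from the question are in scope. This is an illustration, not the answerer's code:

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.StructType;

Dataset<Row> df = spark.read()
    .option("header", "true")
    .option("delimiter", "|")
    .csv(inputCSVPath);

// Infer the schema of the embedded JSON from the emp_json column itself
// (this costs one extra pass over the data).
StructType jsonSchema = spark.read()
    .json(df.select("emp_json").as(Encoders.STRING()))
    .schema();

// Parse the string column, explode the employee array, and flatten the structs.
Dataset<Row> result = df
    .withColumn("emp",
        explode(from_json(col("emp_json"), jsonSchema).getField("employee")))
    .select(
        col("dept"),
        concat(col("emp.name.firstName"), lit(" "), col("emp.name.lasteName"))
            .alias("emp_name"),
        concat(col("emp.address.street"), lit(" "), col("emp.address.unit"))
            .alias("emp_address"),
        col("emp.address.city").alias("emp_city"),
        col("location"));

result.show(false);

Unlike the lit("finance") and lit("OH") columns above, this keeps dept and location per input row, which matters as soon as the file contains more than one department.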

Comments:

Answer 2:

In Java, you can do it like this:

package net.jgp.books.sparkInAction.ch12.lab950CsvWithEmbdeddedJson;

import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.lit;

import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Ingesting a CSV with embedded JSON.
 * 
 * @author jgp
 */
public class CsvWithEmbdeddedJsonApp implements Serializable {
  private static final long serialVersionUID = 19711L;

  /**
   * Turns a Row into JSON. Not very fail-safe, but done to illustrate.
   * 
   * @author jgp
   */
  private final class Jsonifier
      implements MapFunction<Row, String> {
    private static final long serialVersionUID = 19712L;

    @Override
    public String call(Row r) throws Exception {
      StringBuffer sb = new StringBuffer();
      sb.append("{ \"dept\": \"");
      sb.append(r.getString(0));
      sb.append("\",");
      String s = r.getString(1);
      if (s != null) {
        s = s.trim();
        // Strip the outer braces of the embedded JSON so its fields can be
        // spliced into the document being built here.
        if (s.charAt(0) == '{') {
          s = s.substring(1, s.length() - 1);
        }
      }
      sb.append(s);
      sb.append(", \"location\": \"");
      sb.append(r.getString(2));
      sb.append("\"}");
      return sb.toString();
    }
  }

  /**
   * main() is your entry point to the application.
   * 
   * @param args
   */
  public static void main(String[] args) {
    CsvWithEmbdeddedJsonApp app = new CsvWithEmbdeddedJsonApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Processing of invoices")
        .master("local[*]")
        .getOrCreate();

    Dataset<Row> df = spark
        .read()
        .option("header", "true")
        .option("delimiter", "|")
        .csv("data/misc/csv_with_embedded_json.csv");
    df.show(5, false);
    df.printSchema();

    Dataset<String> ds = df.map(
        new Jsonifier(),
        Encoders.STRING());
    ds.show(5, false);
    ds.printSchema();

    Dataset<Row> dfJson = spark.read().json(ds);
    dfJson.show(5, false);
    dfJson.printSchema();

    dfJson = dfJson
        .withColumn("emp", explode(dfJson.col("employee")))
        .drop("employee");
    dfJson.show(5, false);
    dfJson.printSchema();

    dfJson = dfJson
        .withColumn("emp_name",
            concat(
                dfJson.col("emp.name.firstName"),
                lit(" "),
                dfJson.col("emp.name.lasteName")))
        .withColumn("emp_address",
            concat(dfJson.col("emp.address.street"),
                lit(" "),
                dfJson.col("emp.address.unit")))
        .withColumn("emp_city", dfJson.col("emp.address.city"))
        .drop("emp");
    dfJson.show(5, false);
    dfJson.printSchema();
  }
}

As usual, Java is quite verbose :) - and I am not complaining. I left a lot of printSchema() and show() calls in to illustrate the construction process. The Jsonifier class could be implemented in a nicer, more generic way, but it gives the idea (you could also implement it as a lambda if you wanted, as sketched below).
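For reference, here is a sketch of that lambda variant (same logic as Jsonifier; this is an illustration, not part of the original code):

// The same mapping written as a lambda; the cast picks the MapFunction overload.
Dataset<String> ds = df.map(
    (MapFunction<Row, String>) r -> {
      String json = r.getString(1) == null ? "" : r.getString(1).trim();
      // Strip the outer braces so the fields can be spliced into the new document.
      if (!json.isEmpty() && json.charAt(0) == '{') {
        json = json.substring(1, json.length() - 1);
      }
      return "{ \"dept\": \"" + r.getString(0) + "\","
          + json
          + ", \"location\": \"" + r.getString(2) + "\"}";
    },
    Encoders.STRING());

Either way, the rest of the pipeline stays the same.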

The output is:

+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|dept   |emp_json                                                                                                                                                                                                                                                          |location|
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|finance|{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}|OH      |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+

root
 |-- dept: string (nullable = true)
 |-- emp_json: string (nullable = true)
 |-- location: string (nullable = true)

+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value                                                                                                                                                                                                                                                                                          |
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{ "dept": "finance", "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}], "location": "OH"}|
+---------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+

root
 |-- value: string (nullable = true)

+-------+-------------------------------------------------------------------------------------------------------------+--------+
|dept   |employee                                                                                                     |location|
+-------+-------------------------------------------------------------------------------------------------------------+--------+
|finance|[[[Columbus, 1234 West Broad St, 8505], [John, Doe]], [[Salinas, 4321 North Meecham Rd, 300], [Alex, Messi]]]|OH      |
+-------+-------------------------------------------------------------------------------------------------------------+--------+

root
 |-- dept: string (nullable = true)
 |-- employee: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- address: struct (nullable = true)
 |    |    |    |-- city: string (nullable = true)
 |    |    |    |-- street: string (nullable = true)
 |    |    |    |-- unit: string (nullable = true)
 |    |    |-- name: struct (nullable = true)
 |    |    |    |-- firstName: string (nullable = true)
 |    |    |    |-- lasteName: string (nullable = true)
 |-- location: string (nullable = true)

+-------+--------+------------------------------------------------------+
|dept   |location|emp                                                   |
+-------+--------+------------------------------------------------------+
|finance|OH      |[[Columbus, 1234 West Broad St, 8505], [John, Doe]]   |
|finance|OH      |[[Salinas, 4321 North Meecham Rd, 300], [Alex, Messi]]|
+-------+--------+------------------------------------------------------+

root
 |-- dept: string (nullable = true)
 |-- location: string (nullable = true)
 |-- emp: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- street: string (nullable = true)
 |    |    |-- unit: string (nullable = true)
 |    |-- name: struct (nullable = true)
 |    |    |-- firstName: string (nullable = true)
 |    |    |-- lasteName: string (nullable = true)

+-------+--------+----------+-------------------------+--------+
|dept   |location|emp_name  |emp_address              |emp_city|
+-------+--------+----------+-------------------------+--------+
|finance|OH      |John Doe  |1234 West Broad St 8505  |Columbus|
|finance|OH      |Alex Messi|4321 North Meecham Rd 300|Salinas |
+-------+--------+----------+-------------------------+--------+

root
 |-- dept: string (nullable = true)
 |-- location: string (nullable = true)
 |-- emp_name: string (nullable = true)
 |-- emp_address: string (nullable = true)
 |-- emp_city: string (nullable = true)

Comments:

Thanks for your help. I ended up passing the rows through a flatMap function, expanding the json column into multiple output columns, and then expanding one row into multiple rows based on certain conditions.
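For readers who want to see that pattern, here is a hypothetical sketch of a flatMap-based flattening, assuming Spark 2.x and Jackson for parsing; the schema and helper names below are illustrative, not the commenter's actual code:

import java.util.ArrayList;
import java.util.List;

import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;

import org.apache.spark.api.java.function.FlatMapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.RowFactory;
import org.apache.spark.sql.catalyst.encoders.RowEncoder;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

// Target layout of the flattened rows.
StructType outputSchema = new StructType()
    .add("dept", DataTypes.StringType)
    .add("emp_name", DataTypes.StringType)
    .add("emp_address", DataTypes.StringType)
    .add("emp_city", DataTypes.StringType)
    .add("location", DataTypes.StringType);

Dataset<Row> flat = empDetlsDS.flatMap(
    (FlatMapFunction<Row, Row>) row -> {
      // One input row fans out to one output row per employee.
      JsonNode employees = new ObjectMapper()
          .readTree(row.getString(1))
          .get("employee");
      List<Row> out = new ArrayList<>();
      for (JsonNode e : employees) {
        JsonNode name = e.get("name");
        JsonNode addr = e.get("address");
        out.add(RowFactory.create(
            row.getString(0),
            name.get("firstName").asText() + " " + name.get("lasteName").asText(),
            addr.get("street").asText() + " " + addr.get("unit").asText(),
            addr.get("city").asText(),
            row.getString(2)));
      }
      return out.iterator();
    },
    RowEncoder.apply(outputSchema));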

Answer 3:

If you have a proper json schema, you can use the explode method and then select the columns you want with the dot operator (e.g. emp_json.name, emp_json.address, ...).

Sample code:

val flatJSON = df.select($"dept", explode($"emp_json.employee").as("emp"))

flatJSON.select("dept", "emp.name", "emp.address")
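Since the question asks for Java, here is a sketch of the same idea with the "proper json schema" spelled out by hand; the field names come from the question, but the code itself is an illustration, not the answerer's:

import static org.apache.spark.sql.functions.*;

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.types.DataTypes;
import org.apache.spark.sql.types.StructType;

StructType nameType = new StructType()
    .add("firstName", DataTypes.StringType)
    .add("lasteName", DataTypes.StringType);
StructType addressType = new StructType()
    .add("street", DataTypes.StringType)
    .add("unit", DataTypes.StringType)
    .add("city", DataTypes.StringType);
StructType empJsonSchema = new StructType()
    .add("employee", DataTypes.createArrayType(new StructType()
        .add("name", nameType)
        .add("address", addressType)));

// Parse the string column against the schema, explode, then use the dot operator.
Dataset<Row> flat = empDetlsDS
    .withColumn("emp",
        explode(from_json(col("emp_json"), empJsonSchema).getField("employee")))
    .select(col("dept"), col("emp.name"), col("emp.address"), col("location"));

An explicit schema avoids the extra inference pass over the data and fails predictably if the JSON layout drifts.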

Comments:

Answer 4:

Check this out:

scala> val df = spark.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("/tmp/stack/tkkman.csv")
df: org.apache.spark.sql.DataFrame = [dept: string, emp_json: string ... 1 more field]

scala> df.show(false)
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|dept   |emp_json                                                                                                                                                                                                                                                          |location|
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|finance|{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}|OH      |
+-------+------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+


scala> df.printSchema
root
 |-- dept: string (nullable = true)
 |-- emp_json: string (nullable = true)
 |-- location: string (nullable = true)

scala> val jsonstr = """{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}"""
jsonstr: String = { "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}

scala> val dfj = spark.read.json(Seq(jsonstr).toDS)
dfj: org.apache.spark.sql.DataFrame = [employee: array<struct<address:struct<city:string,street:string,unit:string>,name:struct<firstName:string,lasteName:string>>>]

scala> dfj.show(false)
+-------------------------------------------------------------------------------------------------------------+
|employee                                                                                                     |
+-------------------------------------------------------------------------------------------------------------+
|[[[Columbus, 1234 West Broad St, 8505], [John, Doe]], [[Salinas, 4321 North Meecham Rd, 300], [Alex, Messi]]]|
+-------------------------------------------------------------------------------------------------------------+


scala> dfj.schema
res51: org.apache.spark.sql.types.StructType = StructType(StructField(employee,ArrayType(StructType(StructField(address,StructType(StructField(city,StringType,true), StructField(street,StringType,true), StructField(unit,StringType,true)),true), StructField(name,StructType(StructField(firstName,StringType,true), StructField(lasteName,StringType,true)),true)),true),true))


scala> val sch_emp = dfj.schema
sch_emp: org.apache.spark.sql.types.StructType = StructType(StructField(employee,ArrayType(StructType(StructField(address,StructType(StructField(city,StringType,true), StructField(street,StringType,true), StructField(unit,StringType,true)),true), StructField(name,StructType(StructField(firstName,StringType,true), StructField(lasteName,StringType,true)),true)),true),true))

scala> val df2 = df.select(col("*"),from_json('emp_json,sch_emp).as("emp"))
df2: org.apache.spark.sql.DataFrame = [dept: string, emp_json: string ... 2 more fields]

scala> df2.select(explode($"emp.employee")).printSchema
root
 |-- col: struct (nullable = true)
 |    |-- address: struct (nullable = true)
 |    |    |-- city: string (nullable = true)
 |    |    |-- street: string (nullable = true)
 |    |    |-- unit: string (nullable = true)
 |    |-- name: struct (nullable = true)
 |    |    |-- firstName: string (nullable = true)
 |    |    |-- lasteName: string (nullable = true)


scala> df2.select(col("*"),explode($"emp.employee").as("emp2")).select('dept,concat($"emp2.name.firstName",lit(" "),$"emp2.name.lasteName").as("emp_name"),$"emp2.address.street" as "emp_address", $"emp2.address.city" as "emp_city", 'location).show(false)
+-------+----------+---------------------+--------+--------+
|dept   |emp_name  |emp_address          |emp_city|location|
+-------+----------+---------------------+--------+--------+
|finance|John Doe  |1234 West Broad St   |Columbus|OH      |
|finance|Alex Messi|4321 North Meecham Rd|Salinas |OH      |
+-------+----------+---------------------+--------+--------+


scala>

Comments:
