How to transform input CSV data containing JSON into a Spark Dataset?
【Posted】: 2019-01-29 22:41:56
【Question】: I have a CSV file with the following data:
dept|emp_json|location
finance|{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}|OH
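I can read the file, create a Dataset, and extract the JSON column:
Dataset<Row> empDetlsDS = sparkSession.read().option("header", "true").option("delimiter", "|").schema(mySchema).csv(inputCSVPath);
Dataset<Row> empDetlsJsonDocDS = empDetlsDS.select("emp_json");
I want to flatten the JSON and create an output Dataset with one row per element of the employee array, in the following format: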
dept    |emp_name   |emp_address              |emp_city|location|
-----------------------------------------------------------------
finance |John Doe   |1234 West Broad St 8505  |Columbus|OH      |
finance |Alex Messi |4321 North Meecham Rd 300|Salinas |OH      |
-----------------------------------------------------------------
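If anyone has suggestions for doing this with Java and Spark, please help. Thanks in advance.
【Comments】:
How many rows of data are there? If it is just one row, then after reading the CSV you can select the JSON column, convert it to an RDD, feed it to spark.read.json, and then add the dept and location columns as literals. If there are many rows, you will probably need a key/value RDD.
Hi @sramalingam24, thanks for the reply. I have 1 million records to process, and I want to do it with Java and Spark.
I'm not a Java person, but I assume it should be easy to translate from Scala. Here are some Scala examples you can look at: docs.databricks.com/spark/latest/dataframes-datasets/…
【Answer 1】: @tkkman Here is the Scala way I was talking about. The RDD approach is deprecated and the Dataset approach is now recommended, so it should be straightforward in Java.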
import spark.implicits._
import org.apache.spark.sql.functions._
val df = spark.read.option("delimiter","|").option("header","true").csv("/FileStore/tables/test.txt")
// Pass the JSON column as a Dataset[String]; the RDD[String] overload of spark.read.json is deprecated.
val jdf = spark.read.json(df.select("emp_json").as[String]).select(explode($"employee").alias("emp"))
  .select($"emp.name.firstName",$"emp.name.lasteName",$"emp.address.street",$"emp.address.unit",$"emp.address.city")
jdf.printSchema
// dept and location are hard-coded literals here, which is only valid for a single-row input.
jdf.withColumn("dept", lit("finance")).withColumn("location",lit("OH")).show(false)
+---------+---------+---------------------+----+--------+-------+--------+
|firstName|lasteName|street               |unit|city    |dept   |location|
+---------+---------+---------------------+----+--------+-------+--------+
|John     |Doe      |1234 West Broad St   |8505|Columbus|finance|OH      |
|Alex     |Messi    |4321 North Meecham Rd|300 |Salinas |finance|OH      |
+---------+---------+---------------------+----+--------+-------+--------+
【Answer 2】: In Java, you can do it like this:
package net.jgp.books.sparkInAction.ch12.lab950CsvWithEmbdeddedJson;

import static org.apache.spark.sql.functions.concat;
import static org.apache.spark.sql.functions.explode;
import static org.apache.spark.sql.functions.lit;

import java.io.Serializable;

import org.apache.spark.api.java.function.MapFunction;
import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Encoders;
import org.apache.spark.sql.Row;
import org.apache.spark.sql.SparkSession;

/**
 * Ingesting a CSV with embedded JSON.
 *
 * @author jgp
 */
public class CsvWithEmbdeddedJsonApp implements Serializable {
  private static final long serialVersionUID = 19711L;

  /**
   * Turns a Row into JSON. Not very fail safe, but done to illustrate.
   *
   * @author jgp
   */
  private final class Jsonifier
      implements MapFunction<Row, String> {
    private static final long serialVersionUID = 19712L;

    @Override
    public String call(Row r) throws Exception {
      StringBuilder sb = new StringBuilder();
      sb.append("{ \"dept\": \"");
      sb.append(r.getString(0));
      sb.append("\",");
      // Strip the outer braces of the embedded document so that its
      // fields can be spliced into the new top-level object.
      String s = r.getString(1);
      if (s != null) {
        s = s.trim();
        if (s.startsWith("{") && s.endsWith("}")) {
          s = s.substring(1, s.length() - 1);
        }
      }
      sb.append(s);
      sb.append(", \"location\": \"");
      sb.append(r.getString(2));
      sb.append("\"}");
      return sb.toString();
    }
  }

  /**
   * main() is your entry point to the application.
   *
   * @param args
   */
  public static void main(String[] args) {
    CsvWithEmbdeddedJsonApp app = new CsvWithEmbdeddedJsonApp();
    app.start();
  }

  /**
   * The processing code.
   */
  private void start() {
    // Creates a session on a local master
    SparkSession spark = SparkSession.builder()
        .appName("Processing of invoices")
        .master("local[*]")
        .getOrCreate();

    // Reads the pipe-delimited CSV; emp_json is still a plain string here
    Dataset<Row> df = spark
        .read()
        .option("header", "true")
        .option("delimiter", "|")
        .csv("data/misc/csv_with_embedded_json.csv");
    df.show(5, false);
    df.printSchema();

    // Rebuilds each row as a single JSON document
    Dataset<String> ds = df.map(
        new Jsonifier(),
        Encoders.STRING());
    ds.show(5, false);
    ds.printSchema();

    // Lets Spark parse the JSON documents and infer the schema
    Dataset<Row> dfJson = spark.read().json(ds);
    dfJson.show(5, false);
    dfJson.printSchema();

    // One output row per element of the employee array
    dfJson = dfJson
        .withColumn("emp", explode(dfJson.col("employee")))
        .drop("employee");
    dfJson.show(5, false);
    dfJson.printSchema();

    // Builds the flat output columns from the nested struct
    dfJson = dfJson
        .withColumn("emp_name",
            concat(
                dfJson.col("emp.name.firstName"),
                lit(" "),
                dfJson.col("emp.name.lasteName")))
        .withColumn("emp_address",
            concat(dfJson.col("emp.address.street"),
                lit(" "),
                dfJson.col("emp.address.unit")))
        .withColumn("emp_city", dfJson.col("emp.address.city"))
        .drop("emp");
    dfJson.show(5, false);
    dfJson.printSchema();
  }
}
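As usual, Java is pretty verbose :) - I'm not complaining. I left in a lot of printSchema() and show() calls to illustrate the construction process. The Jsonifier class could be implemented in a nicer, more generic way, but it gives the idea (you could also write it as a lambda if you want).
The output is: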
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|dept |emp_json |location|
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|finance|{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}|OH      |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
root
|-- dept: string (nullable = true)
|-- emp_json: string (nullable = true)
|-- location: string (nullable = true)
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|value |
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
|{ "dept": "finance", "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}], "location": "OH"}|
+-----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+
root
|-- value: string (nullable = true)
+-------+-------------------------------------------------------------------------------------------------------------+--------+
|dept |employee |location|
+-------+-------------------------------------------------------------------------------------------------------------+--------+
|finance|[[[Columbus, 1234 West Broad St, 8505], [John, Doe]], [[Salinas, 4321 North Meecham Rd, 300], [Alex, Messi]]]|OH |
+-------+-------------------------------------------------------------------------------------------------------------+--------+
root
|-- dept: string (nullable = true)
|-- employee: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- address: struct (nullable = true)
| | | |-- city: string (nullable = true)
| | | |-- street: string (nullable = true)
| | | |-- unit: string (nullable = true)
| | |-- name: struct (nullable = true)
| | | |-- firstName: string (nullable = true)
| | | |-- lasteName: string (nullable = true)
|-- location: string (nullable = true)
+-------+--------+------------------------------------------------------+
|dept   |location|emp                                                   |
+-------+--------+------------------------------------------------------+
|finance|OH      |[[Columbus, 1234 West Broad St, 8505], [John, Doe]]   |
|finance|OH      |[[Salinas, 4321 North Meecham Rd, 300], [Alex, Messi]]|
+-------+--------+------------------------------------------------------+
root
|-- dept: string (nullable = true)
|-- location: string (nullable = true)
|-- emp: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- street: string (nullable = true)
| | |-- unit: string (nullable = true)
| |-- name: struct (nullable = true)
| | |-- firstName: string (nullable = true)
| | |-- lasteName: string (nullable = true)
+-------+--------+----------+-------------------------+--------+
|dept   |location|emp_name  |emp_address              |emp_city|
+-------+--------+----------+-------------------------+--------+
|finance|OH      |John Doe  |1234 West Broad St 8505  |Columbus|
|finance|OH      |Alex Messi|4321 North Meecham Rd 300|Salinas |
+-------+--------+----------+-------------------------+--------+
root
|-- dept: string (nullable = true)
|-- location: string (nullable = true)
|-- emp_name: string (nullable = true)
|-- emp_address: string (nullable = true)
|-- emp_city: string (nullable = true)
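The remark that the Jsonifier class could be written in a more robust way can be sketched in plain Java, without any Spark dependency. This is only an illustration under assumptions: `EmbeddedJsonMerger`, `merge`, and `escape` are hypothetical names that do not appear in the answer, the escaping covers only backslashes and double quotes, and input that is not wrapped in braces is reported as null instead of being spliced blindly.

```java
/** Hypothetical helper, not part of the original answer. */
final class EmbeddedJsonMerger {

  private EmbeddedJsonMerger() {
  }

  /** Escapes backslashes and double quotes so a plain value can sit inside a JSON string. */
  static String escape(String value) {
    return value.replace("\\", "\\\\").replace("\"", "\\\"");
  }

  /**
   * Splices dept and location into the JSON object held in embeddedJson.
   * Returns null when the embedded value is missing or not wrapped in { }.
   */
  static String merge(String dept, String embeddedJson, String location) {
    if (embeddedJson == null) {
      return null;
    }
    String s = embeddedJson.trim();
    if (s.length() < 2 || !s.startsWith("{") || !s.endsWith("}")) {
      return null;
    }
    // Fields of the embedded object, without its outer braces
    String inner = s.substring(1, s.length() - 1).trim();
    StringBuilder sb = new StringBuilder();
    sb.append("{\"dept\":\"").append(escape(dept == null ? "" : dept)).append("\"");
    if (!inner.isEmpty()) {
      sb.append(',').append(inner);
    }
    sb.append(",\"location\":\"")
        .append(escape(location == null ? "" : location))
        .append("\"}");
    return sb.toString();
  }

  public static void main(String[] args) {
    String json =
        "{ \"employee\":[{\"name\":{\"firstName\":\"John\",\"lasteName\":\"Doe\"}}]}";
    System.out.println(merge("finance", json, "OH"));
    System.out.println(merge("finance", "not json", "OH"));
  }
}
```

A method like this could be called from the MapFunction in place of the hand-written string building. Rows for which it returns null would still need to be filtered out or handled before the result is passed to spark.read().json(...).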
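【Comments】:
Thanks for your help. I used the flatMap function to pass in each row and expand the JSON column into multiple output columns, then expanded one row into multiple rows based on certain conditions.
【Answer 3】: If you have a proper JSON schema, you can use the explode method and then select the columns you want with the dot operator (for example: emp_json.name, emp_json.address, ...).
Sample code:
// Assumes emp_json has already been parsed into an array column; explode does not accept a raw JSON string.
val flatJSON = df.select($"dept", explode($"emp_json").as("emp"))
flatJSON.select("dept", "emp.name", "emp.address")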
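To make the effect of explode concrete, here is a plain-Java model of it applied to the sample data: one output row per array element, with the scalar columns repeated on each row. It does not use Spark at all, and `ExplodeSketch`, `Employee`, and `DeptRow` are hypothetical names chosen for the illustration. The nested name and address structs are flattened into simple fields to keep the sketch short.

```java
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

/** Plain-Java model of explode(); every type name here is hypothetical. */
final class ExplodeSketch {

  /** One element of the nested "employee" array, flattened for brevity. */
  static final class Employee {
    final String firstName;
    final String lasteName;
    final String street;
    final String unit;
    final String city;

    Employee(String firstName, String lasteName, String street, String unit, String city) {
      this.firstName = firstName;
      this.lasteName = lasteName;
      this.street = street;
      this.unit = unit;
      this.city = city;
    }
  }

  /** One input row: two scalar columns plus the nested array. */
  static final class DeptRow {
    final String dept;
    final List<Employee> employees;
    final String location;

    DeptRow(String dept, List<Employee> employees, String location) {
      this.dept = dept;
      this.employees = employees;
      this.location = location;
    }
  }

  /** Emits one pipe-delimited line per array element, repeating dept and location. */
  static List<String> explode(List<DeptRow> rows) {
    List<String> out = new ArrayList<>();
    for (DeptRow row : rows) {
      if (row.employees == null) {
        continue; // a missing array contributes no output rows
      }
      for (Employee e : row.employees) {
        out.add(String.join("|",
            row.dept,
            e.firstName + " " + e.lasteName,
            e.street + " " + e.unit,
            e.city,
            row.location));
      }
    }
    return out;
  }

  public static void main(String[] args) {
    DeptRow finance = new DeptRow("finance", Arrays.asList(
        new Employee("John", "Doe", "1234 West Broad St", "8505", "Columbus"),
        new Employee("Alex", "Messi", "4321 North Meecham Rd", "300", "Salinas")), "OH");
    for (String line : explode(Arrays.asList(finance))) {
      System.out.println(line);
    }
  }
}
```

As in this model, Spark's explode produces no rows for a null or empty array; explode_outer is the variant that keeps such rows with a null element.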
【Answer 4】: Take a look at this:
scala> val df = spark.read.format("csv").option("header", "true").option("delimiter", "|").option("inferSchema","true").load("/tmp/stack/tkkman.csv")
df: org.apache.spark.sql.DataFrame = [dept: string, emp_json: string ... 1 more field]
scala> df.show(false)
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|dept |emp_json |location|
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
|finance|{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}|OH      |
+-------+----------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------------+--------+
scala> df.printSchema
root
|-- dept: string (nullable = true)
|-- emp_json: string (nullable = true)
|-- location: string (nullable = true)
scala> val jsonstr = """{ "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}"""
jsonstr: String = { "employee":[{"name":{"firstName":"John","lasteName":"Doe"},"address":{"street":"1234 West Broad St","unit":"8505","city":"Columbus"}},{"name":{"firstName":"Alex","lasteName":"Messi"},"address":{"street":"4321 North Meecham Rd","unit":"300","city":"Salinas"}}]}
scala> val dfj = spark.read.json(Seq(jsonstr).toDS)
dfj: org.apache.spark.sql.DataFrame = [employee: array<struct<address:struct<city:string,street:string,unit:string>,name:struct<firstName:string,lasteName:string>>>]
scala> dfj.show(false)
+-------------------------------------------------------------------------------------------------------------+
|employee |
+-------------------------------------------------------------------------------------------------------------+
|[[[Columbus, 1234 West Broad St, 8505], [John, Doe]], [[Salinas, 4321 North Meecham Rd, 300], [Alex, Messi]]]|
+-------------------------------------------------------------------------------------------------------------+
scala> dfj.schema
res51: org.apache.spark.sql.types.StructType = StructType(StructField(employee,ArrayType(StructType(StructField(address,StructType(StructField(city,StringType,true), StructField(street,StringType,true), StructField(unit,StringType,true)),true), StructField(name,StructType(StructField(firstName,StringType,true), StructField(lasteName,StringType,true)),true)),true),true))
scala> val sch_emp = dfj.schema
sch_emp: org.apache.spark.sql.types.StructType = StructType(StructField(employee,ArrayType(StructType(StructField(address,StructType(StructField(city,StringType,true), StructField(street,StringType,true), StructField(unit,StringType,true)),true), StructField(name,StructType(StructField(firstName,StringType,true), StructField(lasteName,StringType,true)),true)),true),true))
scala> val df2 = df.select(col("*"),from_json('emp_json,sch_emp).as("emp"))
df2: org.apache.spark.sql.DataFrame = [dept: string, emp_json: string ... 2 more fields]
scala> df2.select(explode($"emp.employee")).printSchema
root
|-- col: struct (nullable = true)
| |-- address: struct (nullable = true)
| | |-- city: string (nullable = true)
| | |-- street: string (nullable = true)
| | |-- unit: string (nullable = true)
| |-- name: struct (nullable = true)
| | |-- firstName: string (nullable = true)
| | |-- lasteName: string (nullable = true)
scala> df2.select(col("*"),explode($"emp.employee").as("emp2")).select('dept,concat($"emp2.name.firstName",lit(" "),$"emp2.name.lasteName").as("emp_name"),$"emp2.address.street" as "emp_address", $"emp2.address.city" as "emp_city", 'location).show(false)
+-------+----------+---------------------+--------+--------+
|dept   |emp_name  |emp_address          |emp_city|location|
+-------+----------+---------------------+--------+--------+
|finance|John Doe  |1234 West Broad St   |Columbus|OH      |
|finance|Alex Messi|4321 North Meecham Rd|Salinas |OH      |
+-------+----------+---------------------+--------+--------+
scala>