将 Json 的 Dataset 列解析为 Dataset<Row>
Posted
技术标签:
【中文标题】将 Json 的 Dataset 列解析为 Dataset<Row>【英文标题】:parse Dataset column of Json to Dataset<Row> 【发布时间】:2016-11-22 09:32:52 【问题描述】:拥有Dataset<Row>
的单列json字符串:
+--------------------+
| value|
+--------------------+
|"Context":"00AA0...|
+--------------------+
Json 示例:
"Context":"00AA00AA","MessageType":"1010","Module":"1200"
我怎样才能最有效地获得如下所示的Dataset<Row>
:
+--------+-----------+------+
| Context|MessageType|Module|
+--------+-----------+------+
|00AA00AA| 1010| 1200|
+--------+-----------+------+
我正在处理流中的这些数据,我知道当我从文件中读取数据时,spark 可以自己执行此操作:
spark
.readStream()
.schema(MyPojo.getSchema())
.json("src/myinput")
但现在我正在从 kafka 读取数据,它以另一种形式为我提供数据。 我知道我可以使用 Gson 之类的解析器,但我想让 spark 为我做这件事。
【问题讨论】:
【参考方案1】:试试这个示例。
public class SparkJSONValueDataset
public static void main(String[] args)
SparkSession spark = SparkSession
.builder()
.appName("SparkJSONValueDataset")
.config("spark.sql.warehouse.dir", "/file:C:/temp")
.master("local")
.getOrCreate();
//Prepare data Dataset<Row>
List<String> data = Arrays.asList("\"Context\":\"00AA00AA\",\"MessageType\":\"1010\",\"Module\":\"1200\"");
Dataset<Row> df = spark.createDataset(data, Encoders.STRING()).toDF().withColumnRenamed("_1", "value");
df.show();
//convert to Dataset<String> and Read
Dataset<String> df1 = df.as(Encoders.STRING());
Dataset<Row> df2 = spark.read().json(df1.javaRDD());
df2.show();
spark.stop();
【讨论】:
为您解答。猜猜这会起作用,但我真的不喜欢将我的 DF 再次发送给读者的想法:(以上是关于将 Json 的 Dataset 列解析为 Dataset<Row>的主要内容,如果未能解决你的问题,请参考以下文章
json转dataset的另外一种解析方式自动生成guid强关联
将Dataset中的列类型转换为python中具有特定格式的日期时间类型时出错
使用 Apache Spark 和 Java 将 CSV 解析为 DataFrame/DataSet