为 Spark Rows 定义新模式
Posted
技术标签:
【中文标题】为 Spark Rows 定义新模式【英文标题】:Defining new schema for Spark Rows 【发布时间】:2016-04-14 17:06:40 【问题描述】:我有一个 DataFrame,其中一个列包含一串 JSON。到目前为止,我已经按照JavaRDD.map
方法的要求实现了Function
接口:Function<Row,Row>()
。在这个函数中,我解析 JSON,并创建一个新行,其附加列来自 JSON 中的值。例如:
原始行:
+------+-----------------------------------+
| id | json |
+------+-----------------------------------+
| 1 | "id":"abcd", "name":"dmux",... |
+------------------------------------------+
应用我的功能后:
+------+----------+-----------+
| id | json_id | json_name |
+------+----------+-----------+
| 1 | abcd | dmux |
+-----------------+-----------+
我在尝试从返回的 JavaRDD 创建一个新的 DataFrame 时遇到了麻烦。现在我有了这些新行,我需要创建一个模式。模式高度依赖于 JSON 的结构,因此我试图找出一种将模式数据与 Row
对象一起从函数传回的方法。我不能使用 broadcast
变量,因为 SparkContext 没有传递到函数中。
除了在Function
的调用者中循环遍历每一列之外,我还有哪些选择?
【问题讨论】:
【参考方案1】:您可以创建一个StructType
。这是Scala
,但它的工作方式相同:
val newSchema = StructType(Array(
StructField("id", LongType, false),
StructField("json_id", StringType, false),
StructField("json_name", StringType, false)
))
val newDf = sqlContext.createDataFrame(rdd, newSchema)
顺便说一句,您需要确保您的 rdd
是 RDD[Row]
类型。
【讨论】:
David,我目前在调用者方法中使用 StructType,但我不知道我的 JSON 中还有哪些 StructFields。JSON
模式是否逐行相同?最好是这样,否则你不能做你想做的事。
每一行都相同,但可能因数据帧而异。
那么您只需要基于JSON
架构动态构建您的StructType
。由于StructType
采用Array
的StructField
,只需将您的JSON
列映射到StructField
的Array
并将其传递给StructType
的构造函数
您将需要在映射之外的JSON
结构——在您创建JavaRDD
之前或之后——但无论如何,您必须在上下文之外创建StructType
处理你的RDD
以上是关于为 Spark Rows 定义新模式的主要内容,如果未能解决你的问题,请参考以下文章
spark Spark Streamingkafka数据源Direct模式 自定义数据源