为 Spark Rows 定义新模式

Posted

技术标签:

【中文标题】为 Spark Rows 定义新模式【英文标题】:Defining new schema for Spark Rows 【发布时间】:2016-04-14 17:06:40 【问题描述】:

我有一个 DataFrame,其中一个列包含一串 JSON。到目前为止,我已经按照JavaRDD.map 方法的要求实现了Function 接口:Function<Row,Row>()。在这个函数中,我解析 JSON,并创建一个新行,其附加列来自 JSON 中的值。例如:

原始行:

+------+-----------------------------------+
|  id  |        json                       |
+------+-----------------------------------+
|  1   | "id":"abcd", "name":"dmux",...  |
+------------------------------------------+

应用我的功能后:

+------+----------+-----------+
|  id  | json_id  | json_name |
+------+----------+-----------+
|  1   | abcd     | dmux      |
+-----------------+-----------+

我在尝试从返回的 JavaRDD 创建一个新的 DataFrame 时遇到了麻烦。现在我有了这些新行,我需要创建一个模式。模式高度依赖于 JSON 的结构,因此我试图找出一种将模式数据与 Row 对象一起从函数传回的方法。我不能使用 broadcast 变量,因为 SparkContext 没有传递到函数中。

除了在Function 的调用者中循环遍历每一列之外,我还有哪些选择?

【问题讨论】:

【参考方案1】:

您可以创建一个StructType。这是Scala,但它的工作方式相同:

val newSchema = StructType(Array(
  StructField("id", LongType, false),
  StructField("json_id", StringType, false),
  StructField("json_name", StringType, false)
))

val newDf = sqlContext.createDataFrame(rdd, newSchema)

顺便说一句,您需要确保您的 rddRDD[Row] 类型。

【讨论】:

David,我目前在调用者方法中使用 StructType,但我不知道我的 JSON 中还有哪些 StructFields。 JSON 模式是否逐行相同?最好是这样,否则你不能做你想做的事。 每一行都相同,但可能因数据帧而异。 那么您只需要基于JSON 架构动态构建您的StructType。由于StructType 采用ArrayStructField,只需将您的JSON 列映射到StructFieldArray 并将其传递给StructType 的构造函数 您将需要在映射之外的JSON 结构——在您创建JavaRDD 之前或之后——但无论如何,您必须在上下文之外创建StructType处理你的RDD

以上是关于为 Spark Rows 定义新模式的主要内容,如果未能解决你的问题,请参考以下文章

spark-xml 中具有嵌套父节点的自定义模式

spark中的CSV自定义模式[重复]

Spark 将自定义模式应用于 DataFrame

spark Spark Streamingkafka数据源Direct模式 自定义数据源

spark - 没有定义模式,也没有在下面找到 Parquet 数据文件或摘要文件

Spark:如何重用在数据帧中定义了所有字段的相同数组模式