将具有无效字符的嵌套字段从 Spark 2 导出到 Parquet [重复]

Posted

技术标签:

【中文标题】将具有无效字符的嵌套字段从 Spark 2 导出到 Parquet [重复]【英文标题】:Exporting nested fields with invalid characters from Spark 2 to Parquet [duplicate] 【发布时间】:2017-01-13 17:31:17 【问题描述】:

我正在尝试使用 spark 2.0.2 将 JSON 文件转换为 parquet。

JSON 文件来自外部来源,因此在到达之前无法更改架构。 该文件包含属性映射。在我收到文件之前,属性名称是未知的。 属性名称包含无法在 parquet 中使用的字符。

    "id" : 1,
    "name" : "test",
    "attributes" : 
        "name=attribute" : 10,
        "name=attribute with space" : 100,
        "name=something else" : 10
    

parquet 中不能同时使用空格和等号,出现以下错误:

 org.apache.spark.sql.AnalysisException:属性名称“name=attribute”在“,;()\n\t=" 中包含无效字符。请使用别名重命名。; 
由于这些是嵌套字段,我无法使用别名重命名它们,这是真的吗? 我已尝试按照此处的建议重命名架构中的字段:How to rename fields in an DataFrame corresponding to nested JSON。这适用于某些文件,但是,我现在得到以下 ***: java.lang.***Error 在 scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65) 在 org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:258) 在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1563) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1579) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 在 scala.collection.immutable.List.foreach(List.scala:381) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1578) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1576) 在 scala.collection.immutable.List.foreach(List.scala:381) 在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1576) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1579) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 在 scala.collection.immutable.List.foreach(List.scala:381) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1578) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1576) 在 scala.collection.immutable.List.foreach(List.scala:381) ... 重复 ...

我想做以下事情之一:

当我将数据加载到 spark 中时,从字段名称中去除无效字符 更改架构中的列名而不导致堆栈溢出 以某种方式更改架构以加载原始数据,但在内部使用以下内容:

    "id" : 1,
    "name" : "test",
    "attributes" : [
        "key":"name=attribute", "value" : 10,
        "key":"name=attribute with space", "value"  : 100,
        "key":"name=something else", "value" : 10
    ]

【问题讨论】:

您找到解决方案了吗? 我还在使用我发布的解决方案 这不应被标记为重复。这个问题是关于嵌套列的,它与其他问题完全不同。 @eliasah 你怎么看? 【参考方案1】:

我是这样解决问题的:

df.toDF(df
    .schema
    .fieldNames
    .map(name => "[ ,;()\\n\\t=]+".r.replaceAllIn(name, "_")): _*)

我用“_”替换了所有不正确的符号。

【讨论】:

这仅适用于没有嵌套字段的架构,OP 的问题是关于嵌套字段。 您可以重做任何您想要的架构。您只需要实现方案树下降并将正则表达式模式应用于所有名称。如何实现树下降超出了这个问题的范围【参考方案2】:

到目前为止,我发现唯一可行的解​​决方案是使用修改后的架构重新加载数据。新架构会将属性加载到地图中。

Dataset<Row> newData = sql.read().json(path);
StructType newSchema = (StructType) toMapType(newData.schema(), null, "attributes");
newData = sql.read().schema(newSchema).json(path);

private DataType toMapType(DataType dataType, String fullColName, String col) 
    if (dataType instanceof StructType) 
        StructType structType = (StructType) dataType;

        List<StructField> renamed = Arrays.stream(structType.fields()).map(
            f -> toMapType(f, fullColName == null ? f.name() : fullColName + "." + f.name(), col)).collect(Collectors.toList());
        return new StructType(renamed.toArray(new StructField[renamed.size()]));
    
    return dataType;


private StructField toMapType(StructField structField, String fullColName, String col) 
    if (fullColName.equals(col)) 
        return new StructField(col, new MapType(DataTypes.StringType, DataTypes.LongType, true), true, Metadata.empty());
     else if (col.startsWith(fullColName)) 
        return new StructField(structField.name(), toMapType(structField.dataType(), fullColName, col), structField.nullable(), structField.metadata());
    
    return structField;


【讨论】:

【参考方案3】:

@: 我也有同样的问题。

在我们的例子中,我们解决了讨好 DataFrame 的问题。

  val ALIAS_RE: Regex = "[_.:@]+".r
  val FIRST_AT_RE: Regex = "^_".r

  def getFieldAlias(field_name: String): String = 
    FIRST_AT_RE.replaceAllIn(ALIAS_RE.replaceAllIn(field_name, "_"), "")
  

  def selectFields(df: DataFrame, fields: List[String]): DataFrame = 
    var fields_to_select = List[Column]()
    for (field <- fields) 
      val alias = getFieldAlias(field)
      fields_to_select +:= col(field).alias(alias)
    

    df.select(fields_to_select: _*)
  

所以下面的json:

 
  object: 'blabla',
  schema: 
    @type: 'blabla',
    name@id: 'blabla'
  

这将被转换为[object, schema.@type, schema.name@id]@dots(在您的情况下为 =)会给 SparkSQL 带来问题。

所以在我们的 SelectFields 之后,您可以以 [对象、schema_type、schema_name_id]。受宠若惊的 DataFrame。

【讨论】:

以上是关于将具有无效字符的嵌套字段从 Spark 2 导出到 Parquet [重复]的主要内容,如果未能解决你的问题,请参考以下文章

使用 Spark 访问嵌套在结构中的 json 数组

Spark:如何从具有属性的多个嵌套 XML 文件转换为 Data Frame 数据

如何更改 DataFrame 的架构(修复一些嵌套字段的名称)?

如何将具有嵌套StructType的列转换为Spark SQL中的类实例?

AWS Athena 将结构数组导出到 JSON

sql 语句多层嵌套查询 使用别名 字段无效,如何解决(有图)