将具有无效字符的嵌套字段从 Spark 2 导出到 Parquet [重复]
Posted
技术标签:
【中文标题】将具有无效字符的嵌套字段从 Spark 2 导出到 Parquet [重复]【英文标题】:Exporting nested fields with invalid characters from Spark 2 to Parquet [duplicate] 【发布时间】:2017-01-13 17:31:17 【问题描述】:我正在尝试使用 spark 2.0.2 将 JSON 文件转换为 parquet。
JSON 文件来自外部来源,因此在到达之前无法更改架构。 该文件包含属性映射。在我收到文件之前,属性名称是未知的。 属性名称包含无法在 parquet 中使用的字符。
"id" : 1,
"name" : "test",
"attributes" :
"name=attribute" : 10,
"name=attribute with space" : 100,
"name=something else" : 10
parquet 中不能同时使用空格和等号,出现以下错误:
org.apache.spark.sql.AnalysisException:属性名称“name=attribute”在“,;()\n\t=" 中包含无效字符。请使用别名重命名。;由于这些是嵌套字段,我无法使用别名重命名它们,这是真的吗? 我已尝试按照此处的建议重命名架构中的字段:How to rename fields in an DataFrame corresponding to nested JSON。这适用于某些文件,但是,我现在得到以下 ***: java.lang.***Error 在 scala.runtime.BoxesRunTime.boxToInteger(BoxesRunTime.java:65) 在 org.apache.spark.scheduler.DAGScheduler.getCacheLocs(DAGScheduler.scala:258) 在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1563) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1579) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 在 scala.collection.immutable.List.foreach(List.scala:381) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1578) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1576) 在 scala.collection.immutable.List.foreach(List.scala:381) 在 org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal(DAGScheduler.scala:1576) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply$mcVI$sp(DAGScheduler.scala:1579) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2$$anonfun$apply$1.apply(DAGScheduler.scala:1578) 在 scala.collection.immutable.List.foreach(List.scala:381) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1578) 在 org.apache.spark.scheduler.DAGScheduler$$anonfun$org$apache$spark$scheduler$DAGScheduler$$getPreferredLocsInternal$2.apply(DAGScheduler.scala:1576) 在 scala.collection.immutable.List.foreach(List.scala:381) ... 重复 ...
我想做以下事情之一:
当我将数据加载到 spark 中时,从字段名称中去除无效字符 更改架构中的列名而不导致堆栈溢出 以某种方式更改架构以加载原始数据,但在内部使用以下内容:
"id" : 1,
"name" : "test",
"attributes" : [
"key":"name=attribute", "value" : 10,
"key":"name=attribute with space", "value" : 100,
"key":"name=something else", "value" : 10
]
【问题讨论】:
您找到解决方案了吗? 我还在使用我发布的解决方案 这不应被标记为重复。这个问题是关于嵌套列的,它与其他问题完全不同。 @eliasah 你怎么看? 【参考方案1】:我是这样解决问题的:
df.toDF(df
.schema
.fieldNames
.map(name => "[ ,;()\\n\\t=]+".r.replaceAllIn(name, "_")): _*)
我用“_”替换了所有不正确的符号。
【讨论】:
这仅适用于没有嵌套字段的架构,OP 的问题是关于嵌套字段。 您可以重做任何您想要的架构。您只需要实现方案树下降并将正则表达式模式应用于所有名称。如何实现树下降超出了这个问题的范围【参考方案2】:到目前为止,我发现唯一可行的解决方案是使用修改后的架构重新加载数据。新架构会将属性加载到地图中。
Dataset<Row> newData = sql.read().json(path);
StructType newSchema = (StructType) toMapType(newData.schema(), null, "attributes");
newData = sql.read().schema(newSchema).json(path);
private DataType toMapType(DataType dataType, String fullColName, String col)
if (dataType instanceof StructType)
StructType structType = (StructType) dataType;
List<StructField> renamed = Arrays.stream(structType.fields()).map(
f -> toMapType(f, fullColName == null ? f.name() : fullColName + "." + f.name(), col)).collect(Collectors.toList());
return new StructType(renamed.toArray(new StructField[renamed.size()]));
return dataType;
private StructField toMapType(StructField structField, String fullColName, String col)
if (fullColName.equals(col))
return new StructField(col, new MapType(DataTypes.StringType, DataTypes.LongType, true), true, Metadata.empty());
else if (col.startsWith(fullColName))
return new StructField(structField.name(), toMapType(structField.dataType(), fullColName, col), structField.nullable(), structField.metadata());
return structField;
【讨论】:
【参考方案3】:@:
我也有同样的问题。
在我们的例子中,我们解决了讨好 DataFrame 的问题。
val ALIAS_RE: Regex = "[_.:@]+".r
val FIRST_AT_RE: Regex = "^_".r
def getFieldAlias(field_name: String): String =
FIRST_AT_RE.replaceAllIn(ALIAS_RE.replaceAllIn(field_name, "_"), "")
def selectFields(df: DataFrame, fields: List[String]): DataFrame =
var fields_to_select = List[Column]()
for (field <- fields)
val alias = getFieldAlias(field)
fields_to_select +:= col(field).alias(alias)
df.select(fields_to_select: _*)
所以下面的json:
object: 'blabla',
schema:
@type: 'blabla',
name@id: 'blabla'
这将被转换为[object, schema.@type, schema.name@id]。 @ 和 dots(在您的情况下为 =)会给 SparkSQL 带来问题。
所以在我们的 SelectFields 之后,您可以以 [对象、schema_type、schema_name_id]。受宠若惊的 DataFrame。
【讨论】:
以上是关于将具有无效字符的嵌套字段从 Spark 2 导出到 Parquet [重复]的主要内容,如果未能解决你的问题,请参考以下文章
Spark:如何从具有属性的多个嵌套 XML 文件转换为 Data Frame 数据
如何更改 DataFrame 的架构(修复一些嵌套字段的名称)?