Spark: How to reuse the same array schema that has all fields defined across the data-frame

Posted: 2021-09-30 19:48:50

【Question】:

I have hundreds of columns a, b, c, ... I want to modify the dataframe schema so that each array has the same shape: date, num, and val fields.

There are thousands of ids, so I only want to modify the schema, not the dataframe itself. The next step is to use the modified schema to load the data into a dataframe efficiently. I want to avoid using a UDF to transform the whole dataframe.

Input schema:

df.printSchema()

root
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- num: long (nullable = true) !!! NOTE : `num` !!!
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- d: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- id: long (nullable = true)

Required output schema:

root
 |-- a: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- num: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- b: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- num: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- c: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- num: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- d: array (nullable = true)
 |    |-- element: struct (containsNull = true)
 |    |    |-- date: long (nullable = true)
 |    |    |-- num: long (nullable = true)
 |    |    |-- val: long (nullable = true)
 |-- id: long (nullable = true)

To reproduce the input schema:

df = spark.read.json(sc.parallelize([
  """{"id":1,"a":[{"date":2001,"num":1},{"date":2002},{"date":2003}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
  """{"id":2,"a":[{"date":2001,"num":2},{"date":2002},{"date":2003}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}"""
]))


for field in df.schema:
    print(field)

Printed output:

StructField(a,ArrayType(StructType(List(StructField(date,LongType,true),StructField(num,LongType,true),StructField(val,LongType,true))),true),true)
StructField(b,ArrayType(StructType(List(StructField(date,LongType,true),StructField(val,LongType,true))),true),true)
StructField(c,ArrayType(StructType(List(StructField(date,LongType,true),StructField(val,LongType,true))),true),true)
StructField(d,ArrayType(StructType(List(StructField(date,LongType,true),StructField(val,LongType,true))),true),true)
StructField(id,LongType,true)

Solution (see OneCricketeer's answer below for details):

from pyspark.sql.types import StructField, StructType, LongType, ArrayType

jsonstr = [
  """{"id":1,"a":[{"date":2001,"val":1,"num":1},{"date":2002,"val":2},{"date":2003,"val":3}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
  """{"id":2,"a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}"""
]

array_schema = ArrayType(StructType([
    StructField('date', LongType(), True),
    StructField('num', LongType(), True),
    StructField('val', LongType(), True)]),
    True)


keys = ['a', 'b', 'c', 'd']
fields = [StructField(k, array_schema, True) for k in keys]
fields.append(StructField('id', LongType(), True))
df_schema = StructType(fields)

dff = spark.read.json(sc.parallelize(jsonstr), df_schema)
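
As a quick sanity check on the `dff` built above: an explicit schema is applied as-is by spark.read.json, so every array column now exposes date, num, and val, and fields the source JSON never supplied simply come back as null:

dff.printSchema()                      # matches the required output schema above
dff.select('b').show(truncate=False)   # num is null in b, since the JSON omitted it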

【Comments】:

If you don't want to list every date, num, val pair, use an array of Map type instead of Struct.

I'll try that, but if anyone knows how to apply MapType to my example, I'll accept it as a working solution: sparkbyexamples.com/pyspark/pyspark-maptype-dict-examples

You'd still need to list the "hundreds of map types" (though how did you end up with hundreds of letters?), but that ultimately amounts to the same thing as extracting the ArrayType/StructType into a variable that can be reused in each outer StructField. Also, what does NOTE: num mean? Yes, you defined it for the first array, but not for the others, so it doesn't exist anywhere except the first array.

【Answer 1】:

I think the real solution is to use consistent names, or at least more descriptive names if the fields really are different. "num" and "val" are essentially synonyms.

If I understand the question, you want to reuse the same array schema with all fields defined:

array_schema = ArrayType(StructType([
    StructField('date', LongType(), False),
    StructField('num', LongType(), True),
    StructField('val', LongType(), True)]),
    True)

df_schema = StructType([
    StructField('a', array_schema, True),
    StructField('b', array_schema, True),
    ...
    StructField('id', LongType(), True)
])

Or you can do this in a loop, which is safe because it runs in the Spark driver:

keys = ['a', 'b']
fields = [StructField(k, array_schema, True) for k in keys]
fields.append(StructField('id', LongType(), True))
df_schema = StructType(fields)

(If there are no null values, change each boolean to False.)
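
If the column names aren't known up front (the question mentions hundreds of columns), one option is to let Spark infer the names in a first pass and then rebuild the strict schema. A sketch, assuming every column other than id is one of these arrays:

inferred = spark.read.json(sc.parallelize(jsonstr))   # schema-inferring pass, used only to collect column names
keys = [c for c in inferred.columns if c != 'id']     # assumes every non-id column is an array
fields = [StructField(k, array_schema, True) for k in keys]
fields.append(StructField('id', LongType(), True))
df_schema = StructType(fields)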

Then you need to supply this schema to your read function:

spark.read.schema(df_schema).json(...

And if there are still more fields that can't be applied consistently across all the "keys", use ArrayType(MapType(StringType(), LongType()), False).
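
A minimal sketch of that MapType variant, assuming every value (including date) fits in a long: each array element becomes a free-form string-to-long map, so no per-field StructField list is needed, at the cost of all values sharing one type and the field names dropping out of the schema.

from pyspark.sql.types import MapType, StringType

map_schema = ArrayType(MapType(StringType(), LongType()), False)
keys = ['a', 'b', 'c', 'd']
fields = [StructField(k, map_schema, True) for k in keys]
fields.append(StructField('id', LongType(), True))
map_df = spark.read.json(sc.parallelize(jsonstr), StructType(fields))
map_df.selectExpr("id", "a[0]['num'] AS first_num").show()  # map values are looked up by key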

【Discussion】:

Thanks a lot, this is exactly what I was looking for. I had some trouble getting array_schema to run: array_schema = ArrayType(StructType([StructField('date', LongType, True), StructField('num', LongType, True), StructField('val', LongType, True)]), True) gives me AssertionError: dataType should be an instance of <class 'pyspark.sql.types.DataType'>

You're missing the parentheses: the types have to be instantiated, i.e. LongType() rather than LongType.
