Spark: How to reuse the same array schema that has all fields defined across the data-frame
Posted: 2021-09-30 19:48:50

Question: I have hundreds of columns a, b, c, ... I want to modify the data-frame schema so that every array has the same shape, with date, num and val fields. There are thousands of ids, so I would like to modify only the schema rather than the data-frame itself. The next step will be to use the modified schema to load the data into a data-frame efficiently. I want to avoid using a UDF to rewrite the whole data-frame.
Input schema:
df.printSchema()
root
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- num: long (nullable = true) !!! NOTE : `num` !!!
| | |-- val: long (nullable = true)
|-- b: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- c: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- d: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- val: long (nullable = true)
|-- id: long (nullable = true)
Required output schema:
root
|-- a: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- num: long (nullable = true)
| | |-- val: long (nullable = true)
|-- b: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- num: long (nullable = true)
| | |-- val: long (nullable = true)
|-- c: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- num: long (nullable = true)
| | |-- val: long (nullable = true)
|-- d: array (nullable = true)
| |-- element: struct (containsNull = true)
| | |-- date: long (nullable = true)
| | |-- num: long (nullable = true)
| | |-- val: long (nullable = true)
|-- id: long (nullable = true)
To reproduce the input schema:
df = spark.read.json(sc.parallelize([
    """{"id":1,"a":[{"date":2001,"num":1},{"date":2002},{"date":2003}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
    """{"id":2,"a":[{"date":2001,"num":2},{"date":2002},{"date":2003}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}"""
]))
for field in df.schema:
    print(field)
Printed output:
StructField(a,ArrayType(StructType(List(StructField(date,LongType,true),StructField(num,LongType,true),StructField(val,LongType,true))),true),true)
StructField(b,ArrayType(StructType(List(StructField(date,LongType,true),StructField(val,LongType,true))),true),true)
StructField(c,ArrayType(StructType(List(StructField(date,LongType,true),StructField(val,LongType,true))),true),true)
StructField(d,ArrayType(StructType(List(StructField(date,LongType,true),StructField(val,LongType,true))),true),true)
StructField(id,LongType,true)
Solution (see OneCricketeer's answer below for details):
from pyspark.sql.types import StructField, StructType, LongType, ArrayType
jsonstr = [
    """{"id":1,"a":[{"date":2001,"val":1,"num":1},{"date":2002,"val":2},{"date":2003,"val":3}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33}]}""",
    """{"id":2,"a":[{"date":2001,"val":1},{"date":2002,"val":2},{"date":2003,"val":3}],"b":[{"date":2001,"val":4},{"date":2002,"val":5},{"date":2003,"val":6}],"d":[{"date":2001,"val":21},{"date":2002,"val":22},{"date":2003,"val":23}],"c":[{"date":1990,"val":39},{"date":2000,"val":30},{"date":2001,"val":31},{"date":2002,"val":32},{"date":2003,"val":33},{"date":2004,"val":34}]}"""
]

# One array schema with all three fields, reused for every array column.
array_schema = ArrayType(StructType([
    StructField('date', LongType(), True),
    StructField('num', LongType(), True),
    StructField('val', LongType(), True)]),
    True)

keys = ['a', 'b', 'c', 'd']
fields = [StructField(k, array_schema, True) for k in keys]
fields.append(StructField('id', LongType(), True))
df_schema = StructType(fields)

dff = spark.read.json(sc.parallelize(jsonstr), df_schema)
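A quick sanity check (a sketch; dff comes from the snippet above). Because the schema is supplied at read time, any struct field that is absent from the JSON, such as num in columns b, c and d, is simply read back as null:

dff.printSchema()                     # every array column now shows date, num and val
dff.select('b').show(truncate=False)  # elements of `b` have num = null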
Question comments:
If you don't want to list every pair of date, num, val, use a list of Map type instead of Struct.

I will try that, but if someone knows how to apply MapType to my example, I will accept it as a working solution: sparkbyexamples.com/pyspark/pyspark-maptype-dict-examples

You would still need to list "hundreds of map types" (how did you end up with hundreds of lettered columns, though?). But that ends up being the same as extracting the ArrayType/StructType into a variable that can be reused in each outer StructField. Also, what does NOTE : num mean? Yes, you defined num for the first array, but not for the other arrays, so it doesn't exist outside the first one.
Answer 1:
I think the real solution is to use consistent names, or at least more descriptive names if the fields really are different. "num" and "val" are basically synonyms.

If I understand the question, you want to reuse the same array schema, with all fields defined, across the data-frame:
array_schema = ArrayType(StructType([StructField('date', LongType(), False), StructField('num', LongType(), True), StructField('val', LongType(), True)]), True)
df_schema = StructType([
    StructField('a', array_schema, True),
    StructField('b', array_schema, True),
    # ... one StructField per array column ...
    StructField('id', LongType(), True)
])
Or you can build it in a loop, which is safe since it runs in the Spark driver:
keys = ['a', 'b']
fields = [StructField(k, array_schema, True) for k in keys]
fields.append(StructField('id', LongType(), True))
df_schema = StructType(fields)
(If you have no null values, change each of those booleans to False.)
Then you need to provide this schema to your read function:
spark.read.schema(df_schema).json(...
If there are still more fields that can't be applied consistently across all of the "keys", then use ArrayType(MapType(StringType(), LongType()), False)
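A minimal sketch of that MapType alternative, reusing the jsonstr sample defined above (the map_array, map_schema and dfm names are mine, not from the answer). Each array element is read as a string-to-long map, so any mix of date/num/val keys fits without listing the fields:

from pyspark.sql.types import ArrayType, MapType, StringType, LongType, StructType, StructField

# Each JSON object in the arrays becomes a string -> long map instead of a fixed struct.
map_array = ArrayType(MapType(StringType(), LongType()), False)

keys = ['a', 'b', 'c', 'd']
fields = [StructField(k, map_array, True) for k in keys]
fields.append(StructField('id', LongType(), True))
map_schema = StructType(fields)

dfm = spark.read.schema(map_schema).json(sc.parallelize(jsonstr))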
Comments:
Thank you very much, this is exactly what I was looking for. I couldn't run array_schema as written, there were some issues; with array_schema = ArrayType(StructType([StructField('date', LongType, True), StructField('num', LongType, True), StructField('val', LongType, True)]), True) I get AssertionError: dataType
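The AssertionError in that last comment most likely comes from passing LongType (the class) rather than LongType() (an instance); StructField asserts that its dataType argument is a DataType instance. Instantiating the types fixes it:

array_schema = ArrayType(StructType([
    StructField('date', LongType(), True),   # LongType(), not LongType
    StructField('num', LongType(), True),
    StructField('val', LongType(), True)]),
    True)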