在嵌套结构中使用 where 子句
Posted
技术标签:
【中文标题】在嵌套结构中使用 where 子句【英文标题】:using clause where within nested structure 【发布时间】:2020-01-03 16:45:06 【问题描述】:我正在创建一个结构数据框。 我想根据我的字段值创建另外 2 个结构 x2.field3 想法是如果 x2.field3==4 那么我的结构将被创建(“struct_1 "), 如果 x2.field3==3 那么我的结构将被创建("struct_2")
when(col("x2.field3").cast(IntegerType())== lit(4), struct(col("x1.field1").alias("index2")).alias("struct_1"))\
.when(col("x2.field3").cast(IntegerType())==lit(3), struct(col("x1.field1").alias("Index1")).alias("struct_2"))
我尝试了不同的解决方案但没有成功,因为我总是遇到同样的错误:
Py4JJavaError:调用 o21058.withColumn 时出错。 : org.apache.spark.sql.AnalysisException:无法解析'CASE WHEN (CAST(
x2
.field3
AS INT) = 4) THEN named_struct('index2',x1
.field1
) 当 (CAST(x2
.field3
AS INT) = 3) 那么 由于数据类型不匹配,named_struct('Index1',x1
.field1
) END': THEN 和 ELSE 表达式都应该是相同的类型或强制转换为 普通型;; '项目 [x1#5751, x2#5752, named_struct(gen1, x1#5751.field1, gen2, x1#5751.field1, NamePlaceholder, 命名结构(gen3.1,x1#5751.field1,gen3.2,x1#5751.field1,gen3.3, x1#5751.field1, gen3.4, x1#5751.field1, gen3.5, x1#5751.field1, gen3.6, x1#5751.field1, NamePlaceholder, named_struct(gen3.7.1, named_struct(gen3.7.1.1, 11, gen3.7.1.2, 40), col2, CASE WHEN (cast(x2#5752.field3 as int) = 4) THEN named_struct(index2, x1#5751.field1) WHEN (cast(x2#5752.field3 as int) = 3) THEN named_struct(Index1, x1#5751.field1) END))) AS General#5772] +- 逻辑RDD [x1#5751, x2#5752], false
我的整个代码在下面
schema = StructType(
[
StructField('x1',
StructType([
StructField('field1', IntegerType(),True),
StructField('field2', IntegerType(),True),
StructField('x12',
StructType([
StructField('field5', IntegerType(),True)
])
),
])
),
StructField('x2',
StructType([
StructField('field3', IntegerType(),True),
StructField('field4', BooleanType(),True)
])
)
])
df1 = sqlCtx.createDataFrame([Row(Row(1, 3, Row(23)), Row(3,True))], schema)
df1.printSchema()
df = df1.withColumn("General",
struct(
col("x1.field1").alias("gen1"),
col("x1.field1").alias("gen2"),
struct(col("x1.field1").alias("gen3.1"),
col("x1.field1").alias("gen3.2"),
col("x1.field1").alias("gen3.3"),
col("x1.field1").alias("gen3.4"),
col("x1.field1").alias("gen3.5"),
col("x1.field1").alias("gen3.6"),
struct(struct(lit(11).alias("gen3.7.1.1"),
lit(40).alias("gen3.7.1.2")).alias("gen3.7.1"),
when(col("x2.field3").cast(IntegerType())== lit(4), struct(col("x1.field1").alias("index2")).alias("struct_1"))\
.when(col("x2.field3").cast(IntegerType())==lit(3), struct(col("x1.field1").alias("Index1")).alias("struct_2"))
).alias("gen3.7")).alias("gen3")
)).drop('x1','x2')
df.printSchema()
【问题讨论】:
【参考方案1】:以上,struct_1
和struct_2
是独占的,所以我推荐你下面这段代码:
from pyspark.sql.types import *
from pyspark.sql import Row
from pyspark.sql.functions import *
schema = StructType(
[
StructField('x1',
StructType([
StructField('field1', IntegerType(),True),
StructField('field2', IntegerType(),True),
StructField('x12',
StructType([
StructField('field5', IntegerType(),True)
])
),
])
),
StructField('x2',
StructType([
StructField('field3', IntegerType(),True),
StructField('field4', BooleanType(),True)
])
)
])
df1 = sqlCtx.createDataFrame([Row(Row(1, 3, Row(23)), Row(1,True))], schema)
df = df1.withColumn("General",
struct(
col("x1.field1").alias("gen1"),
col("x1.field1").alias("gen2"),
struct(col("x1.field1").alias("gen3.1"),
col("x1.field1").alias("gen3.2"),
col("x1.field1").alias("gen3.3"),
col("x1.field1").alias("gen3.4"),
col("x1.field1").alias("gen3.5"),
col("x1.field1").alias("gen3.6"),
struct(struct(lit(11).alias("gen3.7.1.1"),
lit(40).alias("gen3.7.1.2")).alias("gen3.7.1"),
when(col("x2.field3").cast(IntegerType())== lit(4), struct(col("x1.field1").alias("index2"))).alias("struct_1"),
when(col("x2.field3").cast(IntegerType())==lit(3), struct(col("x1.field1").alias("Index1"))).alias("struct_2")
).alias("gen3.7")).alias("gen3")
)).drop('x1','x2')
df.printSchema()
输出:
root
|-- General: struct (nullable = false)
| |-- gen1: integer (nullable = true)
| |-- gen2: integer (nullable = true)
| |-- gen3: struct (nullable = false)
| | |-- gen3.1: integer (nullable = true)
| | |-- gen3.2: integer (nullable = true)
| | |-- gen3.3: integer (nullable = true)
| | |-- gen3.4: integer (nullable = true)
| | |-- gen3.5: integer (nullable = true)
| | |-- gen3.6: integer (nullable = true)
| | |-- gen3.7: struct (nullable = false)
| | | |-- gen3.7.1: struct (nullable = false)
| | | | |-- gen3.7.1.1: integer (nullable = false)
| | | | |-- gen3.7.1.2: integer (nullable = false)
| | | |-- struct_1: struct (nullable = true)
| | | | |-- index2: integer (nullable = true)
| | | |-- struct_2: struct (nullable = true)
| | | | |-- Index1: integer (nullable = true)
【讨论】:
感谢您的回复,但在我的情况下 x2.field3 == 3 所以我必须创建 struct_2。 (不是结构 2 和结构 1)@baitmbarek @over DataFrames 是数据的高度结构化(和强类型)表示。即使您的数据在运行时不包含任何x2.field3 = 4
,您也必须声明它是可能的,如果是这样,数据应该附加到一些struct_1
。每个可能的结果字段都应该在 DataFrame's
模式中声明
我没明白你的意思?你能解释更多吗?请? @baimbarek
在您的代码中,您告诉 Spark 将一个新的 Struct 列附加到您的 DataFrame。在gen3.gen3.7
字段中,您定义了第一个字段gen3.7.1
,其中包含两个整数属性。一切安好。但是现在,您告诉 spark 将另一个字段附加到 gen3.gen3.7
。问题,取决于数据(在每一行上,因为一切都是并行的),您希望将结构命名为 struct_1 或 struct_2。 Spark 必须为每条记录创建两个公共字段 structure
(例如,DataFrame 不像 json 那样是半结构化的)。 @over
我理解得很好。非常感谢 。我正在考虑删除 == NULL 的结构。你知道我该怎么做吗?以上是关于在嵌套结构中使用 where 子句的主要内容,如果未能解决你的问题,请参考以下文章