在 spark Dataframe 中动态创建多列
Posted
技术标签:
【中文标题】在 spark Dataframe 中动态创建多列【英文标题】:Creating multiple columns in spark Dataframe dynamically 【发布时间】:2017-09-11 11:50:40 【问题描述】:我有字典,其中包含以下信息,
dict_segs = 'key1' : 'a' : 'col1' : 'value1', 'col2' : 'value2', 'col3': 'value3',
'b' : 'col2' : 'value2', 'col3' : 'value3',
'c' : 'col1' : 'value1',
'key2' : 'd' : 'col3' : 'value3', 'col2' : 'value2',
'f' : 'col1' : 'value1', 'col4' : 'value4'
待办事项:
keys 基本上是“segments”,其基础字典,即 key1 的 a、b、c 是“subsegments”。对于每个子段,过滤条件在子段的基础字典中可用,即 a、b、c、d、f。另外,子段字典键的过滤条件也是pyspark数据框的列名。
我想在 pyspark 数据框中为每个段一次性创建子段列,当满足过滤条件时,每个子段列的值将为 1,否则为 0,类似于,
for item in dict_segs:
pyspark_dataframe.withColumn(*dict_segs[item].keys(), when(meeting filter criteria with respect to each key), 1).otherwise(0))
在进行研究时,我能够在 scala 中找到类似的东西,但列过滤条件是静态的,但对于上述逻辑,即动态。请看下面的scala逻辑,
Spark/Scala repeated calls to withColumn() using the same function on multiple columns
需要支持根据上面的伪代码为每个段派生上述逻辑。
谢谢。
【问题讨论】:
【参考方案1】:
您正在寻找select
声明:
让我们创建一个示例数据框:
df = spark.createDataFrame(
sc.parallelize([["value" + str(i) for i in range(1, 5)], ["value" + str(i) for i in range(5, 9)]]),
["col" + str(i) for i in range(1, 5)]
)
+------+------+------+------+
| col1| col2| col3| col4|
+------+------+------+------+
|value1|value2|value3|value4|
|value5|value6|value7|value8|
+------+------+------+------+
现在对于字典中的所有 keys
、dict_seg[key]
中的所有 subkeys
和 dict_seg[key][subkey]
中的所有 columns
:
import pyspark.sql.functions as psf
df.select(
["*"] +
[
eval('&'.join([
'(df["' + c + '"] == "' + dict_segs[k][sk][c] + '")' for c in dict_segs[k][sk].keys()
])).cast("int").alias(sk)
for k in dict_segs.keys() for sk in dict_segs[k].keys()
]
).show()
+------+------+------+------+---+---+---+---+---+
| col1| col2| col3| col4| a| b| c| d| f|
+------+------+------+------+---+---+---+---+---+
|value1|value2|value3|value4| 1| 1| 1| 1| 1|
|value5|value6|value7|value8| 0| 0| 0| 0| 0|
+------+------+------+------+---+---+---+---+---+
"*"
允许您保留所有以前存在的列,可以将其替换为 df.columns
。
alias(sk)
允许您将名称 sk
赋予新列
cast("int")
将 boolean 类型更改为 int 类型
我真的不明白为什么你有一个深度 3 的字典,似乎 key1, key2
并不是真的有用。
【讨论】:
感谢完美的解决方案。对于字典中的额外级别。我在其中一个过滤条件中使用此信息。但是,经过分析发现您的观点是有效的,并且不是必需的。掉了一级,最后保留了两级。 酷我很高兴能帮上忙。不要忘记将问题标记为已解决 当然,你能帮忙传递列表而不是价值。所以我修改为: eval('&'.join([ '(df["' + c + '"] == "' + dict_segs[k][sk][c] + '")' for c in dict_segs[k][sk].keys() ])) 修改:eval('&'.join([ '(tbl["' + c + '"].isin("' + v + '") )' for c, v in self.dict_tstPlan[sk].iteritems() ])) 我想传递 'v' 值的列表... 您必须使用'","'.join(v)
将列表转换为字符串,因此在 eval 函数中您最终会得到:eval('&'.join([ '(tbl["' + c + '"].isin(["' + '","'.join(v) + '"]))'
以上是关于在 spark Dataframe 中动态创建多列的主要内容,如果未能解决你的问题,请参考以下文章
使用多列作为存储在 Apache Spark 中的数组中的键来连接两个 Dataframe
合并 Spark DataFrame 中的多列 [Java]