在 spark Dataframe 中动态创建多列

Posted

技术标签:

【中文标题】在 spark Dataframe 中动态创建多列【英文标题】:Creating multiple columns in spark Dataframe dynamically 【发布时间】:2017-09-11 11:50:40 【问题描述】:

我有字典,其中包含以下信息,

dict_segs = 'key1' : 'a' : 'col1' : 'value1', 'col2' : 'value2', 'col3': 'value3', 
                'b' : 'col2' : 'value2', 'col3' : 'value3', 
                'c' : 'col1' : 'value1',
        'key2' : 'd' : 'col3' : 'value3', 'col2' : 'value2',
                'f' : 'col1' : 'value1', 'col4' : 'value4'

待办事项:

keys 基本上是“segments”,其基础字典,即 key1 的 a、b、c 是“subsegments”。对于每个子段,过滤条件在子段的基础字典中可用,即 a、b、c、d、f。另外,子段字典键的过滤条件也是pyspark数据框的列名。

我想在 pyspark 数据框中为每个段一次性创建子段列,当满足过滤条件时,每个子段列的值将为 1,否则为 0,类似于,

for item in dict_segs:
    pyspark_dataframe.withColumn(*dict_segs[item].keys(), when(meeting filter criteria with respect to each key), 1).otherwise(0))

在进行研究时,我能够在 scala 中找到类似的东西,但列过滤条件是静态的,但对于上述逻辑,即动态。请看下面的scala逻辑,

Spark/Scala repeated calls to withColumn() using the same function on multiple columns

需要支持根据上面的伪代码为每个段派生上述逻辑。

谢谢。

【问题讨论】:

【参考方案1】:

您正在寻找select 声明:

让我们创建一个示例数据框:

df = spark.createDataFrame(
    sc.parallelize([["value" + str(i) for i in range(1, 5)], ["value" + str(i) for i in range(5, 9)]]), 
    ["col" + str(i) for i in range(1, 5)]
)

+------+------+------+------+
|  col1|  col2|  col3|  col4|
+------+------+------+------+
|value1|value2|value3|value4|
|value5|value6|value7|value8|
+------+------+------+------+

现在对于字典中的所有 keysdict_seg[key] 中的所有 subkeysdict_seg[key][subkey] 中的所有 columns

import pyspark.sql.functions as psf
df.select(
    ["*"] +
    [
        eval('&'.join([
            '(df["' + c + '"] == "' + dict_segs[k][sk][c] + '")' for c in dict_segs[k][sk].keys()
        ])).cast("int").alias(sk) 
        for k in dict_segs.keys() for sk in dict_segs[k].keys()
    ]
).show()

+------+------+------+------+---+---+---+---+---+
|  col1|  col2|  col3|  col4|  a|  b|  c|  d|  f|
+------+------+------+------+---+---+---+---+---+
|value1|value2|value3|value4|  1|  1|  1|  1|  1|
|value5|value6|value7|value8|  0|  0|  0|  0|  0|
+------+------+------+------+---+---+---+---+---+
"*" 允许您保留所有以前存在的列,可以将其替换为 df.columnsalias(sk) 允许您将名称 sk 赋予新列 cast("int") 将 boolean 类型更改为 int 类型

我真的不明白为什么你有一个深度 3 的字典,似乎 key1, key2 并不是真的有用。

【讨论】:

感谢完美的解决方案。对于字典中的额外级别。我在其中一个过滤条件中使用此信息。但是,经过分析发现您的观点是有效的,并且不是必需的。掉了一级,最后保留了两级。 酷我很高兴能帮上忙。不要忘记将问题标记为已解决 当然,你能帮忙传递列表而不是价值。所以我修改为: eval('&'.join([ '(df["' + c + '"] == "' + dict_segs[k][sk][c] + '")' for c in dict_segs[k][sk].keys() ])) 修改:eval('&'.join([ '(tbl["' + c + '"].isin("' + v + '") )' for c, v in self.dict_tstPlan[sk].iteritems() ])) 我想传递 'v' 值的列表... 您必须使用 '","'.join(v) 将列表转换为字符串,因此在 eval 函数中您最终会得到:eval('&'.join([ '(tbl["' + c + '"].isin(["' + '","'.join(v) + '"]))'

以上是关于在 spark Dataframe 中动态创建多列的主要内容,如果未能解决你的问题,请参考以下文章

使用多列作为存储在 Apache Spark 中的数组中的键来连接两个 Dataframe

合并 Spark DataFrame 中的多列 [Java]

将 Spark Dataframe 字符串列拆分为多列

将 Spark Dataframe 字符串列拆分为多列

Apache Spark Dataframe Groupby agg() 用于多列

从 Spark DataFrame 中的单列派生多列