转换列并更新 DataFrame

Posted

技术标签:

【中文标题】转换列并更新 DataFrame【英文标题】:Transforming a column and update the DataFrame 【发布时间】:2018-03-08 12:53:08 【问题描述】:

所以,我在下面做的是从DataFrame 中删除一列A,因为我想应用一个转换(这里我只是json.loads 一个JSON 字符串)并用转换后的列替换旧列一。转换后,我只需加入两个结果数据框。

df = df_data.drop('A').join(
    df_data[['ID', 'A']].rdd\
        .map(lambda x: (x.ID, json.loads(x.A)) 
             if x.A is not None else (x.ID, None))\
        .toDF()\
        .withColumnRenamed('_1', 'ID')\
        .withColumnRenamed('_2', 'A'),
    ['ID']
)

我不喜欢这当然是我面临的开销,因为我必须执行withColumnRenamed 操作。

有了 pandas 我都会做这样的事情:

pdf = pd.DataFrame([json.dumps([0]*np.random.randint(5,10)) for i in range(10)], columns=['A'])
pdf.A = pdf.A.map(lambda x: json.loads(x))
pdf

但以下在 pyspark 中不起作用:

df.A = df[['A']].rdd.map(lambda x: json.loads(x.A))

那么有没有比我在第一个代码中所做的更简单的方法?

【问题讨论】:

.toDF() 可以为列名采用位置参数。如果您担心对withColumnRenamed() 的调用,您可以通过toDF(["ID", "A"]) 来避免它。 【参考方案1】:

我认为您不需要删除该列并进行连接。以下代码应* 与您发布的内容等效:

cols = df_data.columns
df = df_data.rdd\
    .map(
        lambda row: tuple(
            [row[c] if c != 'A' else (json.loads(row[c]) if row[c] is not None else None) 
             for c in cols]
        )
    )\
    .toDF(cols)

*我还没有实际测试过这段代码,但我认为这应该可以工作。

但要回答您的一般问题,您可以使用 withColumn() 就地转换列。

df = df_data.withColumn("A", my_transformation_function("A").alias("A"))

其中my_transformation_function() 可以是udfpyspark sql function

【讨论】:

啊,第二个选项绝对是我想要的!谢谢!【参考方案2】:

据我所知,您正在努力实现这样的目标吗?

import pyspark.sql.functions as F
import json

json_convert = F.udf(lambda x: json.loads(x) if x is not None else None)

cols = df_data.columns
df = df_data.select([json_convert(F.col('A')).alias('A')] + \
                    [col for col in cols if col != 'A'])

【讨论】:

这是我们能做的最简单的事情吗? 恐怕是的:) 不,你可以做一些更简单的事情:df = df_data.withColumn("A", json_convert(F.col('A')).alias('A')) 我同意@pault 不知道我是怎么错过的 啊,太好了。谢谢mayank,但我会接受@pault 的回答,因为它肯定更简单。但是感谢你们俩!

以上是关于转换列并更新 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

更新列并从更新的 col 更新另一个 col

如何根据数据类型识别列并在pyspark中转换它们?

更新列并立即使用结果更新同一语句中的第二列

计算重复列并更新计数器列

返回 Laravel 中模型的特定列并转换为 Json

向数据框添加列并在 pyspark 中更新