转换列并更新 DataFrame
Posted
技术标签:
【中文标题】转换列并更新 DataFrame【英文标题】:Transforming a column and update the DataFrame 【发布时间】:2018-03-08 12:53:08 【问题描述】:所以,我在下面做的是从DataFrame
中删除一列A
,因为我想应用一个转换(这里我只是json.loads
一个JSON 字符串)并用转换后的列替换旧列一。转换后,我只需加入两个结果数据框。
df = df_data.drop('A').join(
df_data[['ID', 'A']].rdd\
.map(lambda x: (x.ID, json.loads(x.A))
if x.A is not None else (x.ID, None))\
.toDF()\
.withColumnRenamed('_1', 'ID')\
.withColumnRenamed('_2', 'A'),
['ID']
)
我不喜欢这当然是我面临的开销,因为我必须执行withColumnRenamed
操作。
有了 pandas 我都会做这样的事情:
pdf = pd.DataFrame([json.dumps([0]*np.random.randint(5,10)) for i in range(10)], columns=['A'])
pdf.A = pdf.A.map(lambda x: json.loads(x))
pdf
但以下在 pyspark 中不起作用:
df.A = df[['A']].rdd.map(lambda x: json.loads(x.A))
那么有没有比我在第一个代码中所做的更简单的方法?
【问题讨论】:
.toDF()
可以为列名采用位置参数。如果您担心对withColumnRenamed()
的调用,您可以通过toDF(["ID", "A"])
来避免它。
【参考方案1】:
我认为您不需要删除该列并进行连接。以下代码应* 与您发布的内容等效:
cols = df_data.columns
df = df_data.rdd\
.map(
lambda row: tuple(
[row[c] if c != 'A' else (json.loads(row[c]) if row[c] is not None else None)
for c in cols]
)
)\
.toDF(cols)
*我还没有实际测试过这段代码,但我认为这应该可以工作。
但要回答您的一般问题,您可以使用 withColumn()
就地转换列。
df = df_data.withColumn("A", my_transformation_function("A").alias("A"))
其中my_transformation_function()
可以是udf
或pyspark sql function
。
【讨论】:
啊,第二个选项绝对是我想要的!谢谢!【参考方案2】:据我所知,您正在努力实现这样的目标吗?
import pyspark.sql.functions as F
import json
json_convert = F.udf(lambda x: json.loads(x) if x is not None else None)
cols = df_data.columns
df = df_data.select([json_convert(F.col('A')).alias('A')] + \
[col for col in cols if col != 'A'])
【讨论】:
这是我们能做的最简单的事情吗? 恐怕是的:) 不,你可以做一些更简单的事情:df = df_data.withColumn("A", json_convert(F.col('A')).alias('A'))
我同意@pault 不知道我是怎么错过的
啊,太好了。谢谢mayank,但我会接受@pault 的回答,因为它肯定更简单。但是感谢你们俩!以上是关于转换列并更新 DataFrame的主要内容,如果未能解决你的问题,请参考以下文章