向数据框添加列并在 pyspark 中更新

Posted

技术标签:

【中文标题】向数据框添加列并在 pyspark 中更新【英文标题】:Adding column to dataframe and updating in pyspark 【发布时间】:2017-10-16 20:44:14 【问题描述】:

我在 pyspark 中有一个数据框:

ratings = spark.createDataFrame(
    sc.textFile("transactions.json").map(lambda l: json.loads(l)),
)
ratings.show()

+--------+-------------------+------------+----------+-------------+-------+
|click_id|         created_at|          ip|product_id|product_price|user_id|
+--------+-------------------+------------+----------+-------------+-------+
|     123|2016-10-03 12:50:33| 10.10.10.10|     98373|        220.5|      1|
|     124|2017-02-03 11:51:33| 10.13.10.10|     97373|        320.5|      1|
|     125|2017-10-03 12:52:33| 192.168.2.1|     96373|         20.5|      1|
|     126|2017-10-03 13:50:33|172.16.11.10|     88373|        220.5|      2|
|     127|2017-10-03 13:51:33| 10.12.15.15|     87373|        320.5|      2|
|     128|2017-10-03 13:52:33|192.168.1.10|     86373|         20.5|      2|
|     129|2017-08-03 14:50:33| 10.13.10.10|     78373|        220.5|      3|
|     130|2017-10-03 14:51:33| 12.168.1.60|     77373|        320.5|      3|
|     131|2017-10-03 14:52:33| 10.10.30.30|     76373|         20.5|      3|
+--------+-------------------+------------+----------+-------------+-------+

ratings.registerTempTable("transactions")
final_df = sqlContext.sql("select * from transactions");

我想在这个数据框中添加一个名为status 的新列,然后根据created_atuser_id 更新状态列。

created_atuser_id 从给定的表 transations 中读取并传递给函数 get_status(user_id,created_at),该函数返回 status。这个status需要放入事务表中作为对应user_idcreated_at的新列

我可以在 pyspark 中运行 alter 和 update 命令吗? 如何使用 pyspark 做到这一点?

【问题讨论】:

【参考方案1】:

目前尚不清楚您到底想做什么。您应该查看window functions,它们允许您比较、汇总...帧中的行。

例如

import pyspark.sql.functions as psf
from pyspark.sql import Window
w = Window.partitionBy("user_id").orderBy(psf.desc("created_at"))
ratings.withColumn(
    "status", 
    psf.when(psf.row_number().over(w) == 1, "active").otherwise("inactive")).sort("click_id").show()

+--------+-------------------+------------+----------+-------------+-------+--------+
|click_id|         created_at|          ip|product_id|product_price|user_id|  status|
+--------+-------------------+------------+----------+-------------+-------+--------+
|     123|2016-10-03 12:50:33| 10.10.10.10|     98373|        220.5|      1|inactive|
|     124|2017-02-03 11:51:33| 10.13.10.10|     97373|        320.5|      1|inactive|
|     125|2017-10-03 12:52:33| 192.168.2.1|     96373|         20.5|      1|  active|
|     126|2017-10-03 13:50:33|172.16.11.10|     88373|        220.5|      2|inactive|
|     127|2017-10-03 13:51:33| 10.12.15.15|     87373|        320.5|      2|inactive|
|     128|2017-10-03 13:52:33|192.168.1.10|     86373|         20.5|      2|  active|
|     129|2017-08-03 14:50:33| 10.13.10.10|     78373|        220.5|      3|inactive|
|     130|2017-10-03 14:51:33| 12.168.1.60|     77373|        320.5|      3|inactive|
|     131|2017-10-03 14:52:33| 10.10.30.30|     76373|         20.5|      3|  active|
+--------+-------------------+------------+----------+-------------+-------+--------+

它为您提供每个用户的最后一次点击

如果你想传递一个UDF 来从两个现有的列中创建一个新列。 假设您有一个将user_idcreated_at 作为参数的函数

from pyspark.sql.types import *
def get_status(user_id,created_at): 
    ...

get_status_udf = psf.udf(get_status, StringType())

StringType() 或您的函数输出的任何数据类型

ratings.withColumn("status", get_status_udf("user_id", "created_at"))

【讨论】:

created_atuser_id 从给定的表 transations 中读取并传递给返回 status 的函数 get_status(user_id,created_at)。这个status需要放入事务表中作为对应user_idcreated_at的新列

以上是关于向数据框添加列并在 pyspark 中更新的主要内容,如果未能解决你的问题,请参考以下文章

添加列并附加数据框

向 pyspark 中的数据框添加列

我们可以从另一个数据框向数据框添加新列吗

Pyspark 向数据框添加顺序和确定性索引

如何创建 Pyspark UDF 以向数据框添加新列

如何向视图添加列?