Pyspark:如何遍历数据框列?
Posted
技术标签:
【中文标题】Pyspark:如何遍历数据框列?【英文标题】:Pyspark: How to iterate through data frame columns? 【发布时间】:2020-05-21 12:47:19 【问题描述】:我是 pyspark 的新手。我通常和熊猫一起工作。我使用 pyspark 中的列逐行迭代。我的数据集如下所示:-
+-------------------+--------------------+--------+-----+
| DateTime| user_name|keyboard|mouse|
+-------------------+--------------------+--------+-----+
|2019-10-21 08:35:01|prathameshsalap@g...| 333.0|658.0|
|2019-10-21 08:35:01|vaishusawant143@g...| 447.5| 0.0|
|2019-10-21 08:35:01| you@example.com| 0.5| 1.0|
|2019-10-21 08:40:01| you@example.com| 0.0| 0.0|
|2019-10-21 08:40:01|prathameshsalap@g...| 227.0|366.0|
|2019-10-21 08:40:02|vaishusawant143@g...| 472.0| 0.0|
|2019-10-21 08:45:01| you@example.com| 0.0| 0.0|
|2019-10-21 08:45:01|prathameshsalap@g...| 35.0|458.0|
|2019-10-21 08:45:01|vaishusawant143@g...| 1659.5| 0.0|
|2019-10-21 08:50:01| you@example.com| 0.0| 0.0|
+-------------------+--------------------+--------+-----+
在 pandas 数据框中,它也有一个给定的索引,但在 spark 中没有。 在熊猫中:-
## pandas
usr_log = pd.read_csv("data.csv")
unique_users = usr_log.user_name.unique()
usr_log.sort_values(by='DateTime', inplace=True)
users_new_data = dict()
users_new_data[user] = 'start_time': None
for user in unique_users:
count_idle = 0
## first part of the question
for index in usr_log.index:
if user == usr_log['user_name'][index]:
if users_new_data[user]['start_time'] is None:
users_new_data[user]['start_time'] = usr_log['DateTime'][index]
## Second part of the question
if usr_log['keyboard'][index] == 0 and usr_log['mouse'][index] == 0:
count_idle += 1
else:
count_idle = 0
if count_idle >= 5:
if count_idle == 5:
users_new_data[usr_log['user_name'][index]]['idle_time'] \
= users_new_data[usr_log['user_name'][index]].get('idle_time') \
+ datetime.timedelta(0, 1500)
else:
users_new_data[usr_log['user_name'][index]]['idle_time'] \
= users_new_data[usr_log['user_name'][index]].get('idle_time') \
+ datetime.timedelta(0, 300)
同样的事情如何在 spark 中做到这一点?
对于 5 分钟后生成的每个用户数据(例如,如果用户在 8:30:01 开始,则下一个日志在 8:35:01 生成)。在第二个问题中,我想为每个用户找到一个空闲时间。空闲时间的计算是如果他在接下来的 30 分钟(1500)内不移动鼠标或使用键盘,那么我添加用户空闲时间。
将字典值转换为数据框后,我的预期输出如下:-
+--------------------+-------------------+-------------------+
| user_name| start_time| idle_time|
+--------------------+-------------------+-------------------+
|prathameshsalap@g...|2019-10-21 08:35:01|2019-10-21 05:05:00|
|vaishusawant143@g...|2019-10-21 08:35:01|2019-10-21 02:15:00|
| you@example.com|2019-10-21 08:35:01|2019-10-21 01:30:00|
+--------------------+-------------------+-------------------+
【问题讨论】:
你能发布你的预期输出吗? 你能再检查一遍吗? @AjayKharade 这里一头雾水,无法关联输入输出数据集? 您有什么不明白的请澄清一下?在这里,按日期时间排序后,如果开始时间为无,我会找到每个用户的开始时间。我希望你明白了。 @AjayKharade 现在,我可以关联输入和输出了,谢谢。我已经发布了相同的解决方案。 【参考方案1】:如果你想为每个用户找到他们拥有的第一个时间戳,你可以先在 pandas 中简化它,这样做:
usr_log[['user_name','DateTime']].groupby(['user_name']).min()
和火花会非常相似
urs_log = sparkSession.read.csv(...)
urs_log.groupBy("user_name").agg(min("DateTime"))
您只需将DateTime
列重命名为您想要的列,并尽量不要使用for loops in pandas。
在 spark 中,你有一个分布式集合,不可能进行 for 循环,你必须对列应用转换,永远不要对单行数据应用逻辑。
【讨论】:
感谢您帮助我。我还有一个问题。我可以在这里发帖吗?与此问题相同。@Alfilercio 我在这里又添加了一个问题,请再次检查。 @Alfilercio【参考方案2】:这是相同的解决方案,
dataFrame = (spark.read.format("csv").option("sep", ",").option("header", "true").load("data.csv"))
df.show()
+-------------------+--------------------+--------+-----+
| DateTime| user_name|keyboard|mouse|
+-------------------+--------------------+--------+-----+
|2019-10-21 08:35:01|prathameshsalap@g...| 333.0|658.0|
|2019-10-21 08:35:01|vaishusawant143@g...| 447.5| 0.0|
|2019-10-21 08:35:01| you@example.com| 0.5| 1.0|
|2019-10-21 08:40:01|prathameshsalap@g...| 227.0|366.0|
|2019-10-21 08:40:02|vaishusawant143@g...| 472.0| 0.0|
|2019-10-21 08:45:01| you@example.com| 0.0| 0.0|
|2019-10-21 08:45:01|prathameshsalap@g...| 35.0|458.0|
|2019-10-21 08:45:01|vaishusawant143@g...| 1659.5| 0.0|
|2019-10-21 08:50:01| you@example.com| 0.0| 0.0|
+-------------------+--------------------+--------+-----+
df1 = df.groupBy("user_name").agg(min("DateTime"))
df1.show()
+--------------------+-------------------+
| user_name| min(DateTime)|
+--------------------+-------------------+
|prathameshsalap@g...|2019-10-21 08:35:01|
|vaishusawant143@g...|2019-10-21 08:35:01|
| you@example.com|2019-10-21 08:35:01|
+--------------------+-------------------+
其他部分 -
df1 = df.withColumn("count",when(((col("keyboard")==0.0) & (col("mouse")==0.0)), count_idle+1).otherwise(0))
df2 = df1.withColumn("Idle_Sec",when((col("count")==0), 300).otherwise(1500))
df2.show()
+-------------------+--------------------+--------+-----+-----+--------+
| DateTime| user_name|keyboard|mouse|count|Idle_Sec|
+-------------------+--------------------+--------+-----+-----+--------+
|2019-10-21 08:35:01|prathameshsalap@g...| 333.0|658.0| 0| 300|
|2019-10-21 08:35:01|vaishusawant143@g...| 447.5| 0.0| 0| 300|
|2019-10-21 08:35:01| you@example.com| 0.5| 1.0| 0| 300|
|2019-10-21 08:40:01| you@example.com| 0.0| 0.0| 1| 1500|
|2019-10-21 08:40:01|prathameshsalap@g...| 227.0|366.0| 0| 300|
|2019-10-21 08:40:02|vaishusawant143@g...| 472.0| 0.0| 0| 300|
|2019-10-21 08:45:01| you@example.com| 0.0| 0.0| 1| 1500|
|2019-10-21 08:45:01|prathameshsalap@g...| 35.0|458.0| 0| 300|
|2019-10-21 08:45:01|vaishusawant143@g...| 1659.5| 0.0| 0| 300|
|2019-10-21 08:50:01| you@example.com| 0.0| 0.0| 1| 1500|
+-------------------+--------------------+--------+-----+-----+--------+
df3 = df2.groupBy("user_name").agg(min("DateTime").alias("start_time"),sum("Idle_Sec").alias("Sum_Idle_Sec"))
+--------------------+-------------------+------------+
| user_name| start_time|Sum_Idle_Sec|
+--------------------+-------------------+------------+
|prathameshsalap@g...|2019-10-21 08:35:01| 900|
|vaishusawant143@g...|2019-10-21 08:35:01| 900|
| you@example.com|2019-10-21 08:35:01| 4800|
+--------------------+-------------------+------------+
df3.withColumn("Idle_time",(F.unix_timestamp("start_time") + col("Sum_Idle_Sec")).cast('timestamp')).show()
+--------------------+-------------------+---------+----------------------+
| user_name| start_time|Sum_Idle_Sec| Idle_time|
+--------------------+-------------------+---------+----------------------+
|prathameshsalap@g...|2019-10-21 08:35:01| 900|2019-10-21 08:50:01|
|vaishusawant143@g...|2019-10-21 08:35:01| 900|2019-10-21 08:50:01|
| you@example.com|2019-10-21 08:35:01| 4800|2019-10-21 09:55:01|
+--------------------+-------------------+---------+----------------------+
【讨论】:
感谢您帮助我@Ajay Kharede。我还有一个与此主题相关的问题。 你可以问下一个问题,但如果上下文不同,最好将下一个问题放在 SO 上。 请再次检查。再次感谢您。@Ajay Kharade 不确定,我无法匹配您的输出,也找不到与之匹配的逻辑,我编辑了答案,您正在寻找吗? 对不起,你没有明白我的意思。在这里,我想计算每个用户的空闲时间。如果任何用户不移动鼠标或使用键盘,则持续 30 分钟。然后在空闲时间增加 30 分钟。像任何用户时间 10:30:45 到 11:06:00 一样,不要移动鼠标或使用键盘,然后我想添加 35:15 (35min15sec) (2019-10-21 00:35:15)。 @Ajay 卡拉德【参考方案3】:您应该按照以下示例进行操作:
df.withColumn("user_name", do_something)“do_something”可以是您定义的任何函数。
【讨论】:
我只想使用 spark 数据框。所以当我创建一个新的 do_something() 函数来迭代如何使用火花时。以上是关于Pyspark:如何遍历数据框列?的主要内容,如果未能解决你的问题,请参考以下文章
如何将 pyspark 数据框列中的值与 pyspark 中的另一个数据框进行比较