Common PySpark Operations for Basic Data Analysis

Posted by 山顶夕景

Contents

Part I: pyspark.sql

1. Window functions

# Group the data and find each user's 3 most recent collected beats (using a window function)
from pyspark.sql.window import Window
import pyspark.sql.functions as F


window_out = Window.partitionBy("user_id") \
                   .orderBy(F.desc("collect_time"))
# user_feed.withColumn("rank", F.rank().over(window_out)).show(truncate=False)
# user_feed.withColumn("rank", F.rank().over(window_out)).show(40)
user_feed_test = user_feed.withColumn("rank", F.row_number().over(window_out)) \
                          .where(F.col("rank") <= 3)
user_feed_test.show(30)

The result is shown below. This works like a MySQL window function: the rows are partitioned by user_id and sorted within each partition, and only the top 3 rows by collect_time are kept, i.e. each user's 3 most recent collections (an equivalent Spark SQL version is sketched after the output):

+--------+-------+------------+--------------------+----+
| user_id|beat_id|collect_type|        collect_time|rank|
+--------+-------+------------+--------------------+----+
|10065188| 827272|           4|2021-08-22 04:54:...|   1|
|10065188| 885812|           5|2020-10-23 18:53:...|   2|
|10068979|1069390|           5|2021-06-20 07:44:...|   1|
|10074915|     -2|           4|2021-11-27 13:42:...|   1|
|10074915|1122682|           4|2021-09-07 14:26:...|   2|
|10075397| 947751|           4|2022-01-30 07:30:...|   1|
|10075397| 336641|           5|2022-01-30 07:23:...|   2|
|10075397| 886179|           4|2022-01-05 10:35:...|   3|
|10104842| 886462|           1|2021-02-28 17:04:...|   1|
|10122654|1531961|           4|2022-03-16 11:09:...|   1|
|10122654| 893655|           4|2022-03-15 04:32:...|   2|
|10122654| 303121|           4|2022-03-14 05:59:...|   3|
|10134095|      0|           3|2021-07-24 13:02:...|   1|
|10134095|1023250|           4|2021-07-22 00:31:...|   2|
|10139927|      0|           5|2020-09-05 19:14:...|   1|
|10139927|      0|           5|2020-09-03 17:51:...|   2|
|10245428| 889915|           5|2020-05-18 14:41:...|   1|
|10245428|1073074|           5|2020-05-18 14:07:...|   2|
+--------+-------+------------+--------------------+----+
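Because the logic mirrors a MySQL-style window query, the same result can also be written in Spark SQL. The sketch below is illustrative only: it assumes spark is the active SparkSession and registers user_feed under a temporary view name chosen here just for the example.

# Register the DataFrame as a temporary view so it can be queried with SQL
# (the view name "user_feed" is an assumption for illustration)
user_feed.createOrReplaceTempView("user_feed")

user_feed_sql = spark.sql("""
    SELECT user_id, beat_id, collect_type, collect_time, rank
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY user_id
                                  ORDER BY collect_time DESC) AS rank
        FROM user_feed
    ) ranked
    WHERE rank <= 3
""")
user_feed_sql.show(30)

Note that F.row_number() (and ROW_NUMBER() here) always numbers rows 1, 2, 3, ... within a group even when collect_time values tie, whereas the commented-out F.rank() would give tied rows the same rank.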

2. Adding and modifying columns:

For example, given a table of personal information, add a new country column to it:

# Modify a column
from pyspark.sql.functions import col
# df2 = df1.withColumn("avg_resp_rate", col("sum_def_imps")/col("sum_count")).withColumn("avg_ctr", col("sum_clicks")/col("sum_imps"))

# another example
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType, IntegerType


data = [('James','','Smith','1991-04-01','M',3000),
  ('Michael','Rose','','2000-05-19','M',4000),
  ('Robert','','Williams','1978-09-05','M',4000),
  ('Maria','Anne','Jones','1967-12-01','F',4000),
  ('Jen','Mary','Brown','1980-02-17','F',-1)
]
print(data)

"""
[('James', '', 'Smith', '1991-04-01', 'M', 3000), 
('Michael', 'Rose', '', '2000-05-19', 'M', 4000), 
('Robert', '', 'Williams', '1978-09-05', 'M', 4000), 
('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000), 
('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1)]
"""

First define the column names and create the DataFrame, then add a new column with withColumn; lit("ABC") fills every row of the new column with the string ABC:

# The schema only needs the column names here
columns = ["firstname", "middlename", "lastname", "dob", "gender", "salary"]

# Create the DataFrame (assumes an existing SparkSession named spark,
# e.g. spark = SparkSession.builder.getOrCreate())
df = spark.createDataFrame(data=data, schema=columns)
df.show()

# Add or modify a column: cast salary to Integer
df2 = df.withColumn("salary", col("salary").cast("Integer"))
df2.show()

df3 = df.withColumn("salary", col("salary") * 100)
df3.show()

# lit("ABC") fills the whole new column with the literal string "ABC"
df5 = df.withColumn("Country", lit("ABC"))
df5.show()

The results are as follows:

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|300000|
|  Michael|      Rose|        |2000-05-19|     M|400000|
|   Robert|          |Williams|1978-09-05|     M|400000|
|    Maria|      Anne|   Jones|1967-12-01|     F|400000|
|      Jen|      Mary|   Brown|1980-02-17|     F|  -100|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+-------+
|firstname|middlename|lastname|       dob|gender|salary|Country|
+---------+----------+--------+----------+------+------+-------+
|    James|          |   Smith|1991-04-01|     M|  3000|    ABC|
|  Michael|      Rose|        |2000-05-19|     M|  4000|    ABC|
|   Robert|          |Williams|1978-09-05|     M|  4000|    ABC|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|    ABC|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|    ABC|
+---------+----------+--------+----------+------+------+-------+
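withColumn only adds or replaces a column; to actually change a column name, as the section title suggests, withColumnRenamed can be used. A minimal sketch building on df5 above (the new name date_of_birth and the dropped middlename column are just illustrative choices):

# Rename an existing column; if the old name does not exist, the DataFrame is returned unchanged
df6 = df5.withColumnRenamed("dob", "date_of_birth")
df6.printSchema()

# Drop a column that is no longer needed
df7 = df6.drop("middlename")
df7.show()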
