Common PySpark Operations for Basic Data Analysis
Posted by 山顶夕景
Table of Contents
- I. pyspark.sql
  - 1. Window functions
  - 2. Changing column names
  - 3. Splitting one field into several fields by a delimiter in SQL
  - 4. Converting between pandas and Spark DataFrames
  - 5. Error: ValueError: Some of types cannot be determined after inferring
  - 6. Shuffling the rows of a DataFrame
  - 7. Joining tables
  - 8. DataFrame operations
  - 9. Several ways to use createDataFrame
  - 10. Converting between pandas and Spark DataFrames, to process a pandas DataFrame indirectly with SQL
  - 11. Filtering with filter
  - 12. Adding or modifying a column of a spark.sql DataFrame
  - 13. Saving a DataFrame as a CSV file
  - 14. Extracting specific table entries
  - 15. Using agg together with groupBy
- II. Spark Core module
- III. MLlib module
- IV. Recommendation algorithms
- Reference
I. pyspark.sql
1. Window functions
# Group the data and find each user's 3 most recent collected beats (with a window function)
from pyspark.sql.window import Window
import pyspark.sql.functions as F

window_out = Window.partitionBy("user_id") \
                   .orderBy(F.desc("collect_time"))
# user_feed.withColumn("rank", F.rank().over(window_out)).show(truncate=False)
# user_feed.withColumn("rank", F.rank().over(window_out)).show(40)
user_feed_test = user_feed.withColumn("rank", F.row_number().over(window_out)) \
                          .where(F.col('rank') <= 3)
user_feed_test.show(30)
The result is shown below. Much like a MySQL window function, the data is partitioned by user_id and sorted within each partition; here I keep only the top 3 rows by collect_time, i.e. each user's 3 most recent collections (an equivalent Spark SQL query is sketched after the table):
+--------+-------+------------+--------------------+----+
| user_id|beat_id|collect_type|        collect_time|rank|
+--------+-------+------------+--------------------+----+
|10065188| 827272|           4|2021-08-22 04:54:...|   1|
|10065188| 885812|           5|2020-10-23 18:53:...|   2|
|10068979|1069390|           5|2021-06-20 07:44:...|   1|
|10074915|     -2|           4|2021-11-27 13:42:...|   1|
|10074915|1122682|           4|2021-09-07 14:26:...|   2|
|10075397| 947751|           4|2022-01-30 07:30:...|   1|
|10075397| 336641|           5|2022-01-30 07:23:...|   2|
|10075397| 886179|           4|2022-01-05 10:35:...|   3|
|10104842| 886462|           1|2021-02-28 17:04:...|   1|
|10122654|1531961|           4|2022-03-16 11:09:...|   1|
|10122654| 893655|           4|2022-03-15 04:32:...|   2|
|10122654| 303121|           4|2022-03-14 05:59:...|   3|
|10134095|      0|           3|2021-07-24 13:02:...|   1|
|10134095|1023250|           4|2021-07-22 00:31:...|   2|
|10139927|      0|           5|2020-09-05 19:14:...|   1|
|10139927|      0|           5|2020-09-03 17:51:...|   2|
|10245428| 889915|           5|2020-05-18 14:41:...|   1|
|10245428|1073074|           5|2020-05-18 14:07:...|   2|
+--------+-------+------------+--------------------+----+
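Since the behavior mirrors a MySQL-style window function, the same top-3 query can also be expressed in Spark SQL. The following is a minimal sketch, assuming the same user_feed DataFrame and spark session as above; the temp-view name user_feed_view is only an illustrative choice and does not come from the original post:

# Minimal sketch: the same "3 most recent collections per user" query in Spark SQL.
# Assumes `user_feed` and `spark` exist as above; the view name is arbitrary.
user_feed.createOrReplaceTempView("user_feed_view")
recent_3 = spark.sql("""
    SELECT user_id, beat_id, collect_type, collect_time, rn
    FROM (
        SELECT *,
               ROW_NUMBER() OVER (PARTITION BY user_id
                                  ORDER BY collect_time DESC) AS rn
        FROM user_feed_view
    ) t
    WHERE rn <= 3
""")
recent_3.show(30)

Note that row_number() assigns a unique sequence number within each partition, whereas rank() (the commented-out variant above) gives tied collect_time values the same rank, so more than 3 rows per user could pass the filter.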
2. Changing column names
For example, suppose we have a table of personal information and want to add a new Country column (for renaming an existing column, see the withColumnRenamed sketch at the end of this subsection):
# Modifying / renaming columns
from pyspark.sql.functions import col
# df2 = df1.withColumn("avg_resp_rate", col("sum_def_imps")/col("sum_count")).withColumn("avg_ctr", col("sum_clicks")/col("sum_imps"))
# another example
from pyspark.sql.functions import col, lit
from pyspark.sql.types import StructType, StructField, StringType,IntegerType
data = [('James','','Smith','1991-04-01','M',3000),
('Michael','Rose','','2000-05-19','M',4000),
('Robert','','Williams','1978-09-05','M',4000),
('Maria','Anne','Jones','1967-12-01','F',4000),
('Jen','Mary','Brown','1980-02-17','F',-1)
]
print(data)
"""
[('James', '', 'Smith', '1991-04-01', 'M', 3000),
('Michael', 'Rose', '', '2000-05-19', 'M', 4000),
('Robert', '', 'Williams', '1978-09-05', 'M', 4000),
('Maria', 'Anne', 'Jones', '1967-12-01', 'F', 4000),
('Jen', 'Mary', 'Brown', '1980-02-17', 'F', -1)]
"""
We first give the column names, create the DataFrame, and then add a new column with withColumn, where lit("ABC") means every value in that column is the string ABC:
# For the schema, a list of column names is enough
columns = ["firstname","middlename","lastname","dob","gender","salary"]
# Build the DataFrame
df = spark.createDataFrame(data=data, schema=columns)
df.show()
# Add or modify columns
df2 = df.withColumn("salary", col("salary").cast("Integer"))  # change the column type
df2.show()
df3 = df.withColumn("salary", col("salary")*100)              # overwrite with a computed value
df3.show()
# lit("ABC") fills the whole column with the constant string "ABC"
df5 = df.withColumn("Country", lit("ABC"))
df5.show()
The results are shown below (the cast in df2 only changes the column type, so the second table prints the same values as the first):
+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|  3000|
|  Michael|      Rose|        |2000-05-19|     M|  4000|
|   Robert|          |Williams|1978-09-05|     M|  4000|
|    Maria|      Anne|   Jones|1967-12-01|     F|  4000|
|      Jen|      Mary|   Brown|1980-02-17|     F|    -1|
+---------+----------+--------+----------+------+------+

+---------+----------+--------+----------+------+------+
|firstname|middlename|lastname|       dob|gender|salary|
+---------+----------+--------+----------+------+------+
|    James|          |   Smith|1991-04-01|     M|300000|
|  Michael|      Rose|        |2000-05-19|     M|400000|
|   Robert|          |Williams|1978-09-05|     M|400000|
|    Maria|      Anne|   Jones|1967-12-01|     F|400000|
|      Jen|      Mary|   Brown|1980-02-17|     F|  -100|
+---------+----------+--------+----------+------+------+
(The fourth table, produced by df5.show(), is the same as the first but with an added Country column whose value is ABC in every row.)
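The withColumn calls above add or overwrite columns. For actually renaming an existing column, which is what this subsection's title refers to, withColumnRenamed is the usual tool. Below is a minimal sketch that reuses the df built above; the new column names are only illustrative and not from the original post:

# Minimal sketch: renaming existing columns with withColumnRenamed.
# Reuses the `df` created above; the target names are illustrative.
df_renamed = df.withColumnRenamed("dob", "date_of_birth") \
               .withColumnRenamed("salary", "salary_usd")
df_renamed.printSchema()

Unlike withColumn, withColumnRenamed only changes the column's name; the values and the column type stay the same.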