pyspark读写S3文件与简单处理(指定Schema,直接写S3或先本地再上传)
Posted 詩和遠方
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了pyspark读写S3文件与简单处理(指定Schema,直接写S3或先本地再上传)相关的知识,希望对你有一定的参考价值。
概述
随着AWS的流行,越来越多的企业将数据存储在S3上构建数据湖,本文示例如何用PySpark读取S3上的数据,并用结构化API处理与展示,最后简单并讨论直接写到S3与先写到本地再上传到S3的性能对比。
初始化Spark Session
读取Spark需要$SPARK_HOME/jars
下包含hadoop-aws相关jar包,目前 aws-java-sdk-1.7.4.jar、hadoop-aws-2.7.7.jar为推荐版本, 一般上述jar包在 $HADOOP_HOME/share/hadoop/tools/lib/
下均可找到,复制到$SPARK_HOME/jars
下即可。若没装Hadoop可以自行网上下载上述Jar包。
以下代码中,cfg配置文件存储了AWS的访问凭证,用 configparser 模块读取。
import os
import pyspark
from pyspark.sql import SparkSession
import configparser
config = configparser.ConfigParser()
config.read("dl.cfg")
AWS_ACCESS_KEY_ID = config['AWS']['AWS_ACCESS_KEY_ID']
AWS_SECRET_ACCESS_KEY = config['AWS']['AWS_SECRET_ACCESS_KEY']
spark = SparkSession.builder\\
.config("spark.hadoop.fs.s3a.access.key", AWS_ACCESS_KEY_ID)\\
.config("spark.hadoop.fs.s3a.secret.key", AWS_SECRET_ACCESS_KEY)\\
.config("spark.hadoop.fs.s3a.impl", "org.apache.hadoop.fs.s3a.S3AFileSystem")\\
.getOrCreate()
也可直接指定包配置
来启动,运行时会自动下载,这样 sparkSession 初始化会比较缓慢,不推荐:
import os
import pyspark
from pyspark.sql import SparkSession
os.environ['AWS_ACCESS_KEY_ID'] = config['AWS']['AWS_ACCESS_KEY_ID']
os.environ['AWS_SECRET_ACCESS_KEY'] = config['AWS']['AWS_SECRET_ACCESS_KEY']
spark = SparkSession.builder\\
.config("spark.jars.packages","org.apache.hadoop:hadoop-aws:2.7.7")\\
.getOrCreate()
从S3读取文件
# 读取单个文件
df_song = spark.read.format('json').load('s3a://MyBucket/song_data/A/A/A/TRAAAAK128F9318786.json')
# 读取多个文件
df_songs = spark.read.format('json').load('s3a://MyBucket/song_data/A/A/A/*.json')
# cache到本地,下次读取加快速度
df_songs.cache()
df_songs.count()
24
df_songs.select('artist_id').take(5)
[Row(artist_id='ARTC1LV1187B9A4858'),
Row(artist_id='ARA23XO1187B9AF18F'),
Row(artist_id='ARSVTNL1187B992A91'),
Row(artist_id='AR73AIO1187B9AD57B'),
Row(artist_id='ARXQBR11187B98A2CC')]
读取CSV
# 若文件巨大,cache慎用
df = spark.read.csv("s3a://MyBucket/pagila/payment/payment.csv").cache()
df.printSchema()
df.show(5)
df.count()
root
|-- _c0: string (nullable = true)
+--------------------+
| _c0|
+--------------------+
|payment_id;custom...|
|16050;269;2;7;1.9...|
|16051;269;1;98;0....|
|16052;269;2;678;6...|
|16053;269;2;703;0...|
+--------------------+
only showing top 5 rows
16050
可以发现CSV默认以逗号分隔,但上述文件是分号,故读出来只有一列,且应指定表头
指定分隔符号
# 这里换一种方法读 csv,本质是一样的
df = spark.read\\
.format('csv')\\
.option('sep',';')\\
.option('inferSchema',True)\\
.option('header',True)\\
.load("s3a://MyBucket/pagila/payment/payment.csv")
# 若文件巨大,cache慎用
df.cache()
# 成功分列
df.printSchema()
root
|-- payment_id: integer (nullable = true)
|-- customer_id: integer (nullable = true)
|-- staff_id: integer (nullable = true)
|-- rental_id: integer (nullable = true)
|-- amount: double (nullable = true)
|-- payment_date: string (nullable = true)
df.show(5)
+----------+-----------+--------+---------+------+--------------------+
|payment_id|customer_id|staff_id|rental_id|amount| payment_date|
+----------+-----------+--------+---------+------+--------------------+
| 16050| 269| 2| 7| 1.99|2017-01-24 21:40:...|
| 16051| 269| 1| 98| 0.99|2017-01-25 15:16:...|
| 16052| 269| 2| 678| 6.99|2017-01-28 21:44:...|
| 16053| 269| 2| 703| 0.99|2017-01-29 00:58:...|
| 16054| 269| 1| 750| 4.99|2017-01-29 08:10:...|
+----------+-----------+--------+---------+------+--------------------+
only showing top 5 rows
数据类型转换
将 payment_date 列由字符串转换为日期类型
import pyspark.sql.functions as fn
df_new = df.withColumn('payment_date',fn.to_timestamp('payment_date','yyyy-MM-dd HH:mm:ss'))
df_new.printSchema()
df_new.show(5)
root
|-- payment_id: integer (nullable = true)
|-- customer_id: integer (nullable = true)
|-- staff_id: integer (nullable = true)
|-- rental_id: integer (nullable = true)
|-- amount: double (nullable = true)
|-- payment_date: timestamp (nullable = true)
+----------+-----------+--------+---------+------+-------------------+
|payment_id|customer_id|staff_id|rental_id|amount| payment_date|
+----------+-----------+--------+---------+------+-------------------+
| 16050| 269| 2| 7| 1.99|2017-01-24 21:40:19|
| 16051| 269| 1| 98| 0.99|2017-01-25 15:16:50|
| 16052| 269| 2| 678| 6.99|2017-01-28 21:44:14|
| 16053| 269| 2| 703| 0.99|2017-01-29 00:58:02|
| 16054| 269| 1| 750| 4.99|2017-01-29 08:10:06|
+----------+-----------+--------+---------+------+-------------------+
only showing top 5 rows
增加月份列
# DataFrame API 方法
df_new = df_new.withColumn('month',fn.month('payment_date'))
df_new.show(5)
+----------+-----------+--------+---------+------+-------------------+-----+
|payment_id|customer_id|staff_id|rental_id|amount| payment_date|month|
+----------+-----------+--------+---------+------+-------------------+-----+
| 16050| 269| 2| 7| 1.99|2017-01-24 21:40:19| 1|
| 16051| 269| 1| 98| 0.99|2017-01-25 15:16:50| 1|
| 16052| 269| 2| 678| 6.99|2017-01-28 21:44:14| 1|
| 16053| 269| 2| 703| 0.99|2017-01-29 00:58:02| 1|
| 16054| 269| 1| 750| 4.99|2017-01-29 08:10:06| 1|
+----------+-----------+--------+---------+------+-------------------+-----+
only showing top 5 rows
# SQL API 方法
df_new.drop('momth') # 先删除上述已有 month 列
df_new.createOrReplaceTempView('vw_df_payment')
df_new = spark.sql('''
select payment_id
,customer_id
,staff_id
,rental_id
,amount
,payment_date
,month(payment_date) as month
from vw_df_payment
''')
df_new.show(5)
+----------+-----------+--------+---------+------+-------------------+-----+
|payment_id|customer_id|staff_id|rental_id|amount| payment_date|month|
+----------+-----------+--------+---------+------+-------------------+-----+
| 16050| 269| 2| 7| 1.99|2017-01-24 21:40:19| 1|
| 16051| 269| 1| 98| 0.99|2017-01-25 15:16:50| 1|
| 16052| 269| 2| 678| 6.99|2017-01-28 21:44:14| 1|
| 16053| 269| 2| 703| 0.99|2017-01-29 00:58:02| 1|
| 16054| 269| 1| 750| 4.99|2017-01-29 08:10:06| 1|
+----------+-----------+--------+---------+------+-------------------+-----+
only showing top 5 rows
SQL聚合计算-计算月度利润
df_new.createOrReplaceTempView('vw_df_payment')
spark.sql('''
select Month,sum(amount) as Revenue
from vw_df_payment
group by month
order by month
''').show()
+-----+------------------+
|Month| Revenue|
+-----+------------------+
| 1| 4824.429999999856|
| 2| 9631.879999999608|
| 3|23886.560000002115|
| 4|28559.460000003943|
| 5| 514.180000000001|
+-----+------------------+
df.count()
16049
df_new.count()
16049
读取CSV,指定schema
infer schema比较费时而且难免会有与实际不符的情况,下面我们指定schema。
指定Schema需要构造StructType类,或直接用类似SQL DDL风格的字符串。
# from pyspark.sql.types import StructType,StructField,IntegerType,DoubleType,DateType,TimestampType
# my_schema = StructType([
# StructField('payment_id' ,IntegerType()),
# StructField('customer_id',IntegerType()),
# StructField('staff_id',IntegerType()),
# StructField('rental_id',IntegerType()),
# StructField('amount',DoubleType()),
# StructField('payment_date',TimestampType())
# ])
# 除上述定义方法外,如下 DDL风格的 schema 定义亦可 (varchar 对应 string,datetime 对应 timestamp )
# 注:含 timestamp 类型时,一定要指定 timestampFormat
# yyyy-MM-dd HH:mm:ss,如果用 yyyy-MM-dd HH:mm:ss.SSSSSS
# spark会将 2017-01-24 21:40:19.996577+00 的微秒读为 996577000,并加到分钟
# 所以只能舍掉 SSSSSS 秒,或先读为字符串,再另外处理
my_schema = '''
payment_id int,
customer_id int,
staff_id int,
rental_id int,
amount double,
payment_date timestamp
'''
df = spark.read\\
.format('csv')\\
.schema(my_schema)\\
.option('sep',';')\\
.option('header',True)\\
.option("timestampFormat", "yyyy-MM-dd HH:mm:ss")\\
.load("s3a://MyBucket/pagila/payment/payment.csv")
df.printSchema()
df.show(5)
root
|-- payment_id: integer (nullable = true)
|-- customer_id: integer (nullable = true)
|-- staff_id: integer (nullable = true)
|-- rental_id: integer (nullable = true)
|-- amount: double (nullable = true)
|-- payment_date: timestamp (nullable = true)
+----------+-----------+--------+---------+------+-------------------+
|payment_id|customer_id|staff_id|rental_id|amount| payment_date|
+----------+-----------+--------+---------+------+-------------------+
| 16050| 269| 2| 7| 1.99|2017-01-24 21:40:19|
| 16051| 269| 1| 98| 0.99|2017-01-25 15:16:50|
| 16052| 269| 2| 678| 6.99|2017-01-28 21:44:14|
| 16053| 269| 2| 703| 0.99|2017-01-29 00:58:02|
| 16054| 269| 1| 750| 4.99|2017-01-29 08:10:06|
+----------+-----------+--------+---------+------+-------------------+
only showing top 5 rows
df.select('payment_date').take(5)
[Row(payment_date=datetime.datetime(2017, 1, 24, 21, 40, 19)),
Row(payment_date=datetime.datetime(2017, 1, 25, 15, 16, 50)),
Row(payment_date=datetime.datetime(2017, 1, 28, 21, 44, 14)),
Row(payment_date=datetime.datetime(2017, 1, 29, 0, 58, 2)),
Row(payment_date=datetime.datetime(2017, 1, 29, 8, 10, 6))]
df.createOrReplaceTempView('vw_df_payment')
spark.sql('''
select month(payment_date) as Month,sum(amount) as Revenue
from vw_df_payment
group by month(payment_date)
order by month
''').show()
+-----+------------------+
|Month| Revenue|
+-----+------------------+
| 1| 4824.429999999856|
| 2| 9631.879999999608|
| 3|23886.560000002115|
| 4|28559.460000003943|
| 5| 514.180000000001|
+-----+------------------+
写文件到S3
直接写到S3应该是比较慢的,可以先保存在本地再上传到S3,或保存到AWS EMR的HDFS中,再传到S3,虽然麻烦,可能会快一些 。
直接写S3
import time
s = time.time()
df.write.format('json')\\
.mode('overwrite')\\
.save('s3a://bucket-vincent/payment.json')
print(f'\\nrun time:time.time() - s s')
run time:121.28353834152222 s
先写到本地,再copy到S3
copy到S3调用了AWS CLI
命令
import subprocess as sub
s = time.time()
df.write.format('json')\\
.mode('overwrite')\\
.save('/home/ghost/workdata/payment.json')
s3_copy_cmd = [
'aws','s3','cp',
'/home/ghost/workdata/payment.json',
's3://bucket-vincent/payment.json',
'--recursive','--exclude','*.crc'
]
sub.run(s3_copy_cmd)
print(f'\\nrun time:time.time() - s s')
run time:30.206376314163208 s
可以发现后者运行时间有所减少。
相关参考
https://sparkour.urizone.net/recipes/using-s3/
https://www.jitsejan.com/using-spark-to-read-from-s3.html
https://gist.github.com/claudinei-daitx/3766d01b070f3f0f8d1b64fd06b71585
https://gist.github.com/bachwehbi/49e0035bdcf3d420a181415e02a189b7
https://stackoverflow.com/questions/42822483/extremely-slow-s3-write-times-from-emr-spark
以上是关于pyspark读写S3文件与简单处理(指定Schema,直接写S3或先本地再上传)的主要内容,如果未能解决你的问题,请参考以下文章
PYSPARK - 如何读取 S3 中所有子文件夹中的所有 csv 文件?