在s3中使用pyspark合并多个小json文件[重复]
Posted
技术标签:
【中文标题】在s3中使用pyspark合并多个小json文件[重复]【英文标题】:merge multiple small json files using pyspark in s3 [duplicate] 【发布时间】:2020-02-15 23:24:13 【问题描述】:我是 spark 的新手。
我的 s3 存储桶的子目录中有多个小的 json 文件 (1kb)。我想合并单个目录中存在的所有文件。使用 pyspark 是否有任何优化方法。
目录结构: 地区/年/月/日/小时/multiple_json_files
如上所述,我有许多目录,想将所有文件合并到一个目录中。
P.S:我尝试过使用 python 但它需要更多时间,尝试过 s3distcp 但结果相同。
谁能帮我解决这个问题
【问题讨论】:
对region/year/month/day/hour/
到*/*/*/*/*/
使用通配符。
【参考方案1】:
你可以通过下面的代码来实现
首先我们需要确保加载 spark 时 hadoop aws 包可用:
import os
os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell"
接下来我们需要让 pyspark 在 jupyter notebook 中可用:
import findspark
findspark.init()
from pyspark.sql import SparkSession
我们需要 aws 凭证才能访问 s3 存储桶。我们可以使用 configparser 包从标准 aws 文件中读取凭证。
import configparser
config = configparser.ConfigParser()
config.read(os.path.expanduser("~/.aws/credentials"))
access_id = config.get(aws_profile, "aws_access_key_id")
access_key = config.get(aws_profile, "aws_secret_access_key")
我们可以启动 spark 会话并将 aws 凭据传递给 hadoop 配置:
sc=spark.sparkContext
hadoop_conf=sc._jsc.hadoopConfiguration()
hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
hadoop_conf.set("fs.s3n.awsAccessKeyId", access_id)
hadoop_conf.set("fs.s3n.awsSecretAccessKey", access_key)
终于可以读取数据并显示出来了:
df=spark.read.json("s3n://path_of_location/*.json")
df.show()
【讨论】:
以上是关于在s3中使用pyspark合并多个小json文件[重复]的主要内容,如果未能解决你的问题,请参考以下文章
无法使用本地 PySpark 从 S3 读取 json 文件
从 S3 读取大型 JSON 文件 (3K+) 并从数组中选择特定键
PySpark:在 Spark 数据框中读取多个 XML 文件(s3 路径列表)