在s3中使用pyspark合并多个小json文件[重复]

Posted

技术标签:

【中文标题】在s3中使用pyspark合并多个小json文件[重复]【英文标题】:merge multiple small json files using pyspark in s3 [duplicate] 【发布时间】:2020-02-15 23:24:13 【问题描述】:

我是 spark 的新手。

我的 s3 存储桶的子目录中有多个小的 json 文件 (1kb)。我想合并单个目录中存在的所有文件。使用 pyspark 是否有任何优化方法。

目录结构: 地区/年/月/日/小时/multiple_json_files

如上所述,我有许多目录,想将所有文件合并到一个目录中。

P.S:我尝试过使用 python 但它需要更多时间,尝试过 s3distcp 但结果相同。

谁能帮我解决这个问题

【问题讨论】:

region/year/month/day/hour/*/*/*/*/*/ 使用通配符。 【参考方案1】:

你可以通过下面的代码来实现

首先我们需要确保加载 spark 时 hadoop aws 包可用:

import os

os.environ['PYSPARK_SUBMIT_ARGS'] = "--packages=org.apache.hadoop:hadoop-aws:2.7.3 pyspark-shell"

接下来我们需要让 pyspark 在 jupyter notebook 中可用:

import findspark
findspark.init()
from pyspark.sql import SparkSession

我们需要 aws 凭证才能访问 s3 存储桶。我们可以使用 configparser 包从标准 aws 文件中读取凭证。

 import configparser
 config = configparser.ConfigParser()
 config.read(os.path.expanduser("~/.aws/credentials"))
 access_id = config.get(aws_profile, "aws_access_key_id") 
 access_key = config.get(aws_profile, "aws_secret_access_key")

我们可以启动 spark 会话并将 aws 凭据传递给 hadoop 配置:

 sc=spark.sparkContext
 hadoop_conf=sc._jsc.hadoopConfiguration()
 hadoop_conf.set("fs.s3n.impl", "org.apache.hadoop.fs.s3native.NativeS3FileSystem")
 hadoop_conf.set("fs.s3n.awsAccessKeyId", access_id)
 hadoop_conf.set("fs.s3n.awsSecretAccessKey", access_key)

终于可以读取数据并显示出来了:

 df=spark.read.json("s3n://path_of_location/*.json")
 df.show()

【讨论】:

以上是关于在s3中使用pyspark合并多个小json文件[重复]的主要内容,如果未能解决你的问题,请参考以下文章

在 Pyspark 中合并 DataFrame

无法使用本地 PySpark 从 S3 读取 json 文件

从 S3 读取大型 JSON 文件 (3K+) 并从数组中选择特定键

PySpark:在 Spark 数据框中读取多个 XML 文件(s3 路径列表)

如何使用给定的reduce函数基于pyspark中的字段合并多个JSON数据行

加载多个文件并且缺少一个文件时,PySpark 作业失败 [重复]