将文件列表 (JSON) 转换为数据框
Posted
技术标签:
【中文标题】将文件列表 (JSON) 转换为数据框【英文标题】:Transform a list of files (JSON) to a dataframe 【发布时间】:2018-02-12 13:33:32 【问题描述】:Spark 版本: '2.0.0.2.5.0.0-1245'
所以,我最初的问题有所改变,但仍然是同一个问题。
我想要做的是加载大量 JSON 文件并将它们转换为 DataFrame - 也可能将它们保存为 CSV 或 parquet 文件以供进一步处理。每个 JSON 文件代表最终 DataFrame 中的一行。
import os
import glob
HDFS_MOUNT = # ...
DATA_SET_BASE = # ...
schema = StructType([
StructField("documentId", StringType(), True),
StructField("group", StringType(), True),
StructField("text", StringType(), True)
])
# Get the file paths
file_paths = glob.glob(os.path.join(HDFS_MOUNT, DATA_SET_BASE, '**/*.json'))
file_paths = [f.replace(HDFS_MOUNT + '/', '') for f in file_paths]
print('Found :d files'.format(len(file_paths))) # 676 files
sql = SQLContext(sc)
df = sql.read.json(file_paths, schema=schema)
print('Loaded :d rows'.format(df.count())) # 9660 rows (what !?)
除了有 9660 行而不是 676 行(可用文件数)之外,我还遇到内容似乎是 None
的问题:
df.head(2)[0].asDict()
给予
'documentId': None,
'group': None,
'text': None,
示例数据
这当然只是假数据,但它与实际数据相似。
注意:某些字段可能会丢失,例如
text
不能一直存在。
a.json
"documentId" : "001",
"group" : "A",
"category" : "indexed_document",
"linkIDs": ["adiojer", "asdi555", "1337"]
b.json
"documentId" : "002",
"group" : "B",
"category" : "indexed_document",
"linkIDs": ["linkId", "1000"],
"text": "This is the text of this document"
【问题讨论】:
所有文件的结构都一样吗?它们是否放在一个目录中? @IuriiNedostup 好吧,RDBDocument.to_row
确保每一行具有相同的结构 - 我不知道是否有更好的方法来做到这一点,但每个 JSON 基本上代表最后一行数据框。是的,JSON 文件在同一个目录中。
【参考方案1】:
假设你所有的文件都具有相同的结构并且在同一个目录中:
df = sql_cntx.read.json('/hdfs/path/to/folder/*.json')
如果任何列的所有行都具有 Null 值,则可能会出现问题。然后 spark 将无法确定架构,因此您可以选择告诉 spark 使用哪个架构:
from pyspark import SparkContext, SQLContext
from pyspark.sql.types import StructType, StructField, StringType, LongType
sc = SparkContext(appName="My app")
sql_cntx = SQLContext(sc)
schema = StructType([
StructField("field1", StringType(), True),
StructField("field2", LongType(), True)
])
df = sql_cntx.read.json('/hdfs/path/to/folder/*.json', schema=schema)
统一更新: 如果文件具有多行格式的 json,您可以尝试以下代码:
sc = SparkContext(appName='Test')
sql_context = SQLContext(sc)
rdd = sc.wholeTextFiles('/tmp/test/*.json').values()
df = sql_context.read.json(rdd, schema=schema)
df.show()
【讨论】:
我明白了……嗯,是的……可能是因为空值。我现在看到的唯一选择是读取 json 标头,定义架构并制作数据框。 可以发几行作为例子吗? 我添加了两个“示例文件”。它表明,例如text
并不总是存在。所有字段都应为StringType
。我试图指定 schame 但 df.head(1)[0].asDict()
返回所有值都是 None
:/
我现在已经将我原来的问题稍微改成了我们现在所处的位置。奇怪的是行数是例如不同的..
快速问题:文件可以有多行吗?它们是如何分开的?以上是关于将文件列表 (JSON) 转换为数据框的主要内容,如果未能解决你的问题,请参考以下文章
如何将 json 对象列表转换为单个 pyspark 数据框?