将文件列表 (JSON) 转换为数据框

Posted

技术标签:

【中文标题】将文件列表 (JSON) 转换为数据框【英文标题】:Transform a list of files (JSON) to a dataframe 【发布时间】:2018-02-12 13:33:32 【问题描述】:

Spark 版本: '2.0.0.2.5.0.0-1245'


所以,我最初的问题有所改变,但仍然是同一个问题。

我想要做的是加载大量 JSON 文件并将它们转换为 DataFrame - 也可能将它们保存为 CSV 或 parquet 文件以供进一步处理。每个 JSON 文件代表最终 DataFrame 中的一行。

import os
import glob

HDFS_MOUNT = # ...
DATA_SET_BASE = # ...

schema = StructType([
        StructField("documentId", StringType(), True),
        StructField("group", StringType(), True),
        StructField("text", StringType(), True)     
])

# Get the file paths    
file_paths = glob.glob(os.path.join(HDFS_MOUNT, DATA_SET_BASE, '**/*.json'))
file_paths = [f.replace(HDFS_MOUNT + '/', '') for f in file_paths]

print('Found :d files'.format(len(file_paths))) # 676 files

sql = SQLContext(sc)

df = sql.read.json(file_paths, schema=schema)

print('Loaded :d rows'.format(df.count())) # 9660 rows (what !?)

除了有 9660 行而不是 676 行(可用文件数)之外,我还遇到内容似乎是 None 的问题:

df.head(2)[0].asDict()

给予


 'documentId': None,
 'group': None,
 'text': None,


示例数据

这当然只是假数据,但它与实际数据相似。

注意:某些字段可能会丢失,例如text 不能一直存在。

a.json


  "documentId" : "001",
  "group" : "A",
  "category" : "indexed_document",
  "linkIDs": ["adiojer", "asdi555", "1337"]

b.json


  "documentId" : "002",
  "group" : "B",
  "category" : "indexed_document",
  "linkIDs": ["linkId", "1000"],
  "text": "This is the text of this document"

【问题讨论】:

所有文件的结构都一样吗?它们是否放在一个目录中? @IuriiNedostup 好吧,RDBDocument.to_row 确保每一行具有相同的结构 - 我不知道是否有更好的方法来做到这一点,但每个 JSON 基本上代表最后一行数据框。是的,JSON 文件在同一个目录中。 【参考方案1】:

假设你所有的文件都具有相同的结构并且在同一个目录中:

df = sql_cntx.read.json('/hdfs/path/to/folder/*.json')

如果任何列的所有行都具有 Null 值,则可能会出现问题。然后 spark 将无法确定架构,因此您可以选择告诉 spark 使用哪个架构:

from pyspark import SparkContext, SQLContext
from pyspark.sql.types import StructType, StructField, StringType, LongType

sc = SparkContext(appName="My app")
sql_cntx = SQLContext(sc)

schema = StructType([
    StructField("field1", StringType(), True),
    StructField("field2", LongType(), True)
])

df = sql_cntx.read.json('/hdfs/path/to/folder/*.json', schema=schema)

统一更新: 如果文件具有多行格式的 json,您可以尝试以下代码:

sc = SparkContext(appName='Test') 
sql_context = SQLContext(sc) 

rdd = sc.wholeTextFiles('/tmp/test/*.json').values() 
df = sql_context.read.json(rdd, schema=schema)
df.show()

【讨论】:

我明白了……嗯,是的……可能是因为空值。我现在看到的唯一选择是读取 json 标头,定义架构并制作数据框。 可以发几行作为例子吗? 我添加了两个“示例文件”。它表明,例如text 并不总是存在。所有字段都应为StringType。我试图指定 schame 但 df.head(1)[0].asDict() 返回所有值都是 None :/ 我现在已经将我原来的问题稍微改成了我们现在所处的位置。奇怪的是行数是例如不同的.. 快速问题:文件可以有多行吗?它们是如何分开的?

以上是关于将文件列表 (JSON) 转换为数据框的主要内容,如果未能解决你的问题,请参考以下文章

将熊猫数据框列表转换为 json

Pyspark 将 json 数组转换为数据帧行

如何将 json 对象列表转换为单个 pyspark 数据框?

如何使用pyspark将具有多个可能值的Json数组列表转换为数据框中的列

将字典列表转换为数据框 [重复]

从文件中读取json数据并将t转换为列表的函数[重复]