Convert lines of JSON in RDD to dataframe in Apache Spark

Posted: 2016-10-02 15:09:16

【Question】:

I have around 17,000 files in S3 that look like this:

"hour": "00", "month": "07", "second": "00", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"
"hour": "00", "month": "07", "second": "01", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"
"hour": "00", "month": "07", "second": "02", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"
"hour": "00", "month": "07", "second": "03", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"
"hour": "00", "month": "07", "second": "04", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"

I have one file per day. Each file contains a record for every second, so there are 86,000 records in a file. Each file has a filename like "YYYY-MM-DD".

I use boto3 to generate a list of the files in the bucket. Here I select only 10 of the files by using a prefix:

import boto3
s3_list = []
s3 = boto3.resource('s3')
my_bucket = s3.Bucket('time-waits-for-no-man')
for object in my_bucket.objects.filter(Prefix='1972-05-1'):
    s3_list.append(object.key)

This returns the list of files (S3 keys). I then define a function to fetch a file and return its rows:

def FileRead(s3Key):
    s3obj = boto3.resource('s3').Object(bucket_name='bucket', key=s3Key)
    contents = s3obj.get()['Body'].read().decode('utf-8')
    yield Row(**contents)

I then distribute this function using flatMap:

job = sc.parallelize(s3_list)
foo = job.flatMap(FileRead)

The problem

However, I can't figure out how to pump these rows into a DataFrame correctly.

>>> foo.toDF().show()
+--------------------+                                                          
|                  _1|
+--------------------+
|"hour": "00", "m...|
|"hour": "00", "m...|
|"hour": "00", "m...|
|"hour": "00", "m...|
|"hour": "00", "m...|
|"hour": "00", "m...|
|"hour": "00", "m...|
|"hour": "00", "m...|
|"hour": "00", "m...|
|"hour": "00", "m...|
+--------------------+

>>> foo.toDF().count()
10  

Could someone please show me how to do this?

【Comments】:

【Answer 1】:

Here is another solution to the same problem.

from pyspark.sql.types import StructType,StructField,StringType
fields =['hour','month','second','year','timezone','day','minute']

schema = StructType([
StructField(field,StringType(),True) for field in fields
])

js = (
  {"hour": "00", "month": "07", "second": "00", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"},
  {"hour": "00", "month": "07", "second": "01", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"},
  {"hour": "00", "month": "07", "second": "02", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"},
  {"hour": "00", "month": "07", "second": "03", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"},
  {"hour": "00", "month": "07", "second": "04", "year": "1970", "timezone": "-00:00", "day": "12", "minute": "00"}
)

rdd = sc.parallelize(js)
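# build the DataFrame with the explicit schema rather than relying on schema inference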
jsDF = spark.createDataFrame(rdd,schema)

jsDF.show()

【Comments】:

【Answer 2】:

You should probably use the json reader directly (spark.read.json / sqlContext.read.json), but if you know the schema you can try parsing the JSON strings manually:

from pyspark.sql.types import StructField, StructType, StringType
from pyspark.sql import Row
import json

fields = ['day', 'hour', 'minute', 'month', 'second', 'timezone', 'year']
schema =  StructType([
  StructField(field, StringType(), True) for field in fields
])

def parse(s, fields):
    try:
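        # each record is expected to be a tuple/Row whose first element is the raw JSON string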
        d = json.loads(s[0])
        return [tuple(d.get(field) for field in fields)]
    except:
        return []

spark.createDataFrame(foo.flatMap(lambda s: parse(s, fields)), schema)

You can also use get_json_object:

from pyspark.sql.functions import get_json_object
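# "df" below is assumed to be a DataFrame with a string column "value" containing the raw JSON text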

df.select([
    get_json_object("value", "$.{0}".format(field)).alias(field)
    for field in fields
])
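
For reference, a rough sketch of the json-reader route mentioned at the start of this answer, assuming the bucket from the question is reachable through an s3a:// path (the path prefix and credential setup are assumptions, not something stated in the original post):

paths = ["s3a://time-waits-for-no-man/{0}".format(key) for key in s3_list]
# each line of each file is treated as one JSON object; the schema is inferred
df = spark.read.json(paths)
df.show()

This skips the boto3 download step entirely, provided Spark's Hadoop configuration has the S3 credentials set up.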

【Comments】:

【Answer 3】:

I got it working in the end:

import json

def FileRead(s3Key):
    s3obj = boto3.resource('s3').Object(bucket_name='bucket', key=s3Key)
    contents = s3obj.get()['Body'].read().decode()
    result = []
    meow = contents.split('\n')
    index = 0
    limit = 10
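    # parse at most 'limit' lines from each file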
    for item in meow:
        index += 1
        result.append(json.loads(item))
        if index == limit:
            return result

job = sc.parallelize(s3_list)
foo = job.flatMap(FileRead)
df = foo.toDF()

Thanks to @user6910411 for the inspiration.

【Comments】:
