火花流到pyspark json文件中的数据帧

Posted

技术标签:

【中文标题】火花流到pyspark json文件中的数据帧【英文标题】:spark streaming to data frame in pyspark json file 【发布时间】:2017-07-29 19:01:51 【问题描述】:

我需要 pyspark 的帮助。我正在从 kafka 流式传输 json 数据,我需要将 as Dataframe 转换为 pyspark。要流式传输,我使用了以下代码。

from __future__ import print_function
import sys
import csv
import json
from pyspark import SparkContext
from pyspark.streaming import StreamingContext
from pyspark.streaming.kafka import KafkaUtils
from pyspark.sql import SparkSession
from pyspark.sql import SQLContext
from pyspark.sql import Row
import pandas as pd
global gspark
def convert_Json2DF(time,rdd):
    nf = gspark.read.json(rdd)
    nf.toDF().show()
    # Convert RDD[String] to RDD[Row] to DataFrame
    #rowRdd = rdd.map(lambda w: Row(word=w))
    #wordsDataFrame = gspark.createDataFrame(rowRdd)
    #pdf = wordsDataFrame.toDF()
    #pdf.show()
if __name__ == "__main__":
    if len(sys.argv) != 3:
        print("Usage: kafka_wordcount.py <zk> <topic>", file=sys.stderr)
        exit(-1)
    gspark = SparkSession \
        .builder \
        .appName("SparkSteaming Kafka Receiver") \
        .config("spark.some.config.option", "some-value") \
        .config("spark.ui.port", 22300) \
        .config("spark.executor.instances", 4) \
        .config("spark.executor.cores", 4) \
        .getOrCreate()
    sc = gspark.sparkContext
    SQLContext= SQLContext(sc)
    ssc = StreamingContext(sc, 15)
    zkQuorum, topic = sys.argv[1:]
    kvs = KafkaUtils.createStream(ssc, zkQuorum, "spark-streaming-consumer", topic: 1)
    lines = kvs.map(lambda (key,value): json.loads(value))
    lines.pprint()
    lines.foreachRDD(Json2DF)
ssc.start()
ssc.awaitTermination()

对于上述代码,我无法将 json 数据转换为 Dataframe。谁能纠正我需要在 Json2DF 函数或主函数中进行更改的地方。

谢谢 巴拉

【问题讨论】:

【参考方案1】:

首先,确保所有 json 数据具有相同的架构。

def check_json(js, col):
    try:
        data = json.loads(js)
        return [data.get(i) for i in col]
    except:
        return []


def convert_json2df(rdd, col):
    ss = SparkSession(rdd.context)
    if rdd.isEmpty():
        return
    df = ss.createDataFrame(rdd, schema=StructType("based on 'col'"))
    df.show()


cols = ['idx', 'name']

sc = SparkContext()
ssc = StreamingContext(sc, 5)

lines = ssc.socketTextStream('localhost', 9999) \
    .map(lambda x: check_json(x, cols)) \
    .filter(lambda x: x) \
    .foreachRDD(lambda x: convert_json2df(x, cols))

ssc.start()
ssc.awaitTermination()

【讨论】:

以上是关于火花流到pyspark json文件中的数据帧的主要内容,如果未能解决你的问题,请参考以下文章

Pyspark 将 json 数组转换为数据帧行

如何在火花上将json字符串转换为数据帧

Databricks 火花 UDF 不适用于过滤的数据帧

嵌套的json扁平化火花数据框

PySpark:我们应该迭代更新数据帧吗?

Pyspark 将 JSON 读取为 dict 或 struct 而不是数据帧/RDD