如何在pyspark中将rdd行转换为带有json结构的数据框?

Posted

技术标签:

【中文标题】如何在pyspark中将rdd行转换为带有json结构的数据框?【英文标题】:How convert rdd row to a dataframe with json struct in pyspark? 【发布时间】:2019-03-15 22:15:25 【问题描述】:

我将以下 json 发送到路径“/home/host/test”,以便程序可以使用 spark 流捕获它并能够对其进行查询。

"id": "1", description: "test"
"id": "1", description: "test"

但是当我执行查询时,它看起来像下面的结构

root
   | --word: String (Nulleable = true)

我得到以下结果:

+ ------------------- +
| word |
---------------------
| "id": "1", "test"
| "id": "1", "test"

我需要这样的结构

root
   | --id: String (Nulleable = true)
   | --description string (Nulleable = true)

我需要得到如下结果

 ----------------
| id | description
----------------
| "1" | "test" |
| "1" | "test" |
----------------    

这是我的 pyspkark 代码

from __future__ import print_function
import os
import sys
from pyspark import SparkContext
from pyspark.sql.functions import col, explode
from pyspark.streaming import StreamingContext
from pyspark.sql import SQLContext, Row
from pyspark.sql import SQLContext


if __name__ == "__main__":

sc = SparkContext(appName="PythonSqlNetworkWordCount")
ssc = StreamingContext(sc, 3)
sqlcontextoriginal = SQLContext(sc)

# Create a socket stream on target ip:port and count the
# words in input stream of \n delimited text (eg. generated by 'nc')
lines = ssc.textFileStream("/home/host/test")

# Convert RDDs of the words DStream to DataFrame and run SQL query
def process(time, rdd):
    print("========= %s =========" % str(time))

    try:
        # Get the singleton instance of SQLContext
        sqlContext = SQLContext(rdd.context)
        # Convert RDD[String] to RDD[Row] to DataFrame
        rowRdd = rdd.map(lambda w: Row(word=w))
        wordsDataFrame = sqlContext.createDataFrame(rowRdd).toJSON()

        json = sqlContext.read.json(wordsDataFrame)
        # Register as table
        json.createOrReplaceTempView("words")
        json.printSchema()

        wordCountsDataFrame = sqlContext.sql("select * from words ")
        wordCountsDataFrame.show()

    except:
        pass

lines.foreachRDD(process)
ssc.start()
ssc.awaitTermination()

【问题讨论】:

【参考方案1】:

好的,我找到了解决方案。

我不得不使用 sql.read.json 直接将它作为参数传递给 rdd。

json = sqlContext.read.json(rdd)

【讨论】:

以上是关于如何在pyspark中将rdd行转换为带有json结构的数据框?的主要内容,如果未能解决你的问题,请参考以下文章

在pyspark中将带有字符串json字符串的列转换为带有字典的列

如何在 PySpark 1.6 中将 DataFrame 列从字符串转换为浮点/双精度?

在 Pyspark 中将流水线 RDD 转换为 Dataframe [重复]

在 PySpark 中将 Python Dict 转换为稀疏 RDD 或 DF

如何在 PySpark 中将两个 rdd 合并为一个

在 Pyspark 中将字典转换为数据框