使用 pyspark 在 Spark Streaming 中的 to.JSON()

Posted

技术标签:

【中文标题】使用 pyspark 在 Spark Streaming 中的 to.JSON()【英文标题】:to.JSON() in Spark Streaming using pyspark 【发布时间】:2016-06-30 07:26:16 【问题描述】:

我使用toJSON()方法在spark Streaming的transform()函数中将DataFrame转换为RDD of json文档。

我正在使用pyspark 进行如下编码:

def process(rdd):
  rddDataframe = sqlContext.createDataFrame(rdd)
  rddback = rddDataFrame.toJSON()
return rdd

dstream_test = dstream_in.transform(lambda rdd: process(rdd))

但我收到以下错误:

 UnpicklingError: invalid load key, ''

请帮我解决这个问题。

【问题讨论】:

您的代码没有意义。能否请您提供数据样本? 【参考方案1】:

不要将 rdd 传递给函数,将函数传递给你的 rdd。

为每一行定义你的转换,然后发送它

def transform(row):
    ....

your_rdd = your_rdd.map(transform)

【讨论】:

以上是关于使用 pyspark 在 Spark Streaming 中的 to.JSON()的主要内容,如果未能解决你的问题,请参考以下文章

Python脚本会使用pyspark在spark中工作吗

Pyspark - 在作为列表的 spark 数据框列上使用 reducebykey

使用 Pyspark 使用 Spark 读取巨大 Json 文件的第一行

在 spark 版本 2.2.0 中使用 python(pyspark) 从 mqtt 获取数据流

PySpark - 为 SQL Server 使用 Spark 连接器

pyspark在spark sql中函数之间的使用范围