如何在Pyspark中计算或管理流数据?

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了如何在Pyspark中计算或管理流数据?相关的知识,希望对你有一定的参考价值。

我想从流媒体数据中提取数据,然后发送到网页上。例如:我想计算流式数据中的TotalSales列的总和。我将计算流数据中的TotalSales列的总和。 但它在以下地方出错 summary = dataStream.select('TotalSales').groupby().sum().toPandas() 这是我的代码。

import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("Python Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
schema = StructType().add("_c0", "integer").add("InvoiceNo", "string").add("Quantity","integer").add("InvoiceDate","date").add("UnitPrice","integer").add("CustomerID","double").add("TotalSales","integer")
INPUT_DIRECTORY = "C:/Users/HP/Desktop/test/jsonFile"
dataStream = spark.readStream.format("json").schema(schema).load(INPUT_DIRECTORY)
query = dataStream.writeStream.format("console").start()

summary = dataStream.select('TotalSales').groupby().sum().toPandas()
print(query.id)
query.awaitTermination();

这是命令行上显示的错误。

Traceback (most recent call last):
  File "testStreaming.py", line 12, in <module>
    dataStream = dataStream.toPandas()
  File "C:UsersHPAppDataLocalProgramsPythonPython36libsite-packagespysparksqldataframe.py", line 2150, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:UsersHPAppDataLocalProgramsPythonPython36libsite-packagespysparksqldataframe.py", line 534, in collect
    sock_info = self._jdf.collectToPython()
  File "C:UsersHPAppDataLocalProgramsPythonPython36libsite-packagespy4jjava_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File "C:UsersHPAppDataLocalProgramsPythonPython36libsite-packagespysparksqlutils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;
FileSource[C:/Users/HP/Desktop/test/jsonFile]'

谢谢你的回答。

答案

为什么你要创建一个Pandas Df?

toPandas将创建一个DataFrame,它是你的驱动节点的本地数据。 我不知道你想在这里实现什么。 Pandas DataFrame代表一组固定的tuples,而结构化流则是一个连续的数据流。

现在一个可能的解决这个问题的办法是完成你要做的整个过程,并将输出发送到一个parquetcsv文件,并使用这个parquetcsv等文件来创建一个pandas DF。

summary = dataStream.select('TotalSales').groupby().sum()
query = dataStream.writeStream.format("parquet").outputMode("complete").start(outputPathDir)
query.awaitTermination()

以上是关于如何在Pyspark中计算或管理流数据?的主要内容,如果未能解决你的问题,请参考以下文章

如何使用pyspark流计算csv文件中的条目数

如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?

如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?

如何将 Pyspark 数据帧存储到 HBase

数据分析工具篇——pyspark应用详解

火花流到pyspark json文件中的数据帧