如何在 Pyspark 中计算或管理流数据

Posted

技术标签:

【中文标题】如何在 Pyspark 中计算或管理流数据【英文标题】:How to calculate or manage streaming data in Pyspark 【发布时间】:2020-06-02 20:06:44 【问题描述】:

我想从流数据中计算数据,然后发送到网页。 例如:我将计算流数据中 TotalSales 列的总和。 但它在 summary = dataStream.select('TotalSales').groupby().sum().toPandas() 处出错,这是我的代码。

import os
from pyspark.sql import SparkSession
from pyspark.sql.types import StructType
from pyspark.sql.functions import *

spark = SparkSession.builder.appName("Python Spark SQL basic example").config("spark.some.config.option", "some-value").getOrCreate()
schema = StructType().add("_c0", "integer").add("InvoiceNo", "string").add("Quantity","integer").add("InvoiceDate","date").add("UnitPrice","integer").add("CustomerID","double").add("TotalSales","integer")
INPUT_DIRECTORY = "C:/Users/HP/Desktop/test/jsonFile"
dataStream = spark.readStream.format("json").schema(schema).load(INPUT_DIRECTORY)
query = dataStream.writeStream.format("console").start()

summary = dataStream.select('TotalSales').groupby().sum().toPandas()
print(query.id)
query.awaitTermination();

这是命令行上显示的错误。

Traceback (most recent call last):
  File "testStreaming.py", line 12, in <module>
    dataStream = dataStream.toPandas()
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\dataframe.py", line 2150, in toPandas
    pdf = pd.DataFrame.from_records(self.collect(), columns=self.columns)
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\dataframe.py", line 534, in collect
    sock_info = self._jdf.collectToPython()
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\py4j\java_gateway.py", line 1257, in __call__
answer, self.gateway_client, self.target_id, self.name)
  File "C:\Users\HP\AppData\Local\Programs\Python\Python36\lib\site-packages\pyspark\sql\utils.py", line 69, in deco
raise AnalysisException(s.split(': ', 1)[1], stackTrace)
pyspark.sql.utils.AnalysisException: 'Queries with streaming sources must be executed with writeStream.start();;\nFileSource[C:/Users/HP/Desktop/test/jsonFile]'

感谢您的回答。

【问题讨论】:

【参考方案1】:

你为什么要创建一个 pandas Df

toPandas 将创建一个位于驱动程序节点本地的 DataFrame。我不确定你想在这里实现什么。 Pandas DataFrame 表示一组固定的元组,其中结构化流是连续的数据流。

现在解决此问题的一种可能方法是完成您想要执行的整个过程并将输出发送到 parquet/csv 文件并使用此 parquet/csv 等文件创建 pandas DF。

summary = dataStream.select('TotalSales').groupby().sum()
query = dataStream.writeStream.format("parquet").outputMode("complete").start(outputPathDir)
query.awaitTermination()

【讨论】:

我试过你的建议,它的工作。我有另一个问题。如何从数据框(汇总变量)中提取值。例如:我尝试打印一个汇总变量。就像这样。 DataFrame[sum(TotalSales): bigint]。我需要使用数据框中的值来显示在我的网页上。 数据作为数据框。 json 文件中的数据示例:"InvoiceNo":"539993","CustomerID":13313.0,"TotalSales":678 "InvoiceNo":"539993","CustomerID":13313.0,"TotalSales":430 "InvoiceNo":"539993","CustomerID":13313.0,"TotalSales":350 如果我对您的理解正确,您想计算col TotalSales 的累积总和。好吧,这是一个多重聚合的问题,不幸的是结构化流目前不支持,但你可以使用计算窗口上的总和或在某个列上分组的解决方法,将其存储在文件中 n 使用 Spark Sql 来计算总和并在网页上显示结果。

以上是关于如何在 Pyspark 中计算或管理流数据的主要内容,如果未能解决你的问题,请参考以下文章

如何使用pyspark流计算csv文件中的条目数

如何在 pyspark 结构化流中使用 maxOffsetsPerTrigger?

如何在 pyspark 的结构化流作业中运行地图转换

如何在 PySpark 中计算不同窗口大小的滚动总和

如何在我的 pyspark 代码中访问 S3 中的 Amazon kinesis 流文件?

如何在 Pyspark 2.1 中使用窗口函数来计算星期几的出现次数