将 Spark Structure Streaming DataFrames 转换为 Pandas DataFrame

Posted

技术标签:

【中文标题】将 Spark Structure Streaming DataFrames 转换为 Pandas DataFrame【英文标题】:Convert Spark Structure Streaming DataFrames to Pandas DataFrame 【发布时间】:2019-03-05 17:28:57 【问题描述】:

我设置了一个使用 Kafka 主题的 Spark Streaming 应用程序,我需要使用一些接受 Pandas Dataframe 的 API,但是当我尝试转换它时,我得到了这个

: org.apache.spark.sql.AnalysisException: Queries with streaming sources must be executed with writeStream.start();;
kafka
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.org$apache$spark$sql$catalyst$analysis$UnsupportedOperationChecker$$throwError(UnsupportedOperationChecker.scala:297)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:36)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$$anonfun$checkForBatch$1.apply(UnsupportedOperationChecker.scala:34)
        at org.apache.spark.sql.catalyst.trees.TreeNode.foreachUp(TreeNode.scala:127)
        at org.apache.spark.sql.catalyst.analysis.UnsupportedOperationChecker$.checkForBatch(UnsupportedOperationChecker.scala:34)
        at org.apache.spark.sql.execution.QueryExecution.assertSupported(QueryExecution.scala:63)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData$lzycompute(QueryExecution.scala:74)
        at org.apache.spark.sql.execution.QueryExecution.withCachedData(QueryExecution.scala:72)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan$lzycompute(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.optimizedPlan(QueryExecution.scala:78)
        at org.apache.spark.sql.execution.QueryExecution.completeString(QueryExecution.scala:219)
        at org.apache.spark.sql.execution.QueryExecution.toString(QueryExecution.scala:202)
        at org.apache.spark.sql.execution.SQLExecution$.withNewExecutionId(SQLExecution.scala:62)
        at org.apache.spark.sql.Dataset.withNewExecutionId(Dataset.scala:2832)
        at org.apache.spark.sql.Dataset.collectToPython(Dataset.scala:2809)
        at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62)
        at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
        at java.lang.reflect.Method.invoke(Method.java:498)
        at py4j.reflection.MethodInvoker.invoke(MethodInvoker.java:244)
        at py4j.reflection.ReflectionEngine.invoke(ReflectionEngine.java:357)
        at py4j.Gateway.invoke(Gateway.java:282)
        at py4j.commands.AbstractCommand.invokeMethod(AbstractCommand.java:132)
        at py4j.commands.CallCommand.execute(CallCommand.java:79)
        at py4j.GatewayConnection.run(GatewayConnection.java:238)
        at java.lang.Thread.run(Thread.java:745)

这是我的python代码

spark = SparkSession\
    .builder\
    .appName("sparkDf to pandasDf")\
    .getOrCreate()

sparkDf = spark.readStream\
    .format("kafka")\
    .option("kafka.bootstrap.servers", "kafkahost:9092")\
    .option("subscribe", "mytopic")\
    .option("startingOffsets", "earliest")\
    .load()


pandas_df =  sparkDf.toPandas()

query = sparkDf.writeStream\
    .outputMode("append")\
    .format("console")\
    .option("truncate", "false")\
    .trigger(processingTime="5 seconds")\
    .start()\
    .awaitTermination()

现在我知道我正在创建流数据帧的另一个实例,但无论我在哪里尝试使用 start() 和 awaitTermination(),我都会收到相同的错误。

有什么想法吗?

【问题讨论】:

【参考方案1】:

TL;DR这样的操作是行不通的。

现在我知道我正在创建流数据帧的另一个实例

嗯,问题是你真的不知道。 toPandas,在 DataFrame 上调用会创建一个简单的、本地的、非分布式的 Pandas DataFrame、in memory of the driver node。

它不仅与 Spark 无关,而且作为抽象,本质上与结构化流不兼容 - Pandas DataFrame 表示一组固定的元组,而结构化流表示一个无限的元组流。

目前尚不清楚您要在这里实现什么,这可能是 XY 问题,但如果您确实需要使用带有结构化流的 Pandas,您可以尝试使用 pandas_udf - SCALARGROUPED_MAP 变体至少与基于时间的基本触发器兼容(也可能支持其他变体,尽管某些组合显然没有任何意义,而且我不知道有任何官方兼容性矩阵)。

【讨论】:

叹息,这么多小时试图让它工作却发现我做不到。感谢您的回复 @anonuser1234 请不要向信使开枪 :) 但是,如果您提供有关为什么想要 Pandas 对象的更多详细信息,我可以尝试扩展答案。 当然,我正在进行的项目正在使用 Panda Dataframes 来做一些 NLP 工作。我试图从 Spark Structure 流中获取我们需要的数据,然后将其作为 panda DF 发送,让 NLP 人员完成他们的工作。 如果主要目标是建模/在线学习,那么 Spark Structured Streaming 在这里不会有太大帮助,尽管 foreach 的作者可能会在这里提供一些机会。对于预测等pandas_udfs 应该就可以了。 我们想利用 Apache Spark 和他们的 RDD(他们的 Dataframes 在上面运行)来处理大量数据。

以上是关于将 Spark Structure Streaming DataFrames 转换为 Pandas DataFrame的主要内容,如果未能解决你的问题,请参考以下文章

移植一个VB6可以正常操作的CH341DLL,移植到VB2010出现的数据读写错误的问题

分布式资本

恐慌:json:无法将数组解组为 main.Structure 类型的 Go 值

将“Emgu.CV.Image<Emgu.CV.Structure.Bgr,byte>”转换为“System.Drawing.Image”

DAO package structure

csharp [ArasLabs / override-default-structure-browser]将分类属性添加到与上下文项相同类型的项中