发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询
Posted
技术标签:
【中文标题】发生异常:pyspark.sql.utils.AnalysisException \'必须使用 writeStream.start();;\\nkafka\' 执行带有流式源的查询【英文标题】:Exception has occurred: pyspark.sql.utils.AnalysisException 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询 【发布时间】:2019-02-01 04:22:55 【问题描述】:在代码处 如果不是 df.head(1).isEmpty: 我有异常,
Exception has occurred: pyspark.sql.utils.AnalysisException 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'
我不知道如何在流数据中使用 if。 当我使用 jupyter 执行每一行时,代码很好,我可以得到我的结果。但使用 .py 并不好。
我的目的是:我想使用流式传输每隔一秒从 kafka 获取数据,然后我将每批流式传输数据(一批意味着我一秒得到的数据)转换为 pandas 数据帧,然后我使用 pandas 函数对数据做一些事情,最后我将结果发送到其他 kafka 主题。
请帮助我,并原谅我的泳池英语,非常感谢。
sc = SparkContext("local[2]", "OdometryConsumer")
spark = SparkSession(sparkContext=sc) \
.builder \
.appName("StructuredNetworkWordCount") \
.getOrCreate()
# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")
df = spark \
.readStream \
.format("kafka") \
.option("kafka.bootstrap.servers", "localhost:9092") \
.option("subscribe", "data") \
.load()
ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(type(ds))
if not df.head(1).isEmpty:
alertQuery = ds \
.writeStream \
.queryName("qalerts")\
.format("memory")\
.start()
alerts = spark.sql("select * from qalerts")
pdAlerts = alerts.toPandas()
a = pdAlerts['value'].tolist()
d = []
for i in a:
x = json.loads(i)
d.append(x)
df = pd.DataFrame(d)
print(df)
ds = df['jobID'].unique().tolist()
dics =
for source in ds:
ids = df.loc[df['jobID'] == source, 'id'].tolist()
dics[source]=ids
print(dics)
query = ds \
.writeStream \
.queryName("tableName") \
.format("console") \
.start()
query.awaitTermination()
【问题讨论】:
【参考方案1】:删除if not df.head(1).isEmpty:
,你应该没问题。
异常的原因很简单,即流式查询是一种结构化查询,永不结束并不断执行。根本不可能查看单个元素,因为没有“单个元素”,但是(可能)有数千个元素,而且很难说你什么时候想在幕后看,只看到一个单个元素。
【讨论】:
以上是关于发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询的主要内容,如果未能解决你的问题,请参考以下文章
应用程序的组件中发生了未经处理的异常,下方显示“调用的目标发生了异常”时如何解决?
处理您的请求时发生异常。此外,在执行自定义错误页面时发生了另一个异常
“处理您的请求时发生异常。此外,执行自定义错误页面时发生另一个异常......”