发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询

Posted

技术标签:

【中文标题】发生异常:pyspark.sql.utils.AnalysisException \'必须使用 writeStream.start();;\\nkafka\' 执行带有流式源的查询【英文标题】:Exception has occurred: pyspark.sql.utils.AnalysisException 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询 【发布时间】:2019-02-01 04:22:55 【问题描述】:

在代码处 如果不是 df.head(1).isEmpty: 我有异常,

Exception has occurred: pyspark.sql.utils.AnalysisException 'Queries with streaming sources must be executed with writeStream.start();;\nkafka'

我不知道如何在流数据中使用 if。 当我使用 jupyter 执行每一行时,代码很好,我可以得到我的结果。但使用 .py 并不好。

我的目的是:我想使用流式传输每隔一秒从 kafka 获取数据,然后我将每批流式传输数据(一批意味着我一秒得到的数据)转换为 pandas 数据帧,然后我使用 pandas 函数对数据做一些事情,最后我将结果发送到其他 kafka 主题。

请帮助我,并原谅我的泳池英语,非常感谢。

sc = SparkContext("local[2]", "OdometryConsumer")
spark = SparkSession(sparkContext=sc) \
    .builder \
    .appName("StructuredNetworkWordCount") \
    .getOrCreate()

# Enable Arrow-based columnar data transfers
spark.conf.set("spark.sql.execution.arrow.enabled", "true")


df = spark \
  .readStream \
  .format("kafka") \
  .option("kafka.bootstrap.servers", "localhost:9092") \
  .option("subscribe", "data") \
  .load()
ds = df.selectExpr("CAST(key AS STRING)", "CAST(value AS STRING)")
print(type(ds))

if not df.head(1).isEmpty:
  alertQuery = ds \
          .writeStream \
          .queryName("qalerts")\
          .format("memory")\
          .start()

  alerts = spark.sql("select * from qalerts")
  pdAlerts = alerts.toPandas()
  a = pdAlerts['value'].tolist()

  d = []
  for i in a:
      x = json.loads(i)
      d.append(x)

  df = pd.DataFrame(d)
  print(df)
  ds = df['jobID'].unique().tolist()


  dics = 
  for source in ds:
      ids = df.loc[df['jobID'] == source, 'id'].tolist()
      dics[source]=ids

  print(dics)  
query = ds \
  .writeStream \
  .queryName("tableName") \
  .format("console") \
  .start()

query.awaitTermination()

【问题讨论】:

【参考方案1】:

删除if not df.head(1).isEmpty:,你应该没问题。

异常的原因很简单,即流式查询是一种结构化查询,永不结束并不断执行。根本不可能查看单个元素,因为没有“单个元素”,但是(可能)有数千个元素,而且很难说你什么时候想在幕后看,只看到一个单个元素。

【讨论】:

以上是关于发生异常:pyspark.sql.utils.AnalysisException '必须使用 writeStream.start();;\nkafka' 执行带有流式源的查询的主要内容,如果未能解决你的问题,请参考以下文章

c# 在WebClient 请求期间发生异常

应用程序的组件中发生了未经处理的异常,下方显示“调用的目标发生了异常”时如何解决?

处理您的请求时发生异常。此外,在执行自定义错误页面时发生了另一个异常

“处理您的请求时发生异常。此外,执行自定义错误页面时发生另一个异常......”

FIFO可能会发生Belady异常,堆栈算法不会发生Belady异常,如LRU。证明为何不会异常。

当自动关闭资源时尝试使用资源引发异常以及异常时会发生啥