流式传输作业失败-状态架构不兼容问题

Posted

技术标签:

【中文标题】流式传输作业失败-状态架构不兼容问题【英文标题】:Streaming job failure-State Schema not Compatible issue 【发布时间】:2021-06-11 05:35:01 【问题描述】:

我的流式传输作业现在因以下错误而失败,流式传输作业运行了将近 2 个月,并且它是完全无状态的转换,只需要将新行附加到目标增量表。在流式传输之前,我手动将架构提供给 csv 文件,甚至验证了流式作业架构和下游表架构都与数据类型完美匹配。

不确定,为什么即使在无状态转换中,我也会收到以下错误。任何帮助将不胜感激。

File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 2442, in _call_proxy
return_value = getattr(self.pool[obj_id], method)(*params)
File "/databricks/spark/python/pyspark/sql/utils.py", line 195, in call
raise e
File "/databricks/spark/python/pyspark/sql/utils.py", line 192, in call
self.func(DataFrame(jdf, self.sql_ctx), batch_id)
File "<command-422857213447422>", line 2, in write_to_managed_table
print(f"inside foreachBatch for batch_id:batchId, rows in passed dataframe: micro_batch_df.count()")
File "/databricks/spark/python/pyspark/sql/dataframe.py", line 670, in count
return int(self._jdf.count())
File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/java_gateway.py", line 1304, in __call__
return_value = get_return_value(
File "/databricks/spark/python/pyspark/sql/utils.py", line 110, in deco
return f(*a, **kw)
File "/databricks/spark/python/lib/py4j-0.10.9-src.zip/py4j/protocol.py", line 326, in get_return_value
raise Py4JJavaError(
py4j.protocol.Py4JJavaError: An error occurred while calling o433.count.
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 28 in stage 13792.0 
failed 4 times, most recent failure: Lost task 28.3 in stage 13792.0 (TID 752198) 
(10.139.64.13 executor 45): 
org.apache.spark.sql.execution.streaming.state.StateSchemaNotCompatible: Provided schema 
doesn't match to the schema for existing state! Please note that Spark allow difference of 
field name: check count of fields and data type of each field.

【问题讨论】:

【参考方案1】:

CSV 文件可能有问题,它可能已损坏。 您可以通过将“mode”选项设置为“PERMISSIVE”或“DROPMALFORMED”来忽略此 csv 文件。

mode(默认PERMISSIVE):允许在解析过程中处理损坏记录的模式。 PERMISSIVE :当遇到损坏的记录时,将其他字段设置为 null。当用户设置模式时,它会为额外的字段设置 null。 DROPMALFORMED :忽略整个损坏的记录。 FAILFAST :遇到损坏的记录时抛出异常。

https://spark.apache.org/docs/latest/api/scala/org/apache/spark/sql/streaming/DataStreamReader.html#csv(path:String):org.apache.spark.sql.DataFrame

spark.read.format("csv")
.option("header,"true")
.option("path","your.csv")
.option("mode","DROPMALFORMED")
.schema(csvSchema)
.load()

【讨论】:

以上是关于流式传输作业失败-状态架构不兼容问题的主要内容,如果未能解决你的问题,请参考以下文章

如何在不中断流式传输作业的情况下更改 spark spark 流式事件中的 json 架构?

错误:流式传输作业失败:流分析作业存在验证错误:当前不支持到端点的多个输入列

Spark 流式传输作业在被驱动程序停止后失败

使用“添加文件”部分(shell 脚本)时,Hortonworks Hue Oozie 流式传输作业失败

如何在谷歌云数据流中停止流式传输管道

将 mp4 文件流式传输到其他网站