How to use Confluent Schema Registry with the from_avro standard function? [duplicate]
Posted: 2020-01-16 21:35:04

My Kafka and Schema Registry are based on Confluent Community Platform 5.2.2, and my Spark is version 2.4.4. I started the Spark REPL environment:
./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4
and set up the Kafka source for the Spark session:
val brokerServers = "my_confluent_server:9092"
val topicName = "my_kafka_topic_name"
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()
I fetched the schema information for the key and the value:
import io.confluent.kafka.schemaregistry.client.rest.RestService
val schemaRegistryURL = "http://my_confluent_server:8081"
val restService = new RestService(schemaRegistryURL)
val keyRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-key")
val valueRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-value")
First, if I query the "key" with writeStream, i.e.:
import org.apache.spark.sql.avro._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import java.time.LocalDateTime
val query = df.writeStream
  .outputMode("append")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    val rstDF = batchDF
      .select(
        from_avro($"key", keyRestResponseSchemaStr).as("key"),
        from_avro($"value", valueRestResponseSchemaStr).as("value"))

    println(s"${LocalDateTime.now} --- Batch ${batchId}, ${batchDF.count} rows")
    //rstDF.select("value").show
    rstDF.select("key").show
  })
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .start()
query.awaitTermination()
There are no errors, and the row count is even shown, but I cannot get any data:
2019-09-16T10:30:16.984 --- Batch 0, 0 rows
+---+
|key|
+---+
+---+
2019-09-16T10:32:00.401 --- Batch 1, 27 rows
+---+
|key|
+---+
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
+---+
only showing top 20 rows
But if I select the "value":
import org.apache.spark.sql.avro._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import java.time.LocalDateTime
val query = df.writeStream
  .outputMode("append")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    val rstDF = batchDF
      .select(
        from_avro($"key", keyRestResponseSchemaStr).as("key"),
        from_avro($"value", valueRestResponseSchemaStr).as("value"))

    println(s"${LocalDateTime.now} --- Batch ${batchId}, ${batchDF.count} rows")
    rstDF.select("value").show
    //rstDF.select("key").show
  })
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .start()
query.awaitTermination()
I get these messages:
2019-09-16T10:34:54.287 --- Batch 0, 0 rows
+-----+
|value|
+-----+
+-----+
2019-09-16T10:36:00.416 --- Batch 1, 19 rows
19/09/16 10:36:03 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 3)
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:50)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown Source)
at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
at org.apache.spark.scheduler.Task.run(Task.scala:123)
at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
at java.lang.Thread.run(Thread.java:748)
So I think there are two levels of issues:
First, the key and the value have different Avro deserialization logic, and currently "from_avro" only supports the key, not the value.
Second, even for the key there is no error, but the "from_avro" deserializer cannot get the real data.
Do you think any of my steps are wrong? Or do from_avro and to_avro need to be enhanced?
Thanks.
Answer 1:

Your keys and values are entirely byte arrays, prefixed with an integer value for their schema ID. Spark-Avro does not support that format; it only supports the "Avro container object" format, which includes the schema as part of the record.
In other words, you need to invoke the functions from the Confluent deserializers, not the "plain Avro" deserializers, in order to first get the Avro objects; then you can apply schemas to those objects.
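Because of that fixed-size prefix (a magic byte plus a 4-byte schema ID in the Confluent wire format), one common workaround is to strip the first 5 bytes and feed the remainder to Spark's built-in from_avro. This is a minimal sketch, not part of the original answer, and it is only safe while the writer schema never changes, because the schema ID is simply discarded:

import org.apache.spark.sql.avro.from_avro
import org.apache.spark.sql.functions.expr

// Sketch: drop the 5-byte Confluent wire-format header (magic byte + schema id)
// before handing the bytes to from_avro. Assumes the schemas fetched earlier
// (keyRestResponseSchemaStr / valueRestResponseSchemaStr) match every record.
val rstDF = batchDF.select(
  from_avro(expr("substring(key, 6, length(key) - 5)"), keyRestResponseSchemaStr).as("key"),
  from_avro(expr("substring(value, 6, length(value) - 5)"), valueRestResponseSchemaStr).as("value"))

With the prefix removed, from_avro should be able to decode both columns, as long as the registered schemas have not evolved.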
Should Spark enhance from_avro and to_avro?

They should, but they won't. See SPARK-26314. As a side note, Databricks does provide Schema Registry integration with a function of the same name, just to add to the confusion.
The workaround is to use this library - https://github.com/AbsaOSS/ABRiS
Or see other solutions at Integrating Spark Structured Streaming with the Confluent Schema Registry.
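For reference, a rough sketch of the ABRiS approach mentioned above. The exact API differs between ABRiS releases; the names below follow the AbrisConfig style shown in the project's README for newer versions, so treat this as an illustration rather than a drop-in snippet:

import org.apache.spark.sql.functions.col
import za.co.absa.abris.avro.functions.from_avro
import za.co.absa.abris.config.AbrisConfig

// Let ABRiS handle the Confluent wire format and the schema lookup itself.
val abrisConfig = AbrisConfig
  .fromConfluentAvro
  .downloadReaderSchemaByLatestVersion
  .andTopicNameStrategy("my_kafka_topic_name")          // value schema registered under "<topic>-value"
  .usingSchemaRegistry("http://my_confluent_server:8081")

val valueDF = df.select(from_avro(col("value"), abrisConfig).as("value"))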
Comments:
Thanks a lot, cricket_007. Actually, I referred to your solution in that last link. My current workaround is to use KafkaAvroDeserializer, and then use spark.read.json to convert the values into a DataFrame.
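A rough sketch of that workaround follows. The structure is my own reconstruction (the commenter did not post code), so take the shape as an assumption: each value is deserialized with Confluent's KafkaAvroDeserializer inside foreachBatch, the resulting GenericRecord is turned into its JSON string form, and spark.read.json infers the final schema.

import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.DataFrame
import scala.collection.JavaConverters._

val query = df.writeStream
  .outputMode("append")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    val spark = batchDF.sparkSession
    import spark.implicits._

    val jsonDS = batchDF
      .select($"value")
      .as[Array[Byte]]
      .mapPartitions { records =>
        // One deserializer per partition; it strips the wire-format header
        // and looks up the writer schema by its ID in the registry.
        val deserializer = new KafkaAvroDeserializer()
        deserializer.configure(
          Map("schema.registry.url" -> schemaRegistryURL).asJava, false) // false = value deserializer
        records.map(bytes => deserializer.deserialize(topicName, bytes).toString) // GenericRecord.toString is JSON
      }

    spark.read.json(jsonDS).show(false)
  })
  .start()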