How to use Confluent Schema Registry with the from_avro standard function? [duplicate]

Posted 2020-01-16 21:35:04

My Kafka and Schema Registry are based on Confluent Community Platform 5.2.2, and my Spark version is 2.4.4. I started the Spark REPL environment with:

./bin/spark-shell --packages org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.4,org.apache.spark:spark-avro_2.11:2.4.4

and set up a Kafka source for the Spark session:

val brokerServers = "my_confluent_server:9092"
val topicName = "my_kafka_topic_name" 
val df = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()

I fetched the schema information for the key and the value:

import io.confluent.kafka.schemaregistry.client.rest.RestService
val schemaRegistryURL = "http://my_confluent_server:8081"
val restService = new RestService(schemaRegistryURL)
val keyRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-key")
val valueRestResponseSchemaStr: String = restService.getLatestVersionSchemaOnly(topicName + "-value")

First, if I query the "key" with writeStream, i.e.:

import org.apache.spark.sql.avro._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import java.time.LocalDateTime
val query = df.writeStream
  .outputMode("append")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    val rstDF = batchDF
      .select(
        from_avro($"key", keyRestResponseSchemaStr).as("key"),
        from_avro($"value", valueRestResponseSchemaStr).as("value"))

    println(s"${LocalDateTime.now} --- Batch $batchId, ${batchDF.count} rows")
    //rstDF.select("value").show
    rstDF.select("key").show
  })
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .start()

query.awaitTermination()

There were no errors, and it even showed the row counts, but I could not get any data:

2019-09-16T10:30:16.984 --- Batch 0, 0 rows
+---+
|key|
+---+
+---+

2019-09-16T10:32:00.401 --- Batch 1, 27 rows
+---+
|key|
+---+
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
| []|
+---+
only showing top 20 rows

But if I select the "value":

import org.apache.spark.sql.avro._
import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import java.time.LocalDateTime
val query = df.writeStream
  .outputMode("append")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => {
    val rstDF = batchDF
      .select(
        from_avro($"key", keyRestResponseSchemaStr).as("key"),
        from_avro($"value", valueRestResponseSchemaStr).as("value"))

    println(s"${LocalDateTime.now} --- Batch $batchId, ${batchDF.count} rows")
    rstDF.select("value").show
    //rstDF.select("key").show
  })
  .trigger(Trigger.ProcessingTime("120 seconds"))
  .start()

query.awaitTermination()

I got the following error:

2019-09-16T10:34:54.287 --- Batch 0, 0 rows
+-----+
|value|
+-----+
+-----+

2019-09-16T10:36:00.416 --- Batch 1, 19 rows
19/09/16 10:36:03 ERROR Executor: Exception in task 0.0 in stage 4.0 (TID 3)
org.apache.avro.AvroRuntimeException: Malformed data. Length is negative: -1
    at org.apache.avro.io.BinaryDecoder.doReadBytes(BinaryDecoder.java:336)
    at org.apache.avro.io.BinaryDecoder.readString(BinaryDecoder.java:263)
    at org.apache.avro.io.ResolvingDecoder.readString(ResolvingDecoder.java:201)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:422)
    at org.apache.avro.generic.GenericDatumReader.readString(GenericDatumReader.java:414)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:181)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.readField(GenericDatumReader.java:232)
    at org.apache.avro.generic.GenericDatumReader.readRecord(GenericDatumReader.java:222)
    at org.apache.avro.generic.GenericDatumReader.readWithoutConversion(GenericDatumReader.java:175)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:153)
    at org.apache.avro.generic.GenericDatumReader.read(GenericDatumReader.java:145)
    at org.apache.spark.sql.avro.AvroDataToCatalyst.nullSafeEval(AvroDataToCatalyst.scala:50)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.serializefromobject_doConsume_0$(Unknown Source)
    at org.apache.spark.sql.catalyst.expressions.GeneratedClass$GeneratedIteratorForCodegenStage1.processNext(Unknown Source)
    at org.apache.spark.sql.execution.BufferedRowIterator.hasNext(BufferedRowIterator.java:43)
    at org.apache.spark.sql.execution.WholeStageCodegenExec$$anonfun$13$$anon$1.hasNext(WholeStageCodegenExec.scala:636)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:255)
    at org.apache.spark.sql.execution.SparkPlan$$anonfun$2.apply(SparkPlan.scala:247)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.RDD$$anonfun$mapPartitionsInternal$1$$anonfun$apply$24.apply(RDD.scala:836)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.rdd.MapPartitionsRDD.compute(MapPartitionsRDD.scala:52)
    at org.apache.spark.rdd.RDD.computeOrReadCheckpoint(RDD.scala:324)
    at org.apache.spark.rdd.RDD.iterator(RDD.scala:288)
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:90)
    at org.apache.spark.scheduler.Task.run(Task.scala:123)
    at org.apache.spark.executor.Executor$TaskRunner$$anonfun$10.apply(Executor.scala:408)
    at org.apache.spark.util.Utils$.tryWithSafeFinally(Utils.scala:1360)
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:414)
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1149)
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:624)
    at java.lang.Thread.run(Thread.java:748)

So I think there are two levels of problems:

    First, the key and the value have different Avro deserialization logic, and "from_avro" currently seems to handle only the key, not the value.

    Second, even for the key there is no error, but the "from_avro" deserializer cannot get the real data.

Do you think any of my steps are wrong? Or do from_avro and to_avro need to be enhanced?

Thanks.


Answer 1:

Your keys and values are plain byte arrays prefixed with an integer schema ID. Spark-Avro does not support that format; it only supports the "Avro container object" format, which embeds the schema as part of the record.

In other words, you need to invoke the functions from the Confluent deserializers, not the "plain Avro" deserializer, to first get Avro objects, and then you can apply schemas to those.
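
For illustration only (this assumes the default Confluent wire format and is not part of the original answer): each serialized record looks like [magic byte 0x0][4-byte schema ID][Avro binary payload], so the schema ID can be read off the raw bytes roughly like this (inspectConfluentPayload is a made-up helper name):

import java.nio.ByteBuffer

// Rough sketch: split a Confluent-framed payload into its magic byte, schema ID and Avro body.
def inspectConfluentPayload(bytes: Array[Byte]): (Byte, Int, Array[Byte]) = {
  val buffer    = ByteBuffer.wrap(bytes)
  val magicByte = buffer.get()    // 0x0 for the standard Confluent wire format
  val schemaId  = buffer.getInt() // ID of the schema registered in the Schema Registry
  val avroBody  = java.util.Arrays.copyOfRange(bytes, 5, bytes.length)
  (magicByte, schemaId, avroBody)
}

This is why from_avro fails on the raw column: the bytes it receives start with the header, not with Avro-encoded fields.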

Should Spark enhance from_avro and to_avro?

They should, but they won't. See SPARK-26314. As a side note, Databricks does provide Schema Registry integration with a function of the same name, just to add to the confusion.

The workaround is to use this library - https://github.com/AbsaOSS/ABRiS

Or see other solutions in Integrating Spark Structured Streaming with the Confluent Schema Registry.
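
Another possible workaround, as a minimal sketch (assuming every record in the topic was written with the single latest schema fetched above; stripConfluentHeader is my own helper name, not a Spark or Confluent API), is to strip the 5-byte Confluent header with a UDF and hand the remaining plain Avro binary to from_avro:

import org.apache.spark.sql.avro._
import org.apache.spark.sql.functions.udf

// Drop the magic byte and the 4-byte schema ID so only plain Avro binary remains.
val stripConfluentHeader = udf { (payload: Array[Byte]) =>
  if (payload == null) null else java.util.Arrays.copyOfRange(payload, 5, payload.length)
}

val decodedDF = df.select(
  from_avro(stripConfluentHeader($"key"), keyRestResponseSchemaStr).as("key"),
  from_avro(stripConfluentHeader($"value"), valueRestResponseSchemaStr).as("value"))

With the header removed, from_avro no longer tries to interpret the schema ID as Avro data, which is what produces errors like "Malformed data. Length is negative". It still breaks if older records were written with a different schema version.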

Comments:

Thanks a lot, cricket_007. Actually, I referred to your solution in the last link. My current workaround is to use KafkaAvroDeserializer, and then use spark.read.json to convert the values into a DataFrame.
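
For reference, a rough sketch of that workaround (my own reconstruction, not the commenter's exact code; the object and UDF names are invented): deserialize the Confluent-framed bytes with KafkaAvroDeserializer inside a UDF, turn each GenericRecord into its JSON string, and parse that with spark.read.json:

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.spark.sql.functions.udf

// Built lazily on each executor so the non-serializable client is not shipped in the closure.
object ValueDeserializer extends Serializable {
  @transient lazy val deserializer = {
    val client = new CachedSchemaRegistryClient("http://my_confluent_server:8081", 128)
    new KafkaAvroDeserializer(client)
  }
}

// GenericRecord.toString renders the record as a JSON string.
val valueAsJson = udf { (bytes: Array[Byte]) =>
  if (bytes == null) null
  else ValueDeserializer.deserializer.deserialize("my_kafka_topic_name", bytes).toString
}

// Inside foreachBatch:
//   import spark.implicits._
//   val jsonDS  = batchDF.select(valueAsJson($"value").as("json")).as[String]
//   val valueDF = spark.read.json(jsonDS)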
