将 Spark 结构化流与 Confluent Schema Registry 集成

Posted

技术标签:

【中文标题】将 Spark 结构化流与 Confluent Schema Registry 集成【英文标题】:Integrating Spark Structured Streaming with the Confluent Schema Registry 【发布时间】:2018-07-30 15:58:15 【问题描述】:

我在 Spark Structured Streaming 中使用 Kafka Source 来接收 Confluent 编码的 Avro 记录。我打算使用 Confluent Schema Registry,但与 spark 结构化流的集成似乎是不可能的。

我已经看到了这个问题,但无法让它与 Confluent Schema Registry 一起使用。 Reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)

【问题讨论】:

感谢@VinothChinnasamy,但您的链接是关于经典 spark 流式传输的,我说的是 spark STRUCTURED 流式传输 你需要尊重 kafka spark 集成:spark.apache.org/docs/latest/… @G.Saleh 谢谢你,但你误解了这个问题。 请对 Confluence 问题投票:github.com/confluentinc/schema-registry/issues/755 reading Avro messages from Kafka with Spark 2.0.2 (structured streaming)的可能重复 【参考方案1】:

免责声明

此代码仅在本地主机上进行了测试,据报道在集群环境中遇到了序列化程序问题。有一个alternative solution(步骤 7-9,在步骤 10 中使用 Scala 代码)将模式 ID 提取到列中,查找每个唯一 ID,然后使用模式广播变量,这将在规模上更好地工作。

另外,还有一个external library AbsaOSS/ABRiS 也可以通过 Spark 使用注册表


由于最有用的 the other answer 已被删除,我想通过一些重构和 cmets 重新添加它。

这里是所需的依赖项。 使用 Confluent 5.x 和 Spark 2.4 测试的代码

     <dependency>
            <groupId>io.confluent</groupId>
            <artifactId>kafka-avro-serializer</artifactId>
            <version>$confluent.version</version>
            <exclusions> 
                <!-- Conflicts with Spark's version -->
                <exclusion> 
                    <groupId>org.apache.kafka</groupId>
                    <artifactId>kafka-clients</artifactId>
                </exclusion>
            </exclusions>
     </dependency>
 
    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-sql-kafka-0-10_$scala.version</artifactId>
        <version>$spark.version</version>
    </dependency>

    <dependency>
        <groupId>org.apache.spark</groupId>
        <artifactId>spark-avro_$scala.version</artifactId>
        <version>$spark.version</version>
    </dependency>

这是 Scala 实现(仅在 master=local[*] 本地测试)

第一部分,定义导入、一些字段和一些辅助方法来获取模式

import io.confluent.kafka.schemaregistry.client.CachedSchemaRegistryClient, SchemaRegistryClient
import io.confluent.kafka.serializers.AbstractKafkaAvroDeserializer
import org.apache.avro.Schema
import org.apache.avro.generic.GenericRecord
import org.apache.commons.cli.CommandLine
import org.apache.spark.sql._
import org.apache.spark.sql.avro.SchemaConverters
import org.apache.spark.sql.streaming.OutputMode

object App 

  private var schemaRegistryClient: SchemaRegistryClient = _

  private var kafkaAvroDeserializer: AvroDeserializer = _

  def lookupTopicSchema(topic: String, isKey: Boolean = false) = 
    schemaRegistryClient.getLatestSchemaMetadata(topic + (if (isKey) "-key" else "-value")).getSchema
  

  def avroSchemaToSparkSchema(avroSchema: String) = 
    SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
  

 // ... continues below

然后定义一个简单的 main 方法,解析 CMD args 以获取 Kafka 详细信息

  def main(args: Array[String]): Unit = 
    val cmd: CommandLine = parseArg(args)

    val master = cmd.getOptionValue("master", "local[*]")
    val spark = SparkSession.builder()
      .appName(App.getClass.getName)
      .master(master)
      .getOrCreate()

    val bootstrapServers = cmd.getOptionValue("bootstrap-server")
    val topic = cmd.getOptionValue("topic")
    val schemaRegistryUrl = cmd.getOptionValue("schema-registry")

    consumeAvro(spark, bootstrapServers, topic, schemaRegistryUrl)

    spark.stop()
  


  // ... still continues

然后,消费Kafka主题并反序列化的重要方法

  private def consumeAvro(spark: SparkSession, bootstrapServers: String, topic: String, schemaRegistryUrl: String): Unit = 
    import spark.implicits._

    // Setup the Avro deserialization UDF
    schemaRegistryClient = new CachedSchemaRegistryClient(schemaRegistryUrl, 128)
    kafkaAvroDeserializer = new AvroDeserializer(schemaRegistryClient) 
    spark.udf.register("deserialize", (bytes: Array[Byte]) =>
      kafkaAvroDeserializer.deserialize(bytes)
    )

    // Load the raw Kafka topic (byte stream)
    val rawDf = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", bootstrapServers)
      .option("subscribe", topic)
      .option("startingOffsets", "earliest")
      .load()

    // Deserialize byte stream into strings (Avro fields become JSON)
    import org.apache.spark.sql.functions._
    val jsonDf = rawDf.select(
      // 'key.cast(DataTypes.StringType),  // string keys are simplest to use
      callUDF("deserialize", 'key).as("key"), // but sometimes they are avro
      callUDF("deserialize", 'value).as("value")
      // excluding topic, partition, offset, timestamp, etc
    )

    // Get the Avro schema for the topic from the Schema Registry and convert it into a Spark schema type
    val dfValueSchema = 
      val rawSchema = lookupTopicSchema(topic)
      avroSchemaToSparkSchema(rawSchema)
    

    // Apply structured schema to JSON stream
    val parsedDf = jsonDf.select(
      'key, // keys are usually plain strings
      // values are JSONified Avro records
      from_json('value, dfValueSchema.dataType).alias("value")
    ).select(
      'key,
      $"value.*" // flatten out the value
    )

    // parsedDf.printSchema()

    // Sample schema output
    // root
    // |-- key: string (nullable = true)
    // |-- header: struct (nullable = true)
    // |    |-- time: long (nullable = true)
    // |    ...

    // TODO: Do something interesting with this stream
    parsedDf.writeStream
      .format("console")
      .outputMode(OutputMode.Append())
      .option("truncate", false)
      .start()
      .awaitTermination()
  

 // still continues

命令行解析器允许传入引导服务器、模式注册表、主题名称和 Spark 主服务器。

  private def parseArg(args: Array[String]): CommandLine = 
    import org.apache.commons.cli._

    val options = new Options

    val masterOption = new Option("m", "master", true, "Spark master")
    masterOption.setRequired(false)
    options.addOption(masterOption)

    val bootstrapOption = new Option("b", "bootstrap-server", true, "Bootstrap servers")
    bootstrapOption.setRequired(true)
    options.addOption(bootstrapOption)

    val topicOption = new Option("t", "topic", true, "Kafka topic")
    topicOption.setRequired(true)
    options.addOption(topicOption)

    val schemaRegOption = new Option("s", "schema-registry", true, "Schema Registry URL")
    schemaRegOption.setRequired(true)
    options.addOption(schemaRegOption)

    val parser = new BasicParser
    parser.parse(options, args)
  

  // still continues

为了使上面的 UDF 工作,需要有一个反序列化器将字节的 DataFrame 转换为包含反序列化 Avro 的字节

  // Simple wrapper around Confluent deserializer
  class AvroDeserializer extends AbstractKafkaAvroDeserializer 
    def this(client: SchemaRegistryClient) 
      this()
      // TODO: configure the deserializer for authentication 
      this.schemaRegistry = client
    

    override def deserialize(bytes: Array[Byte]): String = 
      val value = super.deserialize(bytes)
      value match 
        case str: String =>
          str
        case _ =>
          val genericRecord = value.asInstanceOf[GenericRecord]
          genericRecord.toString
      
    
  

 // end 'object App'

将这些块中的每一个放在一起,在将-b localhost:9092 -s http://localhost:8081 -t myTopic 添加到 Run Configurations > Program Arguments

后,它可以在 IntelliJ 中工作

【讨论】:

它在独立集群模式下不起作用..throws 无法执行用户定义的函数(anonfun$consumeAvro$1: (binary) => string) 或此 *** 帖子中的任何有效解决方案?集群模式 嗨@OneCricketeer,您使用的spark.version 是什么?和confluent.version? @Minnie 可能是 2.4.x 和 5.x @Minnie 单引号创建一个Symbol object。获得一列的方法是Column("key")$"key",但这需要更多的输入【参考方案2】:

pyspark 的另一个非常简单的替代方案(不完全支持模式注册,如模式注册、兼容性检查等)可能是:

import requests

from pyspark.sql.types import *
from pyspark.sql.functions import *
from pyspark.sql.avro.functions import *

# variables
topic = "my-topic"
schemaregistry = "http://localhost:8081"
kafka_brokers = "kafka1:9092,kafka2:9092"

# retrieve the latest schema
response = requests.get('/subjects/-value/versions/latest/schema'.format(schemaregistry, topic))

# error check
response.raise_for_status()

# extract the schema from the response
schema = response.text

# run the query
query = spark.readStream.format("kafka") \
    .option("kafka.bootstrap.servers", kafka_brokers) \
    .option("subscribe", topic) \
    .load() \
    # The magic goes here:
    # Skip the first 5 bytes (reserved by schema registry encoding protocol)
    .selectExpr("substring(value, 6) as avro_value") \
    .select(from_avro(col("avro_value"), schema).alias("data")) \
    .select(col("data.my_field")) \
    .writeStream \
    .format("console") \
    .outputMode("complete") \
    .start()

【讨论】:

#魔法在这里:这对我有用。但是为什么我们需要跳过前 5 个字节。 嗨@Venkat,这是必要的,因为 Confluent 为它的内部 wire format 保留了第一个字节【参考方案3】:

根据@cricket_007 的回答,我创建了以下可以在我们的集群环境中运行的解决方案,包括以下新功能:

您需要添加广播变量以将一些值传输到集群环境的地图操作中。 Schema.Parser 和 KafkaAvroDeserializer 都不能在 spark 中序列化,因此您需要在 map 操作中初始化它们 我的结构化流使用了 foreachBatch 输出接收器。 我应用了 org.apache.spark.sql.avro.SchemaConverters 将 avro 模式格式转换为 spark StructType,以便您可以在 from_json 列函数中使用它来解析 Kafka 主题字段(键和值)中的数据帧。李>

首先,你需要加载一些包:

SCALA_VERSION="2.11"
SPARK_VERSION="2.4.4"
CONFLUENT_VERSION="5.2.2"

jars=(
  "org.apache.spark:spark-sql-kafka-0-10_$SCALA_VERSION:$SPARK_VERSION"    ## format("kafka")
  "org.apache.spark:spark-avro_$SCALA_VERSION:$SPARK_VERSION"    ## SchemaConverters
  "io.confluent:kafka-schema-registry:$CONFLUENT_VERSION"   ## import io.confluent.kafka.schemaregistry.client.rest.RestService
  "io.confluent:kafka-avro-serializer:$CONFLUENT_VERSION"   ## import io.confluent.kafka.serializers.KafkaAvroDeserializer
)

./bin/spark-shell --packages $"$jars[*]"// /,

以下是我在 spark-shell 中测试的全部代码:

import org.apache.avro.Schema
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import io.confluent.kafka.schemaregistry.client.rest.RestService

import org.apache.spark.sql.streaming.Trigger
import org.apache.spark.sql.DataFrame
import org.apache.spark.sql.functions._
import org.apache.spark.sql.types._
import org.apache.spark.sql.avro.SchemaConverters

import scala.collection.JavaConverters._
import java.time.LocalDateTime

spark.sparkContext.setLogLevel("Error")

val brokerServers = "xxx.yyy.zzz:9092"
val topicName = "mytopic" 
val schemaRegistryURL = "http://xxx.yyy.zzz:8081"

val restService = new RestService(schemaRegistryURL)

val exParser = new Schema.Parser
//-- For both key and value
val schemaNames = Seq("key", "value")
val schemaStrings = schemaNames.map(i => (i -> restService.getLatestVersion(s"$topicName-$i").getSchema)).toMap
val tempStructMap = schemaStrings.transform((k,v) => SchemaConverters.toSqlType(exParser.parse(v)).dataType)
val schemaStruct = new StructType().add("key", tempStructMap("key")).add("value", tempStructMap("value"))
//-- For key only 
// val schemaStrings = restService.getLatestVersion(s"$topicName-key").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType
//-- For value only 
// val schemaStrings = restService.getLatestVersion(s"$topicName-value").getSchema
// val schemaStruct = SchemaConverters.toSqlType(exParser.parse(schemaStrings)).dataType


val query = spark
  .readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", brokerServers)
  .option("subscribe", topicName)
  .load()
  .writeStream
  .outputMode("append")
  //.option("checkpointLocation", s"cos://$bucket.service/checkpoints/$tableName")
  .foreachBatch((batchDF: DataFrame, batchId: Long) => 

    val bcTopicName = sc.broadcast(topicName)
    val bcSchemaRegistryURL = sc.broadcast(schemaRegistryURL)
    val bcSchemaStrings = sc.broadcast(schemaStrings)
    
    val rstDF = batchDF.map 
      row =>
      
        val props = Map("schema.registry.url" -> bcSchemaRegistryURL.value)
        //-- For both key and value
        val isKeys =  Map("key" -> true, "value" -> false)
        val deserializers = isKeys.transform (k,v) => 
            val des = new KafkaAvroDeserializer
            des.configure(props.asJava, v)
            des 
        
        //-- For key only 
        // val deserializer = new KafkaAvroDeserializer
        // deserializer.configure(props.asJava, true)
        //-- For value only 
        // val deserializer = new KafkaAvroDeserializer
        // deserializer.configure(props.asJava, false)
        

        val inParser = new Schema.Parser
        //-- For both key and value
        val values = bcSchemaStrings.value.transform( (k,v) => 
            deserializers(k).deserialize(bcTopicName.value, row.getAs[Array[Byte]](k), inParser.parse(v)).toString)
        s""""key": $values("key"), "value": $values("value") """
        //-- For key only 
        // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("key"), inParser.parse(bcSchemaStrings.value)).toString
        //-- For value only 
        // deserializer.deserialize(bcTopicName.value, row.getAs[Array[Byte]]("value"), inParser.parse(bcSchemaStrings.value)).toString  
      
      .select(from_json(col("value"), schemaStruct).as("root"))
      .select("root.*")

    println(s"$LocalDateTime.now --- Batch $batchId: $rstDF.count rows")
    rstDF.printSchema
    rstDF.show(false)    

  )
  .trigger(Trigger.ProcessingTime("60 seconds"))
  .start()

query.awaitTermination()

【讨论】:

由于某种原因,广播字符串在map 中不起作用。为什么? 我不确定您是否需要为每批广播,我相信反序列化也不使用主题名称 嗨 timothyzhang,您不需要像 @OneCricketeer 那样的 UDF? 嗨@timothyzhang,你在你的版本测试中遇到过这个问题吗? ***.com/questions/63846392/…【参考方案4】:

This library 将为您完成这项工作。它通过 Spark Structured Stream 连接到 Confluent Schema Registry。

对于 Confluent,它处理与有效负载一起发送的架构 ID。

在 README 中,您将找到如何执行此操作的代码 sn-p。

披露:我为 ABSA 工作并开发了这个库。

【讨论】:

这个库中的描述似乎不正确,例如在 deripton 中有 2.0.0 版本,但在 maven 中我只看到 1.0.0 我也无法构建项目。我有一个错误:[错误] E:\projects\dvsts\ABRiS\src\test\scala\za\co\absa\abris\avro\read\confluent\ScalaConfluentKafkaAvroDeserializerSpec.scala:113:错误:类 MockedSchemaRegistryClient 需要抽象,因为:[错误] 它有 8 个未实现的成员。 @Mikhail,昨天更新了新版本,可能当您检查 Maven Central 时它尚未同步。你可以在这里找到它:mvnrepository.com/artifact/za.co.absa/abris/2.0.0 很高兴看到这个答案的示例用法 @cricket_007,这个库是否适用于 spark Java api,因为在所有导入之后我无法获得 fromavro 方法。可以吗?【参考方案5】:

这是我将 spark 结构化流与 kafka 和模式注册表集成的代码示例(scala 中的代码)

import org.apache.spark.sql.SparkSession
import io.confluent.kafka.schemaregistry.client.rest.RestService // <artifactId>kafka-schema-registry</artifactId>
import org.apache.spark.sql.avro.from_avro // <artifactId>spark-avro_$scala.compat.version</artifactId>
import org.apache.spark.sql.functions.col

object KafkaConsumerAvro 

  def main(args: Array[String]): Unit = 

    val KAFKA_BOOTSTRAP_SERVERS = "localhost:9092"
    val SCHEMA_REGISTRY_URL = "http://localhost:8081"
    val TOPIC = "transactions"

    val spark: SparkSession = SparkSession.builder().appName("KafkaConsumerAvro").getOrCreate()
    spark.sparkContext.setLogLevel("ERROR")

    val df = spark.readStream
      .format("kafka")
      .option("kafka.bootstrap.servers", KAFKA_BOOTSTRAP_SERVERS)
      .option("subscribe", TOPIC)
      .option("startingOffsets", "earliest") // from starting
      .load()

//     Prints Kafka schema with columns (topic, offset, partition e.t.c)
    df.printSchema()

//    Create REST service to access schema registry and retrieve topic schema (latest)
    val restService = new RestService(SCHEMA_REGISTRY_URL)
    val valueRestResponseSchema = restService.getLatestVersion(TOPIC + "-value")
    val jsonSchema = valueRestResponseSchema.getSchema

    val transactionDF = df.select(
      col("key").cast("string"), // cast to string from binary value
      from_avro(col("value"), jsonSchema).as("transaction"), // convert from avro value
      col("topic"),
      col("offset"),
      col("timestamp"),
      col("timestampType"))
    transactionDF.printSchema()

//    Stream data to console for testing
    transactionDF.writeStream
      .format("console")
      .outputMode("append")
      .start()
      .awaitTermination()
  


从 kafka 主题读取时,我们有这种模式:

键:二进制 |值:二进制 |主题:字符串 |分区:整数 |偏移量:长 |时间戳:时间戳 |时间戳类型:整数 |

正如我们所见,key 和 value 是二进制的,所以我们需要将 key 转换为字符串,在这种情况下,value 是 avro 格式的,所以我们可以通过调用 from_avro 函数来实现。

除了 Spark 和 Kafka 依赖之外,我们还需要这个依赖:

<!-- READ AND WRITE AVRO DATA -->
<dependency>
  <groupId>org.apache.spark</groupId>
  <artifactId>spark-avro_$scala.compat.version</artifactId>
  <version>$spark.version</version>
</dependency>
<!-- INTEGRATION WITH SCHEMA REGISTRY -->
<dependency>
  <groupId>io.confluent</groupId>
  <artifactId>kafka-schema-registry</artifactId>
  <version>$confluent.version</version>
</dependency>

【讨论】:

能否请您解释一下我们如何在您的程序中传递架构注册表凭据以备不时之需? 我不需要对架构注册表进行身份验证,但我找到了以下信息:(docs.confluent.io/current/schema-registry/security/index.html),在此链接中,您可以配置架构注册表授权以与 RBAC Kafka 集群通信. (docs.confluent.io/current/schema-registry/security/…) 要传递架构注册表凭据,请参阅此答案:***.com/a/58930199/6002794 这将在独立集群或纱线模式下工作吗?【参考方案6】:

我花了几个月的时间阅读源代码并进行测试。简而言之,Spark 只能处理 String 和 Binary 序列化。您必须手动反序列化数据。在 spark 中,创建 confluent rest 服务对象以获取 schema。使用 Avro 解析器将响应对象中的模式字符串转换为 Avro 模式。接下来,照常阅读Kafka主题。然后使用 Confluent KafkaAvroDeSerializer 映射二进制类型的“值”列。我强烈建议进入这些类的源代码,因为这里有很多内容,所以为简洁起见,我将省略很多细节。

//Used Confluent version 3.2.2 to write this. 
import io.confluent.kafka.schemaregistry.client.rest.RestService
import io.confluent.kafka.serializers.KafkaAvroDeserializer
import org.apache.avro.Schema

case class DeserializedFromKafkaRecord(key: String, value: String)

val schemaRegistryURL = "http://127.0.0.1:8081"

val topicName = "Schema-Registry-Example-topic1"
val subjectValueName = topicName + "-value"

//create RestService object
val restService = new RestService(schemaRegistryURL)

//.getLatestVersion returns io.confluent.kafka.schemaregistry.client.rest.entities.Schema object.
val valueRestResponseSchema = restService.getLatestVersion(subjectValueName)

//Use Avro parsing classes to get Avro Schema
val parser = new Schema.Parser
val topicValueAvroSchema: Schema = parser.parse(valueRestResponseSchema.getSchema)

//key schema is typically just string but you can do the same process for the key as the value
val keySchemaString = "\"string\""
val keySchema = parser.parse(keySchemaString)

//Create a map with the Schema registry url.
//This is the only Required configuration for Confluent's KafkaAvroDeserializer.
val props = Map("schema.registry.url" -> schemaRegistryURL)

//Declare SerDe vars before using Spark structured streaming map. Avoids non serializable class exception.
var keyDeserializer: KafkaAvroDeserializer = null
var valueDeserializer: KafkaAvroDeserializer = null

//Create structured streaming DF to read from the topic.
val rawTopicMessageDF = sql.readStream
  .format("kafka")
  .option("kafka.bootstrap.servers", "127.0.0.1:9092")
  .option("subscribe", topicName)
  .option("startingOffsets", "earliest")
  .option("maxOffsetsPerTrigger", 20)  //remove for prod
  .load()

//instantiate the SerDe classes if not already, then deserialize!
val deserializedTopicMessageDS = rawTopicMessageDF.map
  row =>
    if (keyDeserializer == null) 
      keyDeserializer = new KafkaAvroDeserializer
      keyDeserializer.configure(props.asJava, true)  //isKey = true
    
    if (valueDeserializer == null) 
      valueDeserializer = new KafkaAvroDeserializer
      valueDeserializer.configure(props.asJava, false) //isKey = false
    

    //Pass the Avro schema.
    val deserializedKeyString = keyDeserializer.deserialize(topicName, row.key, keySchema).toString //topic name is actually unused in the source code, just required by the signature. Weird right?
    val deserializedValueString = valueDeserializer.deserialize(topicName, row.value, topicValueAvroSchema).toString

    DeserializedFromKafkaRecord(deserializedKeyString, deserializedValueString)


val deserializedDSOutputStream = deserializedTopicMessageDS.writeStream
    .outputMode("append")
    .format("console")
    .option("truncate", false)
    .start()

【讨论】:

你能详细说明一下评论topic name is actually unused in the source code, just required by the signature. Weird right? 似乎反序列化方法的签名需要一个字符串,但它在函数体中未使用。 KafkaAvroDeserializer.java 嗨,我正在尝试实现相同的代码。我在 keyDeserializer.deserialize(topicName, row.key, keySchema).toString 遇到异常,说 keySchema 是 org.apache.avro.Schema,其中需要的是 Array[Byte]。检查了它看起来需要 Array[Byte] github.com/confluentinc/schema-registry/blob/master/… 的源代码。我在这里缺少什么? @tstites,我无法在任何融合存储库中找到 io.confluent.kafka.schemaregistry.client.rest.RestService 这个包,你能给出这个包的这个 jar 或 mvn 存储库的位置吗? @Karthikeyan github.com/confluentinc/schema-registry/blob/master/client/src/… 是io.confluent:kafka-schema-registry-client 的一部分,并且repo 在这里docs.confluent.io/current/clients/…【参考方案7】:

对于任何想要在pyspark 中执行此操作的人:felipe 引用的库之前对我来说在 JVM 上运行良好,所以我编写了一个将它集成到 python 中的小型包装函数。这看起来很 hacky,因为 scala 语言中隐含的许多类型必须在 py4j 中显式指定。不过,到目前为止一直运行良好,即使在 spark 2.4.1 中也是如此。

def expand_avro(spark_context, sql_context, data_frame, schema_registry_url, topic):
    j = spark_context._gateway.jvm
    dataframe_deserializer = j.za.co.absa.abris.avro.AvroSerDe.DataframeDeserializer(data_frame._jdf)
    naming_strategy = getattr(
        getattr(j.za.co.absa.abris.avro.read.confluent.SchemaManager,
                "SchemaStorageNamingStrategies$"), "MODULE$").TOPIC_NAME()
    conf = getattr(getattr(j.scala.collection.immutable.Map, "EmptyMap$"), "MODULE$")
    conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.url", schema_registry_url))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("schema.registry.topic", topic))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.id", "latest"))
    conf = getattr(conf, "$plus")(j.scala.Tuple2("value.schema.naming.strategy", naming_strategy))
    schema_path = j.scala.Option.apply(None)
    conf = j.scala.Option.apply(conf)
    policy = getattr(j.za.co.absa.abris.avro.schemas.policy.SchemaRetentionPolicies, "RETAIN_SELECTED_COLUMN_ONLY$")()
    data_frame = dataframe_deserializer.fromConfluentAvro("value", schema_path, conf, policy)
    data_frame = DataFrame(data_frame, sql_context)
    return data_frame

为此,您必须将库添加到 spark 包中,例如

os.environ['PYSPARK_SUBMIT_ARGS'] = '--packages ' \
    'org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.1,' \
    'org.apache.spark:spark-avro_2.11:2.4.1,' \
    'za.co.absa:abris_2.11:2.2.2 ' \
    '--repositories https://packages.confluent.io/maven/ ' \
    'pyspark-shell'

【讨论】:

我们如何在 spark 结构化流中使用这个函数,我有 spark 2.3.2 没有 from_avro 和 to_avro 函数可用 @Rafa 然后你需要添加databricks spark-avro 库【参考方案8】:

Databricks 现在提供此功能,但您必须为此付费 :-(

dataDF
  .select(
    to_avro($"key", lit("t-key"), schemaRegistryAddr).as("key"),
    to_avro($"value", lit("t-value"), schemaRegistryAddr).as("value"))
  .writeStream
  .format("kafka")
  .option("kafka.bootstrap.servers", servers)
  .option("topic", "t")
  .save()

见: https://docs.databricks.com/spark/latest/structured-streaming/avro-dataframe.html了解更多信息

一个不错的免费替代品是 ABRIS。请参阅:https://github.com/AbsaOSS/ABRiS 我们可以看到的唯一缺点是您需要在运行时提供 avro 模式文件,以便框架可以在将数据帧发布到 Kafka 主题之前在您的数据帧上强制执行此模式。

【讨论】:

只有 Databricks 支持注册表,Apache Spark 本身不支持 Databricks 是否支持 Confluent 的 Schema Registry?或其他类型的模式注册表。如果您可以使用 Databricks,有人知道如何传递模式注册表凭据。我这样说是因为我找到的示例没有对此发表评论。 @xav 是的。 Databricks 与 Confluent 合作支持 Avro + Schema Registry 功能

以上是关于将 Spark 结构化流与 Confluent Schema Registry 集成的主要内容,如果未能解决你的问题,请参考以下文章

Apache Spark 结构化流与 Apache Flink:有啥区别?

Spark 2.3.1结构化流kafka ClassNotFound [重复]

Spark Confluent 模式注册表客户端 - 无法识别的字段“schemaType”

使用 Avro 序列化器将 Spark Structured Streaming 数据发送到 Confluent Kafka

企业版Spark Databricks + 企业版Kafka Confluent 联合高效挖掘数据价值

无法使用 Confluent Elasticsearch 接收器连接器将 Kafka 主题数据转换为结构化 JSON