使用 Avro 序列化器将 Spark Structured Streaming 数据发送到 Confluent Kafka

Posted 永远相信神话

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用 Avro 序列化器将 Spark Structured Streaming 数据发送到 Confluent Kafka相关的知识,希望对你有一定的参考价值。

import io.confluent.kafka.serializers.AbstractKafkaAvroSerDeConfig, KafkaAvroSerializer

import org.apache.avro.Schema

import org.apache.avro.generic.GenericRecord

import org.apache.kafka.clients.producer.KafkaProducer, ProducerConfig, ProducerRecord

import org.apache.kafka.common.serialization.StringSerializer

import org.apache.spark.sql.DataFrame, ForeachWriter

import org.apache.spark.sql.streaming.StreamingQuery

import org.apache.spark.sql.types.StructType

 

// 定义 Avro 的 Schema,这里假设发送的数据包含一个 "name" 字段和一个 "age" 字段

val avroSchema = new Schema.Parser().parse("""

  "type": "record",

  "namespace": "example.avro",

  "name": "User",

  "fields": [

    "name": "name", "type": "string",

    "name": "age", "type": "int"

  ]

""")

 

// 定义 Avro 序列化器的配置信息

val avroSerializerConfig = Map[String, String](

  AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081"

)

 

// 定义 Kafka 生产者的配置信息

val kafkaProducerConfig = Map[String, Object](

  ProducerConfig.BOOTSTRAP_SERVERS_CONFIG -> "localhost:9092",

  ProducerConfig.KEY_SERIALIZER_CLASS_CONFIG -> classOf[StringSerializer],

  ProducerConfig.VALUE_SERIALIZER_CLASS_CONFIG -> classOf[KafkaAvroSerializer],

  AbstractKafkaAvroSerDeConfig.SCHEMA_REGISTRY_URL_CONFIG -> "http://localhost:8081"

)

 

// 定义一个自定义的 ForeachWriter,用于将数据发送到 Kafka

class KafkaAvroForeachWriter(topic: String) extends ForeachWriter[GenericRecord]

  var producer: KafkaProducer[String, GenericRecord] = _

 

  override def open(partitionId: Long, epochId: Long): Boolean =

    producer = new KafkaProducer[String, GenericRecord](kafkaProducerConfig.asJava)

    true

 

 

  override def process(record: GenericRecord): Unit =

    val producerRecord = new ProducerRecord[String, GenericRecord](topic, record)

    producer.send(producerRecord)

 

 

  override def close(errorOrNull: Throwable): Unit =

    producer.close()

 

 

// 定义 Structured Streaming 查询

val df: DataFrame = spark.readStream

  .format("kafka")

  .option("kafka.bootstrap.servers", "localhost:9092")

  .option("subscribe", "input_topic")

  .option("startingOffsets", "earliest")

  .load()

  .selectExpr("CAST(value AS STRING)") // 假设数据格式为 JSON

  .select(from_json($"value", avroSchema).as("data"))

  .select("data.*")

 

val query: StreamingQuery = df.writeStream

  .foreach(new KafkaAvroForeachWriter("output_topic"))

  .start()

 

query.awaitTermination()

import org.apache.spark.sql.Row

import org.apache.spark.sql.avro._

import org.apache.spark.sql.ForeachWriter

import org.apache.avro.generic.GenericRecord

 

class KafkaForeachWriter(topic: String, schemaString: String, kafkaParams: Map[String, Object]) extends ForeachWriter[Row]

 

  var kafkaProducer: KafkaProducer[GenericRecord, GenericRecord] = _

  var schema: Schema = _

 

  def open(partitionId: Long, version: Long): Boolean =

    // Setup Kafka Producer

    val kafkaConf = new KafkaProducerConfig(kafkaParams)

    kafkaProducer = new KafkaProducer[GenericRecord, GenericRecord](kafkaConf)

 

    // Setup Avro schema

    schema = new Schema.Parser().parse(schemaString)

 

    true

 

 

  def process(row: Row): Unit =

    // Convert Row to GenericRecord

    val genericRecord = AvroSerializer.rowToGenericRecord(row, schema)

 

    // Create Kafka record

    val record = new ProducerRecord[GenericRecord, GenericRecord](topic, genericRecord, genericRecord)

 

    // Send to Kafka

    kafkaProducer.send(record)

 

 

  def close(errorOrNull: Throwable): Unit =

    // Close Kafka Producer

    kafkaProducer.close()

 

 

Caused by: java.io.NotSerializableException: org.apache.avro.Schema$RecordSchema Serialization stack: - object not serializable (class: org.apache.avro.Schema$RecordSchema, value: "type":"record","name":"topLevelRecord","fields":["name":"avro_value","type":"type":"record","name":"avro_value","namespace":"topLevelRecord","fields":["name":"tb_name","type":"string","name":"customer_id","type":["string","null"]]])

class AvroRecordForeachWriter(schemaString: String, kafkaParams: Map[String, Object], topic: String)

    extends ForeachWriter[Row] with Serializable

 

  private var producer: KafkaProducer[Array[Byte], Array[Byte]] = _

  private var avroSchema: Schema = _

 

  override def open(partitionId: Long, version: Long): Boolean =

    producer = new KafkaProducer[Array[Byte], Array[Byte]](kafkaParams.asJava)

    avroSchema = new Schema.Parser().parse(schemaString)

    true

 

 

  override def process(value: Row): Unit =

    val genericRecord = new GenericData.Record(avroSchema)

    for (i <- 0 until value.length)

      val field = avroSchema.getFields.get(i)

      val fieldName = field.name()

      val fieldType = field.schema().getType

      val fieldValue = value.get(i)

      if (fieldType == Schema.Type.ARRAY)

        val avroArray = new GenericData.Array[AnyRef](field.schema(), fieldValue.asInstanceOf[Seq[_]].asJava)

        genericRecord.put(fieldName, avroArray)

      else

        genericRecord.put(fieldName, fieldValue)

     

   

    val writer = new ByteArrayOutputStream()

    val encoder = EncoderFactory.get().binaryEncoder(writer, null)

    val datumWriter = new GenericDatumWriter[GenericRecord](avroSchema)

    datumWriter.write(genericRecord, encoder)

    encoder.flush()

    writer.close()

    val message = new ProducerRecord[Array[Byte], Array[Byte]](topic, writer.toByteArray)

    producer.send(message)

 

 

  override def close(errorOrNull: Throwable): Unit =

    producer.close()

 

 

反序列化 Avro Spark

【中文标题】反序列化 Avro Spark【英文标题】:Deserialize Avro Spark 【发布时间】:2020-03-04 07:15:05 【问题描述】:

我正在使用以下代码将数据流推送到 Azure EventHub,并利用 Microsoft.Hadoop.Avro.. 此代码每 5 秒运行一次,并且简单地将相同的两个 Avro 序列化项 ????????:

  var strSchema = File.ReadAllText("schema.json");
  var avroSerializer = AvroSerializer.CreateGeneric(strSchema);
  var rootSchema = avroSerializer.WriterSchema as RecordSchema;

  var itemList = new List<AvroRecord>();

  dynamic record_one = new AvroRecord(rootSchema);
  record_one.FirstName = "Some";
  record_one.LastName = "Guy";
  itemList.Add(record_one);

  dynamic record_two = new AvroRecord(rootSchema);
  record_two.FirstName = "A.";
  record_two.LastName = "Person";
  itemList.Add(record_two);

  using (var buffer = new MemoryStream())
  
      using (var writer = AvroContainer.CreateGenericWriter(strSchema, buffer, Codec.Null))
      
          using (var streamWriter = new SequentialWriter<object>(writer, itemList.Count))
          
              foreach (var item in itemList)
              
                  streamWriter.Write(item);
              
          
      

      eventHubClient.SendAsync(new EventData(buffer.ToArray()));
  

这里使用的模式同样是简单的:


  "type": "record",
  "name": "User",
  "namespace": "SerDes",
  "fields": [
    
      "name": "FirstName",
      "type": "string"
    ,
    
      "name": "LastName",
      "type": "string"
    
  ]

我已经验证这一切都很好,在门户上的 Azure 流分析中有一个简单的视图:

到目前为止一切顺利,但我不能,为了我的一生,在 Databricks 中正确反序列化它利用 Scala 下的 from_avro() 命令..

将(完全相同的)模式加载为字符串:

val sampleJsonSchema = dbutils.fs.head("/mnt/schemas/schema.json")

配置 EventHub

val connectionString = ConnectionStringBuilder("<CONNECTION_STRING>")
  .setEventHubName("<NAME_OF_EVENT_HUB>")
  .build

val eventHubsConf = EventHubsConf(connectionString).setStartingPosition(EventPosition.fromEndOfStream)
val eventhubs = spark.readStream.format("eventhubs").options(eventHubsConf.toMap).load()

读取数据..

// this works, and i can see the serialised data
display(eventhubs.select($"body"))

// this fails, and with an exception: org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
display(eventhubs.select(from_avro($"body", sampleJsonSchema)))

所以本质上,这里发生了什么..我正在使用与反序列化相同的模式对数据进行序列化,但是格式不正确..在这方面的文档非常稀少(在 Microsoft 网站上非常少)。

【问题讨论】:

【参考方案1】:

问题

经过额外调查,(主要是在article 的帮助下)我发现我的问题是:from_avro(data: Column, jsonFormatSchema: String) 需要 spark 模式格式而不是 avro 模式格式。文档对此不是很清楚。

解决方案 1

Databricks 提供了一个方便的方法from_avro(column: Column, subject: String, schemaRegistryUrl: String)),它从 kafka 模式注册表中获取所需的 avro 模式并自动转换为正确的格式。

不幸的是,它不适用于纯 spark,也无法在没有 kafka 模式注册表的情况下使用它。

解决方案 2

使用spark提供的schema转换:

// define avro deserializer
class AvroDeserializer() extends AbstractKafkaAvroDeserializer 
  override def deserialize(payload: Array[Byte]): String = 
    val genericRecord = this.deserialize(payload).asInstanceOf[GenericRecord]
    genericRecord.toString
  


// create deserializer instance
val deserializer = new AvroDeserializer()

// register deserializer
spark.udf.register("deserialize_avro", (bytes: Array[Byte]) =>
  deserializer.deserialize(bytes)
)

// get avro schema from registry (but I presume that it should also work with schema read from a local file)
val registryClient = new CachedSchemaRegistryClient(kafkaSchemaRegistryUrl, 128)
val avroSchema = registryClient.getLatestSchemaMetadata(topic + "-value").getSchema
val sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))

// consume data 
df.selectExpr("deserialize_avro(value) as data")
  .select(from_json(col("data"), sparkSchema.dataType).as("data"))
  .select("data.*")

【讨论】:

所以我假设您实际上正在使用模式注册表?回想起来(现在这是一个很老的问题了)我不认为我有一个模式注册表..这意味着你可能正在利用 apache kafka 吗?不过,我会再试一试-仍然在某个地方找到代码?我还会仔细检查我的 spark 版本 附注我在 PySpark 中写了我的东西 ? from_avro 直接支持 Schema 注册表仅适用于 Databricks,因为我记得...现货 Spark 它需要 JSON 模式,您可以通过 HTTP 从注册表获取 是的,你是对的,它可以在 databricks 笔记本中使用,但不能在纯 Spark 中使用:/ 编辑更多细节

以上是关于使用 Avro 序列化器将 Spark Structured Streaming 数据发送到 Confluent Kafka的主要内容,如果未能解决你的问题,请参考以下文章

反序列化 Avro Spark

Spark sql怎么使用Kafka Avro序列化器

如何使用 spark-avro 包从 spark-shell 读取 avro 文件?

在火花结构化流中反序列化 kafka avro 主题的 int 编码无效

Thrift、Avro、Protocolbuffers - 他们都死了吗?

Spark:使用 Spark Scala 从 Kafka 读取 Avro 消息