序列化avro schema

Posted 永远相信神话

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了序列化avro schema相关的知识,希望对你有一定的参考价值。

// 将schema转换为字节数组

val schemaBytes: Array[Byte] = schema.toString.getBytes(StandardCharsets.UTF_8)

// 将字节数组包装为可序列化的对象

val serializableSchemaBytes: SerializableBytes = new SerializableBytes(schemaBytes)

// 将字节数组反序列化为schema对象

val schemaString = new String(serializableSchemaBytes.bytes, StandardCharsets.UTF_8)

val schema = new Schema.Parser().parse(schemaString)

import java.io.ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, Serializable

class SerializableBytes(val bytes: Array[Byte]) extends Serializable

  private def writeObject(out: ObjectOutputStream): Unit =

    out.writeInt(bytes.length)

    out.write(bytes)

 

  private def readObject(in: ObjectInputStream): Unit =

    val length = in.readInt()

    val buffer = new Array[Byte](length)

    in.readFully(buffer)

    bytes = buffer

 

  def getInputStream: ByteArrayInputStream = new ByteArrayInputStream(bytes)

  def getOutputStream: ByteArrayOutputStream = new ByteArrayOutputStream(bytes.length)

需要将SerializableBytes类定义在Spark应用程序的可访问范围内,例如定义在主类或者全局对象中。另外,由于Schema对象在executor端反序列化时会重新创建,因此需要确保Schema类的定义也在executor端的类路径下。

NotSerializableException: org.apache.avro.Schema$RecordSchema

以上是关于序列化avro schema的主要内容,如果未能解决你的问题,请参考以下文章

avro序列化详细操作

在avro序列化数据上构建搜索层索引

反序列化 Avro Spark

json文档缺少字段的Avro序列化问题

在 Python 中使用“SchemaRegistryClient”反序列化 AVRO 消息

使用avro序列化和反序列化