序列化avro schema
Posted 永远相信神话
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了序列化avro schema相关的知识,希望对你有一定的参考价值。
// 将schema转换为字节数组
val schemaBytes: Array[Byte] = schema.toString.getBytes(StandardCharsets.UTF_8)
// 将字节数组包装为可序列化的对象
val serializableSchemaBytes: SerializableBytes = new SerializableBytes(schemaBytes)
// 将字节数组反序列化为schema对象
val schemaString = new String(serializableSchemaBytes.bytes, StandardCharsets.UTF_8)
val schema = new Schema.Parser().parse(schemaString)
import java.io.ByteArrayInputStream, ByteArrayOutputStream, ObjectInputStream, ObjectOutputStream, Serializable
class SerializableBytes(val bytes: Array[Byte]) extends Serializable
private def writeObject(out: ObjectOutputStream): Unit =
out.writeInt(bytes.length)
out.write(bytes)
private def readObject(in: ObjectInputStream): Unit =
val length = in.readInt()
val buffer = new Array[Byte](length)
in.readFully(buffer)
bytes = buffer
def getInputStream: ByteArrayInputStream = new ByteArrayInputStream(bytes)
def getOutputStream: ByteArrayOutputStream = new ByteArrayOutputStream(bytes.length)
需要将SerializableBytes类定义在Spark应用程序的可访问范围内,例如定义在主类或者全局对象中。另外,由于Schema对象在executor端反序列化时会重新创建,因此需要确保Schema类的定义也在executor端的类路径下。
NotSerializableException: org.apache.avro.Schema$RecordSchema
以上是关于序列化avro schema的主要内容,如果未能解决你的问题,请参考以下文章