反序列化 Avro Spark
Posted
技术标签:
【中文标题】反序列化 Avro Spark【英文标题】:Deserialize Avro Spark 【发布时间】:2020-03-04 07:15:05 【问题描述】:我正在使用以下代码将数据流推送到 Azure EventHub,并利用 Microsoft.Hadoop.Avro
.. 此代码每 5 秒运行一次,并且简单地将相同的两个 Avro 序列化项 ????????:
var strSchema = File.ReadAllText("schema.json");
var avroSerializer = AvroSerializer.CreateGeneric(strSchema);
var rootSchema = avroSerializer.WriterSchema as RecordSchema;
var itemList = new List<AvroRecord>();
dynamic record_one = new AvroRecord(rootSchema);
record_one.FirstName = "Some";
record_one.LastName = "Guy";
itemList.Add(record_one);
dynamic record_two = new AvroRecord(rootSchema);
record_two.FirstName = "A.";
record_two.LastName = "Person";
itemList.Add(record_two);
using (var buffer = new MemoryStream())
using (var writer = AvroContainer.CreateGenericWriter(strSchema, buffer, Codec.Null))
using (var streamWriter = new SequentialWriter<object>(writer, itemList.Count))
foreach (var item in itemList)
streamWriter.Write(item);
eventHubClient.SendAsync(new EventData(buffer.ToArray()));
这里使用的模式同样是简单的:
"type": "record",
"name": "User",
"namespace": "SerDes",
"fields": [
"name": "FirstName",
"type": "string"
,
"name": "LastName",
"type": "string"
]
我已经验证这一切都很好,在门户上的 Azure 流分析中有一个简单的视图:
到目前为止一切顺利,但我不能,为了我的一生,在 Databricks 中正确反序列化它利用 Scala 下的 from_avro()
命令..
将(完全相同的)模式加载为字符串:
val sampleJsonSchema = dbutils.fs.head("/mnt/schemas/schema.json")
配置 EventHub
val connectionString = ConnectionStringBuilder("<CONNECTION_STRING>")
.setEventHubName("<NAME_OF_EVENT_HUB>")
.build
val eventHubsConf = EventHubsConf(connectionString).setStartingPosition(EventPosition.fromEndOfStream)
val eventhubs = spark.readStream.format("eventhubs").options(eventHubsConf.toMap).load()
读取数据..
// this works, and i can see the serialised data
display(eventhubs.select($"body"))
// this fails, and with an exception: org.apache.spark.SparkException: Malformed records are detected in record parsing. Current parse Mode: FAILFAST. To process malformed records as null result, try setting the option 'mode' as 'PERMISSIVE'.
display(eventhubs.select(from_avro($"body", sampleJsonSchema)))
所以本质上,这里发生了什么..我正在使用与反序列化相同的模式对数据进行序列化,但是格式不正确..在这方面的文档非常稀少(在 Microsoft 网站上非常少)。
【问题讨论】:
【参考方案1】:问题
经过额外调查,(主要是在article 的帮助下)我发现我的问题是:from_avro(data: Column, jsonFormatSchema: String)
需要 spark 模式格式而不是 avro 模式格式。文档对此不是很清楚。
解决方案 1
Databricks 提供了一个方便的方法from_avro(column: Column, subject: String, schemaRegistryUrl: String))
,它从 kafka 模式注册表中获取所需的 avro 模式并自动转换为正确的格式。
不幸的是,它不适用于纯 spark,也无法在没有 kafka 模式注册表的情况下使用它。
解决方案 2
使用spark提供的schema转换:
// define avro deserializer
class AvroDeserializer() extends AbstractKafkaAvroDeserializer
override def deserialize(payload: Array[Byte]): String =
val genericRecord = this.deserialize(payload).asInstanceOf[GenericRecord]
genericRecord.toString
// create deserializer instance
val deserializer = new AvroDeserializer()
// register deserializer
spark.udf.register("deserialize_avro", (bytes: Array[Byte]) =>
deserializer.deserialize(bytes)
)
// get avro schema from registry (but I presume that it should also work with schema read from a local file)
val registryClient = new CachedSchemaRegistryClient(kafkaSchemaRegistryUrl, 128)
val avroSchema = registryClient.getLatestSchemaMetadata(topic + "-value").getSchema
val sparkSchema = SchemaConverters.toSqlType(new Schema.Parser().parse(avroSchema))
// consume data
df.selectExpr("deserialize_avro(value) as data")
.select(from_json(col("data"), sparkSchema.dataType).as("data"))
.select("data.*")
【讨论】:
所以我假设您实际上正在使用模式注册表?回想起来(现在这是一个很老的问题了)我不认为我有一个模式注册表..这意味着你可能正在利用 apache kafka 吗?不过,我会再试一试-仍然在某个地方找到代码?我还会仔细检查我的 spark 版本 附注我在 PySpark 中写了我的东西 ?from_avro
直接支持 Schema 注册表仅适用于 Databricks,因为我记得...现货 Spark 它需要 JSON 模式,您可以通过 HTTP 从注册表获取
是的,你是对的,它可以在 databricks 笔记本中使用,但不能在纯 Spark 中使用:/
编辑更多细节以上是关于反序列化 Avro Spark的主要内容,如果未能解决你的问题,请参考以下文章
在火花结构化流中反序列化 kafka avro 主题的 int 编码无效
使用 kafka lib 反序列化 PRIMITIVE AVRO KEY