在 Java 中,如何创建等效的 Apache Avro 容器文件,而不必强制使用文件作为媒介?

Posted

技术标签:

【中文标题】在 Java 中,如何创建等效的 Apache Avro 容器文件,而不必强制使用文件作为媒介?【英文标题】:In Java, how can I create an equivalent of an Apache Avro container file without being forced to use a File as a medium? 【发布时间】:2011-11-24 04:13:19 【问题描述】:

如果任何精通 Apache Avro 的 Java 实现的人正在阅读这篇文章,这在某种程度上是在黑暗中拍摄。

我的高级目标是通过某种方式在网络上传输一系列 avro 数据(例如 HTTP,但特定协议对于此目的并不那么重要)。在我的上下文中,我有一个 HttpServletResponse 我需要以某种方式将此数据写入。

我最初尝试将数据写入相当于 avro 容器文件的虚拟版本(假设“响应”的类型为 HttpServletResponse):

response.setContentType("application/octet-stream");
response.setHeader("Content-transfer-encoding", "binary");
ServletOutputStream outStream = response.getOutputStream();
BufferedOutputStream bos = new BufferedOutputStream(outStream);

Schema someSchema = Schema.parse(".....some valid avro schema....");
GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("somefield", someData);
...

GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
DataFileWriter<GenericRecord> fileWriter = new DataFileWriter<GenericRecord>(datumWriter);
fileWriter.create(someSchema, bos);
fileWriter.append(someRecord);
fileWriter.close();
bos.flush();

这一切都很好而且很花哨,但事实证明 Avro 并没有真正提供一种方法来读取除了实际文件之外的容器文件:DataFileReader 只有两个构造函数:

public DataFileReader(File file, DatumReader<D> reader);

public DataFileReader(SeekableInput sin, DatumReader<D> reader);

其中 SeekableInput 是一些特定于 avro 的自定义表单,其创建最终也会从文件中读取。现在考虑到这一点,除非有某种方法可以将 InputStream 强制转换为文件(http://***.com/questions/578305/create-a-java-file-object-or-equivalent-using-a-byte- array-in-memory-without-a 表示没有,我也尝试过查看 Java 文档),如果 OutputStream 另一端的阅读器接收到该 avro 容器文件(我不确定为什么他们允许将 avro 二进制容器文件输出到任意 OutputStream 而不提供从另一端的相应 InputStream 读取它们的方法,但这不是重点)。容器文件阅读器的实现似乎需要具体 File 提供的“可搜索”功能。

好的,所以看起来这种方法不能满足我的要求。创建一个模仿 avro 容器文件的 JSON 响应怎么样?

public static Schema WRAPPER_SCHEMA = Schema.parse(
  "\"type\": \"record\", " +
   "\"name\": \"AvroContainer\", " +
   "\"doc\": \"a JSON avro container file\", " +
   "\"namespace\": \"org.bar.foo\", " +
   "\"fields\": [" +
     "\"name\": \"schema\", \"type\": \"string\", \"doc\": \"schema representing the included data\", " +
     "\"name\": \"data\", \"type\": \"bytes\", \"doc\": \"packet of data represented by the schema\"]"
  );

鉴于上述限制,我不确定这是否是解决此问题的最佳方法,但看起来这可能会奏效。我将把模式(例如,上面的“Schema someSchema”)作为字符串放在“模式”字段中,然后放入适合该模式的记录的 avro-binary-serialized 形式(即“GenericRecord someRecord") 在“数据”字段中。

我实际上想知道下面描述的具体细节,但我认为也值得提供一个更大的背景,这样如果有更好的高级方法我可以采取(这种方法有效,但感觉不太理想)请告诉我。

我的问题是,假设我采用这种基于 JSON 的方法,如何将我的 Record 的 avro 二进制表示写入 AvroContainer 模式的“数据”字段?例如,我到了这里:

ByteArrayOutputStream baos = new ByteArrayOutputStream();
GenericDatumWriter<GenericRecord> datumWriter = new GenericDatumWriter<GenericRecord>(someSchema);
Encoder e = new BinaryEncoder(baos);
datumWriter.write(resultsRecord, e);
e.flush();

GenericRecord someRecord = new GenericData.Record(someSchema);
someRecord.put("schema", someSchema.toString());
someRecord.put("data", ByteBuffer.wrap(baos.toByteArray()));
datumWriter = new GenericDatumWriter<GenericRecord>(WRAPPER_SCHEMA);
JsonGenerator jsonGenerator = new JsonFactory().createJsonGenerator(baos, JsonEncoding.UTF8);
e = new JsonEncoder(WRAPPER_SCHEMA, jsonGenerator);
datumWriter.write(someRecord, e);
e.flush();

PrintWriter printWriter = response.getWriter(); // recall that response is the HttpServletResponse
response.setContentType("text/plain");
response.setCharacterEncoding("UTF-8");
printWriter.print(baos.toString("UTF-8"));

我最初尝试省略 ByteBuffer.wrap 子句,但后来是行

datumWriter.write(someRecord, e);

引发了我无法将字节数组转换为 ByteBuffer 的异常。很公平,看起来当调用 Encoder 类(其中 JsonEncoder 是一个子类)来编写一个 avro Bytes 对象时,它需要一个 ByteBuffer 作为参数。因此,我尝试用 java.nio.ByteBuffer.wrap 封装 byte[],但是当数据被打印出来时,它被打印为一个直接的字节序列,而不是通过 avro 十六进制表示:

"data": "bytes": ".....some gibberish other than the expected format...

这似乎不对。根据 avro 文档,他们给出的示例字节对象说我需要放入一个 json 对象,其示例看起来像“\u00FF”,而我放入的内容显然不是那种格式。我现在想知道的是:

什么是 avro 字节格式的示例?它看起来像“\uDEADBEEFDEADBEEF...”吗? 如何将二进制 avro 数据(由 BinaryEncoder 输出到 byte[] 数组中)强制转换为可以粘贴到 GenericRecord 对象中并在 JSON 中正确打印的格式?例如,我想要一个对象数据,我可以为其调用一些 GenericRecord "someRecord.put("data", DATA);"里面有我的 avro 序列化数据? 当给定文本 JSON 表示并希望重新创建由 AvroContainer 格式 JSON 表示的 GenericRecord 时,我如何将该数据读回另一端(消费者)端的字节数组? (重申之前的问题)有没有更好的方法可以做这一切?

【问题讨论】:

org.apache.avro.file.DataFileStream ? SeekableInput 不仅仅是一些特定于 avro 的自定义表单,其创建最终会从文件中读取。有SeekableByteArrayInput 从内存中的字节数组读取。 非常好的问题 - 需要随机访问的要求非常奇怪,因为如果没有巨大的缓冲区就不可能满足。然而似乎也没有必要这样做......我不知道为什么觉得需要随机访问。许多其他数据格式不添加此类处理要求。 (刚刚发生在这件事上。)我不明白你想要做什么——如果你只是传输一条 Avro 消息(比如在消息队列中),那么正常的写作-to-a-byte-buffer 如果你想要:发送模式,发送数据,都可以恢复。关于你的问题,我错过了什么? Dave - 这是“我想发送数千条相同的记录”的问题 - 为每条记录发送 Avro 消息意味着为每条记录发送架构。容器文件是一种指定的方法,用于发送一次模式,然后是一堆记录。当然,您可以自己执行此操作(如其中一个答案中所述)-但是如果可用,为什么不遵循 Avro 概述的规范呢? 【参考方案1】:

正如 Knut 所说,如果你想使用文件以外的东西,你可以:

使用 SeekableByteArrayInput,正如 Knut 所说,你可以将任何东西塞进字节数组中 以您自己的方式实现 SeekablInput - 例如,如果您从一些奇怪的数据库结构中获取它。 或者只使用一个文件。为什么不呢?

这些是你的答案。

【讨论】:

太棒了,这正是我需要的。 此外,使用文件会增加磁盘 I/O 的开销,因此如果您通过网络接收字节数组,您不希望先将其放入文件然后再读取它(磁盘 I/O 往返!!!)。【参考方案2】:

我解决此问题的方法是将架构与数据分开发送。我设置了一个连接握手,从服务器向下传输模式,然后我来回发送编码数据。您必须像这样创建一个外部包装器对象:

'name':'Wrapper','type':'record','fields':[
  'name':'schemaName','type':'string',
  'name':'records','type':'type':'array','items':'bytes'
]

您首先将记录数组逐个编码为编码字节数组的数组。一个数组中的所有内容都应具有相同的架构。然后使用上述模式对包装器对象进行编码——将“schemaName”设置为用于对数组进行编码的模式的名称。

在服务器上,您将首先解码包装对象。一旦你解码了包装对象,你就知道了 schemaName,并且你有一个你知道如何解码的对象数组——你可以随意使用!

请注意,如果您使用像WebSockets 这样的协议和像Socket.IO 这样的引擎(对于Node.js),Socket.io 为您提供了一个基于通道的浏览器和浏览器之间的通信层。服务器。在这种情况下,只需为每个通道使用特定的模式,在发送之前对每条消息进行编码。当连接启动时,您仍然必须共享模式——但如果您使用WebSockets,这很容易实现。完成后,您在客户端和服务器之间就有了任意数量的强类型双向流。

【讨论】:

虽然不是一个糟糕的解决方案,但它甚至无法解决 OP 提出的问题。【参考方案3】:

在 Java 和 Scala 下,我们尝试通过使用 Scala nitro codegen 生成的代码来使用 inception。 Inception 是 javascript mtth/avsc 库如何解决这个 problem。但是,我们在使用 Java 库时遇到了几个序列化问题,其中始终有错误的字节被注入到字节流中 - 我们无法弄清楚这些字节的来源。

当然,这意味着使用 ZigZag 编码构建我们自己的 Varint 实现。嗯。

这里是:

package com.terradatum.query

import java.io.ByteArrayOutputStream
import java.nio.ByteBuffer
import java.security.MessageDigest
import java.util.UUID

import akka.actor.ActorSystem
import akka.stream.stage._
import akka.stream.Attributes, FlowShape, Inlet, Outlet
import com.nitro.scalaAvro.runtime.GeneratedMessage
import com.terradatum.diagnostics.AkkaLogging
import org.apache.avro.Schema
import org.apache.avro.generic.GenericDatumWriter, GenericRecord
import org.apache.avro.io.EncoderFactory
import org.elasticsearch.search.SearchHit

import scala.collection.mutable.ArrayBuffer
import scala.reflect.ClassTag

/*
* The original implementation of this helper relied exclusively on using the Header Avro record and inception to create
* the header. That didn't work for us because somehow erroneous bytes were injected into the output.
*
* Specifically:
* 1. 0x08 prepended to the magic
* 2. 0x0020 between the header and the sync marker
*
* Rather than continue to spend a large number of hours trying to troubleshoot why the Avro library was producing such
* erroneous output, we build the Avro Container File using a combination of our own code and Avro library code.
*
* This means that Terradatum code is responsible for the Avro Container File header (including magic, file metadata and
* sync marker) and building the blocks. We only use the Avro library code to build the binary encoding of the Avro
* records.
*
* @see https://avro.apache.org/docs/1.8.1/spec.html#Object+Container+Files
*/
object AvroContainerFileHelpers 

  val magic: ByteBuffer = 
    val magicBytes = "Obj".getBytes ++ Array[Byte](1.toByte)
    val mg = ByteBuffer.allocate(magicBytes.length).put(magicBytes)
    mg.position(0)
    mg
  

  def makeSyncMarker(): Array[Byte] = 
    val digester = MessageDigest.getInstance("MD5")
    digester.update(s"$UUID.randomUUID@$System.currentTimeMillis()".getBytes)
    val marker = ByteBuffer.allocate(16).put(digester.digest()).compact()
    marker.position(0)
    marker.array()
  

  /*
  * Note that other implementations of avro container files, such as the javascript library
  * mtth/avsc uses "inception" to encode the header, that is, a datum following a header
  * schema should produce valid headers. We originally had attempted to do the same but for
  * an unknown reason two bytes wore being inserted into our header, one at the very beginning
  * of the header before the MAGIC marker, and one right before the syncmarker of the header.
  * We were unable to determine why this wasn't working, and so this solution was used instead
  * where the record/map is encoded per the avro spec manually without the use of "inception."
  */
  def header(schema: Schema, syncMarker: Array[Byte]): Array[Byte] = 
    def avroMap(map: Map[String, ByteBuffer]): Array[Byte] = 
      val mapBytes = map.flatMap 
        case (k, vBuff) =>
          val v = vBuff.array()
          val byteStr = k.getBytes()
          Varint.encodeLong(byteStr.length) ++ byteStr ++ Varint.encodeLong(v.length) ++ v
      
      Varint.encodeLong(map.size.toLong) ++ mapBytes ++ Varint.encodeLong(0)
    

    val schemaBytes = schema.toString.getBytes
    val schemaBuffer = ByteBuffer.allocate(schemaBytes.length).put(schemaBytes)
    schemaBuffer.position(0)
    val metadata = Map("avro.schema" -> schemaBuffer)
    magic.array() ++ avroMap(metadata) ++ syncMarker
  

  def block(binaryRecords: Seq[Array[Byte]], syncMarker: Array[Byte]): Array[Byte] = 
    val countBytes = Varint.encodeLong(binaryRecords.length.toLong)
    val sizeBytes = Varint.encodeLong(binaryRecords.foldLeft(0)(_+_.length).toLong)

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]()

    buff.append(countBytes:_*)
    buff.append(sizeBytes:_*)
    binaryRecords.foreach  rec =>
      buff.append(rec:_*)
    
    buff.append(syncMarker:_*)

    buff.toArray
  

  def encodeBlock[T](schema: Schema, records: Seq[GenericRecord], syncMarker: Array[Byte]): Array[Byte] = 
    //block(records.map(encodeRecord(schema, _)), syncMarker)
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val out = new ByteArrayOutputStream()
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    records.foreach(record => writer.write(record, binaryEncoder))
    binaryEncoder.flush()
    val flattenedRecords = out.toByteArray
    out.close()

    val buff: ArrayBuffer[Byte] = new scala.collection.mutable.ArrayBuffer[Byte]()

    val countBytes = Varint.encodeLong(records.length.toLong)
    val sizeBytes = Varint.encodeLong(flattenedRecords.length.toLong)

    buff.append(countBytes:_*)
    buff.append(sizeBytes:_*)
    buff.append(flattenedRecords:_*)
    buff.append(syncMarker:_*)

    buff.toArray
  

  def encodeRecord[R <: GeneratedMessage with com.nitro.scalaAvro.runtime.Message[R]: ClassTag](
      entity: R
  ): Array[Byte] =
    encodeRecord(entity.companion.schema, entity.toMutable)

  def encodeRecord(schema: Schema, record: GenericRecord): Array[Byte] = 
    val writer = new GenericDatumWriter[GenericRecord](schema)
    val out = new ByteArrayOutputStream()
    val binaryEncoder = EncoderFactory.get().binaryEncoder(out, null)
    writer.write(record, binaryEncoder)
    binaryEncoder.flush()
    val bytes = out.toByteArray
    out.close()
    bytes
  


/**
  * Encoding of integers with variable-length encoding.
  *
  * The avro specification uses a variable length encoding for integers and longs.
  * If the most significant bit in a integer or long byte is 0 then it knows that no
  * more bytes are needed, if the most significant bit is 1 then it knows that at least one
  * more byte is needed. In signed ints and longs the most significant bit is traditionally
  * used to represent the sign of the integer or long, but for us it's used to encode whether
  * more bytes are needed. To get around this limitation we zig-zag through whole numbers such that
  * negatives are odd numbers and positives are even numbers:
  *
  * i.e. -1, -2, -3 would be encoded as 1, 3, 5, and so on
  * while 1,  2,  3 would be encoded as 2, 4, 6, and so on.
  *
  * More information is available in the avro specification here:
  * @see http://lucene.apache.org/core/3_5_0/fileformats.html#VInt
  *      https://developers.google.com/protocol-buffers/docs/encoding?csw=1#types
  */
object Varint 

  import scala.collection.mutable

  def encodeLong(longVal: Long): Array[Byte] = 
    val buff = new ArrayBuffer[Byte]()
    Varint.zigZagSignedLong(longVal, buff)
    buff.toArray[Byte]
  

  def encodeInt(intVal: Int): Array[Byte] = 
    val buff = new ArrayBuffer[Byte]()
    Varint.zigZagSignedInt(intVal, buff)
    buff.toArray[Byte]
  

  def zigZagSignedLong[T <: mutable.Buffer[Byte]](x: Long, dest: T): Unit = 
    // sign to even/odd mapping: http://code.google.com/apis/protocolbuffers/docs/encoding.html#types
    writeUnsignedLong((x << 1) ^ (x >> 63), dest)
  

  def writeUnsignedLong[T <: mutable.Buffer[Byte]](v: Long, dest: T): Unit = 
    var x = v
    while ((x & 0xFFFFFFFFFFFFFF80L) != 0L) 
      dest += ((x & 0x7F) | 0x80).toByte
      x >>>= 7
    
    dest += (x & 0x7F).toByte
  

  def zigZagSignedInt[T <: mutable.Buffer[Byte]](x: Int, dest: T): Unit = 
    writeUnsignedInt((x << 1) ^ (x >> 31), dest)
  

  def writeUnsignedInt[T <: mutable.Buffer[Byte]](v: Int, dest: T): Unit = 
    var x = v
    while ((x & 0xFFFFF80) != 0L) 
      dest += ((x & 0x7F) | 0x80).toByte
      x >>>= 7
    
    dest += (x & 0x7F).toByte
  

【讨论】:

以上是关于在 Java 中,如何创建等效的 Apache Avro 容器文件,而不必强制使用文件作为媒介?的主要内容,如果未能解决你的问题,请参考以下文章

如何以编程方式创建等效的 SwiftUI 堆栈视图分隔符

如何在Java中对原语进行等效的通过引用传递

使用 Apache spark java 搜索替换

如何在 Java 中生成与 Python 示例等效的 HMAC?

如何在 Java 中生成与 Python 示例等效的 HMAC?

将 cacert 卷曲到 Java HttpClient 等效项