以一种有效的方式使用 ByteBuffer 将标题和数据布局打包在一个字节数组中?

Posted

技术标签:

【中文标题】以一种有效的方式使用 ByteBuffer 将标题和数据布局打包在一个字节数组中?【英文标题】:Pack header and data layout in one byte array using ByteBuffer in an efficient way? 【发布时间】:2017-01-16 23:37:46 【问题描述】:

我有一个标题和数据,我需要在一个Byte Array 中表示它们。而且我有一种特殊的格式可以将标头打包到Byte Array 中,还有一种不同的格式可以将数据打包到Byte Array 中。在我拥有这两个之后,我需要从中制作一个最终的Byte Array

下面是在C++ 中定义的布局,因此我必须在Java 中做。

// below is my header offsets layout

// addressedCenter must be the first byte
static constexpr uint32_t  addressedCenter      = 0;
static constexpr uint32_t  version              = addressedCenter + 1;
static constexpr uint32_t  numberOfRecords      = version + 1;
static constexpr uint32_t  bufferUsed           = numberOfRecords + sizeof(uint32_t);
static constexpr uint32_t  location             = bufferUsed + sizeof(uint32_t);
static constexpr uint32_t  locationFrom         = location + sizeof(CustomerAddress);
static constexpr uint32_t  locationOrigin       = locationFrom + sizeof(CustomerAddress);
static constexpr uint32_t  partition            = locationOrigin + sizeof(CustomerAddress);
static constexpr uint32_t  copy                 = partition + 1;

// this is the full size of the header
static constexpr uint32_t headerOffset = copy + 1;

CustomerAddressuint64_t 的typedef,它是这样组成的-

typedef uint64_t   CustomerAddress;

void client_data(uint8_t datacenter, 
                 uint16_t clientId, 
                 uint8_t dataId, 
                 uint32_t dataCounter,
                 CustomerAddress& customerAddress)

    customerAddress = (uint64_t(datacenter) << 56)
                    + (uint64_t(clientId) << 40)
                    + (uint64_t(dataId) << 32)
                    + dataCounter;

下面是我的数据布局 -

// below is my data layout -
//
// key type - 1 byte
// key len - 1 byte
// key (variable size = key_len)
// timestamp (sizeof uint64_t)
// data size (sizeof uint16_t)
// data (variable size = data size)

问题陈述:-

现在对于项目的一部分,我试图在 Java 中的一个特定类中表示整体内容,这样我就可以传递必要的字段,它可以让我从中得到一个最终的Byte Array,它将有标题首先是数据:

下面是我的DataFrame类:

public final class DataFrame 
  private final byte addressedCenter;
  private final byte version;
  private final Map<byte[], byte[]> keyDataHolder;
  private final long location;
  private final long locationFrom;
  private final long locationOrigin;
  private final byte partition;
  private final byte copy;

  public DataFrame(byte addressedCenter, byte version,
      Map<byte[], byte[]> keyDataHolder, long location, long locationFrom,
      long locationOrigin, byte partition, byte copy) 
    this.addressedCenter = addressedCenter;
    this.version = version;
    this.keyDataHolder = keyDataHolder;
    this.location = location;
    this.locationFrom = locationFrom;
    this.locationOrigin = locationOrigin;
    this.partition = partition;
    this.copy = copy;
  

  public byte[] serialize() 
    // All of the data is embedded in a binary array with fixed maximum size 70000
    ByteBuffer byteBuffer = ByteBuffer.allocate(70000);
    byteBuffer.order(ByteOrder.BIG_ENDIAN);

    int numOfRecords = keyDataHolder.size();
    int bufferUsed = getBufferUsed(keyDataHolder); // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;

    // header layout
    byteBuffer.put(addressedCenter); // byte
    byteBuffer.put(version); // byte
    byteBuffer.putInt(numOfRecords); // int
    byteBuffer.putInt(bufferUsed); // int
    byteBuffer.putLong(location); // long
    byteBuffer.putLong(locationFrom); // long
    byteBuffer.putLong(locationOrigin); // long
    byteBuffer.put(partition); // byte
    byteBuffer.put(copy); // byte

    // now the data layout
    for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) 
      byte keyType = 0;
      byte keyLength = (byte) entry.getKey().length;
      byte[] key = entry.getKey();
      byte[] data = entry.getValue();
      short dataSize = (short) data.length;

      ByteBuffer dataBuffer = ByteBuffer.wrap(data);
      long timestamp = 0;

      if (dataSize > 10) 
        timestamp = dataBuffer.getLong(2);              
             

      byteBuffer.put(keyType);
      byteBuffer.put(keyLength);
      byteBuffer.put(key);
      byteBuffer.putLong(timestamp);
      byteBuffer.putShort(dataSize);
      byteBuffer.put(data);
    
    return byteBuffer.array();
  

  private int getBufferUsed(final Map<byte[], byte[]> keyDataHolder) 
    int size = 36;
    for (Map.Entry<byte[], byte[]> entry : keyDataHolder.entrySet()) 
      size += 1 + 1 + 8 + 2;
      size += entry.getKey().length;
      size += entry.getValue().length;
    
    return size;
    

下面是我如何使用上面的DataFrame 类:

  public static void main(String[] args) throws IOException 
    // header layout
    byte addressedCenter = 0;
    byte version = 1;

    long location = packCustomerAddress((byte) 12, (short) 13, (byte) 32, (int) 120);
    long locationFrom = packCustomerAddress((byte) 21, (short) 23, (byte) 41, (int) 130);
    long locationOrigin = packCustomerAddress((byte) 21, (short) 24, (byte) 41, (int) 140);

    byte partition = 3;
    byte copy = 0;

    // this map will have key as the actual key and value as the actual data, both in byte array
    // for now I am storing only two entries in this map
    Map<byte[], byte[]> keyDataHolder = new HashMap<byte[], byte[]>();
    for (int i = 1; i <= 2; i++) 
      keyDataHolder.put(generateKey(), getMyData());
    

    DataFrame records =
        new DataFrame(addressedCenter, version, keyDataHolder, location, locationFrom,
            locationOrigin, partition, copy);

    // this will give me final packed byte array
    // which will have header and data in it.
    byte[] packedArray = records.serialize();
  

  private static long packCustomerAddress(byte datacenter, short clientId, byte dataId,
      int dataCounter) 
    return ((long) (datacenter) << 56) | ((long) clientId << 40) | ((long) dataId << 32)
        | ((long) dataCounter);
     

正如您在我的DataFrame 类中看到的,我正在分配ByteBuffer,预定义大小为70000。有没有更好的方法可以分配我在制作ByteBuffer 时使用的大小,而不是使用硬编码的70000

与我正在做的将我的标题和数据打包在一个字节数组中相比,还有什么更好的方法吗?我还需要确保它是线程安全的,因为它可以被多个线程调用。

【问题讨论】:

byteBuffer 在多线程上下文中不应该是静态的。 【参考方案1】:

在制作ByteBuffer而不是使用硬编码的70000时,有没有更好的方法可以分配我正在使用的大小?

至少有两种不重叠的方法。你可以同时使用。

一个是缓冲池。您应该找出在高峰期需要多少缓冲区,并使用高于它的最大值,例如最大值 + 最大值 / 2,最大值 + 平均值,最大值 + 众数,2 * 最大值。

import java.nio.ByteBuffer;
import java.nio.ByteOrder;
import java.util.concurrent.CompletionStage;
import java.util.concurrent.LinkedBlockingDeque;
import java.util.function.Consumer;
import java.util.function.Function;

public class ByteBufferPool 
    private final int bufferCapacity;
    private final LinkedBlockingDeque<ByteBuffer> queue;

    public ByteBufferPool(int limit, int bufferCapacity) 
        if (limit < 0) throw new IllegalArgumentException("limit must not be negative.");
        if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative.");

        this.bufferCapacity = bufferCapacity;
        this.queue = (limit == 0) ? null : new LinkedBlockingDeque<>(limit);
    

    public ByteBuffer acquire() 
        ByteBuffer buffer = (queue == null) ? null : queue.pollFirst();
        if (buffer == null) 
            buffer = ByteBuffer.allocate(bufferCapacity);
        
        else 
            buffer.clear();
            buffer.order(ByteOrder.BIG_ENDIAN);
        
        return buffer;
    

    public boolean release(ByteBuffer buffer) 
        if (buffer == null) throw new IllegalArgumentException("buffer must not be null.");
        if (buffer.capacity() != bufferCapacity) throw new IllegalArgumentException("buffer has unsupported capacity.");
        if (buffer.isDirect()) throw new IllegalArgumentException("buffer must not be direct.");
        if (buffer.isReadOnly()) throw new IllegalArgumentException("buffer must not be read-only.");

        return (queue == null) ? false : queue.offerFirst(buffer);
    

    public void withBuffer(Consumer<ByteBuffer> action) 
        if (action == null) throw new IllegalArgumentException("action must not be null.");

        ByteBuffer buffer = acquire();
        try 
            action.accept(buffer);
        
        finally 
            release(buffer);
        
    

    public <T> T withBuffer(Function<ByteBuffer, T> function) 
        if (function == null) throw new IllegalArgumentException("function must not be null.");

        ByteBuffer buffer = acquire();
        try 
            return function.apply(buffer);
        
        finally 
            release(buffer);
        
    

    public <T> CompletionStage<T> withBufferAsync(Function<ByteBuffer, CompletionStage<T>> asyncFunction) 
        if (asyncFunction == null) throw new IllegalArgumentException("asyncFunction must not be null.");

        ByteBuffer buffer = acquire();
        CompletionStage<T> future = null;
        try 
            future = asyncFunction.apply(buffer);
        
        finally 
            if (future == null) 
                release(buffer);
            
            else 
                future = future.whenComplete((result, throwable) -> release(buffer));
            
        
        return future;
    

withBuffer 方法允许直接使用池,而acquirerelease 允许分离获取点和释放点。

另一个是分离序列化接口,例如putputIntputLong,然后您可以在其中实现字节计数类和实际的字节缓冲类。您应该向此类接口添加一个方法以了解序列化程序是否正在计算字节数或缓冲,以避免不必要的字节生成,以及另一种直接增加字节使用量的方法,这在计算某些编码中的字符串大小而不实际序列化时很有用.

public interface ByteSerializer 
    ByteSerializer put(byte value);

    ByteSerializer putInt(int value);

    ByteSerializer putLong(long value);

    boolean isSerializing();

    ByteSerializer add(int bytes);

    int position();

 

public class ByteCountSerializer implements ByteSerializer 
    private int count = 0;

    @Override
    public ByteSerializer put(byte value) 
        count += 1;
        return this;
    

    @Override
    public ByteSerializer putInt(int value) 
        count += 4;
        return this;
    

    @Override
    public ByteSerializer putLong(long value) 
        count += 8;
        return this;
    

    @Override
    public boolean isSerializing() 
        return false;
    

    @Override
    public ByteSerializer add(int bytes) 
        if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative.");

        count += bytes;
        return this;
    

    @Override
    public int position() 
        return count;
    

 

import java.nio.ByteBuffer;

public class ByteBufferSerializer implements ByteSerializer 
    private final ByteBuffer buffer;

    public ByteBufferSerializer(int bufferCapacity) 
        if (bufferCapacity < 0) throw new IllegalArgumentException("bufferCapacity must not be negative.");

        this.buffer = ByteBuffer.allocate(bufferCapacity);
    

    @Override
    public ByteSerializer put(byte value) 
        buffer.put(value);
        return this;
    

    @Override
    public ByteSerializer putInt(int value) 
        buffer.putInt(value);
        return this;
    

    @Override
    public ByteSerializer putLong(long value) 
        buffer.putLong(value);
        return this;
    

    @Override
    public boolean isSerializing() 
        return true;
    

    @Override
    public ByteSerializer add(int bytes) 
        if (bytes < 0) throw new IllegalArgumentException("bytes must not be negative.");

        for (int b = 0; b < bytes; b++) 
            buffer.put((byte)0);
        
        return this;
        // or throw new UnsupportedOperationException();
    

    @Override
    public int position() 
        return buffer.position();
    

    public ByteBuffer buffer() 
        return buffer;
    

在您的代码中,您会按照以下方式做一些事情(未经测试):

ByteCountSerializer counter = new ByteCountSerializer();
dataFrame.serialize(counter);
ByteBufferSerializer serializer = new ByteByfferSerializer(counter.position());
dataFrame.serialize(serializer);
ByteBuffer buffer = serializer.buffer();
// ... write buffer, ?, profit ...

您的DataFrame.serialize 方法应该被重构以接受ByteSerializer,并且在它会生成数据的情况下,它应该检查isSerializing 以了解它是否应该只计算大小或实际写入字节。

我把这两种方法结合起来作为练习,主要是因为这在很大程度上取决于你决定如何去做。

例如,您可以让 ByteBufferSerializer 直接使用池并保留任意容量(例如您的 70000),您可以按容量池化 ByteBuffers(但不是所需容量,而是使用 2 的最小幂大于需要的容量,并在从acquire返回之前设置缓冲区的限制),或者你可以直接池ByteBufferSerializers,只要你添加一个reset()方法。

与我正在做的将我的标头和数据打包在一个字节数组中相比,还有什么更好的方法吗?

是的。传递字节缓冲实例,而不是让某些方法返回字节数组,这些字节数组在检查长度或复制内容后立即被丢弃。

我还需要确保它是线程安全的,因为它可以被多个线程调用。

只要每个缓冲区只被一个线程使用,并且有适当的同步,您就不必担心。

正确的同步意味着您的池管理器在其方法中具有获取和释放语义,并且如果多个线程在从池中获取缓冲区和将其返回到池之间使用缓冲区,则您将在停止使用的线程中添加释放语义缓冲区并在开始使用缓冲区的线程中添加获取语义。例如,如果您通过CompletableFutures 传递缓冲区,则不必担心这一点,或者如果您使用ExchangerBlockingQueue 的正确实现在线程之间进行显式通信。

来自java.util.concurrent的包描述:

java.util.concurrent 及其子包中所有类的方法将这些保证扩展到更高级别的同步。特别是:

在将对象放入任何并发集合之前线程中的操作happen-before 在另一个线程中从集合中访问或删除该元素之后的操作。

在将Runnable 提交给Executor 之前线程中的操作发生之前它的执行开始。对于Callables 提交给ExecutorService 也是如此。

在另一个线程中通过Future.get() 检索结果之后,由Future 表示的异步计算所采取的操作happen-before 操作。

李>

“释放”同步器方法之前的操作,例如Lock.unlockSemaphore.releaseCountDownLatch.countDown happen-before 成功“获取”方法之后的操作,例如 @ 987654356@、Semaphore.acquireCondition.awaitCountDownLatch.await 在另一个线程中的同一个同步器对象上。

对于通过Exchanger 成功交换对象的每对线程,每个线程中exchange() 之前的操作发生之前 相应exchange() 之后的操作另一个线程。

调用CyclicBarrier.awaitPhaser.awaitAdvance(及其变体)之前的操作happen-before 屏障操作执行的操作,以及屏障操作执行的操作happen-before 在其他线程中从相应的await 成功返回之后的操作。

【讨论】:

感谢您的建议。您能否为我的回答中的前两个建议提供一个示例,以便我更好地理解?现在我很困惑这将如何工作。 好的,我添加了示例。 我会非常诚实.. 我能够从你的例子中掌握一些部分。但我无法理解如何在我的代码中使用您的建议。您提到您将把它作为一个练习留给我,恐怕我不确定我将如何与我的集成。我一直使用非常低级别的字节缓冲区 API 并处理字节,所以请原谅我对此的无知。我必须学习所有这些东西的样子。如果您可以帮助我提供一个示例,它将如何与我的集成,那么它将有很大帮助。 我作为一个练习离开了你将同时进行池和仅计数序列化程序的部分。尽管大多数高性能服务器代码最终会池化缓冲区,但我建议您分析并收集任何证据表明您的缓冲区不是短命的,因为如果它们是短命的,那么 GC 会很好地从最年轻的一代中清除它们( s) 和池化意味着开销。 老实说,如果即使在示例之后您仍然只了解一部分并且看不到如何集成到您的代码中,那么您真的需要 Java 培训。这些示例非常基本,直接针对主题,并且大部分是样板文件,如果您使用LinkedBlockingDeque 而不是ConcurrentLinkedDeque 来摆脱limitcount 字段,因为LinkedBlockingDeque 可以有一个限制和LinkedBlockingDeque.size() 是 O(1)。【参考方案2】:

另一种方法是通过DataOutputStream 围绕ByteArrayOutputStream,但您应该将性能调整集中在需要的地方,这不是其中之一。效率在这里不是任何问题。网络 I/O 将以数量级占主导地位。

使用ByteArrayOutputStream 的另一个原因是您不必提前猜测缓冲区大小:它会根据需要增长。

为了保持线程安全,请仅使用局部变量。

【讨论】:

即使网络 I/O 的效率降低了几个数量级,如果您要实现的服务器必须同时处理数百万个活动连接,该怎么办?缓冲区分配(和复制)是用 Java 和 .NET 实现的此类服务器中性能的罪魁祸首之一。 @acelent 那么你需要大量的 CPU 能力。但是除了预先分配了 7000 字节之外,OP 的代码并没有什么特别低效的地方。

以上是关于以一种有效的方式使用 ByteBuffer 将标题和数据布局打包在一个字节数组中?的主要内容,如果未能解决你的问题,请参考以下文章

Kotlin:如何将两个语句“应用”和“让”以一种很好的方式放在一起?

将异常的堆栈跟踪转换为 byte[] 数组或 ByteBuffer (Java) 是不是更有效?

Mudblazor DatePicker 绑定仅以一种方式工作

在一个类中设置一个函数,该函数将以一种可以在未来函数中引用的方式读取 csv 数据

如何以有效的方式同步列表和可枚举?

如何使用 Spring 以一种宁静的方式过滤数据?