每次将大约固定大小的字节数组发送到另一个方法

Posted

技术标签:

【中文标题】每次将大约固定大小的字节数组发送到另一个方法【英文标题】:Sending byte array of approximately fixed size everytime to another method 【发布时间】:2017-10-30 19:36:46 【问题描述】:

我有一个方法,它的参数是Partition 枚举。通过传递不同的partition 值,在同一时间段内,多个后台线程(最多 15 个)将调用此方法。这里dataHoldersByPartitionPartitionConcurrentLinkedQueue<DataHolder>ImmutableMap

  private final ImmutableMap<Partition, ConcurrentLinkedQueue<DataHolder>> dataHoldersByPartition;

  //... some code to populate entry in `dataHoldersByPartition` map

  private void validateAndSend(final Partition partition)   
    ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
    Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder = new HashMap<>();
    int totalSize = 0;      
    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll())  != null)       
      byte[] clientKeyBytes = dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8);
      if (clientKeyBytes.length > 255)
        continue;

      byte[] processBytes = dataHolder.getProcessBytes();
      int clientKeyLength = clientKeyBytes.length;
      int processBytesLength = processBytes.length;

      int additionalLength = clientKeyLength + processBytesLength;
      if (totalSize + additionalLength > 50000) 
        Message message = new Message(clientKeyBytesAndProcessBytesHolder, partition);
        // here size of `message.serialize()` byte array should always be less than 50k at all cost
        sendToDatabase(message.getAddress(), message.serialize());
        clientKeyBytesAndProcessBytesHolder = new HashMap<>();
        totalSize = 0;
      
      clientKeyBytesAndProcessBytesHolder.put(clientKeyBytes, processBytes);
      totalSize += additionalLength;
    
    // calling again with remaining values only if clientKeyBytesAndProcessBytesHolder is not empty
    if(!clientKeyBytesAndProcessBytesHolder.isEmpty()) 
        Message message = new Message(partition, clientKeyBytesAndProcessBytesHolder);
        // here size of `message.serialize()` byte array should always be less than 50k at all cost
        sendToDatabase(message.getAddress(), message.serialize());      
    
  

下面是我的Message类:

public final class Message 
  private final byte dataCenter;
  private final byte recordVersion;
  private final Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder;
  private final long address;
  private final long addressFrom;
  private final long addressOrigin;
  private final byte recordsPartition;
  private final byte replicated;

  public Message(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder, Partition recordPartition) 
    this.clientKeyBytesAndProcessBytesHolder = clientKeyBytesAndProcessBytesHolder;
    this.recordsPartition = (byte) recordPartition.getPartition();
    this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
    this.recordVersion = 1;
    this.replicated = 0;
    long packedAddress = new Data().packAddress();
    this.address = packedAddress;
    this.addressFrom = 0L;
    this.addressOrigin = packedAddress;
  

  // Output of this method should always be less than 50k always
  public byte[] serialize() 
    // 36 + dataSize + 1 + 1 + keyLength + 8 + 2;
    int bufferCapacity = getBufferCapacity(clientKeyBytesAndProcessBytesHolder);

    ByteBuffer byteBuffer = ByteBuffer.allocate(bufferCapacity).order(ByteOrder.BIG_ENDIAN);
    // header layout
    byteBuffer.put(dataCenter).put(recordVersion).putInt(clientKeyBytesAndProcessBytesHolder.size())
        .putInt(bufferCapacity).putLong(address).putLong(addressFrom).putLong(addressOrigin)
        .put(recordsPartition).put(replicated);

    // data layout
    for (Map.Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) 
      byte keyType = 0;
      byte[] key = entry.getKey();
      byte[] value = entry.getValue();
      byte keyLength = (byte) key.length;
      short valueLength = (short) value.length;

      ByteBuffer dataBuffer = ByteBuffer.wrap(value);
      long timestamp = valueLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();

      byteBuffer.put(keyType).put(keyLength).put(key).putLong(timestamp).putShort(valueLength)
          .put(value);
    
    return byteBuffer.array();
  

  private int getBufferCapacity(Map<byte[], byte[]> clientKeyBytesAndProcessBytesHolder) 
    int size = 36;
    for (Entry<byte[], byte[]> entry : clientKeyBytesAndProcessBytesHolder.entrySet()) 
      size += 1 + 1 + 8 + 2;
      size += entry.getKey().length;
      size += entry.getValue().length;
    
    return size;
  

    // getters and to string method here

基本上,我必须确保在validateAndSend 方法中调用sendToDatabase 方法时,message.serialize() 字节数组的大小应始终小于 50k。我的sendToDatabase 方法发送来自serialize 方法的字节数组。因此,例如,如果我在 dataHolders CLQ 中有 60k 条记录,那么我将在 validateAndSend 方法中发送两个块:

首先,我将创建一个大约小于 50k 的字节数组(意味着来自 message.serialize() 的字节数组小于 50k)并在其上调用 sendToDatabase 方法。 其次,我会调用sendToDatabase方法进行remiing part。

为了完成上述任务,我在 validateAndSend 方法中使用了 totalSize 变量,该变量试图测量 50k 大小,但看起来我的方法可能不正确,我可能会丢弃一些记录或每次发送超过 50k猜测?

看起来我的Message 类知道clientKeyBytesAndProcessBytesHolder 映射,我可以使用此映射通过调用getBufferCapacity 方法来准确定义大小,如果大约小于50k 则调用sendToDatabase 方法?

【问题讨论】:

我想我可能有一个解决方案,但我需要你确认我对你正在尝试做的事情的理解是正确的,并且我没有遗漏任何重要部分: - 每个分区都有一个队列存储键值对的位置; - 单个线程处理每个队列; - 键值对以序列化的形式传输到大小不超过 50K 的字节数组中; - 一旦数组达到容量,它就会被发送到数据库,并开始为新数组收集字节。 整个代码看起来还有一些你没有告诉的限制和方面。否则代码过于复杂。例如。仅仅为了能够序列化而为每个垃圾数据创建一个消息实例是有味道的,因为序列化的结果首先提供了对正确实例化的限制。这可能最好通过clientKeyBytesAndProcessBytesHolder 上的循环来提供服务,该循环构建(序列化)缓冲区直到大小限制,然后将缓冲区刷新到sendToDatabase,直到所有内容都已发送。 @yegodm 是的,这正是我正在做的事情。 @rpy 我不确定你在这里谈论什么样的限制。让我知道有哪些不清楚的地方,我会尝试相应地改进我的问题。 代码是否无法按照您的预期工作,或者您是否在寻求改进代码结构的提示? 【参考方案1】:

您可以通过对职责进行排序来获得更简洁的代码。 目前,Message 类负责将 DataHolder 项转换为序列化形式。但也期望 t 确保满足大小限制。不幸的是,调用方法正在检查大小预期,而不知道 Message 类的大小要求。

我建议将正确的垃圾数据发送到 Message 类,从而将“关于正确数据垃圾格式的知识”删除到 Message 类本身。

您可能还注意到,当前的实现考虑了每个项目的完整标题大小,而每个 serialize() 只添加一次标题

请在下面找到建议改进的草图。代码需要进一步完善。但它主要用于说明结构和可读性/可维护性方面的基本改进。

为了将sendToDatabase() 功能与Message 类隔离开来,我只是添加了一个简单的接口:

// decoupling the sending logic from the formatting
// if external requirements suggest linking such functionality into the message class
// such interface would be unnecessary
public interface DatabaseDelivery 
    void sendToDatabase(long addres, byte[] messagePayload);

消息类更改为处理垃圾邮件和大小限制。现在是Closeable,表示您最后应该调用close()。 (因此您可以考虑在当前版本的 java 中使用适当的构造)

public final class Message implements Closeable 
    // or initialize it from some external source if this might change dynamically
    private static final int MAX_SIZE = 50000;
    // better determine this in sync with addHeader() method
    private static final int HEADER_SIZE = 36;

    private final byte dataCenter;
    private final byte recordVersion;
    private final long address;
    private final long addressFrom;
    private final long addressOrigin;
    private final byte recordsPartition;
    private final byte replicated;
    private final DatabaseDelivery delivery;
    private final ByteBuffer itemBuffer = ByteBuffer.allocate(MAX_SIZE);
    private int pendingItems = 0;

    public Message(final Partition recordPartition, final DatabaseDelivery databaseDelivery) 
        this.recordsPartition = (byte) recordPartition.getPartition();
        this.dataCenter = Utils.CURRENT_LOCATION.get().datacenter();
        this.recordVersion = 1;
        this.replicated = 0;
        final long packedAddress = new Data().packAddress();
        this.address = packedAddress;
        this.addressFrom = 0L;
        this.addressOrigin = packedAddress;
        this.delivery = databaseDelivery;
    

    private void addHeader(final ByteBuffer buffer, final int items) 
        buffer.put(dataCenter)
              .put(recordVersion)
              .putInt(items)
              .putInt(buffer.capacity())
              .putLong(address)
              .putLong(addressFrom)
              .putLong(addressOrigin)
              .put(recordsPartition)
              .put(replicated);
    

    private void sendData() 
        if (itemBuffer.position() == 0) 
            // no data to be sent
            //Properties: itemBuffer serialized size == 0
            return;
        
        final ByteBuffer buffer = ByteBuffer.allocate(MAX_SIZE);
        addHeader(buffer, pendingItems);
        itembuffer.flip();
        buffer.put(itemBuffer);
        delivery.sendToDatabase(address, Arrays.copyOf(buffer.array(),buffer.position());
        itemBuffer.clear();
        pendingItems = 0;
        //Properties: itemBuffer serialized size == 0                
    

    public void addAndSendJunked(final byte[] key, final byte[] data) 
        if (key.length > 255) 
            return;
        
        if (data.length > 255) 
            return;
        
        final byte keyLength = (byte) key.length;
        final byte dataLength = (byte) data.length;

        final int additionalSize = dataLength + keyLength + 1 + 1 + 8 + 2;
        final int newSize = itemBuffer.position() + additionalSize;
        //Properties: itemBuffer serialized size < MAX
        if (newSize >= (MAX_SIZE-HEADER_SIZE)) 
            sendData();
        
        if (additionalSize > (MAX_SIZE-HEADER_SIZE)) 
            //XXX Use exception that is appropriate for your application
            //XXX You might add sizes involved for ease of analysis
            throw new AppConfigurationException("Size of single item exceeds maximum size");
        
        //Properties: itemBuffer size (old+new or new) < MAX 

        final ByteBuffer dataBuffer = ByteBuffer.wrap(data);
        final long timestamp = dataLength > 10 ? dataBuffer.getLong(2) : System.currentTimeMillis();
        // data layout
        itemBuffer.put((byte) 0).put(keyLength).put(key).putLong(timestamp).putShort(dataLength).put(data);
        pendingItems++ ;

//属性:itemBuffer size

    @Override
    public void close() 
        if (pendingItems > 0) 
            sendData();
        
    

最后你的调用代码会变成:

private void validateAndSend(final Partition partition) 
    final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);

    // the instance providing sendToDatabase() method
    // just for cutting off details external to the discussion
    final DatabaseDelivery delivery = this;
    final Message message = new Message(partition, this);

    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll()) != null) 
        // XXX: why is client key using explicit encoding while process bytes is not?
        message.addAndSendJunked(dataHolder.getClientKey().getBytes(StandardCharsets.UTF_8), dataHolder.getProcessBytes());
    
    message.close();

请注意,我在可能需要注意的地方添加了一些标记 (XXX)。 (但是,这些可以从所提供的外部信息中解释)

还有一些细节可以考虑。 例如。我不相信使用 ByteBuffer 是给定用例的正确集合(在大多数地方)。

编辑: 关于测试,由于代码体积小,您可能会考虑应用形式验证(至少部分)。这类似于现代编译器对静态代码分析所做的:你遍历你的代码(用纸和铅笔)并派生在那个地方持有的属性。我在上面的代码中添加了 cmets(标记为 //Properties),以说明您可能会这样做。 (注意:这是一个简单的说明,肯定需要为每个语句派生和完成更多属性)。我只是对结果缓冲区大小做了一些最小的归因。 (使用MAX' as placeholder for the maximum acceptable size of the item part of the final buffer, akaMAX_SIZE-HEADER_SIZE`)。

当然,人们可能会(正确地)建议为关键案例编写测试。在这种情况下,这将是白盒测试。在(已知)实现的极端情况下测试代码的正确功能。您还需要进行黑盒测试来测试您的代码相对于规范的行为。

您还可以添加运行时检查以确保关键部分的正确行为。例如。在执行sendToDatabase() 时,您可以检查最大尺寸要求。然而,这样的测试需要适当的输入来合理化正确的行为。使用通过静态分析从代码派生的属性,可以提供良好行为的证明,而最终不会怀疑没有找到会导致失败的一个测试用例。

【讨论】:

感谢您的详细解答。我为具有validateAndSend 方法的Processor 类创建了一个gist。只是为了向您展示我是如何使用这些类的。很多线程会同时调用addHolderByPartition方法来填充与分区对应的dataHolder,然后我在每个分区的同一个Processor类中有一个后台线程,它调用与每个分区对应的validateAndSend方法。 现在知道了我的Processor 类,您还建议进行哪些其他更改以使此代码更加优美?我认为我的处理器代码会让您了解我的代码的结构以及如果需要我们可以做些什么来改进它。 正如我已经提到的,ByteBuffer 不是与所示代码最佳匹配的数据结构。但是,也许sendToDatabase 的实施方式可以从该选择中获利。另一个是标题。它具有在填充缓冲区后恰好已知的包中项目的数量。 (可能是导致代码复杂的原因)。因此,您可以创建标题并在最后修补正确的值。 您的Processor 类起初Message 使用大量(常量)值,感觉就像配置数据。但没有包括这方面的解决方案。 使用ConcurrentLinkedQueue,数据创建线程和处理器之间的通信非常直接。它将允许任意数量(在可用资源范围内)的线程。而且您的代码甚至可以与处理相同分区的多个线程一起使用。 (但是,接收方可能需要一些注意。但是,正确使用数据库事务可能会这样做)。【参考方案2】:

所以这是我的尝试(这个问题最好向代码审查社区提出,但无论如何)。它依赖于对Message 的一些设计更改,因此它变得更像Builder 模式。缓冲区成为消息的一部分。它的占用是通过对BufferOverflowException 异常做出反应来控制的。一旦发生,缓冲区将回滚到最后一个成功添加的结果,分配新消息,并重试添加相同的数据。缓冲区完成后,记录总数和总大小将写入标头,并将整个缓冲区转储到字节数组(我可能会尝试避免这种额外的转换并直接在 sendToDatabase 中对缓冲区进行操作,但那是暂时超出范围):

// TODO: structure has been adjusted for testing purposes
enum Partition

    A(0x1);

    private final int _partition;

    int getPartition()
    
        return _partition;
    

    Partition(final int partition)
    
        _partition = partition;
    


// TODO: structure has been adjusted for testing purposes
final static class DataHolder

    private final String _clientKey;
    private final byte[] _processBytes;

    public DataHolder(
        final String clientKey,
        final String value)
    
        _clientKey = clientKey;
        byte[] valueBytes = value.getBytes();
        // simulate payload including extra bytes for the header
        final ByteBuffer buffer = ByteBuffer.allocate(4 + 8 + valueBytes.length)
                                            .order(ByteOrder.BIG_ENDIAN);
        buffer.putInt(0).putLong(System.currentTimeMillis()).put(valueBytes);
        _processBytes = readToBytes(buffer);
    

    String getClientKey()
    
        return _clientKey;
    

    byte[] getProcessBytes()
    
        return _processBytes;
    


// API has been changed to something more like the Builder pattern
final static class Message

    private final long address;
    private final long addressFrom;
    private final long addressOrigin;
    private final byte recordsPartition;
    private final byte replicated;
    private final ByteBuffer buffer;
    private final int writeStatsPosition;
    private int payloadCount;

    Message(Partition recordPartition, int sizeLimit)
    
        this.recordsPartition = (byte) recordPartition.getPartition();
        this.replicated = 0;
        // TODO: temporarily replaced with a hard-coded constant
        long packedAddress = 123456789L;
        this.address = packedAddress;
        this.addressFrom = 0L;
        this.addressOrigin = packedAddress;
        buffer = ByteBuffer.allocate(sizeLimit).order(ByteOrder.BIG_ENDIAN);
        // TODO: temporarily replaced with a hard-coded constant
        byte dataCenter = 0x1;
        byte recordVersion = 1;
        buffer.put(dataCenter).put(recordVersion);
        writeStatsPosition = buffer.position();
        buffer.putInt(datacenter).putInt(recordVersion);
        buffer.putLong(address).putLong(addressFrom).putLong(addressOrigin)
                  .put(recordsPartition).put(replicated);
    

    /**
     * Tries to add another pair of client key and process bytes to
     * the current message. Returns true if successfully added, false -
     * if the data cannot be accommodated due to message binary size limit.
     */
    boolean add(byte[] key, byte[] value)
    
        try
        
            byte keyType = 0;
            byte keyLength = (byte) key.length;
            short valueLength = (short) value.length;
            ByteBuffer valueAsBuffer = ByteBuffer.wrap(value);
            long timestamp = valueAsBuffer.capacity() > 10 ? valueAsBuffer.getLong(2) : System.currentTimeMillis();
            payloadCount++;
            // remember position in the buffer to roll back to in case of overflow
            buffer.mark();
            buffer.put(keyType).put(keyLength).put(key);
            buffer.putLong(timestamp).putShort(valueLength).put(value);

            return true;
        
        catch (BufferOverflowException e)
        
            payloadCount--;
            buffer.reset();
            return false;
        
    

    byte[] serialize()
    
        int finalPosition = buffer.position();
        // adjust the message header with the totals
        buffer.putInt(writeStatsPosition, payloadCount)
              .putInt(writeStatsPosition + 4, finalPosition);
        return readToBytes(buffer);
    


static void validateAndSend(final Partition partition, final Supplier<Message> messageFactory)
    throws InterruptedException

    final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(partition);
    Message message = messageFactory.get();
    DataHolder dataHolder;
    while ((dataHolder = dataHolders.poll()) != null)
    
        final byte[] keyBytes = dataHolder.getClientKey()
                                    .getBytes(StandardCharsets.UTF_8);
        final int keyLength = keyBytes.length;
        if (keyLength > 255)
        
            continue;
        

        while (!message.add(keyBytes, dataHolder.getProcessBytes()))
        
            // TODO: consider proper handling of the case when the buffer size is too small to accept even a single pair
            Preconditions.checkState(message.payloadCount > 0,
                "buffer size too small to accommodate payload");
            final byte[] serializedMessage = message.serialize();
            // TODO: makes sense to introduce a message consumer interface and call it here instead of sendToDatabase() - simplifies testing
            sendToDatabase(message.address, serializedMessage);
            message = messageFactory.get();
        
    
    if (message.payloadCount > 0)
    
        byte[] serializedMessage = message.serialize();
        sendToDatabase(message.address, serializedMessage);
    


static void sendToDatabase(long address, byte[] serializedMessage)

    // TODO: added simulating activity
    System.out.printf("Sending %d bytes to %d: %s%n",
        serializedMessage.length, address, DatatypeConverter.printHexBinary(serializedMessage));


static byte[] readToBytes(ByteBuffer buffer)

    buffer.flip();
    byte[] bytes = new byte[buffer.remaining()];
    buffer.get(bytes);
    return bytes;


public static void main(String[] args)
    throws ExecutionException, InterruptedException

    // TODO: using small value for testing - must be set to 50K in real case
    final int maxMessageSize = 80;
    final Supplier<Message> messageFactory = new Supplier<Message>()
    
        @Override
        public Message get()
        
            return new Message(Partition.A, maxMessageSize);
        
    ;

    final ConcurrentLinkedQueue<DataHolder> dataHolders = dataHoldersByPartition.get(Partition.A);
    dataHolders.add(new DataHolder("0000000001", "alpha"));
    dataHolders.add(new DataHolder("0000000002", "bravo"));
    dataHolders.add(new DataHolder("0000000003", "charlie"));
    dataHolders.add(new DataHolder("0000000004", "delta"));
    dataHolders.add(new DataHolder("0000000005", "echo"));
    dataHolders.add(new DataHolder("0000000006", "foxtrot"));

    validateAndSend(Partition.A, messageFactory);

【讨论】:

以上是关于每次将大约固定大小的字节数组发送到另一个方法的主要内容,如果未能解决你的问题,请参考以下文章

如何在 C++ 中将字节数组发送到另一个进程

计算数组的位/字节大小

无法将固定大小的字节数组从结构复制到 C# 结构中的另一个数组

C语言中计算数组长度的方法是啥

什么时候用字节数组,什么时候用流?

将 long 转换为字节数组并将其添加到另一个数组