rocketMQ之批处理消息

Posted 一只猪的思考

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketMQ之批处理消息相关的知识,希望对你有一定的参考价值。

批量消息发送

批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送 4MiB 的消息,但是如果需要发送更大的消息,建议将较大的消息分成多个不超过 1MiB 的小消息。

1 发送批量消息

如果你一次只发送不超过 4MiB 的消息,使用批处理很容易:

String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try 
    producer.send(messages);
 catch (Exception e) 
    e.printStackTrace();
    //handle the error

2 拆分

当您发送较大的消息时,复杂性会增加,如果您不确定它是否超过 4MiB的限制。 这时候,您最好将较大的消息分成多个不超过 1MiB 的小消息:

public class ListSplitter implements Iterator<List<Message>>  
    private final int SIZE_LIMIT = 1024 * 1024 * 4;
    private final List<Message> messages;
    private int currIndex;
    public ListSplitter(List<Message> messages)  
        this.messages = messages;
    
    @Override public boolean hasNext() 
        return currIndex < messages.size(); 
    
    @Override public List<Message> next()  
        int startIndex = getStartIndex();
        int nextIndex = startIndex;
        int totalSize = 0;
        for (; nextIndex < messages.size(); nextIndex++) 
            Message message = messages.get(nextIndex); 
            int tmpSize = calcMessageSize(message);
            if (tmpSize + totalSize > SIZE_LIMIT) 
                break; 
             else 
                totalSize += tmpSize; 
            
        
        List<Message> subList = messages.subList(startIndex, nextIndex); 
        currIndex = nextIndex;
        return subList;
    
    private int getStartIndex() 
        Message currMessage = messages.get(currIndex); 
        int tmpSize = calcMessageSize(currMessage); 
        while(tmpSize > SIZE_LIMIT) 
            currIndex += 1;
            Message message = messages.get(curIndex); 
            tmpSize = calcMessageSize(message);
        
        return currIndex; 
    
    private int calcMessageSize(Message message) 
        int tmpSize = message.getTopic().length() + message.getBody().length(); 
        Map<String, String> properties = message.getProperties();
        for (Map.Entry<String, String> entry : properties.entrySet()) 
            tmpSize += entry.getKey().length() + entry.getValue().length(); 
        
        tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes
        return tmpSize; 
    


// then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext()) 
   try 
       List<Message>  listItem = splitter.next();
       producer.send(listItem);
    catch (Exception e) 
       e.printStackTrace();
       // handle the error
   

以上是关于rocketMQ之批处理消息的主要内容,如果未能解决你的问题,请参考以下文章

rocketMQ之延时处理消息

rocketMQ之批处理消息

rocketMQ之批处理消息

rocketMQ之批处理消息

rocketmq之顺序消费

RocketMQ事务消息篇之事务消息源码分析