rocketMQ之批处理消息
Posted 一只猪的思考
tags:
篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了rocketMQ之批处理消息相关的知识,希望对你有一定的参考价值。
批量消息发送
批量消息发送能够提高发送效率,提升系统吞吐量。同一批批量消息的topic、waitStoreMsgOK属性必须保持一致,批量消息不支持延迟消息。批量消息发送一次最多可以发送 4MiB 的消息,但是如果需要发送更大的消息,建议将较大的消息分成多个不超过 1MiB 的小消息。
1 发送批量消息
如果你一次只发送不超过 4MiB 的消息,使用批处理很容易:
String topic = "BatchTest";
List<Message> messages = new ArrayList<>();
messages.add(new Message(topic, "TagA", "OrderID001", "Hello world 0".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID002", "Hello world 1".getBytes()));
messages.add(new Message(topic, "TagA", "OrderID003", "Hello world 2".getBytes()));
try
producer.send(messages);
catch (Exception e)
e.printStackTrace();
//handle the error
2 拆分
当您发送较大的消息时,复杂性会增加,如果您不确定它是否超过 4MiB的限制。 这时候,您最好将较大的消息分成多个不超过 1MiB 的小消息:
public class ListSplitter implements Iterator<List<Message>>
private final int SIZE_LIMIT = 1024 * 1024 * 4;
private final List<Message> messages;
private int currIndex;
public ListSplitter(List<Message> messages)
this.messages = messages;
@Override public boolean hasNext()
return currIndex < messages.size();
@Override public List<Message> next()
int startIndex = getStartIndex();
int nextIndex = startIndex;
int totalSize = 0;
for (; nextIndex < messages.size(); nextIndex++)
Message message = messages.get(nextIndex);
int tmpSize = calcMessageSize(message);
if (tmpSize + totalSize > SIZE_LIMIT)
break;
else
totalSize += tmpSize;
List<Message> subList = messages.subList(startIndex, nextIndex);
currIndex = nextIndex;
return subList;
private int getStartIndex()
Message currMessage = messages.get(currIndex);
int tmpSize = calcMessageSize(currMessage);
while(tmpSize > SIZE_LIMIT)
currIndex += 1;
Message message = messages.get(curIndex);
tmpSize = calcMessageSize(message);
return currIndex;
private int calcMessageSize(Message message)
int tmpSize = message.getTopic().length() + message.getBody().length();
Map<String, String> properties = message.getProperties();
for (Map.Entry<String, String> entry : properties.entrySet())
tmpSize += entry.getKey().length() + entry.getValue().length();
tmpSize = tmpSize + 20; // Increase the log overhead by 20 bytes
return tmpSize;
// then you could split the large list into small ones:
ListSplitter splitter = new ListSplitter(messages);
while (splitter.hasNext())
try
List<Message> listItem = splitter.next();
producer.send(listItem);
catch (Exception e)
e.printStackTrace();
// handle the error
以上是关于rocketMQ之批处理消息的主要内容,如果未能解决你的问题,请参考以下文章