在 MessageReciever 之外确认 pubSub 消息
Posted
技术标签:
【中文标题】在 MessageReciever 之外确认 pubSub 消息【英文标题】:Ack pubSub message outside of the MessageReciever 【发布时间】:2021-10-17 22:41:38 【问题描述】:我正在使用 async Pull 从 pupSub 主题中提取消息,进行一些处理并将消息发送到 ActiveMQ 主题。
使用 pupSub 的当前配置,我必须在收到消息时 ack() 。但是,这不适合我的用例,因为我只需要在成功处理并发送到另一个主题后 ack() 消息。这意味着(根据我的理解)在 messageReciver 之外确认()消息。
我尝试保存每条消息及其 AckReplyConsumer 以便以后能够调用它并 ack() 消息,但这不能按预期工作。 并非所有消息都正确 ack( ) 编辑。
所以我想知道这是否可能。如果是的话如何
我的订阅者配置
public Subscriber getSubscriber(CompositeConfigurationElement compositeConfigurationElement, Queue<CustomPupSubMessage> messages) throws IOException
ProjectSubscriptionName subscriptionName = ProjectSubscriptionName.of(compositeConfigurationElement.getPubsub().getProjectid(),
compositeConfigurationElement.getSubscriber().getSubscriptionId());
ExecutorProvider executorProvider =
InstantiatingExecutorProvider.newBuilder().setExecutorThreadCount(2).build();
// Instantiate an asynchronous message receiver.
MessageReceiver receiver =
(PubsubMessage message, AckReplyConsumer consumer) ->
messages.add(CustomPupSubMessage.builder().message(message).consumer(consumer).build());
;
// The subscriber will pause the message stream and stop receiving more messages from the
// server if any one of the conditions is met.
FlowControlSettings flowControlSettings =
FlowControlSettings.newBuilder()
// 1,000 outstanding messages. Must be >0. It controls the maximum number of messages
// the subscriber receives before pausing the message stream.
.setMaxOutstandingElementCount(compositeConfigurationElement.getSubscriber().getOutstandingElementCount())
// 100 MiB. Must be >0. It controls the maximum size of messages the subscriber
// receives before pausing the message stream.
.setMaxOutstandingRequestBytes(100L * 1024L * 1024L)
.build();
//read credentials
InputStream input = new FileInputStream(compositeConfigurationElement.getPubsub().getSecret());
CredentialsProvider credentialsProvider = FixedCredentialsProvider.create(ServiceAccountCredentials.fromStream(input));
Subscriber subscriber = Subscriber.newBuilder(subscriptionName, receiver)
.setParallelPullCount(compositeConfigurationElement.getSubscriber().getSubscriptionParallelThreads())
.setFlowControlSettings(flowControlSettings)
.setCredentialsProvider(credentialsProvider)
.setExecutorProvider(executorProvider)
.build();
return subscriber;
我的处理部分
jmsConnection.start();
for (int i = 0; i < patchSize; i++)
var message = messages.poll();
if (message != null)
byte[] payload = message.getMessage().getData().toByteArray();
jmsMessage = jmsSession.createBytesMessage();
jmsMessage.writeBytes(payload);
jmsMessage.setJMSMessageID(message.getMessage().getMessageId());
producer.send(jmsMessage);
list.add(message.getConsumer());
else break;
jmsSession.commit();
jmsSession.close();
jmsConnection.close();
// if upload is successful then ack the messages
log.info("sent " + list.size() + " in direction " + dest);
list.forEach(consumer -> consumer.ack());
【问题讨论】:
【参考方案1】:没有什么需要在MessageReceiver
回调中确认消息,您应该能够异步确认消息。有几点需要牢记和注意:
-
检查以确保您在确认截止日期到期之前调用确认。默认情况下,Java 客户端库会将确认截止日期延长至多 1 小时,因此,如果您的处理时间少于该时间,则应该没问题。
如果您的订阅者经常受到流量控制,请考虑将您传递给
setParallelPullCount
的值减小为 1。您传入的流量控制设置会传递给每个流,而不是在它们之间进行划分,因此如果每个流都能够接收传入的全部值并且您的处理速度足够慢,您可能在客户端库中超过 1 小时的最后期限,甚至还没有收到消息,从而导致重复传递。如果您处理消息的速度比单个流传递消息的速度要快得多,那么您实际上只需要将setParallelPullCount
设置为更大的值。
确保您的客户端库版本至少为 1.109.0。该版本对流量控制的方式进行了一些改进。
请注意,Pub/Sub 具有至少一次传递语义,这意味着即使正确调用了 ack,也可以重新传递消息。请注意,不确认或取消单个消息可能会导致重新传递在单个批次中一起发布的所有消息。请参阅“消息重新传递和重复率
“Fine-tuning Pub/Sub performance with batch and flow control settings”的“部分。”
如果所有这些仍然不能解决问题,那么最好尝试创建一个小型、独立的示例来重现该问题并在GitHub repo 中打开一个错误。
【讨论】:
以上是关于在 MessageReciever 之外确认 pubSub 消息的主要内容,如果未能解决你的问题,请参考以下文章