Question about messages going to the DLQ in an ActiveMQ application
Reference answer A

1. Props class (reads the properties file; singleton)

package com.sitinspring.standardWeblogicJms;

import java.io.FileInputStream;
import java.io.InputStream;
import java.util.Hashtable;
import java.util.Properties;
import javax.naming.Context;
import javax.naming.InitialContext;

public class Props {
    private static final String File_Name = "jmsSetting.properties";
    private static Properties propts;

    // Loads the properties file; get() calls this once and then reuses the instance.
    public static synchronized void makeProptsInstance() {
        propts = new Properties();
        try (InputStream in = new FileInputStream(File_Name)) {
            propts.load(in);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    public static synchronized String get(String name) {
        if (propts == null) {
            makeProptsInstance();
        }
        return propts.getProperty(name);
    }

    // Builds the JNDI context from the factory class and provider URL in the properties file.
    public static Context getInitialContext() {
        Context context = null;
        String jndiFactory = Props.get("jndi.factory");
        String providerUrl = Props.get("jndi.provider.url");
        Hashtable<String, String> env = new Hashtable<>();
        env.put(Context.INITIAL_CONTEXT_FACTORY, jndiFactory);
        env.put(Context.PROVIDER_URL, providerUrl);
        try {
            context = new InitialContext(env);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
        return context;
    }
}
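For reference, here is a minimal sketch of what jmsSetting.properties might contain and how it is parsed. The four keys are the ones the classes in this answer look up; the values (a WebLogic JNDI factory, a t3:// URL, and the two JNDI names) are illustrative assumptions, and the class name JmsSettingsSketch is hypothetical.

```java
import java.io.IOException;
import java.io.StringReader;
import java.io.UncheckedIOException;
import java.util.Properties;

// Hypothetical helper, not part of the original answer.
final class JmsSettingsSketch {
    // Sample file content. Keys come from the answer's code; values are assumed.
    static final String SAMPLE = String.join("\n",
            "jndi.factory=weblogic.jndi.WLInitialContextFactory",
            "jndi.provider.url=t3://localhost:7001",
            "jms.factory=jms/ConnectionFactory",
            "queue.name=jms/TestQueue");

    // Parses properties text the same way Properties.load reads a file.
    static Properties load(String content) {
        Properties props = new Properties();
        try (StringReader reader = new StringReader(content)) {
            props.load(reader);
        } catch (IOException ex) {
            throw new UncheckedIOException(ex);
        }
        return props;
    }

    public static void main(String[] args) {
        Properties props = load(SAMPLE);
        System.out.println("queue.name = " + props.getProperty("queue.name"));
    }
}
```

A key that is absent from the file comes back as null from getProperty, so a missing jndi.factory or jndi.provider.url would fail when it is put into the Hashtable in getInitialContext.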
2. QueueBase class (base class of QueueComsumer and QueueSupplier; holds what the two have in common)

package com.sitinspring.standardWeblogicJms;

import javax.jms.Queue;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
import javax.jms.QueueSession;
import javax.jms.Session;
import javax.naming.Context;

public class QueueBase {
    protected QueueConnectionFactory queueConnectionFactory;
    protected QueueConnection queueConnection;
    protected QueueSession queueSession;
    protected Queue queue;

    // Looks up the connection factory and the queue through JNDI and opens a
    // non-transacted, auto-acknowledging session.
    public QueueBase(Context context) {
        try {
            String jmsFactory = Props.get("jms.factory");
            queueConnectionFactory = (QueueConnectionFactory) context.lookup(jmsFactory);
            queueConnection = queueConnectionFactory.createQueueConnection();
            queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
            String queueName = Props.get("queue.name");
            queue = (Queue) context.lookup(queueName);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }
}
3. QueueComsumer class (receives messages)

package com.sitinspring.standardWeblogicJms;

import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageListener;
import javax.jms.QueueReceiver;
import javax.jms.TextMessage;
import javax.naming.Context;

public class QueueComsumer extends QueueBase implements MessageListener {
    private QueueReceiver queueReceiver;

    public QueueComsumer(Context context) {
        super(context);
        try {
            queueReceiver = queueSession.createReceiver(queue);
            queueReceiver.setMessageListener(this);
            queueConnection.start();
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    // Called by the JMS provider for every message delivered to the queue.
    public void onMessage(Message message) {
        if (message instanceof TextMessage) {
            TextMessage txtmsg = (TextMessage) message;
            try {
                System.out.println("I have received the TextMessage:"
                        + txtmsg.getText());
            } catch (JMSException e) {
                e.printStackTrace();
            }
        }
    }

    public void close() throws JMSException {
        queueReceiver.close();
        queueSession.close();
        queueConnection.close();
    }

    public static void main(String[] args) {
        Context context = Props.getInitialContext();
        QueueComsumer queueComsumer = new QueueComsumer(context);
        // Block the main thread so the listener keeps receiving; an interrupt ends the wait.
        synchronized (queueComsumer) {
            try {
                while (true) {
                    queueComsumer.wait();
                }
            } catch (InterruptedException interruptedException) {
                interruptedException.printStackTrace();
            }
        }
        try {
            queueComsumer.close();
        } catch (JMSException e) {
            e.printStackTrace();
        }
    }
}
4. QueueSupplier class (sends messages)

package com.sitinspring.standardWeblogicJms;

import javax.jms.JMSException;
import javax.jms.QueueSender;
import javax.jms.TextMessage;
import javax.naming.Context;

public class QueueSupplier extends QueueBase {
    private QueueSender queueSender;

    public QueueSupplier(Context context) {
        super(context);
        try {
            queueSender = queueSession.createSender(queue);
        } catch (Exception ex) {
            ex.printStackTrace();
        }
    }

    // Wraps the string in a TextMessage and sends it to the queue.
    public void sendMsg(String msg) throws JMSException {
        TextMessage txtMsg = queueSession.createTextMessage();
        txtMsg.setText(msg);
        queueConnection.start();
        queueSender.send(txtMsg);
    }

    public void close() throws JMSException {
        queueSender.close();
        queueSession.close();
        queueConnection.close();
    }

    public static void main(String[] args) {
        Context context = Props.getInitialContext();
        QueueSupplier queueSupplier = new QueueSupplier(context);
        try {
            queueSupplier.sendMsg("Hello World");
            System.out.println("A message has been sent!");
        } catch (JMSException ex) {
            ex.printStackTrace();
        } finally {
            try {
                queueSupplier.close();
            } catch (JMSException ex) {
                ex.printStackTrace();
            }
        }
    }
}
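Messages not going to dead-letter queue on throwing exception
[Posted]: 2021-12-05 07:30:58
[Question]: I have an application in which I have to move messages to a DLQ (dead-letter queue) in certain cases. On the first message the exception is thrown, but my max-attempts: 2 value is ignored: I see my exception message 4 times, and the message is still not sent to the DLQ.
As mentioned in the Spring Cloud Stream docs, a message should go to the DLQ when an exception is thrown. When I try to send another message, a different exception keeps appearing in the console log until I restart the application. After the restart the first message is picked up again, and this time it does go to the DLQ.
I tried using .onErrorContinue; in that case my application does not break on the first message, but the message is not sent to the DLQ.
Edit:
There is an open issue for this on the spring-cloud-stream repo: https://github.com/spring-cloud/spring-cloud-stream/issues/1922
If anyone has a workaround for retries and dead-letter queues in reactive functions, please post an answer.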
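To make the expected behavior concrete, here is a plain-Java sketch of what "max-attempts: 2, then dead-letter" means for an imperative consumer: the handler is invoked at most maxAttempts times, and the message is parked in a DLQ only after the last attempt fails. It does not use Spring Cloud Stream or RabbitMQ; the class and method names (RetryThenDlqSketch, deliver) are hypothetical, and the sketch makes no claim about how the binder treats reactive functions, which is the subject of the open issue.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical illustration of retry-then-dead-letter semantics.
final class RetryThenDlqSketch {
    // Stand-in for the dead-letter queue.
    final List<String> deadLetters = new ArrayList<>();
    private final int maxAttempts;

    RetryThenDlqSketch(int maxAttempts) {
        if (maxAttempts < 1) {
            throw new IllegalArgumentException("maxAttempts must be >= 1");
        }
        this.maxAttempts = maxAttempts;
    }

    // Invokes the handler until it succeeds or maxAttempts is reached.
    // Returns the number of handler invocations.
    int deliver(String message, Consumer<String> handler) {
        for (int attempt = 1; attempt <= maxAttempts; attempt++) {
            try {
                handler.accept(message);
                return attempt; // success, nothing is dead-lettered
            } catch (RuntimeException ex) {
                if (attempt == maxAttempts) {
                    deadLetters.add(message); // retries exhausted
                }
            }
        }
        return maxAttempts;
    }

    public static void main(String[] args) {
        RetryThenDlqSketch sketch = new RetryThenDlqSketch(2);
        int calls = sketch.deliver("payment-1", m -> {
            throw new RuntimeException("intentional");
        });
        System.out.println(calls + " attempts, DLQ=" + sketch.deadLetters);
    }
}
```

With maxAttempts set to 2, a handler that always throws is called exactly twice before the message lands in deadLetters, which is the behavior the question expected instead of four logged exceptions.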
My application

@Bean
public Function<Flux<String>, Flux<?>> processor() {
    return PaymentDetailsFlux -> PaymentDetailsFlux
            // .retryWhen(Retry.backoff(3, Duration.ofMillis(100000)))
            .flatMap(paymentDetails -> {
                throw new RuntimeException("intentional");
                /* extracting payload from input */
                // Map<Object, Object> payload = new HashMap<>();
                // try {
                //     ObjectMapper jsonMapper = new ObjectMapper();
                //     payload = jsonMapper.readValue(paymentDetails, Map.class);
                // } catch (JsonProcessingException e) {
                //     log.error("error while serializing input payload: "+paymentDetails);
                //     e.printStackTrace();
                // }
                //
                // String message_id = (String) payload.get("message_id");
                // log.info("<"+message_id+">"+" Received message from payment-supplier");
                // log.info("<"+message_id+">"+" message payload: "+paymentDetails);
                //
                // /* DLQ if rule not found */
                // if(!isValidateMAP(payload)) {
                //     throw new RuntimeException("intentional");
                ////     return Flux.error(new RuntimeException("<"+message_id+"> "+errMessage));
                // }
                //
                // return Mono.just(paymentDetails);
            });
            // .onErrorContinue((throwable, o) -> {
            //     log.error(throwable.getMessage());
            // });
}
My application.yml

spring:
  application:
    name: payment-supplier
  profiles:
    active: dev
  cloud:
    function:
      definition: processor;ruleProcessor
    stream:
      bindings:
        processor-in-0:
          destination: processor-in #input topic name
        processor-in-0.group: processor #queue name
        processor-out-0:
          destination: rule-in #output topic name
        ruleProcessor-in-0:
          destination: ruleProcessor-in #input topic name
        ruleProcessor-in-0.group: processor #queue name
        #ruleProcessor-out-0: #StreamBridge
      default-binder: rabbit
      #Defining DLQ - Dead Letter Queue
      rabbit:
        bindings:
          processor-in-0:
            consumer:
              auto-bind-dlq: true
              republish-to-dlq: true
              max-attempts: 2
              requeueRejected: true
              backOffInitialInterval: 900000
              backOffMaxInterval: 900000
            producer:
              autoBindDlq: true
              republishToDlq: true
              requeueRejected: true
              maxAttempts: 2
              backOffInitialInterval: 900000
              backOffMaxInterval: 900000
        #Defining DLQ - Dead Letter Queue
        # rabbit:
        default:
          consumer:
            auto-bind-dlq: true
            republishToDlq: true
            maxAttempts: 3
            requeueRejected: true
            # backOffInitialInterval: 900000
            # backOffMaxInterval: 900000
          producer:
            auto-bind-dlq: true
            republishToDlq: true
            maxAttempts: 3
            requeueRejected: true
            # backOffInitialInterval: 900000
            # backOffMaxInterval: 900000
First message
2021-10-18 13:12:40.704 ERROR 6704 --- [-in.processor-1] c.f.c.c.BeanFactoryAwareFunctionRegistry : Failed to invoke function 'processor'
java.lang.RuntimeException: intentional
at com.gaic.paymentprocessor.service.PaymentProcessorService.lambda$processor$0(PaymentProcessorService.java:50) ~[classes/:na]
...
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java...)
...
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:374) ~[spring-integration-amqp-5.5.4.jar:5.5.4]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.1.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.1.jar:na]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:370) ~[spring-integration-amqp-5.5.4.jar:5.5.4]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java...)
...
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) ~[spring-rabbit-2.3.10.jar:2.3.10]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-10-18 13:12:40.704 ERROR 6704 --- [-in.processor-1] c.f.c.c.BeanFactoryAwareFunctionRegistry : Failed to convert output
java.lang.RuntimeException: intentional
at com.gaic.paymentprocessor.service.PaymentProcessorService.lambda$processor$0(PaymentProcessorService.java:50) ~[classes/:na]
...
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) ~[spring-rabbit-2.3.10.jar:2.3.10]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
2021-10-18 13:12:40.704 ERROR 6704 --- [-in.processor-1] onfiguration$FunctionToDestinationBinder : Failure was detected during execution of the reactive function 'processor'
2021-10-18 13:12:40.712 ERROR 6704 --- [-in.processor-1] reactor.core.publisher.Operators : Operator called default onErrorDropped
reactor.core.Exceptions$ErrorCallbackNotImplemented: java.lang.RuntimeException: intentional
Caused by: java.lang.RuntimeException: intentional
at com.gaic.paymentprocessor.service.PaymentProcessorService.lambda$processor$0(PaymentProcessorService.java:50) ~[classes/:na]
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200) ~[reactor-core-3.4.10.jar:3.4.10]
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120) ~[reactor-core-3.4.10.jar:3.4.10]
...
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72) ~[spring-integration-core-5.5.4.jar:5.5.4]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317) ~[spring-integration-core-5.5.4.jar:5.5.4]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272) ~[spring-integration-core-5.5.4.jar:5.5.4]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.3.10.jar:5.3.10]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.3.10.jar:5.3.10]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.3.10.jar:5.3.10]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.3.10.jar:5.3.10]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208) ~[spring-integration-core-5.5.4.jar:5.5.4]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:69) ~[spring-integration-amqp-5.5.4.jar:5.5.4]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:374) ~[spring-integration-amqp-5.5.4.jar:5.5.4]
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329) ~[spring-retry-1.3.1.jar:na]
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225) ~[spring-retry-1.3.1.jar:na]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:370) ~[spring-integration-amqp-5.5.4.jar:5.5.4]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1656) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289) ~[spring-rabbit-2.3.10.jar:2.3.10]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195) ~[spring-rabbit-2.3.10.jar:2.3.10]
at java.base/java.lang.Thread.run(Thread.java:834) ~[na:na]
java.lang.RuntimeException: intentional
at com.gaic.paymentprocessor.service.PaymentProcessorService.lambda$processor$0(PaymentProcessorService.java:50)
at reactor.core.publisher.FluxFlatMap$FlatMapMain.onNext(FluxFlatMap.java:386)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxMap$MapSubscriber.onNext(FluxMap.java:120)
at reactor.core.publisher.FluxPeek$PeekSubscriber.onNext(FluxPeek.java:200)
at reactor.core.publisher.UnicastManySinkNoBackpressure.tryEmitNext(UnicastManySinkNoBackpressure.java:120)
at reactor.core.publisher.SinkManySerialized.tryEmitNext(SinkManySerialized.java:100)
at org.springframework.integration.util.IntegrationReactiveUtils.lambda$adaptSubscribableChannelToPublisher$8(IntegrationReactiveUtils.java:142)
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115)
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:133)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:69)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:374)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:370)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1656)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
at java.base/java.lang.Thread.run(Thread.java:834)
Second message log
2021-10-18 12:57:25.422 ERROR 9928 --- [-in.processor-1] o.s.integration.handler.LoggingHandler : org.springframework.messaging.MessageDeliveryException: Dispatcher has no subscribers for channel 'payment-supplier.processor-in-0'.; nested exception is org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[1330], headers=amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=processor-in, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=processor-in.processor, amqp_redelivered=false, id=368d916d-af66-24c1-59c1-491d5dbc6dfa, amqp_consumerTag=amq.ctag-fibQI2iMkG5_giNWmMMwlA, sourceData=(Body:'[B@1d04b729(byte[1330])' MessageProperties [headers=, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=false, receivedExchange=processor-in, receivedRoutingKey=, deliveryTag=2, consumerTag=amq.ctag-fibQI2iMkG5_giNWmMMwlA, consumerQueue=processor-in.processor]), contentType=application/json, timestamp=1634542042374], failedMessage=GenericMessage [payload=byte[1330], headers=amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=processor-in, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=processor-in.processor, amqp_redelivered=false, id=368d916d-af66-24c1-59c1-491d5dbc6dfa, amqp_consumerTag=amq.ctag-fibQI2iMkG5_giNWmMMwlA, sourceData=(Body:'[B@1d04b729(byte[1330])' MessageProperties [headers=, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=false, receivedExchange=processor-in, receivedRoutingKey=, deliveryTag=2, consumerTag=amq.ctag-fibQI2iMkG5_giNWmMMwlA, consumerQueue=processor-in.processor]), contentType=application/json, timestamp=1634542042374]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:76)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:317)
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:272)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166)
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47)
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109)
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:208)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$1300(AmqpInboundChannelAdapter.java:69)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.lambda$onMessage$0(AmqpInboundChannelAdapter.java:374)
at org.springframework.retry.support.RetryTemplate.doExecute(RetryTemplate.java:329)
at org.springframework.retry.support.RetryTemplate.execute(RetryTemplate.java:225)
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:370)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1656)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1575)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1563)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1554)
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1498)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:968)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:914)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:83)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1289)
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1195)
at java.base/java.lang.Thread.run(Thread.java:834)
Caused by: org.springframework.integration.MessageDispatchingException: Dispatcher has no subscribers, failedMessage=GenericMessage [payload=byte[1330], headers=amqp_receivedDeliveryMode=PERSISTENT, amqp_receivedExchange=processor-in, amqp_deliveryTag=2, deliveryAttempt=3, amqp_consumerQueue=processor-in.processor, amqp_redelivered=false, id=368d916d-af66-24c1-59c1-491d5dbc6dfa, amqp_consumerTag=amq.ctag-fibQI2iMkG5_giNWmMMwlA, sourceData=(Body:'[B@1d04b729(byte[1330])' MessageProperties [headers=, contentLength=0, receivedDeliveryMode=PERSISTENT, redelivered=false, receivedExchange=processor-in, receivedRoutingKey=, deliveryTag=2, consumerTag=amq.ctag-fibQI2iMkG5_giNWmMMwlA, consumerQueue=processor-in.processor]), contentType=application/json, timestamp=1634542042374]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:139)
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:106)
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:72)
... 23 more
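[Answer 1]: I have already answered this on GitHub (https://github.com/spring-cloud/spring-cloud-stream/issues/1922); here is the answer.
Hi @vishalmamidi,
I implemented my own version of a DLQ using StreamBridge. Basically, you need to implement onErrorNext, send your message to the DLQ through StreamBridge, and then return Mono.empty(). Keep in mind that you need to use @987654324@ to prevent any odd behavior from upstream/downstream errors; it makes sure the error goes to onErrorNext rather than onErrorContinue. Here is the code snippet I used: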
@Bean
@ConditionalOnProperty("app.reader.writerTest.enabled", havingValue = "true", matchIfMissing = true)
fun writerTest(
    streamBridge: StreamBridge
) = Function<Flux<Message<TestReaderRequest>>, Mono<Void>> { item ->
    item
        .map {
            println("TEST: $it")
            it
        }
        .flatMap { request ->
            Mono
                .just("Service Test")
                .map {
                    throw Exception("Testing DLQ")
                }
                .handleDlq(request, streamBridge, logger, 1)
        }
        .then()
}

fun <T> Mono<T>.handleDlq(request: Message<*>, streamBridge: StreamBridge, logger: Logger, retryThreshold: Int = 1) =
    onErrorResume { exception -> handleDlq<T>(request, retryThreshold, streamBridge, logger, exception) }
        .onErrorStop()

fun <T> Flux<T>.handleDlq(request: Message<*>, streamBridge: StreamBridge, logger: Logger, retryThreshold: Int = 1) =
    onErrorResume { exception -> handleDlq<T>(request, retryThreshold, streamBridge, logger, exception) }
        .onErrorStop()

// Logs the failure, then re-sends the message either to its original topic (retry)
// or to the DLQ, and completes the failed inner publisher with an empty Mono.
private fun <T> handleDlq(
    request: Message<*>,
    retryThreshold: Int,
    streamBridge: StreamBridge,
    logger: Logger,
    exception: Throwable
): Mono<T> {
    logger.error(
        "Unable to finish the operation of request: $request due to: ",
        exception
    )
    val receivedTopic = request.headers[RECEIVED_TOPIC]
    val attempts = request.headers[DELIVERY_ATTEMPT, AtomicInteger::class.java] ?: AtomicInteger(1)
    val topic = if (attempts.get() < retryThreshold) {
        attempts.incrementAndGet()
        logger.info("About to retry the message of $request for topic: $receivedTopic, attempt #${attempts.get()} from $retryThreshold")
        receivedTopic.toString()
    } else {
        // TODO: needs to find the dlq name from application.yml
        val dlq = receivedTopic.toString() + "-dlq"
        logger.info("About to send the message of $request to dlq: $dlq after retrying ${attempts.get()} time(s)")
        dlq
    }
    val message = MessageBuilder
        .fromMessage(request)
        .setHeaders(buildRetryOrDlqMessageHeaders(request, attempts, topic, exception))
        .build()
    streamBridge.send(topic, message)
    return Mono.empty()
}

// Copies the request headers and adds the routing and dead-letter diagnostic headers.
private fun buildRetryOrDlqMessageHeaders(
    request: Message<*>,
    attempts: AtomicInteger,
    topic: String,
    exception: Throwable
): MessageHeaderAccessor {
    val messageHeaders = MessageHeaderAccessor(request)
    messageHeaders.setHeader("spring.cloud.stream.sendto.destination", topic)
    messageHeaders.setHeader(DELIVERY_ATTEMPT, attempts)
    messageHeaders.setHeader(DLT_ORIGINAL_TOPIC, request.headers[RECEIVED_TOPIC])
    messageHeaders.setHeader(DLT_EXCEPTION_FQCN, exception.javaClass.name)
    messageHeaders.setHeader(DLT_EXCEPTION_MESSAGE, exception.message)
    messageHeaders.setHeader(DLT_EXCEPTION_STACKTRACE, exception.stackTraceToString())
    return messageHeaders
}
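The destination choice inside handleDlq can be isolated as a small pure function, sketched here in plain Java so it can be checked without Spring. The rule is taken from the snippet: retry on the original topic while the attempt count is below the threshold, otherwise send to the topic name with "-dlq" appended. The class and method names (DlqRoutingSketch, nextDestination) are hypothetical, and the "-dlq" suffix is the answer's own placeholder (see its TODO), not necessarily the name the binder gives a real DLQ.

```java
// Hypothetical helper mirroring the topic selection in handleDlq.
final class DlqRoutingSketch {
    // Returns the original topic while retries remain, otherwise the DLQ name.
    static String nextDestination(String receivedTopic, int attempts, int retryThreshold) {
        if (attempts < retryThreshold) {
            return receivedTopic;
        }
        return receivedTopic + "-dlq"; // suffix assumed, as in the answer's TODO
    }

    public static void main(String[] args) {
        // Threshold 1 (the snippet's default): the first failure goes straight to the DLQ.
        System.out.println(nextDestination("processor-in", 1, 1));
        // Threshold 3: the first failure is retried on the original topic.
        System.out.println(nextDestination("processor-in", 1, 3));
    }
}
```

Because the attempt counter defaults to 1 when the header is missing, a retryThreshold of 1 means no retry at all; a threshold of N allows N - 1 re-sends to the original topic before the message is dead-lettered.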
Let me know if you have any questions; I hope this helps.