使用Kafka向Dlq Spring云流发送消息时出错

Posted

tags:

篇首语:本文由小常识网(cha138.com)小编为大家整理,主要介绍了使用Kafka向Dlq Spring云流发送消息时出错相关的知识,希望对你有一定的参考价值。

pom.xml

<dependencies>
         <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-stream</artifactId>
         </dependency>
         <dependency>
            <groupId>org.springframework.cloud</groupId>
            <artifactId>spring-cloud-starter-stream-kafka</artifactId>
        </dependency>
        </dependencies>
        <dependencyManagement>
        <dependencies>
        <dependency>
                <groupId>org.springframework.cloud</groupId>
                <artifactId>spring-cloud-dependencies</artifactId>
                <version>Edgware.SR3</version>
                <type>pom</type>
                <scope>import</scope>
          </dependency>
          </dependencies>
          </dependencyManagement>

@Component
public class QueueConsumer {

    /** The Constant LOG. */
    public static final Logger LOG = LoggerFactory.getLogger(QueueConsumer.class);

    /** The processor. */
    @Autowired
    private IMessageProcessor processor;

    /**
     * Consume.
     *
     * @param message the message
     */
    @StreamListener(value = OrderEventSink.ORDER_EVENT)
    public void consume(Message<String> message) {
        try {
            processor.process(message);
        } catch (MessageProcessingFailedException e) {
            LOG.error("Error Code "+ e.getCode().getCode() + " " + e.getCode().getDescription(), e);
            throw e;
        }
    }
}
  1. 我正在使用spring cloud stream来读取来自kafka主题的消息。正在从队列中读取消息并进行处理,如果在处理时失败,则消息应该配置为错误队列,但会出现以下错误。
  2. 从邮件中提取标题时会出现异常,这可能是解决此问题的最佳方法吗?
  3. Kafka版本是1.0,kafka客户端是2.11-1.0

application.properties

     spring.cloud.stream.bindings.orderEvent.destination=orderEvents
     spring.cloud.stream.bindings.orderEvent.content- 
     type=application/json
     spring.cloud.stream.bindings.orderEvent.group=orderEvents-consumer
     spring.cloud.stream.bindings.orderEvent.consumer.back-off- 
     multiplier=5
     spring.cloud.stream.bindings.orderEvent.consumer.back-off-initial- 
     interval=60000
     spring.cloud.stream.bindings.orderEvent.consumer.max-attempts=1
     spring.cloud.stream.bindings.orderEvent.consumer.headerMode=raw
     spring.cloud.stream.bindings.kafka.binder.brokers=localhost
     spring.cloud.stream.bindings.kafka.binder.defaultBrokerPort=9092
     spring.cloud.stream.bindings.kafka.binder.zkNodes=localhost
     spring.cloud.stream.bindings.kafka.binder.defaultZkPort=2181
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     enableDlq=true
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     dlqName=dead-queue
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     dlqProducerProperties.configuration.key.
     serializer=org.apache.kafka.common.serialization.StringSerializer
     spring.cloud.stream.kafka.bindings.orderEvent.consumer.
     dlqProducerProperties.configuration.value.
     serializer=org.apache.kafka.common.serialization.StringSerializer

org.springframework.messaging.MessageDeliveryException:无法将消息发送到通道'scm-orderEvents.scm-orderEvents-consumer.errors';嵌套异常是java.lang.RuntimeException:java.lang.StringIndexOutOfBoundsException:字符串索引超出范围:4297 at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:451)〜[spring-integration-core-4.3 .12.RELEASE.jar:4.3.12.RELEASE]在org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:375)〜[spring-integration-core-4.3.12.RELEASE.jar:4.3。 12.RELEASE]在org.springframework.messaging的org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:115)〜[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE]。 core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:45)〜[弹簧消息传递4.3.13.RELEASE.jar:4.3.13.RELEASE]在org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:105 )〜[spring-messaging-4.3.13.RELEASE.jar:4.3.13.RELEASE] org.springframework.integration.endpoint.MessageProducerSu pport.sendErrorMessageIfNecessary(MessageProducerSupport.java:207)〜[弹簧一体化芯-4.3.12.RELEASE.jar:4.3.12.RELEASE]在org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:191 )〜[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter.access $ 200(KafkaMessageDrivenChannelAdapter.java:63)〜[spring-integration -kafka-2.1.2.RELEASE.jar:NA]在org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter $ IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:372)〜[弹簧整合-卡夫卡2.1.2.RELEASE。罐子:NA]在org.springframework.integration.kafka.inbound.KafkaMessageDrivenChannelAdapter $ IntegrationRecordMessageListener.onMessage(KafkaMessageDrivenChannelAdapter.java:352)〜[弹簧整合-卡夫卡2.1.2.RELEASE.jar:NA]在org.springframework。 kafka.listener.KafkaMessageListenerContainer $ ListenerConsu mer.invokeRecordListener(KafkaMessageListenerContainer.java:794)[弹簧卡夫卡1.1.6.RELEASE.jar:NA]在org.springframework.kafka.listener.KafkaMessageListenerContainer $ ListenerConsumer.invokeListener(KafkaMessageListenerContainer.java:738)[弹簧卡夫卡-1.1.6.RELEASE.jar:na] at org.springframework.kafka.listener.KafkaMessageListenerContainer $ ListenerConsumer.access $ 2200(KafkaMessageListenerContainer.java:245)[spring-kafka-1.1.6.RELEASE.jar:na] at org .springframework.kafka.listener.KafkaMessageListenerContainer $ $ ListenerConsumer ListenerInvoker.run(KafkaMessageListenerContainer.java:1031)[弹簧卡夫卡1.1.6.RELEASE.jar:NA]在java.util.concurrent.Executors $ RunnableAdapter.call(执行人.java:511)在[java:。 )[na:1.8.0_162]引起:java.lang.RuntimeException:java.lang.StringIndexOutOfBoundsException:字符串索引超出范围:org.springframework.cloud.stre中的4297 am.binder.kafka.KafkaMessageChannelBinder $ 4.handleMessage(KafkaMessageChannelBinder.java:380)〜[spring-cloud-stream-binder-kafka-1.3.2.RELEASE.jar:1.3.2.RELEASE] org.springframework.integration。 org.springframework.integration.dispatcher.BroadcastingDispatcher.dispatch(BroadcastingDispatcher.java)中的dispatcher.BroadcastingDispatcher.invokeHandler(BroadcastingDispatcher.java:236)〜[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] :185)〜[spring-integration-core-4.3.12.RELEASE.jar:4.3.12.RELEASE] org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:89)〜[spring-integration- org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:425)〜[spring-integration-core-4.3.12.RELEASE.jar :4.3.12.RELEASE] ...省略了16个常见帧引起的:java.lang.StringIndexOutOfBoundsException:字符串索引超出范围:java.lang.String.checkBounds(S中的4297) tring.java:385)~ [na:1.8.0_162] at java.lang.String。(String.java:425)〜[na:1.8.0_162] at org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.oldExtractHeaders (EmbeddedHeaderUtils.java:154)〜[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE] org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.extractHeaders(EmbeddedHeaderUtils.java:115) 〜[spring-cloud-stream-1.3.2.RELEASE.jar:1.3.2.RELEASE]在org.springframework.cloud.stream.binder.EmbeddedHeaderUtils.extractHeaders(EmbeddedHeaderUtils.java:107)〜[spring-cloud-stream -1.3.2.RELEASE.jar:1.3.2.RELEASE]在org.springframework.cloud.stream.binder.kafka.KafkaMessageChannelBinder $ 4.handleMessage(KafkaMessageChannelBinder.java:368)〜[spring-cloud-stream-binder-kafka -1.3.2.RELEASE.jar:1.3.2.RELEASE] ...省略了20个常用帧

答案

这是kafka活页夹的1.3.2.RELEASE中的一个错误;它是fixed on master(1.3.3.BUILD-SNAPSHOT)。

BTW,最好的解决方案是使用Spring Boot 2.0.1和SCSt Emlhurst.RELEASE(由云FINCHLEY引入 - 目前在M9里程碑)。

这些版本对Kafka 1.0有本机支持。

您可能也有一些成功转移到与SCST 1.3.x兼容的kafka11 binder工件(1.3.0),如discussed on the Wiki

以上是关于使用Kafka向Dlq Spring云流发送消息时出错的主要内容,如果未能解决你的问题,请参考以下文章

具有反应流的 Spring 云流

消息队列 - 死信、延迟、重试队列

带有 Spring JMS 的 Azure 接收多次接收相同的消息并将消息移动到 DLQ

春季云流-消费者配置不起作用

向 kafka 主题发送消息时出现 TimeoutException

春季启动生产者在kafka重启后无法发送任何消息