SpringBatch SpelEvaluationException: Method cannot be found on ChunkProcessorChunkHandler
Posted: 2019-07-14 07:43:40
Question:
I have a Spring Batch job that performs remote chunking over RabbitMQ. There are two applications, master and worker, and each one is a Spring Boot application. The master configuration is as follows:
// Channel carrying chunk requests from the master to the workers
@Bean
public DirectChannel requestsToWorkers() {
    return new DirectChannel();
}

// Publishes chunk requests to RabbitMQ
@Bean
public IntegrationFlow outboundFlow() {
    return IntegrationFlows
            .from(requestsToWorkers())
            .handle(Amqp.outboundAdapter(configurableRabbitTemplate).routingKey("master-route"))
            .get();
}

// Channel on which the master receives replies from the workers
@Bean
public QueueChannel repliesFromWorkers() {
    return new QueueChannel();
}

// Consumes worker replies from RabbitMQ
@Bean
public IntegrationFlow inboundFlow() {
    return IntegrationFlows
            .from(Amqp.inboundAdapter(configurableRabbitMqConnectionFactory, messageBrokerProperties.getMessageQueue().getQueueName()))
            .channel(repliesFromWorkers())
            .get();
}

@Bean
public ItemReader<Integer> testIntegerItemReader() {
    return () -> new Random().nextInt();
}

// Writer that sends each chunk to the workers instead of writing it locally
@Bean
public ItemWriter<Integer> testAqmpItemWriter() {
    final MessagingTemplate messagingTemplate = new MessagingTemplate();
    messagingTemplate.setDefaultChannel(requestsToWorkers());
    messagingTemplate.setReceiveTimeout(3000);
    final ChunkMessageChannelItemWriter<Integer> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
    chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate);
    chunkMessageChannelItemWriter.setReplyChannel(repliesFromWorkers());
    return chunkMessageChannelItemWriter;
}

@Bean
public Job testJob(final ItemReader<Integer> testIntegerItemReader,
                   final ItemWriter<Integer> testAqmpItemWriter) {
    return jobBuilderFactory.get("testJob")
            .incrementer(new RunIdIncrementer())
            .start(
                    stepBuilderFactory.get("masterStep")
                            .<Integer, Integer>chunk(1)
                            .reader(testIntegerItemReader)
                            .writer(testAqmpItemWriter)
                            .build()
            )
            .build();
}
And the corresponding worker configuration:
// Channel receiving chunk requests from the master
@Bean
public DirectChannel requestsChannel() {
    return new DirectChannel();
}

// Channel carrying chunk responses back to the master
@Bean
public DirectChannel repliesChannel() {
    return new DirectChannel();
}

// Consumes chunk requests from RabbitMQ
@Bean
public IntegrationFlow inboundFlow() {
    return IntegrationFlows
            .from(Amqp.inboundAdapter(configurableRabbitMqConnectionFactory, messageBrokerProperties.getMessageQueue().getQueueName()))
            .channel(requestsChannel())
            .get();
}

// Publishes chunk responses to RabbitMQ
@Bean
public IntegrationFlow outboundFlow() {
    return IntegrationFlows
            .from(repliesChannel())
            .handle(Amqp.outboundAdapter(configurableRabbitTemplate).routingKey("worker-route"))
            .get();
}

@Bean
public ItemProcessor<Integer, Integer> processor() {
    return account -> {
        System.out.println("Processed random int " + account);
        return account;
    };
}

@Bean
public ItemWriter<Integer> writer() {
    return response -> System.out.println("Value written to AMQP " + response);
}

// Handles each incoming ChunkRequest by running the processor and writer on its items
@Bean
@ServiceActivator(inputChannel = "requestsChannel", outputChannel = "repliesChannel")
public ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler(final ItemProcessor<Integer, Integer> processor,
                                                                      final ItemWriter<Integer> writer) {
    final SimpleChunkProcessor<Integer, Integer> chunkProcessor = new SimpleChunkProcessor<>(processor, writer);
    final ChunkProcessorChunkHandler<Integer> chunkProcessorChunkHandler = new ChunkProcessorChunkHandler<>();
    chunkProcessorChunkHandler.setChunkProcessor(chunkProcessor);
    return chunkProcessorChunkHandler;
}
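Each module uses Spring Boot 2.1.2. Overall everything looks fine, and chunks are sent successfully from the master to the worker through RabbitMQ, but when the worker tries to actually read a message I get the following exception: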
org.springframework.amqp.rabbit.listener.exception.ListenerExecutionFailedException: Listener threw exception
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.wrapToListenerExecutionFailedExceptionIfNeeded(AbstractMessageListenerContainer.java:1613) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1517) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.actualInvokeListener(AbstractMessageListenerContainer.java:1440) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:1428) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:1423) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.executeListener(AbstractMessageListenerContainer.java:1372) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.doReceiveAndExecute(SimpleMessageListenerContainer.java:870) [spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.receiveAndExecute(SimpleMessageListenerContainer.java:854) [spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer.access$1600(SimpleMessageListenerContainer.java:78) [spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.mainLoop(SimpleMessageListenerContainer.java:1137) [spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at org.springframework.amqp.rabbit.listener.SimpleMessageListenerContainer$AsyncMessageProcessingConsumer.run(SimpleMessageListenerContainer.java:1043) [spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
at java.lang.Thread.run(Thread.java:748) [na:1.8.0_181]
Caused by: org.springframework.messaging.MessageHandlingException: nested exception is org.springframework.expression.spel.SpelEvaluationException: EL1004E: Method call: Method handleChunk(byte[]) cannot be found on type org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler
at org.springframework.integration.handler.MethodInvokingMessageProcessor.processMessage(MethodInvokingMessageProcessor.java:109) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.ServiceActivatingHandler.handleRequestMessage(ServiceActivatingHandler.java:93) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:123) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:431) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.doProduceOutput(AbstractMessageProducingHandler.java:284) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:265) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:223) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:129) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:162) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:115) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:453) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:401) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:187) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:166) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:109) ~[spring-messaging-5.1.4.RELEASE.jar:5.1.4.RELEASE]
at org.springframework.integration.endpoint.MessageProducerSupport.sendMessage(MessageProducerSupport.java:205) ~[spring-integration-core-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter.access$600(AmqpInboundChannelAdapter.java:57) ~[spring-integration-amqp-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.createAndSend(AmqpInboundChannelAdapter.java:237) ~[spring-integration-amqp-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.integration.amqp.inbound.AmqpInboundChannelAdapter$Listener.onMessage(AmqpInboundChannelAdapter.java:204) ~[spring-integration-amqp-5.1.2.RELEASE.jar:5.1.2.RELEASE]
at org.springframework.amqp.rabbit.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:1514) ~[spring-rabbit-2.1.3.RELEASE.jar:2.1.3.RELEASE]
... 10 common frames omitted
I have already spent a lot of time on this issue. Any help would be much appreciated.
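Answer 1:
handleChunk expects a ChunkRequest, not a byte[], so it looks like deserialization in the inbound channel adapter is going wrong. The adapter should have a SimpleMessageConverter (the default), which handles java.io.Serializable (which ChunkRequest implements).
This can happen if there is no content-type message property (if the converter does not understand the content type, it just returns the byte[]).
The outbound adapter uses the same converter, so the property should be set correctly.
Look at the message in the Rabbit Management UI and make sure the header is correct. It should have the value application/x-java-serialized-object.
You can also inspect the message and its headers by enabling debug logging.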
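To make the failure mode in this answer concrete, here is a minimal, self-contained sketch that uses only the JDK. It is not Spring's code: ConverterSketch, FakeChunkRequest, FakeChunkHandler, fromMessage and canDispatch are hypothetical stand-ins invented for illustration. The sketch assumes a converter that deserializes the body only when the content type is application/x-java-serialized-object and otherwise passes the raw bytes through (it ignores other cases the real converter may handle, such as text), and it assumes the handler method is looked up by the payload's runtime type.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

public class ConverterSketch {

    public static final String SERIALIZED = "application/x-java-serialized-object";

    // Hypothetical stand-in for ChunkRequest: just a Serializable payload.
    public static class FakeChunkRequest implements Serializable {
        private static final long serialVersionUID = 1L;
        public final int item;

        public FakeChunkRequest(int item) {
            this.item = item;
        }
    }

    // Hypothetical stand-in for the handler: it only accepts the typed request.
    public static class FakeChunkHandler {
        public String handleChunk(FakeChunkRequest request) {
            return "handled " + request.item;
        }
    }

    // Plain Java serialization, as a sender would do before publishing.
    public static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // Assumed converter behavior: deserialize only for the known content type,
    // otherwise hand back the raw byte[] unchanged.
    public static Object fromMessage(byte[] body, String contentType) {
        if (!SERIALIZED.equals(contentType)) {
            return body;
        }
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(body))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException(e);
        }
    }

    // Looks up handleChunk by the payload's runtime type; a byte[] payload finds nothing.
    public static boolean canDispatch(Object payload) {
        try {
            FakeChunkHandler.class.getMethod("handleChunk", payload.getClass());
            return true;
        } catch (NoSuchMethodException e) {
            return false;
        }
    }

    public static void main(String[] args) {
        byte[] body = serialize(new FakeChunkRequest(42));
        System.out.println(canDispatch(fromMessage(body, SERIALIZED))); // typed payload
        System.out.println(canDispatch(fromMessage(body, null)));       // raw byte[] payload
    }
}
```

Running main should print true for the message that carries the content type and false for the one without it. The second case mirrors the "Method handleChunk(byte[]) cannot be found" error in the stack trace: the payload was never turned back into a typed request, so no matching method exists.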
Comments:
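Thanks for the reply. I double-checked everything, and the Content-Type header is present in the Rabbit message. What's more, I tried passing simple JSON and providing the required Jackson2JsonMessageConverter, but the problem persists.
Can you run it in a debugger and set a breakpoint in AmqpInboundChannelAdapter.Listener.createMessage()? The payload conversion is performed by the first line of that method. Perhaps step into the fromMessage method to see what is happening.
You're right, the problem was related to the message converter. I made a mistake here: I specified the message converter at the AmqpTemplate configuration level, when I should have done it at the master itemWriter level (messagingTemplate.setMessageConverters(new SimpleMessageConverter)). Thank you for your help.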