RemoteChunking 中的 Spring Batch 集成超时
Posted
技术标签:
【中文标题】RemoteChunking 中的 Spring Batch 集成超时【英文标题】:Spring Batch Integration Timeout in RemoteChunking 【发布时间】:2019-02-21 07:49:39 【问题描述】:我正在尝试使用 Spring Boot、Spring Batch 和 Spring Integrations 配置 RemoteChunking 任务。
我已经配置了一个activeMQ
服务器,我开始按照官方文档https://docs.spring.io/spring-batch/4.0.x/reference/html/spring-batch-integration.html#remote-chunking 配置Spring Batch。
我的主配置:
import com.arrobaautowired.payment.Payment;
import com.arrobaautowired.record.Record;
import lombok.extern.slf4j.Slf4j;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.batch.core.Job;
import org.springframework.batch.core.Step;
import org.springframework.batch.core.configuration.annotation.EnableBatchProcessing;
import org.springframework.batch.core.configuration.annotation.JobBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepBuilderFactory;
import org.springframework.batch.core.configuration.annotation.StepScope;
import org.springframework.batch.core.step.tasklet.TaskletStep;
import org.springframework.batch.integration.chunk.ChunkMessageChannelItemWriter;
import org.springframework.batch.item.file.MultiResourceItemReader;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.channel.QueueChannel;
import org.springframework.integration.core.MessagingTemplate;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;
@Configuration
@Slf4j
@EnableBatchProcessing
public class MasterBatchConfiguration
private final static String MASTER_JOB_TEST = "JOB_MASTER";
private final static String MATER_JOB_STEP = "STEP-1";
private final static int CHUNK_SIZE = 50;
private JobBuilderFactory jobBuilderFactory;
private StepBuilderFactory stepBuilderFactory;
private MultiResourceItemReader<Record> filesReader;
private StepListener stepListener;
@Autowired
public MasterBatchConfiguration(JobBuilderFactory jobBuilderFactory, StepBuilderFactory stepBuilderFactory, MultiResourceItemReader<Record> filesReader, StepListener stepListener)
this.jobBuilderFactory = jobBuilderFactory;
this.stepBuilderFactory = stepBuilderFactory;
this.filesReader = filesReader;
this.stepListener = stepListener;
@Bean
public Job processRecordsJob(JobCompletionNotificationListener listener, Step step1)
return jobBuilderFactory
.get(MASTER_JOB_TEST)
.listener(listener)
.flow(step1)
.end()
.build();
@Bean
public TaskletStep step1()
return stepBuilderFactory.get(MATER_JOB_STEP)
.<Record, Payment>chunk(CHUNK_SIZE)
.reader(filesReader)
.writer(itemWriter())
.listener(stepListener)
.build();
@Bean
public ActiveMQConnectionFactory connectionFactory()
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
factory.setTrustAllPackages(Boolean.TRUE);
return factory;
/*
* Configure outbound flow (requests going to workers)
*/
@Bean
public DirectChannel requests()
return new DirectChannel();
@Bean
public IntegrationFlow jmsOutboundFlow(ActiveMQConnectionFactory connectionFactory)
return IntegrationFlows
.from("requests")
.handle(Jms
.outboundAdapter(connectionFactory)
.destination("requests"))
.get();
/*
* Configure inbound flow (replies coming from workers)
*/
@Bean
public QueueChannel replies()
return new QueueChannel();
@Bean
public IntegrationFlow inboundFlow(ActiveMQConnectionFactory connectionFactory)
return IntegrationFlows
.from(Jms.messageDrivenChannelAdapter(connectionFactory).destination("replies"))
.channel(replies())
.get();
/*
* Configure the ChunkMessageChannelItemWriter.
* Se trata de un ItemWriter especial, @link ChunkMessageChannelItemWriter, que se encarga de enviar la información al pooleer (Middleware externo) y recogerla.
*/
@Bean
@StepScope
public ChunkMessageChannelItemWriter<Payment> itemWriter()
ChunkMessageChannelItemWriter<Payment> chunkMessageChannelItemWriter = new ChunkMessageChannelItemWriter<>();
chunkMessageChannelItemWriter.setMessagingOperations(messagingTemplate());
chunkMessageChannelItemWriter.setReplyChannel(replies());
return chunkMessageChannelItemWriter;
@Bean
public MessagingTemplate messagingTemplate()
MessagingTemplate messagingTemplate = new MessagingTemplate();
messagingTemplate.setDefaultChannel(requests());
messagingTemplate.setReceiveTimeout(2000);
return messagingTemplate;
我的奴隶配置:
import com.arrobaautowired.processor.PaymentWriter;
import com.arrobaautowired.processor.ComplexRecordProcessor;
import com.arrobaautowired.processor.SimpleRecordProcessor;
import com.arrobaautowired.record.Record;
import org.apache.activemq.spring.ActiveMQConnectionFactory;
import org.springframework.batch.core.step.item.SimpleChunkProcessor;
import org.springframework.batch.integration.chunk.ChunkProcessorChunkHandler;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.context.annotation.Profile;
import org.springframework.integration.annotation.IntegrationComponentScan;
import org.springframework.integration.annotation.ServiceActivator;
import org.springframework.integration.channel.DirectChannel;
import org.springframework.integration.config.AggregatorFactoryBean;
import org.springframework.integration.config.EnableIntegration;
import org.springframework.integration.dsl.IntegrationFlow;
import org.springframework.integration.dsl.IntegrationFlows;
import org.springframework.integration.jms.dsl.Jms;
@Configuration
@IntegrationComponentScan
@EnableIntegration
public class WorkerBatchConfiguration
@Bean
public ActiveMQConnectionFactory connectionFactory()
ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory();
factory.setBrokerURL("tcp://localhost:61616");
factory.setTrustAllPackages(Boolean.TRUE);
return factory;
@Bean
public DirectChannel requests()
return new DirectChannel();
@Bean
public DirectChannel replies()
return new DirectChannel();
@Bean
public IntegrationFlow jmsIn()
return IntegrationFlows
.from(Jms
.messageDrivenChannelAdapter(connectionFactory())
.configureListenerContainer(c -> c.subscriptionDurable(false))
.destination("requests"))
.channel(requests())
.get();
@Bean
public IntegrationFlow outgoingReplies()
return IntegrationFlows
.from("replies")
.handle(Jms
.outboundGateway(connectionFactory())
.requestDestination("replies"))
.get();
@Bean
@ServiceActivator(inputChannel = "requests", outputChannel = "replies", sendTimeout = "10000")
public ChunkProcessorChunkHandler<Record> chunkProcessorChunkHandler()
ChunkProcessorChunkHandler chunkProcessorChunkHandler = new ChunkProcessorChunkHandler();
chunkProcessorChunkHandler.setChunkProcessor(new SimpleChunkProcessor(recordProcessor(), paymentWriter()));
return chunkProcessorChunkHandler;
@Bean
public SimpleRecordProcessor recordProcessor()
return new SimpleRecordProcessor();
@Bean
public PaymentWriter paymentWriter()
return new PaymentWriter();
看起来工作正常,但是当从机完成一个块时,它会向完成工作的主机发送响应,并且从机显示“超时错误”:
2018-09-17 13:15:21.509 调试 75729 --- [erContainer#0-1] c.a.processor.PaymentWriter: FINALIZADO CHUNK ============================= 2018-09-17 13:15:21.510 调试 75729 --- [onPool-worker-4] caprocessor.PaymentWriter : PAYMENT: Payment(fullName=Culver Gapper, bic=ES08 9240 0446 6617 7749 9525, amount=75189, 货币=€) 2018-09-17 13:15:21.510 调试 75729 --- [erContainer#0-1] caprocessor.PaymentWriter : PAYMENT: Payment(fullName=Burgess Feldbau, bic=ES62 6361 1904 4990 0753 3877, amount=null, 货币=€) 2018-09-17 13:15:21.510 调试 75729 --- [onPool-worker-0] caprocessor.PaymentWriter : PAYMENT: Payment(fullName=Brenda Waddell, bic=ES23 4535 5585 5095 5691 1491, amount=28353, 货币=€) 2018-09-17 13:15:21.510 调试 75729 --- [onPool-worker-6] caprocessor.PaymentWriter : PAYMENT: Payment(fullName=Brittney Bliben, bic=ES88 3076 6115 5504 4561 1796, amount=86995, 货币=€) 2018-09-17 13:15:21.510 调试 75729 --- [onPool-worker-0] caprocessor.PaymentWriter : PAYMENT: Payment(fullName=Hortensia Willshee, bic=ES62 7020 0819 9813 3352 2742, amount=null, 货币=€) 2018-09-17 13:15:21.510 调试 75729 --- [onPool-worker-4] caprocessor.PaymentWriter : PAYMENT: Payment(fullName=Roselin Maccrie, bic=ES34 5876 6541 1999 9568 8714, amount=29865, 货币=€) 2018-09-17 13:15:21.510 调试 75729 --- [erContainer#0-1] caprocessor.PaymentWriter : PAYMENT: Payment(fullName=Jonathan Parlet, bic=ES74 5605 5066 6941 1376 6204, amount=null, 货币=€) 2018-09-17 13:15:21.510 调试 75729 --- [erContainer#0-1] caprocessor.PaymentWriter : PAYMENT: Payment(fullName=Ilise Semiras, bic=ES59 4689 9344 4052 2235 5296, amount=10698, 货币=€) 2018-09-17 13:15:21.510 调试 75729 --- [onPool-worker-5] caprocessor.PaymentWriter : PAYMENT: Payment(fullName=Audrey Lempenny, bic=ES45 2456 6470 0023 3823 3629, amount=56543, 货币=€) 2018-09-17 13:15:21.510 调试 75729 --- [onPool-worker-2] caprocessor.PaymentWriter : PAYMENT: Payment(fullName=Hayyim Fetter, bic=ES76 5134 4202 2267 7072 2547, amount=14662, 货币=€) 2018-09-17 13:15:21.510 调试 75729 --- [erContainer#0-1] c.a.processor.PaymentWriter: =============================================== 2018-09-17 13:15:21.510 调试 75729 --- [erContainer#0-1] osintegration.channel.DirectChannel :在频道“回复”上预发送,消息:GenericMessage [payload=ChunkResponse:jobId=1,sequence= 0, stepContribution=[StepContribution: 读取=0, 写入=10, 过滤=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING], 成功=true, headers=jms_redelivered=true, JMSXDeliveryCount=2 , jms_destination=queue://requests, id=16120698-22b4-c615-502b-7c6d050d82c6, priority=4, jms_timestamp=1537182878228, jms_messageId=ID:mbp-de-jose.neoris.cxnetworks.net-59228-15371828778 :2:1:1:1,时间戳=1537182921510] 2018-09-17 13:15:21.510 DEBUG 75729 --- [erContainer#0-1] osintegration.jms.JmsOutboundGateway:outcomingReplies.org.springframework.integration.jms.JmsOutboundGateway#0 收到消息:GenericMessage [payload=ChunkResponse : jobId=1, sequence=0, stepContribution=[StepContribution: read=0,written=10,filtered=0, readSkips=0, writeSkips=0, processSkips=0, exitStatus=EXECUTING],successful=true, headers= jms_redelivered=true, JMSXDeliveryCount=2, jms_destination=queue://requests, id=16120698-22b4-c615-502b-7c6d050d82c6, priority=4, jms_timestamp=1537182878228, jms_messageId=ID:mbp-de-jose.neoris.cxnetworks。 net-59228-1537182877872-1:2:1:1:1,时间戳=1537182921510] 2018-09-17 13:15:21.514 调试 75729 --- [erContainer#0-1] osintegration.jms.JmsOutboundGateway:回复:temp-queue://ID:mbp-de-jose.neoris.cxnetworks.net -58907-1537181494749-3:24:1 2018-09-17 13:15:26.536 WARN 75729 --- [erContainer#0-1] o.s.j.l.DefaultMessageListenerContainer:JMS 消息侦听器的执行失败,并且没有设置 ErrorHandler。 org.springframework.integration.MessageTimeoutException:未能在超时时间内接收 JMS 响应:5000 毫秒 在 org.springframework.integration.jms.JmsOutboundGateway.handleRequestMessage(JmsOutboundGateway.java:762) ~[spring-integration-jms-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:109) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:394) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutput(AbstractMessageProducingHandler.java:426) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.handler.AbstractMessageProducingHandler.produceOutput(AbstractMessageProducingHandler.java:336) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.handler.AbstractMessageProducingHandler.sendOutputs(AbstractMessageProducingHandler.java:227) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.handler.AbstractReplyProducingMessageHandler.handleMessageInternal(AbstractReplyProducingMessageHandler.java:115) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.handler.AbstractMessageHandler.handleMessage(AbstractMessageHandler.java:158) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.dispatcher.AbstractDispatcher.tryOptimizedDispatch(AbstractDispatcher.java:116) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.dispatcher.UnicastingDispatcher.doDispatch(UnicastingDispatcher.java:132) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.dispatcher.UnicastingDispatcher.dispatch(UnicastingDispatcher.java:105) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.channel.AbstractSubscribableChannel.doSend(AbstractSubscribableChannel.java:73) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.channel.AbstractMessageChannel.send(AbstractMessageChannel.java:445) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:181) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:160) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.messaging.core.GenericMessagingTemplate.doSend(GenericMessagingTemplate.java:47) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.messaging.core.AbstractMessageSendingTemplate.send(AbstractMessageSendingTemplate.java:108) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:150) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.messaging.core.AbstractMessageSendingTemplate.convertAndSend(AbstractMessageSendingTemplate.java:142) ~[spring-messaging-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.integration.gateway.MessagingGatewaySupport.send(MessagingGatewaySupport.java:415) ~[spring-integration-core-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.jms.ChannelPublishingJmsMessageListener$GatewayDelegate.send(ChannelPublishingJmsMessageListener.java:511) ~[spring-integration-jms-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.integration.jms.ChannelPublishingJmsMessageListener.onMessage(ChannelPublishingJmsMessageListener.java:341) ~[spring-integration-jms-5.0.8.RELEASE.jar:5.0.8.RELEASE] 在 org.springframework.jms.listener.AbstractMessageListenerContainer.doInvokeListener(AbstractMessageListenerContainer.java:736) ~[spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.jms.listener.AbstractMessageListenerContainer.invokeListener(AbstractMessageListenerContainer.java:696) ~[spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.jms.listener.AbstractMessageListenerContainer.doExecuteListener(AbstractMessageListenerContainer.java:674) ~[spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.jms.listener.AbstractPollingMessageListenerContainer.doReceiveAndExecute(AbstractPollingMessageListenerContainer.java:318) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.jms.listener.AbstractPollingMessageListenerContainer.receiveAndExecute(AbstractPollingMessageListenerContainer.java:257) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.invokeListener(DefaultMessageListenerContainer.java:1189) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.executeOngoingLoop(DefaultMessageListenerContainer.java:1179) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 org.springframework.jms.listener.DefaultMessageListenerContainer$AsyncMessageListenerInvoker.run(DefaultMessageListenerContainer.java:1076) [spring-jms-5.0.9.RELEASE.jar:5.0.9.RELEASE] 在 java.lang.Thread.run(Thread.java:745) [na:1.8.0_121]代码在 (https://github.com/jamataran/spring-batch-scale)[https://github.com/jamataran/spring-batch-scale]
【问题讨论】:
我看不出你的配置有什么问题。这是在每个块请求之后发生还是在所有块都处理完之后发生? @MahmoudBenHassine 感谢您的关注,每一块都会发生。 查看我对此事的回答。 v4.0 文档中的远程分块示例实际上是不正确的。在工作人员方面,应使用outboundAdapter
而不是outboundGateway
。这已在 4.1 版的文档中得到修复,请参阅 docs.spring.io/spring-batch/4.1.x/reference/html/…。所以Artem的答案是正确的。
【参考方案1】:
我认为您的问题出在worker
这边:
@Bean
public IntegrationFlow outgoingReplies()
return IntegrationFlows
.from("replies")
.handle(Jms
.outboundGateway(connectionFactory())
.requestDestination("replies"))
.get();
你只是发送回复,除了来自主人的任何东西,你什么都不做。
必须是单向的
Jms
.outboundAdapter(connectionFactory())
.destination("replies")`
【讨论】:
抱歉,它不起作用...(IntegrationFlows 需要来自) 嗯?我的意思是你需要用outboundAdapter
替换outboundGateway
。流程还是一样的
@Jose 我在您的存储库中测试了 Artem 建议的修复程序,它工作正常。正如 cmets 中所说,您从 v4.0 的文档中复制的示例不正确。 v4.1 文档中的示例与 Artem 建议的相同,并修复了超时问题。
是的!它也适用于我!我今天早上将答案标记为有效
作为记录,这里是报告不正确的远程分块示例的票证:jira.spring.io/browse/BATCH-2721。以上是关于RemoteChunking 中的 Spring Batch 集成超时的主要内容,如果未能解决你的问题,请参考以下文章