Spring Batch 分区处理程序错误

Posted

技术标签:

【中文标题】Spring Batch 分区处理程序错误【英文标题】:Spring Batch Partition Handler error 【发布时间】:2018-09-18 05:41:15 【问题描述】:

我对 Spring 框架和 Spring Batch 非常陌生

我正在尝试设置示例 spring 批处理远程分区示例。

我正在使用这个堆栈 Spring Boot + Spring Batch + Spring Integration + AWS SQS

以下事情,我已经成功了。

1.创建所有配置,包括通道、作业、队列和其他内容。

2.运行主进程,我可以对表进行分区并将分区元数据推送到 AWS SQS。

但是在运行从属进程时出现错误,在从属进程中,我能够从队列中拉出消息,但在 StepExecutionRequestHandler 的 handle() 方法时出现错误

org.springframework.messaging.MessageHandlingException:嵌套 例外是 org.springframework.expression.spel.SpelEvaluationException:EL1004E: 方法调用:找不到方法句柄(java.lang.String) org.springframework.batch.integration.partition.StepExecutionRequestHandler 类型,失败消息 = 通用消息 [有效负载 = StepExecutionRequest: [jobExecutionId=2, stepExecutionId=3, stepName=slaveStep], headers=sequenceNumber=2, aws_messageId="", SentTimestamp=1523215624042, sequenceSize=4, SenderId="", aws_receiptHandle="", ApproximateReceiveCount=2, 相关 ID=2:slaveStep,id="",lookupDestination=master, aws_queue=master,ApproximateFirstReceiveTimestamp=1523215634470, 时间戳=1523215864910]

@Configuration
public class JobConfiguration implements ApplicationContextAware


  @Autowired
  public JobBuilderFactory jobBuilderFactory;

  @Autowired
  public StepBuilderFactory stepBuilderFactory;

  @Autowired
  public DataSource dataSource;

  @Autowired
  public JobExplorer jobExplorer;

  @Autowired
  public JobRepository jobRepository;

  private ApplicationContext applicationContext;

  private static final int GRID_SIZE = 4;

  @Bean
  public PartitionHandler partitionHandler(MessagingTemplate messagingTemplate) throws Exception
  
    MessageChannelPartitionHandler partitionHandler = new MessageChannelPartitionHandler();
    partitionHandler.setStepName("slaveStep");
    partitionHandler.setGridSize(GRID_SIZE);
    partitionHandler.setMessagingOperations(messagingTemplate);
    partitionHandler.setPollInterval(5000l);
    partitionHandler.setJobExplorer(this.jobExplorer);

    partitionHandler.afterPropertiesSet();

    return partitionHandler;
  

  @Bean
  public ColumnRangePartitioner partitioner()
  
    ColumnRangePartitioner columnRangePartitioner = new ColumnRangePartitioner();

    columnRangePartitioner.setColumn("id");
    columnRangePartitioner.setDataSource(this.dataSource);
    columnRangePartitioner.setTable("customer");
    return columnRangePartitioner;
  

  @Bean
  @Profile("slave")
  @ServiceActivator(inputChannel = "inboundRequests", outputChannel = "outboundStaging")
  public StepExecutionRequestHandler stepExecutionRequestHandler()
  
    StepExecutionRequestHandler stepExecutionRequestHandler = new StepExecutionRequestHandler();

    BeanFactoryStepLocator stepLocator = new BeanFactoryStepLocator();
    stepLocator.setBeanFactory(this.applicationContext);
    stepExecutionRequestHandler.setStepLocator(stepLocator);
    stepExecutionRequestHandler.setJobExplorer(this.jobExplorer);

    return stepExecutionRequestHandler;
  

  @Bean(name = PollerMetadata.DEFAULT_POLLER)
  public PollerMetadata defaultPoller()
  
    PollerMetadata pollerMetadata = new PollerMetadata();
    pollerMetadata.setTrigger(new PeriodicTrigger(10));
    return pollerMetadata;
  

  @Bean
  @StepScope
  public JdbcPagingItemReader<Customer> pagingItemReader(@Value("#stepExecutionContext['minValue']") Long minValue,
      @Value("#stepExecutionContext['maxValue']") Long maxValue)
  
    System.out.println("reading " + minValue + " to " + maxValue);
    JdbcPagingItemReader<Customer> reader = new JdbcPagingItemReader<>();

    reader.setDataSource(this.dataSource);
    reader.setFetchSize(100);
    reader.setRowMapper(new CustomerRowMapper());

    mysqlPagingQueryProvider queryProvider = new MySqlPagingQueryProvider();
    queryProvider.setSelectClause("id, firstName, lastName, birthdate");
    queryProvider.setFromClause("from customer");
    queryProvider.setWhereClause("where id >= " + minValue + " and id <= " + maxValue);

    Map<String, Order> sortKeys = new HashMap<>(1);

    sortKeys.put("id", Order.ASCENDING);

    queryProvider.setSortKeys(sortKeys);

    reader.setQueryProvider(queryProvider);

    return reader;
  

  @Bean
  @StepScope
  public JdbcBatchItemWriter<Customer> customerItemWriter()
  
    JdbcBatchItemWriter<Customer> itemWriter = new JdbcBatchItemWriter<>();

    itemWriter.setDataSource(this.dataSource);
    itemWriter.setSql("INSERT INTO new_customer VALUES (:id, :firstName, :lastName, :birthdate)");
    itemWriter.setItemSqlParameterSourceProvider(new BeanPropertyItemSqlParameterSourceProvider());
    itemWriter.afterPropertiesSet();

    return itemWriter;
  

  @Bean
  public Step step1() throws Exception
  
    return stepBuilderFactory.get("step1").partitioner(slaveStep().getName(), partitioner()).step(slaveStep())
        .partitionHandler(partitionHandler(null)).build();
  

  @Bean
  public Step slaveStep()
  
    return stepBuilderFactory.get("slaveStep").<Customer, Customer>chunk(1000).reader(pagingItemReader(null, null)).writer(customerItemWriter())
        .build();
  

  @Bean
  @Profile("master")
  public Job job() throws Exception
  
    return jobBuilderFactory.get("job").start(step1()).build();
  

  @Override
  public void setApplicationContext(ApplicationContext applicationContext) throws BeansException
  
    this.applicationContext = applicationContext;
  


@Configuration
public class IntegrationConfiguration


  @Autowired
  private AmazonSQSAsync amazonSqs;

  @Bean
  public MessagingTemplate messageTemplate()
  
    MessagingTemplate messagingTemplate = new MessagingTemplate(outboundRequests());

    messagingTemplate.setReceiveTimeout(60000000l);

    return messagingTemplate;
  

  @Bean
  public DirectChannel outboundRequests()
  
    return new DirectChannel();
  


  @Bean
  @Profile("slave")
  public MessageProducer sqsMessageDrivenChannelAdapter()
  
    SqsMessageDrivenChannelAdapter adapter = new SqsMessageDrivenChannelAdapter(this.amazonSqs, "master");
    adapter.setOutputChannel(inboundRequests());
    adapter.afterPropertiesSet();
    return adapter;
  

  @Bean
  @ServiceActivator(inputChannel = "outboundRequests")
  public MessageHandler sqsMessageHandler()
  
    SqsMessageHandler messageHandler = new SqsMessageHandler(amazonSqs);
    messageHandler.setQueue("master");
    return messageHandler;
  

  @Bean
  public PollableChannel outboundStaging()
  
    return new NullChannel();
  

  @Bean
  public QueueChannel inboundRequests()
  
    return new QueueChannel();
  

谢谢

【问题讨论】:

【参考方案1】:

您应该记住,StepExecutionRequestHandler 的合同如下:

public StepExecution handle(StepExecutionRequest request)

根据您的例外情况和 SQS 的性质,inboundRequests 中消息的有效负载是一个字符串。我相信它在 JSON 中。因此,请考虑在StepExecutionRequestHandler 之前使用JsonToObjectTrabsformer

更新

有效载荷不是 JSON 格式。它是一个字符串,由 toString()StepExecutionRequest 类创建。格式为StepExecutionRequest: [jobExecutionId=2, stepExecutionId=3,stepName=slaveStep]

好的!我明白你的意思了。 SQS Message 只能有 String 正文。 SqsMessageHandler 向 SQS 发送消息默认使用 GenericMessageConverter 将传入的对象转换为字符串。

我认为您需要考虑配置 SqsMessageHandlerMappingJackson2MessageConverter 以真正将 StepExecutionRequest 序列化为正确的 JSON 并让它通过 SQS 传输。

在调用StepExecutionRequestHandler 之前的另一端(从属),您确实应该在SqsMessageDrivenChannelAdapter 之后为JsonToObjectTransformer 放置一个@Transformer

【讨论】:

@Bilan 你能告诉我在哪里做以及如何在上面的代码中做。我尝试了变压器注释,但它不起作用。 @Bilan ,有效载荷不是 JSON 格式。它是一个字符串,由 StepExecutionRequest 类的 toString() 创建。格式为 StepExecutionRequest: [jobExecutionId=2, stepExecutionId=3,stepName=slaveStep] 请在我的回答中找到更新。

以上是关于Spring Batch 分区处理程序错误的主要内容,如果未能解决你的问题,请参考以下文章

Spring批处理xmlns架构错误

Spring Batch StepScope Bean

处理器中的 Spring Batch 配置错误

Spring Batch分区不适用于复合项目处理器

Spring Boot Spring Batch:没有 Spring 批处理元数据表的多个 DataSource

Spring Batch 使用 Spring Boot - 从不重试写入错误