spring boot 处理大请求体 - 通过多线程处理批量

Posted

技术标签:

【中文标题】spring boot 处理大请求体 - 通过多线程处理批量【英文标题】:spring boot handling big request body - processing bulk through multithreading 【发布时间】:2019-11-01 09:11:52 【问题描述】:

我有一个 spring boot 控制器,它吸引了许多用户,如下所示

示例 json


  "users": [
     "name":"john", "age":18, "type":"1",
     "name":"kim", , "age":18, "type":"2",
     "name":"Fits", "age":18, "type","3",
  ]
 

请求处理程序

@RequestMapping(value = "/users", method = RequestMethod.POST, headers = "Accept=application/json")
public void Add(@RequestBody List<user> users) throws Exception 

 // Here I am iterating users and writing one by one to different message topic based on the type
 // if any error in the given user while writing to message topic I am storing that user in other DB



当我在用户列表中有大约 100 个用户时它工作得很好,但如果列表很大,比如 1000 等,它会花费太多时间。 那么有没有我可以分配给它来执行此操作的春季批处理作业?

我只想返回 http 响应代码 202 来请求并将此有效负载分配给 spring 批处理作业

【问题讨论】:

我不明白你在问什么。您是否希望有一些现有的 Batch 作业定义可以为您执行此操作?在我看来,您想要转移到批处理作业的逻辑非常特定于您自己的领域和需求,并且此时不会只存在于野外。如果是这样,那么您是否真的在问“我如何使用 Spring Batch?”这个问题超出了本网站的范围。我假设有几十个文档集和教程可以教你如何使用 Spring Batch。我建议你看看那里。 是的,我想利用 Spring Batch 并将我的业务逻辑添加到其中.. 您是否开始尝试这样做?您是否可以向我们展示代码,以便我们可以看到您正在尝试什么?如果不是,那么这对于本网站来说不是一个合适的问题。该站点旨在解决特定的编程问题和问题,而不是提供使用 Spring Batch 等整个技术的培训。同样,我希望已经有很多资源可以帮助您解决这个问题。 谷歌“Spring Batch”,您将获得大量潜在的学习选择。 我不清楚您到底有什么具体问题。你想知道如何通过 HTTP 请求启动 Spring 批处理作业吗?您想知道如何将业务逻辑编写为 Spring 批处理作业吗?你想知道如何将参数从 HTTP 请求传递到 Spring 批处理作业吗? 【参考方案1】:

一种选择是使用 Spring Async Task 在单独的线程中长时间运行进程,因此不会等待执行整个请求并发送回响应。

首先像这样配置异步任务。

@Configuration
@EnableAsync
public class AsynchTaskConfiguration

    @Bean
    public Executor taskExecutor() 
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(2);
        executor.setMaxPoolSize(2);
        executor.setQueueCapacity(500);
        executor.setThreadNamePrefix("ProcessUsers-");
        executor.initialize();
        return executor;
    

在这里你可以在你的服务中使用异步任务来处理用户

@Service
public class UserProcessingService 


    private final AnotherUserProcessingService service;
    @Autowired
    public UserProcessingService (AnotherUserProcessingService service) 
        this.service= service;
    

    @Async
    public CompletableFuture<List<User>> processUser(List<User> users) throws InterruptedException 

        users.forEach(user -> logger.info("Processing " + user));
        List<User> usersListResult = service.process(users);
        // Artificial delay of 1s for demonstration purposes
        Thread.sleep(1000L);
        return CompletableFuture.completedFuture(usersListResult);
    


processUser(User user) 带有@Async 注释,表示该方法将根据上面提供的taskExecutor 配置在单独的线程中运行。 @EnableAsync 启用 Spring 在后台线程中运行任何带有 @Async 注释的方法。 并确保使用异步任务处理用户的服务必须在@Configuration 类中创建或由@ComponentScan 获取。您可以根据需要自定义您的taskExecutor

您可以在这里找到ThreadPoolTaskExecutor 的工作原理。

【讨论】:

我有用户列表,如何将用户列表传递给异步任务并在异步任务线程池中一一迭代 嗨,这里如果我将池大小设置为 10,并且所有线程将从头开始迭代用户列表,如何跟踪标记,所以每个线程从用户列表中选择不同的用户 我需要接收该请求并分配给 asnyc 任务然后返回 202 在我处理的异步任务中,如果我以某种异步方式提供任何错误.. 由于我返回 202 我可能会在处理第一个请求时收到更多请求,所以它会在队列中还是需要分开处理..【参考方案2】:

https://github.com/softnrajkumar1994/multithreading-example

重要

1) 而不是 1000 个用户在一个请求中,请将这些用户列表作为小块发送

public class ThreadManager 

    private static ThreadPoolExecutor stpe = null;


    static 
         /**
         *
         *  corePoolSize --->the number of threads to keep in the pool, even
         *        if they are idle, unless @code allowCoreThreadTimeOut is set
         *        
         *  maximumPoolSize --- >the maximum number of threads to allow in the
         *        pool
         *        
         *  keepAliveTime---> when the number of threads is greater than
         *        the core, this is the maximum time that excess idle threads
         *        will wait for new tasks before terminating.
         *        
         *  unit the time unit for the @code keepAliveTime argument
         *  
         *  workQueue the queue to use for holding tasks before they are
         *        executed.  This queue will hold only the @code Runnable
         *        tasks submitted by the @code execute method.

         */
        stpe = new ThreadPoolExecutor(5, 10, 1000, TimeUnit.MILLISECONDS, new LinkedBlockingQueue<>(1090));
        System.out.println("THREAD MANAGER INTIALIZED SUCCESSFULLY");
    

    public static void execute(Runnable task) 
        stpe.execute(task);
    

上述类将接收可运行的任务,并与线程池的空闲线程一起执行。

示例用户类:

 public class User 

    private String name;
    private String mobile;
    private String email;

    public String getName() 
        return name;
    

    public void setName(String name) 
        this.name = name;
    

    public String getMobile() 
        return mobile;
    

    public void setMobile(String mobile) 
        this.mobile = mobile;
    

    public String getEmail() 
        return email;
    

    public void setEmail(String email) 
        this.email = email;
    



@RestController
public class UserController 

    @PostMapping("/users")
    public void Add(@RequestBody List<User> users) throws Exception 
        /**
         * Here we are rotating user's list and assigning each and every user into a
         * separate worker thread, so that work will be done parallely
         */
        for (User user : users) 
            try 
                ThreadManager.execute(new UserWork(user));
             catch (Exception e) 
                e.printStackTrace();
            
        
    


用于处理用户对象的自定义可运行工作程序类。您是否在可运行方法中实现业务。

  public class UserWork implements Runnable 



    private User user;

    public UserWork(User user) 
        this.user = user;
    

    @Override
    public void run() 
        // Please add your businees logic here
// Here I am iterating users and writing one by one to different message topic based on the type
 // if any error in the given user while writing to message topic I am storing that user in other DB
    


【讨论】:

以上是关于spring boot 处理大请求体 - 通过多线程处理批量的主要内容,如果未能解决你的问题,请参考以下文章

Spring Boot 2从入门到入坟 | 请求参数处理篇:常用参数注解之@RequestBody

Spring Boot 简单的请求示例(包括请求体验证)

spring boot aop打印http请求回复日志包含请求体

Spring-boot-routeController接收参数的几种方式

Spring-Boot:同时处理多个请求

Spring Boot-全局异常处理