并行处理传入数据的最佳方法? (Java/Groovy)

Posted

技术标签:

【中文标题】并行处理传入数据的最佳方法? (Java/Groovy)【英文标题】:Best way to process incoming data in parallel? (Java/Groovy) 【发布时间】:2021-03-27 08:29:25 【问题描述】:

我有一个程序可以解析来自多个设备(大约 1000 个设备)的报告,将它们保存到数据库,然后对它们进行额外处理。

解析报告可以同时完成,但保存到数据库和额外的处理需要根据它们来自的设备 ID 进行一些同步(因为可能需要更新数据库上的相同数据)。

因此,只要线程处理来自不同设备 ID 的报告,我就可以并行运行处理。

最有效的处理方法是什么?

示例

我最初考虑使用线程池并锁定设备 ID,但如果我收到来自单个设备的大量报告,那将不会有效。

例如,考虑一个有 4 个线程和 10 个传入报告的线程池:

Report # DeviceID
1 A
2 A
3 A
4 A
5 A
6 B
7 C
8 D
9 E
10 F

线程 1 将开始处理 A 的报告,线程 2-4 将等到线程 1 完成,其余的报告将排队。

如果可以将 A 的其余报告排队,这样可以更有效地处理 B/C/D 报告。有没有有效的方法来做到这一点?

【问题讨论】:

只是一个建议,而不是像下面那些经过深思熟虑的答案,但是您可以在每个线程内使用一个大线程池(如果所有线程同时运行,其大小可以避免系统过载)在设备 ID 上同步(作为实习生字符串或类似字符串)。可能不如建议的答案那么有效,但要简单得多。 【参考方案1】:

织机项目

在观看了 2020 年末在 YouTube.com 上与 Oracle Project Loom 负责人 Ron Pressler 的一些视频后,解决方案非常简单,新的虚拟线程(纤程)功能将在 Java 的未来版本中推出:

调用一个新的Executors 方法来创建一个使用虚拟线程(纤程)而不是平台/内核线程的executor service。 将所有传入的报告处理任务提交给该执行器服务。 在每个任务中,尝试获取一个semaphore,为您的 1000 台设备中的每台设备提供一个信号量。

该信号量将是一次仅处理每个设备一个输入的方式,以并行化每个源设备。如果代表特定设备的信号量不可用,只需阻塞 - 让您的报告处理线程等待,直到信号量可用。

Project Loom 维护着许多轻量级虚拟线程(纤程),甚至数百万个,它们在几个重量级平台/内核线程上运行。这使得阻塞线程变得便宜。

Early builds of a JDK binary with Project Loom 内置 macOS/Linux/Windows 现已推出。

警告:我不是并发专家,也不是 Project Loom。但您的特定用例似乎与 Ron Pressler 在他的视频中提出的一些具体建议相匹配。

示例代码

这里是一些我参考过的示例代码。我不确定这是否是一个很好的例子。

我使用了 Java 16 的早期访问版本,专门使用 Project Loom 技术构建:Build 16-loom+9-316 (2020/11/30) for macOS Intel。

package work.basil.example;

import java.time.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * An example of using Project Loom virtual threads to more simply process incoming data on background threads.
 * <p>
 * This code was built as a possible solution to this Question at ***.com: https://***.com/q/65327325/642706
 * <p>
 * Posted in my Answer at ***.com: https://***.com/a/65328799/642706
 * <p>
 * ©2020 Basil Bourque. 2020-12.
 * <p>
 * This work by Basil Bourque is licensed under CC BY 4.0. To view a copy of this license, visit https://creativecommons.org/licenses/by/4.0
 * <p>
 * Caveats:
 * - Project Loom is still in early-release, available only as a special build of OpenJDK for Java 16.
 * - I am *not* an expert on concurrency in general, nor Project Loom in particular. This code is merely me guessing and experimenting.
 */
public class App

    // FYI, Project Loom links:
    // https://wiki.openjdk.java.net/display/loom/Main
    // http://jdk.java.net/loom/  (special early-access builds of Java 16 with Project Loom built-in)
    // https://download.java.net/java/early_access/loom/docs/api/ (Javadoc)
    // https://www.youtube.com/watch?v=23HjZBOIshY  (Ron Pressler talk, 2020-07)

    public static void main ( String[] args )
    
        System.out.println( "java.version: " + System.getProperty( "java.version" ) );
        App app = new App();
        app.checkForProjectLoom();
        app.demo();
    

    public static boolean projectLoomIsPresent ( )
    
        try
        
            Thread.class.getDeclaredMethod( "startVirtualThread" , Runnable.class );
            return true;
        
        catch ( NoSuchMethodException e )
        
            return false;
        
    

    private void checkForProjectLoom ( )
    
        if ( App.projectLoomIsPresent() )
        
            System.out.println( "INFO - Running on a JVM with Project Loom technology. " + Instant.now() );
         else
        
            throw new IllegalStateException( "Project Loom technology not present in this Java implementation. " + Instant.now() );
        
    

    record ReportProcessorRunnable(Semaphore semaphore , Integer deviceIdentifier , boolean printToConsole , Queue < String > fauxDatabase) implements Runnable
    
        @Override
        public void run ( )
        
            // Our goal is to serialize the report-processing per device.
            // Each device can have only one report being processed at a time.
            // In Project Loom this can be accomplished simply by spawning virtual threads for all such
            // reports but process them serially by synchronizing on a binary (single-permit) semaphore.
            // Each thread working on a report submitted for that device waits on semaphore assigned to that device.
            // Blocking to wait for the semaphore is cheap in Project Loom using virtual threads. The underlying
            // platform/kernel thread carrying this virtual thread will be assigned other work while this
            // virtual thread is parked.
            try
            
                semaphore.acquire(); // Blocks until the semaphore for this particular device becomes available. Blocking is cheap on a virtual thread.
                // Simulate more lengthy work being done by sleeping the virtual thread handling this task via the executor service.
                try Thread.sleep( Duration.ofMillis( 100 ) ); catch ( InterruptedException e ) e.printStackTrace();
                String fauxData = "Insert into database table for device ID # " + this.deviceIdentifier + " at " + Instant.now();
                fauxDatabase.add( fauxData );
                if ( Objects.nonNull( this.printToConsole ) && this.printToConsole )  System.out.println( fauxData ); 
                semaphore.release();  // For fun, comment-out this line to see the effect of the per-device semaphore at runtime.
            
            catch ( InterruptedException e )
            
                e.printStackTrace();
            
        
    

    record IncomingReportsSimulatorRunnable(Map < Integer, Semaphore > deviceToSemaphoreMap ,
                                            ExecutorService reportProcessingExecutorService ,
                                            int countOfReportsToGeneratePerBatch ,
                                            boolean printToConsole ,
                                            Queue < String > fauxDatabase)
            implements Runnable
    
        @Override
        public void run ( )
        
            if ( printToConsole ) System.out.println( "INFO - Generating " + countOfReportsToGeneratePerBatch + " reports at " + Instant.now() );
            for ( int i = 0 ; i < countOfReportsToGeneratePerBatch ; i++ )
            
                // Make a new Runnable task containing report data to be processed, and submit this task to the executor service using virtual threads.
                // To simulate a device sending in a report, we randomly pick one of the devices to pretend it is our source of report data.
                final List < Integer > deviceIdentifiers = List.copyOf( deviceToSemaphoreMap.keySet() );
                int randomIndexNumber = ThreadLocalRandom.current().nextInt( 0 , deviceIdentifiers.size() );
                Integer deviceIdentifier = deviceIdentifiers.get( randomIndexNumber );
                Semaphore semaphore = deviceToSemaphoreMap.get( deviceIdentifier );
                Runnable processReport = new ReportProcessorRunnable( semaphore , deviceIdentifier , printToConsole , fauxDatabase );
                reportProcessingExecutorService.submit( processReport );
            
        
    

    private void demo ( )
    
        // Configure experiment.
        Duration durationOfExperiment = Duration.ofSeconds( 20 );
        int countOfReportsToGeneratePerBatch = 7;  // Would be 40 per the Stack Overflow Question.
        boolean printToConsole = true;

        // To use as a concurrent list, I found this suggestion to use `ConcurrentLinkedQueue`: https://***.com/a/25630263/642706
        Queue < String > fauxDatabase = new ConcurrentLinkedQueue < String >();

        // Represent each of the thousand devices that are sending us report data to be processed.
        // We map each device to a Java `Semaphore` object, to serialize the processing of multiple reports per device.
        final int firstDeviceNumber = 1_000;
        final int countDevices = 10; // Would be 1_000 per the Stack Overflow question.
        final Map < Integer, Semaphore > deviceToSemaphoreMap = new TreeMap <>();
        for ( int i = 0 ; i < countDevices ; i++ )
        
            Integer deviceIdentifier = i + firstDeviceNumber; // Our devices are identified as numbered 1,000 to 1,999.
            Semaphore semaphore = new Semaphore( 1 , true ); // A single permit to make a binary semaphore, and make it fair.
            deviceToSemaphoreMap.put( deviceIdentifier , semaphore );
        

        // Run experiment.
        // Notice that in Project Loom the `ExecutorService` interface is now `AutoCloseable`, for use in try-with-resources syntax.
        try (
                ScheduledExecutorService reportGeneratingExecutorService = Executors.newSingleThreadScheduledExecutor() ;
                ExecutorService reportProcessingExecutorService = Executors.newVirtualThreadExecutor() ;
        )
        
            Runnable simulateIncommingReports = new IncomingReportsSimulatorRunnable( deviceToSemaphoreMap , reportProcessingExecutorService , countOfReportsToGeneratePerBatch , printToConsole , fauxDatabase );
            ScheduledFuture scheduledFuture = reportGeneratingExecutorService.scheduleAtFixedRate( simulateIncommingReports , 0 , 1 , TimeUnit.SECONDS );
            try Thread.sleep( durationOfExperiment ); catch ( InterruptedException e ) e.printStackTrace();
        
        // Notice that when reaching this point we block until all submitted tasks still running are finished,
        // because that is the new behavior of `ExecutorService` being `AutoCloseable`.
        System.out.println( "INFO - executor services shut down at this point. " + Instant.now() );

        // Results of experiment
        System.out.println( "fauxDatabase.size(): " + fauxDatabase.size() );
        System.out.println( "fauxDatabase = " + fauxDatabase );
    

【讨论】:

这很有趣,可以简化事情。不幸的是,我们暂时还停留在 JDK 8 上,但我会继续关注它。谢谢。【参考方案2】:

尝试使用优先队列。队列中最高优先级的项目将被线程池选择处理。例如:

注意:我知道优先级队列通常不使用数组来实现,并且一些优先级队列使用较小的索引值来获得较高的优先级。为了简单起见,我只是使用这种表示法。

让(设备 ID,优先级)。 让当前线程池为空 -> []

说,我们收到了 10 份报告 -> [(A, 1), (A, 1), (A, 1), (B, 1), (B, 1), (C, 1), (D, 1), (E, 1), ( F, 1), (G, 1)] (表示收到报告时已填满的优先级队列)。

因此,您将第一个项目出列并将其提供给线程池。然后递减优先级队列中设备 ID A 的所有项目的优先级。如下所示:

(A, 1) 已出列,因此您只需获得 A。优先级队列将在递减仍在队列中的 A 的优先级后移动。 [(B, 1), (B, 1), (C, 1), (D, 1), (E, 1), (F, 1), (G, 1), (A, 0), ( A, 0)]

【讨论】:

感谢您的想法,但更新队列实际上并不可行,因为我们每秒可能会收到大约 40 个报告。

以上是关于并行处理传入数据的最佳方法? (Java/Groovy)的主要内容,如果未能解决你的问题,请参考以下文章

使用 XMPP 处理传入消息的最佳方法

spring boot 并行传出请求的最佳实践是啥?

python - 如何在一个套接字上使用传入数据流来处理Python中的多个并行进程?

JMS 队列上多线程消息处理的最佳实践

在 Python 中划分大文件以进行多处理的最佳方法是啥?

使用 Sqoop 并行导入 Oracle 表的最佳方法是啥?