Best way to process incoming data in parallel? (Java/Groovy)
【Posted】: 2021-03-27 08:29:25
【Question】: I have a program that parses reports from multiple devices (about 1,000 devices), saves them to a database, and then performs additional processing on them.
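Parsing the reports can be done concurrently, but saving to the database and the additional processing require some synchronization based on the device ID the reports came from (because they may need to update the same data in the database).
So I can run the processing in parallel as long as the threads are handling reports from different device IDs.
What is the most efficient way to handle this?
Example
I initially thought about using a thread pool and locking on the device ID, but that would not be efficient if I receive a burst of reports from a single device.
For example, consider a thread pool with 4 threads and 10 incoming reports: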
Report # | DeviceID |
---|---|
1 | A |
2 | A |
3 | A |
4 | A |
5 | A |
6 | B |
7 | C |
8 | D |
9 | E |
10 | F |
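Thread 1 will start processing A's first report, threads 2-4 will wait until thread 1 is done, and the rest of the reports will sit in the queue.
It would be more efficient if A's remaining reports could be queued up instead, so that the B/C/D reports can be processed in the meantime. Is there an efficient way to do this?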
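The scenario above can be reproduced with a small sketch. It is only an illustration of the problem, not a proposed solution: the class name `LockPerDeviceDemo`, the 50 ms sleep standing in for the database work, and the use of `ReentrantLock` are all assumptions made for the example.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.ReentrantLock;

// Hypothetical demo class: a 4-thread pool where every task locks on its device ID.
class LockPerDeviceDemo {

    /** Returns { reports processed, times a pool thread found its device lock busy }. */
    static int[] run(String[] deviceIds) {
        Map<String, ReentrantLock> locks = new ConcurrentHashMap<>();
        AtomicInteger processed = new AtomicInteger();
        AtomicInteger foundBusy = new AtomicInteger();
        ExecutorService pool = Executors.newFixedThreadPool(4);
        for (String deviceId : deviceIds) {
            pool.execute(() -> {
                ReentrantLock lock = locks.computeIfAbsent(deviceId, id -> new ReentrantLock());
                if (!lock.tryLock()) {
                    // The device is busy, so this pool thread is tied up waiting
                    // instead of picking up a report from another device.
                    foundBusy.incrementAndGet();
                    lock.lock();
                }
                try {
                    Thread.sleep(50); // Stand-in for "save to database + extra processing".
                    processed.incrementAndGet();
                } catch (InterruptedException e) {
                    Thread.currentThread().interrupt();
                } finally {
                    lock.unlock();
                }
            });
        }
        pool.shutdown();
        try {
            pool.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return new int[] { processed.get(), foundBusy.get() };
    }

    public static void main(String[] args) {
        // Same arrival order as the table: five reports from A, then B..F.
        int[] result = run(new String[] { "A", "A", "A", "A", "A", "B", "C", "D", "E", "F" });
        System.out.println("processed=" + result[0] + ", waits on a busy device lock=" + result[1]);
    }
}
```

With the arrival order from the table, the first four tasks all belong to device A, so most of the pool sits idle on A's lock while the reports for B through F wait in the executor's queue.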
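【Comments】:
Just a suggestion, not a well-thought-out answer like the ones below, but you could use one large thread pool (sized so that the system is not overloaded if all threads run at once) and, inside each thread, synchronize on the device ID (as an interned string or similar). Probably not as efficient as the suggested answers, but much simpler.
【Answer 1】: Project Loom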
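After watching some late-2020 videos on YouTube.com with Ron Pressler, the head of Project Loom at Oracle, the solution is quite simple with the new virtual threads (fibers) feature coming to a future release of Java:
- Call a new Executors method to create an executor service that uses virtual threads (fibers) rather than platform/kernel threads.
- Submit all incoming report-processing tasks to that executor service.
- Inside each task, attempt to grab a semaphore, with one semaphore for each of your 1,000 devices.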
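That semaphore is how you process only one input per device at a time while still parallelizing across source devices. If the semaphore representing a particular device is not available, simply block: let your report-processing thread wait until the semaphore becomes available.
Project Loom maintains many lightweight virtual threads (fibers), even millions, that run on a few heavyweight platform/kernel threads. This makes blocking a thread cheap.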
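The per-device semaphore idea can be condensed into a short sketch that does not depend on a Loom build. This is a sketch under assumptions, not the code from the talks: the class name `PerDeviceSemaphoreSketch`, the lazy `computeIfAbsent` creation of semaphores, and the cached thread pool in `main` are all choices made for illustration. On JDK 21 or later the executor can be swapped for `Executors.newVirtualThreadPerTaskExecutor()`, which is what makes the blocking cheap.

```java
import java.util.Arrays;
import java.util.List;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Semaphore;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch: one single-permit semaphore per device serializes that device's reports.
class PerDeviceSemaphoreSketch {
    // Semaphores are created lazily the first time a device ID is seen.
    private final Map<String, Semaphore> semaphores = new ConcurrentHashMap<>();
    // Stand-in for the database.
    private final Queue<String> processed = new ConcurrentLinkedQueue<>();

    void process(String deviceId, int reportNumber) {
        Semaphore semaphore = semaphores.computeIfAbsent(deviceId, id -> new Semaphore(1, true));
        try {
            semaphore.acquire(); // Blocks while another report from the same device is in progress.
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            return;
        }
        try {
            processed.add(deviceId + "#" + reportNumber);
        } finally {
            semaphore.release(); // Always release, even if processing throws.
        }
    }

    /** Submits one task per report, waits for completion, and returns the processed entries. */
    static Queue<String> run(ExecutorService executor, List<String> deviceIds) {
        PerDeviceSemaphoreSketch sketch = new PerDeviceSemaphoreSketch();
        int counter = 0;
        for (String deviceId : deviceIds) {
            int reportNumber = ++counter;
            executor.execute(() -> sketch.process(deviceId, reportNumber));
        }
        executor.shutdown();
        try {
            executor.awaitTermination(30, TimeUnit.SECONDS);
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
        return sketch.processed;
    }

    public static void main(String[] args) {
        // Assumption: a cached pool of platform threads so the sketch runs on older JDKs.
        ExecutorService executor = Executors.newCachedThreadPool();
        List<String> reports = Arrays.asList("A", "A", "A", "A", "A", "B", "C", "D", "E", "F");
        System.out.println(run(executor, reports));
    }
}
```

A fair semaphore hands out its permit in the order threads arrive at `acquire()`, which is not necessarily the order in which the reports were submitted, so this sketch serializes work per device but does not by itself guarantee per-device ordering.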
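Early builds of a JDK binary with Project Loom built in are available now for macOS/Linux/Windows.
Caveat: I am not an expert on concurrency, nor on Project Loom. But your particular use case seems to match some specific recommendations made by Ron Pressler in his videos.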
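Example code
Here is some example code that I put together. I am not sure whether it is a good example.
I used an early-access build of Java 16, specially built with Project Loom technology: Build 16-loom+9-316 (2020/11/30) for macOS Intel.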
package work.basil.example;

import java.time.*;
import java.util.*;
import java.util.concurrent.*;

/**
 * An example of using Project Loom virtual threads to more simply process incoming data on background threads.
 * <p>
 * This code was built as a possible solution to this Question at ***.com: https://***.com/q/65327325/642706
 * <p>
 * Posted in my Answer at ***.com: https://***.com/a/65328799/642706
 * <p>
 * ©2020 Basil Bourque. 2020-12.
 * <p>
 * This work by Basil Bourque is licensed under CC BY 4.0. To view a copy of this license, visit https://creativecommons.org/licenses/by/4.0
 * <p>
 * Caveats:
 * - Project Loom is still in early-release, available only as a special build of OpenJDK for Java 16.
 * - I am *not* an expert on concurrency in general, nor Project Loom in particular. This code is merely me guessing and experimenting.
 */
public class App {
    // FYI, Project Loom links:
    // https://wiki.openjdk.java.net/display/loom/Main
    // http://jdk.java.net/loom/ (special early-access builds of Java 16 with Project Loom built-in)
    // https://download.java.net/java/early_access/loom/docs/api/ (Javadoc)
    // https://www.youtube.com/watch?v=23HjZBOIshY (Ron Pressler talk, 2020-07)

    public static void main ( String[] args ) {
        System.out.println( "java.version: " + System.getProperty( "java.version" ) );
        App app = new App();
        app.checkForProjectLoom();
        app.demo();
    }

    public static boolean projectLoomIsPresent ( ) {
        try {
            Thread.class.getDeclaredMethod( "startVirtualThread" , Runnable.class );
            return true;
        } catch ( NoSuchMethodException e ) {
            return false;
        }
    }

    private void checkForProjectLoom ( ) {
        if ( App.projectLoomIsPresent() ) {
            System.out.println( "INFO - Running on a JVM with Project Loom technology. " + Instant.now() );
        } else {
            throw new IllegalStateException( "Project Loom technology not present in this Java implementation. " + Instant.now() );
        }
    }

    record ReportProcessorRunnable(Semaphore semaphore , Integer deviceIdentifier , boolean printToConsole , Queue < String > fauxDatabase) implements Runnable {
        @Override
        public void run ( ) {
            // Our goal is to serialize the report-processing per device.
            // Each device can have only one report being processed at a time.
            // In Project Loom this can be accomplished simply by spawning virtual threads for all such
            // reports but process them serially by synchronizing on a binary (single-permit) semaphore.
            // Each thread working on a report submitted for that device waits on semaphore assigned to that device.
            // Blocking to wait for the semaphore is cheap in Project Loom using virtual threads. The underlying
            // platform/kernel thread carrying this virtual thread will be assigned other work while this
            // virtual thread is parked.
            try {
                semaphore.acquire(); // Blocks until the semaphore for this particular device becomes available. Blocking is cheap on a virtual thread.
            } catch ( InterruptedException e ) {
                e.printStackTrace();
                return; // No permit was acquired, so there is nothing to release.
            }
            try {
                // Simulate more lengthy work being done by sleeping the virtual thread handling this task via the executor service.
                try { Thread.sleep( Duration.ofMillis( 100 ) ); } catch ( InterruptedException e ) { e.printStackTrace(); }
                String fauxData = "Insert into database table for device ID # " + this.deviceIdentifier + " at " + Instant.now();
                fauxDatabase.add( fauxData );
                if ( this.printToConsole ) { System.out.println( fauxData ); }
            } finally {
                // Release in `finally` so that a failure while processing cannot leave the device locked forever.
                semaphore.release(); // For fun, comment-out this line to see the effect of the per-device semaphore at runtime.
            }
        }
    }

    record IncomingReportsSimulatorRunnable(Map < Integer, Semaphore > deviceToSemaphoreMap ,
                                            ExecutorService reportProcessingExecutorService ,
                                            int countOfReportsToGeneratePerBatch ,
                                            boolean printToConsole ,
                                            Queue < String > fauxDatabase)
            implements Runnable {
        @Override
        public void run ( ) {
            if ( printToConsole ) { System.out.println( "INFO - Generating " + countOfReportsToGeneratePerBatch + " reports at " + Instant.now() ); }
            // Copy the device identifiers once per batch rather than once per report.
            final List < Integer > deviceIdentifiers = List.copyOf( deviceToSemaphoreMap.keySet() );
            for ( int i = 0 ; i < countOfReportsToGeneratePerBatch ; i++ ) {
                // Make a new Runnable task containing report data to be processed, and submit this task to the executor service using virtual threads.
                // To simulate a device sending in a report, we randomly pick one of the devices to pretend it is our source of report data.
                int randomIndexNumber = ThreadLocalRandom.current().nextInt( 0 , deviceIdentifiers.size() );
                Integer deviceIdentifier = deviceIdentifiers.get( randomIndexNumber );
                Semaphore semaphore = deviceToSemaphoreMap.get( deviceIdentifier );
                Runnable processReport = new ReportProcessorRunnable( semaphore , deviceIdentifier , printToConsole , fauxDatabase );
                reportProcessingExecutorService.submit( processReport );
            }
        }
    }

    private void demo ( ) {
        // Configure experiment.
        Duration durationOfExperiment = Duration.ofSeconds( 20 );
        int countOfReportsToGeneratePerBatch = 7;  // Would be 40 per the Stack Overflow Question.
        boolean printToConsole = true;

        // To use as a concurrent list, I found this suggestion to use `ConcurrentLinkedQueue`: https://***.com/a/25630263/642706
        Queue < String > fauxDatabase = new ConcurrentLinkedQueue < String >();

        // Represent each of the thousand devices that are sending us report data to be processed.
        // We map each device to a Java `Semaphore` object, to serialize the processing of multiple reports per device.
        final int firstDeviceNumber = 1_000;
        final int countDevices = 10; // Would be 1_000 per the Stack Overflow question.
        final Map < Integer, Semaphore > deviceToSemaphoreMap = new TreeMap <>();
        for ( int i = 0 ; i < countDevices ; i++ ) {
            Integer deviceIdentifier = i + firstDeviceNumber; // With 1_000 devices, they would be numbered 1,000 to 1,999.
            Semaphore semaphore = new Semaphore( 1 , true ); // A single permit to make a binary semaphore, and make it fair.
            deviceToSemaphoreMap.put( deviceIdentifier , semaphore );
        }

        // Run experiment.
        // Notice that in Project Loom the `ExecutorService` interface is now `AutoCloseable`, for use in try-with-resources syntax.
        // Note: `Executors.newVirtualThreadExecutor` is the name used in this early Loom build;
        // in JDK 21 and later the factory method is `Executors.newVirtualThreadPerTaskExecutor`.
        try (
                ScheduledExecutorService reportGeneratingExecutorService = Executors.newSingleThreadScheduledExecutor() ;
                ExecutorService reportProcessingExecutorService = Executors.newVirtualThreadExecutor() ;
        ) {
            Runnable simulateIncommingReports = new IncomingReportsSimulatorRunnable( deviceToSemaphoreMap , reportProcessingExecutorService , countOfReportsToGeneratePerBatch , printToConsole , fauxDatabase );
            ScheduledFuture < ? > scheduledFuture = reportGeneratingExecutorService.scheduleAtFixedRate( simulateIncommingReports , 0 , 1 , TimeUnit.SECONDS );
            try { Thread.sleep( durationOfExperiment ); } catch ( InterruptedException e ) { e.printStackTrace(); }
        }
        // Notice that when reaching this point we block until all submitted tasks still running are finished,
        // because that is the new behavior of `ExecutorService` being `AutoCloseable`.
        System.out.println( "INFO - executor services shut down at this point. " + Instant.now() );

        // Results of experiment
        System.out.println( "fauxDatabase.size(): " + fauxDatabase.size() );
        System.out.println( "fauxDatabase = " + fauxDatabase );
    }
}
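【Comments】:
This is interesting and would simplify things. Unfortunately we are stuck on JDK 8 for the time being, but I will keep an eye on it. Thanks.
【Answer 2】: Try using a priority queue. The highest-priority item in the queue is the one picked up by the thread pool for processing. For example: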
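Note: I know that priority queues are usually not implemented with arrays, and that some priority queues use smaller index values for higher priority. I am using this notation only for simplicity.
Let each item be (device ID, priority). Let the current thread pool be empty -> []
Say we receive 10 reports -> [(A, 1), (A, 1), (A, 1), (B, 1), (B, 1), (C, 1), (D, 1), (E, 1), (F, 1), (G, 1)] (this represents the priority queue as filled when the reports arrive).
So you dequeue the first item and hand it to the thread pool. Then you decrement the priority of every item with device ID A that is still in the priority queue. It looks like this:
(A, 1) is dequeued, so you just get A. The priority queue then reorders itself after the priorities of the A items still in the queue are decremented: [(B, 1), (B, 1), (C, 1), (D, 1), (E, 1), (F, 1), (G, 1), (A, 0), (A, 0)]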
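A minimal sketch of this dequeue-and-decrement step follows. The answer does not give an implementation, so everything here is an assumption for illustration: the class name `DevicePriorityQueueSketch`, the arrival counter used to break ties between equal priorities, and the choice to re-insert entries because `java.util.PriorityQueue` cannot change the priority of an element in place.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;
import java.util.PriorityQueue;

// Hypothetical sketch of the "decrement the device's priority on dequeue" idea.
class DevicePriorityQueueSketch {

    // One queued report: device ID, current priority, and arrival order as a tie-breaker.
    static final class Entry {
        final String deviceId;
        final int priority;
        final long arrival;

        Entry(String deviceId, int priority, long arrival) {
            this.deviceId = deviceId;
            this.priority = priority;
            this.arrival = arrival;
        }

        @Override
        public String toString() {
            return "(" + deviceId + ", " + priority + ")";
        }
    }

    // Higher priority first; among equal priorities, earlier arrival first.
    private static final Comparator<Entry> ORDER =
            Comparator.comparingInt((Entry e) -> e.priority).reversed()
                      .thenComparingLong(e -> e.arrival);

    private final PriorityQueue<Entry> queue = new PriorityQueue<>(ORDER);
    private long nextArrival = 0;

    void add(String deviceId) {
        queue.add(new Entry(deviceId, 1, nextArrival++));
    }

    /** Removes the head and lowers the priority of the remaining reports from the same device. */
    String takeNext() {
        Entry head = queue.poll();
        if (head == null) {
            return null; // Queue is empty.
        }
        List<Entry> sameDevice = new ArrayList<>();
        for (Entry e : queue) {
            if (e.deviceId.equals(head.deviceId)) {
                sameDevice.add(e);
            }
        }
        // Remove and re-insert with priority - 1; this scans the whole queue on every dequeue.
        queue.removeAll(sameDevice);
        for (Entry e : sameDevice) {
            queue.add(new Entry(e.deviceId, e.priority - 1, e.arrival));
        }
        return head.deviceId;
    }

    /** Returns the queued entries in the order they would be dequeued right now. */
    List<Entry> snapshot() {
        List<Entry> copy = new ArrayList<>(queue);
        copy.sort(ORDER);
        return copy;
    }

    public static void main(String[] args) {
        DevicePriorityQueueSketch q = new DevicePriorityQueueSketch();
        for (String id : new String[] { "A", "A", "A", "B", "B", "C", "D", "E", "F", "G" }) {
            q.add(id);
        }
        System.out.println(q.takeNext());  // prints A
        System.out.println(q.snapshot());  // prints [(B, 1), (B, 1), (C, 1), (D, 1), (E, 1), (F, 1), (G, 1), (A, 0), (A, 0)]
    }
}
```

Note that every dequeue walks the entire queue to find the entries for the same device, so the cost of this step grows with the queue length.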
【Comments】:
Thanks for the idea, but updating the queue is not really feasible, since we may receive about 40 reports per second.