如何编写一个函数来并行查找大于 N 的值

Posted

技术标签:

【中文标题】如何编写一个函数来并行查找大于 N 的值【英文标题】:How to write a function to find a value bigger than N in parallel 【发布时间】:2021-04-27 17:13:55 【问题描述】:

所以我有一个函数可以在一个大的 未排序的数字数组如下所示。

import java.util.*;

public class program 

    // Linear-search function to find the index of an element
    public static int findIndex(int arr[], int t)
    
        // if array is Null
        if (arr == null) 
            return -1;
        

        // find length of array
        int len = arr.length;
        int i = 0;

        // traverse in the array
        while (i < len) 

            // if the i-th element is t
            // then return the index
            if (arr[i] > t) 
                return i;
            
            else 
                i = i + 1;
            
        
        return -1;
        

   // Driver Code
   public static void main(String[] args)
   
      int[] my_array =  5, 4, 6, 1, 3, 2, 7, 8, 9 ;

      int i = findIndex(my_array, 7);
       // find the index of 5
       System.out.println("Index position of 5 is: "
                    + my_array[i]);
   

但我必须找到一种方法来并行实现这一点。我不知道如何开始或做什么,因为我是并行编程领域的新手。

我们将不胜感激。

【问题讨论】:

【参考方案1】:

您可以使用并行流来查找使流并行。

public static int findIndex(int arr[], int t)

    // if array is Null
    if (arr == null) 
        return -1;
    

    return IntStream.rangeClosed(0, arr.length)
                    .parallel()
                    .filter(index -> arr[index] > t)
                    .findFirst()
                    .orElse(-1);

如果你是出于学习目的,可以将数组分成子数组,然后将每个部分提交到单独的线程,然后计算并返回结果。

首先,实现一个可以获取子数组并返回索引的可​​调用对象。

class RangeFinder implements Callable<Integer> 
    private final int[] arr;
    private final int startIndex;
    private final int t;

    RangeFinder(int[] arr, int startIndex, int t) 
        this.arr = arr;
        this.startIndex = startIndex;
        this.t = t;
    


    @Override
    public Integer call() throws Exception 
        for (int i = 0; i < this.arr.length; i++) 
            if (arr[i] > t) 
                return startIndex + i;
            
        
        return -1;
    

现在您可以将数组分成块并提交以进行并行处理。

public static int findIndex(int arr[], int t) throws ExecutionException, InterruptedException 
    if (arr == null) 
        return -1;
    

    int numberOfThreads = 4;
    return findIndexInParallel(arr, t, numberOfThreads);


private static int findIndexInParallel(int[] arr, int t, int threadCount) throws ExecutionException, InterruptedException 
    if(threadCount > arr.length) 
        threadCount = arr.length;
    

    int startIndex = 0;
    int range = (int) Math.ceil(arr.length / (double) threadCount);
    int endIndex = range;

    // step 1: create threads using Executor FrameWork
    ExecutorService executorService = Executors.newFixedThreadPool(threadCount);
    List<Future> futures = new ArrayList<>();
    for (int i = 0; i < threadCount; i++) 

        // step 2: create object of callable
        RangeFinder rangeFinder = new RangeFinder(Arrays.copyOfRange(arr, startIndex, endIndex), startIndex, t);

        // Step 3: submit the task to thread pool
        Future<Integer> submit = executorService.submit(rangeFinder);

        // Step 4: recalculate array indexes for the next iteration
        startIndex = startIndex + range;
        int newEndIndex = endIndex + range;
        endIndex = newEndIndex < arr.length ? newEndIndex : arr.length;
        futures.add(submit);
    

    // step 5: evaluate and return the results
    for (Future future : futures) 
        int index = (int) future.get();
        if(index != -1) 
            executorService.shutdownNow();
            return index;
        
    
    return -1;

上面的代码可以进一步重构,当然你也需要处理异常。

【讨论】:

一开始我也考虑过使用Callable,但问题是当一个线程找到该值时,你将不得不等待future.get @dreamcrash 感谢您的编辑。是的,完全同意你的看法。我也不会在产品代码中使用它,但似乎 OP 正在尝试解决学术任务。【参考方案2】:

最直接的方法是使用@Govinda Sakhare 很好地说明的Parallel Stream。

但是,如果您想将此示例用作学习如何使用线程的一种方式,请按照以下步骤并行化您的代码:

    创建线程; 将工作分配给线程,每个线程将尝试找到一个大于作为参数传递的值但仅用于数组的一部分的值; 找到该值的第一个线程会通知其他线程,以便每个线程都退出。

要创建线程,我们可以执行以下操作:

Thread[] threads = new Thread[total_threads];
for(int t = 0; t < threads.length; t++) 
    threads[t] = new Thread(/** the parallel work **/);
    threads[t].start();

要将工作分配给线程,我们需要在线程之间拆分数组。最简单的方法实际上是拆分迭代而不是数组本身。线程接收整个数组,但只使用它的一些位置,例如:

private final static int NO_FOUND = -1;

// Linear-search function to find the index of an element
public static int findIndex(int[] arr, int t, int threadId, int total_threads)
    for (int i = threadId; i < arr.length; i += total_threads)
        if ( arr[i] > t)
            return i;
    return NO_FOUND;

我们为每个线程分配一个从0 to N-1 开始的ID 范围,N 是线程总数。

为了通知线程,我们可以在线程之间使用共享的原子整数,用于更新找到的值的索引。所以最终的代码如下所示:

public class program 
    private final static int NO_FOUND = -1;

    // Linear-search function to find the index of an element
    public static int findIndex(int[] arr, int t, int threadId, int total_threads, AtomicInteger shared_index)
        for (int i = threadId; i < arr.length && shared_index.get() == -1; i += total_threads)
            if ( arr[i] > t)
                return i;
        return NO_FOUND;
    

    public static void main(String[] args) throws InterruptedException 
        final int N = 8;
        int[] my_array =  5, 4, 6, 1, 3, 2, 7, 8, 9 ;
        int total_threads = 4;

        AtomicInteger shared_index = new AtomicInteger(-1);

        Thread[] threads = new Thread[total_threads];
        for(int t = 0; t < threads.length; t++) 
            final int thread_id = t;
            threads[t] = new Thread(() ->parallel_work(N, my_array, total_threads, shared_index, thread_id));
            threads[t].start();
        

        for (Thread thread : threads) 
            thread.join();
        
        System.out.println("Index of value bigger than " + N + " : " + shared_index.get());
    

    private static void parallel_work(int n, int[] my_array, int total_threads, AtomicInteger shared_index, int thread_id) 
        int index_found = findIndex(my_array, n, thread_id, total_threads, shared_index);
        shared_index.compareAndExchange(NO_FOUND, index_found);
    

输出:

Index of value bigger than 8 : 8

【讨论】:

以上是关于如何编写一个函数来并行查找大于 N 的值的主要内容,如果未能解决你的问题,请参考以下文章

在二叉搜索树中查找中位数的错误

如何编写 R 函数来查找数据框中的特定条件

如何编写通用的 memoize 函数?

需要帮助编写一个函数来查找 N 维数组 C++ 中的最大值和最小值

如何初始化worker以并行使用包函数

2021-09-15:最长公共前缀。编写一个函数来查找字符串数组中的最长公共前缀,如果不存在公共前缀,返回空字符串 ““。力扣14。