如何在 for 循环中并行处理。我的代码不正确地并行操作

Posted

技术标签:

【中文标题】如何在 for 循环中并行处理。我的代码不正确地并行操作【英文标题】:How do I parallel process in a for loop. My code parallels the operation improperly 【发布时间】:2022-01-21 10:43:08 【问题描述】:

OpenMp 初学者。我在 AMD 3970X 上运行。 我有一个循环问题,多个线程在同一个迭代中工作。 该代码应该适用于 mpq_t 总和为 1/n,n=1..100。 输出是 n、threadid、num、den、repeating、non-repeating 和 time。 对不起,如果这很长。 我已经使用了 2 个 pragma 语句的位置,但没有成功。

我按照建议更改了代码,现在出现编译错误:

digits.cc:193:54: error: user defined reduction not found for 'sum'
  193 | #pragma omp for schedule(static, chunk), reduction(+:sum)

// digits.cpp - implementation of digits with libgmp

// 20111123 tomdean - initial conversion from digits.c

// The rational number, sum(1/k,k=1..n) has three components,
// An integer >= 1, s1..ss
// A sequence of non-repeating digits, n1..nn
// A sequence of repeating digits, r1..rr
// The sum is s1..ss . n1..nn r1..rr r1..rr ...

// Calculating the number of integers, non-repeating digits, and,
// repeating digits for n > 83 requires hours#pragma omp for schedule(static, chunk), reduction(sum:+)
 of computer time on a
// 4.2GHz Core i7-3930k.

// The algorithm is simple, all values are integers and thus exact.

// From the rational number, sum, extract the numerator and denominator.
// Calculate q = nu/de, r = q - q*de.
// checked saved_r for r.  If found, return
// if num_saved_r < SAVE_MAX, save r in r_saved[num_saved_r++]
// 

#include <iostream>
#include <gmpxx.h>
#include <sys/time.h> // time calculations
#include <stdlib.h>   // exit
#include <unistd.h>   // getopt
#include <omp.h>      // openmp
#define NUM_CHK 50
#pragma omp for schedule(static, chunk), reduction(sum:+)

using namespace std;

///////////////////////////////////////////////////////////////////////
// usage
void usage(char *me) 
    cout << "Usage: " << me << " [-f <n>] [-t <m>] [-h]" << endl
         << "Where:" << endl
         << "  -f <n> - from <n> default 10" << endl
         << "  -t <n> - to <n>   default n+10" << endl
         << "  -h     - show this message" << endl;
    return;


////////////////////////////////////////////////////////////
// calc_time
double calc_time( timeval *start,  timeval *stop) 
    long sec, usec;
    sec  = stop->tv_sec  - start->tv_sec;
    usec = stop->tv_usec - start->tv_usec;
    if (usec < 0) 
        sec--;
        usec += 1000000L;
    
    return (double)sec + ((double) usec)/1.0E6;


/////////////////////////////////////////////////////////////////////
// calc_digits
void calc_digits(int idx, mpq_class sum) 
    long chkidx = 0;
    long m;
    mpz_t tnu, de, nu, q, r;
    mpz_t chk[NUM_CHK]; // saved r
    unsigned long s;
    struct timeval start, stop;
    struct timezone zone;
    int tid;

    tid = omp_get_thread_num();

    mpz_inits(nu, de, q, r, tnu, NULL);
    mpz_set(nu, sum.get_num_mpz_t());
    mpz_set(de, sum.get_den_mpz_t());

    for (s=0; s<NUM_CHK; s++) 
        mpz_init(chk[s]);
    
    (void)gettimeofday(&start, &zone);
    //cout << nu << " " << de << endl;
    chkidx = 0;
    // q = nu/de;  r = nu - q*de;
    mpz_fdiv_qr(q, r, nu, de);
    s = 1;
    mpz_set(chk[chkidx], r);
    // cout << "init "
    //     << nu << ' '
    //     << de << ' '
    //     << q << ' '
    //     << r << ' '
    //     << chkidx << ' '
    //     << chk[chkidx] <<  endl;
    chkidx++;
    m = -1;
    while (1) 
        mpz_mul_si(tnu, r, 10L);
        //tq = tnu/de;  r = tnu - tq*de;
        mpz_fdiv_r(r,tnu,de);
        
            long idx;
            m = -1;
            for (idx=0; idx<chkidx; idx++) 
                if (mpz_cmp(r, chk[idx]) == 0) m = idx;
            
        
        if (m >= 0) break;
        s++;
        if (chkidx < NUM_CHK) 
            mpz_set(chk[chkidx], r);
            // cout << "loop "/
            //     << tnu << ' '
            //     << de << ' '
            //     << q << ' '
            //     << r << ' '
            //     << chkidx << ' '
            //     << chk[chkidx] <<  endl;
            chkidx++;
        
    
    // at this point, m is num non-recurring
    //                s is the number of iterations, the total digit count
    (void)gettimeofday(&stop, &zone);
    cout << idx << ' ' << ' ' << tid << ' ' << nu
         << ' ' << de
        //<< ' ' << nu/de
         << ' ' << m
         << ' ' << s-(unsigned long long)m
         << ' ' << calc_time(&start,&stop) << endl;

    mpz_clears(q, r, tnu, NULL);
    for (s=0; s<NUM_CHK; s++) mpz_clear(chk[s]);



/////////////////////////////////////////////////////////////////////
// main
int main(int argc, char **argv) 
    long idx;
    long long from, to;
    time_t now = time(NULL);
    char ch;
    int chunk = 60;

    from = 10;   // pre-calc sum(1/k,k=1..9) start processing at k=10
    to   = 100;  // sum(1/k,k=1..100)
  
    // check optional arguments
    while ((ch = getopt(argc, argv, "hf:t:")) != -1) 
        switch (ch) 
        case 'f':
            if (sscanf(optarg, "%Ld", &from) != 1) 
                usage(argv[0]);
                return 0;
            
            break;
        case 't':
            if (sscanf(optarg, "%Ld", &to) != 1) 
                usage(argv[0]);
                return 0;
            
            break;
        case 'h':
        default:
            usage(argv[0]);
            return 0;
        
    

    cout << '#' << endl;
    cout << "# Calculate sum(1/k,k=1..n) for n = 1 to 100." << endl;
    cout << '#' << endl;
    cout << "# Columns are: N" << endl;
    cout << "#              Numerator" << endl;
    cout << "#              Denominator" << endl;
    cout << "#              Number of non-recurringdigits" << endl;
    cout << "#              Number of recurring digits." << endl;
    cout << "#              Elapsed time in seconds." << endl;
    cout << '#' << endl;
    cout << "# Started " << ctime(&now); // ctime is /n/0 terminated
    cout << '#' << endl;

    mpq_class sum(1,1);
    mpz_t nu, de;
    mpz_inits(nu, de, NULL);
  
    // advance to n = from
    for (idx=2; idx<from; idx++) 
        sum += mpq_class(1,idx);
    
    //cout << sum << endl;

    // calculate to n = 100
#pragma omp parallel default(shared)
#pragma omp for schedule(static, chunk) reduction(+:sum)
    for (idx=from; idx<to+1; idx++) 
        //cout << idx << " ";
        sum += mpq_class(1,idx);
        //cout << sum << endl;
        calc_digits(idx, sum);
        //mpz_set(nu, sum.get_num_mpz_t());
        //mpz_set(de, sum.get_den_mpz_t());
        //calc_digits(nu,de);
    

    mpz_clears(nu, de, NULL);
    // digits.cc -
    return 0;

【问题讨论】:

在第 78 次迭代中,线程 11 和 20 似乎在处理相同的数据并产生相同的输出。似乎缺少第 77 次迭代。迭代 82 也有类似的问题,似乎缺少迭代。 【参考方案1】:
    您需要将for 循环标记为#pragma omp for,以便迭代分布在线程上。或者,您可以将其与 omp parallel 结合使用。 下一个问题是您的sum += .... 语句。由于所有线程都访问sum 变量,因此您需要将并行循环标记为reduction(+:sum)omp schedule 本身不执行任何操作。将其放在omp for 行。

【讨论】:

4.请注意,由于cout &lt;&lt; xxx &lt;&lt; yyy &lt;&lt; zzz;,您的输出可能会有所不同。我建议在将字符串传递给 cout 之前编写字符串。 我更改了代码:``` // 计算为 n = 100 #pragma omp parallel default(shared) shared(idx, sum) #pragma omp for schedule(static, chunk), reduction( sum:+) for (idx=from; idx @tomdean1939 请为您的原始帖子添加更新,此处无法阅读代码。 @Laci 好点。使用stringstream ss; ss &lt;&lt; stuff ; ss &lt;&lt; morestuff; cout &lt;&lt; ss.str(); @tomdean1939 抱歉:应该是reduction(+:sum)。您是否有可以查找此类内容的书籍或教程?

以上是关于如何在 for 循环中并行处理。我的代码不正确地并行操作的主要内容,如果未能解决你的问题,请参考以下文章

如何使用 PySpark 并行化我的文件处理程序

是否可以在同一存储过程中并行处理for循环?

在 Python 中通过线程/核心/节点并行化 for 循环

运行 SYCL 代码时结果不正确。在尝试并行化循环时

在 OpenMP 中,我们如何并行运行多个代码块,其中每个代码块包含 omp single 和 omp for 循环?

在cython中并行化for循环:超越prange