TBB 管道的错误输出

Posted

技术标签:

【中文标题】TBB 管道的错误输出【英文标题】:incorrect output with TBB pipeline 【发布时间】:2014-04-29 05:37:07 【问题描述】:

我在1.txt, 2.txt ... 100.txt等文本文件中写了一个不同值(100次)的C结构


我在 Linux 上使用英特尔 TBB。我已经创建了:

    InputFilter (serial_in_order MODE) TransformFIlter (serial_in_order 模式) OutputFilter(Serial_in_order 模式)

InputFilter 从文件中读取结构并将其传递给 TransformFilter。 TrasnformFilter 更新结构值并将其传递给 OutputFilter。 OutputFilter 将新结构写入光盘。

基本上,它是一个结构体的简单读写应用程序。

class InputFilter: public tbb::filter 
public:
    InputFilter( int );
    ~InputFilter();
private:
    int total_streams;
    int count;
    struct video_process_object input_obj;
    void* operator()( void* );
;

InputFilter::InputFilter( int x )
        : filter( serial_in_order ) 
    total_streams = x;
    count = 1;


InputFilter::~InputFilter() 
    total_streams = 0;


void* InputFilter::operator()( void* ) 
    char path[50] =  ;
    sprintf( path, "input//%d.txt", count );
    printf( "Path : %s\n", path );
    FILE *fp;
    fp = fopen( path, "r" );

    if( fp == NULL || count > total_streams ) 
        fclose( fp );
        printf( "\n*******Cannot find more data.Terminating********\n\n\n" );
        return NULL;
    

    fscanf( fp, "%d", &input_obj.video_id );
    fscanf( fp, "%s", &input_obj.storage_url );
    fscanf( fp, "%s", &input_obj.storage_type );
    fscanf( fp, "%d", &input_obj.face_detect );
    fscanf( fp, "%d", &input_obj.face_recognise );
    fscanf( fp, "%d", &input_obj.scene_recognise );
    fscanf( fp, "%d", &input_obj.activity_recognise );
    fscanf( fp, "%d", &input_obj.speech_recognise );
    fclose( fp );

    count++;
    return &input_obj;


class TransformFilter: public tbb::filter 
public:
    TransformFilter();
    ~TransformFilter();
private:
    struct video_process_object input_transform;
    void* operator()( void* );
;

TransformFilter::TransformFilter()
        : filter( serial_in_order ) 


TransformFilter::~TransformFilter() 


void* TransformFilter::operator()( void *item ) 

    input_transform = *static_cast<struct video_process_object*>( item );

    input_transform.video_id += 1000;
    strcat( input_transform.storage_url, "  nabeel" );
    strcat( input_transform.storage_type, " N" );
    input_transform.face_detect += 1000;
    input_transform.face_recognise += 1000;

    return &input_transform;


class OutputFilter: public tbb::filter 
public:
    OutputFilter();
    ~OutputFilter();
private:
    struct video_process_object output_obj;
    void* operator()( void* );
;

OutputFilter::OutputFilter()
        : filter( serial_in_order ) 
    int status = mkdir( "output", S_IRWXU | S_IRWXG | S_IRWXO );
    if( status == -1 )
        printf( "\nOutput directory already exists\n\n" );


OutputFilter::~OutputFilter() 


void* OutputFilter::operator()( void *item ) 

    output_obj = *static_cast<struct video_process_object*>( item );

    FILE *fp;

    char path[50] =  ;
    sprintf( path, "output//%d.txt", output_obj.video_id - 1000 );
    printf( "Output Path : %s\t\t %d\n\n", path, output_obj.video_id );

    if( (fp = fopen( path, "w" )) == NULL ) 
        fprintf( stderr, "Cannot open output file.\n" );
        return NULL;
    

    fprintf( fp, "%d\n", output_obj.video_id );
    fprintf( fp, "%s\n", output_obj.storage_url );
    fprintf( fp, "%s\n", output_obj.storage_type );
    fprintf( fp, "%d\n", output_obj.face_detect );
    fprintf( fp, "%d\n", output_obj.face_recognise );
    fprintf( fp, "%d\n", output_obj.scene_recognise );
    fprintf( fp, "%d\n", output_obj.activity_recognise );
    fprintf( fp, "%d\n", output_obj.speech_recognise );

    fclose( fp );
    return NULL;


int main() 
    tbb::pipeline pipeline;

    InputFilter input_filter( 100 );
    pipeline.add_filter( input_filter );

    TransformFilter transform_filter;
    pipeline.add_filter( transform_filter );

    OutputFilter output_filter;
    pipeline.add_filter( output_filter );

    tbb::tick_count t0 = tbb::tick_count::now();

    tbb::task_scheduler_init init_parallel;
    pipeline.run( 1 );
    tbb::tick_count t1 = tbb::tick_count::now();

    return 0;

对于少量文件(例如 5 或 10)一切正常。当我读取大量文件(例如 50 或 100)时,问题就开始了。问题是:

有时 InputFilter 会读取 10.txt 文件,TransformFilter 会对其进行处理。但是 InputFilter 立即读取 11.txt。 OutputFIlter 跳过 10.txt 并处理 11.txt。

我如何确保不会发生这种情况?

【问题讨论】:

问题(如果有)在过滤器定义中。请将它们添加到代码中。另外,您指定了tokens=1,这意味着串行执行,是有意的吗?并请修正缩进(删除'主程序的代码'之前的空格) 不,我尝试使用不同数量的令牌。它不起作用,这就是为什么我最后尝试使用一个。 .我附上了完整的代码 我还意识到了一件事。 .我修复了我的代码。现在它与串行执行工作正常。虽然它不适用于并行执行,例如如果我使用的令牌超过 1。 【参考方案1】:

存在数据竞争,因为 video_process_objects 被放置在过滤器结构中,并通过过滤器之间的引用传递(当然它们是并行运行的)。因此,当InputFilter 开始处理下一个令牌,将新数据读取到它的 video_process_object 中,而第一个令牌刚刚开始通过TransformFilter 中的相同地址读取数据时,您就会遇到这种情况:

     Token 1                ||         Token 2
input_filter.operator()     ||
transform_filter.operator() ||  input_filter.operator()
...

要修复它,动态分配数据,例如:

struct video_process_object *input_obj_ptr = new video_process_object;
fscanf( fp, "%d", &input_obj_ptr->video_id );
...
return input_obj_ptr;

并在最后一个过滤器中释放它,因为它的返回值无论如何都会被忽略。 this old presentation 中的幻灯片 49-50 绘制了类似的代码。

最后,让我挑战一下你对 tbb::pipelen 和 serial_in_order 过滤器类型的选择。 TBB Reference 说:

并行过滤器在实用时是首选,因为它们允许并行加速。如果过滤器必须是串行的,则在实用时首选无序变体,因为它对处理顺序的限制较少。

由于处理和文件是独立的,我认为没有理由设置这个额外的“有序”限制。为了更好地构建代码,另一个要考虑的引用:

函数 parallel_pipeline 提供了一种强类型的 lambda 友好方式来构建和运行管道。

【讨论】:

嗨。 .感谢您的 cmets。我使用了动态分配,一切似乎都很好。最后一件事 。 .在动态分配中,我将如何释放内存?因为我从过滤器返回结构值 关于带有过滤器的serial_in_order或serial_out_order,我只是在做一些实验,因为最终目标是在不同的应用程序中使用它,这将使用这个序列。在当前示例中,您不必正确。我也在学习使用 TBB。可能在未来几天,我会跳到parallel_pipeline。谢谢指出 在最后一个过滤器中释放它,因为它的返回值无论如何都会被忽略。 您好,非常感谢。你能看看我的问题吗?我更新了它,需要更多的指导。 . . 它值得一个单独的问题,请写下来并把链接放在这里,我会看看(也请从这里的描述中删除第二个问题)

以上是关于TBB 管道的错误输出的主要内容,如果未能解决你的问题,请参考以下文章

我怎样才能管道标准错误,而不是标准输出?

管道弹出标准错误和标准输出

重定向与管道

管道与重定向(有重定向习题未做)

IO和管道

标准输入输出和管道