并行读取 STDIN 时 $_ 为空

Posted

技术标签:

【中文标题】并行读取 STDIN 时 $_ 为空【英文标题】:$_ empty while reading STDIN in parallel 【发布时间】:2017-01-29 20:28:26 【问题描述】:

我有一个遗留项目,它从 STDIN 获取大量数据并在 perl 脚本中逐行处理。行顺序并不重要。 这需要很长时间,所以我想并行进行。

经过一番研究,我发现Parallel::Loops 看起来很合适,但我无法让它工作,因为$_ 是空的。我的代码是:

#Initialize all vars etc

$pl->while ( sub  <STDIN> , sub 
    print $_       # but $_ is empty

其他从 STDIN ir 并行读取的方式也很受欢迎。


更新:

在获得所有帮助后,我可以管理一些工作代码,谢谢。我要做一个简短的摘要。澄清一下:

    这是一种解析器,它有 3000 多行正则表达式和 自动生成的条件。

    我用于测试的输入是一个 POS 标记的文本,这个文件有 1071406 行。

    我的硬件是:SSD 磁盘、中档 i5 上一代和 8gb RAM DDR4。

结论:

    因为 cmets 建议 IO 操作使我的脚本变慢。 所有建议都带来了改进,特别是包括处理成串行而不是逐行处理的建议。 答案包含对未来工作非常有用的线程实现。 Framework Parallel::ForkManager 在执行时间上引入了很多延迟。我总是在 5 分钟后终止脚本,因为没有并行性的脚本大约需要 6 分钟。 Framework Parallel::Loops 引入了一些改进。该脚本大约需要 3 分钟才能完成。 使用 GNU 并行是一种简单的优化方法。 使用 Threads 包我得到了最好的时间,1 分 45 秒,但它非常接近 GNU 并行,所以你可以尝试一下,并努力移植代码。 使用@ikegami 答案中的线程包读取一堆行,时间与应用@tanktalus 解决方案的时间相同,逐行读取。

最后,我将使用@ikegami 解决方案,我认为当数据量增加时会更好。例如,我将要处理的行数调整为 100.000,因为它比 10.000 获得更好的结果。这种差异大约是 8 秒。

下一个自然步骤是将所有内容写入文件而不是使用 STDOUT,我希望这有助于进一步减少时间。

【问题讨论】:

保持您的代码不变并将其包装在 GNU Parallel 中也许...cat hugeData | parallel --pipe ./existingScript.pl。或者 shebang-wrap 你现有的脚本...gnu.org/software/parallel/parallel_tutorial.html#Shebang cat hugeData | 最好替换为&lt;hugeData @MarkSetchell 谢谢。但是时间比按原样运行脚本要差一些。 好的,希望值得一试。我想时间主要是 I/O 而不是计算,所以并行处理不会有太大帮助。 @MarkSetchell 是的,看来你是对的。稍后我将尝试带全文的脚本,这需要 1 周才能完成。也许在那里我们可以看到差异。 【参考方案1】:

$_ 从未设置,因为您从未分配给$_

别忘了

while (<STDIN>)  ... 

简称

while (defined( $_ = <STDIN> ))  ... 

这意味着您希望使用以下内容:

$pl->while ( sub  defined( $_ = <STDIN> ) , sub 
    print $_;

也就是说,破坏$_ 是个坏主意。它很可能被调用者中的 for (...) 别名为其他变量。

这意味着您应该使用以下内容:

my $line;
$pl->while ( sub  defined( $line = <STDIN> ) , sub 
    print $line;

您可能会发现,将作品分解成更粗略的线条单位会产生更好的性能,因为它会降低偷听与作品的比率。

use constant WORK_UNIT_SIZE => 100;

my $done = 0;
my @lines;
$pl->while ( sub 
    @lines = ();
    return 0 if $done;

    while (@lines < WORK_UNIT_SIZE) 
        my $line = <>;
        if (!defined($line)) 
            $done = 1;
            return 0+@lines;
        

        push @lines, $line;
    

    return 1;
, sub 
    for (@lines) 
        print $_;
    

最后,您应该重用它们,而不是为每个工作单元创建一个新任务!下面使用线程演示了这一点。

use threads            qw( async );
use Thread::Queue 3.01 qw( );

use constant NUM_WORKERS    => 8;
use constant WORK_UNIT_SIZE => 100;

sub worker 
    my ($job) = @_;
    for (@$job) 
        print $_;
    


my $q = Thread::Queue->new();
$q->limit(NUM_WORKERS * 4);

async  while (defined( my $job = $q->dequeue() ))  worker($job);  
    for 1..NUM_WORKERS;

my $done = 0;    
while (!$done) 
    my @lines;
    while (@lines < WORK_UNIT_SIZE) 
        my $line = <>;
        if (!defined($line)) 
            $done = 1;
            last;
        

        push @lines, $line;
    

    $q->enqueue(\@lines) if @lines;


$q->end();
$_->join for threads->list;

【讨论】:

谢谢,这个问题的答案很完美。问题是它比没有并行性的脚本慢。我用减少的输入(大约 40mb 纯文本)进行了一些测试,完成时间超过 2 分钟,而原始脚本需要 12 秒。 如果每行的工作量很小,当然是。您需要更大的工作单元 是的,你是对的。此刻的工作量很小,只是几个正则匹配 所以一次阅读更多行。让每个工作人员一次处理 10、100、1000 行。 添加到我的答案中。【参考方案2】:

我不知道使用Parallel::Loops 的具体好处(很可能有)。这与Parallel::ForkManager 相同,这是Parallel::Loops 使用的。

use warnings;
use strict;
use feature 'say';

use Parallel::ForkManager;   

my $max_procs = 30; 
my $pm = Parallel::ForkManager->new($max_procs);   

# Retrieve data returned by children in the callback
my %ret_data;      
$pm->run_on_finish( sub  
    my ($pid, $exit, $ident, $signal, $core, $dataref) = @_; 
    $ret_data$pid = $dataref;
);

while (my $input = <STDIN>)

    chomp($input);

    $pm->start and next;
    my $ret = run_job($input);
    $pm->finish(0, \$ret);

$pm->wait_all_children;

foreach my $pid (keys %ret_data) 
    say "$pid returned: $$ret_data$pid";


sub run_job  
    my ($input) = @_; 
    # your processing
    return $input;    # to have something to check
 

此代码从子进程返回一个标量,一个值。您可以返回任何数据结构,请参阅文档中的 Retrieving data structures from child processes 和 this post 示例。

数据是通过文件返回的,对于大数据或许多快速进程,这可能会减慢速度。

如果在终端进行测试,则使用Ctrl-d 停止输入(或在chomp 之后添加last if $input !~ /\S/; 以以空行停止——但不是通过其他方式将数据传递给STDIN)。


已澄清每个STDIN 读取只是要处理的一行。然后我们应该在生成新进程之前收集更多行,否则开销太大。

my $num_lines_to_collect = 1000;

my @lines_to_process;         # collect lines for each fork

while (my $input = <STDIN>)

    chomp($input);
    push @lines_to_process, $input;
    next if $. % $num_lines_to_collect != 0;

    $pm->start and next;
    my $ret = run_job( \@lines_to_process );
    $pm->finish(0, \$ret);

    @lines_to_process = ();   # empty it for the next round

$pm->wait_all_children;

我们将行添加到数组@lines_to_process,并且仅当当前行号$.$num_lines_to_collect 的倍数时才继续触发新的fork。因此,每个$num_lines_collect 都会启动一个作业,因此每个作业都会处理这么多。我将它设置为1000,实验。

【讨论】:

我不需要返回,因为我打印到 STDOUT,所以我删除了它。该解决方案有效,所以我赞成。这个问题与接受的答案相同,这比没有并行性的脚本慢。我使用减少的输入(大约 40mb 纯文本)进行了一些测试,完成时间超过 2 分钟,而原始脚本需要 12 秒。但我认为这应该是一个新的问题。谢谢 我认为“输入”不仅仅是要处理的一行,它会触发一项工作。如果只是一件小事,而不是每次生成一个进程,那就是一个巨大的开销。因此,阅读一堆行,而不是触发一个过程。我为此添加了代码。 第一名,感谢您花时间详细说明。其次,似乎线程需要更多的工作负载。我在玩num_lines_to_collect,我得到了最好的结果,值为 100000,这个结果比原始脚本慢了一秒。我将继续努力,并为工作增加工作量。我会发布结果。 @IvánRodríguezTorres 嗯。可能是每一行的处理太少了,所以从磁盘读取占了大部分处理时间。不过,我希望拥有多个工作会带来一些的收益。从另一端尝试 - 收集这么多行,首先只有两个作业,然后是三个,等等。 // 你的文件有多大(多少行),每行做了什么(大致)?你提到“很少的正则表达式”......是这样吗? // 你是在使用一些非常旧/很差的硬件吗? @IvánRodríguezTorres 另外,您说“我打印到 STDOUT”...您是否在处理时将 ​​每一行 打印到 STDOUT?如果是,请更改它。让每个作业收集结果,然后将它们打印出来,然后打印到文件中。 (每个作业到自己的文件,然后可以合并。)你的整个事情看起来像 I/O 绑定,并且添加很多 (slow) 打印到STDOUT 不会帮助。【参考方案3】:

这里可能最简单的方法是创建一个线程池,每个线程都在同一个队列上进行侦听,然后让一个线程(可能是主线程)读取文件并将每一行推送到队列中。

use strict;
use warnings;
use Thread qw(async);
use Thread::Queue;

my $q = Thread::Queue->new();
$q->limit(32); # no point in reading in more than this into memory.

my @thr = map 
    async 
        while (defined (my $line = $q->dequeue()) ) 
            print $line;
        
    ;
 1..4; # 4 worker threads

while (<STDIN>)

    $q->enqueue($_);

$q->end();

$_->join for Thread->list;

作为一个警告点,如果您需要将数据从工作线程推送回主线程,请注意。它不像其他语言那么简单。

更新:从线程切换到线程。虽然 async 函数被记录为返回线程对象,但这似乎对我不起作用,因此也必须更改连接。

【讨论】:

我还没有接受答案,因为在应用您的解决方案后,脚本会抛出前所未有的错误。如果我设法解决它们,我会接受,因为您的队列系统看起来非常有用。 我觉得我最近在 Perl 方面的经验不足以投反对票,但 官方 不鼓励使用 threads 模块。它们并不像您期望的那样轻巧。 (我确实记得大约 7 年前曾尝试在一个 I/O 绑定项目中使用它们,并且他们使代码慢了 5 倍。) @chepner 众所周知,鉴于“不鼓励”使用,我一直在考虑仍然合理的使用。 (毕竟线程确实提供了特定的好处。)所以,我很好奇——在你的项目中,它们“慢 5 倍”......与什么相比?顺序处理,还是分叉,还是……?顺便说一句,我也想知道投反对票的原因。 @chepner,关于 他们使代码慢了大约 5 倍",那么你做错了严重的事情。这就是警告出现的原因。多任务处理很困难。 @chepner, Re "threads 模块被官方禁止",他们被劝阻的原因在上一段中说:它们很重,多任务处理很复杂。这同样适用于 Parallel::Loops。同一个作者会同样气馁。

以上是关于并行读取 STDIN 时 $_ 为空的主要内容,如果未能解决你的问题,请参考以下文章

.Net 中的字典是不是可能在并行读取和写入时导致死锁?

Python BigQuery 存储。并行读取多个流

并行程序模拟

无法并行读取相同的文件

使用Java 8 Parallel Stream在并行读取多个文件时排除某些文件

如何防止来自 Cassandra 的 Dataflow 读取并行度降低