并行读取 STDIN 时 $_ 为空
Posted
技术标签:
【中文标题】并行读取 STDIN 时 $_ 为空【英文标题】:$_ empty while reading STDIN in parallel 【发布时间】:2017-01-29 20:28:26 【问题描述】:我有一个遗留项目,它从 STDIN 获取大量数据并在 perl 脚本中逐行处理。行顺序并不重要。 这需要很长时间,所以我想并行进行。
经过一番研究,我发现Parallel::Loops
看起来很合适,但我无法让它工作,因为$_
是空的。我的代码是:
#Initialize all vars etc
$pl->while ( sub <STDIN> , sub
print $_ # but $_ is empty
其他从 STDIN ir 并行读取的方式也很受欢迎。
更新:
在获得所有帮助后,我可以管理一些工作代码,谢谢。我要做一个简短的摘要。澄清一下:
这是一种解析器,它有 3000 多行正则表达式和 自动生成的条件。
我用于测试的输入是一个 POS 标记的文本,这个文件有 1071406 行。
我的硬件是:SSD 磁盘、中档 i5 上一代和 8gb RAM DDR4。
结论:
-
因为 cmets 建议 IO 操作使我的脚本变慢。
所有建议都带来了改进,特别是包括处理成串行而不是逐行处理的建议。
答案包含对未来工作非常有用的线程实现。
Framework Parallel::ForkManager 在执行时间上引入了很多延迟。我总是在 5 分钟后终止脚本,因为没有并行性的脚本大约需要 6 分钟。
Framework Parallel::Loops 引入了一些改进。该脚本大约需要 3 分钟才能完成。
使用 GNU 并行是一种简单的优化方法。
使用 Threads 包我得到了最好的时间,1 分 45 秒,但它非常接近 GNU 并行,所以你可以尝试一下,并努力移植代码。
使用@ikegami 答案中的线程包读取一堆行,时间与应用@tanktalus 解决方案的时间相同,逐行读取。
最后,我将使用@ikegami 解决方案,我认为当数据量增加时会更好。例如,我将要处理的行数调整为 100.000,因为它比 10.000 获得更好的结果。这种差异大约是 8 秒。
下一个自然步骤是将所有内容写入文件而不是使用 STDOUT,我希望这有助于进一步减少时间。
【问题讨论】:
保持您的代码不变并将其包装在 GNU Parallel 中也许...cat hugeData | parallel --pipe ./existingScript.pl
。或者 shebang-wrap 你现有的脚本...gnu.org/software/parallel/parallel_tutorial.html#Shebang
cat hugeData |
最好替换为<hugeData
@MarkSetchell 谢谢。但是时间比按原样运行脚本要差一些。
好的,希望值得一试。我想时间主要是 I/O 而不是计算,所以并行处理不会有太大帮助。
@MarkSetchell 是的,看来你是对的。稍后我将尝试带全文的脚本,这需要 1 周才能完成。也许在那里我们可以看到差异。
【参考方案1】:
$_
从未设置,因为您从未分配给$_
!
别忘了
while (<STDIN>) ...
简称
while (defined( $_ = <STDIN> )) ...
这意味着您希望使用以下内容:
$pl->while ( sub defined( $_ = <STDIN> ) , sub
print $_;
也就是说,破坏$_
是个坏主意。它很可能被调用者中的 for (...)
别名为其他变量。
这意味着您应该使用以下内容:
my $line;
$pl->while ( sub defined( $line = <STDIN> ) , sub
print $line;
您可能会发现,将作品分解成更粗略的线条单位会产生更好的性能,因为它会降低偷听与作品的比率。
use constant WORK_UNIT_SIZE => 100;
my $done = 0;
my @lines;
$pl->while ( sub
@lines = ();
return 0 if $done;
while (@lines < WORK_UNIT_SIZE)
my $line = <>;
if (!defined($line))
$done = 1;
return 0+@lines;
push @lines, $line;
return 1;
, sub
for (@lines)
print $_;
最后,您应该重用它们,而不是为每个工作单元创建一个新任务!下面使用线程演示了这一点。
use threads qw( async );
use Thread::Queue 3.01 qw( );
use constant NUM_WORKERS => 8;
use constant WORK_UNIT_SIZE => 100;
sub worker
my ($job) = @_;
for (@$job)
print $_;
my $q = Thread::Queue->new();
$q->limit(NUM_WORKERS * 4);
async while (defined( my $job = $q->dequeue() )) worker($job);
for 1..NUM_WORKERS;
my $done = 0;
while (!$done)
my @lines;
while (@lines < WORK_UNIT_SIZE)
my $line = <>;
if (!defined($line))
$done = 1;
last;
push @lines, $line;
$q->enqueue(\@lines) if @lines;
$q->end();
$_->join for threads->list;
【讨论】:
谢谢,这个问题的答案很完美。问题是它比没有并行性的脚本慢。我用减少的输入(大约 40mb 纯文本)进行了一些测试,完成时间超过 2 分钟,而原始脚本需要 12 秒。 如果每行的工作量很小,当然是。您需要更大的工作单元 是的,你是对的。此刻的工作量很小,只是几个正则匹配 所以一次阅读更多行。让每个工作人员一次处理 10、100、1000 行。 添加到我的答案中。【参考方案2】:我不知道使用Parallel::Loops 的具体好处(很可能有)。这与Parallel::ForkManager 相同,这是Parallel::Loops
使用的。
use warnings;
use strict;
use feature 'say';
use Parallel::ForkManager;
my $max_procs = 30;
my $pm = Parallel::ForkManager->new($max_procs);
# Retrieve data returned by children in the callback
my %ret_data;
$pm->run_on_finish( sub
my ($pid, $exit, $ident, $signal, $core, $dataref) = @_;
$ret_data$pid = $dataref;
);
while (my $input = <STDIN>)
chomp($input);
$pm->start and next;
my $ret = run_job($input);
$pm->finish(0, \$ret);
$pm->wait_all_children;
foreach my $pid (keys %ret_data)
say "$pid returned: $$ret_data$pid";
sub run_job
my ($input) = @_;
# your processing
return $input; # to have something to check
此代码从子进程返回一个标量,一个值。您可以返回任何数据结构,请参阅文档中的 Retrieving data structures from child processes 和 this post 示例。
数据是通过文件返回的,对于大数据或许多快速进程,这可能会减慢速度。
如果在终端进行测试,则使用Ctrl-d
停止输入(或在chomp
之后添加last if $input !~ /\S/;
以以空行停止——但不是通过其他方式将数据传递给STDIN
)。
已澄清每个STDIN
读取只是要处理的一行。然后我们应该在生成新进程之前收集更多行,否则开销太大。
my $num_lines_to_collect = 1000;
my @lines_to_process; # collect lines for each fork
while (my $input = <STDIN>)
chomp($input);
push @lines_to_process, $input;
next if $. % $num_lines_to_collect != 0;
$pm->start and next;
my $ret = run_job( \@lines_to_process );
$pm->finish(0, \$ret);
@lines_to_process = (); # empty it for the next round
$pm->wait_all_children;
我们将行添加到数组@lines_to_process
,并且仅当当前行号$.
是$num_lines_to_collect
的倍数时才继续触发新的fork。因此,每个$num_lines_collect
都会启动一个作业,因此每个作业都会处理这么多。我将它设置为1000
,实验。
【讨论】:
我不需要返回,因为我打印到 STDOUT,所以我删除了它。该解决方案有效,所以我赞成。这个问题与接受的答案相同,这比没有并行性的脚本慢。我使用减少的输入(大约 40mb 纯文本)进行了一些测试,完成时间超过 2 分钟,而原始脚本需要 12 秒。但我认为这应该是一个新的问题。谢谢 我认为“输入”不仅仅是要处理的一行,它会触发一项工作。如果只是一件小事,而不是每次生成一个进程,那就是一个巨大的开销。因此,阅读一堆行,而不是触发一个过程。我为此添加了代码。 第一名,感谢您花时间详细说明。其次,似乎线程需要更多的工作负载。我在玩num_lines_to_collect
,我得到了最好的结果,值为 100000,这个结果比原始脚本慢了一秒。我将继续努力,并为工作增加工作量。我会发布结果。
@IvánRodríguezTorres 嗯。可能是每一行的处理太少了,所以从磁盘读取占了大部分处理时间。不过,我希望拥有多个工作会带来一些的收益。从另一端尝试 - 收集这么多行,首先只有两个作业,然后是三个,等等。 // 你的文件有多大(多少行),每行做了什么(大致)?你提到“很少的正则表达式”......是这样吗? // 你是在使用一些非常旧/很差的硬件吗?
@IvánRodríguezTorres 另外,您说“我打印到 STDOUT”...您是否在处理时将 每一行 打印到 STDOUT
?如果是,请更改它。让每个作业收集结果,然后将它们打印出来,然后打印到文件中。 (每个作业到自己的文件,然后可以合并。)你的整个事情看起来像 I/O 绑定,并且添加很多 (slow) 打印到STDOUT
不会帮助。【参考方案3】:
这里可能最简单的方法是创建一个线程池,每个线程都在同一个队列上进行侦听,然后让一个线程(可能是主线程)读取文件并将每一行推送到队列中。
use strict;
use warnings;
use Thread qw(async);
use Thread::Queue;
my $q = Thread::Queue->new();
$q->limit(32); # no point in reading in more than this into memory.
my @thr = map
async
while (defined (my $line = $q->dequeue()) )
print $line;
;
1..4; # 4 worker threads
while (<STDIN>)
$q->enqueue($_);
$q->end();
$_->join for Thread->list;
作为一个警告点,如果您需要将数据从工作线程推送回主线程,请注意。它不像其他语言那么简单。
更新:从线程切换到线程。虽然 async 函数被记录为返回线程对象,但这似乎对我不起作用,因此也必须更改连接。
【讨论】:
我还没有接受答案,因为在应用您的解决方案后,脚本会抛出前所未有的错误。如果我设法解决它们,我会接受,因为您的队列系统看起来非常有用。 我觉得我最近在 Perl 方面的经验不足以投反对票,但 官方 不鼓励使用threads
模块。它们并不像您期望的那样轻巧。 (我确实记得大约 7 年前曾尝试在一个 I/O 绑定项目中使用它们,并且他们使代码慢了 5 倍。)
@chepner 众所周知,鉴于“不鼓励”使用,我一直在考虑仍然合理的使用。 (毕竟线程确实提供了特定的好处。)所以,我很好奇——在你的项目中,它们“慢 5 倍”......与什么相比?顺序处理,还是分叉,还是……?顺便说一句,我也想知道投反对票的原因。
@chepner,关于 他们使代码慢了大约 5 倍",那么你做错了严重的事情。这就是警告出现的原因。多任务处理很困难。
@chepner, Re "threads 模块被官方禁止",他们被劝阻的原因在上一段中说:它们很重,多任务处理很复杂。这同样适用于 Parallel::Loops。同一个作者会同样气馁。以上是关于并行读取 STDIN 时 $_ 为空的主要内容,如果未能解决你的问题,请参考以下文章