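Parent doesn't wait for child processes to finish despite reaping
【Posted】: 2019-10-13 02:08:21
【Question】:
I am fully aware that there is a plethora of articles explaining the inner workings of parent-child process dynamics. I have gone through them and got my stuff working the way I want it to, almost. But one thing is bugging me, and despite multiple attempts I am unable to understand it.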
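Problem: Despite reaping the children, main does not wait for all of them to finish and exits prematurely. I believe I exit the child processes properly and I have installed the REAPER in the child process, so how is main exiting before the children finish?
I am not looking for a solution here, but I need a new direction so that I can bang my head against it next week. As of now I feel I have exhausted my options and tried many things, to no avail.
Some background on what I am trying to achieve:
In a nutshell: I want all children to finish, and only then do I want to proceed. Each child process spawns a bunch of threads, which the child joins properly, and then exits with exit(0).
The extra hoopla you may observe in the program is nothing but our requirement: we have to hit 5 APIs (engines), but only with a fixed batch size, say 10 at a time. I launch a child process per engine and a thread per request; then I wait for all threads to finish, join them, and only then does the child process exit. Only now can I submit the next batch of requests to the same engine, and I do this for all engines until I have exhausted my total requests, say 10000.
Each request may take anywhere from 1 second to 2 hours. Basically they are CSV reports being fetched from an HTTP API.
My problem is that when I have exhausted the full set of requests, I am unable to make MAIN wait for all child processes to finish. This is weird, and it is the problem I am trying to solve.
Any ideas?
My program's output: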
[compuser@lenovoe470:little-stuff]$ perl 07--20190526-batch-processing-using-threads-with-busy-pool-detection-2.pl 12
26710: STARTING TASKS IN BATCHES
26710: RUNNING batch_engine 1_e1 tasks (1 2)
26710: RUNNING batch_engine 2_e2 tasks (3 4)
26710: RUNNING batch_engine 3_e3 tasks (5 6 7)
26710: BUSY_ENGINE: e1.
26710: BUSY_ENGINE: e2.
26710: BUSY_ENGINE: e3.
26710: BUSY_ENGINE: e1.
26710: BUSY_ENGINE: e2.
26710:26712: TASK_ORCHESTRATOR: >> finished batch_engine (2_e2) tasks (3 4)
26710: PID (26712) has finished with status (0). updating proc hash
26710: BUSY_ENGINE: e3.
26710:26713: TASK_ORCHESTRATOR: >> finished batch_engine (3_e3) tasks (5 6 7)
26710:26711: TASK_ORCHESTRATOR: >> finished batch_engine (1_e1) tasks (1 2)
26710: PID (26713) has finished with status (0). updating proc hash
26710: BUSY_ENGINE: e1.
26710: PID (26711) has finished with status (0). updating proc hash
26710: RUNNING batch_engine 4_e2 tasks (8 9)
26710: RUNNING batch_engine 5_e3 tasks (10 11 12)
26710: FINISHED TASKS IN BATCHES
[compuser@lenovoe470:little-stuff]$ 1:26722: TASK_ORCHESTRATOR: >> finished batch_engine (5_e3) tasks (10 11 12)
1:26721: TASK_ORCHESTRATOR: >> finished batch_engine (4_e2) tasks (8 9)
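In the above output:
RUNNING batch_engine means I am running a batch of numbered tasks.
BUSY_ENGINE means the endpoint/engine is busy, as it is already processing its maximum batch size of requests. I need to wait.
finished batch_engine means the child process has finished processing the given batch of requests for a specific engine/endpoint. It exits, main detects that the engine is now free, and the next batch can be queued.
If we look at the last 2 lines, it is evident that the output of the child processes has spilled over past the shell prompt: main exited prematurely without waiting for the running children. Why? Any help?
My program: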
#!/usr/bin/env perl
use strict;
use warnings;
use Data::Dumper;
use POSIX ':sys_wait_h';
use Thread qw(async);

STDOUT->autoflush(1);

# doesn't work
sub reaper {
    my $reaped;
    while (($reaped = waitpid (-1,&WNOHANG) > 0)) {
        print "$$: reaped: $reaped\n";
        sleep(1);
    }
}
$SIG{CHLD} = \&reaper;
# doesn't work

my @total_tasks = (1 .. shift || 9);
my @engines = (qw/e1 e2 e3/);
my $sizes = { e1 => 2, e2 => 2, e3 => 3, };
my $proc_hash;
my $global_string = "ENGINE";

# source: https://duyanghao.github.io/ways_avoid_zombie_process/
#
sub REAPER {
    local ($!, $?);
    while ( (my $reaped_pid = waitpid(-1, WNOHANG)) > 0 ) {
        if ( WIFEXITED($?) ) {
            # my
            my $ret_code = WEXITSTATUS($?);
            print "$$: PID ($reaped_pid) has finished with status ($ret_code). updating proc hash\n";
            my $engine_name = $proc_hash->{$reaped_pid};
            delete ($proc_hash->{$reaped_pid});
            delete ($proc_hash->{$engine_name});
            # my
            # original
            #my $ret_code = WEXITSTATUS($?);
            #print "child process:$pid exit with code:$ret_code\n";
            # original
        }
    }
}
#
$SIG{CHLD} = \&REAPER;

sub random_sleep_time {
    return (int(rand(5)+1))
    #return (sprintf "%.2f",(rand(1)+1))
}

sub task_runner {
    my @args = @_;
    my ($batch_engine, $task) = ($args[0]->[0],$args[0]->[1]);
    STDOUT->autoflush(1);
    my $task_time = random_sleep_time();
    sleep ($task_time);
    threads->exit(0);
    #print "$$:".(threads->tid()).": TASK_RUNNER: $global_string ($batch_engine) task ($task) finished in $task_time seconds\n";
    #return;
};

sub task_orchestrator {
    my ($batch_engine, @tasks) = @_;
    my $engine = (split (/_/,$batch_engine))[1];
    my $task_orch_pid = fork();
    die "Failed to fork task_orchestrator\n" if not defined $task_orch_pid;
    if ($task_orch_pid != 0) {
        $proc_hash->{$engine} = $task_orch_pid;
        $proc_hash->{$task_orch_pid} = $engine;
    }
    if ($task_orch_pid == 0) {
        STDOUT->autoflush(1);
        my @tids;
        for (my $i=1 ; $i <= $#tasks ; $i++) { push (@tids,$i) }
        foreach my $task_number (0 .. $#tasks) {
            $tids [$task_number] = threads->create (
                \&task_runner,[$batch_engine,$tasks [$task_number]]
            );
        }
        my $ppid = getppid();
        foreach my $tid (@tids) { $tid->join() }
        print "$ppid:$$: TASK_ORCHESTRATOR: >> finished batch_engine ($batch_engine) tasks (@tasks)\n";
        exit (0);
    }
}

sub update_proc_hash {
    my $finished_pid = waitpid (-1, POSIX->WNOHANG);
    if ($finished_pid > 0) {
        print "$$: PID ($finished_pid) has finished. updating proc hash\n";
        my $engine_name = $proc_hash->{$finished_pid};
        delete ($proc_hash->{$finished_pid});
        delete ($proc_hash->{$engine_name});
    }
}

my $batch=1;
print "$$: STARTING TASKS IN BATCHES\n";
while (@total_tasks) {
    foreach my $engine (@engines) {
        update_proc_hash();
        if (exists $proc_hash->{$engine}) {
            print "$$: BUSY_ENGINE: $engine.\n";
            sleep (1);
            next;
        }
        else {
            my @engine_tasks;
            my $engine_max_tasks = $sizes->{$engine};
            while ($engine_max_tasks-- != 0) {
                my $task = shift @total_tasks;
                push (@engine_tasks,$task) if $task;
            }
            if (@engine_tasks) {
                my $batch_engine = $batch.'_'.$engine;
                print "$$: RUNNING batch_engine $batch_engine tasks (@engine_tasks)\n";
                task_orchestrator ("$batch_engine",@engine_tasks);
                $batch++;
            }
        }
    }
}
REAPER();
print "$$: FINISHED TASKS IN BATCHES\n";
__END__
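Update after 3 days: Thank you, SO community. Once again, I am grateful to everyone who took the time to look into this and helped find and fix the problem. Thank you so much.
Allow me to share the new output along with the final program, for everyone's reference.
Output after applying the fix: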
User@Host:/cygdrive/c/bash-home> perl test.pl
22044: STARTING TASKS IN BATCHES
22044: MAIN: engine (e1) is RUNNING batch #1 tasks: (1 2)
22044: MAIN: engine (e2) is RUNNING batch #2 tasks: (3 4 5)
22044: MAIN: engine (e3) is RUNNING batch #3 tasks: (6 7)
41456: TASK_RUNNER: engine (e1) finished batch #1 task #1 in (1.80) seconds
41456: TASK_RUNNER: engine (e1) finished batch #1 task #2 in (1.31) seconds
41456: TASK_ORCHESTRATOR: engine (e1) finished batch #1 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (41456) has finished with status (0).
18252: TASK_RUNNER: engine (e2) finished batch #2 task #3 in (1.04) seconds
18252: TASK_RUNNER: engine (e2) finished batch #2 task #4 in (1.91) seconds
18252: TASK_RUNNER: engine (e2) finished batch #2 task #5 in (1.63) seconds
18252: TASK_ORCHESTRATOR: engine (e2) finished batch #2 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (18252) has finished with status (0).
14544: TASK_RUNNER: engine (e3) finished batch #3 task #6 in (1.42) seconds
14544: TASK_RUNNER: engine (e3) finished batch #3 task #7 in (1.84) seconds
14544: TASK_ORCHESTRATOR: engine (e3) finished batch #3 tasks in (1.00) seconds.
22044: REAPER: TASK_ORCHESTRATOR pid (14544) has finished with status (0).
22044: MAIN: engine (e1) is RUNNING batch #4 tasks: (8 9)
22044: MAIN: engine (e2) is RUNNING batch #5 tasks: (10)
37612: TASK_RUNNER: engine (e1) finished batch #4 task #8 in (1.19) seconds
37612: TASK_RUNNER: engine (e1) finished batch #4 task #9 in (1.31) seconds
37612: TASK_ORCHESTRATOR: engine (e1) finished batch #4 tasks in (1.00) seconds.
16300: TASK_RUNNER: engine (e2) finished batch #5 task #10 in (1.53) seconds
16300: TASK_ORCHESTRATOR: engine (e2) finished batch #5 tasks in (1.00) seconds.
22044: ALL ORCHESTRATORS HAVE FINISHED
22044: FINISHED TASKS IN BATCHES
Final working program:
#!/usr/bin/env perl
use strict;
use warnings;
use Data::Dumper;
use POSIX ':sys_wait_h';
use threads;

STDOUT->autoflush(1);

my @total_tasks = (1 .. 10);
my $sleep_time = 1;
my @engines = (qw/e1 e2 e3/);
my $sizes = {
    e1 => 2,
    e2 => 3,
    e3 => 2,
};
my $proc_hash;
my $global_string = "engine";

sub REAPER {
    local ($!, $?);
    while ( (my $reaped_pid = waitpid(-1, WNOHANG)) > 0 ) {
        if ( WIFEXITED($?) ) {
            my $ret_code = WEXITSTATUS($?);
            print "$$: REAPER: TASK_ORCHESTRATOR pid ($reaped_pid) has finished with status ($ret_code).\n";
            my $engine_name = $proc_hash->{$reaped_pid};
            delete ($proc_hash->{$reaped_pid});
            delete ($proc_hash->{$engine_name});
        }
    }
}
$SIG{CHLD} = \&REAPER;

sub random_sleep_time { return sprintf ("%.2f",(rand ($sleep_time||5) + 1)) }

sub task_runner {
    STDOUT->autoflush(1);
    my @args = @_;
    my ($batch_engine, $task) = ($args[0]->[0],$args[0]->[1]);
    my ($batch, $engine) = split (/_/,$batch_engine);
    my $task_time = random_sleep_time();
    # NOTE: the core sleep() truncates fractional seconds, so this sleeps
    # for whole seconds only (hence the 1.00 second totals in the output).
    sleep ($task_time);
    print "$$: TASK_RUNNER: $global_string ($engine) finished batch #$batch task #$task in ($task_time) seconds\n";
    threads->exit(0);
};

sub task_orchestrator {
    my ($batch_engine, @tasks) = @_;
    my ($batch, $engine) = split (/_/,$batch_engine);
    my $task_orch_pid = fork();
    die "Failed to fork task_orchestrator\n" if not defined $task_orch_pid;
    if ($task_orch_pid != 0) {
        # parent: remember which engine this orchestrator pid belongs to
        $proc_hash->{$engine} = $task_orch_pid;
        $proc_hash->{$task_orch_pid} = $engine;
    }
    if ($task_orch_pid == 0) {
        # child: run one thread per task, join them all, then exit
        STDOUT->autoflush(1);
        my @tids;
        my $start_time = time;
        for (my $i=1 ; $i <= $#tasks ; $i++) { push (@tids,$i) }
        foreach my $task_number (0 .. $#tasks) {
            $tids [$task_number] = threads->create (
                \&task_runner,[$batch_engine,$tasks [$task_number]]
            );
        }
        foreach my $tid (@tids) { $tid->join() }
        my $end_time = time;
        my $total_time = sprintf ("%.2f",($end_time - $start_time));
        print "$$: TASK_ORCHESTRATOR: engine ($engine) finished batch #$batch tasks in ($total_time) seconds.\n";
        exit (0);
    }
}

my $batch=1;
print "$$: STARTING TASKS IN BATCHES\n";
while (@total_tasks) {
    foreach my $engine (@engines) {
        if (exists $proc_hash->{$engine}) {
            sleep (1);
            next;
        }
        else {
            my @engine_tasks;
            my $engine_max_tasks = $sizes->{$engine};
            while ($engine_max_tasks-- != 0) {
                my $task = shift @total_tasks;
                push (@engine_tasks,$task) if $task;
            }
            if (@engine_tasks) {
                my $batch_engine = $batch.'_'.$engine;
                print "$$: MAIN: engine ($engine) is RUNNING batch #$batch tasks: (@engine_tasks)\n";
                task_orchestrator ($batch_engine,@engine_tasks);
                $batch++;
            }
        }
    }
}

# All 3 below work properly
# NOTE: without Time::HiRes, sleep(.2) is truncated to sleep(0), so these
# loops spin without pausing until the last child has been reaped.
#sleep (.2) while ((waitpid(-1, WNOHANG)) >= 0);
#sleep (.2) while ((waitpid(-1, WNOHANG)) != -1);
sleep (.2) while ((waitpid(-1, WNOHANG)) > -1);
print "$$: ALL ORCHESTRATORS HAVE FINISHED\n";
print "$$: FINISHED TASKS IN BATCHES\n";
__END__
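【Comments】:
【Answer 1】:
On exiting your main loop you call REAPER(), which performs a non-blocking waitpid(). Non-blocking, as in: it does not block. So main carries on and exits.
While I'm here, I notice that your update_proc_hash() function doesn't loop the way the other callers of waitpid() do, so it isn't catching everything it could. Do yourself a favour and tidy all of that up.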
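To illustrate the point about looping, here is a minimal sketch of a non-blocking reap that keeps calling waitpid() until nothing more is ready. The helper name reap_finished and the demo children are made up for illustration; they are not part of the program in the question.

```perl
use strict;
use warnings;
use POSIX ':sys_wait_h';

# Hypothetical helper: reap every child that has already exited and
# return a hash ref of { pid => exit code }. It never blocks.
sub reap_finished {
    my %status;
    while ( (my $pid = waitpid(-1, WNOHANG)) > 0 ) {
        $status{$pid} = $? >> 8;    # high byte of $? is the exit code
    }
    return \%status;
}

# Demo: start three children that exit at once with distinct codes.
my @pids;
for my $code (1 .. 3) {
    my $pid = fork // die "Can't fork: $!";
    exit $code if $pid == 0;
    push @pids, $pid;
}

sleep 1;    # give all three children time to exit
my $done = reap_finished();
printf "reaped %d children in one call\n", scalar keys %$done;
```

A single waitpid() call here would have collected only one of the three children; the loop is what picks up the rest in the same pass.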
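【Comments】:
But isn't REAPER doing a while loop? Shouldn't that block main while it executes? The loop keeps going while the PID it gets is greater than 0, sleeping for 1 second each time? What do you think?
@User9102d82 Yes, the while loop would do that, as long as it keeps running, that is. But it exits when waitpid returns 0, which you do get while there are child processes still out there (and that is exactly what you want). So the problem is not the non-blocking operation but the condition > 0 in the while loop.
@User9102d82 REAPER is doing a while loop on waitpid() with the WNOHANG option. This option prevents waitpid() from actually waiting: it reaps a terminated child process and returns the pid of the deceased, returns zero if there are more processes which have not yet terminated, and returns -1 if there are no more child processes. In the end, you want to do a blocking wait (rather than a non-blocking one) so that you do not exit until all children are done.
@TFBW Quite right, but as they say, the while loop keeps spinning. It will come back to check, again and again, for what you describe. (It should have a sleep in it!) The non-blocking nature is not the problem, as long as it doesn't exit on a 0 return (see my answer).
@TFBW: Thank you for replying to my question; I appreciate you taking the time to answer.
【Answer 2】: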
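waitpid can return 0, if there are child processes matching the PID but none of them have terminated yet. With -1 that applies to any child process, so code with multiple children is bound to run into a zero return from the non-blocking waitpid in REAPER; as long as there are unterminated children, that is precisely how we wait on them. But your while loop quits at the first such zero.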
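A small sketch of the three kinds of return value described above, assuming a Linux-like system (details can differ elsewhere). The variable names are only for illustration.

```perl
use strict;
use warnings;
use POSIX ':sys_wait_h';

my $pid = fork // die "Can't fork: $!";
if ($pid == 0) {
    sleep 2;
    exit 0;
}

# The child is still asleep, so a non-blocking call has nothing to reap
my $while_running = waitpid(-1, WNOHANG);

# A blocking call returns only once the child has exited
my $after_exit = waitpid($pid, 0);

# No children are left at all now
my $no_children = waitpid(-1, WNOHANG);

print "while running: $while_running\n";
print "after exit:    ", ($after_exit == $pid ? "the child's pid" : $after_exit), "\n";
print "no children:   $no_children\n";
```

A loop conditioned on > 0 stops at the first of these calls, while the child is still running; a loop conditioned on > -1 keeps going until the last one.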
One way is to poll for a non-negative return
use warnings;
use strict;
use feature 'say';
use POSIX ':sys_wait_h';
use Time::HiRes qw(sleep);

for (1..4) {
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) {
        sleep rand 4;
        say "\tkid $$ exiting";
        exit;
    };
};

while ( (my $kid = waitpid -1, WNOHANG) > -1 ) {
    say "got $kid" if $kid > 0;
    sleep 0.2;
}
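Prints
        kid 12687 exiting
got 12687
        kid 12690 exiting
got 12690
        kid 12689 exiting
got 12689
        kid 12688 exiting
got 12688
Please adjust the polling period as suitable. Note that since this catches any child process, it may interfere with other forks, if there are any unwaited-for at that point.
Or you can use a blocking wait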
while ( (my $kid = waitpid -1, 0) > -1 ) {
    say "got $kid";
}
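You can also use > 0 here now, since with a blocking call there will be no 0 returns. We still only need to terminate the loop on a -1 return (no more processes), just as before.
The main difference is that the block executes only once a child process has actually exited, so if you need to keep tabs on what some long-running children are doing (and perhaps limit their run time or guard against hung jobs), that isn't easy with this form; you want a non-blocking operation for that.
Note that some details, in particular those regarding return values, may differ between systems.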
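As a rough sketch of what "keeping tabs" could look like with the non-blocking form, the loop below kills any child that overstays a time limit. The limit, the %started and %outcome bookkeeping, and the two demo children are assumptions made up for this illustration.

```perl
use strict;
use warnings;
use POSIX ':sys_wait_h';
use Time::HiRes qw(sleep time);

my $limit = 1.0;    # hypothetical per-child time limit, in seconds
my %started;        # pid => start time

for my $nap (0.2, 30) {    # one quick child, one that would run too long
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) { sleep $nap; exit 0 }
    $started{$pid} = time();
}

my %outcome;        # pid => 'exited' or 'killed'
while ( (my $kid = waitpid(-1, WNOHANG)) > -1 ) {
    if ($kid > 0) {
        # The low 7 bits of $? hold the signal that ended the child, if any
        $outcome{$kid} = ($? & 127) ? 'killed' : 'exited';
        delete $started{$kid};
        next;
    }
    # Nothing was ready: terminate overdue children, then pause briefly
    for my $pid (keys %started) {
        kill 'TERM', $pid if time() - $started{$pid} > $limit;
    }
    sleep 0.1;
}
print "$_: $outcome{$_}\n" for sort { $a <=> $b } keys %outcome;
```

With a blocking waitpid there is no point in the loop at which the time check could run, which is the trade-off described above.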
A simple-minded version is to wait on just those specific PIDs, collected as you fork
foreach my $pid (@pids) {
    my $gone = waitpid $pid, 0;
    say "Process $gone exited with $?" if $gone > 0; # -1 if reaped already
}
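where each process is waited on with a blocking waitpid. The problem with this is that if one process runs much longer than the others (or hangs), this loop is stuck waiting on it. And in general we would rather reap child processes as they exit, not in the order in which they were started.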
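For contrast, a minimal sketch of reaping children as they exit rather than in start order, using a blocking waitpid(-1, 0) and a pid-to-name map. The job names and sleep times are invented for the example.

```perl
use strict;
use warnings;

my %label;                                     # pid => hypothetical job name
my @plan = ( [ slow => 2 ], [ fast => 0 ] );   # started in this order

for my $job (@plan) {
    my ($name, $nap) = @$job;
    my $pid = fork // die "Can't fork: $!";
    if ($pid == 0) { sleep $nap; exit 0 }
    $label{$pid} = $name;
}

my @finished;    # job names in the order they were reaped
while ( (my $kid = waitpid(-1, 0)) > 0 ) {
    push @finished, $label{$kid};
}
print "finished in order: @finished\n";
```

The "fast" job should be reported first even though it was started second, since waitpid(-1, 0) returns whichever child exits next.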
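【Comments】:
@User9102d82 Edited the explanation for clarity (hopefully!). Added a more rudimentary way to do this.
Why not just use wait(), or waitpid(-1) without the WNOHANG option? The whole point of a non-blocking wait is to handle the case where you don't want to pause and wait for things to finish; you just want to clean up those that already have.
@TFBW Sure, that can be done as well. The non-blocking one allows one to do other things between checks (perhaps check on how those processes are doing), which may be useful with long waits. (I may add more discussion.)
@User9102d82 Added the other way.
Thanks @zdim, this works. I missed checking for greater than -1, which was the key issue. I am selecting your explanation as the answer since it is straight to the point and also more detailed and thorough.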