[ARVADOS] updated: e185fca1249abda3de3ac9237218bb0c8560d2ab
Git user
git at public.curoverse.com
Mon Feb 29 15:52:46 EST 2016
Summary of changes:
sdk/cli/bin/crunch-job | 275 ++++++++++++++++++++++++++-----------------------
1 file changed, 147 insertions(+), 128 deletions(-)
via e185fca1249abda3de3ac9237218bb0c8560d2ab (commit)
via 3c90228c2e0466b2e69c08e0ad610dd5d619b4f8 (commit)
via d41563d9a62450f86214c4feac774dd82fc4311c (commit)
via f9d868590efb81cee078d19d3be91ef297634499 (commit)
from ad48bb33bf49e3fec668a0ccf788ad9b2ffcaa80 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit e185fca1249abda3de3ac9237218bb0c8560d2ab
Merge: ad48bb3 3c90228
Author: Tom Clegg <tom at curoverse.com>
Date: Mon Feb 29 13:38:02 2016 -0500
8099: 7263: Merge branch 'hgi/7263-even-better-busy-behavior' of github.com:wtsi-hgi/arvados into 8099-babysit-all-srun
Conflicts:
sdk/cli/bin/crunch-job
diff --cc sdk/cli/bin/crunch-job
index b63886e,c8a1de9..aff3b2f
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@@ -1130,122 -1181,133 +1131,133 @@@ sub update_progress_stat
sub reapchildren
{
- my $pid = waitpid (-1, WNOHANG);
- return 0 if $pid <= 0;
-
- my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
- . "."
- . $slot[$proc{$pid}->{slot}]->{cpu});
- my $jobstepidx = $proc{$pid}->{jobstepidx};
- my $elapsed = time - $proc{$pid}->{time};
- my $Jobstep = $jobstep[$jobstepidx];
-
- my $childstatus = $?;
- my $exitvalue = $childstatus >> 8;
- my $exitinfo = "exit ".exit_status_s($childstatus);
- $Jobstep->{'arvados_task'}->reload;
- my $task_success = $Jobstep->{'arvados_task'}->{success};
-
- Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
-
- if (!defined $task_success) {
- # task did not indicate one way or the other --> fail
- Log($jobstepidx, sprintf(
- "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
- exit_status_s($childstatus)));
- $Jobstep->{'arvados_task'}->{success} = 0;
- $Jobstep->{'arvados_task'}->save;
- $task_success = 0;
- }
-
- if (!$task_success)
+ my $children_reaped = 0;
-
- while((my $pid = waitpid (-1, WNOHANG)) > 0)
++ while ((my $pid = waitpid (-1, WNOHANG)) > 0)
{
- my $temporary_fail;
- $temporary_fail ||= $Jobstep->{tempfail};
- $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
-
- ++$thisround_failed;
- ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
-
- # Check for signs of a failed or misconfigured node
- if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
- 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
- # Don't count this against jobstep failure thresholds if this
- # node is already suspected faulty and srun exited quickly
- if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
- $elapsed < 5) {
- Log ($jobstepidx, "blaming failure on suspect node " .
- $slot[$proc{$pid}->{slot}]->{node}->{name});
- $temporary_fail ||= 1;
- }
- ban_node_by_slot($proc{$pid}->{slot});
+ my $childstatus = $?;
++
+ my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
+ . "."
+ . $slot[$proc{$pid}->{slot}]->{cpu});
- my $jobstepid = $proc{$pid}->{jobstep};
++ my $jobstepidx = $proc{$pid}->{jobstepidx};
+
+ if (!WIFEXITED($childstatus))
+ {
+ # child did not exit (may be temporarily stopped)
- Log ($jobstepid, "child $pid did not actually exit in reapchildren, ignoring for now.");
++ Log ($jobstepidx, "child $pid did not actually exit in reapchildren, ignoring for now.");
+ next;
}
- Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
- ++$Jobstep->{'failures'},
- $temporary_fail ? 'temporary' : 'permanent',
- $elapsed));
+ $children_reaped++;
+ my $elapsed = time - $proc{$pid}->{time};
- my $Jobstep = $jobstep[$jobstepid];
++ my $Jobstep = $jobstep[$jobstepidx];
+
+ my $exitvalue = $childstatus >> 8;
+ my $exitinfo = "exit ".exit_status_s($childstatus);
+ $Jobstep->{'arvados_task'}->reload;
+ my $task_success = $Jobstep->{'arvados_task'}->{success};
+
- Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
++ Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
+
+ if (!defined $task_success) {
+ # task did not indicate one way or the other --> fail
- Log($jobstepid, sprintf(
++ Log($jobstepidx, sprintf(
+ "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
+ exit_status_s($childstatus)));
+ $Jobstep->{'arvados_task'}->{success} = 0;
+ $Jobstep->{'arvados_task'}->save;
+ $task_success = 0;
+ }
- if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
- # Give up on this task, and the whole job
- $main::success = 0;
+ if (!$task_success)
+ {
+ my $temporary_fail;
+ $temporary_fail ||= $Jobstep->{tempfail};
+ $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
+
+ ++$thisround_failed;
+ ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
+
+ # Check for signs of a failed or misconfigured node
+ if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
+ 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
+ # Don't count this against jobstep failure thresholds if this
+ # node is already suspected faulty and srun exited quickly
+ if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
+ $elapsed < 5) {
- Log ($jobstepid, "blaming failure on suspect node " .
++ Log ($jobstepidx, "blaming failure on suspect node " .
+ $slot[$proc{$pid}->{slot}]->{node}->{name});
+ $temporary_fail ||= 1;
+ }
+ ban_node_by_slot($proc{$pid}->{slot});
+ }
+
- Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
- ++$Jobstep->{'failures'},
- $temporary_fail ? 'temporary' : 'permanent',
- $elapsed));
++ Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
++ ++$Jobstep->{'failures'},
++ $temporary_fail ? 'temporary' : 'permanent',
++ $elapsed));
+
+ if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
+ # Give up on this task, and the whole job
+ $main::success = 0;
+ }
+ # Put this task back on the todo queue
- push @jobstep_todo, $jobstepid;
++ push @jobstep_todo, $jobstepidx;
+ $Job->{'tasks_summary'}->{'failed'}++;
}
- # Put this task back on the todo queue
- push @jobstep_todo, $jobstepidx;
- $Job->{'tasks_summary'}->{'failed'}++;
- }
- else
- {
- ++$thisround_succeeded;
- $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
- $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
- $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
- push @jobstep_done, $jobstepidx;
- Log ($jobstepidx, "success in $elapsed seconds");
- }
- $Jobstep->{exitcode} = $childstatus;
- $Jobstep->{finishtime} = time;
- $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
- $Jobstep->{'arvados_task'}->save;
- process_stderr_final ($jobstepidx);
- Log ($jobstepidx, sprintf("task output (%d bytes): %s",
- length($Jobstep->{'arvados_task'}->{output}),
- $Jobstep->{'arvados_task'}->{output}));
-
- close $reader{$jobstepidx};
- delete $reader{$jobstepidx};
- delete $slot[$proc{$pid}->{slot}]->{pid};
- push @freeslot, $proc{$pid}->{slot};
- delete $proc{$pid};
-
- if ($task_success) {
- # Load new tasks
- my $newtask_list = [];
- my $newtask_results;
- do {
- $newtask_results = api_call(
- "job_tasks/list",
- 'where' => {
- 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
- },
- 'order' => 'qsequence',
- 'offset' => scalar(@$newtask_list),
- );
- push(@$newtask_list, @{$newtask_results->{items}});
- } while (@{$newtask_results->{items}});
- foreach my $arvados_task (@$newtask_list) {
- my $jobstep = {
- 'level' => $arvados_task->{'sequence'},
- 'failures' => 0,
- 'arvados_task' => $arvados_task
- };
- push @jobstep, $jobstep;
- push @jobstep_todo, $#jobstep;
+ else
+ {
+ ++$thisround_succeeded;
+ $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
+ $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
+ $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
- push @jobstep_done, $jobstepid;
- Log ($jobstepid, "success in $elapsed seconds");
++ push @jobstep_done, $jobstepidx;
++ Log ($jobstepidx, "success in $elapsed seconds");
}
+ $Jobstep->{exitcode} = $childstatus;
+ $Jobstep->{finishtime} = time;
+ $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
+ $Jobstep->{'arvados_task'}->save;
- process_stderr ($jobstepid, $task_success);
- Log ($jobstepid, sprintf("task output (%d bytes): %s",
- length($Jobstep->{'arvados_task'}->{output}),
- $Jobstep->{'arvados_task'}->{output}));
++ process_stderr_final ($jobstepidx);
++ Log ($jobstepidx, sprintf("task output (%d bytes): %s",
++ length($Jobstep->{'arvados_task'}->{output}),
++ $Jobstep->{'arvados_task'}->{output}));
+
- close $reader{$jobstepid};
- delete $reader{$jobstepid};
++ close $reader{$jobstepidx};
++ delete $reader{$jobstepidx};
+ delete $slot[$proc{$pid}->{slot}]->{pid};
+ push @freeslot, $proc{$pid}->{slot};
+ delete $proc{$pid};
+
+ if ($task_success) {
+ # Load new tasks
+ my $newtask_list = [];
+ my $newtask_results;
+ do {
+ $newtask_results = api_call(
+ "job_tasks/list",
+ 'where' => {
+ 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
+ },
+ 'order' => 'qsequence',
+ 'offset' => scalar(@$newtask_list),
+ );
+ push(@$newtask_list, @{$newtask_results->{items}});
+ } while (@{$newtask_results->{items}});
+ foreach my $arvados_task (@$newtask_list) {
+ my $jobstep = {
+ 'level' => $arvados_task->{'sequence'},
+ 'failures' => 0,
+ 'arvados_task' => $arvados_task
+ };
+ push @jobstep, $jobstep;
+ push @jobstep_todo, $#jobstep;
+ }
+ }
+ $progress_is_dirty = 1;
}
- $progress_is_dirty = 1;
- 1;
+ return $children_reaped;
}
sub check_refresh_wanted
@@@ -1292,10 -1351,13 +1304,13 @@@ sub check_squeu
# squeue check interval (15s) this should make the squeue check an
# infrequent event.
my $silent_procs = 0;
- for my $procinfo (values %proc)
- for my $js (map {$jobstep[$_->{jobstep}]} values %proc)
++ for my $js (map {$jobstep[$_->{jobstepidx}]} values %proc)
{
- my $jobstep = $jobstep[$procinfo->{jobstepidx}];
- if ($jobstep->{stderr_at} < $last_squeue_check)
+ if (!exists($js->{stderr_at}))
+ {
+ $js->{stderr_at} = 0;
+ }
+ if ($js->{stderr_at} < $last_squeue_check)
{
$silent_procs++;
}
@@@ -1305,16 -1367,16 +1320,16 @@@
# use killem() on procs whose killtime is reached
while (my ($pid, $procinfo) = each %proc)
{
- my $jobstep = $jobstep[$procinfo->{jobstepidx}];
- my $js = $jobstep[$procinfo->{jobstep}];
++ my $js = $jobstep[$procinfo->{jobstepidx}];
if (exists $procinfo->{killtime}
&& $procinfo->{killtime} <= time
- && $jobstep->{stderr_at} < $last_squeue_check)
+ && $js->{stderr_at} < $last_squeue_check)
{
my $sincewhen = "";
- if ($jobstep->{stderr_at}) {
- $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
+ if ($js->{stderr_at}) {
+ $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
}
- Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+ Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
killem ($pid);
}
}
@@@ -1383,38 -1446,37 +1398,42 @@@ sub release_allocatio
sub readfrompipes
{
my $gotsome = 0;
+ my %fd_job;
+ my $sel = IO::Select->new();
- foreach my $job (keys %reader)
+ foreach my $jobstepidx (keys %reader)
{
- my $buf;
- if ($jobstep[$jobstepidx]->{stdout_r} &&
- 0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536))
- {
- print STDERR $buf if $ENV{CRUNCH_DEBUG};
- if (exists $jobstep[$jobstepidx]->{stdout_captured}) {
- $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
- }
- $gotsome = 1;
- my $fd = $reader{$job};
++ my $fd = $reader{$jobstepidx};
+ $sel->add($fd);
- $fd_job{$fd} = $job;
++ $fd_job{$fd} = $jobstepidx;
++
++ if (my $stdout_fd = $jobstep[$jobstepidx]->{stdout_r}) {
++ $sel->add($stdout_fd);
++ $fd_job{$stdout_fd} = $jobstepidx;
+ }
- if (0 < sysread ($reader{$jobstepidx}, $buf, 65536))
+ }
+ # select on all reader fds with 0.1s timeout
+ my @ready_fds = $sel->can_read(0.1);
+ foreach my $fd (@ready_fds)
+ {
+ my $buf;
+ if (0 < sysread ($fd, $buf, 65536))
{
print STDERR $buf if $ENV{CRUNCH_DEBUG};
- my $job = $fd_job{$fd};
- $jobstep[$job]->{stderr_at} = time;
- $jobstep[$job]->{stderr} .= $buf;
++ my $jobstepidx = $fd_job{$fd};
+ $jobstep[$jobstepidx]->{stderr_at} = time;
+ $jobstep[$jobstepidx]->{stderr} .= $buf;
- if (exists $jobstep[$jobstepidx]->{stderr_captured}) {
- $jobstep[$jobstepidx]->{stderr_captured} .= $buf;
- }
- $gotsome = 1;
# Consume everything up to the last \n
- preprocess_stderr ($job);
+ preprocess_stderr ($jobstepidx);
- if (length ($jobstep[$job]->{stderr}) > 16384)
+ if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
{
# If we get a lot of stderr without a newline, chop off the
# front to avoid letting our buffer grow indefinitely.
- substr ($jobstep[$job]->{stderr},
- 0, length($jobstep[$job]->{stderr}) - 8192) = "";
+ substr ($jobstep[$jobstepidx]->{stderr},
+ 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
}
+ $gotsome = 1;
}
}
return $gotsome;
commit 3c90228c2e0466b2e69c08e0ad610dd5d619b4f8
Author: Joshua Randall <joshua.randall at sanger.ac.uk>
Date: Thu Feb 11 10:58:18 2016 +0000
Fixes check_squeue to test $procinfo->{killtime} rather than the mistaken $js->{killtime}
diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index a31fd71..c8a1de9 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -1417,7 +1417,7 @@ sub check_squeue
if ($procinfo->{time} < time - 60
&& $procinfo->{jobstepname}
&& !exists $ok{$procinfo->{jobstepname}}
- && !exists $js->{killtime})
+ && !exists $procinfo->{killtime})
{
# According to slurm, this task has ended (successfully or not)
# -- but our srun child hasn't exited. First we must wait (30
commit d41563d9a62450f86214c4feac774dd82fc4311c
Author: Joshua Randall <joshua.randall at sanger.ac.uk>
Date: Tue Feb 2 13:29:36 2016 +0000
adds numerous crunch-job performance improvements
diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index f83e7ba..a31fd71 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -801,6 +801,7 @@ if ($initial_tasks_this_level < @node) {
@freeslot = (0..$#slot);
}
my $round_num_freeslots = scalar(@freeslot);
+print STDERR "crunch-job have ${round_num_freeslots} free slots for ${initial_tasks_this_level} initial tasks at this level, ".scalar(@node)." nodes, and ".scalar(@slot)." slots\n";
my %round_max_slots = ();
for (my $ii = $#freeslot; $ii >= 0; $ii--) {
@@ -1062,9 +1063,6 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
{
update_progress_stats();
}
- if (!$gotsome) {
- select (undef, undef, undef, 0.1);
- }
$working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
$_->{node}->{hold_count} < 4 } @slot);
if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
@@ -1183,122 +1181,133 @@ sub update_progress_stats
sub reapchildren
{
- my $pid = waitpid (-1, WNOHANG);
- return 0 if $pid <= 0;
-
- my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
- . "."
- . $slot[$proc{$pid}->{slot}]->{cpu});
- my $jobstepid = $proc{$pid}->{jobstep};
- my $elapsed = time - $proc{$pid}->{time};
- my $Jobstep = $jobstep[$jobstepid];
-
- my $childstatus = $?;
- my $exitvalue = $childstatus >> 8;
- my $exitinfo = "exit ".exit_status_s($childstatus);
- $Jobstep->{'arvados_task'}->reload;
- my $task_success = $Jobstep->{'arvados_task'}->{success};
-
- Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
-
- if (!defined $task_success) {
- # task did not indicate one way or the other --> fail
- Log($jobstepid, sprintf(
- "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
- exit_status_s($childstatus)));
- $Jobstep->{'arvados_task'}->{success} = 0;
- $Jobstep->{'arvados_task'}->save;
- $task_success = 0;
- }
+ my $children_reaped = 0;
- if (!$task_success)
+ while((my $pid = waitpid (-1, WNOHANG)) > 0)
{
- my $temporary_fail;
- $temporary_fail ||= $Jobstep->{tempfail};
- $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
-
- ++$thisround_failed;
- ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
-
- # Check for signs of a failed or misconfigured node
- if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
- 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
- # Don't count this against jobstep failure thresholds if this
- # node is already suspected faulty and srun exited quickly
- if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
- $elapsed < 5) {
- Log ($jobstepid, "blaming failure on suspect node " .
- $slot[$proc{$pid}->{slot}]->{node}->{name});
- $temporary_fail ||= 1;
- }
- ban_node_by_slot($proc{$pid}->{slot});
+ my $childstatus = $?;
+ my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
+ . "."
+ . $slot[$proc{$pid}->{slot}]->{cpu});
+ my $jobstepid = $proc{$pid}->{jobstep};
+
+ if (!WIFEXITED($childstatus))
+ {
+ # child did not exit (may be temporarily stopped)
+ Log ($jobstepid, "child $pid did not actually exit in reapchildren, ignoring for now.");
+ next;
}
- Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
- ++$Jobstep->{'failures'},
- $temporary_fail ? 'temporary' : 'permanent',
- $elapsed));
+ $children_reaped++;
+ my $elapsed = time - $proc{$pid}->{time};
+ my $Jobstep = $jobstep[$jobstepid];
+
+ my $exitvalue = $childstatus >> 8;
+ my $exitinfo = "exit ".exit_status_s($childstatus);
+ $Jobstep->{'arvados_task'}->reload;
+ my $task_success = $Jobstep->{'arvados_task'}->{success};
+
+ Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
+
+ if (!defined $task_success) {
+ # task did not indicate one way or the other --> fail
+ Log($jobstepid, sprintf(
+ "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
+ exit_status_s($childstatus)));
+ $Jobstep->{'arvados_task'}->{success} = 0;
+ $Jobstep->{'arvados_task'}->save;
+ $task_success = 0;
+ }
- if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
- # Give up on this task, and the whole job
- $main::success = 0;
+ if (!$task_success)
+ {
+ my $temporary_fail;
+ $temporary_fail ||= $Jobstep->{tempfail};
+ $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
+
+ ++$thisround_failed;
+ ++$thisround_failed_multiple if $Jobstep->{'failures'} >= 1;
+
+ # Check for signs of a failed or misconfigured node
+ if (++$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} >=
+ 2+$slot[$proc{$pid}->{slot}]->{node}->{ncpus}) {
+ # Don't count this against jobstep failure thresholds if this
+ # node is already suspected faulty and srun exited quickly
+ if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
+ $elapsed < 5) {
+ Log ($jobstepid, "blaming failure on suspect node " .
+ $slot[$proc{$pid}->{slot}]->{node}->{name});
+ $temporary_fail ||= 1;
+ }
+ ban_node_by_slot($proc{$pid}->{slot});
+ }
+
+ Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
+ ++$Jobstep->{'failures'},
+ $temporary_fail ? 'temporary' : 'permanent',
+ $elapsed));
+
+ if (!$temporary_fail || $Jobstep->{'failures'} >= 3) {
+ # Give up on this task, and the whole job
+ $main::success = 0;
+ }
+ # Put this task back on the todo queue
+ push @jobstep_todo, $jobstepid;
+ $Job->{'tasks_summary'}->{'failed'}++;
}
- # Put this task back on the todo queue
- push @jobstep_todo, $jobstepid;
- $Job->{'tasks_summary'}->{'failed'}++;
- }
- else
- {
- ++$thisround_succeeded;
- $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
- $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
- $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
- push @jobstep_done, $jobstepid;
- Log ($jobstepid, "success in $elapsed seconds");
- }
- $Jobstep->{exitcode} = $childstatus;
- $Jobstep->{finishtime} = time;
- $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
- $Jobstep->{'arvados_task'}->save;
- process_stderr ($jobstepid, $task_success);
- Log ($jobstepid, sprintf("task output (%d bytes): %s",
- length($Jobstep->{'arvados_task'}->{output}),
- $Jobstep->{'arvados_task'}->{output}));
-
- close $reader{$jobstepid};
- delete $reader{$jobstepid};
- delete $slot[$proc{$pid}->{slot}]->{pid};
- push @freeslot, $proc{$pid}->{slot};
- delete $proc{$pid};
-
- if ($task_success) {
- # Load new tasks
- my $newtask_list = [];
- my $newtask_results;
- do {
- $newtask_results = api_call(
- "job_tasks/list",
- 'where' => {
- 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
- },
- 'order' => 'qsequence',
- 'offset' => scalar(@$newtask_list),
- );
- push(@$newtask_list, @{$newtask_results->{items}});
- } while (@{$newtask_results->{items}});
- foreach my $arvados_task (@$newtask_list) {
- my $jobstep = {
- 'level' => $arvados_task->{'sequence'},
- 'failures' => 0,
- 'arvados_task' => $arvados_task
- };
- push @jobstep, $jobstep;
- push @jobstep_todo, $#jobstep;
+ else
+ {
+ ++$thisround_succeeded;
+ $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
+ $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
+ $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
+ push @jobstep_done, $jobstepid;
+ Log ($jobstepid, "success in $elapsed seconds");
+ }
+ $Jobstep->{exitcode} = $childstatus;
+ $Jobstep->{finishtime} = time;
+ $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
+ $Jobstep->{'arvados_task'}->save;
+ process_stderr ($jobstepid, $task_success);
+ Log ($jobstepid, sprintf("task output (%d bytes): %s",
+ length($Jobstep->{'arvados_task'}->{output}),
+ $Jobstep->{'arvados_task'}->{output}));
+
+ close $reader{$jobstepid};
+ delete $reader{$jobstepid};
+ delete $slot[$proc{$pid}->{slot}]->{pid};
+ push @freeslot, $proc{$pid}->{slot};
+ delete $proc{$pid};
+
+ if ($task_success) {
+ # Load new tasks
+ my $newtask_list = [];
+ my $newtask_results;
+ do {
+ $newtask_results = api_call(
+ "job_tasks/list",
+ 'where' => {
+ 'created_by_job_task_uuid' => $Jobstep->{'arvados_task'}->{uuid}
+ },
+ 'order' => 'qsequence',
+ 'offset' => scalar(@$newtask_list),
+ );
+ push(@$newtask_list, @{$newtask_results->{items}});
+ } while (@{$newtask_results->{items}});
+ foreach my $arvados_task (@$newtask_list) {
+ my $jobstep = {
+ 'level' => $arvados_task->{'sequence'},
+ 'failures' => 0,
+ 'arvados_task' => $arvados_task
+ };
+ push @jobstep, $jobstep;
+ push @jobstep_todo, $#jobstep;
+ }
}
+ $progress_is_dirty = 1;
}
- $progress_is_dirty = 1;
- 1;
+ return $children_reaped;
}
sub check_refresh_wanted
@@ -1342,9 +1351,13 @@ sub check_squeue
# squeue check interval (15s) this should make the squeue check an
# infrequent event.
my $silent_procs = 0;
- for my $jobstep (values %proc)
+ for my $js (map {$jobstep[$_->{jobstep}]} values %proc)
{
- if ($jobstep->{stderr_at} < $last_squeue_check)
+ if (!exists($js->{stderr_at}))
+ {
+ $js->{stderr_at} = 0;
+ }
+ if ($js->{stderr_at} < $last_squeue_check)
{
$silent_procs++;
}
@@ -1352,17 +1365,18 @@ sub check_squeue
return if $silent_procs == 0;
# use killem() on procs whose killtime is reached
- while (my ($pid, $jobstep) = each %proc)
+ while (my ($pid, $procinfo) = each %proc)
{
- if (exists $jobstep->{killtime}
- && $jobstep->{killtime} <= time
- && $jobstep->{stderr_at} < $last_squeue_check)
+ my $js = $jobstep[$procinfo->{jobstep}];
+ if (exists $procinfo->{killtime}
+ && $procinfo->{killtime} <= time
+ && $js->{stderr_at} < $last_squeue_check)
{
my $sincewhen = "";
- if ($jobstep->{stderr_at}) {
- $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
+ if ($js->{stderr_at}) {
+ $sincewhen = " in last " . (time - $js->{stderr_at}) . "s";
}
- Log($jobstep->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+ Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
killem ($pid);
}
}
@@ -1397,12 +1411,13 @@ sub check_squeue
}
# Check for child procs >60s old and not mentioned by squeue.
- while (my ($pid, $jobstep) = each %proc)
+ while (my ($pid, $procinfo) = each %proc)
{
- if ($jobstep->{time} < time - 60
- && $jobstep->{jobstepname}
- && !exists $ok{$jobstep->{jobstepname}}
- && !exists $jobstep->{killtime})
+ my $js = $jobstep[$procinfo->{jobstep}];
+ if ($procinfo->{time} < time - 60
+ && $procinfo->{jobstepname}
+ && !exists $ok{$procinfo->{jobstepname}}
+ && !exists $js->{killtime})
{
# According to slurm, this task has ended (successfully or not)
# -- but our srun child hasn't exited. First we must wait (30
@@ -1411,8 +1426,8 @@ sub check_squeue
# terminated, we'll conclude some slurm communication
# error/delay has caused the task to die without notifying srun,
# and we'll kill srun ourselves.
- $jobstep->{killtime} = time + 30;
- Log($jobstep->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
+ $procinfo->{killtime} = time + 30;
+ Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
}
}
}
@@ -1431,12 +1446,23 @@ sub release_allocation
sub readfrompipes
{
my $gotsome = 0;
+ my %fd_job;
+ my $sel = IO::Select->new();
foreach my $job (keys %reader)
{
+ my $fd = $reader{$job};
+ $sel->add($fd);
+ $fd_job{$fd} = $job;
+ }
+ # select on all reader fds with 0.1s timeout
+ my @ready_fds = $sel->can_read(0.1);
+ foreach my $fd (@ready_fds)
+ {
my $buf;
- if (0 < sysread ($reader{$job}, $buf, 65536))
+ if (0 < sysread ($fd, $buf, 65536))
{
print STDERR $buf if $ENV{CRUNCH_DEBUG};
+ my $job = $fd_job{$fd};
$jobstep[$job]->{stderr_at} = time;
$jobstep[$job]->{stderr} .= $buf;
commit f9d868590efb81cee078d19d3be91ef297634499
Author: Tom Clegg <tom at curoverse.com>
Date: Fri Jan 22 15:02:21 2016 -0500
7263: Avoid getting stuck processing stderr for one task for a long time.
Do not sleep(0.1) unless pipes are idle.
diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index 70d05f0..f83e7ba 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -1057,12 +1057,14 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
check_refresh_wanted();
check_squeue();
update_progress_stats();
- select (undef, undef, undef, 0.1);
}
elsif (time - $progress_stats_updated >= 30 || $progress_is_dirty)
{
update_progress_stats();
}
+ if (!$gotsome) {
+ select (undef, undef, undef, 0.1);
+ }
$working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
$_->{node}->{hold_count} < 4 } @slot);
if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
@@ -1432,15 +1434,21 @@ sub readfrompipes
foreach my $job (keys %reader)
{
my $buf;
- while (0 < sysread ($reader{$job}, $buf, 8192))
+ if (0 < sysread ($reader{$job}, $buf, 65536))
{
print STDERR $buf if $ENV{CRUNCH_DEBUG};
$jobstep[$job]->{stderr_at} = time;
$jobstep[$job]->{stderr} .= $buf;
+
+ # Consume everything up to the last \n
preprocess_stderr ($job);
+
if (length ($jobstep[$job]->{stderr}) > 16384)
{
- substr ($jobstep[$job]->{stderr}, 0, 8192) = "";
+ # If we get a lot of stderr without a newline, chop off the
+ # front to avoid letting our buffer grow indefinitely.
+ substr ($jobstep[$job]->{stderr},
+ 0, length($jobstep[$job]->{stderr}) - 8192) = "";
}
$gotsome = 1;
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list