[ARVADOS] updated: ad48bb33bf49e3fec668a0ccf788ad9b2ffcaa80
Git user
git at public.curoverse.com
Sat Feb 20 04:26:16 EST 2016
Summary of changes:
sdk/cli/bin/crunch-job | 4 +---
1 file changed, 1 insertion(+), 3 deletions(-)
discards b1484741b1f386baef94af4f879ff5209af98e04 (commit)
via ad48bb33bf49e3fec668a0ccf788ad9b2ffcaa80 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
* -- * -- B -- O -- O -- O (b1484741b1f386baef94af4f879ff5209af98e04)
\
N -- N -- N (ad48bb33bf49e3fec668a0ccf788ad9b2ffcaa80)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit ad48bb33bf49e3fec668a0ccf788ad9b2ffcaa80
Author: Tom Clegg <tom at curoverse.com>
Date: Sat Feb 20 04:17:48 2016 -0500
8099: When invoking setup tasks via srun, check slurm queue and propagate stderr to logs.
diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index ae210a6..b63886e 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -183,11 +183,12 @@ if (($Job || $local_job)->{docker_image_locator}) {
$cmd = [$docker_bin, 'ps', '-q'];
}
Log(undef, "Sanity check is `@$cmd`");
-srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
- $cmd,
- {fork => 1});
-if ($? != 0) {
- Log(undef, "Sanity check failed: ".exit_status_s($?));
+my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
+ $cmd,
+ {label => "sanity check"});
+if ($exited != 0) {
+ Log(undef, "Sanity check failed: ".exit_status_s($exited));
exit EX_TEMPFAIL;
}
Log(undef, "Sanity check OK");
@@ -386,28 +387,17 @@ my $nodelist = join(",", @node);
my $git_tar_count = 0;
if (!defined $no_clear_tmp) {
- # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
- Log (undef, "Clean work dirs");
-
- my $cleanpid = fork();
- if ($cleanpid == 0)
- {
- # Find FUSE mounts under $CRUNCH_TMP and unmount them.
- # Then clean up work directories.
- # TODO: When #5036 is done and widely deployed, we can limit mount's
- # -t option to simply fuse.keep.
- srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
- ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
- exit (1);
- }
- while (1)
- {
- last if $cleanpid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($cleanpid);
- select (undef, undef, undef, 0.1);
- }
- if ($?) {
- Log(undef, "Clean work dirs: exit ".exit_status_s($?));
+ # Find FUSE mounts under $CRUNCH_TMP and unmount them. Then clean
+ # up work directories crunch_tmp/work, crunch_tmp/opt,
+ # crunch_tmp/src*.
+ #
+ # TODO: When #5036 is done and widely deployed, we can limit mount's
+ # -t option to simply fuse.keep.
+ my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+ ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
+ {label => "clean work dirs"});
+ if ($exited != 0) {
exit(EX_RETRY_UNLOCKED);
}
}
@@ -428,30 +418,22 @@ if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
fi
};
- my $docker_pid = fork();
- if ($docker_pid == 0)
- {
- srun (["srun", "--nodelist=" . join(',', @node)],
- ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script]);
- exit ($?);
- }
- while (1)
- {
- last if $docker_pid == waitpid (-1, WNOHANG);
- freeze_if_want_freeze ($docker_pid);
- select (undef, undef, undef, 0.1);
- }
- if ($? != 0)
+
+ my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodelist=" . join(',', @node)],
+ ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
+ {label => "load docker image"});
+ if ($exited != 0)
{
- Log(undef, "Installing Docker image from $docker_locator exited " . exit_status_s($?));
exit(EX_RETRY_UNLOCKED);
}
# Determine whether this version of Docker supports memory+swap limits.
- srun(["srun", "--nodelist=" . $node[0]],
- ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
- {fork => 1});
- $docker_limitmem = ($? == 0);
+ ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodelist=" . $node[0]],
+ [$docker_bin, 'run', '--help'],
+ {label => "check --memory-swap feature"});
+ $docker_limitmem = ($stdout =~ /--memory-swap/);
# Find a non-root Docker user to use.
# Tries the default user for the container, then 'crunch', then 'nobody',
@@ -461,20 +443,22 @@ fi
# Docker containers.
my @tryusers = ("", "crunch", "nobody");
foreach my $try_user (@tryusers) {
+ my $label;
my $try_user_arg;
if ($try_user eq "") {
- Log(undef, "Checking if container default user is not UID 0");
+ $label = "check whether default user is UID 0";
$try_user_arg = "";
} else {
- Log(undef, "Checking if user '$try_user' is not UID 0");
+ $label = "check whether user '$try_user' is UID 0";
$try_user_arg = "--user=$try_user";
}
- srun(["srun", "--nodelist=" . $node[0]],
- ["/bin/sh", "-ec",
- "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
- " test \$a -ne 0"],
- {fork => 1});
- if ($? == 0) {
+ my ($exited, $stdout, $stderr) = srun_sync(
+ ["srun", "--nodelist=" . $node[0]],
+ ["/bin/sh", "-ec",
+ "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
+ {label => $label});
+ chomp($stdout);
+ if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
$dockeruserarg = $try_user_arg;
if ($try_user eq "") {
Log(undef, "Container will run with default user");
@@ -664,11 +648,9 @@ if (!defined $git_archive) {
}
}
else {
- my $install_exited;
+ my $exited;
my $install_script_tries_left = 3;
for (my $attempts = 0; $attempts < 3; $attempts++) {
- Log(undef, "Run install script on all workers");
-
my @srunargs = ("srun",
"--nodelist=$nodelist",
"-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
@@ -676,59 +658,21 @@ else {
"mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
$ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
- my ($install_stderr_r, $install_stderr_w);
- pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
- set_nonblocking($install_stderr_r);
- my $installpid = fork();
- if ($installpid == 0)
- {
- close($install_stderr_r);
- fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
- open(STDOUT, ">&", $install_stderr_w);
- open(STDERR, ">&", $install_stderr_w);
- srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
- exit (1);
- }
- close($install_stderr_w);
- # Tell freeze_if_want_freeze how to kill the child, otherwise the
- # "waitpid(installpid)" loop won't get interrupted by a freeze:
- $proc{$installpid} = {};
- my $stderr_buf = '';
- # Track whether anything appears on stderr other than slurm errors
- # ("srun: ...") and the "starting: ..." message printed by the
- # srun subroutine itself:
+ my ($stdout, $stderr);
+ ($exited, $stdout, $stderr) = srun_sync(
+ \@srunargs, \@execargs,
+ {label => "run install script on all workers"},
+ $build_script . $git_archive);
+
my $stderr_anything_from_script = 0;
- my $match_our_own_errors = '^(srun: error: |starting: \[)';
- while ($installpid != waitpid(-1, WNOHANG)) {
- freeze_if_want_freeze ($installpid);
- # Wait up to 0.1 seconds for something to appear on stderr, then
- # do a non-blocking read.
- my $bits = fhbits($install_stderr_r);
- select ($bits, undef, $bits, 0.1);
- if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
- {
- while ($stderr_buf =~ /^(.*?)\n/) {
- my $line = $1;
- substr $stderr_buf, 0, 1+length($line), "";
- Log(undef, "stderr $line");
- if ($line !~ /$match_our_own_errors/) {
- $stderr_anything_from_script = 1;
- }
- }
- }
- }
- delete $proc{$installpid};
- $install_exited = $?;
- close($install_stderr_r);
- if (length($stderr_buf) > 0) {
- if ($stderr_buf !~ /$match_our_own_errors/) {
+ for my $line (split(/\n/, $stderr)) {
+ if ($line !~ /^(srun: error: |starting: \[)/) {
$stderr_anything_from_script = 1;
}
- Log(undef, "stderr $stderr_buf")
}
- Log (undef, "Install script exited ".exit_status_s($install_exited));
- last if $install_exited == 0 || $main::please_freeze;
+ last if $exited == 0 || $main::please_freeze;
+
# If the install script fails but doesn't print an error message,
# the next thing anyone is likely to do is just run it again in
# case it was a transient problem like "slurm communication fails
@@ -744,7 +688,7 @@ else {
unlink($tar_filename);
}
- if ($install_exited != 0) {
+ if ($exited != 0) {
croak("Giving up");
}
}
@@ -1013,11 +957,12 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
next;
}
shift @freeslot;
- $proc{$childpid} = { jobstep => $id,
- time => time,
- slot => $childslot,
- jobstepname => "$job_id.$id.$childpid",
- };
+ $proc{$childpid} = {
+ jobstepidx => $id,
+ time => time,
+ slot => $childslot,
+ jobstepname => "$job_id.$id.$childpid",
+ };
croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
$slot[$childslot]->{pid} = $childpid;
@@ -1191,9 +1136,9 @@ sub reapchildren
my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
. "."
. $slot[$proc{$pid}->{slot}]->{cpu});
- my $jobstepid = $proc{$pid}->{jobstep};
+ my $jobstepidx = $proc{$pid}->{jobstepidx};
my $elapsed = time - $proc{$pid}->{time};
- my $Jobstep = $jobstep[$jobstepid];
+ my $Jobstep = $jobstep[$jobstepidx];
my $childstatus = $?;
my $exitvalue = $childstatus >> 8;
@@ -1201,11 +1146,11 @@ sub reapchildren
$Jobstep->{'arvados_task'}->reload;
my $task_success = $Jobstep->{'arvados_task'}->{success};
- Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
+ Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
if (!defined $task_success) {
# task did not indicate one way or the other --> fail
- Log($jobstepid, sprintf(
+ Log($jobstepidx, sprintf(
"ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
exit_status_s($childstatus)));
$Jobstep->{'arvados_task'}->{success} = 0;
@@ -1229,14 +1174,14 @@ sub reapchildren
# node is already suspected faulty and srun exited quickly
if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
$elapsed < 5) {
- Log ($jobstepid, "blaming failure on suspect node " .
+ Log ($jobstepidx, "blaming failure on suspect node " .
$slot[$proc{$pid}->{slot}]->{node}->{name});
$temporary_fail ||= 1;
}
ban_node_by_slot($proc{$pid}->{slot});
}
- Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
+ Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
++$Jobstep->{'failures'},
$temporary_fail ? 'temporary' : 'permanent',
$elapsed));
@@ -1246,7 +1191,7 @@ sub reapchildren
$main::success = 0;
}
# Put this task back on the todo queue
- push @jobstep_todo, $jobstepid;
+ push @jobstep_todo, $jobstepidx;
$Job->{'tasks_summary'}->{'failed'}++;
}
else
@@ -1255,20 +1200,20 @@ sub reapchildren
$slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
$slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
$slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
- push @jobstep_done, $jobstepid;
- Log ($jobstepid, "success in $elapsed seconds");
+ push @jobstep_done, $jobstepidx;
+ Log ($jobstepidx, "success in $elapsed seconds");
}
$Jobstep->{exitcode} = $childstatus;
$Jobstep->{finishtime} = time;
$Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
$Jobstep->{'arvados_task'}->save;
- process_stderr ($jobstepid, $task_success);
- Log ($jobstepid, sprintf("task output (%d bytes): %s",
+ process_stderr_final ($jobstepidx);
+ Log ($jobstepidx, sprintf("task output (%d bytes): %s",
length($Jobstep->{'arvados_task'}->{output}),
$Jobstep->{'arvados_task'}->{output}));
- close $reader{$jobstepid};
- delete $reader{$jobstepid};
+ close $reader{$jobstepidx};
+ delete $reader{$jobstepidx};
delete $slot[$proc{$pid}->{slot}]->{pid};
push @freeslot, $proc{$pid}->{slot};
delete $proc{$pid};
@@ -1306,7 +1251,10 @@ sub reapchildren
sub check_refresh_wanted
{
my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
- if (@stat && $stat[9] > $latest_refresh) {
+ if (@stat &&
+ $stat[9] > $latest_refresh &&
+ # ...and we have actually locked the job record...
+ $job_id eq $Job->{'uuid'}) {
$latest_refresh = scalar time;
my $Job2 = api_call("jobs/get", uuid => $jobspec);
for my $attr ('cancelled_at',
@@ -1346,7 +1294,7 @@ sub check_squeue
my $silent_procs = 0;
for my $procinfo (values %proc)
{
- my $jobstep = $jobstep[$procinfo->{jobstep}];
+ my $jobstep = $jobstep[$procinfo->{jobstepidx}];
if ($jobstep->{stderr_at} < $last_squeue_check)
{
$silent_procs++;
@@ -1357,7 +1305,7 @@ sub check_squeue
# use killem() on procs whose killtime is reached
while (my ($pid, $procinfo) = each %proc)
{
- my $jobstep = $jobstep[$procinfo->{jobstep}];
+ my $jobstep = $jobstep[$procinfo->{jobstepidx}];
if (exists $procinfo->{killtime}
&& $procinfo->{killtime} <= time
&& $jobstep->{stderr_at} < $last_squeue_check)
@@ -1366,7 +1314,7 @@ sub check_squeue
if ($jobstep->{stderr_at}) {
$sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
}
- Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+ Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
killem ($pid);
}
}
@@ -1416,7 +1364,7 @@ sub check_squeue
# error/delay has caused the task to die without notifying srun,
# and we'll kill srun ourselves.
$procinfo->{killtime} = time + 30;
- Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
+ Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
}
}
}
@@ -1435,70 +1383,90 @@ sub release_allocation
sub readfrompipes
{
my $gotsome = 0;
- foreach my $job (keys %reader)
+ foreach my $jobstepidx (keys %reader)
{
my $buf;
- if (0 < sysread ($reader{$job}, $buf, 65536))
+ if ($jobstep[$jobstepidx]->{stdout_r} &&
+ 0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536))
+ {
+ print STDERR $buf if $ENV{CRUNCH_DEBUG};
+ if (exists $jobstep[$jobstepidx]->{stdout_captured}) {
+ $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
+ }
+ $gotsome = 1;
+ }
+ if (0 < sysread ($reader{$jobstepidx}, $buf, 65536))
{
print STDERR $buf if $ENV{CRUNCH_DEBUG};
- $jobstep[$job]->{stderr_at} = time;
- $jobstep[$job]->{stderr} .= $buf;
+ $jobstep[$jobstepidx]->{stderr_at} = time;
+ $jobstep[$jobstepidx]->{stderr} .= $buf;
+ if (exists $jobstep[$jobstepidx]->{stderr_captured}) {
+ $jobstep[$jobstepidx]->{stderr_captured} .= $buf;
+ }
+ $gotsome = 1;
# Consume everything up to the last \n
- preprocess_stderr ($job);
+ preprocess_stderr ($jobstepidx);
- if (length ($jobstep[$job]->{stderr}) > 16384)
+ if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
{
# If we get a lot of stderr without a newline, chop off the
# front to avoid letting our buffer grow indefinitely.
- substr ($jobstep[$job]->{stderr},
- 0, length($jobstep[$job]->{stderr}) - 8192) = "";
+ substr ($jobstep[$jobstepidx]->{stderr},
+ 0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
}
- $gotsome = 1;
}
}
return $gotsome;
}
+# Consume all full lines of stderr for a jobstep. Everything after the
+# last newline will remain in $jobstep[$jobstepidx]->{stderr} after
+# returning.
sub preprocess_stderr
{
- my $job = shift;
+ my $jobstepidx = shift;
- while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
+ while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
my $line = $1;
- substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
- Log ($job, "stderr $line");
+ substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
+ Log ($jobstepidx, "stderr $line");
if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
# whoa.
$main::please_freeze = 1;
}
+ elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
+ # Skip the following tempfail checks if this srun proc isn't
+ # attached to a particular worker slot.
+ }
elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
- my $job_slot_index = $jobstep[$job]->{slotindex};
+ my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
+ my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
$slot[$job_slot_index]->{node}->{fail_count}++;
- $jobstep[$job]->{tempfail} = 1;
+ $jobstep[$jobstepidx]->{tempfail} = 1;
ban_node_by_slot($job_slot_index);
}
elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
- $jobstep[$job]->{tempfail} = 1;
- ban_node_by_slot($jobstep[$job]->{slotindex});
+ $jobstep[$jobstepidx]->{tempfail} = 1;
+ ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
}
elsif ($line =~ /arvados\.errors\.Keep/) {
- $jobstep[$job]->{tempfail} = 1;
+ $jobstep[$jobstepidx]->{tempfail} = 1;
}
}
}
-sub process_stderr
+sub process_stderr_final
{
- my $job = shift;
- my $task_success = shift;
- preprocess_stderr ($job);
+ my $jobstepidx = shift;
+ preprocess_stderr ($jobstepidx);
map {
- Log ($job, "stderr $_");
- } split ("\n", $jobstep[$job]->{stderr});
+ Log ($jobstepidx, "stderr $_");
+ } split ("\n", $jobstep[$jobstepidx]->{stderr});
+ $jobstep[$jobstepidx]->{stderr} = '';
}
sub fetch_block
@@ -1636,7 +1604,7 @@ sub killem
}
if (!exists $proc{$_}->{"sent_$sig"})
{
- Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
+ Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
kill $sig, $_;
select (undef, undef, undef, 0.1);
if ($sig == 2)
@@ -1760,16 +1728,21 @@ sub log_writer_is_active() {
return $log_pipe_pid;
}
-sub Log # ($jobstep_id, $logmessage)
+sub Log # ($jobstepidx, $logmessage)
{
- if ($_[1] =~ /\n/) {
+ my ($jobstepidx, $logmessage) = @_;
+ if ($logmessage =~ /\n/) {
for my $line (split (/\n/, $_[1])) {
- Log ($_[0], $line);
+ Log ($jobstepidx, $line);
}
return;
}
my $fh = select STDERR; $|=1; select $fh;
- my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
+ my $task_qseq = '';
+ if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
+ $task_qseq = $jobstepidx;
+ }
+ my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
$message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
$message .= "\n";
my $datetime;
@@ -1893,6 +1866,83 @@ sub freezeunquote
}
+sub srun_sync
+{
+ my $srunargs = shift;
+ my $execargs = shift;
+ my $opts = shift || {};
+ my $stdin = shift;
+
+ my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
+ Log (undef, "$label: start");
+
+ my ($stderr_r, $stderr_w);
+ pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
+
+ my ($stdout_r, $stdout_w);
+ pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
+
+ my $srunpid = fork();
+ if ($srunpid == 0)
+ {
+ close($stderr_r);
+ close($stdout_r);
+ fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
+ fcntl($stdout_w, F_SETFL, 0) or croak($!);
+ open(STDERR, ">&", $stderr_w);
+ open(STDOUT, ">&", $stdout_w);
+ srun ($srunargs, $execargs, $opts, $stdin);
+ exit (1);
+ }
+ close($stderr_w);
+ close($stdout_w);
+
+ set_nonblocking($stderr_r);
+ set_nonblocking($stdout_r);
+
+ # Add entries to @jobstep and %proc so check_squeue() and
+ # freeze_if_want_freeze() can treat it like a job task process.
+ push @jobstep, {
+ stderr => '',
+ stderr_at => 0,
+ stderr_captured => '',
+ stdout_r => $stdout_r,
+ stdout_captured => '',
+ };
+ my $jobstepidx = $#jobstep;
+ $proc{$srunpid} = {
+ jobstepidx => $jobstepidx,
+ };
+ $reader{$jobstepidx} = $stderr_r;
+
+ while ($srunpid != waitpid ($srunpid, WNOHANG)) {
+ my $busy = readfrompipes();
+ if (!$busy || ($latest_refresh + 2 < scalar time)) {
+ check_refresh_wanted();
+ check_squeue();
+ }
+ if (!$busy) {
+ select(undef, undef, undef, 0.1);
+ }
+ killem(keys %proc) if $main::please_freeze;
+ }
+ my $exited = $?;
+
+ 1 while readfrompipes();
+ process_stderr_final ($jobstepidx);
+
+ Log (undef, "$label: exit ".exit_status_s($exited));
+
+ close($stdout_r);
+ close($stderr_r);
+ delete $proc{$srunpid};
+ delete $reader{$jobstepidx};
+
+ my $j = pop @jobstep;
+ return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
+}
+
+
sub srun
{
my $srunargs = shift;
diff --git a/sdk/cli/test/binstub_clean_fail/mount b/sdk/cli/test/binstub_clean_fail/mount
index 961ac28..52a7353 100755
--- a/sdk/cli/test/binstub_clean_fail/mount
+++ b/sdk/cli/test/binstub_clean_fail/mount
@@ -1,3 +1,3 @@
#!/bin/sh
echo >&2 Failing mount stub was called
-exit 1
+exit 44
diff --git a/sdk/cli/test/test_crunch-job.rb b/sdk/cli/test/test_crunch-job.rb
index 22d756a..0fbff2e 100644
--- a/sdk/cli/test/test_crunch-job.rb
+++ b/sdk/cli/test/test_crunch-job.rb
@@ -91,7 +91,7 @@ class TestCrunchJob < Minitest::Test
tryjobrecord j, binstubs: ['clean_fail']
end
assert_match /Failing mount stub was called/, err
- assert_match /Clean work dirs: exit 1\n$/, err
+ assert_match /clean work dirs: exit 44\n$/, err
assert_equal SPECIAL_EXIT[:EX_RETRY_UNLOCKED], $?.exitstatus
end
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list