[ARVADOS] updated: b1484741b1f386baef94af4f879ff5209af98e04

Git user git at public.curoverse.com
Sat Feb 20 00:11:06 EST 2016


Summary of changes:
 apps/workbench/.gitignore                          |  3 +
 .../app/controllers/actions_controller.rb          | 29 +++++++++
 .../app/controllers/application_controller.rb      | 73 +++++++---------------
 .../app/views/application/_breadcrumbs.html.erb    | 69 ++++++++++++++++++++
 .../views/application/_projects_tree_menu.html.erb | 20 ++++++
 .../app/views/application/_show_star.html.erb      |  9 +++
 apps/workbench/app/views/application/star.js.erb   |  2 +
 apps/workbench/app/views/layouts/body.html.erb     | 70 +--------------------
 apps/workbench/app/views/projects/_choose.html.erb | 48 ++++++++------
 apps/workbench/app/views/projects/show.html.erb    |  1 +
 apps/workbench/config/routes.rb                    |  1 +
 .../test/controllers/projects_controller_test.rb   | 50 +++++++++++++++
 apps/workbench/test/integration/projects_test.rb   | 32 +++++++++-
 sdk/cli/bin/crunch-job                             | 14 ++++-
 sdk/cwl/arvados_cwl/__init__.py                    | 52 ++++++++++++---
 sdk/cwl/setup.py                                   |  4 +-
 sdk/perl/.gitignore                                |  1 +
 sdk/python/arvados/commands/run.py                 |  6 +-
 sdk/python/arvados/events.py                       |  5 +-
 sdk/python/setup.py                                |  3 +-
 services/api/.gitignore                            |  3 +
 services/api/test/fixtures/groups.yml              | 12 ++++
 services/api/test/fixtures/links.yml               | 35 +++++++++++
 .../arvados/v1/groups_controller_test.rb           | 43 +++++++++++++
 24 files changed, 425 insertions(+), 160 deletions(-)
 create mode 100644 apps/workbench/app/views/application/_show_star.html.erb
 create mode 100644 apps/workbench/app/views/application/star.js.erb
 create mode 100644 sdk/perl/.gitignore

  discards  948d5c23b0964eae013d0792109b79e41b410c2d (commit)
       via  b1484741b1f386baef94af4f879ff5209af98e04 (commit)
       via  ec3ff9b42a2b6a5d50603aced6275110fdc90912 (commit)
       via  03245f8fb2e143864966dc151bf12368d2bd78fa (commit)
       via  21112a4f6cd07b1b157463b257333302fdf57db2 (commit)
       via  32e3f6eb604d3692f10f16220a78e07c056be00e (commit)
       via  617607fee610ce77ec68fb943aa84b792327c350 (commit)
       via  800816a9888ac843381c08b266d607b98945cdb8 (commit)
       via  984652e494eacaf8aba2badcbce0b6b79bebf4a3 (commit)
       via  b39ef1930d3e1a608ef632ed8c4f81a02f91b99f (commit)
       via  e192dbe1b1100ffb811b151fca0562f906b8ec44 (commit)
       via  65740fba9a4e44687c47b49311b6c9cc32326eeb (commit)
       via  549adc0fdf4731cee1bae2aac58b959266db26d7 (commit)
       via  57d5e8e0f8e8c3871e2d95d14e3088c0a3f41a9b (commit)
       via  c592f178659ed5ebfd03720a02940daa62931d14 (commit)
       via  7befcf18a5a8249ba73ec6c2009a8a82861b84bf (commit)
       via  f3f96f2c0c57a7793428d168668f5b4cc130ab4f (commit)
       via  9a2ad7a89ec86bb17d4c6a0767fdda087136b733 (commit)
       via  4b3ada6aea3fe90aedfe610827eaf4e2c1442f03 (commit)
       via  b3bb4cdff0b10f42624b67ae19c03f1edfcecab0 (commit)
       via  3e515256d1ce0c7c09f4f8454f9761f261fb8b71 (commit)
       via  0d8d1ff55e2ba154b396e1879928a126745c1089 (commit)
       via  92ac862af66a1c60fb4e7bf5c916552ab505ff49 (commit)
       via  443f3228eb4c56849f77ae9c421dd1cc6fdbc5f1 (commit)
       via  fd5fc5a6c2cc3bec082877627d8d395950024930 (commit)
       via  a6310e8b0eedce1323f02d285c0709516558f937 (commit)
       via  d6c21313094521040c9b013172c5ebe136341c03 (commit)
       via  1ca3b77ff42e1bfa0076a27bba1daa0406655082 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (948d5c23b0964eae013d0792109b79e41b410c2d)
            \
             N -- N -- N (b1484741b1f386baef94af4f879ff5209af98e04)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit b1484741b1f386baef94af4f879ff5209af98e04
Author: Tom Clegg <tom at curoverse.com>
Date:   Sat Feb 20 00:08:34 2016 -0500

    8099: When invoking setup tasks via srun, check slurm queue and propagate stderr to logs.

diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index ae210a6..76b41aa 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -183,11 +183,12 @@ if (($Job || $local_job)->{docker_image_locator}) {
   $cmd = [$docker_bin, 'ps', '-q'];
 }
 Log(undef, "Sanity check is `@$cmd`");
-srun(["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
-     $cmd,
-     {fork => 1});
-if ($? != 0) {
-  Log(undef, "Sanity check failed: ".exit_status_s($?));
+my ($exited, $stdout, $stderr) = srun_sync(
+  ["srun", "--nodes=\Q$ENV{SLURM_NNODES}\E", "--ntasks-per-node=1"],
+  $cmd,
+  {label => "sanity check"});
+if ($exited != 0) {
+  Log(undef, "Sanity check failed: ".exit_status_s($exited));
   exit EX_TEMPFAIL;
 }
 Log(undef, "Sanity check OK");
@@ -386,28 +387,17 @@ my $nodelist = join(",", @node);
 my $git_tar_count = 0;
 
 if (!defined $no_clear_tmp) {
-  # Clean out crunch_tmp/work, crunch_tmp/opt, crunch_tmp/src*
-  Log (undef, "Clean work dirs");
-
-  my $cleanpid = fork();
-  if ($cleanpid == 0)
-  {
-    # Find FUSE mounts under $CRUNCH_TMP and unmount them.
-    # Then clean up work directories.
-    # TODO: When #5036 is done and widely deployed, we can limit mount's
-    # -t option to simply fuse.keep.
-    srun (["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
-          ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid']);
-    exit (1);
-  }
-  while (1)
-  {
-    last if $cleanpid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($cleanpid);
-    select (undef, undef, undef, 0.1);
-  }
-  if ($?) {
-    Log(undef, "Clean work dirs: exit ".exit_status_s($?));
+  # Find FUSE mounts under $CRUNCH_TMP and unmount them.  Then clean
+  # up work directories crunch_tmp/work, crunch_tmp/opt,
+  # crunch_tmp/src*.
+  # 
+  # TODO: When #5036 is done and widely deployed, we can limit mount's
+  # -t option to simply fuse.keep.
+  my ($exited, $stdout, $stderr) = srun_sync(
+    ["srun", "--nodelist=$nodelist", "-D", $ENV{'TMPDIR'}],
+    ['bash', '-ec', '-o', 'pipefail', 'mount -t fuse,fuse.keep | awk "(index(\$3, \"$CRUNCH_TMP\") == 1){print \$3}" | xargs -r -n 1 fusermount -u -z; sleep 1; rm -rf $JOB_WORK $CRUNCH_INSTALL $CRUNCH_TMP/task $CRUNCH_TMP/src* $CRUNCH_TMP/*.cid'],
+    {label => "clean work dirs"});
+  if ($exited != 0) {
     exit(EX_RETRY_UNLOCKED);
   }
 }
@@ -428,30 +418,22 @@ if ! $docker_bin images -q --no-trunc --all | grep -qxF \Q$docker_hash\E; then
     arv-get \Q$docker_locator$docker_stream/$docker_hash.tar\E | $docker_bin load
 fi
 };
-  my $docker_pid = fork();
-  if ($docker_pid == 0)
-  {
-    srun (["srun", "--nodelist=" . join(',', @node)],
-          ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script]);
-    exit ($?);
-  }
-  while (1)
-  {
-    last if $docker_pid == waitpid (-1, WNOHANG);
-    freeze_if_want_freeze ($docker_pid);
-    select (undef, undef, undef, 0.1);
-  }
-  if ($? != 0)
+
+  my ($exited, $stdout, $stderr) = srun_sync(
+    ["srun", "--nodelist=" . join(',', @node)],
+    ["/bin/bash", "-o", "pipefail", "-ec", $docker_install_script],
+    {label => "load docker image"});
+  if ($exited != 0)
   {
-    Log(undef, "Installing Docker image from $docker_locator exited " . exit_status_s($?));
     exit(EX_RETRY_UNLOCKED);
   }
 
   # Determine whether this version of Docker supports memory+swap limits.
-  srun(["srun", "--nodelist=" . $node[0]],
-       ["/bin/sh", "-ec", "$docker_bin run --help | grep -qe --memory-swap="],
-      {fork => 1});
-  $docker_limitmem = ($? == 0);
+  ($exited, $stdout, $stderr) = srun_sync(
+    ["srun", "--nodelist=" . $node[0]],
+    [$docker_bin, 'run', '--help'],
+    {label => "check --memory-swap feature"});
+  $docker_limitmem = ($stdout =~ /--memory-swap/);
 
   # Find a non-root Docker user to use.
   # Tries the default user for the container, then 'crunch', then 'nobody',
@@ -461,20 +443,22 @@ fi
   # Docker containers.
   my @tryusers = ("", "crunch", "nobody");
   foreach my $try_user (@tryusers) {
+    my $label;
     my $try_user_arg;
     if ($try_user eq "") {
-      Log(undef, "Checking if container default user is not UID 0");
+      $label = "check whether default user is UID 0";
       $try_user_arg = "";
     } else {
-      Log(undef, "Checking if user '$try_user' is not UID 0");
+      $label = "check whether user '$try_user' is UID 0";
       $try_user_arg = "--user=$try_user";
     }
-    srun(["srun", "--nodelist=" . $node[0]],
-         ["/bin/sh", "-ec",
-          "a=`$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user` && " .
-          " test \$a -ne 0"],
-         {fork => 1});
-    if ($? == 0) {
+    my ($exited, $stdout, $stderr) = srun_sync(
+      ["srun", "--nodelist=" . $node[0]],
+      ["/bin/sh", "-ec",
+       "$docker_bin run $docker_run_args $try_user_arg $docker_hash id --user"],
+      {label => $label});
+    chomp($stdout);
+    if ($exited == 0 && $stdout =~ /^\d+$/ && $stdout > 0) {
       $dockeruserarg = $try_user_arg;
       if ($try_user eq "") {
         Log(undef, "Container will run with default user");
@@ -664,11 +648,9 @@ if (!defined $git_archive) {
   }
 }
 else {
-  my $install_exited;
+  my $exited;
   my $install_script_tries_left = 3;
   for (my $attempts = 0; $attempts < 3; $attempts++) {
-    Log(undef, "Run install script on all workers");
-
     my @srunargs = ("srun",
                     "--nodelist=$nodelist",
                     "-D", $ENV{'TMPDIR'}, "--job-name=$job_id");
@@ -676,59 +658,21 @@ else {
                     "mkdir -p $ENV{CRUNCH_INSTALL} && cd $ENV{CRUNCH_TMP} && perl -");
 
     $ENV{"CRUNCH_GIT_ARCHIVE_HASH"} = md5_hex($git_archive);
-    my ($install_stderr_r, $install_stderr_w);
-    pipe $install_stderr_r, $install_stderr_w or croak("pipe() failed: $!");
-    set_nonblocking($install_stderr_r);
-    my $installpid = fork();
-    if ($installpid == 0)
-    {
-      close($install_stderr_r);
-      fcntl($install_stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
-      open(STDOUT, ">&", $install_stderr_w);
-      open(STDERR, ">&", $install_stderr_w);
-      srun (\@srunargs, \@execargs, {}, $build_script . $git_archive);
-      exit (1);
-    }
-    close($install_stderr_w);
-    # Tell freeze_if_want_freeze how to kill the child, otherwise the
-    # "waitpid(installpid)" loop won't get interrupted by a freeze:
-    $proc{$installpid} = {};
-    my $stderr_buf = '';
-    # Track whether anything appears on stderr other than slurm errors
-    # ("srun: ...") and the "starting: ..." message printed by the
-    # srun subroutine itself:
+    my ($stdout, $stderr);
+    ($exited, $stdout, $stderr) = srun_sync(
+      \@srunargs, \@execargs,
+      {label => "run install script on all workers"},
+      $build_script . $git_archive);
+
     my $stderr_anything_from_script = 0;
-    my $match_our_own_errors = '^(srun: error: |starting: \[)';
-    while ($installpid != waitpid(-1, WNOHANG)) {
-      freeze_if_want_freeze ($installpid);
-      # Wait up to 0.1 seconds for something to appear on stderr, then
-      # do a non-blocking read.
-      my $bits = fhbits($install_stderr_r);
-      select ($bits, undef, $bits, 0.1);
-      if (0 < sysread ($install_stderr_r, $stderr_buf, 8192, length($stderr_buf)))
-      {
-        while ($stderr_buf =~ /^(.*?)\n/) {
-          my $line = $1;
-          substr $stderr_buf, 0, 1+length($line), "";
-          Log(undef, "stderr $line");
-          if ($line !~ /$match_our_own_errors/) {
-            $stderr_anything_from_script = 1;
-          }
-        }
-      }
-    }
-    delete $proc{$installpid};
-    $install_exited = $?;
-    close($install_stderr_r);
-    if (length($stderr_buf) > 0) {
-      if ($stderr_buf !~ /$match_our_own_errors/) {
+    for my $line (split(/\n/, $stderr)) {
+      if ($line !~ /^(srun: error: |starting: \[)/) {
         $stderr_anything_from_script = 1;
       }
-      Log(undef, "stderr $stderr_buf")
     }
 
-    Log (undef, "Install script exited ".exit_status_s($install_exited));
-    last if $install_exited == 0 || $main::please_freeze;
+    last if $exited == 0 || $main::please_freeze;
+
     # If the install script fails but doesn't print an error message,
     # the next thing anyone is likely to do is just run it again in
     # case it was a transient problem like "slurm communication fails
@@ -744,7 +688,7 @@ else {
     unlink($tar_filename);
   }
 
-  if ($install_exited != 0) {
+  if ($exited != 0) {
     croak("Giving up");
   }
 }
@@ -824,6 +768,8 @@ update_progress_stats();
 THISROUND:
 for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
 {
+  print STDERR "@jobstep_todo\n";
+  print STDERR "@jobstep\n";
   # Don't create new tasks if we already know the job's final result.
   last if defined($main::success);
 
@@ -1013,11 +959,12 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     next;
   }
   shift @freeslot;
-  $proc{$childpid} = { jobstep => $id,
-		       time => time,
-		       slot => $childslot,
-		       jobstepname => "$job_id.$id.$childpid",
-		     };
+  $proc{$childpid} = {
+    jobstepidx => $id,
+    time => time,
+    slot => $childslot,
+    jobstepname => "$job_id.$id.$childpid",
+  };
   croak ("assert failed: \$slot[$childslot]->{'pid'} exists") if exists $slot[$childslot]->{pid};
   $slot[$childslot]->{pid} = $childpid;
 
@@ -1191,9 +1138,9 @@ sub reapchildren
   my $whatslot = ($slot[$proc{$pid}->{slot}]->{node}->{name}
 		  . "."
 		  . $slot[$proc{$pid}->{slot}]->{cpu});
-  my $jobstepid = $proc{$pid}->{jobstep};
+  my $jobstepidx = $proc{$pid}->{jobstepidx};
   my $elapsed = time - $proc{$pid}->{time};
-  my $Jobstep = $jobstep[$jobstepid];
+  my $Jobstep = $jobstep[$jobstepidx];
 
   my $childstatus = $?;
   my $exitvalue = $childstatus >> 8;
@@ -1201,11 +1148,11 @@ sub reapchildren
   $Jobstep->{'arvados_task'}->reload;
   my $task_success = $Jobstep->{'arvados_task'}->{success};
 
-  Log ($jobstepid, "child $pid on $whatslot $exitinfo success=$task_success");
+  Log ($jobstepidx, "child $pid on $whatslot $exitinfo success=$task_success");
 
   if (!defined $task_success) {
     # task did not indicate one way or the other --> fail
-    Log($jobstepid, sprintf(
+    Log($jobstepidx, sprintf(
           "ERROR: Task process exited %s, but never updated its task record to indicate success and record its output.",
           exit_status_s($childstatus)));
     $Jobstep->{'arvados_task'}->{success} = 0;
@@ -1229,14 +1176,14 @@ sub reapchildren
       # node is already suspected faulty and srun exited quickly
       if ($slot[$proc{$pid}->{slot}]->{node}->{hold_until} &&
 	  $elapsed < 5) {
-	Log ($jobstepid, "blaming failure on suspect node " .
+	Log ($jobstepidx, "blaming failure on suspect node " .
              $slot[$proc{$pid}->{slot}]->{node}->{name});
         $temporary_fail ||= 1;
       }
       ban_node_by_slot($proc{$pid}->{slot});
     }
 
-    Log ($jobstepid, sprintf('failure (#%d, %s) after %d seconds',
+    Log ($jobstepidx, sprintf('failure (#%d, %s) after %d seconds',
                              ++$Jobstep->{'failures'},
                              $temporary_fail ? 'temporary' : 'permanent',
                              $elapsed));
@@ -1246,7 +1193,7 @@ sub reapchildren
       $main::success = 0;
     }
     # Put this task back on the todo queue
-    push @jobstep_todo, $jobstepid;
+    push @jobstep_todo, $jobstepidx;
     $Job->{'tasks_summary'}->{'failed'}++;
   }
   else
@@ -1255,20 +1202,20 @@ sub reapchildren
     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
     $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
-    push @jobstep_done, $jobstepid;
-    Log ($jobstepid, "success in $elapsed seconds");
+    push @jobstep_done, $jobstepidx;
+    Log ($jobstepidx, "success in $elapsed seconds");
   }
   $Jobstep->{exitcode} = $childstatus;
   $Jobstep->{finishtime} = time;
   $Jobstep->{'arvados_task'}->{finished_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{finishtime});
   $Jobstep->{'arvados_task'}->save;
-  process_stderr ($jobstepid, $task_success);
-  Log ($jobstepid, sprintf("task output (%d bytes): %s",
+  process_stderr_final ($jobstepidx);
+  Log ($jobstepidx, sprintf("task output (%d bytes): %s",
                            length($Jobstep->{'arvados_task'}->{output}),
                            $Jobstep->{'arvados_task'}->{output}));
 
-  close $reader{$jobstepid};
-  delete $reader{$jobstepid};
+  close $reader{$jobstepidx};
+  delete $reader{$jobstepidx};
   delete $slot[$proc{$pid}->{slot}]->{pid};
   push @freeslot, $proc{$pid}->{slot};
   delete $proc{$pid};
@@ -1306,7 +1253,10 @@ sub reapchildren
 sub check_refresh_wanted
 {
   my @stat = stat $ENV{"CRUNCH_REFRESH_TRIGGER"};
-  if (@stat && $stat[9] > $latest_refresh) {
+  if (@stat &&
+      $stat[9] > $latest_refresh &&
+      # ...and we have actually locked the job record...
+      $job_id eq $Job->{'uuid'}) {
     $latest_refresh = scalar time;
     my $Job2 = api_call("jobs/get", uuid => $jobspec);
     for my $attr ('cancelled_at',
@@ -1346,7 +1296,7 @@ sub check_squeue
   my $silent_procs = 0;
   for my $procinfo (values %proc)
   {
-    my $jobstep = $jobstep[$procinfo->{jobstep}];
+    my $jobstep = $jobstep[$procinfo->{jobstepidx}];
     if ($jobstep->{stderr_at} < $last_squeue_check)
     {
       $silent_procs++;
@@ -1357,7 +1307,7 @@ sub check_squeue
   # use killem() on procs whose killtime is reached
   while (my ($pid, $procinfo) = each %proc)
   {
-    my $jobstep = $jobstep[$procinfo->{jobstep}];
+    my $jobstep = $jobstep[$procinfo->{jobstepidx}];
     if (exists $procinfo->{killtime}
         && $procinfo->{killtime} <= time
         && $jobstep->{stderr_at} < $last_squeue_check)
@@ -1366,7 +1316,7 @@ sub check_squeue
       if ($jobstep->{stderr_at}) {
         $sincewhen = " in last " . (time - $jobstep->{stderr_at}) . "s";
       }
-      Log($procinfo->{jobstep}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
+      Log($procinfo->{jobstepidx}, "killing orphaned srun process $pid (task not in slurm queue, no stderr received$sincewhen)");
       killem ($pid);
     }
   }
@@ -1416,7 +1366,7 @@ sub check_squeue
       # error/delay has caused the task to die without notifying srun,
       # and we'll kill srun ourselves.
       $procinfo->{killtime} = time + 30;
-      Log($procinfo->{jobstep}, "notice: task is not in slurm queue but srun process $pid has not exited");
+      Log($procinfo->{jobstepidx}, "notice: task is not in slurm queue but srun process $pid has not exited");
     }
   }
 }
@@ -1435,70 +1385,90 @@ sub release_allocation
 sub readfrompipes
 {
   my $gotsome = 0;
-  foreach my $job (keys %reader)
+  foreach my $jobstepidx (keys %reader)
   {
     my $buf;
-    if (0 < sysread ($reader{$job}, $buf, 65536))
+    if ($jobstep[$jobstepidx]->{stdout_r} &&
+        0 < sysread ($jobstep[$jobstepidx]->{stdout_r}, $buf, 65536))
+    {
+      print STDERR $buf if $ENV{CRUNCH_DEBUG};
+      if (exists $jobstep[$jobstepidx]->{stdout_captured}) {
+        $jobstep[$jobstepidx]->{stdout_captured} .= $buf;
+      }
+      $gotsome = 1;
+    }
+    if (0 < sysread ($reader{$jobstepidx}, $buf, 65536))
     {
       print STDERR $buf if $ENV{CRUNCH_DEBUG};
-      $jobstep[$job]->{stderr_at} = time;
-      $jobstep[$job]->{stderr} .= $buf;
+      $jobstep[$jobstepidx]->{stderr_at} = time;
+      $jobstep[$jobstepidx]->{stderr} .= $buf;
+      if (exists $jobstep[$jobstepidx]->{stderr_captured}) {
+        $jobstep[$jobstepidx]->{stderr_captured} .= $buf;
+      }
+      $gotsome = 1;
 
       # Consume everything up to the last \n
-      preprocess_stderr ($job);
+      preprocess_stderr ($jobstepidx);
 
-      if (length ($jobstep[$job]->{stderr}) > 16384)
+      if (length ($jobstep[$jobstepidx]->{stderr}) > 16384)
       {
         # If we get a lot of stderr without a newline, chop off the
         # front to avoid letting our buffer grow indefinitely.
-        substr ($jobstep[$job]->{stderr},
-                0, length($jobstep[$job]->{stderr}) - 8192) = "";
+        substr ($jobstep[$jobstepidx]->{stderr},
+                0, length($jobstep[$jobstepidx]->{stderr}) - 8192) = "";
       }
-      $gotsome = 1;
     }
   }
   return $gotsome;
 }
 
 
+# Consume all full lines of stderr for a jobstep. Everything after the
+# last newline will remain in $jobstep[$jobstepidx]->{stderr} after
+# returning.
 sub preprocess_stderr
 {
-  my $job = shift;
+  my $jobstepidx = shift;
 
-  while ($jobstep[$job]->{stderr} =~ /^(.*?)\n/) {
+  while ($jobstep[$jobstepidx]->{stderr} =~ /^(.*?)\n/) {
     my $line = $1;
-    substr $jobstep[$job]->{stderr}, 0, 1+length($line), "";
-    Log ($job, "stderr $line");
+    substr $jobstep[$jobstepidx]->{stderr}, 0, 1+length($line), "";
+    Log ($jobstepidx, "stderr $line");
     if ($line =~ /srun: error: (SLURM job $ENV{SLURM_JOB_ID} has expired|Unable to confirm allocation for job $ENV{SLURM_JOB_ID})/) {
       # whoa.
       $main::please_freeze = 1;
     }
+    elsif (!exists $jobstep[$jobstepidx]->{slotindex}) {
+      # Skip the following tempfail checks if this srun proc isn't
+      # attached to a particular worker slot.
+    }
     elsif ($line =~ /srun: error: (Node failure on|Aborting, .*\bio error\b)/) {
-      my $job_slot_index = $jobstep[$job]->{slotindex};
+      my $job_slot_index = $jobstep[$jobstepidx]->{slotindex};
       $slot[$job_slot_index]->{node}->{fail_count}++;
-      $jobstep[$job]->{tempfail} = 1;
+      $jobstep[$jobstepidx]->{tempfail} = 1;
       ban_node_by_slot($job_slot_index);
     }
     elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
-      $jobstep[$job]->{tempfail} = 1;
-      ban_node_by_slot($jobstep[$job]->{slotindex});
+      $jobstep[$jobstepidx]->{tempfail} = 1;
+      ban_node_by_slot($jobstep[$jobstepidx]->{slotindex});
     }
     elsif ($line =~ /arvados\.errors\.Keep/) {
-      $jobstep[$job]->{tempfail} = 1;
+      $jobstep[$jobstepidx]->{tempfail} = 1;
     }
   }
 }
 
 
-sub process_stderr
+sub process_stderr_final
 {
-  my $job = shift;
-  my $task_success = shift;
-  preprocess_stderr ($job);
+  my $jobstepidx = shift;
+  preprocess_stderr ($jobstepidx);
 
   map {
-    Log ($job, "stderr $_");
-  } split ("\n", $jobstep[$job]->{stderr});
+    Log ($jobstepidx, "stderr $_");
+  } split ("\n", $jobstep[$jobstepidx]->{stderr});
+  $jobstep[$jobstepidx]->{stderr} = '';
 }
 
 sub fetch_block
@@ -1636,7 +1606,7 @@ sub killem
     }
     if (!exists $proc{$_}->{"sent_$sig"})
     {
-      Log ($proc{$_}->{jobstep}, "sending 2x signal $sig to pid $_");
+      Log ($proc{$_}->{jobstepidx}, "sending 2x signal $sig to pid $_");
       kill $sig, $_;
       select (undef, undef, undef, 0.1);
       if ($sig == 2)
@@ -1760,16 +1730,21 @@ sub log_writer_is_active() {
   return $log_pipe_pid;
 }
 
-sub Log				# ($jobstep_id, $logmessage)
+sub Log				# ($jobstepidx, $logmessage)
 {
-  if ($_[1] =~ /\n/) {
+  my ($jobstepidx, $logmessage) = @_;
+  if ($logmessage =~ /\n/) {
     for my $line (split (/\n/, $_[1])) {
-      Log ($_[0], $line);
+      Log ($jobstepidx, $line);
     }
     return;
   }
   my $fh = select STDERR; $|=1; select $fh;
-  my $message = sprintf ("%s %d %s %s", $job_id, $$, @_);
+  my $task_qseq = '';
+  if (defined($jobstepidx) && exists($jobstep[$jobstepidx]->{arvados_task})) {
+    $task_qseq = $jobstepidx;
+  }
+  my $message = sprintf ("%s %d %s %s", $job_id, $$, $task_qseq, $logmessage);
   $message =~ s{([^ -\176])}{"\\" . sprintf ("%03o", ord($1))}ge;
   $message .= "\n";
   my $datetime;
@@ -1893,6 +1868,83 @@ sub freezeunquote
 }
 
 
+sub srun_sync
+{
+  my $srunargs = shift;
+  my $execargs = shift;
+  my $opts = shift || {};
+  my $stdin = shift;
+
+  my $label = exists $opts->{label} ? $opts->{label} : "@$execargs";
+  Log (undef, "$label: start");
+
+  my ($stderr_r, $stderr_w);
+  pipe $stderr_r, $stderr_w or croak("pipe() failed: $!");
+
+  my ($stdout_r, $stdout_w);
+  pipe $stdout_r, $stdout_w or croak("pipe() failed: $!");
+
+  my $srunpid = fork();
+  if ($srunpid == 0)
+  {
+    close($stderr_r);
+    close($stdout_r);
+    fcntl($stderr_w, F_SETFL, 0) or croak($!); # no close-on-exec
+    fcntl($stdout_w, F_SETFL, 0) or croak($!);
+    open(STDERR, ">&", $stderr_w);
+    open(STDOUT, ">&", $stdout_w);
+    srun ($srunargs, $execargs, $opts, $stdin);
+    exit (1);
+  }
+  close($stderr_w);
+  close($stdout_w);
+
+  set_nonblocking($stderr_r);
+  set_nonblocking($stdout_r);
+
+  # Add entries to @jobstep and %proc so check_squeue() and
+  # freeze_if_want_freeze() can treat it like a job task process.
+  push @jobstep, {
+    stderr => '',
+    stderr_at => 0,
+    stderr_captured => '',
+    stdout_r => $stdout_r,
+    stdout_captured => '',
+  };
+  my $jobstepidx = $#jobstep;
+  $proc{$srunpid} = {
+    jobstepidx => $jobstepidx,
+  };
+  $reader{$jobstepidx} = $stderr_r;
+
+  while ($srunpid != waitpid ($srunpid, WNOHANG)) {
+    my $busy = readfrompipes();
+    if (!$busy || ($latest_refresh + 2 < scalar time)) {
+      check_refresh_wanted();
+      check_squeue();
+    }
+    if (!$busy) {
+      select(undef, undef, undef, 0.1);
+    }
+    killem(keys %proc) if $main::please_freeze;
+  }
+  my $exited = $?;
+
+  1 while readfrompipes();
+  process_stderr_final ($jobstepidx);
+
+  Log (undef, "$label: exit ".exit_status_s($exited));
+
+  close($stdout_r);
+  close($stderr_r);
+  delete $proc{$srunpid};
+  delete $reader{$jobstepidx};
+
+  my $j = pop @jobstep;
+  return ($exited, $j->{stdout_captured}, $j->{stderr_captured});
+}
+
+
 sub srun
 {
   my $srunargs = shift;
diff --git a/sdk/cli/test/binstub_clean_fail/mount b/sdk/cli/test/binstub_clean_fail/mount
index 961ac28..52a7353 100755
--- a/sdk/cli/test/binstub_clean_fail/mount
+++ b/sdk/cli/test/binstub_clean_fail/mount
@@ -1,3 +1,3 @@
 #!/bin/sh
 echo >&2 Failing mount stub was called
-exit 1
+exit 44
diff --git a/sdk/cli/test/test_crunch-job.rb b/sdk/cli/test/test_crunch-job.rb
index 22d756a..0fbff2e 100644
--- a/sdk/cli/test/test_crunch-job.rb
+++ b/sdk/cli/test/test_crunch-job.rb
@@ -91,7 +91,7 @@ class TestCrunchJob < Minitest::Test
       tryjobrecord j, binstubs: ['clean_fail']
     end
     assert_match /Failing mount stub was called/, err
-    assert_match /Clean work dirs: exit 1\n$/, err
+    assert_match /clean work dirs: exit 44\n$/, err
     assert_equal SPECIAL_EXIT[:EX_RETRY_UNLOCKED], $?.exitstatus
   end
 

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list