[ARVADOS] updated: 4db7620243da1b74d41a44ef9d1c6a200b6eaae5

git at public.curoverse.com
Mon Jun 15 17:41:02 EDT 2015


Summary of changes:
 sdk/cli/bin/crunch-job | 4 ++--
 1 file changed, 2 insertions(+), 2 deletions(-)

  discards  85c53a9973232f020c55e08c0a788186c21477e8 (commit)
       via  4db7620243da1b74d41a44ef9d1c6a200b6eaae5 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not a strict subset of the new revision.  This
situation occurs when you --force push a change and generate a repository
containing something like this:

 * -- * -- B -- O -- O -- O (85c53a9973232f020c55e08c0a788186c21477e8)
            \
             N -- N -- N (4db7620243da1b74d41a44ef9d1c6a200b6eaae5)

When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit 4db7620243da1b74d41a44ef9d1c6a200b6eaae5
Author: Brett Smith <brett at curoverse.com>
Date:   Mon Jun 15 13:54:36 2015 -0400

    4410: Crunch retries jobs when all SLURM nodes fail.
    
    See the ticket for detailed background discussion and implementation
    rationale, especially notes 13 and 14.

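The crunch-job side of this change boils down to a new exit status: when no allocated slot has a working node left, crunch-job saves its metadata and exits 93 so crunch-dispatch can retry the still-locked job. The following is a minimal, self-contained sketch of that signal (not the real crunch-job); the constant, the slot predicate, and the exit path come from the patch below, while the sample @slot data is invented.

#!/usr/bin/env perl
# Minimal sketch (not the real crunch-job) of the retry signal added by this
# commit: when no allocated slot has a working node left, exit with a
# dedicated status so crunch-dispatch can retry the still-locked job.
use strict;
use warnings;

use constant EX_RETRY_UNLOCKED => 93;    # same value as in the patch

# Made-up stand-in for crunch-job's @slot: every node here is failing.
my @slot = (
  { node => { losing_streak => 3, hold_count => 1 } },
  { node => { losing_streak => 2, hold_count => 5 } },
);

# Same predicate as the patch: a slot counts as working when its node has no
# current losing streak and has not been put on hold too many times.
my $working_slot_count = scalar(grep { $_->{node}->{losing_streak} == 0 &&
                                       $_->{node}->{hold_count} < 4 } @slot);

if ($working_slot_count < 1) {
  warn "Every node has failed -- giving up\n";
  # The real script calls save_meta() first so the partial log is preserved.
  exit(EX_RETRY_UNLOCKED);
}
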
diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index d8ae6e6..f8b7e16 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -98,6 +98,7 @@ use File::Path qw( make_path remove_tree );
 
 use constant TASK_TEMPFAIL => 111;
 use constant EX_TEMPFAIL => 75;
+use constant EX_RETRY_UNLOCKED => 93;
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
@@ -721,6 +722,7 @@ ONELEVEL:
 my $thisround_succeeded = 0;
 my $thisround_failed = 0;
 my $thisround_failed_multiple = 0;
+my $working_slot_count = scalar(@slot);
 
 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
 		       or $a <=> $b } @jobstep_todo;
@@ -986,6 +988,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     {
       update_progress_stats();
     }
+    $working_slot_count = scalar(grep { $_->{node}->{losing_streak} == 0 &&
+                                        $_->{node}->{hold_count} < 4 } @slot);
     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
 	($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
     {
@@ -1009,10 +1013,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
 
     # give up if no nodes are succeeding
-    if (!grep { $_->{node}->{losing_streak} == 0 &&
-                    $_->{node}->{hold_count} < 4 } @slot) {
-      my $message = "Every node has failed -- giving up on this round";
-      Log (undef, $message);
+    if ($working_slot_count < 1) {
+      Log(undef, "Every node has failed -- giving up");
       last THISROUND;
     }
   }
@@ -1048,18 +1050,17 @@ freeze_if_want_freeze();
 
 if (!defined $main::success)
 {
-  if (@jobstep_todo &&
-      $thisround_succeeded == 0 &&
-      ($thisround_failed == 0 || $thisround_failed > 4))
-  {
+  if (!@jobstep_todo) {
+    $main::success = 1;
+  } elsif ($working_slot_count < 1) {
+    save_meta();
+    exit(EX_RETRY_UNLOCKED);
+  } elsif ($thisround_succeeded == 0 &&
+           ($thisround_failed == 0 || $thisround_failed > 4)) {
     my $message = "stop because $thisround_failed tasks failed and none succeeded";
     Log (undef, $message);
     $main::success = 0;
   }
-  if (!@jobstep_todo)
-  {
-    $main::success = 1;
-  }
 }
 
 goto ONELEVEL if !defined $main::success;
@@ -1556,6 +1557,8 @@ sub fhbits
 # Send log output to Keep via arv-put.
 #
 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
+# $log_pipe_out_buf is a string containing all output read from arv-put so far.
+# $log_pipe_out_select is an IO::Select object around $log_pipe_out.
 # $log_pipe_pid is the pid of the arv-put subprocess.
 #
 # The only functions that should access these variables directly are:
@@ -1564,6 +1567,13 @@ sub fhbits
 #     Starts an arv-put pipe, reading data on stdin and writing it to
 #     a $logfilename file in an output collection.
 #
+# log_writer_read_output([$timeout])
+#     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
+#     Passes $timeout to the select() call, with a default of 0.01.
+#     Returns the result of the last read() call on $log_pipe_out, or
+#     -1 if read() wasn't called because select() timed out.
+#     Only other log_writer_* functions should need to call this.
+#
 # log_writer_send($txt)
 #     Writes $txt to the output log collection.
 #
@@ -1574,25 +1584,40 @@ sub fhbits
 #     Returns a true value if there is currently a live arv-put
 #     process, false otherwise.
 #
-my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
+my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
+    $log_pipe_pid);
 
 sub log_writer_start($)
 {
   my $logfilename = shift;
   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
                         'arv-put',
-                        '--portable-data-hash',
-                        '--project-uuid', $Job->{owner_uuid},
+                        '--stream',
                         '--retries', '3',
-                        '--name', $logfilename,
                         '--filename', $logfilename,
                         '-');
+  $log_pipe_out_buf = "";
+  $log_pipe_out_select = IO::Select->new($log_pipe_out);
+}
+
+sub log_writer_read_output {
+  my $timeout = shift || 0.01;
+  my $read = -1;
+  while ($read && $log_pipe_out_select->can_read($timeout)) {
+    $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
+                 length($log_pipe_out_buf));
+  }
+  if (!defined($read)) {
+    Log(undef, "error reading log manifest from arv-put: $!");
+  }
+  return $read;
 }
 
 sub log_writer_send($)
 {
   my $txt = shift;
   print $log_pipe_in $txt;
+  log_writer_read_output();
 }
 
 sub log_writer_finish()
@@ -1600,22 +1625,24 @@ sub log_writer_finish()
   return unless $log_pipe_pid;
 
   close($log_pipe_in);
-  my $arv_put_output;
 
-  my $s = IO::Select->new($log_pipe_out);
-  if ($s->can_read(120)) {
-    sysread($log_pipe_out, $arv_put_output, 1024);
-    chomp($arv_put_output);
-  } else {
+  my $read_result = log_writer_read_output(120);
+  if ($read_result == -1) {
     Log (undef, "timed out reading from 'arv-put'");
+  } elsif ($read_result != 0) {
+    Log(undef, "failed to read arv-put log manifest to EOF");
   }
 
   waitpid($log_pipe_pid, 0);
-  $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
   if ($?) {
-    Log("log_writer_finish: arv-put exited ".exit_status_s($?))
+    Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
   }
 
+  close($log_pipe_out);
+  my $arv_put_output = $log_pipe_out_buf;
+  $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
+      $log_pipe_out_select = undef;
+
   return $arv_put_output;
 }
 
@@ -1679,10 +1706,22 @@ sub save_meta
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
   return unless log_writer_is_active();
 
-  my $loglocator = log_writer_finish();
-  Log (undef, "log manifest is $loglocator");
-  $Job->{'log'} = $loglocator;
-  $Job->update_attributes('log', $loglocator);
+  my $log_manifest = "";
+  if ($Job->{log}) {
+    my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
+    $log_manifest .= $prev_log_coll->{manifest_text};
+  }
+  $log_manifest .= log_writer_finish();
+
+  my $log_coll = api_call(
+    "collections/create", ensure_unique_name => 1, collection => {
+      manifest_text => $log_manifest,
+      owner_uuid => $Job->{owner_uuid},
+      name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
+    });
+  Log(undef, "log manifest is " . $log_coll->{portable_data_hash});
+  $Job->{'log'} = $log_coll->{portable_data_hash};
+  $Job->update_attributes('log', $log_coll->{portable_data_hash});
 }
 
 
diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb
index 3c1e4c6..439bddb 100755
--- a/services/api/script/crunch-dispatch.rb
+++ b/services/api/script/crunch-dispatch.rb
@@ -54,6 +54,8 @@ class Dispatcher
   include ApplicationHelper
 
   EXIT_TEMPFAIL = 75
+  EXIT_RETRY_UNLOCKED = 93
+  RETRY_UNLOCKED_LIMIT = 3
 
   def initialize
     @crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
@@ -77,6 +79,8 @@ class Dispatcher
     @pipe_auth_tokens = {}
     @running = {}
     @todo = []
+    @todo_job_retries = {}
+    @job_retry_counts = Hash.new(0)
     @todo_pipelines = []
   end
 
@@ -86,7 +90,7 @@ class Dispatcher
 
   def refresh_todo
     if $options[:jobs]
-      @todo = Job.queue.select(&:repository)
+      @todo = @todo_job_retries.values + Job.queue.select(&:repository)
     end
     if $options[:pipelines]
       @todo_pipelines = PipelineInstance.queue
@@ -417,6 +421,10 @@ class Dispatcher
                    '--job', job.uuid,
                    '--git-dir', @arvados_internal]
 
+      if @todo_job_retries.include?(job.uuid)
+        cmd_args << "--force-unlock"
+      end
+
       $stderr.puts "dispatch: #{cmd_args.join ' '}"
 
       begin
@@ -452,6 +460,7 @@ class Dispatcher
         log_throttle_bytes_skipped: 0,
       }
       i.close
+      @todo_job_retries.delete(job.uuid)
       update_node_status
     end
   end
@@ -650,18 +659,36 @@ class Dispatcher
 
     # Wait the thread (returns a Process::Status)
     exit_status = j_done[:wait_thr].value.exitstatus
+    exit_tempfail = exit_status == EXIT_TEMPFAIL
 
     $stderr.puts "dispatch: child #{pid_done} exit #{exit_status}"
     $stderr.puts "dispatch: job #{job_done.uuid} end"
 
     jobrecord = Job.find_by_uuid(job_done.uuid)
-    if exit_status != EXIT_TEMPFAIL and jobrecord.state == "Running"
-      # crunch-job did not return exit code 75 (see below) and left the job in
-      # the "Running" state, which means there was an unhandled error.  Fail
-      # the job.
-      jobrecord.state = "Failed"
-      if not jobrecord.save
-        $stderr.puts "dispatch: jobrecord.save failed"
+
+    if exit_status == EXIT_RETRY_UNLOCKED
+      # The job failed because all of the nodes allocated to it
+      # failed.  Only this crunch-dispatch process can retry the job:
+      # it's already locked, and there's no way to put it back in the
+      # Queued state.  Put it in our internal todo list unless the job
+      # has failed this way excessively.
+      @job_retry_counts[jobrecord.uuid] += 1
+      exit_tempfail = @job_retry_counts[jobrecord.uuid] <= RETRY_UNLOCKED_LIMIT
+      if exit_tempfail
+        @todo_job_retries[jobrecord.uuid] = jobrecord
+      else
+        $stderr.puts("dispatch: job #{jobrecord.uuid} exceeded node failure retry limit -- giving up")
+      end
+    end
+
+    if !exit_tempfail
+      @job_retry_counts.delete(jobrecord.uuid)
+      if jobrecord.state == "Running"
+        # Apparently there was an unhandled error.  Fail the job.
+        jobrecord.state = "Failed"
+        if not jobrecord.save
+          $stderr.puts "dispatch: jobrecord.save failed"
+        end
       end
     else
       # Don't fail the job if crunch-job didn't even get as far as
@@ -670,6 +697,9 @@ class Dispatcher
       # queue. If crunch-job exited after losing a race to another
       # crunch-job process, it exits 75 and we should leave the job
       # record alone so the winner of the race do its thing.
+      # If crunch-job exited after all of its allocated nodes failed,
+      # it exits 93, and we want to retry it later (see the
+      # EXIT_RETRY_UNLOCKED `if` block).
       #
       # There is still an unhandled race condition: If our crunch-job
       # process is about to lose a race with another crunch-job
@@ -683,7 +713,7 @@ class Dispatcher
 
     # Invalidate the per-job auth token, unless the job is still queued and we
     # might want to try it again.
-    if jobrecord.state != "Queued"
+    if jobrecord.state != "Queued" and !@todo_job_retries.include?(jobrecord.uuid)
       j_done[:job_auth].update_attributes expires_at: Time.now
     end
 

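A side note on the log-writer changes above: the new log_writer_read_output() drains arv-put's stdout without blocking by polling with IO::Select and appending each chunk to a buffer via 4-argument read(). The snippet below is a small, self-contained sketch of that pattern; only the select/read loop mirrors the patch, and the echo child process is a throwaway stand-in for arv-put.

#!/usr/bin/env perl
# Sketch of the non-blocking read pattern behind the new
# log_writer_read_output(): poll the pipe with IO::Select and append any
# available output to a growing buffer.
use strict;
use warnings;
use IO::Select;
use IPC::Open2;

my ($pipe_out, $pipe_in);
my $pid = open2($pipe_out, $pipe_in, 'echo', 'fake manifest text');
close($pipe_in);                         # nothing to send to echo

my $buf = "";
my $select = IO::Select->new($pipe_out);

sub read_available {
  my $timeout = shift || 0.01;
  my $read = -1;                         # -1 means select() timed out
  while ($read && $select->can_read($timeout)) {
    # 4-argument read(): append up to 64 KiB at the end of $buf.
    $read = read($pipe_out, $buf, 65536, length($buf));
  }
  return $read;                          # 0 at EOF, undef on error
}

my $result = read_available(5);          # like the longer wait at shutdown
waitpid($pid, 0);
printf "read result=%s, buffer=%s", defined($result) ? $result : 'undef', $buf;
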
-----------------------------------------------------------------------


hooks/post-receive
-- 