[ARVADOS] updated: ad7679cfe57733940f8461097ee01bfd97997ce6

git at public.curoverse.com
Mon Jun 22 16:55:21 EDT 2015


Summary of changes:
 sdk/cli/bin/crunch-job                 | 150 +++++++++++++++++++++++----------
 services/api/script/crunch-dispatch.rb |  69 ++++++++++++---
 2 files changed, 161 insertions(+), 58 deletions(-)

       via  ad7679cfe57733940f8461097ee01bfd97997ce6 (commit)
       via  b269c28f1d54e8609f36c8aeb77a2b6025172066 (commit)
       via  24b4d1ad90558332cd5251b265a54c21ffdbfd36 (commit)
      from  5386f6657234f3c24a4783cf63ab85016eda85b8 (commit)

The revisions listed above that are new to this repository have not
appeared in any other notification email, so they are listed in full
below.


commit ad7679cfe57733940f8461097ee01bfd97997ce6
Merge: 5386f66 b269c28
Author: Brett Smith <brett at curoverse.com>
Date:   Mon Jun 22 16:55:07 2015 -0400

    Merge branch '4410-slurm-fails-are-tempfails-wip'
    
    Closes #4410, #6283.


commit b269c28f1d54e8609f36c8aeb77a2b6025172066
Author: Brett Smith <brett at curoverse.com>
Date:   Mon Jun 15 13:54:36 2015 -0400

    4410: Crunch retries jobs when all SLURM nodes fail.
    
    See the ticket for detailed background discussion and implementation
    rationale, especially notes 13 and 14.
    
    This required a couple of ancillary changes:
    
    * crunch-job now distinguishes between "task failed because a node
      failed" and "task failed for some other temporary reason."  It uses
      this additional information to decide when it should retry tasks
      itself and when it needs to give up and kick the problem up to
      crunch-dispatch (see the classification sketch below).
    
    * crunch-job now handles creating log collections itself from
      manifests generated by arv-put.  This enables it to append to logs
      generated during previous attempts to run the job.
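
    A minimal, self-contained sketch of the node-failure/tempfail
    classification described in the first bullet (illustration only; the
    committed change is the preprocess_stderr hunk in the diff below):

        #!/usr/bin/env perl
        # Rough sketch, not the committed code: classify an srun/task
        # stderr line the way crunch-job now does.  In crunch-job itself
        # these flags are recorded on the per-task and per-node
        # structures rather than returned.
        use strict;
        use warnings;

        sub classify_stderr_line {
            my ($line) = @_;
            if ($line =~ /srun: error: Node failure on/) {
                # SLURM reported the node itself failed: temporary
                # failure, count it against the node and stop
                # dispatching work to it.
                return { tempfail => 1, node_fail => 1, ban_node => 1 };
            }
            elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
                # Other srun trouble: temporary failure, avoid the node,
                # but don't count it as a node failure.
                return { tempfail => 1, node_fail => 0, ban_node => 1 };
            }
            elsif ($line =~ /arvados\.errors\.Keep/) {
                # Keep errors are worth retrying but don't implicate
                # the node.
                return { tempfail => 1, node_fail => 0, ban_node => 0 };
            }
            return { tempfail => 0, node_fail => 0, ban_node => 0 };
        }

        # Example lines (invented for illustration):
        for my $line ("srun: error: Node failure on compute9",
                      "srun: error: Unable to create job step",
                      "arvados.errors.KeepWriteError: ...",
                      "ordinary task output") {
            my $c = classify_stderr_line($line);
            printf("%-45s tempfail=%d node_fail=%d ban_node=%d\n",
                   $line, $c->{tempfail}, $c->{node_fail}, $c->{ban_node});
        }

    In crunch-job these flags correspond to $jobstep->{tempfail}, the
    per-node fail_count, and ban_node_by_slot(); when no working slots
    remain, crunch-job saves its output and logs and exits
    EX_RETRY_UNLOCKED (93) so crunch-dispatch can retry the whole job.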

diff --git a/sdk/cli/bin/crunch-job b/sdk/cli/bin/crunch-job
index d8ae6e6..b38efdc 100755
--- a/sdk/cli/bin/crunch-job
+++ b/sdk/cli/bin/crunch-job
@@ -98,6 +98,7 @@ use File::Path qw( make_path remove_tree );
 
 use constant TASK_TEMPFAIL => 111;
 use constant EX_TEMPFAIL => 75;
+use constant EX_RETRY_UNLOCKED => 93;
 
 $ENV{"TMPDIR"} ||= "/tmp";
 unless (defined $ENV{"CRUNCH_TMP"}) {
@@ -292,9 +293,16 @@ foreach (@sinfo)
   {
     Log (undef, "node $nodename - $ncpus slots");
     my $node = { name => $nodename,
-		 ncpus => $ncpus,
-		 losing_streak => 0,
-		 hold_until => 0 };
+                 ncpus => $ncpus,
+                 # The number of consecutive times a task has been dispatched
+                 # to this node and failed.
+                 losing_streak => 0,
+                 # The number of consecutive times that SLURM has reported
+                 # a node failure since the last successful task.
+                 fail_count => 0,
+                 # Don't dispatch work to this node until this time
+                 # (in seconds since the epoch) has passed.
+                 hold_until => 0 };
     foreach my $cpu (1..$ncpus)
     {
       push @slot, { node => $node,
@@ -721,6 +729,7 @@ ONELEVEL:
 my $thisround_succeeded = 0;
 my $thisround_failed = 0;
 my $thisround_failed_multiple = 0;
+my $working_slot_count = scalar(@slot);
 
 @jobstep_todo = sort { $jobstep[$a]->{level} <=> $jobstep[$b]->{level}
 		       or $a <=> $b } @jobstep_todo;
@@ -950,6 +959,7 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
   $Jobstep->{slotindex} = $childslot;
   delete $Jobstep->{stderr};
   delete $Jobstep->{finishtime};
+  delete $Jobstep->{tempfail};
 
   $Jobstep->{'arvados_task'}->{started_at} = strftime "%Y-%m-%dT%H:%M:%SZ", gmtime($Jobstep->{starttime});
   $Jobstep->{'arvados_task'}->save;
@@ -986,6 +996,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     {
       update_progress_stats();
     }
+    $working_slot_count = scalar(grep { $_->{node}->{fail_count} == 0 &&
+                                        $_->{node}->{hold_count} < 4 } @slot);
     if (($thisround_failed_multiple >= 8 && $thisround_succeeded == 0) ||
 	($thisround_failed_multiple >= 16 && $thisround_failed_multiple > $thisround_succeeded))
     {
@@ -1009,10 +1021,8 @@ for (my $todo_ptr = 0; $todo_ptr <= $#jobstep_todo; $todo_ptr ++)
     }
 
     # give up if no nodes are succeeding
-    if (!grep { $_->{node}->{losing_streak} == 0 &&
-                    $_->{node}->{hold_count} < 4 } @slot) {
-      my $message = "Every node has failed -- giving up on this round";
-      Log (undef, $message);
+    if ($working_slot_count < 1) {
+      Log(undef, "Every node has failed -- giving up");
       last THISROUND;
     }
   }
@@ -1048,18 +1058,18 @@ freeze_if_want_freeze();
 
 if (!defined $main::success)
 {
-  if (@jobstep_todo &&
-      $thisround_succeeded == 0 &&
-      ($thisround_failed == 0 || $thisround_failed > 4))
-  {
+  if (!@jobstep_todo) {
+    $main::success = 1;
+  } elsif ($working_slot_count < 1) {
+    save_output_collection();
+    save_meta();
+    exit(EX_RETRY_UNLOCKED);
+  } elsif ($thisround_succeeded == 0 &&
+           ($thisround_failed == 0 || $thisround_failed > 4)) {
     my $message = "stop because $thisround_failed tasks failed and none succeeded";
     Log (undef, $message);
     $main::success = 0;
   }
-  if (!@jobstep_todo)
-  {
-    $main::success = 1;
-  }
 }
 
 goto ONELEVEL if !defined $main::success;
@@ -1067,16 +1077,7 @@ goto ONELEVEL if !defined $main::success;
 
 release_allocation();
 freeze();
-my $collated_output = &create_output_collection();
-
-if (!$collated_output) {
-  Log (undef, "Failed to write output collection");
-}
-else {
-  Log(undef, "job output $collated_output");
-  $Job->update_attributes('output' => $collated_output);
-}
-
+my $collated_output = save_output_collection();
 Log (undef, "finish");
 
 save_meta();
@@ -1141,7 +1142,7 @@ sub reapchildren
   if (!$task_success)
   {
     my $temporary_fail;
-    $temporary_fail ||= $Jobstep->{node_fail};
+    $temporary_fail ||= $Jobstep->{tempfail};
     $temporary_fail ||= ($exitvalue == TASK_TEMPFAIL);
 
     ++$thisround_failed;
@@ -1179,6 +1180,7 @@ sub reapchildren
     ++$thisround_succeeded;
     $slot[$proc{$pid}->{slot}]->{node}->{losing_streak} = 0;
     $slot[$proc{$pid}->{slot}]->{node}->{hold_until} = 0;
+    $slot[$proc{$pid}->{slot}]->{node}->{fail_count} = 0;
     push @jobstep_done, $jobstepid;
     Log ($jobstepid, "success in $elapsed seconds");
   }
@@ -1389,10 +1391,19 @@ sub preprocess_stderr
       # whoa.
       $main::please_freeze = 1;
     }
-    elsif ($line =~ /(srun: error: (Node failure on|Unable to create job step|.*: Communication connection failure))|arvados.errors.Keep/) {
-      $jobstep[$job]->{node_fail} = 1;
+    elsif ($line =~ /srun: error: Node failure on/) {
+      my $job_slot_index = $jobstep[$job]->{slotindex};
+      $slot[$job_slot_index]->{node}->{fail_count}++;
+      $jobstep[$job]->{tempfail} = 1;
+      ban_node_by_slot($job_slot_index);
+    }
+    elsif ($line =~ /srun: error: (Unable to create job step|.*: Communication connection failure)/) {
+      $jobstep[$job]->{tempfail} = 1;
       ban_node_by_slot($jobstep[$job]->{slotindex});
     }
+    elsif ($line =~ /arvados\.errors\.Keep/) {
+      $jobstep[$job]->{tempfail} = 1;
+    }
   }
 }
 
@@ -1511,6 +1522,20 @@ print (arvados.api("v1").collections().
   return $joboutput;
 }
 
+# Calls create_output_collection, logs the result, and returns it.
+# If that was successful, also saves it as the output in the job record.
+sub save_output_collection {
+  my $collated_output = create_output_collection();
+
+  if (!$collated_output) {
+    Log(undef, "Failed to write output collection");
+  }
+  else {
+    Log(undef, "job output $collated_output");
+    $Job->update_attributes('output' => $collated_output);
+  }
+  return $collated_output;
+}
 
 sub killem
 {
@@ -1556,6 +1581,8 @@ sub fhbits
 # Send log output to Keep via arv-put.
 #
 # $log_pipe_in and $log_pipe_out are the input and output filehandles to the arv-put pipe.
+# $log_pipe_out_buf is a string containing all output read from arv-put so far.
+# $log_pipe_out_select is an IO::Select object around $log_pipe_out.
 # $log_pipe_pid is the pid of the arv-put subprocess.
 #
 # The only functions that should access these variables directly are:
@@ -1564,6 +1591,13 @@ sub fhbits
 #     Starts an arv-put pipe, reading data on stdin and writing it to
 #     a $logfilename file in an output collection.
 #
+# log_writer_read_output([$timeout])
+#     Read output from $log_pipe_out and append it to $log_pipe_out_buf.
+#     Passes $timeout to the select() call, with a default of 0.01.
+#     Returns the result of the last read() call on $log_pipe_out, or
+#     -1 if read() wasn't called because select() timed out.
+#     Only other log_writer_* functions should need to call this.
+#
 # log_writer_send($txt)
 #     Writes $txt to the output log collection.
 #
@@ -1574,25 +1608,40 @@ sub fhbits
 #     Returns a true value if there is currently a live arv-put
 #     process, false otherwise.
 #
-my ($log_pipe_in, $log_pipe_out, $log_pipe_pid);
+my ($log_pipe_in, $log_pipe_out, $log_pipe_out_buf, $log_pipe_out_select,
+    $log_pipe_pid);
 
 sub log_writer_start($)
 {
   my $logfilename = shift;
   $log_pipe_pid = open2($log_pipe_out, $log_pipe_in,
                         'arv-put',
-                        '--portable-data-hash',
-                        '--project-uuid', $Job->{owner_uuid},
+                        '--stream',
                         '--retries', '3',
-                        '--name', $logfilename,
                         '--filename', $logfilename,
                         '-');
+  $log_pipe_out_buf = "";
+  $log_pipe_out_select = IO::Select->new($log_pipe_out);
+}
+
+sub log_writer_read_output {
+  my $timeout = shift || 0.01;
+  my $read = -1;
+  while ($read && $log_pipe_out_select->can_read($timeout)) {
+    $read = read($log_pipe_out, $log_pipe_out_buf, 65536,
+                 length($log_pipe_out_buf));
+  }
+  if (!defined($read)) {
+    Log(undef, "error reading log manifest from arv-put: $!");
+  }
+  return $read;
 }
 
 sub log_writer_send($)
 {
   my $txt = shift;
   print $log_pipe_in $txt;
+  log_writer_read_output();
 }
 
 sub log_writer_finish()
@@ -1600,22 +1649,24 @@ sub log_writer_finish()
   return unless $log_pipe_pid;
 
   close($log_pipe_in);
-  my $arv_put_output;
 
-  my $s = IO::Select->new($log_pipe_out);
-  if ($s->can_read(120)) {
-    sysread($log_pipe_out, $arv_put_output, 1024);
-    chomp($arv_put_output);
-  } else {
+  my $read_result = log_writer_read_output(120);
+  if ($read_result == -1) {
     Log (undef, "timed out reading from 'arv-put'");
+  } elsif ($read_result != 0) {
+    Log(undef, "failed to read arv-put log manifest to EOF");
   }
 
   waitpid($log_pipe_pid, 0);
-  $log_pipe_pid = $log_pipe_in = $log_pipe_out = undef;
   if ($?) {
-    Log("log_writer_finish: arv-put exited ".exit_status_s($?))
+    Log(undef, "log_writer_finish: arv-put exited " . exit_status_s($?))
   }
 
+  close($log_pipe_out);
+  my $arv_put_output = $log_pipe_out_buf;
+  $log_pipe_pid = $log_pipe_in = $log_pipe_out = $log_pipe_out_buf =
+      $log_pipe_out_select = undef;
+
   return $arv_put_output;
 }
 
@@ -1679,10 +1730,21 @@ sub save_meta
   return if $justcheckpoint;  # checkpointing is not relevant post-Warehouse.pm
   return unless log_writer_is_active();
 
-  my $loglocator = log_writer_finish();
-  Log (undef, "log manifest is $loglocator");
-  $Job->{'log'} = $loglocator;
-  $Job->update_attributes('log', $loglocator);
+  my $log_manifest = "";
+  if ($Job->{log}) {
+    my $prev_log_coll = api_call("collections/get", uuid => $Job->{log});
+    $log_manifest .= $prev_log_coll->{manifest_text};
+  }
+  $log_manifest .= log_writer_finish();
+
+  my $log_coll = api_call(
+    "collections/create", ensure_unique_name => 1, collection => {
+      manifest_text => $log_manifest,
+      owner_uuid => $Job->{owner_uuid},
+      name => sprintf("Log from %s job %s", $Job->{script}, $Job->{uuid}),
+    });
+  Log(undef, "log collection is " . $log_coll->{portable_data_hash});
+  $Job->update_attributes('log' => $log_coll->{portable_data_hash});
 }
 
 
diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb
index 3c1e4c6..515bfaa 100755
--- a/services/api/script/crunch-dispatch.rb
+++ b/services/api/script/crunch-dispatch.rb
@@ -54,6 +54,8 @@ class Dispatcher
   include ApplicationHelper
 
   EXIT_TEMPFAIL = 75
+  EXIT_RETRY_UNLOCKED = 93
+  RETRY_UNLOCKED_LIMIT = 3
 
   def initialize
     @crunch_job_bin = (ENV['CRUNCH_JOB_BIN'] || `which arv-crunch-job`.strip)
@@ -77,6 +79,8 @@ class Dispatcher
     @pipe_auth_tokens = {}
     @running = {}
     @todo = []
+    @todo_job_retries = {}
+    @job_retry_counts = Hash.new(0)
     @todo_pipelines = []
   end
 
@@ -86,7 +90,7 @@ class Dispatcher
 
   def refresh_todo
     if $options[:jobs]
-      @todo = Job.queue.select(&:repository)
+      @todo = @todo_job_retries.values + Job.queue.select(&:repository)
     end
     if $options[:pipelines]
       @todo_pipelines = PipelineInstance.queue
@@ -417,6 +421,10 @@ class Dispatcher
                    '--job', job.uuid,
                    '--git-dir', @arvados_internal]
 
+      if @todo_job_retries.include?(job.uuid)
+        cmd_args << "--force-unlock"
+      end
+
       $stderr.puts "dispatch: #{cmd_args.join ' '}"
 
       begin
@@ -452,6 +460,7 @@ class Dispatcher
         log_throttle_bytes_skipped: 0,
       }
       i.close
+      @todo_job_retries.delete(job.uuid)
       update_node_status
     end
   end
@@ -650,26 +659,49 @@ class Dispatcher
 
     # Wait the thread (returns a Process::Status)
     exit_status = j_done[:wait_thr].value.exitstatus
+    exit_tempfail = exit_status == EXIT_TEMPFAIL
 
     $stderr.puts "dispatch: child #{pid_done} exit #{exit_status}"
     $stderr.puts "dispatch: job #{job_done.uuid} end"
 
     jobrecord = Job.find_by_uuid(job_done.uuid)
-    if exit_status != EXIT_TEMPFAIL and jobrecord.state == "Running"
-      # crunch-job did not return exit code 75 (see below) and left the job in
-      # the "Running" state, which means there was an unhandled error.  Fail
-      # the job.
-      jobrecord.state = "Failed"
-      if not jobrecord.save
-        $stderr.puts "dispatch: jobrecord.save failed"
+
+    if exit_status == EXIT_RETRY_UNLOCKED
+      # The job failed because all of the nodes allocated to it
+      # failed.  Only this crunch-dispatch process can retry the job:
+      # it's already locked, and there's no way to put it back in the
+      # Queued state.  Put it in our internal todo list unless the job
+      # has failed this way excessively.
+      @job_retry_counts[jobrecord.uuid] += 1
+      exit_tempfail = @job_retry_counts[jobrecord.uuid] <= RETRY_UNLOCKED_LIMIT
+      if exit_tempfail
+        @todo_job_retries[jobrecord.uuid] = jobrecord
+      else
+        $stderr.puts("dispatch: job #{jobrecord.uuid} exceeded node failure retry limit -- giving up")
+      end
+    end
+
+    if !exit_tempfail
+      @job_retry_counts.delete(jobrecord.uuid)
+      if jobrecord.state == "Running"
+        # Apparently there was an unhandled error.  That could potentially
+        # include "all allocated nodes failed" when we don't to retry
+        # because the job has already been retried RETRY_UNLOCKED_LIMIT
+        # times.  Fail the job.
+        jobrecord.state = "Failed"
+        if not jobrecord.save
+          $stderr.puts "dispatch: jobrecord.save failed"
+        end
       end
     else
-      # Don't fail the job if crunch-job didn't even get as far as
-      # starting it. If the job failed to run due to an infrastructure
+      # If the job failed to run due to an infrastructure
       # issue with crunch-job or slurm, we want the job to stay in the
       # queue. If crunch-job exited after losing a race to another
       # crunch-job process, it exits 75 and we should leave the job
-      # record alone so the winner of the race do its thing.
+      # record alone so the winner of the race can do its thing.
+      # If crunch-job exited after all of its allocated nodes failed,
+      # it exits 93, and we want to retry it later (see the
+      # EXIT_RETRY_UNLOCKED `if` block).
       #
       # There is still an unhandled race condition: If our crunch-job
       # process is about to lose a race with another crunch-job
@@ -683,7 +715,7 @@ class Dispatcher
 
     # Invalidate the per-job auth token, unless the job is still queued and we
     # might want to try it again.
-    if jobrecord.state != "Queued"
+    if jobrecord.state != "Queued" and !@todo_job_retries.include?(jobrecord.uuid)
       j_done[:job_auth].update_attributes expires_at: Time.now
     end
 
@@ -737,6 +769,14 @@ class Dispatcher
       select(@running.values.collect { |j| [j[:stdout], j[:stderr]] }.flatten,
              [], [], 1)
     end
+    # If there are jobs we wanted to retry, we have to mark them as failed now.
+    # Other dispatchers can't pick them up because we hold their lock.
+    @todo_job_retries.each_key do |job_uuid|
+      job = Job.find_by_uuid(job_uuid)
+      if job.state == "Running"
+        fail_job(job, "crunch-dispatch was stopped during job's tempfail retry loop")
+      end
+    end
   end
 
   protected

commit 24b4d1ad90558332cd5251b265a54c21ffdbfd36
Author: Brett Smith <brett at curoverse.com>
Date:   Mon Jun 15 17:04:25 2015 -0400

    4410: crunch-dispatch logs crunch-job exit later.
    
    This makes it easier to log the exit code, and makes the logs look
    nicer because the exit log doesn't interrupt crunch-job's stderr.

diff --git a/services/api/script/crunch-dispatch.rb b/services/api/script/crunch-dispatch.rb
index 1002f91..3c1e4c6 100755
--- a/services/api/script/crunch-dispatch.rb
+++ b/services/api/script/crunch-dispatch.rb
@@ -634,8 +634,6 @@ class Dispatcher
     return if !pid_done
 
     job_done = j_done[:job]
-    $stderr.puts "dispatch: child #{pid_done} exit"
-    $stderr.puts "dispatch: job #{job_done.uuid} end"
 
     # Ensure every last drop of stdout and stderr is consumed.
     read_pipes
@@ -653,6 +651,9 @@ class Dispatcher
     # Wait the thread (returns a Process::Status)
     exit_status = j_done[:wait_thr].value.exitstatus
 
+    $stderr.puts "dispatch: child #{pid_done} exit #{exit_status}"
+    $stderr.puts "dispatch: job #{job_done.uuid} end"
+
     jobrecord = Job.find_by_uuid(job_done.uuid)
     if exit_status != EXIT_TEMPFAIL and jobrecord.state == "Running"
       # crunch-job did not return exit code 75 (see below) and left the job in

-----------------------------------------------------------------------


hooks/post-receive
-- 



