[ARVADOS] created: 72af8e487f399fd726b8985a346a37f2e2160ca6

git at public.curoverse.com git at public.curoverse.com
Fri Aug 29 15:50:20 EDT 2014


        at  72af8e487f399fd726b8985a346a37f2e2160ca6 (commit)


commit 72af8e487f399fd726b8985a346a37f2e2160ca6
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Aug 29 15:48:54 2014 -0400

    3550: Add --run-jobs-here flag to arv-run-pipeline-instance.
    
    Run new jobs locally with arv-crunch-job instead of asking
    crunch-dispatch to run them.

diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance
index e9fe104..0f1f70e 100755
--- a/sdk/cli/bin/arv-run-pipeline-instance
+++ b/sdk/cli/bin/arv-run-pipeline-instance
@@ -151,11 +151,19 @@ p = Trollop::Parser.new do
       :short => :none,
       :type => :string)
   opt(:submit,
-      "Do not try to satisfy any components. Just create a pipeline instance and output its UUID.",
+      "Submit the pipeline instance to the server, and exit. Let the Crunch dispatch service to satisfy the components by finding/running jobs.",
+      :short => :none,
+      :type => :boolean)
+  opt(:run_pipeline_here,
+      "Manage the pipeline instance in-process. Submit jobs to Crunch as needed. Do not exit until the pipeline finishes (or fails).",
+      :short => :none,
+      :type => :boolean)
+  opt(:run_jobs_here,
+      "Manage the pipeline instance in-process. Find/run/watch jobs until the pipeline finishes (or fails). Implies --run-pipeline-here.",
       :short => :none,
       :type => :boolean)
   opt(:run_here,
-      "Manage the pipeline in process.",
+      "Synonym for --run-jobs-here.",
       :short => :none,
       :type => :boolean)
   stop_on [:'--']
@@ -165,6 +173,9 @@ $options = Trollop::with_standard_exception_handling p do
 end
 $debuglevel = $options[:debug_level] || ($options[:debug] && 1) || 0
 
+$options[:run_jobs_here] ||= $options[:run_here] # old flag name
+$options[:run_pipeline_here] ||= $options[:run_jobs_here] # B requires A
+
 if $options[:instance]
   if $options[:template] or $options[:submit]
     abort "#{$0}: syntax error: --instance cannot be combined with --template or --submit."
@@ -175,8 +186,8 @@ elsif not $options[:template]
   abort
 end
 
-if $options[:run_here] == $options[:submit]
-  abort "#{$0}: syntax error: you must supply either --run-here or --submit."
+if $options[:run_pipeline_here] == $options[:submit]
+  abort "#{$0}: error: you must supply --run-pipeline-here, --run-jobs-here, or --submit."
 end
 
 # Suppress SSL certificate checks if ARVADOS_API_HOST_INSECURE
@@ -491,6 +502,7 @@ class WhRunPipelineInstance
           # No job yet associated with this component and is component inputs
           # are fully specified (any output_of script_parameters are resolved
           # to real value)
+          my_submit_id = "instance #{@instance[:uuid]} rand #{rand(2**64).to_s(36)}"
           job = JobCache.create(@instance, cname, {
             :script => c[:script],
             :script_parameters => c[:script_parameters],
@@ -499,6 +511,8 @@ class WhRunPipelineInstance
             :nondeterministic => c[:nondeterministic],
             :runtime_constraints => c[:runtime_constraints],
             :owner_uuid => owner_uuid,
+            :is_locked_by_uuid => (@options[:run_jobs_here] ? owner_uuid : nil),
+            :submit_id => my_submit_id,
           }, {
             # This is the right place to put these attributes when
             # dealing with new API servers.
@@ -511,12 +525,50 @@ class WhRunPipelineInstance
           if job
             debuglog "component #{cname} new job #{job[:uuid]}"
             c[:job] = job
+            c[:run_in_process] = (@options[:run_jobs_here] and
+                                  job[:submit_id] == my_submit_id)
           else
             debuglog "component #{cname} new job failed", 0
             job_creation_failed += 1
           end
         end
 
+        if c[:job] and c[:run_in_process]
+          report_status
+          begin
+            require 'open3'
+            Open3.popen3("arv-crunch-job", "--force-unlock",
+                         "--job", c[:job][:uuid]) do |stdin, stdout, stderr, wait_thr|
+              debuglog "arv-crunch-job pid #{wait_thr.pid} started", 0
+              stdin.close
+              while true
+                rready, wready, = IO.select([stdout, stderr], [])
+                break if !rready[0]
+                begin
+                  buf = rready[0].read_nonblock(2**20)
+                rescue EOFError
+                  break
+                end
+                (rready[0] == stdout ? $stdout : $stderr).write(buf)
+              end
+              stdout.close
+              stderr.close
+              debuglog "arv-crunch-job pid #{wait_thr.pid} exit #{wait_thr.value.to_i}", 0
+            end
+            if not $arv.job.get(uuid: c[:job][:uuid])[:finished_at]
+              raise Exception.new("arv-crunch-job did not set finished_at.")
+            end
+          rescue Exception => e
+            debuglog "Interrupted (#{e}). Failing job.", 0
+            $arv.job.update(uuid: c[:job][:uuid],
+                            job: {
+                              finished_at: Time.now,
+                              running: false,
+                              success: false
+                            })
+          end
+        end
+
         if c[:job] and c[:job][:uuid]
           if (c[:job][:running] or
               not (c[:job][:finished_at] or c[:job][:cancelled_at]))
@@ -689,6 +741,8 @@ class WhRunPipelineInstance
                       "failed #{c[:job][:finished_at]}"
                     elsif c[:job][:started_at]
                       "started #{c[:job][:started_at]}"
+                    elsif c[:job][:is_locked_by_uuid]
+                      "starting #{c[:job][:started_at]}"
                     else
                       "queued #{c[:job][:created_at]}"
                     end

commit 8a4691d711947a83e9155599e96c4173e655a8f3
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Aug 29 15:46:35 2014 -0400

    3550: Improve startup time by removing excess api client instantiation.

diff --git a/sdk/cli/bin/arv-run-pipeline-instance b/sdk/cli/bin/arv-run-pipeline-instance
index 101b4ee..e9fe104 100755
--- a/sdk/cli/bin/arv-run-pipeline-instance
+++ b/sdk/cli/bin/arv-run-pipeline-instance
@@ -59,8 +59,6 @@
 class WhRunPipelineInstance
 end
 
-$application_version = 1.0
-
 if RUBY_VERSION < '1.9.3' then
   abort <<-EOS
 #{$0.gsub(/^\.\//,'')} requires Ruby version 1.9.3 or higher.
@@ -109,21 +107,6 @@ if $arvados_api_host.match /local/
   suppress_warnings { OpenSSL::SSL::VERIFY_PEER = OpenSSL::SSL::VERIFY_NONE }
 end
 
-class Google::APIClient
-  def discovery_document(api, version)
-    api = api.to_s
-    return @discovery_documents["#{api}:#{version}"] ||=
-      begin
-        response = self.execute!(
-                                 :http_method => :get,
-                                 :uri => self.discovery_uri(api, version),
-                                 :authenticated => false
-                                 )
-        response.body.class == String ? JSON.parse(response.body) : response.body
-      end
-  end
-end
-
 
 # Parse command line options (the kind that control the behavior of
 # this program, that is, not the pipeline component parameters).
@@ -214,13 +197,9 @@ end
 
 # Set up the API client.
 
-$client ||= Google::APIClient.
-  new(:host => $arvados_api_host,
-      :application_name => File.split($0).last,
-      :application_version => $application_version.to_s)
-$arvados = $client.discovered_api('arvados', $arvados_api_version)
 $arv = Arvados.new api_version: 'v1'
-
+$client = $arv.client
+$arvados = $arv.arvados_api
 
 class PipelineInstance
   def self.find(uuid)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list