[ARVADOS] updated: 91b1d56098b3c6cb54abaabbec68937fe58191fd

Git user git at public.curoverse.com
Tue May 10 01:39:19 EDT 2016


Summary of changes:
 .../arvados/v1/containers_controller.rb            |   7 +
 services/api/app/models/container.rb               |  70 ++++++--
 services/api/app/models/container_request.rb       |  62 ++++---
 services/api/config/routes.rb                      |   4 +-
 .../20160506175108_add_auths_to_container.rb       |   6 +
 ...9143250_add_auth_and_lock_to_container_index.rb |  19 +++
 services/api/db/structure.sql                      |  12 +-
 services/api/lib/current_api_client.rb             |   6 +
 .../test/fixtures/api_client_authorizations.yml    |   2 +-
 services/api/test/fixtures/container_requests.yml  |  13 ++
 services/api/test/fixtures/containers.yml          |   4 +-
 .../arvados/v1/containers_controller_test.rb       |  52 ++++++
 services/api/test/test_helper.rb                   |   4 +
 services/api/test/unit/container_test.rb           | 188 ++++++++++++---------
 .../crunch-dispatch-local/crunch-dispatch-local.go |  37 ++--
 .../crunch-dispatch-local_test.go                  |  19 +--
 16 files changed, 344 insertions(+), 161 deletions(-)
 create mode 100644 services/api/db/migrate/20160506175108_add_auths_to_container.rb
 create mode 100644 services/api/db/migrate/20160509143250_add_auth_and_lock_to_container_index.rb
 create mode 100644 services/api/test/fixtures/container_requests.yml
 create mode 100644 services/api/test/functional/arvados/v1/containers_controller_test.rb

  discards  2757c83be8fefbd5ac6a70970ab03d7803569b58 (commit)
       via  91b1d56098b3c6cb54abaabbec68937fe58191fd (commit)
       via  74189ab0578afbf445a38cbb99bf47d432e3c551 (commit)

This update added new revisions after undoing existing revisions.  That is
to say, the old revision is not an ancestor of the new revision: some
commits reachable from the old revision are no longer reachable from the
new one.  This situation occurs when you force-push (git push --force) a
change, producing a history like this:

 * -- * -- B -- O -- O -- O (2757c83be8fefbd5ac6a70970ab03d7803569b58)
            \
             N -- N -- N (91b1d56098b3c6cb54abaabbec68937fe58191fd)

When this happens, we assume that you have already received alert emails
for all of the O revisions, so here we report only the revisions on the N
branch since the common base, B.

Those revisions listed above that are new to this repository have not
appeared in any other notification email, so we list them in full below.


commit 91b1d56098b3c6cb54abaabbec68937fe58191fd
Author: Tom Clegg <tom at curoverse.com>
Date:   Mon May 9 15:33:05 2016 -0400

    8128: Add runtime tokens for containers, and locks for multiple dispatchers

diff --git a/services/api/app/controllers/arvados/v1/containers_controller.rb b/services/api/app/controllers/arvados/v1/containers_controller.rb
index 04a5ed0..3580397 100644
--- a/services/api/app/controllers/arvados/v1/containers_controller.rb
+++ b/services/api/app/controllers/arvados/v1/containers_controller.rb
@@ -4,4 +4,11 @@ class Arvados::V1::ContainersController < ApplicationController
   accept_attribute_as_json :runtime_constraints, Hash
   accept_attribute_as_json :command, Array
 
+  def auth
+    if @object.locked_by_uuid != Thread.current[:api_client_authorization].uuid
+      raise ArvadosModel::PermissionDeniedError.new("Not locked by your token")
+    end
+    @object = @object.auth
+    show
+  end
 end
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index a014552..845374e 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -16,9 +16,12 @@ class Container < ArvadosModel
   validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
   validate :validate_state_change
   validate :validate_change
+  validate :validate_lock
+  after_validation :assign_auth
   after_save :handle_completed
 
   has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
+  belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
 
   api_accessible :user, extend: :common do |t|
     t.add :command
@@ -37,6 +40,7 @@ class Container < ArvadosModel
     t.add :runtime_constraints
     t.add :started_at
     t.add :state
+    t.add :auth_uuid
   end
 
   # Supported states for a container
@@ -61,27 +65,17 @@ class Container < ArvadosModel
   end
 
   def update_priority!
-    if [Queued, Running].include? self.state
+    if [Queued, Locked, Running].include? self.state
       # Update the priority of this container to the maximum priority of any of
       # its committed container requests and save the record.
-      max = 0
-      ContainerRequest.where(container_uuid: uuid).each do |cr|
-        if cr.state == ContainerRequest::Committed and cr.priority > max
-          max = cr.priority
-        end
-      end
-      self.priority = max
+      self.priority = ContainerRequest.
+        where(container_uuid: uuid,
+              state: ContainerRequest::Committed).
+        maximum('priority')
       self.save!
     end
   end
 
-  def locked_by_uuid
-    # Stub to permit a single dispatch to recognize its own containers
-    if current_user.is_admin
-      Thread.current[:api_client_authorization].andand.uuid
-    end
-  end
-
   protected
 
   def fill_field_defaults
@@ -151,6 +145,52 @@ class Container < ArvadosModel
     check_update_whitelist permitted
   end
 
+  def validate_lock
+    if locked_by_uuid_was
+      if locked_by_uuid_was != Thread.current[:api_client_authorization].uuid
+        # Notably, prohibit changing state or locked_by_uuid:
+        check_update_whitelist [:priority]
+      end
+    end
+
+    if [Locked, Running].include? self.state
+      need_lock = Thread.current[:api_client_authorization].uuid
+    else
+      need_lock = nil
+    end
+    if self.locked_by_uuid_changed?
+      if self.locked_by_uuid != need_lock
+        return errors.add :locked_by_uuid, "can only change to #{need_lock}"
+      end
+    end
+    self.locked_by_uuid = need_lock
+  end
+
+  def assign_auth
+    if self.auth_uuid_changed?
+      return errors.add :auth_uuid, 'is readonly'
+    end
+    if not [Locked, Running].include? self.state
+      # don't need one
+      self.auth.andand.update_attributes(expires_at: db_current_time)
+      self.auth = nil
+      return
+    elsif self.auth
+      # already have one
+      return
+    end
+    cr = ContainerRequest.
+      where('container_uuid=? and priority>0', self.uuid).
+      order('priority desc').
+      first
+    if !cr
+      return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
+    end
+    self.auth = ApiClientAuthorization.
+      create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
+              api_client_id: 0)
+  end
+
   def handle_completed
     # This container is finished so finalize any associated container requests
     # that are associated with this container.
diff --git a/services/api/app/models/container_request.rb b/services/api/app/models/container_request.rb
index acb751c..6353132 100644
--- a/services/api/app/models/container_request.rb
+++ b/services/api/app/models/container_request.rb
@@ -78,32 +78,32 @@ class ContainerRequest < ArvadosModel
     self.cwd ||= "."
   end
 
-  # Turn a container request into a container.
+  # Create a new container (or find an existing one) to satisfy this
+  # request.
   def resolve
-    # In the future this will do things like resolve symbolic git and keep
-    # references to content addresses.
-    Container.create!({ :command => self.command,
-                        :container_image => self.container_image,
-                        :cwd => self.cwd,
-                        :environment => self.environment,
-                        :mounts => self.mounts,
-                        :output_path => self.output_path,
-                        :runtime_constraints => self.runtime_constraints })
+    # TODO: resolve symbolic git and keep references to content
+    # addresses.
+    c = act_as_system_user do
+      Container.create!(command: self.command,
+                        container_image: self.container_image,
+                        cwd: self.cwd,
+                        environment: self.environment,
+                        mounts: self.mounts,
+                        output_path: self.output_path,
+                        runtime_constraints: self.runtime_constraints)
+    end
+    self.container_uuid = c.uuid
   end
 
   def set_container
-    if self.container_uuid_changed?
-      if not current_user.andand.is_admin and not self.container_uuid.nil?
-        errors.add :container_uuid, "can only be updated to nil."
-      end
-    else
-      if self.state_changed?
-        if self.state == Committed and (self.state_was == Uncommitted or self.state_was.nil?)
-          act_as_system_user do
-            self.container_uuid = self.resolve.andand.uuid
-          end
-        end
-      end
+    if (container_uuid_changed? and
+        not current_user.andand.is_admin and
+        not container_uuid.nil?)
+      errors.add :container_uuid, "can only be updated to nil."
+      return false
+    end
+    if state_changed? and state == Committed and container_uuid.nil?
+      resolve
     end
   end
 
@@ -158,16 +158,14 @@ class ContainerRequest < ArvadosModel
   end
 
   def update_priority
-    if [Committed, Final].include? self.state and (self.state_changed? or
-                                                   self.priority_changed? or
-                                                   self.container_uuid_changed?)
-      [self.container_uuid_was, self.container_uuid].each do |cuuid|
-        unless cuuid.nil?
-          c = Container.find_by_uuid cuuid
-          act_as_system_user do
-            c.update_priority!
-          end
-        end
+    if self.state_changed? or
+        self.priority_changed? or
+        self.container_uuid_changed?
+      act_as_system_user do
+        Container.
+          where('uuid in (?)',
+                [self.container_uuid_was, self.container_uuid].compact).
+          map(&:update_priority!)
       end
     end
   end
diff --git a/services/api/config/routes.rb b/services/api/config/routes.rb
index c85a3fc..4cea874 100644
--- a/services/api/config/routes.rb
+++ b/services/api/config/routes.rb
@@ -28,7 +28,9 @@ Server::Application.routes.draw do
       end
       resources :humans
       resources :job_tasks
-      resources :containers
+      resources :containers do
+        get 'auth', on: :member
+      end
       resources :container_requests
       resources :jobs do
         get 'queue', on: :collection
diff --git a/services/api/db/migrate/20160506175108_add_auths_to_container.rb b/services/api/db/migrate/20160506175108_add_auths_to_container.rb
new file mode 100644
index 0000000..d714a49
--- /dev/null
+++ b/services/api/db/migrate/20160506175108_add_auths_to_container.rb
@@ -0,0 +1,6 @@
+class AddAuthsToContainer < ActiveRecord::Migration
+  def change
+    add_column :containers, :auth_uuid, :string
+    add_column :containers, :locked_by_uuid, :string
+  end
+end
diff --git a/services/api/db/migrate/20160509143250_add_auth_and_lock_to_container_index.rb b/services/api/db/migrate/20160509143250_add_auth_and_lock_to_container_index.rb
new file mode 100644
index 0000000..4329ac0
--- /dev/null
+++ b/services/api/db/migrate/20160509143250_add_auth_and_lock_to_container_index.rb
@@ -0,0 +1,19 @@
+class AddAuthAndLockToContainerIndex < ActiveRecord::Migration
+  Columns_were = ["uuid", "owner_uuid", "modified_by_client_uuid", "modified_by_user_uuid", "state", "log", "cwd", "output_path", "output", "container_image"]
+  Columns = Columns_were + ["auth_uuid", "locked_by_uuid"]
+  def up
+    begin
+      remove_index :containers, :name => 'containers_search_index'
+    rescue
+    end
+    add_index(:containers, Columns, name: "containers_search_index")
+  end
+
+  def down
+    begin
+      remove_index :containers, :name => 'containers_search_index'
+    rescue
+    end
+    add_index(:containers, Columns_were, name: "containers_search_index")
+  end
+end
diff --git a/services/api/db/structure.sql b/services/api/db/structure.sql
index 3ec420c..4bf4a17 100644
--- a/services/api/db/structure.sql
+++ b/services/api/db/structure.sql
@@ -339,7 +339,9 @@ CREATE TABLE containers (
     progress double precision,
     priority integer,
     updated_at timestamp without time zone NOT NULL,
-    exit_code integer
+    exit_code integer,
+    auth_uuid character varying(255),
+    locked_by_uuid character varying(255)
 );
 
 
@@ -1472,7 +1474,7 @@ CREATE INDEX container_requests_search_index ON container_requests USING btree (
 -- Name: containers_search_index; Type: INDEX; Schema: public; Owner: -; Tablespace: 
 --
 
-CREATE INDEX containers_search_index ON containers USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, state, log, cwd, output_path, output, container_image);
+CREATE INDEX containers_search_index ON containers USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, state, log, cwd, output_path, output, container_image, auth_uuid, locked_by_uuid);
 
 
 --
@@ -2583,4 +2585,8 @@ INSERT INTO schema_migrations (version) VALUES ('20160208210629');
 
 INSERT INTO schema_migrations (version) VALUES ('20160209155729');
 
-INSERT INTO schema_migrations (version) VALUES ('20160324144017');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20160324144017');
+
+INSERT INTO schema_migrations (version) VALUES ('20160506175108');
+
+INSERT INTO schema_migrations (version) VALUES ('20160509143250');
\ No newline at end of file
diff --git a/services/api/lib/current_api_client.rb b/services/api/lib/current_api_client.rb
index 2e78612..fbd4ef5 100644
--- a/services/api/lib/current_api_client.rb
+++ b/services/api/lib/current_api_client.rb
@@ -124,12 +124,18 @@ module CurrentApiClient
   end
 
   def act_as_user user
+    #auth_was = Thread.current[:api_client_authorization]
     user_was = Thread.current[:user]
     Thread.current[:user] = user
+    #Thread.current[:api_client_authorization] = ApiClientAuthorization.
+    #  where('user_id=? and scopes is null', user.id).
+    #  order('expires_at desc').
+    #  first
     begin
       yield
     ensure
       Thread.current[:user] = user_was
+      #Thread.current[:api_client_authorization] = auth_was
     end
   end
 
diff --git a/services/api/test/fixtures/api_client_authorizations.yml b/services/api/test/fixtures/api_client_authorizations.yml
index 485b6d1..d6c0e40 100644
--- a/services/api/test/fixtures/api_client_authorizations.yml
+++ b/services/api/test/fixtures/api_client_authorizations.yml
@@ -273,7 +273,7 @@ fuse:
 
 dispatch1:
   uuid: zzzzz-gj3su-k9dvestay1plssr
-  api_client: trusted
+  api_client: untrusted
   user: system_user
   api_token: kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw
   expires_at: 2038-01-01 00:00:00
\ No newline at end of file
diff --git a/services/api/test/fixtures/container_requests.yml b/services/api/test/fixtures/container_requests.yml
new file mode 100644
index 0000000..c9f3427
--- /dev/null
+++ b/services/api/test/fixtures/container_requests.yml
@@ -0,0 +1,13 @@
+queued:
+  owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  state: Committed
+  priority: 1
+  created_at: 2016-01-11 11:11:11.111111111 Z
+  updated_at: 2016-01-11 11:11:11.111111111 Z
+  modified_at: 2016-01-11 11:11:11.111111111 Z
+  modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+  container_image: test
+  cwd: test
+  output_path: test
+  command: ["echo", "hello"]
+  container_uuid: zzzzz-dz642-queuedcontainer
diff --git a/services/api/test/fixtures/containers.yml b/services/api/test/fixtures/containers.yml
index 22004b4..e80086e 100644
--- a/services/api/test/fixtures/containers.yml
+++ b/services/api/test/fixtures/containers.yml
@@ -1,6 +1,6 @@
 queued:
   uuid: zzzzz-dz642-queuedcontainer
-  owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+  owner_uuid: zzzzz-tpzed-000000000000000
   state: Queued
   priority: 1
   created_at: 2016-01-11 11:11:11.111111111 Z
@@ -13,7 +13,7 @@ queued:
 
 completed:
   uuid: zzzzz-dz642-compltcontainer
-  owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+  owner_uuid: zzzzz-tpzed-000000000000000
   state: Complete
   priority: 1
   created_at: 2016-01-11 11:11:11.111111111 Z
diff --git a/services/api/test/functional/arvados/v1/containers_controller_test.rb b/services/api/test/functional/arvados/v1/containers_controller_test.rb
new file mode 100644
index 0000000..d9f7d96
--- /dev/null
+++ b/services/api/test/functional/arvados/v1/containers_controller_test.rb
@@ -0,0 +1,52 @@
+require 'test_helper'
+
+class Arvados::V1::ContainersControllerTest < ActionController::TestCase
+  test 'create' do
+    authorize_with :system_user
+    post :create, {
+      container: {
+        command: ['echo', 'hello'],
+        container_image: 'test',
+        output_path: 'test',
+      },
+    }
+    assert_response :success
+  end
+
+  [Container::Queued, Container::Complete].each do |state|
+    test "cannot get auth in #{state} state" do
+      authorize_with :dispatch1
+      get :auth, id: containers(:queued).uuid
+      assert_response 403
+    end
+  end
+
+  test 'cannot get auth with wrong token' do
+    authorize_with :dispatch1
+    c = containers(:queued)
+    assert c.update_attributes(state: Container::Locked), show_errors(c)
+
+    authorize_with :system_user
+    get :auth, id: c.uuid
+    assert_response 403
+  end
+
+  test 'get auth' do
+    authorize_with :dispatch1
+    c = containers(:queued)
+    assert c.update_attributes(state: Container::Locked), show_errors(c)
+    get :auth, id: c.uuid
+    assert_response :success
+    assert_operator 32, :<, json_response['api_token'].length
+    assert_equal 'arvados#apiClientAuthorization', json_response['kind']
+  end
+
+  test 'no auth in container response' do
+    authorize_with :dispatch1
+    c = containers(:queued)
+    assert c.update_attributes(state: Container::Locked), show_errors(c)
+    get :show, id: c.uuid
+    assert_response :success
+    assert_nil json_response['auth']
+  end
+end
diff --git a/services/api/test/test_helper.rb b/services/api/test/test_helper.rb
index 25ab286..ef08c72 100644
--- a/services/api/test/test_helper.rb
+++ b/services/api/test/test_helper.rb
@@ -36,6 +36,10 @@ module ArvadosTestSupport
   def auth(api_client_auth_name)
     {'HTTP_AUTHORIZATION' => "OAuth2 #{api_token(api_client_auth_name)}"}
   end
+
+  def show_errors model
+    return lambda { model.errors.full_messages.inspect }
+  end
 end
 
 class ActiveSupport::TestCase
diff --git a/services/api/test/unit/container_test.rb b/services/api/test/unit/container_test.rb
index 84713c2..9cc0981 100644
--- a/services/api/test/unit/container_test.rb
+++ b/services/api/test/unit/container_test.rb
@@ -1,22 +1,34 @@
 require 'test_helper'
 
 class ContainerTest < ActiveSupport::TestCase
-  def minimal_new
-    c = Container.new
-    c.command = ["echo", "foo"]
-    c.container_image = "img"
-    c.output_path = "/tmp"
-    c
-  end
-
-  def show_errors c
-    return lambda { c.errors.full_messages.inspect }
+  include DbCurrentTime
+
+  DEFAULT_ATTRS = {
+    command: ['echo', 'foo'],
+    container_image: 'img',
+    output_path: '/tmp',
+    priority: 1,
+  }
+
+  def minimal_new attrs={}
+    cr = ContainerRequest.new DEFAULT_ATTRS.merge(attrs)
+    act_as_user users(:active) do
+      cr.save!
+    end
+    c = Container.new DEFAULT_ATTRS.merge(attrs)
+    act_as_system_user do
+      c.save!
+      assert cr.update_attributes(container_uuid: c.uuid,
+                                  state: ContainerRequest::Committed,
+                                  ), show_errors(cr)
+    end
+    return c, cr
   end
 
   def check_illegal_updates c, bad_updates
     bad_updates.each do |u|
       refute c.update_attributes(u), u.inspect
-      refute c.valid?
+      refute c.valid?, u.inspect
       c.reload
     end
   end
@@ -28,6 +40,8 @@ class ContainerTest < ActiveSupport::TestCase
                               {environment: {"FOO" => "BAR"}},
                               {mounts: {"FOO" => "BAR"}},
                               {output_path: "/tmp3"},
+                              {locked_by_uuid: "zzzzz-gj3su-027z32aux8dg2s1"},
+                              {auth_uuid: "zzzzz-gj3su-017z32aux8dg2s1"},
                               {runtime_constraints: {"FOO" => "BAR"}}]
   end
 
@@ -39,7 +53,6 @@ class ContainerTest < ActiveSupport::TestCase
   def check_no_change_from_cancelled c
     check_illegal_modify c
     check_bogus_states c
-
     check_illegal_updates c, [{ priority: 3 },
                               { state: Container::Queued },
                               { state: Container::Locked },
@@ -49,13 +62,11 @@ class ContainerTest < ActiveSupport::TestCase
 
   test "Container create" do
     act_as_system_user do
-      c = minimal_new
-      c.environment = {}
-      c.mounts = {"BAR" => "FOO"}
-      c.output_path = "/tmp"
-      c.priority = 1
-      c.runtime_constraints = {}
-      c.save!
+      c, _ = minimal_new(environment: {},
+                      mounts: {"BAR" => "FOO"},
+                      output_path: "/tmp",
+                      priority: 1,
+                      runtime_constraints: {})
 
       check_illegal_modify c
       check_bogus_states c
@@ -67,89 +78,100 @@ class ContainerTest < ActiveSupport::TestCase
   end
 
   test "Container running" do
-    act_as_system_user do
-      c = minimal_new
-      c.save!
+    c, _ = minimal_new priority: 1
 
-      check_illegal_updates c, [{state: Container::Running},
-                                {state: Container::Complete}]
+    set_user_from_auth :dispatch1
+    check_illegal_updates c, [{state: Container::Running},
+                              {state: Container::Complete}]
 
-      c.update_attributes! state: Container::Locked
-      c.update_attributes! state: Container::Running
+    c.update_attributes! state: Container::Locked
+    c.update_attributes! state: Container::Running
 
-      check_illegal_modify c
-      check_bogus_states c
+    check_illegal_modify c
+    check_bogus_states c
 
-      check_illegal_updates c, [{state: Container::Queued}]
-      c.reload
+    check_illegal_updates c, [{state: Container::Queued}]
+    c.reload
 
-      c.update_attributes! priority: 3
-    end
+    c.update_attributes! priority: 3
   end
 
   test "Lock and unlock" do
-    act_as_system_user do
-      c = minimal_new
-      c.save!
-      assert_equal Container::Queued, c.state
+    c, cr = minimal_new priority: 0
 
-      refute c.update_attributes(state: Container::Running), "not locked"
-      c.reload
-      refute c.update_attributes(state: Container::Complete), "not locked"
-      c.reload
+    set_user_from_auth :dispatch1
+    assert_equal Container::Queued, c.state
 
-      assert c.update_attributes(state: Container::Locked), show_errors(c)
-      assert c.update_attributes(state: Container::Queued), show_errors(c)
+    refute c.update_attributes(state: Container::Locked), "no priority"
+    c.reload
+    assert cr.update_attributes priority: 1
 
-      refute c.update_attributes(state: Container::Running), "not locked"
-      c.reload
+    refute c.update_attributes(state: Container::Running), "not locked"
+    c.reload
+    refute c.update_attributes(state: Container::Complete), "not locked"
+    c.reload
 
-      assert c.update_attributes(state: Container::Locked), show_errors(c)
-      assert c.update_attributes(state: Container::Running), show_errors(c)
+    assert c.update_attributes(state: Container::Locked), show_errors(c)
+    assert c.locked_by_uuid
+    assert c.auth_uuid
 
-      refute c.update_attributes(state: Container::Locked), "already running"
-      c.reload
-      refute c.update_attributes(state: Container::Queued), "already running"
-      c.reload
+    assert c.update_attributes(state: Container::Queued), show_errors(c)
+    refute c.locked_by_uuid
+    refute c.auth_uuid
 
-      assert c.update_attributes(state: Container::Complete), show_errors(c)
-    end
+    refute c.update_attributes(state: Container::Running), "not locked"
+    c.reload
+    refute c.locked_by_uuid
+    refute c.auth_uuid
+
+    assert c.update_attributes(state: Container::Locked), show_errors(c)
+    assert c.update_attributes(state: Container::Running), show_errors(c)
+    assert c.locked_by_uuid
+    assert c.auth_uuid
+
+    auth_uuid_was = c.auth_uuid
+
+    refute c.update_attributes(state: Container::Locked), "already running"
+    c.reload
+    refute c.update_attributes(state: Container::Queued), "already running"
+    c.reload
+
+    assert c.update_attributes(state: Container::Complete), show_errors(c)
+    refute c.locked_by_uuid
+    refute c.auth_uuid
+
+    auth_exp = ApiClientAuthorization.find_by_uuid(auth_uuid_was).expires_at
+    assert_operator auth_exp, :<, db_current_time
   end
 
   test "Container queued cancel" do
-    act_as_system_user do
-      c = minimal_new
-      c.save!
-      assert c.update_attributes(state: Container::Cancelled), show_errors(c)
-      check_no_change_from_cancelled c
-    end
+    c, _ = minimal_new
+    set_user_from_auth :dispatch1
+    assert c.update_attributes(state: Container::Cancelled), show_errors(c)
+    check_no_change_from_cancelled c
   end
 
   test "Container locked cancel" do
-    act_as_system_user do
-      c = minimal_new
-      c.save!
-      assert c.update_attributes(state: Container::Locked), show_errors(c)
-      assert c.update_attributes(state: Container::Cancelled), show_errors(c)
-      check_no_change_from_cancelled c
-    end
+    c, _ = minimal_new
+    set_user_from_auth :dispatch1
+    assert c.update_attributes(state: Container::Locked), show_errors(c)
+    assert c.update_attributes(state: Container::Cancelled), show_errors(c)
+    check_no_change_from_cancelled c
   end
 
   test "Container running cancel" do
-    act_as_system_user do
-      c = minimal_new
-      c.save!
-      c.update_attributes! state: Container::Queued
-      c.update_attributes! state: Container::Locked
-      c.update_attributes! state: Container::Running
-      c.update_attributes! state: Container::Cancelled
-      check_no_change_from_cancelled c
-    end
+    c, _ = minimal_new
+    set_user_from_auth :dispatch1
+    c.update_attributes! state: Container::Queued
+    c.update_attributes! state: Container::Locked
+    c.update_attributes! state: Container::Running
+    c.update_attributes! state: Container::Cancelled
+    check_no_change_from_cancelled c
   end
 
   test "Container create forbidden for non-admin" do
     set_user_from_auth :active_trustedclient
-    c = minimal_new
+    c = Container.new DEFAULT_ATTRS
     c.environment = {}
     c.mounts = {"BAR" => "FOO"}
     c.output_path = "/tmp"
@@ -161,16 +183,14 @@ class ContainerTest < ActiveSupport::TestCase
   end
 
   test "Container only set exit code on complete" do
-    act_as_system_user do
-      c = minimal_new
-      c.save!
-      c.update_attributes! state: Container::Locked
-      c.update_attributes! state: Container::Running
+    c, _ = minimal_new
+    set_user_from_auth :dispatch1
+    c.update_attributes! state: Container::Locked
+    c.update_attributes! state: Container::Running
 
-      check_illegal_updates c, [{exit_code: 1},
-                                {exit_code: 1, state: Container::Cancelled}]
+    check_illegal_updates c, [{exit_code: 1},
+                              {exit_code: 1, state: Container::Cancelled}]
 
-      assert c.update_attributes(exit_code: 1, state: Container::Complete)
-    end
+    assert c.update_attributes(exit_code: 1, state: Container::Complete)
   end
 end

commit 74189ab0578afbf445a38cbb99bf47d432e3c551
Author: Tom Clegg <tom at curoverse.com>
Date:   Thu May 5 17:50:44 2016 -0400

    8128: Update crunch-dispatch-local to use new Locked state.

diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index e05c0c5..d8b6037 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -72,7 +72,7 @@ func doMain() error {
 	}(sigChan)
 
 	// Run all queued containers
-	runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+	runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
 
 	// Finished dispatching; interrupt any crunch jobs that are still running
 	for _, cmd := range runningCmds {
@@ -91,8 +91,8 @@ func doMain() error {
 // Any errors encountered are logged but the program would continue to run (not exit).
 // This is because, once one or more crunch jobs are running,
 // we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
-	ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
+	ticker := time.NewTicker(pollInterval)
 
 	for {
 		select {
@@ -107,9 +107,10 @@ func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunComman
 
 // Container data
 type Container struct {
-	UUID     string `json:"uuid"`
-	State    string `json:"state"`
-	Priority int    `json:"priority"`
+	UUID         string `json:"uuid"`
+	State        string `json:"state"`
+	Priority     int    `json:"priority"`
+	LockedByUUID string `json:"locked_by_uuid"`
 }
 
 // ContainerList is a list of the containers from api
@@ -118,7 +119,7 @@ type ContainerList struct {
 }
 
 // Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
+func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
 	params := arvadosclient.Dict{
 		"filters": [][]string{[]string{"state", "=", "Queued"}},
 	}
@@ -133,16 +134,16 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
 	for i := 0; i < len(containers.Items); i++ {
 		log.Printf("About to run queued container %v", containers.Items[i].UUID)
 		// Run the container
-		go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+		go run(containers.Items[i].UUID, crunchRunCommand, pollInterval)
 	}
 }
 
 // Run queued container:
-// Set container state to locked (TBD)
+// Set container state to Locked
 // Run container using the given crunch-run command
 // Set the container state to Running
 // If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
+func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
 	cmd := exec.Command(crunchRunCommand, uuid)
 
 	cmd.Stdin = nil
@@ -158,7 +159,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 	runningCmds[uuid] = cmd
 	runningCmdsMutex.Unlock()
 
-	log.Printf("Started container run for %v", uuid)
+	log.Printf("Starting container %v", uuid)
 
 	// Add this crunch job to waitGroup
 	waitGroup.Add(1)
@@ -167,54 +168,73 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
 	// Update container status to Running
 	err := arv.Update("containers", uuid,
 		arvadosclient.Dict{
-			"container": arvadosclient.Dict{"state": "Running"}},
+			"container": arvadosclient.Dict{"state": "Locked"}},
 		nil)
 	if err != nil {
-		log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+		log.Printf("Error updating container %v to 'Locked' state: %v", uuid, err)
 	}
 
-	// A goroutine to terminate the runner if container priority becomes zero
-	priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+	cmdExited := make(chan struct{})
+
+	// Kill the child process if container priority changes to zero
 	go func() {
-		for _ = range priorityTicker.C {
+		ticker := time.NewTicker(pollInterval)
+		defer ticker.Stop()
+		for {
+			select {
+			case <-cmdExited:
+				return
+			case <-ticker.C:
+			}
 			var container Container
 			err := arv.Get("containers", uuid, nil, &container)
 			if err != nil {
-				log.Printf("Error getting container info for %v: %q", uuid, err)
-			} else {
-				if container.Priority == 0 {
-					priorityTicker.Stop()
-					cmd.Process.Signal(os.Interrupt)
-				}
+				log.Printf("Error getting container %v: %q", uuid, err)
+				continue
+			}
+			if container.Priority == 0 {
+				log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+				cmd.Process.Signal(os.Interrupt)
 			}
 		}
 	}()
 
-	// Wait for the crunch job to exit
+	// Wait for crunch-run to exit
 	if _, err := cmd.Process.Wait(); err != nil {
 		log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
 	}
+	close(cmdExited)
 
-	// Remove the crunch job to runningCmds
+	defer log.Printf("Finished container run for %v", uuid)
+
+	// Remove the crunch job from runningCmds
 	runningCmdsMutex.Lock()
 	delete(runningCmds, uuid)
 	runningCmdsMutex.Unlock()
 
-	priorityTicker.Stop()
-
-	// The container state should be 'Complete'
+	// The container state should now be 'Complete' if everything
+	// went well. If it started but crunch-run didn't change its
+	// final state to 'Running', fix that now. If it never even
+	// started, cancel it as unrunnable. (TODO: Requeue instead,
+	// and fix tests so they can tell something happened even if
+	// the final state is Queued.)
 	var container Container
 	err = arv.Get("containers", uuid, nil, &container)
-	if container.State == "Running" {
-		log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
+	if err != nil {
+		log.Printf("Error getting final container state: %v", err)
+	}
+	fixState := map[string]string{
+		"Running": "Complete",
+		"Locked": "Cancelled",
+	}
+	if newState, ok := fixState[container.State]; ok {
+		log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
 		err = arv.Update("containers", uuid,
 			arvadosclient.Dict{
-				"container": arvadosclient.Dict{"state": "Complete"}},
+				"container": arvadosclient.Dict{"state": newState}},
 			nil)
 		if err != nil {
-			log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
+			log.Printf("Error updating container state to '%s' for %v: %q", newState, uuid, err)
 		}
 	}
-
-	log.Printf("Finished container run for %v", uuid)
 }
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index 3ec1e2e..ebbe79c 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -4,12 +4,11 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 
-	"io/ioutil"
+	"bytes"
 	"log"
 	"net/http"
 	"net/http/httptest"
 	"os"
-	"strings"
 	"syscall"
 	"testing"
 	"time"
@@ -80,11 +79,11 @@ func (s *TestSuite) Test_doMain(c *C) {
 	c.Check(err, IsNil)
 	c.Assert(len(containers.Items), Equals, 0)
 
-	// Previously "Queued" container should now be in "Complete" state
+	// Previously "Queued" container should now be in "Cancelled" state
 	var container Container
 	err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
 	c.Check(err, IsNil)
-	c.Check(container.State, Equals, "Complete")
+	c.Check(container.State, Equals, "Cancelled")
 }
 
 func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
@@ -101,7 +100,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
 	apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
 		arvadostest.StubResponse{500, string(`{}`)}
 
-	testWithServerStub(c, apiStubResponses, "echo", "Error updating container state")
+	testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to 'Locked' state")
 }
 
 func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
@@ -139,10 +138,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 		Retries:   0,
 	}
 
-	tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
-	c.Check(err, IsNil)
-	defer os.Remove(tempfile.Name())
-	log.SetOutput(tempfile)
+	buf := bytes.NewBuffer(nil)
+	log.SetOutput(buf)
+	defer log.SetOutput(os.Stderr)
 
 	go func() {
 		time.Sleep(2 * time.Second)
@@ -154,6 +152,5 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
 	// Wait for all running crunch jobs to complete / terminate
 	waitGroup.Wait()
 
-	buf, _ := ioutil.ReadFile(tempfile.Name())
-	c.Check(strings.Contains(string(buf), expected), Equals, true)
+	c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
 }

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list