[ARVADOS] updated: 91b1d56098b3c6cb54abaabbec68937fe58191fd
Git user
git at public.curoverse.com
Tue May 10 01:39:19 EDT 2016
Summary of changes:
.../arvados/v1/containers_controller.rb | 7 +
services/api/app/models/container.rb | 70 ++++++--
services/api/app/models/container_request.rb | 62 ++++---
services/api/config/routes.rb | 4 +-
.../20160506175108_add_auths_to_container.rb | 6 +
...9143250_add_auth_and_lock_to_container_index.rb | 19 +++
services/api/db/structure.sql | 12 +-
services/api/lib/current_api_client.rb | 6 +
.../test/fixtures/api_client_authorizations.yml | 2 +-
services/api/test/fixtures/container_requests.yml | 13 ++
services/api/test/fixtures/containers.yml | 4 +-
.../arvados/v1/containers_controller_test.rb | 52 ++++++
services/api/test/test_helper.rb | 4 +
services/api/test/unit/container_test.rb | 188 ++++++++++++---------
.../crunch-dispatch-local/crunch-dispatch-local.go | 37 ++--
.../crunch-dispatch-local_test.go | 19 +--
16 files changed, 344 insertions(+), 161 deletions(-)
create mode 100644 services/api/db/migrate/20160506175108_add_auths_to_container.rb
create mode 100644 services/api/db/migrate/20160509143250_add_auth_and_lock_to_container_index.rb
create mode 100644 services/api/test/fixtures/container_requests.yml
create mode 100644 services/api/test/functional/arvados/v1/containers_controller_test.rb
discards 2757c83be8fefbd5ac6a70970ab03d7803569b58 (commit)
via 91b1d56098b3c6cb54abaabbec68937fe58191fd (commit)
via 74189ab0578afbf445a38cbb99bf47d432e3c551 (commit)
This update added new revisions after undoing existing revisions. That is
to say, the old revision is not a strict subset of the new revision. This
situation occurs when you --force push a change and generate a repository
containing something like this:
 * -- * -- B -- O -- O -- O (2757c83be8fefbd5ac6a70970ab03d7803569b58)
            \
             N -- N -- N (91b1d56098b3c6cb54abaabbec68937fe58191fd)
When this happens we assume that you've already had alert emails for all
of the O revisions, and so we here report only the revisions in the N
branch from the common base, B.
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 91b1d56098b3c6cb54abaabbec68937fe58191fd
Author: Tom Clegg <tom at curoverse.com>
Date: Mon May 9 15:33:05 2016 -0400
8128: Add runtime tokens for containers, and locks for multiple dispatchers
diff --git a/services/api/app/controllers/arvados/v1/containers_controller.rb b/services/api/app/controllers/arvados/v1/containers_controller.rb
index 04a5ed0..3580397 100644
--- a/services/api/app/controllers/arvados/v1/containers_controller.rb
+++ b/services/api/app/controllers/arvados/v1/containers_controller.rb
@@ -4,4 +4,11 @@ class Arvados::V1::ContainersController < ApplicationController
accept_attribute_as_json :runtime_constraints, Hash
accept_attribute_as_json :command, Array
+ def auth
+ if @object.locked_by_uuid != Thread.current[:api_client_authorization].uuid
+ raise ArvadosModel::PermissionDeniedError.new("Not locked by your token")
+ end
+ @object = @object.auth
+ show
+ end
end
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index a014552..845374e 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -16,9 +16,12 @@ class Container < ArvadosModel
validates :command, :container_image, :output_path, :cwd, :priority, :presence => true
validate :validate_state_change
validate :validate_change
+ validate :validate_lock
+ after_validation :assign_auth
after_save :handle_completed
has_many :container_requests, :foreign_key => :container_uuid, :class_name => 'ContainerRequest', :primary_key => :uuid
+ belongs_to :auth, :class_name => 'ApiClientAuthorization', :foreign_key => :auth_uuid, :primary_key => :uuid
api_accessible :user, extend: :common do |t|
t.add :command
@@ -37,6 +40,7 @@ class Container < ArvadosModel
t.add :runtime_constraints
t.add :started_at
t.add :state
+ t.add :auth_uuid
end
# Supported states for a container
@@ -61,27 +65,17 @@ class Container < ArvadosModel
end
def update_priority!
- if [Queued, Running].include? self.state
+ if [Queued, Locked, Running].include? self.state
# Update the priority of this container to the maximum priority of any of
# its committed container requests and save the record.
- max = 0
- ContainerRequest.where(container_uuid: uuid).each do |cr|
- if cr.state == ContainerRequest::Committed and cr.priority > max
- max = cr.priority
- end
- end
- self.priority = max
+ self.priority = ContainerRequest.
+ where(container_uuid: uuid,
+ state: ContainerRequest::Committed).
+ maximum('priority')
self.save!
end
end
- def locked_by_uuid
- # Stub to permit a single dispatch to recognize its own containers
- if current_user.is_admin
- Thread.current[:api_client_authorization].andand.uuid
- end
- end
-
protected
def fill_field_defaults
@@ -151,6 +145,52 @@ class Container < ArvadosModel
check_update_whitelist permitted
end
+ def validate_lock
+ if locked_by_uuid_was
+ if locked_by_uuid_was != Thread.current[:api_client_authorization].uuid
+ # Notably, prohibit changing state or locked_by_uuid:
+ check_update_whitelist [:priority]
+ end
+ end
+
+ if [Locked, Running].include? self.state
+ need_lock = Thread.current[:api_client_authorization].uuid
+ else
+ need_lock = nil
+ end
+ if self.locked_by_uuid_changed?
+ if self.locked_by_uuid != need_lock
+ return errors.add :locked_by_uuid, "can only change to #{need_lock}"
+ end
+ end
+ self.locked_by_uuid = need_lock
+ end
+
+ def assign_auth
+ if self.auth_uuid_changed?
+ return errors.add :auth_uuid, 'is readonly'
+ end
+ if not [Locked, Running].include? self.state
+ # don't need one
+ self.auth.andand.update_attributes(expires_at: db_current_time)
+ self.auth = nil
+ return
+ elsif self.auth
+ # already have one
+ return
+ end
+ cr = ContainerRequest.
+ where('container_uuid=? and priority>0', self.uuid).
+ order('priority desc').
+ first
+ if !cr
+ return errors.add :auth_uuid, "cannot be assigned because priority <= 0"
+ end
+ self.auth = ApiClientAuthorization.
+ create!(user_id: User.find_by_uuid(cr.modified_by_user_uuid).id,
+ api_client_id: 0)
+ end
+
def handle_completed
# This container is finished so finalize any associated container requests
# that are associated with this container.
diff --git a/services/api/app/models/container_request.rb b/services/api/app/models/container_request.rb
index acb751c..6353132 100644
--- a/services/api/app/models/container_request.rb
+++ b/services/api/app/models/container_request.rb
@@ -78,32 +78,32 @@ class ContainerRequest < ArvadosModel
self.cwd ||= "."
end
- # Turn a container request into a container.
+ # Create a new container (or find an existing one) to satisfy this
+ # request.
def resolve
- # In the future this will do things like resolve symbolic git and keep
- # references to content addresses.
- Container.create!({ :command => self.command,
- :container_image => self.container_image,
- :cwd => self.cwd,
- :environment => self.environment,
- :mounts => self.mounts,
- :output_path => self.output_path,
- :runtime_constraints => self.runtime_constraints })
+ # TODO: resolve symbolic git and keep references to content
+ # addresses.
+ c = act_as_system_user do
+ Container.create!(command: self.command,
+ container_image: self.container_image,
+ cwd: self.cwd,
+ environment: self.environment,
+ mounts: self.mounts,
+ output_path: self.output_path,
+ runtime_constraints: self.runtime_constraints)
+ end
+ self.container_uuid = c.uuid
end
def set_container
- if self.container_uuid_changed?
- if not current_user.andand.is_admin and not self.container_uuid.nil?
- errors.add :container_uuid, "can only be updated to nil."
- end
- else
- if self.state_changed?
- if self.state == Committed and (self.state_was == Uncommitted or self.state_was.nil?)
- act_as_system_user do
- self.container_uuid = self.resolve.andand.uuid
- end
- end
- end
+ if (container_uuid_changed? and
+ not current_user.andand.is_admin and
+ not container_uuid.nil?)
+ errors.add :container_uuid, "can only be updated to nil."
+ return false
+ end
+ if state_changed? and state == Committed and container_uuid.nil?
+ resolve
end
end
@@ -158,16 +158,14 @@ class ContainerRequest < ArvadosModel
end
def update_priority
- if [Committed, Final].include? self.state and (self.state_changed? or
- self.priority_changed? or
- self.container_uuid_changed?)
- [self.container_uuid_was, self.container_uuid].each do |cuuid|
- unless cuuid.nil?
- c = Container.find_by_uuid cuuid
- act_as_system_user do
- c.update_priority!
- end
- end
+ if self.state_changed? or
+ self.priority_changed? or
+ self.container_uuid_changed?
+ act_as_system_user do
+ Container.
+ where('uuid in (?)',
+ [self.container_uuid_was, self.container_uuid].compact).
+ map(&:update_priority!)
end
end
end
diff --git a/services/api/config/routes.rb b/services/api/config/routes.rb
index c85a3fc..4cea874 100644
--- a/services/api/config/routes.rb
+++ b/services/api/config/routes.rb
@@ -28,7 +28,9 @@ Server::Application.routes.draw do
end
resources :humans
resources :job_tasks
- resources :containers
+ resources :containers do
+ get 'auth', on: :member
+ end
resources :container_requests
resources :jobs do
get 'queue', on: :collection
diff --git a/services/api/db/migrate/20160506175108_add_auths_to_container.rb b/services/api/db/migrate/20160506175108_add_auths_to_container.rb
new file mode 100644
index 0000000..d714a49
--- /dev/null
+++ b/services/api/db/migrate/20160506175108_add_auths_to_container.rb
@@ -0,0 +1,6 @@
+class AddAuthsToContainer < ActiveRecord::Migration
+ def change
+ add_column :containers, :auth_uuid, :string
+ add_column :containers, :locked_by_uuid, :string
+ end
+end
diff --git a/services/api/db/migrate/20160509143250_add_auth_and_lock_to_container_index.rb b/services/api/db/migrate/20160509143250_add_auth_and_lock_to_container_index.rb
new file mode 100644
index 0000000..4329ac0
--- /dev/null
+++ b/services/api/db/migrate/20160509143250_add_auth_and_lock_to_container_index.rb
@@ -0,0 +1,19 @@
+class AddAuthAndLockToContainerIndex < ActiveRecord::Migration
+ Columns_were = ["uuid", "owner_uuid", "modified_by_client_uuid", "modified_by_user_uuid", "state", "log", "cwd", "output_path", "output", "container_image"]
+ Columns = Columns_were + ["auth_uuid", "locked_by_uuid"]
+ def up
+ begin
+ remove_index :containers, :name => 'containers_search_index'
+ rescue
+ end
+ add_index(:containers, Columns, name: "containers_search_index")
+ end
+
+ def down
+ begin
+ remove_index :containers, :name => 'containers_search_index'
+ rescue
+ end
+ add_index(:containers, Columns_were, name: "containers_search_index")
+ end
+end
diff --git a/services/api/db/structure.sql b/services/api/db/structure.sql
index 3ec420c..4bf4a17 100644
--- a/services/api/db/structure.sql
+++ b/services/api/db/structure.sql
@@ -339,7 +339,9 @@ CREATE TABLE containers (
progress double precision,
priority integer,
updated_at timestamp without time zone NOT NULL,
- exit_code integer
+ exit_code integer,
+ auth_uuid character varying(255),
+ locked_by_uuid character varying(255)
);
@@ -1472,7 +1474,7 @@ CREATE INDEX container_requests_search_index ON container_requests USING btree (
-- Name: containers_search_index; Type: INDEX; Schema: public; Owner: -; Tablespace:
--
-CREATE INDEX containers_search_index ON containers USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, state, log, cwd, output_path, output, container_image);
+CREATE INDEX containers_search_index ON containers USING btree (uuid, owner_uuid, modified_by_client_uuid, modified_by_user_uuid, state, log, cwd, output_path, output, container_image, auth_uuid, locked_by_uuid);
--
@@ -2583,4 +2585,8 @@ INSERT INTO schema_migrations (version) VALUES ('20160208210629');
INSERT INTO schema_migrations (version) VALUES ('20160209155729');
-INSERT INTO schema_migrations (version) VALUES ('20160324144017');
\ No newline at end of file
+INSERT INTO schema_migrations (version) VALUES ('20160324144017');
+
+INSERT INTO schema_migrations (version) VALUES ('20160506175108');
+
+INSERT INTO schema_migrations (version) VALUES ('20160509143250');
\ No newline at end of file
diff --git a/services/api/lib/current_api_client.rb b/services/api/lib/current_api_client.rb
index 2e78612..fbd4ef5 100644
--- a/services/api/lib/current_api_client.rb
+++ b/services/api/lib/current_api_client.rb
@@ -124,12 +124,18 @@ module CurrentApiClient
end
def act_as_user user
+ #auth_was = Thread.current[:api_client_authorization]
user_was = Thread.current[:user]
Thread.current[:user] = user
+ #Thread.current[:api_client_authorization] = ApiClientAuthorization.
+ # where('user_id=? and scopes is null', user.id).
+ # order('expires_at desc').
+ # first
begin
yield
ensure
Thread.current[:user] = user_was
+ #Thread.current[:api_client_authorization] = auth_was
end
end
diff --git a/services/api/test/fixtures/api_client_authorizations.yml b/services/api/test/fixtures/api_client_authorizations.yml
index 485b6d1..d6c0e40 100644
--- a/services/api/test/fixtures/api_client_authorizations.yml
+++ b/services/api/test/fixtures/api_client_authorizations.yml
@@ -273,7 +273,7 @@ fuse:
dispatch1:
uuid: zzzzz-gj3su-k9dvestay1plssr
- api_client: trusted
+ api_client: untrusted
user: system_user
api_token: kwi8oowusvbutahacwk2geulqewy5oaqmpalczfna4b6bb0hfw
expires_at: 2038-01-01 00:00:00
\ No newline at end of file
diff --git a/services/api/test/fixtures/container_requests.yml b/services/api/test/fixtures/container_requests.yml
new file mode 100644
index 0000000..c9f3427
--- /dev/null
+++ b/services/api/test/fixtures/container_requests.yml
@@ -0,0 +1,13 @@
+queued:
+ owner_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ state: Committed
+ priority: 1
+ created_at: 2016-01-11 11:11:11.111111111 Z
+ updated_at: 2016-01-11 11:11:11.111111111 Z
+ modified_at: 2016-01-11 11:11:11.111111111 Z
+ modified_by_user_uuid: zzzzz-tpzed-xurymjxw79nv3jz
+ container_image: test
+ cwd: test
+ output_path: test
+ command: ["echo", "hello"]
+ container_uuid: zzzzz-dz642-queuedcontainer
diff --git a/services/api/test/fixtures/containers.yml b/services/api/test/fixtures/containers.yml
index 22004b4..e80086e 100644
--- a/services/api/test/fixtures/containers.yml
+++ b/services/api/test/fixtures/containers.yml
@@ -1,6 +1,6 @@
queued:
uuid: zzzzz-dz642-queuedcontainer
- owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+ owner_uuid: zzzzz-tpzed-000000000000000
state: Queued
priority: 1
created_at: 2016-01-11 11:11:11.111111111 Z
@@ -13,7 +13,7 @@ queued:
completed:
uuid: zzzzz-dz642-compltcontainer
- owner_uuid: zzzzz-tpzed-d9tiejq69daie8f
+ owner_uuid: zzzzz-tpzed-000000000000000
state: Complete
priority: 1
created_at: 2016-01-11 11:11:11.111111111 Z
diff --git a/services/api/test/functional/arvados/v1/containers_controller_test.rb b/services/api/test/functional/arvados/v1/containers_controller_test.rb
new file mode 100644
index 0000000..d9f7d96
--- /dev/null
+++ b/services/api/test/functional/arvados/v1/containers_controller_test.rb
@@ -0,0 +1,52 @@
+require 'test_helper'
+
+class Arvados::V1::ContainersControllerTest < ActionController::TestCase
+ test 'create' do
+ authorize_with :system_user
+ post :create, {
+ container: {
+ command: ['echo', 'hello'],
+ container_image: 'test',
+ output_path: 'test',
+ },
+ }
+ assert_response :success
+ end
+
+ [Container::Queued, Container::Complete].each do |state|
+ test "cannot get auth in #{state} state" do
+ authorize_with :dispatch1
+ get :auth, id: containers(:queued).uuid
+ assert_response 403
+ end
+ end
+
+ test 'cannot get auth with wrong token' do
+ authorize_with :dispatch1
+ c = containers(:queued)
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+
+ authorize_with :system_user
+ get :auth, id: c.uuid
+ assert_response 403
+ end
+
+ test 'get auth' do
+ authorize_with :dispatch1
+ c = containers(:queued)
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+ get :auth, id: c.uuid
+ assert_response :success
+ assert_operator 32, :<, json_response['api_token'].length
+ assert_equal 'arvados#apiClientAuthorization', json_response['kind']
+ end
+
+ test 'no auth in container response' do
+ authorize_with :dispatch1
+ c = containers(:queued)
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+ get :show, id: c.uuid
+ assert_response :success
+ assert_nil json_response['auth']
+ end
+end
diff --git a/services/api/test/test_helper.rb b/services/api/test/test_helper.rb
index 25ab286..ef08c72 100644
--- a/services/api/test/test_helper.rb
+++ b/services/api/test/test_helper.rb
@@ -36,6 +36,10 @@ module ArvadosTestSupport
def auth(api_client_auth_name)
{'HTTP_AUTHORIZATION' => "OAuth2 #{api_token(api_client_auth_name)}"}
end
+
+ def show_errors model
+ return lambda { model.errors.full_messages.inspect }
+ end
end
class ActiveSupport::TestCase
diff --git a/services/api/test/unit/container_test.rb b/services/api/test/unit/container_test.rb
index 84713c2..9cc0981 100644
--- a/services/api/test/unit/container_test.rb
+++ b/services/api/test/unit/container_test.rb
@@ -1,22 +1,34 @@
require 'test_helper'
class ContainerTest < ActiveSupport::TestCase
- def minimal_new
- c = Container.new
- c.command = ["echo", "foo"]
- c.container_image = "img"
- c.output_path = "/tmp"
- c
- end
-
- def show_errors c
- return lambda { c.errors.full_messages.inspect }
+ include DbCurrentTime
+
+ DEFAULT_ATTRS = {
+ command: ['echo', 'foo'],
+ container_image: 'img',
+ output_path: '/tmp',
+ priority: 1,
+ }
+
+ def minimal_new attrs={}
+ cr = ContainerRequest.new DEFAULT_ATTRS.merge(attrs)
+ act_as_user users(:active) do
+ cr.save!
+ end
+ c = Container.new DEFAULT_ATTRS.merge(attrs)
+ act_as_system_user do
+ c.save!
+ assert cr.update_attributes(container_uuid: c.uuid,
+ state: ContainerRequest::Committed,
+ ), show_errors(cr)
+ end
+ return c, cr
end
def check_illegal_updates c, bad_updates
bad_updates.each do |u|
refute c.update_attributes(u), u.inspect
- refute c.valid?
+ refute c.valid?, u.inspect
c.reload
end
end
@@ -28,6 +40,8 @@ class ContainerTest < ActiveSupport::TestCase
{environment: {"FOO" => "BAR"}},
{mounts: {"FOO" => "BAR"}},
{output_path: "/tmp3"},
+ {locked_by_uuid: "zzzzz-gj3su-027z32aux8dg2s1"},
+ {auth_uuid: "zzzzz-gj3su-017z32aux8dg2s1"},
{runtime_constraints: {"FOO" => "BAR"}}]
end
@@ -39,7 +53,6 @@ class ContainerTest < ActiveSupport::TestCase
def check_no_change_from_cancelled c
check_illegal_modify c
check_bogus_states c
-
check_illegal_updates c, [{ priority: 3 },
{ state: Container::Queued },
{ state: Container::Locked },
@@ -49,13 +62,11 @@ class ContainerTest < ActiveSupport::TestCase
test "Container create" do
act_as_system_user do
- c = minimal_new
- c.environment = {}
- c.mounts = {"BAR" => "FOO"}
- c.output_path = "/tmp"
- c.priority = 1
- c.runtime_constraints = {}
- c.save!
+ c, _ = minimal_new(environment: {},
+ mounts: {"BAR" => "FOO"},
+ output_path: "/tmp",
+ priority: 1,
+ runtime_constraints: {})
check_illegal_modify c
check_bogus_states c
@@ -67,89 +78,100 @@ class ContainerTest < ActiveSupport::TestCase
end
test "Container running" do
- act_as_system_user do
- c = minimal_new
- c.save!
+ c, _ = minimal_new priority: 1
- check_illegal_updates c, [{state: Container::Running},
- {state: Container::Complete}]
+ set_user_from_auth :dispatch1
+ check_illegal_updates c, [{state: Container::Running},
+ {state: Container::Complete}]
- c.update_attributes! state: Container::Locked
- c.update_attributes! state: Container::Running
+ c.update_attributes! state: Container::Locked
+ c.update_attributes! state: Container::Running
- check_illegal_modify c
- check_bogus_states c
+ check_illegal_modify c
+ check_bogus_states c
- check_illegal_updates c, [{state: Container::Queued}]
- c.reload
+ check_illegal_updates c, [{state: Container::Queued}]
+ c.reload
- c.update_attributes! priority: 3
- end
+ c.update_attributes! priority: 3
end
test "Lock and unlock" do
- act_as_system_user do
- c = minimal_new
- c.save!
- assert_equal Container::Queued, c.state
+ c, cr = minimal_new priority: 0
- refute c.update_attributes(state: Container::Running), "not locked"
- c.reload
- refute c.update_attributes(state: Container::Complete), "not locked"
- c.reload
+ set_user_from_auth :dispatch1
+ assert_equal Container::Queued, c.state
- assert c.update_attributes(state: Container::Locked), show_errors(c)
- assert c.update_attributes(state: Container::Queued), show_errors(c)
+ refute c.update_attributes(state: Container::Locked), "no priority"
+ c.reload
+ assert cr.update_attributes priority: 1
- refute c.update_attributes(state: Container::Running), "not locked"
- c.reload
+ refute c.update_attributes(state: Container::Running), "not locked"
+ c.reload
+ refute c.update_attributes(state: Container::Complete), "not locked"
+ c.reload
- assert c.update_attributes(state: Container::Locked), show_errors(c)
- assert c.update_attributes(state: Container::Running), show_errors(c)
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+ assert c.locked_by_uuid
+ assert c.auth_uuid
- refute c.update_attributes(state: Container::Locked), "already running"
- c.reload
- refute c.update_attributes(state: Container::Queued), "already running"
- c.reload
+ assert c.update_attributes(state: Container::Queued), show_errors(c)
+ refute c.locked_by_uuid
+ refute c.auth_uuid
- assert c.update_attributes(state: Container::Complete), show_errors(c)
- end
+ refute c.update_attributes(state: Container::Running), "not locked"
+ c.reload
+ refute c.locked_by_uuid
+ refute c.auth_uuid
+
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+ assert c.update_attributes(state: Container::Running), show_errors(c)
+ assert c.locked_by_uuid
+ assert c.auth_uuid
+
+ auth_uuid_was = c.auth_uuid
+
+ refute c.update_attributes(state: Container::Locked), "already running"
+ c.reload
+ refute c.update_attributes(state: Container::Queued), "already running"
+ c.reload
+
+ assert c.update_attributes(state: Container::Complete), show_errors(c)
+ refute c.locked_by_uuid
+ refute c.auth_uuid
+
+ auth_exp = ApiClientAuthorization.find_by_uuid(auth_uuid_was).expires_at
+ assert_operator auth_exp, :<, db_current_time
end
test "Container queued cancel" do
- act_as_system_user do
- c = minimal_new
- c.save!
- assert c.update_attributes(state: Container::Cancelled), show_errors(c)
- check_no_change_from_cancelled c
- end
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ assert c.update_attributes(state: Container::Cancelled), show_errors(c)
+ check_no_change_from_cancelled c
end
test "Container locked cancel" do
- act_as_system_user do
- c = minimal_new
- c.save!
- assert c.update_attributes(state: Container::Locked), show_errors(c)
- assert c.update_attributes(state: Container::Cancelled), show_errors(c)
- check_no_change_from_cancelled c
- end
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ assert c.update_attributes(state: Container::Locked), show_errors(c)
+ assert c.update_attributes(state: Container::Cancelled), show_errors(c)
+ check_no_change_from_cancelled c
end
test "Container running cancel" do
- act_as_system_user do
- c = minimal_new
- c.save!
- c.update_attributes! state: Container::Queued
- c.update_attributes! state: Container::Locked
- c.update_attributes! state: Container::Running
- c.update_attributes! state: Container::Cancelled
- check_no_change_from_cancelled c
- end
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ c.update_attributes! state: Container::Queued
+ c.update_attributes! state: Container::Locked
+ c.update_attributes! state: Container::Running
+ c.update_attributes! state: Container::Cancelled
+ check_no_change_from_cancelled c
end
test "Container create forbidden for non-admin" do
set_user_from_auth :active_trustedclient
- c = minimal_new
+ c = Container.new DEFAULT_ATTRS
c.environment = {}
c.mounts = {"BAR" => "FOO"}
c.output_path = "/tmp"
@@ -161,16 +183,14 @@ class ContainerTest < ActiveSupport::TestCase
end
test "Container only set exit code on complete" do
- act_as_system_user do
- c = minimal_new
- c.save!
- c.update_attributes! state: Container::Locked
- c.update_attributes! state: Container::Running
+ c, _ = minimal_new
+ set_user_from_auth :dispatch1
+ c.update_attributes! state: Container::Locked
+ c.update_attributes! state: Container::Running
- check_illegal_updates c, [{exit_code: 1},
- {exit_code: 1, state: Container::Cancelled}]
+ check_illegal_updates c, [{exit_code: 1},
+ {exit_code: 1, state: Container::Cancelled}]
- assert c.update_attributes(exit_code: 1, state: Container::Complete)
- end
+ assert c.update_attributes(exit_code: 1, state: Container::Complete)
end
end
commit 74189ab0578afbf445a38cbb99bf47d432e3c551
Author: Tom Clegg <tom at curoverse.com>
Date: Thu May 5 17:50:44 2016 -0400
8128: Update crunch-dispatch-local to use new Locked state.
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local.go b/services/crunch-dispatch-local/crunch-dispatch-local.go
index e05c0c5..d8b6037 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local.go
@@ -72,7 +72,7 @@ func doMain() error {
}(sigChan)
// Run all queued containers
- runQueuedContainers(*pollInterval, *priorityPollInterval, *crunchRunCommand)
+ runQueuedContainers(time.Duration(*pollInterval)*time.Second, time.Duration(*priorityPollInterval)*time.Second, *crunchRunCommand)
// Finished dispatching; interrupt any crunch jobs that are still running
for _, cmd := range runningCmds {
@@ -91,8 +91,8 @@ func doMain() error {
// Any errors encountered are logged but the program would continue to run (not exit).
// This is because, once one or more crunch jobs are running,
// we would need to wait for them complete.
-func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunCommand string) {
- ticker := time.NewTicker(time.Duration(pollInterval) * time.Second)
+func runQueuedContainers(pollInterval, priorityPollInterval time.Duration, crunchRunCommand string) {
+ ticker := time.NewTicker(pollInterval)
for {
select {
@@ -107,9 +107,10 @@ func runQueuedContainers(pollInterval, priorityPollInterval int, crunchRunComman
// Container data
type Container struct {
- UUID string `json:"uuid"`
- State string `json:"state"`
- Priority int `json:"priority"`
+ UUID string `json:"uuid"`
+ State string `json:"state"`
+ Priority int `json:"priority"`
+ LockedByUUID string `json:"locked_by_uuid"`
}
// ContainerList is a list of the containers from api
@@ -118,7 +119,7 @@ type ContainerList struct {
}
// Get the list of queued containers from API server and invoke run for each container.
-func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
+func dispatchLocal(pollInterval time.Duration, crunchRunCommand string) {
params := arvadosclient.Dict{
"filters": [][]string{[]string{"state", "=", "Queued"}},
}
@@ -133,16 +134,16 @@ func dispatchLocal(priorityPollInterval int, crunchRunCommand string) {
for i := 0; i < len(containers.Items); i++ {
log.Printf("About to run queued container %v", containers.Items[i].UUID)
// Run the container
- go run(containers.Items[i].UUID, crunchRunCommand, priorityPollInterval)
+ go run(containers.Items[i].UUID, crunchRunCommand, pollInterval)
}
}
// Run queued container:
-// Set container state to locked (TBD)
+// Set container state to Locked
// Run container using the given crunch-run command
// Set the container state to Running
// If the container priority becomes zero while crunch job is still running, terminate it.
-func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
+func run(uuid string, crunchRunCommand string, pollInterval time.Duration) {
cmd := exec.Command(crunchRunCommand, uuid)
cmd.Stdin = nil
@@ -158,7 +159,7 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
runningCmds[uuid] = cmd
runningCmdsMutex.Unlock()
- log.Printf("Started container run for %v", uuid)
+ log.Printf("Starting container %v", uuid)
// Add this crunch job to waitGroup
waitGroup.Add(1)
@@ -167,54 +168,73 @@ func run(uuid string, crunchRunCommand string, priorityPollInterval int) {
// Update container status to Running
err := arv.Update("containers", uuid,
arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Running"}},
+ "container": arvadosclient.Dict{"state": "Locked"}},
nil)
if err != nil {
- log.Printf("Error updating container state to 'Running' for %v: %q", uuid, err)
+ log.Printf("Error updating container %v to 'Locked' state: %v", uuid, err)
}
- // A goroutine to terminate the runner if container priority becomes zero
- priorityTicker := time.NewTicker(time.Duration(priorityPollInterval) * time.Second)
+ cmdExited := make(chan struct{})
+
+ // Kill the child process if container priority changes to zero
go func() {
- for _ = range priorityTicker.C {
+ ticker := time.NewTicker(pollInterval)
+ defer ticker.Stop()
+ for {
+ select {
+ case <-cmdExited:
+ return
+ case <-ticker.C:
+ }
var container Container
err := arv.Get("containers", uuid, nil, &container)
if err != nil {
- log.Printf("Error getting container info for %v: %q", uuid, err)
- } else {
- if container.Priority == 0 {
- priorityTicker.Stop()
- cmd.Process.Signal(os.Interrupt)
- }
+ log.Printf("Error getting container %v: %q", uuid, err)
+ continue
+ }
+ if container.Priority == 0 {
+ log.Printf("Sending SIGINT to pid %d to cancel container %v", cmd.Process.Pid, uuid)
+ cmd.Process.Signal(os.Interrupt)
}
}
}()
- // Wait for the crunch job to exit
+ // Wait for crunch-run to exit
if _, err := cmd.Process.Wait(); err != nil {
log.Printf("Error while waiting for crunch job to finish for %v: %q", uuid, err)
}
+ close(cmdExited)
- // Remove the crunch job to runningCmds
+ defer log.Printf("Finished container run for %v", uuid)
+
+ // Remove the crunch job from runningCmds
runningCmdsMutex.Lock()
delete(runningCmds, uuid)
runningCmdsMutex.Unlock()
- priorityTicker.Stop()
-
- // The container state should be 'Complete'
+ // The container state should now be 'Complete' if everything
+ // went well. If it started but crunch-run didn't change its
+ // final state to 'Running', fix that now. If it never even
+ // started, cancel it as unrunnable. (TODO: Requeue instead,
+ // and fix tests so they can tell something happened even if
+ // the final state is Queued.)
var container Container
err = arv.Get("containers", uuid, nil, &container)
- if container.State == "Running" {
- log.Printf("After crunch-run process termination, the state is still 'Running' for %v. Updating it to 'Complete'", uuid)
+ if err != nil {
+ log.Printf("Error getting final container state: %v", err)
+ }
+ fixState := map[string]string{
+ "Running": "Complete",
+ "Locked": "Cancelled",
+ }
+ if newState, ok := fixState[container.State]; ok {
+ log.Printf("After crunch-run process termination, the state is still '%s' for %v. Updating it to '%s'", container.State, uuid, newState)
err = arv.Update("containers", uuid,
arvadosclient.Dict{
- "container": arvadosclient.Dict{"state": "Complete"}},
+ "container": arvadosclient.Dict{"state": newState}},
nil)
if err != nil {
- log.Printf("Error updating container state to Complete for %v: %q", uuid, err)
+ log.Printf("Error updating container state to '%s' for %v: %q", newState, uuid, err)
}
}
-
- log.Printf("Finished container run for %v", uuid)
}
diff --git a/services/crunch-dispatch-local/crunch-dispatch-local_test.go b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
index 3ec1e2e..ebbe79c 100644
--- a/services/crunch-dispatch-local/crunch-dispatch-local_test.go
+++ b/services/crunch-dispatch-local/crunch-dispatch-local_test.go
@@ -4,12 +4,11 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
- "io/ioutil"
+ "bytes"
"log"
"net/http"
"net/http/httptest"
"os"
- "strings"
"syscall"
"testing"
"time"
@@ -80,11 +79,11 @@ func (s *TestSuite) Test_doMain(c *C) {
c.Check(err, IsNil)
c.Assert(len(containers.Items), Equals, 0)
- // Previously "Queued" container should now be in "Complete" state
+ // Previously "Queued" container should now be in "Cancelled" state
var container Container
err = arv.Get("containers", "zzzzz-dz642-queuedcontainer", nil, &container)
c.Check(err, IsNil)
- c.Check(container.State, Equals, "Complete")
+ c.Check(container.State, Equals, "Cancelled")
}
func (s *MockArvadosServerSuite) Test_APIErrorGettingContainers(c *C) {
@@ -101,7 +100,7 @@ func (s *MockArvadosServerSuite) Test_APIErrorUpdatingContainerState(c *C) {
apiStubResponses["/arvados/v1/containers/zzzzz-dz642-xxxxxxxxxxxxxx1"] =
arvadostest.StubResponse{500, string(`{}`)}
- testWithServerStub(c, apiStubResponses, "echo", "Error updating container state")
+ testWithServerStub(c, apiStubResponses, "echo", "Error updating container zzzzz-dz642-xxxxxxxxxxxxxx1 to 'Locked' state")
}
func (s *MockArvadosServerSuite) Test_ContainerStillInRunningAfterRun(c *C) {
@@ -139,10 +138,9 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
Retries: 0,
}
- tempfile, err := ioutil.TempFile(os.TempDir(), "temp-log-file")
- c.Check(err, IsNil)
- defer os.Remove(tempfile.Name())
- log.SetOutput(tempfile)
+ buf := bytes.NewBuffer(nil)
+ log.SetOutput(buf)
+ defer log.SetOutput(os.Stderr)
go func() {
time.Sleep(2 * time.Second)
@@ -154,6 +152,5 @@ func testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubRespon
// Wait for all running crunch jobs to complete / terminate
waitGroup.Wait()
- buf, _ := ioutil.ReadFile(tempfile.Name())
- c.Check(strings.Contains(string(buf), expected), Equals, true)
+ c.Check(buf.String(), Matches, `(?ms).*`+expected+`.*`)
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list