[ARVADOS] created: 1.2.0-91-ge7b3a06c0

Git user git at public.curoverse.com
Fri Sep 21 16:50:53 EDT 2018


        at  e7b3a06c0176f74082db9b436f773af2003cb824 (commit)


commit e7b3a06c0176f74082db9b436f773af2003cb824
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Fri Sep 21 16:48:01 2018 -0400

    10181: Fix timing sensitivity in test case.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/logging_test.go b/services/crunch-run/logging_test.go
index f118781c6..64fb791a5 100644
--- a/services/crunch-run/logging_test.go
+++ b/services/crunch-run/logging_test.go
@@ -169,7 +169,10 @@ func (s *LoggingTestSuite) TestLogCheckpoint(c *C) {
 
 		mt, err := cr.LogCollection.MarshalManifest(".")
 		c.Check(err, IsNil)
-		c.Check(mt, Equals, ". 4dc76e0a212bfa30c39d76d8c16da0c0+1038 afc503bc1b9a828b4bb543cb629e936c+78 0:1077:crunch-run.txt 1077:39:stdout.txt\n")
+		// Block packing depends on whether there's a
+		// checkpoint between the two Goodbyes -- either way
+		// the first block will be 4dc76.
+		c.Check(mt, Matches, `. 4dc76e0a212bfa30c39d76d8c16da0c0\+1038 (afc503bc1b9a828b4bb543cb629e936c\+78|90699dc22545cd74a0664303f70bc05a\+39 276b49339fd5203d15a93ff3de11bfb9\+39) 0:1077:crunch-run.txt 1077:39:stdout.txt\n`)
 	}
 }
 

commit 54e25e6ade9290937d855c3fdf9ee3b492706cb4
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Fri Sep 21 16:34:51 2018 -0400

    10181: Load checkpoint configs from discovery doc if available.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 5d85bcf6c..fc9c6f628 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -196,7 +196,7 @@ var crunchLogThrottlePeriod time.Duration = time.Second * 60
 var crunchLogThrottleLines int64 = 1024
 var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
 var crunchLogBytesPerEvent int64 = 4096
-var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1
+var crunchLogSecondsBetweenEvents = time.Second
 var crunchLogCheckpointMaxDuration = time.Hour / 2
 var crunchLogCheckpointMaxBytes = int64(1 << 25)
 
@@ -375,38 +375,33 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
 
 // load the rate limit discovery config parameters
 func loadLogThrottleParams(clnt IArvadosClient) {
-	param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
-	if err == nil {
-		crunchLimitLogBytesPerJob = int64(param.(float64))
-	}
-
-	param, err = clnt.Discovery("crunchLogThrottleBytes")
-	if err == nil {
-		crunchLogThrottleBytes = int64(param.(float64))
-	}
-
-	param, err = clnt.Discovery("crunchLogThrottlePeriod")
-	if err == nil {
-		crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
-	}
-
-	param, err = clnt.Discovery("crunchLogThrottleLines")
-	if err == nil {
-		crunchLogThrottleLines = int64(param.(float64))
+	loadDuration := func(dst *time.Duration, key string) {
+		if param, err := clnt.Discovery(key); err != nil {
+			return
+		} else if d, ok := param.(float64); !ok {
+			return
+		} else {
+			*dst = time.Duration(d) * time.Second
+		}
 	}
-
-	param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
-	if err == nil {
-		crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
+	loadInt64 := func(dst *int64, key string) {
+		if param, err := clnt.Discovery(key); err != nil {
+			return
+		} else if val, ok := param.(float64); !ok {
+			return
+		} else {
+			*dst = int64(val)
+		}
 	}
 
-	param, err = clnt.Discovery("crunchLogBytesPerEvent")
-	if err == nil {
-		crunchLogBytesPerEvent = int64(param.(float64))
-	}
+	loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
+	loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
+	loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
+	loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
+	loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
+	loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
+	loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
+	loadInt64(&crunchLogCheckpointMaxBytes, "crunchLogCheckpointMaxBytes")
+	loadDuration(&crunchLogCheckpointMaxDuration, "crunchLogCheckpointMaxDuration")
 
-	param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
-	if err == nil {
-		crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
-	}
 }

commit e87d633ee2adaacedd56a74e5a249e6e753359e3
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Fri Sep 21 16:23:58 2018 -0400

    10181: Cleanup identifiers.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 4319757a6..157cc8184 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1178,13 +1178,13 @@ func (runner *ContainerRunner) WaitFinish() error {
 }
 
 func (runner *ContainerRunner) checkpointLogs() {
-	logCheckpointTicker := time.NewTicker(crunchLogCheckpointMaxDuration / 360)
-	defer logCheckpointTicker.Stop()
+	ticker := time.NewTicker(crunchLogCheckpointMaxDuration / 360)
+	defer ticker.Stop()
 
-	logCheckpointTime := time.Now().Add(crunchLogCheckpointMaxDuration)
-	logCheckpointBytes := crunchLogCheckpointMaxBytes
+	saveAtTime := time.Now().Add(crunchLogCheckpointMaxDuration)
+	saveAtSize := crunchLogCheckpointMaxBytes
 	var savedSize int64
-	for range logCheckpointTicker.C {
+	for range ticker.C {
 		runner.logMtx.Lock()
 		done := runner.LogsPDH != nil
 		runner.logMtx.Unlock()
@@ -1192,11 +1192,11 @@ func (runner *ContainerRunner) checkpointLogs() {
 			return
 		}
 		size := runner.LogCollection.Size()
-		if size == savedSize || (time.Now().Before(logCheckpointTime) && size < logCheckpointBytes) {
+		if size == savedSize || (time.Now().Before(saveAtTime) && size < saveAtSize) {
 			continue
 		}
-		logCheckpointTime = time.Now().Add(crunchLogCheckpointMaxDuration)
-		logCheckpointBytes = runner.LogCollection.Size() + crunchLogCheckpointMaxBytes
+		saveAtTime = time.Now().Add(crunchLogCheckpointMaxDuration)
+		saveAtSize = runner.LogCollection.Size() + crunchLogCheckpointMaxBytes
 		saved, err := runner.saveLogCollection()
 		if err != nil {
 			runner.CrunchLog.Printf("error updating log collection: %s", err)

commit f51f7c4548948bcaa36999915ab8a1714825efcf
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Fri Sep 21 15:30:05 2018 -0400

    10181: Update container log PDH after updating log collection.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index e289c824c..4319757a6 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1197,11 +1197,21 @@ func (runner *ContainerRunner) checkpointLogs() {
 		}
 		logCheckpointTime = time.Now().Add(crunchLogCheckpointMaxDuration)
 		logCheckpointBytes = runner.LogCollection.Size() + crunchLogCheckpointMaxBytes
-		_, err := runner.saveLogCollection()
+		saved, err := runner.saveLogCollection()
 		if err != nil {
 			runner.CrunchLog.Printf("error updating log collection: %s", err)
 			continue
 		}
+
+		var updated arvados.Container
+		err = runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+			"container": arvadosclient.Dict{"log": saved.PortableDataHash},
+		}, &updated)
+		if err != nil {
+			runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
+			continue
+		}
+
 		savedSize = size
 	}
 }

commit df50f6c77c345507502eae91bcb4713b5f8bcd5b
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Sep 20 13:52:47 2018 -0400

    10181: Permit dispatcher to update log while container is running.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/doc/api/methods/containers.html.textile.liquid b/doc/api/methods/containers.html.textile.liquid
index 9ebd91d2b..e073d1c6a 100644
--- a/doc/api/methods/containers.html.textile.liquid
+++ b/doc/api/methods/containers.html.textile.liquid
@@ -28,7 +28,7 @@ table(table table-bordered table-condensed).
 |state|string|The allowed states are "Queued", "Locked", "Running", "Cancelled" and "Complete".|See "Container states":#container_states for more details.|
 |started_at|datetime|When this container started running.|Null if container has not yet started.|
 |finished_at|datetime|When this container finished.|Null if container has not yet finished.|
-|log|string|Portable data hash of the collection containing logs from a completed container run.|Null if the container is not yet finished.|
+|log|string|UUID or portable data hash of a collection containing the log messages produced when executing the container.|PDH after the container is finished, otherwise UUID or null.|
 |environment|hash|Environment variables and values that should be set in the container environment (@docker run --env@). This augments and (when conflicts exist) overrides environment variables given in the image's Dockerfile.|Must be equal to a ContainerRequest's environment in order to satisfy the ContainerRequest.|
 |cwd|string|Initial working directory.|Must be equal to a ContainerRequest's cwd in order to satisfy the ContainerRequest|
 |command|array of strings|Command to execute.| Must be equal to a ContainerRequest's command in order to satisfy the ContainerRequest.|
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index 5fea2d5bc..e8a70499a 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -377,24 +377,11 @@ class Container < ArvadosModel
     current_user.andand.is_admin
   end
 
-  def permission_to_update
-    # Override base permission check to allow auth_uuid to set progress and
-    # output (only).  Whether it is legal to set progress and output in the current
-    # state has already been checked in validate_change.
-    current_user.andand.is_admin ||
-      (!current_api_client_authorization.nil? and
-       [self.auth_uuid, self.locked_by_uuid].include? current_api_client_authorization.uuid)
-  end
-
   def ensure_owner_uuid_is_permitted
-    # Override base permission check to allow auth_uuid to set progress and
-    # output (only).  Whether it is legal to set progress and output in the current
-    # state has already been checked in validate_change.
-    if !current_api_client_authorization.nil? and self.auth_uuid == current_api_client_authorization.uuid
-      check_update_whitelist [:progress, :output]
-    else
-      super
-    end
+    # validate_change ensures owner_uuid can't be changed at all --
+    # except during create, which requires admin privileges. Checking
+    # permission here would be superfluous.
+    true
   end
 
   def set_timestamps
@@ -420,6 +407,8 @@ class Container < ArvadosModel
 
   def validate_change
     permitted = [:state]
+    progress_attrs = [:progress, :runtime_status, :log, :output]
+    final_attrs = [:exit_code, :finished_at]
 
     if self.new_record?
       permitted.push(:owner_uuid, :command, :container_image, :cwd,
@@ -430,26 +419,26 @@ class Container < ArvadosModel
 
     case self.state
     when Locked
-      permitted.push :priority, :runtime_status
+      permitted.push :priority, :runtime_status, :log
 
     when Queued
       permitted.push :priority
 
     when Running
-      permitted.push :priority, :progress, :output, :runtime_status
+      permitted.push :priority, *progress_attrs
       if self.state_changed?
         permitted.push :started_at
       end
 
     when Complete
       if self.state_was == Running
-        permitted.push :finished_at, :output, :log, :exit_code
+        permitted.push *final_attrs, *progress_attrs
       end
 
     when Cancelled
       case self.state_was
       when Running
-        permitted.push :finished_at, :output, :log
+        permitted.push :finished_at, *progress_attrs
       when Queued, Locked
         permitted.push :finished_at, :log
       end
@@ -459,6 +448,15 @@ class Container < ArvadosModel
       return false
     end
 
+    if current_api_client_authorization.andand.uuid.andand == self.auth_uuid
+      # The contained process itself can update progress indicators,
+      # but can't change priority etc.
+      permitted = permitted & [:state, :progress, :output]
+    elsif self.locked_by_uuid && self.locked_by_uuid != current_api_client_authorization.andand.uuid
+      # When locked, progress fields cannot be updated by the wrong
+      # dispatcher, even though it has admin privileges.
+      permitted = permitted - progress_attrs
+    end
     check_update_whitelist permitted
   end
 
diff --git a/services/api/test/unit/container_test.rb b/services/api/test/unit/container_test.rb
index b8acd4fd0..0c5e2e7ad 100644
--- a/services/api/test/unit/container_test.rb
+++ b/services/api/test/unit/container_test.rb
@@ -652,32 +652,41 @@ class ContainerTest < ActiveSupport::TestCase
     assert c.update_attributes(exit_code: 1, state: Container::Complete)
   end
 
-  test "locked_by_uuid can set output on running container" do
+  test "locked_by_uuid can update log when locked/running, and output when running" do
     c, _ = minimal_new
     set_user_from_auth :dispatch1
     c.lock
-    c.update_attributes! state: Container::Running
-
     assert_equal c.locked_by_uuid, Thread.current[:api_client_authorization].uuid
+    assert c.update_attributes(log: collections(:real_log_collection).portable_data_hash)
+    assert c.update_attributes(state: Container::Running)
 
-    assert c.update_attributes output: collections(:collection_owned_by_active).portable_data_hash
-    assert c.update_attributes! state: Container::Complete
+    assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
+    assert c.update_attributes(log: nil)
+    assert c.update_attributes(log: collections(:real_log_collection).portable_data_hash)
+    assert c.update_attributes(state: Container::Complete, log: collections(:real_log_collection).portable_data_hash)
+    c.reload
+    assert_equal c.output, collections(:collection_owned_by_active).portable_data_hash
+    assert_equal c.log, collections(:real_log_collection).portable_data_hash
+    refute c.update_attributes(output: nil)
+    refute c.update_attributes(log: nil)
   end
 
-  test "auth_uuid can set output on running container, but not change container state" do
+  test "auth_uuid can set output, progress on running container -- but not state, log" do
     c, _ = minimal_new
     set_user_from_auth :dispatch1
     c.lock
     c.update_attributes! state: Container::Running
 
-    Thread.current[:api_client_authorization] = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
-    Thread.current[:user] = User.find_by_id(Thread.current[:api_client_authorization].user_id)
-    assert c.update_attributes output: collections(:collection_owned_by_active).portable_data_hash
+    auth = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
+    Thread.current[:api_client_authorization] = auth
+    Thread.current[:api_client] = auth.api_client
+    Thread.current[:token] = auth.token
+    Thread.current[:user] = auth.user
 
-    assert_raises ArvadosModel::PermissionDeniedError do
-      # auth_uuid cannot set container state
-      c.update_attributes state: Container::Complete
-    end
+    assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
+    assert c.update_attributes(progress: 0.5)
+    refute c.update_attributes(log: collections(:real_log_collection).portable_data_hash)
+    refute c.update_attributes(state: Container::Complete)
   end
 
   test "not allowed to set output that is not readable by current user" do
@@ -701,9 +710,7 @@ class ContainerTest < ActiveSupport::TestCase
     c.update_attributes! state: Container::Running
 
     set_user_from_auth :running_to_be_deleted_container_auth
-    assert_raises ArvadosModel::PermissionDeniedError do
-      c.update_attributes! output: collections(:foo_file).portable_data_hash
-    end
+    refute c.update_attributes(output: collections(:foo_file).portable_data_hash)
   end
 
   test "can set trashed output on running container" do

commit 8f592b4a53b368f82ff8be375aa163728699b5a9
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Sep 20 01:04:52 2018 -0400

    10181: Save log collection snapshots periodically during run.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 730194b82..e289c824c 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -91,37 +91,39 @@ type PsProcess interface {
 // ContainerRunner is the main stateful struct used for a single execution of a
 // container.
 type ContainerRunner struct {
-	Docker    ThinDockerClient
-	client    *arvados.Client
-	ArvClient IArvadosClient
-	Kc        IKeepClient
-	arvados.Container
+	Docker          ThinDockerClient
+	client          *arvados.Client
+	ArvClient       IArvadosClient
+	Kc              IKeepClient
+	Container       arvados.Container
 	ContainerConfig dockercontainer.Config
-	dockercontainer.HostConfig
-	token       string
-	ContainerID string
-	ExitCode    *int
-	NewLogWriter
-	loggingDone   chan bool
-	CrunchLog     *ThrottledLogger
-	Stdout        io.WriteCloser
-	Stderr        io.WriteCloser
-	LogCollection arvados.CollectionFileSystem
-	LogsPDH       *string
-	RunArvMount
-	MkTempDir
-	ArvMount      *exec.Cmd
-	ArvMountPoint string
-	HostOutputDir string
-	Binds         []string
-	Volumes       map[string]struct{}
-	OutputPDH     *string
-	SigChan       chan os.Signal
-	ArvMountExit  chan error
-	SecretMounts  map[string]arvados.Mount
-	MkArvClient   func(token string) (IArvadosClient, error)
-	finalState    string
-	parentTemp    string
+	HostConfig      dockercontainer.HostConfig
+	token           string
+	ContainerID     string
+	ExitCode        *int
+	NewLogWriter    NewLogWriter
+	loggingDone     chan bool
+	CrunchLog       *ThrottledLogger
+	Stdout          io.WriteCloser
+	Stderr          io.WriteCloser
+	logUUID         string
+	logMtx          sync.Mutex
+	LogCollection   arvados.CollectionFileSystem
+	LogsPDH         *string
+	RunArvMount     RunArvMount
+	MkTempDir       MkTempDir
+	ArvMount        *exec.Cmd
+	ArvMountPoint   string
+	HostOutputDir   string
+	Binds           []string
+	Volumes         map[string]struct{}
+	OutputPDH       *string
+	SigChan         chan os.Signal
+	ArvMountExit    chan error
+	SecretMounts    map[string]arvados.Mount
+	MkArvClient     func(token string) (IArvadosClient, error)
+	finalState      string
+	parentTemp      string
 
 	ListProcesses func() ([]PsProcess, error)
 
@@ -1175,6 +1177,35 @@ func (runner *ContainerRunner) WaitFinish() error {
 	}
 }
 
+func (runner *ContainerRunner) checkpointLogs() {
+	logCheckpointTicker := time.NewTicker(crunchLogCheckpointMaxDuration / 360)
+	defer logCheckpointTicker.Stop()
+
+	logCheckpointTime := time.Now().Add(crunchLogCheckpointMaxDuration)
+	logCheckpointBytes := crunchLogCheckpointMaxBytes
+	var savedSize int64
+	for range logCheckpointTicker.C {
+		runner.logMtx.Lock()
+		done := runner.LogsPDH != nil
+		runner.logMtx.Unlock()
+		if done {
+			return
+		}
+		size := runner.LogCollection.Size()
+		if size == savedSize || (time.Now().Before(logCheckpointTime) && size < logCheckpointBytes) {
+			continue
+		}
+		logCheckpointTime = time.Now().Add(crunchLogCheckpointMaxDuration)
+		logCheckpointBytes = runner.LogCollection.Size() + crunchLogCheckpointMaxBytes
+		_, err := runner.saveLogCollection()
+		if err != nil {
+			runner.CrunchLog.Printf("error updating log collection: %s", err)
+			continue
+		}
+		savedSize = size
+	}
+}
+
 // CaptureOutput saves data from the container's output directory if
 // needed, and updates the container output accordingly.
 func (runner *ContainerRunner) CaptureOutput() error {
@@ -1312,26 +1343,45 @@ func (runner *ContainerRunner) CommitLogs() error {
 		// -- it exists only to send logs to other channels.
 		return nil
 	}
+	saved, err := runner.saveLogCollection()
+	if err != nil {
+		return err
+	}
+	runner.logMtx.Lock()
+	defer runner.logMtx.Unlock()
+	runner.LogsPDH = &saved.PortableDataHash
+	return nil
+}
 
+func (runner *ContainerRunner) saveLogCollection() (response arvados.Collection, err error) {
+	runner.logMtx.Lock()
+	defer runner.logMtx.Unlock()
+	if runner.LogsPDH != nil {
+		// Already finalized.
+		return
+	}
 	mt, err := runner.LogCollection.MarshalManifest(".")
 	if err != nil {
-		return fmt.Errorf("While creating log manifest: %v", err)
-	}
-
-	var response arvados.Collection
-	err = runner.ArvClient.Create("collections",
-		arvadosclient.Dict{
-			"ensure_unique_name": true,
-			"collection": arvadosclient.Dict{
-				"is_trashed":    true,
-				"name":          "logs for " + runner.Container.UUID,
-				"manifest_text": mt}},
-		&response)
+		err = fmt.Errorf("error creating log manifest: %v", err)
+		return
+	}
+	reqBody := arvadosclient.Dict{
+		"collection": arvadosclient.Dict{
+			"is_trashed":    true,
+			"name":          "logs for " + runner.Container.UUID,
+			"manifest_text": mt}}
+	if runner.logUUID == "" {
+		reqBody["ensure_unique_name"] = true
+		err = runner.ArvClient.Create("collections", reqBody, &response)
+	} else {
+		err = runner.ArvClient.Update("collections", runner.logUUID, reqBody, &response)
+	}
 	if err != nil {
-		return fmt.Errorf("While creating log collection: %v", err)
+		err = fmt.Errorf("error saving log collection: %v", err)
+		return
 	}
-	runner.LogsPDH = &response.PortableDataHash
-	return nil
+	runner.logUUID = response.UUID
+	return
 }
 
 // UpdateContainerRunning updates the container state to "Running"
@@ -1630,6 +1680,7 @@ func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClie
 	cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
 
 	loadLogThrottleParams(api)
+	go cr.checkpointLogs()
 
 	return cr, nil
 }
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 8d8e04000..217d4236b 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -230,6 +230,7 @@ func (client *ArvTestClient) Create(resourceType string,
 		mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
 		outmap := output.(*arvados.Collection)
 		outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
+		outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%15.15x", md5.Sum([]byte(mt)))
 	}
 
 	return nil
@@ -316,6 +317,10 @@ func (client *ArvTestClient) Update(resourceType string, uuid string, parameters
 		if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
 			client.WasSetRunning = true
 		}
+	} else if resourceType == "collections" {
+		mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
+		output.(*arvados.Collection).UUID = uuid
+		output.(*arvados.Collection).PortableDataHash = fmt.Sprintf("%x", md5.Sum([]byte(mt)))
 	}
 	return nil
 }
@@ -1142,7 +1147,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		cr.ArvMountPoint = ""
 		cr.Container.Mounts = make(map[string]arvados.Mount)
 		cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
-		cr.OutputPath = "/tmp"
+		cr.Container.OutputPath = "/tmp"
 		cr.statInterval = 5 * time.Second
 		err := cr.SetupMounts()
 		c.Check(err, IsNil)
@@ -1161,7 +1166,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		cr.Container.Mounts = make(map[string]arvados.Mount)
 		cr.Container.Mounts["/out"] = arvados.Mount{Kind: "tmp"}
 		cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
-		cr.OutputPath = "/out"
+		cr.Container.OutputPath = "/out"
 
 		err := cr.SetupMounts()
 		c.Check(err, IsNil)
@@ -1179,7 +1184,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		cr.ArvMountPoint = ""
 		cr.Container.Mounts = make(map[string]arvados.Mount)
 		cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
-		cr.OutputPath = "/tmp"
+		cr.Container.OutputPath = "/tmp"
 
 		apiflag := true
 		cr.Container.RuntimeConstraints.API = &apiflag
@@ -1203,7 +1208,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 		cr.Container.Mounts = map[string]arvados.Mount{
 			"/keeptmp": {Kind: "collection", Writable: true},
 		}
-		cr.OutputPath = "/keeptmp"
+		cr.Container.OutputPath = "/keeptmp"
 
 		os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
 
@@ -1225,7 +1230,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 			"/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
 			"/keepout": {Kind: "collection", Writable: true},
 		}
-		cr.OutputPath = "/keepout"
+		cr.Container.OutputPath = "/keepout"
 
 		os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
 		os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
@@ -1251,7 +1256,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 			"/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
 			"/keepout": {Kind: "collection", Writable: true},
 		}
-		cr.OutputPath = "/keepout"
+		cr.Container.OutputPath = "/keepout"
 
 		os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
 		os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
@@ -1332,7 +1337,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 			"/tmp":     {Kind: "tmp"},
 			"/tmp/foo": {Kind: "collection"},
 		}
-		cr.OutputPath = "/tmp"
+		cr.Container.OutputPath = "/tmp"
 
 		os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
 
@@ -1362,7 +1367,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 				Path:             "baz",
 				Writable:         true},
 		}
-		cr.OutputPath = "/tmp"
+		cr.Container.OutputPath = "/tmp"
 
 		os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
 		os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz", os.ModePerm)
@@ -1391,7 +1396,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 			"/tmp":     {Kind: "tmp"},
 			"/tmp/foo": {Kind: "tmp"},
 		}
-		cr.OutputPath = "/tmp"
+		cr.Container.OutputPath = "/tmp"
 
 		err := cr.SetupMounts()
 		c.Check(err, NotNil)
@@ -1439,7 +1444,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
 				Path:   "/",
 			},
 		}
-		cr.OutputPath = "/tmp"
+		cr.Container.OutputPath = "/tmp"
 
 		err := cr.SetupMounts()
 		c.Check(err, IsNil)
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index ce0a66126..5d85bcf6c 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -197,6 +197,8 @@ var crunchLogThrottleLines int64 = 1024
 var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
 var crunchLogBytesPerEvent int64 = 4096
 var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1
+var crunchLogCheckpointMaxDuration = time.Hour / 2
+var crunchLogCheckpointMaxBytes = int64(1 << 25)
 
 // ArvLogWriter is an io.WriteCloser that processes each write by
 // writing it through to another io.WriteCloser (typically a
diff --git a/services/crunch-run/logging_test.go b/services/crunch-run/logging_test.go
index 13a171ae8..f118781c6 100644
--- a/services/crunch-run/logging_test.go
+++ b/services/crunch-run/logging_test.go
@@ -37,6 +37,8 @@ var _ = Suite(&LoggingTestSuite{})
 
 func (s *LoggingTestSuite) SetUpTest(c *C) {
 	s.client = arvados.NewClientFromEnv()
+	crunchLogCheckpointMaxDuration = time.Hour * 24 * 365
+	crunchLogCheckpointMaxBytes = 1 << 50
 }
 
 func (s *LoggingTestSuite) TestWriteLogs(c *C) {
@@ -129,6 +131,48 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
 	c.Check(mt, Equals, ". 48f9023dc683a850b1c9b482b14c4b97+163 0:83:crunch-run.txt 83:80:stdout.txt\n")
 }
 
+func (s *LoggingTestSuite) TestLogCheckpoint(c *C) {
+	for _, trial := range []struct {
+		maxBytes    int64
+		maxDuration time.Duration
+	}{
+		{1000, 10 * time.Second},
+		{1000000, time.Millisecond},
+	} {
+		c.Logf("max %d bytes, %s", trial.maxBytes, trial.maxDuration)
+		crunchLogCheckpointMaxBytes = trial.maxBytes
+		crunchLogCheckpointMaxDuration = trial.maxDuration
+
+		api := &ArvTestClient{}
+		kc := &KeepTestClient{}
+		defer kc.Close()
+		cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+		c.Assert(err, IsNil)
+		ts := &TestTimestamper{}
+		cr.CrunchLog.Timestamper = ts.Timestamp
+		w, err := cr.NewLogWriter("stdout")
+		c.Assert(err, IsNil)
+		stdout := NewThrottledLogger(w)
+		stdout.Timestamper = ts.Timestamp
+
+		c.Check(cr.logUUID, Equals, "")
+		cr.CrunchLog.Printf("Hello %1000s", "space")
+		for i, t := 0, time.NewTicker(time.Millisecond); i < 5000 && cr.logUUID == ""; i++ {
+			<-t.C
+		}
+		c.Check(cr.logUUID, Not(Equals), "")
+		cr.CrunchLog.Print("Goodbye")
+		fmt.Fprint(stdout, "Goodbye\n")
+		cr.CrunchLog.Close()
+		stdout.Close()
+		w.Close()
+
+		mt, err := cr.LogCollection.MarshalManifest(".")
+		c.Check(err, IsNil)
+		c.Check(mt, Equals, ". 4dc76e0a212bfa30c39d76d8c16da0c0+1038 afc503bc1b9a828b4bb543cb629e936c+78 0:1077:crunch-run.txt 1077:39:stdout.txt\n")
+	}
+}
+
 func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytes(c *C) {
 	s.testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds")
 }

commit 8f1d40a2256f12bd16f07c10dc4d1f0c3e8ddc8f
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Sep 20 00:59:35 2018 -0400

    10181: Add Size method to arvados.CollectionFileSystem.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 7ce37aa24..bcf2f4810 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -31,6 +31,8 @@ type CollectionFileSystem interface {
 	// Prefix (normally ".") is a top level directory, effectively
 	// prepended to all paths in the returned manifest.
 	MarshalManifest(prefix string) (string, error)
+
+	Size() int64
 }
 
 type collectionFileSystem struct {
@@ -139,6 +141,10 @@ func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
 	return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
 }
 
+func (fs *collectionFileSystem) Size() int64 {
+	return fs.fileSystem.root.(*dirnode).Size()
+}
+
 // filenodePtr is an offset into a file that is (usually) efficient to
 // seek to. Specifically, if filenode.repacked==filenodePtr.repacked
 // then
@@ -877,6 +883,18 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
 	return
 }
 
+func (dn *dirnode) Size() (bytes int64) {
+	dn.RLock()
+	defer dn.RUnlock()
+	for _, i := range dn.inodes {
+		switch i := i.(type) {
+		case *filenode, *dirnode:
+			bytes += i.Size()
+		}
+	}
+	return
+}
+
 type segment interface {
 	io.ReaderAt
 	Len() int
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index d2f55d0e3..ac3bbbbad 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -353,6 +353,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
 	c.Check(err, check.IsNil)
 	m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
 	c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
+	c.Check(s.fs.Size(), check.Equals, int64(6))
 }
 
 func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
@@ -1060,6 +1061,7 @@ func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
 	f, err := coll.FileSystem(nil, nil)
 	c.Check(err, check.IsNil)
 	c.Logf("%s loaded", time.Now())
+	c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
 
 	for i := 0; i < dirCount; i++ {
 		for j := 0; j < fileCount; j++ {

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list