[ARVADOS] created: 1.2.0-91-ge7b3a06c0
Git user
git at public.curoverse.com
Fri Sep 21 16:50:53 EDT 2018
at e7b3a06c0176f74082db9b436f773af2003cb824 (commit)
commit e7b3a06c0176f74082db9b436f773af2003cb824
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Fri Sep 21 16:48:01 2018 -0400
10181: Fix timing sensitivity in test case.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/crunch-run/logging_test.go b/services/crunch-run/logging_test.go
index f118781c6..64fb791a5 100644
--- a/services/crunch-run/logging_test.go
+++ b/services/crunch-run/logging_test.go
@@ -169,7 +169,10 @@ func (s *LoggingTestSuite) TestLogCheckpoint(c *C) {
mt, err := cr.LogCollection.MarshalManifest(".")
c.Check(err, IsNil)
- c.Check(mt, Equals, ". 4dc76e0a212bfa30c39d76d8c16da0c0+1038 afc503bc1b9a828b4bb543cb629e936c+78 0:1077:crunch-run.txt 1077:39:stdout.txt\n")
+ // Block packing depends on whether there's a
+ // checkpoint between the two Goodbyes -- either way
+ // the first block will be 4dc76.
+ c.Check(mt, Matches, `. 4dc76e0a212bfa30c39d76d8c16da0c0\+1038 (afc503bc1b9a828b4bb543cb629e936c\+78|90699dc22545cd74a0664303f70bc05a\+39 276b49339fd5203d15a93ff3de11bfb9\+39) 0:1077:crunch-run.txt 1077:39:stdout.txt\n`)
}
}
commit 54e25e6ade9290937d855c3fdf9ee3b492706cb4
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Fri Sep 21 16:34:51 2018 -0400
10181: Load checkpoint configs from discovery doc if available.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index 5d85bcf6c..fc9c6f628 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -196,7 +196,7 @@ var crunchLogThrottlePeriod time.Duration = time.Second * 60
var crunchLogThrottleLines int64 = 1024
var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
var crunchLogBytesPerEvent int64 = 4096
-var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1
+var crunchLogSecondsBetweenEvents = time.Second
var crunchLogCheckpointMaxDuration = time.Hour / 2
var crunchLogCheckpointMaxBytes = int64(1 << 25)
@@ -375,38 +375,33 @@ func (arvlog *ArvLogWriter) rateLimit(line []byte, now time.Time) (bool, []byte)
// load the rate limit discovery config parameters
func loadLogThrottleParams(clnt IArvadosClient) {
- param, err := clnt.Discovery("crunchLimitLogBytesPerJob")
- if err == nil {
- crunchLimitLogBytesPerJob = int64(param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogThrottleBytes")
- if err == nil {
- crunchLogThrottleBytes = int64(param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogThrottlePeriod")
- if err == nil {
- crunchLogThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
- }
-
- param, err = clnt.Discovery("crunchLogThrottleLines")
- if err == nil {
- crunchLogThrottleLines = int64(param.(float64))
+ loadDuration := func(dst *time.Duration, key string) {
+ if param, err := clnt.Discovery(key); err != nil {
+ return
+ } else if d, ok := param.(float64); !ok {
+ return
+ } else {
+ *dst = time.Duration(d) * time.Second
+ }
}
-
- param, err = clnt.Discovery("crunchLogPartialLineThrottlePeriod")
- if err == nil {
- crunchLogPartialLineThrottlePeriod = time.Duration(float64(time.Second) * param.(float64))
+ loadInt64 := func(dst *int64, key string) {
+ if param, err := clnt.Discovery(key); err != nil {
+ return
+ } else if val, ok := param.(float64); !ok {
+ return
+ } else {
+ *dst = int64(val)
+ }
}
- param, err = clnt.Discovery("crunchLogBytesPerEvent")
- if err == nil {
- crunchLogBytesPerEvent = int64(param.(float64))
- }
+ loadInt64(&crunchLimitLogBytesPerJob, "crunchLimitLogBytesPerJob")
+ loadInt64(&crunchLogThrottleBytes, "crunchLogThrottleBytes")
+ loadDuration(&crunchLogThrottlePeriod, "crunchLogThrottlePeriod")
+ loadInt64(&crunchLogThrottleLines, "crunchLogThrottleLines")
+ loadDuration(&crunchLogPartialLineThrottlePeriod, "crunchLogPartialLineThrottlePeriod")
+ loadInt64(&crunchLogBytesPerEvent, "crunchLogBytesPerEvent")
+ loadDuration(&crunchLogSecondsBetweenEvents, "crunchLogSecondsBetweenEvents")
+ loadInt64(&crunchLogCheckpointMaxBytes, "crunchLogCheckpointMaxBytes")
+ loadDuration(&crunchLogCheckpointMaxDuration, "crunchLogCheckpointMaxDuration")
- param, err = clnt.Discovery("crunchLogSecondsBetweenEvents")
- if err == nil {
- crunchLogSecondsBetweenEvents = time.Duration(float64(time.Second) * param.(float64))
- }
}
commit e87d633ee2adaacedd56a74e5a249e6e753359e3
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Fri Sep 21 16:23:58 2018 -0400
10181: Clean up identifiers.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 4319757a6..157cc8184 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1178,13 +1178,13 @@ func (runner *ContainerRunner) WaitFinish() error {
}
func (runner *ContainerRunner) checkpointLogs() {
- logCheckpointTicker := time.NewTicker(crunchLogCheckpointMaxDuration / 360)
- defer logCheckpointTicker.Stop()
+ ticker := time.NewTicker(crunchLogCheckpointMaxDuration / 360)
+ defer ticker.Stop()
- logCheckpointTime := time.Now().Add(crunchLogCheckpointMaxDuration)
- logCheckpointBytes := crunchLogCheckpointMaxBytes
+ saveAtTime := time.Now().Add(crunchLogCheckpointMaxDuration)
+ saveAtSize := crunchLogCheckpointMaxBytes
var savedSize int64
- for range logCheckpointTicker.C {
+ for range ticker.C {
runner.logMtx.Lock()
done := runner.LogsPDH != nil
runner.logMtx.Unlock()
@@ -1192,11 +1192,11 @@ func (runner *ContainerRunner) checkpointLogs() {
return
}
size := runner.LogCollection.Size()
- if size == savedSize || (time.Now().Before(logCheckpointTime) && size < logCheckpointBytes) {
+ if size == savedSize || (time.Now().Before(saveAtTime) && size < saveAtSize) {
continue
}
- logCheckpointTime = time.Now().Add(crunchLogCheckpointMaxDuration)
- logCheckpointBytes = runner.LogCollection.Size() + crunchLogCheckpointMaxBytes
+ saveAtTime = time.Now().Add(crunchLogCheckpointMaxDuration)
+ saveAtSize = runner.LogCollection.Size() + crunchLogCheckpointMaxBytes
saved, err := runner.saveLogCollection()
if err != nil {
runner.CrunchLog.Printf("error updating log collection: %s", err)
commit f51f7c4548948bcaa36999915ab8a1714825efcf
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Fri Sep 21 15:30:05 2018 -0400
10181: Update container log PDH after updating log collection.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index e289c824c..4319757a6 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -1197,11 +1197,21 @@ func (runner *ContainerRunner) checkpointLogs() {
}
logCheckpointTime = time.Now().Add(crunchLogCheckpointMaxDuration)
logCheckpointBytes = runner.LogCollection.Size() + crunchLogCheckpointMaxBytes
- _, err := runner.saveLogCollection()
+ saved, err := runner.saveLogCollection()
if err != nil {
runner.CrunchLog.Printf("error updating log collection: %s", err)
continue
}
+
+ var updated arvados.Container
+ err = runner.ArvClient.Update("containers", runner.Container.UUID, arvadosclient.Dict{
+ "container": arvadosclient.Dict{"log": saved.PortableDataHash},
+ }, &updated)
+ if err != nil {
+ runner.CrunchLog.Printf("error updating container log to %s: %s", saved.PortableDataHash, err)
+ continue
+ }
+
savedSize = size
}
}
commit df50f6c77c345507502eae91bcb4713b5f8bcd5b
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Sep 20 13:52:47 2018 -0400
10181: Permit dispatcher to update log while container is running.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/doc/api/methods/containers.html.textile.liquid b/doc/api/methods/containers.html.textile.liquid
index 9ebd91d2b..e073d1c6a 100644
--- a/doc/api/methods/containers.html.textile.liquid
+++ b/doc/api/methods/containers.html.textile.liquid
@@ -28,7 +28,7 @@ table(table table-bordered table-condensed).
|state|string|The allowed states are "Queued", "Locked", "Running", "Cancelled" and "Complete".|See "Container states":#container_states for more details.|
|started_at|datetime|When this container started running.|Null if container has not yet started.|
|finished_at|datetime|When this container finished.|Null if container has not yet finished.|
-|log|string|Portable data hash of the collection containing logs from a completed container run.|Null if the container is not yet finished.|
+|log|string|UUID or portable data hash of a collection containing the log messages produced when executing the container.|PDH after the container is finished, otherwise UUID or null.|
|environment|hash|Environment variables and values that should be set in the container environment (@docker run --env@). This augments and (when conflicts exist) overrides environment variables given in the image's Dockerfile.|Must be equal to a ContainerRequest's environment in order to satisfy the ContainerRequest.|
|cwd|string|Initial working directory.|Must be equal to a ContainerRequest's cwd in order to satisfy the ContainerRequest|
|command|array of strings|Command to execute.| Must be equal to a ContainerRequest's command in order to satisfy the ContainerRequest.|
diff --git a/services/api/app/models/container.rb b/services/api/app/models/container.rb
index 5fea2d5bc..e8a70499a 100644
--- a/services/api/app/models/container.rb
+++ b/services/api/app/models/container.rb
@@ -377,24 +377,11 @@ class Container < ArvadosModel
current_user.andand.is_admin
end
- def permission_to_update
- # Override base permission check to allow auth_uuid to set progress and
- # output (only). Whether it is legal to set progress and output in the current
- # state has already been checked in validate_change.
- current_user.andand.is_admin ||
- (!current_api_client_authorization.nil? and
- [self.auth_uuid, self.locked_by_uuid].include? current_api_client_authorization.uuid)
- end
-
def ensure_owner_uuid_is_permitted
- # Override base permission check to allow auth_uuid to set progress and
- # output (only). Whether it is legal to set progress and output in the current
- # state has already been checked in validate_change.
- if !current_api_client_authorization.nil? and self.auth_uuid == current_api_client_authorization.uuid
- check_update_whitelist [:progress, :output]
- else
- super
- end
+ # validate_change ensures owner_uuid can't be changed at all --
+ # except during create, which requires admin privileges. Checking
+ # permission here would be superfluous.
+ true
end
def set_timestamps
@@ -420,6 +407,8 @@ class Container < ArvadosModel
def validate_change
permitted = [:state]
+ progress_attrs = [:progress, :runtime_status, :log, :output]
+ final_attrs = [:exit_code, :finished_at]
if self.new_record?
permitted.push(:owner_uuid, :command, :container_image, :cwd,
@@ -430,26 +419,26 @@ class Container < ArvadosModel
case self.state
when Locked
- permitted.push :priority, :runtime_status
+ permitted.push :priority, :runtime_status, :log
when Queued
permitted.push :priority
when Running
- permitted.push :priority, :progress, :output, :runtime_status
+ permitted.push :priority, *progress_attrs
if self.state_changed?
permitted.push :started_at
end
when Complete
if self.state_was == Running
- permitted.push :finished_at, :output, :log, :exit_code
+ permitted.push *final_attrs, *progress_attrs
end
when Cancelled
case self.state_was
when Running
- permitted.push :finished_at, :output, :log
+ permitted.push :finished_at, *progress_attrs
when Queued, Locked
permitted.push :finished_at, :log
end
@@ -459,6 +448,15 @@ class Container < ArvadosModel
return false
end
+ if current_api_client_authorization.andand.uuid.andand == self.auth_uuid
+ # The contained process itself can update progress indicators,
+ # but can't change priority etc.
+ permitted = permitted & [:state, :progress, :output]
+ elsif self.locked_by_uuid && self.locked_by_uuid != current_api_client_authorization.andand.uuid
+ # When locked, progress fields cannot be updated by the wrong
+ # dispatcher, even though it has admin privileges.
+ permitted = permitted - progress_attrs
+ end
check_update_whitelist permitted
end
diff --git a/services/api/test/unit/container_test.rb b/services/api/test/unit/container_test.rb
index b8acd4fd0..0c5e2e7ad 100644
--- a/services/api/test/unit/container_test.rb
+++ b/services/api/test/unit/container_test.rb
@@ -652,32 +652,41 @@ class ContainerTest < ActiveSupport::TestCase
assert c.update_attributes(exit_code: 1, state: Container::Complete)
end
- test "locked_by_uuid can set output on running container" do
+ test "locked_by_uuid can update log when locked/running, and output when running" do
c, _ = minimal_new
set_user_from_auth :dispatch1
c.lock
- c.update_attributes! state: Container::Running
-
assert_equal c.locked_by_uuid, Thread.current[:api_client_authorization].uuid
+ assert c.update_attributes(log: collections(:real_log_collection).portable_data_hash)
+ assert c.update_attributes(state: Container::Running)
- assert c.update_attributes output: collections(:collection_owned_by_active).portable_data_hash
- assert c.update_attributes! state: Container::Complete
+ assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
+ assert c.update_attributes(log: nil)
+ assert c.update_attributes(log: collections(:real_log_collection).portable_data_hash)
+ assert c.update_attributes(state: Container::Complete, log: collections(:real_log_collection).portable_data_hash)
+ c.reload
+ assert_equal c.output, collections(:collection_owned_by_active).portable_data_hash
+ assert_equal c.log, collections(:real_log_collection).portable_data_hash
+ refute c.update_attributes(output: nil)
+ refute c.update_attributes(log: nil)
end
- test "auth_uuid can set output on running container, but not change container state" do
+ test "auth_uuid can set output, progress on running container -- but not state, log" do
c, _ = minimal_new
set_user_from_auth :dispatch1
c.lock
c.update_attributes! state: Container::Running
- Thread.current[:api_client_authorization] = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
- Thread.current[:user] = User.find_by_id(Thread.current[:api_client_authorization].user_id)
- assert c.update_attributes output: collections(:collection_owned_by_active).portable_data_hash
+ auth = ApiClientAuthorization.find_by_uuid(c.auth_uuid)
+ Thread.current[:api_client_authorization] = auth
+ Thread.current[:api_client] = auth.api_client
+ Thread.current[:token] = auth.token
+ Thread.current[:user] = auth.user
- assert_raises ArvadosModel::PermissionDeniedError do
- # auth_uuid cannot set container state
- c.update_attributes state: Container::Complete
- end
+ assert c.update_attributes(output: collections(:collection_owned_by_active).portable_data_hash)
+ assert c.update_attributes(progress: 0.5)
+ refute c.update_attributes(log: collections(:real_log_collection).portable_data_hash)
+ refute c.update_attributes(state: Container::Complete)
end
test "not allowed to set output that is not readable by current user" do
@@ -701,9 +710,7 @@ class ContainerTest < ActiveSupport::TestCase
c.update_attributes! state: Container::Running
set_user_from_auth :running_to_be_deleted_container_auth
- assert_raises ArvadosModel::PermissionDeniedError do
- c.update_attributes! output: collections(:foo_file).portable_data_hash
- end
+ refute c.update_attributes(output: collections(:foo_file).portable_data_hash)
end
test "can set trashed output on running container" do
commit 8f592b4a53b368f82ff8be375aa163728699b5a9
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Sep 20 01:04:52 2018 -0400
10181: Save log collection snapshots periodically during run.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/crunch-run/crunchrun.go b/services/crunch-run/crunchrun.go
index 730194b82..e289c824c 100644
--- a/services/crunch-run/crunchrun.go
+++ b/services/crunch-run/crunchrun.go
@@ -91,37 +91,39 @@ type PsProcess interface {
// ContainerRunner is the main stateful struct used for a single execution of a
// container.
type ContainerRunner struct {
- Docker ThinDockerClient
- client *arvados.Client
- ArvClient IArvadosClient
- Kc IKeepClient
- arvados.Container
+ Docker ThinDockerClient
+ client *arvados.Client
+ ArvClient IArvadosClient
+ Kc IKeepClient
+ Container arvados.Container
ContainerConfig dockercontainer.Config
- dockercontainer.HostConfig
- token string
- ContainerID string
- ExitCode *int
- NewLogWriter
- loggingDone chan bool
- CrunchLog *ThrottledLogger
- Stdout io.WriteCloser
- Stderr io.WriteCloser
- LogCollection arvados.CollectionFileSystem
- LogsPDH *string
- RunArvMount
- MkTempDir
- ArvMount *exec.Cmd
- ArvMountPoint string
- HostOutputDir string
- Binds []string
- Volumes map[string]struct{}
- OutputPDH *string
- SigChan chan os.Signal
- ArvMountExit chan error
- SecretMounts map[string]arvados.Mount
- MkArvClient func(token string) (IArvadosClient, error)
- finalState string
- parentTemp string
+ HostConfig dockercontainer.HostConfig
+ token string
+ ContainerID string
+ ExitCode *int
+ NewLogWriter NewLogWriter
+ loggingDone chan bool
+ CrunchLog *ThrottledLogger
+ Stdout io.WriteCloser
+ Stderr io.WriteCloser
+ logUUID string
+ logMtx sync.Mutex
+ LogCollection arvados.CollectionFileSystem
+ LogsPDH *string
+ RunArvMount RunArvMount
+ MkTempDir MkTempDir
+ ArvMount *exec.Cmd
+ ArvMountPoint string
+ HostOutputDir string
+ Binds []string
+ Volumes map[string]struct{}
+ OutputPDH *string
+ SigChan chan os.Signal
+ ArvMountExit chan error
+ SecretMounts map[string]arvados.Mount
+ MkArvClient func(token string) (IArvadosClient, error)
+ finalState string
+ parentTemp string
ListProcesses func() ([]PsProcess, error)
@@ -1175,6 +1177,35 @@ func (runner *ContainerRunner) WaitFinish() error {
}
}
+func (runner *ContainerRunner) checkpointLogs() {
+ logCheckpointTicker := time.NewTicker(crunchLogCheckpointMaxDuration / 360)
+ defer logCheckpointTicker.Stop()
+
+ logCheckpointTime := time.Now().Add(crunchLogCheckpointMaxDuration)
+ logCheckpointBytes := crunchLogCheckpointMaxBytes
+ var savedSize int64
+ for range logCheckpointTicker.C {
+ runner.logMtx.Lock()
+ done := runner.LogsPDH != nil
+ runner.logMtx.Unlock()
+ if done {
+ return
+ }
+ size := runner.LogCollection.Size()
+ if size == savedSize || (time.Now().Before(logCheckpointTime) && size < logCheckpointBytes) {
+ continue
+ }
+ logCheckpointTime = time.Now().Add(crunchLogCheckpointMaxDuration)
+ logCheckpointBytes = runner.LogCollection.Size() + crunchLogCheckpointMaxBytes
+ _, err := runner.saveLogCollection()
+ if err != nil {
+ runner.CrunchLog.Printf("error updating log collection: %s", err)
+ continue
+ }
+ savedSize = size
+ }
+}
+
// CaptureOutput saves data from the container's output directory if
// needed, and updates the container output accordingly.
func (runner *ContainerRunner) CaptureOutput() error {
@@ -1312,26 +1343,45 @@ func (runner *ContainerRunner) CommitLogs() error {
// -- it exists only to send logs to other channels.
return nil
}
+ saved, err := runner.saveLogCollection()
+ if err != nil {
+ return err
+ }
+ runner.logMtx.Lock()
+ defer runner.logMtx.Unlock()
+ runner.LogsPDH = &saved.PortableDataHash
+ return nil
+}
+func (runner *ContainerRunner) saveLogCollection() (response arvados.Collection, err error) {
+ runner.logMtx.Lock()
+ defer runner.logMtx.Unlock()
+ if runner.LogsPDH != nil {
+ // Already finalized.
+ return
+ }
mt, err := runner.LogCollection.MarshalManifest(".")
if err != nil {
- return fmt.Errorf("While creating log manifest: %v", err)
- }
-
- var response arvados.Collection
- err = runner.ArvClient.Create("collections",
- arvadosclient.Dict{
- "ensure_unique_name": true,
- "collection": arvadosclient.Dict{
- "is_trashed": true,
- "name": "logs for " + runner.Container.UUID,
- "manifest_text": mt}},
- &response)
+ err = fmt.Errorf("error creating log manifest: %v", err)
+ return
+ }
+ reqBody := arvadosclient.Dict{
+ "collection": arvadosclient.Dict{
+ "is_trashed": true,
+ "name": "logs for " + runner.Container.UUID,
+ "manifest_text": mt}}
+ if runner.logUUID == "" {
+ reqBody["ensure_unique_name"] = true
+ err = runner.ArvClient.Create("collections", reqBody, &response)
+ } else {
+ err = runner.ArvClient.Update("collections", runner.logUUID, reqBody, &response)
+ }
if err != nil {
- return fmt.Errorf("While creating log collection: %v", err)
+ err = fmt.Errorf("error saving log collection: %v", err)
+ return
}
- runner.LogsPDH = &response.PortableDataHash
- return nil
+ runner.logUUID = response.UUID
+ return
}
// UpdateContainerRunning updates the container state to "Running"
@@ -1630,6 +1680,7 @@ func NewContainerRunner(client *arvados.Client, api IArvadosClient, kc IKeepClie
cr.CrunchLog.Immediate = log.New(os.Stderr, containerUUID+" ", 0)
loadLogThrottleParams(api)
+ go cr.checkpointLogs()
return cr, nil
}
diff --git a/services/crunch-run/crunchrun_test.go b/services/crunch-run/crunchrun_test.go
index 8d8e04000..217d4236b 100644
--- a/services/crunch-run/crunchrun_test.go
+++ b/services/crunch-run/crunchrun_test.go
@@ -230,6 +230,7 @@ func (client *ArvTestClient) Create(resourceType string,
mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
outmap := output.(*arvados.Collection)
outmap.PortableDataHash = fmt.Sprintf("%x+%d", md5.Sum([]byte(mt)), len(mt))
+ outmap.UUID = fmt.Sprintf("zzzzz-4zz18-%15.15x", md5.Sum([]byte(mt)))
}
return nil
@@ -316,6 +317,10 @@ func (client *ArvTestClient) Update(resourceType string, uuid string, parameters
if parameters["container"].(arvadosclient.Dict)["state"] == "Running" {
client.WasSetRunning = true
}
+ } else if resourceType == "collections" {
+ mt := parameters["collection"].(arvadosclient.Dict)["manifest_text"].(string)
+ output.(*arvados.Collection).UUID = uuid
+ output.(*arvados.Collection).PortableDataHash = fmt.Sprintf("%x", md5.Sum([]byte(mt)))
}
return nil
}
@@ -1142,7 +1147,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
cr.ArvMountPoint = ""
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
cr.statInterval = 5 * time.Second
err := cr.SetupMounts()
c.Check(err, IsNil)
@@ -1161,7 +1166,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts["/out"] = arvados.Mount{Kind: "tmp"}
cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
- cr.OutputPath = "/out"
+ cr.Container.OutputPath = "/out"
err := cr.SetupMounts()
c.Check(err, IsNil)
@@ -1179,7 +1184,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
cr.ArvMountPoint = ""
cr.Container.Mounts = make(map[string]arvados.Mount)
cr.Container.Mounts["/tmp"] = arvados.Mount{Kind: "tmp"}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
apiflag := true
cr.Container.RuntimeConstraints.API = &apiflag
@@ -1203,7 +1208,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
cr.Container.Mounts = map[string]arvados.Mount{
"/keeptmp": {Kind: "collection", Writable: true},
}
- cr.OutputPath = "/keeptmp"
+ cr.Container.OutputPath = "/keeptmp"
os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
@@ -1225,7 +1230,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
"/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
"/keepout": {Kind: "collection", Writable: true},
}
- cr.OutputPath = "/keepout"
+ cr.Container.OutputPath = "/keepout"
os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
@@ -1251,7 +1256,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
"/keepinp": {Kind: "collection", PortableDataHash: "59389a8f9ee9d399be35462a0f92541c+53"},
"/keepout": {Kind: "collection", Writable: true},
}
- cr.OutputPath = "/keepout"
+ cr.Container.OutputPath = "/keepout"
os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
@@ -1332,7 +1337,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
"/tmp": {Kind: "tmp"},
"/tmp/foo": {Kind: "collection"},
}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
os.MkdirAll(realTemp+"/keep1/tmp0", os.ModePerm)
@@ -1362,7 +1367,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
Path: "baz",
Writable: true},
}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541c+53", os.ModePerm)
os.MkdirAll(realTemp+"/keep1/by_id/59389a8f9ee9d399be35462a0f92541d+53/baz", os.ModePerm)
@@ -1391,7 +1396,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
"/tmp": {Kind: "tmp"},
"/tmp/foo": {Kind: "tmp"},
}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
err := cr.SetupMounts()
c.Check(err, NotNil)
@@ -1439,7 +1444,7 @@ func (s *TestSuite) TestSetupMounts(c *C) {
Path: "/",
},
}
- cr.OutputPath = "/tmp"
+ cr.Container.OutputPath = "/tmp"
err := cr.SetupMounts()
c.Check(err, IsNil)
diff --git a/services/crunch-run/logging.go b/services/crunch-run/logging.go
index ce0a66126..5d85bcf6c 100644
--- a/services/crunch-run/logging.go
+++ b/services/crunch-run/logging.go
@@ -197,6 +197,8 @@ var crunchLogThrottleLines int64 = 1024
var crunchLogPartialLineThrottlePeriod time.Duration = time.Second * 5
var crunchLogBytesPerEvent int64 = 4096
var crunchLogSecondsBetweenEvents time.Duration = time.Second * 1
+var crunchLogCheckpointMaxDuration = time.Hour / 2
+var crunchLogCheckpointMaxBytes = int64(1 << 25)
// ArvLogWriter is an io.WriteCloser that processes each write by
// writing it through to another io.WriteCloser (typically a
diff --git a/services/crunch-run/logging_test.go b/services/crunch-run/logging_test.go
index 13a171ae8..f118781c6 100644
--- a/services/crunch-run/logging_test.go
+++ b/services/crunch-run/logging_test.go
@@ -37,6 +37,8 @@ var _ = Suite(&LoggingTestSuite{})
func (s *LoggingTestSuite) SetUpTest(c *C) {
s.client = arvados.NewClientFromEnv()
+ crunchLogCheckpointMaxDuration = time.Hour * 24 * 365
+ crunchLogCheckpointMaxBytes = 1 << 50
}
func (s *LoggingTestSuite) TestWriteLogs(c *C) {
@@ -129,6 +131,48 @@ func (s *LoggingTestSuite) TestWriteMultipleLogs(c *C) {
c.Check(mt, Equals, ". 48f9023dc683a850b1c9b482b14c4b97+163 0:83:crunch-run.txt 83:80:stdout.txt\n")
}
+func (s *LoggingTestSuite) TestLogCheckpoint(c *C) {
+ for _, trial := range []struct {
+ maxBytes int64
+ maxDuration time.Duration
+ }{
+ {1000, 10 * time.Second},
+ {1000000, time.Millisecond},
+ } {
+ c.Logf("max %d bytes, %s", trial.maxBytes, trial.maxDuration)
+ crunchLogCheckpointMaxBytes = trial.maxBytes
+ crunchLogCheckpointMaxDuration = trial.maxDuration
+
+ api := &ArvTestClient{}
+ kc := &KeepTestClient{}
+ defer kc.Close()
+ cr, err := NewContainerRunner(s.client, api, kc, nil, "zzzzz-zzzzzzzzzzzzzzz")
+ c.Assert(err, IsNil)
+ ts := &TestTimestamper{}
+ cr.CrunchLog.Timestamper = ts.Timestamp
+ w, err := cr.NewLogWriter("stdout")
+ c.Assert(err, IsNil)
+ stdout := NewThrottledLogger(w)
+ stdout.Timestamper = ts.Timestamp
+
+ c.Check(cr.logUUID, Equals, "")
+ cr.CrunchLog.Printf("Hello %1000s", "space")
+ for i, t := 0, time.NewTicker(time.Millisecond); i < 5000 && cr.logUUID == ""; i++ {
+ <-t.C
+ }
+ c.Check(cr.logUUID, Not(Equals), "")
+ cr.CrunchLog.Print("Goodbye")
+ fmt.Fprint(stdout, "Goodbye\n")
+ cr.CrunchLog.Close()
+ stdout.Close()
+ w.Close()
+
+ mt, err := cr.LogCollection.MarshalManifest(".")
+ c.Check(err, IsNil)
+ c.Check(mt, Equals, ". 4dc76e0a212bfa30c39d76d8c16da0c0+1038 afc503bc1b9a828b4bb543cb629e936c+78 0:1077:crunch-run.txt 1077:39:stdout.txt\n")
+ }
+}
+
func (s *LoggingTestSuite) TestWriteLogsWithRateLimitThrottleBytes(c *C) {
s.testWriteLogsWithRateLimit(c, "crunchLogThrottleBytes", 50, 65536, "Exceeded rate 50 bytes per 60 seconds")
}
commit 8f1d40a2256f12bd16f07c10dc4d1f0c3e8ddc8f
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Sep 20 00:59:35 2018 -0400
10181: Add Size method to arvados.CollectionFileSystem.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvados/fs_collection.go b/sdk/go/arvados/fs_collection.go
index 7ce37aa24..bcf2f4810 100644
--- a/sdk/go/arvados/fs_collection.go
+++ b/sdk/go/arvados/fs_collection.go
@@ -31,6 +31,8 @@ type CollectionFileSystem interface {
// Prefix (normally ".") is a top level directory, effectively
// prepended to all paths in the returned manifest.
MarshalManifest(prefix string) (string, error)
+
+ Size() int64
}
type collectionFileSystem struct {
@@ -139,6 +141,10 @@ func (fs *collectionFileSystem) MarshalManifest(prefix string) (string, error) {
return fs.fileSystem.root.(*dirnode).marshalManifest(prefix)
}
+func (fs *collectionFileSystem) Size() int64 {
+ return fs.fileSystem.root.(*dirnode).Size()
+}
+
// filenodePtr is an offset into a file that is (usually) efficient to
// seek to. Specifically, if filenode.repacked==filenodePtr.repacked
// then
@@ -877,6 +883,18 @@ func (dn *dirnode) createFileAndParents(path string) (fn *filenode, err error) {
return
}
+func (dn *dirnode) Size() (bytes int64) {
+ dn.RLock()
+ defer dn.RUnlock()
+ for _, i := range dn.inodes {
+ switch i := i.(type) {
+ case *filenode, *dirnode:
+ bytes += i.Size()
+ }
+ }
+ return
+}
+
type segment interface {
io.ReaderAt
Len() int
diff --git a/sdk/go/arvados/fs_collection_test.go b/sdk/go/arvados/fs_collection_test.go
index d2f55d0e3..ac3bbbbad 100644
--- a/sdk/go/arvados/fs_collection_test.go
+++ b/sdk/go/arvados/fs_collection_test.go
@@ -353,6 +353,7 @@ func (s *CollectionFSSuite) TestReadWriteFile(c *check.C) {
c.Check(err, check.IsNil)
m = regexp.MustCompile(`\+A[^\+ ]+`).ReplaceAllLiteralString(m, "")
c.Check(m, check.Equals, "./dir1 3858f62230ac3c915f300c664312c63f+6 25d55ad283aa400af464c76d713c07ad+8 3:3:bar 6:3:foo\n")
+ c.Check(s.fs.Size(), check.Equals, int64(6))
}
func (s *CollectionFSSuite) TestSeekSparse(c *check.C) {
@@ -1060,6 +1061,7 @@ func (s *CollectionFSUnitSuite) TestLargeManifest(c *check.C) {
f, err := coll.FileSystem(nil, nil)
c.Check(err, check.IsNil)
c.Logf("%s loaded", time.Now())
+ c.Check(f.Size(), check.Equals, int64(42*dirCount*fileCount))
for i := 0; i < dirCount; i++ {
for j := 0; j < fileCount; j++ {
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list