[ARVADOS] updated: efb77a42a055f7081a1ff1e2fa712089be71dcc0

git at public.curoverse.com git at public.curoverse.com
Tue Feb 17 18:07:29 EST 2015


Summary of changes:
 sdk/go/logger/logger.go             | 32 ++++++++++++++++++--------------
 services/datamanager/datamanager.go | 24 +++++++++++++++++++++---
 2 files changed, 39 insertions(+), 17 deletions(-)

       via  efb77a42a055f7081a1ff1e2fa712089be71dcc0 (commit)
       via  f5a886733b2f628f462dcc03f45d20621c8ee015 (commit)
       via  83ea6e36b19db2c9a45be87c900efbbd9ea8bdb9 (commit)
      from  319503f1a8eda9fb9cea0bff038ad437e88ebeac (commit)

Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.


commit efb77a42a055f7081a1ff1e2fa712089be71dcc0
Merge: 319503f f5a8867
Author: mishaz <misha at curoverse.com>
Date:   Tue Feb 17 23:07:14 2015 +0000

    Merge branch '3408-production-datamanager' refs #3408


commit f5a886733b2f628f462dcc03f45d20621c8ee015
Merge: 83ea6e3 1713f54
Author: mishaz <misha at curoverse.com>
Date:   Tue Feb 17 23:06:32 2015 +0000

    Merge branch 'master' into 3408-production-datamanager refs #3408


commit 83ea6e36b19db2c9a45be87c900efbbd9ea8bdb9
Author: mishaz <misha at curoverse.com>
Date:   Tue Feb 17 23:02:36 2015 +0000

    Changes to allow datamanager to run indefinitely:
    
    Logger's worker goroutine returns after final write.
    minutes-between-runs flag specifies how many minutes to wait between runs (0 means don't loop)

diff --git a/sdk/go/logger/logger.go b/sdk/go/logger/logger.go
index 361e34f..ce18e90 100644
--- a/sdk/go/logger/logger.go
+++ b/sdk/go/logger/logger.go
@@ -26,9 +26,10 @@ import (
 )
 
 const (
-	startSuffix   = "-start"
-	partialSuffix = "-partial"
-	finalSuffix   = "-final"
+	startSuffix              = "-start"
+	partialSuffix            = "-partial"
+	finalSuffix              = "-final"
+	numberNoMoreWorkMessages = 2 // To return from FinalUpdate() & Work().
 )
 
 type LoggerParams struct {
@@ -63,6 +64,7 @@ type Logger struct {
 	workToDo    chan LogMutator // Work to do in the worker thread.
 	writeTicker *time.Ticker    // On each tick we write the log data to arvados, if it has been modified.
 	hasWritten  bool            // Whether we've written at all yet.
+	noMoreWork  chan bool       // Signals that we're done writing.
 
 	writeHooks []LogMutator // Mutators we call before each write.
 }
@@ -78,12 +80,13 @@ func NewLogger(params LoggerParams) *Logger {
 	}
 
 	l := &Logger{
-		data: make(map[string]interface{}),
-		entry: make(map[string]interface{}),
-		properties: make(map[string]interface{}),
-		params: params,
-		workToDo: make(chan LogMutator, 10),
-		writeTicker: time.NewTicker(params.WriteInterval)}
+		data:        make(map[string]interface{}),
+		entry:       make(map[string]interface{}),
+		properties:  make(map[string]interface{}),
+		params:      params,
+		workToDo:    make(chan LogMutator, 10),
+		writeTicker: time.NewTicker(params.WriteInterval),
+		noMoreWork:  make(chan bool, numberNoMoreWorkMessages)}
 
 	l.data["log"] = l.entry
 	l.entry["properties"] = l.properties
@@ -111,9 +114,6 @@ func (l *Logger) Update(mutator LogMutator) {
 // go will not wait for timers (including the pending write timer) to
 // go off before exiting.
 func (l *Logger) FinalUpdate(mutator LogMutator) {
-	// Block on this channel until everything finishes
-	done := make(chan bool)
-
 	// TODO(misha): Consider not accepting any future updates somehow,
 	// since they won't get written if they come in after this.
 
@@ -129,11 +129,13 @@ func (l *Logger) FinalUpdate(mutator LogMutator) {
 	// Perform the final write and signal that we can return.
 	l.workToDo <- func(p map[string]interface{}, e map[string]interface{}) {
 		l.write(true)
-		done <- true
+		for i := 0; i < numberNoMoreWorkMessages; {
+			l.noMoreWork <- true
+		}
 	}
 
 	// Wait until we've performed the write.
-	<-done
+	<-l.noMoreWork
 }
 
 // Adds a hook which will be called every time this logger writes an entry.
@@ -157,6 +159,8 @@ func (l *Logger) work() {
 		case mutator := <-l.workToDo:
 			mutator(l.properties, l.entry)
 			l.modified = true
+		case <-l.noMoreWork:
+			return
 		}
 	}
 }
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 5b45153..a8e506e 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -15,8 +15,9 @@ import (
 )
 
 var (
-	logEventTypePrefix        string
+	logEventTypePrefix  string
 	logFrequencySeconds int
+	minutesBetweenRuns  int
 )
 
 func init() {
@@ -28,11 +29,28 @@ func init() {
 		"log-frequency-seconds",
 		20,
 		"How frequently we'll write log entries in seconds.")
+	flag.IntVar(&minutesBetweenRuns,
+		"minutes-between-runs",
+		0,
+		"How many minutes we wait betwen data manager runs. 0 means run once and exit.")
 }
 
 func main() {
 	flag.Parse()
+	if minutesBetweenRuns == 0 {
+		singlerun()
+	} else {
+		waitTime := time.Minute * time.Duration(minutesBetweenRuns)
+		for {
+			log.Println("Beginning Run")
+			singlerun()
+			log.Printf("Sleeping for %d minutes", minutesBetweenRuns)
+			time.Sleep(waitTime)
+		}
+	}
+}
 
+func singlerun() {
 	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
 		log.Fatalf("Error setting up arvados client %s", err.Error())
@@ -47,8 +65,8 @@ func main() {
 	var arvLogger *logger.Logger
 	if logEventTypePrefix != "" {
 		arvLogger = logger.NewLogger(logger.LoggerParams{Client: arv,
-			EventTypePrefix:     logEventTypePrefix,
-			WriteInterval: time.Second * time.Duration(logFrequencySeconds)})
+			EventTypePrefix: logEventTypePrefix,
+			WriteInterval:   time.Second * time.Duration(logFrequencySeconds)})
 	}
 
 	loggerutil.LogRunInfo(arvLogger)

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list