[ARVADOS] created: 1.2.0-12-g63bae17d7

Git user git at public.curoverse.com
Thu Aug 16 15:43:38 EDT 2018


        at  63bae17d784c2c1522a087d71a0fcb2a9b6eddcd (commit)


commit 63bae17d784c2c1522a087d71a0fcb2a9b6eddcd
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Aug 16 15:38:42 2018 -0400

    13959: Use logrus for crunch-dispatch-slurm logging.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index e0dc2eefd..74cefed05 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -9,12 +9,12 @@ package dispatch
 import (
 	"context"
 	"fmt"
-	"log"
 	"sync"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+	"github.com/Sirupsen/logrus"
 )
 
 const (
@@ -25,10 +25,18 @@ const (
 	Cancelled = arvados.ContainerStateCancelled
 )
 
+type Logger interface {
+	Printf(string, ...interface{})
+	Warnf(string, ...interface{})
+	Debugf(string, ...interface{})
+}
+
 // Dispatcher struct
 type Dispatcher struct {
 	Arv *arvadosclient.ArvadosClient
 
+	Logger Logger
+
 	// Queue polling frequency
 	PollPeriod time.Duration
 
@@ -62,6 +70,10 @@ type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
 // dispatcher's token. When a new one appears, Run calls RunContainer
 // in a new goroutine.
 func (d *Dispatcher) Run(ctx context.Context) error {
+	if d.Logger == nil {
+		d.Logger = logrus.StandardLogger()
+	}
+
 	err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
 	if err != nil {
 		return fmt.Errorf("error getting my token UUID: %v", err)
@@ -135,7 +147,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 		// Containers that I know about that didn't show up in any
 		// query should be let go.
 		for uuid, tracker := range todo {
-			log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+			d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
 			tracker.close()
 		}
 
@@ -145,7 +157,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
 // Start a runner in a new goroutine, and send the initial container
 // record to its updates channel.
 func (d *Dispatcher) start(c arvados.Container) *runTracker {
-	tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+	tracker := &runTracker{
+		updates: make(chan arvados.Container, 1),
+		logger:  d.Logger,
+	}
 	tracker.updates <- c
 	go func() {
 		d.RunContainer(d, c, tracker.updates)
@@ -174,7 +189,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r
 
 		err := d.Arv.List("containers", params, &list)
 		if err != nil {
-			log.Printf("Error getting list of containers: %q", err)
+			d.Logger.Warnf("error getting list of containers: %q", err)
 			return false
 		}
 		d.checkListForUpdates(list.Items, todo)
@@ -197,7 +212,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
 		delete(todo, c.UUID)
 
 		if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
-			log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+			d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
 		} else if alreadyTracking {
 			switch c.State {
 			case Queued:
@@ -215,7 +230,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
 				}
 				err := d.lock(c.UUID)
 				if err != nil {
-					log.Printf("debug: error locking container %s: %s", c.UUID, err)
+					d.Logger.Debugf("error locking container %s: %s", c.UUID, err)
 					break
 				}
 				c.State = Locked
@@ -239,7 +254,7 @@ func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) erro
 			"container": arvadosclient.Dict{"state": state},
 		}, nil)
 	if err != nil {
-		log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
+		d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
 	}
 	return err
 }
@@ -294,6 +309,7 @@ func (d *Dispatcher) TrackContainer(uuid string) error {
 type runTracker struct {
 	closing bool
 	updates chan arvados.Container
+	logger  Logger
 }
 
 func (tracker *runTracker) close() {
@@ -309,7 +325,7 @@ func (tracker *runTracker) update(c arvados.Container) {
 	}
 	select {
 	case <-tracker.updates:
-		log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+		tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
 	default:
 	}
 	tracker.updates <- c
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index b4103cc62..b12be91c9 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -23,9 +23,15 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/config"
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
+	"github.com/Sirupsen/logrus"
 	"github.com/coreos/go-systemd/daemon"
 )
 
+type logger interface {
+	dispatch.Logger
+	Fatalf(string, ...interface{})
+}
+
 const initialNiceValue int64 = 10000
 
 var (
@@ -35,6 +41,7 @@ var (
 
 type Dispatcher struct {
 	*dispatch.Dispatcher
+	logger  logrus.FieldLogger
 	cluster *arvados.Cluster
 	sqCheck *SqueueChecker
 	slurm   Slurm
@@ -60,10 +67,17 @@ type Dispatcher struct {
 }
 
 func main() {
-	disp := &Dispatcher{}
+	logger := logrus.StandardLogger()
+	if os.Getenv("DEBUG") != "" {
+		logger.SetLevel(logrus.DebugLevel)
+	}
+	logger.Formatter = &logrus.JSONFormatter{
+		TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
+	}
+	disp := &Dispatcher{logger: logger}
 	err := disp.Run(os.Args[0], os.Args[1:])
 	if err != nil {
-		log.Fatal(err)
+		logrus.Fatalf("%s", err)
 	}
 }
 
@@ -101,7 +115,7 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
 		return nil
 	}
 
-	log.Printf("crunch-dispatch-slurm %s started", version)
+	disp.logger.Printf("crunch-dispatch-slurm %s started", version)
 
 	err := disp.readConfig(*configPath)
 	if err != nil {
@@ -129,7 +143,7 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
 		os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
 		os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
 	} else {
-		log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
+		disp.logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
 	}
 
 	if *dumpConfig {
@@ -138,7 +152,7 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
 
 	siteConfig, err := arvados.GetConfig(arvados.DefaultConfigFile)
 	if os.IsNotExist(err) {
-		log.Printf("warning: no cluster config (%s), proceeding with no node types defined", err)
+		disp.logger.Warnf("no cluster config (%s), proceeding with no node types defined", err)
 	} else if err != nil {
 		return fmt.Errorf("error loading config: %s", err)
 	} else if disp.cluster, err = siteConfig.GetCluster(""); err != nil {
@@ -150,20 +164,25 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
 
 // setup() initializes private fields after configure().
 func (disp *Dispatcher) setup() {
+	if disp.logger == nil {
+		disp.logger = logrus.StandardLogger()
+	}
 	arv, err := arvadosclient.MakeArvadosClient()
 	if err != nil {
-		log.Fatalf("Error making Arvados client: %v", err)
+		disp.logger.Fatalf("Error making Arvados client: %v", err)
 	}
 	arv.Retries = 25
 
 	disp.slurm = &slurmCLI{}
 	disp.sqCheck = &SqueueChecker{
+		Logger:         disp.logger,
 		Period:         time.Duration(disp.PollPeriod),
 		PrioritySpread: disp.PrioritySpread,
 		Slurm:          disp.slurm,
 	}
 	disp.Dispatcher = &dispatch.Dispatcher{
 		Arv:            arv,
+		Logger:         disp.logger,
 		RunContainer:   disp.runContainer,
 		PollPeriod:     time.Duration(disp.PollPeriod),
 		MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
@@ -321,7 +340,7 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
 		case <-ctx.Done():
 			// Disappeared from squeue
 			if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
-				log.Printf("Error getting final container state for %s: %s", ctr.UUID, err)
+				log.Printf("error getting final container state for %s: %s", ctr.UUID, err)
 			}
 			switch ctr.State {
 			case dispatch.Running:
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 4ef4ba1d5..9b858f331 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -11,7 +11,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"log"
 	"net/http"
 	"net/http/httptest"
 	"os"
@@ -25,6 +24,7 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
 	"git.curoverse.com/arvados.git/sdk/go/dispatch"
+	"github.com/Sirupsen/logrus"
 	. "gopkg.in/check.v1"
 )
 
@@ -138,7 +138,11 @@ func (s *IntegrationSuite) integrationTest(c *C,
 	}
 
 	s.disp.slurm = &s.slurm
-	s.disp.sqCheck = &SqueueChecker{Period: 500 * time.Millisecond, Slurm: s.disp.slurm}
+	s.disp.sqCheck = &SqueueChecker{
+		Logger: logrus.StandardLogger(),
+		Period: 500 * time.Millisecond,
+		Slurm:  s.disp.slurm,
+	}
 
 	err = s.disp.Dispatcher.Run(ctx)
 	<-doneRun
@@ -246,7 +250,7 @@ func (s *StubbedSuite) TestAPIErrorGettingContainers(c *C) {
 	apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
 	apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
 
-	s.testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
+	s.testWithServerStub(c, apiStubResponses, "echo", "error getting list of containers")
 }
 
 func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
@@ -264,8 +268,8 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
 	}
 
 	buf := bytes.NewBuffer(nil)
-	log.SetOutput(io.MultiWriter(buf, os.Stderr))
-	defer log.SetOutput(os.Stderr)
+	logrus.SetOutput(io.MultiWriter(buf, os.Stderr))
+	defer logrus.SetOutput(os.Stderr)
 
 	s.disp.CrunchRunCommand = []string{crunchCmd}
 
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 0ce4fb673..5aee7e087 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -7,7 +7,6 @@ package main
 import (
 	"bytes"
 	"fmt"
-	"log"
 	"sort"
 	"strings"
 	"sync"
@@ -27,6 +26,7 @@ type slurmJob struct {
 // Squeue implements asynchronous polling monitor of the SLURM queue using the
 // command 'squeue'.
 type SqueueChecker struct {
+	Logger         logger
 	Period         time.Duration
 	PrioritySpread int64
 	Slurm          Slurm
@@ -121,7 +121,7 @@ func (sqc *SqueueChecker) reniceAll() {
 		}
 		err := sqc.Slurm.Renice(job.uuid, niceNew)
 		if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") {
-			log.Printf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
+			sqc.Logger.Warnf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
 			job.hitNiceLimit = true
 		}
 	}
@@ -143,7 +143,7 @@ func (sqc *SqueueChecker) check() {
 	stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
 	cmd.Stdout, cmd.Stderr = stdout, stderr
 	if err := cmd.Run(); err != nil {
-		log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
+		sqc.Logger.Warnf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
 		return
 	}
 
@@ -156,7 +156,7 @@ func (sqc *SqueueChecker) check() {
 		var uuid, state, reason string
 		var n, p int64
 		if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
-			log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
+			sqc.Logger.Warnf("ignoring unparsed line in squeue output: %q", line)
 			continue
 		}
 
@@ -192,10 +192,10 @@ func (sqc *SqueueChecker) check() {
 			// "launch failed requeued held" seems to be
 			// another manifestation of this problem,
 			// resolved the same way.
-			log.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
+			sqc.Logger.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
 			sqc.Slurm.Release(uuid)
 		} else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
-			log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
+			sqc.Logger.Warnf("job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
 		}
 	}
 	sqc.lock.Lock()
diff --git a/services/crunch-dispatch-slurm/squeue_test.go b/services/crunch-dispatch-slurm/squeue_test.go
index ef036dabd..de674a139 100644
--- a/services/crunch-dispatch-slurm/squeue_test.go
+++ b/services/crunch-dispatch-slurm/squeue_test.go
@@ -7,6 +7,7 @@ package main
 import (
 	"time"
 
+	"github.com/Sirupsen/logrus"
 	. "gopkg.in/check.v1"
 )
 
@@ -24,6 +25,7 @@ func (s *SqueueSuite) TestReleasePending(c *C) {
 		queue: uuids[0] + " 10000 4294000000 PENDING Resources\n" + uuids[1] + " 10000 4294000111 PENDING Resources\n" + uuids[2] + " 10000 0 PENDING BadConstraints\n",
 	}
 	sqc := &SqueueChecker{
+		Logger: logrus.StandardLogger(),
 		Slurm:  slurm,
 		Period: time.Hour,
 	}
@@ -88,6 +90,7 @@ func (s *SqueueSuite) TestReniceAll(c *C) {
 			queue: test.squeue,
 		}
 		sqc := &SqueueChecker{
+			Logger:         logrus.StandardLogger(),
 			Slurm:          slurm,
 			PrioritySpread: test.spread,
 			Period:         time.Hour,
@@ -112,6 +115,7 @@ func (s *SqueueSuite) TestReniceInvalidNiceValue(c *C) {
 		rejectNice10K: true,
 	}
 	sqc := &SqueueChecker{
+		Logger:         logrus.StandardLogger(),
 		Slurm:          slurm,
 		PrioritySpread: 1,
 		Period:         time.Hour,
@@ -155,6 +159,7 @@ func (s *SqueueSuite) TestSetPriorityBeforeQueued(c *C) {
 
 	slurm := &slurmFake{}
 	sqc := &SqueueChecker{
+		Logger: logrus.StandardLogger(),
 		Slurm:  slurm,
 		Period: time.Hour,
 	}

commit 21e13deb6b38f6bea48923306755f648acd2d794
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Aug 16 14:53:21 2018 -0400

    13959: Default 5-minute timeout for API calls.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/sdk/go/arvadosclient/arvadosclient.go b/sdk/go/arvadosclient/arvadosclient.go
index 91da5a3fd..e3a9f4ae8 100644
--- a/sdk/go/arvadosclient/arvadosclient.go
+++ b/sdk/go/arvadosclient/arvadosclient.go
@@ -173,8 +173,11 @@ func New(c *arvados.Client) (*ArvadosClient, error) {
 		ApiServer:   c.APIHost,
 		ApiToken:    c.AuthToken,
 		ApiInsecure: c.Insecure,
-		Client: &http.Client{Transport: &http.Transport{
-			TLSClientConfig: MakeTLSConfig(c.Insecure)}},
+		Client: &http.Client{
+			Timeout: 5 * time.Minute,
+			Transport: &http.Transport{
+				TLSClientConfig: MakeTLSConfig(c.Insecure)},
+		},
 		External:          false,
 		Retries:           2,
 		KeepServiceURIs:   c.KeepServiceURIs,

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list