[ARVADOS] created: 1.2.0-12-g63bae17d7
Git user
git at public.curoverse.com
Thu Aug 16 15:43:38 EDT 2018
at 63bae17d784c2c1522a087d71a0fcb2a9b6eddcd (commit)
commit 63bae17d784c2c1522a087d71a0fcb2a9b6eddcd
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Aug 16 15:38:42 2018 -0400
13959: Use logrus for crunch-dispatch-slurm logging.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/dispatch/dispatch.go b/sdk/go/dispatch/dispatch.go
index e0dc2eefd..74cefed05 100644
--- a/sdk/go/dispatch/dispatch.go
+++ b/sdk/go/dispatch/dispatch.go
@@ -9,12 +9,12 @@ package dispatch
import (
"context"
"fmt"
- "log"
"sync"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
+ "github.com/Sirupsen/logrus"
)
const (
@@ -25,10 +25,18 @@ const (
Cancelled = arvados.ContainerStateCancelled
)
+type Logger interface {
+ Printf(string, ...interface{})
+ Warnf(string, ...interface{})
+ Debugf(string, ...interface{})
+}
+
// Dispatcher struct
type Dispatcher struct {
Arv *arvadosclient.ArvadosClient
+ Logger Logger
+
// Queue polling frequency
PollPeriod time.Duration
@@ -62,6 +70,10 @@ type DispatchFunc func(*Dispatcher, arvados.Container, <-chan arvados.Container)
// dispatcher's token. When a new one appears, Run calls RunContainer
// in a new goroutine.
func (d *Dispatcher) Run(ctx context.Context) error {
+ if d.Logger == nil {
+ d.Logger = logrus.StandardLogger()
+ }
+
err := d.Arv.Call("GET", "api_client_authorizations", "", "current", nil, &d.auth)
if err != nil {
return fmt.Errorf("error getting my token UUID: %v", err)
@@ -135,7 +147,7 @@ func (d *Dispatcher) Run(ctx context.Context) error {
// Containers that I know about that didn't show up in any
// query should be let go.
for uuid, tracker := range todo {
- log.Printf("Container %q not returned by any query, stopping tracking.", uuid)
+ d.Logger.Printf("Container %q not returned by any query, stopping tracking.", uuid)
tracker.close()
}
@@ -145,7 +157,10 @@ func (d *Dispatcher) Run(ctx context.Context) error {
// Start a runner in a new goroutine, and send the initial container
// record to its updates channel.
func (d *Dispatcher) start(c arvados.Container) *runTracker {
- tracker := &runTracker{updates: make(chan arvados.Container, 1)}
+ tracker := &runTracker{
+ updates: make(chan arvados.Container, 1),
+ logger: d.Logger,
+ }
tracker.updates <- c
go func() {
d.RunContainer(d, c, tracker.updates)
@@ -174,7 +189,7 @@ func (d *Dispatcher) checkForUpdates(filters [][]interface{}, todo map[string]*r
err := d.Arv.List("containers", params, &list)
if err != nil {
- log.Printf("Error getting list of containers: %q", err)
+ d.Logger.Warnf("error getting list of containers: %q", err)
return false
}
d.checkListForUpdates(list.Items, todo)
@@ -197,7 +212,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
delete(todo, c.UUID)
if c.LockedByUUID != "" && c.LockedByUUID != d.auth.UUID {
- log.Printf("debug: ignoring %s locked by %s", c.UUID, c.LockedByUUID)
+ d.Logger.Debugf("ignoring %s locked by %s", c.UUID, c.LockedByUUID)
} else if alreadyTracking {
switch c.State {
case Queued:
@@ -215,7 +230,7 @@ func (d *Dispatcher) checkListForUpdates(containers []arvados.Container, todo ma
}
err := d.lock(c.UUID)
if err != nil {
- log.Printf("debug: error locking container %s: %s", c.UUID, err)
+ d.Logger.Debugf("error locking container %s: %s", c.UUID, err)
break
}
c.State = Locked
@@ -239,7 +254,7 @@ func (d *Dispatcher) UpdateState(uuid string, state arvados.ContainerState) erro
"container": arvadosclient.Dict{"state": state},
}, nil)
if err != nil {
- log.Printf("Error updating container %s to state %q: %s", uuid, state, err)
+ d.Logger.Warnf("error updating container %s to state %q: %s", uuid, state, err)
}
return err
}
@@ -294,6 +309,7 @@ func (d *Dispatcher) TrackContainer(uuid string) error {
type runTracker struct {
closing bool
updates chan arvados.Container
+ logger Logger
}
func (tracker *runTracker) close() {
@@ -309,7 +325,7 @@ func (tracker *runTracker) update(c arvados.Container) {
}
select {
case <-tracker.updates:
- log.Printf("debug: runner is handling updates slowly, discarded previous update for %s", c.UUID)
+ tracker.logger.Debugf("runner is handling updates slowly, discarded previous update for %s", c.UUID)
default:
}
tracker.updates <- c
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
index b4103cc62..b12be91c9 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm.go
@@ -23,9 +23,15 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/config"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "github.com/Sirupsen/logrus"
"github.com/coreos/go-systemd/daemon"
)
+type logger interface {
+ dispatch.Logger
+ Fatalf(string, ...interface{})
+}
+
const initialNiceValue int64 = 10000
var (
@@ -35,6 +41,7 @@ var (
type Dispatcher struct {
*dispatch.Dispatcher
+ logger logrus.FieldLogger
cluster *arvados.Cluster
sqCheck *SqueueChecker
slurm Slurm
@@ -60,10 +67,17 @@ type Dispatcher struct {
}
func main() {
- disp := &Dispatcher{}
+ logger := logrus.StandardLogger()
+ if os.Getenv("DEBUG") != "" {
+ logger.SetLevel(logrus.DebugLevel)
+ }
+ logger.Formatter = &logrus.JSONFormatter{
+ TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
+ }
+ disp := &Dispatcher{logger: logger}
err := disp.Run(os.Args[0], os.Args[1:])
if err != nil {
- log.Fatal(err)
+ logrus.Fatalf("%s", err)
}
}
@@ -101,7 +115,7 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
return nil
}
- log.Printf("crunch-dispatch-slurm %s started", version)
+ disp.logger.Printf("crunch-dispatch-slurm %s started", version)
err := disp.readConfig(*configPath)
if err != nil {
@@ -129,7 +143,7 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
os.Setenv("ARVADOS_KEEP_SERVICES", strings.Join(disp.Client.KeepServiceURIs, " "))
os.Setenv("ARVADOS_EXTERNAL_CLIENT", "")
} else {
- log.Printf("warning: Client credentials missing from config, so falling back on environment variables (deprecated).")
+ disp.logger.Warnf("Client credentials missing from config, so falling back on environment variables (deprecated).")
}
if *dumpConfig {
@@ -138,7 +152,7 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
siteConfig, err := arvados.GetConfig(arvados.DefaultConfigFile)
if os.IsNotExist(err) {
- log.Printf("warning: no cluster config (%s), proceeding with no node types defined", err)
+ disp.logger.Warnf("no cluster config (%s), proceeding with no node types defined", err)
} else if err != nil {
return fmt.Errorf("error loading config: %s", err)
} else if disp.cluster, err = siteConfig.GetCluster(""); err != nil {
@@ -150,20 +164,25 @@ func (disp *Dispatcher) configure(prog string, args []string) error {
// setup() initializes private fields after configure().
func (disp *Dispatcher) setup() {
+ if disp.logger == nil {
+ disp.logger = logrus.StandardLogger()
+ }
arv, err := arvadosclient.MakeArvadosClient()
if err != nil {
- log.Fatalf("Error making Arvados client: %v", err)
+ disp.logger.Fatalf("Error making Arvados client: %v", err)
}
arv.Retries = 25
disp.slurm = &slurmCLI{}
disp.sqCheck = &SqueueChecker{
+ Logger: disp.logger,
Period: time.Duration(disp.PollPeriod),
PrioritySpread: disp.PrioritySpread,
Slurm: disp.slurm,
}
disp.Dispatcher = &dispatch.Dispatcher{
Arv: arv,
+ Logger: disp.logger,
RunContainer: disp.runContainer,
PollPeriod: time.Duration(disp.PollPeriod),
MinRetryPeriod: time.Duration(disp.MinRetryPeriod),
@@ -321,7 +340,7 @@ func (disp *Dispatcher) runContainer(_ *dispatch.Dispatcher, ctr arvados.Contain
case <-ctx.Done():
// Disappeared from squeue
if err := disp.Arv.Get("containers", ctr.UUID, nil, &ctr); err != nil {
- log.Printf("Error getting final container state for %s: %s", ctr.UUID, err)
+ log.Printf("error getting final container state for %s: %s", ctr.UUID, err)
}
switch ctr.State {
case dispatch.Running:
diff --git a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
index 4ef4ba1d5..9b858f331 100644
--- a/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
+++ b/services/crunch-dispatch-slurm/crunch-dispatch-slurm_test.go
@@ -11,7 +11,6 @@ import (
"fmt"
"io"
"io/ioutil"
- "log"
"net/http"
"net/http/httptest"
"os"
@@ -25,6 +24,7 @@ import (
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/arvadostest"
"git.curoverse.com/arvados.git/sdk/go/dispatch"
+ "github.com/Sirupsen/logrus"
. "gopkg.in/check.v1"
)
@@ -138,7 +138,11 @@ func (s *IntegrationSuite) integrationTest(c *C,
}
s.disp.slurm = &s.slurm
- s.disp.sqCheck = &SqueueChecker{Period: 500 * time.Millisecond, Slurm: s.disp.slurm}
+ s.disp.sqCheck = &SqueueChecker{
+ Logger: logrus.StandardLogger(),
+ Period: 500 * time.Millisecond,
+ Slurm: s.disp.slurm,
+ }
err = s.disp.Dispatcher.Run(ctx)
<-doneRun
@@ -246,7 +250,7 @@ func (s *StubbedSuite) TestAPIErrorGettingContainers(c *C) {
apiStubResponses["/arvados/v1/api_client_authorizations/current"] = arvadostest.StubResponse{200, `{"uuid":"` + arvadostest.Dispatch1AuthUUID + `"}`}
apiStubResponses["/arvados/v1/containers"] = arvadostest.StubResponse{500, string(`{}`)}
- s.testWithServerStub(c, apiStubResponses, "echo", "Error getting list of containers")
+ s.testWithServerStub(c, apiStubResponses, "echo", "error getting list of containers")
}
func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arvadostest.StubResponse, crunchCmd string, expected string) {
@@ -264,8 +268,8 @@ func (s *StubbedSuite) testWithServerStub(c *C, apiStubResponses map[string]arva
}
buf := bytes.NewBuffer(nil)
- log.SetOutput(io.MultiWriter(buf, os.Stderr))
- defer log.SetOutput(os.Stderr)
+ logrus.SetOutput(io.MultiWriter(buf, os.Stderr))
+ defer logrus.SetOutput(os.Stderr)
s.disp.CrunchRunCommand = []string{crunchCmd}
diff --git a/services/crunch-dispatch-slurm/squeue.go b/services/crunch-dispatch-slurm/squeue.go
index 0ce4fb673..5aee7e087 100644
--- a/services/crunch-dispatch-slurm/squeue.go
+++ b/services/crunch-dispatch-slurm/squeue.go
@@ -7,7 +7,6 @@ package main
import (
"bytes"
"fmt"
- "log"
"sort"
"strings"
"sync"
@@ -27,6 +26,7 @@ type slurmJob struct {
// Squeue implements asynchronous polling monitor of the SLURM queue using the
// command 'squeue'.
type SqueueChecker struct {
+ Logger logger
Period time.Duration
PrioritySpread int64
Slurm Slurm
@@ -121,7 +121,7 @@ func (sqc *SqueueChecker) reniceAll() {
}
err := sqc.Slurm.Renice(job.uuid, niceNew)
if err != nil && niceNew > slurm15NiceLimit && strings.Contains(err.Error(), "Invalid nice value") {
- log.Printf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
+ sqc.Logger.Warnf("container %q clamping nice values at %d, priority order will not be correct -- see https://dev.arvados.org/projects/arvados/wiki/SLURM_integration#Limited-nice-values-SLURM-15", job.uuid, slurm15NiceLimit)
job.hitNiceLimit = true
}
}
@@ -143,7 +143,7 @@ func (sqc *SqueueChecker) check() {
stdout, stderr := &bytes.Buffer{}, &bytes.Buffer{}
cmd.Stdout, cmd.Stderr = stdout, stderr
if err := cmd.Run(); err != nil {
- log.Printf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
+ sqc.Logger.Warnf("Error running %q %q: %s %q", cmd.Path, cmd.Args, err, stderr.String())
return
}
@@ -156,7 +156,7 @@ func (sqc *SqueueChecker) check() {
var uuid, state, reason string
var n, p int64
if _, err := fmt.Sscan(line, &uuid, &n, &p, &state, &reason); err != nil {
- log.Printf("warning: ignoring unparsed line in squeue output: %q", line)
+ sqc.Logger.Warnf("ignoring unparsed line in squeue output: %q", line)
continue
}
@@ -192,10 +192,10 @@ func (sqc *SqueueChecker) check() {
// "launch failed requeued held" seems to be
// another manifestation of this problem,
// resolved the same way.
- log.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
+ sqc.Logger.Printf("releasing held job %q (priority=%d, state=%q, reason=%q)", uuid, p, state, reason)
sqc.Slurm.Release(uuid)
} else if state != "RUNNING" && p <= 2*slurm15NiceLimit && replacing.wantPriority > 0 {
- log.Printf("warning: job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
+ sqc.Logger.Warnf("job %q has low priority %d, nice %d, state %q, reason %q", uuid, p, n, state, reason)
}
}
sqc.lock.Lock()
diff --git a/services/crunch-dispatch-slurm/squeue_test.go b/services/crunch-dispatch-slurm/squeue_test.go
index ef036dabd..de674a139 100644
--- a/services/crunch-dispatch-slurm/squeue_test.go
+++ b/services/crunch-dispatch-slurm/squeue_test.go
@@ -7,6 +7,7 @@ package main
import (
"time"
+ "github.com/Sirupsen/logrus"
. "gopkg.in/check.v1"
)
@@ -24,6 +25,7 @@ func (s *SqueueSuite) TestReleasePending(c *C) {
queue: uuids[0] + " 10000 4294000000 PENDING Resources\n" + uuids[1] + " 10000 4294000111 PENDING Resources\n" + uuids[2] + " 10000 0 PENDING BadConstraints\n",
}
sqc := &SqueueChecker{
+ Logger: logrus.StandardLogger(),
Slurm: slurm,
Period: time.Hour,
}
@@ -88,6 +90,7 @@ func (s *SqueueSuite) TestReniceAll(c *C) {
queue: test.squeue,
}
sqc := &SqueueChecker{
+ Logger: logrus.StandardLogger(),
Slurm: slurm,
PrioritySpread: test.spread,
Period: time.Hour,
@@ -112,6 +115,7 @@ func (s *SqueueSuite) TestReniceInvalidNiceValue(c *C) {
rejectNice10K: true,
}
sqc := &SqueueChecker{
+ Logger: logrus.StandardLogger(),
Slurm: slurm,
PrioritySpread: 1,
Period: time.Hour,
@@ -155,6 +159,7 @@ func (s *SqueueSuite) TestSetPriorityBeforeQueued(c *C) {
slurm := &slurmFake{}
sqc := &SqueueChecker{
+ Logger: logrus.StandardLogger(),
Slurm: slurm,
Period: time.Hour,
}
commit 21e13deb6b38f6bea48923306755f648acd2d794
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Aug 16 14:53:21 2018 -0400
13959: Default 5-minute timeout for API calls.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/sdk/go/arvadosclient/arvadosclient.go b/sdk/go/arvadosclient/arvadosclient.go
index 91da5a3fd..e3a9f4ae8 100644
--- a/sdk/go/arvadosclient/arvadosclient.go
+++ b/sdk/go/arvadosclient/arvadosclient.go
@@ -173,8 +173,11 @@ func New(c *arvados.Client) (*ArvadosClient, error) {
ApiServer: c.APIHost,
ApiToken: c.AuthToken,
ApiInsecure: c.Insecure,
- Client: &http.Client{Transport: &http.Transport{
- TLSClientConfig: MakeTLSConfig(c.Insecure)}},
+ Client: &http.Client{
+ Timeout: 5 * time.Minute,
+ Transport: &http.Transport{
+ TLSClientConfig: MakeTLSConfig(c.Insecure)},
+ },
External: false,
Retries: 2,
KeepServiceURIs: c.KeepServiceURIs,
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list