[ARVADOS] created: 1.3.0-1811-g62d28600c
Git user
git at public.curoverse.com
Wed Oct 30 03:36:57 UTC 2019
at 62d28600cbfc31f8e72c61e4519ff198cb66a02a (commit)
commit 62d28600cbfc31f8e72c61e4519ff198cb66a02a
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Oct 29 16:51:13 2019 -0400
15521: Convert remaining log uses to logrus.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/keepstore/command.go b/services/keepstore/command.go
index c589e639f..006d24463 100644
--- a/services/keepstore/command.go
+++ b/services/keepstore/command.go
@@ -190,7 +190,7 @@ func (h *handler) setup(ctx context.Context, cluster *arvados.Cluster, token str
// Initialize the trashq and workers
h.trashq = NewWorkQueue()
for i := 0; i < 1 || i < h.Cluster.Collections.BlobTrashConcurrency; i++ {
- go RunTrashWorker(h.volmgr, h.Cluster, h.trashq)
+ go RunTrashWorker(h.volmgr, h.Logger, h.Cluster, h.trashq)
}
// Set up routes and metrics
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 9d69b9fa4..54b4871fa 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -892,10 +892,7 @@ func ExpectStatusCode(
testname string,
expectedStatus int,
response *httptest.ResponseRecorder) {
- if response.Code != expectedStatus {
- c.Errorf("%s: expected status %d, got %+v",
- testname, expectedStatus, response)
- }
+ c.Check(response.Code, check.Equals, expectedStatus, check.Commentf("%s", testname))
}
func ExpectBody(
@@ -1147,12 +1144,7 @@ func (s *HandlerSuite) TestUntrashHandler(c *check.C) {
"",
http.StatusOK,
response)
- expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
- if response.Body.String() != expected {
- c.Errorf(
- "Untrash response mismatched: expected %s, got:\n%s",
- expected, response.Body.String())
- }
+ c.Check(response.Body.String(), check.Equals, "Successfully untrashed on: [MockVolume], [MockVolume]\n")
}
func (s *HandlerSuite) TestUntrashHandlerWithNoWritableVolumes(c *check.C) {
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 30ea695f0..0fcc12144 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -11,7 +11,6 @@ import (
"encoding/json"
"fmt"
"io"
- "log"
"net/http"
"os"
"regexp"
@@ -342,25 +341,25 @@ func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
}
var ds debugStats
runtime.ReadMemStats(&ds.MemStats)
- err := json.NewEncoder(resp).Encode(&ds)
+ data, err := json.Marshal(&ds)
if err != nil {
- http.Error(resp, err.Error(), 500)
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(data)
}
// StatusHandler addresses /status.json requests.
func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
stLock.Lock()
rtr.readNodeStatus(&st)
- jstat, err := json.Marshal(&st)
+ data, err := json.Marshal(&st)
stLock.Unlock()
- if err == nil {
- resp.Write(jstat)
- } else {
- log.Printf("json.Marshal: %s", err)
- log.Printf("NodeStatus = %v", &st)
- http.Error(resp, err.Error(), 500)
+ if err != nil {
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(data)
}
// populate the given NodeStatus struct with current values.
@@ -457,28 +456,19 @@ func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
continue
} else {
result.Failed++
- log.Println("DeleteHandler:", err)
+ ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
}
}
-
- var st int
-
if result.Deleted == 0 && result.Failed == 0 {
- st = http.StatusNotFound
- } else {
- st = http.StatusOK
+ resp.WriteHeader(http.StatusNotFound)
+ return
}
-
- resp.WriteHeader(st)
-
- if st == http.StatusOK {
- if body, err := json.Marshal(result); err == nil {
- resp.Write(body)
- } else {
- log.Printf("json.Marshal: %s (result = %v)", err, result)
- http.Error(resp, err.Error(), 500)
- }
+ body, err := json.Marshal(result)
+ if err != nil {
+ http.Error(resp, err.Error(), http.StatusInternalServerError)
+ return
}
+ resp.Write(body)
}
/* PullHandler processes "PUT /pull" requests for the data manager.
@@ -600,6 +590,7 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
return
}
+ log := ctxlog.FromContext(req.Context())
hash := mux.Vars(req)["hash"]
if len(rtr.volmgr.AllWritable()) == 0 {
@@ -615,27 +606,26 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
if os.IsNotExist(err) {
numNotFound++
} else if err != nil {
- log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+ log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
failedOn = append(failedOn, vol.String())
} else {
- log.Printf("Untrashed %v on volume %v", hash, vol.String())
+ log.Infof("Untrashed %v on volume %v", hash, vol.String())
untrashedOn = append(untrashedOn, vol.String())
}
}
if numNotFound == len(rtr.volmgr.AllWritable()) {
http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
- return
- }
-
- if len(failedOn) == len(rtr.volmgr.AllWritable()) {
+ } else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
} else {
- respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+ respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
if len(failedOn) > 0 {
- respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+ respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
+ http.Error(resp, respBody, http.StatusInternalServerError)
+ } else {
+ fmt.Fprintln(resp, respBody)
}
- resp.Write([]byte(respBody))
}
}
@@ -659,6 +649,8 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
// DiskHashError.
//
func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+ log := ctxlog.FromContext(ctx)
+
// Attempt to read the requested hash from a keep volume.
errorToCaller := NotFoundError
@@ -676,7 +668,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
// volumes. If all volumes report IsNotExist,
// we return a NotFoundError.
if !os.IsNotExist(err) {
- log.Printf("%s: Get(%s): %s", vol, hash, err)
+ log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
}
// If some volume returns a transient error, return it to the caller
// instead of "Not found" so it can retry.
@@ -686,19 +678,16 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
continue
}
// Check the file checksum.
- //
filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
if filehash != hash {
// TODO: Try harder to tell a sysadmin about
// this.
- log.Printf("%s: checksum mismatch for request %s (actual %s)",
- vol, hash, filehash)
+ log.Errorf("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
errorToCaller = DiskHashError
continue
}
if errorToCaller == DiskHashError {
- log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
- vol, hash)
+ log.Warnf("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
}
return size, nil
}
@@ -754,12 +743,14 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
// Choose a Keep volume to write to.
// If this volume fails, try all of the volumes in order.
if mnt := volmgr.NextWritable(); mnt != nil {
- if err := mnt.Put(ctx, hash, block); err == nil {
+ if err := mnt.Put(ctx, hash, block); err != nil {
+ log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+ } else {
return mnt.Replication, nil // success!
}
- if ctx.Err() != nil {
- return 0, ErrClientDisconnect
- }
+ }
+ if ctx.Err() != nil {
+ return 0, ErrClientDisconnect
}
writables := volmgr.AllWritable()
@@ -774,15 +765,17 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
if ctx.Err() != nil {
return 0, ErrClientDisconnect
}
- if err == nil {
+ switch err {
+ case nil:
return vol.Replication, nil // success!
- }
- if err != FullError {
+ case FullError:
+ continue
+ default:
// The volume is not full but the
// write did not succeed. Report the
// error and continue trying.
allFull = false
- log.Errorf("%s: Write(%s): %s", vol, hash, err)
+ log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
}
}
@@ -800,6 +793,7 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
// premature garbage collection. Otherwise, it returns a non-nil
// error.
func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+ log := ctxlog.FromContext(ctx)
var bestErr error = NotFoundError
for _, mnt := range volmgr.AllWritable() {
err := mnt.Compare(ctx, hash, buf)
@@ -811,7 +805,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
// to tell which one is wanted if we have
// both, so there's no point writing it even
// on a different volume.)
- log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+ log.WithError(err).Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
return 0, err
} else if os.IsNotExist(err) {
// Block does not exist. This is the only
@@ -821,11 +815,11 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
// Couldn't open file, data is corrupt on
// disk, etc.: log this abnormal condition,
// and try the next volume.
- log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+ log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
continue
}
if err := mnt.Touch(hash); err != nil {
- log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
+ log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
bestErr = err
continue
}
@@ -859,18 +853,6 @@ func GetAPIToken(req *http.Request) string {
return ""
}
-// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestampHex cannot be
-// parsed as a hexadecimal string.
-func IsExpired(timestampHex string) bool {
- ts, err := strconv.ParseInt(timestampHex, 16, 0)
- if err != nil {
- log.Printf("IsExpired: %s", err)
- return true
- }
- return time.Unix(ts, 0).Before(time.Now())
-}
-
// canDelete returns true if the user identified by apiToken is
// allowed to delete blocks.
func (rtr *router) canDelete(apiToken string) bool {
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 220377af2..08cc591fc 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -16,7 +16,6 @@ import (
"fmt"
"io"
"io/ioutil"
- "log"
"net/http"
"os"
"regexp"
@@ -37,11 +36,12 @@ func init() {
}
func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
- v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
+ v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
err := json.Unmarshal(volume.DriverParameters, &v)
if err != nil {
return nil, err
}
+ v.logger = logger.WithField("Volume", v.String())
return v, v.check()
}
@@ -340,7 +340,7 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
rdr, err = v.bucket.GetReader(loc)
if err != nil {
- log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
+ v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
err = v.translateError(err)
}
return
@@ -465,7 +465,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
go func() {
defer func() {
if ctx.Err() != nil {
- v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+ v.logger.Debugf("abandoned PutReader goroutine finished with err: %s", err)
}
}()
defer close(ready)
@@ -477,7 +477,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
}()
select {
case <-ctx.Done():
- v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
+ v.logger.Debugf("taking PutReader's input away: %s", ctx.Err())
// Our pipe might be stuck in Write(), waiting for
// PutReader() to read. If so, un-stick it. This means
// PutReader will get corrupt data, but that's OK: the
@@ -485,7 +485,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
go io.Copy(ioutil.Discard, bufr)
// CloseWithError() will return once pending I/O is done.
bufw.CloseWithError(ctx.Err())
- v.logger.Debugf("%s: abandoning PutReader goroutine", v)
+ v.logger.Debugf("abandoning PutReader goroutine")
return ctx.Err()
case <-ready:
// Unblock pipe in case PutReader did not consume it.
@@ -523,13 +523,13 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
// The data object X exists, but recent/X is missing.
err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
- log.Printf("error: creating %q: %s", "recent/"+loc, err)
+ v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
return zeroTime, v.translateError(err)
}
- log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
+ v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc)
resp, err = v.bucket.Head("recent/"+loc, nil)
if err != nil {
- log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
+ v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
return zeroTime, v.translateError(err)
}
} else if err != nil {
@@ -544,12 +544,14 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
// Use a merge sort to find matching sets of X and recent/X.
dataL := s3Lister{
+ Logger: v.logger,
Bucket: v.bucket.Bucket(),
Prefix: prefix,
PageSize: v.IndexPageSize,
Stats: &v.bucket.stats,
}
recentL := s3Lister{
+ Logger: v.logger,
Bucket: v.bucket.Bucket(),
Prefix: "recent/" + prefix,
PageSize: v.IndexPageSize,
@@ -744,24 +746,24 @@ func (v *S3Volume) fixRace(loc string) bool {
trash, err := v.bucket.Head("trash/"+loc, nil)
if err != nil {
if !os.IsNotExist(v.translateError(err)) {
- log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
}
return false
}
trashTime, err := v.lastModified(trash)
if err != nil {
- log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
+ v.logger.WithError(err).Errorf("fixRace: error parsing time %q", trash.Header.Get("Last-Modified"))
return false
}
recent, err := v.bucket.Head("recent/"+loc, nil)
if err != nil {
- log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
+ v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
return false
}
recentTime, err := v.lastModified(recent)
if err != nil {
- log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
+ v.logger.WithError(err).Errorf("fixRace: error parsing time %q", recent.Header.Get("Last-Modified"))
return false
}
@@ -772,11 +774,11 @@ func (v *S3Volume) fixRace(loc string) bool {
return false
}
- log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
- log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
+ v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+ v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
err = v.safeCopy(loc, "trash/"+loc)
if err != nil {
- log.Printf("error: fixRace: %s", err)
+ v.logger.WithError(err).Error("fixRace: copy failed")
return false
}
return true
@@ -819,24 +821,24 @@ func (v *S3Volume) EmptyTrash() {
trashT, err := time.Parse(time.RFC3339, trash.LastModified)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+ v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err)
return
}
recent, err := v.bucket.Head("recent/"+loc, nil)
if err != nil && os.IsNotExist(v.translateError(err)) {
- log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
+ v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
err = v.Untrash(loc)
if err != nil {
- log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
}
return
} else if err != nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
return
}
recentT, err := v.lastModified(recent)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
+ v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+loc, recent.Header.Get("Last-Modified"))
return
}
if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
@@ -849,18 +851,18 @@ func (v *S3Volume) EmptyTrash() {
// Note this means (TrashSweepInterval
// < BlobSigningTTL - raceWindow) is
// necessary to avoid starvation.
- log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
+ v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
v.fixRace(loc)
v.Touch(loc)
return
}
_, err := v.bucket.Head(loc, nil)
if os.IsNotExist(err) {
- log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
+ v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
v.fixRace(loc)
return
} else if err != nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
return
}
}
@@ -869,7 +871,7 @@ func (v *S3Volume) EmptyTrash() {
}
err = v.bucket.Del(trash.Key)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", trash.Key)
return
}
atomic.AddInt64(&bytesDeleted, trash.Size)
@@ -877,16 +879,16 @@ func (v *S3Volume) EmptyTrash() {
_, err = v.bucket.Head(loc, nil)
if err == nil {
- log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
+ v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
return
}
if !os.IsNotExist(v.translateError(err)) {
- log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
return
}
err = v.bucket.Del("recent/" + loc)
if err != nil {
- log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
+ v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
}
}
@@ -903,6 +905,7 @@ func (v *S3Volume) EmptyTrash() {
}
trashL := s3Lister{
+ Logger: v.logger,
Bucket: v.bucket.Bucket(),
Prefix: "trash/",
PageSize: v.IndexPageSize,
@@ -915,12 +918,13 @@ func (v *S3Volume) EmptyTrash() {
wg.Wait()
if err := trashL.Error(); err != nil {
- log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
+ v.logger.WithError(err).Error("EmptyTrash: lister failed")
}
- log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+ v.logger.Infof("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
type s3Lister struct {
+ Logger logrus.FieldLogger
Bucket *s3.Bucket
Prefix string
PageSize int
@@ -967,7 +971,7 @@ func (lister *s3Lister) getPage() {
lister.buf = make([]s3.Key, 0, len(resp.Contents))
for _, key := range resp.Contents {
if !strings.HasPrefix(key.Key, lister.Prefix) {
- log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
+ lister.Logger.Warnf("s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
continue
}
lister.buf = append(lister.buf, key)
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 49ea24aa0..dbd6a45ed 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -11,7 +11,6 @@ import (
"encoding/json"
"fmt"
"io"
- "log"
"net/http"
"net/http/httptest"
"os"
@@ -499,11 +498,11 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
- log.Printf("PutRaw: %s: %+v", loc, err)
+ v.logger.Printf("PutRaw: %s: %+v", loc, err)
}
err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
if err != nil {
- log.Printf("PutRaw: recent/%s: %+v", loc, err)
+ v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
}
}
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index ba1455ac6..3b1bd0423 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -6,10 +6,10 @@ package main
import (
"errors"
- "log"
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "github.com/sirupsen/logrus"
)
// RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
@@ -18,19 +18,19 @@ import (
// Delete the block indicated by the trash request Locator
// Repeat
//
-func RunTrashWorker(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashq *WorkQueue) {
+func RunTrashWorker(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashq *WorkQueue) {
for item := range trashq.NextItem {
trashRequest := item.(TrashRequest)
- TrashItem(volmgr, cluster, trashRequest)
+ TrashItem(volmgr, logger, cluster, trashRequest)
trashq.DoneItem <- struct{}{}
}
}
// TrashItem deletes the indicated block from every writable volume.
-func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest TrashRequest) {
+func TrashItem(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashRequest TrashRequest) {
reqMtime := time.Unix(0, trashRequest.BlockMtime)
if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() {
- log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
+ logger.Warnf("client asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
arvados.Duration(time.Since(reqMtime)),
trashRequest.Locator,
trashRequest.BlockMtime,
@@ -43,7 +43,7 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
if uuid := trashRequest.MountUUID; uuid == "" {
volumes = volmgr.AllWritable()
} else if mnt := volmgr.Lookup(uuid, true); mnt == nil {
- log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
+ logger.Warnf("trash request for nonexistent mount: %v", trashRequest)
return
} else {
volumes = []*VolumeMount{mnt}
@@ -52,11 +52,11 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
for _, volume := range volumes {
mtime, err := volume.Mtime(trashRequest.Locator)
if err != nil {
- log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+ logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
continue
}
if trashRequest.BlockMtime != mtime.UnixNano() {
- log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
+ logger.Infof("%v Trash(%v): stored mtime %v does not match trash list value %v; skipping", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
continue
}
@@ -67,9 +67,9 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
}
if err != nil {
- log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+ logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
} else {
- log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator)
+ logger.Infof("%v Trash(%v) OK", volume, trashRequest.Locator)
}
}
}
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index bd3743090..c2052077f 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -9,6 +9,7 @@ import (
"context"
"time"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
check "gopkg.in/check.v1"
)
@@ -291,7 +292,7 @@ func (s *HandlerSuite) performTrashWorkerTest(c *check.C, testData TrashWorkerTe
}
}
}
- go RunTrashWorker(s.handler.volmgr, s.cluster, trashq)
+ go RunTrashWorker(s.handler.volmgr, ctxlog.TestLogger(c), s.cluster, trashq)
// Install gate so all local operations block until we say go
gate := make(chan struct{})
diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go
index 6504f9c16..f41bd30d3 100644
--- a/services/keepstore/unix_volume.go
+++ b/services/keepstore/unix_volume.go
@@ -11,7 +11,6 @@ import (
"fmt"
"io"
"io/ioutil"
- "log"
"os"
"os/exec"
"path/filepath"
@@ -38,6 +37,7 @@ func newDirectoryVolume(cluster *arvados.Cluster, volume arvados.Volume, logger
if err != nil {
return nil, err
}
+ v.logger = v.logger.WithField("Volume", v.String())
return v, v.check()
}
@@ -84,7 +84,7 @@ type UnixVolume struct {
// "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
func (v *UnixVolume) GetDeviceID() string {
giveup := func(f string, args ...interface{}) string {
- log.Printf(f+"; using blank DeviceID for volume %s", append(args, v)...)
+ v.logger.Infof(f+"; using blank DeviceID for volume %s", append(args, v)...)
return ""
}
buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput()
@@ -143,7 +143,7 @@ func (v *UnixVolume) GetDeviceID() string {
link := filepath.Join(udir, uuid)
fi, err = os.Stat(link)
if err != nil {
- log.Printf("error: stat %q: %s", link, err)
+ v.logger.WithError(err).Errorf("stat(%q) failed", link)
continue
}
if fi.Sys().(*syscall.Stat_t).Ino == ino {
@@ -271,15 +271,12 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
}
bdir := v.blockDir(loc)
if err := os.MkdirAll(bdir, 0755); err != nil {
- log.Printf("%s: could not create directory %s: %s",
- loc, bdir, err)
- return err
+ return fmt.Errorf("error creating directory %s: %s", bdir, err)
}
tmpfile, tmperr := v.os.TempFile(bdir, "tmp"+loc)
if tmperr != nil {
- log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
- return tmperr
+ return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, tmperr)
}
bpath := v.blockPath(loc)
@@ -291,19 +288,20 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
n, err := io.Copy(tmpfile, rdr)
v.os.stats.TickOutBytes(uint64(n))
if err != nil {
- log.Printf("%s: writing to %s: %s", v, bpath, err)
+ err = fmt.Errorf("error writing %s: %s", bpath, err)
tmpfile.Close()
v.os.Remove(tmpfile.Name())
return err
}
if err := tmpfile.Close(); err != nil {
- log.Printf("closing %s: %s", tmpfile.Name(), err)
+ err = fmt.Errorf("error closing %s: %s", tmpfile.Name(), err)
v.os.Remove(tmpfile.Name())
return err
}
if err := v.os.Rename(tmpfile.Name(), bpath); err != nil {
- log.Printf("rename %s %s: %s", tmpfile.Name(), bpath, err)
- return v.os.Remove(tmpfile.Name())
+ err = fmt.Errorf("error renaming %s to %s: %s", tmpfile.Name(), bpath, err)
+ v.os.Remove(tmpfile.Name())
+ return err
}
return nil
}
@@ -314,14 +312,14 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
func (v *UnixVolume) Status() *VolumeStatus {
fi, err := v.os.Stat(v.Root)
if err != nil {
- log.Printf("%s: os.Stat: %s", v, err)
+ v.logger.WithError(err).Error("stat failed")
return nil
}
devnum := fi.Sys().(*syscall.Stat_t).Dev
var fs syscall.Statfs_t
if err := syscall.Statfs(v.Root, &fs); err != nil {
- log.Printf("%s: statfs: %s", v, err)
+ v.logger.WithError(err).Error("statfs failed")
return nil
}
// These calculations match the way df calculates disk usage:
@@ -380,8 +378,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
blockdirpath := filepath.Join(v.Root, names[0])
blockdir, err := v.os.Open(blockdirpath)
if err != nil {
- log.Print("Error reading ", blockdirpath, ": ", err)
- lastErr = err
+ v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
+ lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
continue
}
v.os.stats.TickOps("readdir")
@@ -391,8 +389,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
if err == io.EOF {
break
} else if err != nil {
- log.Print("Error reading ", blockdirpath, ": ", err)
- lastErr = err
+ v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
+ lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
break
}
name := fileInfo[0].Name()
@@ -408,9 +406,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
" ", fileInfo[0].ModTime().UnixNano(),
"\n")
if err != nil {
- log.Print("Error writing : ", err)
- lastErr = err
- break
+ blockdir.Close()
+ return fmt.Errorf("error writing: %s", err)
}
}
blockdir.Close()
@@ -534,7 +531,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
if avail, err := v.FreeDiskSpace(); err == nil {
isFull = avail < MinFreeKilobytes
} else {
- log.Printf("%s: FreeDiskSpace: %s", v, err)
+ v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v)
isFull = false
}
@@ -584,7 +581,7 @@ func (v *UnixVolume) lock(ctx context.Context) error {
}()
select {
case <-ctx.Done():
- log.Printf("%s: client hung up while waiting for Serialize lock (%s)", v, time.Since(t0))
+ v.logger.Infof("client hung up while waiting for Serialize lock (%s)", time.Since(t0))
go func() {
<-locked
v.locker.Unlock()
@@ -653,7 +650,7 @@ func (v *UnixVolume) EmptyTrash() {
}
deadline, err := strconv.ParseInt(matches[2], 10, 64)
if err != nil {
- log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
+ v.logger.WithError(err).Errorf("EmptyTrash: %v: ParseInt(%q) failed", path, matches[2])
return
}
atomic.AddInt64(&bytesInTrash, info.Size())
@@ -663,7 +660,7 @@ func (v *UnixVolume) EmptyTrash() {
}
err = v.os.Remove(path)
if err != nil {
- log.Printf("EmptyTrash: Remove %v: %v", path, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: Remove(%q) failed", path)
return
}
atomic.AddInt64(&bytesDeleted, info.Size())
@@ -688,7 +685,7 @@ func (v *UnixVolume) EmptyTrash() {
err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
if err != nil {
- log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
+ v.logger.WithError(err).Errorf("EmptyTrash: filepath.Walk(%q) failed", path)
return nil
}
todo <- dirent{path, info}
@@ -698,10 +695,10 @@ func (v *UnixVolume) EmptyTrash() {
wg.Wait()
if err != nil {
- log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+ v.logger.WithError(err).Error("EmptyTrash failed")
}
- log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+ v.logger.Infof("EmptyTrash stats: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
}
type unixStats struct {
diff --git a/services/keepstore/unix_volume_test.go b/services/keepstore/unix_volume_test.go
index 1ffc46513..664956f7b 100644
--- a/services/keepstore/unix_volume_test.go
+++ b/services/keepstore/unix_volume_test.go
@@ -19,6 +19,7 @@ import (
"time"
"git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/ctxlog"
"github.com/prometheus/client_golang/prometheus"
"github.com/sirupsen/logrus"
check "gopkg.in/check.v1"
@@ -90,6 +91,7 @@ func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Clu
Root: d,
locker: locker,
cluster: cluster,
+ logger: ctxlog.TestLogger(c),
volume: volume,
metrics: metrics,
},
commit 55f206c71dd495293c896527a6d8f041db8b9b99
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Oct 29 11:02:49 2019 -0400
15521: Include request ID etc. in PutBlock log messages.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 86de8fa19..30ea695f0 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -733,6 +733,8 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
// provide as much detail as possible.
//
func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+ log := ctxlog.FromContext(ctx)
+
// Check that BLOCK's checksum matches HASH.
blockhash := fmt.Sprintf("%x", md5.Sum(block))
if blockhash != hash {
@@ -762,7 +764,7 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
writables := volmgr.AllWritable()
if len(writables) == 0 {
- log.Print("No writable volumes.")
+ log.Error("no writable volumes")
return 0, FullError
}
@@ -780,12 +782,12 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
// write did not succeed. Report the
// error and continue trying.
allFull = false
- log.Printf("%s: Write(%s): %s", vol, hash, err)
+ log.Errorf("%s: Write(%s): %s", vol, hash, err)
}
}
if allFull {
- log.Print("All volumes are full.")
+ log.Error("all volumes are full")
return 0, FullError
}
// Already logged the non-full errors.
commit 142b8f261e7006faf1f1ad846d3355b278f5d921
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Tue Oct 29 10:18:40 2019 -0400
15521: Fix truncated index response.
The client (keep-balance) looks for a blank line at the end of the
index response as an assurance the response is not truncated. If
headers have already been sent, http.Error("") writes a blank line --
so we were inadvertently assuring the client that truncated responses
were not truncated.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 86504422d..86de8fa19 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -282,19 +282,15 @@ func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
for _, v := range vols {
if err := v.IndexTo(prefix, resp); err != nil {
- // We can't send an error message to the
- // client because we might have already sent
- // headers and index content. All we can do is
- // log the error in our own logs, and (in
- // cases where headers haven't been sent yet)
- // set a 500 status.
+ // We can't send an error status/message to
+ // the client because IndexTo() might have
+ // already written body content. All we can do
+ // is log the error in our own logs.
//
- // If headers have already been sent, the
- // client must notice the lack of trailing
+ // The client must notice the lack of trailing
// newline as an indication that the response
// is incomplete.
- log.Printf("index error from volume %s: %s", v, err)
- http.Error(resp, "", http.StatusInternalServerError)
+ ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
return
}
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list