[ARVADOS] created: 1.3.0-1811-g62d28600c

Git user git at public.curoverse.com
Wed Oct 30 03:36:57 UTC 2019


        at  62d28600cbfc31f8e72c61e4519ff198cb66a02a (commit)


commit 62d28600cbfc31f8e72c61e4519ff198cb66a02a
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Oct 29 16:51:13 2019 -0400

    15521: Convert remaining log uses to logrus.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/keepstore/command.go b/services/keepstore/command.go
index c589e639f..006d24463 100644
--- a/services/keepstore/command.go
+++ b/services/keepstore/command.go
@@ -190,7 +190,7 @@ func (h *handler) setup(ctx context.Context, cluster *arvados.Cluster, token str
 	// Initialize the trashq and workers
 	h.trashq = NewWorkQueue()
 	for i := 0; i < 1 || i < h.Cluster.Collections.BlobTrashConcurrency; i++ {
-		go RunTrashWorker(h.volmgr, h.Cluster, h.trashq)
+		go RunTrashWorker(h.volmgr, h.Logger, h.Cluster, h.trashq)
 	}
 
 	// Set up routes and metrics
diff --git a/services/keepstore/handler_test.go b/services/keepstore/handler_test.go
index 9d69b9fa4..54b4871fa 100644
--- a/services/keepstore/handler_test.go
+++ b/services/keepstore/handler_test.go
@@ -892,10 +892,7 @@ func ExpectStatusCode(
 	testname string,
 	expectedStatus int,
 	response *httptest.ResponseRecorder) {
-	if response.Code != expectedStatus {
-		c.Errorf("%s: expected status %d, got %+v",
-			testname, expectedStatus, response)
-	}
+	c.Check(response.Code, check.Equals, expectedStatus, check.Commentf("%s", testname))
 }
 
 func ExpectBody(
@@ -1147,12 +1144,7 @@ func (s *HandlerSuite) TestUntrashHandler(c *check.C) {
 		"",
 		http.StatusOK,
 		response)
-	expected := "Successfully untrashed on: [MockVolume],[MockVolume]"
-	if response.Body.String() != expected {
-		c.Errorf(
-			"Untrash response mismatched: expected %s, got:\n%s",
-			expected, response.Body.String())
-	}
+	c.Check(response.Body.String(), check.Equals, "Successfully untrashed on: [MockVolume], [MockVolume]\n")
 }
 
 func (s *HandlerSuite) TestUntrashHandlerWithNoWritableVolumes(c *check.C) {
diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 30ea695f0..0fcc12144 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -11,7 +11,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
-	"log"
 	"net/http"
 	"os"
 	"regexp"
@@ -342,25 +341,25 @@ func (rtr *router) DebugHandler(resp http.ResponseWriter, req *http.Request) {
 	}
 	var ds debugStats
 	runtime.ReadMemStats(&ds.MemStats)
-	err := json.NewEncoder(resp).Encode(&ds)
+	data, err := json.Marshal(&ds)
 	if err != nil {
-		http.Error(resp, err.Error(), 500)
+		http.Error(resp, err.Error(), http.StatusInternalServerError)
+		return
 	}
+	resp.Write(data)
 }
 
 // StatusHandler addresses /status.json requests.
 func (rtr *router) StatusHandler(resp http.ResponseWriter, req *http.Request) {
 	stLock.Lock()
 	rtr.readNodeStatus(&st)
-	jstat, err := json.Marshal(&st)
+	data, err := json.Marshal(&st)
 	stLock.Unlock()
-	if err == nil {
-		resp.Write(jstat)
-	} else {
-		log.Printf("json.Marshal: %s", err)
-		log.Printf("NodeStatus = %v", &st)
-		http.Error(resp, err.Error(), 500)
+	if err != nil {
+		http.Error(resp, err.Error(), http.StatusInternalServerError)
+		return
 	}
+	resp.Write(data)
 }
 
 // populate the given NodeStatus struct with current values.
@@ -457,28 +456,19 @@ func (rtr *router) handleDELETE(resp http.ResponseWriter, req *http.Request) {
 			continue
 		} else {
 			result.Failed++
-			log.Println("DeleteHandler:", err)
+			ctxlog.FromContext(req.Context()).WithError(err).Errorf("Trash(%s) failed on volume %s", hash, vol)
 		}
 	}
-
-	var st int
-
 	if result.Deleted == 0 && result.Failed == 0 {
-		st = http.StatusNotFound
-	} else {
-		st = http.StatusOK
+		resp.WriteHeader(http.StatusNotFound)
+		return
 	}
-
-	resp.WriteHeader(st)
-
-	if st == http.StatusOK {
-		if body, err := json.Marshal(result); err == nil {
-			resp.Write(body)
-		} else {
-			log.Printf("json.Marshal: %s (result = %v)", err, result)
-			http.Error(resp, err.Error(), 500)
-		}
+	body, err := json.Marshal(result)
+	if err != nil {
+		http.Error(resp, err.Error(), http.StatusInternalServerError)
+		return
 	}
+	resp.Write(body)
 }
 
 /* PullHandler processes "PUT /pull" requests for the data manager.
@@ -600,6 +590,7 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
 		return
 	}
 
+	log := ctxlog.FromContext(req.Context())
 	hash := mux.Vars(req)["hash"]
 
 	if len(rtr.volmgr.AllWritable()) == 0 {
@@ -615,27 +606,26 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
 		if os.IsNotExist(err) {
 			numNotFound++
 		} else if err != nil {
-			log.Printf("Error untrashing %v on volume %v", hash, vol.String())
+			log.WithError(err).Errorf("Error untrashing %v on volume %s", hash, vol)
 			failedOn = append(failedOn, vol.String())
 		} else {
-			log.Printf("Untrashed %v on volume %v", hash, vol.String())
+			log.Infof("Untrashed %v on volume %v", hash, vol.String())
 			untrashedOn = append(untrashedOn, vol.String())
 		}
 	}
 
 	if numNotFound == len(rtr.volmgr.AllWritable()) {
 		http.Error(resp, "Block not found on any of the writable volumes", http.StatusNotFound)
-		return
-	}
-
-	if len(failedOn) == len(rtr.volmgr.AllWritable()) {
+	} else if len(failedOn) == len(rtr.volmgr.AllWritable()) {
 		http.Error(resp, "Failed to untrash on all writable volumes", http.StatusInternalServerError)
 	} else {
-		respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ",")
+		respBody := "Successfully untrashed on: " + strings.Join(untrashedOn, ", ")
 		if len(failedOn) > 0 {
-			respBody += "; Failed to untrash on: " + strings.Join(failedOn, ",")
+			respBody += "; Failed to untrash on: " + strings.Join(failedOn, ", ")
+			http.Error(resp, respBody, http.StatusInternalServerError)
+		} else {
+			fmt.Fprintln(resp, respBody)
 		}
-		resp.Write([]byte(respBody))
 	}
 }
 
@@ -659,6 +649,8 @@ func (rtr *router) handleUntrash(resp http.ResponseWriter, req *http.Request) {
 // DiskHashError.
 //
 func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte, resp http.ResponseWriter) (int, error) {
+	log := ctxlog.FromContext(ctx)
+
 	// Attempt to read the requested hash from a keep volume.
 	errorToCaller := NotFoundError
 
@@ -676,7 +668,7 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 			// volumes. If all volumes report IsNotExist,
 			// we return a NotFoundError.
 			if !os.IsNotExist(err) {
-				log.Printf("%s: Get(%s): %s", vol, hash, err)
+				log.WithError(err).Errorf("Get(%s) failed on %s", hash, vol)
 			}
 			// If some volume returns a transient error, return it to the caller
 			// instead of "Not found" so it can retry.
@@ -686,19 +678,16 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 			continue
 		}
 		// Check the file checksum.
-		//
 		filehash := fmt.Sprintf("%x", md5.Sum(buf[:size]))
 		if filehash != hash {
 			// TODO: Try harder to tell a sysadmin about
 			// this.
-			log.Printf("%s: checksum mismatch for request %s (actual %s)",
-				vol, hash, filehash)
+			log.Errorf("checksum mismatch for block %s (actual %s) on %s", hash, filehash, vol)
 			errorToCaller = DiskHashError
 			continue
 		}
 		if errorToCaller == DiskHashError {
-			log.Printf("%s: checksum mismatch for request %s but a good copy was found on another volume and returned",
-				vol, hash)
+			log.Warnf("after checksum mismatch for block %s on a different volume, a good copy was found on volume %s and returned", hash, vol)
 		}
 		return size, nil
 	}
@@ -754,12 +743,14 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 	// Choose a Keep volume to write to.
 	// If this volume fails, try all of the volumes in order.
 	if mnt := volmgr.NextWritable(); mnt != nil {
-		if err := mnt.Put(ctx, hash, block); err == nil {
+		if err := mnt.Put(ctx, hash, block); err != nil {
+			log.WithError(err).Errorf("%s: Put(%s) failed", mnt.Volume, hash)
+		} else {
 			return mnt.Replication, nil // success!
 		}
-		if ctx.Err() != nil {
-			return 0, ErrClientDisconnect
-		}
+	}
+	if ctx.Err() != nil {
+		return 0, ErrClientDisconnect
 	}
 
 	writables := volmgr.AllWritable()
@@ -774,15 +765,17 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 		if ctx.Err() != nil {
 			return 0, ErrClientDisconnect
 		}
-		if err == nil {
+		switch err {
+		case nil:
 			return vol.Replication, nil // success!
-		}
-		if err != FullError {
+		case FullError:
+			continue
+		default:
 			// The volume is not full but the
 			// write did not succeed.  Report the
 			// error and continue trying.
 			allFull = false
-			log.Errorf("%s: Write(%s): %s", vol, hash, err)
+			log.WithError(err).Errorf("%s: Put(%s) failed", vol, hash)
 		}
 	}
 
@@ -800,6 +793,7 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 // premature garbage collection. Otherwise, it returns a non-nil
 // error.
 func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []byte) (int, error) {
+	log := ctxlog.FromContext(ctx)
 	var bestErr error = NotFoundError
 	for _, mnt := range volmgr.AllWritable() {
 		err := mnt.Compare(ctx, hash, buf)
@@ -811,7 +805,7 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
 			// to tell which one is wanted if we have
 			// both, so there's no point writing it even
 			// on a different volume.)
-			log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+			log.Errorf("collision in Compare(%s) on volume %s", hash, mnt.Volume)
 			return 0, err
 		} else if os.IsNotExist(err) {
 			// Block does not exist. This is the only
@@ -821,11 +815,11 @@ func CompareAndTouch(ctx context.Context, volmgr *RRVolumeManager, hash string,
 			// Couldn't open file, data is corrupt on
 			// disk, etc.: log this abnormal condition,
 			// and try the next volume.
-			log.Printf("%s: Compare(%s): %s", mnt.Volume, hash, err)
+			log.WithError(err).Warnf("error in Compare(%s) on volume %s", hash, mnt.Volume)
 			continue
 		}
 		if err := mnt.Touch(hash); err != nil {
-			log.Printf("%s: Touch %s failed: %s", mnt.Volume, hash, err)
+			log.WithError(err).Errorf("error in Touch(%s) on volume %s", hash, mnt.Volume)
 			bestErr = err
 			continue
 		}
@@ -859,18 +853,6 @@ func GetAPIToken(req *http.Request) string {
 	return ""
 }
 
-// IsExpired returns true if the given Unix timestamp (expressed as a
-// hexadecimal string) is in the past, or if timestampHex cannot be
-// parsed as a hexadecimal string.
-func IsExpired(timestampHex string) bool {
-	ts, err := strconv.ParseInt(timestampHex, 16, 0)
-	if err != nil {
-		log.Printf("IsExpired: %s", err)
-		return true
-	}
-	return time.Unix(ts, 0).Before(time.Now())
-}
-
 // canDelete returns true if the user identified by apiToken is
 // allowed to delete blocks.
 func (rtr *router) canDelete(apiToken string) bool {
diff --git a/services/keepstore/s3_volume.go b/services/keepstore/s3_volume.go
index 220377af2..08cc591fc 100644
--- a/services/keepstore/s3_volume.go
+++ b/services/keepstore/s3_volume.go
@@ -16,7 +16,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"log"
 	"net/http"
 	"os"
 	"regexp"
@@ -37,11 +36,12 @@ func init() {
 }
 
 func newS3Volume(cluster *arvados.Cluster, volume arvados.Volume, logger logrus.FieldLogger, metrics *volumeMetricsVecs) (Volume, error) {
-	v := &S3Volume{cluster: cluster, volume: volume, logger: logger, metrics: metrics}
+	v := &S3Volume{cluster: cluster, volume: volume, metrics: metrics}
 	err := json.Unmarshal(volume.DriverParameters, &v)
 	if err != nil {
 		return nil, err
 	}
+	v.logger = logger.WithField("Volume", v.String())
 	return v, v.check()
 }
 
@@ -340,7 +340,7 @@ func (v *S3Volume) getReader(loc string) (rdr io.ReadCloser, err error) {
 
 	rdr, err = v.bucket.GetReader(loc)
 	if err != nil {
-		log.Printf("warning: reading %s after successful fixRace: %s", loc, err)
+		v.logger.Warnf("reading %s after successful fixRace: %s", loc, err)
 		err = v.translateError(err)
 	}
 	return
@@ -465,7 +465,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
 	go func() {
 		defer func() {
 			if ctx.Err() != nil {
-				v.logger.Debugf("%s: abandoned PutReader goroutine finished with err: %s", v, err)
+				v.logger.Debugf("abandoned PutReader goroutine finished with err: %s", err)
 			}
 		}()
 		defer close(ready)
@@ -477,7 +477,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
 	}()
 	select {
 	case <-ctx.Done():
-		v.logger.Debugf("%s: taking PutReader's input away: %s", v, ctx.Err())
+		v.logger.Debugf("taking PutReader's input away: %s", ctx.Err())
 		// Our pipe might be stuck in Write(), waiting for
 		// PutReader() to read. If so, un-stick it. This means
 		// PutReader will get corrupt data, but that's OK: the
@@ -485,7 +485,7 @@ func (v *S3Volume) Put(ctx context.Context, loc string, block []byte) error {
 		go io.Copy(ioutil.Discard, bufr)
 		// CloseWithError() will return once pending I/O is done.
 		bufw.CloseWithError(ctx.Err())
-		v.logger.Debugf("%s: abandoning PutReader goroutine", v)
+		v.logger.Debugf("abandoning PutReader goroutine")
 		return ctx.Err()
 	case <-ready:
 		// Unblock pipe in case PutReader did not consume it.
@@ -523,13 +523,13 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
 		// The data object X exists, but recent/X is missing.
 		err = v.bucket.PutReader("recent/"+loc, nil, 0, "application/octet-stream", s3ACL, s3.Options{})
 		if err != nil {
-			log.Printf("error: creating %q: %s", "recent/"+loc, err)
+			v.logger.WithError(err).Errorf("error creating %q", "recent/"+loc)
 			return zeroTime, v.translateError(err)
 		}
-		log.Printf("info: created %q to migrate existing block to new storage scheme", "recent/"+loc)
+		v.logger.Infof("created %q to migrate existing block to new storage scheme", "recent/"+loc)
 		resp, err = v.bucket.Head("recent/"+loc, nil)
 		if err != nil {
-			log.Printf("error: created %q but HEAD failed: %s", "recent/"+loc, err)
+			v.logger.WithError(err).Errorf("HEAD failed after creating %q", "recent/"+loc)
 			return zeroTime, v.translateError(err)
 		}
 	} else if err != nil {
@@ -544,12 +544,14 @@ func (v *S3Volume) Mtime(loc string) (time.Time, error) {
 func (v *S3Volume) IndexTo(prefix string, writer io.Writer) error {
 	// Use a merge sort to find matching sets of X and recent/X.
 	dataL := s3Lister{
+		Logger:   v.logger,
 		Bucket:   v.bucket.Bucket(),
 		Prefix:   prefix,
 		PageSize: v.IndexPageSize,
 		Stats:    &v.bucket.stats,
 	}
 	recentL := s3Lister{
+		Logger:   v.logger,
 		Bucket:   v.bucket.Bucket(),
 		Prefix:   "recent/" + prefix,
 		PageSize: v.IndexPageSize,
@@ -744,24 +746,24 @@ func (v *S3Volume) fixRace(loc string) bool {
 	trash, err := v.bucket.Head("trash/"+loc, nil)
 	if err != nil {
 		if !os.IsNotExist(v.translateError(err)) {
-			log.Printf("error: fixRace: HEAD %q: %s", "trash/"+loc, err)
+			v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "trash/"+loc)
 		}
 		return false
 	}
 	trashTime, err := v.lastModified(trash)
 	if err != nil {
-		log.Printf("error: fixRace: parse %q: %s", trash.Header.Get("Last-Modified"), err)
+		v.logger.WithError(err).Errorf("fixRace: error parsing time %q", trash.Header.Get("Last-Modified"))
 		return false
 	}
 
 	recent, err := v.bucket.Head("recent/"+loc, nil)
 	if err != nil {
-		log.Printf("error: fixRace: HEAD %q: %s", "recent/"+loc, err)
+		v.logger.WithError(err).Errorf("fixRace: HEAD %q failed", "recent/"+loc)
 		return false
 	}
 	recentTime, err := v.lastModified(recent)
 	if err != nil {
-		log.Printf("error: fixRace: parse %q: %s", recent.Header.Get("Last-Modified"), err)
+		v.logger.WithError(err).Errorf("fixRace: error parsing time %q", recent.Header.Get("Last-Modified"))
 		return false
 	}
 
@@ -772,11 +774,11 @@ func (v *S3Volume) fixRace(loc string) bool {
 		return false
 	}
 
-	log.Printf("notice: fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
-	log.Printf("notice: fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
+	v.logger.Infof("fixRace: %q: trashed at %s but touched at %s (age when trashed = %s < %s)", loc, trashTime, recentTime, ageWhenTrashed, v.cluster.Collections.BlobSigningTTL)
+	v.logger.Infof("fixRace: copying %q to %q to recover from race between Put/Touch and Trash", "recent/"+loc, loc)
 	err = v.safeCopy(loc, "trash/"+loc)
 	if err != nil {
-		log.Printf("error: fixRace: %s", err)
+		v.logger.WithError(err).Error("fixRace: copy failed")
 		return false
 	}
 	return true
@@ -819,24 +821,24 @@ func (v *S3Volume) EmptyTrash() {
 
 		trashT, err := time.Parse(time.RFC3339, trash.LastModified)
 		if err != nil {
-			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, trash.Key, trash.LastModified, err)
+			v.logger.Warnf("EmptyTrash: %q: parse %q: %s", trash.Key, trash.LastModified, err)
 			return
 		}
 		recent, err := v.bucket.Head("recent/"+loc, nil)
 		if err != nil && os.IsNotExist(v.translateError(err)) {
-			log.Printf("warning: %s: EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", v, trash.Key, "recent/"+loc, err)
+			v.logger.Warnf("EmptyTrash: found trash marker %q but no %q (%s); calling Untrash", trash.Key, "recent/"+loc, err)
 			err = v.Untrash(loc)
 			if err != nil {
-				log.Printf("error: %s: EmptyTrash: Untrash(%q): %s", v, loc, err)
+				v.logger.WithError(err).Errorf("EmptyTrash: Untrash(%q) failed", loc)
 			}
 			return
 		} else if err != nil {
-			log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, "recent/"+loc, err)
+			v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", "recent/"+loc)
 			return
 		}
 		recentT, err := v.lastModified(recent)
 		if err != nil {
-			log.Printf("warning: %s: EmptyTrash: %q: parse %q: %s", v, "recent/"+loc, recent.Header.Get("Last-Modified"), err)
+			v.logger.WithError(err).Warnf("EmptyTrash: %q: error parsing %q", "recent/"+loc, recent.Header.Get("Last-Modified"))
 			return
 		}
 		if trashT.Sub(recentT) < v.cluster.Collections.BlobSigningTTL.Duration() {
@@ -849,18 +851,18 @@ func (v *S3Volume) EmptyTrash() {
 				// Note this means (TrashSweepInterval
 				// < BlobSigningTTL - raceWindow) is
 				// necessary to avoid starvation.
-				log.Printf("notice: %s: EmptyTrash: detected old race for %q, calling fixRace + Touch", v, loc)
+				v.logger.Infof("EmptyTrash: detected old race for %q, calling fixRace + Touch", loc)
 				v.fixRace(loc)
 				v.Touch(loc)
 				return
 			}
 			_, err := v.bucket.Head(loc, nil)
 			if os.IsNotExist(err) {
-				log.Printf("notice: %s: EmptyTrash: detected recent race for %q, calling fixRace", v, loc)
+				v.logger.Infof("EmptyTrash: detected recent race for %q, calling fixRace", loc)
 				v.fixRace(loc)
 				return
 			} else if err != nil {
-				log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+				v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
 				return
 			}
 		}
@@ -869,7 +871,7 @@ func (v *S3Volume) EmptyTrash() {
 		}
 		err = v.bucket.Del(trash.Key)
 		if err != nil {
-			log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, trash.Key, err)
+			v.logger.WithError(err).Errorf("EmptyTrash: error deleting %q", trash.Key)
 			return
 		}
 		atomic.AddInt64(&bytesDeleted, trash.Size)
@@ -877,16 +879,16 @@ func (v *S3Volume) EmptyTrash() {
 
 		_, err = v.bucket.Head(loc, nil)
 		if err == nil {
-			log.Printf("warning: %s: EmptyTrash: HEAD %q succeeded immediately after deleting %q", v, loc, loc)
+			v.logger.Warnf("EmptyTrash: HEAD %q succeeded immediately after deleting %q", loc, loc)
 			return
 		}
 		if !os.IsNotExist(v.translateError(err)) {
-			log.Printf("warning: %s: EmptyTrash: HEAD %q: %s", v, loc, err)
+			v.logger.WithError(err).Warnf("EmptyTrash: HEAD %q failed", loc)
 			return
 		}
 		err = v.bucket.Del("recent/" + loc)
 		if err != nil {
-			log.Printf("warning: %s: EmptyTrash: deleting %q: %s", v, "recent/"+loc, err)
+			v.logger.WithError(err).Warnf("EmptyTrash: error deleting %q", "recent/"+loc)
 		}
 	}
 
@@ -903,6 +905,7 @@ func (v *S3Volume) EmptyTrash() {
 	}
 
 	trashL := s3Lister{
+		Logger:   v.logger,
 		Bucket:   v.bucket.Bucket(),
 		Prefix:   "trash/",
 		PageSize: v.IndexPageSize,
@@ -915,12 +918,13 @@ func (v *S3Volume) EmptyTrash() {
 	wg.Wait()
 
 	if err := trashL.Error(); err != nil {
-		log.Printf("error: %s: EmptyTrash: lister: %s", v, err)
+		v.logger.WithError(err).Error("EmptyTrash: lister failed")
 	}
-	log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+	v.logger.Infof("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
 
 type s3Lister struct {
+	Logger     logrus.FieldLogger
 	Bucket     *s3.Bucket
 	Prefix     string
 	PageSize   int
@@ -967,7 +971,7 @@ func (lister *s3Lister) getPage() {
 	lister.buf = make([]s3.Key, 0, len(resp.Contents))
 	for _, key := range resp.Contents {
 		if !strings.HasPrefix(key.Key, lister.Prefix) {
-			log.Printf("warning: s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
+			lister.Logger.Warnf("s3Lister: S3 Bucket.List(prefix=%q) returned key %q", lister.Prefix, key.Key)
 			continue
 		}
 		lister.buf = append(lister.buf, key)
diff --git a/services/keepstore/s3_volume_test.go b/services/keepstore/s3_volume_test.go
index 49ea24aa0..dbd6a45ed 100644
--- a/services/keepstore/s3_volume_test.go
+++ b/services/keepstore/s3_volume_test.go
@@ -11,7 +11,6 @@ import (
 	"encoding/json"
 	"fmt"
 	"io"
-	"log"
 	"net/http"
 	"net/http/httptest"
 	"os"
@@ -499,11 +498,11 @@ func (s *StubbedS3Suite) newTestableVolume(c *check.C, cluster *arvados.Cluster,
 func (v *TestableS3Volume) PutRaw(loc string, block []byte) {
 	err := v.bucket.Bucket().Put(loc, block, "application/octet-stream", s3ACL, s3.Options{})
 	if err != nil {
-		log.Printf("PutRaw: %s: %+v", loc, err)
+		v.logger.Printf("PutRaw: %s: %+v", loc, err)
 	}
 	err = v.bucket.Bucket().Put("recent/"+loc, nil, "application/octet-stream", s3ACL, s3.Options{})
 	if err != nil {
-		log.Printf("PutRaw: recent/%s: %+v", loc, err)
+		v.logger.Printf("PutRaw: recent/%s: %+v", loc, err)
 	}
 }
 
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index ba1455ac6..3b1bd0423 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -6,10 +6,10 @@ package main
 
 import (
 	"errors"
-	"log"
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"github.com/sirupsen/logrus"
 )
 
 // RunTrashWorker is used by Keepstore to initiate trash worker channel goroutine.
@@ -18,19 +18,19 @@ import (
 //      Delete the block indicated by the trash request Locator
 //		Repeat
 //
-func RunTrashWorker(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashq *WorkQueue) {
+func RunTrashWorker(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashq *WorkQueue) {
 	for item := range trashq.NextItem {
 		trashRequest := item.(TrashRequest)
-		TrashItem(volmgr, cluster, trashRequest)
+		TrashItem(volmgr, logger, cluster, trashRequest)
 		trashq.DoneItem <- struct{}{}
 	}
 }
 
 // TrashItem deletes the indicated block from every writable volume.
-func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest TrashRequest) {
+func TrashItem(volmgr *RRVolumeManager, logger logrus.FieldLogger, cluster *arvados.Cluster, trashRequest TrashRequest) {
 	reqMtime := time.Unix(0, trashRequest.BlockMtime)
 	if time.Since(reqMtime) < cluster.Collections.BlobSigningTTL.Duration() {
-		log.Printf("WARNING: data manager asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
+		logger.Warnf("client asked to delete a %v old block %v (BlockMtime %d = %v), but my blobSignatureTTL is %v! Skipping.",
 			arvados.Duration(time.Since(reqMtime)),
 			trashRequest.Locator,
 			trashRequest.BlockMtime,
@@ -43,7 +43,7 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
 	if uuid := trashRequest.MountUUID; uuid == "" {
 		volumes = volmgr.AllWritable()
 	} else if mnt := volmgr.Lookup(uuid, true); mnt == nil {
-		log.Printf("warning: trash request for nonexistent mount: %v", trashRequest)
+		logger.Warnf("trash request for nonexistent mount: %v", trashRequest)
 		return
 	} else {
 		volumes = []*VolumeMount{mnt}
@@ -52,11 +52,11 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
 	for _, volume := range volumes {
 		mtime, err := volume.Mtime(trashRequest.Locator)
 		if err != nil {
-			log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+			logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
 			continue
 		}
 		if trashRequest.BlockMtime != mtime.UnixNano() {
-			log.Printf("%v Trash(%v): stored mtime %v does not match trash list value %v", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
+			logger.Infof("%v Trash(%v): stored mtime %v does not match trash list value %v; skipping", volume, trashRequest.Locator, mtime.UnixNano(), trashRequest.BlockMtime)
 			continue
 		}
 
@@ -67,9 +67,9 @@ func TrashItem(volmgr *RRVolumeManager, cluster *arvados.Cluster, trashRequest T
 		}
 
 		if err != nil {
-			log.Printf("%v Trash(%v): %v", volume, trashRequest.Locator, err)
+			logger.WithError(err).Errorf("%v Trash(%v)", volume, trashRequest.Locator)
 		} else {
-			log.Printf("%v Trash(%v) OK", volume, trashRequest.Locator)
+			logger.Infof("%v Trash(%v) OK", volume, trashRequest.Locator)
 		}
 	}
 }
diff --git a/services/keepstore/trash_worker_test.go b/services/keepstore/trash_worker_test.go
index bd3743090..c2052077f 100644
--- a/services/keepstore/trash_worker_test.go
+++ b/services/keepstore/trash_worker_test.go
@@ -9,6 +9,7 @@ import (
 	"context"
 	"time"
 
+	"git.curoverse.com/arvados.git/sdk/go/ctxlog"
 	"github.com/prometheus/client_golang/prometheus"
 	check "gopkg.in/check.v1"
 )
@@ -291,7 +292,7 @@ func (s *HandlerSuite) performTrashWorkerTest(c *check.C, testData TrashWorkerTe
 			}
 		}
 	}
-	go RunTrashWorker(s.handler.volmgr, s.cluster, trashq)
+	go RunTrashWorker(s.handler.volmgr, ctxlog.TestLogger(c), s.cluster, trashq)
 
 	// Install gate so all local operations block until we say go
 	gate := make(chan struct{})
diff --git a/services/keepstore/unix_volume.go b/services/keepstore/unix_volume.go
index 6504f9c16..f41bd30d3 100644
--- a/services/keepstore/unix_volume.go
+++ b/services/keepstore/unix_volume.go
@@ -11,7 +11,6 @@ import (
 	"fmt"
 	"io"
 	"io/ioutil"
-	"log"
 	"os"
 	"os/exec"
 	"path/filepath"
@@ -38,6 +37,7 @@ func newDirectoryVolume(cluster *arvados.Cluster, volume arvados.Volume, logger
 	if err != nil {
 		return nil, err
 	}
+	v.logger = v.logger.WithField("Volume", v.String())
 	return v, v.check()
 }
 
@@ -84,7 +84,7 @@ type UnixVolume struct {
 // "fa0b6166-3b55-4994-bd3f-92f4e00a1bb0/keep".
 func (v *UnixVolume) GetDeviceID() string {
 	giveup := func(f string, args ...interface{}) string {
-		log.Printf(f+"; using blank DeviceID for volume %s", append(args, v)...)
+		v.logger.Infof(f+"; using blank DeviceID for volume %s", append(args, v)...)
 		return ""
 	}
 	buf, err := exec.Command("findmnt", "--noheadings", "--target", v.Root).CombinedOutput()
@@ -143,7 +143,7 @@ func (v *UnixVolume) GetDeviceID() string {
 		link := filepath.Join(udir, uuid)
 		fi, err = os.Stat(link)
 		if err != nil {
-			log.Printf("error: stat %q: %s", link, err)
+			v.logger.WithError(err).Errorf("stat(%q) failed", link)
 			continue
 		}
 		if fi.Sys().(*syscall.Stat_t).Ino == ino {
@@ -271,15 +271,12 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
 	}
 	bdir := v.blockDir(loc)
 	if err := os.MkdirAll(bdir, 0755); err != nil {
-		log.Printf("%s: could not create directory %s: %s",
-			loc, bdir, err)
-		return err
+		return fmt.Errorf("error creating directory %s: %s", bdir, err)
 	}
 
 	tmpfile, tmperr := v.os.TempFile(bdir, "tmp"+loc)
 	if tmperr != nil {
-		log.Printf("ioutil.TempFile(%s, tmp%s): %s", bdir, loc, tmperr)
-		return tmperr
+		return fmt.Errorf("TempFile(%s, tmp%s) failed: %s", bdir, loc, tmperr)
 	}
 
 	bpath := v.blockPath(loc)
@@ -291,19 +288,20 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
 	n, err := io.Copy(tmpfile, rdr)
 	v.os.stats.TickOutBytes(uint64(n))
 	if err != nil {
-		log.Printf("%s: writing to %s: %s", v, bpath, err)
+		err = fmt.Errorf("error writing %s: %s", bpath, err)
 		tmpfile.Close()
 		v.os.Remove(tmpfile.Name())
 		return err
 	}
 	if err := tmpfile.Close(); err != nil {
-		log.Printf("closing %s: %s", tmpfile.Name(), err)
+		err = fmt.Errorf("error closing %s: %s", tmpfile.Name(), err)
 		v.os.Remove(tmpfile.Name())
 		return err
 	}
 	if err := v.os.Rename(tmpfile.Name(), bpath); err != nil {
-		log.Printf("rename %s %s: %s", tmpfile.Name(), bpath, err)
-		return v.os.Remove(tmpfile.Name())
+		err = fmt.Errorf("error renaming %s to %s: %s", tmpfile.Name(), bpath, err)
+		v.os.Remove(tmpfile.Name())
+		return err
 	}
 	return nil
 }
@@ -314,14 +312,14 @@ func (v *UnixVolume) WriteBlock(ctx context.Context, loc string, rdr io.Reader)
 func (v *UnixVolume) Status() *VolumeStatus {
 	fi, err := v.os.Stat(v.Root)
 	if err != nil {
-		log.Printf("%s: os.Stat: %s", v, err)
+		v.logger.WithError(err).Error("stat failed")
 		return nil
 	}
 	devnum := fi.Sys().(*syscall.Stat_t).Dev
 
 	var fs syscall.Statfs_t
 	if err := syscall.Statfs(v.Root, &fs); err != nil {
-		log.Printf("%s: statfs: %s", v, err)
+		v.logger.WithError(err).Error("statfs failed")
 		return nil
 	}
 	// These calculations match the way df calculates disk usage:
@@ -380,8 +378,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 		blockdirpath := filepath.Join(v.Root, names[0])
 		blockdir, err := v.os.Open(blockdirpath)
 		if err != nil {
-			log.Print("Error reading ", blockdirpath, ": ", err)
-			lastErr = err
+			v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
+			lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
 			continue
 		}
 		v.os.stats.TickOps("readdir")
@@ -391,8 +389,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 			if err == io.EOF {
 				break
 			} else if err != nil {
-				log.Print("Error reading ", blockdirpath, ": ", err)
-				lastErr = err
+				v.logger.WithError(err).Errorf("error reading %q", blockdirpath)
+				lastErr = fmt.Errorf("error reading %q: %s", blockdirpath, err)
 				break
 			}
 			name := fileInfo[0].Name()
@@ -408,9 +406,8 @@ func (v *UnixVolume) IndexTo(prefix string, w io.Writer) error {
 				" ", fileInfo[0].ModTime().UnixNano(),
 				"\n")
 			if err != nil {
-				log.Print("Error writing : ", err)
-				lastErr = err
-				break
+				blockdir.Close()
+				return fmt.Errorf("error writing: %s", err)
 			}
 		}
 		blockdir.Close()
@@ -534,7 +531,7 @@ func (v *UnixVolume) IsFull() (isFull bool) {
 	if avail, err := v.FreeDiskSpace(); err == nil {
 		isFull = avail < MinFreeKilobytes
 	} else {
-		log.Printf("%s: FreeDiskSpace: %s", v, err)
+		v.logger.WithError(err).Errorf("%s: FreeDiskSpace failed", v)
 		isFull = false
 	}
 
@@ -584,7 +581,7 @@ func (v *UnixVolume) lock(ctx context.Context) error {
 	}()
 	select {
 	case <-ctx.Done():
-		log.Printf("%s: client hung up while waiting for Serialize lock (%s)", v, time.Since(t0))
+		v.logger.Infof("client hung up while waiting for Serialize lock (%s)", time.Since(t0))
 		go func() {
 			<-locked
 			v.locker.Unlock()
@@ -653,7 +650,7 @@ func (v *UnixVolume) EmptyTrash() {
 		}
 		deadline, err := strconv.ParseInt(matches[2], 10, 64)
 		if err != nil {
-			log.Printf("EmptyTrash: %v: ParseInt(%v): %v", path, matches[2], err)
+			v.logger.WithError(err).Errorf("EmptyTrash: %v: ParseInt(%q) failed", path, matches[2])
 			return
 		}
 		atomic.AddInt64(&bytesInTrash, info.Size())
@@ -663,7 +660,7 @@ func (v *UnixVolume) EmptyTrash() {
 		}
 		err = v.os.Remove(path)
 		if err != nil {
-			log.Printf("EmptyTrash: Remove %v: %v", path, err)
+			v.logger.WithError(err).Errorf("EmptyTrash: Remove(%q) failed", path)
 			return
 		}
 		atomic.AddInt64(&bytesDeleted, info.Size())
@@ -688,7 +685,7 @@ func (v *UnixVolume) EmptyTrash() {
 
 	err := filepath.Walk(v.Root, func(path string, info os.FileInfo, err error) error {
 		if err != nil {
-			log.Printf("EmptyTrash: filepath.Walk: %v: %v", path, err)
+			v.logger.WithError(err).Errorf("EmptyTrash: filepath.Walk(%q) failed", path)
 			return nil
 		}
 		todo <- dirent{path, info}
@@ -698,10 +695,10 @@ func (v *UnixVolume) EmptyTrash() {
 	wg.Wait()
 
 	if err != nil {
-		log.Printf("EmptyTrash error for %v: %v", v.String(), err)
+		v.logger.WithError(err).Error("EmptyTrash failed")
 	}
 
-	log.Printf("EmptyTrash stats for %v: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", v.String(), bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
+	v.logger.Infof("EmptyTrash stats: Deleted %v bytes in %v blocks. Remaining in trash: %v bytes in %v blocks.", bytesDeleted, blocksDeleted, bytesInTrash-bytesDeleted, blocksInTrash-blocksDeleted)
 }
 
 type unixStats struct {
diff --git a/services/keepstore/unix_volume_test.go b/services/keepstore/unix_volume_test.go
index 1ffc46513..664956f7b 100644
--- a/services/keepstore/unix_volume_test.go
+++ b/services/keepstore/unix_volume_test.go
@@ -19,6 +19,7 @@ import (
 	"time"
 
 	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/ctxlog"
 	"github.com/prometheus/client_golang/prometheus"
 	"github.com/sirupsen/logrus"
 	check "gopkg.in/check.v1"
@@ -90,6 +91,7 @@ func (s *UnixVolumeSuite) newTestableUnixVolume(c *check.C, cluster *arvados.Clu
 			Root:    d,
 			locker:  locker,
 			cluster: cluster,
+			logger:  ctxlog.TestLogger(c),
 			volume:  volume,
 			metrics: metrics,
 		},

commit 55f206c71dd495293c896527a6d8f041db8b9b99
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Oct 29 11:02:49 2019 -0400

    15521: Include request ID etc. in PutBlock log messages.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 86de8fa19..30ea695f0 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -733,6 +733,8 @@ func GetBlock(ctx context.Context, volmgr *RRVolumeManager, hash string, buf []b
 //          provide as much detail as possible.
 //
 func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash string) (int, error) {
+	log := ctxlog.FromContext(ctx)
+
 	// Check that BLOCK's checksum matches HASH.
 	blockhash := fmt.Sprintf("%x", md5.Sum(block))
 	if blockhash != hash {
@@ -762,7 +764,7 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 
 	writables := volmgr.AllWritable()
 	if len(writables) == 0 {
-		log.Print("No writable volumes.")
+		log.Error("no writable volumes")
 		return 0, FullError
 	}
 
@@ -780,12 +782,12 @@ func PutBlock(ctx context.Context, volmgr *RRVolumeManager, block []byte, hash s
 			// write did not succeed.  Report the
 			// error and continue trying.
 			allFull = false
-			log.Printf("%s: Write(%s): %s", vol, hash, err)
+			log.Errorf("%s: Write(%s): %s", vol, hash, err)
 		}
 	}
 
 	if allFull {
-		log.Print("All volumes are full.")
+		log.Error("all volumes are full")
 		return 0, FullError
 	}
 	// Already logged the non-full errors.

commit 142b8f261e7006faf1f1ad846d3355b278f5d921
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Tue Oct 29 10:18:40 2019 -0400

    15521: Fix truncated index response.
    
    The client (keep-balance) looks for a blank line at the end of the
    index response as an assurance the response is not truncated.  If
    headers have already been sent, http.Error("") writes a blank line --
    so we were inadvertently assuring the client that truncated responses
    were not truncated.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/services/keepstore/handlers.go b/services/keepstore/handlers.go
index 86504422d..86de8fa19 100644
--- a/services/keepstore/handlers.go
+++ b/services/keepstore/handlers.go
@@ -282,19 +282,15 @@ func (rtr *router) handleIndex(resp http.ResponseWriter, req *http.Request) {
 
 	for _, v := range vols {
 		if err := v.IndexTo(prefix, resp); err != nil {
-			// We can't send an error message to the
-			// client because we might have already sent
-			// headers and index content. All we can do is
-			// log the error in our own logs, and (in
-			// cases where headers haven't been sent yet)
-			// set a 500 status.
+			// We can't send an error status/message to
+			// the client because IndexTo() might have
+			// already written body content. All we can do
+			// is log the error in our own logs.
 			//
-			// If headers have already been sent, the
-			// client must notice the lack of trailing
+			// The client must notice the lack of trailing
 			// newline as an indication that the response
 			// is incomplete.
-			log.Printf("index error from volume %s: %s", v, err)
-			http.Error(resp, "", http.StatusInternalServerError)
+			ctxlog.FromContext(req.Context()).WithError(err).Errorf("truncating index response after error from volume %s", v)
 			return
 		}
 	}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list