[ARVADOS] created: b0b276ff6121aace3c52ee855752df6852120343

git at public.curoverse.com git at public.curoverse.com
Wed Jul 15 11:02:09 EDT 2015


        at  b0b276ff6121aace3c52ee855752df6852120343 (commit)


commit b0b276ff6121aace3c52ee855752df6852120343
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Wed Jul 15 11:00:48 2015 -0400

    6221: Successfully writes trash lists.

diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 2078edc..28d558b 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -131,7 +131,7 @@ func singlerun() {
 
 	summary.WritePullLists(arvLogger, pullLists)
 
-	summary.WriteTrashLists(arvLogger, trashLists)
+	keep.SendTrashLists(arvLogger, kc, trashLists)
 
 	// Log that we're finished. We force the recording, since go will
 	// not wait for the write timer before exiting.
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
index c2346cd..112823e 100644
--- a/services/datamanager/keep/keep.go
+++ b/services/datamanager/keep/keep.go
@@ -9,6 +9,7 @@ import (
 	"fmt"
 	"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
 	"git.curoverse.com/arvados.git/sdk/go/blockdigest"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
 	"git.curoverse.com/arvados.git/sdk/go/logger"
 	"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
 	"io"
@@ -22,6 +23,7 @@ import (
 )
 
 type ServerAddress struct {
+	SSL  bool   `json:"service_ssl_flag"` // selects https over http in HostPort
 	Host string `json:"service_host"`
 	Port int    `json:"service_port"`
 	Uuid string `json:"uuid"`
@@ -89,7 +91,11 @@ func (s ServerAddress) String() string {
 }
 
 func (s ServerAddress) HostPort() string {
-	return fmt.Sprintf("%s:%d", s.Host, s.Port)
+	// Choose the URL scheme from the SSL flag; plain http is the fallback.
+	if s.SSL {
+		return fmt.Sprintf("https://%s:%d", s.Host, s.Port)
+	}
+	return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
 }
 
 func getDataManagerToken(arvLogger *logger.Logger) string {
@@ -447,3 +453,61 @@ func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
 	}
 
 }
+
+type TrashRequest struct {
+	Locator    string `json:"locator"`     // digest of the block to trash
+	BlockMtime int64  `json:"block_mtime"` // mtime recorded for that block on the target server
+}
+
+type TrashList []TrashRequest // the trash requests destined for a single keep server
+
+// SendTrashLists PUTs each trash list in spl, JSON-encoded, to "<url>/trash",
+// where <url> is the map key. Requests run concurrently and the call returns
+// once all of them have finished; failures are only logged.
+func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]TrashList) {
+	count := 0
+	rendezvous := make(chan bool)
+
+	for url, v := range spl {
+		count += 1
+		log.Printf("Sending trash list to %v", url)
+
+		go (func(url string, v TrashList) {
+			defer func() { rendezvous <- true }()
+
+			// Stream the JSON body through a pipe; an encoding error reaches the reader.
+			pipeReader, pipeWriter := io.Pipe()
+			go (func() {
+				pipeWriter.CloseWithError(json.NewEncoder(pipeWriter).Encode(v))
+			})()
+
+			req, err := http.NewRequest("PUT", fmt.Sprintf("%s/trash", url), pipeReader)
+			if err != nil {
+				pipeReader.Close() // release the encoder goroutine blocked on the pipe
+				log.Printf("Error creating trash list request for %v error: %v", url, err.Error())
+				return
+			}
+
+			// Add api token header
+			req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
+
+			// Make the request
+			resp, err := kc.Client.Do(req)
+			if err != nil {
+				log.Printf("Error sending trash list to %v error: %v", url, err.Error())
+				return
+			}
+			defer resp.Body.Close()
+
+			// err is nil on this path, so report the HTTP status rather than err.
+			if resp.StatusCode != http.StatusOK {
+				log.Printf("Error sending trash list to %v error: %v", url, resp.Status)
+			}
+			io.Copy(ioutil.Discard, resp.Body) // drain so the connection can be reused
+		})(url, v)
+	}
+
+	for i := 0; i < count; i += 1 {
+		<-rendezvous
+	}
+}
diff --git a/services/datamanager/summary/pull_list.go b/services/datamanager/summary/pull_list.go
index d2eef93..3a246a2 100644
--- a/services/datamanager/summary/pull_list.go
+++ b/services/datamanager/summary/pull_list.go
@@ -126,7 +126,7 @@ func CreatePullServers(cs CanonicalString,
 	for _, host := range sortedServers {
 		// Strip the protocol portion of the url.
 		// Use the canonical copy of the string to avoid memory waste.
-		server := cs.Get(RemoveProtocolPrefix(host))
+		server := cs.Get(host)
 		_, hasBlock := serverHasBlock[server]
 		if hasBlock {
 			// The from field should include the protocol.
@@ -175,7 +175,7 @@ func WritePullLists(arvLogger *logger.Logger,
 	pullLists map[string]PullList) {
 	r := strings.NewReplacer(":", ".")
 	for host, list := range pullLists {
-		filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
+		filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host)))
 		pullListFile, err := os.Create(filename)
 		if err != nil {
 			loggerutil.FatalWithMessage(arvLogger,
diff --git a/services/datamanager/summary/pull_list_test.go b/services/datamanager/summary/pull_list_test.go
index 8f17d28..fb8631b 100644
--- a/services/datamanager/summary/pull_list_test.go
+++ b/services/datamanager/summary/pull_list_test.go
@@ -270,10 +270,3 @@ func (s *MySuite) TestBuildPullLists(c *C) {
 			},
 		})
 }
-
-func (s *MySuite) TestRemoveProtocolPrefix(c *C) {
-	c.Check(RemoveProtocolPrefix("blah"), Equals, "blah")
-	c.Check(RemoveProtocolPrefix("bl/ah"), Equals, "ah")
-	c.Check(RemoveProtocolPrefix("http://blah.com"), Equals, "blah.com")
-	c.Check(RemoveProtocolPrefix("https://blah.com:8900"), Equals, "blah.com:8900")
-}
diff --git a/services/datamanager/summary/trash_list.go b/services/datamanager/summary/trash_list.go
index fcec557..efb40e2 100644
--- a/services/datamanager/summary/trash_list.go
+++ b/services/datamanager/summary/trash_list.go
@@ -14,18 +14,17 @@ import (
 	"time"
 )
 
-type TrashRequest struct {
-	Locator    string `json:"locator"`
-	BlockMtime int64  `json:"block_mtime"`
-}
-
-type TrashList []TrashRequest
-
 func BuildTrashLists(kc *keepclient.KeepClient,
 	keepServerInfo *keep.ReadServers,
-	keepBlocksNotInCollections BlockSet) (m map[string]TrashList) {
+	keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
 
-	m = make(map[string]TrashList)
+	// Servers that are writeable
+	writableServers := map[string]struct{}{}
+	for _, url := range kc.WritableLocalRoots() {
+		writableServers[url] = struct{}{}
+	}
+
+	m = make(map[string]keep.TrashList)
 
 	_ttl, err := kc.Arvados.Discovery("blobSignatureTtl")
 	if err != nil {
@@ -43,7 +42,12 @@ func BuildTrashLists(kc *keepclient.KeepClient,
 			if block_on_server.Mtime < expiry {
 				// block is older than expire cutoff
 				srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
-				m[srv] = append(m[srv], TrashRequest{Locator: block.String(), BlockMtime: block_on_server.Mtime})
+
+				_, writable := writableServers[srv]
+
+				if writable {
+					m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
+				}
 			}
 		}
 	}
@@ -56,10 +60,10 @@ func BuildTrashLists(kc *keepclient.KeepClient,
 // This is just a hack for prototyping, it is not expected to be used
 // in production.
 func WriteTrashLists(arvLogger *logger.Logger,
-	trashLists map[string]TrashList) {
+	trashLists map[string]keep.TrashList) {
 	r := strings.NewReplacer(":", ".")
 	for host, list := range trashLists {
-		filename := fmt.Sprintf("trash_list.%s", r.Replace(host))
+		filename := fmt.Sprintf("trash_list.%s", r.Replace(RemoveProtocolPrefix(host)))
 		trashListFile, err := os.Create(filename)
 		if err != nil {
 			loggerutil.FatalWithMessage(arvLogger,
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index 6257f7b..5e56f03 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -33,11 +33,16 @@ func TrashItem(trashRequest TrashRequest) {
 			blob_signature_ttl)
 		return
 	}
+
 	for _, volume := range KeepVM.AllWritable() {
 		mtime, err := volume.Mtime(trashRequest.Locator)
-		if err != nil || trashRequest.BlockMtime != mtime.Unix() {
+		if err != nil {
+			log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
 			continue
 		}
+		if trashRequest.BlockMtime != mtime.Unix() {
+			log.Printf("%v Delete(%v): mtime does not match", volume, trashRequest.Locator)
+		}
 
 		if never_delete {
 			err = errors.New("did not delete block because never_delete is true")

commit 2578d032288f24988a54ee57708fe71902e4ef92
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date:   Tue Jul 14 22:06:18 2015 -0400

    6221: Generate trash list, not sent to keep servers yet.

diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 9161266..2078edc 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -125,9 +125,14 @@ func singlerun() {
 		replicationSummary.UnderReplicatedBlocks)
 
 	pullLists := summary.BuildPullLists(pullServers)
+	trashLists := summary.BuildTrashLists(kc,
+		&keepServerInfo,
+		replicationSummary.KeepBlocksNotInCollections)
 
 	summary.WritePullLists(arvLogger, pullLists)
 
+	summary.WriteTrashLists(arvLogger, trashLists)
+
 	// Log that we're finished. We force the recording, since go will
 	// not wait for the write timer before exiting.
 	if arvLogger != nil {
diff --git a/services/datamanager/summary/trash_list.go b/services/datamanager/summary/trash_list.go
new file mode 100644
index 0000000..fcec557
--- /dev/null
+++ b/services/datamanager/summary/trash_list.go
@@ -0,0 +1,78 @@
+// Code for generating trash lists
+package summary
+
+import (
+	"encoding/json"
+	"fmt"
+	"git.curoverse.com/arvados.git/sdk/go/keepclient"
+	"git.curoverse.com/arvados.git/sdk/go/logger"
+	"git.curoverse.com/arvados.git/services/datamanager/keep"
+	"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+	"log"
+	"os"
+	"strings"
+	"time"
+)
+
+type TrashRequest struct {
+	Locator    string `json:"locator"`
+	BlockMtime int64  `json:"block_mtime"`
+}
+
+type TrashList []TrashRequest
+
+func BuildTrashLists(kc *keepclient.KeepClient,
+	keepServerInfo *keep.ReadServers,
+	keepBlocksNotInCollections BlockSet) (m map[string]TrashList) {
+
+	m = make(map[string]TrashList)
+
+	_ttl, err := kc.Arvados.Discovery("blobSignatureTtl")
+	if err != nil {
+		log.Printf("Failed to get blobSignatureTtl: %v", err)
+		return // m is still empty here, so the caller gets no trash lists
+	}
+
+	ttl := int64(_ttl.(float64)) // NOTE(review): unchecked assertion; panics if the value is not a float64
+
+	// expire unreferenced blocks more than "ttl" seconds old.
+	expiry := time.Now().UTC().Unix() - ttl
+
+	for block, _ := range keepBlocksNotInCollections {
+		for _, block_on_server := range keepServerInfo.BlockToServers[block] {
+			if block_on_server.Mtime < expiry {
+				// block is older than expire cutoff
+				srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
+				m[srv] = append(m[srv], TrashRequest{Locator: block.String(), BlockMtime: block_on_server.Mtime})
+			}
+		}
+	}
+	return // naked return of the named result m, keyed by server address string
+}
+
+// Writes each trash list to its own file as JSON.
+// The filename is "trash_list." plus the map key with ":" replaced by ".".
+//
+// This is just a hack for prototyping, it is not expected to be used
+// in production.
+func WriteTrashLists(arvLogger *logger.Logger,
+	trashLists map[string]TrashList) {
+	r := strings.NewReplacer(":", ".")
+	for host, list := range trashLists {
+		filename := fmt.Sprintf("trash_list.%s", r.Replace(host))
+		trashListFile, err := os.Create(filename)
+		if err != nil {
+			loggerutil.FatalWithMessage(arvLogger,
+				fmt.Sprintf("Failed to open %s: %v", filename, err))
+		}
+		defer trashListFile.Close() // NOTE(review): deferred inside the loop, so every file stays open until the function returns
+
+		enc := json.NewEncoder(trashListFile)
+		err = enc.Encode(list)
+		if err != nil {
+			loggerutil.FatalWithMessage(arvLogger,
+				fmt.Sprintf("Failed to write trash list to %s: %v", filename, err))
+		}
+		log.Printf("Wrote trash list to %s.", filename)
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list