[ARVADOS] created: b0b276ff6121aace3c52ee855752df6852120343
git at public.curoverse.com
git at public.curoverse.com
Wed Jul 15 11:02:09 EDT 2015
at b0b276ff6121aace3c52ee855752df6852120343 (commit)
commit b0b276ff6121aace3c52ee855752df6852120343
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Wed Jul 15 11:00:48 2015 -0400
6221: Successfully writes trash lists.
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 2078edc..28d558b 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -131,7 +131,7 @@ func singlerun() {
summary.WritePullLists(arvLogger, pullLists)
- summary.WriteTrashLists(arvLogger, trashLists)
+ keep.SendTrashLists(arvLogger, kc, trashLists) // PUT each server's trash list to <server>/trash instead of writing local files
// Log that we're finished. We force the recording, since go will
// not wait for the write timer before exiting.
diff --git a/services/datamanager/keep/keep.go b/services/datamanager/keep/keep.go
index c2346cd..112823e 100644
--- a/services/datamanager/keep/keep.go
+++ b/services/datamanager/keep/keep.go
@@ -9,6 +9,7 @@ import (
"fmt"
"git.curoverse.com/arvados.git/sdk/go/arvadosclient"
"git.curoverse.com/arvados.git/sdk/go/blockdigest"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
"git.curoverse.com/arvados.git/sdk/go/logger"
"git.curoverse.com/arvados.git/services/datamanager/loggerutil"
"io"
@@ -22,6 +23,7 @@ import (
)
type ServerAddress struct {
+ SSL bool `json:"service_ssl_flag"` // selects https:// (vs http://) in HostPort
Host string `json:"service_host"`
Port int `json:"service_port"`
Uuid string `json:"uuid"`
@@ -89,7 +91,11 @@ func (s ServerAddress) String() string {
}
func (s ServerAddress) HostPort() string {
- return fmt.Sprintf("%s:%d", s.Host, s.Port)
+ // Despite its name, this returns a base URL (scheme://host:port); s.SSL picks the scheme.
+ if s.SSL {
+ return fmt.Sprintf("https://%s:%d", s.Host, s.Port)
+ }
+ return fmt.Sprintf("http://%s:%d", s.Host, s.Port)
}
func getDataManagerToken(arvLogger *logger.Logger) string {
@@ -447,3 +453,72 @@ func (readServers *ReadServers) Summarize(arvLogger *logger.Logger) {
}
}
+
+// TrashRequest asks a keep server to trash one block. BlockMtime is the
+// block's mtime on that server as observed by datamanager.
+type TrashRequest struct {
+ Locator string `json:"locator"`
+ BlockMtime int64 `json:"block_mtime"`
+}
+
+// TrashList is the JSON body PUT to a keep server's /trash endpoint.
+type TrashList []TrashRequest
+
+// SendTrashLists PUTs each list in spl to "<key>/trash", where each key of
+// spl is a keep server base URL. Requests run concurrently and the call
+// returns once all of them have finished. Failures are logged, not returned.
+func SendTrashLists(arvLogger *logger.Logger, kc *keepclient.KeepClient, spl map[string]TrashList) {
+ count := 0
+ rendezvous := make(chan bool)
+
+ for url, v := range spl {
+ count += 1
+ log.Printf("Sending trash list to %v", url)
+
+ go (func(url string, v TrashList) {
+ defer (func() {
+ rendezvous <- true
+ })()
+
+ pipeReader, pipeWriter := io.Pipe()
+ req, err := http.NewRequest("PUT", fmt.Sprintf("%s/trash", url), pipeReader)
+ if err != nil {
+ log.Printf("Error creating trash list request for %v error: %v", url, err.Error())
+ return
+ }
+
+ // Stream the JSON body through the pipe. The encoder starts only
+ // after the request exists, so it cannot block forever on a pipe
+ // nobody reads. CloseWithError(nil) is a plain Close; a non-nil
+ // encoding error aborts the upload instead of sending a truncated body.
+ go (func() {
+ pipeWriter.CloseWithError(json.NewEncoder(pipeWriter).Encode(v))
+ })()
+
+ // Add api token header
+ req.Header.Add("Authorization", fmt.Sprintf("OAuth2 %s", getDataManagerToken(arvLogger)))
+
+ // Make the request. On error the transport closes the request body,
+ // which makes the encoder's pending write fail and its goroutine exit.
+ resp, err := kc.Client.Do(req)
+ if err != nil {
+ log.Printf("Error sending trash list to %v error: %v", url, err.Error())
+ return
+ }
+ defer resp.Body.Close()
+
+ // err is nil on this path; report the HTTP status instead.
+ if resp.StatusCode != http.StatusOK {
+ log.Printf("Error sending trash list to %v error: %v", url, resp.Status)
+ }
+
+ // Drain the body so the connection can be reused.
+ io.Copy(ioutil.Discard, resp.Body)
+ })(url, v)
+
+ }
+
+ for i := 0; i < count; i += 1 {
+ <-rendezvous
+ }
+}
diff --git a/services/datamanager/summary/pull_list.go b/services/datamanager/summary/pull_list.go
index d2eef93..3a246a2 100644
--- a/services/datamanager/summary/pull_list.go
+++ b/services/datamanager/summary/pull_list.go
@@ -126,7 +126,7 @@ func CreatePullServers(cs CanonicalString,
for _, host := range sortedServers {
// Strip the protocol portion of the url.
// Use the canonical copy of the string to avoid memory waste.
- server := cs.Get(RemoveProtocolPrefix(host))
+ server := cs.Get(host) // host is now kept with its protocol prefix; the "Strip" note above is stale
_, hasBlock := serverHasBlock[server]
if hasBlock {
// The from field should include the protocol.
@@ -175,7 +175,7 @@ func WritePullLists(arvLogger *logger.Logger,
pullLists map[string]PullList) {
r := strings.NewReplacer(":", ".")
for host, list := range pullLists {
- filename := fmt.Sprintf("pull_list.%s", r.Replace(host))
+ filename := fmt.Sprintf("pull_list.%s", r.Replace(RemoveProtocolPrefix(host))) // drop the scheme, e.g. "https://blah.com:8900" -> pull_list.blah.com.8900
pullListFile, err := os.Create(filename)
if err != nil {
loggerutil.FatalWithMessage(arvLogger,
diff --git a/services/datamanager/summary/pull_list_test.go b/services/datamanager/summary/pull_list_test.go
index 8f17d28..fb8631b 100644
--- a/services/datamanager/summary/pull_list_test.go
+++ b/services/datamanager/summary/pull_list_test.go
@@ -270,10 +270,11 @@ func (s *MySuite) TestBuildPullLists(c *C) {
},
})
}

+// Keep this test: RemoveProtocolPrefix is still called by WritePullLists and WriteTrashLists.
func (s *MySuite) TestRemoveProtocolPrefix(c *C) {
c.Check(RemoveProtocolPrefix("blah"), Equals, "blah")
c.Check(RemoveProtocolPrefix("bl/ah"), Equals, "ah")
c.Check(RemoveProtocolPrefix("http://blah.com"), Equals, "blah.com")
c.Check(RemoveProtocolPrefix("https://blah.com:8900"), Equals, "blah.com:8900")
}
diff --git a/services/datamanager/summary/trash_list.go b/services/datamanager/summary/trash_list.go
index fcec557..efb40e2 100644
--- a/services/datamanager/summary/trash_list.go
+++ b/services/datamanager/summary/trash_list.go
@@ -14,18 +14,17 @@ import (
"time"
)
-type TrashRequest struct {
- Locator string `json:"locator"`
- BlockMtime int64 `json:"block_mtime"`
-}
-
-type TrashList []TrashRequest
-
func BuildTrashLists(kc *keepclient.KeepClient,
keepServerInfo *keep.ReadServers,
- keepBlocksNotInCollections BlockSet) (m map[string]TrashList) {
+ keepBlocksNotInCollections BlockSet) (m map[string]keep.TrashList) {
- m = make(map[string]TrashList)
+ // Set of URLs from kc.WritableLocalRoots(); only these servers get trash requests.
+ writableServers := map[string]struct{}{}
+ for _, url := range kc.WritableLocalRoots() {
+ writableServers[url] = struct{}{}
+ }
+
+ m = make(map[string]keep.TrashList)
_ttl, err := kc.Arvados.Discovery("blobSignatureTtl")
if err != nil {
@@ -43,7 +42,12 @@ func BuildTrashLists(kc *keepclient.KeepClient,
if block_on_server.Mtime < expiry {
// block is older than expire cutoff
srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
- m[srv] = append(m[srv], TrashRequest{Locator: block.String(), BlockMtime: block_on_server.Mtime})
+ // NOTE(review): assumes ServerAddress.String() yields the same URL form as the WritableLocalRoots() keys; if not, no trash requests are built — confirm.
+ _, writable := writableServers[srv]
+
+ if writable {
+ m[srv] = append(m[srv], keep.TrashRequest{Locator: block.Digest.String(), BlockMtime: block_on_server.Mtime})
+ }
}
}
}
@@ -56,10 +60,10 @@ func BuildTrashLists(kc *keepclient.KeepClient,
// This is just a hack for prototyping, it is not expected to be used
// in production.
func WriteTrashLists(arvLogger *logger.Logger,
- trashLists map[string]TrashList) {
+ trashLists map[string]keep.TrashList) {
r := strings.NewReplacer(":", ".")
for host, list := range trashLists {
- filename := fmt.Sprintf("trash_list.%s", r.Replace(host))
+ filename := fmt.Sprintf("trash_list.%s", r.Replace(RemoveProtocolPrefix(host))) // drop the scheme, e.g. "https://blah.com:8900" -> trash_list.blah.com.8900
trashListFile, err := os.Create(filename)
if err != nil {
loggerutil.FatalWithMessage(arvLogger,
diff --git a/services/keepstore/trash_worker.go b/services/keepstore/trash_worker.go
index 6257f7b..5e56f03 100644
--- a/services/keepstore/trash_worker.go
+++ b/services/keepstore/trash_worker.go
@@ -33,11 +33,18 @@ func TrashItem(trashRequest TrashRequest) {
blob_signature_ttl)
return
}
+
for _, volume := range KeepVM.AllWritable() {
mtime, err := volume.Mtime(trashRequest.Locator)
- if err != nil || trashRequest.BlockMtime != mtime.Unix() {
+ if err != nil {
+ log.Printf("%v Delete(%v): %v", volume, trashRequest.Locator, err)
continue
}
+ if trashRequest.BlockMtime != mtime.Unix() {
+ // Volume mtime differs from the request's BlockMtime: skip this volume rather than delete a possibly rewritten block.
+ log.Printf("%v Delete(%v): mtime does not match", volume, trashRequest.Locator)
+ continue
+ }
if never_delete {
err = errors.New("did not delete block because never_delete is true")
commit 2578d032288f24988a54ee57708fe71902e4ef92
Author: Peter Amstutz <peter.amstutz at curoverse.com>
Date: Tue Jul 14 22:06:18 2015 -0400
6221: Generate trash list, not sent to keep servers yet.
diff --git a/services/datamanager/datamanager.go b/services/datamanager/datamanager.go
index 9161266..2078edc 100644
--- a/services/datamanager/datamanager.go
+++ b/services/datamanager/datamanager.go
@@ -125,9 +125,14 @@ func singlerun() {
replicationSummary.UnderReplicatedBlocks)
pullLists := summary.BuildPullLists(pullServers)
+ trashLists := summary.BuildTrashLists(kc, // per-server lists of unreferenced blocks whose mtime is older than blobSignatureTtl
+ &keepServerInfo,
+ replicationSummary.KeepBlocksNotInCollections)
summary.WritePullLists(arvLogger, pullLists)
+ summary.WriteTrashLists(arvLogger, trashLists)
+
// Log that we're finished. We force the recording, since go will
// not wait for the write timer before exiting.
if arvLogger != nil {
diff --git a/services/datamanager/summary/trash_list.go b/services/datamanager/summary/trash_list.go
new file mode 100644
index 0000000..fcec557
--- /dev/null
+++ b/services/datamanager/summary/trash_list.go
@@ -0,0 +1,89 @@
+// Code for generating trash lists
+package summary
+
+import (
+ "encoding/json"
+ "fmt"
+ "git.curoverse.com/arvados.git/sdk/go/keepclient"
+ "git.curoverse.com/arvados.git/sdk/go/logger"
+ "git.curoverse.com/arvados.git/services/datamanager/keep"
+ "git.curoverse.com/arvados.git/services/datamanager/loggerutil"
+ "log"
+ "os"
+ "strings"
+ "time"
+)
+
+type TrashRequest struct {
+ Locator string `json:"locator"`
+ BlockMtime int64 `json:"block_mtime"`
+}
+
+type TrashList []TrashRequest
+
+func BuildTrashLists(kc *keepclient.KeepClient,
+ keepServerInfo *keep.ReadServers,
+ keepBlocksNotInCollections BlockSet) (m map[string]TrashList) {
+
+ m = make(map[string]TrashList)
+
+ _ttl, err := kc.Arvados.Discovery("blobSignatureTtl")
+ if err != nil {
+ log.Printf("Failed to get blobSignatureTtl: %v", err)
+ return
+ }
+
+ // blobSignatureTtl is expected to be a JSON number (float64); check the
+ // type instead of panicking if the discovery document holds something else.
+ ttlFloat, ok := _ttl.(float64)
+ if !ok {
+ log.Printf("Unexpected type %T for blobSignatureTtl: %v", _ttl, _ttl)
+ return
+ }
+ ttl := int64(ttlFloat)
+
+ // expire unreferenced blocks more than "ttl" seconds old.
+ expiry := time.Now().UTC().Unix() - ttl
+
+ for block := range keepBlocksNotInCollections {
+ for _, block_on_server := range keepServerInfo.BlockToServers[block] {
+ if block_on_server.Mtime < expiry {
+ // block is older than expire cutoff
+ srv := keepServerInfo.KeepServerIndexToAddress[block_on_server.ServerIndex].String()
+ m[srv] = append(m[srv], TrashRequest{Locator: block.String(), BlockMtime: block_on_server.Mtime})
+ }
+ }
+ }
+ return
+}
+
+// Writes each trash list to a file.
+// The filename is trash_list.<host>, with ':' replaced by '.'.
+//
+// This is just a hack for prototyping, it is not expected to be used
+// in production.
+func WriteTrashLists(arvLogger *logger.Logger,
+ trashLists map[string]TrashList) {
+ r := strings.NewReplacer(":", ".")
+ for host, list := range trashLists {
+ filename := fmt.Sprintf("trash_list.%s", r.Replace(host))
+ trashListFile, err := os.Create(filename)
+ if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Failed to open %s: %v", filename, err))
+ }
+
+ enc := json.NewEncoder(trashListFile)
+ err = enc.Encode(list)
+ // Close now rather than defer: a deferred Close inside this loop would
+ // keep every file open until the function returns and discard Close errors.
+ if closeErr := trashListFile.Close(); err == nil {
+ err = closeErr
+ }
+ if err != nil {
+ loggerutil.FatalWithMessage(arvLogger,
+ fmt.Sprintf("Failed to write trash list to %s: %v", filename, err))
+ }
+ log.Printf("Wrote trash list to %s.", filename)
+ }
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list