[ARVADOS] updated: 96c3fcd2d013af7747f20fea55f460ca2d2dd637
git at public.curoverse.com
git at public.curoverse.com
Wed Sep 23 17:46:27 EDT 2015
Summary of changes:
services/keepstore/azure_blob_volume.go | 81 ++++++++++++++++++++++++-
services/keepstore/keepstore.go | 101 ++------------------------------
services/keepstore/keepstore_test.go | 8 +--
services/keepstore/volume_unix.go | 94 +++++++++++++++++++++++++++++
4 files changed, 182 insertions(+), 102 deletions(-)
via 96c3fcd2d013af7747f20fea55f460ca2d2dd637 (commit)
via 109cd685ecbfb5b685347731340c6dd69e630617 (commit)
from 8626abb0a44cfc303bef3552a7bc57163c79231a (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 96c3fcd2d013af7747f20fea55f460ca2d2dd637
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Sep 23 17:46:21 2015 -0400
7241: Add Put() and Check()
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index 1618593..8430d8b 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -41,7 +41,11 @@ func (s *azureVolumeAdder) Set(containerName string) error {
if flagSerializeIO {
log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
}
- *s.volumeSet = append(*s.volumeSet, NewAzureBlobVolume(azClient, containerName, flagReadonly))
+ v := NewAzureBlobVolume(azClient, containerName, flagReadonly)
+ if err := v.Check(); err != nil {
+ return err
+ }
+ *s.volumeSet = append(*s.volumeSet, v)
return nil
}
@@ -79,11 +83,24 @@ func NewAzureBlobVolume(client storage.Client, containerName string, readonly bo
}
}
+// Check returns nil if the volume is usable.
+func (v *AzureBlobVolume) Check() error {
+ ok, err := v.bsClient.ContainerExists(v.containerName)
+ if err != nil {
+ return err
+ }
+ if !ok {
+ return errors.New("container does not exist")
+ }
+ return nil
+}
+
func (v *AzureBlobVolume) Get(loc string) ([]byte, error) {
rdr, err := v.bsClient.GetBlob(v.containerName, loc)
if err != nil {
return nil, err
}
+ defer rdr.Close()
buf := bufs.Get(BlockSize)
n, err := io.ReadFull(rdr, buf)
switch err {
@@ -100,7 +117,14 @@ func (v *AzureBlobVolume) Compare(loc string, data []byte) error {
}
func (v *AzureBlobVolume) Put(loc string, block []byte) error {
- return NotFoundError
+ if err := v.bsClient.CreateBlockBlob(v.containerName, loc); err != nil {
+ return err
+ }
+ // We use the same block ID, base64("0")=="MA==", for everything.
+ if err := v.bsClient.PutBlock(v.containerName, loc, "MA==", block); err != nil {
+ return err
+ }
+ return v.bsClient.PutBlockList(v.containerName, loc, []storage.Block{{"MA==", storage.BlockStatusUncommitted}})
}
func (v *AzureBlobVolume) Touch(loc string) error {
commit 109cd685ecbfb5b685347731340c6dd69e630617
Author: Tom Clegg <tom at curoverse.com>
Date: Wed Sep 23 16:44:26 2015 -0400
7241: Accept command line flags for Azure blob volumes.
diff --git a/services/keepstore/azure_blob_volume.go b/services/keepstore/azure_blob_volume.go
index b1d528d..1618593 100644
--- a/services/keepstore/azure_blob_volume.go
+++ b/services/keepstore/azure_blob_volume.go
@@ -1,13 +1,66 @@
package main
import (
+ "errors"
+ "flag"
"fmt"
"io"
+ "io/ioutil"
+ "log"
+ "strings"
"time"
"github.com/Azure/azure-sdk-for-go/storage"
)
+var (
+ azureStorageAccountName string
+ azureStorageAccountKeyFile string
+)
+
+type azureVolumeAdder struct {
+ *volumeSet
+}
+
+func (s *azureVolumeAdder) Set(containerName string) error {
+ if containerName == "" {
+ return errors.New("no container name given")
+ }
+ buf, err := ioutil.ReadFile(azureStorageAccountKeyFile)
+ if err != nil {
+ return errors.New("reading key from " + azureStorageAccountKeyFile + ": " + err.Error())
+ }
+ accountKey := strings.TrimSpace(string(buf))
+ if accountKey == "" {
+ return errors.New("empty account key in " + azureStorageAccountKeyFile)
+ }
+ azClient, err := storage.NewBasicClient(azureStorageAccountName, accountKey)
+ if err != nil {
+ return errors.New("creating Azure storage client: " + err.Error())
+ }
+ if flagSerializeIO {
+ log.Print("Notice: -serialize is not supported by azure-blob-container volumes.")
+ }
+ *s.volumeSet = append(*s.volumeSet, NewAzureBlobVolume(azClient, containerName, flagReadonly))
+ return nil
+}
+
+func init() {
+ flag.Var(&azureVolumeAdder{&volumes},
+ "azure-storage-container-volume",
+ "Use the given container as a storage volume. Can be given multiple times.")
+ flag.StringVar(
+ &azureStorageAccountName,
+ "azure-storage-account-name",
+ "",
+ "Azure storage account name used for subsequent --azure-storage-container-volume arguments.")
+ flag.StringVar(
+ &azureStorageAccountKeyFile,
+ "azure-storage-account-key-file",
+ "",
+ "File containing the account key used for subsequent --azure-storage-container-volume arguments.")
+}
+
// An AzureBlobVolume stores and retrieves blocks in an Azure Blob
// container.
type AzureBlobVolume struct {
@@ -75,7 +128,7 @@ func (v *AzureBlobVolume) Status() *VolumeStatus {
}
func (v *AzureBlobVolume) String() string {
- return fmt.Sprintf("%+v", v.azClient)
+ return fmt.Sprintf("azure-storage-container:%+q", v.containerName)
}
func (v *AzureBlobVolume) Writable() bool {
diff --git a/services/keepstore/keepstore.go b/services/keepstore/keepstore.go
index ec11af5..c7aadac 100644
--- a/services/keepstore/keepstore.go
+++ b/services/keepstore/keepstore.go
@@ -1,9 +1,7 @@
package main
import (
- "bufio"
"bytes"
- "errors"
"flag"
"fmt"
"git.curoverse.com/arvados.git/sdk/go/keepclient"
@@ -14,7 +12,6 @@ import (
"os"
"os/signal"
"strings"
- "sync"
"syscall"
"time"
)
@@ -114,95 +111,16 @@ var KeepVM VolumeManager
var pullq *WorkQueue
var trashq *WorkQueue
+type volumeSet []Volume
+
var (
flagSerializeIO bool
flagReadonly bool
+ volumes volumeSet
)
-type volumeSet []Volume
-
-func (vs *volumeSet) Set(value string) error {
- if dirs := strings.Split(value, ","); len(dirs) > 1 {
- log.Print("DEPRECATED: using comma-separated volume list.")
- for _, dir := range dirs {
- if err := vs.Set(dir); err != nil {
- return err
- }
- }
- return nil
- }
- if len(value) == 0 || value[0] != '/' {
- return errors.New("Invalid volume: must begin with '/'.")
- }
- if _, err := os.Stat(value); err != nil {
- return err
- }
- var locker sync.Locker
- if flagSerializeIO {
- locker = &sync.Mutex{}
- }
- *vs = append(*vs, &UnixVolume{
- root: value,
- locker: locker,
- readonly: flagReadonly,
- })
- return nil
-}
-
func (vs *volumeSet) String() string {
- s := "["
- for i, v := range *vs {
- if i > 0 {
- s = s + " "
- }
- s = s + v.String()
- }
- return s + "]"
-}
-
-// Discover adds a volume for every directory named "keep" that is
-// located at the top level of a device- or tmpfs-backed mount point
-// other than "/". It returns the number of volumes added.
-func (vs *volumeSet) Discover() int {
- added := 0
- f, err := os.Open(ProcMounts)
- if err != nil {
- log.Fatalf("opening %s: %s", ProcMounts, err)
- }
- scanner := bufio.NewScanner(f)
- for scanner.Scan() {
- args := strings.Fields(scanner.Text())
- if err := scanner.Err(); err != nil {
- log.Fatalf("reading %s: %s", ProcMounts, err)
- }
- dev, mount := args[0], args[1]
- if mount == "/" {
- continue
- }
- if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
- continue
- }
- keepdir := mount + "/keep"
- if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
- continue
- }
- // Set the -readonly flag (but only for this volume)
- // if the filesystem is mounted readonly.
- flagReadonlyWas := flagReadonly
- for _, fsopt := range strings.Split(args[3], ",") {
- if fsopt == "ro" {
- flagReadonly = true
- break
- }
- if fsopt == "rw" {
- break
- }
- }
- vs.Set(keepdir)
- flagReadonly = flagReadonlyWas
- added++
- }
- return added
+ return fmt.Sprintf("%+v", (*vs)[:])
}
// TODO(twp): continue moving as much code as possible out of main
@@ -219,7 +137,6 @@ func main() {
listen string
blobSigningKeyFile string
permissionTTLSec int
- volumes volumeSet
pidfile string
)
flag.StringVar(
@@ -276,14 +193,6 @@ func main() {
"readonly",
false,
"Do not write, delete, or touch anything on the following volumes.")
- flag.Var(
- &volumes,
- "volumes",
- "Deprecated synonym for -volume.")
- flag.Var(
- &volumes,
- "volume",
- "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
flag.StringVar(
&pidfile,
"pid",
@@ -328,7 +237,7 @@ func main() {
}
if len(volumes) == 0 {
- if volumes.Discover() == 0 {
+ if (&unixVolumeAdder{&volumes}).Discover() == 0 {
log.Fatal("No volumes found.")
}
}
diff --git a/services/keepstore/keepstore_test.go b/services/keepstore/keepstore_test.go
index 3807317..daa9199 100644
--- a/services/keepstore/keepstore_test.go
+++ b/services/keepstore/keepstore_test.go
@@ -329,8 +329,8 @@ func TestDiscoverTmpfs(t *testing.T) {
f.Close()
ProcMounts = f.Name()
- var resultVols volumeSet
- added := resultVols.Discover()
+ resultVols := volumeSet{}
+ added := (&unixVolumeAdder{&resultVols}).Discover()
if added != len(resultVols) {
t.Errorf("Discover returned %d, but added %d volumes",
@@ -369,8 +369,8 @@ func TestDiscoverNone(t *testing.T) {
f.Close()
ProcMounts = f.Name()
- var resultVols volumeSet
- added := resultVols.Discover()
+ resultVols := volumeSet{}
+ added := (&unixVolumeAdder{&resultVols}).Discover()
if added != 0 || len(resultVols) != 0 {
t.Fatalf("got %d, %v; expected 0, []", added, resultVols)
}
diff --git a/services/keepstore/volume_unix.go b/services/keepstore/volume_unix.go
index 6c0f5c4..5d09e84 100644
--- a/services/keepstore/volume_unix.go
+++ b/services/keepstore/volume_unix.go
@@ -1,7 +1,10 @@
package main
import (
+ "bufio"
"bytes"
+ "errors"
+ "flag"
"fmt"
"io"
"io/ioutil"
@@ -16,6 +19,97 @@ import (
"time"
)
+type unixVolumeAdder struct {
+ *volumeSet
+}
+
+func (vs *unixVolumeAdder) Set(value string) error {
+ if dirs := strings.Split(value, ","); len(dirs) > 1 {
+ log.Print("DEPRECATED: using comma-separated volume list.")
+ for _, dir := range dirs {
+ if err := vs.Set(dir); err != nil {
+ return err
+ }
+ }
+ return nil
+ }
+ if len(value) == 0 || value[0] != '/' {
+ return errors.New("Invalid volume: must begin with '/'.")
+ }
+ if _, err := os.Stat(value); err != nil {
+ return err
+ }
+ var locker sync.Locker
+ if flagSerializeIO {
+ locker = &sync.Mutex{}
+ }
+ *vs.volumeSet = append(*vs.volumeSet, &UnixVolume{
+ root: value,
+ locker: locker,
+ readonly: flagReadonly,
+ })
+ return nil
+}
+
+func init() {
+ flag.Var(
+ &unixVolumeAdder{&volumes},
+ "volumes",
+ "Deprecated synonym for -volume.")
+ flag.Var(
+ &unixVolumeAdder{&volumes},
+ "volume",
+ "Local storage directory. Can be given more than once to add multiple directories. If none are supplied, the default is to use all directories named \"keep\" that exist in the top level directory of a mount point at startup time. Can be a comma-separated list, but this is deprecated: use multiple -volume arguments instead.")
+}
+
+// Discover adds a UnixVolume for every directory named "keep" that is
+// located at the top level of a device- or tmpfs-backed mount point
+// other than "/". It returns the number of volumes added.
+func (vs *unixVolumeAdder) Discover() int {
+ added := 0
+ f, err := os.Open(ProcMounts)
+ if err != nil {
+ log.Fatalf("opening %s: %s", ProcMounts, err)
+ }
+ scanner := bufio.NewScanner(f)
+ for scanner.Scan() {
+ args := strings.Fields(scanner.Text())
+ if err := scanner.Err(); err != nil {
+ log.Fatalf("reading %s: %s", ProcMounts, err)
+ }
+ dev, mount := args[0], args[1]
+ if mount == "/" {
+ continue
+ }
+ if dev != "tmpfs" && !strings.HasPrefix(dev, "/dev/") {
+ continue
+ }
+ keepdir := mount + "/keep"
+ if st, err := os.Stat(keepdir); err != nil || !st.IsDir() {
+ continue
+ }
+ // Set the -readonly flag (but only for this volume)
+ // if the filesystem is mounted readonly.
+ flagReadonlyWas := flagReadonly
+ for _, fsopt := range strings.Split(args[3], ",") {
+ if fsopt == "ro" {
+ flagReadonly = true
+ break
+ }
+ if fsopt == "rw" {
+ break
+ }
+ }
+ if err := vs.Set(keepdir); err != nil {
+ log.Printf("adding %q: %s", keepdir, err)
+ } else {
+ added++
+ }
+ flagReadonly = flagReadonlyWas
+ }
+ return added
+}
+
// A UnixVolume stores and retrieves blocks in a local directory.
type UnixVolume struct {
// path to the volume's root directory
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list