[ARVADOS] updated: 1.3.0-164-g855a0afbc
Git user
git at public.curoverse.com
Thu Jan 10 13:37:28 EST 2019
Summary of changes:
lib/cloud/azure.go | 210 ++++++++++++++++------------
lib/cloud/azure_test.go | 47 +++++--
lib/cloud/interfaces.go | 11 +-
lib/dispatchcloud/dispatcher_test.go | 3 +-
lib/dispatchcloud/instance_set_proxy.go | 10 +-
lib/dispatchcloud/ssh_executor/executor.go | 3 +-
lib/dispatchcloud/test/lame_instance_set.go | 13 +-
lib/dispatchcloud/test/stub_driver.go | 11 +-
lib/dispatchcloud/worker/pool.go | 5 +-
lib/dispatchcloud/worker/worker.go | 3 +-
10 files changed, 179 insertions(+), 137 deletions(-)
via 855a0afbc604487ddaedaed0cc1a4ad6da34b602 (commit)
from 1b2a4c98249e3a46242f7d10c00a70feeeb2a843 (commit)
Those revisions listed above that are new to this repository have
not appeared on any other notification email; so we list those
revisions in full, below.
commit 855a0afbc604487ddaedaed0cc1a4ad6da34b602
Author: Peter Amstutz <pamstutz at veritasgenetics.com>
Date: Thu Jan 10 13:33:50 2019 -0500
14324: Remove context from interface, use Stop() instead
Remove the 'Context' parameter passed into methods of the InstanceSet
interface; instead, the AzureInstanceSet creates its own cancellable
background context. The Stop() method now cancels the context and waits
for any outstanding cloud driver work to complete before returning.
* Fix test
* Document how to run individual test cases against real cloud
* ManageBlobs runs periodically in the background
* Logging cleaned up
Arvados-DCO-1.1-Signed-off-by: Peter Amstutz <pamstutz at veritasgenetics.com>
diff --git a/lib/cloud/azure.go b/lib/cloud/azure.go
index 2ee668372..d9f1158fc 100644
--- a/lib/cloud/azure.go
+++ b/lib/cloud/azure.go
@@ -31,19 +31,19 @@ import (
)
type AzureInstanceSetConfig struct {
- SubscriptionID string `json:"subscription_id"`
- ClientID string `json:"key"`
- ClientSecret string `json:"secret"`
- TenantID string `json:"tenant_id"`
- CloudEnv string `json:"cloud_environment"`
- ResourceGroup string `json:"resource_group"`
- Location string `json:"region"`
- Network string `json:"network"`
- Subnet string `json:"subnet"`
- StorageAccount string `json:"storage_account"`
- BlobContainer string `json:"blob_container"`
- Image string `json:"image"`
- DeleteDanglingResourcesAfter float64 `json:"delete_dangling_resources_after"`
+ SubscriptionID string `mapstructure:"subscription_id"`
+ ClientID string `mapstructure:"key"`
+ ClientSecret string `mapstructure:"secret"`
+ TenantID string `mapstructure:"tenant_id"`
+ CloudEnv string `mapstructure:"cloud_environment"`
+ ResourceGroup string `mapstructure:"resource_group"`
+ Location string `mapstructure:"region"`
+ Network string `mapstructure:"network"`
+ Subnet string `mapstructure:"subnet"`
+ StorageAccount string `mapstructure:"storage_account"`
+ BlobContainer string `mapstructure:"blob_container"`
+ Image string `mapstructure:"image"`
+ DeleteDanglingResourcesAfter float64 `mapstructure:"delete_dangling_resources_after"`
}
type VirtualMachinesClientWrapper interface {
@@ -195,6 +195,11 @@ type AzureInstanceSet struct {
interfaces map[string]network.Interface
dispatcherID string
namePrefix string
+ ctx context.Context
+ stopFunc context.CancelFunc
+ stopWg sync.WaitGroup
+ deleteNIC chan string
+ deleteBlob chan storage.Blob
}
func NewAzureInstanceSet(config map[string]interface{}, dispatcherID InstanceSetID) (prv InstanceSet, err error) {
@@ -243,15 +248,65 @@ func (az *AzureInstanceSet) setup(azcfg AzureInstanceSetConfig, dispatcherID str
az.dispatcherID = dispatcherID
az.namePrefix = fmt.Sprintf("compute-%s-", az.dispatcherID)
+ az.ctx, az.stopFunc = context.WithCancel(context.Background())
+ go func() {
+ tk := time.NewTicker(5 * time.Minute)
+ for {
+ select {
+ case <-az.ctx.Done():
+ return
+ case <-tk.C:
+ az.ManageBlobs()
+ }
+ }
+ }()
+
+ az.deleteNIC = make(chan string)
+ az.deleteBlob = make(chan storage.Blob)
+
+ for i := 0; i < 4; i += 1 {
+ go func() {
+ for {
+ nicname, ok := <-az.deleteNIC
+ if !ok {
+ return
+ }
+ _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
+ if delerr != nil {
+ log.Printf("Error deleting %v: %v", nicname, delerr)
+ } else {
+ log.Printf("Deleted %v", nicname)
+ }
+ }
+ }()
+ go func() {
+ for {
+ blob, ok := <-az.deleteBlob
+ if !ok {
+ return
+ }
+ err := blob.Delete(nil)
+ if err != nil {
+ log.Printf("error deleting %v: %v", blob.Name, err)
+ } else {
+ log.Printf("Deleted blob %v", blob.Name)
+ }
+ }
+ }()
+ }
+
return nil
}
-func (az *AzureInstanceSet) Create(ctx context.Context,
+func (az *AzureInstanceSet) Create(
instanceType arvados.InstanceType,
imageId ImageID,
newTags InstanceTags,
publicKey ssh.PublicKey) (Instance, error) {
+ az.stopWg.Add(1)
+ defer az.stopWg.Done()
+
if len(newTags["node-token"]) == 0 {
return nil, fmt.Errorf("Must provide tag 'node-token'")
}
@@ -262,7 +317,6 @@ func (az *AzureInstanceSet) Create(ctx context.Context,
}
name = az.namePrefix + name
- log.Printf("name is %v", name)
timestamp := time.Now().Format(time.RFC3339Nano)
@@ -297,21 +351,17 @@ func (az *AzureInstanceSet) Create(ctx context.Context,
},
},
}
- nic, err := az.netClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
+ nic, err := az.netClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name+"-nic", nicParameters)
if err != nil {
return nil, WrapAzureError(err)
}
- log.Printf("Created NIC %v", *nic.ID)
-
instance_vhd := fmt.Sprintf("https://%s.blob.%s/%s/%s-os.vhd",
az.azconfig.StorageAccount,
az.azureEnv.StorageEndpointSuffix,
az.azconfig.BlobContainer,
name)
- log.Printf("URI instance vhd %v", instance_vhd)
-
customData := base64.StdEncoding.EncodeToString([]byte(fmt.Sprintf(`#!/bin/sh
echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
@@ -364,7 +414,7 @@ echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
},
}
- vm, err := az.vmClient.CreateOrUpdate(ctx, az.azconfig.ResourceGroup, name, vmParameters)
+ vm, err := az.vmClient.CreateOrUpdate(az.ctx, az.azconfig.ResourceGroup, name, vmParameters)
if err != nil {
return nil, WrapAzureError(err)
}
@@ -376,13 +426,16 @@ echo '%s-%s' > /home/crunch/node-token`, name, newTags["node-token"])))
}, nil
}
-func (az *AzureInstanceSet) Instances(ctx context.Context, _ InstanceTags) ([]Instance, error) {
- interfaces, err := az.ManageNics(ctx)
+func (az *AzureInstanceSet) Instances(InstanceTags) ([]Instance, error) {
+ az.stopWg.Add(1)
+ defer az.stopWg.Done()
+
+ interfaces, err := az.ManageNics()
if err != nil {
return nil, err
}
- result, err := az.vmClient.ListComplete(ctx, az.azconfig.ResourceGroup)
+ result, err := az.vmClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
if err != nil {
return nil, WrapAzureError(err)
}
@@ -403,8 +456,16 @@ func (az *AzureInstanceSet) Instances(ctx context.Context, _ InstanceTags) ([]In
return instances, nil
}
-func (az *AzureInstanceSet) ManageNics(ctx context.Context) (map[string]network.Interface, error) {
- result, err := az.netClient.ListComplete(ctx, az.azconfig.ResourceGroup)
+// ManageNics returns a list of Azure network interface resources.
+// It also garbage collects NICs which have "namePrefix", are not
+// associated with a virtual machine, and have a "created-at" time
+// more than DeleteDanglingResourcesAfter seconds in the past (the
+// delay prevents racing with and deleting newly created NICs).
+func (az *AzureInstanceSet) ManageNics() (map[string]network.Interface, error) {
+ az.stopWg.Add(1)
+ defer az.stopWg.Done()
+
+ result, err := az.netClient.ListComplete(az.ctx, az.azconfig.ResourceGroup)
if err != nil {
return nil, WrapAzureError(err)
}
@@ -412,30 +473,6 @@ func (az *AzureInstanceSet) ManageNics(ctx context.Context) (map[string]network.
interfaces := make(map[string]network.Interface)
timestamp := time.Now()
- wg := sync.WaitGroup{}
- deletechannel := make(chan string, 20)
- defer func() {
- wg.Wait()
- close(deletechannel)
- }()
- for i := 0; i < 4; i += 1 {
- go func() {
- for {
- nicname, ok := <-deletechannel
- if !ok {
- return
- }
- _, delerr := az.netClient.Delete(context.Background(), az.azconfig.ResourceGroup, nicname)
- if delerr != nil {
- log.Printf("Error deleting %v: %v", nicname, delerr)
- } else {
- log.Printf("Deleted %v", nicname)
- }
- wg.Done()
- }
- }()
- }
-
for ; result.NotDone(); err = result.Next() {
if err != nil {
log.Printf("Error listing nics: %v", err)
@@ -448,11 +485,9 @@ func (az *AzureInstanceSet) ManageNics(ctx context.Context) (map[string]network.
if result.Value().Tags["created-at"] != nil {
created_at, err := time.Parse(time.RFC3339Nano, *result.Value().Tags["created-at"])
if err == nil {
- //log.Printf("found dangling NIC %v created %v seconds ago", *result.Value().Name, timestamp.Sub(created_at).Seconds())
if timestamp.Sub(created_at).Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
log.Printf("Will delete %v because it is older than %v s", *result.Value().Name, az.azconfig.DeleteDanglingResourcesAfter)
- wg.Add(1)
- deletechannel <- *result.Value().Name
+ az.deleteNIC <- *result.Value().Name
}
}
}
@@ -462,8 +497,16 @@ func (az *AzureInstanceSet) ManageNics(ctx context.Context) (map[string]network.
return interfaces, nil
}
-func (az *AzureInstanceSet) ManageBlobs(ctx context.Context) {
- result, err := az.storageAcctClient.ListKeys(ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
+// ManageBlobs garbage collects blobs (VM disk images) in the
+// configured storage account container. It will delete blobs which
+// have "namePrefix", are "available" (which means they are not
+// leased to a VM) and haven't been modified for
+// DeleteDanglingResourcesAfter seconds.
+func (az *AzureInstanceSet) ManageBlobs() {
+ az.stopWg.Add(1)
+ defer az.stopWg.Done()
+
+ result, err := az.storageAcctClient.ListKeys(az.ctx, az.azconfig.ResourceGroup, az.azconfig.StorageAccount)
if err != nil {
log.Printf("Couldn't get account keys %v", err)
return
@@ -479,32 +522,8 @@ func (az *AzureInstanceSet) ManageBlobs(ctx context.Context) {
blobsvc := client.GetBlobService()
blobcont := blobsvc.GetContainerReference(az.azconfig.BlobContainer)
- timestamp := time.Now()
- wg := sync.WaitGroup{}
- deletechannel := make(chan storage.Blob, 20)
- defer func() {
- wg.Wait()
- close(deletechannel)
- }()
- for i := 0; i < 4; i += 1 {
- go func() {
- for {
- blob, ok := <-deletechannel
- if !ok {
- return
- }
- err := blob.Delete(nil)
- if err != nil {
- log.Printf("error deleting %v: %v", blob.Name, err)
- } else {
- log.Printf("Deleted blob %v", blob.Name)
- }
- wg.Done()
- }
- }()
- }
-
page := storage.ListBlobsParameters{Prefix: az.namePrefix}
+ timestamp := time.Now()
for {
response, err := blobcont.ListBlobs(page)
@@ -520,8 +539,7 @@ func (az *AzureInstanceSet) ManageBlobs(ctx context.Context) {
age.Seconds() > az.azconfig.DeleteDanglingResourcesAfter {
log.Printf("Blob %v is unlocked and not modified for %v seconds, will delete", b.Name, age.Seconds())
- wg.Add(1)
- deletechannel <- b
+ az.deleteBlob <- b
}
}
if response.NextMarker != "" {
@@ -533,6 +551,10 @@ func (az *AzureInstanceSet) ManageBlobs(ctx context.Context) {
}
func (az *AzureInstanceSet) Stop() {
+ az.stopFunc()
+ az.stopWg.Wait()
+ close(az.deleteNIC)
+ close(az.deleteBlob)
}
type AzureInstance struct {
@@ -553,7 +575,10 @@ func (ai *AzureInstance) ProviderType() string {
return string(ai.vm.VirtualMachineProperties.HardwareProfile.VMSize)
}
-func (ai *AzureInstance) SetTags(ctx context.Context, newTags InstanceTags) error {
+func (ai *AzureInstance) SetTags(newTags InstanceTags) error {
+ ai.provider.stopWg.Add(1)
+ defer ai.provider.stopWg.Done()
+
tags := make(map[string]*string)
for k, v := range ai.vm.Tags {
@@ -570,7 +595,7 @@ func (ai *AzureInstance) SetTags(ctx context.Context, newTags InstanceTags) erro
Location: &ai.provider.azconfig.Location,
Tags: tags,
}
- vm, err := ai.provider.vmClient.CreateOrUpdate(ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
+ vm, err := ai.provider.vmClient.CreateOrUpdate(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name, vmParameters)
if err != nil {
return WrapAzureError(err)
}
@@ -591,8 +616,11 @@ func (ai *AzureInstance) Tags() InstanceTags {
return tags
}
-func (ai *AzureInstance) Destroy(ctx context.Context) error {
- _, err := ai.provider.vmClient.Delete(ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
+func (ai *AzureInstance) Destroy() error {
+ ai.provider.stopWg.Add(1)
+ defer ai.provider.stopWg.Done()
+
+ _, err := ai.provider.vmClient.Delete(ai.provider.ctx, ai.provider.azconfig.ResourceGroup, *ai.vm.Name)
return WrapAzureError(err)
}
@@ -600,7 +628,10 @@ func (ai *AzureInstance) Address() string {
return *(*ai.nic.IPConfigurations)[0].PrivateIPAddress
}
-func (ai *AzureInstance) VerifyHostKey(ctx context.Context, receivedKey ssh.PublicKey, client *ssh.Client) error {
+func (ai *AzureInstance) VerifyHostKey(receivedKey ssh.PublicKey, client *ssh.Client) error {
+ ai.provider.stopWg.Add(1)
+ defer ai.provider.stopWg.Done()
+
remoteFingerprint := ssh.FingerprintSHA256(receivedKey)
tags := ai.Tags()
@@ -632,7 +663,6 @@ func (ai *AzureInstance) VerifyHostKey(ctx context.Context, receivedKey ssh.Publ
nodetoken := strings.TrimSpace(string(nodetokenbytes))
expectedToken := fmt.Sprintf("%s-%s", *ai.vm.Name, nodetokenTag)
- log.Printf("%q %q", nodetoken, expectedToken)
if strings.TrimSpace(nodetoken) != expectedToken {
return fmt.Errorf("Node token did not match, expected %q got %q", expectedToken, nodetoken)
@@ -650,14 +680,12 @@ func (ai *AzureInstance) VerifyHostKey(ctx context.Context, receivedKey ssh.Publ
sp := strings.Split(string(keyfingerprintbytes), " ")
- log.Printf("%q %q", remoteFingerprint, sp[1])
-
if remoteFingerprint != sp[1] {
return fmt.Errorf("Key fingerprint did not match, expected %q got %q", sp[1], remoteFingerprint)
}
tags["ssh-pubkey-fingerprint"] = sp[1]
delete(tags, "node-token")
- ai.SetTags(ctx, tags)
+ ai.SetTags(tags)
return nil
}
diff --git a/lib/cloud/azure_test.go b/lib/cloud/azure_test.go
index a123bc4bb..24e0a70d7 100644
--- a/lib/cloud/azure_test.go
+++ b/lib/cloud/azure_test.go
@@ -1,6 +1,28 @@
// Copyright (C) The Arvados Authors. All rights reserved.
//
// SPDX-License-Identifier: AGPL-3.0
+//
+//
+// How to manually run individual tests against the real cloud
+//
+// $ go test -v git.curoverse.com/arvados.git/lib/cloud -live-azure-cfg azconfig.yml -check.f=TestListInstances
+//
+// Example azconfig.yml:
+//
+// subscription_id: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
+// key: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
+// region: centralus
+// cloud_environment: AzurePublicCloud
+// secret: XXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXXX
+// tenant_id: XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX
+// resource_group: zzzzz
+// network: zzzzz
+// subnet: zzzzz-subnet-private
+// storage_account: example
+// blob_container: vhds
+// image: "https://example.blob.core.windows.net/system/Microsoft.Compute/Images/images/zzzzz-compute-osDisk.XXXXXXXX-XXXX-XXXX-XXXX-XXXXXXXXXXXX.vhd"
+// delete_dangling_resources_after: 20
+// authorized_key: "ssh-rsa AAAAB3NzaC1yc2EAAAADAQABAAABAQDLQS1ExT2+WjA0d/hntEAyAtgeN1W2ik2QX8c2zO6HjlPHWXL92r07W0WMuDib40Pcevpi1BXeBWXA9ZB5KKMJB+ukaAu22KklnQuUmNvk6ZXnPKSkGxuCYvPQb08WhHf3p1VxiKfP3iauedBDM4x9/bkJohlBBQiFXzNUcQ+a6rKiMzmJN2gbL8ncyUzc+XQ5q4JndTwTGtOlzDiGOc9O4z5Dd76wtAVJneOuuNpwfFRVHThpJM6VThpCZOnl8APaceWXKeuwOuCae3COZMz++xQfxOfZ9Z8aIwo+TlQhsRaNfZ4Vjrop6ej8dtfZtgUFKfbXEOYaHrGrWGotFDTD example at example"
package cloud
@@ -124,8 +146,7 @@ func (*AzureInstanceSetSuite) TestCreate(c *check.C) {
nodetoken, err := randutil.String(40, "abcdefghijklmnopqrstuvwxyz0123456789")
c.Assert(err, check.IsNil)
- inst, err := ap.Create(context.Background(),
- cluster.InstanceTypes["tiny"],
+ inst, err := ap.Create(cluster.InstanceTypes["tiny"],
img, map[string]string{
"node-token": nodetoken},
pk)
@@ -143,7 +164,7 @@ func (*AzureInstanceSetSuite) TestListInstances(c *check.C) {
c.Fatal("Error making provider", err)
}
- l, err := ap.Instances(context.Background(), nil)
+ l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
@@ -159,7 +180,8 @@ func (*AzureInstanceSetSuite) TestManageNics(c *check.C) {
c.Fatal("Error making provider", err)
}
- ap.(*AzureInstanceSet).ManageNics(context.Background())
+ ap.(*AzureInstanceSet).ManageNics()
+ ap.Stop()
}
func (*AzureInstanceSetSuite) TestManageBlobs(c *check.C) {
@@ -168,7 +190,8 @@ func (*AzureInstanceSetSuite) TestManageBlobs(c *check.C) {
c.Fatal("Error making provider", err)
}
- ap.(*AzureInstanceSet).ManageBlobs(context.Background())
+ ap.(*AzureInstanceSet).ManageBlobs()
+ ap.Stop()
}
func (*AzureInstanceSetSuite) TestDestroyInstances(c *check.C) {
@@ -177,11 +200,11 @@ func (*AzureInstanceSetSuite) TestDestroyInstances(c *check.C) {
c.Fatal("Error making provider", err)
}
- l, err := ap.Instances(context.Background(), nil)
+ l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
for _, i := range l {
- c.Check(i.Destroy(context.Background()), check.IsNil)
+ c.Check(i.Destroy(), check.IsNil)
}
}
@@ -239,16 +262,16 @@ func (*AzureInstanceSetSuite) TestSetTags(c *check.C) {
if err != nil {
c.Fatal("Error making provider", err)
}
- l, err := ap.Instances(context.Background(), nil)
+ l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
if len(l) > 0 {
- err = l[0].SetTags(context.Background(), map[string]string{"foo": "bar"})
+ err = l[0].SetTags(map[string]string{"foo": "bar"})
if err != nil {
c.Fatal("Error setting tags", err)
}
}
- l, err = ap.Instances(context.Background(), nil)
+ l, err = ap.Instances(nil)
c.Assert(err, check.IsNil)
if len(l) > 0 {
@@ -262,7 +285,7 @@ func (*AzureInstanceSetSuite) TestSSH(c *check.C) {
if err != nil {
c.Fatal("Error making provider", err)
}
- l, err := ap.Instances(context.Background(), nil)
+ l, err := ap.Instances(nil)
c.Assert(err, check.IsNil)
if len(l) > 0 {
@@ -316,7 +339,7 @@ func SetupSSHClient(c *check.C, inst Instance) (*ssh.Client, error) {
return nil, errors.New("BUG: key was never provided to HostKeyCallback")
}
- err = inst.VerifyHostKey(context.Background(), receivedKey, client)
+ err = inst.VerifyHostKey(receivedKey, client)
c.Assert(err, check.IsNil)
return client, nil
diff --git a/lib/cloud/interfaces.go b/lib/cloud/interfaces.go
index c2b1acbab..e3a072582 100644
--- a/lib/cloud/interfaces.go
+++ b/lib/cloud/interfaces.go
@@ -5,7 +5,6 @@
package cloud
import (
- "context"
"io"
"time"
@@ -67,7 +66,7 @@ type ExecutorTarget interface {
// VerifyHostKey can use it to make outgoing network
// connections from the instance -- e.g., to use the cloud's
// "this instance's metadata" API.
- VerifyHostKey(context.Context, ssh.PublicKey, *ssh.Client) error
+ VerifyHostKey(ssh.PublicKey, *ssh.Client) error
}
// Instance is implemented by the provider-specific instance types.
@@ -89,10 +88,10 @@ type Instance interface {
Tags() InstanceTags
// Replace tags with the given tags
- SetTags(context.Context, InstanceTags) error
+ SetTags(InstanceTags) error
// Shut down the node
- Destroy(context.Context) error
+ Destroy() error
}
// An InstanceSet manages a set of VM instances created by an elastic
@@ -106,7 +105,7 @@ type InstanceSet interface {
//
// The returned error should implement RateLimitError and
// QuotaError where applicable.
- Create(context.Context, arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
+ Create(arvados.InstanceType, ImageID, InstanceTags, ssh.PublicKey) (Instance, error)
// Return all instances, including ones that are booting or
// shutting down. Optionally, filter out nodes that don't have
@@ -118,7 +117,7 @@ type InstanceSet interface {
// Instance object each time. Thus, the caller is responsible
// for de-duplicating the returned instances by comparing the
// InstanceIDs returned by the instances' ID() methods.
- Instances(context.Context, InstanceTags) ([]Instance, error)
+ Instances(InstanceTags) ([]Instance, error)
// Stop any background tasks and release other resources.
Stop()
diff --git a/lib/dispatchcloud/dispatcher_test.go b/lib/dispatchcloud/dispatcher_test.go
index 87122a85c..33823a828 100644
--- a/lib/dispatchcloud/dispatcher_test.go
+++ b/lib/dispatchcloud/dispatcher_test.go
@@ -5,7 +5,6 @@
package dispatchcloud
import (
- "context"
"encoding/json"
"io/ioutil"
"math/rand"
@@ -166,7 +165,7 @@ func (s *DispatcherSuite) TestDispatchToStubDriver(c *check.C) {
deadline := time.Now().Add(time.Second)
for range time.NewTicker(10 * time.Millisecond).C {
- insts, err := s.stubDriver.InstanceSets()[0].Instances(context.TODO(), nil)
+ insts, err := s.stubDriver.InstanceSets()[0].Instances(nil)
c.Check(err, check.IsNil)
queue.Update()
ents, _ := queue.Entries()
diff --git a/lib/dispatchcloud/instance_set_proxy.go b/lib/dispatchcloud/instance_set_proxy.go
index 80c510458..e728b67cd 100644
--- a/lib/dispatchcloud/instance_set_proxy.go
+++ b/lib/dispatchcloud/instance_set_proxy.go
@@ -5,8 +5,6 @@
package dispatchcloud
import (
- "context"
-
"git.curoverse.com/arvados.git/lib/cloud"
"git.curoverse.com/arvados.git/sdk/go/arvados"
"golang.org/x/crypto/ssh"
@@ -16,12 +14,12 @@ type instanceSetProxy struct {
cloud.InstanceSet
}
-func (is *instanceSetProxy) Create(ctx context.Context, it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
+func (is *instanceSetProxy) Create(it arvados.InstanceType, id cloud.ImageID, tags cloud.InstanceTags, pk ssh.PublicKey) (cloud.Instance, error) {
// TODO: return if Create failed recently with a RateLimitError or QuotaError
- return is.InstanceSet.Create(ctx, it, id, tags, pk)
+ return is.InstanceSet.Create(it, id, tags, pk)
}
-func (is *instanceSetProxy) Instances(ctx context.Context, tags cloud.InstanceTags) ([]cloud.Instance, error) {
+func (is *instanceSetProxy) Instances(tags cloud.InstanceTags) ([]cloud.Instance, error) {
// TODO: return if Instances failed recently with a RateLimitError
- return is.InstanceSet.Instances(ctx, tags)
+ return is.InstanceSet.Instances(tags)
}
diff --git a/lib/dispatchcloud/ssh_executor/executor.go b/lib/dispatchcloud/ssh_executor/executor.go
index c9b0101a7..b5dba9870 100644
--- a/lib/dispatchcloud/ssh_executor/executor.go
+++ b/lib/dispatchcloud/ssh_executor/executor.go
@@ -8,7 +8,6 @@ package ssh_executor
import (
"bytes"
- "context"
"errors"
"io"
"net"
@@ -181,7 +180,7 @@ func (exr *Executor) setupSSHClient() (*ssh.Client, error) {
}
if exr.hostKey == nil || !bytes.Equal(exr.hostKey.Marshal(), receivedKey.Marshal()) {
- err = target.VerifyHostKey(context.TODO(), receivedKey, client)
+ err = target.VerifyHostKey(receivedKey, client)
if err != nil {
return nil, err
}
diff --git a/lib/dispatchcloud/test/lame_instance_set.go b/lib/dispatchcloud/test/lame_instance_set.go
index c52f7c645..baab407a7 100644
--- a/lib/dispatchcloud/test/lame_instance_set.go
+++ b/lib/dispatchcloud/test/lame_instance_set.go
@@ -5,7 +5,6 @@
package test
import (
- "context"
"fmt"
"math/rand"
"sync"
@@ -25,13 +24,13 @@ type LameInstanceSet struct {
}
// Create returns a new instance.
-func (p *LameInstanceSet) Create(_ context.Context, instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) {
+func (p *LameInstanceSet) Create(instType arvados.InstanceType, imageID cloud.ImageID, tags cloud.InstanceTags, pubkey ssh.PublicKey) (cloud.Instance, error) {
inst := &lameInstance{
p: p,
id: cloud.InstanceID(fmt.Sprintf("lame-%x", rand.Uint64())),
providerType: instType.ProviderType,
}
- inst.SetTags(context.TODO(), tags)
+ inst.SetTags(tags)
if p.Hold != nil {
p.Hold <- true
}
@@ -45,7 +44,7 @@ func (p *LameInstanceSet) Create(_ context.Context, instType arvados.InstanceTyp
}
// Instances returns the instances that haven't been destroyed.
-func (p *LameInstanceSet) Instances(context.Context, cloud.InstanceTags) ([]cloud.Instance, error) {
+func (p *LameInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
p.mtx.Lock()
defer p.mtx.Unlock()
var instances []cloud.Instance
@@ -90,7 +89,7 @@ func (inst *lameInstance) Address() string {
return "0.0.0.0:1234"
}
-func (inst *lameInstance) SetTags(_ context.Context, tags cloud.InstanceTags) error {
+func (inst *lameInstance) SetTags(tags cloud.InstanceTags) error {
inst.p.mtx.Lock()
defer inst.p.mtx.Unlock()
inst.tags = cloud.InstanceTags{}
@@ -100,7 +99,7 @@ func (inst *lameInstance) SetTags(_ context.Context, tags cloud.InstanceTags) er
return nil
}
-func (inst *lameInstance) Destroy(context.Context) error {
+func (inst *lameInstance) Destroy() error {
if inst.p.Hold != nil {
inst.p.Hold <- true
}
@@ -114,6 +113,6 @@ func (inst *lameInstance) Tags() cloud.InstanceTags {
return inst.tags
}
-func (inst *lameInstance) VerifyHostKey(context.Context, ssh.PublicKey, *ssh.Client) error {
+func (inst *lameInstance) VerifyHostKey(ssh.PublicKey, *ssh.Client) error {
return nil
}
diff --git a/lib/dispatchcloud/test/stub_driver.go b/lib/dispatchcloud/test/stub_driver.go
index 8f40b85eb..8bdfaa947 100644
--- a/lib/dispatchcloud/test/stub_driver.go
+++ b/lib/dispatchcloud/test/stub_driver.go
@@ -5,7 +5,6 @@
package test
import (
- "context"
"crypto/rand"
"errors"
"fmt"
@@ -69,7 +68,7 @@ type StubInstanceSet struct {
stopped bool
}
-func (sis *StubInstanceSet) Create(_ context.Context, it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
+func (sis *StubInstanceSet) Create(it arvados.InstanceType, image cloud.ImageID, tags cloud.InstanceTags, authKey ssh.PublicKey) (cloud.Instance, error) {
sis.mtx.Lock()
defer sis.mtx.Unlock()
if sis.stopped {
@@ -97,7 +96,7 @@ func (sis *StubInstanceSet) Create(_ context.Context, it arvados.InstanceType, i
return svm.Instance(), nil
}
-func (sis *StubInstanceSet) Instances(context.Context, cloud.InstanceTags) ([]cloud.Instance, error) {
+func (sis *StubInstanceSet) Instances(cloud.InstanceTags) ([]cloud.Instance, error) {
sis.mtx.RLock()
defer sis.mtx.RUnlock()
var r []cloud.Instance
@@ -262,7 +261,7 @@ func (si stubInstance) Address() string {
return si.addr
}
-func (si stubInstance) Destroy(_ context.Context) error {
+func (si stubInstance) Destroy() error {
if math_rand.Float64() < si.svm.sis.driver.ErrorRateDestroy {
return errors.New("instance could not be destroyed")
}
@@ -278,7 +277,7 @@ func (si stubInstance) ProviderType() string {
return si.svm.providerType
}
-func (si stubInstance) SetTags(_ context.Context, tags cloud.InstanceTags) error {
+func (si stubInstance) SetTags(tags cloud.InstanceTags) error {
tags = copyTags(tags)
svm := si.svm
go func() {
@@ -297,7 +296,7 @@ func (si stubInstance) String() string {
return string(si.svm.id)
}
-func (si stubInstance) VerifyHostKey(_ context.Context, key ssh.PublicKey, client *ssh.Client) error {
+func (si stubInstance) VerifyHostKey(key ssh.PublicKey, client *ssh.Client) error {
buf := make([]byte, 512)
_, err := io.ReadFull(rand.Reader, buf)
if err != nil {
diff --git a/lib/dispatchcloud/worker/pool.go b/lib/dispatchcloud/worker/pool.go
index 30a1312d3..ff5f762c1 100644
--- a/lib/dispatchcloud/worker/pool.go
+++ b/lib/dispatchcloud/worker/pool.go
@@ -5,7 +5,6 @@
package worker
import (
- "context"
"io"
"sort"
"strings"
@@ -226,7 +225,7 @@ func (wp *Pool) Create(it arvados.InstanceType) error {
wp.creating[it] = append(wp.creating[it], now)
go func() {
defer wp.notify()
- inst, err := wp.instanceSet.Create(context.TODO(), it, wp.imageID, tags, nil)
+ inst, err := wp.instanceSet.Create(it, wp.imageID, tags, nil)
wp.mtx.Lock()
defer wp.mtx.Unlock()
// Remove our timestamp marker from wp.creating
@@ -627,7 +626,7 @@ func (wp *Pool) getInstancesAndSync() error {
wp.setupOnce.Do(wp.setup)
wp.logger.Debug("getting instance list")
threshold := time.Now()
- instances, err := wp.instanceSet.Instances(context.TODO(), cloud.InstanceTags{})
+ instances, err := wp.instanceSet.Instances(cloud.InstanceTags{})
if err != nil {
return err
}
diff --git a/lib/dispatchcloud/worker/worker.go b/lib/dispatchcloud/worker/worker.go
index a4578b26d..c26186309 100644
--- a/lib/dispatchcloud/worker/worker.go
+++ b/lib/dispatchcloud/worker/worker.go
@@ -6,7 +6,6 @@ package worker
import (
"bytes"
- "context"
"strings"
"sync"
"time"
@@ -312,7 +311,7 @@ func (wkr *worker) shutdown() {
wkr.state = StateShutdown
go wkr.wp.notify()
go func() {
- err := wkr.instance.Destroy(context.TODO())
+ err := wkr.instance.Destroy()
if err != nil {
wkr.logger.WithError(err).Warn("shutdown failed")
return
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list