[ARVADOS] created: 91db2d8fd32bc3f6c2a26ffc37f6591b1e5f380b

Git user git at public.curoverse.com
Mon Oct 2 16:59:21 EDT 2017

        at  91db2d8fd32bc3f6c2a26ffc37f6591b1e5f380b (commit)

commit 91db2d8fd32bc3f6c2a26ffc37f6591b1e5f380b
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date:   Thu Sep 28 23:28:13 2017 -0400

    12260: Health check aggregator.
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>

diff --git a/build/run-tests.sh b/build/run-tests.sh
index 81c6612..63c8801 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -75,6 +75,7 @@ services/arv-git-httpd
@@ -800,6 +801,7 @@ gostuff=(
+    services/health
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
new file mode 100644
index 0000000..537de2a
--- /dev/null
+++ b/sdk/go/arvados/config.go
@@ -0,0 +1,87 @@
+package arvados
+import (
+	"fmt"
+	"os"
+	"strings"
+	"git.curoverse.com/arvados.git/sdk/go/config"
+type Config struct {
+	Clusters map[string]Cluster
+// GetConfig returns the current system config, loading it from
+// /etc if needed.
+func GetConfig() (*Config, error) {
+	var cfg Config
+	err := config.LoadFile(&cfg, "/etc/arvados/config.yml")
+	return &cfg, err
+// GetCluster returns the config for the given cluster, with its
+// ClusterID field filled in. If clusterID is "", the only configured
+// cluster is returned; in that case it is an error if zero clusters
+// or more than one cluster are configured.
+func (sc *Config) GetCluster(clusterID string) (*Cluster, error) {
+	if clusterID == "" {
+		switch len(sc.Clusters) {
+		case 0:
+			return nil, fmt.Errorf("no clusters configured")
+		case 1:
+			// Return a copy of the single map entry,
+			// whatever its key is.
+			for id, cc := range sc.Clusters {
+				cc.ClusterID = id
+				return &cc, nil
+			}
+		default:
+			return nil, fmt.Errorf("multiple clusters configured, cannot choose")
+		}
+	}
+	cc, ok := sc.Clusters[clusterID]
+	if !ok {
+		return nil, fmt.Errorf("cluster %q is not configured", clusterID)
+	}
+	cc.ClusterID = clusterID
+	return &cc, nil
+type Cluster struct {
+	ClusterID       string `json:"-"`
+	ManagementToken string
+	SystemNodes     map[string]SystemNode
+// GetThisSystemNode returns a SystemNode for the node we're
+// running on right now.
+func (cc *Cluster) GetThisSystemNode() (*SystemNode, error) {
+	hostname, err := os.Hostname()
+	if err != nil {
+		return nil, err
+	}
+	return cc.GetSystemNode(hostname)
+// GetSystemNode returns a SystemNode for the given node. An
+// error is returned if the appropriate configuration can't be
+// determined (e.g., this does not appear to be a system node).
+func (cc *Cluster) GetSystemNode(node string) (*SystemNode, error) {
+	// Generally node is "a.b.ca", use the first of {"a.b.ca",
+	// "a.b", "a"} that has an entry in SystemNodes.
+	labels := strings.Split(node, ".")
+	for j := len(labels); j > 0; j-- {
+		hostpart := strings.Join(labels[:j], ".")
+		if cfg, ok := cc.SystemNodes[hostpart]; ok {
+			return &cfg, nil
+		}
+	}
+	// If node is not listed, but "*" gives a default system node
+	// config, use the default config.
+	if cfg, ok := cc.SystemNodes["*"]; ok {
+		return &cfg, nil
+	}
+	return nil, fmt.Errorf("config does not provision host %q as a system node", node)
+type SystemNode struct {
+	Keepstore Keepstore
+type Keepstore struct {
+	Listen string
diff --git a/sdk/go/auth/auth.go b/sdk/go/auth/auth.go
index 730989b..ea49243 100644
--- a/sdk/go/auth/auth.go
+++ b/sdk/go/auth/auth.go
@@ -39,7 +39,7 @@ var DecodeTokenCookie func(string) ([]byte, error) = base64.URLEncoding.DecodeSt
 func (a *Credentials) LoadTokensFromHTTPRequest(r *http.Request) {
 	// Load plain token from "Authorization: OAuth2 ..." header
 	// (typically used by smart API clients)
-	if toks := strings.SplitN(r.Header.Get("Authorization"), " ", 2); len(toks) == 2 && toks[0] == "OAuth2" {
+	if toks := strings.SplitN(r.Header.Get("Authorization"), " ", 2); len(toks) == 2 && (toks[0] == "OAuth2" || toks[0] == "Bearer") {
 		a.Tokens = append(a.Tokens, toks[1])
diff --git a/sdk/go/health/aggregator.go b/sdk/go/health/aggregator.go
new file mode 100644
index 0000000..6993187
--- /dev/null
+++ b/sdk/go/health/aggregator.go
@@ -0,0 +1,197 @@
+package health
+import (
+	"context"
+	"encoding/json"
+	"fmt"
+	"net"
+	"net/http"
+	"sync"
+	"time"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/auth"
+const defaultTimeout = arvados.Duration(2 * time.Second)
+// Aggregator implements http.Handler. It handles "GET /_health/all"
+// by checking the health of all configured services on the cluster
+// and responding 200 if everything is healthy.
+type Aggregator struct {
+	setupOnce  sync.Once
+	httpClient *http.Client
+	timeout    arvados.Duration
+	Config *arvados.Config
+	// If non-nil, Log is called after handling each request.
+	Log func(*http.Request, error)
+func (agg *Aggregator) setup() {
+	agg.httpClient = http.DefaultClient
+	if agg.timeout == 0 {
+		// this is always the case, except in the test suite
+		agg.timeout = defaultTimeout
+	}
+// ServeHTTP implements http.Handler. It loads the system config if
+// agg.Config is nil, requires the request to carry the cluster's
+// management token, and responds to "/_health/all" with a JSON
+// summary of the cluster's health. Any failure is reported as a JSON
+// object with an "error" key and a non-200 status code. If agg.Log is
+// non-nil it is called once per request with the resulting error (nil
+// on success).
+func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	// Initialize httpClient and timeout before any health check
+	// can run; without this, ping() would use a nil client.
+	agg.setupOnce.Do(agg.setup)
+	sendErr := func(statusCode int, err error) {
+		resp.WriteHeader(statusCode)
+		// Send the message text: an error value itself
+		// generally marshals as "{}".
+		json.NewEncoder(resp).Encode(map[string]interface{}{"error": err.Error()})
+		if agg.Log != nil {
+			agg.Log(req, err)
+		}
+	}
+	resp.Header().Set("Content-Type", "application/json")
+	if agg.Config == nil {
+		cfg, err := arvados.GetConfig()
+		if err != nil {
+			err = fmt.Errorf("arvados.GetConfig(): %s", err)
+			sendErr(http.StatusInternalServerError, err)
+			return
+		}
+		agg.Config = cfg
+	}
+	cluster, err := agg.Config.GetCluster("")
+	if err != nil {
+		err = fmt.Errorf("arvados.GetCluster(): %s", err)
+		sendErr(http.StatusInternalServerError, err)
+		return
+	}
+	// Authenticate before revealing whether the path exists.
+	if !agg.checkAuth(req, cluster) {
+		sendErr(http.StatusUnauthorized, errUnauthorized)
+		return
+	}
+	if req.URL.Path != "/_health/all" {
+		sendErr(http.StatusNotFound, errNotFound)
+		return
+	}
+	json.NewEncoder(resp).Encode(agg.checkClusterHealth(cluster))
+	if agg.Log != nil {
+		agg.Log(req, nil)
+	}
+type serviceHealth struct {
+	Health string `json:"health"`
+	N      int    `json:"n"`
+type clusterHealthResponse struct {
+	Health    string                            `json:"health"`
+	Endpoints map[string]map[string]interface{} `json:"endpoints"`
+	Services  map[string]serviceHealth          `json:"services"`
+// checkClusterHealth pings every configured service endpoint on
+// every system node of the given cluster concurrently, and returns
+// the aggregated result. The overall Health is "OK" unless at least
+// one ping fails; each service's Health is "OK" if at least one of
+// its endpoints responded OK, and N is the number that did.
+func (agg *Aggregator) checkClusterHealth(cluster *arvados.Cluster) clusterHealthResponse {
+	resp := clusterHealthResponse{
+		Health:    "OK",
+		Endpoints: make(map[string]map[string]interface{}),
+		Services:  make(map[string]serviceHealth),
+	}
+	// mtx guards resp, which is updated by all ping goroutines.
+	mtx := sync.Mutex{}
+	wg := sync.WaitGroup{}
+	for node, nodeConfig := range cluster.SystemNodes {
+		for svc, addr := range map[string]string{
+			"keepstore": nodeConfig.Keepstore.Listen,
+		} {
+			if addr == "" {
+				// Service is not enabled on this node.
+				continue
+			}
+			wg.Add(1)
+			// Pass the loop variables as arguments so each
+			// goroutine works on its own node/svc/addr
+			// rather than whatever the loop has advanced
+			// to by the time it runs.
+			go func(node, svc, addr string) {
+				defer wg.Done()
+				pingResp := agg.ping(node, addr)
+				mtx.Lock()
+				defer mtx.Unlock()
+				resp.Endpoints[node+"/"+svc+"/_health/ping"] = pingResp
+				svHealth := resp.Services[svc]
+				if agg.isOK(pingResp) {
+					svHealth.N++
+				} else {
+					resp.Health = "ERROR"
+				}
+				resp.Services[svc] = svHealth
+			}(node, svc, addr)
+		}
+	}
+	wg.Wait()
+	// Derive each service's overall status from its count of
+	// healthy endpoints.
+	for svc, svHealth := range resp.Services {
+		if svHealth.N > 0 {
+			svHealth.Health = "OK"
+		} else {
+			svHealth.Health = "ERROR"
+		}
+		resp.Services[svc] = svHealth
+	}
+	return resp
+func (agg *Aggregator) isOK(result map[string]interface{}) bool {
+	h, ok := result["health"].(string)
+	return ok && h == "OK"
+// ping sends "GET /_health/ping" to the given node, using the port
+// number from addr (a "host:port" listen address), and returns the
+// decoded JSON response. The returned map always has a
+// "responseTime" entry (seconds elapsed). If anything goes wrong --
+// bad address, connection failure, timeout after agg.timeout,
+// undecodable body, or non-200 status -- "health" is set to "ERROR"
+// and "error" to the error message.
+func (agg *Aggregator) ping(node, addr string) (result map[string]interface{}) {
+	t0 := time.Now()
+	result = make(map[string]interface{})
+	var err error
+	defer func() {
+		if result == nil {
+			// A literal JSON null body resets the map to
+			// nil when decoded.
+			result = make(map[string]interface{})
+		}
+		result["responseTime"] = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
+		if err != nil {
+			// Store the message text: an error value
+			// itself generally marshals as "{}".
+			result["health"], result["error"] = "ERROR", err.Error()
+		}
+	}()
+	_, port, err := net.SplitHostPort(addr)
+	if err != nil {
+		return
+	}
+	req, err := http.NewRequest("GET", "http://"+node+":"+port+"/_health/ping", nil)
+	if err != nil {
+		return
+	}
+	// Bound the whole request, and release the timer as soon as
+	// we return.
+	ctx, cancel := context.WithTimeout(req.Context(), time.Duration(agg.timeout))
+	defer cancel()
+	resp, err := agg.httpClient.Do(req.WithContext(ctx))
+	if err != nil {
+		return
+	}
+	defer resp.Body.Close()
+	// Decode requires a pointer, even though the target is a map.
+	err = json.NewDecoder(resp.Body).Decode(&result)
+	if err != nil {
+		return
+	}
+	if resp.StatusCode != http.StatusOK {
+		// resp.Status already includes the numeric code.
+		err = fmt.Errorf("HTTP %s", resp.Status)
+		return
+	}
+	return
+func (agg *Aggregator) checkAuth(req *http.Request, cluster *arvados.Cluster) bool {
+	creds := auth.NewCredentialsFromHTTPRequest(req)
+	for _, token := range creds.Tokens {
+		if token != "" && token == cluster.ManagementToken {
+			return true
+		}
+	}
+	return false
diff --git a/sdk/go/health/aggregator_test.go b/sdk/go/health/aggregator_test.go
new file mode 100644
index 0000000..2cb7122
--- /dev/null
+++ b/sdk/go/health/aggregator_test.go
@@ -0,0 +1,73 @@
+package health
+import (
+	"encoding/json"
+	"net/http"
+	"net/http/httptest"
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/arvadostest"
+	"gopkg.in/check.v1"
+type AggregatorSuite struct {
+	handler *Aggregator
+	req     *http.Request
+	resp    *httptest.ResponseRecorder
+// Gocheck boilerplate
+var _ = check.Suite(&AggregatorSuite{})
+func (s *AggregatorSuite) TestInterface(c *check.C) {
+	var _ http.Handler = &Aggregator{}
+func (s *AggregatorSuite) SetUpTest(c *check.C) {
+	s.handler = &Aggregator{Config: &arvados.Config{
+		Clusters: map[string]arvados.Cluster{
+			"zzzzz": {
+				ManagementToken: arvadostest.ManagementToken,
+				SystemNodes:     map[string]arvados.SystemNode{},
+			},
+		},
+	}}
+	s.req = httptest.NewRequest("GET", "/_health/all", nil)
+	s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+	s.resp = httptest.NewRecorder()
+func (s *AggregatorSuite) TestNoAuth(c *check.C) {
+	s.req.Header.Del("Authorization")
+	s.handler.ServeHTTP(s.resp, s.req)
+	s.checkError(c)
+	c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
+func (s *AggregatorSuite) TestBadAuth(c *check.C) {
+	s.req.Header.Set("Authorization", "xyzzy")
+	s.handler.ServeHTTP(s.resp, s.req)
+	s.checkError(c)
+	c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
+func (s *AggregatorSuite) TestEmptyConfig(c *check.C) {
+	s.handler.ServeHTTP(s.resp, s.req)
+	s.checkOK(c)
+func (s *AggregatorSuite) checkError(c *check.C) {
+	c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
+	var body map[string]interface{}
+	err := json.NewDecoder(s.resp.Body).Decode(&body)
+	c.Check(err, check.IsNil)
+	c.Check(body["health"], check.Not(check.Equals), "OK")
+func (s *AggregatorSuite) checkOK(c *check.C) {
+	c.Check(s.resp.Code, check.Equals, http.StatusOK)
+	var body map[string]interface{}
+	err := json.NewDecoder(s.resp.Body).Decode(&body)
+	c.Check(err, check.IsNil)
+	c.Check(body["health"], check.Equals, "OK")
diff --git a/services/health/main.go b/services/health/main.go
new file mode 100644
index 0000000..7f4d648
--- /dev/null
+++ b/services/health/main.go
@@ -0,0 +1,33 @@
+package main
+import (
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+	"git.curoverse.com/arvados.git/sdk/go/health"
+	"git.curoverse.com/arvados.git/sdk/go/httpserver"
+	log "github.com/Sirupsen/logrus"
+// main loads the system config, starts an HTTP server whose handler
+// is a health.Aggregator using that config, and blocks until the
+// server exits.
+func main() {
+	log.SetFormatter(&log.JSONFormatter{
+		TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
+	})
+	// The config loader added to sdk/go/arvados in this commit is
+	// GetConfig, and the Aggregator field that receives it is
+	// Config.
+	sysConf, err := arvados.GetConfig()
+	if err != nil {
+		log.Fatal(err)
+	}
+	srv := &httpserver.Server{
+		Addr: ":", // FIXME: should be dictated by Health on this SystemNode
+		Handler: &health.Aggregator{
+			Config: sysConf,
+		},
+	}
+	// NOTE(review): httpserver.Server is not visible here --
+	// confirm that it has a zero-argument HandleFunc method.
+	srv.HandleFunc()
+	if err := srv.Start(); err != nil {
+		log.Fatal(err)
+	}
+	log.WithField("Listen", srv.Addr).Info("listening")
+	if err := srv.Wait(); err != nil {
+		log.Fatal(err)
+	}



More information about the arvados-commits mailing list