[ARVADOS] created: 91db2d8fd32bc3f6c2a26ffc37f6591b1e5f380b
Git user
git at public.curoverse.com
Mon Oct 2 16:59:21 EDT 2017
at 91db2d8fd32bc3f6c2a26ffc37f6591b1e5f380b (commit)
commit 91db2d8fd32bc3f6c2a26ffc37f6591b1e5f380b
Author: Tom Clegg <tclegg at veritasgenetics.com>
Date: Thu Sep 28 23:28:13 2017 -0400
12260: Health check aggregator.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tclegg at veritasgenetics.com>
diff --git a/build/run-tests.sh b/build/run-tests.sh
index 81c6612..63c8801 100755
--- a/build/run-tests.sh
+++ b/build/run-tests.sh
@@ -75,6 +75,7 @@ services/arv-git-httpd
services/crunchstat
services/dockercleaner
services/fuse
+services/health
services/keep-web
services/keepproxy
services/keepstore
@@ -800,6 +801,7 @@ gostuff=(
lib/crunchstat
services/arv-git-httpd
services/crunchstat
+ services/health
services/keep-web
services/keepstore
sdk/go/keepclient
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
new file mode 100644
index 0000000..537de2a
--- /dev/null
+++ b/sdk/go/arvados/config.go
@@ -0,0 +1,87 @@
+package arvados
+
+import (
+ "fmt"
+ "os"
+ "strings"
+
+ "git.curoverse.com/arvados.git/sdk/go/config"
+)
+
+// Config is the top-level configuration: a set of cluster
+// configurations keyed by cluster ID.
+type Config struct {
+ Clusters map[string]Cluster
+}
+
+// GetConfig loads the system config from /etc/arvados/config.yml.
+// The file is read on every call. If loading fails, the returned
+// *Config is nil and the error from config.LoadFile is returned.
+func GetConfig() (*Config, error) {
+	var cfg Config
+	if err := config.LoadFile(&cfg, "/etc/arvados/config.yml"); err != nil {
+		return nil, err
+	}
+	return &cfg, nil
+}
+
+// GetCluster returns the config for the given cluster, or for the
+// only configured cluster if clusterID is "".
+//
+// The returned Cluster is a copy of the map entry with its ClusterID
+// field filled in. An error is returned if clusterID is "" and the
+// number of configured clusters is not exactly one, or if clusterID
+// is not a key in sc.Clusters.
+func (sc *Config) GetCluster(clusterID string) (*Cluster, error) {
+	if clusterID == "" {
+		switch len(sc.Clusters) {
+		case 0:
+			return nil, fmt.Errorf("no clusters configured")
+		case 1:
+			// Single-iteration loop: the only way to fetch
+			// the sole entry without knowing its key.
+			for id, cc := range sc.Clusters {
+				cc.ClusterID = id
+				return &cc, nil
+			}
+		default:
+			return nil, fmt.Errorf("multiple clusters configured, cannot choose")
+		}
+	}
+	cc, ok := sc.Clusters[clusterID]
+	if !ok {
+		return nil, fmt.Errorf("cluster %q is not configured", clusterID)
+	}
+	cc.ClusterID = clusterID
+	return &cc, nil
+}
+
+// Cluster is the configuration of a single cluster.
+type Cluster struct {
+ // ClusterID is excluded from JSON by its tag; GetCluster fills
+ // it in from the key in Config.Clusters.
+ ClusterID string `json:"-"`
+ ManagementToken string
+ // SystemNodes maps a host name (or "*" for a default entry) to
+ // that node's configuration; see GetSystemNode.
+ SystemNodes map[string]SystemNode
+}
+
+// GetThisSystemNode looks up the SystemNode entry for the host this
+// process is running on, using the name reported by os.Hostname. An
+// error from os.Hostname or from GetSystemNode is returned as-is.
+func (cc *Cluster) GetThisSystemNode() (*SystemNode, error) {
+	name, err := os.Hostname()
+	if err == nil {
+		return cc.GetSystemNode(name)
+	}
+	return nil, err
+}
+
+// GetSystemNode returns the SystemNode config for the given host
+// name. The full name is tried first, then progressively shorter
+// names obtained by dropping the last dot-separated label (for
+// "a.b.ca": "a.b.ca", "a.b", "a"). If none of those has an entry in
+// SystemNodes, the "*" entry is used as a default. An error is
+// returned if no entry applies (e.g., this does not appear to be a
+// system node).
+func (cc *Cluster) GetSystemNode(node string) (*SystemNode, error) {
+	host := node
+	for {
+		if sn, found := cc.SystemNodes[host]; found {
+			return &sn, nil
+		}
+		// Strip the rightmost label; stop once there is no
+		// dot left to cut at.
+		cut := strings.LastIndex(host, ".")
+		if cut < 0 {
+			break
+		}
+		host = host[:cut]
+	}
+	if fallback, found := cc.SystemNodes["*"]; found {
+		return &fallback, nil
+	}
+	return nil, fmt.Errorf("config does not provision host %q as a system node", node)
+}
+
+// SystemNode is the per-node configuration stored in
+// Cluster.SystemNodes.
+type SystemNode struct {
+ Keepstore Keepstore
+}
+
+// Keepstore is the keepstore section of a SystemNode.
+type Keepstore struct {
+ // Listen is the listen address; the health aggregator passes it
+ // to net.SplitHostPort to find the port to ping.
+ Listen string
+}
diff --git a/sdk/go/auth/auth.go b/sdk/go/auth/auth.go
index 730989b..ea49243 100644
--- a/sdk/go/auth/auth.go
+++ b/sdk/go/auth/auth.go
@@ -39,7 +39,7 @@ var DecodeTokenCookie func(string) ([]byte, error) = base64.URLEncoding.DecodeSt
func (a *Credentials) LoadTokensFromHTTPRequest(r *http.Request) {
// Load plain token from "Authorization: OAuth2 ..." header
// (typically used by smart API clients)
- if toks := strings.SplitN(r.Header.Get("Authorization"), " ", 2); len(toks) == 2 && toks[0] == "OAuth2" {
+ if toks := strings.SplitN(r.Header.Get("Authorization"), " ", 2); len(toks) == 2 && (toks[0] == "OAuth2" || toks[0] == "Bearer") {
a.Tokens = append(a.Tokens, toks[1])
}
diff --git a/sdk/go/health/aggregator.go b/sdk/go/health/aggregator.go
new file mode 100644
index 0000000..6993187
--- /dev/null
+++ b/sdk/go/health/aggregator.go
@@ -0,0 +1,197 @@
+package health
+
+import (
+ "context"
+ "encoding/json"
+ "fmt"
+ "net"
+ "net/http"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/auth"
+)
+
+// defaultTimeout is the ping timeout that setup applies when
+// Aggregator.timeout is zero.
+const defaultTimeout = arvados.Duration(2 * time.Second)
+
+// Aggregator implements http.Handler. It handles "GET /_health/all"
+// by checking the health of all configured services on the cluster
+// and responding 200 if everything is healthy.
+//
+// NOTE(review): as written, ServeHTTP never sets a non-200 status
+// after a failed ping; the failure is reported only as
+// "health":"ERROR" in the response body -- confirm this is intended.
+type Aggregator struct {
+ setupOnce sync.Once
+ // httpClient sends the pings; setup assigns http.DefaultClient.
+ httpClient *http.Client
+ // timeout bounds each ping; setup replaces zero with
+ // defaultTimeout.
+ timeout arvados.Duration
+
+ // Config is the system config. If nil, ServeHTTP loads it with
+ // arvados.GetConfig.
+ Config *arvados.Config
+
+ // If non-nil, Log is called after handling each request.
+ Log func(*http.Request, error)
+}
+
+// setup fills in the aggregator's private defaults: the ping timeout
+// (only when unset) and the HTTP client used for pings.
+func (agg *Aggregator) setup() {
+	if agg.timeout == 0 {
+		// Always true outside the test suite, which may preset
+		// a different timeout.
+		agg.timeout = defaultTimeout
+	}
+	agg.httpClient = http.DefaultClient
+}
+
+// ServeHTTP handles a health aggregation request.
+//
+// The response is always JSON. Error responses have the form
+// {"error": "<message>"}:
+//   - 500 if the config cannot be loaded or no cluster can be chosen,
+//   - 401 if the request carries no token equal to the cluster's
+//     ManagementToken,
+//   - 404 if the path is not "/_health/all".
+//
+// Otherwise the body is the clusterHealthResponse produced by
+// checkClusterHealth. If agg.Log is non-nil it is called once per
+// request with the error (nil on success).
+func (agg *Aggregator) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	// Initialize httpClient and timeout before any ping can run;
+	// without this, ping would dereference a nil *http.Client.
+	agg.setupOnce.Do(agg.setup)
+
+	sendErr := func(statusCode int, err error) {
+		resp.WriteHeader(statusCode)
+		// Encode the message text: encoding the error value
+		// itself yields "{}" for ordinary error types.
+		json.NewEncoder(resp).Encode(map[string]interface{}{"error": err.Error()})
+		if agg.Log != nil {
+			agg.Log(req, err)
+		}
+	}
+
+	resp.Header().Set("Content-Type", "application/json")
+
+	if agg.Config == nil {
+		cfg, err := arvados.GetConfig()
+		if err != nil {
+			err = fmt.Errorf("arvados.GetConfig(): %s", err)
+			sendErr(http.StatusInternalServerError, err)
+			return
+		}
+		agg.Config = cfg
+	}
+	cluster, err := agg.Config.GetCluster("")
+	if err != nil {
+		err = fmt.Errorf("arvados.GetCluster(): %s", err)
+		sendErr(http.StatusInternalServerError, err)
+		return
+	}
+	// Authenticate before routing so unauthenticated clients
+	// cannot probe which paths exist.
+	if !agg.checkAuth(req, cluster) {
+		sendErr(http.StatusUnauthorized, errUnauthorized)
+		return
+	}
+	if req.URL.Path != "/_health/all" {
+		sendErr(http.StatusNotFound, errNotFound)
+		return
+	}
+	json.NewEncoder(resp).Encode(agg.checkClusterHealth(cluster))
+	if agg.Log != nil {
+		agg.Log(req, nil)
+	}
+}
+
+// serviceHealth summarizes one service in a clusterHealthResponse.
+type serviceHealth struct {
+ // Health is set to "OK" if N > 0, otherwise "ERROR".
+ Health string `json:"health"`
+ // N counts the endpoints of this service whose ping was OK.
+ N int `json:"n"`
+}
+
+// clusterHealthResponse is the JSON body built by checkClusterHealth.
+type clusterHealthResponse struct {
+ // Health starts as "OK" and becomes "ERROR" if any ping is not OK.
+ Health string `json:"health"`
+ // Endpoints holds each ping result, keyed by
+ // node+"/"+service+"/_health/ping".
+ Endpoints map[string]map[string]interface{} `json:"endpoints"`
+ // Services holds a per-service summary keyed by service name.
+ Services map[string]serviceHealth `json:"services"`
+}
+
+// checkClusterHealth pings every configured service endpoint on
+// every node in cluster.SystemNodes concurrently and collects the
+// results.
+//
+// The overall Health is "OK" unless at least one ping is not OK. Each
+// service's Health is "OK" if at least one of its endpoints answered
+// OK, and "ERROR" otherwise. Services with an empty listen address
+// on a node are skipped for that node.
+func (agg *Aggregator) checkClusterHealth(cluster *arvados.Cluster) clusterHealthResponse {
+	resp := clusterHealthResponse{
+		Health:    "OK",
+		Endpoints: make(map[string]map[string]interface{}),
+		Services:  make(map[string]serviceHealth),
+	}
+
+	mtx := sync.Mutex{}
+	wg := sync.WaitGroup{}
+	for node, nodeConfig := range cluster.SystemNodes {
+		for svc, addr := range map[string]string{
+			"keepstore": nodeConfig.Keepstore.Listen,
+		} {
+			if addr == "" {
+				continue
+			}
+			wg.Add(1)
+			// Pass the loop variables as arguments. Before Go
+			// 1.22 they are shared across iterations, so a
+			// closure reading them directly could ping (and
+			// record) the wrong node/service.
+			go func(node, svc, addr string) {
+				defer wg.Done()
+				pingResp := agg.ping(node, addr)
+
+				// mtx guards resp, which all pingers update.
+				mtx.Lock()
+				defer mtx.Unlock()
+				resp.Endpoints[node+"/"+svc+"/_health/ping"] = pingResp
+				svHealth := resp.Services[svc]
+				if agg.isOK(pingResp) {
+					svHealth.N++
+				} else {
+					resp.Health = "ERROR"
+				}
+				resp.Services[svc] = svHealth
+			}(node, svc, addr)
+		}
+	}
+	wg.Wait()
+
+	for svc, svHealth := range resp.Services {
+		if svHealth.N > 0 {
+			svHealth.Health = "OK"
+		} else {
+			svHealth.Health = "ERROR"
+		}
+		resp.Services[svc] = svHealth
+	}
+
+	return resp
+}
+
+// isOK reports whether a ping result has a "health" entry that is
+// the string "OK". A missing entry or a non-string value is not OK.
+func (agg *Aggregator) isOK(result map[string]interface{}) bool {
+	if status, isString := result["health"].(string); isString {
+		return status == "OK"
+	}
+	return false
+}
+
+// ping sends "GET /_health/ping" to the given node, on the port
+// taken from addr (a host:port listen address), and returns the
+// decoded JSON response body.
+//
+// The result always has a "responseTime" entry (seconds elapsed, as
+// a JSON number). If anything goes wrong -- addr cannot be parsed,
+// the request fails or exceeds agg.timeout, the body is not valid
+// JSON, or the status is not 200 -- "health" is set to "ERROR" and
+// "error" to the error message.
+func (agg *Aggregator) ping(node, addr string) (result map[string]interface{}) {
+	t0 := time.Now()
+	result = make(map[string]interface{})
+
+	var err error
+	defer func() {
+		if result == nil {
+			// A literal JSON null body makes Decode reset
+			// the map to nil; writing to it would panic.
+			result = make(map[string]interface{})
+		}
+		result["responseTime"] = json.Number(fmt.Sprintf("%.6f", time.Since(t0).Seconds()))
+		if err != nil {
+			// Store the message, not the error value: most
+			// error types JSON-encode as "{}".
+			result["health"], result["error"] = "ERROR", err.Error()
+		}
+	}()
+
+	_, port, err := net.SplitHostPort(addr)
+	if err != nil {
+		return
+	}
+	req, err := http.NewRequest("GET", "http://"+node+":"+port+"/_health/ping", nil)
+	if err != nil {
+		return
+	}
+
+	// Cancel releases the timer as soon as the ping finishes
+	// instead of leaving it pending until the timeout expires.
+	ctx, cancel := context.WithTimeout(req.Context(), time.Duration(agg.timeout))
+	defer cancel()
+	resp, err := agg.httpClient.Do(req.WithContext(ctx))
+	if err != nil {
+		return
+	}
+	defer resp.Body.Close()
+	// Decode needs a pointer; passing the map itself always fails
+	// with an InvalidUnmarshalError.
+	err = json.NewDecoder(resp.Body).Decode(&result)
+	if err != nil {
+		return
+	}
+	if resp.StatusCode != http.StatusOK {
+		// resp.Status already starts with the numeric code.
+		err = fmt.Errorf("HTTP %s", resp.Status)
+		return
+	}
+	return
+}
+
+// checkAuth reports whether any token supplied with the request is
+// non-empty and equal to the cluster's ManagementToken. An empty
+// token never matches, so an unset ManagementToken authorizes nobody.
+func (agg *Aggregator) checkAuth(req *http.Request, cluster *arvados.Cluster) bool {
+	expected := cluster.ManagementToken
+	for _, tok := range auth.NewCredentialsFromHTTPRequest(req).Tokens {
+		if tok == "" || tok != expected {
+			continue
+		}
+		return true
+	}
+	return false
+}
diff --git a/sdk/go/health/aggregator_test.go b/sdk/go/health/aggregator_test.go
new file mode 100644
index 0000000..2cb7122
--- /dev/null
+++ b/sdk/go/health/aggregator_test.go
@@ -0,0 +1,73 @@
+package health
+
+import (
+ "encoding/json"
+ "net/http"
+ "net/http/httptest"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/arvadostest"
+ "gopkg.in/check.v1"
+)
+
+// AggregatorSuite holds the handler, request and response recorder
+// that SetUpTest recreates before each test.
+type AggregatorSuite struct {
+ handler *Aggregator
+ req *http.Request
+ resp *httptest.ResponseRecorder
+}
+
+// Gocheck boilerplate
+var _ = check.Suite(&AggregatorSuite{})
+
+// TestInterface is a compile-time check that *Aggregator satisfies
+// http.Handler.
+func (s *AggregatorSuite) TestInterface(c *check.C) {
+ var _ http.Handler = &Aggregator{}
+}
+
+// SetUpTest gives each test a fresh Aggregator configured with one
+// cluster ("zzzzz") that has no system nodes, an authenticated
+// "GET /_health/all" request, and an empty response recorder.
+func (s *AggregatorSuite) SetUpTest(c *check.C) {
+	cluster := arvados.Cluster{
+		ManagementToken: arvadostest.ManagementToken,
+		SystemNodes:     map[string]arvados.SystemNode{},
+	}
+	cfg := &arvados.Config{
+		Clusters: map[string]arvados.Cluster{"zzzzz": cluster},
+	}
+	s.handler = &Aggregator{Config: cfg}
+	s.resp = httptest.NewRecorder()
+	s.req = httptest.NewRequest("GET", "/_health/all", nil)
+	s.req.Header.Set("Authorization", "Bearer "+arvadostest.ManagementToken)
+}
+
+// TestNoAuth checks that a request without an Authorization header
+// gets an error response with status 401.
+func (s *AggregatorSuite) TestNoAuth(c *check.C) {
+ s.req.Header.Del("Authorization")
+ s.handler.ServeHTTP(s.resp, s.req)
+ s.checkError(c)
+ c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
+}
+
+// TestBadAuth checks that a request whose Authorization header is
+// just "xyzzy" gets an error response with status 401.
+func (s *AggregatorSuite) TestBadAuth(c *check.C) {
+ s.req.Header.Set("Authorization", "xyzzy")
+ s.handler.ServeHTTP(s.resp, s.req)
+ s.checkError(c)
+ c.Check(s.resp.Code, check.Equals, http.StatusUnauthorized)
+}
+
+// TestEmptyConfig checks that the cluster from SetUpTest, which has
+// no system nodes, is reported as healthy.
+func (s *AggregatorSuite) TestEmptyConfig(c *check.C) {
+ s.handler.ServeHTTP(s.resp, s.req)
+ s.checkOK(c)
+}
+
+// checkError asserts that the recorded response is an error: the
+// status is not 200, the body is valid JSON, and its "health" entry
+// is not "OK".
+func (s *AggregatorSuite) checkError(c *check.C) {
+	c.Check(s.resp.Code, check.Not(check.Equals), http.StatusOK)
+	var parsed map[string]interface{}
+	c.Check(json.NewDecoder(s.resp.Body).Decode(&parsed), check.IsNil)
+	c.Check(parsed["health"], check.Not(check.Equals), "OK")
+}
+
+// checkOK asserts that the recorded response reports success: the
+// status is 200, the body is valid JSON, and its "health" entry is
+// "OK".
+func (s *AggregatorSuite) checkOK(c *check.C) {
+	c.Check(s.resp.Code, check.Equals, http.StatusOK)
+	var parsed map[string]interface{}
+	c.Check(json.NewDecoder(s.resp.Body).Decode(&parsed), check.IsNil)
+	c.Check(parsed["health"], check.Equals, "OK")
+}
diff --git a/services/health/main.go b/services/health/main.go
new file mode 100644
index 0000000..7f4d648
--- /dev/null
+++ b/services/health/main.go
@@ -0,0 +1,33 @@
+package main
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+ "git.curoverse.com/arvados.git/sdk/go/health"
+ "git.curoverse.com/arvados.git/sdk/go/httpserver"
+ log "github.com/Sirupsen/logrus"
+)
+
+// main loads the system config and serves the health aggregator
+// over HTTP until the server exits. Any failure is fatal.
+func main() {
+	log.SetFormatter(&log.JSONFormatter{
+		TimestampFormat: "2006-01-02T15:04:05.000000000Z07:00",
+	})
+	// The config loader in sdk/go/arvados is GetConfig, and the
+	// Aggregator field that receives it is Config.
+	cfg, err := arvados.GetConfig()
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	srv := &httpserver.Server{
+		Addr: ":", // FIXME: should be dictated by Health on this SystemNode
+		Handler: &health.Aggregator{
+			Config: cfg,
+		},
+	}
+	// NOTE(review): httpserver.Server is not visible here; confirm
+	// that it has a zero-argument HandleFunc method.
+	srv.HandleFunc()
+	if err := srv.Start(); err != nil {
+		log.Fatal(err)
+	}
+	log.WithField("Listen", srv.Addr).Info("listening")
+	if err := srv.Wait(); err != nil {
+		log.Fatal(err)
+	}
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list