[ARVADOS] created: 7cb536fa58d8cc837b4cb59680c7355a1687648b

Git user git at public.curoverse.com
Sun Nov 13 19:43:55 EST 2016


        at  7cb536fa58d8cc837b4cb59680c7355a1687648b (commit)


commit 7cb536fa58d8cc837b4cb59680c7355a1687648b
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 13 19:43:46 2016 -0500

    8460: Receive events and broadcast to clients.

diff --git a/services/ws/config.go b/services/ws/config.go
index 60731f9..9a2bb3c 100644
--- a/services/ws/config.go
+++ b/services/ws/config.go
@@ -8,6 +8,7 @@ type Config struct {
 	Client   arvados.Client
 	Postgres pgConfig
 	Listen   string
+	Debug    bool
 
 	ClientEventQueue int
 	ServerEventQueue int
diff --git a/services/ws/event.go b/services/ws/event.go
index 26cdb3b..1634a7a 100644
--- a/services/ws/event.go
+++ b/services/ws/event.go
@@ -1,4 +1,54 @@
 package main
 
+import (
+	"database/sql"
+	"log"
+	"sync"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type eventSink interface {
+	Channel() <-chan *event
+	Stop()
+}
+
+type eventSource interface {
+	NewSink(chan *event) eventSink
+}
+
 type event struct {
+	LogUUID  string
+	Received time.Time
+	Serial   uint64
+
+	logRow *arvados.Log
+	err error
+	mtx sync.Mutex
+}
+
+// Detail returns the database row corresponding to the event. It can
+// be called safely from multiple goroutines. Only one attempt will be
+// made. If the database row cannot be retrieved, Detail returns nil.
+func (e *event) Detail(db *sql.DB) *arvados.Log {
+	e.mtx.Lock()
+	defer e.mtx.Unlock()
+	if e.logRow != nil || e.err != nil {
+		return e.logRow
+	}
+	var logRow arvados.Log
+	var oldAttrs, newAttrs []byte
+	e.err = db.QueryRow(`SELECT uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = ?`, e.LogUUID).Scan(
+		&logRow.UUID,
+		&logRow.ObjectUUID,
+		&logRow.ObjectOwnerUUID,
+		&logRow.EventType,
+		&logRow.CreatedAt,
+		&oldAttrs,
+		&newAttrs)
+	if e.err != nil {
+		log.Printf("retrieving log row %s: %s", e.LogUUID, e.err)
+	}
+	return e.logRow
 }
diff --git a/services/ws/handler.go b/services/ws/handler.go
new file mode 100644
index 0000000..c768143
--- /dev/null
+++ b/services/ws/handler.go
@@ -0,0 +1,9 @@
+package main
+
+import (
+	"io"
+)
+
+type handler interface {
+	Handle(io.ReadWriter, <-chan *event)
+}
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index b53f950..40ab75b 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -4,5 +4,9 @@ import (
 	"io"
 )
 
-func handlerV0(ws io.ReadWriter) {
+type handlerV0 struct {
+	QueueSize int
+}
+
+func (h *handlerV0) Handle(ws io.ReadWriter, events <-chan *event) {
 }
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
index fcf85dd..4b3f12b 100644
--- a/services/ws/handler_v1.go
+++ b/services/ws/handler_v1.go
@@ -4,5 +4,9 @@ import (
 	"io"
 )
 
-func handlerV1(ws io.ReadWriter) {
+type handlerV1 struct {
+	QueueSize int
+}
+
+func (h *handlerV1) Handle(ws io.ReadWriter, events <-chan *event) {
 }
diff --git a/services/ws/main.go b/services/ws/main.go
index 9a24b31..0f97823 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -10,6 +10,8 @@ import (
 	"git.curoverse.com/arvados.git/sdk/go/config"
 )
 
+var debugLogf = func(string, ...interface{}) {}
+
 func main() {
 	configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
 	dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
@@ -20,6 +22,9 @@ func main() {
 	if err != nil {
 		log.Fatal(err)
 	}
+	if cfg.Debug {
+		debugLogf = log.Printf
+	}
 
 	if *dumpConfig {
 		txt, err := config.Dump(&cfg)
@@ -36,10 +41,11 @@ func main() {
 		WriteTimeout:   time.Minute,
 		MaxHeaderBytes: 1 << 20,
 		Handler: &router{
-			EventSource: (&pgEventSource{
+			Config: &cfg,
+			eventSource: &pgEventSource{
 				PgConfig:  cfg.Postgres,
 				QueueSize: cfg.ServerEventQueue,
-			}).EventSource(),
+			},
 		},
 	}
 	log.Fatal(srv.ListenAndServe())
diff --git a/services/ws/pg.go b/services/ws/pg.go
index 01b5bff..6bce668 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -5,8 +5,9 @@ import (
 	"log"
 	"strings"
 	"sync"
+	"time"
 
-	_ "github.com/lib/pq"
+	"github.com/lib/pq"
 )
 
 type pgConfig map[string]string
@@ -28,19 +29,117 @@ type pgEventSource struct {
 	PgConfig  pgConfig
 	QueueSize int
 
-	db        *sql.DB
-	setupOnce sync.Once
+	pqListener *pq.Listener
+	sinks      map[*pgEventSink]bool
+	setupOnce  sync.Once
+	mtx        sync.Mutex
 }
 
-func (es *pgEventSource) setup() {
-	db, err := sql.Open("postgres", es.PgConfig.ConnectionString())
+func (ps *pgEventSource) setup() {
+	ps.sinks = make(map[*pgEventSink]bool)
+	go ps.run()
+}
+
+func (ps *pgEventSource) run() {
+	db, err := sql.Open("postgres", ps.PgConfig.ConnectionString())
 	if err != nil {
 		log.Fatal(err)
 	}
-	es.db = db
+
+	listener := pq.NewListener(ps.PgConfig.ConnectionString(), time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+		if err != nil {
+			// Until we have a mechanism for catching up
+			// on missed events, we cannot recover from a
+			// dropped connection without breaking our
+			// promises to clients.
+			log.Fatal(err)
+		}
+	})
+	err = listener.Listen("logs")
+	if err != nil {
+		log.Fatal(err)
+	}
+	go func() {
+		for _ = range time.NewTicker(time.Minute).C {
+			listener.Ping()
+		}
+	}()
+
+	eventQueue := make(chan *event, ps.QueueSize)
+	go func() {
+		for e := range eventQueue {
+			// Wait for the "select ... from logs" call to
+			// finish. This limits max concurrent queries
+			// to ps.QueueSize. Without this, max
+			// concurrent queries would be bounded by
+			// client_count X client_queue_size.
+			e.Detail(db)
+			debugLogf("%+v", e)
+			ps.mtx.Lock()
+			for sink := range ps.sinks {
+				sink.channel <- e
+			}
+			ps.mtx.Unlock()
+		}
+	}()
+
+	var serial uint64
+	for pqEvent := range listener.Notify {
+		if pqEvent.Channel != "logs" {
+			continue
+		}
+		serial++
+		e := &event{
+			LogUUID:  pqEvent.Extra,
+			Received: time.Now(),
+			Serial:   serial,
+		}
+		debugLogf("%+v", e)
+		eventQueue <- e
+		go e.Detail(db)
+	}
+}
+
+// NewSink subscribes to the event source. If c is not nil, it will be
+// used as the event channel. Otherwise, a new channel will be
+// created. Either way, the sink channel will be returned by the
+// Channel() method of the returned eventSink. All subsequent events
+// will be sent to the sink channel. The caller must ensure events are
+// received from the sink channel as quickly as possible: when one
+// sink blocks, all other sinks also block.
+func (ps *pgEventSource) NewSink(c chan *event) eventSink {
+	ps.setupOnce.Do(ps.setup)
+	if c == nil {
+		c = make(chan *event, 1)
+	}
+	sink := &pgEventSink{
+		channel: c,
+		source:  ps,
+	}
+	ps.mtx.Lock()
+	ps.sinks[sink] = true
+	ps.mtx.Unlock()
+	return sink
+}
+
+type pgEventSink struct {
+	channel chan *event
+	source  *pgEventSource
+}
+
+func (sink *pgEventSink) Channel() <-chan *event {
+	return sink.channel
 }
 
-func (es *pgEventSource) EventSource() <-chan event {
-	es.setupOnce.Do(es.setup)
-	return nil
+func (sink *pgEventSink) Stop() {
+	go func() {
+		// Ensure this sink cannot fill up and block the
+		// server-side queue (which otherwise could in turn
+		// block our mtx.Lock() here)
+		for _ = range sink.channel {}
+	}()
+	sink.source.mtx.Lock()
+	delete(sink.source.sinks, sink)
+	sink.source.mtx.Unlock()
+	close(sink.channel)
 }
diff --git a/services/ws/router.go b/services/ws/router.go
index 8b7658e..01c1477 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -1,7 +1,6 @@
 package main
 
 import (
-	"io"
 	"log"
 	"net/http"
 	"sync"
@@ -10,25 +9,33 @@ import (
 )
 
 type router struct {
-	EventSource <-chan event
+	Config *Config
+
+	eventSource eventSource
 	mux         *http.ServeMux
 	setupOnce   sync.Once
 }
 
 func (rtr *router) setup() {
 	rtr.mux = http.NewServeMux()
-	rtr.mux.Handle("/websocket", makeServer(handlerV0))
-	rtr.mux.Handle("/arvados/v1/events.ws", makeServer(handlerV1))
+	rtr.mux.Handle("/websocket", rtr.makeServer(&handlerV0{
+		QueueSize: rtr.Config.ClientEventQueue,
+	}))
+	rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(&handlerV1{
+		QueueSize: rtr.Config.ClientEventQueue,
+	}))
 }
 
-func makeServer(handler func(io.ReadWriter)) websocket.Server {
-	return websocket.Server{
+func (rtr *router) makeServer(handler handler) *websocket.Server {
+	return &websocket.Server{
 		Handshake: func(c *websocket.Config, r *http.Request) error {
 			return nil
 		},
 		Handler: websocket.Handler(func(ws *websocket.Conn) {
 			log.Printf("socket request: %+v", ws.Request())
-			handler(ws)
+			sink := rtr.eventSource.NewSink(nil)
+			handler.Handle(ws, sink.Channel())
+			sink.Stop()
 			ws.Close()
 			log.Printf("socket disconnect: %+v", ws.Request().RemoteAddr)
 		}),

commit 1e094e23db422cabe013ecbf7a0b465c4b096e12
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 13 17:14:18 2016 -0500

    8460: Add default config and -dump-config flag.

diff --git a/sdk/go/config/load.go b/sdk/go/config/load.go
index 9c65d65..2bbb440 100644
--- a/sdk/go/config/load.go
+++ b/sdk/go/config/load.go
@@ -22,3 +22,8 @@ func LoadFile(cfg interface{}, configPath string) error {
 	}
 	return nil
 }
+
+// Dump returns a YAML representation of cfg.
+func Dump(cfg interface{}) ([]byte, error) {
+	return yaml.Marshal(cfg)
+}
diff --git a/services/ws/config.go b/services/ws/config.go
index fbdedb5..60731f9 100644
--- a/services/ws/config.go
+++ b/services/ws/config.go
@@ -15,6 +15,17 @@ type Config struct {
 
 func DefaultConfig() Config {
 	return Config{
+		Client: arvados.Client{
+			APIHost: "localhost:443",
+		},
+		Postgres: pgConfig{
+			"dbname":          "arvados_test",
+			"user":            "arvados",
+			"password":        "xyzzy",
+			"host":            "localhost",
+			"connect_timeout": "30",
+			"sslmode":         "disable",
+		},
 		ClientEventQueue: 64,
 	}
 }
diff --git a/services/ws/main.go b/services/ws/main.go
index ce56d0d..9a24b31 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -2,6 +2,7 @@ package main
 
 import (
 	"flag"
+	"fmt"
 	"log"
 	"net/http"
 	"time"
@@ -11,12 +12,24 @@ import (
 
 func main() {
 	configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
+	dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
 	cfg := DefaultConfig()
+	flag.Parse()
+
 	err := config.LoadFile(&cfg, *configPath)
 	if err != nil {
 		log.Fatal(err)
 	}
 
+	if *dumpConfig {
+		txt, err := config.Dump(&cfg)
+		if err != nil {
+			log.Fatal(err)
+		}
+		fmt.Print(string(txt))
+		return
+	}
+
 	srv := &http.Server{
 		Addr:           cfg.Listen,
 		ReadTimeout:    time.Minute,

commit 2dec79b036796df75885abf4e1a977958f9c541d
Author: Tom Clegg <tom at curoverse.com>
Date:   Sun Nov 13 15:46:49 2016 -0500

    8460: Scaffold for websocket server.

diff --git a/services/ws/config.go b/services/ws/config.go
new file mode 100644
index 0000000..fbdedb5
--- /dev/null
+++ b/services/ws/config.go
@@ -0,0 +1,20 @@
+package main
+
+import (
+	"git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type Config struct {
+	Client   arvados.Client
+	Postgres pgConfig
+	Listen   string
+
+	ClientEventQueue int
+	ServerEventQueue int
+}
+
+func DefaultConfig() Config {
+	return Config{
+		ClientEventQueue: 64,
+	}
+}
diff --git a/services/ws/event.go b/services/ws/event.go
new file mode 100644
index 0000000..26cdb3b
--- /dev/null
+++ b/services/ws/event.go
@@ -0,0 +1,4 @@
+package main
+
+type event struct {
+}
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
new file mode 100644
index 0000000..b53f950
--- /dev/null
+++ b/services/ws/handler_v0.go
@@ -0,0 +1,8 @@
+package main
+
+import (
+	"io"
+)
+
+func handlerV0(ws io.ReadWriter) {
+}
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
new file mode 100644
index 0000000..fcf85dd
--- /dev/null
+++ b/services/ws/handler_v1.go
@@ -0,0 +1,8 @@
+package main
+
+import (
+	"io"
+)
+
+func handlerV1(ws io.ReadWriter) {
+}
diff --git a/services/ws/main.go b/services/ws/main.go
new file mode 100644
index 0000000..ce56d0d
--- /dev/null
+++ b/services/ws/main.go
@@ -0,0 +1,33 @@
+package main
+
+import (
+	"flag"
+	"log"
+	"net/http"
+	"time"
+
+	"git.curoverse.com/arvados.git/sdk/go/config"
+)
+
+func main() {
+	configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
+	cfg := DefaultConfig()
+	err := config.LoadFile(&cfg, *configPath)
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	srv := &http.Server{
+		Addr:           cfg.Listen,
+		ReadTimeout:    time.Minute,
+		WriteTimeout:   time.Minute,
+		MaxHeaderBytes: 1 << 20,
+		Handler: &router{
+			EventSource: (&pgEventSource{
+				PgConfig:  cfg.Postgres,
+				QueueSize: cfg.ServerEventQueue,
+			}).EventSource(),
+		},
+	}
+	log.Fatal(srv.ListenAndServe())
+}
diff --git a/services/ws/pg.go b/services/ws/pg.go
new file mode 100644
index 0000000..01b5bff
--- /dev/null
+++ b/services/ws/pg.go
@@ -0,0 +1,46 @@
+package main
+
+import (
+	"database/sql"
+	"log"
+	"strings"
+	"sync"
+
+	_ "github.com/lib/pq"
+)
+
+type pgConfig map[string]string
+
+func (c pgConfig) ConnectionString() string {
+	s := ""
+	for k, v := range c {
+		s += k
+		s += "='"
+		s += strings.Replace(
+			strings.Replace(v, `\`, `\\`, -1),
+			`'`, `\'`, -1)
+		s += "' "
+	}
+	return s
+}
+
+type pgEventSource struct {
+	PgConfig  pgConfig
+	QueueSize int
+
+	db        *sql.DB
+	setupOnce sync.Once
+}
+
+func (es *pgEventSource) setup() {
+	db, err := sql.Open("postgres", es.PgConfig.ConnectionString())
+	if err != nil {
+		log.Fatal(err)
+	}
+	es.db = db
+}
+
+func (es *pgEventSource) EventSource() <-chan event {
+	es.setupOnce.Do(es.setup)
+	return nil
+}
diff --git a/services/ws/router.go b/services/ws/router.go
new file mode 100644
index 0000000..8b7658e
--- /dev/null
+++ b/services/ws/router.go
@@ -0,0 +1,41 @@
+package main
+
+import (
+	"io"
+	"log"
+	"net/http"
+	"sync"
+
+	"golang.org/x/net/websocket"
+)
+
+type router struct {
+	EventSource <-chan event
+	mux         *http.ServeMux
+	setupOnce   sync.Once
+}
+
+func (rtr *router) setup() {
+	rtr.mux = http.NewServeMux()
+	rtr.mux.Handle("/websocket", makeServer(handlerV0))
+	rtr.mux.Handle("/arvados/v1/events.ws", makeServer(handlerV1))
+}
+
+func makeServer(handler func(io.ReadWriter)) websocket.Server {
+	return websocket.Server{
+		Handshake: func(c *websocket.Config, r *http.Request) error {
+			return nil
+		},
+		Handler: websocket.Handler(func(ws *websocket.Conn) {
+			log.Printf("socket request: %+v", ws.Request())
+			handler(ws)
+			ws.Close()
+			log.Printf("socket disconnect: %+v", ws.Request().RemoteAddr)
+		}),
+	}
+}
+
+func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+	rtr.setupOnce.Do(rtr.setup)
+	rtr.mux.ServeHTTP(resp, req)
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list