[ARVADOS] created: 7cb536fa58d8cc837b4cb59680c7355a1687648b
Git user
git at public.curoverse.com
Sun Nov 13 19:43:55 EST 2016
at 7cb536fa58d8cc837b4cb59680c7355a1687648b (commit)
commit 7cb536fa58d8cc837b4cb59680c7355a1687648b
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Nov 13 19:43:46 2016 -0500
8460: Receive events and broadcast to clients.
diff --git a/services/ws/config.go b/services/ws/config.go
index 60731f9..9a2bb3c 100644
--- a/services/ws/config.go
+++ b/services/ws/config.go
@@ -8,6 +8,7 @@ type Config struct {
Client arvados.Client
Postgres pgConfig
Listen string
+ Debug bool
ClientEventQueue int
ServerEventQueue int
diff --git a/services/ws/event.go b/services/ws/event.go
index 26cdb3b..1634a7a 100644
--- a/services/ws/event.go
+++ b/services/ws/event.go
@@ -1,4 +1,54 @@
package main
+import (
+ "database/sql"
+ "log"
+ "sync"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+// eventSink is a handle on one subscription to an eventSource.
+// Channel returns the receive side of the channel on which events are
+// delivered; Stop ends the subscription.
+type eventSink interface {
+ Channel() <-chan *event
+ Stop()
+}
+
+// eventSource hands out subscriptions. NewSink accepts a channel and
+// returns the eventSink for the new subscription; in the pgEventSource
+// implementation a nil channel means a new one is created.
+type eventSource interface {
+ NewSink(chan *event) eventSink
+}
+
+// event describes one notification received on the "logs" channel.
type event struct {
+ // LogUUID is the notification payload; Detail uses it as the
+ // logs.uuid lookup key.
+ LogUUID string
+ // Received is the time at which the notification was received.
+ Received time.Time
+ // Serial is a counter assigned by the receiver, incremented for
+ // each event.
+ Serial uint64
+
+ // logRow is the value returned by Detail.
+ logRow *arvados.Log
+ // err is the error, if any, from the lookup made by Detail.
+ err error
+ // mtx is held by Detail while it reads or writes logRow and err.
+ mtx sync.Mutex
+}
+
+// Detail returns the database row corresponding to the event. It can
+// be called safely from multiple goroutines. Only one attempt will be
+// made: the outcome (row or error) is remembered and reused by later
+// calls. If the database row cannot be retrieved, Detail returns nil.
+func (e *event) Detail(db *sql.DB) *arvados.Log {
+ e.mtx.Lock()
+ defer e.mtx.Unlock()
+ if e.logRow != nil || e.err != nil {
+ // An earlier call already succeeded or failed.
+ return e.logRow
+ }
+ var logRow arvados.Log
+ // old_attributes and new_attributes are scanned as raw bytes;
+ // they are not decoded into logRow here.
+ var oldAttrs, newAttrs []byte
+ // lib/pq requires PostgreSQL positional placeholders ($1, $2,
+ // ...); it does not accept "?".
+ e.err = db.QueryRow(`SELECT uuid, object_uuid, object_owner_uuid, event_type, created_at, old_attributes, new_attributes FROM logs WHERE uuid = $1`, e.LogUUID).Scan(
+ &logRow.UUID,
+ &logRow.ObjectUUID,
+ &logRow.ObjectOwnerUUID,
+ &logRow.EventType,
+ &logRow.CreatedAt,
+ &oldAttrs,
+ &newAttrs)
+ if e.err != nil {
+ log.Printf("retrieving log row %s: %s", e.LogUUID, e.err)
+ return nil
+ }
+ // Store the row so this call and all later calls return it
+ // instead of nil.
+ e.logRow = &logRow
+ return e.logRow
}
diff --git a/services/ws/handler.go b/services/ws/handler.go
new file mode 100644
index 0000000..c768143
--- /dev/null
+++ b/services/ws/handler.go
@@ -0,0 +1,9 @@
+package main
+
+import (
+ "io"
+)
+
+// handler serves a single client connection. Handle receives the
+// connection as an io.ReadWriter together with a channel of events.
+type handler interface {
+ Handle(io.ReadWriter, <-chan *event)
+}
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
index b53f950..40ab75b 100644
--- a/services/ws/handler_v0.go
+++ b/services/ws/handler_v0.go
@@ -4,5 +4,9 @@ import (
"io"
)
-func handlerV0(ws io.ReadWriter) {
+// handlerV0 is the handler the router registers at "/websocket".
+type handlerV0 struct {
+ // QueueSize is set by the router from Config.ClientEventQueue.
+ QueueSize int
+}
+
+// Handle is currently a stub: it returns immediately without using ws
+// or events.
+func (h *handlerV0) Handle(ws io.ReadWriter, events <-chan *event) {
}
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
index fcf85dd..4b3f12b 100644
--- a/services/ws/handler_v1.go
+++ b/services/ws/handler_v1.go
@@ -4,5 +4,9 @@ import (
"io"
)
-func handlerV1(ws io.ReadWriter) {
+// handlerV1 is the handler the router registers at
+// "/arvados/v1/events.ws".
+type handlerV1 struct {
+ // QueueSize is set by the router from Config.ClientEventQueue.
+ QueueSize int
+}
+
+// Handle is currently a stub: it returns immediately without using ws
+// or events.
+func (h *handlerV1) Handle(ws io.ReadWriter, events <-chan *event) {
}
diff --git a/services/ws/main.go b/services/ws/main.go
index 9a24b31..0f97823 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -10,6 +10,8 @@ import (
"git.curoverse.com/arvados.git/sdk/go/config"
)
+// debugLogf logs debugging output. It is a no-op by default; main
+// replaces it with log.Printf when cfg.Debug is set.
+var debugLogf = func(string, ...interface{}) {}
+
func main() {
configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
@@ -20,6 +22,9 @@ func main() {
if err != nil {
log.Fatal(err)
}
+ if cfg.Debug {
+ debugLogf = log.Printf
+ }
if *dumpConfig {
txt, err := config.Dump(&cfg)
@@ -36,10 +41,11 @@ func main() {
WriteTimeout: time.Minute,
MaxHeaderBytes: 1 << 20,
Handler: &router{
- EventSource: (&pgEventSource{
+ Config: &cfg,
+ eventSource: &pgEventSource{
PgConfig: cfg.Postgres,
QueueSize: cfg.ServerEventQueue,
- }).EventSource(),
+ },
},
}
log.Fatal(srv.ListenAndServe())
diff --git a/services/ws/pg.go b/services/ws/pg.go
index 01b5bff..6bce668 100644
--- a/services/ws/pg.go
+++ b/services/ws/pg.go
@@ -5,8 +5,9 @@ import (
"log"
"strings"
"sync"
+ "time"
- _ "github.com/lib/pq"
+ "github.com/lib/pq"
)
type pgConfig map[string]string
@@ -28,19 +29,117 @@ type pgEventSource struct {
PgConfig pgConfig
QueueSize int
- db *sql.DB
- setupOnce sync.Once
+ pqListener *pq.Listener
+ sinks map[*pgEventSink]bool
+ setupOnce sync.Once
+ mtx sync.Mutex
}
-func (es *pgEventSource) setup() {
- db, err := sql.Open("postgres", es.PgConfig.ConnectionString())
+// setup creates the sink registry and then starts the run goroutine.
+// NewSink invokes it through setupOnce.
+func (ps *pgEventSource) setup() {
+ ps.sinks = make(map[*pgEventSink]bool)
+ go ps.run()
+}
+
+// run opens a database handle, listens for notifications on the
+// "logs" channel, and sends an event for each notification to every
+// registered sink. It calls log.Fatal if sql.Open or Listen fails, or
+// if the listener callback is invoked with a non-nil error.
+func (ps *pgEventSource) run() {
+ db, err := sql.Open("postgres", ps.PgConfig.ConnectionString())
if err != nil {
log.Fatal(err)
}
- es.db = db
+
+ listener := pq.NewListener(ps.PgConfig.ConnectionString(), time.Second, time.Minute, func(ev pq.ListenerEventType, err error) {
+ if err != nil {
+ // Until we have a mechanism for catching up
+ // on missed events, we cannot recover from a
+ // dropped connection without breaking our
+ // promises to clients.
+ log.Fatal(err)
+ }
+ })
+ err = listener.Listen("logs")
+ if err != nil {
+ log.Fatal(err)
+ }
+ // Ping the listener once a minute. The value returned by Ping is
+ // discarded, and the ticker is never stopped.
+ go func() {
+ for _ = range time.NewTicker(time.Minute).C {
+ listener.Ping()
+ }
+ }()
+
+ // eventQueue carries events from the notification loop below to
+ // the broadcasting goroutine.
+ eventQueue := make(chan *event, ps.QueueSize)
+ go func() {
+ for e := range eventQueue {
+ // Wait for the "select ... from logs" call to
+ // finish. This limits max concurrent queries
+ // to ps.QueueSize. Without this, max
+ // concurrent queries would be bounded by
+ // client_count X client_queue_size.
+ e.Detail(db)
+ debugLogf("%+v", e)
+ // The sends happen while ps.mtx is held, so a sink
+ // whose channel is full blocks delivery to the
+ // remaining sinks as well as NewSink and Stop.
+ ps.mtx.Lock()
+ for sink := range ps.sinks {
+ sink.channel <- e
+ }
+ ps.mtx.Unlock()
+ }
+ }()
+
+ var serial uint64
+ for pqEvent := range listener.Notify {
+ // Ignore notifications for any other channel.
+ if pqEvent.Channel != "logs" {
+ continue
+ }
+ serial++
+ e := &event{
+ LogUUID: pqEvent.Extra,
+ Received: time.Now(),
+ Serial: serial,
+ }
+ debugLogf("%+v", e)
+ eventQueue <- e
+ // Also start the row lookup right away, without waiting
+ // for the broadcasting goroutine to reach this event.
+ go e.Detail(db)
+ }
+}
+
+// NewSink registers a new subscriber with the event source and
+// returns its eventSink. When c is nil, a new channel with room for
+// one event is allocated; otherwise c itself becomes the sink
+// channel. In both cases the channel is available through the
+// Channel() method of the returned eventSink, and all subsequent
+// events are sent to it. Callers must receive from the sink channel
+// as quickly as possible: when one sink blocks, all other sinks also
+// block.
+func (ps *pgEventSource) NewSink(c chan *event) eventSink {
+ ps.setupOnce.Do(ps.setup)
+ ch := c
+ if ch == nil {
+ ch = make(chan *event, 1)
+ }
+ s := &pgEventSink{channel: ch, source: ps}
+ // Register the sink under the lock; the lock is released on
+ // return.
+ ps.mtx.Lock()
+ defer ps.mtx.Unlock()
+ ps.sinks[s] = true
+ return s
+}
+
+// pgEventSink is the eventSink returned by pgEventSource.NewSink.
+type pgEventSink struct {
+ // channel is the channel on which the source sends events.
+ channel chan *event
+ // source is the pgEventSource this sink is registered with.
+ source *pgEventSource
+}
+
+// Channel returns the sink channel as a receive-only channel.
+func (sink *pgEventSink) Channel() <-chan *event {
+ return sink.channel
}
-func (es *pgEventSource) EventSource() <-chan event {
- es.setupOnce.Do(es.setup)
- return nil
+// Stop removes the sink from its source's registry and closes the
+// sink channel. It must be called at most once per sink: a second
+// call would close an already-closed channel.
+func (sink *pgEventSink) Stop() {
+ go func() {
+ // Ensure this sink cannot fill up and block the
+ // server-side queue (which otherwise could in turn
+ // block our mtx.Lock() here)
+ for _ = range sink.channel {}
+ }()
+ sink.source.mtx.Lock()
+ delete(sink.source.sinks, sink)
+ sink.source.mtx.Unlock()
+ // The source only sends to registered sinks while holding mtx,
+ // so nothing sends on the channel after the delete above.
+ // Closing it also ends the draining goroutine's range loop.
+ close(sink.channel)
}
diff --git a/services/ws/router.go b/services/ws/router.go
index 8b7658e..01c1477 100644
--- a/services/ws/router.go
+++ b/services/ws/router.go
@@ -1,7 +1,6 @@
package main
import (
- "io"
"log"
"net/http"
"sync"
@@ -10,25 +9,33 @@ import (
)
+// router holds the configuration, event source and request mux used
+// to serve websocket connections.
type router struct {
- EventSource <-chan event
+ // Config supplies ClientEventQueue, which setup passes to the
+ // handlers as QueueSize.
+ Config *Config
+
+ // eventSource provides a new sink for each websocket connection.
+ eventSource eventSource
mux *http.ServeMux
setupOnce sync.Once
}
+// setup builds the mux and registers two websocket servers:
+// "/websocket" backed by handlerV0 and "/arvados/v1/events.ws" backed
+// by handlerV1. Each handler's QueueSize comes from
+// Config.ClientEventQueue.
func (rtr *router) setup() {
rtr.mux = http.NewServeMux()
- rtr.mux.Handle("/websocket", makeServer(handlerV0))
- rtr.mux.Handle("/arvados/v1/events.ws", makeServer(handlerV1))
+ rtr.mux.Handle("/websocket", rtr.makeServer(&handlerV0{
+ QueueSize: rtr.Config.ClientEventQueue,
+ }))
+ rtr.mux.Handle("/arvados/v1/events.ws", rtr.makeServer(&handlerV1{
+ QueueSize: rtr.Config.ClientEventQueue,
+ }))
}
-func makeServer(handler func(io.ReadWriter)) websocket.Server {
- return websocket.Server{
+func (rtr *router) makeServer(handler handler) *websocket.Server {
+ return &websocket.Server{
Handshake: func(c *websocket.Config, r *http.Request) error {
return nil
},
Handler: websocket.Handler(func(ws *websocket.Conn) {
log.Printf("socket request: %+v", ws.Request())
- handler(ws)
+ sink := rtr.eventSource.NewSink(nil)
+ handler.Handle(ws, sink.Channel())
+ sink.Stop()
ws.Close()
log.Printf("socket disconnect: %+v", ws.Request().RemoteAddr)
}),
commit 1e094e23db422cabe013ecbf7a0b465c4b096e12
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Nov 13 17:14:18 2016 -0500
8460: Add default config and -dump-config flag.
diff --git a/sdk/go/config/load.go b/sdk/go/config/load.go
index 9c65d65..2bbb440 100644
--- a/sdk/go/config/load.go
+++ b/sdk/go/config/load.go
@@ -22,3 +22,8 @@ func LoadFile(cfg interface{}, configPath string) error {
}
return nil
}
+
+// Dump returns a YAML representation of cfg.
+func Dump(cfg interface{}) ([]byte, error) {
+ return yaml.Marshal(cfg)
+}
diff --git a/services/ws/config.go b/services/ws/config.go
index fbdedb5..60731f9 100644
--- a/services/ws/config.go
+++ b/services/ws/config.go
@@ -15,6 +15,17 @@ type Config struct {
func DefaultConfig() Config {
return Config{
+ Client: arvados.Client{
+ APIHost: "localhost:443",
+ },
+ Postgres: pgConfig{
+ "dbname": "arvados_test",
+ "user": "arvados",
+ "password": "xyzzy",
+ "host": "localhost",
+ "connect_timeout": "30",
+ "sslmode": "disable",
+ },
ClientEventQueue: 64,
}
}
diff --git a/services/ws/main.go b/services/ws/main.go
index ce56d0d..9a24b31 100644
--- a/services/ws/main.go
+++ b/services/ws/main.go
@@ -2,6 +2,7 @@ package main
import (
"flag"
+ "fmt"
"log"
"net/http"
"time"
@@ -11,12 +12,24 @@ import (
func main() {
configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
+ dumpConfig := flag.Bool("dump-config", false, "show current configuration and exit")
cfg := DefaultConfig()
+ flag.Parse()
+
err := config.LoadFile(&cfg, *configPath)
if err != nil {
log.Fatal(err)
}
+ if *dumpConfig {
+ txt, err := config.Dump(&cfg)
+ if err != nil {
+ log.Fatal(err)
+ }
+ fmt.Print(string(txt))
+ return
+ }
+
srv := &http.Server{
Addr: cfg.Listen,
ReadTimeout: time.Minute,
commit 2dec79b036796df75885abf4e1a977958f9c541d
Author: Tom Clegg <tom at curoverse.com>
Date: Sun Nov 13 15:46:49 2016 -0500
8460: Scaffold for websocket server.
diff --git a/services/ws/config.go b/services/ws/config.go
new file mode 100644
index 0000000..fbdedb5
--- /dev/null
+++ b/services/ws/config.go
@@ -0,0 +1,20 @@
+package main
+
+import (
+ "git.curoverse.com/arvados.git/sdk/go/arvados"
+)
+
+type Config struct {
+ Client arvados.Client
+ Postgres pgConfig
+ Listen string
+
+ ClientEventQueue int
+ ServerEventQueue int
+}
+
+func DefaultConfig() Config {
+ return Config{
+ ClientEventQueue: 64,
+ }
+}
diff --git a/services/ws/event.go b/services/ws/event.go
new file mode 100644
index 0000000..26cdb3b
--- /dev/null
+++ b/services/ws/event.go
@@ -0,0 +1,4 @@
+package main
+
+type event struct {
+}
diff --git a/services/ws/handler_v0.go b/services/ws/handler_v0.go
new file mode 100644
index 0000000..b53f950
--- /dev/null
+++ b/services/ws/handler_v0.go
@@ -0,0 +1,8 @@
+package main
+
+import (
+ "io"
+)
+
+func handlerV0(ws io.ReadWriter) {
+}
diff --git a/services/ws/handler_v1.go b/services/ws/handler_v1.go
new file mode 100644
index 0000000..fcf85dd
--- /dev/null
+++ b/services/ws/handler_v1.go
@@ -0,0 +1,8 @@
+package main
+
+import (
+ "io"
+)
+
+func handlerV1(ws io.ReadWriter) {
+}
diff --git a/services/ws/main.go b/services/ws/main.go
new file mode 100644
index 0000000..ce56d0d
--- /dev/null
+++ b/services/ws/main.go
@@ -0,0 +1,33 @@
+package main
+
+import (
+ "flag"
+ "log"
+ "net/http"
+ "time"
+
+ "git.curoverse.com/arvados.git/sdk/go/config"
+)
+
+func main() {
+ configPath := flag.String("config", "/etc/arvados/ws/ws.yml", "`path` to config file")
+ cfg := DefaultConfig()
+ err := config.LoadFile(&cfg, *configPath)
+ if err != nil {
+ log.Fatal(err)
+ }
+
+ srv := &http.Server{
+ Addr: cfg.Listen,
+ ReadTimeout: time.Minute,
+ WriteTimeout: time.Minute,
+ MaxHeaderBytes: 1 << 20,
+ Handler: &router{
+ EventSource: (&pgEventSource{
+ PgConfig: cfg.Postgres,
+ QueueSize: cfg.ServerEventQueue,
+ }).EventSource(),
+ },
+ }
+ log.Fatal(srv.ListenAndServe())
+}
diff --git a/services/ws/pg.go b/services/ws/pg.go
new file mode 100644
index 0000000..01b5bff
--- /dev/null
+++ b/services/ws/pg.go
@@ -0,0 +1,46 @@
+package main
+
+import (
+ "database/sql"
+ "log"
+ "strings"
+ "sync"
+
+ _ "github.com/lib/pq"
+)
+
+type pgConfig map[string]string
+
+func (c pgConfig) ConnectionString() string {
+ s := ""
+ for k, v := range c {
+ s += k
+ s += "='"
+ s += strings.Replace(
+ strings.Replace(v, `\`, `\\`, -1),
+ `'`, `\'`, -1)
+ s += "' "
+ }
+ return s
+}
+
+type pgEventSource struct {
+ PgConfig pgConfig
+ QueueSize int
+
+ db *sql.DB
+ setupOnce sync.Once
+}
+
+func (es *pgEventSource) setup() {
+ db, err := sql.Open("postgres", es.PgConfig.ConnectionString())
+ if err != nil {
+ log.Fatal(err)
+ }
+ es.db = db
+}
+
+func (es *pgEventSource) EventSource() <-chan event {
+ es.setupOnce.Do(es.setup)
+ return nil
+}
diff --git a/services/ws/router.go b/services/ws/router.go
new file mode 100644
index 0000000..8b7658e
--- /dev/null
+++ b/services/ws/router.go
@@ -0,0 +1,41 @@
+package main
+
+import (
+ "io"
+ "log"
+ "net/http"
+ "sync"
+
+ "golang.org/x/net/websocket"
+)
+
+type router struct {
+ EventSource <-chan event
+ mux *http.ServeMux
+ setupOnce sync.Once
+}
+
+func (rtr *router) setup() {
+ rtr.mux = http.NewServeMux()
+ rtr.mux.Handle("/websocket", makeServer(handlerV0))
+ rtr.mux.Handle("/arvados/v1/events.ws", makeServer(handlerV1))
+}
+
+func makeServer(handler func(io.ReadWriter)) websocket.Server {
+ return websocket.Server{
+ Handshake: func(c *websocket.Config, r *http.Request) error {
+ return nil
+ },
+ Handler: websocket.Handler(func(ws *websocket.Conn) {
+ log.Printf("socket request: %+v", ws.Request())
+ handler(ws)
+ ws.Close()
+ log.Printf("socket disconnect: %+v", ws.Request().RemoteAddr)
+ }),
+ }
+}
+
+func (rtr *router) ServeHTTP(resp http.ResponseWriter, req *http.Request) {
+ rtr.setupOnce.Do(rtr.setup)
+ rtr.mux.ServeHTTP(resp, req)
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list