[ARVADOS] created: 1c4dff8e8b1731567829f455b33b7716bc87c734
Git user
git at public.curoverse.com
Tue Aug 8 13:05:41 EDT 2017
at 1c4dff8e8b1731567829f455b33b7716bc87c734 (commit)
commit 1c4dff8e8b1731567829f455b33b7716bc87c734
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Aug 8 13:04:17 2017 -0400
12087: Exit "wait for low water mark" loop if session terminates.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curoverse.com>
diff --git a/services/ws/session_v0.go b/services/ws/session_v0.go
index 58c6423..f2daa60 100644
--- a/services/ws/session_v0.go
+++ b/services/ws/session_v0.go
@@ -205,6 +205,10 @@ func (sub *v0subscribe) sendOldEvents(sess *v0session) {
// client will probably reconnect and do the
// same thing all over again.
time.Sleep(100 * time.Millisecond)
+ if sess.ws.Request().Context().Err() != nil {
+ // Session terminated while we were sleeping
+ return
+ }
}
now := time.Now()
e := &event{
commit a92b78c95e9ab9c94e0f937e405e7d8e1a9508ac
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Aug 8 13:04:05 2017 -0400
12087: Remove useless continue statement.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curoverse.com>
diff --git a/services/ws/handler.go b/services/ws/handler.go
index b1aa59f..7528adf 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -179,7 +179,6 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
default:
}
}
- continue
case e, ok := <-incoming.Channel():
if !ok {
return
commit 99d51e812b916027445c3e26b1f0f7daea134c96
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Aug 8 13:03:20 2017 -0400
12087: Don't report read/write errors on terminated sessions.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curoverse.com>
diff --git a/services/ws/handler.go b/services/ws/handler.go
index b59e7a7..b1aa59f 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -76,7 +76,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
err = errFrameTooBig
}
if err != nil {
- if err != io.EOF {
+ if err != io.EOF && ctx.Err() == nil {
log.WithError(err).Info("read error")
}
return
@@ -134,7 +134,9 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
t0 := time.Now()
_, err = ws.Write(buf)
if err != nil {
- log.WithError(err).Error("write failed")
+ if ctx.Err() == nil {
+ log.WithError(err).Error("write failed")
+ }
return
}
log.Debug("sent")
commit 366381df5e5e7763e0621c1da5b7f773ec7702d6
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Aug 8 12:26:06 2017 -0400
12087: Fixup some problems found by go vet.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curoverse.com>
diff --git a/sdk/go/arvados/container.go b/sdk/go/arvados/container.go
index 7d39d67..7e588be 100644
--- a/sdk/go/arvados/container.go
+++ b/sdk/go/arvados/container.go
@@ -31,7 +31,7 @@ type Mount struct {
Path string `json:"path"`
Content interface{} `json:"content"`
ExcludeFromOutput bool `json:"exclude_from_output"`
- Capacity int64 `json:capacity`
+ Capacity int64 `json:"capacity"`
}
// RuntimeConstraints specify a container's compute resources (RAM,
diff --git a/sdk/go/arvados/error.go b/sdk/go/arvados/error.go
index 29eebdb..773a2e6 100644
--- a/sdk/go/arvados/error.go
+++ b/sdk/go/arvados/error.go
@@ -21,7 +21,7 @@ type TransactionError struct {
}
func (e TransactionError) Error() (s string) {
- s = fmt.Sprintf("request failed: %s", e.URL)
+ s = fmt.Sprintf("request failed: %s", e.URL.String())
if e.Status != "" {
s = s + ": " + e.Status
}
commit 61bde67a2ca4b49b1609336869c145c99abad7f6
Author: Tom Clegg <tom at curoverse.com>
Date: Tue Aug 8 12:10:33 2017 -0400
12087: Fix up context cancel usage.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curoverse.com>
diff --git a/services/ws/event_source.go b/services/ws/event_source.go
index edeb647..cfb828b 100644
--- a/services/ws/event_source.go
+++ b/services/ws/event_source.go
@@ -248,7 +248,8 @@ func (ps *pgEventSource) DB() *sql.DB {
}
func (ps *pgEventSource) DBHealth() error {
- ctx, _ := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+ ctx, cancel := context.WithDeadline(context.Background(), time.Now().Add(time.Second))
+ defer cancel()
var i int
return ps.db.QueryRowContext(ctx, "SELECT 1").Scan(&i)
}
diff --git a/services/ws/handler.go b/services/ws/handler.go
index f9f7f53..b59e7a7 100644
--- a/services/ws/handler.go
+++ b/services/ws/handler.go
@@ -60,6 +60,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
// Receive websocket frames from the client and pass them to
// sess.Receive().
go func() {
+ defer cancel()
buf := make([]byte, 2<<20)
for {
select {
@@ -78,13 +79,11 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
if err != io.EOF {
log.WithError(err).Info("read error")
}
- cancel()
return
}
err = sess.Receive(buf)
if err != nil {
log.WithError(err).Error("sess.Receive() failed")
- cancel()
return
}
}
@@ -94,6 +93,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
// sess.EventMessage() as needed, and send them to the client
// as websocket frames.
go func() {
+ defer cancel()
for {
var ok bool
var data interface{}
@@ -119,8 +119,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
buf, err = sess.EventMessage(e)
if err != nil {
log.WithError(err).Error("EventMessage failed")
- cancel()
- break
+ return
} else if len(buf) == 0 {
log.Debug("skip")
continue
@@ -136,8 +135,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
_, err = ws.Write(buf)
if err != nil {
log.WithError(err).Error("write failed")
- cancel()
- break
+ return
}
log.Debug("sent")
@@ -159,6 +157,7 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
// is done/cancelled or the incoming event stream ends. Shut
// down the handler if the outgoing queue fills up.
go func() {
+ defer cancel()
ticker := time.NewTicker(h.PingTimeout)
defer ticker.Stop()
@@ -181,7 +180,6 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
continue
case e, ok := <-incoming.Channel():
if !ok {
- cancel()
return
}
if !sess.Filter(e) {
@@ -191,7 +189,6 @@ func (h *handler) Handle(ws wsConn, eventSource eventSource, newSession func(wsC
case queue <- e:
default:
log.WithError(errQueueFull).Error("terminate")
- cancel()
return
}
}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list