[arvados] created: 2.7.0-6398-g1904360588
git repository hosting
git at public.arvados.org
Wed Apr 10 13:47:58 UTC 2024
at 1904360588999c32a0790faa7abe9fab617c1480 (commit)
commit 1904360588999c32a0790faa7abe9fab617c1480
Author: Tom Clegg <tom at curii.com>
Date: Thu Mar 28 10:40:12 2024 -0400
21606: Change default output buffer size to 0.
See https://dev.arvados.org/issues/21606#note-8.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index a081c2c993..aadd548fd9 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -804,7 +804,7 @@ Clusters:
# Per-connection output buffer for WebDAV downloads. May improve
# throughput for large files, particularly when storage volumes
# have high latency.
- WebDAVOutputBuffer: 1M
+ WebDAVOutputBuffer: 0
Login:
# One of the following mechanisms (Google, PAM, LDAP, or
commit 4cb15f0f439d19c43ee56546b7143163f48321f5
Author: Tom Clegg <tom at curii.com>
Date: Thu Mar 28 10:39:34 2024 -0400
21606: Improve code comments.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/services/keep-web/writebuffer.go b/services/keep-web/writebuffer.go
index f309b69484..90bdcb476b 100644
--- a/services/keep-web/writebuffer.go
+++ b/services/keep-web/writebuffer.go
@@ -11,10 +11,19 @@ import (
"sync/atomic"
)
+// writeBuffer uses a ring buffer to implement an asynchronous write
+// buffer.
+//
+// rpos==wpos means the buffer is empty.
+//
+// rpos==(wpos+1)%size means the buffer is full.
+//
+// size<2 means the buffer is always empty and full, so in this case
+// writeBuffer writes through synchronously.
type writeBuffer struct {
out io.Writer
buf []byte
- writesize int
+ writesize int // max bytes flush() should write in a single out.Write()
wpos atomic.Int64 // index in buf where writer (Write()) will write to next
wsignal chan struct{} // receives a value after wpos or closed changes
rpos atomic.Int64 // index in buf where reader (flush()) will read from next
@@ -63,6 +72,7 @@ func (wb *writeBuffer) Write(p []byte) (int, error) {
wpos := int(wb.wpos.Load())
rpos := int(wb.rpos.Load())
for len(todo) > 0 {
+ // wait until the buffer is not full.
for rpos == (wpos+1)%len(wb.buf) {
select {
case <-wb.flushed:
@@ -74,6 +84,8 @@ func (wb *writeBuffer) Write(p []byte) (int, error) {
rpos = int(wb.rpos.Load())
}
}
+ // determine next contiguous portion of buffer that is
+ // available.
var avail []byte
if rpos == 0 {
avail = wb.buf[wpos : len(wb.buf)-1]
@@ -101,6 +113,7 @@ func (wb *writeBuffer) flush() {
wpos := 0
closed := false
for {
+ // wait until buffer is not empty.
for rpos == wpos {
if closed {
return
@@ -109,6 +122,8 @@ func (wb *writeBuffer) flush() {
closed = wb.closed.Load()
wpos = int(wb.wpos.Load())
}
+ // determine next contiguous portion of buffer that is
+ // ready to write through.
var ready []byte
if rpos < wpos {
ready = wb.buf[rpos:wpos]
@@ -131,6 +146,11 @@ func (wb *writeBuffer) flush() {
}
}
+// responseWriter enables inserting an io.Writer-wrapper (like
+// *writeBuffer) into an http.ResponseWriter stack.
+//
+// It passes Write() calls to an io.Writer, and all other calls to an
+// http.ResponseWriter.
type responseWriter struct {
io.Writer
http.ResponseWriter
commit 25e0eebe45472f1b144f51cc6bec5584bd585f58
Author: Tom Clegg <tom at curii.com>
Date: Thu Mar 21 11:01:46 2024 -0400
21606: Update config comment.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index eb9173ec55..a081c2c993 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -802,7 +802,8 @@ Clusters:
WebDAVLogEvents: true
# Per-connection output buffer for WebDAV downloads. May improve
- # throughput for large files.
+ # throughput for large files, particularly when storage volumes
+ # have high latency.
WebDAVOutputBuffer: 1M
Login:
commit 8d0062abaad0808bb1bfceca41f4dcdf0e317ba2
Author: Tom Clegg <tom at curii.com>
Date: Tue Mar 19 15:37:32 2024 -0400
21606: Add output buffer for webdav downloads.
Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>
diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index a3ae4fd56b..eb9173ec55 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -801,6 +801,10 @@ Clusters:
# load on the API server and you don't need it.
WebDAVLogEvents: true
+ # Per-connection output buffer for WebDAV downloads. May improve
+ # throughput for large files.
+ WebDAVOutputBuffer: 1M
+
Login:
# One of the following mechanisms (Google, PAM, LDAP, or
# LoginCluster) should be enabled; see
diff --git a/lib/config/export.go b/lib/config/export.go
index 4b6c142ff2..f511ebbcb1 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -122,6 +122,7 @@ var whitelist = map[string]bool{
"Collections.TrustAllContent": true,
"Collections.WebDAVCache": false,
"Collections.WebDAVLogEvents": false,
+ "Collections.WebDAVOutputBuffer": false,
"Collections.WebDAVPermission": false,
"Containers": true,
"Containers.AlwaysUsePreemptibleInstances": true,
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 698ee20d8c..116051b09e 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -159,6 +159,7 @@ type Cluster struct {
KeepproxyPermission UploadDownloadRolePermissions
WebDAVPermission UploadDownloadRolePermissions
WebDAVLogEvents bool
+ WebDAVOutputBuffer ByteSize
}
Git struct {
GitCommand string
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index e0da14e774..cdd51f0bb7 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -178,7 +178,12 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
r.URL.Scheme = xfp
}
- w := httpserver.WrapResponseWriter(wOrig)
+ wbuffer := newWriteBuffer(wOrig, int(h.Cluster.Collections.WebDAVOutputBuffer))
+ defer wbuffer.Close()
+ w := httpserver.WrapResponseWriter(responseWriter{
+ Writer: wbuffer,
+ ResponseWriter: wOrig,
+ })
if r.Method == "OPTIONS" && ServeCORSPreflight(w, r.Header) {
return
diff --git a/services/keep-web/writebuffer.go b/services/keep-web/writebuffer.go
new file mode 100644
index 0000000000..f309b69484
--- /dev/null
+++ b/services/keep-web/writebuffer.go
@@ -0,0 +1,141 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepweb
+
+import (
+ "errors"
+ "io"
+ "net/http"
+ "sync/atomic"
+)
+
+// writeBuffer is an io.Writer that decouples its caller from a
+// (possibly slow) underlying writer. Write copies data into a ring
+// buffer and returns as soon as the data fits; a background
+// goroutine (flush) drains the ring into out.
+//
+// The ring is empty when rpos == wpos and full when
+// rpos == (wpos+1) % len(buf), so at most len(buf)-1 bytes are held
+// at once. A buffer shorter than 2 bytes cannot hold anything, so
+// Write passes data straight through to out in that case.
+type writeBuffer struct {
+	// Destination and storage.
+	out io.Writer
+	buf []byte
+	// Largest chunk flush() passes to a single out.Write call.
+	writesize int
+
+	// Writer side (advanced by Write).
+	wpos    atomic.Int64  // index in buf where Write stores the next byte
+	wsignal chan struct{} // receives a value after wpos or closed changes
+
+	// Reader side (advanced by flush).
+	rpos    atomic.Int64  // index in buf where flush reads the next byte
+	rsignal chan struct{} // receives a value after flush advances rpos
+	err     error         // error from out.Write, if any; read only after flushed is closed
+
+	// Shutdown.
+	closed  atomic.Bool   // set by Close
+	flushed chan struct{} // closed when flush() returns
+}
+
+// newWriteBuffer returns a writeBuffer that forwards data written to
+// it to w, using a ring buffer of the given size so that Write can
+// return while earlier data is still being sent.
+//
+// If size is less than 2 (including negative sizes, which would
+// otherwise make make() panic), no buffer is allocated and no
+// background goroutine is started: Write passes data straight
+// through to w.
+//
+// The caller must call Close, which waits for buffered data to be
+// written and lets the background flush goroutine exit.
+func newWriteBuffer(w io.Writer, size int) *writeBuffer {
+	if size < 2 {
+		size = 0
+	}
+	wb := &writeBuffer{
+		out:       w,
+		buf:       make([]byte, size),
+		writesize: (size + 63) / 64,
+		wsignal:   make(chan struct{}, 1),
+		rsignal:   make(chan struct{}, 1),
+		flushed:   make(chan struct{}),
+	}
+	if size == 0 {
+		// Write-through mode never uses flush(), so mark it
+		// finished now instead of parking a goroutine per
+		// writeBuffer until Close is called.
+		close(wb.flushed)
+		return wb
+	}
+	go wb.flush()
+	return wb
+}
+
+// Close marks the buffer closed, waits for flush() to write out any
+// data still in the buffer, and returns the error (if any) that
+// flush() got from the underlying writer.
+//
+// Calling Close more than once returns an error. CompareAndSwap makes
+// the "already closed?" check and the state change one atomic step,
+// so two concurrent Close calls cannot both get past it.
+func (wb *writeBuffer) Close() error {
+	if !wb.closed.CompareAndSwap(false, true) {
+		return errors.New("writeBuffer: already closed")
+	}
+	// Wake flush() so it sees closed and returns once the buffer
+	// is drained.
+	select {
+	case wb.wsignal <- struct{}{}:
+	default:
+	}
+	<-wb.flushed
+	return wb.err
+}
+
+// Write copies p into the ring buffer, blocking while the buffer is
+// full, and returns len(p) once all of p has been buffered. flush()
+// writes the data to the underlying writer asynchronously.
+//
+// If flush() has already stopped (because Close was called, or
+// because an earlier write to the underlying writer failed), Write
+// returns an error instead of buffering data that would never be
+// sent. A write failure that happens during this call is reported
+// once the buffer fills up, or by Close.
+//
+// If the buffer is shorter than 2 bytes, p is written straight
+// through to the underlying writer.
+func (wb *writeBuffer) Write(p []byte) (int, error) {
+	if len(wb.buf) < 2 {
+		// Our buffer logic doesn't work with size<2, and such
+		// a tiny buffer has no purpose anyway, so just write
+		// through unbuffered.
+		return wb.out.Write(p)
+	}
+	select {
+	case <-wb.flushed:
+		return 0, wb.stoppedErr()
+	default:
+	}
+	todo := p
+	wpos := int(wb.wpos.Load())
+	rpos := int(wb.rpos.Load())
+	for len(todo) > 0 {
+		// Wait until the buffer is not full.
+		for rpos == (wpos+1)%len(wb.buf) {
+			select {
+			case <-wb.flushed:
+				return 0, wb.stoppedErr()
+			case <-wb.rsignal:
+				rpos = int(wb.rpos.Load())
+			}
+		}
+		// Find the next contiguous free region. The slot just
+		// before rpos always stays unused so that a full buffer
+		// can be told apart from an empty one.
+		var avail []byte
+		switch {
+		case rpos == 0:
+			avail = wb.buf[wpos : len(wb.buf)-1]
+		case wpos >= rpos:
+			avail = wb.buf[wpos:]
+		default:
+			avail = wb.buf[wpos : rpos-1]
+		}
+		n := copy(avail, todo)
+		wpos = (wpos + n) % len(wb.buf)
+		wb.wpos.Store(int64(wpos))
+		// Wake flush() in case it is waiting for data.
+		select {
+		case wb.wsignal <- struct{}{}:
+		default:
+		}
+		todo = todo[n:]
+	}
+	return len(p), nil
+}
+
+// stoppedErr returns the error Write reports once flush() has
+// returned: the underlying writer's error if there was one, otherwise
+// an error saying the buffer is closed. Call it only after
+// wb.flushed is closed, so that reading wb.err does not race with
+// flush().
+func (wb *writeBuffer) stoppedErr() error {
+	if wb.err == nil {
+		return errors.New("Write called on closed writeBuffer")
+	}
+	return wb.err
+}
+
+// flush runs in its own goroutine and copies data from the ring
+// buffer to wb.out, at most wb.writesize bytes per out.Write call.
+// It returns when the buffer is empty and Close has been called, or
+// as soon as out.Write fails (leaving that error in wb.err). Either
+// way, wb.flushed is closed on return.
+func (wb *writeBuffer) flush() {
+	defer close(wb.flushed)
+	var head, tail int // head: next byte to send; tail: last seen wb.wpos
+	stopping := false
+	for {
+		// Block until Write has published data we have not
+		// sent yet. If the buffer is drained and Close has been
+		// called, we are done.
+		for head == tail {
+			if stopping {
+				return
+			}
+			<-wb.wsignal
+			stopping = wb.closed.Load()
+			tail = int(wb.wpos.Load())
+		}
+		// Send the contiguous run starting at head: up to tail,
+		// or up to the end of buf if the data wraps around,
+		// capped at writesize bytes.
+		end := len(wb.buf)
+		if head < tail {
+			end = tail
+		}
+		if end-head > wb.writesize {
+			end = head + wb.writesize
+		}
+		chunk := wb.buf[head:end]
+		if _, wb.err = wb.out.Write(chunk); wb.err != nil {
+			return
+		}
+		head = (head + len(chunk)) % len(wb.buf)
+		wb.rpos.Store(int64(head))
+		// Wake Write() in case it is waiting for free space.
+		select {
+		case wb.rsignal <- struct{}{}:
+		default:
+		}
+	}
+}
+
+// responseWriter is an http.ResponseWriter whose Write calls go to a
+// separate io.Writer (for example a *writeBuffer), while its other
+// methods (Header, WriteHeader) go to the embedded
+// http.ResponseWriter.
+type responseWriter struct {
+	io.Writer
+	http.ResponseWriter
+}
+
+// Write sends p to the io.Writer, not to the ResponseWriter. This
+// explicit method is required: both embedded fields have a Write
+// method, so without it the selector would be ambiguous.
+func (rw responseWriter) Write(p []byte) (int, error) {
+	w := rw.Writer
+	return w.Write(p)
+}
diff --git a/services/keep-web/writebuffer_test.go b/services/keep-web/writebuffer_test.go
new file mode 100644
index 0000000000..589dc241a2
--- /dev/null
+++ b/services/keep-web/writebuffer_test.go
@@ -0,0 +1,98 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepweb
+
+import (
+ "bytes"
+ "io"
+ "math/rand"
+ "time"
+
+ . "gopkg.in/check.v1"
+)
+
+// writeBufferSuite groups the gocheck tests and benchmarks for
+// writeBuffer; it carries no per-suite state.
+type writeBufferSuite struct{}
+
+var _ = Suite(new(writeBufferSuite))
+
+// Benchmark_1KBWrites measures Write throughput for 1000-byte writes
+// into a 1 MiB buffer whose output is discarded. c.SetBytes makes the
+// benchmark report throughput directly (an earlier run measured
+// 96.3 ns/op, i.e. 1000 B / 96.3 ns ~= 10.4 GB/s). Buffer setup is
+// excluded from the timing.
+func (s *writeBufferSuite) Benchmark_1KBWrites(c *C) {
+	wb := newWriteBuffer(io.Discard, 1<<20)
+	in := make([]byte, 1000)
+	c.SetBytes(int64(len(in)))
+	c.ResetTimer()
+	for i := 0; i < c.N; i++ {
+		if _, err := wb.Write(in); err != nil {
+			c.Fatal(err)
+		}
+	}
+	c.Check(wb.Close(), IsNil)
+}
+
+func (s *writeBufferSuite) TestRandomizedSpeedsAndSizes(c *C) {
+ for i := 0; i < 20; i++ {
+ insize := rand.Intn(1 << 26)
+ bufsize := rand.Intn(1 << 26)
+ if i < 2 {
+ // make sure to test edge cases
+ bufsize = i
+ } else if insize/bufsize > 1000 {
+ // don't waste too much time testing tiny
+ // buffer / huge content
+ insize = bufsize*1000 + 123
+ }
+ c.Logf("%s: insize %d bufsize %d", c.TestName(), insize, bufsize)
+
+ in := make([]byte, insize)
+ b := byte(0)
+ for i := range in {
+ in[i] = b
+ b++
+ }
+
+ out := &bytes.Buffer{}
+ done := make(chan struct{})
+ pr, pw := io.Pipe()
+ go func() {
+ n, err := slowCopy(out, pr, rand.Intn(8192)+1)
+ c.Check(err, IsNil)
+ c.Check(n, Equals, int64(insize))
+ close(done)
+ }()
+ wb := newWriteBuffer(pw, bufsize)
+ n, err := slowCopy(wb, bytes.NewBuffer(in), rand.Intn(8192)+1)
+ c.Check(err, IsNil)
+ c.Check(n, Equals, int64(insize))
+ c.Check(wb.Close(), IsNil)
+ c.Check(pw.Close(), IsNil)
+ <-done
+ c.Check(out.Len(), Equals, insize)
+ for i := 0; i < out.Len() && i < len(in); i++ {
+ if out.Bytes()[i] != in[i] {
+ c.Errorf("content mismatch at byte %d", i)
+ break
+ }
+ }
+ }
+}
+
+func slowCopy(dst io.Writer, src io.Reader, bufsize int) (int64, error) {
+ wrote := int64(0)
+ buf := make([]byte, bufsize)
+ for {
+ time.Sleep(time.Duration(rand.Intn(100) + 1))
+ n, err := src.Read(buf)
+ if n > 0 {
+ n, err := dst.Write(buf[:n])
+ wrote += int64(n)
+ if err != nil {
+ return wrote, err
+ }
+ }
+ if err == io.EOF {
+ return wrote, nil
+ }
+ if err != nil {
+ return wrote, err
+ }
+ }
+}
-----------------------------------------------------------------------
hooks/post-receive
--
More information about the arvados-commits
mailing list