[arvados] created: 2.7.0-6398-g1904360588

git repository hosting git at public.arvados.org
Wed Apr 10 13:47:58 UTC 2024


        at  1904360588999c32a0790faa7abe9fab617c1480 (commit)


commit 1904360588999c32a0790faa7abe9fab617c1480
Author: Tom Clegg <tom at curii.com>
Date:   Thu Mar 28 10:40:12 2024 -0400

    21606: Change default output buffer size to 0.
    
    See https://dev.arvados.org/issues/21606#note-8.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index a081c2c993..aadd548fd9 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -804,7 +804,7 @@ Clusters:
       # Per-connection output buffer for WebDAV downloads. May improve
       # throughput for large files, particularly when storage volumes
       # have high latency.
-      WebDAVOutputBuffer: 1M
+      WebDAVOutputBuffer: 0
 
     Login:
       # One of the following mechanisms (Google, PAM, LDAP, or

commit 4cb15f0f439d19c43ee56546b7143163f48321f5
Author: Tom Clegg <tom at curii.com>
Date:   Thu Mar 28 10:39:34 2024 -0400

    21606: Improve code comments.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/services/keep-web/writebuffer.go b/services/keep-web/writebuffer.go
index f309b69484..90bdcb476b 100644
--- a/services/keep-web/writebuffer.go
+++ b/services/keep-web/writebuffer.go
@@ -11,10 +11,19 @@ import (
 	"sync/atomic"
 )
 
+// writeBuffer uses a ring buffer to implement an asynchronous write
+// buffer.
+//
+// rpos==wpos means the buffer is empty.
+//
+// rpos==(wpos+1)%size means the buffer is full.
+//
+// size<2 means the buffer is always empty and full, so in this case
+// writeBuffer writes through synchronously.
 type writeBuffer struct {
 	out       io.Writer
 	buf       []byte
-	writesize int
+	writesize int           // max bytes flush() should write in a single out.Write()
 	wpos      atomic.Int64  // index in buf where writer (Write()) will write to next
 	wsignal   chan struct{} // receives a value after wpos or closed changes
 	rpos      atomic.Int64  // index in buf where reader (flush()) will read from next
@@ -63,6 +72,7 @@ func (wb *writeBuffer) Write(p []byte) (int, error) {
 	wpos := int(wb.wpos.Load())
 	rpos := int(wb.rpos.Load())
 	for len(todo) > 0 {
+		// wait until the buffer is not full.
 		for rpos == (wpos+1)%len(wb.buf) {
 			select {
 			case <-wb.flushed:
@@ -74,6 +84,8 @@ func (wb *writeBuffer) Write(p []byte) (int, error) {
 				rpos = int(wb.rpos.Load())
 			}
 		}
+		// determine next contiguous portion of buffer that is
+		// available.
 		var avail []byte
 		if rpos == 0 {
 			avail = wb.buf[wpos : len(wb.buf)-1]
@@ -101,6 +113,7 @@ func (wb *writeBuffer) flush() {
 	wpos := 0
 	closed := false
 	for {
+		// wait until buffer is not empty.
 		for rpos == wpos {
 			if closed {
 				return
@@ -109,6 +122,8 @@ func (wb *writeBuffer) flush() {
 			closed = wb.closed.Load()
 			wpos = int(wb.wpos.Load())
 		}
+		// determine next contiguous portion of buffer that is
+		// ready to write through.
 		var ready []byte
 		if rpos < wpos {
 			ready = wb.buf[rpos:wpos]
@@ -131,6 +146,11 @@ func (wb *writeBuffer) flush() {
 	}
 }
 
+// responseWriter enables inserting an io.Writer-wrapper (like
+// *writeBuffer) into an http.ResponseWriter stack.
+//
+// It passes Write() calls to an io.Writer, and all other calls to an
+// http.ResponseWriter.
 type responseWriter struct {
 	io.Writer
 	http.ResponseWriter

commit 25e0eebe45472f1b144f51cc6bec5584bd585f58
Author: Tom Clegg <tom at curii.com>
Date:   Thu Mar 21 11:01:46 2024 -0400

    21606: Update config comment.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index eb9173ec55..a081c2c993 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -802,7 +802,8 @@ Clusters:
       WebDAVLogEvents: true
 
       # Per-connection output buffer for WebDAV downloads. May improve
-      # throughput for large files.
+      # throughput for large files, particularly when storage volumes
+      # have high latency.
       WebDAVOutputBuffer: 1M
 
     Login:

commit 8d0062abaad0808bb1bfceca41f4dcdf0e317ba2
Author: Tom Clegg <tom at curii.com>
Date:   Tue Mar 19 15:37:32 2024 -0400

    21606: Add output buffer for webdav downloads.
    
    Arvados-DCO-1.1-Signed-off-by: Tom Clegg <tom at curii.com>

diff --git a/lib/config/config.default.yml b/lib/config/config.default.yml
index a3ae4fd56b..eb9173ec55 100644
--- a/lib/config/config.default.yml
+++ b/lib/config/config.default.yml
@@ -801,6 +801,10 @@ Clusters:
       # load on the API server and you don't need it.
       WebDAVLogEvents: true
 
+      # Per-connection output buffer for WebDAV downloads. May improve
+      # throughput for large files.
+      WebDAVOutputBuffer: 1M
+
     Login:
       # One of the following mechanisms (Google, PAM, LDAP, or
       # LoginCluster) should be enabled; see
diff --git a/lib/config/export.go b/lib/config/export.go
index 4b6c142ff2..f511ebbcb1 100644
--- a/lib/config/export.go
+++ b/lib/config/export.go
@@ -122,6 +122,7 @@ var whitelist = map[string]bool{
 	"Collections.TrustAllContent":              true,
 	"Collections.WebDAVCache":                  false,
 	"Collections.WebDAVLogEvents":              false,
+	"Collections.WebDAVOutputBuffer":           false,
 	"Collections.WebDAVPermission":             false,
 	"Containers":                               true,
 	"Containers.AlwaysUsePreemptibleInstances": true,
diff --git a/sdk/go/arvados/config.go b/sdk/go/arvados/config.go
index 698ee20d8c..116051b09e 100644
--- a/sdk/go/arvados/config.go
+++ b/sdk/go/arvados/config.go
@@ -159,6 +159,7 @@ type Cluster struct {
 		KeepproxyPermission UploadDownloadRolePermissions
 		WebDAVPermission    UploadDownloadRolePermissions
 		WebDAVLogEvents     bool
+		WebDAVOutputBuffer  ByteSize
 	}
 	Git struct {
 		GitCommand   string
diff --git a/services/keep-web/handler.go b/services/keep-web/handler.go
index e0da14e774..cdd51f0bb7 100644
--- a/services/keep-web/handler.go
+++ b/services/keep-web/handler.go
@@ -178,7 +178,12 @@ func (h *handler) ServeHTTP(wOrig http.ResponseWriter, r *http.Request) {
 		r.URL.Scheme = xfp
 	}
 
-	w := httpserver.WrapResponseWriter(wOrig)
+	wbuffer := newWriteBuffer(wOrig, int(h.Cluster.Collections.WebDAVOutputBuffer))
+	defer wbuffer.Close()
+	w := httpserver.WrapResponseWriter(responseWriter{
+		Writer:         wbuffer,
+		ResponseWriter: wOrig,
+	})
 
 	if r.Method == "OPTIONS" && ServeCORSPreflight(w, r.Header) {
 		return
diff --git a/services/keep-web/writebuffer.go b/services/keep-web/writebuffer.go
new file mode 100644
index 0000000000..f309b69484
--- /dev/null
+++ b/services/keep-web/writebuffer.go
@@ -0,0 +1,141 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepweb
+
+import (
+	"errors"
+	"io"
+	"net/http"
+	"sync/atomic"
+)
+
+type writeBuffer struct {
+	out       io.Writer
+	buf       []byte
+	writesize int
+	wpos      atomic.Int64  // index in buf where writer (Write()) will write to next
+	wsignal   chan struct{} // receives a value after wpos or closed changes
+	rpos      atomic.Int64  // index in buf where reader (flush()) will read from next
+	rsignal   chan struct{} // receives a value after rpos changes (a flush error is signaled by closing flushed instead)
+	err       error         // error encountered by flush
+	closed    atomic.Bool
+	flushed   chan struct{} // closes when flush() is finished
+}
+
+func newWriteBuffer(w io.Writer, size int) *writeBuffer {
+	wb := &writeBuffer{
+		out:       w,
+		buf:       make([]byte, size),
+		writesize: (size + 63) / 64,
+		wsignal:   make(chan struct{}, 1),
+		rsignal:   make(chan struct{}, 1),
+		flushed:   make(chan struct{}),
+	}
+	go wb.flush()
+	return wb
+}
+
+func (wb *writeBuffer) Close() error {
+	if wb.closed.Load() {
+		return errors.New("writeBuffer: already closed")
+	}
+	wb.closed.Store(true)
+	// wake up flush()
+	select {
+	case wb.wsignal <- struct{}{}:
+	default:
+	}
+	// wait for flush() to finish
+	<-wb.flushed
+	return wb.err
+}
+
+func (wb *writeBuffer) Write(p []byte) (int, error) {
+	if len(wb.buf) < 2 {
+		// Our buffer logic doesn't work with size<2, and such
+		// a tiny buffer has no purpose anyway, so just write
+		// through unbuffered.
+		return wb.out.Write(p)
+	}
+	todo := p
+	wpos := int(wb.wpos.Load())
+	rpos := int(wb.rpos.Load())
+	for len(todo) > 0 {
+		for rpos == (wpos+1)%len(wb.buf) {
+			select {
+			case <-wb.flushed:
+				if wb.err == nil {
+					return 0, errors.New("Write called on closed writeBuffer")
+				}
+				return 0, wb.err
+			case <-wb.rsignal:
+				rpos = int(wb.rpos.Load())
+			}
+		}
+		var avail []byte
+		if rpos == 0 {
+			avail = wb.buf[wpos : len(wb.buf)-1]
+		} else if wpos >= rpos {
+			avail = wb.buf[wpos:]
+		} else {
+			avail = wb.buf[wpos : rpos-1]
+		}
+		n := copy(avail, todo)
+		wpos = (wpos + n) % len(wb.buf)
+		wb.wpos.Store(int64(wpos))
+		// wake up flush()
+		select {
+		case wb.wsignal <- struct{}{}:
+		default:
+		}
+		todo = todo[n:]
+	}
+	return len(p), nil
+}
+
+func (wb *writeBuffer) flush() {
+	defer close(wb.flushed)
+	rpos := 0
+	wpos := 0
+	closed := false
+	for {
+		for rpos == wpos {
+			if closed {
+				return
+			}
+			<-wb.wsignal
+			closed = wb.closed.Load()
+			wpos = int(wb.wpos.Load())
+		}
+		var ready []byte
+		if rpos < wpos {
+			ready = wb.buf[rpos:wpos]
+		} else {
+			ready = wb.buf[rpos:]
+		}
+		if len(ready) > wb.writesize {
+			ready = ready[:wb.writesize]
+		}
+		_, wb.err = wb.out.Write(ready)
+		if wb.err != nil {
+			return
+		}
+		rpos = (rpos + len(ready)) % len(wb.buf)
+		wb.rpos.Store(int64(rpos))
+		select {
+		case wb.rsignal <- struct{}{}:
+		default:
+		}
+	}
+}
+
+type responseWriter struct {
+	io.Writer
+	http.ResponseWriter
+}
+
+func (rwc responseWriter) Write(p []byte) (int, error) {
+	return rwc.Writer.Write(p)
+}
diff --git a/services/keep-web/writebuffer_test.go b/services/keep-web/writebuffer_test.go
new file mode 100644
index 0000000000..589dc241a2
--- /dev/null
+++ b/services/keep-web/writebuffer_test.go
@@ -0,0 +1,98 @@
+// Copyright (C) The Arvados Authors. All rights reserved.
+//
+// SPDX-License-Identifier: AGPL-3.0
+
+package keepweb
+
+import (
+	"bytes"
+	"io"
+	"math/rand"
+	"time"
+
+	. "gopkg.in/check.v1"
+)
+
+var _ = Suite(&writeBufferSuite{})
+
+type writeBufferSuite struct {
+}
+
+// Sample result: 1000 bytes per op / 96.3 ns/op = 10.384 GB/s
+func (s *writeBufferSuite) Benchmark_1KBWrites(c *C) {
+	wb := newWriteBuffer(io.Discard, 1<<20)
+	in := make([]byte, 1000)
+	for i := 0; i < c.N; i++ {
+		wb.Write(in)
+	}
+	wb.Close()
+}
+
+func (s *writeBufferSuite) TestRandomizedSpeedsAndSizes(c *C) {
+	for i := 0; i < 20; i++ {
+		insize := rand.Intn(1 << 26)
+		bufsize := rand.Intn(1 << 26)
+		if i < 2 {
+			// make sure to test edge cases
+			bufsize = i
+		} else if insize/bufsize > 1000 {
+			// don't waste too much time testing tiny
+			// buffer / huge content
+			insize = bufsize*1000 + 123
+		}
+		c.Logf("%s: insize %d bufsize %d", c.TestName(), insize, bufsize)
+
+		in := make([]byte, insize)
+		b := byte(0)
+		for i := range in {
+			in[i] = b
+			b++
+		}
+
+		out := &bytes.Buffer{}
+		done := make(chan struct{})
+		pr, pw := io.Pipe()
+		go func() {
+			n, err := slowCopy(out, pr, rand.Intn(8192)+1)
+			c.Check(err, IsNil)
+			c.Check(n, Equals, int64(insize))
+			close(done)
+		}()
+		wb := newWriteBuffer(pw, bufsize)
+		n, err := slowCopy(wb, bytes.NewBuffer(in), rand.Intn(8192)+1)
+		c.Check(err, IsNil)
+		c.Check(n, Equals, int64(insize))
+		c.Check(wb.Close(), IsNil)
+		c.Check(pw.Close(), IsNil)
+		<-done
+		c.Check(out.Len(), Equals, insize)
+		for i := 0; i < out.Len() && i < len(in); i++ {
+			if out.Bytes()[i] != in[i] {
+				c.Errorf("content mismatch at byte %d", i)
+				break
+			}
+		}
+	}
+}
+
+func slowCopy(dst io.Writer, src io.Reader, bufsize int) (int64, error) {
+	wrote := int64(0)
+	buf := make([]byte, bufsize)
+	for {
+		time.Sleep(time.Duration(rand.Intn(100) + 1))
+		n, err := src.Read(buf)
+		if n > 0 {
+			n, err := dst.Write(buf[:n])
+			wrote += int64(n)
+			if err != nil {
+				return wrote, err
+			}
+		}
+		if err == io.EOF {
+			return wrote, nil
+		}
+		if err != nil {
+			return wrote, err
+		}
+	}
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list