[ARVADOS] created: a278be11225d5f4d77eaa332854421995ce20224

Git user git at public.curoverse.com
Fri Oct 14 11:53:36 EDT 2016


        at  a278be11225d5f4d77eaa332854421995ce20224 (commit)


commit a278be11225d5f4d77eaa332854421995ce20224
Author: Tom Clegg <tom at curoverse.com>
Date:   Fri Oct 14 11:49:21 2016 -0400

    10211: Return an error instead of crashing if stream is closed twice.

diff --git a/sdk/go/streamer/streamer.go b/sdk/go/streamer/streamer.go
index 2217dd3..a46ca4c 100644
--- a/sdk/go/streamer/streamer.go
+++ b/sdk/go/streamer/streamer.go
@@ -36,15 +36,19 @@ Alternately, if you already have a filled buffer and just want to read out from
 package streamer
 
 import (
+	"errors"
 	"io"
 )
 
+var ErrAlreadyClosed = errors.New("cannot close a stream twice")
+
 type AsyncStream struct {
 	buffer            []byte
 	requests          chan sliceRequest
 	add_reader        chan bool
 	subtract_reader   chan bool
 	wait_zero_readers chan bool
+	closed            bool
 }
 
 // Reads from the buffer managed by the Transfer()
@@ -55,7 +59,13 @@ type StreamReader struct {
 }
 
 func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
-	t := &AsyncStream{make([]byte, buffersize), make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+	t := &AsyncStream{
+		buffer:            make([]byte, buffersize),
+		requests:          make(chan sliceRequest),
+		add_reader:        make(chan bool),
+		subtract_reader:   make(chan bool),
+		wait_zero_readers: make(chan bool),
+	}
 
 	go t.transfer(source)
 	go t.readersMonitor()
@@ -64,7 +74,13 @@ func AsyncStreamFromReader(buffersize int, source io.Reader) *AsyncStream {
 }
 
 func AsyncStreamFromSlice(buf []byte) *AsyncStream {
-	t := &AsyncStream{buf, make(chan sliceRequest), make(chan bool), make(chan bool), make(chan bool)}
+	t := &AsyncStream{
+		buffer:            buf,
+		requests:          make(chan sliceRequest),
+		add_reader:        make(chan bool),
+		subtract_reader:   make(chan bool),
+		wait_zero_readers: make(chan bool),
+	}
 
 	go t.transfer(nil)
 	go t.readersMonitor()
@@ -115,16 +131,24 @@ func (this *StreamReader) WriteTo(dest io.Writer) (written int64, err error) {
 
 // Close the responses channel
 func (this *StreamReader) Close() error {
+	if this.stream == nil {
+		return ErrAlreadyClosed
+	}
 	this.stream.subtract_reader <- true
 	close(this.responses)
 	this.stream = nil
 	return nil
 }
 
-func (this *AsyncStream) Close() {
+func (this *AsyncStream) Close() error {
+	if this.closed {
+		return ErrAlreadyClosed
+	}
+	this.closed = true
 	this.wait_zero_readers <- true
 	close(this.requests)
 	close(this.add_reader)
 	close(this.subtract_reader)
 	close(this.wait_zero_readers)
+	return nil
 }
diff --git a/sdk/go/streamer/streamer_test.go b/sdk/go/streamer/streamer_test.go
index 80aeb26..e3274f7 100644
--- a/sdk/go/streamer/streamer_test.go
+++ b/sdk/go/streamer/streamer_test.go
@@ -365,3 +365,13 @@ func (s *StandaloneSuite) TestManyReaders(c *C) {
 	writer.Write([]byte("baz"))
 	writer.Close()
 }
+
+func (s *StandaloneSuite) TestMultipleClose(c *C) {
+	buffer := make([]byte, 100)
+	tr := AsyncStreamFromSlice(buffer)
+	sr := tr.MakeStreamReader()
+	sr.Close()
+	sr.Close()
+	tr.Close()
+	tr.Close()
+}

-----------------------------------------------------------------------


hooks/post-receive
-- 




More information about the arvados-commits mailing list