runtimes/google/ipc/...: consistently return io.EOF to avoid spammy log messages.
- vc/writer now returns io.EOF if the flow has been closed remotely.
Change-Id: I5bc5190d7857c949e20f76570545997383224d37
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index af3a370..1b6a66d 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -443,7 +443,9 @@
if err := fs.serve(); err != nil {
// TODO(caprita): Logging errors here is too spammy. For example, "not
// authorized" errors shouldn't be logged as server errors.
- vlog.Errorf("Flow serve on %v failed: %v", ep, err)
+ if err != io.EOF {
+ vlog.Errorf("Flow serve on %v failed: %v", ep, err)
+ }
}
}(flow)
}
@@ -757,6 +759,10 @@
TraceResponse: traceResponse,
}
if err := fs.enc.Encode(response); err != nil {
+ if err == io.EOF {
+ return err
+ }
+ // We'll close the flow 2x.
return verror.BadProtocolf("ipc: response encoding failed: %v", err)
}
if response.Error != nil {
@@ -764,6 +770,9 @@
}
for ix, res := range results {
if err := fs.encodeValueHack(res); err != nil {
+ if err == io.EOF {
+ return err
+ }
return verror.BadProtocolf("ipc: result #%d [%T=%v] encoding failed: %v", ix, res, res, err)
}
}
diff --git a/runtimes/google/ipc/stream/vc/flow.go b/runtimes/google/ipc/stream/vc/flow.go
index f7fa3d8..958db51 100644
--- a/runtimes/google/ipc/stream/vc/flow.go
+++ b/runtimes/google/ipc/stream/vc/flow.go
@@ -39,12 +39,12 @@
// This is appropriate when the flow has been closed by the remote end.
func (f *flow) Shutdown() {
f.reader.Close()
- f.writer.Shutdown(true)
+ f.writer.shutdown(true)
}
// Cancel closes the flow and discards any queued up write buffers.
// This is appropriate when the flow is being cancelled locally.
func (f *flow) Cancel() {
f.reader.Close()
- f.writer.Shutdown(false)
+ f.writer.shutdown(false)
}
diff --git a/runtimes/google/ipc/stream/vc/writer.go b/runtimes/google/ipc/stream/vc/writer.go
index f3f3788..7ac9fff 100644
--- a/runtimes/google/ipc/stream/vc/writer.go
+++ b/runtimes/google/ipc/stream/vc/writer.go
@@ -3,6 +3,7 @@
import (
"errors"
"fmt"
+ "io"
"sync"
"sync/atomic"
@@ -20,11 +21,12 @@
Alloc *iobuf.Allocator // Allocator for iobuf.Slice objects. GUARDED_BY(mu)
SharedCounters *vsync.Semaphore // Semaphore hosting counters shared by all flows over a VC.
- mu sync.Mutex // Guards call to Writes
- wroteOnce bool // GUARDED_BY(mu)
- isClosed bool // GUARDED_BY(mu)
- closed chan struct{} // GUARDED_BY(mu)
- deadline <-chan struct{} // GUARDED_BY(mu)
+ mu sync.Mutex // Guards call to Writes
+ wroteOnce bool // GUARDED_BY(mu)
+ isClosed bool // GUARDED_BY(mu)
+ closeError error // GUARDED_BY(mu)
+ closed chan struct{} // GUARDED_BY(mu)
+ deadline <-chan struct{} // GUARDED_BY(mu)
// Total number of bytes filled in by all Write calls on this writer.
// Atomic operations are used to manipulate it.
@@ -42,6 +44,7 @@
Alloc: alloc,
SharedCounters: counters,
closed: make(chan struct{}),
+ closeError: errWriterClosed,
}
}
@@ -50,15 +53,15 @@
// If removeWriter is true the writer will also be removed entirely from the
// bqueue, otherwise the now empty writer will eventually be returned by
// bqueue.Get.
-func (w *writer) Shutdown(removeWriter bool) {
+func (w *writer) shutdown(removeWriter bool) {
w.Sink.Shutdown(removeWriter)
- w.finishClose()
+ w.finishClose(true)
}
// Close closes the writer without discarding any queued up write buffers.
func (w *writer) Close() {
w.Sink.Close()
- w.finishClose()
+ w.finishClose(false)
}
func (w *writer) IsClosed() bool {
@@ -71,7 +74,7 @@
return w.closed
}
-func (w *writer) finishClose() {
+func (w *writer) finishClose(remoteShutdown bool) {
// IsClosed() and Closed() indicate that the writer is closed before
// finishClose() completes. This is safe because Alloc and shared counters
// are guarded, and are not accessed elsewhere after w.closed is closed.
@@ -79,8 +82,12 @@
// finishClose() is idempotent, but Go's builtin close is not.
if !w.isClosed {
w.isClosed = true
+ if remoteShutdown {
+ w.closeError = io.EOF
+ }
close(w.closed)
}
+
w.Alloc.Release()
w.mu.Unlock()
@@ -103,8 +110,9 @@
w.mu.Lock()
defer w.mu.Unlock()
if w.isClosed {
- return 0, errWriterClosed
+ return 0, w.closeError
}
+
for len(b) > 0 {
n := len(b)
if n > w.MTU {
@@ -134,7 +142,7 @@
case bqueue.ErrCancelled, vsync.ErrCanceled:
return written, timeoutError{}
case bqueue.ErrWriterIsClosed:
- return written, errWriterClosed
+ return written, w.closeError
default:
return written, fmt.Errorf("bqueue.Writer.Put failed: %v", err)
}
diff --git a/runtimes/google/ipc/stream/vc/writer_test.go b/runtimes/google/ipc/stream/vc/writer_test.go
index 1ba4908..5c2655b 100644
--- a/runtimes/google/ipc/stream/vc/writer_test.go
+++ b/runtimes/google/ipc/stream/vc/writer_test.go
@@ -2,6 +2,7 @@
import (
"bytes"
+ "io"
"net"
"reflect"
"testing"
@@ -95,6 +96,26 @@
}
}
+func TestShutdownBeforeWrite(t *testing.T) {
+ bq := drrqueue.New(128)
+ defer bq.Close()
+
+ bw, err := bq.NewWriter(0, 0, 10)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ shared := sync.NewSemaphore()
+ shared.IncN(4)
+
+ w := newTestWriter(bw, shared)
+ w.shutdown(true)
+
+ if n, err := w.Write([]byte{1, 2}); n != 0 || err != io.EOF {
+ t.Errorf("Got (%v, %v) want (0, %v)", n, err, io.EOF)
+ }
+}
+
func TestCloseDoesNotDiscardPendingWrites(t *testing.T) {
bq := drrqueue.New(128)
defer bq.Close()
diff --git a/runtimes/google/ipc/stream/vif/vif.go b/runtimes/google/ipc/stream/vif/vif.go
index 6f97362..9a9f025 100644
--- a/runtimes/google/ipc/stream/vif/vif.go
+++ b/runtimes/google/ipc/stream/vif/vif.go
@@ -386,6 +386,10 @@
if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
vif.vcMap.Delete(vc.VCI())
vlog.VI(2).Infof("CloseVC(%+v) on VIF %s", m, vif)
+ // TODO(cnicolaou): it would be nice to have a method on VC
+ // to indicate a 'remote close' rather than a 'local one'. This helps
+ // with error reporting since we expect reads/writes to occur
+ // after a remote close, but not after a local close.
vc.Close(fmt.Sprintf("remote end closed VC(%v)", m.Error))
return nil
}