runtimes/google/ipc/...: consistently return io.EOF to avoid spammy log messages.

- vc/writer now returns io.EOF if the flow has been closed remotely.

Change-Id: I5bc5190d7857c949e20f76570545997383224d37
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index af3a370..1b6a66d 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -443,7 +443,9 @@
 			if err := fs.serve(); err != nil {
 				// TODO(caprita): Logging errors here is too spammy. For example, "not
 				// authorized" errors shouldn't be logged as server errors.
-				vlog.Errorf("Flow serve on %v failed: %v", ep, err)
+				if err != io.EOF {
+					vlog.Errorf("Flow serve on %v failed: %v", ep, err)
+				}
 			}
 		}(flow)
 	}
@@ -757,6 +759,10 @@
 		TraceResponse:    traceResponse,
 	}
 	if err := fs.enc.Encode(response); err != nil {
+		if err == io.EOF {
+			return err
+		}
+		// We'll close the flow 2x.
 		return verror.BadProtocolf("ipc: response encoding failed: %v", err)
 	}
 	if response.Error != nil {
@@ -764,6 +770,9 @@
 	}
 	for ix, res := range results {
 		if err := fs.encodeValueHack(res); err != nil {
+			if err == io.EOF {
+				return err
+			}
 			return verror.BadProtocolf("ipc: result #%d [%T=%v] encoding failed: %v", ix, res, res, err)
 		}
 	}
diff --git a/runtimes/google/ipc/stream/vc/flow.go b/runtimes/google/ipc/stream/vc/flow.go
index f7fa3d8..958db51 100644
--- a/runtimes/google/ipc/stream/vc/flow.go
+++ b/runtimes/google/ipc/stream/vc/flow.go
@@ -39,12 +39,12 @@
 // This is appropriate when the flow has been closed by the remote end.
 func (f *flow) Shutdown() {
 	f.reader.Close()
-	f.writer.Shutdown(true)
+	f.writer.shutdown(true)
 }
 
 // Cancel closes the flow and discards any queued up write buffers.
 // This is appropriate when the flow is being cancelled locally.
 func (f *flow) Cancel() {
 	f.reader.Close()
-	f.writer.Shutdown(false)
+	f.writer.shutdown(false)
 }
diff --git a/runtimes/google/ipc/stream/vc/writer.go b/runtimes/google/ipc/stream/vc/writer.go
index f3f3788..7ac9fff 100644
--- a/runtimes/google/ipc/stream/vc/writer.go
+++ b/runtimes/google/ipc/stream/vc/writer.go
@@ -3,6 +3,7 @@
 import (
 	"errors"
 	"fmt"
+	"io"
 	"sync"
 	"sync/atomic"
 
@@ -20,11 +21,12 @@
 	Alloc          *iobuf.Allocator // Allocator for iobuf.Slice objects. GUARDED_BY(mu)
 	SharedCounters *vsync.Semaphore // Semaphore hosting counters shared by all flows over a VC.
 
-	mu        sync.Mutex      // Guards call to Writes
-	wroteOnce bool            // GUARDED_BY(mu)
-	isClosed  bool            // GUARDED_BY(mu)
-	closed    chan struct{}   // GUARDED_BY(mu)
-	deadline  <-chan struct{} // GUARDED_BY(mu)
+	mu         sync.Mutex      // Guards call to Writes
+	wroteOnce  bool            // GUARDED_BY(mu)
+	isClosed   bool            // GUARDED_BY(mu)
+	closeError error           // GUARDED_BY(mu)
+	closed     chan struct{}   // GUARDED_BY(mu)
+	deadline   <-chan struct{} // GUARDED_BY(mu)
 
 	// Total number of bytes filled in by all Write calls on this writer.
 	// Atomic operations are used to manipulate it.
@@ -42,6 +44,7 @@
 		Alloc:          alloc,
 		SharedCounters: counters,
 		closed:         make(chan struct{}),
+		closeError:     errWriterClosed,
 	}
 }
 
@@ -50,15 +53,15 @@
 // If removeWriter is true the writer will also be removed entirely from the
 // bqueue, otherwise the now empty writer will eventually be returned by
 // bqueue.Get.
-func (w *writer) Shutdown(removeWriter bool) {
+func (w *writer) shutdown(removeWriter bool) {
 	w.Sink.Shutdown(removeWriter)
-	w.finishClose()
+	w.finishClose(true)
 }
 
 // Close closes the writer without discarding any queued up write buffers.
 func (w *writer) Close() {
 	w.Sink.Close()
-	w.finishClose()
+	w.finishClose(false)
 }
 
 func (w *writer) IsClosed() bool {
@@ -71,7 +74,7 @@
 	return w.closed
 }
 
-func (w *writer) finishClose() {
+func (w *writer) finishClose(remoteShutdown bool) {
 	// IsClosed() and Closed() indicate that the writer is closed before
 	// finishClose() completes. This is safe because Alloc and shared counters
 	// are guarded, and are not accessed elsewhere after w.closed is closed.
@@ -79,8 +82,12 @@
 	// finishClose() is idempotent, but Go's builtin close is not.
 	if !w.isClosed {
 		w.isClosed = true
+		if remoteShutdown {
+			w.closeError = io.EOF
+		}
 		close(w.closed)
 	}
+
 	w.Alloc.Release()
 	w.mu.Unlock()
 
@@ -103,8 +110,9 @@
 	w.mu.Lock()
 	defer w.mu.Unlock()
 	if w.isClosed {
-		return 0, errWriterClosed
+		return 0, w.closeError
 	}
+
 	for len(b) > 0 {
 		n := len(b)
 		if n > w.MTU {
@@ -134,7 +142,7 @@
 			case bqueue.ErrCancelled, vsync.ErrCanceled:
 				return written, timeoutError{}
 			case bqueue.ErrWriterIsClosed:
-				return written, errWriterClosed
+				return written, w.closeError
 			default:
 				return written, fmt.Errorf("bqueue.Writer.Put failed: %v", err)
 			}
diff --git a/runtimes/google/ipc/stream/vc/writer_test.go b/runtimes/google/ipc/stream/vc/writer_test.go
index 1ba4908..5c2655b 100644
--- a/runtimes/google/ipc/stream/vc/writer_test.go
+++ b/runtimes/google/ipc/stream/vc/writer_test.go
@@ -2,6 +2,7 @@
 
 import (
 	"bytes"
+	"io"
 	"net"
 	"reflect"
 	"testing"
@@ -95,6 +96,26 @@
 	}
 }
 
+func TestShutdownBeforeWrite(t *testing.T) {
+	bq := drrqueue.New(128)
+	defer bq.Close()
+
+	bw, err := bq.NewWriter(0, 0, 10)
+	if err != nil {
+		t.Fatal(err)
+	}
+
+	shared := sync.NewSemaphore()
+	shared.IncN(4)
+
+	w := newTestWriter(bw, shared)
+	w.shutdown(true)
+
+	if n, err := w.Write([]byte{1, 2}); n != 0 || err != io.EOF {
+		t.Errorf("Got (%v, %v) want (0, %v)", n, err, io.EOF)
+	}
+}
+
 func TestCloseDoesNotDiscardPendingWrites(t *testing.T) {
 	bq := drrqueue.New(128)
 	defer bq.Close()
diff --git a/runtimes/google/ipc/stream/vif/vif.go b/runtimes/google/ipc/stream/vif/vif.go
index 6f97362..9a9f025 100644
--- a/runtimes/google/ipc/stream/vif/vif.go
+++ b/runtimes/google/ipc/stream/vif/vif.go
@@ -386,6 +386,10 @@
 		if vc, _, _ := vif.vcMap.Find(m.VCI); vc != nil {
 			vif.vcMap.Delete(vc.VCI())
 			vlog.VI(2).Infof("CloseVC(%+v) on VIF %s", m, vif)
+			// TODO(cnicolaou): it would be nice to have a method on VC
+			// to indicate a 'remote close' rather than a 'local one'. This helps
+			// with error reporting since we expect reads/writes to occur
+			// after a remote close, but not after a local close.
 			vc.Close(fmt.Sprintf("remote end closed VC(%v)", m.Error))
 			return nil
 		}