veyron/...: Changed the generated ipc stream to match our one true
stream api.

Change-Id: Ice4911926bbaba7b3a11c68579a9f8b34c5db460
diff --git a/runtimes/google/ipc/benchmarks/client.go b/runtimes/google/ipc/benchmarks/client.go
index 8117e8f..6d8bfc6 100644
--- a/runtimes/google/ipc/benchmarks/client.go
+++ b/runtimes/google/ipc/benchmarks/client.go
@@ -62,8 +62,8 @@
 		}
 		done := make(chan error, 1)
 		go func() {
-			for {
-				chunk, err := stream.Recv()
+			for stream.Advance() {
+				chunk := stream.Value()
 				if err == io.EOF {
 					done <- nil
 					return
@@ -77,6 +77,8 @@
 					return
 				}
 			}
+
+			done <- stream.Err()
 		}()
 		for j := 0; j < messageCount; j++ {
 			if err = stream.Send(payload); err != nil {
diff --git a/runtimes/google/ipc/benchmarks/server.go b/runtimes/google/ipc/benchmarks/server.go
index bd5eb0c..0d1c95e 100644
--- a/runtimes/google/ipc/benchmarks/server.go
+++ b/runtimes/google/ipc/benchmarks/server.go
@@ -1,8 +1,6 @@
 package benchmarks
 
 import (
-	"io"
-
 	sflag "veyron/security/flag"
 
 	"veyron2/ipc"
@@ -19,19 +17,14 @@
 }
 
 func (i *impl) EchoStream(ctx ipc.ServerContext, stream BenchmarkServiceEchoStreamStream) error {
-	for {
-		chunk, err := stream.Recv()
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			return err
-		}
+	for stream.Advance() {
+		chunk := stream.Value()
 		if err := stream.Send(chunk); err != nil {
 			return err
 		}
 	}
-	return nil
+
+	return stream.Err()
 }
 
 // StartServer starts a server that implements the Benchmark service. The
diff --git a/runtimes/google/ipc/benchmarks/service.vdl.go b/runtimes/google/ipc/benchmarks/service.vdl.go
index 2356536..eddbfa9 100644
--- a/runtimes/google/ipc/benchmarks/service.vdl.go
+++ b/runtimes/google/ipc/benchmarks/service.vdl.go
@@ -7,6 +7,7 @@
 
 import (
 	// The non-user imports are prefixed with "_gen_" to prevent collisions.
+	_gen_io "io"
 	_gen_veyron2 "veyron2"
 	_gen_context "veyron2/context"
 	_gen_ipc "veyron2/ipc"
@@ -16,6 +17,10 @@
 	_gen_wiretype "veyron2/wiretype"
 )
 
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL packages where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
 // Benchmark is the interface the client binds and uses.
 // Benchmark_ExcludingUniversal is the interface without internal framework-added methods
 // to enable embedding without method collisions.  Not to be used directly by clients.
@@ -43,31 +48,64 @@
 // EchoStream in the service interface Benchmark.
 type BenchmarkEchoStreamStream interface {
 
-	// Send places the item onto the output stream, blocking if there is no buffer
-	// space available.
+	// Send places the item onto the output stream, blocking if there is no
+	// buffer space available.  Calls to Send after having called CloseSend
+	// or Cancel will fail.  Any blocked Send calls will be unblocked upon
+	// calling Cancel.
 	Send(item []byte) error
 
-	// CloseSend indicates to the server that no more items will be sent; server
-	// Recv calls will receive io.EOF after all sent items.  Subsequent calls to
-	// Send on the client will fail.  This is an optional call - it's used by
-	// streaming clients that need the server to receive the io.EOF terminator.
+	// CloseSend indicates to the server that no more items will be sent;
+	// server Recv calls will receive io.EOF after all sent items.  This is
+	// an optional call - it's used by streaming clients that need the
+	// server to receive the io.EOF terminator before the client calls
+	// Finish (for example, if the client needs to continue receiving items
+	// from the server after having finished sending).
+	// Calls to CloseSend after having called Cancel will fail.
+	// Like Send, CloseSend blocks when there's no buffer space available.
 	CloseSend() error
 
-	// Recv returns the next item in the input stream, blocking until
-	// an item is available.  Returns io.EOF to indicate graceful end of input.
-	Recv() (item []byte, err error)
+	// Advance stages an element so the client can retrieve it
+	// with Value.  Advance returns true iff there is an
+	// element to retrieve.  The client must call Advance before
+	// calling Value.  The client must call Cancel if it does
+	// not iterate through all elements (i.e. until Advance
+	// returns false).  Advance may block if an element is not
+	// immediately available.
+	Advance() bool
 
-	// Finish closes the stream and returns the positional return values for
+	// Value returns the element that was staged by Advance.
+	// Value may panic if Advance returned false or was not
+	// called at all.  Value does not block.
+	Value() []byte
+
+	// Err returns a non-nil error iff the stream encountered
+	// any errors.  Err does not block.
+	Err() error
+
+	// Finish performs the equivalent of CloseSend, then blocks until the server
+	// is done, and returns the positional return values for call.
+	//
+	// If Cancel has been called, Finish will return immediately; the output of
+	// Finish could either be an error signalling cancelation, or the correct
+	// positional return values from the server depending on the timing of the
 	// call.
+	//
+	// Calling Finish is mandatory for releasing stream resources, unless Cancel
+	// has been called or any of the other methods return a non-EOF error.
+	// Finish should be called at most once.
 	Finish() (err error)
 
-	// Cancel cancels the RPC, notifying the server to stop processing.
+	// Cancel cancels the RPC, notifying the server to stop processing.  It
+	// is safe to call Cancel concurrently with any of the other stream methods.
+	// Calling Cancel after Finish has returned is a no-op.
 	Cancel()
 }
 
 // Implementation of the BenchmarkEchoStreamStream interface that is not exported.
 type implBenchmarkEchoStreamStream struct {
 	clientCall _gen_ipc.Call
+	val        []byte
+	err        error
 }
 
 func (c *implBenchmarkEchoStreamStream) Send(item []byte) error {
@@ -78,9 +116,20 @@
 	return c.clientCall.CloseSend()
 }
 
-func (c *implBenchmarkEchoStreamStream) Recv() (item []byte, err error) {
-	err = c.clientCall.Recv(&item)
-	return
+func (c *implBenchmarkEchoStreamStream) Advance() bool {
+	c.err = c.clientCall.Recv(&c.val)
+	return c.err == nil
+}
+
+func (c *implBenchmarkEchoStreamStream) Value() []byte {
+	return c.val
+}
+
+func (c *implBenchmarkEchoStreamStream) Err() error {
+	if c.err == _gen_io.EOF {
+		return nil
+	}
+	return c.err
 }
 
 func (c *implBenchmarkEchoStreamStream) Finish() (err error) {
@@ -98,26 +147,58 @@
 // EchoStream in the service interface Benchmark.
 type BenchmarkServiceEchoStreamStream interface {
 	// Send places the item onto the output stream, blocking if there is no buffer
-	// space available.
+	// space available.  If the client has canceled, an error is returned.
 	Send(item []byte) error
 
-	// Recv fills itemptr with the next item in the input stream, blocking until
-	// an item is available.  Returns io.EOF to indicate graceful end of input.
-	Recv() (item []byte, err error)
+	// Advance stages an element so the client can retrieve it
+	// with Value.  Advance returns true iff there is an
+	// element to retrieve.  The client must call Advance before
+	// calling Value.  The client must call Cancel if it does
+	// not iterate through all elements (i.e. until Advance
+	// returns false).  Advance may block if an element is not
+	// immediately available.
+	Advance() bool
+
+	// Value returns the element that was staged by Advance.
+	// Value may panic if Advance returned false or was not
+	// called at all.  Value does not block.
+	//
+	// In general, Value is undefined if the underlying collection
+	// of elements changes while iteration is in progress.  If
+	// the stream implementation supports concurrent modification, it should
+	// document its behavior.
+	Value() []byte
+
+	// Err returns a non-nil error iff the stream encountered
+	// any errors.  Err does not block.
+	Err() error
 }
 
 // Implementation of the BenchmarkServiceEchoStreamStream interface that is not exported.
 type implBenchmarkServiceEchoStreamStream struct {
 	serverCall _gen_ipc.ServerCall
+	val        []byte
+	err        error
 }
 
 func (s *implBenchmarkServiceEchoStreamStream) Send(item []byte) error {
 	return s.serverCall.Send(item)
 }
 
-func (s *implBenchmarkServiceEchoStreamStream) Recv() (item []byte, err error) {
-	err = s.serverCall.Recv(&item)
-	return
+func (s *implBenchmarkServiceEchoStreamStream) Advance() bool {
+	s.err = s.serverCall.Recv(&s.val)
+	return s.err == nil
+}
+
+func (s *implBenchmarkServiceEchoStreamStream) Value() []byte {
+	return s.val
+}
+
+func (s *implBenchmarkServiceEchoStreamStream) Err() error {
+	if s.err == _gen_io.EOF {
+		return nil
+	}
+	return s.err
 }
 
 // BindBenchmark returns the client stub implementing the Benchmark
diff --git a/runtimes/google/ipc/jni/arg_getter.go b/runtimes/google/ipc/jni/arg_getter.go
index fc6bf3b..04efd0b 100644
--- a/runtimes/google/ipc/jni/arg_getter.go
+++ b/runtimes/google/ipc/jni/arg_getter.go
@@ -97,9 +97,9 @@
 		mArgs.streamSendType = mSend.Type.In(0)
 	}
 	// Get the stream recv type.
-	if mRecv, ok := stream.MethodByName("Recv"); ok {
-		if mRecv.Type.NumOut() != 2 {
-			return fmt.Errorf("Illegal number of arguments for Recv method in stream %v", stream)
+	if mRecv, ok := stream.MethodByName("Value"); ok {
+		if mRecv.Type.NumOut() != 1 {
+			return fmt.Errorf("Illegal number of arguments for Value method in stream %v", stream)
 		}
 		mArgs.streamRecvType = mRecv.Type.Out(0)
 	}
diff --git a/runtimes/google/rt/mgmt_test.go b/runtimes/google/rt/mgmt_test.go
index bb78645..7f314e9 100644
--- a/runtimes/google/rt/mgmt_test.go
+++ b/runtimes/google/rt/mgmt_test.go
@@ -2,7 +2,6 @@
 
 import (
 	"fmt"
-	"io"
 	"os"
 	"reflect"
 	"strings"
@@ -305,17 +304,19 @@
 		t.Fatalf("Got error: %v", err)
 	}
 	expectTask := func(progress, goal int32) {
-		if task, err := stream.Recv(); err != nil {
-			t.Fatalf("unexpected streaming error: %q", err)
-		} else if task.Progress != progress || task.Goal != goal {
+		if !stream.Advance() {
+			t.Fatalf("unexpected streaming error: %q", stream.Err())
+		}
+		task := stream.Value()
+		if task.Progress != progress || task.Goal != goal {
 			t.Errorf("Got (%d, %d), want (%d, %d)", task.Progress, task.Goal, progress, goal)
 		}
 	}
 	expectTask(0, 10)
 	expectTask(2, 10)
 	expectTask(7, 10)
-	if task, err := stream.Recv(); err != io.EOF {
-		t.Errorf("Expected (nil, EOF), got (%v, %v) instead", task, err)
+	if stream.Advance() || stream.Err() != nil {
+		t.Errorf("Expected EOF, got (%v, %v) instead", stream.Value(), stream.Err())
 	}
 	if err := stream.Finish(); err != nil {
 		t.Errorf("Got error %v", err)
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index af7ece9..76aa7da 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -3,7 +3,6 @@
 import (
 	"errors"
 	"fmt"
-	"io"
 	"math/rand"
 	"strings"
 	"time"
@@ -256,14 +255,8 @@
 	// Compute the minimum generation for every device in this set.
 	minGens := GenVector{}
 
-	for {
-		rec, err := stream.Recv()
-		if err == io.EOF {
-			break
-		}
-		if err != nil {
-			return GenVector{}, err
-		}
+	for stream.Advance() {
+		rec := stream.Value()
 
 		if err := i.insertRecInLogAndDag(&rec); err != nil {
 			return GenVector{}, err
@@ -292,6 +285,9 @@
 		}
 	}
 
+	if err := stream.Err(); err != nil {
+		return GenVector{}, err
+	}
 	if err := i.createGenMetadataBatch(newGens, orderGens); err != nil {
 		return GenVector{}, err
 	}
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index bd9356c..5699fbc 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -4,7 +4,6 @@
 import (
 	"container/list"
 	"fmt"
-	"io"
 	"os"
 	"time"
 
@@ -36,7 +35,8 @@
 
 // dummyStream struct emulates stream of log records received from RPC.
 type dummyStream struct {
-	l *list.List
+	l     *list.List
+	value LogRec
 }
 
 func newStream() *dummyStream {
@@ -46,14 +46,20 @@
 	return ds
 }
 
-func (ds *dummyStream) Recv() (LogRec, error) {
+func (ds *dummyStream) Advance() bool {
 	if ds.l.Len() > 0 {
-		item := ds.l.Remove(ds.l.Front()).(LogRec)
-		return item, nil
+		ds.value = ds.l.Remove(ds.l.Front()).(LogRec)
+		return true
 	}
-	return LogRec{}, io.EOF
+	return false
 }
 
+func (ds *dummyStream) Value() LogRec {
+	return ds.value
+}
+
+func (*dummyStream) Err() error { return nil }
+
 func (ds *dummyStream) Finish() (GenVector, error) {
 	return GenVector{}, nil
 }
diff --git a/runtimes/google/vsync/vsync.vdl.go b/runtimes/google/vsync/vsync.vdl.go
index f81a4b7..7dcc5a4 100644
--- a/runtimes/google/vsync/vsync.vdl.go
+++ b/runtimes/google/vsync/vsync.vdl.go
@@ -9,6 +9,7 @@
 	"veyron2/storage"
 
 	// The non-user imports are prefixed with "_gen_" to prevent collisions.
+	_gen_io "io"
 	_gen_veyron2 "veyron2"
 	_gen_context "veyron2/context"
 	_gen_ipc "veyron2/ipc"
@@ -81,6 +82,10 @@
 	LinkRec = byte(1)
 )
 
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL packages where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
 // Sync allows a device to GetDeltas from another device.
 // Sync is the interface the client binds and uses.
 // Sync_ExcludingUniversal is the interface without internal framework-added methods
@@ -107,26 +112,65 @@
 // GetDeltas in the service interface Sync.
 type SyncGetDeltasStream interface {
 
-	// Recv returns the next item in the input stream, blocking until
-	// an item is available.  Returns io.EOF to indicate graceful end of input.
-	Recv() (item LogRec, err error)
+	// Advance stages an element so the client can retrieve it
+	// with Value.  Advance returns true iff there is an
+	// element to retrieve.  The client must call Advance before
+	// calling Value.  The client must call Cancel if it does
+	// not iterate through all elements (i.e. until Advance
+	// returns false).  Advance may block if an element is not
+	// immediately available.
+	Advance() bool
 
-	// Finish closes the stream and returns the positional return values for
+	// Value returns the element that was staged by Advance.
+	// Value may panic if Advance returned false or was not
+	// called at all.  Value does not block.
+	Value() LogRec
+
+	// Err returns a non-nil error iff the stream encountered
+	// any errors.  Err does not block.
+	Err() error
+
+	// Finish blocks until the server is done and returns the positional
+	// return values for call.
+	//
+	// If Cancel has been called, Finish will return immediately; the output of
+	// Finish could either be an error signalling cancelation, or the correct
+	// positional return values from the server depending on the timing of the
 	// call.
+	//
+	// Calling Finish is mandatory for releasing stream resources, unless Cancel
+	// has been called or any of the other methods return a non-EOF error.
+	// Finish should be called at most once.
 	Finish() (reply GenVector, err error)
 
-	// Cancel cancels the RPC, notifying the server to stop processing.
+	// Cancel cancels the RPC, notifying the server to stop processing.  It
+	// is safe to call Cancel concurrently with any of the other stream methods.
+	// Calling Cancel after Finish has returned is a no-op.
 	Cancel()
 }
 
 // Implementation of the SyncGetDeltasStream interface that is not exported.
 type implSyncGetDeltasStream struct {
 	clientCall _gen_ipc.Call
+	val        LogRec
+	err        error
 }
 
-func (c *implSyncGetDeltasStream) Recv() (item LogRec, err error) {
-	err = c.clientCall.Recv(&item)
-	return
+func (c *implSyncGetDeltasStream) Advance() bool {
+	c.val = LogRec{}
+	c.err = c.clientCall.Recv(&c.val)
+	return c.err == nil
+}
+
+func (c *implSyncGetDeltasStream) Value() LogRec {
+	return c.val
+}
+
+func (c *implSyncGetDeltasStream) Err() error {
+	if c.err == _gen_io.EOF {
+		return nil
+	}
+	return c.err
 }
 
 func (c *implSyncGetDeltasStream) Finish() (reply GenVector, err error) {
@@ -144,7 +188,7 @@
 // GetDeltas in the service interface Sync.
 type SyncServiceGetDeltasStream interface {
 	// Send places the item onto the output stream, blocking if there is no buffer
-	// space available.
+	// space available.  If the client has canceled, an error is returned.
 	Send(item LogRec) error
 }
 
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index f911231..aa4398f 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -7,6 +7,7 @@
 
 import (
 	"fmt"
+	"io"
 	"time"
 
 	"veyron/services/store/raw"
@@ -123,27 +124,29 @@
 // If the stream is closed, distinguish between the cases of end-of-stream vs Syncd canceling
 // the stream to trigger a clean exit.
 func (w *syncWatcher) processWatchStream(stream watch.GlobWatcherWatchGlobStream) {
-	for {
-		changes, err := stream.Recv()
-		if err != nil {
-			if w.syncd.isSyncClosing() {
-				vlog.VI(1).Info("processWatchStream: exiting, Syncd closed its channel: ", err)
-			} else {
-				vlog.VI(1).Info("processWatchStream: RPC stream error, re-issue Watch(): ", err)
-			}
-			return
-		}
+	for stream.Advance() {
+		changes := stream.Value()
 
 		// Timestamp of these changes arriving at the Sync server.
 		syncTime := time.Now().UnixNano()
 
-		if err = w.processChanges(changes, syncTime); err != nil {
+		if err := w.processChanges(changes, syncTime); err != nil {
 			// TODO(rdaoud): don't crash, instead add retry policies to attempt some degree of
 			// self-healing from a data corruption where feasible, otherwise quarantine this device
 			// from the cluster and stop Syncd to avoid propagating data corruptions.
 			vlog.Fatal("processWatchStream:", err)
 		}
 	}
+
+	err := stream.Err()
+	if err == nil {
+		err = io.EOF
+	}
+	if w.syncd.isSyncClosing() {
+		vlog.VI(1).Info("processWatchStream: exiting, Syncd closed its channel: ", err)
+	} else {
+		vlog.VI(1).Info("processWatchStream: RPC stream error, re-issue Watch(): ", err)
+	}
 }
 
 // processChanges applies the batch of changes (object mutations) received from the Watch API.
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index 6eec8e4..b9d8a79 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -5,7 +5,6 @@
 import (
 	"bytes"
 	"fmt"
-	"io"
 	"os"
 	"testing"
 	"time"
@@ -73,6 +72,7 @@
 // fakeStream is used to simulate the reply stream of the Watch() API.
 type fakeStream struct {
 	canceled chan struct{}
+	err      error
 }
 
 func newFakeStream() *fakeStream {
@@ -81,30 +81,35 @@
 	return s
 }
 
-func (s *fakeStream) Recv() (watch.ChangeBatch, error) {
-	var empty watch.ChangeBatch
+func (s *fakeStream) Advance() bool {
 	// If "failRecv" is set, simulate a failed call.
 	if info.failRecv {
 		info.failRecvCount++
-		return empty, fmt.Errorf("fake recv error on fake stream: %d", info.failRecvCount)
+		s.err = fmt.Errorf("fake recv error on fake stream: %d", info.failRecvCount)
+		return false
 	}
 
 	// If "eofRecv" is set, simulate a closed stream and make sure the next Recv() call blocks.
 	if info.eofRecv {
 		info.eofRecv, info.blockRecv = false, true
-		return empty, io.EOF
+		s.err = nil
+		return false
 	}
 
 	// If "blockRecv" is set, simulate blocking the call until the stream is canceled.
 	if info.blockRecv {
 		close(recvBlocked)
 		<-s.canceled
-		return empty, io.EOF
+		s.err = nil
+		return false
 	}
-
 	// Otherwise return a batch of changes, and make sure the next Recv() call returns EOF on the stream.
 	// Adjust the resume marker of the change records to follow the one given to the Watch request.
 	info.eofRecv = true
+	return true
+}
+
+func (s *fakeStream) Value() watch.ChangeBatch {
 	changes := getChangeBatch()
 
 	var lastCount byte
@@ -121,9 +126,12 @@
 		}
 	}
 
-	return changes, nil
+	return changes
 }
 
+func (s *fakeStream) Err() error {
+	return s.err
+}
 func (s *fakeStream) Finish() error {
 	return nil
 }