veyron/:  Split out the recv and send portion of the stream into sub interfaces so we don't have naming confusion when both sets of methods are on the RPC.  The RPC have RecvStream or SendStream methods for the recv or send portion of the stream.  This also includes a few naming changes in the API.

Change-Id: I7e9dd6c4b7dbcae827dad8d7d839ec7d58f4a81f
diff --git a/examples/boxes/android/src/boxesp2p/main.go b/examples/boxes/android/src/boxesp2p/main.go
index 7580fff..7174583 100644
--- a/examples/boxes/android/src/boxesp2p/main.go
+++ b/examples/boxes/android/src/boxesp2p/main.go
@@ -174,14 +174,16 @@
 func (gs *goState) streamBoxesLoop() {
 	// Loop to receive boxes from remote peer
 	go func() {
-		for gs.drawStream.Advance() {
-			box := gs.drawStream.Value()
+		rStream := gs.drawStream.RecvStream()
+		for rStream.Advance() {
+			box := rStream.Value()
 			nativeJava.addBox(&box)
 		}
 	}()
 	// Loop to send boxes to remote peer
+	sender := gs.drawStream.SendStream()
 	for {
-		if err := gs.drawStream.Send(<-gs.boxList); err != nil {
+		if err := sender.Send(<-gs.boxList); err != nil {
 			break
 		}
 	}
@@ -204,8 +206,9 @@
 		if err != nil {
 			panic(fmt.Errorf("Can't watch store: %s: %s", gs.storeEndpoint, err))
 		}
-		for stream.Advance() {
-			cb := stream.Value()
+		rStream := stream.RecvStream()
+		for rStream.Advance() {
+			cb := rStream.Value()
 			for _, change := range cb.Changes {
 				if entry, ok := change.Value.(*storage.Entry); ok {
 					if box, ok := entry.Value.(boxes.Box); ok && box.DeviceId != gs.myIPAddr {
@@ -215,7 +218,7 @@
 			}
 		}
 
-		err = stream.Err()
+		err = rStream.Err()
 		if err == nil {
 			err = io.EOF
 		}
@@ -268,6 +271,19 @@
 	}
 }
 
+// wrapper is an object that modifies the signature of DrawInterfaceDrawCall
+// to match DrawInterfaceServiceDrawStream.  This needs to happen because the
+// anonymous interface returned by SendStream in DrawInterfaceDrawCall has
+// an extra method (Close) that we need to remove from the function signataure.
+type wrapper struct {
+	boxes.DrawInterfaceDrawCall
+}
+
+func (w *wrapper) SendStream() interface {
+	Send(b boxes.Box) error
+} {
+	return w.DrawInterfaceDrawCall.SendStream()
+}
 func (gs *goState) connectPeer() {
 	endpointStr, err := gs.signalling.Get(gs.runtime.TODOContext())
 	if err != nil {
@@ -278,9 +294,11 @@
 		panic(fmt.Errorf("failed BindDrawInterface:%v", err))
 	}
 	if !useStoreService {
-		if gs.drawStream, err = drawInterface.Draw(gs.runtime.TODOContext()); err != nil {
+		val, err := drawInterface.Draw(gs.runtime.TODOContext())
+		if err != nil {
 			panic(fmt.Errorf("failed to get handle to Draw stream:%v\n", err))
 		}
+		gs.drawStream = &wrapper{val}
 		go gs.streamBoxesLoop()
 	} else {
 		// Initialize the store sync service that listens for updates from a peer
diff --git a/examples/boxes/boxes.vdl.go b/examples/boxes/boxes.vdl.go
index c2488ae..01c2127 100644
--- a/examples/boxes/boxes.vdl.go
+++ b/examples/boxes/boxes.vdl.go
@@ -230,7 +230,7 @@
 // to enable embedding without method collisions.  Not to be used directly by clients.
 type DrawInterface_ExcludingUniversal interface {
 	// Draw is used to send/receive a stream of boxes to another peer
-	Draw(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply DrawInterfaceDrawStream, err error)
+	Draw(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply DrawInterfaceDrawCall, err error)
 	// SyncBoxes is used to setup a sync service over store to send/receive
 	// boxes to another peer
 	SyncBoxes(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (err error)
@@ -250,54 +250,58 @@
 	SyncBoxes(context _gen_ipc.ServerContext) (err error)
 }
 
-// DrawInterfaceDrawStream is the interface for streaming responses of the method
+// DrawInterfaceDrawCall is the interface for call object of the method
 // Draw in the service interface DrawInterface.
-type DrawInterfaceDrawStream interface {
+type DrawInterfaceDrawCall interface {
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
 
-	// Send places the item onto the output stream, blocking if there is no
-	// buffer space available.  Calls to Send after having called CloseSend
-	// or Cancel will fail.  Any blocked Send calls will be unblocked upon
-	// calling Cancel.
-	Send(item Box) error
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() Box
 
-	// CloseSend indicates to the server that no more items will be sent;
-	// server Recv calls will receive io.EOF after all sent items.  This is
-	// an optional call - it's used by streaming clients that need the
-	// server to receive the io.EOF terminator before the client calls
-	// Finish (for example, if the client needs to continue receiving items
-	// from the server after having finished sending).
-	// Calls to CloseSend after having called Cancel will fail.
-	// Like Send, CloseSend blocks when there's no buffer space available.
-	CloseSend() error
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
 
-	// Advance stages an element so the client can retrieve it
-	// with Value.  Advance returns true iff there is an
-	// element to retrieve.  The client must call Advance before
-	// calling Value.  The client must call Cancel if it does
-	// not iterate through all elements (i.e. until Advance
-	// returns false).  Advance may block if an element is not
-	// immediately available.
-	Advance() bool
+	// SendStream returns the send portion of the stream
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no
+		// buffer space available.  Calls to Send after having called Close
+		// or Cancel will fail.  Any blocked Send calls will be unblocked upon
+		// calling Cancel.
+		Send(item Box) error
 
-	// Value returns the element that was staged by Advance.
-	// Value may panic if Advance returned false or was not
-	// called at all.  Value does not block.
-	Value() Box
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.  This is
+		// an optional call - it's used by streaming clients that need the
+		// server to receive the io.EOF terminator before the client calls
+		// Finish (for example, if the client needs to continue receiving items
+		// from the server after having finished sending).
+		// Calls to Close after having called Cancel will fail.
+		// Like Send, Close blocks when there's no buffer space available.
+		Close() error
+	}
 
-	// Err returns a non-nil error iff the stream encountered
-	// any errors.  Err does not block.
-	Err() error
-
-	// Finish performs the equivalent of CloseSend, then blocks until the server
+	// Finish performs the equivalent of SendStream().Close, then blocks until the server
 	// is done, and returns the positional return values for call.
-	//
 	// If Cancel has been called, Finish will return immediately; the output of
 	// Finish could either be an error signalling cancelation, or the correct
 	// positional return values from the server depending on the timing of the
 	// call.
 	//
 	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return a non-EOF error.
+	// has been called or any of the other methods return an error.
 	// Finish should be called at most once.
 	Finish() (err error)
 
@@ -307,56 +311,150 @@
 	Cancel()
 }
 
-// Implementation of the DrawInterfaceDrawStream interface that is not exported.
-type implDrawInterfaceDrawStream struct {
+type implDrawInterfaceDrawStreamSender struct {
+	clientCall _gen_ipc.Call
+}
+
+func (c *implDrawInterfaceDrawStreamSender) Send(item Box) error {
+	return c.clientCall.Send(item)
+}
+
+func (c *implDrawInterfaceDrawStreamSender) Close() error {
+	return c.clientCall.CloseSend()
+}
+
+type implDrawInterfaceDrawStreamIterator struct {
 	clientCall _gen_ipc.Call
 	val        Box
 	err        error
 }
 
-func (c *implDrawInterfaceDrawStream) Send(item Box) error {
-	return c.clientCall.Send(item)
-}
-
-func (c *implDrawInterfaceDrawStream) CloseSend() error {
-	return c.clientCall.CloseSend()
-}
-
-func (c *implDrawInterfaceDrawStream) Advance() bool {
+func (c *implDrawInterfaceDrawStreamIterator) Advance() bool {
 	c.val = Box{}
 	c.err = c.clientCall.Recv(&c.val)
 	return c.err == nil
 }
 
-func (c *implDrawInterfaceDrawStream) Value() Box {
+func (c *implDrawInterfaceDrawStreamIterator) Value() Box {
 	return c.val
 }
 
-func (c *implDrawInterfaceDrawStream) Err() error {
+func (c *implDrawInterfaceDrawStreamIterator) Err() error {
 	if c.err == _gen_io.EOF {
 		return nil
 	}
 	return c.err
 }
 
-func (c *implDrawInterfaceDrawStream) Finish() (err error) {
+// Implementation of the DrawInterfaceDrawCall interface that is not exported.
+type implDrawInterfaceDrawCall struct {
+	clientCall  _gen_ipc.Call
+	writeStream implDrawInterfaceDrawStreamSender
+	readStream  implDrawInterfaceDrawStreamIterator
+}
+
+func (c *implDrawInterfaceDrawCall) SendStream() interface {
+	Send(item Box) error
+	Close() error
+} {
+	return &c.writeStream
+}
+
+func (c *implDrawInterfaceDrawCall) RecvStream() interface {
+	Advance() bool
+	Value() Box
+	Err() error
+} {
+	return &c.readStream
+}
+
+func (c *implDrawInterfaceDrawCall) Finish() (err error) {
 	if ierr := c.clientCall.Finish(&err); ierr != nil {
 		err = ierr
 	}
 	return
 }
 
-func (c *implDrawInterfaceDrawStream) Cancel() {
+func (c *implDrawInterfaceDrawCall) Cancel() {
 	c.clientCall.Cancel()
 }
 
+type implDrawInterfaceServiceDrawStreamSender struct {
+	serverCall _gen_ipc.ServerCall
+}
+
+func (s *implDrawInterfaceServiceDrawStreamSender) Send(item Box) error {
+	return s.serverCall.Send(item)
+}
+
+type implDrawInterfaceServiceDrawStreamIterator struct {
+	serverCall _gen_ipc.ServerCall
+	val        Box
+	err        error
+}
+
+func (s *implDrawInterfaceServiceDrawStreamIterator) Advance() bool {
+	s.err = s.serverCall.Recv(&s.val)
+	return s.err == nil
+}
+
+func (s *implDrawInterfaceServiceDrawStreamIterator) Value() Box {
+	return s.val
+}
+
+func (s *implDrawInterfaceServiceDrawStreamIterator) Err() error {
+	if s.err == _gen_io.EOF {
+		return nil
+	}
+	return s.err
+}
+
 // DrawInterfaceServiceDrawStream is the interface for streaming responses of the method
 // Draw in the service interface DrawInterface.
 type DrawInterfaceServiceDrawStream interface {
+	// SendStream returns the send portion of the stream.
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no buffer
+		// space available.  If the client has canceled, an error is returned.
+		Send(item Box) error
+	}
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
+
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() Box
+
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
+}
+
+// Implementation of the DrawInterfaceServiceDrawStream interface that is not exported.
+type implDrawInterfaceServiceDrawStream struct {
+	writer implDrawInterfaceServiceDrawStreamSender
+	reader implDrawInterfaceServiceDrawStreamIterator
+}
+
+func (s *implDrawInterfaceServiceDrawStream) SendStream() interface {
 	// Send places the item onto the output stream, blocking if there is no buffer
 	// space available.  If the client has canceled, an error is returned.
 	Send(item Box) error
+} {
+	return &s.writer
+}
 
+func (s *implDrawInterfaceServiceDrawStream) RecvStream() interface {
 	// Advance stages an element so the client can retrieve it
 	// with Value.  Advance returns true iff there is an
 	// element to retrieve.  The client must call Advance before
@@ -369,44 +467,13 @@
 	// Value returns the element that was staged by Advance.
 	// Value may panic if Advance returned false or was not
 	// called at all.  Value does not block.
-	//
-	// In general, Value is undefined if the underlying collection
-	// of elements changes while iteration is in progress.  If
-	// <DataProvider> supports concurrent modification, it should
-	// document its behavior.
 	Value() Box
 
 	// Err returns a non-nil error iff the stream encountered
 	// any errors.  Err does not block.
 	Err() error
-}
-
-// Implementation of the DrawInterfaceServiceDrawStream interface that is not exported.
-type implDrawInterfaceServiceDrawStream struct {
-	serverCall _gen_ipc.ServerCall
-	val        Box
-	err        error
-}
-
-func (s *implDrawInterfaceServiceDrawStream) Send(item Box) error {
-	return s.serverCall.Send(item)
-}
-
-func (s *implDrawInterfaceServiceDrawStream) Advance() bool {
-	s.val = Box{}
-	s.err = s.serverCall.Recv(&s.val)
-	return s.err == nil
-}
-
-func (s *implDrawInterfaceServiceDrawStream) Value() Box {
-	return s.val
-}
-
-func (s *implDrawInterfaceServiceDrawStream) Err() error {
-	if s.err == _gen_io.EOF {
-		return nil
-	}
-	return s.err
+} {
+	return &s.reader
 }
 
 // BindDrawInterface returns the client stub implementing the DrawInterface
@@ -450,12 +517,12 @@
 	name   string
 }
 
-func (__gen_c *clientStubDrawInterface) Draw(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply DrawInterfaceDrawStream, err error) {
+func (__gen_c *clientStubDrawInterface) Draw(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply DrawInterfaceDrawCall, err error) {
 	var call _gen_ipc.Call
 	if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "Draw", nil, opts...); err != nil {
 		return
 	}
-	reply = &implDrawInterfaceDrawStream{clientCall: call}
+	reply = &implDrawInterfaceDrawCall{clientCall: call, writeStream: implDrawInterfaceDrawStreamSender{clientCall: call}, readStream: implDrawInterfaceDrawStreamIterator{clientCall: call}}
 	return
 }
 
@@ -573,7 +640,7 @@
 }
 
 func (__gen_s *ServerStubDrawInterface) Draw(call _gen_ipc.ServerCall) (err error) {
-	stream := &implDrawInterfaceServiceDrawStream{serverCall: call}
+	stream := &implDrawInterfaceServiceDrawStream{reader: implDrawInterfaceServiceDrawStreamIterator{serverCall: call}, writer: implDrawInterfaceServiceDrawStreamSender{serverCall: call}}
 	err = __gen_s.service.Draw(call, stream)
 	return
 }
diff --git a/examples/inspector/inspector.vdl.go b/examples/inspector/inspector.vdl.go
index 64abef6..3c4b76b 100644
--- a/examples/inspector/inspector.vdl.go
+++ b/examples/inspector/inspector.vdl.go
@@ -34,8 +34,8 @@
 // Inspector_ExcludingUniversal is the interface without internal framework-added methods
 // to enable embedding without method collisions.  Not to be used directly by clients.
 type Inspector_ExcludingUniversal interface {
-	Ls(ctx _gen_context.T, Glob string, opts ..._gen_ipc.CallOpt) (reply InspectorLsStream, err error)
-	LsDetails(ctx _gen_context.T, Glob string, opts ..._gen_ipc.CallOpt) (reply InspectorLsDetailsStream, err error)
+	Ls(ctx _gen_context.T, Glob string, opts ..._gen_ipc.CallOpt) (reply InspectorLsCall, err error)
+	LsDetails(ctx _gen_context.T, Glob string, opts ..._gen_ipc.CallOpt) (reply InspectorLsDetailsCall, err error)
 }
 type Inspector interface {
 	_gen_ipc.UniversalServiceMethods
@@ -48,27 +48,29 @@
 	LsDetails(context _gen_ipc.ServerContext, Glob string, stream InspectorServiceLsDetailsStream) (err error)
 }
 
-// InspectorLsStream is the interface for streaming responses of the method
+// InspectorLsCall is the interface for call object of the method
 // Ls in the service interface Inspector.
-type InspectorLsStream interface {
+type InspectorLsCall interface {
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
 
-	// Advance stages an element so the client can retrieve it
-	// with Value.  Advance returns true iff there is an
-	// element to retrieve.  The client must call Advance before
-	// calling Value.  The client must call Cancel if it does
-	// not iterate through all elements (i.e. until Advance
-	// returns false).  Advance may block if an element is not
-	// immediately available.
-	Advance() bool
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() string
 
-	// Value returns the element that was staged by Advance.
-	// Value may panic if Advance returned false or was not
-	// called at all.  Value does not block.
-	Value() string
-
-	// Err returns a non-nil error iff the stream encountered
-	// any errors.  Err does not block.
-	Err() error
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
 
 	// Finish blocks until the server is done and returns the positional
 	// return values for call.
@@ -79,7 +81,7 @@
 	// call.
 	//
 	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return a non-EOF error.
+	// has been called or any of the other methods return an error.
 	// Finish should be called at most once.
 	Finish() (err error)
 
@@ -89,78 +91,108 @@
 	Cancel()
 }
 
-// Implementation of the InspectorLsStream interface that is not exported.
-type implInspectorLsStream struct {
+type implInspectorLsStreamIterator struct {
 	clientCall _gen_ipc.Call
 	val        string
 	err        error
 }
 
-func (c *implInspectorLsStream) Advance() bool {
+func (c *implInspectorLsStreamIterator) Advance() bool {
 	c.err = c.clientCall.Recv(&c.val)
 	return c.err == nil
 }
 
-func (c *implInspectorLsStream) Value() string {
+func (c *implInspectorLsStreamIterator) Value() string {
 	return c.val
 }
 
-func (c *implInspectorLsStream) Err() error {
+func (c *implInspectorLsStreamIterator) Err() error {
 	if c.err == _gen_io.EOF {
 		return nil
 	}
 	return c.err
 }
 
-func (c *implInspectorLsStream) Finish() (err error) {
+// Implementation of the InspectorLsCall interface that is not exported.
+type implInspectorLsCall struct {
+	clientCall _gen_ipc.Call
+	readStream implInspectorLsStreamIterator
+}
+
+func (c *implInspectorLsCall) RecvStream() interface {
+	Advance() bool
+	Value() string
+	Err() error
+} {
+	return &c.readStream
+}
+
+func (c *implInspectorLsCall) Finish() (err error) {
 	if ierr := c.clientCall.Finish(&err); ierr != nil {
 		err = ierr
 	}
 	return
 }
 
-func (c *implInspectorLsStream) Cancel() {
+func (c *implInspectorLsCall) Cancel() {
 	c.clientCall.Cancel()
 }
 
+type implInspectorServiceLsStreamSender struct {
+	serverCall _gen_ipc.ServerCall
+}
+
+func (s *implInspectorServiceLsStreamSender) Send(item string) error {
+	return s.serverCall.Send(item)
+}
+
 // InspectorServiceLsStream is the interface for streaming responses of the method
 // Ls in the service interface Inspector.
 type InspectorServiceLsStream interface {
-	// Send places the item onto the output stream, blocking if there is no buffer
-	// space available.  If the client has canceled, an error is returned.
-	Send(item string) error
+	// SendStream returns the send portion of the stream.
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no buffer
+		// space available.  If the client has canceled, an error is returned.
+		Send(item string) error
+	}
 }
 
 // Implementation of the InspectorServiceLsStream interface that is not exported.
 type implInspectorServiceLsStream struct {
-	serverCall _gen_ipc.ServerCall
+	writer implInspectorServiceLsStreamSender
 }
 
-func (s *implInspectorServiceLsStream) Send(item string) error {
-	return s.serverCall.Send(item)
+func (s *implInspectorServiceLsStream) SendStream() interface {
+	// Send places the item onto the output stream, blocking if there is no buffer
+	// space available.  If the client has canceled, an error is returned.
+	Send(item string) error
+} {
+	return &s.writer
 }
 
-// InspectorLsDetailsStream is the interface for streaming responses of the method
+// InspectorLsDetailsCall is the interface for call object of the method
 // LsDetails in the service interface Inspector.
-type InspectorLsDetailsStream interface {
+type InspectorLsDetailsCall interface {
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
 
-	// Advance stages an element so the client can retrieve it
-	// with Value.  Advance returns true iff there is an
-	// element to retrieve.  The client must call Advance before
-	// calling Value.  The client must call Cancel if it does
-	// not iterate through all elements (i.e. until Advance
-	// returns false).  Advance may block if an element is not
-	// immediately available.
-	Advance() bool
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() Details
 
-	// Value returns the element that was staged by Advance.
-	// Value may panic if Advance returned false or was not
-	// called at all.  Value does not block.
-	Value() Details
-
-	// Err returns a non-nil error iff the stream encountered
-	// any errors.  Err does not block.
-	Err() error
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
 
 	// Finish blocks until the server is done and returns the positional
 	// return values for call.
@@ -171,7 +203,7 @@
 	// call.
 	//
 	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return a non-EOF error.
+	// has been called or any of the other methods return an error.
 	// Finish should be called at most once.
 	Finish() (err error)
 
@@ -181,56 +213,84 @@
 	Cancel()
 }
 
-// Implementation of the InspectorLsDetailsStream interface that is not exported.
-type implInspectorLsDetailsStream struct {
+type implInspectorLsDetailsStreamIterator struct {
 	clientCall _gen_ipc.Call
 	val        Details
 	err        error
 }
 
-func (c *implInspectorLsDetailsStream) Advance() bool {
+func (c *implInspectorLsDetailsStreamIterator) Advance() bool {
 	c.val = Details{}
 	c.err = c.clientCall.Recv(&c.val)
 	return c.err == nil
 }
 
-func (c *implInspectorLsDetailsStream) Value() Details {
+func (c *implInspectorLsDetailsStreamIterator) Value() Details {
 	return c.val
 }
 
-func (c *implInspectorLsDetailsStream) Err() error {
+func (c *implInspectorLsDetailsStreamIterator) Err() error {
 	if c.err == _gen_io.EOF {
 		return nil
 	}
 	return c.err
 }
 
-func (c *implInspectorLsDetailsStream) Finish() (err error) {
+// Implementation of the InspectorLsDetailsCall interface that is not exported.
+type implInspectorLsDetailsCall struct {
+	clientCall _gen_ipc.Call
+	readStream implInspectorLsDetailsStreamIterator
+}
+
+func (c *implInspectorLsDetailsCall) RecvStream() interface {
+	Advance() bool
+	Value() Details
+	Err() error
+} {
+	return &c.readStream
+}
+
+func (c *implInspectorLsDetailsCall) Finish() (err error) {
 	if ierr := c.clientCall.Finish(&err); ierr != nil {
 		err = ierr
 	}
 	return
 }
 
-func (c *implInspectorLsDetailsStream) Cancel() {
+func (c *implInspectorLsDetailsCall) Cancel() {
 	c.clientCall.Cancel()
 }
 
+type implInspectorServiceLsDetailsStreamSender struct {
+	serverCall _gen_ipc.ServerCall
+}
+
+func (s *implInspectorServiceLsDetailsStreamSender) Send(item Details) error {
+	return s.serverCall.Send(item)
+}
+
 // InspectorServiceLsDetailsStream is the interface for streaming responses of the method
 // LsDetails in the service interface Inspector.
 type InspectorServiceLsDetailsStream interface {
-	// Send places the item onto the output stream, blocking if there is no buffer
-	// space available.  If the client has canceled, an error is returned.
-	Send(item Details) error
+	// SendStream returns the send portion of the stream.
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no buffer
+		// space available.  If the client has canceled, an error is returned.
+		Send(item Details) error
+	}
 }
 
 // Implementation of the InspectorServiceLsDetailsStream interface that is not exported.
 type implInspectorServiceLsDetailsStream struct {
-	serverCall _gen_ipc.ServerCall
+	writer implInspectorServiceLsDetailsStreamSender
 }
 
-func (s *implInspectorServiceLsDetailsStream) Send(item Details) error {
-	return s.serverCall.Send(item)
+func (s *implInspectorServiceLsDetailsStream) SendStream() interface {
+	// Send places the item onto the output stream, blocking if there is no buffer
+	// space available.  If the client has canceled, an error is returned.
+	Send(item Details) error
+} {
+	return &s.writer
 }
 
 // BindInspector returns the client stub implementing the Inspector
@@ -274,21 +334,21 @@
 	name   string
 }
 
-func (__gen_c *clientStubInspector) Ls(ctx _gen_context.T, Glob string, opts ..._gen_ipc.CallOpt) (reply InspectorLsStream, err error) {
+func (__gen_c *clientStubInspector) Ls(ctx _gen_context.T, Glob string, opts ..._gen_ipc.CallOpt) (reply InspectorLsCall, err error) {
 	var call _gen_ipc.Call
 	if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "Ls", []interface{}{Glob}, opts...); err != nil {
 		return
 	}
-	reply = &implInspectorLsStream{clientCall: call}
+	reply = &implInspectorLsCall{clientCall: call, readStream: implInspectorLsStreamIterator{clientCall: call}}
 	return
 }
 
-func (__gen_c *clientStubInspector) LsDetails(ctx _gen_context.T, Glob string, opts ..._gen_ipc.CallOpt) (reply InspectorLsDetailsStream, err error) {
+func (__gen_c *clientStubInspector) LsDetails(ctx _gen_context.T, Glob string, opts ..._gen_ipc.CallOpt) (reply InspectorLsDetailsCall, err error) {
 	var call _gen_ipc.Call
 	if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "LsDetails", []interface{}{Glob}, opts...); err != nil {
 		return
 	}
-	reply = &implInspectorLsDetailsStream{clientCall: call}
+	reply = &implInspectorLsDetailsCall{clientCall: call, readStream: implInspectorLsDetailsStreamIterator{clientCall: call}}
 	return
 }
 
@@ -404,13 +464,13 @@
 }
 
 func (__gen_s *ServerStubInspector) Ls(call _gen_ipc.ServerCall, Glob string) (err error) {
-	stream := &implInspectorServiceLsStream{serverCall: call}
+	stream := &implInspectorServiceLsStream{writer: implInspectorServiceLsStreamSender{serverCall: call}}
 	err = __gen_s.service.Ls(call, Glob, stream)
 	return
 }
 
 func (__gen_s *ServerStubInspector) LsDetails(call _gen_ipc.ServerCall, Glob string) (err error) {
-	stream := &implInspectorServiceLsDetailsStream{serverCall: call}
+	stream := &implInspectorServiceLsDetailsStream{writer: implInspectorServiceLsDetailsStreamSender{serverCall: call}}
 	err = __gen_s.service.LsDetails(call, Glob, stream)
 	return
 }
diff --git a/examples/inspector/inspector/main.go b/examples/inspector/inspector/main.go
index e542c8d..2d1de9e 100644
--- a/examples/inspector/inspector/main.go
+++ b/examples/inspector/inspector/main.go
@@ -127,13 +127,14 @@
 }
 
 // streamNames and streamDetails are idiomatic for use with stubs
-func streamNames(stream inspector.InspectorLsStream) {
-	for stream.Advance() {
-		name := stream.Value()
+func streamNames(stream inspector.InspectorLsCall) {
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
+		name := rStream.Value()
 		fmt.Printf("%s\n", name)
 	}
 
-	if err := stream.Err(); err != nil {
+	if err := rStream.Err(); err != nil {
 		vlog.Fatalf("unexpected streaming error: %q", err)
 	}
 	if err := stream.Finish(); err != nil && err != io.EOF {
@@ -141,15 +142,16 @@
 	}
 }
 
-func streamDetails(stream inspector.InspectorLsDetailsStream) {
-	for stream.Advance() {
-		details := stream.Value()
+func streamDetails(stream inspector.InspectorLsDetailsCall) {
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
+		details := rStream.Value()
 		mode := os.FileMode(details.Mode)
 		modtime := time.Unix(details.ModUnixSecs, int64(details.ModNano))
 		fmt.Printf("%s: %d %s %s%s\n", details.Name, details.Size, mode, modtime, map[bool]string{false: "", true: "/"}[details.IsDir])
 	}
 
-	if err := stream.Err(); err != nil {
+	if err := rStream.Err(); err != nil {
 		vlog.Fatalf("unexpected streaming error: %q", err)
 	}
 
diff --git a/examples/inspector/inspectord/services.go b/examples/inspector/inspectord/services.go
index 0a936bc..1a4d8b2 100644
--- a/examples/inspector/inspectord/services.go
+++ b/examples/inspector/inspectord/services.go
@@ -76,9 +76,9 @@
 
 func (s *stubbedServer) send(fi os.FileInfo, details bool) error {
 	if !details {
-		return s.names.Send(fi.Name())
+		return s.names.SendStream().Send(fi.Name())
 	}
-	return s.details.Send(inspector.Details{
+	return s.details.SendStream().Send(inspector.Details{
 		Name:        fi.Name(),
 		Size:        fi.Size(),
 		Mode:        uint32(fi.Mode()),
diff --git a/examples/pipetobrowser/p2b.vdl.go b/examples/pipetobrowser/p2b.vdl.go
index 9d35841..2232063 100644
--- a/examples/pipetobrowser/p2b.vdl.go
+++ b/examples/pipetobrowser/p2b.vdl.go
@@ -24,7 +24,7 @@
 // to enable embedding without method collisions.  Not to be used directly by clients.
 type Viewer_ExcludingUniversal interface {
 	// Pipe creates a bidirectional pipe between client and viewer service, returns total number of bytes received by the service after streaming ends
-	Pipe(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply ViewerPipeStream, err error)
+	Pipe(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply ViewerPipeCall, err error)
 }
 type Viewer interface {
 	_gen_ipc.UniversalServiceMethods
@@ -38,36 +38,38 @@
 	Pipe(context _gen_ipc.ServerContext, stream ViewerServicePipeStream) (reply _gen_vdlutil.Any, err error)
 }
 
-// ViewerPipeStream is the interface for streaming responses of the method
+// ViewerPipeCall is the interface for call object of the method
 // Pipe in the service interface Viewer.
-type ViewerPipeStream interface {
+type ViewerPipeCall interface {
 
-	// Send places the item onto the output stream, blocking if there is no
-	// buffer space available.  Calls to Send after having called CloseSend
-	// or Cancel will fail.  Any blocked Send calls will be unblocked upon
-	// calling Cancel.
-	Send(item []byte) error
+	// SendStream returns the send portion of the stream
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no
+		// buffer space available.  Calls to Send after having called Close
+		// or Cancel will fail.  Any blocked Send calls will be unblocked upon
+		// calling Cancel.
+		Send(item []byte) error
 
-	// CloseSend indicates to the server that no more items will be sent;
-	// server Recv calls will receive io.EOF after all sent items.  This is
-	// an optional call - it's used by streaming clients that need the
-	// server to receive the io.EOF terminator before the client calls
-	// Finish (for example, if the client needs to continue receiving items
-	// from the server after having finished sending).
-	// Calls to CloseSend after having called Cancel will fail.
-	// Like Send, CloseSend blocks when there's no buffer space available.
-	CloseSend() error
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.  This is
+		// an optional call - it's used by streaming clients that need the
+		// server to receive the io.EOF terminator before the client calls
+		// Finish (for example, if the client needs to continue receiving items
+		// from the server after having finished sending).
+		// Calls to Close after having called Cancel will fail.
+		// Like Send, Close blocks when there's no buffer space available.
+		Close() error
+	}
 
-	// Finish performs the equivalent of CloseSend, then blocks until the server
+	// Finish performs the equivalent of SendStream().Close, then blocks until the server
 	// is done, and returns the positional return values for call.
-	//
 	// If Cancel has been called, Finish will return immediately; the output of
 	// Finish could either be an error signalling cancelation, or the correct
 	// positional return values from the server depending on the timing of the
 	// call.
 	//
 	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return a non-EOF error.
+	// has been called or any of the other methods return an error.
 	// Finish should be called at most once.
 	Finish() (reply _gen_vdlutil.Any, err error)
 
@@ -77,34 +79,95 @@
 	Cancel()
 }
 
-// Implementation of the ViewerPipeStream interface that is not exported.
-type implViewerPipeStream struct {
+type implViewerPipeStreamSender struct {
 	clientCall _gen_ipc.Call
 }
 
-func (c *implViewerPipeStream) Send(item []byte) error {
+func (c *implViewerPipeStreamSender) Send(item []byte) error {
 	return c.clientCall.Send(item)
 }
 
-func (c *implViewerPipeStream) CloseSend() error {
+func (c *implViewerPipeStreamSender) Close() error {
 	return c.clientCall.CloseSend()
 }
 
-func (c *implViewerPipeStream) Finish() (reply _gen_vdlutil.Any, err error) {
+// Implementation of the ViewerPipeCall interface that is not exported.
+type implViewerPipeCall struct {
+	clientCall  _gen_ipc.Call
+	writeStream implViewerPipeStreamSender
+}
+
+func (c *implViewerPipeCall) SendStream() interface {
+	Send(item []byte) error
+	Close() error
+} {
+	return &c.writeStream
+}
+
+func (c *implViewerPipeCall) Finish() (reply _gen_vdlutil.Any, err error) {
 	if ierr := c.clientCall.Finish(&reply, &err); ierr != nil {
 		err = ierr
 	}
 	return
 }
 
-func (c *implViewerPipeStream) Cancel() {
+func (c *implViewerPipeCall) Cancel() {
 	c.clientCall.Cancel()
 }
 
+type implViewerServicePipeStreamIterator struct {
+	serverCall _gen_ipc.ServerCall
+	val        []byte
+	err        error
+}
+
+func (s *implViewerServicePipeStreamIterator) Advance() bool {
+	s.err = s.serverCall.Recv(&s.val)
+	return s.err == nil
+}
+
+func (s *implViewerServicePipeStreamIterator) Value() []byte {
+	return s.val
+}
+
+func (s *implViewerServicePipeStreamIterator) Err() error {
+	if s.err == _gen_io.EOF {
+		return nil
+	}
+	return s.err
+}
+
 // ViewerServicePipeStream is the interface for streaming responses of the method
 // Pipe in the service interface Viewer.
 type ViewerServicePipeStream interface {
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
 
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() []byte
+
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
+}
+
+// Implementation of the ViewerServicePipeStream interface that is not exported.
+type implViewerServicePipeStream struct {
+	reader implViewerServicePipeStreamIterator
+}
+
+func (s *implViewerServicePipeStream) RecvStream() interface {
 	// Advance stages an element so the client can retrieve it
 	// with Value.  Advance returns true iff there is an
 	// element to retrieve.  The client must call Advance before
@@ -117,39 +180,13 @@
 	// Value returns the element that was staged by Advance.
 	// Value may panic if Advance returned false or was not
 	// called at all.  Value does not block.
-	//
-	// In general, Value is undefined if the underlying collection
-	// of elements changes while iteration is in progress.  If
-	// <DataProvider> supports concurrent modification, it should
-	// document its behavior.
 	Value() []byte
 
 	// Err returns a non-nil error iff the stream encountered
 	// any errors.  Err does not block.
 	Err() error
-}
-
-// Implementation of the ViewerServicePipeStream interface that is not exported.
-type implViewerServicePipeStream struct {
-	serverCall _gen_ipc.ServerCall
-	val        []byte
-	err        error
-}
-
-func (s *implViewerServicePipeStream) Advance() bool {
-	s.err = s.serverCall.Recv(&s.val)
-	return s.err == nil
-}
-
-func (s *implViewerServicePipeStream) Value() []byte {
-	return s.val
-}
-
-func (s *implViewerServicePipeStream) Err() error {
-	if s.err == _gen_io.EOF {
-		return nil
-	}
-	return s.err
+} {
+	return &s.reader
 }
 
 // BindViewer returns the client stub implementing the Viewer
@@ -193,12 +230,12 @@
 	name   string
 }
 
-func (__gen_c *clientStubViewer) Pipe(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply ViewerPipeStream, err error) {
+func (__gen_c *clientStubViewer) Pipe(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply ViewerPipeCall, err error) {
 	var call _gen_ipc.Call
 	if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "Pipe", nil, opts...); err != nil {
 		return
 	}
-	reply = &implViewerPipeStream{clientCall: call}
+	reply = &implViewerPipeCall{clientCall: call, writeStream: implViewerPipeStreamSender{clientCall: call}}
 	return
 }
 
@@ -290,7 +327,7 @@
 }
 
 func (__gen_s *ServerStubViewer) Pipe(call _gen_ipc.ServerCall) (reply _gen_vdlutil.Any, err error) {
-	stream := &implViewerServicePipeStream{serverCall: call}
+	stream := &implViewerServicePipeStream{reader: implViewerServicePipeStreamIterator{serverCall: call}}
 	reply, err = __gen_s.service.Pipe(call, stream)
 	return
 }
diff --git a/examples/pipetobrowser/p2b/main.go b/examples/pipetobrowser/p2b/main.go
index 1e47acb..3aeff5f 100644
--- a/examples/pipetobrowser/p2b/main.go
+++ b/examples/pipetobrowser/p2b/main.go
@@ -46,9 +46,13 @@
 	fmt.Fprintf(os.Stdout, usage, os.Args[0], os.Args[0])
 }
 
+type sender interface {
+	Send(p []byte) error
+}
+
 // viewerPipeStreamWriter adapts ViewerPipeStream to io.Writer
 type viewerPipeStreamWriter struct {
-	pipetobrowser.ViewerPipeStream
+	sender
 }
 
 func (w viewerPipeStreamWriter) Write(p []byte) (n int, err error) {
@@ -81,9 +85,7 @@
 		return
 	}
 
-	w := viewerPipeStreamWriter{
-		stream,
-	}
+	w := viewerPipeStreamWriter{stream.SendStream()}
 
 	_, err = io.Copy(w, os.Stdin)
 	if err != nil {
diff --git a/examples/rockpaperscissors/impl/judge.go b/examples/rockpaperscissors/impl/judge.go
index 2729b66..d55d7e2 100644
--- a/examples/rockpaperscissors/impl/judge.go
+++ b/examples/rockpaperscissors/impl/judge.go
@@ -25,11 +25,15 @@
 	gamesRun common.Counter
 }
 
+type sendStream interface {
+	Send(item rps.JudgeAction) error
+}
+
 type gameInfo struct {
 	id         rps.GameID
 	startTime  time.Time
 	score      rps.ScoreCard
-	streams    []rps.JudgeServicePlayStream
+	streams    []sendStream
 	playerChan chan playerInput
 	scoreChan  chan scoreData
 }
@@ -114,8 +118,9 @@
 	done := make(chan struct{}, 1)
 	defer func() { done <- struct{}{} }()
 	go func() {
-		for stream.Advance() {
-			action := stream.Value()
+		rStream := stream.RecvStream()
+		for rStream.Advance() {
+			action := rStream.Value()
 
 			select {
 			case c <- playerInput{player: playerNum, action: action}:
@@ -129,7 +134,7 @@
 		}
 	}()
 
-	if err := stream.Send(rps.JudgeAction{PlayerNum: int32(playerNum)}); err != nil {
+	if err := stream.SendStream().Send(rps.JudgeAction{PlayerNum: int32(playerNum)}); err != nil {
 		return nilResult, err
 	}
 	// When the second player connects, we start the game.
@@ -268,7 +273,7 @@
 		return 0, errTooManyPlayers
 	}
 	info.score.Players = append(info.score.Players, name)
-	info.streams = append(info.streams, stream)
+	info.streams = append(info.streams, stream.SendStream())
 	return len(info.streams), nil
 }
 
diff --git a/examples/rockpaperscissors/impl/player.go b/examples/rockpaperscissors/impl/player.go
index 258ebf4..7162042 100644
--- a/examples/rockpaperscissors/impl/player.go
+++ b/examples/rockpaperscissors/impl/player.go
@@ -116,8 +116,10 @@
 	if err != nil {
 		return rps.PlayResult{}, err
 	}
-	for game.Advance() {
-		in := game.Value()
+	rStream := game.RecvStream()
+	sender := game.SendStream()
+	for rStream.Advance() {
+		in := rStream.Value()
 
 		if in.PlayerNum > 0 {
 			vlog.VI(1).Infof("I'm player %d", in.PlayerNum)
@@ -128,7 +130,7 @@
 		if len(in.MoveOptions) > 0 {
 			n := rand.Intn(len(in.MoveOptions))
 			vlog.VI(1).Infof("My turn to play. Picked %q from %v", in.MoveOptions[n], in.MoveOptions)
-			if err := game.Send(rps.PlayerAction{Move: in.MoveOptions[n]}); err != nil {
+			if err := sender.Send(rps.PlayerAction{Move: in.MoveOptions[n]}); err != nil {
 				return rps.PlayResult{}, err
 			}
 		}
@@ -141,7 +143,7 @@
 		}
 	}
 
-	if err := game.Err(); err != nil {
+	if err := rStream.Err(); err != nil {
 		vlog.Infof("stream error: %v", err)
 	} else {
 		vlog.VI(1).Infof("Game Ended")
diff --git a/examples/rockpaperscissors/rpsplayercli/main.go b/examples/rockpaperscissors/rpsplayercli/main.go
index b02ce5a..fa56b89 100644
--- a/examples/rockpaperscissors/rpsplayercli/main.go
+++ b/examples/rockpaperscissors/rpsplayercli/main.go
@@ -200,8 +200,9 @@
 		return rps.PlayResult{}, err
 	}
 	var playerNum int32
-	for game.Advance() {
-		in := game.Value()
+	rStream := game.RecvStream()
+	for rStream.Advance() {
+		in := rStream.Value()
 		if in.PlayerNum > 0 {
 			playerNum = in.PlayerNum
 			fmt.Printf("You are player %d\n", in.PlayerNum)
@@ -230,7 +231,7 @@
 			fmt.Println()
 			fmt.Println("Choose your weapon:")
 			m := selectOne(in.MoveOptions)
-			if err := game.Send(rps.PlayerAction{Move: in.MoveOptions[m]}); err != nil {
+			if err := game.SendStream().Send(rps.PlayerAction{Move: in.MoveOptions[m]}); err != nil {
 				return rps.PlayResult{}, err
 			}
 		}
@@ -246,7 +247,7 @@
 			}
 		}
 	}
-	if err := game.Err(); err == nil {
+	if err := rStream.Err(); err == nil {
 		fmt.Println("Game Ended")
 	} else {
 		vlog.Infof("stream error: %v", err)
diff --git a/examples/rockpaperscissors/service.vdl.go b/examples/rockpaperscissors/service.vdl.go
index 1e9979c..767f9d2 100644
--- a/examples/rockpaperscissors/service.vdl.go
+++ b/examples/rockpaperscissors/service.vdl.go
@@ -92,7 +92,7 @@
 	// identifier that can be used by the players to join the game.
 	CreateGame(ctx _gen_context.T, Opts GameOptions, opts ..._gen_ipc.CallOpt) (reply GameID, err error)
 	// Play lets a player join an existing game and play.
-	Play(ctx _gen_context.T, ID GameID, opts ..._gen_ipc.CallOpt) (reply JudgePlayStream, err error)
+	Play(ctx _gen_context.T, ID GameID, opts ..._gen_ipc.CallOpt) (reply JudgePlayCall, err error)
 }
 type Judge interface {
 	_gen_ipc.UniversalServiceMethods
@@ -109,54 +109,58 @@
 	Play(context _gen_ipc.ServerContext, ID GameID, stream JudgeServicePlayStream) (reply PlayResult, err error)
 }
 
-// JudgePlayStream is the interface for streaming responses of the method
+// JudgePlayCall is the interface for call object of the method
 // Play in the service interface Judge.
-type JudgePlayStream interface {
+type JudgePlayCall interface {
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
 
-	// Send places the item onto the output stream, blocking if there is no
-	// buffer space available.  Calls to Send after having called CloseSend
-	// or Cancel will fail.  Any blocked Send calls will be unblocked upon
-	// calling Cancel.
-	Send(item PlayerAction) error
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() JudgeAction
 
-	// CloseSend indicates to the server that no more items will be sent;
-	// server Recv calls will receive io.EOF after all sent items.  This is
-	// an optional call - it's used by streaming clients that need the
-	// server to receive the io.EOF terminator before the client calls
-	// Finish (for example, if the client needs to continue receiving items
-	// from the server after having finished sending).
-	// Calls to CloseSend after having called Cancel will fail.
-	// Like Send, CloseSend blocks when there's no buffer space available.
-	CloseSend() error
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
 
-	// Advance stages an element so the client can retrieve it
-	// with Value.  Advance returns true iff there is an
-	// element to retrieve.  The client must call Advance before
-	// calling Value.  The client must call Cancel if it does
-	// not iterate through all elements (i.e. until Advance
-	// returns false).  Advance may block if an element is not
-	// immediately available.
-	Advance() bool
+	// SendStream returns the send portion of the stream
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no
+		// buffer space available.  Calls to Send after having called Close
+		// or Cancel will fail.  Any blocked Send calls will be unblocked upon
+		// calling Cancel.
+		Send(item PlayerAction) error
 
-	// Value returns the element that was staged by Advance.
-	// Value may panic if Advance returned false or was not
-	// called at all.  Value does not block.
-	Value() JudgeAction
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.  This is
+		// an optional call - it's used by streaming clients that need the
+		// server to receive the io.EOF terminator before the client calls
+		// Finish (for example, if the client needs to continue receiving items
+		// from the server after having finished sending).
+		// Calls to Close after having called Cancel will fail.
+		// Like Send, Close blocks when there's no buffer space available.
+		Close() error
+	}
 
-	// Err returns a non-nil error iff the stream encountered
-	// any errors.  Err does not block.
-	Err() error
-
-	// Finish performs the equivalent of CloseSend, then blocks until the server
+	// Finish performs the equivalent of SendStream().Close, then blocks until the server
 	// is done, and returns the positional return values for call.
-	//
 	// If Cancel has been called, Finish will return immediately; the output of
 	// Finish could either be an error signalling cancelation, or the correct
 	// positional return values from the server depending on the timing of the
 	// call.
 	//
 	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return a non-EOF error.
+	// has been called or any of the other methods return an error.
 	// Finish should be called at most once.
 	Finish() (reply PlayResult, err error)
 
@@ -166,56 +170,150 @@
 	Cancel()
 }
 
-// Implementation of the JudgePlayStream interface that is not exported.
-type implJudgePlayStream struct {
+type implJudgePlayStreamSender struct {
+	clientCall _gen_ipc.Call
+}
+
+func (c *implJudgePlayStreamSender) Send(item PlayerAction) error {
+	return c.clientCall.Send(item)
+}
+
+func (c *implJudgePlayStreamSender) Close() error {
+	return c.clientCall.CloseSend()
+}
+
+type implJudgePlayStreamIterator struct {
 	clientCall _gen_ipc.Call
 	val        JudgeAction
 	err        error
 }
 
-func (c *implJudgePlayStream) Send(item PlayerAction) error {
-	return c.clientCall.Send(item)
-}
-
-func (c *implJudgePlayStream) CloseSend() error {
-	return c.clientCall.CloseSend()
-}
-
-func (c *implJudgePlayStream) Advance() bool {
+func (c *implJudgePlayStreamIterator) Advance() bool {
 	c.val = JudgeAction{}
 	c.err = c.clientCall.Recv(&c.val)
 	return c.err == nil
 }
 
-func (c *implJudgePlayStream) Value() JudgeAction {
+func (c *implJudgePlayStreamIterator) Value() JudgeAction {
 	return c.val
 }
 
-func (c *implJudgePlayStream) Err() error {
+func (c *implJudgePlayStreamIterator) Err() error {
 	if c.err == _gen_io.EOF {
 		return nil
 	}
 	return c.err
 }
 
-func (c *implJudgePlayStream) Finish() (reply PlayResult, err error) {
+// Implementation of the JudgePlayCall interface that is not exported.
+type implJudgePlayCall struct {
+	clientCall  _gen_ipc.Call
+	writeStream implJudgePlayStreamSender
+	readStream  implJudgePlayStreamIterator
+}
+
+func (c *implJudgePlayCall) SendStream() interface {
+	Send(item PlayerAction) error
+	Close() error
+} {
+	return &c.writeStream
+}
+
+func (c *implJudgePlayCall) RecvStream() interface {
+	Advance() bool
+	Value() JudgeAction
+	Err() error
+} {
+	return &c.readStream
+}
+
+func (c *implJudgePlayCall) Finish() (reply PlayResult, err error) {
 	if ierr := c.clientCall.Finish(&reply, &err); ierr != nil {
 		err = ierr
 	}
 	return
 }
 
-func (c *implJudgePlayStream) Cancel() {
+func (c *implJudgePlayCall) Cancel() {
 	c.clientCall.Cancel()
 }
 
+type implJudgeServicePlayStreamSender struct {
+	serverCall _gen_ipc.ServerCall
+}
+
+func (s *implJudgeServicePlayStreamSender) Send(item JudgeAction) error {
+	return s.serverCall.Send(item)
+}
+
+type implJudgeServicePlayStreamIterator struct {
+	serverCall _gen_ipc.ServerCall
+	val        PlayerAction
+	err        error
+}
+
+func (s *implJudgeServicePlayStreamIterator) Advance() bool {
+	s.err = s.serverCall.Recv(&s.val)
+	return s.err == nil
+}
+
+func (s *implJudgeServicePlayStreamIterator) Value() PlayerAction {
+	return s.val
+}
+
+func (s *implJudgeServicePlayStreamIterator) Err() error {
+	if s.err == _gen_io.EOF {
+		return nil
+	}
+	return s.err
+}
+
 // JudgeServicePlayStream is the interface for streaming responses of the method
 // Play in the service interface Judge.
 type JudgeServicePlayStream interface {
+	// SendStream returns the send portion of the stream.
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no buffer
+		// space available.  If the client has canceled, an error is returned.
+		Send(item JudgeAction) error
+	}
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
+
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() PlayerAction
+
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
+}
+
+// Implementation of the JudgeServicePlayStream interface that is not exported.
+type implJudgeServicePlayStream struct {
+	writer implJudgeServicePlayStreamSender
+	reader implJudgeServicePlayStreamIterator
+}
+
+func (s *implJudgeServicePlayStream) SendStream() interface {
 	// Send places the item onto the output stream, blocking if there is no buffer
 	// space available.  If the client has canceled, an error is returned.
 	Send(item JudgeAction) error
+} {
+	return &s.writer
+}
 
+func (s *implJudgeServicePlayStream) RecvStream() interface {
 	// Advance stages an element so the client can retrieve it
 	// with Value.  Advance returns true iff there is an
 	// element to retrieve.  The client must call Advance before
@@ -228,44 +326,13 @@
 	// Value returns the element that was staged by Advance.
 	// Value may panic if Advance returned false or was not
 	// called at all.  Value does not block.
-	//
-	// In general, Value is undefined if the underlying collection
-	// of elements changes while iteration is in progress.  If
-	// <DataProvider> supports concurrent modification, it should
-	// document its behavior.
 	Value() PlayerAction
 
 	// Err returns a non-nil error iff the stream encountered
 	// any errors.  Err does not block.
 	Err() error
-}
-
-// Implementation of the JudgeServicePlayStream interface that is not exported.
-type implJudgeServicePlayStream struct {
-	serverCall _gen_ipc.ServerCall
-	val        PlayerAction
-	err        error
-}
-
-func (s *implJudgeServicePlayStream) Send(item JudgeAction) error {
-	return s.serverCall.Send(item)
-}
-
-func (s *implJudgeServicePlayStream) Advance() bool {
-	s.val = PlayerAction{}
-	s.err = s.serverCall.Recv(&s.val)
-	return s.err == nil
-}
-
-func (s *implJudgeServicePlayStream) Value() PlayerAction {
-	return s.val
-}
-
-func (s *implJudgeServicePlayStream) Err() error {
-	if s.err == _gen_io.EOF {
-		return nil
-	}
-	return s.err
+} {
+	return &s.reader
 }
 
 // BindJudge returns the client stub implementing the Judge
@@ -320,12 +387,12 @@
 	return
 }
 
-func (__gen_c *clientStubJudge) Play(ctx _gen_context.T, ID GameID, opts ..._gen_ipc.CallOpt) (reply JudgePlayStream, err error) {
+func (__gen_c *clientStubJudge) Play(ctx _gen_context.T, ID GameID, opts ..._gen_ipc.CallOpt) (reply JudgePlayCall, err error) {
 	var call _gen_ipc.Call
 	if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "Play", []interface{}{ID}, opts...); err != nil {
 		return
 	}
-	reply = &implJudgePlayStream{clientCall: call}
+	reply = &implJudgePlayCall{clientCall: call, writeStream: implJudgePlayStreamSender{clientCall: call}, readStream: implJudgePlayStreamIterator{clientCall: call}}
 	return
 }
 
@@ -487,7 +554,7 @@
 }
 
 func (__gen_s *ServerStubJudge) Play(call _gen_ipc.ServerCall, ID GameID) (reply PlayResult, err error) {
-	stream := &implJudgeServicePlayStream{serverCall: call}
+	stream := &implJudgeServicePlayStream{reader: implJudgeServicePlayStreamIterator{serverCall: call}, writer: implJudgeServicePlayStreamSender{serverCall: call}}
 	reply, err = __gen_s.service.Play(call, ID, stream)
 	return
 }
diff --git a/examples/stfortune/stfortune/main.go b/examples/stfortune/stfortune/main.go
index 500b27d..4b4eff1 100644
--- a/examples/stfortune/stfortune/main.go
+++ b/examples/stfortune/stfortune/main.go
@@ -61,8 +61,8 @@
 		if err != nil {
 			log.Fatalf("WatchGlob %s failed: %v", path, err)
 		}
-		if !stream.Advance() {
-			log.Fatalf("waitForStore, path: %s, Advance failed: %v", path, stream.Err())
+		if !stream.RecvStream().Advance() {
+			log.Fatalf("waitForStore, path: %s, Advance failed: %v", path, stream.RecvStream().Err())
 		}
 		stream.Cancel()
 	}
@@ -93,8 +93,9 @@
 		log.Fatalf("watcher WatchGlob %s failed: %v", path, err)
 	}
 
-	for stream.Advance() {
-		batch := stream.Value()
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
+		batch := rStream.Value()
 
 		for _, change := range batch.Changes {
 			entry, ok := change.Value.(*storage.Entry)
@@ -112,7 +113,7 @@
 			fmt.Printf("watcher: new fortune: %s\n", fortune.Fortune)
 		}
 	}
-	err = stream.Err()
+	err = rStream.Err()
 	if err == nil {
 		err = io.EOF
 	}
@@ -137,8 +138,9 @@
 
 	results := store.BindObject(trPath(path)).Glob(ctx, "*")
 	var names []string
-	for results.Advance() {
-		names = append(names, results.Value())
+	rStream := results.RecvStream()
+	for rStream.Advance() {
+		names = append(names, rStream.Value())
 	}
 	if err := results.Err(); err != nil || len(names) == 0 {
 		return "", err
diff --git a/examples/storage/viewer/value.go b/examples/storage/viewer/value.go
index 279df7a..b82f09e 100644
--- a/examples/storage/viewer/value.go
+++ b/examples/storage/viewer/value.go
@@ -23,10 +23,11 @@
 func glob(st storage.Store, path, pattern string) ([]string, error) {
 	results := st.BindObject(path).Glob(rt.R().TODOContext(), pattern)
 	names := []string{}
-	for results.Advance() {
-		names = append(names, "/"+results.Value())
+	rStream := results.RecvStream()
+	for rStream.Advance() {
+		names = append(names, "/"+rStream.Value())
 	}
-	if err := results.Err(); err != nil {
+	if err := rStream.Err(); err != nil {
 		return nil, err
 	}
 	sort.Strings(names)
diff --git a/examples/tunnel/lib/forward.go b/examples/tunnel/lib/forward.go
index 20343a3..d70c238 100644
--- a/examples/tunnel/lib/forward.go
+++ b/examples/tunnel/lib/forward.go
@@ -17,14 +17,8 @@
 	Err() error
 }
 
-// stream is the interface common to TunnelForwardStream and TunnelServiceForwardStream.
-type stream interface {
-	sender
-	receiver
-}
-
 // Forward forwards data read from net.Conn to a TunnelForwardStream or a TunnelServiceForwardStream.
-func Forward(conn net.Conn, stream stream) error {
+func Forward(conn net.Conn, s sender, r receiver) error {
 	defer conn.Close()
 	// Both conn2stream and stream2conn will write to the channel exactly
 	// once.
@@ -32,8 +26,8 @@
 	// A buffered channel is used to prevent the other write to the channel
 	// from blocking.
 	done := make(chan error, 1)
-	go conn2stream(conn, stream, done)
-	go stream2conn(stream, conn, done)
+	go conn2stream(conn, s, done)
+	go stream2conn(r, conn, done)
 	return <-done
 }
 
diff --git a/examples/tunnel/tunnel.vdl.go b/examples/tunnel/tunnel.vdl.go
index b298887..506c942 100644
--- a/examples/tunnel/tunnel.vdl.go
+++ b/examples/tunnel/tunnel.vdl.go
@@ -50,13 +50,13 @@
 	// the byte stream is forwarded to the requested network address and all the
 	// data received from that network connection is sent back in the reply
 	// stream.
-	Forward(ctx _gen_context.T, network string, address string, opts ..._gen_ipc.CallOpt) (reply TunnelForwardStream, err error)
+	Forward(ctx _gen_context.T, network string, address string, opts ..._gen_ipc.CallOpt) (reply TunnelForwardCall, err error)
 	// The Shell method is used to either run shell commands remotely, or to open
 	// an interactive shell. The data received over the byte stream is sent to the
 	// shell's stdin, and the data received from the shell's stdout and stderr is
 	// sent back in the reply stream. It returns the exit status of the shell
 	// command.
-	Shell(ctx _gen_context.T, command string, shellOpts ShellOpts, opts ..._gen_ipc.CallOpt) (reply TunnelShellStream, err error)
+	Shell(ctx _gen_context.T, command string, shellOpts ShellOpts, opts ..._gen_ipc.CallOpt) (reply TunnelShellCall, err error)
 }
 type Tunnel interface {
 	_gen_ipc.UniversalServiceMethods
@@ -79,54 +79,58 @@
 	Shell(context _gen_ipc.ServerContext, command string, shellOpts ShellOpts, stream TunnelServiceShellStream) (reply int32, err error)
 }
 
-// TunnelForwardStream is the interface for streaming responses of the method
+// TunnelForwardCall is the interface for call object of the method
 // Forward in the service interface Tunnel.
-type TunnelForwardStream interface {
+type TunnelForwardCall interface {
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
 
-	// Send places the item onto the output stream, blocking if there is no
-	// buffer space available.  Calls to Send after having called CloseSend
-	// or Cancel will fail.  Any blocked Send calls will be unblocked upon
-	// calling Cancel.
-	Send(item []byte) error
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() []byte
 
-	// CloseSend indicates to the server that no more items will be sent;
-	// server Recv calls will receive io.EOF after all sent items.  This is
-	// an optional call - it's used by streaming clients that need the
-	// server to receive the io.EOF terminator before the client calls
-	// Finish (for example, if the client needs to continue receiving items
-	// from the server after having finished sending).
-	// Calls to CloseSend after having called Cancel will fail.
-	// Like Send, CloseSend blocks when there's no buffer space available.
-	CloseSend() error
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
 
-	// Advance stages an element so the client can retrieve it
-	// with Value.  Advance returns true iff there is an
-	// element to retrieve.  The client must call Advance before
-	// calling Value.  The client must call Cancel if it does
-	// not iterate through all elements (i.e. until Advance
-	// returns false).  Advance may block if an element is not
-	// immediately available.
-	Advance() bool
+	// SendStream returns the send portion of the stream
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no
+		// buffer space available.  Calls to Send after having called Close
+		// or Cancel will fail.  Any blocked Send calls will be unblocked upon
+		// calling Cancel.
+		Send(item []byte) error
 
-	// Value returns the element that was staged by Advance.
-	// Value may panic if Advance returned false or was not
-	// called at all.  Value does not block.
-	Value() []byte
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.  This is
+		// an optional call - it's used by streaming clients that need the
+		// server to receive the io.EOF terminator before the client calls
+		// Finish (for example, if the client needs to continue receiving items
+		// from the server after having finished sending).
+		// Calls to Close after having called Cancel will fail.
+		// Like Send, Close blocks when there's no buffer space available.
+		Close() error
+	}
 
-	// Err returns a non-nil error iff the stream encountered
-	// any errors.  Err does not block.
-	Err() error
-
-	// Finish performs the equivalent of CloseSend, then blocks until the server
+	// Finish performs the equivalent of SendStream().Close, then blocks until the server
 	// is done, and returns the positional return values for call.
-	//
 	// If Cancel has been called, Finish will return immediately; the output of
 	// Finish could either be an error signalling cancelation, or the correct
 	// positional return values from the server depending on the timing of the
 	// call.
 	//
 	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return a non-EOF error.
+	// has been called or any of the other methods return an error.
 	// Finish should be called at most once.
 	Finish() (err error)
 
@@ -136,126 +140,149 @@
 	Cancel()
 }
 
-// Implementation of the TunnelForwardStream interface that is not exported.
-type implTunnelForwardStream struct {
+type implTunnelForwardStreamSender struct {
+	clientCall _gen_ipc.Call
+}
+
+func (c *implTunnelForwardStreamSender) Send(item []byte) error {
+	return c.clientCall.Send(item)
+}
+
+func (c *implTunnelForwardStreamSender) Close() error {
+	return c.clientCall.CloseSend()
+}
+
+type implTunnelForwardStreamIterator struct {
 	clientCall _gen_ipc.Call
 	val        []byte
 	err        error
 }
 
-func (c *implTunnelForwardStream) Send(item []byte) error {
-	return c.clientCall.Send(item)
-}
-
-func (c *implTunnelForwardStream) CloseSend() error {
-	return c.clientCall.CloseSend()
-}
-
-func (c *implTunnelForwardStream) Advance() bool {
+func (c *implTunnelForwardStreamIterator) Advance() bool {
 	c.err = c.clientCall.Recv(&c.val)
 	return c.err == nil
 }
 
-func (c *implTunnelForwardStream) Value() []byte {
+func (c *implTunnelForwardStreamIterator) Value() []byte {
 	return c.val
 }
 
-func (c *implTunnelForwardStream) Err() error {
+func (c *implTunnelForwardStreamIterator) Err() error {
 	if c.err == _gen_io.EOF {
 		return nil
 	}
 	return c.err
 }
 
-func (c *implTunnelForwardStream) Finish() (err error) {
+// Implementation of the TunnelForwardCall interface that is not exported.
+type implTunnelForwardCall struct {
+	clientCall  _gen_ipc.Call
+	writeStream implTunnelForwardStreamSender
+	readStream  implTunnelForwardStreamIterator
+}
+
+func (c *implTunnelForwardCall) SendStream() interface {
+	Send(item []byte) error
+	Close() error
+} {
+	return &c.writeStream
+}
+
+func (c *implTunnelForwardCall) RecvStream() interface {
+	Advance() bool
+	Value() []byte
+	Err() error
+} {
+	return &c.readStream
+}
+
+func (c *implTunnelForwardCall) Finish() (err error) {
 	if ierr := c.clientCall.Finish(&err); ierr != nil {
 		err = ierr
 	}
 	return
 }
 
-func (c *implTunnelForwardStream) Cancel() {
+func (c *implTunnelForwardCall) Cancel() {
 	c.clientCall.Cancel()
 }
 
-// TunnelServiceForwardStream is the interface for streaming responses of the method
-// Forward in the service interface Tunnel.
-type TunnelServiceForwardStream interface {
-	// Send places the item onto the output stream, blocking if there is no buffer
-	// space available.  If the client has canceled, an error is returned.
-	Send(item []byte) error
-
-	// Advance stages an element so the client can retrieve it
-	// with Value.  Advance returns true iff there is an
-	// element to retrieve.  The client must call Advance before
-	// calling Value.  The client must call Cancel if it does
-	// not iterate through all elements (i.e. until Advance
-	// returns false).  Advance may block if an element is not
-	// immediately available.
-	Advance() bool
-
-	// Value returns the element that was staged by Advance.
-	// Value may panic if Advance returned false or was not
-	// called at all.  Value does not block.
-	//
-	// In general, Value is undefined if the underlying collection
-	// of elements changes while iteration is in progress.  If
-	// <DataProvider> supports concurrent modification, it should
-	// document its behavior.
-	Value() []byte
-
-	// Err returns a non-nil error iff the stream encountered
-	// any errors.  Err does not block.
-	Err() error
+type implTunnelServiceForwardStreamSender struct {
+	serverCall _gen_ipc.ServerCall
 }
 
-// Implementation of the TunnelServiceForwardStream interface that is not exported.
-type implTunnelServiceForwardStream struct {
+func (s *implTunnelServiceForwardStreamSender) Send(item []byte) error {
+	return s.serverCall.Send(item)
+}
+
+type implTunnelServiceForwardStreamIterator struct {
 	serverCall _gen_ipc.ServerCall
 	val        []byte
 	err        error
 }
 
-func (s *implTunnelServiceForwardStream) Send(item []byte) error {
-	return s.serverCall.Send(item)
-}
-
-func (s *implTunnelServiceForwardStream) Advance() bool {
+func (s *implTunnelServiceForwardStreamIterator) Advance() bool {
 	s.err = s.serverCall.Recv(&s.val)
 	return s.err == nil
 }
 
-func (s *implTunnelServiceForwardStream) Value() []byte {
+func (s *implTunnelServiceForwardStreamIterator) Value() []byte {
 	return s.val
 }
 
-func (s *implTunnelServiceForwardStream) Err() error {
+func (s *implTunnelServiceForwardStreamIterator) Err() error {
 	if s.err == _gen_io.EOF {
 		return nil
 	}
 	return s.err
 }
 
-// TunnelShellStream is the interface for streaming responses of the method
-// Shell in the service interface Tunnel.
-type TunnelShellStream interface {
+// TunnelServiceForwardStream is the interface for streaming responses of the method
+// Forward in the service interface Tunnel.
+type TunnelServiceForwardStream interface {
+	// SendStream returns the send portion of the stream.
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no buffer
+		// space available.  If the client has canceled, an error is returned.
+		Send(item []byte) error
+	}
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
 
-	// Send places the item onto the output stream, blocking if there is no
-	// buffer space available.  Calls to Send after having called CloseSend
-	// or Cancel will fail.  Any blocked Send calls will be unblocked upon
-	// calling Cancel.
-	Send(item ClientShellPacket) error
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() []byte
 
-	// CloseSend indicates to the server that no more items will be sent;
-	// server Recv calls will receive io.EOF after all sent items.  This is
-	// an optional call - it's used by streaming clients that need the
-	// server to receive the io.EOF terminator before the client calls
-	// Finish (for example, if the client needs to continue receiving items
-	// from the server after having finished sending).
-	// Calls to CloseSend after having called Cancel will fail.
-	// Like Send, CloseSend blocks when there's no buffer space available.
-	CloseSend() error
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
+}
 
+// Implementation of the TunnelServiceForwardStream interface that is not exported.
+type implTunnelServiceForwardStream struct {
+	writer implTunnelServiceForwardStreamSender
+	reader implTunnelServiceForwardStreamIterator
+}
+
+func (s *implTunnelServiceForwardStream) SendStream() interface {
+	// Send places the item onto the output stream, blocking if there is no buffer
+	// space available.  If the client has canceled, an error is returned.
+	Send(item []byte) error
+} {
+	return &s.writer
+}
+
+func (s *implTunnelServiceForwardStream) RecvStream() interface {
 	// Advance stages an element so the client can retrieve it
 	// with Value.  Advance returns true iff there is an
 	// element to retrieve.  The client must call Advance before
@@ -268,22 +295,67 @@
 	// Value returns the element that was staged by Advance.
 	// Value may panic if Advance returned false or was not
 	// called at all.  Value does not block.
-	Value() ServerShellPacket
+	Value() []byte
 
 	// Err returns a non-nil error iff the stream encountered
 	// any errors.  Err does not block.
 	Err() error
+} {
+	return &s.reader
+}
 
-	// Finish performs the equivalent of CloseSend, then blocks until the server
+// TunnelShellCall is the interface for call object of the method
+// Shell in the service interface Tunnel.
+type TunnelShellCall interface {
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
+
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() ServerShellPacket
+
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
+
+	// SendStream returns the send portion of the stream
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no
+		// buffer space available.  Calls to Send after having called Close
+		// or Cancel will fail.  Any blocked Send calls will be unblocked upon
+		// calling Cancel.
+		Send(item ClientShellPacket) error
+
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.  This is
+		// an optional call - it's used by streaming clients that need the
+		// server to receive the io.EOF terminator before the client calls
+		// Finish (for example, if the client needs to continue receiving items
+		// from the server after having finished sending).
+		// Calls to Close after having called Cancel will fail.
+		// Like Send, Close blocks when there's no buffer space available.
+		Close() error
+	}
+
+	// Finish performs the equivalent of SendStream().Close, then blocks until the server
 	// is done, and returns the positional return values for call.
-	//
 	// If Cancel has been called, Finish will return immediately; the output of
 	// Finish could either be an error signalling cancelation, or the correct
 	// positional return values from the server depending on the timing of the
 	// call.
 	//
 	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return a non-EOF error.
+	// has been called or any of the other methods return an error.
 	// Finish should be called at most once.
 	Finish() (reply int32, err error)
 
@@ -293,56 +365,150 @@
 	Cancel()
 }
 
-// Implementation of the TunnelShellStream interface that is not exported.
-type implTunnelShellStream struct {
+type implTunnelShellStreamSender struct {
+	clientCall _gen_ipc.Call
+}
+
+func (c *implTunnelShellStreamSender) Send(item ClientShellPacket) error {
+	return c.clientCall.Send(item)
+}
+
+func (c *implTunnelShellStreamSender) Close() error {
+	return c.clientCall.CloseSend()
+}
+
+type implTunnelShellStreamIterator struct {
 	clientCall _gen_ipc.Call
 	val        ServerShellPacket
 	err        error
 }
 
-func (c *implTunnelShellStream) Send(item ClientShellPacket) error {
-	return c.clientCall.Send(item)
-}
-
-func (c *implTunnelShellStream) CloseSend() error {
-	return c.clientCall.CloseSend()
-}
-
-func (c *implTunnelShellStream) Advance() bool {
+func (c *implTunnelShellStreamIterator) Advance() bool {
 	c.val = ServerShellPacket{}
 	c.err = c.clientCall.Recv(&c.val)
 	return c.err == nil
 }
 
-func (c *implTunnelShellStream) Value() ServerShellPacket {
+func (c *implTunnelShellStreamIterator) Value() ServerShellPacket {
 	return c.val
 }
 
-func (c *implTunnelShellStream) Err() error {
+func (c *implTunnelShellStreamIterator) Err() error {
 	if c.err == _gen_io.EOF {
 		return nil
 	}
 	return c.err
 }
 
-func (c *implTunnelShellStream) Finish() (reply int32, err error) {
+// Implementation of the TunnelShellCall interface that is not exported.
+type implTunnelShellCall struct {
+	clientCall  _gen_ipc.Call
+	writeStream implTunnelShellStreamSender
+	readStream  implTunnelShellStreamIterator
+}
+
+func (c *implTunnelShellCall) SendStream() interface {
+	Send(item ClientShellPacket) error
+	Close() error
+} {
+	return &c.writeStream
+}
+
+func (c *implTunnelShellCall) RecvStream() interface {
+	Advance() bool
+	Value() ServerShellPacket
+	Err() error
+} {
+	return &c.readStream
+}
+
+func (c *implTunnelShellCall) Finish() (reply int32, err error) {
 	if ierr := c.clientCall.Finish(&reply, &err); ierr != nil {
 		err = ierr
 	}
 	return
 }
 
-func (c *implTunnelShellStream) Cancel() {
+func (c *implTunnelShellCall) Cancel() {
 	c.clientCall.Cancel()
 }
 
+type implTunnelServiceShellStreamSender struct {
+	serverCall _gen_ipc.ServerCall
+}
+
+func (s *implTunnelServiceShellStreamSender) Send(item ServerShellPacket) error {
+	return s.serverCall.Send(item)
+}
+
+type implTunnelServiceShellStreamIterator struct {
+	serverCall _gen_ipc.ServerCall
+	val        ClientShellPacket
+	err        error
+}
+
+func (s *implTunnelServiceShellStreamIterator) Advance() bool {
+	s.err = s.serverCall.Recv(&s.val)
+	return s.err == nil
+}
+
+func (s *implTunnelServiceShellStreamIterator) Value() ClientShellPacket {
+	return s.val
+}
+
+func (s *implTunnelServiceShellStreamIterator) Err() error {
+	if s.err == _gen_io.EOF {
+		return nil
+	}
+	return s.err
+}
+
 // TunnelServiceShellStream is the interface for streaming responses of the method
 // Shell in the service interface Tunnel.
 type TunnelServiceShellStream interface {
+	// SendStream returns the send portion of the stream.
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no buffer
+		// space available.  If the client has canceled, an error is returned.
+		Send(item ServerShellPacket) error
+	}
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
+
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() ClientShellPacket
+
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
+}
+
+// Implementation of the TunnelServiceShellStream interface that is not exported.
+type implTunnelServiceShellStream struct {
+	writer implTunnelServiceShellStreamSender
+	reader implTunnelServiceShellStreamIterator
+}
+
+func (s *implTunnelServiceShellStream) SendStream() interface {
 	// Send places the item onto the output stream, blocking if there is no buffer
 	// space available.  If the client has canceled, an error is returned.
 	Send(item ServerShellPacket) error
+} {
+	return &s.writer
+}
 
+func (s *implTunnelServiceShellStream) RecvStream() interface {
 	// Advance stages an element so the client can retrieve it
 	// with Value.  Advance returns true iff there is an
 	// element to retrieve.  The client must call Advance before
@@ -355,44 +521,13 @@
 	// Value returns the element that was staged by Advance.
 	// Value may panic if Advance returned false or was not
 	// called at all.  Value does not block.
-	//
-	// In general, Value is undefined if the underlying collection
-	// of elements changes while iteration is in progress.  If
-	// <DataProvider> supports concurrent modification, it should
-	// document its behavior.
 	Value() ClientShellPacket
 
 	// Err returns a non-nil error iff the stream encountered
 	// any errors.  Err does not block.
 	Err() error
-}
-
-// Implementation of the TunnelServiceShellStream interface that is not exported.
-type implTunnelServiceShellStream struct {
-	serverCall _gen_ipc.ServerCall
-	val        ClientShellPacket
-	err        error
-}
-
-func (s *implTunnelServiceShellStream) Send(item ServerShellPacket) error {
-	return s.serverCall.Send(item)
-}
-
-func (s *implTunnelServiceShellStream) Advance() bool {
-	s.val = ClientShellPacket{}
-	s.err = s.serverCall.Recv(&s.val)
-	return s.err == nil
-}
-
-func (s *implTunnelServiceShellStream) Value() ClientShellPacket {
-	return s.val
-}
-
-func (s *implTunnelServiceShellStream) Err() error {
-	if s.err == _gen_io.EOF {
-		return nil
-	}
-	return s.err
+} {
+	return &s.reader
 }
 
 // BindTunnel returns the client stub implementing the Tunnel
@@ -436,21 +571,21 @@
 	name   string
 }
 
-func (__gen_c *clientStubTunnel) Forward(ctx _gen_context.T, network string, address string, opts ..._gen_ipc.CallOpt) (reply TunnelForwardStream, err error) {
+func (__gen_c *clientStubTunnel) Forward(ctx _gen_context.T, network string, address string, opts ..._gen_ipc.CallOpt) (reply TunnelForwardCall, err error) {
 	var call _gen_ipc.Call
 	if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "Forward", []interface{}{network, address}, opts...); err != nil {
 		return
 	}
-	reply = &implTunnelForwardStream{clientCall: call}
+	reply = &implTunnelForwardCall{clientCall: call, writeStream: implTunnelForwardStreamSender{clientCall: call}, readStream: implTunnelForwardStreamIterator{clientCall: call}}
 	return
 }
 
-func (__gen_c *clientStubTunnel) Shell(ctx _gen_context.T, command string, shellOpts ShellOpts, opts ..._gen_ipc.CallOpt) (reply TunnelShellStream, err error) {
+func (__gen_c *clientStubTunnel) Shell(ctx _gen_context.T, command string, shellOpts ShellOpts, opts ..._gen_ipc.CallOpt) (reply TunnelShellCall, err error) {
 	var call _gen_ipc.Call
 	if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "Shell", []interface{}{command, shellOpts}, opts...); err != nil {
 		return
 	}
-	reply = &implTunnelShellStream{clientCall: call}
+	reply = &implTunnelShellCall{clientCall: call, writeStream: implTunnelShellStreamSender{clientCall: call}, readStream: implTunnelShellStreamIterator{clientCall: call}}
 	return
 }
 
@@ -580,13 +715,13 @@
 }
 
 func (__gen_s *ServerStubTunnel) Forward(call _gen_ipc.ServerCall, network string, address string) (err error) {
-	stream := &implTunnelServiceForwardStream{serverCall: call}
+	stream := &implTunnelServiceForwardStream{reader: implTunnelServiceForwardStreamIterator{serverCall: call}, writer: implTunnelServiceForwardStreamSender{serverCall: call}}
 	err = __gen_s.service.Forward(call, network, address, stream)
 	return
 }
 
 func (__gen_s *ServerStubTunnel) Shell(call _gen_ipc.ServerCall, command string, shellOpts ShellOpts) (reply int32, err error) {
-	stream := &implTunnelServiceShellStream{serverCall: call}
+	stream := &implTunnelServiceShellStream{reader: implTunnelServiceShellStreamIterator{serverCall: call}, writer: implTunnelServiceShellStreamSender{serverCall: call}}
 	reply, err = __gen_s.service.Shell(call, command, shellOpts, stream)
 	return
 }
diff --git a/examples/tunnel/tunneld/impl/impl.go b/examples/tunnel/tunneld/impl/impl.go
index 9458947..80aec35 100644
--- a/examples/tunnel/tunneld/impl/impl.go
+++ b/examples/tunnel/tunneld/impl/impl.go
@@ -29,7 +29,7 @@
 	}
 	name := fmt.Sprintf("RemoteID:%v LocalAddr:%v RemoteAddr:%v", ctx.RemoteID(), conn.LocalAddr(), conn.RemoteAddr())
 	vlog.Infof("TUNNEL START: %v", name)
-	err = lib.Forward(conn, stream)
+	err = lib.Forward(conn, stream.SendStream(), stream.RecvStream())
 	vlog.Infof("TUNNEL END  : %v (%v)", name, err)
 	return err
 }
@@ -149,7 +149,7 @@
 		return
 	}
 	packet := tunnel.ServerShellPacket{Stdout: []byte(data)}
-	if err = s.Send(packet); err != nil {
+	if err = s.SendStream().Send(packet); err != nil {
 		vlog.Infof("Send failed: %v", err)
 	}
 }
diff --git a/examples/tunnel/tunneld/impl/iomanager.go b/examples/tunnel/tunneld/impl/iomanager.go
index 525e69d..265cc3b 100644
--- a/examples/tunnel/tunneld/impl/iomanager.go
+++ b/examples/tunnel/tunneld/impl/iomanager.go
@@ -56,8 +56,9 @@
 
 // chan2stream receives ServerShellPacket from outchan and sends it to stream.
 func (m *ioManager) chan2stream() {
+	sender := m.stream.SendStream()
 	for packet := range m.outchan {
-		if err := m.stream.Send(packet); err != nil {
+		if err := sender.Send(packet); err != nil {
 			m.done <- err
 			return
 		}
@@ -108,8 +109,9 @@
 
 // stream2stdin reads data from the stream and sends it to the shell's stdin.
 func (m *ioManager) stream2stdin() {
-	for m.stream.Advance() {
-		packet := m.stream.Value()
+	rStream := m.stream.RecvStream()
+	for rStream.Advance() {
+		packet := rStream.Value()
 		if len(packet.Stdin) > 0 {
 			if n, err := m.stdin.Write(packet.Stdin); n != len(packet.Stdin) || err != nil {
 				m.done <- fmt.Errorf("stdin.Write returned (%d, %v) want (%d, nil)", n, err, len(packet.Stdin))
@@ -121,7 +123,7 @@
 		}
 	}
 
-	err := m.stream.Err()
+	err := rStream.Err()
 	if err == nil {
 		err = io.EOF
 	}
diff --git a/examples/tunnel/vsh/iomanager.go b/examples/tunnel/vsh/iomanager.go
index 2b28b1c..3389809 100644
--- a/examples/tunnel/vsh/iomanager.go
+++ b/examples/tunnel/vsh/iomanager.go
@@ -12,7 +12,7 @@
 	"veyron2/vlog"
 )
 
-func runIOManager(stdin io.Reader, stdout, stderr io.Writer, stream tunnel.TunnelShellStream) error {
+func runIOManager(stdin io.Reader, stdout, stderr io.Writer, stream tunnel.TunnelShellCall) error {
 	m := ioManager{stdin: stdin, stdout: stdout, stderr: stderr, stream: stream}
 	return m.run()
 }
@@ -22,7 +22,7 @@
 type ioManager struct {
 	stdin          io.Reader
 	stdout, stderr io.Writer
-	stream         tunnel.TunnelShellStream
+	stream         tunnel.TunnelShellCall
 
 	// done receives any error from chan2stream, user2outchan, or
 	// stream2user.
@@ -58,8 +58,9 @@
 
 // chan2stream receives ClientShellPacket from outchan and sends it to stream.
 func (m *ioManager) chan2stream() {
+	sender := m.stream.SendStream()
 	for packet := range m.outchan {
-		if err := m.stream.Send(packet); err != nil {
+		if err := sender.Send(packet); err != nil {
 			m.done <- err
 			return
 		}
@@ -107,8 +108,9 @@
 
 // stream2user reads data from the stream and sends it to either stdout or stderr.
 func (m *ioManager) stream2user() {
-	for m.stream.Advance() {
-		packet := m.stream.Value()
+	rStream := m.stream.RecvStream()
+	for rStream.Advance() {
+		packet := rStream.Value()
 
 		if len(packet.Stdout) > 0 {
 			if n, err := m.stdout.Write(packet.Stdout); n != len(packet.Stdout) || err != nil {
@@ -123,7 +125,7 @@
 			}
 		}
 	}
-	err := m.stream.Err()
+	err := rStream.Err()
 	if err == nil {
 		err = io.EOF
 	}
diff --git a/examples/tunnel/vsh/main.go b/examples/tunnel/vsh/main.go
index 01f31ef..a460012 100644
--- a/examples/tunnel/vsh/main.go
+++ b/examples/tunnel/vsh/main.go
@@ -199,7 +199,7 @@
 		name := fmt.Sprintf("%v-->%v-->(%v)-->%v", conn.RemoteAddr(), conn.LocalAddr(), oname, raddr)
 		go func() {
 			vlog.VI(1).Infof("TUNNEL START: %v", name)
-			errf := lib.Forward(conn, stream)
+			errf := lib.Forward(conn, stream.SendStream(), stream.RecvStream())
 			err := stream.Finish()
 			vlog.VI(1).Infof("TUNNEL END  : %v (%v, %v)", name, errf, err)
 		}()
diff --git a/examples/wspr_sample/cache.vdl.go b/examples/wspr_sample/cache.vdl.go
index e965426..fc727a7 100644
--- a/examples/wspr_sample/cache.vdl.go
+++ b/examples/wspr_sample/cache.vdl.go
@@ -68,7 +68,7 @@
 	// Size returns the total number of entries in the cache.
 	Size(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply int64, err error)
 	// MultiGet sets up a stream that allows fetching multiple keys.
-	MultiGet(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply CacheMultiGetStream, err error)
+	MultiGet(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply CacheMultiGetCall, err error)
 }
 type Cache interface {
 	_gen_ipc.UniversalServiceMethods
@@ -120,54 +120,58 @@
 	MultiGet(context _gen_ipc.ServerContext, stream CacheServiceMultiGetStream) (err error)
 }
 
-// CacheMultiGetStream is the interface for streaming responses of the method
+// CacheMultiGetCall is the interface for call object of the method
 // MultiGet in the service interface Cache.
-type CacheMultiGetStream interface {
+type CacheMultiGetCall interface {
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
 
-	// Send places the item onto the output stream, blocking if there is no
-	// buffer space available.  Calls to Send after having called CloseSend
-	// or Cancel will fail.  Any blocked Send calls will be unblocked upon
-	// calling Cancel.
-	Send(item string) error
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() _gen_vdlutil.Any
 
-	// CloseSend indicates to the server that no more items will be sent;
-	// server Recv calls will receive io.EOF after all sent items.  This is
-	// an optional call - it's used by streaming clients that need the
-	// server to receive the io.EOF terminator before the client calls
-	// Finish (for example, if the client needs to continue receiving items
-	// from the server after having finished sending).
-	// Calls to CloseSend after having called Cancel will fail.
-	// Like Send, CloseSend blocks when there's no buffer space available.
-	CloseSend() error
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
 
-	// Advance stages an element so the client can retrieve it
-	// with Value.  Advance returns true iff there is an
-	// element to retrieve.  The client must call Advance before
-	// calling Value.  The client must call Cancel if it does
-	// not iterate through all elements (i.e. until Advance
-	// returns false).  Advance may block if an element is not
-	// immediately available.
-	Advance() bool
+	// SendStream returns the send portion of the stream
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no
+		// buffer space available.  Calls to Send after having called Close
+		// or Cancel will fail.  Any blocked Send calls will be unblocked upon
+		// calling Cancel.
+		Send(item string) error
 
-	// Value returns the element that was staged by Advance.
-	// Value may panic if Advance returned false or was not
-	// called at all.  Value does not block.
-	Value() _gen_vdlutil.Any
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.  This is
+		// an optional call - it's used by streaming clients that need the
+		// server to receive the io.EOF terminator before the client calls
+		// Finish (for example, if the client needs to continue receiving items
+		// from the server after having finished sending).
+		// Calls to Close after having called Cancel will fail.
+		// Like Send, Close blocks when there's no buffer space available.
+		Close() error
+	}
 
-	// Err returns a non-nil error iff the stream encountered
-	// any errors.  Err does not block.
-	Err() error
-
-	// Finish performs the equivalent of CloseSend, then blocks until the server
+	// Finish performs the equivalent of SendStream().Close, then blocks until the server
 	// is done, and returns the positional return values for call.
-	//
 	// If Cancel has been called, Finish will return immediately; the output of
 	// Finish could either be an error signalling cancelation, or the correct
 	// positional return values from the server depending on the timing of the
 	// call.
 	//
 	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return a non-EOF error.
+	// has been called or any of the other methods return an error.
 	// Finish should be called at most once.
 	Finish() (err error)
 
@@ -177,56 +181,150 @@
 	Cancel()
 }
 
-// Implementation of the CacheMultiGetStream interface that is not exported.
-type implCacheMultiGetStream struct {
+type implCacheMultiGetStreamSender struct {
+	clientCall _gen_ipc.Call
+}
+
+func (c *implCacheMultiGetStreamSender) Send(item string) error {
+	return c.clientCall.Send(item)
+}
+
+func (c *implCacheMultiGetStreamSender) Close() error {
+	return c.clientCall.CloseSend()
+}
+
+type implCacheMultiGetStreamIterator struct {
 	clientCall _gen_ipc.Call
 	val        _gen_vdlutil.Any
 	err        error
 }
 
-func (c *implCacheMultiGetStream) Send(item string) error {
-	return c.clientCall.Send(item)
-}
-
-func (c *implCacheMultiGetStream) CloseSend() error {
-	return c.clientCall.CloseSend()
-}
-
-func (c *implCacheMultiGetStream) Advance() bool {
+func (c *implCacheMultiGetStreamIterator) Advance() bool {
 	c.val = nil
 	c.err = c.clientCall.Recv(&c.val)
 	return c.err == nil
 }
 
-func (c *implCacheMultiGetStream) Value() _gen_vdlutil.Any {
+func (c *implCacheMultiGetStreamIterator) Value() _gen_vdlutil.Any {
 	return c.val
 }
 
-func (c *implCacheMultiGetStream) Err() error {
+func (c *implCacheMultiGetStreamIterator) Err() error {
 	if c.err == _gen_io.EOF {
 		return nil
 	}
 	return c.err
 }
 
-func (c *implCacheMultiGetStream) Finish() (err error) {
+// Implementation of the CacheMultiGetCall interface that is not exported.
+type implCacheMultiGetCall struct {
+	clientCall  _gen_ipc.Call
+	writeStream implCacheMultiGetStreamSender
+	readStream  implCacheMultiGetStreamIterator
+}
+
+func (c *implCacheMultiGetCall) SendStream() interface {
+	Send(item string) error
+	Close() error
+} {
+	return &c.writeStream
+}
+
+func (c *implCacheMultiGetCall) RecvStream() interface {
+	Advance() bool
+	Value() _gen_vdlutil.Any
+	Err() error
+} {
+	return &c.readStream
+}
+
+func (c *implCacheMultiGetCall) Finish() (err error) {
 	if ierr := c.clientCall.Finish(&err); ierr != nil {
 		err = ierr
 	}
 	return
 }
 
-func (c *implCacheMultiGetStream) Cancel() {
+func (c *implCacheMultiGetCall) Cancel() {
 	c.clientCall.Cancel()
 }
 
+type implCacheServiceMultiGetStreamSender struct {
+	serverCall _gen_ipc.ServerCall
+}
+
+func (s *implCacheServiceMultiGetStreamSender) Send(item _gen_vdlutil.Any) error {
+	return s.serverCall.Send(item)
+}
+
+type implCacheServiceMultiGetStreamIterator struct {
+	serverCall _gen_ipc.ServerCall
+	val        string
+	err        error
+}
+
+func (s *implCacheServiceMultiGetStreamIterator) Advance() bool {
+	s.err = s.serverCall.Recv(&s.val)
+	return s.err == nil
+}
+
+func (s *implCacheServiceMultiGetStreamIterator) Value() string {
+	return s.val
+}
+
+func (s *implCacheServiceMultiGetStreamIterator) Err() error {
+	if s.err == _gen_io.EOF {
+		return nil
+	}
+	return s.err
+}
+
 // CacheServiceMultiGetStream is the interface for streaming responses of the method
 // MultiGet in the service interface Cache.
 type CacheServiceMultiGetStream interface {
+	// SendStream returns the send portion of the stream.
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no buffer
+		// space available.  If the client has canceled, an error is returned.
+		Send(item _gen_vdlutil.Any) error
+	}
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
+
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() string
+
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
+}
+
+// Implementation of the CacheServiceMultiGetStream interface that is not exported.
+type implCacheServiceMultiGetStream struct {
+	writer implCacheServiceMultiGetStreamSender
+	reader implCacheServiceMultiGetStreamIterator
+}
+
+func (s *implCacheServiceMultiGetStream) SendStream() interface {
 	// Send places the item onto the output stream, blocking if there is no buffer
 	// space available.  If the client has canceled, an error is returned.
 	Send(item _gen_vdlutil.Any) error
+} {
+	return &s.writer
+}
 
+func (s *implCacheServiceMultiGetStream) RecvStream() interface {
 	// Advance stages an element so the client can retrieve it
 	// with Value.  Advance returns true iff there is an
 	// element to retrieve.  The client must call Advance before
@@ -239,43 +337,13 @@
 	// Value returns the element that was staged by Advance.
 	// Value may panic if Advance returned false or was not
 	// called at all.  Value does not block.
-	//
-	// In general, Value is undefined if the underlying collection
-	// of elements changes while iteration is in progress.  If
-	// <DataProvider> supports concurrent modification, it should
-	// document its behavior.
 	Value() string
 
 	// Err returns a non-nil error iff the stream encountered
 	// any errors.  Err does not block.
 	Err() error
-}
-
-// Implementation of the CacheServiceMultiGetStream interface that is not exported.
-type implCacheServiceMultiGetStream struct {
-	serverCall _gen_ipc.ServerCall
-	val        string
-	err        error
-}
-
-func (s *implCacheServiceMultiGetStream) Send(item _gen_vdlutil.Any) error {
-	return s.serverCall.Send(item)
-}
-
-func (s *implCacheServiceMultiGetStream) Advance() bool {
-	s.err = s.serverCall.Recv(&s.val)
-	return s.err == nil
-}
-
-func (s *implCacheServiceMultiGetStream) Value() string {
-	return s.val
-}
-
-func (s *implCacheServiceMultiGetStream) Err() error {
-	if s.err == _gen_io.EOF {
-		return nil
-	}
-	return s.err
+} {
+	return &s.reader
 }
 
 // BindCache returns the client stub implementing the Cache
@@ -506,12 +574,12 @@
 	return
 }
 
-func (__gen_c *clientStubCache) MultiGet(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply CacheMultiGetStream, err error) {
+func (__gen_c *clientStubCache) MultiGet(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply CacheMultiGetCall, err error) {
 	var call _gen_ipc.Call
 	if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "MultiGet", nil, opts...); err != nil {
 		return
 	}
-	reply = &implCacheMultiGetStream{clientCall: call}
+	reply = &implCacheMultiGetCall{clientCall: call, writeStream: implCacheMultiGetStreamSender{clientCall: call}, readStream: implCacheMultiGetStreamIterator{clientCall: call}}
 	return
 }
 
@@ -874,7 +942,7 @@
 }
 
 func (__gen_s *ServerStubCache) MultiGet(call _gen_ipc.ServerCall) (err error) {
-	stream := &implCacheServiceMultiGetStream{serverCall: call}
+	stream := &implCacheServiceMultiGetStream{reader: implCacheServiceMultiGetStreamIterator{serverCall: call}, writer: implCacheServiceMultiGetStreamSender{serverCall: call}}
 	err = __gen_s.service.MultiGet(call, stream)
 	return
 }
diff --git a/examples/wspr_sample/sampled/lib/cache_impl.go b/examples/wspr_sample/sampled/lib/cache_impl.go
index 878c6af..41df38d 100644
--- a/examples/wspr_sample/sampled/lib/cache_impl.go
+++ b/examples/wspr_sample/sampled/lib/cache_impl.go
@@ -181,14 +181,16 @@
 // keys in the stream is not in the map or if there was an issue reading
 // the stream.
 func (c *cacheImpl) MultiGet(_ ipc.ServerContext, stream sample.CacheServiceMultiGetStream) error {
-	for stream.Advance() {
-		key := stream.Value()
+	rStream := stream.RecvStream()
+	sender := stream.SendStream()
+	for rStream.Advance() {
+		key := rStream.Value()
 
 		value, ok := c.cache[key]
 		if !ok {
 			return fmt.Errorf("key not found: %v", key)
 		}
-		stream.Send(value)
+		sender.Send(value)
 	}
-	return stream.Err()
+	return rStream.Err()
 }
diff --git a/examples/wspr_sample/sampled/lib/sampled_test.go b/examples/wspr_sample/sampled/lib/sampled_test.go
index 715c6e0..ab531ff 100644
--- a/examples/wspr_sample/sampled/lib/sampled_test.go
+++ b/examples/wspr_sample/sampled/lib/sampled_test.go
@@ -284,33 +284,37 @@
 	if err != nil {
 		t.Fatal("error calling MultiGet: ", err)
 	}
-	stream.Send("A")
-	stream.Send("C")
-	stream.Send("E")
 
-	if stream.Advance() {
-		if stream.Value() != "A" {
+	sender := stream.SendStream()
+
+	sender.Send("A")
+	sender.Send("C")
+	sender.Send("E")
+
+	rStream := stream.RecvStream()
+	if rStream.Advance() {
+		if rStream.Value() != "A" {
 			t.Errorf("value for 'A' didn't match")
 		}
 	} else {
-		t.Fatal("error on advance: %v", stream.Err())
+		t.Fatal("error on advance: %v", rStream.Err())
 	}
 
-	if stream.Advance() {
-		if stream.Value() != uint32(7) {
+	if rStream.Advance() {
+		if rStream.Value() != uint32(7) {
 			t.Errorf("value for 'C' didn't match")
 		}
 	} else {
-		t.Fatal("error on advance: %v", stream.Err())
+		t.Fatal("error on advance: %v", rStream.Err())
 	}
 
-	if stream.Advance() {
-		if stream.Value() != true {
+	if rStream.Advance() {
+		if rStream.Value() != true {
 			t.Errorf("value for 'E' didn't match")
 		}
 	} else {
-		t.Fatal("error on advance: %v", stream.Err())
+		t.Fatal("error on advance: %v", rStream.Err())
 	}
 
-	stream.CloseSend()
+	sender.Close()
 }
diff --git a/lib/signals/signals_test.go b/lib/signals/signals_test.go
index 4600c32..5b69816 100644
--- a/lib/signals/signals_test.go
+++ b/lib/signals/signals_test.go
@@ -325,8 +325,9 @@
 	if err != nil {
 		t.Fatalf("Got error: %v", err)
 	}
-	if stream.Advance() || stream.Err() != nil {
-		t.Errorf("Expected EOF, got (%v, %v) instead: ", stream.Value(), stream.Err())
+	rStream := stream.RecvStream()
+	if rStream.Advance() || rStream.Err() != nil {
+		t.Errorf("Expected EOF, got (%v, %v) instead: ", rStream.Value(), rStream.Err())
 	}
 	if err := stream.Finish(); err != nil {
 		t.Fatalf("Got error: %v", err)
diff --git a/lib/testutil/modules/ls.go b/lib/testutil/modules/ls.go
index fa6f05e..3ea2dd3 100644
--- a/lib/testutil/modules/ls.go
+++ b/lib/testutil/modules/ls.go
@@ -103,12 +103,13 @@
 		return []string{}, err
 	}
 	var reply []string
-	for stream.Advance() {
-		e := stream.Value()
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
+		e := rStream.Value()
 		reply = append(reply, fmt.Sprintf("%q", e.Name))
 	}
 
-	return reply, stream.Err()
+	return reply, rStream.Err()
 }
 
 func lsUsingResolveToMountTable(name, pattern string) ([]string, error) {
diff --git a/runtimes/google/ipc/benchmarks/client.go b/runtimes/google/ipc/benchmarks/client.go
index 6d8bfc6..218d125 100644
--- a/runtimes/google/ipc/benchmarks/client.go
+++ b/runtimes/google/ipc/benchmarks/client.go
@@ -62,8 +62,9 @@
 		}
 		done := make(chan error, 1)
 		go func() {
-			for stream.Advance() {
-				chunk := stream.Value()
+			rStream := stream.RecvStream()
+			for rStream.Advance() {
+				chunk := rStream.Value()
 				if err == io.EOF {
 					done <- nil
 					return
@@ -78,15 +79,16 @@
 				}
 			}
 
-			done <- stream.Err()
+			done <- rStream.Err()
 		}()
+		sender := stream.SendStream()
 		for j := 0; j < messageCount; j++ {
-			if err = stream.Send(payload); err != nil {
+			if err = sender.Send(payload); err != nil {
 				vlog.Fatalf("Send failed: %v", err)
 			}
 		}
-		if err = stream.CloseSend(); err != nil {
-			vlog.Fatalf("CloseSend() failed: %v", err)
+		if err = sender.Close(); err != nil {
+			vlog.Fatalf("Close() failed: %v", err)
 		}
 		if err = <-done; err != nil {
 			vlog.Fatalf("%v", err)
diff --git a/runtimes/google/ipc/benchmarks/server.go b/runtimes/google/ipc/benchmarks/server.go
index 0d1c95e..d1ebf0a 100644
--- a/runtimes/google/ipc/benchmarks/server.go
+++ b/runtimes/google/ipc/benchmarks/server.go
@@ -17,14 +17,16 @@
 }
 
 func (i *impl) EchoStream(ctx ipc.ServerContext, stream BenchmarkServiceEchoStreamStream) error {
-	for stream.Advance() {
-		chunk := stream.Value()
-		if err := stream.Send(chunk); err != nil {
+	rStream := stream.RecvStream()
+	sender := stream.SendStream()
+	for rStream.Advance() {
+		chunk := rStream.Value()
+		if err := sender.Send(chunk); err != nil {
 			return err
 		}
 	}
 
-	return stream.Err()
+	return rStream.Err()
 }
 
 // StartServer starts a server that implements the Benchmark service. The
diff --git a/runtimes/google/ipc/benchmarks/service.vdl.go b/runtimes/google/ipc/benchmarks/service.vdl.go
index eedda9a..6abe8eb 100644
--- a/runtimes/google/ipc/benchmarks/service.vdl.go
+++ b/runtimes/google/ipc/benchmarks/service.vdl.go
@@ -27,7 +27,7 @@
 	// Echo returns the payload that it receives.
 	Echo(ctx _gen_context.T, Payload []byte, opts ..._gen_ipc.CallOpt) (reply []byte, err error)
 	// EchoStream returns the payload that it receives via the stream.
-	EchoStream(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply BenchmarkEchoStreamStream, err error)
+	EchoStream(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply BenchmarkEchoStreamCall, err error)
 }
 type Benchmark interface {
 	_gen_ipc.UniversalServiceMethods
@@ -43,54 +43,58 @@
 	EchoStream(context _gen_ipc.ServerContext, stream BenchmarkServiceEchoStreamStream) (err error)
 }
 
-// BenchmarkEchoStreamStream is the interface for streaming responses of the method
+// BenchmarkEchoStreamCall is the interface for call object of the method
 // EchoStream in the service interface Benchmark.
-type BenchmarkEchoStreamStream interface {
+type BenchmarkEchoStreamCall interface {
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
 
-	// Send places the item onto the output stream, blocking if there is no
-	// buffer space available.  Calls to Send after having called CloseSend
-	// or Cancel will fail.  Any blocked Send calls will be unblocked upon
-	// calling Cancel.
-	Send(item []byte) error
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() []byte
 
-	// CloseSend indicates to the server that no more items will be sent;
-	// server Recv calls will receive io.EOF after all sent items.  This is
-	// an optional call - it's used by streaming clients that need the
-	// server to receive the io.EOF terminator before the client calls
-	// Finish (for example, if the client needs to continue receiving items
-	// from the server after having finished sending).
-	// Calls to CloseSend after having called Cancel will fail.
-	// Like Send, CloseSend blocks when there's no buffer space available.
-	CloseSend() error
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
 
-	// Advance stages an element so the client can retrieve it
-	// with Value.  Advance returns true iff there is an
-	// element to retrieve.  The client must call Advance before
-	// calling Value.  The client must call Cancel if it does
-	// not iterate through all elements (i.e. until Advance
-	// returns false).  Advance may block if an element is not
-	// immediately available.
-	Advance() bool
+	// SendStream returns the send portion of the stream
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no
+		// buffer space available.  Calls to Send after having called Close
+		// or Cancel will fail.  Any blocked Send calls will be unblocked upon
+		// calling Cancel.
+		Send(item []byte) error
 
-	// Value returns the element that was staged by Advance.
-	// Value may panic if Advance returned false or was not
-	// called at all.  Value does not block.
-	Value() []byte
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.  This is
+		// an optional call - it's used by streaming clients that need the
+		// server to receive the io.EOF terminator before the client calls
+		// Finish (for example, if the client needs to continue receiving items
+		// from the server after having finished sending).
+		// Calls to Close after having called Cancel will fail.
+		// Like Send, Close blocks when there's no buffer space available.
+		Close() error
+	}
 
-	// Err returns a non-nil error iff the stream encountered
-	// any errors.  Err does not block.
-	Err() error
-
-	// Finish performs the equivalent of CloseSend, then blocks until the server
+	// Finish performs the equivalent of SendStream().Close, then blocks until the server
 	// is done, and returns the positional return values for call.
-	//
 	// If Cancel has been called, Finish will return immediately; the output of
 	// Finish could either be an error signalling cancelation, or the correct
 	// positional return values from the server depending on the timing of the
 	// call.
 	//
 	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return a non-EOF error.
+	// has been called or any of the other methods return an error.
 	// Finish should be called at most once.
 	Finish() (err error)
 
@@ -100,55 +104,149 @@
 	Cancel()
 }
 
-// Implementation of the BenchmarkEchoStreamStream interface that is not exported.
-type implBenchmarkEchoStreamStream struct {
+type implBenchmarkEchoStreamStreamSender struct {
+	clientCall _gen_ipc.Call
+}
+
+func (c *implBenchmarkEchoStreamStreamSender) Send(item []byte) error {
+	return c.clientCall.Send(item)
+}
+
+func (c *implBenchmarkEchoStreamStreamSender) Close() error {
+	return c.clientCall.CloseSend()
+}
+
+type implBenchmarkEchoStreamStreamIterator struct {
 	clientCall _gen_ipc.Call
 	val        []byte
 	err        error
 }
 
-func (c *implBenchmarkEchoStreamStream) Send(item []byte) error {
-	return c.clientCall.Send(item)
-}
-
-func (c *implBenchmarkEchoStreamStream) CloseSend() error {
-	return c.clientCall.CloseSend()
-}
-
-func (c *implBenchmarkEchoStreamStream) Advance() bool {
+func (c *implBenchmarkEchoStreamStreamIterator) Advance() bool {
 	c.err = c.clientCall.Recv(&c.val)
 	return c.err == nil
 }
 
-func (c *implBenchmarkEchoStreamStream) Value() []byte {
+func (c *implBenchmarkEchoStreamStreamIterator) Value() []byte {
 	return c.val
 }
 
-func (c *implBenchmarkEchoStreamStream) Err() error {
+func (c *implBenchmarkEchoStreamStreamIterator) Err() error {
 	if c.err == _gen_io.EOF {
 		return nil
 	}
 	return c.err
 }
 
-func (c *implBenchmarkEchoStreamStream) Finish() (err error) {
+// Implementation of the BenchmarkEchoStreamCall interface that is not exported.
+type implBenchmarkEchoStreamCall struct {
+	clientCall  _gen_ipc.Call
+	writeStream implBenchmarkEchoStreamStreamSender
+	readStream  implBenchmarkEchoStreamStreamIterator
+}
+
+func (c *implBenchmarkEchoStreamCall) SendStream() interface {
+	Send(item []byte) error
+	Close() error
+} {
+	return &c.writeStream
+}
+
+func (c *implBenchmarkEchoStreamCall) RecvStream() interface {
+	Advance() bool
+	Value() []byte
+	Err() error
+} {
+	return &c.readStream
+}
+
+func (c *implBenchmarkEchoStreamCall) Finish() (err error) {
 	if ierr := c.clientCall.Finish(&err); ierr != nil {
 		err = ierr
 	}
 	return
 }
 
-func (c *implBenchmarkEchoStreamStream) Cancel() {
+func (c *implBenchmarkEchoStreamCall) Cancel() {
 	c.clientCall.Cancel()
 }
 
+type implBenchmarkServiceEchoStreamStreamSender struct {
+	serverCall _gen_ipc.ServerCall
+}
+
+func (s *implBenchmarkServiceEchoStreamStreamSender) Send(item []byte) error {
+	return s.serverCall.Send(item)
+}
+
+type implBenchmarkServiceEchoStreamStreamIterator struct {
+	serverCall _gen_ipc.ServerCall
+	val        []byte
+	err        error
+}
+
+func (s *implBenchmarkServiceEchoStreamStreamIterator) Advance() bool {
+	s.err = s.serverCall.Recv(&s.val)
+	return s.err == nil
+}
+
+func (s *implBenchmarkServiceEchoStreamStreamIterator) Value() []byte {
+	return s.val
+}
+
+func (s *implBenchmarkServiceEchoStreamStreamIterator) Err() error {
+	if s.err == _gen_io.EOF {
+		return nil
+	}
+	return s.err
+}
+
 // BenchmarkServiceEchoStreamStream is the interface for streaming responses of the method
 // EchoStream in the service interface Benchmark.
 type BenchmarkServiceEchoStreamStream interface {
+	// SendStream returns the send portion of the stream.
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no buffer
+		// space available.  If the client has canceled, an error is returned.
+		Send(item []byte) error
+	}
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
+
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() []byte
+
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
+}
+
+// Implementation of the BenchmarkServiceEchoStreamStream interface that is not exported.
+type implBenchmarkServiceEchoStreamStream struct {
+	writer implBenchmarkServiceEchoStreamStreamSender
+	reader implBenchmarkServiceEchoStreamStreamIterator
+}
+
+func (s *implBenchmarkServiceEchoStreamStream) SendStream() interface {
 	// Send places the item onto the output stream, blocking if there is no buffer
 	// space available.  If the client has canceled, an error is returned.
 	Send(item []byte) error
+} {
+	return &s.writer
+}
 
+func (s *implBenchmarkServiceEchoStreamStream) RecvStream() interface {
 	// Advance stages an element so the client can retrieve it
 	// with Value.  Advance returns true iff there is an
 	// element to retrieve.  The client must call Advance before
@@ -161,43 +259,13 @@
 	// Value returns the element that was staged by Advance.
 	// Value may panic if Advance returned false or was not
 	// called at all.  Value does not block.
-	//
-	// In general, Value is undefined if the underlying collection
-	// of elements changes while iteration is in progress.  If
-	// <DataProvider> supports concurrent modification, it should
-	// document its behavior.
 	Value() []byte
 
 	// Err returns a non-nil error iff the stream encountered
 	// any errors.  Err does not block.
 	Err() error
-}
-
-// Implementation of the BenchmarkServiceEchoStreamStream interface that is not exported.
-type implBenchmarkServiceEchoStreamStream struct {
-	serverCall _gen_ipc.ServerCall
-	val        []byte
-	err        error
-}
-
-func (s *implBenchmarkServiceEchoStreamStream) Send(item []byte) error {
-	return s.serverCall.Send(item)
-}
-
-func (s *implBenchmarkServiceEchoStreamStream) Advance() bool {
-	s.err = s.serverCall.Recv(&s.val)
-	return s.err == nil
-}
-
-func (s *implBenchmarkServiceEchoStreamStream) Value() []byte {
-	return s.val
-}
-
-func (s *implBenchmarkServiceEchoStreamStream) Err() error {
-	if s.err == _gen_io.EOF {
-		return nil
-	}
-	return s.err
+} {
+	return &s.reader
 }
 
 // BindBenchmark returns the client stub implementing the Benchmark
@@ -252,12 +320,12 @@
 	return
 }
 
-func (__gen_c *clientStubBenchmark) EchoStream(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply BenchmarkEchoStreamStream, err error) {
+func (__gen_c *clientStubBenchmark) EchoStream(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply BenchmarkEchoStreamCall, err error) {
 	var call _gen_ipc.Call
 	if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "EchoStream", nil, opts...); err != nil {
 		return
 	}
-	reply = &implBenchmarkEchoStreamStream{clientCall: call}
+	reply = &implBenchmarkEchoStreamCall{clientCall: call, writeStream: implBenchmarkEchoStreamStreamSender{clientCall: call}, readStream: implBenchmarkEchoStreamStreamIterator{clientCall: call}}
 	return
 }
 
@@ -365,7 +433,7 @@
 }
 
 func (__gen_s *ServerStubBenchmark) EchoStream(call _gen_ipc.ServerCall) (err error) {
-	stream := &implBenchmarkServiceEchoStreamStream{serverCall: call}
+	stream := &implBenchmarkServiceEchoStreamStream{reader: implBenchmarkServiceEchoStreamStreamIterator{serverCall: call}, writer: implBenchmarkServiceEchoStreamStreamSender{serverCall: call}}
 	err = __gen_s.service.EchoStream(call, stream)
 	return
 }
diff --git a/runtimes/google/ipc/jni/arg_getter.go b/runtimes/google/ipc/jni/arg_getter.go
index 04efd0b..ef24b5d 100644
--- a/runtimes/google/ipc/jni/arg_getter.go
+++ b/runtimes/google/ipc/jni/arg_getter.go
@@ -90,14 +90,29 @@
 // fillStreamArgs fills in stream argument types for the provided stream.
 func fillStreamArgs(stream reflect.Type, mArgs *methodArgs) error {
 	// Get the stream send type.
-	if mSend, ok := stream.MethodByName("Send"); ok {
+	if mSendStream, ok := stream.MethodByName("SendStream"); ok {
+		if mSendStream.Type.NumOut() != 1 {
+			return fmt.Errorf("Illegal number of arguments for SendStream method in stream %v", stream)
+		}
+		mSend, ok := mSendStream.Type.Out(0).MethodByName("Send")
+		if !ok {
+			return fmt.Errorf("Illegal Send method in SendStream %v", mSendStream)
+		}
+
 		if mSend.Type.NumIn() != 1 {
 			return fmt.Errorf("Illegal number of arguments for Send method in stream %v", stream)
 		}
 		mArgs.streamSendType = mSend.Type.In(0)
 	}
 	// Get the stream recv type.
-	if mRecv, ok := stream.MethodByName("Value"); ok {
+	if mRecvStream, ok := stream.MethodByName("RecvStream"); ok {
+		if mRecvStream.Type.NumOut() != 1 {
+			return fmt.Errorf("Illegal number of arguments for RecvStream method in stream %v", stream)
+		}
+		mRecv, ok := mRecvStream.Type.Out(0).MethodByName("Value")
+		if !ok {
+			return fmt.Errorf("Illegal Value method in RecvStream %v", mRecvStream)
+		}
 		if mRecv.Type.NumOut() != 1 {
 			return fmt.Errorf("Illegal number of arguments for Value method in stream %v", stream)
 		}
diff --git a/runtimes/google/rt/mgmt_test.go b/runtimes/google/rt/mgmt_test.go
index 7f314e9..4208a00 100644
--- a/runtimes/google/rt/mgmt_test.go
+++ b/runtimes/google/rt/mgmt_test.go
@@ -303,11 +303,12 @@
 	if err != nil {
 		t.Fatalf("Got error: %v", err)
 	}
+	rStream := stream.RecvStream()
 	expectTask := func(progress, goal int32) {
-		if !stream.Advance() {
-			t.Fatalf("unexpected streaming error: %q", stream.Err())
+		if !rStream.Advance() {
+			t.Fatalf("unexpected streaming error: %q", rStream.Err())
 		}
-		task := stream.Value()
+		task := rStream.Value()
 		if task.Progress != progress || task.Goal != goal {
 			t.Errorf("Got (%d, %d), want (%d, %d)", task.Progress, task.Goal, progress, goal)
 		}
@@ -315,8 +316,8 @@
 	expectTask(0, 10)
 	expectTask(2, 10)
 	expectTask(7, 10)
-	if stream.Advance() || stream.Err() != nil {
-		t.Errorf("Expected EOF, got (%v, %v) instead", stream.Value(), stream.Err())
+	if rStream.Advance() || rStream.Err() != nil {
+		t.Errorf("Expected EOF, got (%v, %v) instead", rStream.Value(), rStream.Err())
 	}
 	if err := stream.Finish(); err != nil {
 		t.Errorf("Got error %v", err)
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index 12d48d8..e885359 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -243,8 +243,8 @@
 // call. It does not perform any conflict resolution during replay.
 // This avoids resolving conflicts that have already been resolved by
 // other devices.
-func (i *syncInitiator) processLogStream(stream SyncGetDeltasStream) (GenVector, error) {
-	// Map to track new generations received in the RPC reply.
+func (i *syncInitiator) processLogStream(stream SyncGetDeltasCall) (GenVector, error) {
+	// Map to track new generations received in the Call reply.
 	// TODO(hpucha): If needed, this can be optimized under the
 	// assumption that an entire generation is received
 	// sequentially. We can then parse a generation at a time.
@@ -256,8 +256,9 @@
 	minGens := GenVector{}
 
 	curTx := NoTxID
-	for stream.Advance() {
-		rec := stream.Value()
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
+		rec := rStream.Value()
 
 		// Begin a new transaction if needed.
 		if curTx == NoTxID && rec.Value.Continued {
@@ -302,7 +303,7 @@
 		}
 	}
 
-	if err := stream.Err(); err != nil {
+	if err := rStream.Err(); err != nil {
 		return GenVector{}, err
 	}
 	if curTx != NoTxID {
@@ -560,13 +561,14 @@
 			vlog.Errorf("updateStoreAndSync:: putmutations err %v", err)
 			return err
 		}
+		sender := stream.SendStream()
 		for i := range m {
-			if err := stream.Send(m[i]); err != nil {
+			if err := sender.Send(m[i]); err != nil {
 				vlog.Errorf("updateStoreAndSync:: send err %v", err)
 				return err
 			}
 		}
-		if err := stream.CloseSend(); err != nil {
+		if err := sender.Close(); err != nil {
 			vlog.Errorf("updateStoreAndSync:: closesend err %v", err)
 			return err
 		}
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index ac24df1..002ab95 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -58,6 +58,14 @@
 	return ds.value
 }
 
+func (ds *dummyStream) RecvStream() interface {
+	Advance() bool
+	Value() LogRec
+	Err() error
+} {
+	return ds
+}
+
 func (*dummyStream) Err() error { return nil }
 
 func (ds *dummyStream) Finish() (GenVector, error) {
diff --git a/runtimes/google/vsync/vsync.vdl.go b/runtimes/google/vsync/vsync.vdl.go
index 03fe65c..03efad9 100644
--- a/runtimes/google/vsync/vsync.vdl.go
+++ b/runtimes/google/vsync/vsync.vdl.go
@@ -92,7 +92,7 @@
 type Sync_ExcludingUniversal interface {
 	// GetDeltas returns a device's current generation vector and all the missing log records
 	// when compared to the incoming generation vector.
-	GetDeltas(ctx _gen_context.T, In GenVector, ClientID DeviceID, opts ..._gen_ipc.CallOpt) (reply SyncGetDeltasStream, err error)
+	GetDeltas(ctx _gen_context.T, In GenVector, ClientID DeviceID, opts ..._gen_ipc.CallOpt) (reply SyncGetDeltasCall, err error)
 }
 type Sync interface {
 	_gen_ipc.UniversalServiceMethods
@@ -107,27 +107,29 @@
 	GetDeltas(context _gen_ipc.ServerContext, In GenVector, ClientID DeviceID, stream SyncServiceGetDeltasStream) (reply GenVector, err error)
 }
 
-// SyncGetDeltasStream is the interface for streaming responses of the method
+// SyncGetDeltasCall is the interface for call object of the method
 // GetDeltas in the service interface Sync.
-type SyncGetDeltasStream interface {
+type SyncGetDeltasCall interface {
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
 
-	// Advance stages an element so the client can retrieve it
-	// with Value.  Advance returns true iff there is an
-	// element to retrieve.  The client must call Advance before
-	// calling Value.  The client must call Cancel if it does
-	// not iterate through all elements (i.e. until Advance
-	// returns false).  Advance may block if an element is not
-	// immediately available.
-	Advance() bool
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() LogRec
 
-	// Value returns the element that was staged by Advance.
-	// Value may panic if Advance returned false or was not
-	// called at all.  Value does not block.
-	Value() LogRec
-
-	// Err returns a non-nil error iff the stream encountered
-	// any errors.  Err does not block.
-	Err() error
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
 
 	// Finish blocks until the server is done and returns the positional
 	// return values for call.
@@ -138,7 +140,7 @@
 	// call.
 	//
 	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return a non-EOF error.
+	// has been called or any of the other methods return an error.
 	// Finish should be called at most once.
 	Finish() (reply GenVector, err error)
 
@@ -148,56 +150,84 @@
 	Cancel()
 }
 
-// Implementation of the SyncGetDeltasStream interface that is not exported.
-type implSyncGetDeltasStream struct {
+type implSyncGetDeltasStreamIterator struct {
 	clientCall _gen_ipc.Call
 	val        LogRec
 	err        error
 }
 
-func (c *implSyncGetDeltasStream) Advance() bool {
+func (c *implSyncGetDeltasStreamIterator) Advance() bool {
 	c.val = LogRec{}
 	c.err = c.clientCall.Recv(&c.val)
 	return c.err == nil
 }
 
-func (c *implSyncGetDeltasStream) Value() LogRec {
+func (c *implSyncGetDeltasStreamIterator) Value() LogRec {
 	return c.val
 }
 
-func (c *implSyncGetDeltasStream) Err() error {
+func (c *implSyncGetDeltasStreamIterator) Err() error {
 	if c.err == _gen_io.EOF {
 		return nil
 	}
 	return c.err
 }
 
-func (c *implSyncGetDeltasStream) Finish() (reply GenVector, err error) {
+// Implementation of the SyncGetDeltasCall interface that is not exported.
+type implSyncGetDeltasCall struct {
+	clientCall _gen_ipc.Call
+	readStream implSyncGetDeltasStreamIterator
+}
+
+func (c *implSyncGetDeltasCall) RecvStream() interface {
+	Advance() bool
+	Value() LogRec
+	Err() error
+} {
+	return &c.readStream
+}
+
+func (c *implSyncGetDeltasCall) Finish() (reply GenVector, err error) {
 	if ierr := c.clientCall.Finish(&reply, &err); ierr != nil {
 		err = ierr
 	}
 	return
 }
 
-func (c *implSyncGetDeltasStream) Cancel() {
+func (c *implSyncGetDeltasCall) Cancel() {
 	c.clientCall.Cancel()
 }
 
+type implSyncServiceGetDeltasStreamSender struct {
+	serverCall _gen_ipc.ServerCall
+}
+
+func (s *implSyncServiceGetDeltasStreamSender) Send(item LogRec) error {
+	return s.serverCall.Send(item)
+}
+
 // SyncServiceGetDeltasStream is the interface for streaming responses of the method
 // GetDeltas in the service interface Sync.
 type SyncServiceGetDeltasStream interface {
-	// Send places the item onto the output stream, blocking if there is no buffer
-	// space available.  If the client has canceled, an error is returned.
-	Send(item LogRec) error
+	// SendStream returns the send portion of the stream.
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no buffer
+		// space available.  If the client has canceled, an error is returned.
+		Send(item LogRec) error
+	}
 }
 
 // Implementation of the SyncServiceGetDeltasStream interface that is not exported.
 type implSyncServiceGetDeltasStream struct {
-	serverCall _gen_ipc.ServerCall
+	writer implSyncServiceGetDeltasStreamSender
 }
 
-func (s *implSyncServiceGetDeltasStream) Send(item LogRec) error {
-	return s.serverCall.Send(item)
+func (s *implSyncServiceGetDeltasStream) SendStream() interface {
+	// Send places the item onto the output stream, blocking if there is no buffer
+	// space available.  If the client has canceled, an error is returned.
+	Send(item LogRec) error
+} {
+	return &s.writer
 }
 
 // BindSync returns the client stub implementing the Sync
@@ -241,12 +271,12 @@
 	name   string
 }
 
-func (__gen_c *clientStubSync) GetDeltas(ctx _gen_context.T, In GenVector, ClientID DeviceID, opts ..._gen_ipc.CallOpt) (reply SyncGetDeltasStream, err error) {
+func (__gen_c *clientStubSync) GetDeltas(ctx _gen_context.T, In GenVector, ClientID DeviceID, opts ..._gen_ipc.CallOpt) (reply SyncGetDeltasCall, err error) {
 	var call _gen_ipc.Call
 	if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "GetDeltas", []interface{}{In, ClientID}, opts...); err != nil {
 		return
 	}
-	reply = &implSyncGetDeltasStream{clientCall: call}
+	reply = &implSyncGetDeltasCall{clientCall: call, readStream: implSyncGetDeltasStreamIterator{clientCall: call}}
 	return
 }
 
@@ -385,7 +415,7 @@
 }
 
 func (__gen_s *ServerStubSync) GetDeltas(call _gen_ipc.ServerCall, In GenVector, ClientID DeviceID) (reply GenVector, err error) {
-	stream := &implSyncServiceGetDeltasStream{serverCall: call}
+	stream := &implSyncServiceGetDeltasStream{writer: implSyncServiceGetDeltasStreamSender{serverCall: call}}
 	reply, err = __gen_s.service.GetDeltas(call, In, ClientID, stream)
 	return
 }
diff --git a/runtimes/google/vsync/vsyncd.go b/runtimes/google/vsync/vsyncd.go
index 8ef1c53..2b74ce7 100644
--- a/runtimes/google/vsync/vsyncd.go
+++ b/runtimes/google/vsync/vsyncd.go
@@ -207,7 +207,7 @@
 					v.devID, v.genID, i, err)
 			}
 			vlog.VI(1).Infof("Sending log record %v", rec)
-			if err := Stream.Send(*rec); err != nil {
+			if err := Stream.SendStream().Send(*rec); err != nil {
 				vlog.Errorf("GetDeltas:: Couldn't send stream err: %v", err)
 				return GenVector{}, err
 			}
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index 4ec4ac6..d470ce0 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -51,7 +51,7 @@
 // its goroutines to exit by closing its internal channel.  This in turn unblocks the watcher
 // enabling it to exit.  If the RPC fails, the watcher notifies the canceler to exit by
 // closing a private "done" channel between them.
-func (w *syncWatcher) watchStreamCanceler(stream watch.GlobWatcherWatchGlobStream, done chan struct{}) {
+func (w *syncWatcher) watchStreamCanceler(stream watch.GlobWatcherWatchGlobCall, done chan struct{}) {
 	select {
 	case <-w.syncd.closed:
 		vlog.VI(1).Info("watchStreamCanceler: Syncd channel closed, cancel stream and exit")
@@ -103,7 +103,7 @@
 
 // getWatchStream() returns a Watch API stream and handles retries if the Watch() call fails.
 // If the stream is nil, it means Syncd is exiting cleanly and the caller should terminate.
-func (w *syncWatcher) getWatchStream(ctx context.T) watch.GlobWatcherWatchGlobStream {
+func (w *syncWatcher) getWatchStream(ctx context.T) watch.GlobWatcherWatchGlobCall {
 	for {
 		req := raw.Request{}
 		if resmark := w.syncd.devtab.head.Resmark; resmark != nil {
@@ -129,8 +129,9 @@
 // Ideally this call does not return as the stream should be un-ending (like "tail -f").
 // If the stream is closed, distinguish between the cases of end-of-stream vs Syncd canceling
 // the stream to trigger a clean exit.
-func (w *syncWatcher) processWatchStream(stream watch.GlobWatcherWatchGlobStream) {
+func (w *syncWatcher) processWatchStream(call watch.GlobWatcherWatchGlobCall) {
 	w.curTx = NoTxID
+	stream := call.RecvStream()
 	for stream.Advance() {
 		changes := stream.Value()
 
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index 90c1629..45a9b44 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -52,7 +52,7 @@
 	panic("not implemented")
 }
 
-func (v *fakeVStore) Watch(_ context.T, req raw.Request, _ ...ipc.CallOpt) (raw.StoreWatchStream, error) {
+func (v *fakeVStore) Watch(_ context.T, req raw.Request, _ ...ipc.CallOpt) (raw.StoreWatchCall, error) {
 	// If "failWatch" is set, simulate a failed RPC call.
 	if info.failWatch {
 		info.failWatchCount++
@@ -66,7 +66,7 @@
 	return newFakeStream(), nil
 }
 
-func (*fakeVStore) PutMutations(_ context.T, _ ...ipc.CallOpt) (raw.StorePutMutationsStream, error) {
+func (*fakeVStore) PutMutations(_ context.T, _ ...ipc.CallOpt) (raw.StorePutMutationsCall, error) {
 	panic("not implemented")
 }
 
@@ -82,6 +82,14 @@
 	return s
 }
 
+func (s *fakeStream) RecvStream() interface {
+	Advance() bool
+	Value() watch.ChangeBatch
+	Err() error
+} {
+	return s
+}
+
 func (s *fakeStream) Advance() bool {
 	// If "failRecv" is set, simulate a failed call.
 	if info.failRecv {
diff --git a/services/mgmt/binary/impl/impl_test.go b/services/mgmt/binary/impl/impl_test.go
index 269ee0c..b706f3c 100644
--- a/services/mgmt/binary/impl/impl_test.go
+++ b/services/mgmt/binary/impl/impl_test.go
@@ -33,18 +33,19 @@
 		t.Errorf("Upload() failed: %v", err)
 		return err
 	}
-	if err := stream.Send(data); err != nil {
+	sender := stream.SendStream()
+	if err := sender.Send(data); err != nil {
 		if err := stream.Finish(); err != nil {
 			t.Logf("Finish() failed: %v", err)
 		}
 		t.Logf("Send() failed: %v", err)
 		return err
 	}
-	if err := stream.CloseSend(); err != nil {
+	if err := sender.Close(); err != nil {
 		if err := stream.Finish(); err != nil {
 			t.Logf("Finish() failed: %v", err)
 		}
-		t.Logf("CloseSend() failed: %v", err)
+		t.Logf("Close() failed: %v", err)
 		return err
 	}
 	if err := stream.Finish(); err != nil {
@@ -63,12 +64,13 @@
 		return nil, err
 	}
 	output := make([]byte, 0)
-	for stream.Advance() {
-		bytes := stream.Value()
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
+		bytes := rStream.Value()
 		output = append(output, bytes...)
 	}
 
-	if err := stream.Err(); err != nil {
+	if err := rStream.Err(); err != nil {
 		t.Logf("Advance() failed with: %v", err)
 	}
 
diff --git a/services/mgmt/binary/impl/invoker.go b/services/mgmt/binary/impl/invoker.go
index 59324a3..9ad1576 100644
--- a/services/mgmt/binary/impl/invoker.go
+++ b/services/mgmt/binary/impl/invoker.go
@@ -350,6 +350,7 @@
 	}
 	defer file.Close()
 	buffer := make([]byte, bufferLength)
+	sender := stream.SendStream()
 	for {
 		n, err := file.Read(buffer)
 		if err != nil && err != io.EOF {
@@ -359,7 +360,7 @@
 		if n == 0 {
 			break
 		}
-		if err := stream.Send(buffer[:n]); err != nil {
+		if err := sender.Send(buffer[:n]); err != nil {
 			vlog.Errorf("Send() failed: %v", err)
 			return errOperationFailed
 		}
@@ -436,8 +437,9 @@
 	}
 	defer file.Close()
 	h := md5.New()
-	for stream.Advance() {
-		bytes := stream.Value()
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
+		bytes := rStream.Value()
 		if _, err := file.Write(bytes); err != nil {
 			vlog.Errorf("Write() failed: %v", err)
 			if err := os.Remove(file.Name()); err != nil {
@@ -448,8 +450,8 @@
 		h.Write(bytes)
 	}
 
-	if err := stream.Err(); err != nil {
-		vlog.Errorf("Recv() failed: %v", err)
+	if err := rStream.Err(); err != nil {
+		vlog.Errorf("Advance() failed: %v", err)
 		if err := os.Remove(file.Name()); err != nil {
 			vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
 		}
diff --git a/services/mgmt/build/impl/impl_test.go b/services/mgmt/build/impl/impl_test.go
index 65b3311..4d0bcd6 100644
--- a/services/mgmt/build/impl/impl_test.go
+++ b/services/mgmt/build/impl/impl_test.go
@@ -56,23 +56,25 @@
 		t.Errorf("Build(%v, %v) failed: %v", err, arch, opsys)
 		return nil, nil, err
 	}
+	sender := stream.SendStream()
 	for _, file := range files {
-		if err := stream.Send(file); err != nil {
+		if err := sender.Send(file); err != nil {
 			t.Logf("Send() failed: %v", err)
 			stream.Cancel()
 			return nil, nil, err
 		}
 	}
-	if err := stream.CloseSend(); err != nil {
-		t.Logf("CloseSend() failed: %v", err)
+	if err := sender.Close(); err != nil {
+		t.Logf("Close() failed: %v", err)
 		stream.Cancel()
 		return nil, nil, err
 	}
 	bins := make([]build.File, 0)
-	for stream.Advance() {
-		bins = append(bins, stream.Value())
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
+		bins = append(bins, rStream.Value())
 	}
-	if err := stream.Err(); err != nil {
+	if err := rStream.Err(); err != nil {
 		t.Logf("Advance() failed: %v", err)
 		return nil, nil, err
 	}
diff --git a/services/mgmt/build/impl/invoker.go b/services/mgmt/build/impl/invoker.go
index 74778da..407f8a6 100644
--- a/services/mgmt/build/impl/invoker.go
+++ b/services/mgmt/build/impl/invoker.go
@@ -56,8 +56,9 @@
 		vlog.Errorf("MkdirAll(%v, %v) failed: %v", srcDir, dirPerm, err)
 		return nil, errInternalError
 	}
-	for stream.Advance() {
-		srcFile := stream.Value()
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
+		srcFile := rStream.Value()
 		filePath := filepath.Join(srcDir, filepath.FromSlash(srcFile.Name))
 		dir := filepath.Dir(filePath)
 		if err := os.MkdirAll(dir, dirPerm); err != nil {
@@ -70,8 +71,8 @@
 		}
 	}
 
-	if err := stream.Err(); err != nil {
-		vlog.Errorf("stream failed: %v", err)
+	if err := rStream.Err(); err != nil {
+		vlog.Errorf("rStream failed: %v", err)
 		return nil, errInternalError
 	}
 	cmd := exec.Command(i.gobin, "install", "-v", "...")
@@ -108,7 +109,7 @@
 			Name:     "bin/" + file.Name(),
 			Contents: bytes,
 		}
-		if err := stream.Send(result); err != nil {
+		if err := stream.SendStream().Send(result); err != nil {
 			vlog.Errorf("Send() failed: %v", err)
 			return nil, errInternalError
 		}
diff --git a/services/mgmt/lib/binary/impl.go b/services/mgmt/lib/binary/impl.go
index a6611eb..63cc031 100644
--- a/services/mgmt/lib/binary/impl.go
+++ b/services/mgmt/lib/binary/impl.go
@@ -74,8 +74,9 @@
 				continue
 			}
 			h, nreceived := md5.New(), 0
-			for stream.Advance() {
-				bytes := stream.Value()
+			rStream := stream.RecvStream()
+			for rStream.Advance() {
+				bytes := rStream.Value()
 				if _, err := w.Write(bytes); err != nil {
 					vlog.Errorf("Write() failed: %v", err)
 					stream.Cancel()
@@ -85,8 +86,8 @@
 				nreceived += len(bytes)
 			}
 
-			if err := stream.Err(); err != nil {
-				vlog.Errorf("stream failed: %v", err)
+			if err := rStream.Err(); err != nil {
+				vlog.Errorf("Advance() failed: %v", err)
 				stream.Cancel()
 				continue download
 
@@ -210,19 +211,20 @@
 					continue upload
 				}
 			}
+			sender := stream.SendStream()
 			for from := 0; from < len(buffer); from += subpartSize {
 				to := from + subpartSize
 				if to > len(buffer) {
 					to = len(buffer)
 				}
-				if err := stream.Send(buffer[from:to]); err != nil {
+				if err := sender.Send(buffer[from:to]); err != nil {
 					vlog.Errorf("Send() failed: %v", err)
 					stream.Cancel()
 					continue upload
 				}
 			}
-			if err := stream.CloseSend(); err != nil {
-				vlog.Errorf("CloseSend() failed: %v", err)
+			if err := sender.Close(); err != nil {
+				vlog.Errorf("Close() failed: %v", err)
 				parts, statErr := client.Stat(rt.R().NewContext())
 				if statErr != nil {
 					vlog.Errorf("Stat() failed: %v", statErr)
diff --git a/services/mgmt/node/impl/mock_repo_test.go b/services/mgmt/node/impl/mock_repo_test.go
index 28eaf4a..8630621 100644
--- a/services/mgmt/node/impl/mock_repo_test.go
+++ b/services/mgmt/node/impl/mock_repo_test.go
@@ -90,13 +90,14 @@
 	defer file.Close()
 	bufferLength := 4096
 	buffer := make([]byte, bufferLength)
+	sender := stream.SendStream()
 	for {
 		n, err := file.Read(buffer)
 		switch err {
 		case io.EOF:
 			return nil
 		case nil:
-			if err := stream.Send(buffer[:n]); err != nil {
+			if err := sender.Send(buffer[:n]); err != nil {
 				vlog.Errorf("Send() failed: %v", err)
 				return errOperationFailed
 			}
diff --git a/services/mounttable/lib/mounttable.go b/services/mounttable/lib/mounttable.go
index 367230f..bfb4f56 100644
--- a/services/mounttable/lib/mounttable.go
+++ b/services/mounttable/lib/mounttable.go
@@ -328,6 +328,7 @@
 		}
 	}
 
+	sender := reply.SendStream()
 	// If this is a mount point, we're done.
 	if m := n.mount; m != nil {
 		// Garbage-collect if expired.
@@ -335,7 +336,7 @@
 			n.removeUseless()
 			return
 		}
-		reply.Send(mounttable.MountEntry{Name: name, Servers: m.servers.copyToSlice()})
+		sender.Send(mounttable.MountEntry{Name: name, Servers: m.servers.copyToSlice()})
 		return
 	}
 
@@ -345,7 +346,7 @@
 			n.removeUseless()
 			return
 		}
-		reply.Send(mounttable.MountEntry{Name: name})
+		sender.Send(mounttable.MountEntry{Name: name})
 	}
 
 	if pattern.Finished() {
diff --git a/services/mounttable/lib/mounttable_test.go b/services/mounttable/lib/mounttable_test.go
index 443cf4f..5b60408 100644
--- a/services/mounttable/lib/mounttable_test.go
+++ b/services/mounttable/lib/mounttable_test.go
@@ -295,12 +295,13 @@
 		boom(t, "Failed call to %s.Glob(%s): %s", name, pattern, err)
 	}
 	var reply []string
-	for stream.Advance() {
-		e := stream.Value()
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
+		e := rStream.Value()
 		reply = append(reply, e.Name)
 	}
 
-	if err := stream.Err(); err != nil {
+	if err := rStream.Err(); err != nil {
 		boom(t, "Glob %s: %s", name, err)
 	}
 	return reply
diff --git a/services/mounttable/lib/neighborhood.go b/services/mounttable/lib/neighborhood.go
index 7653604..2a1206f 100644
--- a/services/mounttable/lib/neighborhood.go
+++ b/services/mounttable/lib/neighborhood.go
@@ -240,13 +240,14 @@
 	// return all neighbors that match the first element of the pattern.
 	nh := ns.nh
 
+	sender := reply.SendStream()
 	switch len(ns.elems) {
 	case 0:
 		for k, n := range nh.neighbors() {
 			if ok, _ := g.MatchInitialSegment(k); !ok {
 				continue
 			}
-			if err := reply.Send(mounttable.MountEntry{Name: k, Servers: n}); err != nil {
+			if err := sender.Send(mounttable.MountEntry{Name: k, Servers: n}); err != nil {
 				return err
 			}
 		}
@@ -256,7 +257,7 @@
 		if neighbor == nil {
 			return naming.ErrNoSuchName
 		}
-		return reply.Send(mounttable.MountEntry{Name: "", Servers: neighbor})
+		return sender.Send(mounttable.MountEntry{Name: "", Servers: neighbor})
 	default:
 		return naming.ErrNoSuchName
 	}
diff --git a/services/store/memstore/blackbox/sync_integration_test.go b/services/store/memstore/blackbox/sync_integration_test.go
index 06139f4..6be3e37 100644
--- a/services/store/memstore/blackbox/sync_integration_test.go
+++ b/services/store/memstore/blackbox/sync_integration_test.go
@@ -45,10 +45,11 @@
 	// Create a sync request
 	stream := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, raw.Request{})
 
-	if !stream.Advance() {
-		t.Fatalf("Advance() failed: %v", stream.Err())
+	rStream := stream.RecvStream()
+	if !rStream.Advance() {
+		t.Fatalf("Advance() failed: %v", rStream.Err())
 	}
-	cb := stream.Value()
+	cb := rStream.Value()
 	// Update target
 	PutMutations(t, target, Mutations(cb.Changes))
 	GC(t, target)
@@ -79,10 +80,11 @@
 	id3 := Put(t, st, tr, "/a/b", "val3")
 	Commit(t, tr)
 
-	if !stream.Advance() {
-		t.Fatalf("Advance() failed: %v", stream.Err())
+	rStream := stream.RecvStream()
+	if !rStream.Advance() {
+		t.Fatalf("Advance() failed: %v", rStream.Err())
 	}
-	cb := stream.Value()
+	cb := rStream.Value()
 	// Update target
 	PutMutations(t, target, Mutations(cb.Changes))
 	GC(t, target)
@@ -97,11 +99,11 @@
 	Remove(t, st, tr, "/a/b")
 	Commit(t, tr)
 
-	if !stream.Advance() {
-		t.Fatalf("Advance() failed: %v", stream.Err())
+	if !rStream.Advance() {
+		t.Fatalf("Advance() failed: %v", rStream.Err())
 	}
 
-	cb = stream.Value()
+	cb = rStream.Value()
 	// Update target
 	PutMutations(t, target, Mutations(cb.Changes))
 	GC(t, target)
diff --git a/services/store/memstore/blackbox/util.go b/services/store/memstore/blackbox/util.go
index d432ad5..c618fb8 100644
--- a/services/store/memstore/blackbox/util.go
+++ b/services/store/memstore/blackbox/util.go
@@ -201,6 +201,14 @@
 	}
 }
 
+func (s *putMutationsStream) RecvStream() interface {
+	Advance() bool
+	Value() raw.Mutation
+	Err() error
+} {
+	return s
+}
+
 func (s *putMutationsStream) Advance() bool {
 	s.index++
 	return s.index < len(s.mus)
diff --git a/services/store/memstore/store.go b/services/store/memstore/store.go
index a5222e2..bb7ef4a 100644
--- a/services/store/memstore/store.go
+++ b/services/store/memstore/store.go
@@ -153,15 +153,16 @@
 // stream has been closed.
 func (st *Store) PutMutations(ctx ipc.ServerContext, stream raw.StoreServicePutMutationsStream) error {
 	tr := st.newNilTransaction()
-	for stream.Advance() {
-		mu := stream.Value()
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
+		mu := rStream.Value()
 
 		if err := tr.snapshot.PutMutation(mu); err != nil {
 			tr.Abort()
 			return err
 		}
 	}
-	err := stream.Err()
+	err := rStream.Err()
 	if err != nil {
 		tr.Abort()
 		return err
diff --git a/services/store/memstore/store_test.go b/services/store/memstore/store_test.go
index a71aa48..bad595c 100644
--- a/services/store/memstore/store_test.go
+++ b/services/store/memstore/store_test.go
@@ -285,7 +285,7 @@
 	v2 = "v4"
 
 	s := storetesting.PutMutations(rootPublicID, st.PutMutations)
-	s.Send(raw.Mutation{
+	s.SendStream().Send(raw.Mutation{
 		ID:           id2,
 		PriorVersion: pre2,
 		Version:      post2,
@@ -314,7 +314,7 @@
 
 	id := storage.NewID()
 	s := storetesting.PutMutations(rootPublicID, st.PutMutations)
-	s.Send(raw.Mutation{
+	s.SendStream().Send(raw.Mutation{
 		ID:           id,
 		PriorVersion: storage.NoVersion,
 		Version:      storage.NewVersion(),
@@ -322,7 +322,7 @@
 		Value:        "v1",
 		Dir:          empty,
 	})
-	s.Send(raw.Mutation{
+	s.SendStream().Send(raw.Mutation{
 		ID:           id,
 		PriorVersion: storage.NoVersion,
 		Version:      storage.NewVersion(),
@@ -349,7 +349,7 @@
 	}
 
 	s := storetesting.PutMutations(rootPublicID, st.PutMutations)
-	s.Send(raw.Mutation{
+	s.SendStream().Send(raw.Mutation{
 		ID:           storage.NewID(),
 		PriorVersion: storage.NoVersion,
 		Version:      storage.NewVersion(),
diff --git a/services/store/memstore/testing/util.go b/services/store/memstore/testing/util.go
index 1a5e503..00066f5 100644
--- a/services/store/memstore/testing/util.go
+++ b/services/store/memstore/testing/util.go
@@ -110,6 +110,14 @@
 	value raw.Mutation
 }
 
+func (s *storeServicePutMutationsStream) RecvStream() interface {
+	Advance() bool
+	Value() raw.Mutation
+	Err() error
+} {
+	return s
+}
+
 func (s *storeServicePutMutationsStream) Advance() bool {
 	var ok bool
 	s.value, ok = <-s.mus
@@ -126,10 +134,8 @@
 
 // storePutMutationsStream implements raw.StorePutMutationsStream
 type storePutMutationsStream struct {
-	ctx    ipc.ServerContext
 	closed bool
 	mus    chan<- raw.Mutation
-	err    <-chan error
 }
 
 func (s *storePutMutationsStream) Send(mu raw.Mutation) error {
@@ -137,7 +143,7 @@
 	return nil
 }
 
-func (s *storePutMutationsStream) CloseSend() error {
+func (s *storePutMutationsStream) Close() error {
 	if !s.closed {
 		s.closed = true
 		close(s.mus)
@@ -145,17 +151,30 @@
 	return nil
 }
 
-func (s *storePutMutationsStream) Finish() error {
-	s.CloseSend()
+type storePutMutationsCall struct {
+	ctx    ipc.ServerContext
+	stream storePutMutationsStream
+	err    <-chan error
+}
+
+func (s *storePutMutationsCall) SendStream() interface {
+	Send(mu raw.Mutation) error
+	Close() error
+} {
+	return &s.stream
+}
+
+func (s *storePutMutationsCall) Finish() error {
+	s.stream.Close()
 	return <-s.err
 }
 
-func (s *storePutMutationsStream) Cancel() {
+func (s *storePutMutationsCall) Cancel() {
 	s.ctx.(*CancellableContext).Cancel()
-	s.CloseSend()
+	s.stream.Close()
 }
 
-func PutMutations(id security.PublicID, putMutationsFn func(ipc.ServerContext, raw.StoreServicePutMutationsStream) error) raw.StorePutMutationsStream {
+func PutMutations(id security.PublicID, putMutationsFn func(ipc.ServerContext, raw.StoreServicePutMutationsStream) error) raw.StorePutMutationsCall {
 	ctx := NewCancellableContext(id)
 	mus := make(chan raw.Mutation)
 	err := make(chan error)
@@ -163,17 +182,19 @@
 		err <- putMutationsFn(ctx, &storeServicePutMutationsStream{mus: mus})
 		close(err)
 	}()
-	return &storePutMutationsStream{
+	return &storePutMutationsCall{
 		ctx: ctx,
-		mus: mus,
 		err: err,
+		stream: storePutMutationsStream{
+			mus: mus,
+		},
 	}
 }
 
 func PutMutationsBatch(t *testing.T, id security.PublicID, putMutationsFn func(ipc.ServerContext, raw.StoreServicePutMutationsStream) error, mus []raw.Mutation) {
 	storePutMutationsStream := PutMutations(id, putMutationsFn)
 	for _, mu := range mus {
-		storePutMutationsStream.Send(mu)
+		storePutMutationsStream.SendStream().Send(mu)
 	}
 	if err := storePutMutationsStream.Finish(); err != nil {
 		_, file, line, _ := runtime.Caller(1)
@@ -183,14 +204,14 @@
 
 // Utilities for Watch.
 
-// watcherServiceWatchStream implements watch.WatcherServiceWatchStream.
-type watcherServiceWatchStream struct {
+// watcherServiceWatchStreamSender implements watch.WatcherServiceWatchStreamSender
+type watcherServiceWatchStreamSender struct {
 	mu     *sync.Mutex
 	ctx    ipc.ServerContext
 	output chan<- watch.ChangeBatch
 }
 
-func (s *watcherServiceWatchStream) Send(cb watch.ChangeBatch) error {
+func (s *watcherServiceWatchStreamSender) Send(cb watch.ChangeBatch) error {
 	s.mu.Lock()
 	defer s.mu.Unlock()
 	select {
@@ -201,6 +222,18 @@
 	}
 }
 
+// watcherServiceWatchStream implements watch.WatcherServiceWatchStream
+type watcherServiceWatchStream struct {
+	watcherServiceWatchStreamSender
+}
+
+func (s *watcherServiceWatchStream) SendStream() interface {
+	Send(cb watch.ChangeBatch) error
+} {
+	return s
+}
+func (*watcherServiceWatchStream) Cancel() {}
+
 // watcherWatchStream implements watch.WatcherWatchStream.
 type watcherWatchStream struct {
 	ctx   *CancellableContext
@@ -232,6 +265,14 @@
 	s.ctx.Cancel()
 }
 
+func (s *watcherWatchStream) RecvStream() interface {
+	Advance() bool
+	Value() watch.ChangeBatch
+	Err() error
+} {
+	return s
+}
+
 func watchImpl(id security.PublicID, watchFn func(ipc.ServerContext, *watcherServiceWatchStream) error) *watcherWatchStream {
 	mu := &sync.Mutex{}
 	ctx := NewCancellableContext(id)
@@ -239,9 +280,11 @@
 	errc := make(chan error, 1)
 	go func() {
 		stream := &watcherServiceWatchStream{
-			mu:     mu,
-			ctx:    ctx,
-			output: c,
+			watcherServiceWatchStreamSender{
+				mu:     mu,
+				ctx:    ctx,
+				output: c,
+			},
 		}
 		err := watchFn(ctx, stream)
 		mu.Lock()
@@ -258,19 +301,19 @@
 	}
 }
 
-func WatchRaw(id security.PublicID, watchFn func(ipc.ServerContext, raw.Request, raw.StoreServiceWatchStream) error, req raw.Request) raw.StoreWatchStream {
+func WatchRaw(id security.PublicID, watchFn func(ipc.ServerContext, raw.Request, raw.StoreServiceWatchStream) error, req raw.Request) raw.StoreWatchCall {
 	return watchImpl(id, func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
 		return watchFn(ctx, req, stream)
 	})
 }
 
-func WatchGlob(id security.PublicID, watchFn func(ipc.ServerContext, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, req watch.GlobRequest) watch.GlobWatcherWatchGlobStream {
-	return watchImpl(id, func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
-		return watchFn(ctx, req, stream)
+func WatchGlob(id security.PublicID, watchFn func(ipc.ServerContext, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, req watch.GlobRequest) watch.GlobWatcherWatchGlobCall {
+	return watchImpl(id, func(ctx ipc.ServerContext, iterator *watcherServiceWatchStream) error {
+		return watchFn(ctx, req, iterator)
 	})
 }
 
-func WatchGlobOnPath(id security.PublicID, watchFn func(ipc.ServerContext, storage.PathName, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, path storage.PathName, req watch.GlobRequest) watch.GlobWatcherWatchGlobStream {
+func WatchGlobOnPath(id security.PublicID, watchFn func(ipc.ServerContext, storage.PathName, watch.GlobRequest, watch.GlobWatcherServiceWatchGlobStream) error, path storage.PathName, req watch.GlobRequest) watch.GlobWatcherWatchGlobCall {
 	return watchImpl(id, func(ctx ipc.ServerContext, stream *watcherServiceWatchStream) error {
 		return watchFn(ctx, path, req, stream)
 	})
diff --git a/services/store/memstore/watch/watcher.go b/services/store/memstore/watch/watcher.go
index ad1ae00..9c3e925 100644
--- a/services/store/memstore/watch/watcher.go
+++ b/services/store/memstore/watch/watcher.go
@@ -61,7 +61,7 @@
 	if err != nil {
 		return err
 	}
-	return w.Watch(ctx, processor, req.ResumeMarker, stream)
+	return w.Watch(ctx, processor, req.ResumeMarker, stream.SendStream())
 }
 
 // WatchGlob returns a stream of changes that match a pattern.
@@ -72,7 +72,7 @@
 	if err != nil {
 		return err
 	}
-	return w.Watch(ctx, processor, req.ResumeMarker, stream)
+	return w.Watch(ctx, processor, req.ResumeMarker, stream.SendStream())
 }
 
 // WatchQuery returns a stream of changes that satisfy a query.
diff --git a/services/store/memstore/watch/watcher_test.go b/services/store/memstore/watch/watcher_test.go
index 696e773..c471716 100644
--- a/services/store/memstore/watch/watcher_test.go
+++ b/services/store/memstore/watch/watcher_test.go
@@ -37,10 +37,11 @@
 	ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
 
 	// Check that watch detects the changes in the first transaction.
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	rStream := ws.RecvStream()
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb := ws.Value()
+	cb := rStream.Value()
 	changes := cb.Changes
 	change := changes[0]
 	if change.Continued {
@@ -59,10 +60,10 @@
 
 	// Check that watch detects the changes in the second transaction.
 
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb = ws.Value()
+	cb = rStream.Value()
 	changes = cb.Changes
 	change = changes[0]
 	if !change.Continued {
@@ -95,11 +96,12 @@
 	req := watch.GlobRequest{Pattern: "..."}
 	ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
 
+	rStream := ws.RecvStream()
 	// Check that watch detects the changes in the first transaction.
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb := ws.Value()
+	cb := rStream.Value()
 	changes := cb.Changes
 	change := changes[0]
 	if change.Continued {
@@ -113,10 +115,10 @@
 	commit(t, tr)
 
 	// Check that watch detects the changes in the second transaction.
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb = ws.Value()
+	cb = rStream.Value()
 
 	changes = cb.Changes
 	change = changes[0]
@@ -150,7 +152,8 @@
 	commit(t, tr)
 
 	// Check that watch processed the first transaction.
-	if !ws.Advance() {
+	rStream := ws.RecvStream()
+	if !rStream.Advance() {
 		t.Error("Expected a change.")
 	}
 
@@ -165,8 +168,8 @@
 	commit(t, tr)
 
 	// Check that watch did not processed the second transaction.
-	if ws.Advance() || ws.Err() != nil {
-		t.Errorf("Unexpected error: %v", ws.Err())
+	if rStream.Advance() || rStream.Err() != nil {
+		t.Errorf("Unexpected error: %v", rStream.Err())
 	}
 
 	// Check that io.EOF was returned.
@@ -195,7 +198,7 @@
 	commit(t, tr)
 
 	// Check that watch processed the first transaction.
-	if !ws.Advance() {
+	if !ws.RecvStream().Advance() {
 		t.Error("Expected a change.")
 	}
 
@@ -245,10 +248,11 @@
 	ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
 
 	// Retrieve the resume marker for the initial state.
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	rStream := ws.RecvStream()
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb := ws.Value()
+	cb := rStream.Value()
 
 	changes := cb.Changes
 	change := changes[0]
@@ -262,11 +266,12 @@
 	req = raw.Request{ResumeMarker: resumeMarker1}
 	ws = watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
 
+	rStream = ws.RecvStream()
 	// Check that watch detects the changes in the state and the transaction.
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb = ws.Value()
+	cb = rStream.Value()
 
 	changes = cb.Changes
 	change = changes[0]
@@ -276,10 +281,10 @@
 	watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post11, true, "val1", watchtesting.EmptyDir)
 
 	// Check that watch detects the changes in the state and the transaction.
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb = ws.Value()
+	cb = rStream.Value()
 
 	changes = cb.Changes
 	change = changes[0]
@@ -324,10 +329,11 @@
 	ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
 
 	// Retrieve the resume marker for the first transaction.
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	rStream := ws.RecvStream()
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb := ws.Value()
+	cb := rStream.Value()
 
 	changes := cb.Changes
 	change := changes[0]
@@ -342,10 +348,11 @@
 	ws = watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
 
 	// Check that watch detects the changes in the first and second transaction.
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	rStream = ws.RecvStream()
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb = ws.Value()
+	cb = rStream.Value()
 
 	changes = cb.Changes
 	change = changes[0]
@@ -354,10 +361,10 @@
 	}
 	watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post11, true, "val1", watchtesting.EmptyDir)
 
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb = ws.Value()
+	cb = rStream.Value()
 
 	changes = cb.Changes
 	change = changes[0]
@@ -381,10 +388,11 @@
 	ws = watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
 
 	// Check that watch detects the changes in the second transaction.
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	rStream = ws.RecvStream()
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb = ws.Value()
+	cb = rStream.Value()
 
 	changes = cb.Changes
 	change = changes[0]
@@ -437,20 +445,21 @@
 	post33 := st.Snapshot().Find(id3).Version
 
 	// Check that watch announces that the initial state was skipped.
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	rStream := ws.RecvStream()
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb := ws.Value()
+	cb := rStream.Value()
 
 	changes := cb.Changes
 	change := changes[0]
 	watchtesting.ExpectInitialStateSkipped(t, change)
 
 	// Check that watch detects the changes in the third transaction.
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb = ws.Value()
+	cb = rStream.Value()
 
 	changes = cb.Changes
 	change = changes[0]
@@ -519,10 +528,11 @@
 	req := watch.GlobRequest{Pattern: "..."}
 	ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
 
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	rStream := ws.RecvStream()
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb := ws.Value()
+	cb := rStream.Value()
 
 	changes := cb.Changes
 	change := changes[0]
@@ -532,10 +542,11 @@
 	// Start another watch request.
 	ws = watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
 
-	if !ws.Advance() {
-		t.Error("Advance() failed: %v", ws.Err())
+	rStream = ws.RecvStream()
+	if !rStream.Advance() {
+		t.Error("Advance() failed: %v", rStream.Err())
 	}
-	cb = ws.Value()
+	cb = rStream.Value()
 
 	changes = cb.Changes
 	change = changes[0]
diff --git a/services/store/raw/service.vdl.go b/services/store/raw/service.vdl.go
index d669a23..e0570f3 100644
--- a/services/store/raw/service.vdl.go
+++ b/services/store/raw/service.vdl.go
@@ -66,11 +66,11 @@
 // to enable embedding without method collisions.  Not to be used directly by clients.
 type Store_ExcludingUniversal interface {
 	// Watch returns a stream of all changes.
-	Watch(ctx _gen_context.T, Req Request, opts ..._gen_ipc.CallOpt) (reply StoreWatchStream, err error)
+	Watch(ctx _gen_context.T, Req Request, opts ..._gen_ipc.CallOpt) (reply StoreWatchCall, err error)
 	// PutMutations atomically commits a stream of Mutations when the stream is
 	// closed. Mutations are not committed if the request is cancelled before
 	// the stream has been closed.
-	PutMutations(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply StorePutMutationsStream, err error)
+	PutMutations(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply StorePutMutationsCall, err error)
 }
 type Store interface {
 	_gen_ipc.UniversalServiceMethods
@@ -88,27 +88,29 @@
 	PutMutations(context _gen_ipc.ServerContext, stream StoreServicePutMutationsStream) (err error)
 }
 
-// StoreWatchStream is the interface for streaming responses of the method
+// StoreWatchCall is the interface for call object of the method
 // Watch in the service interface Store.
-type StoreWatchStream interface {
+type StoreWatchCall interface {
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
 
-	// Advance stages an element so the client can retrieve it
-	// with Value.  Advance returns true iff there is an
-	// element to retrieve.  The client must call Advance before
-	// calling Value.  The client must call Cancel if it does
-	// not iterate through all elements (i.e. until Advance
-	// returns false).  Advance may block if an element is not
-	// immediately available.
-	Advance() bool
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() watch.ChangeBatch
 
-	// Value returns the element that was staged by Advance.
-	// Value may panic if Advance returned false or was not
-	// called at all.  Value does not block.
-	Value() watch.ChangeBatch
-
-	// Err returns a non-nil error iff the stream encountered
-	// any errors.  Err does not block.
-	Err() error
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
 
 	// Finish blocks until the server is done and returns the positional
 	// return values for call.
@@ -119,7 +121,7 @@
 	// call.
 	//
 	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return a non-EOF error.
+	// has been called or any of the other methods return an error.
 	// Finish should be called at most once.
 	Finish() (err error)
 
@@ -129,88 +131,118 @@
 	Cancel()
 }
 
-// Implementation of the StoreWatchStream interface that is not exported.
-type implStoreWatchStream struct {
+type implStoreWatchStreamIterator struct {
 	clientCall _gen_ipc.Call
 	val        watch.ChangeBatch
 	err        error
 }
 
-func (c *implStoreWatchStream) Advance() bool {
+func (c *implStoreWatchStreamIterator) Advance() bool {
 	c.val = watch.ChangeBatch{}
 	c.err = c.clientCall.Recv(&c.val)
 	return c.err == nil
 }
 
-func (c *implStoreWatchStream) Value() watch.ChangeBatch {
+func (c *implStoreWatchStreamIterator) Value() watch.ChangeBatch {
 	return c.val
 }
 
-func (c *implStoreWatchStream) Err() error {
+func (c *implStoreWatchStreamIterator) Err() error {
 	if c.err == _gen_io.EOF {
 		return nil
 	}
 	return c.err
 }
 
-func (c *implStoreWatchStream) Finish() (err error) {
+// Implementation of the StoreWatchCall interface that is not exported.
+type implStoreWatchCall struct {
+	clientCall _gen_ipc.Call
+	readStream implStoreWatchStreamIterator
+}
+
+func (c *implStoreWatchCall) RecvStream() interface {
+	Advance() bool
+	Value() watch.ChangeBatch
+	Err() error
+} {
+	return &c.readStream
+}
+
+func (c *implStoreWatchCall) Finish() (err error) {
 	if ierr := c.clientCall.Finish(&err); ierr != nil {
 		err = ierr
 	}
 	return
 }
 
-func (c *implStoreWatchStream) Cancel() {
+func (c *implStoreWatchCall) Cancel() {
 	c.clientCall.Cancel()
 }
 
+type implStoreServiceWatchStreamSender struct {
+	serverCall _gen_ipc.ServerCall
+}
+
+func (s *implStoreServiceWatchStreamSender) Send(item watch.ChangeBatch) error {
+	return s.serverCall.Send(item)
+}
+
 // StoreServiceWatchStream is the interface for streaming responses of the method
 // Watch in the service interface Store.
 type StoreServiceWatchStream interface {
-	// Send places the item onto the output stream, blocking if there is no buffer
-	// space available.  If the client has canceled, an error is returned.
-	Send(item watch.ChangeBatch) error
+	// SendStream returns the send portion of the stream.
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no buffer
+		// space available.  If the client has canceled, an error is returned.
+		Send(item watch.ChangeBatch) error
+	}
 }
 
 // Implementation of the StoreServiceWatchStream interface that is not exported.
 type implStoreServiceWatchStream struct {
-	serverCall _gen_ipc.ServerCall
+	writer implStoreServiceWatchStreamSender
 }
 
-func (s *implStoreServiceWatchStream) Send(item watch.ChangeBatch) error {
-	return s.serverCall.Send(item)
+func (s *implStoreServiceWatchStream) SendStream() interface {
+	// Send places the item onto the output stream, blocking if there is no buffer
+	// space available.  If the client has canceled, an error is returned.
+	Send(item watch.ChangeBatch) error
+} {
+	return &s.writer
 }
 
-// StorePutMutationsStream is the interface for streaming responses of the method
+// StorePutMutationsCall is the interface for call object of the method
 // PutMutations in the service interface Store.
-type StorePutMutationsStream interface {
+type StorePutMutationsCall interface {
 
-	// Send places the item onto the output stream, blocking if there is no
-	// buffer space available.  Calls to Send after having called CloseSend
-	// or Cancel will fail.  Any blocked Send calls will be unblocked upon
-	// calling Cancel.
-	Send(item Mutation) error
+	// SendStream returns the send portion of the stream
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no
+		// buffer space available.  Calls to Send after having called Close
+		// or Cancel will fail.  Any blocked Send calls will be unblocked upon
+		// calling Cancel.
+		Send(item Mutation) error
 
-	// CloseSend indicates to the server that no more items will be sent;
-	// server Recv calls will receive io.EOF after all sent items.  This is
-	// an optional call - it's used by streaming clients that need the
-	// server to receive the io.EOF terminator before the client calls
-	// Finish (for example, if the client needs to continue receiving items
-	// from the server after having finished sending).
-	// Calls to CloseSend after having called Cancel will fail.
-	// Like Send, CloseSend blocks when there's no buffer space available.
-	CloseSend() error
+		// Close indicates to the server that no more items will be sent;
+		// server Recv calls will receive io.EOF after all sent items.  This is
+		// an optional call - it's used by streaming clients that need the
+		// server to receive the io.EOF terminator before the client calls
+		// Finish (for example, if the client needs to continue receiving items
+		// from the server after having finished sending).
+		// Calls to Close after having called Cancel will fail.
+		// Like Send, Close blocks when there's no buffer space available.
+		Close() error
+	}
 
-	// Finish performs the equivalent of CloseSend, then blocks until the server
+	// Finish performs the equivalent of SendStream().Close, then blocks until the server
 	// is done, and returns the positional return values for call.
-	//
 	// If Cancel has been called, Finish will return immediately; the output of
 	// Finish could either be an error signalling cancelation, or the correct
 	// positional return values from the server depending on the timing of the
 	// call.
 	//
 	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return a non-EOF error.
+	// has been called or any of the other methods return an error.
 	// Finish should be called at most once.
 	Finish() (err error)
 
@@ -220,34 +252,95 @@
 	Cancel()
 }
 
-// Implementation of the StorePutMutationsStream interface that is not exported.
-type implStorePutMutationsStream struct {
+type implStorePutMutationsStreamSender struct {
 	clientCall _gen_ipc.Call
 }
 
-func (c *implStorePutMutationsStream) Send(item Mutation) error {
+func (c *implStorePutMutationsStreamSender) Send(item Mutation) error {
 	return c.clientCall.Send(item)
 }
 
-func (c *implStorePutMutationsStream) CloseSend() error {
+func (c *implStorePutMutationsStreamSender) Close() error {
 	return c.clientCall.CloseSend()
 }
 
-func (c *implStorePutMutationsStream) Finish() (err error) {
+// Implementation of the StorePutMutationsCall interface that is not exported.
+type implStorePutMutationsCall struct {
+	clientCall  _gen_ipc.Call
+	writeStream implStorePutMutationsStreamSender
+}
+
+func (c *implStorePutMutationsCall) SendStream() interface {
+	Send(item Mutation) error
+	Close() error
+} {
+	return &c.writeStream
+}
+
+func (c *implStorePutMutationsCall) Finish() (err error) {
 	if ierr := c.clientCall.Finish(&err); ierr != nil {
 		err = ierr
 	}
 	return
 }
 
-func (c *implStorePutMutationsStream) Cancel() {
+func (c *implStorePutMutationsCall) Cancel() {
 	c.clientCall.Cancel()
 }
 
+type implStoreServicePutMutationsStreamIterator struct {
+	serverCall _gen_ipc.ServerCall
+	val        Mutation
+	err        error
+}
+
+func (s *implStoreServicePutMutationsStreamIterator) Advance() bool {
+	s.err = s.serverCall.Recv(&s.val)
+	return s.err == nil
+}
+
+func (s *implStoreServicePutMutationsStreamIterator) Value() Mutation {
+	return s.val
+}
+
+func (s *implStoreServicePutMutationsStreamIterator) Err() error {
+	if s.err == _gen_io.EOF {
+		return nil
+	}
+	return s.err
+}
+
 // StoreServicePutMutationsStream is the interface for streaming responses of the method
 // PutMutations in the service interface Store.
 type StoreServicePutMutationsStream interface {
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
 
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() Mutation
+
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
+}
+
+// Implementation of the StoreServicePutMutationsStream interface that is not exported.
+type implStoreServicePutMutationsStream struct {
+	reader implStoreServicePutMutationsStreamIterator
+}
+
+func (s *implStoreServicePutMutationsStream) RecvStream() interface {
 	// Advance stages an element so the client can retrieve it
 	// with Value.  Advance returns true iff there is an
 	// element to retrieve.  The client must call Advance before
@@ -260,40 +353,13 @@
 	// Value returns the element that was staged by Advance.
 	// Value may panic if Advance returned false or was not
 	// called at all.  Value does not block.
-	//
-	// In general, Value is undefined if the underlying collection
-	// of elements changes while iteration is in progress.  If
-	// <DataProvider> supports concurrent modification, it should
-	// document its behavior.
 	Value() Mutation
 
 	// Err returns a non-nil error iff the stream encountered
 	// any errors.  Err does not block.
 	Err() error
-}
-
-// Implementation of the StoreServicePutMutationsStream interface that is not exported.
-type implStoreServicePutMutationsStream struct {
-	serverCall _gen_ipc.ServerCall
-	val        Mutation
-	err        error
-}
-
-func (s *implStoreServicePutMutationsStream) Advance() bool {
-	s.val = Mutation{}
-	s.err = s.serverCall.Recv(&s.val)
-	return s.err == nil
-}
-
-func (s *implStoreServicePutMutationsStream) Value() Mutation {
-	return s.val
-}
-
-func (s *implStoreServicePutMutationsStream) Err() error {
-	if s.err == _gen_io.EOF {
-		return nil
-	}
-	return s.err
+} {
+	return &s.reader
 }
 
 // BindStore returns the client stub implementing the Store
@@ -337,21 +403,21 @@
 	name   string
 }
 
-func (__gen_c *clientStubStore) Watch(ctx _gen_context.T, Req Request, opts ..._gen_ipc.CallOpt) (reply StoreWatchStream, err error) {
+func (__gen_c *clientStubStore) Watch(ctx _gen_context.T, Req Request, opts ..._gen_ipc.CallOpt) (reply StoreWatchCall, err error) {
 	var call _gen_ipc.Call
 	if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "Watch", []interface{}{Req}, opts...); err != nil {
 		return
 	}
-	reply = &implStoreWatchStream{clientCall: call}
+	reply = &implStoreWatchCall{clientCall: call, readStream: implStoreWatchStreamIterator{clientCall: call}}
 	return
 }
 
-func (__gen_c *clientStubStore) PutMutations(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply StorePutMutationsStream, err error) {
+func (__gen_c *clientStubStore) PutMutations(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (reply StorePutMutationsCall, err error) {
 	var call _gen_ipc.Call
 	if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "PutMutations", nil, opts...); err != nil {
 		return
 	}
-	reply = &implStorePutMutationsStream{clientCall: call}
+	reply = &implStorePutMutationsCall{clientCall: call, writeStream: implStorePutMutationsStreamSender{clientCall: call}}
 	return
 }
 
@@ -496,13 +562,13 @@
 }
 
 func (__gen_s *ServerStubStore) Watch(call _gen_ipc.ServerCall, Req Request) (err error) {
-	stream := &implStoreServiceWatchStream{serverCall: call}
+	stream := &implStoreServiceWatchStream{writer: implStoreServiceWatchStreamSender{serverCall: call}}
 	err = __gen_s.service.Watch(call, Req, stream)
 	return
 }
 
 func (__gen_s *ServerStubStore) PutMutations(call _gen_ipc.ServerCall) (err error) {
-	stream := &implStoreServicePutMutationsStream{serverCall: call}
+	stream := &implStoreServicePutMutationsStream{reader: implStoreServicePutMutationsStreamIterator{serverCall: call}}
 	err = __gen_s.service.PutMutations(call, stream)
 	return
 }
diff --git a/services/store/server/object.go b/services/store/server/object.go
index 51313c0..a76a84b 100644
--- a/services/store/server/object.go
+++ b/services/store/server/object.go
@@ -189,7 +189,7 @@
 		return err
 	}
 	for it.Next() {
-		if err := stream.Send(*it.Get()); err != nil {
+		if err := stream.SendStream().Send(*it.Get()); err != nil {
 			it.Abort()
 			return err
 		}
@@ -197,14 +197,24 @@
 	return it.Err()
 }
 
+type globStreamSenderAdapter struct {
+	stream interface {
+		Send(entry mounttable.MountEntry) error
+	}
+}
+
+func (a *globStreamSenderAdapter) Send(item string) error {
+	return a.stream.Send(mounttable.MountEntry{Name: item})
+}
+
 type globStreamAdapter struct {
 	stream mounttable.GlobbableServiceGlobStream
 }
 
-func (a *globStreamAdapter) Send(item string) error {
-	return a.stream.Send(mounttable.MountEntry{
-		Name: item,
-	})
+func (a *globStreamAdapter) SendStream() interface {
+	Send(item string) error
+} {
+	return &globStreamSenderAdapter{a.stream.SendStream()}
 }
 
 // Glob streams a series of names that match the given pattern.
@@ -222,7 +232,7 @@
 		if ctx.IsClosed() {
 			break
 		}
-		if err := gsa.Send(it.Name()); err != nil {
+		if err := gsa.SendStream().Send(it.Name()); err != nil {
 			return err
 		}
 	}
@@ -259,16 +269,22 @@
 	return s.delegate.Send(cbCp)
 }
 
+func (s *entryTransformStream) SendStream() interface {
+	Send(cb watch.ChangeBatch) error
+} {
+	return s
+}
+
 // WatchGlob returns a stream of changes that match a pattern.
 func (o *object) WatchGlob(ctx ipc.ServerContext, req watch.GlobRequest, stream watch.GlobWatcherServiceWatchGlobStream) error {
 	path := storage.ParsePath(o.name)
-	stream = &entryTransformStream{stream}
+	stream = &entryTransformStream{stream.SendStream()}
 	return o.server.watcher.WatchGlob(ctx, path, req, stream)
 }
 
 // WatchQuery returns a stream of changes that satisfy a query.
 func (o *object) WatchQuery(ctx ipc.ServerContext, req watch.QueryRequest, stream watch.QueryWatcherServiceWatchQueryStream) error {
 	path := storage.ParsePath(o.name)
-	stream = &entryTransformStream{stream}
+	stream = &entryTransformStream{stream.SendStream()}
 	return o.server.watcher.WatchQuery(ctx, path, req, stream)
 }
diff --git a/services/store/server/server_test.go b/services/store/server/server_test.go
index 9d2eb0c..87e602d 100644
--- a/services/store/server/server_test.go
+++ b/services/store/server/server_test.go
@@ -346,12 +346,13 @@
 	req := raw.Request{}
 	ws := watchtesting.WatchRaw(rootPublicID, s.Watch, req)
 
+	rStream := ws.RecvStream()
 	// Check that watch detects the changes in the first transaction.
 	{
-		if !ws.Advance() {
-			t.Error("Advance() failed: %v", ws.Err())
+		if !rStream.Advance() {
+			t.Error("Advance() failed: %v", rStream.Err())
 		}
-		cb := ws.Value()
+		cb := rStream.Value()
 		changes := cb.Changes
 		change := changes[0]
 		if change.Continued {
@@ -379,10 +380,10 @@
 
 	// Check that watch detects the changes in the second transaction.
 	{
-		if !ws.Advance() {
-			t.Error("Advance() failed: %v", ws.Err())
+		if !rStream.Advance() {
+			t.Error("Advance() failed: %v", rStream.Err())
 		}
-		cb := ws.Value()
+		cb := rStream.Value()
 		changes := cb.Changes
 		change := changes[0]
 		if !change.Continued {
@@ -425,12 +426,14 @@
 	ws1 := watchtesting.WatchGlob(rootPublicID, o1.WatchGlob, req)
 	ws2 := watchtesting.WatchGlob(rootPublicID, o2.WatchGlob, req)
 
+	rStream1 := ws1.RecvStream()
+	rStream2 := ws2.RecvStream()
 	// The watch on / should send a change on /.
 	{
-		if !ws1.Advance() {
-			t.Error("Advance() failed: %v", ws1.Err())
+		if !rStream1.Advance() {
+			t.Error("Advance() failed: %v", rStream1.Err())
 		}
-		cb := ws1.Value()
+		cb := rStream1.Value()
 		changes := cb.Changes
 		change := changes[0]
 		if change.Continued {
@@ -459,10 +462,10 @@
 
 	// The watch on / should send changes on / and /a.
 	{
-		if !ws1.Advance() {
-			t.Error("Advance() failed: %v", ws1.Err())
+		if !rStream1.Advance() {
+			t.Error("Advance() failed: %v", rStream1.Err())
 		}
-		cb := ws1.Value()
+		cb := rStream1.Value()
 		changes := cb.Changes
 		change := changes[0]
 		if !change.Continued {
@@ -477,10 +480,10 @@
 	}
 	// The watch on /a should send a change on /a.
 	{
-		if !ws2.Advance() {
-			t.Error("Advance() failed: %v", ws2.Err())
+		if !rStream2.Advance() {
+			t.Error("Advance() failed: %v", rStream2.Err())
 		}
-		cb := ws2.Value()
+		cb := rStream2.Value()
 		changes := cb.Changes
 		change := changes[0]
 		if change.Continued {
@@ -515,12 +518,13 @@
 	req := raw.Request{}
 	ws := watchtesting.WatchRaw(rootPublicID, s.Watch, req)
 
+	rStream := ws.RecvStream()
 	// Check that watch detects the changes in the first transaction.
 	{
-		if !ws.Advance() {
-			t.Error("Advance() failed: %v", ws.Err())
+		if !rStream.Advance() {
+			t.Error("Advance() failed: %v", rStream.Err())
 		}
-		cb := ws.Value()
+		cb := rStream.Value()
 		changes := cb.Changes
 		change := changes[0]
 		if change.Continued {
@@ -548,10 +552,10 @@
 
 	// Check that watch detects the changes in the second transaction.
 	{
-		if !ws.Advance() {
-			t.Error("Advance() failed: %v", ws.Err())
+		if !rStream.Advance() {
+			t.Error("Advance() failed: %v", rStream.Err())
 		}
-		cb := ws.Value()
+		cb := rStream.Value()
 		changes := cb.Changes
 		change := changes[0]
 		if !change.Continued {
@@ -578,10 +582,10 @@
 
 	// Check that watch detects the changes in the third transaction.
 	{
-		if !ws.Advance() {
-			t.Error("Advance() failed: %v", ws.Err())
+		if !rStream.Advance() {
+			t.Error("Advance() failed: %v", rStream.Err())
 		}
-		cb := ws.Value()
+		cb := rStream.Value()
 		changes := cb.Changes
 		change := changes[0]
 		if change.Continued {
@@ -592,10 +596,10 @@
 
 	// Check that watch detects the garbage collection of /a.
 	{
-		if !ws.Advance() {
-			t.Error("Advance() failed: %v", ws.Err())
+		if !rStream.Advance() {
+			t.Error("Advance() failed: %v", rStream.Err())
 		}
-		cb := ws.Value()
+		cb := rStream.Value()
 		changes := cb.Changes
 		change := changes[0]
 		if change.Continued {
diff --git a/tools/binary/impl/impl_test.go b/tools/binary/impl/impl_test.go
index c8ef111..f31ceb8 100644
--- a/tools/binary/impl/impl_test.go
+++ b/tools/binary/impl/impl_test.go
@@ -42,8 +42,9 @@
 
 func (s *server) Download(_ ipc.ServerContext, _ int32, stream repository.BinaryServiceDownloadStream) error {
 	vlog.Infof("Download() was called. suffix=%v", s.suffix)
-	stream.Send([]byte("Hello"))
-	stream.Send([]byte("World"))
+	sender := stream.SendStream()
+	sender.Send([]byte("Hello"))
+	sender.Send([]byte("World"))
 	return nil
 }
 
@@ -63,7 +64,8 @@
 
 func (s *server) Upload(_ ipc.ServerContext, _ int32, stream repository.BinaryServiceUploadStream) error {
 	vlog.Infof("Upload() was called. suffix=%v", s.suffix)
-	for stream.Advance() {
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
 	}
 	return nil
 }
diff --git a/tools/mounttable/impl/impl.go b/tools/mounttable/impl/impl.go
index 6c88524..09282fc 100644
--- a/tools/mounttable/impl/impl.go
+++ b/tools/mounttable/impl/impl.go
@@ -48,8 +48,9 @@
 	if err != nil {
 		return err
 	}
-	for stream.Advance() {
-		buf := stream.Value()
+	rStream := stream.RecvStream()
+	for rStream.Advance() {
+		buf := rStream.Value()
 
 		fmt.Fprint(cmd.Stdout(), buf.Name)
 		for _, s := range buf.Servers {
@@ -58,7 +59,7 @@
 		fmt.Fprintln(cmd.Stdout())
 	}
 
-	if err := stream.Err(); err != nil {
+	if err := rStream.Err(); err != nil {
 		return fmt.Errorf("advance error: %v", err)
 	}
 	err = stream.Finish()
diff --git a/tools/mounttable/impl/impl_test.go b/tools/mounttable/impl/impl_test.go
index ef40175..33c9b76 100644
--- a/tools/mounttable/impl/impl_test.go
+++ b/tools/mounttable/impl/impl_test.go
@@ -22,8 +22,9 @@
 
 func (s *server) Glob(_ ipc.ServerContext, pattern string, stream mounttable.GlobbableServiceGlobStream) error {
 	vlog.VI(2).Infof("Glob() was called. suffix=%v pattern=%q", s.suffix, pattern)
-	stream.Send(mounttable.MountEntry{"name1", []mounttable.MountedServer{{"server1", 123}}})
-	stream.Send(mounttable.MountEntry{"name2", []mounttable.MountedServer{{"server2", 456}, {"server3", 789}}})
+	sender := stream.SendStream()
+	sender.Send(mounttable.MountEntry{"name1", []mounttable.MountedServer{{"server1", 123}}})
+	sender.Send(mounttable.MountEntry{"name2", []mounttable.MountedServer{{"server2", 456}, {"server3", 789}}})
 	return nil
 }
 
diff --git a/tools/vrpc/impl/impl_test.go b/tools/vrpc/impl/impl_test.go
index 85c9be6..ac5c4c0 100644
--- a/tools/vrpc/impl/impl_test.go
+++ b/tools/vrpc/impl/impl_test.go
@@ -118,8 +118,9 @@
 
 func (*server) StreamingOutput(call ipc.ServerContext, nStream int32, item bool, reply test_base.TypeTesterServiceStreamingOutputStream) error {
 	vlog.VI(2).Info("StreamingOutput(%v,%v) was called.", nStream, item)
+	sender := reply.SendStream()
 	for i := int32(0); i < nStream; i++ {
-		reply.Send(item)
+		sender.Send(item)
 	}
 	return nil
 }
diff --git a/tools/vrpc/test_base/test_base.vdl.go b/tools/vrpc/test_base/test_base.vdl.go
index 937c9a7..f9bebf2 100644
--- a/tools/vrpc/test_base/test_base.vdl.go
+++ b/tools/vrpc/test_base/test_base.vdl.go
@@ -50,7 +50,7 @@
 	NoArguments(ctx _gen_context.T, opts ..._gen_ipc.CallOpt) (err error)
 	MultipleArguments(ctx _gen_context.T, I1 int32, I2 int32, opts ..._gen_ipc.CallOpt) (O1 int32, O2 int32, err error)
 	// Methods to test support for streaming.
-	StreamingOutput(ctx _gen_context.T, NumStreamItems int32, StreamItem bool, opts ..._gen_ipc.CallOpt) (reply TypeTesterStreamingOutputStream, err error)
+	StreamingOutput(ctx _gen_context.T, NumStreamItems int32, StreamItem bool, opts ..._gen_ipc.CallOpt) (reply TypeTesterStreamingOutputCall, err error)
 }
 type TypeTester interface {
 	_gen_ipc.UniversalServiceMethods
@@ -86,27 +86,29 @@
 	StreamingOutput(context _gen_ipc.ServerContext, NumStreamItems int32, StreamItem bool, stream TypeTesterServiceStreamingOutputStream) (err error)
 }
 
-// TypeTesterStreamingOutputStream is the interface for streaming responses of the method
+// TypeTesterStreamingOutputCall is the interface for call object of the method
 // StreamingOutput in the service interface TypeTester.
-type TypeTesterStreamingOutputStream interface {
+type TypeTesterStreamingOutputCall interface {
+	// RecvStream returns the recv portion of the stream
+	RecvStream() interface {
+		// Advance stages an element so the client can retrieve it
+		// with Value.  Advance returns true iff there is an
+		// element to retrieve.  The client must call Advance before
+		// calling Value.  The client must call Cancel if it does
+		// not iterate through all elements (i.e. until Advance
+		// returns false).  Advance may block if an element is not
+		// immediately available.
+		Advance() bool
 
-	// Advance stages an element so the client can retrieve it
-	// with Value.  Advance returns true iff there is an
-	// element to retrieve.  The client must call Advance before
-	// calling Value.  The client must call Cancel if it does
-	// not iterate through all elements (i.e. until Advance
-	// returns false).  Advance may block if an element is not
-	// immediately available.
-	Advance() bool
+		// Value returns the element that was staged by Advance.
+		// Value may panic if Advance returned false or was not
+		// called at all.  Value does not block.
+		Value() bool
 
-	// Value returns the element that was staged by Advance.
-	// Value may panic if Advance returned false or was not
-	// called at all.  Value does not block.
-	Value() bool
-
-	// Err returns a non-nil error iff the stream encountered
-	// any errors.  Err does not block.
-	Err() error
+		// Err returns a non-nil error iff the stream encountered
+		// any errors.  Err does not block.
+		Err() error
+	}
 
 	// Finish blocks until the server is done and returns the positional
 	// return values for call.
@@ -117,7 +119,7 @@
 	// call.
 	//
 	// Calling Finish is mandatory for releasing stream resources, unless Cancel
-	// has been called or any of the other methods return a non-EOF error.
+	// has been called or any of the other methods return an error.
 	// Finish should be called at most once.
 	Finish() (err error)
 
@@ -127,55 +129,83 @@
 	Cancel()
 }
 
-// Implementation of the TypeTesterStreamingOutputStream interface that is not exported.
-type implTypeTesterStreamingOutputStream struct {
+type implTypeTesterStreamingOutputStreamIterator struct {
 	clientCall _gen_ipc.Call
 	val        bool
 	err        error
 }
 
-func (c *implTypeTesterStreamingOutputStream) Advance() bool {
+func (c *implTypeTesterStreamingOutputStreamIterator) Advance() bool {
 	c.err = c.clientCall.Recv(&c.val)
 	return c.err == nil
 }
 
-func (c *implTypeTesterStreamingOutputStream) Value() bool {
+func (c *implTypeTesterStreamingOutputStreamIterator) Value() bool {
 	return c.val
 }
 
-func (c *implTypeTesterStreamingOutputStream) Err() error {
+func (c *implTypeTesterStreamingOutputStreamIterator) Err() error {
 	if c.err == _gen_io.EOF {
 		return nil
 	}
 	return c.err
 }
 
-func (c *implTypeTesterStreamingOutputStream) Finish() (err error) {
+// Implementation of the TypeTesterStreamingOutputCall interface that is not exported.
+type implTypeTesterStreamingOutputCall struct {
+	clientCall _gen_ipc.Call
+	readStream implTypeTesterStreamingOutputStreamIterator
+}
+
+func (c *implTypeTesterStreamingOutputCall) RecvStream() interface {
+	Advance() bool
+	Value() bool
+	Err() error
+} {
+	return &c.readStream
+}
+
+func (c *implTypeTesterStreamingOutputCall) Finish() (err error) {
 	if ierr := c.clientCall.Finish(&err); ierr != nil {
 		err = ierr
 	}
 	return
 }
 
-func (c *implTypeTesterStreamingOutputStream) Cancel() {
+func (c *implTypeTesterStreamingOutputCall) Cancel() {
 	c.clientCall.Cancel()
 }
 
+type implTypeTesterServiceStreamingOutputStreamSender struct {
+	serverCall _gen_ipc.ServerCall
+}
+
+func (s *implTypeTesterServiceStreamingOutputStreamSender) Send(item bool) error {
+	return s.serverCall.Send(item)
+}
+
 // TypeTesterServiceStreamingOutputStream is the interface for streaming responses of the method
 // StreamingOutput in the service interface TypeTester.
 type TypeTesterServiceStreamingOutputStream interface {
-	// Send places the item onto the output stream, blocking if there is no buffer
-	// space available.  If the client has canceled, an error is returned.
-	Send(item bool) error
+	// SendStream returns the send portion of the stream.
+	SendStream() interface {
+		// Send places the item onto the output stream, blocking if there is no buffer
+		// space available.  If the client has canceled, an error is returned.
+		Send(item bool) error
+	}
 }
 
 // Implementation of the TypeTesterServiceStreamingOutputStream interface that is not exported.
 type implTypeTesterServiceStreamingOutputStream struct {
-	serverCall _gen_ipc.ServerCall
+	writer implTypeTesterServiceStreamingOutputStreamSender
 }
 
-func (s *implTypeTesterServiceStreamingOutputStream) Send(item bool) error {
-	return s.serverCall.Send(item)
+func (s *implTypeTesterServiceStreamingOutputStream) SendStream() interface {
+	// Send places the item onto the output stream, blocking if there is no buffer
+	// space available.  If the client has canceled, an error is returned.
+	Send(item bool) error
+} {
+	return &s.writer
 }
 
 // BindTypeTester returns the client stub implementing the TypeTester
@@ -428,12 +458,12 @@
 	return
 }
 
-func (__gen_c *clientStubTypeTester) StreamingOutput(ctx _gen_context.T, NumStreamItems int32, StreamItem bool, opts ..._gen_ipc.CallOpt) (reply TypeTesterStreamingOutputStream, err error) {
+func (__gen_c *clientStubTypeTester) StreamingOutput(ctx _gen_context.T, NumStreamItems int32, StreamItem bool, opts ..._gen_ipc.CallOpt) (reply TypeTesterStreamingOutputCall, err error) {
 	var call _gen_ipc.Call
 	if call, err = __gen_c.client.StartCall(ctx, __gen_c.name, "StreamingOutput", []interface{}{NumStreamItems, StreamItem}, opts...); err != nil {
 		return
 	}
-	reply = &implTypeTesterStreamingOutputStream{clientCall: call}
+	reply = &implTypeTesterStreamingOutputCall{clientCall: call, readStream: implTypeTesterStreamingOutputStreamIterator{clientCall: call}}
 	return
 }
 
@@ -825,7 +855,7 @@
 }
 
 func (__gen_s *ServerStubTypeTester) StreamingOutput(call _gen_ipc.ServerCall, NumStreamItems int32, StreamItem bool) (err error) {
-	stream := &implTypeTesterServiceStreamingOutputStream{serverCall: call}
+	stream := &implTypeTesterServiceStreamingOutputStream{writer: implTypeTesterServiceStreamingOutputStreamSender{serverCall: call}}
 	err = __gen_s.service.StreamingOutput(call, NumStreamItems, StreamItem, stream)
 	return
 }