veyron/...: Changed the generated ipc stream to match our one true
stream api.
Change-Id: Ice4911926bbaba7b3a11c68579a9f8b34c5db460
diff --git a/examples/bank/bank.vdl.go b/examples/bank/bank.vdl.go
index 6156492..e8e7910 100644
--- a/examples/bank/bank.vdl.go
+++ b/examples/bank/bank.vdl.go
@@ -22,6 +22,10 @@
_gen_wiretype "veyron2/wiretype"
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Bank allows clients to store virtual money. Certain implementations can use persistent storage.
// Uses the client's Veyron Identity to determine account access.
// Bank is the interface the client binds and uses.
diff --git a/examples/boxes/android/src/boxesp2p/main.go b/examples/boxes/android/src/boxesp2p/main.go
index a4748e3..67e7475 100644
--- a/examples/boxes/android/src/boxesp2p/main.go
+++ b/examples/boxes/android/src/boxesp2p/main.go
@@ -59,6 +59,7 @@
import (
"bytes"
"fmt"
+ "io"
"net"
"os"
"runtime"
@@ -174,11 +175,8 @@
func (gs *goState) streamBoxesLoop() {
// Loop to receive boxes from remote peer
go func() {
- for {
- box, err := gs.drawStream.Recv()
- if err != nil {
- return
- }
+ for gs.drawStream.Advance() {
+ box := gs.drawStream.Value()
nativeJava.addBox(&box)
}
}()
@@ -206,11 +204,8 @@
if err != nil {
panic(fmt.Errorf("Can't watch store: %s: %s", gs.storeEndpoint, err))
}
- for {
- cb, err := stream.Recv()
- if err != nil {
- panic(fmt.Errorf("Can't receive watch event: %s: %s", gs.storeEndpoint, err))
- }
+ for stream.Advance() {
+ cb := stream.Value()
for _, change := range cb.Changes {
if entry, ok := change.Value.(*storage.Entry); ok {
if box, ok := entry.Value.(boxes.Box); ok && box.DeviceId != gs.myIPAddr {
@@ -219,6 +214,13 @@
}
}
}
+
+ err = stream.Err()
+ if err == nil {
+ err = io.EOF
+ }
+ panic(fmt.Errorf("Can't receive watch event: %s: %s", gs.storeEndpoint, err))
+
}()
// Send any box updates to the store
diff --git a/examples/boxes/boxes.vdl.go b/examples/boxes/boxes.vdl.go
index 40da80a..c57d537 100644
--- a/examples/boxes/boxes.vdl.go
+++ b/examples/boxes/boxes.vdl.go
@@ -7,6 +7,7 @@
import (
// The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_io "io"
_gen_veyron2 "veyron2"
_gen_context "veyron2/context"
_gen_ipc "veyron2/ipc"
@@ -27,6 +28,10 @@
Points [4]float32
}
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// BoxSignalling allows peers to rendezvous with each other
// BoxSignalling is the interface the client binds and uses.
// BoxSignalling_ExcludingUniversal is the interface without internal framework-added methods
@@ -252,31 +257,64 @@
// Draw in the service interface DrawInterface.
type DrawInterfaceDrawStream interface {
- // Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // Send places the item onto the output stream, blocking if there is no
+ // buffer space available. Calls to Send after having called CloseSend
+ // or Cancel will fail. Any blocked Send calls will be unblocked upon
+ // calling Cancel.
Send(item Box) error
- // CloseSend indicates to the server that no more items will be sent; server
- // Recv calls will receive io.EOF after all sent items. Subsequent calls to
- // Send on the client will fail. This is an optional call - it's used by
- // streaming clients that need the server to receive the io.EOF terminator.
+ // CloseSend indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items. This is
+ // an optional call - it's used by streaming clients that need the
+ // server to receive the io.EOF terminator before the client calls
+ // Finish (for example, if the client needs to continue receiving items
+ // from the server after having finished sending).
+ // Calls to CloseSend after having called Cancel will fail.
+ // Like Send, CloseSend blocks when there's no buffer space available.
CloseSend() error
- // Recv returns the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item Box, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
- // Finish closes the stream and returns the positional return values for
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ Value() Box
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
+
+ // Finish performs the equivalent of CloseSend, then blocks until the server
+ // is done, and returns the positional return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
// Implementation of the DrawInterfaceDrawStream interface that is not exported.
type implDrawInterfaceDrawStream struct {
clientCall _gen_ipc.Call
+ val Box
+ err error
}
func (c *implDrawInterfaceDrawStream) Send(item Box) error {
@@ -287,9 +325,21 @@
return c.clientCall.CloseSend()
}
-func (c *implDrawInterfaceDrawStream) Recv() (item Box, err error) {
- err = c.clientCall.Recv(&item)
- return
+func (c *implDrawInterfaceDrawStream) Advance() bool {
+ c.val = Box{}
+ c.err = c.clientCall.Recv(&c.val)
+ return c.err == nil
+}
+
+func (c *implDrawInterfaceDrawStream) Value() Box {
+ return c.val
+}
+
+func (c *implDrawInterfaceDrawStream) Err() error {
+ if c.err == _gen_io.EOF {
+ return nil
+ }
+ return c.err
}
func (c *implDrawInterfaceDrawStream) Finish() (err error) {
@@ -307,26 +357,59 @@
// Draw in the service interface DrawInterface.
type DrawInterfaceServiceDrawStream interface {
// Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // space available. If the client has canceled, an error is returned.
Send(item Box) error
- // Recv fills itemptr with the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item Box, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
+
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ //
+ // In general, Value is undefined if the underlying collection
+ // of elements changes while iteration is in progress. If
+ // <DataProvider> supports concurrent modification, it should
+ // document its behavior.
+ Value() Box
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
}
// Implementation of the DrawInterfaceServiceDrawStream interface that is not exported.
type implDrawInterfaceServiceDrawStream struct {
serverCall _gen_ipc.ServerCall
+ val Box
+ err error
}
func (s *implDrawInterfaceServiceDrawStream) Send(item Box) error {
return s.serverCall.Send(item)
}
-func (s *implDrawInterfaceServiceDrawStream) Recv() (item Box, err error) {
- err = s.serverCall.Recv(&item)
- return
+func (s *implDrawInterfaceServiceDrawStream) Advance() bool {
+ s.val = Box{}
+ s.err = s.serverCall.Recv(&s.val)
+ return s.err == nil
+}
+
+func (s *implDrawInterfaceServiceDrawStream) Value() Box {
+ return s.val
+}
+
+func (s *implDrawInterfaceServiceDrawStream) Err() error {
+ if s.err == _gen_io.EOF {
+ return nil
+ }
+ return s.err
}
// BindDrawInterface returns the client stub implementing the DrawInterface
diff --git a/examples/fortune/fortune.vdl.go b/examples/fortune/fortune.vdl.go
index 4784028..7070407 100644
--- a/examples/fortune/fortune.vdl.go
+++ b/examples/fortune/fortune.vdl.go
@@ -16,6 +16,10 @@
_gen_wiretype "veyron2/wiretype"
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Fortune allows clients to Get and Add fortune strings.
// Fortune is the interface the client binds and uses.
// Fortune_ExcludingUniversal is the interface without internal framework-added methods
diff --git a/examples/inspector/inspector.vdl.go b/examples/inspector/inspector.vdl.go
index 436f056..c6b0e7c 100644
--- a/examples/inspector/inspector.vdl.go
+++ b/examples/inspector/inspector.vdl.go
@@ -5,6 +5,7 @@
import (
// The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_io "io"
_gen_veyron2 "veyron2"
_gen_context "veyron2/context"
_gen_ipc "veyron2/ipc"
@@ -26,6 +27,10 @@
IsDir bool
}
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Inspector is the interface the client binds and uses.
// Inspector_ExcludingUniversal is the interface without internal framework-added methods
// to enable embedding without method collisions. Not to be used directly by clients.
@@ -48,26 +53,64 @@
// Ls in the service interface Inspector.
type InspectorLsStream interface {
- // Recv returns the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item string, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
- // Finish closes the stream and returns the positional return values for
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ Value() string
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
+
+ // Finish blocks until the server is done and returns the positional
+ // return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
// Implementation of the InspectorLsStream interface that is not exported.
type implInspectorLsStream struct {
clientCall _gen_ipc.Call
+ val string
+ err error
}
-func (c *implInspectorLsStream) Recv() (item string, err error) {
- err = c.clientCall.Recv(&item)
- return
+func (c *implInspectorLsStream) Advance() bool {
+ c.err = c.clientCall.Recv(&c.val)
+ return c.err == nil
+}
+
+func (c *implInspectorLsStream) Value() string {
+ return c.val
+}
+
+func (c *implInspectorLsStream) Err() error {
+ if c.err == _gen_io.EOF {
+ return nil
+ }
+ return c.err
}
func (c *implInspectorLsStream) Finish() (err error) {
@@ -85,7 +128,7 @@
// Ls in the service interface Inspector.
type InspectorServiceLsStream interface {
// Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // space available. If the client has canceled, an error is returned.
Send(item string) error
}
@@ -102,26 +145,65 @@
// LsDetails in the service interface Inspector.
type InspectorLsDetailsStream interface {
- // Recv returns the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item Details, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
- // Finish closes the stream and returns the positional return values for
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ Value() Details
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
+
+ // Finish blocks until the server is done and returns the positional
+ // return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
// Implementation of the InspectorLsDetailsStream interface that is not exported.
type implInspectorLsDetailsStream struct {
clientCall _gen_ipc.Call
+ val Details
+ err error
}
-func (c *implInspectorLsDetailsStream) Recv() (item Details, err error) {
- err = c.clientCall.Recv(&item)
- return
+func (c *implInspectorLsDetailsStream) Advance() bool {
+ c.val = Details{}
+ c.err = c.clientCall.Recv(&c.val)
+ return c.err == nil
+}
+
+func (c *implInspectorLsDetailsStream) Value() Details {
+ return c.val
+}
+
+func (c *implInspectorLsDetailsStream) Err() error {
+ if c.err == _gen_io.EOF {
+ return nil
+ }
+ return c.err
}
func (c *implInspectorLsDetailsStream) Finish() (err error) {
@@ -139,7 +221,7 @@
// LsDetails in the service interface Inspector.
type InspectorServiceLsDetailsStream interface {
// Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // space available. If the client has canceled, an error is returned.
Send(item Details) error
}
diff --git a/examples/inspector/inspector/main.go b/examples/inspector/inspector/main.go
index cc8bc1e..e542c8d 100644
--- a/examples/inspector/inspector/main.go
+++ b/examples/inspector/inspector/main.go
@@ -128,15 +128,13 @@
// streamNames and streamDetails are idiomatic for use with stubs
func streamNames(stream inspector.InspectorLsStream) {
- for {
- if name, err := stream.Recv(); err != nil {
- if err == io.EOF {
- break
- }
- vlog.Fatalf("unexpected streaming error: %q", err)
- } else {
- fmt.Printf("%s\n", name)
- }
+ for stream.Advance() {
+ name := stream.Value()
+ fmt.Printf("%s\n", name)
+ }
+
+ if err := stream.Err(); err != nil {
+ vlog.Fatalf("unexpected streaming error: %q", err)
}
if err := stream.Finish(); err != nil && err != io.EOF {
vlog.Fatalf("%q", err)
@@ -144,18 +142,17 @@
}
func streamDetails(stream inspector.InspectorLsDetailsStream) {
- for {
- if details, err := stream.Recv(); err != nil {
- if err == io.EOF {
- break
- }
- vlog.Fatalf("unexpected streaming error: %q", err)
- } else {
- mode := os.FileMode(details.Mode)
- modtime := time.Unix(details.ModUnixSecs, int64(details.ModNano))
- fmt.Printf("%s: %d %s %s%s\n", details.Name, details.Size, mode, modtime, map[bool]string{false: "", true: "/"}[details.IsDir])
- }
+ for stream.Advance() {
+ details := stream.Value()
+ mode := os.FileMode(details.Mode)
+ modtime := time.Unix(details.ModUnixSecs, int64(details.ModNano))
+ fmt.Printf("%s: %d %s %s%s\n", details.Name, details.Size, mode, modtime, map[bool]string{false: "", true: "/"}[details.IsDir])
}
+
+ if err := stream.Err(); err != nil {
+ vlog.Fatalf("unexpected streaming error: %q", err)
+ }
+
if err := stream.Finish(); err != nil && err != io.EOF {
vlog.Fatalf("%q", err)
}
diff --git a/examples/pipetobrowser/p2b.vdl.go b/examples/pipetobrowser/p2b.vdl.go
index bfa0f47..6b5f80f 100644
--- a/examples/pipetobrowser/p2b.vdl.go
+++ b/examples/pipetobrowser/p2b.vdl.go
@@ -5,6 +5,7 @@
import (
// The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_io "io"
_gen_veyron2 "veyron2"
_gen_context "veyron2/context"
_gen_ipc "veyron2/ipc"
@@ -14,6 +15,10 @@
_gen_wiretype "veyron2/wiretype"
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Viewer allows clients to stream data to it and to request a particular viewer to format and display the data.
// Viewer is the interface the client binds and uses.
// Viewer_ExcludingUniversal is the interface without internal framework-added methods
@@ -38,21 +43,38 @@
// Pipe in the service interface Viewer.
type ViewerPipeStream interface {
- // Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // Send places the item onto the output stream, blocking if there is no
+ // buffer space available. Calls to Send after having called CloseSend
+ // or Cancel will fail. Any blocked Send calls will be unblocked upon
+ // calling Cancel.
Send(item []byte) error
- // CloseSend indicates to the server that no more items will be sent; server
- // Recv calls will receive io.EOF after all sent items. Subsequent calls to
- // Send on the client will fail. This is an optional call - it's used by
- // streaming clients that need the server to receive the io.EOF terminator.
+ // CloseSend indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items. This is
+ // an optional call - it's used by streaming clients that need the
+ // server to receive the io.EOF terminator before the client calls
+ // Finish (for example, if the client needs to continue receiving items
+ // from the server after having finished sending).
+ // Calls to CloseSend after having called Cancel will fail.
+ // Like Send, CloseSend blocks when there's no buffer space available.
CloseSend() error
- // Finish closes the stream and returns the positional return values for
+ // Finish performs the equivalent of CloseSend, then blocks until the server
+ // is done, and returns the positional return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (reply _gen_vdlutil.Any, err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
@@ -84,19 +106,51 @@
// Pipe in the service interface Viewer.
type ViewerServicePipeStream interface {
- // Recv fills itemptr with the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item []byte, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
+
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ //
+ // In general, Value is undefined if the underlying collection
+ // of elements changes while iteration is in progress. If
+ // <DataProvider> supports concurrent modification, it should
+ // document its behavior.
+ Value() []byte
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
}
// Implementation of the ViewerServicePipeStream interface that is not exported.
type implViewerServicePipeStream struct {
serverCall _gen_ipc.ServerCall
+ val []byte
+ err error
}
-func (s *implViewerServicePipeStream) Recv() (item []byte, err error) {
- err = s.serverCall.Recv(&item)
- return
+func (s *implViewerServicePipeStream) Advance() bool {
+ s.err = s.serverCall.Recv(&s.val)
+ return s.err == nil
+}
+
+func (s *implViewerServicePipeStream) Value() []byte {
+ return s.val
+}
+
+func (s *implViewerServicePipeStream) Err() error {
+ if s.err == _gen_io.EOF {
+ return nil
+ }
+ return s.err
}
// BindViewer returns the client stub implementing the Viewer
diff --git a/examples/rockpaperscissors/impl/judge.go b/examples/rockpaperscissors/impl/judge.go
index 9c607b3..2729b66 100644
--- a/examples/rockpaperscissors/impl/judge.go
+++ b/examples/rockpaperscissors/impl/judge.go
@@ -114,21 +114,19 @@
done := make(chan struct{}, 1)
defer func() { done <- struct{}{} }()
go func() {
- for {
- action, err := stream.Recv()
- if err != nil {
- select {
- case c <- playerInput{player: playerNum, action: rps.PlayerAction{Quit: true}}:
- case <-done:
- }
- return
- }
+ for stream.Advance() {
+ action := stream.Value()
+
select {
case c <- playerInput{player: playerNum, action: action}:
case <-done:
return
}
}
+ select {
+ case c <- playerInput{player: playerNum, action: rps.PlayerAction{Quit: true}}:
+ case <-done:
+ }
}()
if err := stream.Send(rps.JudgeAction{PlayerNum: int32(playerNum)}); err != nil {
diff --git a/examples/rockpaperscissors/impl/player.go b/examples/rockpaperscissors/impl/player.go
index f1b84a1..258ebf4 100644
--- a/examples/rockpaperscissors/impl/player.go
+++ b/examples/rockpaperscissors/impl/player.go
@@ -1,7 +1,6 @@
package impl
import (
- "io"
"math/rand"
"time"
@@ -117,16 +116,9 @@
if err != nil {
return rps.PlayResult{}, err
}
- for {
- in, err := game.Recv()
- if err == io.EOF {
- vlog.VI(1).Infof("Game Ended")
- break
- }
- if err != nil {
- vlog.Infof("recv error: %v", err)
- break
- }
+ for game.Advance() {
+ in := game.Value()
+
if in.PlayerNum > 0 {
vlog.VI(1).Infof("I'm player %d", in.PlayerNum)
}
@@ -148,6 +140,12 @@
vlog.VI(1).Infof("Score card: %s", common.FormatScoreCard(in.Score))
}
}
+
+ if err := game.Err(); err != nil {
+ vlog.Infof("stream error: %v", err)
+ } else {
+ vlog.VI(1).Infof("Game Ended")
+ }
result, err := game.Finish()
p.gamesPlayed.Add(1)
if err == nil && result.YouWon {
diff --git a/examples/rockpaperscissors/rpsplayercli/main.go b/examples/rockpaperscissors/rpsplayercli/main.go
index 074907e..b02ce5a 100644
--- a/examples/rockpaperscissors/rpsplayercli/main.go
+++ b/examples/rockpaperscissors/rpsplayercli/main.go
@@ -6,7 +6,6 @@
"errors"
"flag"
"fmt"
- "io"
"os"
"sort"
"strings"
@@ -201,16 +200,8 @@
return rps.PlayResult{}, err
}
var playerNum int32
- for {
- in, err := game.Recv()
- if err == io.EOF {
- fmt.Println("Game Ended")
- break
- }
- if err != nil {
- vlog.Infof("recv error: %v", err)
- break
- }
+ for game.Advance() {
+ in := game.Value()
if in.PlayerNum > 0 {
playerNum = in.PlayerNum
fmt.Printf("You are player %d\n", in.PlayerNum)
@@ -255,6 +246,12 @@
}
}
}
+ if err := game.Err(); err == nil {
+ fmt.Println("Game Ended")
+ } else {
+ vlog.Infof("stream error: %v", err)
+ }
+
return game.Finish()
}
diff --git a/examples/rockpaperscissors/service.vdl.go b/examples/rockpaperscissors/service.vdl.go
index 268ee07..94abb03 100644
--- a/examples/rockpaperscissors/service.vdl.go
+++ b/examples/rockpaperscissors/service.vdl.go
@@ -5,6 +5,7 @@
import (
// The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_io "io"
_gen_veyron2 "veyron2"
_gen_context "veyron2/context"
_gen_ipc "veyron2/ipc"
@@ -80,6 +81,10 @@
Player2 = WinnerTag(2)
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Judge is the interface the client binds and uses.
// Judge_ExcludingUniversal is the interface without internal framework-added methods
// to enable embedding without method collisions. Not to be used directly by clients.
@@ -109,31 +114,64 @@
// Play in the service interface Judge.
type JudgePlayStream interface {
- // Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // Send places the item onto the output stream, blocking if there is no
+ // buffer space available. Calls to Send after having called CloseSend
+ // or Cancel will fail. Any blocked Send calls will be unblocked upon
+ // calling Cancel.
Send(item PlayerAction) error
- // CloseSend indicates to the server that no more items will be sent; server
- // Recv calls will receive io.EOF after all sent items. Subsequent calls to
- // Send on the client will fail. This is an optional call - it's used by
- // streaming clients that need the server to receive the io.EOF terminator.
+ // CloseSend indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items. This is
+ // an optional call - it's used by streaming clients that need the
+ // server to receive the io.EOF terminator before the client calls
+ // Finish (for example, if the client needs to continue receiving items
+ // from the server after having finished sending).
+ // Calls to CloseSend after having called Cancel will fail.
+ // Like Send, CloseSend blocks when there's no buffer space available.
CloseSend() error
- // Recv returns the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item JudgeAction, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
- // Finish closes the stream and returns the positional return values for
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ Value() JudgeAction
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
+
+ // Finish performs the equivalent of CloseSend, then blocks until the server
+ // is done, and returns the positional return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (reply PlayResult, err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
// Implementation of the JudgePlayStream interface that is not exported.
type implJudgePlayStream struct {
clientCall _gen_ipc.Call
+ val JudgeAction
+ err error
}
func (c *implJudgePlayStream) Send(item PlayerAction) error {
@@ -144,9 +182,21 @@
return c.clientCall.CloseSend()
}
-func (c *implJudgePlayStream) Recv() (item JudgeAction, err error) {
- err = c.clientCall.Recv(&item)
- return
+func (c *implJudgePlayStream) Advance() bool {
+ c.val = JudgeAction{}
+ c.err = c.clientCall.Recv(&c.val)
+ return c.err == nil
+}
+
+func (c *implJudgePlayStream) Value() JudgeAction {
+ return c.val
+}
+
+func (c *implJudgePlayStream) Err() error {
+ if c.err == _gen_io.EOF {
+ return nil
+ }
+ return c.err
}
func (c *implJudgePlayStream) Finish() (reply PlayResult, err error) {
@@ -164,26 +214,59 @@
// Play in the service interface Judge.
type JudgeServicePlayStream interface {
// Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // space available. If the client has canceled, an error is returned.
Send(item JudgeAction) error
- // Recv fills itemptr with the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item PlayerAction, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
+
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ //
+ // In general, Value is undefined if the underlying collection
+ // of elements changes while iteration is in progress. If
+ // <DataProvider> supports concurrent modification, it should
+ // document its behavior.
+ Value() PlayerAction
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
}
// Implementation of the JudgeServicePlayStream interface that is not exported.
type implJudgeServicePlayStream struct {
serverCall _gen_ipc.ServerCall
+ val PlayerAction
+ err error
}
func (s *implJudgeServicePlayStream) Send(item JudgeAction) error {
return s.serverCall.Send(item)
}
-func (s *implJudgeServicePlayStream) Recv() (item PlayerAction, err error) {
- err = s.serverCall.Recv(&item)
- return
+func (s *implJudgeServicePlayStream) Advance() bool {
+ s.val = PlayerAction{}
+ s.err = s.serverCall.Recv(&s.val)
+ return s.err == nil
+}
+
+func (s *implJudgeServicePlayStream) Value() PlayerAction {
+ return s.val
+}
+
+func (s *implJudgeServicePlayStream) Err() error {
+ if s.err == _gen_io.EOF {
+ return nil
+ }
+ return s.err
}
// BindJudge returns the client stub implementing the Judge
diff --git a/examples/stfortune/stfortune/main.go b/examples/stfortune/stfortune/main.go
index e65a1b0..9bcd5db 100644
--- a/examples/stfortune/stfortune/main.go
+++ b/examples/stfortune/stfortune/main.go
@@ -7,6 +7,7 @@
"encoding/hex"
"flag"
"fmt"
+ "io"
"log"
"math/rand"
"os"
@@ -60,8 +61,8 @@
if err != nil {
log.Fatalf("WatchGlob %s failed: %v", path, err)
}
- if _, err := stream.Recv(); err != nil {
- log.Fatalf("Recv failed: %v", err)
+ if !stream.Advance() {
+ log.Fatalf("Advance failed: %v", stream.Err())
}
stream.Cancel()
}
@@ -92,11 +93,8 @@
log.Fatalf("watcher WatchGlob %s failed: %v", path, err)
}
- for {
- batch, err := stream.Recv()
- if err != nil {
- log.Fatalf("watcher Recv failed: %v", err)
- }
+ for stream.Advance() {
+ batch := stream.Value()
for _, change := range batch.Changes {
entry, ok := change.Value.(*storage.Entry)
@@ -114,6 +112,11 @@
fmt.Printf("watcher: new fortune: %s\n", fortune.Fortune)
}
}
+ err = stream.Err()
+ if err == nil {
+ err = io.EOF
+ }
+ log.Fatalf("watcher Advance failed: %v", err)
}
// pickFortune finds all available fortunes under the input path and
@@ -127,11 +130,8 @@
return "", err
}
var names []string
- for {
- name, err := results.Recv()
- if err != nil {
- break
- }
+ for results.Advance() {
+ name := results.Value()
names = append(names, name)
}
results.Finish()
diff --git a/examples/storage/viewer/value.go b/examples/storage/viewer/value.go
index fc1d450..d3cb79d 100644
--- a/examples/storage/viewer/value.go
+++ b/examples/storage/viewer/value.go
@@ -27,11 +27,8 @@
}
defer results.Finish()
names := []string{}
- for {
- name, err := results.Recv()
- if err != nil {
- break
- }
+ for results.Advance() {
+ name := results.Value()
names = append(names, "/"+name)
}
sort.Strings(names)
diff --git a/examples/tunnel/lib/forward.go b/examples/tunnel/lib/forward.go
index 5a7c21f..20343a3 100644
--- a/examples/tunnel/lib/forward.go
+++ b/examples/tunnel/lib/forward.go
@@ -10,7 +10,11 @@
Send([]uint8) error
}
type receiver interface {
- Recv() ([]uint8, error)
+ Advance() bool
+
+ Value() []uint8
+
+ Err() error
}
// stream is the interface common to TunnelForwardStream and TunnelServiceForwardStream.
@@ -53,19 +57,13 @@
}
func stream2conn(r receiver, w io.Writer, done chan error) {
- for {
- buf, err := r.Recv()
- if err == io.EOF {
- done <- nil
- return
- }
- if err != nil {
- done <- err
- return
- }
+ for r.Advance() {
+ buf := r.Value()
+
if n, err := w.Write(buf); n != len(buf) || err != nil {
done <- fmt.Errorf("conn.Write returned (%d, %v) want (%d, nil)", n, err, len(buf))
return
}
}
+ done <- r.Err()
}
diff --git a/examples/tunnel/tunnel.vdl.go b/examples/tunnel/tunnel.vdl.go
index 5bc70d2..e7f5146 100644
--- a/examples/tunnel/tunnel.vdl.go
+++ b/examples/tunnel/tunnel.vdl.go
@@ -7,6 +7,7 @@
"veyron2/security"
// The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_io "io"
_gen_veyron2 "veyron2"
_gen_context "veyron2/context"
_gen_ipc "veyron2/ipc"
@@ -38,6 +39,10 @@
Stderr []byte
}
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Tunnel is the interface the client binds and uses.
// Tunnel_ExcludingUniversal is the interface without internal framework-added methods
// to enable embedding without method collisions. Not to be used directly by clients.
@@ -79,31 +84,64 @@
// Forward in the service interface Tunnel.
type TunnelForwardStream interface {
- // Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // Send places the item onto the output stream, blocking if there is no
+ // buffer space available. Calls to Send after having called CloseSend
+ // or Cancel will fail. Any blocked Send calls will be unblocked upon
+ // calling Cancel.
Send(item []byte) error
- // CloseSend indicates to the server that no more items will be sent; server
- // Recv calls will receive io.EOF after all sent items. Subsequent calls to
- // Send on the client will fail. This is an optional call - it's used by
- // streaming clients that need the server to receive the io.EOF terminator.
+ // CloseSend indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items. This is
+ // an optional call - it's used by streaming clients that need the
+ // server to receive the io.EOF terminator before the client calls
+ // Finish (for example, if the client needs to continue receiving items
+ // from the server after having finished sending).
+ // Calls to CloseSend after having called Cancel will fail.
+ // Like Send, CloseSend blocks when there's no buffer space available.
CloseSend() error
- // Recv returns the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item []byte, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
- // Finish closes the stream and returns the positional return values for
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ Value() []byte
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
+
+ // Finish performs the equivalent of CloseSend, then blocks until the server
+ // is done, and returns the positional return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
// Implementation of the TunnelForwardStream interface that is not exported.
type implTunnelForwardStream struct {
clientCall _gen_ipc.Call
+ val []byte
+ err error
}
func (c *implTunnelForwardStream) Send(item []byte) error {
@@ -114,9 +152,20 @@
return c.clientCall.CloseSend()
}
-func (c *implTunnelForwardStream) Recv() (item []byte, err error) {
- err = c.clientCall.Recv(&item)
- return
+func (c *implTunnelForwardStream) Advance() bool {
+ c.err = c.clientCall.Recv(&c.val)
+ return c.err == nil
+}
+
+func (c *implTunnelForwardStream) Value() []byte {
+ return c.val
+}
+
+func (c *implTunnelForwardStream) Err() error {
+ if c.err == _gen_io.EOF {
+ return nil
+ }
+ return c.err
}
func (c *implTunnelForwardStream) Finish() (err error) {
@@ -134,57 +183,122 @@
// Forward in the service interface Tunnel.
type TunnelServiceForwardStream interface {
// Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // space available. If the client has canceled, an error is returned.
Send(item []byte) error
- // Recv fills itemptr with the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item []byte, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
+
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ //
+ // In general, Value is undefined if the underlying collection
+ // of elements changes while iteration is in progress. If
+ // <DataProvider> supports concurrent modification, it should
+ // document its behavior.
+ Value() []byte
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
}
// Implementation of the TunnelServiceForwardStream interface that is not exported.
type implTunnelServiceForwardStream struct {
serverCall _gen_ipc.ServerCall
+ val []byte
+ err error
}
func (s *implTunnelServiceForwardStream) Send(item []byte) error {
return s.serverCall.Send(item)
}
-func (s *implTunnelServiceForwardStream) Recv() (item []byte, err error) {
- err = s.serverCall.Recv(&item)
- return
+func (s *implTunnelServiceForwardStream) Advance() bool {
+ s.err = s.serverCall.Recv(&s.val)
+ return s.err == nil
+}
+
+func (s *implTunnelServiceForwardStream) Value() []byte {
+ return s.val
+}
+
+func (s *implTunnelServiceForwardStream) Err() error {
+ if s.err == _gen_io.EOF {
+ return nil
+ }
+ return s.err
}
// TunnelShellStream is the interface for streaming responses of the method
// Shell in the service interface Tunnel.
type TunnelShellStream interface {
- // Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // Send places the item onto the output stream, blocking if there is no
+ // buffer space available. Calls to Send after having called CloseSend
+ // or Cancel will fail. Any blocked Send calls will be unblocked upon
+ // calling Cancel.
Send(item ClientShellPacket) error
- // CloseSend indicates to the server that no more items will be sent; server
- // Recv calls will receive io.EOF after all sent items. Subsequent calls to
- // Send on the client will fail. This is an optional call - it's used by
- // streaming clients that need the server to receive the io.EOF terminator.
+ // CloseSend indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items. This is
+ // an optional call - it's used by streaming clients that need the
+ // server to receive the io.EOF terminator before the client calls
+ // Finish (for example, if the client needs to continue receiving items
+ // from the server after having finished sending).
+ // Calls to CloseSend after having called Cancel will fail.
+ // Like Send, CloseSend blocks when there's no buffer space available.
CloseSend() error
- // Recv returns the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item ServerShellPacket, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
- // Finish closes the stream and returns the positional return values for
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ Value() ServerShellPacket
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
+
+ // Finish performs the equivalent of CloseSend, then blocks until the server
+ // is done, and returns the positional return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (reply int32, err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
// Implementation of the TunnelShellStream interface that is not exported.
type implTunnelShellStream struct {
clientCall _gen_ipc.Call
+ val ServerShellPacket
+ err error
}
func (c *implTunnelShellStream) Send(item ClientShellPacket) error {
@@ -195,9 +309,21 @@
return c.clientCall.CloseSend()
}
-func (c *implTunnelShellStream) Recv() (item ServerShellPacket, err error) {
- err = c.clientCall.Recv(&item)
- return
+func (c *implTunnelShellStream) Advance() bool {
+ c.val = ServerShellPacket{}
+ c.err = c.clientCall.Recv(&c.val)
+ return c.err == nil
+}
+
+func (c *implTunnelShellStream) Value() ServerShellPacket {
+ return c.val
+}
+
+func (c *implTunnelShellStream) Err() error {
+ if c.err == _gen_io.EOF {
+ return nil
+ }
+ return c.err
}
func (c *implTunnelShellStream) Finish() (reply int32, err error) {
@@ -215,26 +341,59 @@
// Shell in the service interface Tunnel.
type TunnelServiceShellStream interface {
// Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // space available. If the client has canceled, an error is returned.
Send(item ServerShellPacket) error
- // Recv fills itemptr with the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item ClientShellPacket, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
+
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ //
+ // In general, Value is undefined if the underlying collection
+ // of elements changes while iteration is in progress. If
+ // <DataProvider> supports concurrent modification, it should
+ // document its behavior.
+ Value() ClientShellPacket
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
}
// Implementation of the TunnelServiceShellStream interface that is not exported.
type implTunnelServiceShellStream struct {
serverCall _gen_ipc.ServerCall
+ val ClientShellPacket
+ err error
}
func (s *implTunnelServiceShellStream) Send(item ServerShellPacket) error {
return s.serverCall.Send(item)
}
-func (s *implTunnelServiceShellStream) Recv() (item ClientShellPacket, err error) {
- err = s.serverCall.Recv(&item)
- return
+func (s *implTunnelServiceShellStream) Advance() bool {
+ s.val = ClientShellPacket{}
+ s.err = s.serverCall.Recv(&s.val)
+ return s.err == nil
+}
+
+func (s *implTunnelServiceShellStream) Value() ClientShellPacket {
+ return s.val
+}
+
+func (s *implTunnelServiceShellStream) Err() error {
+ if s.err == _gen_io.EOF {
+ return nil
+ }
+ return s.err
}
// BindTunnel returns the client stub implementing the Tunnel
diff --git a/examples/tunnel/tunneld/impl/iomanager.go b/examples/tunnel/tunneld/impl/iomanager.go
index fe422db..ea04738 100644
--- a/examples/tunnel/tunneld/impl/iomanager.go
+++ b/examples/tunnel/tunneld/impl/iomanager.go
@@ -93,13 +93,8 @@
// stream2stdin reads data from the stream and sends it to the shell's stdin.
func (m *ioManager) stream2stdin() {
- for {
- packet, err := m.stream.Recv()
- if err != nil {
- vlog.VI(2).Infof("stream2stdin: %v", err)
- m.done <- err
- return
- }
+ for m.stream.Advance() {
+ packet := m.stream.Value()
if len(packet.Stdin) > 0 {
if n, err := m.stdin.Write(packet.Stdin); n != len(packet.Stdin) || err != nil {
m.done <- fmt.Errorf("stdin.Write returned (%d, %v) want (%d, nil)", n, err, len(packet.Stdin))
@@ -110,4 +105,12 @@
setWindowSize(m.ptyFd, packet.Rows, packet.Cols)
}
}
+
+ err := m.stream.Err()
+ if err == nil {
+ err = io.EOF
+ }
+
+ vlog.VI(2).Infof("stream2stdin: %v", err)
+ m.done <- err
}
diff --git a/examples/tunnel/vsh/iomanager.go b/examples/tunnel/vsh/iomanager.go
index 00cd29d..6c2a0d6 100644
--- a/examples/tunnel/vsh/iomanager.go
+++ b/examples/tunnel/vsh/iomanager.go
@@ -91,13 +91,9 @@
// stream2user reads data from the stream and sends it to either stdout or stderr.
func (m *ioManager) stream2user() {
- for {
- packet, err := m.stream.Recv()
- if err != nil {
- vlog.VI(2).Infof("stream2user: %v", err)
- m.done <- err
- return
- }
+ for m.stream.Advance() {
+ packet := m.stream.Value()
+
if len(packet.Stdout) > 0 {
if n, err := m.stdout.Write(packet.Stdout); n != len(packet.Stdout) || err != nil {
m.done <- fmt.Errorf("stdout.Write returned (%d, %v) want (%d, nil)", n, err, len(packet.Stdout))
@@ -111,4 +107,10 @@
}
}
}
+ err := m.stream.Err()
+ if err == nil {
+ err = io.EOF
+ }
+ vlog.VI(2).Infof("stream2user: %v", err)
+ m.done <- err
}
diff --git a/examples/wspr_sample/cache.vdl.go b/examples/wspr_sample/cache.vdl.go
index 5f738a3..1806f07 100644
--- a/examples/wspr_sample/cache.vdl.go
+++ b/examples/wspr_sample/cache.vdl.go
@@ -5,6 +5,7 @@
import (
// The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_io "io"
_gen_veyron2 "veyron2"
_gen_context "veyron2/context"
_gen_ipc "veyron2/ipc"
@@ -20,6 +21,10 @@
Value _gen_vdlutil.Any
}
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// A Cache service mimics the memcache interface.
// Cache is the interface the client binds and uses.
// Cache_ExcludingUniversal is the interface without internal framework-added methods
@@ -120,31 +125,64 @@
// MultiGet in the service interface Cache.
type CacheMultiGetStream interface {
- // Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // Send places the item onto the output stream, blocking if there is no
+ // buffer space available. Calls to Send after having called CloseSend
+ // or Cancel will fail. Any blocked Send calls will be unblocked upon
+ // calling Cancel.
Send(item string) error
- // CloseSend indicates to the server that no more items will be sent; server
- // Recv calls will receive io.EOF after all sent items. Subsequent calls to
- // Send on the client will fail. This is an optional call - it's used by
- // streaming clients that need the server to receive the io.EOF terminator.
+ // CloseSend indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items. This is
+ // an optional call - it's used by streaming clients that need the
+ // server to receive the io.EOF terminator before the client calls
+ // Finish (for example, if the client needs to continue receiving items
+ // from the server after having finished sending).
+ // Calls to CloseSend after having called Cancel will fail.
+ // Like Send, CloseSend blocks when there's no buffer space available.
CloseSend() error
- // Recv returns the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item _gen_vdlutil.Any, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
- // Finish closes the stream and returns the positional return values for
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ Value() _gen_vdlutil.Any
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
+
+ // Finish performs the equivalent of CloseSend, then blocks until the server
+ // is done, and returns the positional return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
// Implementation of the CacheMultiGetStream interface that is not exported.
type implCacheMultiGetStream struct {
clientCall _gen_ipc.Call
+ val _gen_vdlutil.Any
+ err error
}
func (c *implCacheMultiGetStream) Send(item string) error {
@@ -155,9 +193,21 @@
return c.clientCall.CloseSend()
}
-func (c *implCacheMultiGetStream) Recv() (item _gen_vdlutil.Any, err error) {
- err = c.clientCall.Recv(&item)
- return
+func (c *implCacheMultiGetStream) Advance() bool {
+ c.val = nil
+ c.err = c.clientCall.Recv(&c.val)
+ return c.err == nil
+}
+
+func (c *implCacheMultiGetStream) Value() _gen_vdlutil.Any {
+ return c.val
+}
+
+func (c *implCacheMultiGetStream) Err() error {
+ if c.err == _gen_io.EOF {
+ return nil
+ }
+ return c.err
}
func (c *implCacheMultiGetStream) Finish() (err error) {
@@ -175,26 +225,58 @@
// MultiGet in the service interface Cache.
type CacheServiceMultiGetStream interface {
// Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // space available. If the client has canceled, an error is returned.
Send(item _gen_vdlutil.Any) error
- // Recv fills itemptr with the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item string, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
+
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ //
+ // In general, Value is undefined if the underlying collection
+ // of elements changes while iteration is in progress. If
+ // <DataProvider> supports concurrent modification, it should
+ // document its behavior.
+ Value() string
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
}
// Implementation of the CacheServiceMultiGetStream interface that is not exported.
type implCacheServiceMultiGetStream struct {
serverCall _gen_ipc.ServerCall
+ val string
+ err error
}
func (s *implCacheServiceMultiGetStream) Send(item _gen_vdlutil.Any) error {
return s.serverCall.Send(item)
}
-func (s *implCacheServiceMultiGetStream) Recv() (item string, err error) {
- err = s.serverCall.Recv(&item)
- return
+func (s *implCacheServiceMultiGetStream) Advance() bool {
+ s.err = s.serverCall.Recv(&s.val)
+ return s.err == nil
+}
+
+func (s *implCacheServiceMultiGetStream) Value() string {
+ return s.val
+}
+
+func (s *implCacheServiceMultiGetStream) Err() error {
+ if s.err == _gen_io.EOF {
+ return nil
+ }
+ return s.err
}
// BindCache returns the client stub implementing the Cache
diff --git a/examples/wspr_sample/error_thrower.vdl.go b/examples/wspr_sample/error_thrower.vdl.go
index 54b046c..9fc497f 100644
--- a/examples/wspr_sample/error_thrower.vdl.go
+++ b/examples/wspr_sample/error_thrower.vdl.go
@@ -14,6 +14,10 @@
_gen_wiretype "veyron2/wiretype"
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// A testing interface with methods that throw various types of errors
// ErrorThrower is the interface the client binds and uses.
// ErrorThrower_ExcludingUniversal is the interface without internal framework-added methods
diff --git a/examples/wspr_sample/sampled/lib/cache_impl.go b/examples/wspr_sample/sampled/lib/cache_impl.go
index e99d34e..878c6af 100644
--- a/examples/wspr_sample/sampled/lib/cache_impl.go
+++ b/examples/wspr_sample/sampled/lib/cache_impl.go
@@ -2,7 +2,6 @@
import (
"fmt"
- "io"
"reflect"
"sort"
"time"
@@ -182,15 +181,8 @@
// keys in the stream is not in the map or if there was an issue reading
// the stream.
func (c *cacheImpl) MultiGet(_ ipc.ServerContext, stream sample.CacheServiceMultiGetStream) error {
- for {
- key, err := stream.Recv()
- if err == io.EOF {
- return nil
- }
-
- if err != nil {
- return err
- }
+ for stream.Advance() {
+ key := stream.Value()
value, ok := c.cache[key]
if !ok {
@@ -198,4 +190,5 @@
}
stream.Send(value)
}
+ return stream.Err()
}
diff --git a/examples/wspr_sample/sampled/lib/sampled_test.go b/examples/wspr_sample/sampled/lib/sampled_test.go
index 28bd8d5..715c6e0 100644
--- a/examples/wspr_sample/sampled/lib/sampled_test.go
+++ b/examples/wspr_sample/sampled/lib/sampled_test.go
@@ -288,28 +288,28 @@
stream.Send("C")
stream.Send("E")
- if item, err := stream.Recv(); err == nil {
- if item != "A" {
+ if stream.Advance() {
+ if stream.Value() != "A" {
t.Errorf("value for 'A' didn't match")
}
} else {
- t.Fatal("error on recv: %v", err)
+ t.Fatal("error on advance: %v", stream.Err())
}
- if item, err := stream.Recv(); err == nil {
- if item != uint32(7) {
+ if stream.Advance() {
+ if stream.Value() != uint32(7) {
t.Errorf("value for 'C' didn't match")
}
} else {
- t.Fatal("error on recv: %v", err)
+ t.Fatal("error on advance: %v", stream.Err())
}
- if item, err := stream.Recv(); err == nil {
- if item != true {
+ if stream.Advance() {
+ if stream.Value() != true {
t.Errorf("value for 'E' didn't match")
}
} else {
- t.Fatal("error on recv: %v", err)
+ t.Fatal("error on advance: %v", stream.Err())
}
stream.CloseSend()
diff --git a/lib/signals/signals_test.go b/lib/signals/signals_test.go
index 93480f8..4600c32 100644
--- a/lib/signals/signals_test.go
+++ b/lib/signals/signals_test.go
@@ -2,7 +2,6 @@
import (
"fmt"
- "io"
"os"
"syscall"
"testing"
@@ -326,8 +325,8 @@
if err != nil {
t.Fatalf("Got error: %v", err)
}
- if task, err := stream.Recv(); err != io.EOF {
- t.Errorf("Expected (nil, EOF), got (%v, %v) instead: ", task, err)
+ if stream.Advance() || stream.Err() != nil {
+ t.Errorf("Expected EOF, got (%v, %v) instead: ", stream.Value(), stream.Err())
}
if err := stream.Finish(); err != nil {
t.Fatalf("Got error: %v", err)
diff --git a/lib/testutil/modules/ls.go b/lib/testutil/modules/ls.go
index 214856f..88ae4d0 100644
--- a/lib/testutil/modules/ls.go
+++ b/lib/testutil/modules/ls.go
@@ -2,7 +2,6 @@
import (
"fmt"
- "io"
"strconv"
"strings"
@@ -104,17 +103,12 @@
return []string{}, err
}
var reply []string
- for {
- e, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- return reply, err
- }
+ for stream.Advance() {
+ e := stream.Value()
reply = append(reply, fmt.Sprintf("%q", e.Name))
}
- return reply, nil
+
+ return reply, stream.Err()
}
func lsUsingResolveToMountTable(name, pattern string) ([]string, error) {
diff --git a/lib/testutil/modules/servers.vdl.go b/lib/testutil/modules/servers.vdl.go
index 26f9ef2..89a72bb 100644
--- a/lib/testutil/modules/servers.vdl.go
+++ b/lib/testutil/modules/servers.vdl.go
@@ -14,6 +14,10 @@
_gen_wiretype "veyron2/wiretype"
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Clock is the interface the client binds and uses.
// Clock_ExcludingUniversal is the interface without internal framework-added methods
// to enable embedding without method collisions. Not to be used directly by clients.
diff --git a/runtimes/google/ipc/benchmarks/client.go b/runtimes/google/ipc/benchmarks/client.go
index 8117e8f..6d8bfc6 100644
--- a/runtimes/google/ipc/benchmarks/client.go
+++ b/runtimes/google/ipc/benchmarks/client.go
@@ -62,8 +62,8 @@
}
done := make(chan error, 1)
go func() {
- for {
- chunk, err := stream.Recv()
+ for stream.Advance() {
+ chunk := stream.Value()
if err == io.EOF {
done <- nil
return
@@ -77,6 +77,8 @@
return
}
}
+
+ done <- stream.Err()
}()
for j := 0; j < messageCount; j++ {
if err = stream.Send(payload); err != nil {
diff --git a/runtimes/google/ipc/benchmarks/server.go b/runtimes/google/ipc/benchmarks/server.go
index bd5eb0c..0d1c95e 100644
--- a/runtimes/google/ipc/benchmarks/server.go
+++ b/runtimes/google/ipc/benchmarks/server.go
@@ -1,8 +1,6 @@
package benchmarks
import (
- "io"
-
sflag "veyron/security/flag"
"veyron2/ipc"
@@ -19,19 +17,14 @@
}
func (i *impl) EchoStream(ctx ipc.ServerContext, stream BenchmarkServiceEchoStreamStream) error {
- for {
- chunk, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- return err
- }
+ for stream.Advance() {
+ chunk := stream.Value()
if err := stream.Send(chunk); err != nil {
return err
}
}
- return nil
+
+ return stream.Err()
}
// StartServer starts a server that implements the Benchmark service. The
diff --git a/runtimes/google/ipc/benchmarks/service.vdl.go b/runtimes/google/ipc/benchmarks/service.vdl.go
index 2356536..eddbfa9 100644
--- a/runtimes/google/ipc/benchmarks/service.vdl.go
+++ b/runtimes/google/ipc/benchmarks/service.vdl.go
@@ -7,6 +7,7 @@
import (
// The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_io "io"
_gen_veyron2 "veyron2"
_gen_context "veyron2/context"
_gen_ipc "veyron2/ipc"
@@ -16,6 +17,10 @@
_gen_wiretype "veyron2/wiretype"
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Benchmark is the interface the client binds and uses.
// Benchmark_ExcludingUniversal is the interface without internal framework-added methods
// to enable embedding without method collisions. Not to be used directly by clients.
@@ -43,31 +48,64 @@
// EchoStream in the service interface Benchmark.
type BenchmarkEchoStreamStream interface {
- // Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // Send places the item onto the output stream, blocking if there is no
+ // buffer space available. Calls to Send after having called CloseSend
+ // or Cancel will fail. Any blocked Send calls will be unblocked upon
+ // calling Cancel.
Send(item []byte) error
- // CloseSend indicates to the server that no more items will be sent; server
- // Recv calls will receive io.EOF after all sent items. Subsequent calls to
- // Send on the client will fail. This is an optional call - it's used by
- // streaming clients that need the server to receive the io.EOF terminator.
+ // CloseSend indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items. This is
+ // an optional call - it's used by streaming clients that need the
+ // server to receive the io.EOF terminator before the client calls
+ // Finish (for example, if the client needs to continue receiving items
+ // from the server after having finished sending).
+ // Calls to CloseSend after having called Cancel will fail.
+ // Like Send, CloseSend blocks when there's no buffer space available.
CloseSend() error
- // Recv returns the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item []byte, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
- // Finish closes the stream and returns the positional return values for
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ Value() []byte
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
+
+ // Finish performs the equivalent of CloseSend, then blocks until the server
+ // is done, and returns the positional return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
// Implementation of the BenchmarkEchoStreamStream interface that is not exported.
type implBenchmarkEchoStreamStream struct {
clientCall _gen_ipc.Call
+ val []byte
+ err error
}
func (c *implBenchmarkEchoStreamStream) Send(item []byte) error {
@@ -78,9 +116,20 @@
return c.clientCall.CloseSend()
}
-func (c *implBenchmarkEchoStreamStream) Recv() (item []byte, err error) {
- err = c.clientCall.Recv(&item)
- return
+func (c *implBenchmarkEchoStreamStream) Advance() bool {
+ c.err = c.clientCall.Recv(&c.val)
+ return c.err == nil
+}
+
+func (c *implBenchmarkEchoStreamStream) Value() []byte {
+ return c.val
+}
+
+func (c *implBenchmarkEchoStreamStream) Err() error {
+ if c.err == _gen_io.EOF {
+ return nil
+ }
+ return c.err
}
func (c *implBenchmarkEchoStreamStream) Finish() (err error) {
@@ -98,26 +147,58 @@
// EchoStream in the service interface Benchmark.
type BenchmarkServiceEchoStreamStream interface {
// Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // space available. If the client has canceled, an error is returned.
Send(item []byte) error
- // Recv fills itemptr with the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item []byte, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
+
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ //
+ // In general, Value is undefined if the underlying collection
+ // of elements changes while iteration is in progress. If
+ // <DataProvider> supports concurrent modification, it should
+ // document its behavior.
+ Value() []byte
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
}
// Implementation of the BenchmarkServiceEchoStreamStream interface that is not exported.
type implBenchmarkServiceEchoStreamStream struct {
serverCall _gen_ipc.ServerCall
+ val []byte
+ err error
}
func (s *implBenchmarkServiceEchoStreamStream) Send(item []byte) error {
return s.serverCall.Send(item)
}
-func (s *implBenchmarkServiceEchoStreamStream) Recv() (item []byte, err error) {
- err = s.serverCall.Recv(&item)
- return
+func (s *implBenchmarkServiceEchoStreamStream) Advance() bool {
+ s.err = s.serverCall.Recv(&s.val)
+ return s.err == nil
+}
+
+func (s *implBenchmarkServiceEchoStreamStream) Value() []byte {
+ return s.val
+}
+
+func (s *implBenchmarkServiceEchoStreamStream) Err() error {
+ if s.err == _gen_io.EOF {
+ return nil
+ }
+ return s.err
}
// BindBenchmark returns the client stub implementing the Benchmark
diff --git a/runtimes/google/ipc/jni/arg_getter.go b/runtimes/google/ipc/jni/arg_getter.go
index fc6bf3b..04efd0b 100644
--- a/runtimes/google/ipc/jni/arg_getter.go
+++ b/runtimes/google/ipc/jni/arg_getter.go
@@ -97,9 +97,9 @@
mArgs.streamSendType = mSend.Type.In(0)
}
// Get the stream recv type.
- if mRecv, ok := stream.MethodByName("Recv"); ok {
- if mRecv.Type.NumOut() != 2 {
- return fmt.Errorf("Illegal number of arguments for Recv method in stream %v", stream)
+ if mRecv, ok := stream.MethodByName("Value"); ok {
+ if mRecv.Type.NumOut() != 1 {
+ return fmt.Errorf("Illegal number of arguments for Value method in stream %v", stream)
}
mArgs.streamRecvType = mRecv.Type.Out(0)
}
diff --git a/runtimes/google/rt/mgmt_test.go b/runtimes/google/rt/mgmt_test.go
index bb78645..7f314e9 100644
--- a/runtimes/google/rt/mgmt_test.go
+++ b/runtimes/google/rt/mgmt_test.go
@@ -2,7 +2,6 @@
import (
"fmt"
- "io"
"os"
"reflect"
"strings"
@@ -305,17 +304,19 @@
t.Fatalf("Got error: %v", err)
}
expectTask := func(progress, goal int32) {
- if task, err := stream.Recv(); err != nil {
- t.Fatalf("unexpected streaming error: %q", err)
- } else if task.Progress != progress || task.Goal != goal {
+ if !stream.Advance() {
+ t.Fatalf("unexpected streaming error: %q", stream.Err())
+ }
+ task := stream.Value()
+ if task.Progress != progress || task.Goal != goal {
t.Errorf("Got (%d, %d), want (%d, %d)", task.Progress, task.Goal, progress, goal)
}
}
expectTask(0, 10)
expectTask(2, 10)
expectTask(7, 10)
- if task, err := stream.Recv(); err != io.EOF {
- t.Errorf("Expected (nil, EOF), got (%v, %v) instead", task, err)
+ if stream.Advance() || stream.Err() != nil {
+ t.Errorf("Expected EOF, got (%v, %v) instead", stream.Value(), stream.Err())
}
if err := stream.Finish(); err != nil {
t.Errorf("Got error %v", err)
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index af7ece9..76aa7da 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -3,7 +3,6 @@
import (
"errors"
"fmt"
- "io"
"math/rand"
"strings"
"time"
@@ -256,14 +255,8 @@
// Compute the minimum generation for every device in this set.
minGens := GenVector{}
- for {
- rec, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- return GenVector{}, err
- }
+ for stream.Advance() {
+ rec := stream.Value()
if err := i.insertRecInLogAndDag(&rec); err != nil {
return GenVector{}, err
@@ -292,6 +285,9 @@
}
}
+ if err := stream.Err(); err != nil {
+ return GenVector{}, err
+ }
if err := i.createGenMetadataBatch(newGens, orderGens); err != nil {
return GenVector{}, err
}
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index bd9356c..5699fbc 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -4,7 +4,6 @@
import (
"container/list"
"fmt"
- "io"
"os"
"time"
@@ -36,7 +35,8 @@
// dummyStream struct emulates stream of log records received from RPC.
type dummyStream struct {
- l *list.List
+ l *list.List
+ value LogRec
}
func newStream() *dummyStream {
@@ -46,14 +46,20 @@
return ds
}
-func (ds *dummyStream) Recv() (LogRec, error) {
+func (ds *dummyStream) Advance() bool {
if ds.l.Len() > 0 {
- item := ds.l.Remove(ds.l.Front()).(LogRec)
- return item, nil
+ ds.value = ds.l.Remove(ds.l.Front()).(LogRec)
+ return true
}
- return LogRec{}, io.EOF
+ return false
}
+func (ds *dummyStream) Value() LogRec {
+ return ds.value
+}
+
+func (*dummyStream) Err() error { return nil }
+
func (ds *dummyStream) Finish() (GenVector, error) {
return GenVector{}, nil
}
diff --git a/runtimes/google/vsync/vsync.vdl.go b/runtimes/google/vsync/vsync.vdl.go
index f81a4b7..7dcc5a4 100644
--- a/runtimes/google/vsync/vsync.vdl.go
+++ b/runtimes/google/vsync/vsync.vdl.go
@@ -9,6 +9,7 @@
"veyron2/storage"
// The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_io "io"
_gen_veyron2 "veyron2"
_gen_context "veyron2/context"
_gen_ipc "veyron2/ipc"
@@ -81,6 +82,10 @@
LinkRec = byte(1)
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Sync allows a device to GetDeltas from another device.
// Sync is the interface the client binds and uses.
// Sync_ExcludingUniversal is the interface without internal framework-added methods
@@ -107,26 +112,65 @@
// GetDeltas in the service interface Sync.
type SyncGetDeltasStream interface {
- // Recv returns the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item LogRec, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
- // Finish closes the stream and returns the positional return values for
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ Value() LogRec
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
+
+ // Finish blocks until the server is done and returns the positional
+ // return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (reply GenVector, err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
// Implementation of the SyncGetDeltasStream interface that is not exported.
type implSyncGetDeltasStream struct {
clientCall _gen_ipc.Call
+ val LogRec
+ err error
}
-func (c *implSyncGetDeltasStream) Recv() (item LogRec, err error) {
- err = c.clientCall.Recv(&item)
- return
+func (c *implSyncGetDeltasStream) Advance() bool {
+ c.val = LogRec{}
+ c.err = c.clientCall.Recv(&c.val)
+ return c.err == nil
+}
+
+func (c *implSyncGetDeltasStream) Value() LogRec {
+ return c.val
+}
+
+func (c *implSyncGetDeltasStream) Err() error {
+ if c.err == _gen_io.EOF {
+ return nil
+ }
+ return c.err
}
func (c *implSyncGetDeltasStream) Finish() (reply GenVector, err error) {
@@ -144,7 +188,7 @@
// GetDeltas in the service interface Sync.
type SyncServiceGetDeltasStream interface {
// Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // space available. If the client has canceled, an error is returned.
Send(item LogRec) error
}
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index f911231..aa4398f 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -7,6 +7,7 @@
import (
"fmt"
+ "io"
"time"
"veyron/services/store/raw"
@@ -123,27 +124,29 @@
// If the stream is closed, distinguish between the cases of end-of-stream vs Syncd canceling
// the stream to trigger a clean exit.
func (w *syncWatcher) processWatchStream(stream watch.GlobWatcherWatchGlobStream) {
- for {
- changes, err := stream.Recv()
- if err != nil {
- if w.syncd.isSyncClosing() {
- vlog.VI(1).Info("processWatchStream: exiting, Syncd closed its channel: ", err)
- } else {
- vlog.VI(1).Info("processWatchStream: RPC stream error, re-issue Watch(): ", err)
- }
- return
- }
+ for stream.Advance() {
+ changes := stream.Value()
// Timestamp of these changes arriving at the Sync server.
syncTime := time.Now().UnixNano()
- if err = w.processChanges(changes, syncTime); err != nil {
+ if err := w.processChanges(changes, syncTime); err != nil {
// TODO(rdaoud): don't crash, instead add retry policies to attempt some degree of
// self-healing from a data corruption where feasible, otherwise quarantine this device
// from the cluster and stop Syncd to avoid propagating data corruptions.
vlog.Fatal("processWatchStream:", err)
}
}
+
+ err := stream.Err()
+ if err == nil {
+ err = io.EOF
+ }
+ if w.syncd.isSyncClosing() {
+ vlog.VI(1).Info("processWatchStream: exiting, Syncd closed its channel: ", err)
+ } else {
+ vlog.VI(1).Info("processWatchStream: RPC stream error, re-issue Watch(): ", err)
+ }
}
// processChanges applies the batch of changes (object mutations) received from the Watch API.
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index 6eec8e4..b9d8a79 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -5,7 +5,6 @@
import (
"bytes"
"fmt"
- "io"
"os"
"testing"
"time"
@@ -73,6 +72,7 @@
// fakeStream is used to simulate the reply stream of the Watch() API.
type fakeStream struct {
canceled chan struct{}
+ err error
}
func newFakeStream() *fakeStream {
@@ -81,30 +81,35 @@
return s
}
-func (s *fakeStream) Recv() (watch.ChangeBatch, error) {
- var empty watch.ChangeBatch
+func (s *fakeStream) Advance() bool {
// If "failRecv" is set, simulate a failed call.
if info.failRecv {
info.failRecvCount++
- return empty, fmt.Errorf("fake recv error on fake stream: %d", info.failRecvCount)
+ s.err = fmt.Errorf("fake recv error on fake stream: %d", info.failRecvCount)
+ return false
}
// If "eofRecv" is set, simulate a closed stream and make sure the next Recv() call blocks.
if info.eofRecv {
info.eofRecv, info.blockRecv = false, true
- return empty, io.EOF
+ s.err = nil
+ return false
}
// If "blockRecv" is set, simulate blocking the call until the stream is canceled.
if info.blockRecv {
close(recvBlocked)
<-s.canceled
- return empty, io.EOF
+ s.err = nil
+ return false
}
-
// Otherwise return a batch of changes, and make sure the next Recv() call returns EOF on the stream.
// Adjust the resume marker of the change records to follow the one given to the Watch request.
info.eofRecv = true
+ return true
+}
+
+func (s *fakeStream) Value() watch.ChangeBatch {
changes := getChangeBatch()
var lastCount byte
@@ -121,9 +126,12 @@
}
}
- return changes, nil
+ return changes
}
+func (s *fakeStream) Err() error {
+ return s.err
+}
func (s *fakeStream) Finish() error {
return nil
}
diff --git a/services/mgmt/binary/impl/impl_test.go b/services/mgmt/binary/impl/impl_test.go
index fbe84b1..269ee0c 100644
--- a/services/mgmt/binary/impl/impl_test.go
+++ b/services/mgmt/binary/impl/impl_test.go
@@ -4,7 +4,6 @@
"bytes"
"crypto/md5"
"encoding/hex"
- "io"
"io/ioutil"
"os"
"path/filepath"
@@ -64,20 +63,15 @@
return nil, err
}
output := make([]byte, 0)
- for {
- bytes, err := stream.Recv()
- if err != nil && err != io.EOF {
- if err := stream.Finish(); err != nil {
- t.Logf("Finish() failed: %v", err)
- }
- t.Logf("Recv() failed: %v", err)
- return nil, err
- }
- if err == io.EOF {
- break
- }
+ for stream.Advance() {
+ bytes := stream.Value()
output = append(output, bytes...)
}
+
+ if err := stream.Err(); err != nil {
+ t.Logf("Advance() failed with: %v", err)
+ }
+
if err := stream.Finish(); err != nil {
t.Logf("Finish() failed: %v", err)
return nil, err
diff --git a/services/mgmt/binary/impl/invoker.go b/services/mgmt/binary/impl/invoker.go
index d28fa03..59324a3 100644
--- a/services/mgmt/binary/impl/invoker.go
+++ b/services/mgmt/binary/impl/invoker.go
@@ -436,18 +436,8 @@
}
defer file.Close()
h := md5.New()
- for {
- bytes, err := stream.Recv()
- if err != nil && err != io.EOF {
- vlog.Errorf("Recv() failed: %v", err)
- if err := os.Remove(file.Name()); err != nil {
- vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
- }
- return errOperationFailed
- }
- if err == io.EOF {
- break
- }
+ for stream.Advance() {
+ bytes := stream.Value()
if _, err := file.Write(bytes); err != nil {
vlog.Errorf("Write() failed: %v", err)
if err := os.Remove(file.Name()); err != nil {
@@ -457,6 +447,15 @@
}
h.Write(bytes)
}
+
+ if err := stream.Err(); err != nil {
+ vlog.Errorf("Recv() failed: %v", err)
+ if err := os.Remove(file.Name()); err != nil {
+ vlog.Errorf("Remove(%v) failed: %v", file.Name(), err)
+ }
+ return errOperationFailed
+ }
+
hash := hex.EncodeToString(h.Sum(nil))
checksumFile, perm := filepath.Join(path, checksum), os.FileMode(0600)
if err := ioutil.WriteFile(checksumFile, []byte(hash), perm); err != nil {
diff --git a/services/mgmt/build/impl/impl_test.go b/services/mgmt/build/impl/impl_test.go
index 0404e7b..a60b1e8 100644
--- a/services/mgmt/build/impl/impl_test.go
+++ b/services/mgmt/build/impl/impl_test.go
@@ -1,7 +1,6 @@
package impl
import (
- "io"
"os"
"path/filepath"
"strings"
@@ -69,17 +68,16 @@
return nil, nil, err
}
bins := make([]build.File, 0)
- for {
- bin, err := stream.Recv()
- if err != nil && err != io.EOF {
- t.Logf("Recv() failed: %v", err)
- return nil, nil, err
- }
- if err == io.EOF {
- break
- }
+ for stream.Advance() {
+ bin := stream.Value()
bins = append(bins, bin)
}
+
+ if err := stream.Err(); err != nil {
+ t.Logf("Recv() failed: %v", err)
+ return nil, nil, err
+ }
+
output, err := stream.Finish()
if err != nil {
t.Logf("Finish() failed: %v", err)
diff --git a/services/mgmt/build/impl/invoker.go b/services/mgmt/build/impl/invoker.go
index 2ef2585..82de34a 100644
--- a/services/mgmt/build/impl/invoker.go
+++ b/services/mgmt/build/impl/invoker.go
@@ -3,7 +3,6 @@
import (
"bytes"
"errors"
- "io"
"io/ioutil"
"os"
"os/exec"
@@ -52,15 +51,8 @@
vlog.Errorf("MkdirAll(%v, %v) failed: %v", srcDir, dirPerm, err)
return nil, errInternalError
}
- for {
- srcFile, err := stream.Recv()
- if err != nil && err != io.EOF {
- vlog.Errorf("Recv() failed: %v", err)
- return nil, errInternalError
- }
- if err == io.EOF {
- break
- }
+ for stream.Advance() {
+ srcFile := stream.Value()
filePath := filepath.Join(srcDir, filepath.FromSlash(srcFile.Name))
dir := filepath.Dir(filePath)
if err := os.MkdirAll(dir, dirPerm); err != nil {
@@ -72,6 +64,11 @@
return nil, errInternalError
}
}
+
+ if err := stream.Err(); err != nil {
+ vlog.Errorf("stream failed: %v", err)
+ return nil, errInternalError
+ }
cmd := exec.Command(i.gobin, "install", "-v", "...")
cmd.Env = append(cmd.Env, "GOPATH="+filepath.Dir(srcDir))
var output bytes.Buffer
diff --git a/services/mgmt/lib/binary/impl.go b/services/mgmt/lib/binary/impl.go
index 357c5d0..a6611eb 100644
--- a/services/mgmt/lib/binary/impl.go
+++ b/services/mgmt/lib/binary/impl.go
@@ -74,16 +74,8 @@
continue
}
h, nreceived := md5.New(), 0
- for {
- bytes, err := stream.Recv()
- if err != nil {
- if err != io.EOF {
- vlog.Errorf("Recv() failed: %v", err)
- stream.Cancel()
- continue download
- }
- break
- }
+ for stream.Advance() {
+ bytes := stream.Value()
if _, err := w.Write(bytes); err != nil {
vlog.Errorf("Write() failed: %v", err)
stream.Cancel()
@@ -92,6 +84,13 @@
h.Write(bytes)
nreceived += len(bytes)
}
+
+ if err := stream.Err(); err != nil {
+ vlog.Errorf("stream failed: %v", err)
+ stream.Cancel()
+ continue download
+
+ }
if err := stream.Finish(); err != nil {
vlog.Errorf("Finish() failed: %v", err)
continue
diff --git a/services/mgmt/node/node.vdl.go b/services/mgmt/node/node.vdl.go
index 4a922f3..c7b7430 100644
--- a/services/mgmt/node/node.vdl.go
+++ b/services/mgmt/node/node.vdl.go
@@ -18,6 +18,10 @@
_gen_wiretype "veyron2/wiretype"
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Config is an RPC API to the config service.
// Config is the interface the client binds and uses.
// Config_ExcludingUniversal is the interface without internal framework-added methods
diff --git a/services/mgmt/repository/repository.vdl.go b/services/mgmt/repository/repository.vdl.go
index 83bc841..0b2b305 100644
--- a/services/mgmt/repository/repository.vdl.go
+++ b/services/mgmt/repository/repository.vdl.go
@@ -24,6 +24,10 @@
_gen_wiretype "veyron2/wiretype"
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Application describes an application repository internally. Besides
// the public Application interface, it allows to add and remove
// application envelopes.
diff --git a/services/mgmt/root/root.vdl.go b/services/mgmt/root/root.vdl.go
index 90f5103..a7bff85 100644
--- a/services/mgmt/root/root.vdl.go
+++ b/services/mgmt/root/root.vdl.go
@@ -16,6 +16,10 @@
_gen_wiretype "veyron2/wiretype"
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Root is an interface to be implemented by a process with root level
// privileges.
// Root is the interface the client binds and uses.
diff --git a/services/mounttable/lib/collection_test.vdl.go b/services/mounttable/lib/collection_test.vdl.go
index 75cf8fb..8002428 100644
--- a/services/mounttable/lib/collection_test.vdl.go
+++ b/services/mounttable/lib/collection_test.vdl.go
@@ -14,6 +14,10 @@
_gen_wiretype "veyron2/wiretype"
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Collection is the interface the client binds and uses.
// Collection_ExcludingUniversal is the interface without internal framework-added methods
// to enable embedding without method collisions. Not to be used directly by clients.
diff --git a/services/mounttable/lib/mounttable_test.go b/services/mounttable/lib/mounttable_test.go
index cfea7f8..4542d6d 100644
--- a/services/mounttable/lib/mounttable_test.go
+++ b/services/mounttable/lib/mounttable_test.go
@@ -2,7 +2,6 @@
import (
"errors"
- "io"
"reflect"
"runtime/debug"
"sort"
@@ -287,16 +286,14 @@
boom(t, "Failed call to %s.Glob(%s): %s", name, pattern, err)
}
var reply []string
- for {
- e, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- boom(t, "Glob %s: %s", name, err)
- }
+ for stream.Advance() {
+ e := stream.Value()
reply = append(reply, e.Name)
}
+
+ if err := stream.Err(); err != nil {
+ boom(t, "Glob %s: %s", name, err)
+ }
return reply
}
diff --git a/services/security/discharger.vdl.go b/services/security/discharger.vdl.go
index 7a2b5a8..cfb2ea2 100644
--- a/services/security/discharger.vdl.go
+++ b/services/security/discharger.vdl.go
@@ -16,6 +16,10 @@
_gen_wiretype "veyron2/wiretype"
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// DischargeIssuer service issues caveat discharges when requested.
// Discharger is the interface the client binds and uses.
// Discharger_ExcludingUniversal is the interface without internal framework-added methods
diff --git a/services/security/revoker.vdl.go b/services/security/revoker.vdl.go
index 4e0bdaa..4bda62d 100644
--- a/services/security/revoker.vdl.go
+++ b/services/security/revoker.vdl.go
@@ -19,6 +19,10 @@
// RevocationToken can be presented to a revocation service to revoke a caveat
type RevocationToken [16]byte
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Revoker is the interface for preventing discharges from being issued. The
// dicharger ensures that no discharges will be issued for caveats that
// have been explicitly revoked using this interface. To prevent discharge
diff --git a/services/store/memstore/blackbox/sync_integration_test.go b/services/store/memstore/blackbox/sync_integration_test.go
index 87de9a6..6e56574 100644
--- a/services/store/memstore/blackbox/sync_integration_test.go
+++ b/services/store/memstore/blackbox/sync_integration_test.go
@@ -45,10 +45,10 @@
// Create a sync request
stream := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, raw.Request{})
- cb, err := stream.Recv()
- if err != nil {
- t.Fatalf("Recv() failed: %v", err)
+ if !stream.Advance() {
+ t.Fatalf("Advance() failed: %v", stream.Err())
}
+ cb := stream.Value()
// Update target
PutMutations(t, target, Mutations(cb.Changes))
GC(t, target)
@@ -78,10 +78,10 @@
id3 := Put(t, st, tr, "/a/b", "val3")
Commit(t, tr)
- cb, err := stream.Recv()
- if err != nil {
- t.Fatalf("Recv() failed: %v", err)
+ if !stream.Advance() {
+ t.Fatalf("Advance() failed: %v", stream.Err())
}
+ cb := stream.Value()
// Update target
PutMutations(t, target, Mutations(cb.Changes))
GC(t, target)
@@ -96,10 +96,11 @@
Remove(t, st, tr, "/a/b")
Commit(t, tr)
- cb, err = stream.Recv()
- if err != nil {
- t.Fatalf("Recv() failed: %v", err)
+ if !stream.Advance() {
+ t.Fatalf("Advance() failed: %v", stream.Err())
}
+
+ cb = stream.Value()
// Update target
PutMutations(t, target, Mutations(cb.Changes))
GC(t, target)
diff --git a/services/store/memstore/blackbox/util.go b/services/store/memstore/blackbox/util.go
index 2ff6bf1..569c504 100644
--- a/services/store/memstore/blackbox/util.go
+++ b/services/store/memstore/blackbox/util.go
@@ -2,7 +2,6 @@
import (
"fmt"
- "io"
"io/ioutil"
"os"
"runtime"
@@ -195,17 +194,22 @@
func newPutMutationsStream(mus []raw.Mutation) raw.StoreServicePutMutationsStream {
return &putMutationsStream{
- mus: mus,
+ mus: mus,
+ index: -1,
}
}
-func (s *putMutationsStream) Recv() (raw.Mutation, error) {
- if s.index < len(s.mus) {
- index := s.index
- s.index++
- return s.mus[index], nil
- }
- return nullMutation, io.EOF
+func (s *putMutationsStream) Advance() bool {
+ s.index++
+ return s.index < len(s.mus)
+}
+
+func (s *putMutationsStream) Value() raw.Mutation {
+ return s.mus[s.index]
+}
+
+func (*putMutationsStream) Err() error {
+ return nil
}
func PutMutations(t *testing.T, st *memstore.Store, mus []raw.Mutation) {
diff --git a/services/store/memstore/store.go b/services/store/memstore/store.go
index 7a6202d..bb1a282 100644
--- a/services/store/memstore/store.go
+++ b/services/store/memstore/store.go
@@ -1,8 +1,6 @@
package memstore
import (
- "io"
-
"veyron/runtimes/google/lib/sync"
"veyron/services/store/memstore/state"
@@ -152,24 +150,24 @@
// stream has been closed.
func (st *Store) PutMutations(ctx ipc.ServerContext, stream raw.StoreServicePutMutationsStream) error {
tr := st.newNilTransaction()
- for {
- mu, err := stream.Recv()
- if err == io.EOF {
- if ctx.IsClosed() {
- tr.Abort()
- return ErrRequestCancelled
- }
- break
- }
- if err != nil {
- tr.Abort()
- return err
- }
+ for stream.Advance() {
+ mu := stream.Value()
+
if err := tr.snapshot.PutMutation(mu); err != nil {
tr.Abort()
return err
}
}
+ err := stream.Err()
+ if err != nil {
+ tr.Abort()
+ return err
+ }
+
+ if ctx.IsClosed() {
+ tr.Abort()
+ return ErrRequestCancelled
+ }
return tr.Commit()
}
diff --git a/services/store/memstore/testing/util.go b/services/store/memstore/testing/util.go
index 0553c75..1a5e503 100644
--- a/services/store/memstore/testing/util.go
+++ b/services/store/memstore/testing/util.go
@@ -106,15 +106,22 @@
// storeServicePutMutationsStream implements raw.StoreServicePutMutationsStream
type storeServicePutMutationsStream struct {
- mus <-chan raw.Mutation
+ mus <-chan raw.Mutation
+ value raw.Mutation
}
-func (s *storeServicePutMutationsStream) Recv() (raw.Mutation, error) {
- mu, ok := <-s.mus
- if !ok {
- return mu, io.EOF
- }
- return mu, nil
+func (s *storeServicePutMutationsStream) Advance() bool {
+ var ok bool
+ s.value, ok = <-s.mus
+ return ok
+}
+
+func (s *storeServicePutMutationsStream) Value() raw.Mutation {
+ return s.value
+}
+
+func (s *storeServicePutMutationsStream) Err() error {
+ return nil
}
// storePutMutationsStream implements raw.StorePutMutationsStream
@@ -153,7 +160,7 @@
mus := make(chan raw.Mutation)
err := make(chan error)
go func() {
- err <- putMutationsFn(ctx, &storeServicePutMutationsStream{mus})
+ err <- putMutationsFn(ctx, &storeServicePutMutationsStream{mus: mus})
close(err)
}()
return &storePutMutationsStream{
@@ -197,16 +204,23 @@
// watcherWatchStream implements watch.WatcherWatchStream.
type watcherWatchStream struct {
ctx *CancellableContext
+ value watch.ChangeBatch
input <-chan watch.ChangeBatch
err <-chan error
}
-func (s *watcherWatchStream) Recv() (watch.ChangeBatch, error) {
- cb, ok := <-s.input
- if !ok {
- return cb, io.EOF
- }
- return cb, nil
+func (s *watcherWatchStream) Advance() bool {
+ var ok bool
+ s.value, ok = <-s.input
+ return ok
+}
+
+func (s *watcherWatchStream) Value() watch.ChangeBatch {
+ return s.value
+}
+
+func (*watcherWatchStream) Err() error {
+ return nil
}
func (s *watcherWatchStream) Finish() error {
diff --git a/services/store/memstore/watch/watcher_test.go b/services/store/memstore/watch/watcher_test.go
index 8ef6878..696e773 100644
--- a/services/store/memstore/watch/watcher_test.go
+++ b/services/store/memstore/watch/watcher_test.go
@@ -37,10 +37,10 @@
ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Check that watch detects the changes in the first transaction.
- cb, err := ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb := ws.Value()
changes := cb.Changes
change := changes[0]
if change.Continued {
@@ -59,10 +59,10 @@
// Check that watch detects the changes in the second transaction.
- cb, err = ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb = ws.Value()
changes = cb.Changes
change = changes[0]
if !change.Continued {
@@ -96,10 +96,10 @@
ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
// Check that watch detects the changes in the first transaction.
- cb, err := ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb := ws.Value()
changes := cb.Changes
change := changes[0]
if change.Continued {
@@ -113,11 +113,11 @@
commit(t, tr)
// Check that watch detects the changes in the second transaction.
-
- cb, err = ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb = ws.Value()
+
changes = cb.Changes
change = changes[0]
if !change.Continued {
@@ -150,7 +150,7 @@
commit(t, tr)
// Check that watch processed the first transaction.
- if _, err := ws.Recv(); err != nil {
+ if !ws.Advance() {
t.Error("Expected a change.")
}
@@ -165,8 +165,8 @@
commit(t, tr)
// Check that watch did not processed the second transaction.
- if _, err := ws.Recv(); err != io.EOF {
- t.Errorf("Unexpected error: %v", err)
+ if ws.Advance() || ws.Err() != nil {
+ t.Errorf("Unexpected error: %v", ws.Err())
}
// Check that io.EOF was returned.
@@ -195,7 +195,7 @@
commit(t, tr)
// Check that watch processed the first transaction.
- if _, err := ws.Recv(); err != nil {
+ if !ws.Advance() {
t.Error("Expected a change.")
}
@@ -245,10 +245,11 @@
ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Retrieve the resume marker for the initial state.
- cb, err := ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb := ws.Value()
+
changes := cb.Changes
change := changes[0]
resumeMarker1 := change.ResumeMarker
@@ -262,10 +263,11 @@
ws = watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Check that watch detects the changes in the state and the transaction.
- cb, err = ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb = ws.Value()
+
changes = cb.Changes
change = changes[0]
if change.Continued {
@@ -273,10 +275,12 @@
}
watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post11, true, "val1", watchtesting.EmptyDir)
- cb, err = ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ // Check that watch detects the changes in the state and the transaction.
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb = ws.Value()
+
changes = cb.Changes
change = changes[0]
if !change.Continued {
@@ -320,10 +324,11 @@
ws := watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Retrieve the resume marker for the first transaction.
- cb, err := ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb := ws.Value()
+
changes := cb.Changes
change := changes[0]
resumeMarker1 := change.ResumeMarker
@@ -337,10 +342,11 @@
ws = watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Check that watch detects the changes in the first and second transaction.
- cb, err = ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb = ws.Value()
+
changes = cb.Changes
change = changes[0]
if change.Continued {
@@ -348,10 +354,11 @@
}
watchtesting.ExpectMutationExists(t, changes, id1, storage.NoVersion, post11, true, "val1", watchtesting.EmptyDir)
- cb, err = ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb = ws.Value()
+
changes = cb.Changes
change = changes[0]
if !change.Continued {
@@ -374,10 +381,11 @@
ws = watchtesting.WatchRaw(rootPublicID, w.WatchRaw, req)
// Check that watch detects the changes in the second transaction.
- cb, err = ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb = ws.Value()
+
changes = cb.Changes
change = changes[0]
if !change.Continued {
@@ -429,19 +437,21 @@
post33 := st.Snapshot().Find(id3).Version
// Check that watch announces that the initial state was skipped.
- cb, err := ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb := ws.Value()
+
changes := cb.Changes
change := changes[0]
watchtesting.ExpectInitialStateSkipped(t, change)
// Check that watch detects the changes in the third transaction.
- cb, err = ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb = ws.Value()
+
changes = cb.Changes
change = changes[0]
if !change.Continued {
@@ -509,10 +519,11 @@
req := watch.GlobRequest{Pattern: "..."}
ws := watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
- cb, err := ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb := ws.Value()
+
changes := cb.Changes
change := changes[0]
// Save the ResumeMarker of the change.
@@ -521,10 +532,11 @@
// Start another watch request.
ws = watchtesting.WatchGlobOnPath(rootPublicID, w.WatchGlob, path, req)
- cb, err = ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb = ws.Value()
+
changes = cb.Changes
change = changes[0]
// Expect the same ResumeMarker.
diff --git a/services/store/raw/service.vdl.go b/services/store/raw/service.vdl.go
index ca92493..c5a907b 100644
--- a/services/store/raw/service.vdl.go
+++ b/services/store/raw/service.vdl.go
@@ -13,6 +13,7 @@
"veyron2/storage"
// The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_io "io"
_gen_veyron2 "veyron2"
_gen_context "veyron2/context"
_gen_ipc "veyron2/ipc"
@@ -59,6 +60,10 @@
RawStoreSuffix = ".store.raw"
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Store defines a raw interface for the Veyron store. Mutations can be received
// via the Watcher interface, and committed via PutMutation.
// Store is the interface the client binds and uses.
@@ -92,26 +97,65 @@
// Watch in the service interface Store.
type StoreWatchStream interface {
- // Recv returns the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item watch.ChangeBatch, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
- // Finish closes the stream and returns the positional return values for
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ Value() watch.ChangeBatch
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
+
+ // Finish blocks until the server is done and returns the positional
+ // return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
// Implementation of the StoreWatchStream interface that is not exported.
type implStoreWatchStream struct {
clientCall _gen_ipc.Call
+ val watch.ChangeBatch
+ err error
}
-func (c *implStoreWatchStream) Recv() (item watch.ChangeBatch, err error) {
- err = c.clientCall.Recv(&item)
- return
+func (c *implStoreWatchStream) Advance() bool {
+ c.val = watch.ChangeBatch{}
+ c.err = c.clientCall.Recv(&c.val)
+ return c.err == nil
+}
+
+func (c *implStoreWatchStream) Value() watch.ChangeBatch {
+ return c.val
+}
+
+func (c *implStoreWatchStream) Err() error {
+ if c.err == _gen_io.EOF {
+ return nil
+ }
+ return c.err
}
func (c *implStoreWatchStream) Finish() (err error) {
@@ -129,7 +173,7 @@
// Watch in the service interface Store.
type StoreServiceWatchStream interface {
// Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // space available. If the client has canceled, an error is returned.
Send(item watch.ChangeBatch) error
}
@@ -146,21 +190,38 @@
// PutMutations in the service interface Store.
type StorePutMutationsStream interface {
- // Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // Send places the item onto the output stream, blocking if there is no
+ // buffer space available. Calls to Send after having called CloseSend
+ // or Cancel will fail. Any blocked Send calls will be unblocked upon
+ // calling Cancel.
Send(item Mutation) error
- // CloseSend indicates to the server that no more items will be sent; server
- // Recv calls will receive io.EOF after all sent items. Subsequent calls to
- // Send on the client will fail. This is an optional call - it's used by
- // streaming clients that need the server to receive the io.EOF terminator.
+ // CloseSend indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items. This is
+ // an optional call - it's used by streaming clients that need the
+ // server to receive the io.EOF terminator before the client calls
+ // Finish (for example, if the client needs to continue receiving items
+ // from the server after having finished sending).
+ // Calls to CloseSend after having called Cancel will fail.
+ // Like Send, CloseSend blocks when there's no buffer space available.
CloseSend() error
- // Finish closes the stream and returns the positional return values for
+ // Finish performs the equivalent of CloseSend, then blocks until the server
+ // is done, and returns the positional return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
@@ -192,19 +253,52 @@
// PutMutations in the service interface Store.
type StoreServicePutMutationsStream interface {
- // Recv fills itemptr with the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item Mutation, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
+
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ //
+ // In general, Value is undefined if the underlying collection
+ // of elements changes while iteration is in progress. If
+ // <DataProvider> supports concurrent modification, it should
+ // document its behavior.
+ Value() Mutation
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
}
// Implementation of the StoreServicePutMutationsStream interface that is not exported.
type implStoreServicePutMutationsStream struct {
serverCall _gen_ipc.ServerCall
+ val Mutation
+ err error
}
-func (s *implStoreServicePutMutationsStream) Recv() (item Mutation, err error) {
- err = s.serverCall.Recv(&item)
- return
+func (s *implStoreServicePutMutationsStream) Advance() bool {
+ s.val = Mutation{}
+ s.err = s.serverCall.Recv(&s.val)
+ return s.err == nil
+}
+
+func (s *implStoreServicePutMutationsStream) Value() Mutation {
+ return s.val
+}
+
+func (s *implStoreServicePutMutationsStream) Err() error {
+ if s.err == _gen_io.EOF {
+ return nil
+ }
+ return s.err
}
// BindStore returns the client stub implementing the Store
diff --git a/services/store/server/server_test.go b/services/store/server/server_test.go
index 15fb5e0..e7bd4f7 100644
--- a/services/store/server/server_test.go
+++ b/services/store/server/server_test.go
@@ -349,10 +349,10 @@
// Check that watch detects the changes in the first transaction.
{
- cb, err := ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb := ws.Value()
changes := cb.Changes
change := changes[0]
if change.Continued {
@@ -384,10 +384,10 @@
// Check that watch detects the changes in the second transaction.
{
- cb, err := ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb := ws.Value()
changes := cb.Changes
change := changes[0]
if !change.Continued {
@@ -435,10 +435,10 @@
// The watch on / should send a change on /.
{
- cb, err := ws1.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws1.Advance() {
+ t.Error("Advance() failed: %v", ws1.Err())
}
+ cb := ws1.Value()
changes := cb.Changes
change := changes[0]
if change.Continued {
@@ -470,10 +470,10 @@
// The watch on / should send changes on / and /a.
{
- cb, err := ws1.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws1.Advance() {
+ t.Error("Advance() failed: %v", ws1.Err())
}
+ cb := ws1.Value()
changes := cb.Changes
change := changes[0]
if !change.Continued {
@@ -488,10 +488,10 @@
}
// The watch on /a should send a change on /a.
{
- cb, err := ws2.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws2.Advance() {
+ t.Error("Advance() failed: %v", ws2.Err())
}
+ cb := ws2.Value()
changes := cb.Changes
change := changes[0]
if change.Continued {
@@ -532,10 +532,10 @@
// Check that watch detects the changes in the first transaction.
{
- cb, err := ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb := ws.Value()
changes := cb.Changes
change := changes[0]
if change.Continued {
@@ -567,10 +567,10 @@
// Check that watch detects the changes in the second transaction.
{
- cb, err := ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb := ws.Value()
changes := cb.Changes
change := changes[0]
if !change.Continued {
@@ -601,10 +601,10 @@
// Check that watch detects the changes in the third transaction.
{
- cb, err := ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb := ws.Value()
changes := cb.Changes
change := changes[0]
if change.Continued {
@@ -615,10 +615,10 @@
// Check that watch detects the garbage collection of /a.
{
- cb, err := ws.Recv()
- if err != nil {
- t.Error("Recv() failed: %v", err)
+ if !ws.Advance() {
+ t.Error("Advance() failed: %v", ws.Err())
}
+ cb := ws.Value()
changes := cb.Changes
change := changes[0]
if change.Continued {
diff --git a/tools/binary/impl/impl_test.go b/tools/binary/impl/impl_test.go
index 5589c4a..c8ef111 100644
--- a/tools/binary/impl/impl_test.go
+++ b/tools/binary/impl/impl_test.go
@@ -63,10 +63,7 @@
func (s *server) Upload(_ ipc.ServerContext, _ int32, stream repository.BinaryServiceUploadStream) error {
vlog.Infof("Upload() was called. suffix=%v", s.suffix)
- for {
- if _, err := stream.Recv(); err != nil {
- break
- }
+ for stream.Advance() {
}
return nil
}
diff --git a/tools/mounttable/impl/impl.go b/tools/mounttable/impl/impl.go
index 6986e4f..6c88524 100644
--- a/tools/mounttable/impl/impl.go
+++ b/tools/mounttable/impl/impl.go
@@ -2,7 +2,6 @@
import (
"fmt"
- "io"
"time"
"veyron/lib/cmdline"
@@ -49,20 +48,19 @@
if err != nil {
return err
}
- for {
- buf, err := stream.Recv()
- if err != nil {
- if err == io.EOF {
- break
- }
- return fmt.Errorf("recv error: %v", err)
- }
+ for stream.Advance() {
+ buf := stream.Value()
+
fmt.Fprint(cmd.Stdout(), buf.Name)
for _, s := range buf.Servers {
fmt.Fprintf(cmd.Stdout(), " %s (TTL %s)", s.Server, time.Duration(s.TTL)*time.Second)
}
fmt.Fprintln(cmd.Stdout())
}
+
+ if err := stream.Err(); err != nil {
+ return fmt.Errorf("advance error: %v", err)
+ }
err = stream.Finish()
if err != nil {
return fmt.Errorf("finish error: %v", err)
diff --git a/tools/vrpc/test_base/test_base.vdl.go b/tools/vrpc/test_base/test_base.vdl.go
index 9332b43..6f9b537 100644
--- a/tools/vrpc/test_base/test_base.vdl.go
+++ b/tools/vrpc/test_base/test_base.vdl.go
@@ -5,6 +5,7 @@
import (
// The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_io "io"
_gen_veyron2 "veyron2"
_gen_context "veyron2/context"
_gen_ipc "veyron2/ipc"
@@ -19,6 +20,10 @@
Y int32
}
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL pacakges where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// TypeTester is the interface the client binds and uses.
// TypeTester_ExcludingUniversal is the interface without internal framework-added methods
// to enable embedding without method collisions. Not to be used directly by clients.
@@ -86,26 +91,64 @@
// StreamingOutput in the service interface TypeTester.
type TypeTesterStreamingOutputStream interface {
- // Recv returns the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item bool, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
- // Finish closes the stream and returns the positional return values for
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ Value() bool
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
+
+ // Finish blocks until the server is done and returns the positional
+ // return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
// Implementation of the TypeTesterStreamingOutputStream interface that is not exported.
type implTypeTesterStreamingOutputStream struct {
clientCall _gen_ipc.Call
+ val bool
+ err error
}
-func (c *implTypeTesterStreamingOutputStream) Recv() (item bool, err error) {
- err = c.clientCall.Recv(&item)
- return
+func (c *implTypeTesterStreamingOutputStream) Advance() bool {
+ c.err = c.clientCall.Recv(&c.val)
+ return c.err == nil
+}
+
+func (c *implTypeTesterStreamingOutputStream) Value() bool {
+ return c.val
+}
+
+func (c *implTypeTesterStreamingOutputStream) Err() error {
+ if c.err == _gen_io.EOF {
+ return nil
+ }
+ return c.err
}
func (c *implTypeTesterStreamingOutputStream) Finish() (err error) {
@@ -123,7 +166,7 @@
// StreamingOutput in the service interface TypeTester.
type TypeTesterServiceStreamingOutputStream interface {
// Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // space available. If the client has canceled, an error is returned.
Send(item bool) error
}