veyron/...: Changed the generated ipc stream to match our one true
stream api.
Change-Id: Ice4911926bbaba7b3a11c68579a9f8b34c5db460
diff --git a/runtimes/google/ipc/benchmarks/client.go b/runtimes/google/ipc/benchmarks/client.go
index 8117e8f..6d8bfc6 100644
--- a/runtimes/google/ipc/benchmarks/client.go
+++ b/runtimes/google/ipc/benchmarks/client.go
@@ -62,8 +62,8 @@
}
done := make(chan error, 1)
go func() {
- for {
- chunk, err := stream.Recv()
+ for stream.Advance() {
+ chunk := stream.Value()
if err == io.EOF {
done <- nil
return
@@ -77,6 +77,8 @@
return
}
}
+
+ done <- stream.Err()
}()
for j := 0; j < messageCount; j++ {
if err = stream.Send(payload); err != nil {
diff --git a/runtimes/google/ipc/benchmarks/server.go b/runtimes/google/ipc/benchmarks/server.go
index bd5eb0c..0d1c95e 100644
--- a/runtimes/google/ipc/benchmarks/server.go
+++ b/runtimes/google/ipc/benchmarks/server.go
@@ -1,8 +1,6 @@
package benchmarks
import (
- "io"
-
sflag "veyron/security/flag"
"veyron2/ipc"
@@ -19,19 +17,14 @@
}
func (i *impl) EchoStream(ctx ipc.ServerContext, stream BenchmarkServiceEchoStreamStream) error {
- for {
- chunk, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- return err
- }
+ for stream.Advance() {
+ chunk := stream.Value()
if err := stream.Send(chunk); err != nil {
return err
}
}
- return nil
+
+ return stream.Err()
}
// StartServer starts a server that implements the Benchmark service. The
diff --git a/runtimes/google/ipc/benchmarks/service.vdl.go b/runtimes/google/ipc/benchmarks/service.vdl.go
index 2356536..eddbfa9 100644
--- a/runtimes/google/ipc/benchmarks/service.vdl.go
+++ b/runtimes/google/ipc/benchmarks/service.vdl.go
@@ -7,6 +7,7 @@
import (
// The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_io "io"
_gen_veyron2 "veyron2"
_gen_context "veyron2/context"
_gen_ipc "veyron2/ipc"
@@ -16,6 +17,10 @@
_gen_wiretype "veyron2/wiretype"
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL packages where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Benchmark is the interface the client binds and uses.
// Benchmark_ExcludingUniversal is the interface without internal framework-added methods
// to enable embedding without method collisions. Not to be used directly by clients.
@@ -43,31 +48,64 @@
// EchoStream in the service interface Benchmark.
type BenchmarkEchoStreamStream interface {
- // Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // Send places the item onto the output stream, blocking if there is no
+ // buffer space available. Calls to Send after having called CloseSend
+ // or Cancel will fail. Any blocked Send calls will be unblocked upon
+ // calling Cancel.
Send(item []byte) error
- // CloseSend indicates to the server that no more items will be sent; server
- // Recv calls will receive io.EOF after all sent items. Subsequent calls to
- // Send on the client will fail. This is an optional call - it's used by
- // streaming clients that need the server to receive the io.EOF terminator.
+ // CloseSend indicates to the server that no more items will be sent;
+ // server Recv calls will receive io.EOF after all sent items. This is
+ // an optional call - it's used by streaming clients that need the
+ // server to receive the io.EOF terminator before the client calls
+ // Finish (for example, if the client needs to continue receiving items
+ // from the server after having finished sending).
+ // Calls to CloseSend after having called Cancel will fail.
+ // Like Send, CloseSend blocks when there's no buffer space available.
CloseSend() error
- // Recv returns the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item []byte, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
- // Finish closes the stream and returns the positional return values for
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ Value() []byte
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
+
+ // Finish performs the equivalent of CloseSend, then blocks until the server
+ // is done, and returns the positional return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
// Implementation of the BenchmarkEchoStreamStream interface that is not exported.
type implBenchmarkEchoStreamStream struct {
clientCall _gen_ipc.Call
+ val []byte
+ err error
}
func (c *implBenchmarkEchoStreamStream) Send(item []byte) error {
@@ -78,9 +116,20 @@
return c.clientCall.CloseSend()
}
-func (c *implBenchmarkEchoStreamStream) Recv() (item []byte, err error) {
- err = c.clientCall.Recv(&item)
- return
+func (c *implBenchmarkEchoStreamStream) Advance() bool {
+ c.err = c.clientCall.Recv(&c.val)
+ return c.err == nil
+}
+
+func (c *implBenchmarkEchoStreamStream) Value() []byte {
+ return c.val
+}
+
+func (c *implBenchmarkEchoStreamStream) Err() error {
+ if c.err == _gen_io.EOF {
+ return nil
+ }
+ return c.err
}
func (c *implBenchmarkEchoStreamStream) Finish() (err error) {
@@ -98,26 +147,58 @@
// EchoStream in the service interface Benchmark.
type BenchmarkServiceEchoStreamStream interface {
// Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // space available. If the client has canceled, an error is returned.
Send(item []byte) error
- // Recv fills itemptr with the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item []byte, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
+
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ //
+ // In general, Value is undefined if the underlying collection
+ // of elements changes while iteration is in progress. If
+ // the stream implementation supports concurrent modification, it should
+ // document its behavior.
+ Value() []byte
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
}
// Implementation of the BenchmarkServiceEchoStreamStream interface that is not exported.
type implBenchmarkServiceEchoStreamStream struct {
serverCall _gen_ipc.ServerCall
+ val []byte
+ err error
}
func (s *implBenchmarkServiceEchoStreamStream) Send(item []byte) error {
return s.serverCall.Send(item)
}
-func (s *implBenchmarkServiceEchoStreamStream) Recv() (item []byte, err error) {
- err = s.serverCall.Recv(&item)
- return
+func (s *implBenchmarkServiceEchoStreamStream) Advance() bool {
+ s.err = s.serverCall.Recv(&s.val)
+ return s.err == nil
+}
+
+func (s *implBenchmarkServiceEchoStreamStream) Value() []byte {
+ return s.val
+}
+
+func (s *implBenchmarkServiceEchoStreamStream) Err() error {
+ if s.err == _gen_io.EOF {
+ return nil
+ }
+ return s.err
}
// BindBenchmark returns the client stub implementing the Benchmark
diff --git a/runtimes/google/ipc/jni/arg_getter.go b/runtimes/google/ipc/jni/arg_getter.go
index fc6bf3b..04efd0b 100644
--- a/runtimes/google/ipc/jni/arg_getter.go
+++ b/runtimes/google/ipc/jni/arg_getter.go
@@ -97,9 +97,9 @@
mArgs.streamSendType = mSend.Type.In(0)
}
// Get the stream recv type.
- if mRecv, ok := stream.MethodByName("Recv"); ok {
- if mRecv.Type.NumOut() != 2 {
- return fmt.Errorf("Illegal number of arguments for Recv method in stream %v", stream)
+ if mRecv, ok := stream.MethodByName("Value"); ok {
+ if mRecv.Type.NumOut() != 1 {
+ return fmt.Errorf("Illegal number of arguments for Value method in stream %v", stream)
}
mArgs.streamRecvType = mRecv.Type.Out(0)
}
diff --git a/runtimes/google/rt/mgmt_test.go b/runtimes/google/rt/mgmt_test.go
index bb78645..7f314e9 100644
--- a/runtimes/google/rt/mgmt_test.go
+++ b/runtimes/google/rt/mgmt_test.go
@@ -2,7 +2,6 @@
import (
"fmt"
- "io"
"os"
"reflect"
"strings"
@@ -305,17 +304,19 @@
t.Fatalf("Got error: %v", err)
}
expectTask := func(progress, goal int32) {
- if task, err := stream.Recv(); err != nil {
- t.Fatalf("unexpected streaming error: %q", err)
- } else if task.Progress != progress || task.Goal != goal {
+ if !stream.Advance() {
+ t.Fatalf("unexpected streaming error: %q", stream.Err())
+ }
+ task := stream.Value()
+ if task.Progress != progress || task.Goal != goal {
t.Errorf("Got (%d, %d), want (%d, %d)", task.Progress, task.Goal, progress, goal)
}
}
expectTask(0, 10)
expectTask(2, 10)
expectTask(7, 10)
- if task, err := stream.Recv(); err != io.EOF {
- t.Errorf("Expected (nil, EOF), got (%v, %v) instead", task, err)
+ if stream.Advance() || stream.Err() != nil {
+ t.Errorf("Expected EOF, got (%v, %v) instead", stream.Value(), stream.Err())
}
if err := stream.Finish(); err != nil {
t.Errorf("Got error %v", err)
diff --git a/runtimes/google/vsync/initiator.go b/runtimes/google/vsync/initiator.go
index af7ece9..76aa7da 100644
--- a/runtimes/google/vsync/initiator.go
+++ b/runtimes/google/vsync/initiator.go
@@ -3,7 +3,6 @@
import (
"errors"
"fmt"
- "io"
"math/rand"
"strings"
"time"
@@ -256,14 +255,8 @@
// Compute the minimum generation for every device in this set.
minGens := GenVector{}
- for {
- rec, err := stream.Recv()
- if err == io.EOF {
- break
- }
- if err != nil {
- return GenVector{}, err
- }
+ for stream.Advance() {
+ rec := stream.Value()
if err := i.insertRecInLogAndDag(&rec); err != nil {
return GenVector{}, err
@@ -292,6 +285,9 @@
}
}
+ if err := stream.Err(); err != nil {
+ return GenVector{}, err
+ }
if err := i.createGenMetadataBatch(newGens, orderGens); err != nil {
return GenVector{}, err
}
diff --git a/runtimes/google/vsync/util_test.go b/runtimes/google/vsync/util_test.go
index bd9356c..5699fbc 100644
--- a/runtimes/google/vsync/util_test.go
+++ b/runtimes/google/vsync/util_test.go
@@ -4,7 +4,6 @@
import (
"container/list"
"fmt"
- "io"
"os"
"time"
@@ -36,7 +35,8 @@
// dummyStream struct emulates stream of log records received from RPC.
type dummyStream struct {
- l *list.List
+ l *list.List
+ value LogRec
}
func newStream() *dummyStream {
@@ -46,14 +46,20 @@
return ds
}
-func (ds *dummyStream) Recv() (LogRec, error) {
+func (ds *dummyStream) Advance() bool {
if ds.l.Len() > 0 {
- item := ds.l.Remove(ds.l.Front()).(LogRec)
- return item, nil
+ ds.value = ds.l.Remove(ds.l.Front()).(LogRec)
+ return true
}
- return LogRec{}, io.EOF
+ return false
}
+func (ds *dummyStream) Value() LogRec {
+ return ds.value
+}
+
+func (*dummyStream) Err() error { return nil }
+
func (ds *dummyStream) Finish() (GenVector, error) {
return GenVector{}, nil
}
diff --git a/runtimes/google/vsync/vsync.vdl.go b/runtimes/google/vsync/vsync.vdl.go
index f81a4b7..7dcc5a4 100644
--- a/runtimes/google/vsync/vsync.vdl.go
+++ b/runtimes/google/vsync/vsync.vdl.go
@@ -9,6 +9,7 @@
"veyron2/storage"
// The non-user imports are prefixed with "_gen_" to prevent collisions.
+ _gen_io "io"
_gen_veyron2 "veyron2"
_gen_context "veyron2/context"
_gen_ipc "veyron2/ipc"
@@ -81,6 +82,10 @@
LinkRec = byte(1)
)
+// TODO(bprosnitz) Remove this line once signatures are updated to use typevals.
+// It corrects a bug where _gen_wiretype is unused in VDL packages where only bootstrap types are used on interfaces.
+const _ = _gen_wiretype.TypeIDInvalid
+
// Sync allows a device to GetDeltas from another device.
// Sync is the interface the client binds and uses.
// Sync_ExcludingUniversal is the interface without internal framework-added methods
@@ -107,26 +112,65 @@
// GetDeltas in the service interface Sync.
type SyncGetDeltasStream interface {
- // Recv returns the next item in the input stream, blocking until
- // an item is available. Returns io.EOF to indicate graceful end of input.
- Recv() (item LogRec, err error)
+ // Advance stages an element so the client can retrieve it
+ // with Value. Advance returns true iff there is an
+ // element to retrieve. The client must call Advance before
+ // calling Value. The client must call Cancel if it does
+ // not iterate through all elements (i.e. until Advance
+ // returns false). Advance may block if an element is not
+ // immediately available.
+ Advance() bool
- // Finish closes the stream and returns the positional return values for
+ // Value returns the element that was staged by Advance.
+ // Value may panic if Advance returned false or was not
+ // called at all. Value does not block.
+ Value() LogRec
+
+ // Err returns a non-nil error iff the stream encountered
+ // any errors. Err does not block.
+ Err() error
+
+ // Finish blocks until the server is done and returns the positional
+ // return values for call.
+ //
+ // If Cancel has been called, Finish will return immediately; the output of
+ // Finish could either be an error signalling cancelation, or the correct
+ // positional return values from the server depending on the timing of the
// call.
+ //
+ // Calling Finish is mandatory for releasing stream resources, unless Cancel
+ // has been called or any of the other methods return a non-EOF error.
+ // Finish should be called at most once.
Finish() (reply GenVector, err error)
- // Cancel cancels the RPC, notifying the server to stop processing.
+ // Cancel cancels the RPC, notifying the server to stop processing. It
+ // is safe to call Cancel concurrently with any of the other stream methods.
+ // Calling Cancel after Finish has returned is a no-op.
Cancel()
}
// Implementation of the SyncGetDeltasStream interface that is not exported.
type implSyncGetDeltasStream struct {
clientCall _gen_ipc.Call
+ val LogRec
+ err error
}
-func (c *implSyncGetDeltasStream) Recv() (item LogRec, err error) {
- err = c.clientCall.Recv(&item)
- return
+func (c *implSyncGetDeltasStream) Advance() bool {
+ c.val = LogRec{}
+ c.err = c.clientCall.Recv(&c.val)
+ return c.err == nil
+}
+
+func (c *implSyncGetDeltasStream) Value() LogRec {
+ return c.val
+}
+
+func (c *implSyncGetDeltasStream) Err() error {
+ if c.err == _gen_io.EOF {
+ return nil
+ }
+ return c.err
}
func (c *implSyncGetDeltasStream) Finish() (reply GenVector, err error) {
@@ -144,7 +188,7 @@
// GetDeltas in the service interface Sync.
type SyncServiceGetDeltasStream interface {
// Send places the item onto the output stream, blocking if there is no buffer
- // space available.
+ // space available. If the client has canceled, an error is returned.
Send(item LogRec) error
}
diff --git a/runtimes/google/vsync/watcher.go b/runtimes/google/vsync/watcher.go
index f911231..aa4398f 100644
--- a/runtimes/google/vsync/watcher.go
+++ b/runtimes/google/vsync/watcher.go
@@ -7,6 +7,7 @@
import (
"fmt"
+ "io"
"time"
"veyron/services/store/raw"
@@ -123,27 +124,29 @@
// If the stream is closed, distinguish between the cases of end-of-stream vs Syncd canceling
// the stream to trigger a clean exit.
func (w *syncWatcher) processWatchStream(stream watch.GlobWatcherWatchGlobStream) {
- for {
- changes, err := stream.Recv()
- if err != nil {
- if w.syncd.isSyncClosing() {
- vlog.VI(1).Info("processWatchStream: exiting, Syncd closed its channel: ", err)
- } else {
- vlog.VI(1).Info("processWatchStream: RPC stream error, re-issue Watch(): ", err)
- }
- return
- }
+ for stream.Advance() {
+ changes := stream.Value()
// Timestamp of these changes arriving at the Sync server.
syncTime := time.Now().UnixNano()
- if err = w.processChanges(changes, syncTime); err != nil {
+ if err := w.processChanges(changes, syncTime); err != nil {
// TODO(rdaoud): don't crash, instead add retry policies to attempt some degree of
// self-healing from a data corruption where feasible, otherwise quarantine this device
// from the cluster and stop Syncd to avoid propagating data corruptions.
vlog.Fatal("processWatchStream:", err)
}
}
+
+ err := stream.Err()
+ if err == nil {
+ err = io.EOF
+ }
+ if w.syncd.isSyncClosing() {
+ vlog.VI(1).Info("processWatchStream: exiting, Syncd closed its channel: ", err)
+ } else {
+ vlog.VI(1).Info("processWatchStream: RPC stream error, re-issue Watch(): ", err)
+ }
}
// processChanges applies the batch of changes (object mutations) received from the Watch API.
diff --git a/runtimes/google/vsync/watcher_test.go b/runtimes/google/vsync/watcher_test.go
index 6eec8e4..b9d8a79 100644
--- a/runtimes/google/vsync/watcher_test.go
+++ b/runtimes/google/vsync/watcher_test.go
@@ -5,7 +5,6 @@
import (
"bytes"
"fmt"
- "io"
"os"
"testing"
"time"
@@ -73,6 +72,7 @@
// fakeStream is used to simulate the reply stream of the Watch() API.
type fakeStream struct {
canceled chan struct{}
+ err error
}
func newFakeStream() *fakeStream {
@@ -81,30 +81,35 @@
return s
}
-func (s *fakeStream) Recv() (watch.ChangeBatch, error) {
- var empty watch.ChangeBatch
+func (s *fakeStream) Advance() bool {
// If "failRecv" is set, simulate a failed call.
if info.failRecv {
info.failRecvCount++
- return empty, fmt.Errorf("fake recv error on fake stream: %d", info.failRecvCount)
+ s.err = fmt.Errorf("fake recv error on fake stream: %d", info.failRecvCount)
+ return false
}
// If "eofRecv" is set, simulate a closed stream and make sure the next Recv() call blocks.
if info.eofRecv {
info.eofRecv, info.blockRecv = false, true
- return empty, io.EOF
+ s.err = nil
+ return false
}
// If "blockRecv" is set, simulate blocking the call until the stream is canceled.
if info.blockRecv {
close(recvBlocked)
<-s.canceled
- return empty, io.EOF
+ s.err = nil
+ return false
}
-
// Otherwise return a batch of changes, and make sure the next Recv() call returns EOF on the stream.
// Adjust the resume marker of the change records to follow the one given to the Watch request.
info.eofRecv = true
+ return true
+}
+
+func (s *fakeStream) Value() watch.ChangeBatch {
changes := getChangeBatch()
var lastCount byte
@@ -121,9 +126,12 @@
}
}
- return changes, nil
+ return changes
}
+func (s *fakeStream) Err() error {
+ return s.err
+}
func (s *fakeStream) Finish() error {
return nil
}