Merge "(runtime|services): Implement new AllGlobberX and ChildrenGlobberX"
diff --git a/lib/stats/stats_test.go b/lib/stats/stats_test.go
index d95a660..4a9cfbd 100644
--- a/lib/stats/stats_test.go
+++ b/lib/stats/stats_test.go
@@ -5,7 +5,10 @@
package stats_test
import (
+ "path/filepath"
"reflect"
+ "runtime"
+ "sync"
"testing"
"time"
@@ -357,6 +360,12 @@
libstats.NewFloatFunc("testing/float", func() float64 { return 456.789 })
libstats.NewStringFunc("testing/string", func() string { return "Hello World" })
ch := make(chan int64, 5)
+
+ libstats.NewIntegerFunc("testing/timeout", func() int64 {
+ time.Sleep(time.Second)
+ return -1
+ })
+
libstats.NewIntegerFunc("testing/slowint", func() int64 {
return <-ch
})
@@ -368,22 +377,46 @@
{"testing/integer", int64(123)},
{"testing/float", float64(456.789)},
{"testing/string", "Hello World"},
- {"testing/slowint", nil}, // Times out
+ {"testing/timeout", nil}, // Times out
}
for _, tc := range testcases {
checkVariable(t, tc.name, tc.expected)
}
- checkVariable(t, "testing/slowint", nil) // Times out
- checkVariable(t, "testing/slowint", nil) // Times out
+
+ then := time.Now()
+ checkVariable(t, "testing/timeout", nil) // Times out
+ if took := time.Now().Sub(then); took < 100*time.Millisecond {
+ t.Fatalf("expected a timeout: took %s", took)
+ }
+ checkVariable(t, "testing/timeout", nil) // Times out
+ if took := time.Now().Sub(then); took < 100*time.Millisecond {
+ t.Fatalf("expected a timeout: took %s", took)
+ }
+
ch <- int64(0)
+ then = time.Now()
checkVariable(t, "testing/slowint", int64(0)) // New value
- checkVariable(t, "testing/slowint", int64(0)) // Times out
+ if took := time.Now().Sub(then); took > 100*time.Millisecond {
+ t.Fatalf("unexpected timeout: took %s", took)
+ }
for i := 1; i <= 5; i++ {
ch <- int64(i)
}
for i := 1; i <= 5; i++ {
checkVariable(t, "testing/slowint", int64(i)) // New value each time
}
+
+ // Parallel access
+ var wg sync.WaitGroup
+ for i := 1; i <= 5; i++ {
+ wg.Add(1)
+ go func() {
+ checkVariable(t, "testing/slowint", int64(555))
+ wg.Done()
+ }()
+ }
+ ch <- int64(555)
+ wg.Wait()
}
func checkVariable(t *testing.T, name string, expected interface{}) {
@@ -392,7 +425,8 @@
t.Errorf("unexpected error for %q: %v", name, err)
}
if got != expected {
- t.Errorf("unexpected result for %q. Got %v, want %v", name, got, expected)
+ _, file, line, _ := runtime.Caller(1)
+ t.Errorf("%s:%d: unexpected result for %q. Got %v, want %v", filepath.Base(file), line, name, got, expected)
}
}
diff --git a/runtime/internal/flow/conn/conn.go b/runtime/internal/flow/conn/conn.go
new file mode 100644
index 0000000..8f6a122
--- /dev/null
+++ b/runtime/internal/flow/conn/conn.go
@@ -0,0 +1,78 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package conn
+
+import (
+ "io"
+
+ "v.io/v23/context"
+ "v.io/v23/flow"
+ "v.io/v23/naming"
+ "v.io/v23/rpc/version"
+ "v.io/v23/security"
+)
+
+// FlowHandler processes accepted flows.
+type FlowHandler interface {
+ // HandleFlow processes one accepted flow.
+ HandleFlow(flow.Flow) error
+}
+
+// Conn is a multiplexing encrypted channel that can host Flows.
+type Conn struct {
+}
+
+// Compile-time check that *Conn implements flow.Conn.
+var _ flow.Conn = &Conn{}
+
+// NewDialed dials a new Conn on the given conn. It is currently a stub that
+// returns (nil, nil). TODO(mattr): Add the flow.BlessingsForPeer (or whatever
+// we called it) as the last param once that is added.
+func NewDialed(
+ ctx *context.T,
+ conn io.ReadWriter,
+ principal security.Principal,
+ local, remote naming.Endpoint,
+ versions version.RPCVersionRange,
+ handler FlowHandler) (*Conn, error) {
+ return nil, nil
+}
+
+// NewAccepted accepts a new Conn on the given conn. Stub: currently returns (nil, nil).
+func NewAccepted(
+ ctx *context.T,
+ conn io.ReadWriter,
+ principal security.Principal,
+ local naming.Endpoint,
+ lBlessings security.Blessings,
+ versions version.RPCVersionRange,
+ handler FlowHandler,
+ skipDischarges bool) (*Conn, error) {
+ return nil, nil
+}
+
+// Dial dials a new flow on the Conn. Stub: currently returns (nil, nil).
+func (c *Conn) Dial() (flow.Flow, error) { return nil, nil }
+
+// Closed returns a channel that will be closed after the Conn is shut down.
+// After this channel is closed it is guaranteed that all Dial calls will fail
+// with an error and no more flows will be sent to the FlowHandler. (Stub: returns nil.)
+func (c *Conn) Closed() <-chan struct{} { return nil }
+
+// LocalEndpoint returns the local vanadium Endpoint. Stub: currently returns nil.
+func (c *Conn) LocalEndpoint() naming.Endpoint { return nil }
+
+// RemoteEndpoint returns the remote vanadium Endpoint. Stub: currently returns nil.
+func (c *Conn) RemoteEndpoint() naming.Endpoint { return nil }
+
+// DialerPublicKey returns the public key presented by the dialer during authentication. Stub: currently returns nil.
+func (c *Conn) DialerPublicKey() security.PublicKey { return nil }
+
+// AcceptorBlessings returns the blessings presented by the acceptor during authentication. Stub: currently returns the zero value.
+func (c *Conn) AcceptorBlessings() security.Blessings { return security.Blessings{} }
+
+// AcceptorDischarges returns the discharges presented by the acceptor during authentication.
+// Discharges are organized in a map keyed by the discharge-identifier. Stub: currently returns nil.
+func (c *Conn) AcceptorDischarges() map[string]security.Discharge { return nil }
diff --git a/services/wspr/internal/app/app.go b/services/wspr/internal/app/app.go
index 7be91aa..6edb6c0 100644
--- a/services/wspr/internal/app/app.go
+++ b/services/wspr/internal/app/app.go
@@ -374,22 +374,6 @@
// SendVeyronRequest makes a vanadium request for the given flowId. If signal is non-nil, it will receive
// the call object after it has been constructed.
func (c *Controller) sendVeyronRequest(ctx *context.T, id int32, msg *RpcRequest, inArgs []interface{}, w lib.ClientWriter, stream *outstandingStream, span vtrace.Span) {
- sig, err := c.getSignature(ctx, msg.Name)
- if err != nil {
- w.Error(err)
- return
- }
- methName := lib.UppercaseFirstCharacter(msg.Method)
- methSig, ok := signature.FirstMethod(sig, methName)
- if !ok {
- w.Error(fmt.Errorf("method %q not found in signature: %#v", methName, sig))
- return
- }
- if len(methSig.InArgs) != len(inArgs) {
- w.Error(fmt.Errorf("invalid number of arguments, expected: %v, got:%v", methSig, *msg))
- return
- }
-
// We have to make the start call synchronous so we can make sure that we populate
// the call map before we can Handle a recieve call.
call, err := c.startCall(ctx, w, msg, inArgs)