Vango: Add a function to try doing BT rpcs, while advertising over BLE.
Change-Id: Ib68e8631dd3070a3797e6d9e709781ad8a7cc4f8
diff --git a/impl/google/services/vango/funcs.go b/impl/google/services/vango/funcs.go
index 6e097fc..15ce716 100644
--- a/impl/google/services/vango/funcs.go
+++ b/impl/google/services/vango/funcs.go
@@ -14,6 +14,7 @@
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/naming"
+ "v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/security"
libdiscovery "v.io/x/ref/lib/discovery"
@@ -36,13 +37,14 @@
// Users must add function entries to this map and rebuild lib/android-lib in
// the vanadium java repository.
vangoFuncs = map[string]func(*context.T, io.Writer) error{
- "tcp-client": tcpClientFunc,
- "tcp-server": tcpServerFunc,
- "bt-client": btClientFunc,
- "bt-server": btServerFunc,
- "ble-client": bleClientFunc,
- "ble-server": bleServerFunc,
- "all": AllFunc,
+ "tcp-client": tcpClientFunc,
+ "tcp-server": tcpServerFunc,
+ "bt-client": btClientFunc,
+ "bt-server": btServerFunc,
+ "ble-client": bleClientFunc,
+ "ble-server": bleServerFunc,
+ "all": AllFunc,
+ "btdiscovery": btAndDiscoveryFunc,
}
)
@@ -73,6 +75,133 @@
return runClient(ctx, bleServerName)
}
+func btAndDiscoveryFunc(ctx *context.T, w io.Writer) error {
+ bothf := func(ctx *context.T, w io.Writer, format string, args ...interface{}) {
+ fmt.Fprintf(w, format, args...)
+ ctx.Infof(format, args...)
+ }
+ defer bothf(ctx, w, "finishing!")
+
+ dis, err := v23.NewDiscovery(ctx)
+ if err != nil {
+ bothf(ctx, w, "Can't create discovery %v", err)
+ return err
+ }
+
+ ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{{Protocol: "bt", Address: "/0"}}})
+ _, server, err := v23.WithNewServer(ctx, "", &echoServer{}, security.AllowEveryone())
+ if err != nil {
+ bothf(ctx, w, "Can't create server %v", err)
+ return err
+ }
+ ctx.Infof("Server listening on %v", server.Status().Endpoints)
+ ctx.Infof("Server listen errors: %v", server.Status().ListenErrors)
+
+ interfaces := []string{
+ "v.io/x/jni/impl/google/services/vango/Echo",
+ "v.io/x/jni/impl/google/services/vango/Echo2",
+ "v.io/x/jni/impl/google/services/vango/Echo3",
+ "v.io/x/jni/impl/google/services/vango/Echo4",
+ }
+ type adstate struct {
+ ad *discovery.Advertisement
+ stop func()
+ }
+ ads := []adstate{}
+ for _, name := range interfaces {
+ ad := &discovery.Advertisement{
+ InterfaceName: name,
+ Attributes: discovery.Attributes{
+ "one": "A value of some kind",
+ "two": "Yet another value",
+ "three": "More and more",
+ "four": "This is insane",
+ },
+ }
+ nctx, ncancel := context.WithCancel(ctx)
+ ch, err := libdiscovery.AdvertiseServer(nctx, dis, server, "", ad, nil)
+ if err != nil {
+ bothf(nctx, w, "Can't advertise server %v", err)
+ return err
+ }
+
+ stop := func() {
+ ncancel()
+ <-ch
+ }
+ ads = append(ads, adstate{ad, stop})
+ }
+
+ type updateState struct {
+ ch <-chan discovery.Update
+ stop func()
+ }
+ var updates []updateState
+ for _, name := range interfaces {
+ nctx, ncancel := context.WithCancel(ctx)
+ u, err := dis.Scan(nctx, `v.InterfaceName="`+name+`"`)
+ if err != nil {
+ bothf(nctx, w, "Can't scan %v", err)
+ return err
+ }
+ stop := func() {
+ ncancel()
+ }
+ updates = append(updates, updateState{u, stop})
+ }
+
+ for _, u := range updates[1:] {
+ go func(up updateState) {
+ for _ = range up.ch {
+ }
+ }(u)
+ }
+
+ makeopt := func(ad discovery.Advertisement) options.Preresolved {
+ me := &naming.MountEntry{
+ IsLeaf: true,
+ }
+ for _, a := range ad.Addresses {
+ addr, _ := naming.SplitAddressName(a)
+ me.Servers = append(me.Servers, naming.MountedServer{
+ Server: addr,
+ })
+ }
+ return options.Preresolved{Resolution: me}
+ }
+
+ alive := map[discovery.AdId]options.Preresolved{}
+ ticker := time.NewTicker(time.Second)
+ for {
+ select {
+ case <-ticker.C:
+ if len(alive) == 0 {
+ bothf(ctx, w, "No live connections to dial.")
+ }
+ for _, opt := range alive {
+ dialtime := options.ConnectionTimeout(5 * time.Second)
+ channeltime := options.ChannelTimeout(2 * time.Second)
+ data := make([]byte, 1024)
+ summary, err := runTimedCall(ctx, "A timed call.", string(data), opt, dialtime, channeltime)
+ if err != nil {
+ bothf(ctx, w, "failed call %s, %v, %v", summary, err, opt.Resolution.Servers)
+ } else {
+ bothf(ctx, w, "succeeded call: %s, %v", summary, opt.Resolution.Servers)
+ }
+ }
+
+ case u := <-updates[0].ch:
+ if u.IsLost() {
+ bothf(ctx, w, "lost %v", u.Addresses())
+ delete(alive, u.Id())
+ } else {
+ bothf(ctx, w, "found %v", u.Addresses())
+ alive[u.Id()] = makeopt(u.Advertisement())
+ }
+ }
+ }
+}
+
// AllFunc runs a server, advertises it, scans for other servers and makes an
// Echo RPC to every advertised remote server.
func AllFunc(ctx *context.T, output io.Writer) error {
diff --git a/impl/google/services/vango/util.go b/impl/google/services/vango/util.go
index 268b58f..d67ce5b 100644
--- a/impl/google/services/vango/util.go
+++ b/impl/google/services/vango/util.go
@@ -54,10 +54,14 @@
return nil
}
-func runTimedCall(ctx *context.T, name, message string) (string, error) {
+type conner interface {
+ Conn() flow.ManagedConn
+}
+
+func runTimedCall(ctx *context.T, name, message string, opts ...rpc.CallOpt) (string, error) {
summary := fmt.Sprintf("[%s] to %v", message, name)
start := time.Now()
- call, err := v23.GetClient(ctx).StartCall(ctx, name, "Echo", []interface{}{message})
+ call, err := v23.GetClient(ctx).StartCall(ctx, name, "Echo", []interface{}{message}, opts...)
if err != nil {
return summary, err
}
@@ -71,7 +75,11 @@
}
me := security.LocalBlessingNames(ctx, call.Security())
them, _ := call.RemoteBlessings()
- return fmt.Sprintf("%s in %v (THEM:%v EP:%v) (ME:%v)", summary, elapsed, them, call.Security().RemoteEndpoint(), me), nil
+ connstr := "<unknown>"
+ if cn, ok := call.(conner); ok {
+ connstr = fmt.Sprintf("%p", cn.Conn())
+ }
+ return fmt.Sprintf("%s in %v (THEM:%v EP:%v) (ME:%v) conn %v", summary, elapsed, them, call.Security().RemoteEndpoint(), me, connstr), nil
}
func username(blessingNames []string) string {