vango: Enhancements to the "all" func
The intention is for this to be used as a demonstration of how
Vanadium discovery+RPC+security shows value in the face of constantly
moving devices with changing network connectivity (turning WiFi on/off,
turning bluetooth on/off)
Specifically:
- The "All" function continuosly advertises and scans
- Whenever it finds a new peer, it sends an RPC to it
- It prints out the identity of the remote user (as a result of the
authentication protocol) and the address of the underlying channel
(tcp/bluetooth etc.) used to actually communicate.
Next I intend to play a lot with this app to find bugs/areas of
improvement for the underlying RPC and discovery code.
MultiPart: 1/2
Change-Id: I433f3cff5c964ceb84c92d9e378b2502b41efd00
diff --git a/impl/google/services/vango/funcs.go b/impl/google/services/vango/funcs.go
index 727152b..b13b548 100644
--- a/impl/google/services/vango/funcs.go
+++ b/impl/google/services/vango/funcs.go
@@ -6,6 +6,7 @@
import (
"fmt"
+ "io"
"time"
"v.io/v23"
@@ -30,7 +31,7 @@
// intended to be run by java/android applications using Vango.run(key).
// Users must add function entries to this map and rebuild lib/android-lib in
// the vanadium java repository.
- vangoFuncs = map[string]func(*context.T) error{
+ vangoFuncs = map[string]func(*context.T, io.Writer) error{
"tcp-client": tcpClientFunc,
"tcp-server": tcpServerFunc,
"bt-client": btClientFunc,
@@ -41,40 +42,40 @@
}
)
-func tcpServerFunc(ctx *context.T) error {
+func tcpServerFunc(ctx *context.T, _ io.Writer) error {
ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: "proxy"})
return runServer(ctx, tcpServerName)
}
-func tcpClientFunc(ctx *context.T) error {
+func tcpClientFunc(ctx *context.T, _ io.Writer) error {
return runClient(ctx, tcpServerName)
}
-func btServerFunc(ctx *context.T) error {
+func btServerFunc(ctx *context.T, _ io.Writer) error {
ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{{Protocol: "bt", Address: "/0"}}})
return runServer(ctx, btServerName)
}
-func btClientFunc(ctx *context.T) error {
+func btClientFunc(ctx *context.T, _ io.Writer) error {
return runClient(ctx, btServerName)
}
-func bleServerFunc(ctx *context.T) error {
+func bleServerFunc(ctx *context.T, _ io.Writer) error {
ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{{Protocol: "ble", Address: "na"}}})
return runServer(ctx, "")
}
-func bleClientFunc(ctx *context.T) error {
+func bleClientFunc(ctx *context.T, _ io.Writer) error {
return runClient(ctx, bleServerName)
}
// AllFunc runs a server, advertises it, scans for other servers and makes an
// Echo RPC to every advertised remote server.
-func AllFunc(ctx *context.T) error {
+func AllFunc(ctx *context.T, output io.Writer) error {
ls := rpc.ListenSpec{Proxy: "proxy"}
addRegisteredProto(&ls, "tcp", ":0")
addRegisteredProto(&ls, "bt", "/0")
- ctx.Infof("ListenSpec: %#v", ls)
+ fmt.Fprintf(output, "Listening on: %+v (and proxy)", ls.Addrs)
ctx, server, err := v23.WithNewServer(
v23.WithListenSpec(ctx, ls),
mountName(ctx, "all"),
@@ -102,41 +103,86 @@
if err != nil {
return err
}
- status := server.Status()
- ctx.Infof("My AdID: %v", ad.Id)
- ctx.Infof("Status: %+v", status)
- counter := 0
- onDiscovered := func(addr, message string) {
- ctx, cancel := context.WithTimeout(ctx, rpcTimeout)
- defer cancel()
- summary, err := runTimedCall(ctx, addr, message)
- if err != nil {
- ctx.Infof("%s: ERROR: %v", summary, err)
- return
- }
- ctx.Infof("%s: SUCCESS", summary)
- }
- for {
+ var (
+ status = server.Status()
+ counter = 0
+ peerByAdId = make(map[discovery.AdId]*peer)
+ callResults = make(chan string)
+ activeCalls = 0
+ quit = false
+ myaddrs = serverAddrs(status)
+ )
+ fmt.Fprintln(output, "My AdID:", ad.Id)
+ fmt.Fprintln(output, "My addrs:", myaddrs)
+ ctx.Infof("SERVER STATUS: %+v", status)
+ for !quit {
select {
case <-ctx.Done():
- ctx.Infof("EXITING")
- return nil
+ quit = true
case <-status.Dirty:
status = server.Status()
- ctx.Infof("Status Changed: %+v", status)
- case u := <-updates:
- if u.IsLost() {
- ctx.Infof("LOST: %v", u.Id())
+ newaddrs := serverAddrs(status)
+ changed := len(newaddrs) != len(myaddrs)
+ if !changed {
+ for i := range newaddrs {
+ if newaddrs[i] != myaddrs[i] {
+ changed = true
+ break
+ }
+ }
+ }
+ if changed {
+ myaddrs = newaddrs
+ fmt.Fprintln(output, "My addrs:", myaddrs)
+ }
+ ctx.Infof("SERVER STATUS: %+v", status)
+ case u, scanning := <-updates:
+ if !scanning {
+ fmt.Fprintln(output, "SCANNING STOPPED")
+ quit = true
break
}
- counter++
- ctx.Infof("FOUND(%d): %+v", counter, u.Advertisement())
- for _, addr := range u.Addresses() {
- go onDiscovered(addr, fmt.Sprintf("CALL #%03d", counter))
+ if u.IsLost() {
+ if p, ok := peerByAdId[u.Id()]; ok {
+ fmt.Fprintln(output, "LOST:", p.description)
+ delete(peerByAdId, u.Id())
+ }
+ break
}
+ p, err := newPeer(ctx, u)
+ if err != nil {
+ ctx.Info(err)
+ break
+ }
+ peerByAdId[p.adId] = p
+ counter++
+ fmt.Fprintln(output, "FOUND:", p.description)
+ activeCalls++
+ go func(msg string) {
+ summary, err := p.call(ctx, msg)
+ if err != nil {
+ callResults <- fmt.Sprintf("ERROR calling [%v]: %v", p.description, err)
+ return
+ }
+ callResults <- summary
+ }(fmt.Sprintf("Hello #%03d", counter))
+ case r := <-callResults:
+ activeCalls--
+ fmt.Fprintln(output, r)
case <-stoppedAd:
- ctx.Infof("Stopped advertising")
- return fmt.Errorf("stopped advertising")
+ fmt.Fprintln(output, "STOPPED ADVERTISING")
+ stoppedAd = nil
}
}
+ fmt.Println(output, "EXITING: Cleaning up")
+ for activeCalls > 0 {
+ <-callResults
+ activeCalls--
+ }
+ // Exhaust the scanned updates queue.
+ // (The channel will be closed as a by-product of the context being Done).
+ for range updates {
+ }
+ fmt.Fprintln(output, "EXITING: Done")
+ return nil
}
diff --git a/impl/google/services/vango/jni.go b/impl/google/services/vango/jni.go
index 8f20080..d736aa5 100644
--- a/impl/google/services/vango/jni.go
+++ b/impl/google/services/vango/jni.go
@@ -8,6 +8,8 @@
import (
"fmt"
+ "io"
+ "runtime"
"unsafe"
jutil "v.io/x/jni/util"
@@ -18,7 +20,7 @@
import "C"
//export Java_io_v_android_util_Vango_nativeGoContextCall
-func Java_io_v_android_util_Vango_nativeGoContextCall(jenv *C.JNIEnv, jVango C.jobject, jContext C.jobject, jKey C.jstring) {
+func Java_io_v_android_util_Vango_nativeGoContextCall(jenv *C.JNIEnv, jVango C.jobject, jContext C.jobject, jKey C.jstring, jOutput C.jobject) {
env := jutil.Env(uintptr(unsafe.Pointer(jenv)))
key := jutil.GoString(env, jutil.Object(uintptr(unsafe.Pointer(jKey))))
ctx, _, err := jcontext.GoContext(env, jutil.Object(uintptr(unsafe.Pointer(jContext))))
@@ -31,8 +33,33 @@
jutil.JThrowV(env, fmt.Errorf("vangoFunc key %q doesn't exist", key))
return
}
- if err := f(ctx); err != nil {
+ if err := f(ctx, newJavaOutputWriter(env, jutil.Object(uintptr(unsafe.Pointer(jOutput))))); err != nil {
jutil.JThrowV(env, err)
return
}
+
+}
+
+// javaOutputWriter translates an implementation of the Java Vango.OutputWriter interface to Go's io.Writer
+func newJavaOutputWriter(env jutil.Env, o jutil.Object) io.Writer {
+ ret := &javaOutputWriter{jutil.NewGlobalRef(env, o)}
+ runtime.SetFinalizer(ret, func(w *javaOutputWriter) {
+ env, freeFunc := jutil.GetEnv()
+ defer freeFunc()
+ jutil.DeleteGlobalRef(env, w.obj)
+ })
+ return ret
+}
+
+type javaOutputWriter struct {
+ obj jutil.Object // implementation of the Java Vango.OutputWriter interface
+}
+
+func (w *javaOutputWriter) Write(b []byte) (int, error) {
+ env, freeFunc := jutil.GetEnv()
+ defer freeFunc()
+ if err := jutil.CallVoidMethod(env, w.obj, "write", []jutil.Sign{jutil.StringSign}, string(b)); err != nil {
+ return 0, err
+ }
+ return len(b), nil
}
diff --git a/impl/google/services/vango/util.go b/impl/google/services/vango/util.go
index 6f760a2..3f8c15c 100644
--- a/impl/google/services/vango/util.go
+++ b/impl/google/services/vango/util.go
@@ -6,13 +6,17 @@
import (
"fmt"
+ "sort"
+ "strings"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/conventions"
+ "v.io/v23/discovery"
"v.io/v23/flow"
"v.io/v23/naming"
+ "v.io/v23/options"
"v.io/v23/rpc"
"v.io/v23/security"
)
@@ -69,6 +73,14 @@
return fmt.Sprintf("%s in %v (THEM:%v EP:%v) (ME:%v)", summary, elapsed, them, call.Security().RemoteEndpoint(), me), nil
}
+func username(blessingNames []string) string {
+ var ret []string
+ for _, p := range conventions.ParseBlessingNames(blessingNames...) {
+ ret = append(ret, p.User)
+ }
+ return strings.Join(ret, ",")
+}
+
func mountName(ctx *context.T, addendums ...string) string {
var (
p = v23.GetPrincipal(ctx)
@@ -88,3 +100,81 @@
}
}
}
+
+func serverAddrs(status rpc.ServerStatus) []string {
+ var addrs []string
+ for _, ep := range status.Endpoints {
+ addrs = append(addrs, fmt.Sprintf("(%v, %v)", ep.Addr().Network(), ep.Addr()))
+ }
+ sort.Strings(addrs)
+ return addrs
+}
+
+func newPeer(ctx *context.T, u discovery.Update) (*peer, error) {
+ var (
+ addrs []string
+ usernames = make(map[string]bool)
+ me = naming.MountEntry{IsLeaf: true}
+ ns = v23.GetNamespace(ctx)
+ )
+ for _, vname := range u.Addresses() {
+ entry, err := ns.Resolve(ctx, vname)
+ if err != nil {
+ ctx.Errorf("Failed to resolve advertised address [%v]: %v", vname, err)
+ continue
+ }
+ for _, s := range entry.Servers {
+ epstr, _ := naming.SplitAddressName(s.Server) // suffix should be empty since the server address is what is advertised.
+ ep, err := naming.ParseEndpoint(epstr)
+ if err != nil {
+ ctx.Errorf("Failed to resolve advertised address [%v] into an endpoint: %v", epstr, err)
+ continue
+ }
+ addrs = append(addrs, fmt.Sprintf("(%v, %v)", ep.Addr().Network(), ep.Addr()))
+ usernames[username(ep.BlessingNames())] = true
+ me.Servers = append(me.Servers, s)
+ }
+ }
+ if len(usernames) == 0 {
+ return nil, fmt.Errorf("could not determine user associated with AdId: %v, addrs: %v", u.Id(), u.Addresses())
+ }
+ ulist := make([]string, 0, len(usernames))
+ for u := range usernames {
+ ulist = append(ulist, u)
+ }
+ return &peer{
+ username: strings.Join(ulist, ", "),
+ description: fmt.Sprintf("%v at %v (AdId: %v)", ulist, addrs, u.Id()),
+ adId: u.Id(),
+ preresolved: options.Preresolved{&me},
+ }, nil
+
+}
+
+type peer struct {
+ username string // Username claimed by the advertisement
+ description string
+ preresolved options.Preresolved
+ adId discovery.AdId
+}
+
+func (p *peer) call(ctx *context.T, message string) (string, error) {
+ ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+ defer cancel()
+ start := time.Now()
+ call, err := v23.GetClient(ctx).StartCall(ctx, "", "Echo", []interface{}{message}, p.preresolved)
+ if err != nil {
+ return "", err
+ }
+ var recvd string
+ if err := call.Finish(&recvd); err != nil {
+ return "", err
+ }
+ elapsed := time.Now().Sub(start)
+ if recvd != message {
+ return "", fmt.Errorf("got [%s], want [%s]", recvd, message)
+ }
+ them, _ := call.RemoteBlessings()
+ theiraddr := call.Security().RemoteEndpoint().Addr()
+ return fmt.Sprintf("Called %v at (%v, %v) in %v, and said [%v]", username(them), theiraddr.Network(), theiraddr, elapsed, message), nil
+}
diff --git a/impl/google/services/vango/vango/vango.go b/impl/google/services/vango/vango/vango.go
index 71892df..50f136d 100644
--- a/impl/google/services/vango/vango/vango.go
+++ b/impl/google/services/vango/vango/vango.go
@@ -8,6 +8,7 @@
import (
"fmt"
+ "os"
"v.io/v23"
"v.io/x/jni/impl/google/services/vango"
@@ -17,5 +18,5 @@
func main() {
ctx, shutdown := v23.Init()
defer shutdown()
- fmt.Println(vango.AllFunc(ctx))
+ fmt.Println(vango.AllFunc(ctx, os.Stdout))
}