vango: Enhancements to the "all" func

The intention is for this to be used as a demonstration of how
Vanadium discovery+RPC+security shows value in the face of constantly
moving devices with changing network connectivity (turning WiFi on/off,
turning bluetooth on/off)

Specifically:
- The "All" function continuosly advertises and scans
- Whenever it finds a new peer, it sends an RPC to it
- It prints out the identity of the remote user (as a result of the
  authentication protocol) and the address of the underlying channel
  (tcp/bluetooth etc.) used to actually communicate.

Next I intend to play a lot with this app to find bugs/areas of
improvement for the underlying RPC and discovery code.

MultiPart: 1/2
Change-Id: I433f3cff5c964ceb84c92d9e378b2502b41efd00
diff --git a/impl/google/services/vango/funcs.go b/impl/google/services/vango/funcs.go
index 727152b..b13b548 100644
--- a/impl/google/services/vango/funcs.go
+++ b/impl/google/services/vango/funcs.go
@@ -6,6 +6,7 @@
 
 import (
 	"fmt"
+	"io"
 	"time"
 
 	"v.io/v23"
@@ -30,7 +31,7 @@
 	// intended to be run by java/android applications using Vango.run(key).
 	// Users must add function entries to this map and rebuild lib/android-lib in
 	// the vanadium java repository.
-	vangoFuncs = map[string]func(*context.T) error{
+	vangoFuncs = map[string]func(*context.T, io.Writer) error{
 		"tcp-client": tcpClientFunc,
 		"tcp-server": tcpServerFunc,
 		"bt-client":  btClientFunc,
@@ -41,40 +42,40 @@
 	}
 )
 
-func tcpServerFunc(ctx *context.T) error {
+func tcpServerFunc(ctx *context.T, _ io.Writer) error {
 	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Proxy: "proxy"})
 	return runServer(ctx, tcpServerName)
 }
 
-func tcpClientFunc(ctx *context.T) error {
+func tcpClientFunc(ctx *context.T, _ io.Writer) error {
 	return runClient(ctx, tcpServerName)
 }
 
-func btServerFunc(ctx *context.T) error {
+func btServerFunc(ctx *context.T, _ io.Writer) error {
 	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{{Protocol: "bt", Address: "/0"}}})
 	return runServer(ctx, btServerName)
 }
 
-func btClientFunc(ctx *context.T) error {
+func btClientFunc(ctx *context.T, _ io.Writer) error {
 	return runClient(ctx, btServerName)
 }
 
-func bleServerFunc(ctx *context.T) error {
+func bleServerFunc(ctx *context.T, _ io.Writer) error {
 	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{Addrs: rpc.ListenAddrs{{Protocol: "ble", Address: "na"}}})
 	return runServer(ctx, "")
 }
 
-func bleClientFunc(ctx *context.T) error {
+func bleClientFunc(ctx *context.T, _ io.Writer) error {
 	return runClient(ctx, bleServerName)
 }
 
 // AllFunc runs a server, advertises it, scans for other servers and makes an
 // Echo RPC to every advertised remote server.
-func AllFunc(ctx *context.T) error {
+func AllFunc(ctx *context.T, output io.Writer) error {
 	ls := rpc.ListenSpec{Proxy: "proxy"}
 	addRegisteredProto(&ls, "tcp", ":0")
 	addRegisteredProto(&ls, "bt", "/0")
-	ctx.Infof("ListenSpec: %#v", ls)
+	fmt.Fprintf(output, "Listening on: %+v (and proxy)", ls.Addrs)
 	ctx, server, err := v23.WithNewServer(
 		v23.WithListenSpec(ctx, ls),
 		mountName(ctx, "all"),
@@ -102,41 +103,86 @@
 	if err != nil {
 		return err
 	}
-	status := server.Status()
-	ctx.Infof("My AdID: %v", ad.Id)
-	ctx.Infof("Status: %+v", status)
-	counter := 0
-	onDiscovered := func(addr, message string) {
-		ctx, cancel := context.WithTimeout(ctx, rpcTimeout)
-		defer cancel()
-		summary, err := runTimedCall(ctx, addr, message)
-		if err != nil {
-			ctx.Infof("%s: ERROR: %v", summary, err)
-			return
-		}
-		ctx.Infof("%s: SUCCESS", summary)
-	}
-	for {
+	var (
+		status      = server.Status()
+		counter     = 0
+		peerByAdId  = make(map[discovery.AdId]*peer)
+		callResults = make(chan string)
+		activeCalls = 0
+		quit        = false
+		myaddrs     = serverAddrs(status)
+	)
+	fmt.Fprintln(output, "My AdID:", ad.Id)
+	fmt.Fprintln(output, "My addrs:", myaddrs)
+	ctx.Infof("SERVER STATUS: %+v", status)
+	for !quit {
 		select {
 		case <-ctx.Done():
-			ctx.Infof("EXITING")
-			return nil
+			quit = true
 		case <-status.Dirty:
 			status = server.Status()
-			ctx.Infof("Status Changed: %+v", status)
-		case u := <-updates:
-			if u.IsLost() {
-				ctx.Infof("LOST: %v", u.Id())
+			newaddrs := serverAddrs(status)
+			changed := len(newaddrs) != len(myaddrs)
+			if !changed {
+				for i := range newaddrs {
+					if newaddrs[i] != myaddrs[i] {
+						changed = true
+						break
+					}
+				}
+			}
+			if changed {
+				myaddrs = newaddrs
+				fmt.Fprintln(output, "My addrs:", myaddrs)
+			}
+			ctx.Infof("SERVER STATUS: %+v", status)
+		case u, scanning := <-updates:
+			if !scanning {
+				fmt.Fprintln(output, "SCANNING STOPPED")
+				quit = true
 				break
 			}
-			counter++
-			ctx.Infof("FOUND(%d): %+v", counter, u.Advertisement())
-			for _, addr := range u.Addresses() {
-				go onDiscovered(addr, fmt.Sprintf("CALL #%03d", counter))
+			if u.IsLost() {
+				if p, ok := peerByAdId[u.Id()]; ok {
+					fmt.Fprintln(output, "LOST:", p.description)
+					delete(peerByAdId, u.Id())
+				}
+				break
 			}
+			p, err := newPeer(ctx, u)
+			if err != nil {
+				ctx.Info(err)
+				break
+			}
+			peerByAdId[p.adId] = p
+			counter++
+			fmt.Fprintln(output, "FOUND:", p.description)
+			activeCalls++
+			go func(msg string) {
+				summary, err := p.call(ctx, msg)
+				if err != nil {
+					callResults <- fmt.Sprintf("ERROR calling [%v]: %v", p.description, err)
+					return
+				}
+				callResults <- summary
+			}(fmt.Sprintf("Hello #%03d", counter))
+		case r := <-callResults:
+			activeCalls--
+			fmt.Fprintln(output, r)
 		case <-stoppedAd:
-			ctx.Infof("Stopped advertising")
-			return fmt.Errorf("stopped advertising")
+			fmt.Fprintln(output, "STOPPED ADVERTISING")
+			stoppedAd = nil
 		}
 	}
+	fmt.Println(output, "EXITING: Cleaning up")
+	for activeCalls > 0 {
+		<-callResults
+		activeCalls--
+	}
+	// Exhaust the scanned updates queue.
+	// (The channel will be closed as a by-product of the context being Done).
+	for range updates {
+	}
+	fmt.Fprintln(output, "EXITING: Done")
+	return nil
 }
diff --git a/impl/google/services/vango/jni.go b/impl/google/services/vango/jni.go
index 8f20080..d736aa5 100644
--- a/impl/google/services/vango/jni.go
+++ b/impl/google/services/vango/jni.go
@@ -8,6 +8,8 @@
 
 import (
 	"fmt"
+	"io"
+	"runtime"
 	"unsafe"
 
 	jutil "v.io/x/jni/util"
@@ -18,7 +20,7 @@
 import "C"
 
 //export Java_io_v_android_util_Vango_nativeGoContextCall
-func Java_io_v_android_util_Vango_nativeGoContextCall(jenv *C.JNIEnv, jVango C.jobject, jContext C.jobject, jKey C.jstring) {
+func Java_io_v_android_util_Vango_nativeGoContextCall(jenv *C.JNIEnv, jVango C.jobject, jContext C.jobject, jKey C.jstring, jOutput C.jobject) {
 	env := jutil.Env(uintptr(unsafe.Pointer(jenv)))
 	key := jutil.GoString(env, jutil.Object(uintptr(unsafe.Pointer(jKey))))
 	ctx, _, err := jcontext.GoContext(env, jutil.Object(uintptr(unsafe.Pointer(jContext))))
@@ -31,8 +33,33 @@
 		jutil.JThrowV(env, fmt.Errorf("vangoFunc key %q doesn't exist", key))
 		return
 	}
-	if err := f(ctx); err != nil {
+	if err := f(ctx, newJavaOutputWriter(env, jutil.Object(uintptr(unsafe.Pointer(jOutput))))); err != nil {
 		jutil.JThrowV(env, err)
 		return
 	}
+
+}
+
+// javaOutputWriter translates an implementation of the Java Vango.OutputWriter interface to Go's io.Writer
+func newJavaOutputWriter(env jutil.Env, o jutil.Object) io.Writer {
+	ret := &javaOutputWriter{jutil.NewGlobalRef(env, o)}
+	runtime.SetFinalizer(ret, func(w *javaOutputWriter) {
+		env, freeFunc := jutil.GetEnv()
+		defer freeFunc()
+		jutil.DeleteGlobalRef(env, w.obj)
+	})
+	return ret
+}
+
+type javaOutputWriter struct {
+	obj jutil.Object // implementation of the Java Vango.OutputWriter interface
+}
+
+func (w *javaOutputWriter) Write(b []byte) (int, error) {
+	env, freeFunc := jutil.GetEnv()
+	defer freeFunc()
+	if err := jutil.CallVoidMethod(env, w.obj, "write", []jutil.Sign{jutil.StringSign}, string(b)); err != nil {
+		return 0, err
+	}
+	return len(b), nil
 }
diff --git a/impl/google/services/vango/util.go b/impl/google/services/vango/util.go
index 6f760a2..3f8c15c 100644
--- a/impl/google/services/vango/util.go
+++ b/impl/google/services/vango/util.go
@@ -6,13 +6,17 @@
 
 import (
 	"fmt"
+	"sort"
+	"strings"
 	"time"
 
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/conventions"
+	"v.io/v23/discovery"
 	"v.io/v23/flow"
 	"v.io/v23/naming"
+	"v.io/v23/options"
 	"v.io/v23/rpc"
 	"v.io/v23/security"
 )
@@ -69,6 +73,14 @@
 	return fmt.Sprintf("%s in %v (THEM:%v EP:%v) (ME:%v)", summary, elapsed, them, call.Security().RemoteEndpoint(), me), nil
 }
 
+func username(blessingNames []string) string {
+	var ret []string
+	for _, p := range conventions.ParseBlessingNames(blessingNames...) {
+		ret = append(ret, p.User)
+	}
+	return strings.Join(ret, ",")
+}
+
 func mountName(ctx *context.T, addendums ...string) string {
 	var (
 		p     = v23.GetPrincipal(ctx)
@@ -88,3 +100,81 @@
 		}
 	}
 }
+
+func serverAddrs(status rpc.ServerStatus) []string {
+	var addrs []string
+	for _, ep := range status.Endpoints {
+		addrs = append(addrs, fmt.Sprintf("(%v, %v)", ep.Addr().Network(), ep.Addr()))
+	}
+	sort.Strings(addrs)
+	return addrs
+}
+
+func newPeer(ctx *context.T, u discovery.Update) (*peer, error) {
+	var (
+		addrs     []string
+		usernames = make(map[string]bool)
+		me        = naming.MountEntry{IsLeaf: true}
+		ns        = v23.GetNamespace(ctx)
+	)
+	for _, vname := range u.Addresses() {
+		entry, err := ns.Resolve(ctx, vname)
+		if err != nil {
+			ctx.Errorf("Failed to resolve advertised address [%v]: %v", vname, err)
+			continue
+		}
+		for _, s := range entry.Servers {
+			epstr, _ := naming.SplitAddressName(s.Server) // suffix should be empty since the server address is what is advertised.
+			ep, err := naming.ParseEndpoint(epstr)
+			if err != nil {
+				ctx.Errorf("Failed to resolve advertised address [%v] into an endpoint: %v", epstr, err)
+				continue
+			}
+			addrs = append(addrs, fmt.Sprintf("(%v, %v)", ep.Addr().Network(), ep.Addr()))
+			usernames[username(ep.BlessingNames())] = true
+			me.Servers = append(me.Servers, s)
+		}
+	}
+	if len(usernames) == 0 {
+		return nil, fmt.Errorf("could not determine user associated with AdId: %v, addrs: %v", u.Id(), u.Addresses())
+	}
+	ulist := make([]string, 0, len(usernames))
+	for u := range usernames {
+		ulist = append(ulist, u)
+	}
+	return &peer{
+		username:    strings.Join(ulist, ", "),
+		description: fmt.Sprintf("%v at %v (AdId: %v)", ulist, addrs, u.Id()),
+		adId:        u.Id(),
+		preresolved: options.Preresolved{&me},
+	}, nil
+
+}
+
+type peer struct {
+	username    string // Username claimed by the advertisement
+	description string
+	preresolved options.Preresolved
+	adId        discovery.AdId
+}
+
+func (p *peer) call(ctx *context.T, message string) (string, error) {
+	ctx, cancel := context.WithTimeout(ctx, 10*time.Second)
+	defer cancel()
+	start := time.Now()
+	call, err := v23.GetClient(ctx).StartCall(ctx, "", "Echo", []interface{}{message}, p.preresolved)
+	if err != nil {
+		return "", err
+	}
+	var recvd string
+	if err := call.Finish(&recvd); err != nil {
+		return "", err
+	}
+	elapsed := time.Now().Sub(start)
+	if recvd != message {
+		return "", fmt.Errorf("got [%s], want [%s]", recvd, message)
+	}
+	them, _ := call.RemoteBlessings()
+	theiraddr := call.Security().RemoteEndpoint().Addr()
+	return fmt.Sprintf("Called %v at (%v, %v) in %v, and said [%v]", username(them), theiraddr.Network(), theiraddr, elapsed, message), nil
+}
diff --git a/impl/google/services/vango/vango/vango.go b/impl/google/services/vango/vango/vango.go
index 71892df..50f136d 100644
--- a/impl/google/services/vango/vango/vango.go
+++ b/impl/google/services/vango/vango/vango.go
@@ -8,6 +8,7 @@
 
 import (
 	"fmt"
+	"os"
 
 	"v.io/v23"
 	"v.io/x/jni/impl/google/services/vango"
@@ -17,5 +18,5 @@
 func main() {
 	ctx, shutdown := v23.Init()
 	defer shutdown()
-	fmt.Println(vango.AllFunc(ctx))
+	fmt.Println(vango.AllFunc(ctx, os.Stdout))
 }