services/vango: Tweaks
- Export application state as a stat (viewable via "debug browse")
- Prettier printing of addresses so as to not overwhelm the output with
link-local and loopback addresses on each interface.
Change-Id: I38c4372bd07cb6792d8fe44a6c84e982c5339977
diff --git a/impl/google/services/vango/funcs.go b/impl/google/services/vango/funcs.go
index 3dd38bd..6e097fc 100644
--- a/impl/google/services/vango/funcs.go
+++ b/impl/google/services/vango/funcs.go
@@ -5,6 +5,7 @@
package vango
import (
+ "bytes"
"fmt"
"io"
"time"
@@ -16,12 +17,15 @@
"v.io/v23/rpc"
"v.io/v23/security"
libdiscovery "v.io/x/ref/lib/discovery"
+ "v.io/x/ref/lib/stats"
)
const (
rpcTimeout = 10 * time.Second
tcpServerName = "tmp/vango/tcp"
btServerName = "tmp/vango/bt"
+ interfaceName = "v.io/x/jni/impl/google/services/vango.EchoServer"
+ vangoStat = "vango"
)
var (
@@ -75,7 +79,7 @@
ls := rpc.ListenSpec{Proxy: "proxy"}
addRegisteredProto(&ls, "tcp", ":0")
addRegisteredProto(&ls, "bt", "/0")
- fmt.Fprintf(output, "Listening on: %+v (and proxy)", ls.Addrs)
+ fmt.Fprintf(output, "Listening on: %+v (and proxy)\n", ls.Addrs)
ctx, server, err := v23.WithNewServer(
v23.WithListenSpec(ctx, ls),
mountName(ctx, "all"),
@@ -84,7 +88,6 @@
if err != nil {
return err
}
- const interfaceName = "v.io/x/jni/impl/google/services/vango.EchoServer"
ad := &discovery.Advertisement{
InterfaceName: interfaceName,
Attributes: discovery.Attributes{
@@ -99,7 +102,7 @@
if err != nil {
return err
}
- updates, err := d.Scan(ctx, "")
+ updates, err := d.Scan(ctx, "v.InterfaceName=\""+interfaceName+"\"")
if err != nil {
return err
}
@@ -120,14 +123,22 @@
go func(msg string) {
summary, err := p.call(ctx, msg)
if err != nil {
- callResults <- fmt.Sprintf("ERROR calling [%v]: %v", p.description, err)
+ ctx.Infof("Failed to call [%v]: %v", p.description, err)
+ callResults <- ""
return
}
callResults <- summary
}(fmt.Sprintf("Hello #%d", counter))
}
+ statRequest = make(chan chan<- string)
)
defer ticker.Stop()
+ stats.NewStringFunc(vangoStat, func() string {
+ r := make(chan string)
+ statRequest <- r
+ return <-r
+ })
+ defer stats.Delete(vangoStat)
fmt.Fprintln(output, "My AdID:", ad.Id)
fmt.Fprintln(output, "My addrs:", myaddrs)
ctx.Infof("SERVER STATUS: %+v", status)
@@ -176,7 +187,9 @@
call(p)
case r := <-callResults:
activeCalls--
- fmt.Fprintln(output, r)
+ if len(r) > 0 {
+ fmt.Fprintln(output, r)
+ }
case <-stoppedAd:
fmt.Fprintln(output, "STOPPED ADVERTISING")
stoppedAd = nil
@@ -188,6 +201,15 @@
call(peerByAdId[id])
}
}
+ case s := <-statRequest:
+ idx := 1
+ ret := new(bytes.Buffer)
+ fmt.Fprintln(ret, "ACTIVE CALLS:", activeCalls)
+ fmt.Fprintln(ret, "PEERS")
+ for id, p := range peerByAdId {
+ fmt.Fprintf(ret, "%2d) %s -- %v\n", idx, p.description, lastCall[id])
+ }
+ s <- ret.String()
}
}
fmt.Println(output, "EXITING: Cleaning up")
diff --git a/impl/google/services/vango/util.go b/impl/google/services/vango/util.go
index 8870630..268b58f 100644
--- a/impl/google/services/vango/util.go
+++ b/impl/google/services/vango/util.go
@@ -6,6 +6,7 @@
import (
"fmt"
+ "net"
"sort"
"strings"
"time"
@@ -101,18 +102,68 @@
}
}
-func serverAddrs(status rpc.ServerStatus) []string {
- var addrs []string
- for _, ep := range status.Endpoints {
- addrs = append(addrs, fmt.Sprintf("(%v, %v)", ep.Addr().Network(), ep.Addr()))
+type addrList []net.Addr
+
+func (l addrList) Len() int { return len(l) }
+func (l addrList) Less(i, j int) bool {
+ if l[i].Network() == l[j].Network() {
+ return addrString(l[i]) < addrString(l[j])
}
- sort.Strings(addrs)
- return addrs
+ return l[i].Network() < l[j].Network()
+}
+func (l addrList) Swap(i, j int) { l[i], l[j] = l[j], l[i] }
+
+func addrString(a net.Addr) string {
+ if !strings.HasPrefix(a.Network(), "tcp") && !strings.HasPrefix(a.Network(), "wsh") {
+ return a.String()
+ }
+ host, port, err := net.SplitHostPort(a.String())
+ if err != nil {
+ return a.String()
+ }
+ ip := net.ParseIP(host)
+ if ip == nil {
+ return a.String()
+ }
+ if ip.IsLoopback() {
+ host = "loopback"
+ }
+ if ip.IsLinkLocalUnicast() {
+ host = "linklocal"
+ }
+ if ip.IsMulticast() {
+ // Shouldn't happen
+ host = "multicast"
+ }
+ return net.JoinHostPort(host, port)
+}
+
+func serverAddrs(status rpc.ServerStatus) []string {
+ var addrs addrList
+ for _, ep := range status.Endpoints {
+ addrs = append(addrs, ep.Addr())
+ }
+ return prettyAddrList(addrs)
+}
+
+func prettyAddrList(addrs addrList) []string {
+ if len(addrs) == 0 {
+ return nil
+ }
+ sort.Sort(addrs)
+ var ret []string
+ for i, a := range addrs {
+ str := fmt.Sprintf("(%v, %v)", a.Network(), addrString(a))
+ if i == 0 || ret[len(ret)-1] != str {
+ ret = append(ret, str)
+ }
+ }
+ return ret
}
func newPeer(ctx *context.T, u discovery.Update) (*peer, error) {
var (
- addrs []string
+ addrs addrList
usernames = make(map[string]bool)
me = naming.MountEntry{IsLeaf: true}
ns = v23.GetNamespace(ctx)
@@ -130,7 +181,7 @@
ctx.Errorf("Failed to resolve advertised address [%v] into an endpoint: %v", epstr, err)
continue
}
- addrs = append(addrs, fmt.Sprintf("(%v, %v)", ep.Addr().Network(), ep.Addr()))
+ addrs = append(addrs, ep.Addr())
usernames[username(ep.BlessingNames())] = true
me.Servers = append(me.Servers, s)
}
@@ -144,7 +195,7 @@
}
return &peer{
username: strings.Join(ulist, ", "),
- description: fmt.Sprintf("%v at %v (AdId: %v)", ulist, addrs, u.Id()),
+ description: fmt.Sprintf("%v at %v (AdId: %v)", ulist, prettyAddrList(addrs), u.Id()),
adId: u.Id(),
preresolved: options.Preresolved{&me},
}, nil