ref: Merge stats from todoshacks into master.

Export outstanding-call stats for the RPC client and server, export
peer-manager and discovery debug stats from syncbase's sync service,
store collection row values as *vdl.Value in the syncbase debug
browser, and downgrade flow-manager Listen failures from error to
verbose logging.

Change-Id: Ia89adaa13e2785dff951ad02d606770db2a440a9
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index 2c9220b..04e622c 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -84,6 +84,7 @@
 	flowMgr            flow.Manager
 	preferredProtocols []string
 	ctx                *context.T
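+	// outstanding tracks the calls currently in flight on this client,
+	// exported under the stats namespace for debugging.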
+	outstanding        *outstandingStats
 	// stop is kept for backward compatibility to implement Close().
 	// TODO(mattr): deprecate Close.
 	stop func()
@@ -101,11 +102,13 @@
 
 func NewClient(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
 	ctx, cancel := context.WithCancel(ctx)
+	statsPrefix := fmt.Sprintf("rpc/client/outstanding/%p", ctx)
 	c := &client{
-		ctx:       ctx,
-		typeCache: newTypeCache(),
-		stop:      cancel,
-		closed:    make(chan struct{}),
+		ctx:         ctx,
+		typeCache:   newTypeCache(),
+		stop:        cancel,
+		closed:      make(chan struct{}),
+		outstanding: newOutstandingStats(statsPrefix),
 	}
 
 	connIdleExpiry := time.Duration(0)
@@ -131,6 +134,7 @@
 
 		<-c.flowMgr.Closed()
 		c.wg.Wait()
+		c.outstanding.close()
 		close(c.closed)
 	}()
 
@@ -179,7 +183,8 @@
 		return nil, err
 	}
 
-	fc, err := newFlowClient(ctx, r.flow, r.typeEnc, r.typeDec)
+	removeStat := c.outstanding.start(method, r.flow.RemoteEndpoint())
+	fc, err := newFlowClient(ctx, removeStat, r.flow, r.typeEnc, r.typeDec)
 	if err != nil {
 		return nil, err
 	}
@@ -583,26 +588,33 @@
 	sendClosedMu sync.Mutex
 	sendClosed   bool // is the send side already closed? GUARDED_BY(sendClosedMu)
 	finished     bool // has Finish() already been called?
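+	// removeStat removes this call from the client's outstanding-call stats.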
+	removeStat   func()
 }
 
 var _ rpc.ClientCall = (*flowClient)(nil)
 var _ rpc.Stream = (*flowClient)(nil)
 
-func newFlowClient(ctx *context.T, flow flow.Flow, typeEnc *vom.TypeEncoder, typeDec *vom.TypeDecoder) (*flowClient, error) {
+func newFlowClient(ctx *context.T, removeStat func(), flow flow.Flow, typeEnc *vom.TypeEncoder, typeDec *vom.TypeDecoder) (*flowClient, error) {
 	bf := conn.NewBufferingFlow(ctx, flow)
 	if _, err := bf.Write([]byte{dataFlow}); err != nil {
 		flow.Close()
+		removeStat()
 		return nil, err
 	}
 	fc := &flowClient{
-		ctx:  ctx,
-		flow: bf,
-		dec:  vom.NewDecoderWithTypeDecoder(bf, typeDec),
-		enc:  vom.NewEncoderWithTypeEncoder(bf, typeEnc),
+		ctx:        ctx,
+		flow:       bf,
+		dec:        vom.NewDecoderWithTypeDecoder(bf, typeDec),
+		enc:        vom.NewEncoderWithTypeEncoder(bf, typeEnc),
+		removeStat: removeStat,
 	}
 	return fc, nil
 }
 
+func (fc *flowClient) Conn() flow.ManagedConn {
+	return fc.flow.Conn()
+}
+
 // close determines the appropriate error to return, in particular,
 // if a timeout or cancelation has occurred then any error
 // is turned into a timeout or cancelation as appropriate.
@@ -610,6 +622,10 @@
 // a timeout can lead to any other number of errors due to the underlying
 // network connection being shutdown abruptly.
 func (fc *flowClient) close(err error) error {
+	fc.removeStat()
+	if err == nil {
+		return nil
+	}
 	subErr := verror.SubErr{Err: err, Options: verror.Print}
 	subErr.Name = "remote=" + fc.flow.RemoteEndpoint().String()
 	if cerr := fc.flow.Close(); cerr != nil && err == nil {
@@ -866,6 +882,7 @@
 			return fc.close(berr)
 		}
 	}
+	fc.close(nil)
 	return nil
 }
 
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index 6b8566a..8d4a283 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -78,7 +78,8 @@
 	isLeaf             bool
 	lameDuckTimeout    time.Duration // the time to wait for inflight operations to finish on shutdown
 
-	stats *rpcStats // stats for this server.
+	stats       *rpcStats // stats for this server.
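+	// outstanding tracks the requests currently being served, exported for debugging.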
+	outstanding *outstandingStats
 }
 
 func WithNewServer(ctx *context.T,
@@ -123,6 +124,7 @@
 		endpoints:         make(map[string]naming.Endpoint),
 		lameDuckTimeout:   5 * time.Second,
 		closed:            make(chan struct{}),
+		outstanding:       newOutstandingStats(naming.Join("rpc", "server", "outstanding", rid.String())),
 	}
 	channelTimeout := time.Duration(0)
 	connIdleExpiry := time.Duration(0)
@@ -371,7 +373,7 @@
 		if len(addr.Address) > 0 {
 			ch, err := s.flowMgr.Listen(ctx, addr.Protocol, addr.Address)
 			if err != nil {
-				s.ctx.Errorf("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, err)
+				s.ctx.VI(1).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, err)
 			}
 			s.active.Add(1)
 			go s.relisten(lctx, addr.Protocol, addr.Address, ch, err)
@@ -410,7 +412,7 @@
 			}
 		}
 		if ch, err = s.flowMgr.Listen(ctx, protocol, address); err != nil {
-			s.ctx.Errorf("Listen(%q, %q, ...) failed: %v", protocol, address, err)
+			s.ctx.VI(1).Infof("Listen(%q, %q, ...) failed: %v", protocol, address, err)
 		}
 	}
 }
@@ -607,6 +609,7 @@
 	discharges       map[string]security.Discharge
 	starttime        time.Time
 	endStreamArgs    bool // are the stream args at EOF?
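+	// removeStat, when non-nil, removes this call from the server's outstanding-call stats.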
+	removeStat       func()
 }
 
 var (
@@ -659,6 +662,11 @@
 
 func (fs *flowServer) serve() error {
 	defer fs.flow.Close()
+	defer func() {
+		if fs.removeStat != nil {
+			fs.removeStat()
+		}
+	}()
 
 	ctx, results, err := fs.processRequest()
 	vtrace.GetSpan(ctx).Finish()
@@ -745,6 +753,7 @@
 		ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
 		return ctx, nil, err
 	}
+	fs.removeStat = fs.server.outstanding.start(req.Method, fs.flow.RemoteEndpoint())
 
 	// Start building up a new context for the request now that we know
 	// the header information.
diff --git a/runtime/internal/rpc/stats.go b/runtime/internal/rpc/stats.go
index 640618a..524bdc9 100644
--- a/runtime/internal/rpc/stats.go
+++ b/runtime/internal/rpc/stats.go
@@ -5,6 +5,9 @@
 package rpc
 
 import (
+	"bytes"
+	"fmt"
+	"sort"
 	"sync"
 	"time"
 
@@ -15,6 +18,79 @@
 	"v.io/v23/naming"
 )
 
+type outstandingCall struct {
+	remote naming.Endpoint
+	method string
+	when   time.Time
+}
+
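+// outstandingCalls implements sort.Interface, ordering calls by start time (oldest first).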
+type outstandingCalls []*outstandingCall
+
+func (oc outstandingCalls) Less(i, j int) bool {
+	return oc[i].when.Before(oc[j].when)
+}
+func (oc outstandingCalls) Swap(i, j int) {
+	oc[i], oc[j] = oc[j], oc[i]
+}
+func (oc outstandingCalls) Len() int {
+	return len(oc)
+}
+
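+// outstandingStats tracks the set of currently outstanding calls and exports
+// a human-readable summary of them via the stats package.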
+type outstandingStats struct {
+	prefix      string
+	mu          sync.Mutex
+	outstanding map[*outstandingCall]bool
+}
+
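+// newOutstandingStats registers a string stat at the given prefix whose value
+// lists the calls that are still outstanding.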
+func newOutstandingStats(prefix string) *outstandingStats {
+	o := &outstandingStats{
+		prefix:      prefix,
+		outstanding: make(map[*outstandingCall]bool),
+	}
+	stats.NewStringFunc(prefix, o.String)
+	return o
+}
+
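+// String formats the outstanding calls, oldest first, one call per line.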
+func (o *outstandingStats) String() string {
+	o.mu.Lock()
+	defer o.mu.Unlock()
+	if len(o.outstanding) == 0 {
+		return "No outstanding calls."
+	}
+	calls := make(outstandingCalls, 0, len(o.outstanding))
+	for c := range o.outstanding {
+		calls = append(calls, c)
+	}
+	sort.Sort(calls)
+	now := time.Now()
+	buf := &bytes.Buffer{}
+	for _, c := range calls {
+		fmt.Fprintf(buf, "%s age:%v from:%v\n", c.method, now.Sub(c.when), c.remote)
+	}
+	return buf.String()
+}
+
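+// close unregisters the stat exported at the prefix.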
+func (o *outstandingStats) close() {
+	stats.Delete(o.prefix)
+}
+
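+// start records a new outstanding call and returns a function that removes it
+// once the call has completed.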
+func (o *outstandingStats) start(method string, remote naming.Endpoint) func() {
+	o.mu.Lock()
+	nw := &outstandingCall{
+		method: method,
+		remote: remote,
+		when:   time.Now(),
+	}
+	o.outstanding[nw] = true
+	o.mu.Unlock()
+
+	return func() {
+		o.mu.Lock()
+		delete(o.outstanding, nw)
+		o.mu.Unlock()
+	}
+}
+
 type rpcStats struct {
 	mu                  sync.RWMutex
 	prefix              string
diff --git a/services/debug/debug/browseserver/sbtree/colltree.go b/services/debug/debug/browseserver/sbtree/colltree.go
index 561ae5d..0e40812 100644
--- a/services/debug/debug/browseserver/sbtree/colltree.go
+++ b/services/debug/debug/browseserver/sbtree/colltree.go
@@ -10,6 +10,7 @@
 	"v.io/v23/context"
 	wire "v.io/v23/services/syncbase"
 	"v.io/v23/syncbase"
+	"v.io/v23/vdl"
 )
 
 // CollectionTree has all the data for the collection page of the Syncbase debug
@@ -100,12 +101,14 @@
 		if state == gathering {
 			// Grab the value, put it and the key into a KeyVal, and
 			// add it to the page.
-			var value interface{}
-			err := stream.Value(&value)
-			if err != nil {
-				value = fmt.Sprintf("ERROR getting value: %v", err)
+			kv := keyVal{Index: rowCount, Key: key}
+			var value *vdl.Value
+			if err := stream.Value(&value); err != nil {
+				kv.Value = fmt.Sprintf("ERROR getting value: %v", err)
+			} else {
+				kv.Value = value
 			}
-			page.KeyVals = append(page.KeyVals, keyVal{rowCount, key, value})
+			page.KeyVals = append(page.KeyVals, kv)
 		}
 		rowCount++
 	}
diff --git a/services/debug/debug/browseserver/sbtree/colltree_test.go b/services/debug/debug/browseserver/sbtree/colltree_test.go
index 3c1bd8a..55c8793 100644
--- a/services/debug/debug/browseserver/sbtree/colltree_test.go
+++ b/services/debug/debug/browseserver/sbtree/colltree_test.go
@@ -9,6 +9,7 @@
 	"testing"
 
 	"v.io/v23/syncbase"
+	"v.io/v23/vdl"
 	_ "v.io/x/ref/runtime/factories/generic"
 	"v.io/x/ref/services/debug/debug/browseserver/sbtree"
 	tu "v.io/x/ref/services/syncbase/testutil"
@@ -112,9 +113,9 @@
 		}
 	}
 	for i, want := range []interface{}{complex(11, 22), int64(9999), 'x', "something"} {
-		if got.KeysPage.KeyVals[i].Value != want {
+		if val := vdl.ValueOf(want); !reflect.DeepEqual(got.KeysPage.KeyVals[i].Value, val) {
 			t.Errorf("got %v of type %T, want %v of type %T",
-				got.KeysPage.KeyVals[i].Value, got.KeysPage.KeyVals[i].Value, want, want)
+				got.KeysPage.KeyVals[i].Value, got.KeysPage.KeyVals[i].Value, val, val)
 		}
 	}
 
@@ -339,12 +340,12 @@
 		t.Errorf("Wanted 1 keys, got %v (length %d)",
 			got.KeysPage.KeyVals, len(got.KeysPage.KeyVals))
 	}
-	value, ok := got.KeysPage.KeyVals[0].Value.(someCustomType)
-	if !ok {
+	value := got.KeysPage.KeyVals[0].Value.(*vdl.Value)
+	if value.Type() != vdl.TypeOf(someCustomType{}) {
 		t.Fatalf("Got %v of type %T, want of type someCustomType",
-			got.KeysPage.KeyVals[0].Value, got.KeysPage.KeyVals[0].Value)
+			value, value.Type)
 	}
-	want := someCustomType{"something", 'x', childType{9999, complex(11, 22)}}
+	want := vdl.ValueOf(someCustomType{"something", 'x', childType{9999, complex(11, 22)}})
 	if !reflect.DeepEqual(value, want) {
 		t.Errorf("Got %v, want %v", value, want)
 	}
diff --git a/services/syncbase/vsync/peer_manager.go b/services/syncbase/vsync/peer_manager.go
index 401ea06..a8fcea5 100644
--- a/services/syncbase/vsync/peer_manager.go
+++ b/services/syncbase/vsync/peer_manager.go
@@ -5,6 +5,8 @@
 package vsync
 
 import (
+	"bytes"
+	"fmt"
 	"sync"
 	"time"
 
@@ -14,6 +16,7 @@
 	"v.io/v23/verror"
 	"v.io/x/lib/set"
 	"v.io/x/lib/vlog"
+	"v.io/x/ref/lib/stats"
 	"v.io/x/ref/services/syncbase/common"
 	"v.io/x/ref/services/syncbase/ping"
 	"v.io/x/ref/services/syncbase/server/interfaces"
@@ -48,6 +51,8 @@
 	// updatePeerFromResponder updates information for a peer that the
 	// responder responds to.
 	updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gvs interfaces.Knowledge) error
+
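+	// exportStats exports debugging information about the peer manager
+	// under the given stats prefix.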
+	exportStats(prefix string)
 }
 
 ////////////////////////////////////////
@@ -121,6 +126,9 @@
 	// Once pinned.Unpin() is called, the connection will no longer be pinned in
 	// rpc cache, and healthCheck will return to the rpc default health check interval.
 	pinned flow.PinnedConn
+
+	// addedTime is the time at which the connection was put into the peer cache.
+	addedTime time.Time
 }
 
 type peerManagerImpl struct {
@@ -152,6 +160,30 @@
 	}
 }
 
+func (pm *peerManagerImpl) exportStats(prefix string) {
+	stats.NewStringFunc(naming.Join(prefix, "peers"), pm.debugStringForPeers)
+}
+
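+// debugStringForPeers describes every connection in the healthy peer cache.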
+func (pm *peerManagerImpl) debugStringForPeers() string {
+	pm.Lock()
+	defer pm.Unlock()
+	buf := &bytes.Buffer{}
+	for _, c := range pm.healthyPeerCache {
+		fmt.Fprintf(buf, "%v\n", c.debugString())
+		fmt.Fprintln(buf)
+	}
+	return buf.String()
+}
+
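+// debugString formats the connection's relative name, mount tables, addresses,
+// and the time it was added to the cache.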
+func (c *connInfo) debugString() string {
+	buf := &bytes.Buffer{}
+	fmt.Fprintf(buf, "RELNAME: %v\n", c.relName)
+	fmt.Fprintf(buf, "MTTBLS: %v\n", c.mtTbls)
+	fmt.Fprintf(buf, "ADDRS: %v\n", c.addrs)
+	fmt.Fprintf(buf, "ADDEDTIME: %v\n", c.addedTime)
+	return buf.String()
+}
+
 func (pm *peerManagerImpl) managePeers(ctx *context.T) {
 	defer pm.s.pending.Done()
 
@@ -218,7 +250,9 @@
 		// neighborhood peers until the cache entries expire.
 	}
 
+	now := time.Now()
 	for _, p := range peers {
+		p.addedTime = now
 		pm.healthyPeerCache[p.relName] = p
 	}
 }
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 8fc5213..4b45d4c 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -29,6 +29,7 @@
 	"v.io/v23/verror"
 	"v.io/x/lib/vlog"
 	idiscovery "v.io/x/ref/lib/discovery"
+	"v.io/x/ref/lib/stats"
 	"v.io/x/ref/services/syncbase/common"
 	syncdis "v.io/x/ref/services/syncbase/discovery"
 	blob "v.io/x/ref/services/syncbase/localblobstore"
@@ -124,6 +125,9 @@
 
 	// Peer manager for managing peers to sync with.
 	pm peerManager
+
+	// Naming prefix at which debugging information is exported.
+	statPrefix string
 }
 
 // syncDatabase contains the metadata for syncing a database. This struct is
@@ -173,7 +177,9 @@
 		discovery:      discovery,
 		publishInNh:    publishInNh,
 		advSyncgroups:  make(map[interfaces.GroupId]syncAdvertisementState),
+		statPrefix:     syncServiceStatName(),
 	}
+	s.exportStats()
 
 	data := &SyncData{}
 	if err := store.RunInTransaction(sv.St(), func(tx store.Transaction) error {
@@ -217,6 +223,7 @@
 
 	// Start the peer manager thread to maintain peers viable for syncing.
 	go s.pm.managePeers(ctx)
+	s.pm.exportStats(naming.Join("syncbase", s.name))
 
 	// Start initiator thread to periodically get deltas from peers. The
 	// initiator threads consults the peer manager to pick peers to sync
@@ -240,6 +247,7 @@
 	close(s.closed)
 	s.pending.Wait()
 	s.bst.Close()
+	stats.Delete(s.statPrefix)
 }
 
 func NewSyncDatabase(db interfaces.Database) *syncDatabase {
@@ -294,7 +302,7 @@
 	s.discoveryLock.Lock()
 	defer s.discoveryLock.Unlock()
 
-	vlog.VI(3).Info("sync: updateDiscoveryInfo: %s: %+v, %p, current discoverySyncgroups is %+v", id, ad, ad, s.discoverySyncgroups)
+	vlog.VI(3).Infof("sync: updateDiscoveryInfo: %s: %+v, %p, current discoverySyncgroups is %+v", id, ad, ad, s.discoverySyncgroups)
 
 	// The first time around initialize all discovery maps.
 	if s.discoveryIds == nil {
@@ -616,3 +624,44 @@
 func (s *syncService) stKey() string {
 	return common.SyncPrefix
 }
+
+var (
+	statMu  sync.Mutex
+	statIdx int
+)
+
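+// syncServiceStatName returns a unique stats name for a sync service instance.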
+func syncServiceStatName() string {
+	statMu.Lock()
+	ret := naming.Join("syncbase", "vsync", fmt.Sprint(statIdx))
+	statIdx++
+	statMu.Unlock()
+	return ret
+}
+
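+// exportStats exports a summary of the discovered peers, advertisements, and
+// syncgroups under this sync service's stat prefix.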
+func (s *syncService) exportStats() {
+	stats.NewStringFunc(s.statPrefix, func() string {
+		s.discoveryLock.Lock()
+		defer s.discoveryLock.Unlock()
+		return fmt.Sprintf(`
+Peers:        %v
+Ads:          %v
+Syncgroups:   %v
+`, adMapKeys(s.discoveryPeers), adMapKeys(s.discoveryIds), groupMapKeys(s.discoverySyncgroups))
+	})
+}
+
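+// adMapKeys formats each advertisement in the map as a "key: value" string.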
+func adMapKeys(m map[string]*discovery.Advertisement) []string {
+	var ret []string
+	for k, v := range m {
+		ret = append(ret, fmt.Sprintf("%v: %v\n", k, *v))
+	}
+	return ret
+}
+
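+// groupMapKeys returns the ids of the syncgroups in the map.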
+func groupMapKeys(m map[interfaces.GroupId]map[string]*discovery.Advertisement) []string {
+	var ret []string
+	for k := range m {
+		ret = append(ret, fmt.Sprint(k))
+	}
+	return ret
+}