ref: Merge stats from todoshacks into master.
Change-Id: Ia89adaa13e2785dff951ad02d606770db2a440a9
diff --git a/runtime/internal/rpc/client.go b/runtime/internal/rpc/client.go
index 2c9220b..04e622c 100644
--- a/runtime/internal/rpc/client.go
+++ b/runtime/internal/rpc/client.go
@@ -84,6 +84,7 @@
flowMgr flow.Manager
preferredProtocols []string
ctx *context.T
+ outstanding *outstandingStats
// stop is kept for backward compatibilty to implement Close().
// TODO(mattr): deprecate Close.
stop func()
@@ -101,11 +102,13 @@
func NewClient(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
ctx, cancel := context.WithCancel(ctx)
+ statsPrefix := fmt.Sprintf("rpc/client/outstanding/%p", ctx)
c := &client{
- ctx: ctx,
- typeCache: newTypeCache(),
- stop: cancel,
- closed: make(chan struct{}),
+ ctx: ctx,
+ typeCache: newTypeCache(),
+ stop: cancel,
+ closed: make(chan struct{}),
+ outstanding: newOutstandingStats(statsPrefix),
}
connIdleExpiry := time.Duration(0)
@@ -131,6 +134,7 @@
<-c.flowMgr.Closed()
c.wg.Wait()
+ c.outstanding.close()
close(c.closed)
}()
@@ -179,7 +183,8 @@
return nil, err
}
- fc, err := newFlowClient(ctx, r.flow, r.typeEnc, r.typeDec)
+ removeStat := c.outstanding.start(method, r.flow.RemoteEndpoint())
+ fc, err := newFlowClient(ctx, removeStat, r.flow, r.typeEnc, r.typeDec)
if err != nil {
return nil, err
}
@@ -583,26 +588,33 @@
sendClosedMu sync.Mutex
sendClosed bool // is the send side already closed? GUARDED_BY(sendClosedMu)
finished bool // has Finish() already been called?
+ removeStat func()
}
var _ rpc.ClientCall = (*flowClient)(nil)
var _ rpc.Stream = (*flowClient)(nil)
-func newFlowClient(ctx *context.T, flow flow.Flow, typeEnc *vom.TypeEncoder, typeDec *vom.TypeDecoder) (*flowClient, error) {
+func newFlowClient(ctx *context.T, removeStat func(), flow flow.Flow, typeEnc *vom.TypeEncoder, typeDec *vom.TypeDecoder) (*flowClient, error) {
bf := conn.NewBufferingFlow(ctx, flow)
if _, err := bf.Write([]byte{dataFlow}); err != nil {
flow.Close()
+ removeStat()
return nil, err
}
fc := &flowClient{
- ctx: ctx,
- flow: bf,
- dec: vom.NewDecoderWithTypeDecoder(bf, typeDec),
- enc: vom.NewEncoderWithTypeEncoder(bf, typeEnc),
+ ctx: ctx,
+ flow: bf,
+ dec: vom.NewDecoderWithTypeDecoder(bf, typeDec),
+ enc: vom.NewEncoderWithTypeEncoder(bf, typeEnc),
+ removeStat: removeStat,
}
return fc, nil
}
+func (fc *flowClient) Conn() flow.ManagedConn {
+ return fc.flow.Conn()
+}
+
// close determines the appropriate error to return, in particular,
// if a timeout or cancelation has occured then any error
// is turned into a timeout or cancelation as appropriate.
@@ -610,6 +622,10 @@
// a timeout can lead to any other number of errors due to the underlying
// network connection being shutdown abruptly.
func (fc *flowClient) close(err error) error {
+ fc.removeStat()
+ if err == nil {
+ return nil
+ }
subErr := verror.SubErr{Err: err, Options: verror.Print}
subErr.Name = "remote=" + fc.flow.RemoteEndpoint().String()
if cerr := fc.flow.Close(); cerr != nil && err == nil {
@@ -866,6 +882,7 @@
return fc.close(berr)
}
}
+ fc.close(nil)
return nil
}
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index 6b8566a..8d4a283 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -78,7 +78,8 @@
isLeaf bool
lameDuckTimeout time.Duration // the time to wait for inflight operations to finish on shutdown
- stats *rpcStats // stats for this server.
+ stats *rpcStats // stats for this server.
+ outstanding *outstandingStats
}
func WithNewServer(ctx *context.T,
@@ -123,6 +124,7 @@
endpoints: make(map[string]naming.Endpoint),
lameDuckTimeout: 5 * time.Second,
closed: make(chan struct{}),
+ outstanding: newOutstandingStats(naming.Join("rpc", "server", "outstanding", rid.String())),
}
channelTimeout := time.Duration(0)
connIdleExpiry := time.Duration(0)
@@ -371,7 +373,7 @@
if len(addr.Address) > 0 {
ch, err := s.flowMgr.Listen(ctx, addr.Protocol, addr.Address)
if err != nil {
- s.ctx.Errorf("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, err)
+ s.ctx.VI(1).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, err)
}
s.active.Add(1)
go s.relisten(lctx, addr.Protocol, addr.Address, ch, err)
@@ -410,7 +412,7 @@
}
}
if ch, err = s.flowMgr.Listen(ctx, protocol, address); err != nil {
- s.ctx.Errorf("Listen(%q, %q, ...) failed: %v", protocol, address, err)
+ s.ctx.VI(1).Infof("Listen(%q, %q, ...) failed: %v", protocol, address, err)
}
}
}
@@ -607,6 +609,7 @@
discharges map[string]security.Discharge
starttime time.Time
endStreamArgs bool // are the stream args at EOF?
+ removeStat func()
}
var (
@@ -659,6 +662,11 @@
func (fs *flowServer) serve() error {
defer fs.flow.Close()
+ defer func() {
+ if fs.removeStat != nil {
+ fs.removeStat()
+ }
+ }()
ctx, results, err := fs.processRequest()
vtrace.GetSpan(ctx).Finish()
@@ -745,6 +753,7 @@
ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("\"%s\".UNKNOWN", fs.suffix))
return ctx, nil, err
}
+ fs.removeStat = fs.server.outstanding.start(req.Method, fs.flow.RemoteEndpoint())
// Start building up a new context for the request now that we know
// the header information.
diff --git a/runtime/internal/rpc/stats.go b/runtime/internal/rpc/stats.go
index 640618a..524bdc9 100644
--- a/runtime/internal/rpc/stats.go
+++ b/runtime/internal/rpc/stats.go
@@ -5,6 +5,9 @@
package rpc
import (
+ "bytes"
+ "fmt"
+ "sort"
"sync"
"time"
@@ -15,6 +18,79 @@
"v.io/v23/naming"
)
+type outstandingCall struct {
+ remote naming.Endpoint
+ method string
+ when time.Time
+}
+
+type outstandingCalls []*outstandingCall
+
+func (oc outstandingCalls) Less(i, j int) bool {
+ return oc[i].when.Before(oc[j].when)
+}
+func (oc outstandingCalls) Swap(i, j int) {
+ oc[i], oc[j] = oc[j], oc[i]
+}
+func (oc outstandingCalls) Len() int {
+ return len(oc)
+}
+
+type outstandingStats struct {
+ prefix string
+ mu sync.Mutex
+ outstanding map[*outstandingCall]bool
+}
+
+func newOutstandingStats(prefix string) *outstandingStats {
+ o := &outstandingStats{
+ prefix: prefix,
+ outstanding: make(map[*outstandingCall]bool),
+ }
+ stats.NewStringFunc(prefix, o.String)
+ return o
+}
+
+func (o *outstandingStats) String() string {
+ defer o.mu.Unlock()
+ o.mu.Lock()
+ if len(o.outstanding) == 0 {
+ return "No outstanding calls."
+ }
+ calls := make(outstandingCalls, 0, len(o.outstanding))
+ for o := range o.outstanding {
+ calls = append(calls, o)
+ }
+ sort.Sort(calls)
+ now := time.Now()
+ buf := &bytes.Buffer{}
+ for _, o := range calls {
+ fmt.Fprintf(buf, "%s age:%v from:%v\n", o.method, now.Sub(o.when), o.remote)
+ }
+ return buf.String()
+}
+
+func (o *outstandingStats) close() {
+ stats.Delete(o.prefix)
+}
+
+func (o *outstandingStats) start(method string, remote naming.Endpoint) func() {
+ o.mu.Lock()
+ nw := &outstandingCall{
+ method: method,
+ remote: remote,
+ when: time.Now(),
+ }
+ o.outstanding[nw] = true
+ o.mu.Unlock()
+
+ return func() {
+ o.mu.Lock()
+ delete(o.outstanding, nw)
+ o.mu.Unlock()
+ }
+}
+
type rpcStats struct {
mu sync.RWMutex
prefix string
diff --git a/services/debug/debug/browseserver/sbtree/colltree.go b/services/debug/debug/browseserver/sbtree/colltree.go
index 561ae5d..0e40812 100644
--- a/services/debug/debug/browseserver/sbtree/colltree.go
+++ b/services/debug/debug/browseserver/sbtree/colltree.go
@@ -10,6 +10,7 @@
"v.io/v23/context"
wire "v.io/v23/services/syncbase"
"v.io/v23/syncbase"
+ "v.io/v23/vdl"
)
// CollectionTree has all the data for the collection page of the Syncbase debug
@@ -100,12 +101,14 @@
if state == gathering {
// Grab the value, put it and the key into a KeyVal, and
// add it to the page.
- var value interface{}
- err := stream.Value(&value)
- if err != nil {
- value = fmt.Sprintf("ERROR getting value: %v", err)
+ kv := keyVal{Index: rowCount, Key: key}
+ var value *vdl.Value
+ if err := stream.Value(&value); err != nil {
+ kv.Value = fmt.Sprintf("ERROR getting value: %v", err)
+ } else {
+ kv.Value = value
}
- page.KeyVals = append(page.KeyVals, keyVal{rowCount, key, value})
+ page.KeyVals = append(page.KeyVals, kv)
}
rowCount++
}
diff --git a/services/debug/debug/browseserver/sbtree/colltree_test.go b/services/debug/debug/browseserver/sbtree/colltree_test.go
index 3c1bd8a..55c8793 100644
--- a/services/debug/debug/browseserver/sbtree/colltree_test.go
+++ b/services/debug/debug/browseserver/sbtree/colltree_test.go
@@ -9,6 +9,7 @@
"testing"
"v.io/v23/syncbase"
+ "v.io/v23/vdl"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/services/debug/debug/browseserver/sbtree"
tu "v.io/x/ref/services/syncbase/testutil"
@@ -112,9 +113,9 @@
}
}
for i, want := range []interface{}{complex(11, 22), int64(9999), 'x', "something"} {
- if got.KeysPage.KeyVals[i].Value != want {
+ if val := vdl.ValueOf(want); !reflect.DeepEqual(got.KeysPage.KeyVals[i].Value, val) {
t.Errorf("got %v of type %T, want %v of type %T",
- got.KeysPage.KeyVals[i].Value, got.KeysPage.KeyVals[i].Value, want, want)
+ got.KeysPage.KeyVals[i].Value, got.KeysPage.KeyVals[i].Value, val, val)
}
}
@@ -339,12 +340,12 @@
t.Errorf("Wanted 1 keys, got %v (length %d)",
got.KeysPage.KeyVals, len(got.KeysPage.KeyVals))
}
- value, ok := got.KeysPage.KeyVals[0].Value.(someCustomType)
- if !ok {
+ value := got.KeysPage.KeyVals[0].Value.(*vdl.Value)
+ if value.Type() != vdl.TypeOf(someCustomType{}) {
t.Fatalf("Got %v of type %T, want of type someCustomType",
- got.KeysPage.KeyVals[0].Value, got.KeysPage.KeyVals[0].Value)
+ value, value.Type)
}
- want := someCustomType{"something", 'x', childType{9999, complex(11, 22)}}
+ want := vdl.ValueOf(someCustomType{"something", 'x', childType{9999, complex(11, 22)}})
if !reflect.DeepEqual(value, want) {
t.Errorf("Got %v, want %v", value, want)
}
diff --git a/services/syncbase/vsync/peer_manager.go b/services/syncbase/vsync/peer_manager.go
index 401ea06..a8fcea5 100644
--- a/services/syncbase/vsync/peer_manager.go
+++ b/services/syncbase/vsync/peer_manager.go
@@ -5,6 +5,8 @@
package vsync
import (
+ "bytes"
+ "fmt"
"sync"
"time"
@@ -14,6 +16,7 @@
"v.io/v23/verror"
"v.io/x/lib/set"
"v.io/x/lib/vlog"
+ "v.io/x/ref/lib/stats"
"v.io/x/ref/services/syncbase/common"
"v.io/x/ref/services/syncbase/ping"
"v.io/x/ref/services/syncbase/server/interfaces"
@@ -48,6 +51,8 @@
// updatePeerFromResponder updates information for a peer that the
// responder responds to.
updatePeerFromResponder(ctx *context.T, peer string, connTs time.Time, gvs interfaces.Knowledge) error
+
+ exportStats(prefix string)
}
////////////////////////////////////////
@@ -121,6 +126,9 @@
// Once pinned.Unpin() is called, the connection will no longer be pinned in
// rpc cache, and healthCheck will return to the rpc default health check interval.
pinned flow.PinnedConn
+
+ // addedTime is the time at which the connection was put into the peer cache.
+ addedTime time.Time
}
type peerManagerImpl struct {
@@ -152,6 +160,30 @@
}
}
+func (pm *peerManagerImpl) exportStats(prefix string) {
+ stats.NewStringFunc(naming.Join(prefix, "peers"), pm.debugStringForPeers)
+}
+
+func (pm *peerManagerImpl) debugStringForPeers() string {
+ pm.Lock()
+ defer pm.Unlock()
+ buf := &bytes.Buffer{}
+ for _, c := range pm.healthyPeerCache {
+ fmt.Fprintf(buf, "%v\n", c.debugString())
+ fmt.Fprintln(buf)
+ }
+ return buf.String()
+}
+
+func (c *connInfo) debugString() string {
+ buf := &bytes.Buffer{}
+ fmt.Fprintf(buf, "RELNAME: %v\n", c.relName)
+ fmt.Fprintf(buf, "MTTBLS: %v\n", c.mtTbls)
+ fmt.Fprintf(buf, "ADDRS: %v\n", c.addrs)
+ fmt.Fprintf(buf, "ADDEDTIME: %v\n", c.addedTime)
+ return buf.String()
+}
+
func (pm *peerManagerImpl) managePeers(ctx *context.T) {
defer pm.s.pending.Done()
@@ -218,7 +250,9 @@
// neighborhood peers until the cache entries expire.
}
+ now := time.Now()
for _, p := range peers {
+ p.addedTime = now
pm.healthyPeerCache[p.relName] = p
}
}
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 8fc5213..4b45d4c 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -29,6 +29,7 @@
"v.io/v23/verror"
"v.io/x/lib/vlog"
idiscovery "v.io/x/ref/lib/discovery"
+ "v.io/x/ref/lib/stats"
"v.io/x/ref/services/syncbase/common"
syncdis "v.io/x/ref/services/syncbase/discovery"
blob "v.io/x/ref/services/syncbase/localblobstore"
@@ -124,6 +125,9 @@
// Peer manager for managing peers to sync with.
pm peerManager
+
+ // Naming prefix at which debugging information is exported.
+ statPrefix string
}
// syncDatabase contains the metadata for syncing a database. This struct is
@@ -173,7 +177,9 @@
discovery: discovery,
publishInNh: publishInNh,
advSyncgroups: make(map[interfaces.GroupId]syncAdvertisementState),
+ statPrefix: syncServiceStatName(),
}
+ s.exportStats()
data := &SyncData{}
if err := store.RunInTransaction(sv.St(), func(tx store.Transaction) error {
@@ -217,6 +223,7 @@
// Start the peer manager thread to maintain peers viable for syncing.
go s.pm.managePeers(ctx)
+ s.pm.exportStats(naming.Join("syncbase", s.name))
// Start initiator thread to periodically get deltas from peers. The
// initiator threads consults the peer manager to pick peers to sync
@@ -240,6 +247,7 @@
close(s.closed)
s.pending.Wait()
s.bst.Close()
+ stats.Delete(s.statPrefix)
}
func NewSyncDatabase(db interfaces.Database) *syncDatabase {
@@ -294,7 +302,7 @@
s.discoveryLock.Lock()
defer s.discoveryLock.Unlock()
- vlog.VI(3).Info("sync: updateDiscoveryInfo: %s: %+v, %p, current discoverySyncgroups is %+v", id, ad, ad, s.discoverySyncgroups)
+ vlog.VI(3).Infof("sync: updateDiscoveryInfo: %s: %+v, %p, current discoverySyncgroups is %+v", id, ad, ad, s.discoverySyncgroups)
// The first time around initialize all discovery maps.
if s.discoveryIds == nil {
@@ -616,3 +624,44 @@
func (s *syncService) stKey() string {
return common.SyncPrefix
}
+
+var (
+ statMu sync.Mutex
+ statIdx int
+)
+
+func syncServiceStatName() string {
+ statMu.Lock()
+ ret := naming.Join("syncbase", "vsync", fmt.Sprint(statIdx))
+ statIdx++
+ statMu.Unlock()
+ return ret
+}
+
+func (s *syncService) exportStats() {
+ stats.NewStringFunc(s.statPrefix, func() string {
+ s.discoveryLock.Lock()
+ defer s.discoveryLock.Unlock()
+ return fmt.Sprintf(`
+Peers: %v
+Ads: %v
+Syncgroups: %v
+`, adMapKeys(s.discoveryPeers), adMapKeys(s.discoveryIds), groupMapKeys(s.discoverySyncgroups))
+ })
+}
+
+func adMapKeys(m map[string]*discovery.Advertisement) []string {
+ var ret []string
+ for k, v := range m {
+ ret = append(ret, fmt.Sprintf("%v: %v\n", k, *v))
+ }
+ return ret
+}
+
+func groupMapKeys(m map[interfaces.GroupId]map[string]*discovery.Advertisement) []string {
+ var ret []string
+ for k := range m {
+ ret = append(ret, fmt.Sprint(k))
+ }
+ return ret
+}