blob: 524bdc9e2f8f5b559f00a77689ff35e7243f5818 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package rpc
import (
"bytes"
"fmt"
"sort"
"sync"
"time"
"v.io/x/ref/lib/stats"
"v.io/x/ref/lib/stats/counter"
"v.io/x/ref/lib/stats/histogram"
"v.io/v23/naming"
)
// outstandingCall describes one call that is currently being tracked by
// outstandingStats. In this file its fields are set once, when start creates
// the value, and are only read afterwards.
type outstandingCall struct {
// remote is the endpoint that was passed to start for this call.
remote naming.Endpoint
// method is the method name that was passed to start for this call.
method string
// when is the time at which start registered the call.
when time.Time
}
// outstandingCalls is a slice of tracked calls that implements sort.Interface,
// ordering the calls by registration time, oldest first.
type outstandingCalls []*outstandingCall

// Len reports the number of calls in the slice.
func (calls outstandingCalls) Len() int {
	return len(calls)
}

// Less reports whether the call at index i was registered strictly before the
// call at index j.
func (calls outstandingCalls) Less(i, j int) bool {
	a, b := calls[i], calls[j]
	return a.when.Before(b.when)
}

// Swap exchanges the calls at indices i and j.
func (calls outstandingCalls) Swap(i, j int) {
	tmp := calls[i]
	calls[i] = calls[j]
	calls[j] = tmp
}
// outstandingStats tracks the set of calls that have been started but not yet
// finished, and renders that set as text through its String method.
type outstandingStats struct {
// prefix is the stats name under which String is registered by
// newOutstandingStats and which close passes to stats.Delete.
prefix string
// mu guards outstanding.
mu sync.Mutex
// outstanding has one key per in-progress call; start adds the key and the
// function returned by start deletes it. The bool value is always true.
outstanding map[*outstandingCall]bool
}
// newOutstandingStats returns an empty call tracker and registers its String
// method with the stats package under the name prefix.
func newOutstandingStats(prefix string) *outstandingStats {
	tracker := new(outstandingStats)
	tracker.prefix = prefix
	tracker.outstanding = map[*outstandingCall]bool{}
	stats.NewStringFunc(prefix, tracker.String)
	return tracker
}
// String renders the currently outstanding calls as text, one call per line in
// the form "<method> age:<elapsed> from:<remote>", ordered from the oldest call
// to the newest. It returns "No outstanding calls." when nothing is tracked.
//
// The mutex is held only while the tracked calls are copied into a slice.
// Sorting and formatting, which invokes the remote endpoint's own formatting,
// run on that snapshot so that start and the completion functions it returns
// are not blocked while the report is built. Reading the snapshot without the
// lock relies on outstandingCall values not being modified after start creates
// them, which holds for the code in this file.
func (o *outstandingStats) String() string {
	o.mu.Lock()
	calls := make(outstandingCalls, 0, len(o.outstanding))
	for call := range o.outstanding {
		calls = append(calls, call)
	}
	o.mu.Unlock()

	if len(calls) == 0 {
		return "No outstanding calls."
	}

	// Map iteration order is random; sort so the oldest call is listed first.
	sort.Sort(calls)

	// Use a single timestamp so every age is measured against the same instant.
	now := time.Now()
	var buf bytes.Buffer
	for _, call := range calls {
		fmt.Fprintf(&buf, "%s age:%v from:%v\n", call.method, now.Sub(call.when), call.remote)
	}
	return buf.String()
}
// close calls stats.Delete on the name this tracker was registered under.
func (o *outstandingStats) close() {
	name := o.prefix
	stats.Delete(name)
}
// start records a new outstanding call for the given method and remote
// endpoint, timestamped with the current time. The returned function removes
// that call from the set of outstanding calls again.
func (o *outstandingStats) start(method string, remote naming.Endpoint) func() {
	o.mu.Lock()
	call := &outstandingCall{method: method, remote: remote, when: time.Now()}
	o.outstanding[call] = true
	o.mu.Unlock()

	finish := func() {
		o.mu.Lock()
		defer o.mu.Unlock()
		delete(o.outstanding, call)
	}
	return finish
}
// rpcStats holds the per-method latency histograms and the blessings cache
// counters whose stats names are built from prefix.
type rpcStats struct {
// mu guards methods: record reads the map under the read lock and
// newPerMethodStats inserts into it under the write lock.
mu sync.RWMutex
// prefix is the stats name prefix used when creating the stats objects and
// passed to stats.Delete by stop.
prefix string
// methods maps a method name to its statistics; entries are created the
// first time record sees the method.
methods map[string]*perMethodStats
// blessingsCacheStats holds the blessings cache attempt and hit counters.
blessingsCacheStats *blessingsCacheStats
}
// newRPCStats returns an rpcStats for the given stats name prefix, with an
// empty per-method table and freshly created blessings cache counters.
func newRPCStats(prefix string) *rpcStats {
	s := &rpcStats{prefix: prefix}
	s.methods = map[string]*perMethodStats{}
	s.blessingsCacheStats = newBlessingsCacheStats(prefix)
	return s
}
// perMethodStats holds the statistics kept for a single method name.
type perMethodStats struct {
// latency is the histogram that record feeds with call latencies expressed
// in whole milliseconds.
latency *histogram.Histogram
}
// stop calls stats.Delete on the prefix these RPC stats were created with.
func (s *rpcStats) stop() {
	name := s.prefix
	stats.Delete(name)
}
// record adds latency, truncated to whole milliseconds, to the latency
// histogram of the given method, creating that histogram on first use.
func (s *rpcStats) record(method string, latency time.Duration) {
	// Fast path: the method has usually been seen before, so a shared read
	// lock is enough to find its stats. Only when the entry is missing do we
	// fall back to newPerMethodStats, which takes the write lock and
	// re-checks the map before creating anything.
	s.mu.RLock()
	perMethod, found := s.methods[method]
	s.mu.RUnlock()

	if !found {
		perMethod = s.newPerMethodStats(method)
	}

	millis := int64(latency / time.Millisecond)
	perMethod.latency.Add(millis)
}
// recordBlessingCache forwards one blessings cache lookup, and whether it was
// a hit, to the blessings cache counters.
func (s *rpcStats) recordBlessingCache(hit bool) {
	cache := s.blessingsCacheStats
	cache.incr(hit)
}
// newPerMethodStats returns the perMethodStats for method, creating and
// storing it first if the method has no entry yet. The lookup is repeated
// under the write lock so that concurrent callers for the same method end up
// sharing a single object.
func (s *rpcStats) newPerMethodStats(method string) *perMethodStats {
	s.mu.Lock()
	defer s.mu.Unlock()

	if existing, ok := s.methods[method]; ok {
		return existing
	}

	opts := histogram.Options{
		NumBuckets:         25,
		GrowthFactor:       1,
		SmallestBucketSize: 1,
		MinValue:           0,
	}
	name := naming.Join(s.prefix, "methods", method, "latency-ms")
	created := &perMethodStats{
		latency: stats.NewHistogram(name, opts),
	}
	s.methods[method] = created
	return created
}
// blessingsCacheStats keeps the number of blessings cache hits and the total
// number of calls received, so that the two can be compared to determine how
// often the blessings cache is being used.
type blessingsCacheStats struct {
// callsReceived is incremented on every call to incr; cacheHits is
// incremented only when incr is called with hit set to true.
callsReceived, cacheHits *counter.Counter
}
// newBlessingsCacheStats creates the two blessings cache counters, named
// "attempts" and "hits", under <prefix>/security/blessings/cache.
func newBlessingsCacheStats(prefix string) *blessingsCacheStats {
	base := naming.Join(prefix, "security", "blessings", "cache")
	attempts := stats.NewCounter(naming.Join(base, "attempts"))
	hits := stats.NewCounter(naming.Join(base, "hits"))
	return &blessingsCacheStats{
		callsReceived: attempts,
		cacheHits:     hits,
	}
}
// incr counts one blessings cache lookup: the attempts counter is always
// incremented, and the hits counter is incremented as well when hit is true.
func (s *blessingsCacheStats) incr(hit bool) {
	s.callsReceived.Incr(1)
	if !hit {
		return
	}
	s.cacheHits.Incr(1)
}