blob: c4d227750f391340b27b872f98069aca552b333f [file] [log] [blame]
// Package impl implements the Stats interface from
// veyron2/services/mgmt/stats.
package impl
import (
"time"
libstats "v.io/core/veyron/lib/stats"
"v.io/core/veyron2/ipc"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/services/mgmt/stats"
"v.io/core/veyron2/services/mgmt/stats/types"
"v.io/core/veyron2/services/watch"
watchtypes "v.io/core/veyron2/services/watch/types"
"v.io/core/veyron2/vdl"
verror "v.io/core/veyron2/verror2"
"v.io/core/veyron2/vlog"
)
type statsService struct {
suffix string
watchFreq time.Duration
}
const pkgPath = "v.io/core/veyron/services/mgmt/stats/impl"
var (
errNoValue = verror.Register(types.NoValue, verror.NoRetry, "{1:}{2:} object has no value{:_}")
errOperationFailed = verror.Register(pkgPath+".errOperationFailed", verror.NoRetry, "{1:}{2:} operation failed{:_}")
)
// NewStatsService returns a stats server implementation. The value of watchFreq
// is used to specify the time between WatchGlob updates.
func NewStatsService(suffix string, watchFreq time.Duration) interface{} {
return stats.StatsServer(&statsService{suffix, watchFreq})
}
// Glob__ returns the name of all objects that match pattern.
func (i *statsService) Glob__(ctx ipc.ServerContext, pattern string) (<-chan naming.VDLMountEntry, error) {
vlog.VI(1).Infof("%v.Glob__(%q)", i.suffix, pattern)
ch := make(chan naming.VDLMountEntry)
go func() {
defer close(ch)
it := libstats.Glob(i.suffix, pattern, time.Time{}, false)
for it.Advance() {
ch <- naming.VDLMountEntry{Name: it.Value().Key}
}
if err := it.Err(); err != nil {
vlog.VI(1).Infof("libstats.Glob(%q, %q) failed: %v", i.suffix, pattern, err)
}
}()
return ch, nil
}
// WatchGlob returns the name and value of the objects that match the request,
// followed by periodic updates when values change.
func (i *statsService) WatchGlob(ctx watch.GlobWatcherWatchGlobContext, req watchtypes.GlobRequest) error {
vlog.VI(1).Infof("%v.WatchGlob(%+v)", i.suffix, req)
var t time.Time
Loop:
for {
prevTime := t
t = time.Now()
it := libstats.Glob(i.suffix, req.Pattern, prevTime, true)
changes := []watchtypes.Change{}
for it.Advance() {
v := it.Value()
c := watchtypes.Change{
Name: v.Key,
State: watchtypes.Exists,
Value: v.Value,
}
changes = append(changes, c)
}
if err := it.Err(); err != nil {
if err == libstats.ErrNotFound {
return verror.Make(verror.NoExist, ctx.Context(), i.suffix)
}
return verror.Make(errOperationFailed, ctx.Context(), i.suffix)
}
for _, change := range changes {
if err := ctx.SendStream().Send(change); err != nil {
return err
}
}
select {
case <-ctx.Context().Done():
break Loop
case <-time.After(i.watchFreq):
}
}
return nil
}
// Value returns the value of the receiver object.
func (i *statsService) Value(ctx ipc.ServerContext) (vdl.AnyRep, error) {
vlog.VI(1).Infof("%v.Value()", i.suffix)
v, err := libstats.Value(i.suffix)
switch err {
case libstats.ErrNotFound:
return nil, verror.Make(verror.NoExist, ctx.Context(), i.suffix)
case libstats.ErrNoValue:
return nil, verror.Make(errNoValue, ctx.Context(), i.suffix)
case nil:
return v, nil
default:
return nil, verror.Make(errOperationFailed, ctx.Context(), i.suffix)
}
}