blob: 53e17d42ea3773f0c4d38ce37da5a0f1f468f024 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Package statslib implements the Stats interface from
// v.io/v23/services/stats.
package statslib
import (
"time"
"v.io/v23/context"
"v.io/v23/glob"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/services/stats"
"v.io/v23/services/watch"
"v.io/v23/verror"
"v.io/v23/vom"
libstats "v.io/x/ref/lib/stats"
)
type statsService struct {
suffix string
watchFreq time.Duration
}
const pkgPath = "v.io/x/ref/services/internal/statslib"
var (
errOperationFailed = verror.Register(pkgPath+".errOperationFailed", verror.NoRetry, "{1:}{2:} operation failed{:_}")
)
// NewStatsService returns a stats server implementation. The value of watchFreq
// is used to specify the time between WatchGlob updates.
func NewStatsService(suffix string, watchFreq time.Duration) interface{} {
return stats.StatsServer(&statsService{suffix, watchFreq})
}
// Glob__ returns the name of all objects that match pattern.
func (i *statsService) Glob__(ctx *context.T, call rpc.GlobServerCall, g *glob.Glob) error {
ctx.VI(1).Infof("%v.Glob__(%q)", i.suffix, g.String())
sender := call.SendStream()
it := libstats.Glob(i.suffix, g.String(), time.Time{}, false)
for it.Advance() {
sender.Send(naming.GlobReplyEntry{Value: naming.MountEntry{Name: it.Value().Key}})
}
if err := it.Err(); err != nil {
ctx.VI(1).Infof("libstats.Glob(%q, %q) failed: %v", i.suffix, g.String(), err)
}
return nil
}
// WatchGlob returns the name and value of the objects that match the request,
// followed by periodic updates when values change.
func (i *statsService) WatchGlob(ctx *context.T, call watch.GlobWatcherWatchGlobServerCall, req watch.GlobRequest) error {
ctx.VI(1).Infof("%v.WatchGlob(%+v)", i.suffix, req)
var t time.Time
Loop:
for {
prevTime := t
t = time.Now()
it := libstats.Glob(i.suffix, req.Pattern, prevTime, true)
changes := []watch.Change{}
for it.Advance() {
v := it.Value()
c := watch.Change{
Name: v.Key,
State: watch.Exists,
Value: vom.RawBytesOf(v.Value),
}
changes = append(changes, c)
}
if err := it.Err(); err != nil {
if verror.ErrorID(err) == verror.ErrNoExist.ID {
return verror.New(verror.ErrNoExist, ctx, i.suffix)
}
return verror.New(errOperationFailed, ctx, i.suffix)
}
for _, change := range changes {
if err := call.SendStream().Send(change); err != nil {
return err
}
}
select {
case <-ctx.Done():
break Loop
case <-time.After(i.watchFreq):
}
}
return nil
}
// Value returns the value of the receiver object.
func (i *statsService) Value(ctx *context.T, _ rpc.ServerCall) (*vom.RawBytes, error) {
ctx.VI(1).Infof("%v.Value()", i.suffix)
rv, err := libstats.Value(i.suffix)
switch {
case verror.ErrorID(err) == verror.ErrNoExist.ID:
return nil, verror.New(verror.ErrNoExist, ctx, i.suffix)
case verror.ErrorID(err) == stats.ErrNoValue.ID:
return nil, stats.NewErrNoValue(ctx, i.suffix)
case err != nil:
return nil, verror.New(errOperationFailed, ctx, i.suffix)
}
rb, err := vom.RawBytesFromValue(rv)
if err != nil {
return nil, verror.New(verror.ErrInternal, ctx, i.suffix, err)
}
return rb, nil
}