blob: 33403084c0692377c95e18833588f530f087c789 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Measured methods for configuration change watching.
package measure
import (
"fmt"
"v.io/v23/context"
nosql_wire "v.io/v23/services/syncbase/nosql"
"v.io/v23/services/watch"
"v.io/v23/syncbase/nosql"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbmodel/keyutil"
)
// RegisterWorker is called with the key and value of a new or modified
// StreamDef. Thread safety is not required.
type RegisterWorker func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error
// WatchForStreams watches Syncbase for new and modified stream definitions
// for the specified measuring device and calls the register callback.
// If a malformed StreamDef key or value is encountered, or register returns
// an error, measured exits with an error.
// WatchForStreams is synchronous and runs until the context is cancelled or
// an error is encountered.
func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error {
tableName := sbmodel.KStreamDef{}.Table()
watchPrefix := keyutil.Join(devId, "")
var resMark watch.ResumeMarker
// BeginBatch scoped using function with deferred Abort.
if err := func() error {
bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
if err != nil {
return err
}
defer bdb.Abort(ctx)
resMark, err = bdb.GetResumeMarker(ctx)
if err != nil {
return err
}
// Register samplers for all existing StreamDefs.
sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(watchPrefix))
defer sstr.Cancel()
for sstr.Advance() {
key := &sbmodel.KStreamDef{}
if err := key.Parse(sstr.Key()); err != nil {
return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
}
val := &sbmodel.VStreamDef{}
if err := sstr.Value(val); err != nil {
return fmt.Errorf("invalid StreamDef value for key %s: %v", sstr.Key(), err)
}
if err := register(key, val); err != nil {
return err
}
}
return sstr.Err()
}(); err != nil {
return err
}
// Watch for StreamDef changes and register samplers as needed.
ws, err := db.Watch(ctx, tableName, watchPrefix, resMark)
if err != nil {
return err
}
defer ws.Cancel()
for ws.Advance() {
c := ws.Change()
key := &sbmodel.KStreamDef{}
if err := key.Parse(c.Row); err != nil {
return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
}
switch c.ChangeType {
case nosql.PutChange:
val := &sbmodel.VStreamDef{}
if err := c.Value(val); err != nil {
return fmt.Errorf("invalid StreamDef value for key %s: %v", c.Row, err)
}
if err := register(key, val); err != nil {
return err
}
case nosql.DeleteChange:
return fmt.Errorf("StreamDef delete is not supported")
}
}
return ws.Err()
}