blob: 3518d911895068e80fcc85ab36b78f56c1c2e5bc [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Measured methods for configuration change watching.
package measure
import (
"fmt"
"v.io/v23/context"
"v.io/v23/syncbase/nosql"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbmodel/keyutil"
)
// RegisterWorker is called with the key and value of a new or modified
// StreamDef. Thread safety is not required.
type RegisterWorker func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error
// WatchForStreams watches Syncbase for existing, new, and modified stream
// definitions for the specified measuring device and calls the register
// callback.
// If a malformed StreamDef key or value is encountered, or register returns
// an error, measured exits with an error.
// WatchForStreams is synchronous and runs until the context is cancelled or
// an error is encountered.
func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error {
tableName := sbmodel.KStreamDef{}.Table()
watchPrefix := keyutil.Join(devId, "")
// Watch for StreamDef changes and register samplers as needed.
ws, err := db.Watch(ctx, tableName, watchPrefix, nil)
if err != nil {
return err
}
defer ws.Cancel()
for ws.Advance() {
c := ws.Change()
key := &sbmodel.KStreamDef{}
if err := key.Parse(c.Row); err != nil {
return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err)
}
switch c.ChangeType {
case nosql.PutChange:
val := &sbmodel.VStreamDef{}
if err := c.Value(val); err != nil {
return fmt.Errorf("invalid StreamDef value for key %s: %v", c.Row, err)
}
if err := register(key, val); err != nil {
return err
}
case nosql.DeleteChange:
return fmt.Errorf("StreamDef delete is not supported")
}
}
return ws.Err()
}