// blob: bf59a108400c221b1a8ea6a6ad3ffb34c1f6609b [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Measured methods for configuration change watching.
package measure
import (
"fmt"
"v.io/v23/context"
wire "v.io/v23/services/syncbase"
"v.io/v23/syncbase"
"v.io/v23/syncbase/util"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbmodel/keyutil"
)
// RegisterWorker is the callback type that WatchForStreams invokes with the
// parsed key and decoded value of each new or modified StreamDef it observes.
// WatchForStreams calls it synchronously from its single watch loop, so the
// implementation is not required to be thread safe. A non-nil error returned
// by the callback makes WatchForStreams stop and return that error.
type RegisterWorker func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error
// WatchForStreams subscribes to Syncbase changes for the stream definitions
// of the measuring device identified by devId and hands every existing, new,
// or modified StreamDef to the register callback.
//
// The call is synchronous: it keeps consuming the watch stream until the
// stream stops advancing (for example because the context was cancelled) and
// then returns the stream's error. It returns early with an error when:
//   - a watched row key cannot be parsed as a StreamDef key,
//   - a put change carries a value that cannot be decoded as a StreamDef,
//   - the register callback returns an error, or
//   - a StreamDef row is deleted, which is not supported.
//
// Changes to entities other than rows, and row changes that are neither puts
// nor deletes, are skipped.
func WatchForStreams(ctx *context.T, db syncbase.Database, devId string, register RegisterWorker) error {
	streamDefs := db.Collection(ctx, sbmodel.KStreamDef{}.Collection())
	// Joining with an empty component yields the prefix shared by all of this
	// device's StreamDef keys.
	prefix := keyutil.Join(devId, "")

	// Only rows of the StreamDef collection under the device prefix are watched.
	patterns := []wire.CollectionRowPattern{
		util.RowPrefixPattern(streamDefs.Id(), prefix),
	}
	stream := db.Watch(ctx, nil, patterns)
	defer stream.Cancel()

	for stream.Advance() {
		change := stream.Change()
		if change.EntityType != syncbase.EntityRow {
			continue
		}

		// The key is validated for every row change, including deletes.
		var key sbmodel.KStreamDef
		if err := key.Parse(change.Row); err != nil {
			return fmt.Errorf("invalid StreamDef key for prefix %s: %v", prefix, err)
		}

		if change.ChangeType == syncbase.DeleteChange {
			return fmt.Errorf("StreamDef delete is not supported")
		}
		if change.ChangeType != syncbase.PutChange {
			continue
		}

		// Put: decode the new definition and pass it to the callback.
		var val sbmodel.VStreamDef
		if err := change.Value(&val); err != nil {
			return fmt.Errorf("invalid StreamDef value for key %s: %v", change.Row, err)
		}
		if err := register(&key, &val); err != nil {
			return err
		}
	}

	return stream.Err()
}