| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Measured methods for configuration change watching. |
| |
| package measure |
| |
| import ( |
| "fmt" |
| |
| "v.io/v23/context" |
| "v.io/v23/syncbase" |
| "v.io/x/sensorlog/internal/sbmodel" |
| "v.io/x/sensorlog/internal/sbmodel/keyutil" |
| ) |
| |
// RegisterWorker is the callback type that WatchForStreams invokes with the
// parsed key and decoded value of each new or modified StreamDef it observes.
// Calls are made one at a time from the watch loop, so an implementation is
// not required to be thread-safe. Returning a non-nil error makes
// WatchForStreams stop watching and return that error to its caller.
type RegisterWorker func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error
| |
| // WatchForStreams watches Syncbase for existing, new, and modified stream |
| // definitions for the specified measuring device and calls the register |
| // callback. |
| // If a malformed StreamDef key or value is encountered, or register returns |
| // an error, measured exits with an error. |
| // WatchForStreams is synchronous and runs until the context is cancelled or |
| // an error is encountered. |
| func WatchForStreams(ctx *context.T, db syncbase.Database, devId string, register RegisterWorker) error { |
| collectionId := sbmodel.CollectionId(&sbmodel.KStreamDef{}) |
| watchPrefix := keyutil.Join(devId, "") |
| |
| // Watch for StreamDef changes and register samplers as needed. |
| ws, err := db.Watch(ctx, collectionId, watchPrefix, nil) |
| if err != nil { |
| return err |
| } |
| defer ws.Cancel() |
| |
| for ws.Advance() { |
| c := ws.Change() |
| key := &sbmodel.KStreamDef{} |
| if err := key.Parse(c.Row); err != nil { |
| return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err) |
| } |
| switch c.ChangeType { |
| case syncbase.PutChange: |
| val := &sbmodel.VStreamDef{} |
| if err := c.Value(val); err != nil { |
| return fmt.Errorf("invalid StreamDef value for key %s: %v", c.Row, err) |
| } |
| if err := register(key, val); err != nil { |
| return err |
| } |
| case syncbase.DeleteChange: |
| return fmt.Errorf("StreamDef delete is not supported") |
| } |
| } |
| return ws.Err() |
| } |