| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Measured methods for configuration change watching. |
| |
| package measure |
| |
| import ( |
| "fmt" |
| |
| "v.io/v23/context" |
| nosql_wire "v.io/v23/services/syncbase/nosql" |
| "v.io/v23/services/watch" |
| "v.io/v23/syncbase/nosql" |
| "v.io/x/sensorlog/internal/sbmodel" |
| "v.io/x/sensorlog/internal/sbmodel/keyutil" |
| ) |
| |
| // RegisterWorker is called with the key and value of a new or modified |
| // StreamDef. Thread safety is not required. |
| type RegisterWorker func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error |
| |
| // WatchForStreams watches Syncbase for new and modified stream definitions |
| // for the specified measuring device and calls the register callback. |
| // If a malformed StreamDef key or value is encountered, or register returns |
| // an error, measured exits with an error. |
| // WatchForStreams is synchronous and runs until the context is cancelled or |
| // an error is encountered. |
| func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error { |
| tableName := sbmodel.KStreamDef{}.Table() |
| watchPrefix := keyutil.Join(devId, "") |
| var resMark watch.ResumeMarker |
| |
| // BeginBatch scoped using function with deferred Abort. |
| if err := func() error { |
| bdb, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true}) |
| if err != nil { |
| return err |
| } |
| defer bdb.Abort(ctx) |
| |
| resMark, err = bdb.GetResumeMarker(ctx) |
| if err != nil { |
| return err |
| } |
| |
| // Register samplers for all existing StreamDefs. |
| sstr := bdb.Table(tableName).Scan(ctx, nosql.Prefix(watchPrefix)) |
| defer sstr.Cancel() |
| |
| for sstr.Advance() { |
| key := &sbmodel.KStreamDef{} |
| if err := key.Parse(sstr.Key()); err != nil { |
| return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err) |
| } |
| val := &sbmodel.VStreamDef{} |
| if err := sstr.Value(val); err != nil { |
| return fmt.Errorf("invalid StreamDef value for key %s: %v", sstr.Key(), err) |
| } |
| if err := register(key, val); err != nil { |
| return err |
| } |
| } |
| return sstr.Err() |
| }(); err != nil { |
| return err |
| } |
| |
| // Watch for StreamDef changes and register samplers as needed. |
| ws, err := db.Watch(ctx, tableName, watchPrefix, resMark) |
| if err != nil { |
| return err |
| } |
| defer ws.Cancel() |
| |
| for ws.Advance() { |
| c := ws.Change() |
| key := &sbmodel.KStreamDef{} |
| if err := key.Parse(c.Row); err != nil { |
| return fmt.Errorf("invalid StreamDef key for prefix %s: %v", watchPrefix, err) |
| } |
| switch c.ChangeType { |
| case nosql.PutChange: |
| val := &sbmodel.VStreamDef{} |
| if err := c.Value(val); err != nil { |
| return fmt.Errorf("invalid StreamDef value for key %s: %v", c.Row, err) |
| } |
| if err := register(key, val); err != nil { |
| return err |
| } |
| case nosql.DeleteChange: |
| return fmt.Errorf("StreamDef delete is not supported") |
| } |
| } |
| return ws.Err() |
| } |