| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| // Client methods for stream configuration. |
| |
| package client |
| |
| import ( |
| "fmt" |
| "strings" |
| "time" |
| |
| "v.io/v23/context" |
| wire "v.io/v23/services/syncbase" |
| "v.io/v23/syncbase" |
| "v.io/v23/verror" |
| "v.io/x/sensorlog/internal/config" |
| "v.io/x/sensorlog/internal/sbmodel" |
| "v.io/x/sensorlog/internal/sbmodel/keyutil" |
| ) |
| |
| // CreateStream writes the configuration for a new sampling stream to Syncbase. |
| // The target device must have been configured. streamId must be unique for |
| // the target device. |
| // TODO(ivanpi): Make start time configurable. |
| func CreateStream(ctx *context.T, db syncbase.Database, devKey *sbmodel.KDeviceCfg, streamId string, script string, interval time.Duration, desc string) (*sbmodel.KStreamDef, error) { |
| if err := keyutil.ValidateId(streamId); err != nil { |
| return nil, fmt.Errorf("invalid streamId: %v", err) |
| } |
| if strings.TrimSpace(script) == "" { |
| return nil, fmt.Errorf("sampling script cannot be empty") |
| } |
| if interval.Nanoseconds() < config.MinSamplingInterval.Nanoseconds() { |
| return nil, fmt.Errorf("sampling interval cannot be smaller than %v", config.MinSamplingInterval) |
| } |
| stmKey := sbmodel.KStreamDef{ |
| DevId: devKey.DevId, |
| StreamId: streamId, |
| } |
| if desc == "" { |
| desc = "stream:" + stmKey.Key() |
| } |
| stmVal := sbmodel.VStreamDef{ |
| Desc: desc, |
| Sampler: sbmodel.SamplerDef{ |
| Script: script, |
| Start: time.Now(), |
| Interval: interval, |
| }, |
| Enabled: true, |
| } |
| |
| if err := syncbase.RunInBatch(ctx, db, wire.BatchOptions{}, func(db syncbase.BatchDatabase) error { |
| devRow := db.Collection(devKey.Collection()).Row(devKey.Key()) |
| stmRow := db.Collection(stmKey.Collection()).Row(stmKey.Key()) |
| |
| if exists, err := devRow.Exists(ctx); err != nil { |
| return err |
| } else if !exists { |
| return verror.New(verror.ErrNoExist, ctx, "Device '"+devKey.Key()+"' does not exist") |
| } |
| |
| if exists, err := stmRow.Exists(ctx); err != nil { |
| return err |
| } else if exists { |
| return verror.New(verror.ErrExist, ctx, "Stream '"+stmKey.Key()+"' already exists") |
| } |
| |
| return stmRow.Put(ctx, stmVal) |
| }); err != nil { |
| return nil, err |
| } |
| |
| return &stmKey, nil |
| } |
| |
| // TODO(ivanpi): Implement pausing and resuming streams. |