blob: 7926019318d0d03573b6e6a74db5feb6eae5f519 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
// Client methods for stream configuration.
package client
import (
"fmt"
"strings"
"time"
"v.io/v23/context"
wire "v.io/v23/services/syncbase"
"v.io/v23/syncbase"
"v.io/v23/verror"
"v.io/x/sensorlog/internal/config"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbmodel/keyutil"
)
// CreateStream stores the definition of a new sampling stream for the device
// identified by devKey and returns the key of the stored stream definition.
//
// Arguments are checked before Syncbase is touched: streamId must pass
// keyutil.ValidateId, script must contain at least one non-whitespace
// character, and interval must not be smaller than config.MinSamplingInterval.
// An empty desc is replaced by "stream:" followed by the stream key. The
// stream is written as enabled, with its sampler start set to the time of the
// call.
//
// The device lookup, the stream lookup and the write all happen inside a
// single syncbase.RunInBatch call. The batch function fails with a
// verror.ErrNoExist error when the device row is missing, and with a
// verror.ErrExist error when a row with the same stream key is already
// present, so streamId must be unique for the target device.
// TODO(ivanpi): Make start time configurable.
func CreateStream(ctx *context.T, db syncbase.Database, devKey *sbmodel.KDeviceCfg, streamId string, script string, interval time.Duration, desc string) (*sbmodel.KStreamDef, error) {
	// Reject bad arguments up front, in the order: id, script, interval.
	if idErr := keyutil.ValidateId(streamId); idErr != nil {
		return nil, fmt.Errorf("invalid streamId: %v", idErr)
	}
	if len(strings.TrimSpace(script)) == 0 {
		return nil, fmt.Errorf("sampling script cannot be empty")
	}
	if interval.Nanoseconds() < config.MinSamplingInterval.Nanoseconds() {
		return nil, fmt.Errorf("sampling interval cannot be smaller than %v", config.MinSamplingInterval)
	}

	key := &sbmodel.KStreamDef{DevId: devKey.DevId, StreamId: streamId}

	// Fall back to a description derived from the stream key.
	if len(desc) == 0 {
		desc = "stream:" + key.Key()
	}

	sampler := sbmodel.SamplerDef{
		Script:   script,
		Start:    time.Now(),
		Interval: interval,
	}
	def := sbmodel.VStreamDef{
		Desc:    desc,
		Sampler: sampler,
		Enabled: true,
	}

	// writeDef verifies the preconditions and stores the stream definition;
	// it is executed by RunInBatch against a batch view of the database.
	writeDef := func(batch syncbase.BatchDatabase) error {
		deviceRow := batch.Collection(ctx, devKey.Collection()).Row(devKey.Key())
		streamRow := batch.Collection(ctx, key.Collection()).Row(key.Key())

		// The device configuration row must already be present.
		deviceFound, err := deviceRow.Exists(ctx)
		switch {
		case err != nil:
			return err
		case !deviceFound:
			return verror.New(verror.ErrNoExist, ctx, "Device '"+devKey.Key()+"' does not exist")
		}

		// The stream row must not be present yet.
		streamFound, err := streamRow.Exists(ctx)
		switch {
		case err != nil:
			return err
		case streamFound:
			return verror.New(verror.ErrExist, ctx, "Stream '"+key.Key()+"' already exists")
		}

		return streamRow.Put(ctx, def)
	}

	if err := syncbase.RunInBatch(ctx, db, wire.BatchOptions{}, writeDef); err != nil {
		return nil, err
	}
	return key, nil
}
// TODO(ivanpi): Implement pausing and resuming streams.