// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Client methods for stream configuration.

package client

import (
	"fmt"
	"strings"
	"time"

	"v.io/v23/context"
	wire "v.io/v23/services/syncbase"
	"v.io/v23/syncbase"
	"v.io/v23/verror"
	"v.io/x/sensorlog/internal/config"
	"v.io/x/sensorlog/internal/sbmodel"
	"v.io/x/sensorlog/internal/sbmodel/keyutil"
)

// CreateStream registers a new sampling stream for an already configured
// device by writing its definition to Syncbase. It fails if streamId is not a
// valid id, if script is blank, or if interval is shorter than
// config.MinSamplingInterval. An empty desc is replaced by "stream:" followed
// by the stream key. The stream is stored enabled, with its sampler start time
// set to the time of the call.
// The write happens in a batch that returns verror.ErrNoExist if the device
// row is missing and verror.ErrExist if a row for the stream is already
// present. On success, the key of the newly written stream is returned.
// TODO(ivanpi): Make start time configurable.
func CreateStream(ctx *context.T, db syncbase.Database, devKey *sbmodel.KDeviceCfg, streamId string, script string, interval time.Duration, desc string) (*sbmodel.KStreamDef, error) {
	// Reject bad arguments before touching Syncbase.
	idErr := keyutil.ValidateId(streamId)
	if idErr != nil {
		return nil, fmt.Errorf("invalid streamId: %v", idErr)
	}
	if len(strings.TrimSpace(script)) == 0 {
		return nil, fmt.Errorf("sampling script cannot be empty")
	}
	if config.MinSamplingInterval.Nanoseconds() > interval.Nanoseconds() {
		return nil, fmt.Errorf("sampling interval cannot be smaller than %v", config.MinSamplingInterval)
	}

	// The stream key is derived from the owning device and the stream id.
	var key sbmodel.KStreamDef
	key.DevId = devKey.DevId
	key.StreamId = streamId

	// Fall back to a generated description when none was supplied.
	if desc == "" {
		desc = "stream:" + key.Key()
	}

	sampler := sbmodel.SamplerDef{
		Script:   script,
		Start:    time.Now(),
		Interval: interval,
	}
	def := sbmodel.VStreamDef{
		Desc:    desc,
		Sampler: sampler,
		Enabled: true,
	}

	// putStream runs inside the batch: the device must exist and the stream
	// must not, otherwise nothing is written.
	putStream := func(batch syncbase.BatchDatabase) error {
		deviceRow := batch.Collection(devKey.Collection()).Row(devKey.Key())
		streamRow := batch.Collection(key.Collection()).Row(key.Key())

		deviceFound, err := deviceRow.Exists(ctx)
		if err != nil {
			return err
		}
		if !deviceFound {
			return verror.New(verror.ErrNoExist, ctx, "Device '"+devKey.Key()+"' does not exist")
		}

		streamFound, err := streamRow.Exists(ctx)
		if err != nil {
			return err
		}
		if streamFound {
			return verror.New(verror.ErrExist, ctx, "Stream '"+key.Key()+"' already exists")
		}

		return streamRow.Put(ctx, def)
	}

	err := syncbase.RunInBatch(ctx, db, wire.BatchOptions{}, putStream)
	if err != nil {
		return nil, err
	}
	return &key, nil
}

// TODO(ivanpi): Implement pausing and resuming streams.
