// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Measured methods for configuration change watching.

package measure

import (
	"fmt"

	"v.io/v23/context"
	nosql_wire "v.io/v23/services/syncbase/nosql"
	"v.io/v23/services/watch"
	"v.io/v23/syncbase/nosql"
	"v.io/x/sensorlog/internal/sbmodel"
	"v.io/x/sensorlog/internal/sbmodel/keyutil"
)

// RegisterWorker is the callback type that WatchForStreams invokes with the
// parsed key and value of each existing, new, or modified StreamDef.
// WatchForStreams calls it sequentially from its own goroutine, so thread
// safety is not required. A non-nil error makes WatchForStreams stop and
// return that error.
type RegisterWorker func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error

// WatchForStreams registers every stream definition stored in Syncbase for
// the measuring device devId, then keeps watching for definitions that are
// added or modified afterwards, handing each one to the register callback.
// The initial scan and the resume marker are obtained from the same read-only
// batch, and the watch is started from that marker.
// An error is returned if a StreamDef key or value cannot be parsed, if
// register returns an error, or if a StreamDef is deleted.
// WatchForStreams is synchronous: it runs until the context is cancelled or
// an error is encountered.
func WatchForStreams(ctx *context.T, db nosql.Database, devId string, register RegisterWorker) error {
	streamTable := sbmodel.KStreamDef{}.Table()
	devPrefix := keyutil.Join(devId, "")
	var marker watch.ResumeMarker

	// The batch lives inside its own function so that the deferred Abort (and
	// the scan's Cancel) run as soon as the initial registration is finished,
	// before the long-running watch begins.
	registerExisting := func() error {
		batch, err := db.BeginBatch(ctx, nosql_wire.BatchOptions{ReadOnly: true})
		if err != nil {
			return err
		}
		defer batch.Abort(ctx)

		// Stored in the enclosing function's variable; the watch resumes here.
		if marker, err = batch.GetResumeMarker(ctx); err != nil {
			return err
		}

		// Hand every StreamDef that already exists to the callback.
		scan := batch.Table(streamTable).Scan(ctx, nosql.Prefix(devPrefix))
		defer scan.Cancel()

		for scan.Advance() {
			rowKey := scan.Key()
			var key sbmodel.KStreamDef
			if err := key.Parse(rowKey); err != nil {
				return fmt.Errorf("invalid StreamDef key for prefix %s: %v", devPrefix, err)
			}
			var val sbmodel.VStreamDef
			if err := scan.Value(&val); err != nil {
				return fmt.Errorf("invalid StreamDef value for key %s: %v", rowKey, err)
			}
			if err := register(&key, &val); err != nil {
				return err
			}
		}
		return scan.Err()
	}
	if err := registerExisting(); err != nil {
		return err
	}

	// Follow subsequent StreamDef changes, starting at the resume marker.
	stream, err := db.Watch(ctx, streamTable, devPrefix, marker)
	if err != nil {
		return err
	}
	defer stream.Cancel()

	for stream.Advance() {
		change := stream.Change()
		var key sbmodel.KStreamDef
		if err := key.Parse(change.Row); err != nil {
			return fmt.Errorf("invalid StreamDef key for prefix %s: %v", devPrefix, err)
		}
		if change.ChangeType == nosql.DeleteChange {
			return fmt.Errorf("StreamDef delete is not supported")
		}
		if change.ChangeType != nosql.PutChange {
			// Change types other than put and delete are skipped.
			continue
		}
		var val sbmodel.VStreamDef
		if err := change.Value(&val); err != nil {
			return fmt.Errorf("invalid StreamDef value for key %s: %v", change.Row, err)
		}
		if err := register(&key, &val); err != nil {
			return err
		}
	}
	return stream.Err()
}
