// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package measure_test

import (
	"fmt"
	"reflect"
	"sort"
	"testing"
	"time"

	"v.io/v23/context"
	"v.io/v23/syncbase"
	_ "v.io/x/ref/runtime/factories/roaming"
	sbtu "v.io/x/ref/services/syncbase/testutil"
	tu "v.io/x/ref/test/testutil"
	"v.io/x/sensorlog/internal/measure"
	"v.io/x/sensorlog/internal/sbmodel"
	"v.io/x/sensorlog/internal/sbmodel/keyutil"
	"v.io/x/sensorlog/internal/sbutil"
	"v.io/x/sensorlog/internal/util"
)

// watchCollections is the collection list handed to sbutil.CreateOrOpenDB by
// the tests in this file. It holds a single spec whose prototype is a
// KStreamDef key.
var watchCollections = []sbmodel.CollectionSpec{
	{Prototype: new(sbmodel.KStreamDef)},
}

// TestWatchForStreams checks that the watcher reports every stream definition
// belonging to the watched device - both those stored before the watch began
// and those stored afterwards - and none belonging to any other device.
func TestWatchForStreams(t *testing.T) {
	const (
		watchedDev = "dev1"
		ignoredDev = "dev2"
	)
	// NOTE(review): the pauses appear to space the writes out relative to the
	// start of the watch and to let the watcher deliver pending changes before
	// it is stopped - confirm.
	const (
		shortWait = time.Second
		finalWait = 3 * time.Second
	)

	_, measuredCtx, dbName, _, teardown := sbtu.SetupOrDieCustom("one", "one:sb", nil)
	defer teardown()

	// The database is created (or opened) with the measured context.
	db, err := sbutil.CreateOrOpenDB(measuredCtx, dbName, watchCollections)
	if err != nil {
		t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
	}

	wantDescs := []string{"str1", "str2", "str3"}
	seenDescs := make([]string, 0, len(wantDescs))

	// Stored before the watch exists: one stream for each device.
	time.Sleep(shortWait)
	putStreamDef(t, measuredCtx, db, watchedDev, wantDescs[0])
	putStreamDef(t, measuredCtx, db, ignoredDev, "foo")

	// The callback rejects any key of a foreign device and otherwise records
	// the description it was handed.
	record := func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error {
		if key.DevId != watchedDev {
			return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", key.DevId, watchedDev)
		}
		seenDescs = append(seenDescs, val.Desc)
		return nil
	}

	time.Sleep(shortWait)
	stop, wait := runWatchForStreams(measuredCtx, db, watchedDev, record)

	// Stored once the watch has been started.
	putStreamDef(t, measuredCtx, db, watchedDev, wantDescs[1])

	time.Sleep(shortWait)
	putStreamDef(t, measuredCtx, db, ignoredDev, "bar")
	putStreamDef(t, measuredCtx, db, watchedDev, wantDescs[2])

	time.Sleep(finalWait)
	stop()
	err = wait()
	if err != nil {
		t.Fatalf("watcher exited with error: %v", err)
	}

	// Both sides are sorted first, so the order of delivery is not compared.
	sort.Strings(seenDescs)
	sort.Strings(wantDescs)
	if !reflect.DeepEqual(seenDescs, wantDescs) {
		t.Errorf("watch returned streams do not match: got %v, want %v", seenDescs, wantDescs)
	}
}

// TestWatchError covers the two failure paths of the watcher: an error
// returned by the callback must come back from the watcher unchanged, and a
// malformed stream definition key must make the watcher fail.
func TestWatchError(t *testing.T) {
	// NOTE(review): the pause appears to give the watcher time to observe the
	// write before it is stopped - confirm.
	const settle = 3 * time.Second

	_, measuredCtx, dbName, _, teardown := sbtu.SetupOrDieCustom("one", "one:sb", nil)
	defer teardown()

	// The database is created (or opened) with the measured context.
	db, err := sbutil.CreateOrOpenDB(measuredCtx, dbName, watchCollections)
	if err != nil {
		t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
	}

	// Scenario 1: the callback always fails; the watcher has to hand back that
	// very error value.
	wantErr := fmt.Errorf("callbackError")
	alwaysFail := func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
		return wantErr
	}
	stopFailing, waitFailing := runWatchForStreams(measuredCtx, db, "dev1", alwaysFail)
	// Any write for the watched device triggers the callback.
	putStreamDef(t, measuredCtx, db, "dev1", "foo")

	time.Sleep(settle)
	stopFailing()
	if gotErr := waitFailing(); gotErr != wantErr {
		t.Errorf("watch should have failed with %v, got: %v", wantErr, gotErr)
	}

	// Scenario 2: the callback always succeeds, but the stored key is
	// malformed. Passing a joined string as the device id presumably yields a
	// key with an extra component - confirm against keyutil.
	const malformedDev = "dev2"
	alwaysOK := func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
		return nil
	}
	stopMalformed, waitMalformed := runWatchForStreams(measuredCtx, db, malformedDev, alwaysOK)
	putStreamDef(t, measuredCtx, db, keyutil.Join(malformedDev, "fail"), "foo")

	time.Sleep(settle)
	stopMalformed()
	if gotErr := waitMalformed(); gotErr == nil {
		t.Errorf("watch should have failed on malformed key")
	}
}

// runWatchForStreams launches measure.WatchForStreams through util.AsyncRun
// using a cancellable child of ctx. The returned stop function cancels that
// child context; the returned wait function is whatever util.AsyncRun hands
// back for the launched watcher.
func runWatchForStreams(ctx *context.T, db syncbase.Database, devId string, register measure.RegisterWorker) (stop func(), wait func() error) {
	watchCtx, cancel := context.WithCancel(ctx)
	watcher := func() error {
		return measure.WatchForStreams(watchCtx, db, devId, register)
	}
	return cancel, util.AsyncRun(watcher, nil)
}

// putStreamDef stores a stream definition with description desc for device
// devId, under a key whose StreamId is freshly generated by keyutil.UUID.
// Any Put failure aborts the test.
func putStreamDef(t *testing.T, ctx *context.T, db syncbase.DatabaseHandle, devId, desc string) {
	key := &sbmodel.KStreamDef{DevId: devId, StreamId: keyutil.UUID()}
	val := sbmodel.VStreamDef{Desc: desc}
	if err := db.CollectionForId(sbmodel.CollectionId(key)).Put(ctx, key.Key(), &val); err != nil {
		// FormatLogLine already returns the fully formatted message, so report
		// it with Fatal; handing it to Fatalf would reinterpret any '%' coming
		// from the key or the error text as a formatting directive.
		t.Fatal(tu.FormatLogLine(2, "failed to put StreamDef %s (%s): %v", key.Key(), val.Desc, err))
	}
}
