// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package client_test

import (
	"fmt"
	"os"
	"reflect"
	"sort"
	"testing"
	"time"

	"v.io/v23"
	"v.io/v23/context"
	"v.io/v23/naming"
	"v.io/x/ref/test/v23test"
	sltu "v.io/x/sensorlog/internal/client/testutil"
	"v.io/x/sensorlog/internal/measure"
	"v.io/x/sensorlog/internal/sbmodel"
	"v.io/x/sensorlog/internal/sbutil"
	"v.io/x/sensorlog/internal/util"
)

const (
	dummyScript   = "echo 42;"
	dummyInterval = 1 * time.Second
)

func TestV23DeviceAddAndStreamCreate(t *testing.T) {
	v23test.SkipUnlessRunningIntegrationTests(t)
	sh := v23test.NewShell(t, nil)
	defer sh.Cleanup()
	// Start a 'global' mounttable.
	globalMT, globalMTShutdown := startAdditionalMT(sh, "--v23.tcp.address=127.0.0.1:0")
	// Mount the local mounttable in the global one.
	localMT := naming.Join(globalMT, "localmt")
	sh.StartRootMountTable("--name=" + localMT)

	clientSb := "sb/client"
	clientCtx := sh.ForkContext("u:client")
	clientSbCreds := sh.ForkCredentials("u:client:sb")
	sh.StartSyncbase(clientSbCreds, clientSb, "",
		`{"Read": {"In":["root:u:client"]}, "Write": {"In":["root:u:client"]}, "Admin": {"In":["root:u:client"]}, "Resolve": {"In":["..."]}}`)

	measuredSb := "sb/measured"
	measuredCtx := sh.ForkContext("u:measured")
	measuredSbCreds := sh.ForkCredentials("u:measured:sb")
	sh.StartSyncbase(measuredSbCreds, measuredSb, "",
		`{"Read": {"In":["root:u:measured"]}, "Write": {"In":["root:u:measured"]}, "Admin": {"In":["root:u:measured"]}, "Resolve": {"In":["..."]}}`)

	time.Sleep(1 * time.Second)

	// Initialize measuring device syncgroup using the global mounttable alias
	// of measured Syncbase for publishing.
	devId := "measured1"
	publishSb := naming.Join(localMT, measuredSb)
	if err := initSyncgroup(measuredCtx, measuredSb, devId, "root:u:client", publishSb, globalMT); err != nil {
		t.Fatalf("failed initializing syncgroup for device %s: %v", devId, err)
	}

	// Add measuring device to client, joining its syncgroup.
	if err := sltu.AddTestDevice(clientCtx, clientSb, devId, publishSb); err != nil {
		t.Errorf("failed adding device %s: %v", devId, err)
	}

	// Allow time for syncgroup metadata sync.
	time.Sleep(3 * time.Second)

	// After syncgroup has been joined, sync can use local mounttable. Kill the
	// global one.
	globalMTShutdown(os.Interrupt)

	streamIds := []string{"str1", "str2", "str3"}

	// Create stream before starting measured watcher.
	if err := sltu.CreateTestStream(clientCtx, clientSb, devId, streamIds[0], dummyScript, dummyInterval); err != nil {
		t.Errorf("failed creating stream %s: %v", streamIds[0], err)
	}

	time.Sleep(1 * time.Second)

	stopWatch := watchForStreams(measuredCtx, measuredSb, devId)

	// Create more streams.
	if err := sltu.CreateTestStream(clientCtx, clientSb, devId, streamIds[1], dummyScript, dummyInterval); err != nil {
		t.Errorf("failed creating stream %s: %v", streamIds[1], err)
	}

	time.Sleep(1 * time.Second)

	if err := sltu.CreateTestStream(clientCtx, clientSb, devId, streamIds[2], dummyScript, dummyInterval); err != nil {
		t.Errorf("failed creating stream %s: %v", streamIds[2], err)
	}

	// Allow time for sync.
	time.Sleep(3 * time.Second)

	// Stop the watcher.
	gotStreams, err := stopWatch()
	if err != nil {
		t.Fatalf("measureWatcher failed: %v", err)
	}

	// Compare streams seen by watcher to expectations.
	sort.Strings(gotStreams)
	sort.Strings(streamIds)
	if got, want := gotStreams, streamIds; !reflect.DeepEqual(got, want) {
		t.Errorf("watch returned streams do not match: got %v, want %v", got, want)
	}
}

func initSyncgroup(ctx *context.T, sbService, devId, admin, publishSb, sgMT string) error {
	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
	if err != nil {
		return fmt.Errorf("failed initializing measured database: %v", err)
	}

	sgMTs := append(v23.GetNamespace(ctx).Roots(), sgMT)
	if err := measure.InitSyncgroup(ctx, db, devId, admin, publishSb, sgMTs); err != nil {
		return fmt.Errorf("failed initializing measured syncgroup: %v", err)
	}
	if err := measure.InitSyncgroup(ctx, db, devId, admin, publishSb, sgMTs); err != nil {
		return fmt.Errorf("initializing measured syncgroup should be idempotent, repeat failed: %v", err)
	}

	return nil
}

func watchForStreams(ctx *context.T, sbService, devId string) (stop func() (gotStreams []string, err error)) {
	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
	if err != nil {
		return func() ([]string, error) {
			return nil, fmt.Errorf("failed opening measured database: %v", err)
		}
	}

	var gotStreams []string

	ctx, cancel := context.WithCancel(ctx)
	wait := util.AsyncRun(func() error {
		return measure.WatchForStreams(ctx, db, devId, func(key *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
			if got, want := key.DevId, devId; got != want {
				return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", got, want)
			}
			gotStreams = append(gotStreams, key.StreamId)
			return nil
		})
	}, func(err error) {
		cancel()
	})

	return func() ([]string, error) {
		cancel()
		if err := wait(); err != nil {
			return nil, err
		}
		return gotStreams, nil
	}
}

func startAdditionalMT(sh *v23test.Shell, args ...string) (string, func(sig os.Signal)) {
	mounttabledPath := v23test.BuildGoPkg(sh, "v.io/x/ref/services/mounttable/mounttabled")
	inv := sh.Cmd(mounttabledPath, args...)
	inv.Start()
	return inv.S.ExpectVar("NAME"), inv.Terminate
}
