// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package client_test

import (
	"bytes"
	"fmt"
	"reflect"
	"sort"
	"syscall"
	"time"

	"v.io/v23"
	"v.io/v23/context"
	"v.io/v23/naming"
	"v.io/x/ref/lib/signals"
	_ "v.io/x/ref/runtime/factories/generic"
	sbtu "v.io/x/ref/services/syncbase/testutil"
	"v.io/x/ref/test/modules"
	"v.io/x/ref/test/v23tests"
	slltu "v.io/x/sensorlog_lite/internal/client/testutil"
	"v.io/x/sensorlog_lite/internal/measure"
	"v.io/x/sensorlog_lite/internal/sbmodel"
	"v.io/x/sensorlog_lite/internal/sbutil"
	"v.io/x/sensorlog_lite/internal/util"
)

//go:generate jiri test generate

// Placeholder values passed as the last two arguments of every
// slltu.RunCreateStream invocation in this test.
const (
	// dummyScript is a trivial shell snippet; presumably the sampling script
	// stored in the stream definition (confirm against slltu.RunCreateStream).
	dummyScript   = "echo 42;"
	// dummyInterval is passed right after dummyScript; presumably the sampling
	// interval in duration-string form (confirm against slltu.RunCreateStream).
	dummyInterval = "1s"
)

// V23TestDeviceAddAndStreamCreate is an integration test covering the flow of
// adding a measuring device to a client and creating streams for it.
//
// It starts a global mounttable with a local one mounted inside it, runs one
// Syncbase instance for the client and one for the measuring device,
// initializes the device syncgroup, joins it from the client, kills the global
// mounttable, and then creates three streams on the client. A watcher process
// running with the measuring device's credentials must observe all three
// streams, including the one created before the watcher was started.
func V23TestDeviceAddAndStreamCreate(t *v23tests.T) {
	// Start a 'global' mounttable.
	globalMT, globalMTHandle := startAdditionalMT(t, "--v23.tcp.address=127.0.0.1:0")
	// Mount the local mounttable in the global one.
	localMT := naming.Join(globalMT, "localmt")
	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0", "--name="+localMT)

	// Client side: credentials for the client itself and for its Syncbase.
	clientSb := "sb/client"
	clientCreds, err := t.Shell().NewChildCredentials("client")
	if err != nil {
		t.Fatalf("failed to create client credentials: %v", err)
	}
	clientSbCreds, err := t.Shell().NewChildCredentials("client/sb")
	if err != nil {
		t.Fatalf("failed to create client syncbase credentials: %v", err)
	}
	cleanup := sbtu.StartSyncbased(t, clientSbCreds, clientSb, "",
		`{"Read": {"In":["root/client"]}, "Write": {"In":["root/client"]}, "Admin": {"In":["root/client"]}, "Resolve": {"In":["..."]}}`)
	defer cleanup()

	// Measuring device side: same setup under the "measured" blessing.
	measuredSb := "sb/measured"
	measuredCreds, err := t.Shell().NewChildCredentials("measured")
	if err != nil {
		t.Fatalf("failed to create measured credentials: %v", err)
	}
	measuredSbCreds, err := t.Shell().NewChildCredentials("measured/sb")
	if err != nil {
		t.Fatalf("failed to create measured syncbase credentials: %v", err)
	}
	cleanup = sbtu.StartSyncbased(t, measuredSbCreds, measuredSb, "",
		`{"Read": {"In":["root/measured"]}, "Write": {"In":["root/measured"]}, "Admin": {"In":["root/measured"]}, "Resolve": {"In":["..."]}}`)
	defer cleanup()

	// Presumably gives both Syncbase instances time to come up before use.
	time.Sleep(1 * time.Second)

	// Initialize measuring device syncgroup using the global mounttable alias
	// of measured Syncbase for publishing.
	devId := "measured1"
	publishSb := naming.Join(localMT, measuredSb)
	sbtu.RunClient(t, measuredCreds, runInitSyncgroup, measuredSb, devId, "root/client", publishSb, globalMT)

	// Add measuring device to client, joining its syncgroup.
	sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb)

	// Allow time for syncgroup metadata sync.
	time.Sleep(3 * time.Second)

	// After syncgroup has been joined, sync can use local mounttable. Kill the
	// global one.
	if err := globalMTHandle.Kill(syscall.SIGINT); err != nil {
		t.Fatalf("failed to kill global mounttable: %v", err)
	}
	if err := globalMTHandle.Shutdown(nil, nil); err != nil {
		t.Fatalf("failed to shutdown global mounttable: %v", err)
	}

	streamIds := []string{"str1", "str2", "str3"}

	// Create stream before starting measured watcher.
	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[0], dummyScript, dummyInterval)

	time.Sleep(1 * time.Second)

	// The watcher receives the full list of expected stream ids and compares
	// it against what it has seen once it is interrupted.
	measureWatcher, err := t.Shell().StartWithOpts(
		t.Shell().DefaultStartOpts().WithCustomCredentials(measuredCreds).WithSessions(t, 60*time.Second), nil,
		runWatchForStreams, append([]string{measuredSb, devId}, streamIds...)...)
	if err != nil {
		t.Fatalf("failed to start measureWatcher: %v", err)
	}

	// Create more streams.
	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[1], dummyScript, dummyInterval)

	time.Sleep(1 * time.Second)

	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[2], dummyScript, dummyInterval)

	// Allow time for sync.
	time.Sleep(3 * time.Second)

	// Kill will stop the watcher and cause it to verify that seen streams match
	// expectations.
	if err := syscall.Kill(measureWatcher.Pid(), syscall.SIGINT); err != nil {
		t.Fatalf("failed to kill measureWatcher: %v", err)
	}
	var stdout, stderr bytes.Buffer
	if err := measureWatcher.Shutdown(&stdout, &stderr); err != nil {
		// Errorf rather than Fatalf so the captured output below is still logged.
		t.Errorf("failed to shutdown measureWatcher: %v", err)
	}
	t.Logf("measureWatcher stdout:\n%s", stdout.String())
	t.Logf("measureWatcher stderr:\n%s", stderr.String())
}

// runInitSyncgroup is a module, registered with modules.Register, that the
// test runs through sbtu.RunClient. It creates or opens the measured database
// on the given Syncbase service and initializes the device syncgroup twice;
// the second call checks that initialization is idempotent.
//
// Expected args, in order: sbService, devId, admin, publishSb, sgMT.
// Additional arguments are ignored. An error is returned if fewer than five
// arguments are supplied or if any step fails.
var runInitSyncgroup = modules.Register(func(env *modules.Env, args ...string) error {
	// Validate the argument count up front instead of panicking on indexing.
	if len(args) < 5 {
		return fmt.Errorf("runInitSyncgroup: got %d args, want 5: <sbService> <devId> <admin> <publishSb> <sgMT>", len(args))
	}
	sbService, devId, admin, publishSb, sgMT := args[0], args[1], args[2], args[3], args[4]

	ctx, cleanup := v23.Init()
	defer cleanup()
	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
	if err != nil {
		return fmt.Errorf("failed initializing measured database: %v", err)
	}

	// Use the namespace roots of this process plus the extra mounttable
	// passed in by the caller as the syncgroup mounttables.
	sgMTs := append(v23.GetNamespace(ctx).Roots(), sgMT)
	if err := measure.InitSyncgroup(ctx, db, devId, admin, publishSb, sgMTs); err != nil {
		return fmt.Errorf("failed initializing measured syncgroup: %v", err)
	}
	// Repeat with identical arguments: a second initialization must succeed.
	if err := measure.InitSyncgroup(ctx, db, devId, admin, publishSb, sgMTs); err != nil {
		return fmt.Errorf("initializing measured syncgroup should be idempotent, repeat failed: %v", err)
	}

	return nil
}, "runInitSyncgroup")

// runWatchForStreams is a module, registered with modules.Register, that
// watches the measured database for stream definitions of one device until
// the process receives a shutdown signal, then checks that the set of stream
// ids seen equals the expected set.
//
// Expected args, in order: sbService, devId, followed by zero or more expected
// stream ids. An error is returned if fewer than two arguments are supplied,
// if the watcher fails, if a stream for a different device is reported, or if
// the observed stream ids differ from the expected ones.
var runWatchForStreams = modules.Register(func(env *modules.Env, args ...string) error {
	// Validate the argument count up front instead of panicking on indexing.
	if len(args) < 2 {
		return fmt.Errorf("runWatchForStreams: got %d args, want at least 2: <sbService> <devId> [<streamId>...]", len(args))
	}
	sbService, devId, expectStreams := args[0], args[1], args[2:]

	ctx, cleanup := v23.Init()
	defer cleanup()

	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
	if err != nil {
		return fmt.Errorf("failed opening measured database: %v", err)
	}

	// Filled in by the watch callback; only read after wait() has returned.
	gotStreams := make([]string, 0, len(expectStreams))

	ctx, stop := context.WithCancel(ctx)
	wait := util.AsyncRun(func() error {
		return measure.WatchForStreams(ctx, db, devId, func(key *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
			if got, want := key.DevId, devId; got != want {
				return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", got, want)
			}
			gotStreams = append(gotStreams, key.StreamId)
			return nil
		})
	}, func(err error) {
		// Error handler passed to util.AsyncRun: cancel the watch context.
		stop()
	})

	// Wait for kill, then stop watcher.
	<-signals.ShutdownOnSignals(nil)
	stop()

	if err := wait(); err != nil {
		return err
	}

	// Compare streams seen by watcher to expectations. Both slices are sorted
	// so the comparison does not depend on the order of arrival.
	sort.Strings(gotStreams)
	sort.Strings(expectStreams)
	if got, want := gotStreams, expectStreams; !reflect.DeepEqual(got, want) {
		return fmt.Errorf("watch returned streams do not match: got %v, want %v", got, want)
	}

	return nil
}, "runWatchForStreams")

// startAdditionalMT builds the mounttabled binary and starts it with the given
// command-line args. It returns the value obtained from the invocation's
// "NAME" variable, which the test uses as the mounttable's name, together
// with the invocation handle so the caller can later kill and shut it down.
func startAdditionalMT(t *v23tests.T, args ...string) (string, *v23tests.Invocation) {
	mounttabled := t.BuildV23Pkg("v.io/x/ref/services/mounttable/mounttabled")
	handle := mounttabled.Start(args...)
	name := handle.ExpectVar("NAME")
	return name, handle
}
