// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package client_test

import (
	"fmt"
	"os"
	"testing"
	"time"

	"v.io/v23"
	"v.io/v23/context"
	"v.io/v23/naming"
	"v.io/x/ref/services/syncbase/syncbaselib"
	"v.io/x/ref/test/v23test"
	"v.io/x/sensorlog/internal/client"
	sltu "v.io/x/sensorlog/internal/client/testutil"
	"v.io/x/sensorlog/internal/sbmodel"
	"v.io/x/sensorlog/internal/sbutil"
)

func TestV23StreamConfigAndList(t *testing.T) {
	// TODO(ivanpi): Sensorlog needs a bigger migration to work well with
	// the new api. Skipping these tests for now.
	t.Skip()
	v23test.SkipUnlessRunningIntegrationTests(t)
	sh := v23test.NewShell(t, nil)
	defer sh.Cleanup()
	sh.StartRootMountTable()
	mtName := v23.GetNamespace(sh.Ctx).Roots()[0]

	clientSb := "sb/client"
	clientCtx := sh.ForkContext("u:client")
	clientSbCreds := sh.ForkCredentials("u:client:sb")
	sh.StartSyncbase(clientSbCreds, syncbaselib.Opts{Name: clientSb}, `{"Read": {"In":["root:u:client"]}, "Write": {"In":["root:u:client"]}, "Admin": {"In":["root:u:client"]}, "Resolve": {"In":["..."]}}`)

	measuredSb := "sb/measured"
	measuredCreds := sh.ForkCredentials("u:measured")
	measuredSbCreds := sh.ForkCredentials("u:measured:sb")
	sh.StartSyncbase(measuredSbCreds, syncbaselib.Opts{Name: measuredSb}, `{"Read": {"In":["root:u:measured"]}, "Write": {"In":["root:u:measured"]}, "Admin": {"In":["root:u:measured"]}, "Resolve": {"In":["..."]}}`)

	time.Sleep(1 * time.Second)

	// Start measured.
	measuredPath := v23test.BuildGoPkg(sh, "v.io/x/sensorlog/measured")
	devId := "measured1"
	publishSb := naming.Join(mtName, measuredSb)
	measured := sh.Cmd(measuredPath, "-alsologtostderr", "-service="+measuredSb, "-devid="+devId, "-admin=root:u:client", "-publish-sb="+publishSb)
	measured = measured.WithCredentials(measuredCreds)
	measured.PropagateOutput = true
	measured.Start()

	time.Sleep(3 * time.Second)

	// Add measuring device to client, joining its syncgroup.
	if err := sltu.AddTestDevice(clientCtx, clientSb, devId, publishSb); err != nil {
		t.Errorf("failed adding device %s: %v", devId, err)
	}

	// Create streams.
	streamId1, result1, interval1 := "str1", 42.0, 2*time.Second
	if err := sltu.CreateTestStream(clientCtx, clientSb, devId,
		streamId1, fmt.Sprintf("echo %f;", result1), interval1); err != nil {
		t.Errorf("failed creating stream %s: %v", streamId1, err)
	}

	time.Sleep(1 * time.Second)

	streamId2, result2, interval2 := "str2", 47.0, 1*time.Second
	if err := sltu.CreateTestStream(clientCtx, clientSb, devId,
		streamId2, fmt.Sprintf("echo %f;", result2), interval2); err != nil {
		t.Errorf("failed creating stream %s: %v", streamId2, err)
	}

	// streamId has another streamId as prefix to verify ListDataStreams prefix
	// handling.
	streamId3, result3, interval3 := "str12", 3.14159, 500*time.Millisecond
	if err := sltu.CreateTestStream(clientCtx, clientSb, devId,
		streamId3, fmt.Sprintf("echo %f;", result3), interval3); err != nil {
		t.Errorf("failed creating stream %s: %v", streamId3, err)
	}

	// Allow time for sync and at least 3 measurements.
	time.Sleep(10 * time.Second)

	// SIGINT will gracefully stop measured.
	measured.Terminate(os.Interrupt)

	// Check that both streams have at least 3 measurements synced back to
	// client device Syncbase.
	if err := listStreamData(clientCtx, clientSb, devId, streamId1, result1); err != nil {
		t.Errorf("failed listing stream %s data: %v", streamId1, err)
	}
	if err := listStreamData(clientCtx, clientSb, devId, streamId2, result2); err != nil {
		t.Errorf("failed listing stream %s data: %v", streamId2, err)
	}
	if err := listStreamData(clientCtx, clientSb, devId, streamId3, result3); err != nil {
		t.Errorf("failed listing stream %s data: %v", streamId3, err)
	}
}

func listStreamData(ctx *context.T, sbService, devId, streamId string, expectVal float64) error {
	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterCollections)
	if err != nil {
		return fmt.Errorf("failed opening master database: %v", err)
	}

	count := 0
	streamKey := &sbmodel.KStreamDef{
		DevId:    devId,
		StreamId: streamId,
	}
	if err := client.ListStreamData(ctx, db, streamKey, func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
		if got, want := key.StreamId, streamId; got != want {
			return fmt.Errorf("list returned data for wrong stream: got %s, want %s", got, want)
		}
		switch val := val.(type) {
		case sbmodel.VDataPointValue:
			if val.Value != expectVal {
				return fmt.Errorf("unexpected stream data value, got: %v, want: %v", val.Value, expectVal)
			}
		default:
			return fmt.Errorf("unexpected stream data type, got: %v, want value: %v", val, expectVal)
		}
		count++
		return nil
	}); err != nil {
		return err
	}

	if count < 3 {
		return fmt.Errorf("expected at least 3 measurements for %s, got %d", streamId, count)
	}

	return nil
}
