// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package client_test

import (
	"bytes"
	"fmt"
	"strconv"
	"syscall"
	"time"

	"v.io/v23"
	"v.io/v23/naming"
	"v.io/x/ref"
	_ "v.io/x/ref/runtime/factories/generic"
	sbtu "v.io/x/ref/services/syncbase/testutil"
	"v.io/x/ref/test/modules"
	"v.io/x/ref/test/v23tests"
	"v.io/x/sensorlog_lite/internal/client"
	slltu "v.io/x/sensorlog_lite/internal/client/testutil"
	"v.io/x/sensorlog_lite/internal/sbmodel"
	"v.io/x/sensorlog_lite/internal/sbutil"
)

//go:generate jiri test generate

func V23TestStreamConfigAndList(t *v23tests.T) {
	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
	mtName, _ := t.Shell().GetVar(ref.EnvNamespacePrefix)

	clientSb := "sb/client"
	clientCreds, _ := t.Shell().NewChildCredentials("client")
	clientSbCreds, _ := t.Shell().NewChildCredentials("client/sb")
	cleanup := sbtu.StartSyncbased(t, clientSbCreds, clientSb, "",
		`{"Read": {"In":["root/client"]}, "Write": {"In":["root/client"]}, "Admin": {"In":["root/client"]}, "Resolve": {"In":["..."]}}`)
	defer cleanup()

	measuredSb := "sb/measured"
	measuredCreds, _ := t.Shell().NewChildCredentials("measured")
	measuredSbCreds, _ := t.Shell().NewChildCredentials("measured/sb")
	cleanup = sbtu.StartSyncbased(t, measuredSbCreds, measuredSb, "",
		`{"Read": {"In":["root/measured"]}, "Write": {"In":["root/measured"]}, "Admin": {"In":["root/measured"]}, "Resolve": {"In":["..."]}}`)
	defer cleanup()

	time.Sleep(1 * time.Second)

	// Start measured.
	measuredBin := t.BuildV23Pkg("v.io/x/sensorlog_lite/measured")
	devId := "measured1"
	publishSb := naming.Join(mtName, measuredSb)
	measuredOpts := measuredBin.StartOpts().WithCustomCredentials(measuredCreds).WithSessions(t, 180*time.Second)
	measured := measuredBin.WithStartOpts(measuredOpts).Start(
		"-alsologtostderr", "-service="+measuredSb, "-devid="+devId, "-admin=root/client",
		"-publish-sb="+publishSb)

	time.Sleep(3 * time.Second)

	// Add measuring device to client, joining its syncgroup.
	sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb)

	// Create streams.
	streamId1, result1, interval1 := "str1", "42", "2s"
	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
		streamId1, fmt.Sprintf("echo %s;", result1), interval1, "")

	time.Sleep(1 * time.Second)

	streamId2, result2, interval2 := "str2", "47", "1s"
	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
		streamId2, fmt.Sprintf("echo %s;", result2), interval2, "")

	// streamId has another streamId as prefix to verify ListDataStreams prefix
	// handling.
	streamId3, result3, interval3 := "str12", "3.14159", "0.5s"
	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
		streamId3, fmt.Sprintf("echo %s;", result3), interval3, "")

	// Allow time for sync and at least 3 measurements.
	time.Sleep(10 * time.Second)

	// Kill will gracefully stop measured.
	if err := syscall.Kill(measured.Pid(), syscall.SIGINT); err != nil {
		t.Fatalf("failed to kill measured: %v", err)
	}
	var stdout, stderr bytes.Buffer
	if err := measured.Shutdown(&stdout, &stderr); err != nil {
		t.Errorf("failed to shutdown measured: %v")
	}
	t.Logf("measured stdout:\n%s", stdout.String())
	t.Logf("measured stderr:\n%s", stderr.String())

	// Check that both streams have at least 3 measurements synced back to
	// client device Syncbase.
	sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId1, result1)
	sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId2, result2)
	sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId3, result3)
}

var runListStreamData = modules.Register(func(env *modules.Env, args ...string) error {
	sbService, devId, streamId, expectValStr := args[0], args[1], args[2], args[3]

	expectVal, err := strconv.ParseFloat(expectValStr, 64)
	if err != nil {
		return err
	}

	ctx, cleanup := v23.Init()
	defer cleanup()

	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
	if err != nil {
		return fmt.Errorf("failed opening measured database: %v", err)
	}

	count := 0
	streamKey := &sbmodel.KStreamDef{
		DevId:    devId,
		StreamId: streamId,
	}
	if err := client.ListStreamData(ctx, db, streamKey, func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
		if got, want := key.StreamId, streamId; got != want {
			return fmt.Errorf("list returned data for wrong stream: got %s, want %s", got, want)
		}
		switch val := val.(type) {
		case sbmodel.VDataPointValue:
			if val.Value != expectVal {
				return fmt.Errorf("unexpected stream data value, got: %v, want: %v", val.Value, expectVal)
			}
		default:
			return fmt.Errorf("unexpected stream data type, got: %v, want value: %v", val, expectVal)
		}
		count++
		return nil
	}); err != nil {
		return err
	}

	if count < 3 {
		return fmt.Errorf("expected at least 3 measurements for %s, got %d", streamId, count)
	}

	return nil
}, "runListStreamData")
