blob: 59fbe98128eb4ccdfe4c4544c784694a1d8ca2a4 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package client_test
import (
"fmt"
"os"
"testing"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/x/ref/services/syncbase/syncbaselib"
"v.io/x/ref/test/v23test"
"v.io/x/sensorlog/internal/client"
sltu "v.io/x/sensorlog/internal/client/testutil"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbutil"
)
func TestV23StreamConfigAndList(t *testing.T) {
// TODO(ivanpi): Sensorlog needs a bigger migration to work well with
// the new api. Skipping these tests for now.
t.Skip()
v23test.SkipUnlessRunningIntegrationTests(t)
sh := v23test.NewShell(t, nil)
defer sh.Cleanup()
sh.StartRootMountTable()
mtName := v23.GetNamespace(sh.Ctx).Roots()[0]
clientSb := "sb/client"
clientCtx := sh.ForkContext("u:client")
clientSbCreds := sh.ForkCredentials("u:client:sb")
sh.StartSyncbase(clientSbCreds, syncbaselib.Opts{Name: clientSb}, `{"Read": {"In":["root:u:client"]}, "Write": {"In":["root:u:client"]}, "Admin": {"In":["root:u:client"]}, "Resolve": {"In":["..."]}}`)
measuredSb := "sb/measured"
measuredCreds := sh.ForkCredentials("u:measured")
measuredSbCreds := sh.ForkCredentials("u:measured:sb")
sh.StartSyncbase(measuredSbCreds, syncbaselib.Opts{Name: measuredSb}, `{"Read": {"In":["root:u:measured"]}, "Write": {"In":["root:u:measured"]}, "Admin": {"In":["root:u:measured"]}, "Resolve": {"In":["..."]}}`)
time.Sleep(1 * time.Second)
// Start measured.
measuredPath := v23test.BuildGoPkg(sh, "v.io/x/sensorlog/measured")
devId := "measured1"
publishSb := naming.Join(mtName, measuredSb)
measured := sh.Cmd(measuredPath, "-alsologtostderr", "-service="+measuredSb, "-devid="+devId, "-admin=root:u:client", "-publish-sb="+publishSb)
measured = measured.WithCredentials(measuredCreds)
measured.PropagateOutput = true
measured.Start()
time.Sleep(3 * time.Second)
// Add measuring device to client, joining its syncgroup.
if err := sltu.AddTestDevice(clientCtx, clientSb, devId, publishSb); err != nil {
t.Errorf("failed adding device %s: %v", devId, err)
}
// Create streams.
streamId1, result1, interval1 := "str1", 42.0, 2*time.Second
if err := sltu.CreateTestStream(clientCtx, clientSb, devId,
streamId1, fmt.Sprintf("echo %f;", result1), interval1); err != nil {
t.Errorf("failed creating stream %s: %v", streamId1, err)
}
time.Sleep(1 * time.Second)
streamId2, result2, interval2 := "str2", 47.0, 1*time.Second
if err := sltu.CreateTestStream(clientCtx, clientSb, devId,
streamId2, fmt.Sprintf("echo %f;", result2), interval2); err != nil {
t.Errorf("failed creating stream %s: %v", streamId2, err)
}
// streamId has another streamId as prefix to verify ListDataStreams prefix
// handling.
streamId3, result3, interval3 := "str12", 3.14159, 500*time.Millisecond
if err := sltu.CreateTestStream(clientCtx, clientSb, devId,
streamId3, fmt.Sprintf("echo %f;", result3), interval3); err != nil {
t.Errorf("failed creating stream %s: %v", streamId3, err)
}
// Allow time for sync and at least 3 measurements.
time.Sleep(10 * time.Second)
// SIGINT will gracefully stop measured.
measured.Terminate(os.Interrupt)
// Check that both streams have at least 3 measurements synced back to
// client device Syncbase.
if err := listStreamData(clientCtx, clientSb, devId, streamId1, result1); err != nil {
t.Errorf("failed listing stream %s data: %v", streamId1, err)
}
if err := listStreamData(clientCtx, clientSb, devId, streamId2, result2); err != nil {
t.Errorf("failed listing stream %s data: %v", streamId2, err)
}
if err := listStreamData(clientCtx, clientSb, devId, streamId3, result3); err != nil {
t.Errorf("failed listing stream %s data: %v", streamId3, err)
}
}
func listStreamData(ctx *context.T, sbService, devId, streamId string, expectVal float64) error {
db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterCollections)
if err != nil {
return fmt.Errorf("failed opening master database: %v", err)
}
count := 0
streamKey := &sbmodel.KStreamDef{
DevId: devId,
StreamId: streamId,
}
if err := client.ListStreamData(ctx, db, streamKey, func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
if got, want := key.StreamId, streamId; got != want {
return fmt.Errorf("list returned data for wrong stream: got %s, want %s", got, want)
}
switch val := val.(type) {
case sbmodel.VDataPointValue:
if val.Value != expectVal {
return fmt.Errorf("unexpected stream data value, got: %v, want: %v", val.Value, expectVal)
}
default:
return fmt.Errorf("unexpected stream data type, got: %v, want value: %v", val, expectVal)
}
count++
return nil
}); err != nil {
return err
}
if count < 3 {
return fmt.Errorf("expected at least 3 measurements for %s, got %d", streamId, count)
}
return nil
}