blob: 98f66b0fbe6ade2074e033f052541664131c71c5 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package client_test
import (
"bytes"
"fmt"
"strconv"
"syscall"
"time"
"v.io/v23"
"v.io/v23/naming"
"v.io/x/ref"
_ "v.io/x/ref/runtime/factories/generic"
sbtu "v.io/x/ref/services/syncbase/testutil"
"v.io/x/ref/test/modules"
"v.io/x/ref/test/v23tests"
"v.io/x/sensorlog/internal/client"
slltu "v.io/x/sensorlog/internal/client/testutil"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbutil"
)
//go:generate jiri test generate
// V23TestStreamConfigAndList is an end-to-end test of stream configuration and
// data listing. It starts a root mount table, two syncbased instances (one for
// the client, one for the measuring device) and the measured binary, registers
// the device with the client, creates three streams, and finally verifies via
// runListStreamData that data for each stream is visible through the client
// Syncbase.
//
// NOTE(review): the second results of GetVar and NewChildCredentials are
// discarded below; consider checking them so that setup failures surface early.
func V23TestStreamConfigAndList(t *v23tests.T) {
	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
	mtName, _ := t.Shell().GetVar(ref.EnvNamespacePrefix)

	// Start the client Syncbase.
	clientSb := "sb/client"
	clientCreds, _ := t.Shell().NewChildCredentials("client")
	clientSbCreds, _ := t.Shell().NewChildCredentials("client:sb")
	cleanup := sbtu.StartSyncbased(t, clientSbCreds, clientSb, "",
		`{"Read": {"In":["root:client"]}, "Write": {"In":["root:client"]}, "Admin": {"In":["root:client"]}, "Resolve": {"In":["..."]}}`)
	defer cleanup()

	// Start the measuring device Syncbase.
	measuredSb := "sb/measured"
	measuredCreds, _ := t.Shell().NewChildCredentials("measured")
	measuredSbCreds, _ := t.Shell().NewChildCredentials("measured:sb")
	cleanup = sbtu.StartSyncbased(t, measuredSbCreds, measuredSb, "",
		`{"Read": {"In":["root:measured"]}, "Write": {"In":["root:measured"]}, "Admin": {"In":["root:measured"]}, "Resolve": {"In":["..."]}}`)
	defer cleanup()

	time.Sleep(1 * time.Second)

	// Start measured.
	measuredBin := t.BuildV23Pkg("v.io/x/sensorlog/measured")
	devId := "measured1"
	publishSb := naming.Join(mtName, measuredSb)
	measuredOpts := measuredBin.StartOpts().WithCustomCredentials(measuredCreds).WithSessions(t, 180*time.Second)
	measured := measuredBin.WithStartOpts(measuredOpts).Start(
		"-alsologtostderr", "-service="+measuredSb, "-devid="+devId, "-admin=root:client",
		"-publish-sb="+publishSb)

	time.Sleep(3 * time.Second)

	// Add measuring device to client, joining its syncgroup.
	sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb)

	// Create streams.
	streamId1, result1, interval1 := "str1", "42", "2s"
	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
		streamId1, fmt.Sprintf("echo %s;", result1), interval1, "")

	time.Sleep(1 * time.Second)

	streamId2, result2, interval2 := "str2", "47", "1s"
	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
		streamId2, fmt.Sprintf("echo %s;", result2), interval2, "")

	// This stream id has another stream id ("str1") as its prefix, to verify
	// prefix handling when listing stream data.
	streamId3, result3, interval3 := "str12", "3.14159", "0.5s"
	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId,
		streamId3, fmt.Sprintf("echo %s;", result3), interval3, "")

	// Allow time for sync and at least 3 measurements.
	time.Sleep(10 * time.Second)

	// Send SIGINT so that measured can stop gracefully.
	if err := syscall.Kill(measured.Pid(), syscall.SIGINT); err != nil {
		t.Fatalf("failed to kill measured: %v", err)
	}
	var stdout, stderr bytes.Buffer
	if err := measured.Shutdown(&stdout, &stderr); err != nil {
		// Report the actual shutdown error; the format verb needs its argument.
		t.Errorf("failed to shutdown measured: %v", err)
	}
	t.Logf("measured stdout:\n%s", stdout.String())
	t.Logf("measured stderr:\n%s", stderr.String())

	// Check that all three streams have at least 3 measurements synced back to
	// client device Syncbase.
	sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId1, result1)
	sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId2, result2)
	sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId3, result3)
}
// runListStreamData is registered with modules.Register under the name
// "runListStreamData" and is passed to sbtu.RunClient by
// V23TestStreamConfigAndList.
//
// It expects four arguments, in order: the Syncbase service name, the device
// id, the stream id, and the expected measurement value (parsed as a float64).
// It lists the data points for the given stream and returns an error if:
//   - fewer than four arguments are supplied or the expected value does not
//     parse as a float;
//   - the database cannot be created or opened;
//   - any listed data point belongs to a different stream id, is not a
//     sbmodel.VDataPointValue, or carries a value different from the expected
//     one;
//   - fewer than 3 data points are listed.
var runListStreamData = modules.Register(func(env *modules.Env, args ...string) error {
	// Validate arity up front so a misconfigured caller gets an error rather
	// than an index-out-of-range panic.
	if len(args) < 4 {
		return fmt.Errorf("runListStreamData: expected 4 arguments (service, devId, streamId, value), got %d", len(args))
	}
	sbService, devId, streamId, expectValStr := args[0], args[1], args[2], args[3]
	expectVal, err := strconv.ParseFloat(expectValStr, 64)
	if err != nil {
		return err
	}

	ctx, cleanup := v23.Init()
	defer cleanup()

	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
	if err != nil {
		return fmt.Errorf("failed opening measured database: %v", err)
	}

	// count tracks how many data points passed validation in the callback.
	count := 0
	streamKey := &sbmodel.KStreamDef{
		DevId:    devId,
		StreamId: streamId,
	}
	if err := client.ListStreamData(ctx, db, streamKey, func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error {
		// Guards against data from a stream whose id merely shares a prefix
		// (e.g. "str12" when listing "str1").
		if got, want := key.StreamId, streamId; got != want {
			return fmt.Errorf("list returned data for wrong stream: got %s, want %s", got, want)
		}
		switch val := val.(type) {
		case sbmodel.VDataPointValue:
			if val.Value != expectVal {
				return fmt.Errorf("unexpected stream data value, got: %v, want: %v", val.Value, expectVal)
			}
		default:
			return fmt.Errorf("unexpected stream data type, got: %v, want value: %v", val, expectVal)
		}
		count++
		return nil
	}); err != nil {
		return err
	}

	if count < 3 {
		return fmt.Errorf("expected at least 3 measurements for %s, got %d", streamId, count)
	}
	return nil
}, "runListStreamData")