| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package client_test |
| |
| import ( |
| "fmt" |
| "os" |
| "testing" |
| "time" |
| |
| "v.io/v23" |
| "v.io/v23/context" |
| "v.io/v23/naming" |
| "v.io/x/ref/services/syncbase/syncbaselib" |
| "v.io/x/ref/test/v23test" |
| "v.io/x/sensorlog/internal/client" |
| sltu "v.io/x/sensorlog/internal/client/testutil" |
| "v.io/x/sensorlog/internal/sbmodel" |
| "v.io/x/sensorlog/internal/sbutil" |
| ) |
| |
| func TestV23StreamConfigAndList(t *testing.T) { |
| v23test.SkipUnlessRunningIntegrationTests(t) |
| sh := v23test.NewShell(t, nil) |
| defer sh.Cleanup() |
| sh.StartRootMountTable() |
| mtName := v23.GetNamespace(sh.Ctx).Roots()[0] |
| |
| clientSb := "sb/client" |
| clientCtx := sh.ForkContext("u:client") |
| clientSbCreds := sh.ForkCredentials("u:client:sb") |
| sh.StartSyncbase(clientSbCreds, syncbaselib.Opts{Name: clientSb}, `{"Read": {"In":["root:u:client"]}, "Write": {"In":["root:u:client"]}, "Admin": {"In":["root:u:client"]}, "Resolve": {"In":["..."]}}`) |
| |
| measuredSb := "sb/measured" |
| measuredCreds := sh.ForkCredentials("u:measured") |
| measuredSbCreds := sh.ForkCredentials("u:measured:sb") |
| sh.StartSyncbase(measuredSbCreds, syncbaselib.Opts{Name: measuredSb}, `{"Read": {"In":["root:u:measured"]}, "Write": {"In":["root:u:measured"]}, "Admin": {"In":["root:u:measured"]}, "Resolve": {"In":["..."]}}`) |
| |
| time.Sleep(1 * time.Second) |
| |
| // Start measured. |
| measuredPath := v23test.BuildGoPkg(sh, "v.io/x/sensorlog/measured") |
| devId := "measured1" |
| publishSb := naming.Join(mtName, measuredSb) |
| measured := sh.Cmd(measuredPath, "-alsologtostderr", "-service="+measuredSb, "-devid="+devId, "-admin=root:u:client", "-publish-sb="+publishSb) |
| measured = measured.WithCredentials(measuredCreds) |
| measured.PropagateOutput = true |
| measured.Start() |
| |
| time.Sleep(3 * time.Second) |
| |
| // Add measuring device to client, joining its syncgroup. |
| if err := sltu.AddTestDevice(clientCtx, clientSb, devId, publishSb); err != nil { |
| t.Errorf("failed adding device %s: %v", devId, err) |
| } |
| |
| // Create streams. |
| streamId1, result1, interval1 := "str1", 42.0, 2*time.Second |
| if err := sltu.CreateTestStream(clientCtx, clientSb, devId, |
| streamId1, fmt.Sprintf("echo %f;", result1), interval1); err != nil { |
| t.Errorf("failed creating stream %s: %v", streamId1, err) |
| } |
| |
| time.Sleep(1 * time.Second) |
| |
| streamId2, result2, interval2 := "str2", 47.0, 1*time.Second |
| if err := sltu.CreateTestStream(clientCtx, clientSb, devId, |
| streamId2, fmt.Sprintf("echo %f;", result2), interval2); err != nil { |
| t.Errorf("failed creating stream %s: %v", streamId2, err) |
| } |
| |
| // streamId has another streamId as prefix to verify ListDataStreams prefix |
| // handling. |
| streamId3, result3, interval3 := "str12", 3.14159, 500*time.Millisecond |
| if err := sltu.CreateTestStream(clientCtx, clientSb, devId, |
| streamId3, fmt.Sprintf("echo %f;", result3), interval3); err != nil { |
| t.Errorf("failed creating stream %s: %v", streamId3, err) |
| } |
| |
| // Allow time for sync and at least 3 measurements. |
| time.Sleep(10 * time.Second) |
| |
| // SIGINT will gracefully stop measured. |
| measured.Terminate(os.Interrupt) |
| |
| // Check that both streams have at least 3 measurements synced back to |
| // client device Syncbase. |
| if err := listStreamData(clientCtx, clientSb, devId, streamId1, result1); err != nil { |
| t.Errorf("failed listing stream %s data: %v", streamId1, err) |
| } |
| if err := listStreamData(clientCtx, clientSb, devId, streamId2, result2); err != nil { |
| t.Errorf("failed listing stream %s data: %v", streamId2, err) |
| } |
| if err := listStreamData(clientCtx, clientSb, devId, streamId3, result3); err != nil { |
| t.Errorf("failed listing stream %s data: %v", streamId3, err) |
| } |
| } |
| |
| func listStreamData(ctx *context.T, sbService, devId, streamId string, expectVal float64) error { |
| db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MasterCollections) |
| if err != nil { |
| return fmt.Errorf("failed opening master database: %v", err) |
| } |
| |
| count := 0 |
| streamKey := &sbmodel.KStreamDef{ |
| DevId: devId, |
| StreamId: streamId, |
| } |
| if err := client.ListStreamData(ctx, db, streamKey, func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error { |
| if got, want := key.StreamId, streamId; got != want { |
| return fmt.Errorf("list returned data for wrong stream: got %s, want %s", got, want) |
| } |
| switch val := val.(type) { |
| case sbmodel.VDataPointValue: |
| if val.Value != expectVal { |
| return fmt.Errorf("unexpected stream data value, got: %v, want: %v", val.Value, expectVal) |
| } |
| default: |
| return fmt.Errorf("unexpected stream data type, got: %v, want value: %v", val, expectVal) |
| } |
| count++ |
| return nil |
| }); err != nil { |
| return err |
| } |
| |
| if count < 3 { |
| return fmt.Errorf("expected at least 3 measurements for %s, got %d", streamId, count) |
| } |
| |
| return nil |
| } |