| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package client_test |
| |
| import ( |
| "bytes" |
| "fmt" |
| "strconv" |
| "syscall" |
| "time" |
| |
| "v.io/v23" |
| "v.io/v23/naming" |
| "v.io/x/ref" |
| _ "v.io/x/ref/runtime/factories/generic" |
| sbtu "v.io/x/ref/services/syncbase/testutil" |
| "v.io/x/ref/test/modules" |
| "v.io/x/ref/test/v23tests" |
| "v.io/x/sensorlog_lite/internal/client" |
| slltu "v.io/x/sensorlog_lite/internal/client/testutil" |
| "v.io/x/sensorlog_lite/internal/sbmodel" |
| "v.io/x/sensorlog_lite/internal/sbutil" |
| ) |
| |
| //go:generate jiri test generate |
| |
| func V23TestStreamConfigAndList(t *v23tests.T) { |
| v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0") |
| mtName, _ := t.Shell().GetVar(ref.EnvNamespacePrefix) |
| |
| clientSb := "sb/client" |
| clientCreds, _ := t.Shell().NewChildCredentials("client") |
| clientSbCreds, _ := t.Shell().NewChildCredentials("client/sb") |
| cleanup := sbtu.StartSyncbased(t, clientSbCreds, clientSb, "", |
| `{"Read": {"In":["root/client"]}, "Write": {"In":["root/client"]}, "Admin": {"In":["root/client"]}, "Resolve": {"In":["..."]}}`) |
| defer cleanup() |
| |
| measuredSb := "sb/measured" |
| measuredCreds, _ := t.Shell().NewChildCredentials("measured") |
| measuredSbCreds, _ := t.Shell().NewChildCredentials("measured/sb") |
| cleanup = sbtu.StartSyncbased(t, measuredSbCreds, measuredSb, "", |
| `{"Read": {"In":["root/measured"]}, "Write": {"In":["root/measured"]}, "Admin": {"In":["root/measured"]}, "Resolve": {"In":["..."]}}`) |
| defer cleanup() |
| |
| time.Sleep(1 * time.Second) |
| |
| // Start measured. |
| measuredBin := t.BuildV23Pkg("v.io/x/sensorlog_lite/measured") |
| devId := "measured1" |
| publishSb := naming.Join(mtName, measuredSb) |
| measuredOpts := measuredBin.StartOpts().WithCustomCredentials(measuredCreds).WithSessions(t, 180*time.Second) |
| measured := measuredBin.WithStartOpts(measuredOpts).Start( |
| "-alsologtostderr", "-service="+measuredSb, "-devid="+devId, "-admin=root/client", |
| "-publish-sb="+publishSb) |
| |
| time.Sleep(3 * time.Second) |
| |
| // Add measuring device to client, joining its syncgroup. |
| sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb) |
| |
| // Create streams. |
| streamId1, result1, interval1 := "str1", "42", "2s" |
| sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, |
| streamId1, fmt.Sprintf("echo %s;", result1), interval1, "") |
| |
| time.Sleep(1 * time.Second) |
| |
| streamId2, result2, interval2 := "str2", "47", "1s" |
| sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, |
| streamId2, fmt.Sprintf("echo %s;", result2), interval2, "") |
| |
| // streamId has another streamId as prefix to verify ListDataStreams prefix |
| // handling. |
| streamId3, result3, interval3 := "str12", "3.14159", "0.5s" |
| sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, |
| streamId3, fmt.Sprintf("echo %s;", result3), interval3, "") |
| |
| // Allow time for sync and at least 3 measurements. |
| time.Sleep(10 * time.Second) |
| |
| // Kill will gracefully stop measured. |
| if err := syscall.Kill(measured.Pid(), syscall.SIGINT); err != nil { |
| t.Fatalf("failed to kill measured: %v", err) |
| } |
| var stdout, stderr bytes.Buffer |
| if err := measured.Shutdown(&stdout, &stderr); err != nil { |
| t.Errorf("failed to shutdown measured: %v") |
| } |
| t.Logf("measured stdout:\n%s", stdout.String()) |
| t.Logf("measured stderr:\n%s", stderr.String()) |
| |
| // Check that both streams have at least 3 measurements synced back to |
| // client device Syncbase. |
| sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId1, result1) |
| sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId2, result2) |
| sbtu.RunClient(t, clientCreds, runListStreamData, clientSb, devId, streamId3, result3) |
| } |
| |
| var runListStreamData = modules.Register(func(env *modules.Env, args ...string) error { |
| sbService, devId, streamId, expectValStr := args[0], args[1], args[2], args[3] |
| |
| expectVal, err := strconv.ParseFloat(expectValStr, 64) |
| if err != nil { |
| return err |
| } |
| |
| ctx, cleanup := v23.Init() |
| defer cleanup() |
| |
| db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables) |
| if err != nil { |
| return fmt.Errorf("failed opening measured database: %v", err) |
| } |
| |
| count := 0 |
| streamKey := &sbmodel.KStreamDef{ |
| DevId: devId, |
| StreamId: streamId, |
| } |
| if err := client.ListStreamData(ctx, db, streamKey, func(key *sbmodel.KDataPoint, val sbmodel.VDataPoint) error { |
| if got, want := key.StreamId, streamId; got != want { |
| return fmt.Errorf("list returned data for wrong stream: got %s, want %s", got, want) |
| } |
| switch val := val.(type) { |
| case sbmodel.VDataPointValue: |
| if val.Value != expectVal { |
| return fmt.Errorf("unexpected stream data value, got: %v, want: %v", val.Value, expectVal) |
| } |
| default: |
| return fmt.Errorf("unexpected stream data type, got: %v, want value: %v", val, expectVal) |
| } |
| count++ |
| return nil |
| }); err != nil { |
| return err |
| } |
| |
| if count < 3 { |
| return fmt.Errorf("expected at least 3 measurements for %s, got %d", streamId, count) |
| } |
| |
| return nil |
| }, "runListStreamData") |