| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package client_test |
| |
| import ( |
| "fmt" |
| "os" |
| "reflect" |
| "sort" |
| "testing" |
| "time" |
| |
| "v.io/v23" |
| "v.io/v23/context" |
| "v.io/v23/naming" |
| "v.io/x/ref/services/mounttable/mounttablelib" |
| "v.io/x/ref/services/syncbase/syncbaselib" |
| "v.io/x/ref/test/v23test" |
| sltu "v.io/x/sensorlog/internal/client/testutil" |
| "v.io/x/sensorlog/internal/measure" |
| "v.io/x/sensorlog/internal/sbmodel" |
| "v.io/x/sensorlog/internal/sbutil" |
| "v.io/x/sensorlog/internal/util" |
| ) |
| |
| const ( |
| dummyScript = "echo 42;" |
| dummyInterval = 1 * time.Second |
| ) |
| |
| func TestV23DeviceAddAndStreamCreate(t *testing.T) { |
| v23test.SkipUnlessRunningIntegrationTests(t) |
| sh := v23test.NewShell(t, nil) |
| defer sh.Cleanup() |
| // Start a 'global' mount table. |
| globalMT, globalMTShutdown := startAdditionalMT(sh, "--v23.tcp.address=127.0.0.1:0") |
| // Mount the local mount table in the global one. |
| localMT := naming.Join(globalMT, "localmt") |
| sh.StartRootMountTableWithOpts(mounttablelib.Opts{MountName: localMT}) |
| |
| clientSb := "sb/client" |
| clientCtx := sh.ForkContext("u:client") |
| clientSbCreds := sh.ForkCredentials("u:client:sb") |
| sh.StartSyncbase(clientSbCreds, syncbaselib.Opts{Name: clientSb}, `{"Read": {"In":["root:u:client"]}, "Write": {"In":["root:u:client"]}, "Admin": {"In":["root:u:client"]}, "Resolve": {"In":["..."]}}`) |
| |
| measuredSb := "sb/measured" |
| measuredCtx := sh.ForkContext("u:measured") |
| measuredSbCreds := sh.ForkCredentials("u:measured:sb") |
| sh.StartSyncbase(measuredSbCreds, syncbaselib.Opts{Name: measuredSb}, `{"Read": {"In":["root:u:measured"]}, "Write": {"In":["root:u:measured"]}, "Admin": {"In":["root:u:measured"]}, "Resolve": {"In":["..."]}}`) |
| |
| time.Sleep(1 * time.Second) |
| |
| // Initialize measuring device syncgroup using the global mount table alias |
| // of measured Syncbase for publishing. |
| devId := "measured1" |
| publishSb := naming.Join(localMT, measuredSb) |
| if err := initSyncgroup(measuredCtx, measuredSb, devId, "root:u:client", publishSb, globalMT); err != nil { |
| t.Fatalf("failed initializing syncgroup for device %s: %v", devId, err) |
| } |
| |
| // Add measuring device to client, joining its syncgroup. |
| if err := sltu.AddTestDevice(clientCtx, clientSb, devId, publishSb); err != nil { |
| t.Errorf("failed adding device %s: %v", devId, err) |
| } |
| |
| // Allow time for syncgroup metadata sync. |
| time.Sleep(3 * time.Second) |
| |
| // After syncgroup has been joined, sync can use local mount table. Kill the |
| // global one. |
| globalMTShutdown(os.Interrupt) |
| |
| streamIds := []string{"str1", "str2", "str3"} |
| |
| // Create stream before starting measured watcher. |
| if err := sltu.CreateTestStream(clientCtx, clientSb, devId, streamIds[0], dummyScript, dummyInterval); err != nil { |
| t.Errorf("failed creating stream %s: %v", streamIds[0], err) |
| } |
| |
| time.Sleep(1 * time.Second) |
| |
| stopWatch := watchForStreams(measuredCtx, measuredSb, devId) |
| |
| // Create more streams. |
| if err := sltu.CreateTestStream(clientCtx, clientSb, devId, streamIds[1], dummyScript, dummyInterval); err != nil { |
| t.Errorf("failed creating stream %s: %v", streamIds[1], err) |
| } |
| |
| time.Sleep(1 * time.Second) |
| |
| if err := sltu.CreateTestStream(clientCtx, clientSb, devId, streamIds[2], dummyScript, dummyInterval); err != nil { |
| t.Errorf("failed creating stream %s: %v", streamIds[2], err) |
| } |
| |
| // Allow time for sync. |
| time.Sleep(3 * time.Second) |
| |
| // Stop the watcher. |
| gotStreams, err := stopWatch() |
| if err != nil { |
| t.Fatalf("measureWatcher failed: %v", err) |
| } |
| |
| // Compare streams seen by watcher to expectations. |
| sort.Strings(gotStreams) |
| sort.Strings(streamIds) |
| if got, want := gotStreams, streamIds; !reflect.DeepEqual(got, want) { |
| t.Errorf("watch returned streams do not match: got %v, want %v", got, want) |
| } |
| } |
| |
| func initSyncgroup(ctx *context.T, sbService, devId, admin, publishSb, sgMT string) error { |
| db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredCollections) |
| if err != nil { |
| return fmt.Errorf("failed initializing measured database: %v", err) |
| } |
| |
| sgMTs := append(v23.GetNamespace(ctx).Roots(), sgMT) |
| if err := measure.InitSyncgroup(ctx, db, devId, admin, publishSb, sgMTs); err != nil { |
| return fmt.Errorf("failed initializing measured syncgroup: %v", err) |
| } |
| if err := measure.InitSyncgroup(ctx, db, devId, admin, publishSb, sgMTs); err != nil { |
| return fmt.Errorf("initializing measured syncgroup should be idempotent, repeat failed: %v", err) |
| } |
| |
| return nil |
| } |
| |
| func watchForStreams(ctx *context.T, sbService, devId string) (stop func() (gotStreams []string, err error)) { |
| db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredCollections) |
| if err != nil { |
| return func() ([]string, error) { |
| return nil, fmt.Errorf("failed opening measured database: %v", err) |
| } |
| } |
| |
| var gotStreams []string |
| |
| ctx, cancel := context.WithCancel(ctx) |
| wait := util.AsyncRun(func() error { |
| return measure.WatchForStreams(ctx, db, devId, func(key *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error { |
| if got, want := key.DevId, devId; got != want { |
| return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", got, want) |
| } |
| gotStreams = append(gotStreams, key.StreamId) |
| return nil |
| }) |
| }, func(err error) { |
| cancel() |
| }) |
| |
| return func() ([]string, error) { |
| cancel() |
| if err := wait(); err != nil { |
| return nil, err |
| } |
| return gotStreams, nil |
| } |
| } |
| |
| func startAdditionalMT(sh *v23test.Shell, args ...string) (string, func(sig os.Signal)) { |
| mounttabledPath := v23test.BuildGoPkg(sh, "v.io/x/ref/services/mounttable/mounttabled") |
| inv := sh.Cmd(mounttabledPath, args...) |
| inv.Start() |
| return inv.S.ExpectVar("NAME"), inv.Terminate |
| } |