| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package client_test |
| |
| import ( |
| "bytes" |
| "fmt" |
| "reflect" |
| "sort" |
| "syscall" |
| "time" |
| |
| "v.io/v23" |
| "v.io/v23/context" |
| "v.io/v23/naming" |
| "v.io/x/ref/lib/signals" |
| _ "v.io/x/ref/runtime/factories/generic" |
| sbtu "v.io/x/ref/services/syncbase/testutil" |
| "v.io/x/ref/test/modules" |
| "v.io/x/ref/test/v23tests" |
| slltu "v.io/x/sensorlog/internal/client/testutil" |
| "v.io/x/sensorlog/internal/measure" |
| "v.io/x/sensorlog/internal/sbmodel" |
| "v.io/x/sensorlog/internal/sbutil" |
| "v.io/x/sensorlog/internal/util" |
| ) |
| |
| //go:generate jiri test generate |
| |
// dummyScript is the placeholder script argument passed to RunCreateStream
// for every stream created by the tests in this file.
const dummyScript = "echo 42;"

// dummyInterval is the placeholder interval argument passed to
// RunCreateStream for every stream created by the tests in this file.
const dummyInterval = "1s"
| |
| func V23TestDeviceAddAndStreamCreate(t *v23tests.T) { |
| // Start a 'global' mounttable. |
| globalMT, globalMTHandle := startAdditionalMT(t, "--v23.tcp.address=127.0.0.1:0") |
| // Mount the local mounttable in the global one. |
| localMT := naming.Join(globalMT, "localmt") |
| v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0", "--name="+localMT) |
| |
| clientSb := "sb/client" |
| clientCreds, _ := t.Shell().NewChildCredentials("client") |
| clientSbCreds, _ := t.Shell().NewChildCredentials("client:sb") |
| cleanup := sbtu.StartSyncbased(t, clientSbCreds, clientSb, "", |
| `{"Read": {"In":["root:client"]}, "Write": {"In":["root:client"]}, "Admin": {"In":["root:client"]}, "Resolve": {"In":["..."]}}`) |
| defer cleanup() |
| |
| measuredSb := "sb/measured" |
| measuredCreds, _ := t.Shell().NewChildCredentials("measured") |
| measuredSbCreds, _ := t.Shell().NewChildCredentials("measured:sb") |
| cleanup = sbtu.StartSyncbased(t, measuredSbCreds, measuredSb, "", |
| `{"Read": {"In":["root:measured"]}, "Write": {"In":["root:measured"]}, "Admin": {"In":["root:measured"]}, "Resolve": {"In":["..."]}}`) |
| defer cleanup() |
| |
| time.Sleep(1 * time.Second) |
| |
| // Initialize measuring device syncgroup using the global mounttable alias |
| // of measured Syncbase for publishing. |
| devId := "measured1" |
| publishSb := naming.Join(localMT, measuredSb) |
| sbtu.RunClient(t, measuredCreds, runInitSyncgroup, measuredSb, devId, "root:client", publishSb, globalMT) |
| |
| // Add measuring device to client, joining its syncgroup. |
| sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb) |
| |
| // Allow time for syncgroup metadata sync. |
| time.Sleep(3 * time.Second) |
| |
| // After syncgroup has been joined, sync can use local mounttable. Kill the |
| // global one. |
| if err := globalMTHandle.Kill(syscall.SIGINT); err != nil { |
| t.Fatalf("failed to kill global mounttable: %v", err) |
| } |
| if err := globalMTHandle.Shutdown(nil, nil); err != nil { |
| t.Fatalf("failed to shutdown global mounttable: %v", err) |
| } |
| |
| streamIds := []string{"str1", "str2", "str3"} |
| |
| // Create stream before starting measured watcher. |
| sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[0], dummyScript, dummyInterval) |
| |
| time.Sleep(1 * time.Second) |
| |
| measureWatcher, err := t.Shell().StartWithOpts( |
| t.Shell().DefaultStartOpts().WithCustomCredentials(measuredCreds).WithSessions(t, 60*time.Second), nil, |
| runWatchForStreams, append([]string{measuredSb, devId}, streamIds...)...) |
| if err != nil { |
| t.Fatalf("failed to start measureWatcher: %v", err) |
| } |
| |
| // Create more streams. |
| sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[1], dummyScript, dummyInterval) |
| |
| time.Sleep(1 * time.Second) |
| |
| sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[2], dummyScript, dummyInterval) |
| |
| // Allow time for sync. |
| time.Sleep(3 * time.Second) |
| |
| // Kill will stop the watcher and cause it to verify that seen streams match |
| // expectations. |
| if err := syscall.Kill(measureWatcher.Pid(), syscall.SIGINT); err != nil { |
| t.Fatalf("failed to kill measureWatcher: %v", err) |
| } |
| var stdout, stderr bytes.Buffer |
| if err := measureWatcher.Shutdown(&stdout, &stderr); err != nil { |
| t.Errorf("failed to shutdown measureWatcher: %v") |
| } |
| t.Logf("measureWatcher stdout:\n%s", stdout.String()) |
| t.Logf("measureWatcher stderr:\n%s", stderr.String()) |
| } |
| |
| var runInitSyncgroup = modules.Register(func(env *modules.Env, args ...string) error { |
| sbService, devId, admin, publishSb, sgMT := args[0], args[1], args[2], args[3], args[4] |
| |
| ctx, cleanup := v23.Init() |
| defer cleanup() |
| db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables) |
| if err != nil { |
| return fmt.Errorf("failed initializing measured database: %v", err) |
| } |
| |
| sgMTs := append(v23.GetNamespace(ctx).Roots(), sgMT) |
| if err := measure.InitSyncgroup(ctx, db, devId, admin, publishSb, sgMTs); err != nil { |
| return fmt.Errorf("failed initializing measured syncgroup: %v", err) |
| } |
| if err := measure.InitSyncgroup(ctx, db, devId, admin, publishSb, sgMTs); err != nil { |
| return fmt.Errorf("initializing measured syncgroup should be idempotent, repeat failed: %v", err) |
| } |
| |
| return nil |
| }, "runInitSyncgroup") |
| |
| var runWatchForStreams = modules.Register(func(env *modules.Env, args ...string) error { |
| sbService, devId, expectStreams := args[0], args[1], args[2:] |
| |
| ctx, cleanup := v23.Init() |
| defer cleanup() |
| |
| db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables) |
| if err != nil { |
| return fmt.Errorf("failed opening measured database: %v", err) |
| } |
| |
| gotStreams := make([]string, 0, len(expectStreams)) |
| |
| ctx, stop := context.WithCancel(ctx) |
| wait := util.AsyncRun(func() error { |
| return measure.WatchForStreams(ctx, db, devId, func(key *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error { |
| if got, want := key.DevId, devId; got != want { |
| return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", got, want) |
| } |
| gotStreams = append(gotStreams, key.StreamId) |
| return nil |
| }) |
| }, func(err error) { |
| stop() |
| }) |
| |
| // Wait for kill, then stop watcher. |
| <-signals.ShutdownOnSignals(nil) |
| stop() |
| |
| if err := wait(); err != nil { |
| return err |
| } |
| |
| // Compare streams seen by watcher to expectations. |
| sort.Strings(gotStreams) |
| sort.Strings(expectStreams) |
| if got, want := gotStreams, expectStreams; !reflect.DeepEqual(got, want) { |
| return fmt.Errorf("watch returned streams do not match: got %v, want %v", got, want) |
| } |
| |
| return nil |
| }, "runWatchForStreams") |
| |
| func startAdditionalMT(t *v23tests.T, args ...string) (string, *v23tests.Invocation) { |
| bin := t.BuildV23Pkg("v.io/x/ref/services/mounttable/mounttabled") |
| inv := bin.Start(args...) |
| return inv.ExpectVar("NAME"), inv |
| } |