blob: 79efecc00e5c751582412200f83ebc6cec70a6c1 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package client_test
import (
"bytes"
"fmt"
"reflect"
"sort"
"syscall"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/x/ref/lib/signals"
_ "v.io/x/ref/runtime/factories/generic"
sbtu "v.io/x/ref/services/syncbase/testutil"
"v.io/x/ref/test/modules"
"v.io/x/ref/test/v23tests"
slltu "v.io/x/sensorlog/internal/client/testutil"
"v.io/x/sensorlog/internal/measure"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbutil"
"v.io/x/sensorlog/internal/util"
)
//go:generate jiri test generate
// dummyScript is the script argument handed to slltu.RunCreateStream for
// every stream this test creates.
const dummyScript = "echo 42;"

// dummyInterval is the interval argument handed to slltu.RunCreateStream for
// every stream this test creates (presumably a duration string; the parsing
// happens outside this file).
const dummyInterval = "1s"
// V23TestDeviceAddAndStreamCreate is an end-to-end test of adding a measuring
// device to a client and creating streams on it.
//
// It starts a 'global' mounttable with a local mounttable mounted inside it,
// one Syncbase instance for the client and one for the measuring device, then:
//  1. initializes the measuring device syncgroup, published through the
//     global mounttable alias of the measured Syncbase;
//  2. adds the device to the client, joining that syncgroup;
//  3. kills the global mounttable;
//  4. creates one stream before and two streams after starting a watcher
//     process (runWatchForStreams) against the measured Syncbase;
//  5. interrupts the watcher, which then checks that the streams it saw match
//     the expected stream ids, and reports the watcher's exit status.
//
// The fixed sleeps give the separate processes time to start and sync.
func V23TestDeviceAddAndStreamCreate(t *v23tests.T) {
	// Start a 'global' mounttable.
	globalMT, globalMTHandle := startAdditionalMT(t, "--v23.tcp.address=127.0.0.1:0")
	// Mount the local mounttable in the global one.
	localMT := naming.Join(globalMT, "localmt")
	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0", "--name="+localMT)

	// Client credentials and Syncbase instance. Credential creation errors are
	// fatal: continuing with unusable credentials would only fail later with a
	// less helpful message.
	clientSb := "sb/client"
	clientCreds, err := t.Shell().NewChildCredentials("client")
	if err != nil {
		t.Fatalf("failed to create credentials for %q: %v", "client", err)
	}
	clientSbCreds, err := t.Shell().NewChildCredentials("client:sb")
	if err != nil {
		t.Fatalf("failed to create credentials for %q: %v", "client:sb", err)
	}
	cleanupClientSb := sbtu.StartSyncbased(t, clientSbCreds, clientSb, "",
		`{"Read": {"In":["root:client"]}, "Write": {"In":["root:client"]}, "Admin": {"In":["root:client"]}, "Resolve": {"In":["..."]}}`)
	defer cleanupClientSb()

	// Measuring device credentials and Syncbase instance.
	measuredSb := "sb/measured"
	measuredCreds, err := t.Shell().NewChildCredentials("measured")
	if err != nil {
		t.Fatalf("failed to create credentials for %q: %v", "measured", err)
	}
	measuredSbCreds, err := t.Shell().NewChildCredentials("measured:sb")
	if err != nil {
		t.Fatalf("failed to create credentials for %q: %v", "measured:sb", err)
	}
	cleanupMeasuredSb := sbtu.StartSyncbased(t, measuredSbCreds, measuredSb, "",
		`{"Read": {"In":["root:measured"]}, "Write": {"In":["root:measured"]}, "Admin": {"In":["root:measured"]}, "Resolve": {"In":["..."]}}`)
	defer cleanupMeasuredSb()

	time.Sleep(1 * time.Second)

	// Initialize measuring device syncgroup using the global mounttable alias
	// of measured Syncbase for publishing.
	devId := "measured1"
	publishSb := naming.Join(localMT, measuredSb)
	sbtu.RunClient(t, measuredCreds, runInitSyncgroup, measuredSb, devId, "root:client", publishSb, globalMT)

	// Add measuring device to client, joining its syncgroup.
	sbtu.RunClient(t, clientCreds, slltu.RunAddDevice, clientSb, devId, publishSb)

	// Allow time for syncgroup metadata sync.
	time.Sleep(3 * time.Second)

	// After syncgroup has been joined, sync can use local mounttable. Kill the
	// global one.
	if err := globalMTHandle.Kill(syscall.SIGINT); err != nil {
		t.Fatalf("failed to kill global mounttable: %v", err)
	}
	if err := globalMTHandle.Shutdown(nil, nil); err != nil {
		t.Fatalf("failed to shutdown global mounttable: %v", err)
	}

	streamIds := []string{"str1", "str2", "str3"}

	// Create stream before starting measured watcher.
	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[0], dummyScript, dummyInterval)

	time.Sleep(1 * time.Second)

	// The watcher receives the Syncbase name, the device id, and the full list
	// of stream ids it is expected to have seen by the time it is interrupted.
	measureWatcher, err := t.Shell().StartWithOpts(
		t.Shell().DefaultStartOpts().WithCustomCredentials(measuredCreds).WithSessions(t, 60*time.Second), nil,
		runWatchForStreams, append([]string{measuredSb, devId}, streamIds...)...)
	if err != nil {
		t.Fatalf("failed to start measureWatcher: %v", err)
	}

	// Create more streams.
	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[1], dummyScript, dummyInterval)
	time.Sleep(1 * time.Second)
	sbtu.RunClient(t, clientCreds, slltu.RunCreateStream, clientSb, devId, streamIds[2], dummyScript, dummyInterval)

	// Allow time for sync.
	time.Sleep(3 * time.Second)

	// Kill will stop the watcher and cause it to verify that seen streams match
	// expectations.
	if err := syscall.Kill(measureWatcher.Pid(), syscall.SIGINT); err != nil {
		t.Fatalf("failed to kill measureWatcher: %v", err)
	}
	var stdout, stderr bytes.Buffer
	// Errorf rather than Fatalf so that the watcher's output is still logged
	// below; the error itself must be passed to the %v verb to be reported.
	if err := measureWatcher.Shutdown(&stdout, &stderr); err != nil {
		t.Errorf("failed to shutdown measureWatcher: %v", err)
	}
	t.Logf("measureWatcher stdout:\n%s", stdout.String())
	t.Logf("measureWatcher stderr:\n%s", stderr.String())
}
// runInitSyncgroup is a registered module that opens (or creates) the
// measured database and initializes the measuring device syncgroup in it.
//
// Positional args: Syncbase service name, device id, admin, the Syncbase name
// to publish under, and an extra syncgroup mounttable that is appended to the
// namespace roots. Initialization is run twice so that a failure of the
// repeat call flags non-idempotent behavior.
var runInitSyncgroup = modules.Register(func(env *modules.Env, args ...string) error {
	var (
		sbService = args[0]
		devId     = args[1]
		admin     = args[2]
		publishSb = args[3]
		sgMT      = args[4]
	)

	ctx, shutdown := v23.Init()
	defer shutdown()

	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
	if err != nil {
		return fmt.Errorf("failed initializing measured database: %v", err)
	}

	// Syncgroup mounttables: the namespace roots plus the one passed in.
	mountTables := append(v23.GetNamespace(ctx).Roots(), sgMT)

	err = measure.InitSyncgroup(ctx, db, devId, admin, publishSb, mountTables)
	if err != nil {
		return fmt.Errorf("failed initializing measured syncgroup: %v", err)
	}
	// Second, identical call: must succeed as well.
	err = measure.InitSyncgroup(ctx, db, devId, admin, publishSb, mountTables)
	if err != nil {
		return fmt.Errorf("initializing measured syncgroup should be idempotent, repeat failed: %v", err)
	}
	return nil
}, "runInitSyncgroup")
// runWatchForStreams is a registered module that watches the measured
// database for stream definitions until the process receives a shutdown
// signal, then checks the stream ids it saw against expectations.
//
// Positional args: Syncbase service name, device id, followed by the stream
// ids expected to be seen (compared irrespective of order).
var runWatchForStreams = modules.Register(func(env *modules.Env, args ...string) error {
	sbService, devId := args[0], args[1]
	expectStreams := args[2:]

	ctx, shutdown := v23.Init()
	defer shutdown()

	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
	if err != nil {
		return fmt.Errorf("failed opening measured database: %v", err)
	}

	seen := make([]string, 0, len(expectStreams))
	// record is the per-stream watch callback: it rejects streams belonging
	// to any other device and remembers the id of every stream reported.
	record := func(key *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
		if key.DevId != devId {
			return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", key.DevId, devId)
		}
		seen = append(seen, key.StreamId)
		return nil
	}

	ctx, cancel := context.WithCancel(ctx)
	// The second callback cancels the watch context (presumably invoked by
	// util.AsyncRun when the watcher fails; confirm against util).
	waitWatcher := util.AsyncRun(func() error {
		return measure.WatchForStreams(ctx, db, devId, record)
	}, func(_ error) {
		cancel()
	})

	// Block until a shutdown signal arrives, then stop the watcher and
	// collect its result.
	<-signals.ShutdownOnSignals(nil)
	cancel()
	if watchErr := waitWatcher(); watchErr != nil {
		return watchErr
	}

	// Order of arrival is irrelevant; compare sorted id lists.
	sort.Strings(seen)
	sort.Strings(expectStreams)
	if !reflect.DeepEqual(seen, expectStreams) {
		return fmt.Errorf("watch returned streams do not match: got %v, want %v", seen, expectStreams)
	}
	return nil
}, "runWatchForStreams")
// startAdditionalMT builds the mounttabled binary, starts it with the given
// command-line args, and returns the value of the NAME variable read from the
// invocation together with the invocation handle.
func startAdditionalMT(t *v23tests.T, args ...string) (string, *v23tests.Invocation) {
	mounttabled := t.BuildV23Pkg("v.io/x/ref/services/mounttable/mounttabled")
	handle := mounttabled.Start(args...)
	name := handle.ExpectVar("NAME")
	return name, handle
}