blob: dfb98fa31fe330dcdaca0f106cfe1c4751feacc9 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package client_test
import (
"fmt"
"os"
"reflect"
"sort"
"testing"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/x/ref/lib/v23test"
sltu "v.io/x/sensorlog/internal/client/testutil"
"v.io/x/sensorlog/internal/measure"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbutil"
"v.io/x/sensorlog/internal/util"
)
// Placeholder stream parameters passed to sltu.CreateTestStream by the tests
// in this file.
const (
	// dummyScript is the script text used for every test stream.
	dummyScript = "echo 42;"
	// dummyInterval is the interval used for every test stream.
	dummyInterval = time.Second
)
// TestV23DeviceAddAndStreamCreate is an integration test (skipped unless
// integration tests are enabled). It starts a client and a measured Syncbase,
// initializes the measuring device syncgroup, adds the device to the client,
// and then checks that the stream watcher on the measured side reports every
// stream created by the client: one created before the watcher starts and two
// created afterwards.
func TestV23DeviceAddAndStreamCreate(t *testing.T) {
	v23test.SkipUnlessRunningIntegrationTests(t)
	sh := v23test.NewShell(t, v23test.Opts{})
	defer sh.Cleanup()

	// Bring up a 'global' mounttable, then mount the local mounttable in it.
	globalMT, stopGlobalMT := startAdditionalMT(sh, "--v23.tcp.address=127.0.0.1:0")
	localMT := naming.Join(globalMT, "localmt")
	sh.StartRootMountTable("--name=" + localMT)

	// Client-side Syncbase, with permissions granted to the client user.
	const clientPerms = `{"Read": {"In":["root:u:client"]}, "Write": {"In":["root:u:client"]}, "Admin": {"In":["root:u:client"]}, "Resolve": {"In":["..."]}}`
	clientSb := "sb/client"
	clientCtx := sh.ForkContext("u:client")
	sh.StartSyncbase(sh.ForkCredentials("u:client:sb"), clientSb, "", clientPerms)

	// Measuring-device Syncbase, with permissions granted to the measured user.
	const measuredPerms = `{"Read": {"In":["root:u:measured"]}, "Write": {"In":["root:u:measured"]}, "Admin": {"In":["root:u:measured"]}, "Resolve": {"In":["..."]}}`
	measuredSb := "sb/measured"
	measuredCtx := sh.ForkContext("u:measured")
	sh.StartSyncbase(sh.ForkCredentials("u:measured:sb"), measuredSb, "", measuredPerms)

	time.Sleep(1 * time.Second)

	// The device syncgroup is published under the measured Syncbase's alias
	// in the global mounttable (reached through localMT).
	devId := "measured1"
	publishSb := naming.Join(localMT, measuredSb)
	if err := initSyncgroup(measuredCtx, measuredSb, devId, "root:u:client", publishSb, globalMT); err != nil {
		t.Fatalf("failed initializing syncgroup for device %s: %v", devId, err)
	}

	// Register the measuring device with the client, which joins its syncgroup.
	if err := sltu.AddTestDevice(clientCtx, clientSb, devId, publishSb); err != nil {
		t.Errorf("failed adding device %s: %v", devId, err)
	}

	// Allow time for syncgroup metadata sync.
	time.Sleep(3 * time.Second)

	// Once the syncgroup has been joined, sync can use the local mounttable,
	// so the global one is shut down.
	stopGlobalMT(os.Interrupt)

	// createStream creates one test stream for devId on the client and
	// records (without aborting the test) any failure.
	createStream := func(streamId string) {
		if err := sltu.CreateTestStream(clientCtx, clientSb, devId, streamId, dummyScript, dummyInterval); err != nil {
			t.Errorf("failed creating stream %s: %v", streamId, err)
		}
	}

	streamIds := []string{"str1", "str2", "str3"}

	// The first stream is created before the measured watcher is started.
	createStream(streamIds[0])
	time.Sleep(1 * time.Second)

	stopWatch := watchForStreams(measuredCtx, measuredSb, devId)

	// The remaining streams are created while the watcher is running.
	createStream(streamIds[1])
	time.Sleep(1 * time.Second)
	createStream(streamIds[2])

	// Allow time for sync.
	time.Sleep(3 * time.Second)

	// Stop the watcher and collect what it saw.
	gotStreams, err := stopWatch()
	if err != nil {
		t.Fatalf("measureWatcher failed: %v", err)
	}

	// Order of arrival is not part of the expectation; compare sorted lists.
	sort.Strings(gotStreams)
	sort.Strings(streamIds)
	if !reflect.DeepEqual(gotStreams, streamIds) {
		t.Errorf("watch returned streams do not match: got %v, want %v", gotStreams, streamIds)
	}
}
// initSyncgroup opens (creating if necessary) the measured database on
// sbService and initializes the syncgroup for device devId via
// measure.InitSyncgroup. The syncgroup mounttables passed along are the
// namespace roots of ctx followed by sgMT. Initialization is performed twice
// in a row to verify that repeating it succeeds.
//
// It returns a descriptive error if opening the database or either
// initialization call fails, and nil otherwise.
func initSyncgroup(ctx *context.T, sbService, devId, admin, publishSb, sgMT string) error {
	db, err := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
	if err != nil {
		return fmt.Errorf("failed initializing measured database: %v", err)
	}

	roots := v23.GetNamespace(ctx).Roots()
	sgMTs := append(roots, sgMT)

	// Both the initial and the repeated call use exactly the same arguments.
	runInit := func() error {
		return measure.InitSyncgroup(ctx, db, devId, admin, publishSb, sgMTs)
	}

	if firstErr := runInit(); firstErr != nil {
		return fmt.Errorf("failed initializing measured syncgroup: %v", firstErr)
	}
	if repeatErr := runInit(); repeatErr != nil {
		return fmt.Errorf("initializing measured syncgroup should be idempotent, repeat failed: %v", repeatErr)
	}
	return nil
}
// watchForStreams opens the measured database on sbService and launches
// measure.WatchForStreams for device devId through util.AsyncRun, recording
// the StreamId of every stream definition passed to the watch callback. The
// callback rejects any stream whose DevId differs from devId.
//
// The returned stop function cancels the watch context, calls the wait
// function obtained from util.AsyncRun, and returns the recorded stream ids,
// or the error reported by wait. If the database cannot be opened, no watcher
// is started and stop simply returns that failure.
func watchForStreams(ctx *context.T, sbService, devId string) (stop func() (gotStreams []string, err error)) {
	db, openErr := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredTables)
	if openErr != nil {
		return func() ([]string, error) {
			return nil, fmt.Errorf("failed opening measured database: %v", openErr)
		}
	}

	watchCtx, cancel := context.WithCancel(ctx)

	// seen accumulates stream ids in the order the callback receives them.
	var seen []string
	onStream := func(key *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
		if key.DevId != devId {
			return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", key.DevId, devId)
		}
		seen = append(seen, key.StreamId)
		return nil
	}

	wait := util.AsyncRun(
		func() error {
			return measure.WatchForStreams(watchCtx, db, devId, onStream)
		},
		// The error callback only cancels the watch context.
		func(error) {
			cancel()
		},
	)

	return func() ([]string, error) {
		cancel()
		if waitErr := wait(); waitErr != nil {
			return nil, waitErr
		}
		return seen, nil
	}
}
// startAdditionalMT builds the mounttabled binary, starts it under sh with the
// given command-line args, and returns the value of the "NAME" variable read
// from the command's output together with the command's Shutdown function.
func startAdditionalMT(sh *v23test.Shell, args ...string) (string, func(sig os.Signal)) {
	binary := sh.BuildGoPkg("v.io/x/ref/services/mounttable/mounttabled")
	cmd := sh.Cmd(binary, args...)
	cmd.Start()
	mtName := cmd.S.ExpectVar("NAME")
	return mtName, cmd.Shutdown
}