blob: 23dbc42c10845f6b68061e0da6833191d61d9ad3 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package client_test
import (
"fmt"
"os"
"reflect"
"sort"
"testing"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/x/ref/services/mounttable/mounttablelib"
"v.io/x/ref/services/syncbase/syncbaselib"
"v.io/x/ref/test/v23test"
sltu "v.io/x/sensorlog/internal/client/testutil"
"v.io/x/sensorlog/internal/measure"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbutil"
"v.io/x/sensorlog/internal/util"
)
// Parameters shared by every test stream created in this file.
const (
	// dummyScript is the script passed to sltu.CreateTestStream for each stream.
	dummyScript = "echo 42;"
	// dummyInterval is the interval passed to sltu.CreateTestStream for each
	// stream.
	dummyInterval = time.Second
)
// TestV23DeviceAddAndStreamCreate exercises the client/measured flow end to
// end: it starts a global and a local mount table plus one Syncbase for the
// client and one for the measuring device, initializes the device syncgroup,
// adds the device on the client side, creates three streams from the client,
// and checks that the watcher running against the measured Syncbase reported
// exactly those three streams.
//
// The test is currently skipped unconditionally; see the TODO below.
func TestV23DeviceAddAndStreamCreate(t *testing.T) {
	// TODO(ivanpi): Sensorlog needs a bigger migration to work well with
	// the new api. Skipping these tests for now.
	// The reason is passed to Skip so that it shows up in verbose test output
	// instead of living only in this comment.
	t.Skip("sensorlog needs a bigger migration to work well with the new api; skipping for now")
	v23test.SkipUnlessRunningIntegrationTests(t)
	sh := v23test.NewShell(t, nil)
	defer sh.Cleanup()

	// Start a 'global' mount table.
	globalMT, globalMTShutdown := startAdditionalMT(sh, "--v23.tcp.address=127.0.0.1:0")
	// Mount the local mount table in the global one.
	localMT := naming.Join(globalMT, "localmt")
	sh.StartRootMountTableWithOpts(mounttablelib.Opts{MountName: localMT})

	// Start the client Syncbase; only root:u:client gets Read/Write/Admin,
	// while Resolve is open to everyone.
	clientSb := "sb/client"
	clientCtx := sh.ForkContext("u:client")
	clientSbCreds := sh.ForkCredentials("u:client:sb")
	sh.StartSyncbase(clientSbCreds, syncbaselib.Opts{Name: clientSb}, `{"Read": {"In":["root:u:client"]}, "Write": {"In":["root:u:client"]}, "Admin": {"In":["root:u:client"]}, "Resolve": {"In":["..."]}}`)

	// Start the measured Syncbase with the equivalent permissions for
	// root:u:measured.
	measuredSb := "sb/measured"
	measuredCtx := sh.ForkContext("u:measured")
	measuredSbCreds := sh.ForkCredentials("u:measured:sb")
	sh.StartSyncbase(measuredSbCreds, syncbaselib.Opts{Name: measuredSb}, `{"Read": {"In":["root:u:measured"]}, "Write": {"In":["root:u:measured"]}, "Admin": {"In":["root:u:measured"]}, "Resolve": {"In":["..."]}}`)

	// NOTE(review): presumably gives the freshly started services time to
	// settle before they are used - confirm.
	time.Sleep(1 * time.Second)

	// Initialize measuring device syncgroup using the global mount table alias
	// of measured Syncbase for publishing.
	devId := "measured1"
	publishSb := naming.Join(localMT, measuredSb)
	if err := initSyncgroup(measuredCtx, measuredSb, devId, "root:u:client", publishSb, globalMT); err != nil {
		t.Fatalf("failed initializing syncgroup for device %s: %v", devId, err)
	}

	// Add measuring device to client, joining its syncgroup.
	if err := sltu.AddTestDevice(clientCtx, clientSb, devId, publishSb); err != nil {
		t.Errorf("failed adding device %s: %v", devId, err)
	}
	// Allow time for syncgroup metadata sync.
	time.Sleep(3 * time.Second)

	// After syncgroup has been joined, sync can use local mount table. Kill the
	// global one.
	globalMTShutdown(os.Interrupt)

	streamIds := []string{"str1", "str2", "str3"}

	// Create stream before starting measured watcher.
	if err := sltu.CreateTestStream(clientCtx, clientSb, devId, streamIds[0], dummyScript, dummyInterval); err != nil {
		t.Errorf("failed creating stream %s: %v", streamIds[0], err)
	}
	time.Sleep(1 * time.Second)

	stopWatch := watchForStreams(measuredCtx, measuredSb, devId)

	// Create more streams.
	if err := sltu.CreateTestStream(clientCtx, clientSb, devId, streamIds[1], dummyScript, dummyInterval); err != nil {
		t.Errorf("failed creating stream %s: %v", streamIds[1], err)
	}
	time.Sleep(1 * time.Second)
	if err := sltu.CreateTestStream(clientCtx, clientSb, devId, streamIds[2], dummyScript, dummyInterval); err != nil {
		t.Errorf("failed creating stream %s: %v", streamIds[2], err)
	}
	// Allow time for sync.
	time.Sleep(3 * time.Second)

	// Stop the watcher.
	gotStreams, err := stopWatch()
	if err != nil {
		t.Fatalf("measureWatcher failed: %v", err)
	}

	// Compare streams seen by watcher to expectations. Both slices are sorted
	// first so the comparison does not depend on the order of watch callbacks.
	sort.Strings(gotStreams)
	sort.Strings(streamIds)
	if got, want := gotStreams, streamIds; !reflect.DeepEqual(got, want) {
		t.Errorf("watch returned streams do not match: got %v, want %v", got, want)
	}
}
// initSyncgroup opens (or creates) the measured database on sbService and
// runs measure.InitSyncgroup against it twice with identical arguments. The
// second run checks that initialization can be repeated without error.
//
// The mount tables handed to measure.InitSyncgroup are the namespace roots of
// ctx followed by sgMT. It returns nil on success, or an error describing
// which of the three steps failed.
func initSyncgroup(ctx *context.T, sbService, devId, admin, publishSb, sgMT string) error {
	measuredDB, dbErr := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredCollections)
	if dbErr != nil {
		return fmt.Errorf("failed initializing measured database: %v", dbErr)
	}

	mountTables := append(v23.GetNamespace(ctx).Roots(), sgMT)

	initErr := measure.InitSyncgroup(ctx, measuredDB, devId, admin, publishSb, mountTables)
	if initErr != nil {
		return fmt.Errorf("failed initializing measured syncgroup: %v", initErr)
	}

	// Same call again: a second initialization must also succeed.
	repeatErr := measure.InitSyncgroup(ctx, measuredDB, devId, admin, publishSb, mountTables)
	if repeatErr != nil {
		return fmt.Errorf("initializing measured syncgroup should be idempotent, repeat failed: %v", repeatErr)
	}
	return nil
}
// watchForStreams opens (or creates) the measured database on sbService and
// starts measure.WatchForStreams for devId through util.AsyncRun, under a
// cancellable child of ctx. Every stream key passed to the watch callback has
// its StreamId recorded; a key whose DevId differs from devId makes the
// callback return an error.
//
// The returned stop function cancels the watch context, waits for the watch
// to finish, and returns the recorded stream ids, or the error reported by
// the wait. If the database cannot be opened, no watch is started and the
// returned stop function only reports that failure.
func watchForStreams(ctx *context.T, sbService, devId string) (stop func() (gotStreams []string, err error)) {
	db, openErr := sbutil.CreateOrOpenDB(ctx, sbService, sbmodel.MeasuredCollections)
	if openErr != nil {
		return func() ([]string, error) {
			return nil, fmt.Errorf("failed opening measured database: %v", openErr)
		}
	}

	watchCtx, cancel := context.WithCancel(ctx)

	// seen accumulates the ids of streams reported by the watcher.
	var seen []string
	record := func(key *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
		if key.DevId != devId {
			return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", key.DevId, devId)
		}
		seen = append(seen, key.StreamId)
		return nil
	}

	// NOTE(review): util.AsyncRun presumably runs the first function in the
	// background and calls the second with its error - confirm against util.
	wait := util.AsyncRun(
		func() error {
			return measure.WatchForStreams(watchCtx, db, devId, record)
		},
		func(error) {
			cancel()
		},
	)

	return func() ([]string, error) {
		cancel()
		if waitErr := wait(); waitErr != nil {
			return nil, waitErr
		}
		return seen, nil
	}
}
// startAdditionalMT builds the mounttabled binary, starts it through sh with
// the given command-line args, and returns the value of the "NAME" variable
// obtained via ExpectVar together with the command's Terminate method, which
// callers use to signal the process.
func startAdditionalMT(sh *v23test.Shell, args ...string) (string, func(sig os.Signal)) {
	binPath := v23test.BuildGoPkg(sh, "v.io/x/ref/services/mounttable/mounttabled")
	mt := sh.Cmd(binPath, args...)
	mt.Start()
	name := mt.S.ExpectVar("NAME")
	return name, mt.Terminate
}