blob: e5626a9ea02542b9e22bf516f19219cd89e27cdb [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package measure_test
import (
"fmt"
"reflect"
"sort"
"testing"
"time"
"v.io/v23/context"
"v.io/v23/syncbase"
_ "v.io/x/ref/runtime/factories/roaming"
sbtu "v.io/x/ref/services/syncbase/testutil"
tu "v.io/x/ref/test/testutil"
"v.io/x/sensorlog/internal/measure"
"v.io/x/sensorlog/internal/sbmodel"
"v.io/x/sensorlog/internal/sbmodel/keyutil"
"v.io/x/sensorlog/internal/sbutil"
"v.io/x/sensorlog/internal/util"
)
var watchCollections = []sbmodel.CollectionSpec{
{Prototype: &sbmodel.KStreamDef{}},
}
func TestWatchForStreams(t *testing.T) {
_, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("u:one", "u:one:sb", nil)
defer cleanup()
// Open (create) db as measured. Keep write permission on StreamDef.
db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, watchCollections)
if err != nil {
t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
}
devId := "dev1"
otherDev := "dev2"
expectStreams := []string{"str1", "str2", "str3"}
gotStreams := make([]string, 0, len(expectStreams))
time.Sleep(1 * time.Second)
putStreamDef(t, ctxMeasured, db, devId, expectStreams[0])
putStreamDef(t, ctxMeasured, db, otherDev, "foo")
// Watch should call the callback for all devId streams, whether they were
// written before or after starting watch. Streams for other devices should
// not be returned.
time.Sleep(1 * time.Second)
stop, wait := runWatchForStreams(ctxMeasured, db, devId, func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error {
if got, want := key.DevId, devId; got != want {
return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", got, want)
}
gotStreams = append(gotStreams, val.Desc)
return nil
})
putStreamDef(t, ctxMeasured, db, devId, expectStreams[1])
time.Sleep(1 * time.Second)
putStreamDef(t, ctxMeasured, db, otherDev, "bar")
putStreamDef(t, ctxMeasured, db, devId, expectStreams[2])
time.Sleep(3 * time.Second)
stop()
if err := wait(); err != nil {
t.Fatalf("watcher exited with error: %v", err)
}
sort.Strings(gotStreams)
sort.Strings(expectStreams)
if got, want := gotStreams, expectStreams; !reflect.DeepEqual(got, want) {
t.Errorf("watch returned streams do not match: got %v, want %v", got, want)
}
}
func TestWatchError(t *testing.T) {
_, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("u:one", "u:one:sb", nil)
defer cleanup()
// Open (create) db as measured. Keep write permission on StreamDef.
db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, watchCollections)
if err != nil {
t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err)
}
// Watch should return the callback error.
devId := "dev1"
cbErr := fmt.Errorf("callbackError")
stop, wait := runWatchForStreams(ctxMeasured, db, devId, func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
return cbErr
})
// Put anything to trigger callback.
putStreamDef(t, ctxMeasured, db, devId, "foo")
time.Sleep(3 * time.Second)
stop()
if err := wait(); err != cbErr {
t.Errorf("watch should have failed with %v, got: %v", cbErr, err)
}
// Watch should return an error when a malformed key is encountered.
devId = "dev2"
stop, wait = runWatchForStreams(ctxMeasured, db, devId, func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error {
return nil
})
putStreamDef(t, ctxMeasured, db, keyutil.Join(devId, "fail"), "foo")
time.Sleep(3 * time.Second)
stop()
if err := wait(); err == nil {
t.Errorf("watch should have failed on malformed key")
}
}
func runWatchForStreams(ctx *context.T, db syncbase.Database, devId string, register measure.RegisterWorker) (stop func(), wait func() error) {
ctx, stop = context.WithCancel(ctx)
wait = util.AsyncRun(func() error {
return measure.WatchForStreams(ctx, db, devId, register)
}, nil)
return stop, wait
}
func putStreamDef(t *testing.T, ctx *context.T, db syncbase.DatabaseHandle, devId, desc string) {
key := &sbmodel.KStreamDef{DevId: devId, StreamId: keyutil.UUID()}
val := sbmodel.VStreamDef{Desc: desc}
if err := db.Collection(ctx, key.Collection()).Put(ctx, key.Key(), &val); err != nil {
t.Fatalf(tu.FormatLogLine(2, "failed to put StreamDef %s (%s): %v", key.Key(), val.Desc, err))
}
}