| // Copyright 2015 The Vanadium Authors. All rights reserved. |
| // Use of this source code is governed by a BSD-style |
| // license that can be found in the LICENSE file. |
| |
| package measure_test |
| |
| import ( |
| "fmt" |
| "reflect" |
| "sort" |
| "testing" |
| "time" |
| |
| "v.io/v23/context" |
| "v.io/v23/syncbase" |
| _ "v.io/x/ref/runtime/factories/generic" |
| sbtu "v.io/x/ref/services/syncbase/testutil" |
| tu "v.io/x/ref/test/testutil" |
| "v.io/x/sensorlog/internal/measure" |
| "v.io/x/sensorlog/internal/sbmodel" |
| "v.io/x/sensorlog/internal/sbmodel/keyutil" |
| "v.io/x/sensorlog/internal/sbutil" |
| "v.io/x/sensorlog/internal/util" |
| ) |
| |
| var watchCollections = []sbmodel.CollectionSpec{ |
| {Prototype: &sbmodel.KStreamDef{}}, |
| } |
| |
| func TestWatchForStreams(t *testing.T) { |
| _, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one:sb", nil) |
| defer cleanup() |
| |
| // Open (create) db as measured. Keep write permission on StreamDef. |
| db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, watchCollections) |
| if err != nil { |
| t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err) |
| } |
| |
| devId := "dev1" |
| otherDev := "dev2" |
| expectStreams := []string{"str1", "str2", "str3"} |
| gotStreams := make([]string, 0, len(expectStreams)) |
| |
| time.Sleep(1 * time.Second) |
| putStreamDef(t, ctxMeasured, db, devId, expectStreams[0]) |
| putStreamDef(t, ctxMeasured, db, otherDev, "foo") |
| |
| // Watch should call the callback for all devId streams, whether they were |
| // written before or after starting watch. Streams for other devices should |
| // not be returned. |
| time.Sleep(1 * time.Second) |
| stop, wait := runWatchForStreams(ctxMeasured, db, devId, func(key *sbmodel.KStreamDef, val *sbmodel.VStreamDef) error { |
| if got, want := key.DevId, devId; got != want { |
| return fmt.Errorf("watch returned stream for wrong device: got %s, want %s", got, want) |
| } |
| gotStreams = append(gotStreams, val.Desc) |
| return nil |
| }) |
| putStreamDef(t, ctxMeasured, db, devId, expectStreams[1]) |
| |
| time.Sleep(1 * time.Second) |
| putStreamDef(t, ctxMeasured, db, otherDev, "bar") |
| putStreamDef(t, ctxMeasured, db, devId, expectStreams[2]) |
| |
| time.Sleep(3 * time.Second) |
| stop() |
| if err := wait(); err != nil { |
| t.Fatalf("watcher exited with error: %v", err) |
| } |
| |
| sort.Strings(gotStreams) |
| sort.Strings(expectStreams) |
| if got, want := gotStreams, expectStreams; !reflect.DeepEqual(got, want) { |
| t.Errorf("watch returned streams do not match: got %v, want %v", got, want) |
| } |
| } |
| |
| func TestWatchError(t *testing.T) { |
| _, ctxMeasured, sbName, _, cleanup := sbtu.SetupOrDieCustom("one", "one:sb", nil) |
| defer cleanup() |
| |
| // Open (create) db as measured. Keep write permission on StreamDef. |
| db, err := sbutil.CreateOrOpenDB(ctxMeasured, sbName, watchCollections) |
| if err != nil { |
| t.Fatalf("CreateOrOpenDB should have succeeded, got error: %v", err) |
| } |
| |
| // Watch should return the callback error. |
| devId := "dev1" |
| cbErr := fmt.Errorf("callbackError") |
| stop, wait := runWatchForStreams(ctxMeasured, db, devId, func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error { |
| return cbErr |
| }) |
| // Put anything to trigger callback. |
| putStreamDef(t, ctxMeasured, db, devId, "foo") |
| |
| time.Sleep(3 * time.Second) |
| stop() |
| if err := wait(); err != cbErr { |
| t.Errorf("watch should have failed with %v, got: %v", cbErr, err) |
| } |
| |
| // Watch should return an error when a malformed key is encountered. |
| devId = "dev2" |
| stop, wait = runWatchForStreams(ctxMeasured, db, devId, func(_ *sbmodel.KStreamDef, _ *sbmodel.VStreamDef) error { |
| return nil |
| }) |
| putStreamDef(t, ctxMeasured, db, keyutil.Join(devId, "fail"), "foo") |
| |
| time.Sleep(3 * time.Second) |
| stop() |
| if err := wait(); err == nil { |
| t.Errorf("watch should have failed on malformed key") |
| } |
| } |
| |
| func runWatchForStreams(ctx *context.T, db syncbase.Database, devId string, register measure.RegisterWorker) (stop func(), wait func() error) { |
| ctx, stop = context.WithCancel(ctx) |
| wait = util.AsyncRun(func() error { |
| return measure.WatchForStreams(ctx, db, devId, register) |
| }, nil) |
| return stop, wait |
| } |
| |
| func putStreamDef(t *testing.T, ctx *context.T, db syncbase.DatabaseHandle, devId, desc string) { |
| key := sbmodel.KStreamDef{DevId: devId, StreamId: keyutil.UUID()} |
| val := sbmodel.VStreamDef{Desc: desc} |
| if err := db.Collection(key.Collection()).Put(ctx, key.Key(), &val); err != nil { |
| t.Fatalf(tu.FormatLogLine(2, "failed to put StreamDef %s (%s): %v", key.Key(), val.Desc, err)) |
| } |
| } |