blob: a53564c51d90cd4c0816dae4525faf0365dcdd55 [file] [log] [blame] [edit]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package control_test
import (
wire ""
_ ""
_ ""
// TestMain is the package-level test entry point recognised by the testing
// package.
// NOTE(review): the body is not visible in this extract — confirm that it
// runs the tests via m and exits with the resulting status.
func TestMain(m *testing.M) {
// newController builds a control.Controller for use in a test and returns it
// together with a cleanup function.
//
// It creates a temporary root directory (prefix "control-test-"), initialises
// a v23 context with a "tcp" listen spec, and hands the context plus a
// control.Opts (debug output on, t as the TB, the temp dir as RootDir) to
// control.NewController. The returned cleanup function calls c.TearDown.
//
// NOTE(review): the error-handling bodies and the rest of the cleanup closure
// are not visible in this extract — confirm that cleanup also invokes
// shutdown and removes rootDir.
func newController(t *testing.T) (*control.Controller, func()) {
// NOTE(review): ioutil.TempDir is deprecated as of Go 1.16 in favour of
// os.MkdirTemp; t.TempDir would additionally remove the directory for us.
rootDir, err := ioutil.TempDir("", "control-test-")
if err != nil {
ctx, shutdown := v23.Init()
// Replace the context with one carrying a listen spec for protocol "tcp" and
// an empty address string.
ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{
Addrs: rpc.ListenAddrs{{"tcp", ""}},
opts := control.Opts{
DebugOutput: true,
TB: t,
RootDir: rootDir,
c, err := control.NewController(ctx, opts)
if err != nil {
// cleanup tears the controller down; callers defer it.
cleanup := func() {
if err := c.TearDown(); err != nil {
return c, cleanup
// mountTableIsRunning reports whether a GetPermissions call on the empty name
// in the controller's namespace succeeds within one second. Any error,
// including the timeout expiring, yields false.
func mountTableIsRunning(t *testing.T, c *control.Controller) bool {
// Bound the probe to one second so the check cannot hang the test.
ctxWithTimeout, cancel := context.WithTimeout(c.InternalCtx(), 1*time.Second)
defer cancel()
// Only the error is of interest; the returned values are discarded.
_, _, err := v23.GetNamespace(ctxWithTimeout).GetPermissions(ctxWithTimeout, "")
return err == nil
// syncbaseIsRunning reports whether a GetPermissions call on the syncbase
// service with the given name succeeds within one second. Any error,
// including the timeout expiring, yields false.
func syncbaseIsRunning(t *testing.T, c *control.Controller, name string) bool {
// Bound the probe to one second so the check cannot hang the test.
ctxWithTimeout, cancel := context.WithTimeout(c.InternalCtx(), 1*time.Second)
defer cancel()
// Only the error is of interest; the returned values are discarded.
_, _, err := syncbase.NewService(name).GetPermissions(ctxWithTimeout)
return err == nil
// TestRunEmptyUniverse runs the controller against a zero-value
// model.Universe, checks that the mounttable responds, and then calls Run a
// second time with the same universe.
func TestRunEmptyUniverse(t *testing.T) {
c, cleanup := newController(t)
defer cleanup()
// A universe with no users, devices or topology.
u := &model.Universe{}
if err := c.Run(u); err != nil {
// Check mounttable is running.
if !mountTableIsRunning(t, c) {
t.Errorf("expected mounttable to be running but it was not")
// Calling Run a second time should not fail.
if err := c.Run(u); err != nil {
// TestRunUniverseSingleDevice runs a universe containing one user with one
// device and checks that:
//   - the mounttable and the device's syncbase respond,
//   - the instance's default blessings string ends with the user name and
//     device name joined by security.ChainSeparator,
//   - a second Run with the same universe leaves the syncbase running,
//   - Run returns an error once the device has been removed from the universe.
func TestRunUniverseSingleDevice(t *testing.T) {
c, cleanup := newController(t)
defer cleanup()
userName := "test-user"
deviceName := "test-device"
u := &model.Universe{
Users: model.UserSet{
Name: userName,
Devices: model.DeviceSet{
Name: deviceName,
if err := c.Run(u); err != nil {
// Check mounttable is running.
if !mountTableIsRunning(t, c) {
t.Errorf("expected mounttable to be running but it was not")
// Check syncbase is running.
if !syncbaseIsRunning(t, c, deviceName) {
t.Errorf("expected syncbase %q to be running but it was not", deviceName)
// Check that instance has correct blessing name.
gotBlessings := c.InternalGetInstance(deviceName).InternalDefaultBlessings().String()
// Only the suffix is compared; whatever precedes "<user><sep><device>" in the
// blessing string is not checked.
wantSuffix := strings.Join([]string{userName, deviceName}, security.ChainSeparator)
if !strings.HasSuffix(gotBlessings, wantSuffix) {
t.Errorf("wanted blessing name to have suffix %v but got %v", wantSuffix, gotBlessings)
// Calling Run a second time should not fail.
if err := c.Run(u); err != nil {
if !syncbaseIsRunning(t, c, deviceName) {
t.Errorf("expected syncbase %q to be running but it was not", deviceName)
// Delete the device from the universe.
u.Users[0].Devices = model.DeviceSet{}
// Calling Run again should error.
if err := c.Run(u); err == nil {
t.Fatal("expected Run to fail with shrunk universe but it did not")
// TestRunUniverseTwoDevices runs two users with one device each through three
// topologies in turn — disconnected, connected, disconnected again — and uses
// syncbasesCanSync after each Run to check whether the two syncbases can or
// cannot sync, as appropriate.
func TestRunUniverseTwoDevices(t *testing.T) {
c, cleanup := newController(t)
defer cleanup()
d1 := &model.Device{
Name: "test-device-1",
d2 := &model.Device{
Name: "test-device-2",
users := model.UserSet{
Name: "user-1",
Devices: model.DeviceSet{d1},
Name: "user-2",
Devices: model.DeviceSet{d2},
// Initially universe has devices unconnected.
// Each device's topology entry lists only itself.
uDisconnected := &model.Universe{
Users: users,
Topology: model.Topology{
d1: model.DeviceSet{d1},
d2: model.DeviceSet{d2},
if err := c.Run(uDisconnected); err != nil {
// Check that the devices are not syncing.
if syncbasesCanSync(t, c, d1.Name, d2.Name) {
t.Fatalf("expected syncbases %v and %v not to sync but they did", d1.Name, d2.Name)
// Connect the two devices.
// Each device's topology entry now lists both devices.
uConnected := &model.Universe{
Users: users,
Topology: model.Topology{
d1: model.DeviceSet{d1, d2},
d2: model.DeviceSet{d1, d2},
if err := c.Run(uConnected); err != nil {
// Check that the devices are syncing.
if !syncbasesCanSync(t, c, d1.Name, d2.Name) {
t.Fatalf("expected syncbases %v and %v to sync but they did not", d1.Name, d2.Name)
// Revert back to unconnected devices.
if err := c.Run(uDisconnected); err != nil {
// Check that the devices are not syncing.
if syncbasesCanSync(t, c, d1.Name, d2.Name) {
t.Fatalf("expected syncbases %v and %v not to sync but they did", d1.Name, d2.Name)
// TestRunUniverseSingleDeviceWithOneClient registers a "test-writer" client
// (a client.Writer with a 50ms write interval), runs a universe with a single
// device that uses that client and one database/collection, and then scans
// the collection to check that at least 4 rows were written.
func TestRunUniverseSingleDeviceWithOneClient(t *testing.T) {
c, cleanup := newController(t)
defer cleanup()
// This local counter shadows the package-level counter variable.
// NOTE(review): the statement that advances it is not visible in this
// extract; if the Writer calls keyValueFunc from another goroutine, access
// needs synchronising — confirm.
var counter int32
// keyValueFunc supplies the next key/value pair for the Writer: the decimal
// string of counter as the key and counter itself as the value.
keyValueFunc := func(_ time.Time) (string, interface{}) {
// Stop writing after 5 rows.
if counter >= 5 {
return "", nil
return fmt.Sprintf("%d", counter), counter
// mu is locked until Writer writes 5 rows.
mu := sync.Mutex{}
keysWritten := 0
// onWrite is handed to the Writer as its OnWrite callback.
onWrite := func(_ syncbase.Collection, _ string, _ interface{}, err error) {
if err != nil {
// NOTE(review): t.Fatalf must be called from the goroutine running the test;
// if the Writer invokes OnWrite from its own goroutine, t.Errorf is the
// safe choice here — confirm.
t.Fatalf("error encountered during write: %v", err)
if keysWritten == 5 {
control.RegisterClient("test-writer", func() client.Client {
return &client.Writer{
WriteInterval: 50 * time.Millisecond,
KeyValueFunc: keyValueFunc,
OnWrite: onWrite,
// Undo the registration above when the test returns.
defer control.InternalResetClientRegistry()
dbModel := &model.Database{
Name: "test_db",
Blessing: "root",
Collections: []model.Collection{
Name: "test_col",
Blessing: "root",
u := &model.Universe{
Users: model.UserSet{
Name: "test-user",
Devices: model.DeviceSet{
Name: "test-device",
Clients: []string{"test-writer"},
Databases: model.DatabaseSet{dbModel},
// Start controller and wait for Writer's KeyValueFunc to run 4 times.
// NOTE(review): the other comments in this test speak of 5 rows/values;
// confirm which count is intended here.
if err := c.Run(u); err != nil {
// Wait for Writer to write 5 values.
// Get a context with the same principal and blessings as the instance.
ctx := c.InternalCtx()
instCtx, err := v23.WithPrincipal(ctx, c.InternalGetInstance("test-device").InternalPrincipal())
if err != nil {
sbService := syncbase.NewService("test-device")
db := sbService.DatabaseForId(dbModel.Id(), nil)
col := db.CollectionForId(dbModel.Collections[0].Id())
// Scan the collection with empty start and limit strings.
stream := col.Scan(instCtx, syncbase.Range("", ""))
// Check that at least 4 rows have been written. Note that we don't check
// for 5 keys because although the KeyValueFunc has run 5 times, we aren't
// guaranteed that the last write has finished.
for i := 0; i < 4; i++ {
advance := stream.Advance()
// A stream error is reported before the "too few rows" failure below.
if stream.Err() != nil {
t.Fatalf("stream.Err(): %v", stream.Err())
if !advance {
t.Fatalf("expected scan to find at least 4 rows but only got %d", i)
// TestRunUniverseSingleDeviceWithTwoClients registers a "test-writer" client
// (client.Writer, 50ms write interval) and a "test-watcher" client
// (client.Watcher with an OnChange callback), then runs a universe whose
// single device uses both clients against one database/collection.
//
// NOTE(review): the code that blocks until the watcher has seen 5 changes is
// not visible in this extract.
func TestRunUniverseSingleDeviceWithTwoClients(t *testing.T) {
c, cleanup := newController(t)
defer cleanup()
control.RegisterClient("test-writer", func() client.Client {
return &client.Writer{
WriteInterval: 50 * time.Millisecond,
// mu is locked until 5 changes have been received by the watcher.
// NOTE(review): changesReceived is read inside OnChange; if the Watcher calls
// it from another goroutine the counter needs synchronising — confirm.
changesReceived := 0
mu := sync.Mutex{}
control.RegisterClient("test-watcher", func() client.Client {
return &client.Watcher{
OnChange: func(wc syncbase.WatchChange) {
if changesReceived == 5 {
// Undo both registrations above when the test returns.
defer control.InternalResetClientRegistry()
// Construct model with one device and two clients (writer and watcher).
dbModel := &model.Database{
Name: "test_db",
Blessing: "root",
Collections: []model.Collection{
Name: "test_col",
Blessing: "root",
devModel := &model.Device{
Name: "test-device",
Clients: []string{"test-writer", "test-watcher"},
Databases: model.DatabaseSet{dbModel},
u := &model.Universe{
Users: model.UserSet{
Name: "test-user",
Devices: model.DeviceSet{devModel},
if err := c.Run(u); err != nil {
// Wait for watcher to receive 5 changes.
// TestRunUniverseTwoDevicesWithClients runs one user with two devices: one
// device uses the "test-writer" client and the other the "test-watcher"
// client. Both share one database, and a syncgroup hosted and created by the
// writer device is joined by the watcher device. The topology lets each
// device reach both.
//
// NOTE(review): the code that blocks until the watcher has seen 5 changes is
// not visible in this extract.
func TestRunUniverseTwoDevicesWithClients(t *testing.T) {
c, cleanup := newController(t)
defer cleanup()
// mu is locked until 5 changes have been received by the watcher.
// NOTE(review): changesReceived is read inside OnChange; if the Watcher calls
// it from another goroutine the counter needs synchronising — confirm.
changesReceived := 0
mu := sync.Mutex{}
control.RegisterClient("test-watcher", func() client.Client {
return &client.Watcher{
OnChange: func(wc syncbase.WatchChange) {
if changesReceived == 5 {
control.RegisterClient("test-writer", func() client.Client {
return &client.Writer{
WriteInterval: 50 * time.Millisecond,
// Undo both registrations above when the test returns.
defer control.InternalResetClientRegistry()
// Construct model with two devices and one client each (one writer and one
// watcher).
user := &model.User{
Name: "test-user",
users := model.UserSet{user}
// The single user is granted every permission tag listed below.
perms := model.Permissions{
"Admin": users,
"Read": users,
"Resolve": users,
"Write": users,
dbModel := &model.Database{
Name: "test_db",
Blessing: "root",
Permissions: perms,
Collections: []model.Collection{
Name: "test_col",
Blessing: "root",
Permissions: perms,
writerDev := &model.Device{
Name: "writer-device",
Clients: []string{"test-writer"},
Databases: model.DatabaseSet{dbModel},
watcherDev := &model.Device{
Name: "watcher-device",
Clients: []string{"test-watcher"},
Databases: model.DatabaseSet{dbModel},
// Devices are attached after construction because each device is built
// from dbModel, whose permissions already refer to user.
user.Devices = model.DeviceSet{writerDev, watcherDev}
// Construct a syncgroup and add it to the database.
sg := model.Syncgroup{
HostDevice: writerDev,
NameSuffix: "test_sg",
Collections: dbModel.Collections,
Permissions: perms,
CreatorDevices: model.DeviceSet{writerDev},
JoinerDevices: model.DeviceSet{watcherDev},
dbModel.Syncgroups = []model.Syncgroup{sg}
u := &model.Universe{
Users: model.UserSet{user},
// Both devices can talk to each other.
Topology: model.Topology{
writerDev: model.DeviceSet{watcherDev, writerDev},
watcherDev: model.DeviceSet{watcherDev, writerDev},
if err := c.Run(u); err != nil {
// Wait for watcher to receive 5 changes.
// TestRunUniverseTwoUsers runs two users, Alice and Bob, with one device
// each. Alice's device uses the "test-writer" client and Bob's the
// "test-watcher" client. Alice holds every permission tag on the shared
// database, collection and syncgroup; Bob is listed under "Read" only.
// After Run, the test issues a Put as Bob against the collection on his own
// device and expects it to fail with verror.ErrNoAccess.
//
// NOTE(review): the code that blocks until the watcher has seen 5 changes is
// not visible in this extract.
func TestRunUniverseTwoUsers(t *testing.T) {
c, cleanup := newController(t)
defer cleanup()
// mu is locked until 5 changes have been received by the watcher.
// NOTE(review): changesReceived is read inside OnChange; if the Watcher calls
// it from another goroutine the counter needs synchronising — confirm.
changesReceived := 0
mu := sync.Mutex{}
control.RegisterClient("test-watcher", func() client.Client {
return &client.Watcher{
OnChange: func(wc syncbase.WatchChange) {
if changesReceived == 5 {
control.RegisterClient("test-writer", func() client.Client {
return &client.Writer{
WriteInterval: 50 * time.Millisecond,
// Undo both registrations above when the test returns.
defer control.InternalResetClientRegistry()
// Construct model with two users, one device each. One device has watcher
// client and other has writer.
userAlice := &model.User{Name: "user-alice"}
userBob := &model.User{Name: "user-bob"}
// Alice has all permissions, and gives Bob read access.
permsAlice := model.Permissions{
"Admin": model.UserSet{userAlice},
"Read": model.UserSet{userAlice, userBob},
"Resolve": model.UserSet{userAlice},
"Write": model.UserSet{userAlice},
// Alice is creator of the database and collection.
dbModel := &model.Database{
Name: "test_db",
Blessing: "root",
Permissions: permsAlice,
Collections: []model.Collection{
Name: "test_col",
Blessing: "root",
Permissions: permsAlice,
devAliceWriter := &model.Device{
Name: "device-alice-writer",
Clients: []string{"test-writer"},
Databases: model.DatabaseSet{dbModel},
devBobWatcher := &model.Device{
Name: "device-bob-watcher",
Clients: []string{"test-watcher"},
Databases: model.DatabaseSet{dbModel},
// Alice has the writer device.
userAlice.Devices = model.DeviceSet{devAliceWriter}
// Bob has the watcher device.
userBob.Devices = model.DeviceSet{devBobWatcher}
// Construct a syncgroup and add it to the database.
sg := model.Syncgroup{
HostDevice: devAliceWriter,
NameSuffix: "test_sg",
Collections: dbModel.Collections,
Permissions: permsAlice,
CreatorDevices: model.DeviceSet{devAliceWriter},
JoinerDevices: model.DeviceSet{devBobWatcher},
dbModel.Syncgroups = []model.Syncgroup{sg}
u := &model.Universe{
Users: model.UserSet{userAlice, userBob},
// Both devices can talk to each other.
Topology: model.Topology{
devAliceWriter: model.DeviceSet{devBobWatcher, devAliceWriter},
devBobWatcher: model.DeviceSet{devBobWatcher, devAliceWriter},
if err := c.Run(u); err != nil {
// Wait for watcher to receive 5 changes.
// Check that Bob gets ErrNoAccess when writing to the collection on his
// own device because he does not have write permissions.
bobCtx, err := c.InternalConfigureContext(c.InternalCtx(), userBob.Name)
if err != nil {
s := syncbase.NewService(devBobWatcher.Name)
db := s.DatabaseForId(dbModel.Id(), nil)
col := db.CollectionForId(dbModel.Collections[0].Id())
// The failure is matched by error ID, so a nil error (a successful Put)
// also counts as a test failure here.
if err = col.Put(bobCtx, "test-key", "should fail"); verror.ErrorID(err) != verror.ErrNoAccess.ID {
t.Errorf("expected bob's put to collection %v to fail with ErrNoAccess, but got %v", col, err)
// counter is interpolated by syncbasesCanSync into the database, collection
// and syncgroup names it creates.
// NOTE(review): the statement that advances counter is not visible in this
// extract — confirm it is bumped once per syncbasesCanSync call so repeated
// calls within one test do not collide on names.
var counter int
// syncbasesCanSync reports whether the syncbase named sb2Name can join a
// syncgroup created on the syncbase named sb1Name.
//
// It creates a database and a collection with the same counter-derived names
// on both services using open permissions, creates a syncgroup on the first
// service covering its collection, and then attempts a Join from the second
// service with a five-second timeout. The result is true only when Join
// returns no error.
//
// TODO(nlacasse): Once the controller has more client-logic built-in for
// creating databases, collections, syncgroups, etc., see if this test can be
// simplified.
func syncbasesCanSync(t *testing.T, c *control.Controller, sb1Name, sb2Name string) bool {
ctx := c.InternalCtx()
sb1Service, sb2Service := syncbase.NewService(sb1Name), syncbase.NewService(sb2Name)
// The same permissions value is used for every object created below.
openPerms := testutil.DefaultPerms("...")
// Create databases on both syncbase servers.
dbName := fmt.Sprintf("test_database_%d", counter)
sb1Db := sb1Service.Database(ctx, dbName, nil)
if err := sb1Db.Create(ctx, openPerms); err != nil {
sb2Db := sb2Service.Database(ctx, dbName, nil)
if err := sb2Db.Create(ctx, openPerms); err != nil {
// Create collections on both syncbase servers.
colName := fmt.Sprintf("test_collection_%d", counter)
sb1Col := sb1Db.Collection(ctx, colName)
if err := sb1Col.Create(ctx, openPerms); err != nil {
sb2Col := sb2Db.Collection(ctx, colName)
if err := sb2Col.Create(ctx, openPerms); err != nil {
// Create a syncgroup on the first syncbase.
sgName := fmt.Sprintf("test_sg_%d", counter)
// NOTE(review): indexing Roots()[0] panics if the namespace has no roots —
// presumably the controller always configures one; confirm.
mounttable := v23.GetNamespace(ctx).Roots()[0]
sbSpec := wire.SyncgroupSpec{
Description: "test syncgroup",
Perms: openPerms,
Collections: []wire.Id{sb1Col.Id()},
MountTables: []string{mounttable},
sb1Sg := sb1Db.SyncgroupForId(wire.Id{Name: sgName, Blessing: "blessing"})
if err := sb1Sg.Create(ctx, sbSpec, wire.SyncgroupMemberInfo{}); err != nil {
// If second syncbase can join the syncgroup, they are connected.
// The join is bounded to five seconds; a timeout is reported as "cannot sync".
ctxWithTimeout, cancel := context.WithTimeout(ctx, 5*time.Second)
defer cancel()
// The syncgroup id must match the one used for creation on the first syncbase.
sb2Sg := sb2Db.SyncgroupForId(wire.Id{Name: sgName, Blessing: "blessing"})
_, err := sb2Sg.Join(ctxWithTimeout, sb1Name, nil, wire.SyncgroupMemberInfo{})
return err == nil