blob: 85bbb3a1b8c2f2b7e83db98def1125f9d4933fdb [file] [log] [blame]
// Copyright 2016 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package discovery_test
import (
"fmt"
"reflect"
"testing"
"time"
"v.io/v23"
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/security"
"v.io/v23/security/access"
wire "v.io/v23/services/syncbase"
"v.io/v23/syncbase"
"v.io/v23/verror"
_ "v.io/x/ref/runtime/factories/roaming"
syncdis "v.io/x/ref/services/syncbase/discovery"
tu "v.io/x/ref/services/syncbase/testutil"
"v.io/x/ref/test/testutil"
)
func TestSyncgroupDiscovery(t *testing.T) {
_, ctx, sName, rootp, cleanup := tu.SetupOrDieCustom("o:app1:client1", "server",
tu.DefaultPerms(access.AllTypicalTags(), "root:o:app1:client1"))
defer cleanup()
d := tu.CreateDatabase(t, ctx, syncbase.NewService(sName), "d")
collection1 := tu.CreateCollection(t, ctx, d, "c1")
collection2 := tu.CreateCollection(t, ctx, d, "c2")
c1Updates, err := scanAs(ctx, rootp, "o:app1:client1")
if err != nil {
panic(err)
}
c2Updates, err := scanAs(ctx, rootp, "o:app1:client2")
if err != nil {
panic(err)
}
sgId := d.Syncgroup(ctx, "sg1").Id()
spec := wire.SyncgroupSpec{
Description: "test syncgroup sg1",
Perms: tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1"),
Collections: []wire.Id{collection1.Id()},
}
createSyncgroup(t, ctx, d, sgId, spec, verror.ID(""))
// First update is for syncbase, and not a specific syncgroup.
u := <-c1Updates
attrs := u.Advertisement().Attributes
peer := attrs[wire.DiscoveryAttrPeer]
if peer == "" || len(attrs) != 1 {
t.Errorf("Got %v, expected only a peer name.", attrs)
}
// Client2 should see the same.
if err := expect(c2Updates, &discovery.AdId{}, find, discovery.Attributes{wire.DiscoveryAttrPeer: peer}); err != nil {
t.Error(err)
}
sg1Attrs := discovery.Attributes{
wire.DiscoveryAttrDatabaseName: "d",
wire.DiscoveryAttrDatabaseBlessing: "root:o:app1",
wire.DiscoveryAttrSyncgroupName: "sg1",
wire.DiscoveryAttrSyncgroupBlessing: "root:o:app1:client1",
}
sg2Attrs := discovery.Attributes{
wire.DiscoveryAttrDatabaseName: "d",
wire.DiscoveryAttrDatabaseBlessing: "root:o:app1",
wire.DiscoveryAttrSyncgroupName: "sg2",
wire.DiscoveryAttrSyncgroupBlessing: "root:o:app1:client1",
}
// Then we should see an update for the created syncgroup.
var sg1AdId discovery.AdId
if err := expect(c1Updates, &sg1AdId, find, sg1Attrs); err != nil {
t.Error(err)
}
// Now update the spec to add client2 to the permissions.
spec.Perms = tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1", "root:o:app1:client2")
if err := d.SyncgroupForId(sgId).SetSpec(ctx, spec, ""); err != nil {
t.Fatalf("sg.SetSpec failed: %v", err)
}
// Client1 should see a lost and a found message.
if err := expect(c1Updates, &sg1AdId, both, sg1Attrs); err != nil {
t.Error(err)
}
// Client2 should just now see the found message.
if err := expect(c2Updates, &sg1AdId, find, sg1Attrs); err != nil {
t.Error(err)
}
// Now create a second syncgroup.
sg2Id := d.Syncgroup(ctx, "sg2").Id()
spec2 := wire.SyncgroupSpec{
Description: "test syncgroup sg2",
Perms: tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1", "root:o:app1:client2"),
Collections: []wire.Id{collection2.Id()},
}
createSyncgroup(t, ctx, d, sg2Id, spec2, verror.ID(""))
// Both clients should see the new syncgroup.
var sg2AdId discovery.AdId
if err := expect(c1Updates, &sg2AdId, find, sg2Attrs); err != nil {
t.Error(err)
}
if err := expect(c2Updates, &sg2AdId, find, sg2Attrs); err != nil {
t.Error(err)
}
spec2.Perms = tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1")
if err := d.SyncgroupForId(sg2Id).SetSpec(ctx, spec2, ""); err != nil {
t.Fatalf("sg.SetSpec failed: %v", err)
}
if err := expect(c2Updates, &sg2AdId, lose, sg2Attrs); err != nil {
t.Error(err)
}
}
func scanAs(ctx *context.T, rootp security.Principal, as string) (<-chan discovery.Update, error) {
idp := testutil.IDProviderFromPrincipal(rootp)
p := testutil.NewPrincipal()
if err := idp.Bless(p, as); err != nil {
return nil, err
}
ctx, err := v23.WithPrincipal(ctx, p)
if err != nil {
return nil, err
}
dis, err := syncdis.NewDiscovery(ctx)
if err != nil {
return nil, err
}
return dis.Scan(ctx, `v.InterfaceName="v.io/x/ref/services/syncbase/server/interfaces/Sync"`)
}
const (
lose = "lose"
find = "find"
both = "both"
)
func expect(ch <-chan discovery.Update, id *discovery.AdId, typ string, want discovery.Attributes) error {
select {
case u := <-ch:
if (u.IsLost() && typ == find) || (!u.IsLost() && typ == lose) {
return fmt.Errorf("IsLost mismatch. Got %v, wanted %v", u, typ)
}
ad := u.Advertisement()
got := ad.Attributes
if id.IsValid() {
if *id != ad.Id {
return fmt.Errorf("mismatched id, got %v, want %v", ad.Id, id)
}
} else {
*id = ad.Id
}
if !reflect.DeepEqual(got, want) {
return fmt.Errorf("got %v, want %v", got, want)
}
if typ == both {
typ = lose
if u.IsLost() {
typ = find
}
return expect(ch, id, typ, want)
}
return nil
case <-time.After(2 * time.Second):
return fmt.Errorf("timed out")
}
}
func TestListenForInvites(t *testing.T) {
_, ctx, sName, rootp, cleanup := tu.SetupOrDieCustom(
"o:app1:client1",
"server",
tu.DefaultPerms(access.AllTypicalTags(), "root:o:app1:client1"))
defer cleanup()
service := syncbase.NewService(sName)
d1 := tu.CreateDatabase(t, ctx, service, "d1")
d2 := tu.CreateDatabase(t, ctx, service, "d2")
collections := []wire.Id{}
for _, d := range []syncbase.Database{d1, d2} {
for _, n := range []string{"c1", "c2"} {
c := tu.CreateCollection(t, ctx, d, d.Id().Name+n)
collections = append(collections, c.Id())
}
}
d1invites, err := listenAs(ctx, rootp, "o:app1:client1", d1.Id())
if err != nil {
panic(err)
}
d2invites, err := listenAs(ctx, rootp, "o:app1:client1", d2.Id())
if err != nil {
panic(err)
}
if err := advertiseAndFindSyncgroup(t, ctx, d1, collections[0], d1invites); err != nil {
t.Error(err)
}
if err := advertiseAndFindSyncgroup(t, ctx, d2, collections[2], d2invites); err != nil {
t.Error(err)
}
if err := advertiseAndFindSyncgroup(t, ctx, d1, collections[1], d1invites); err != nil {
t.Error(err)
}
if err := advertiseAndFindSyncgroup(t, ctx, d2, collections[3], d2invites); err != nil {
t.Error(err)
}
}
func listenAs(ctx *context.T, rootp security.Principal, as string, db wire.Id) (<-chan syncdis.Invite, error) {
ch := make(chan syncdis.Invite)
return ch, syncdis.ListenForInvites(tu.NewCtx(ctx, rootp, as), db, ch)
}
func advertiseAndFindSyncgroup(
t *testing.T,
ctx *context.T,
d syncbase.Database,
collection wire.Id,
invites <-chan syncdis.Invite) error {
sgId := wire.Id{Name: collection.Name + "sg", Blessing: collection.Blessing}
ctx.Infof("creating %v", sgId)
spec := wire.SyncgroupSpec{
Description: "test syncgroup",
Perms: tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1"),
Collections: []wire.Id{collection},
}
createSyncgroup(t, ctx, d, sgId, spec, verror.ID(""))
// We should see an invite for sg1.
inv := <-invites
if inv.Syncgroup != sgId {
return fmt.Errorf("got %v, want %v", inv.Syncgroup, sgId)
}
return nil
}
func createSyncgroup(t *testing.T, ctx *context.T, d syncbase.Database, sgId wire.Id, spec wire.SyncgroupSpec, errID verror.ID) syncbase.Syncgroup {
sg := d.SyncgroupForId(sgId)
info := wire.SyncgroupMemberInfo{SyncPriority: 8}
if err := sg.Create(ctx, spec, info); verror.ErrorID(err) != errID {
tu.Fatalf(t, "Create SG %+v failed: %v", sgId, err)
}
return sg
}