Move syncbase discovery to the ref repo.
This change also includes the chnage in:
https://vanadium-review.googlesource.com/#/c/22812/
Please see that change for details.
MultiPart: 2/2
Change-Id: If85c95523b08986ddd67a12c4af97d666a8b4fe7
diff --git a/services/syncbase/discovery/discovery.go b/services/syncbase/discovery/discovery.go
new file mode 100644
index 0000000..baeaff4
--- /dev/null
+++ b/services/syncbase/discovery/discovery.go
@@ -0,0 +1,444 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package syncbase
+
+import (
+ "fmt"
+ "sort"
+ "strings"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/discovery"
+ "v.io/v23/security"
+ wire "v.io/v23/services/syncbase"
+ "v.io/x/lib/nsync"
+ "v.io/x/ref/services/syncbase/server/interfaces"
+)
+
+const visibilityKey = "vis"
+
+// Discovery implements v.io/v23/discovery.T for syncbase based
+// applications.
+// TODO(mattr): Actually this is not syncbase specific. At some
+// point we should just replace the result of v23.NewDiscovery
+// with this.
+type Discovery struct {
+ nhDiscovery discovery.T
+ // TODO(mattr): Add global discovery.
+}
+
+// NewDiscovery creates a new syncbase discovery object.
+func NewDiscovery(ctx *context.T) (discovery.T, error) {
+ nhDiscovery, err := v23.NewDiscovery(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return &Discovery{nhDiscovery: nhDiscovery}, nil
+}
+
+// Scan implements v.io/v23/discovery/T.Scan.
+func (d *Discovery) Scan(ctx *context.T, query string) (<-chan discovery.Update, error) {
+ nhUpdates, err := d.nhDiscovery.Scan(ctx, query)
+ if err != nil {
+ return nil, err
+ }
+
+ // Currently setting visibility on the neighborhood discovery
+ // service turns IBE encryption on. We currently don't have the
+ // infrastructure support for IBE, so that would make our advertisements
+ // unreadable by everyone.
+ // Instead we add the visibility list to the attributes of the advertisement
+ // and filter on the client side. This is a temporary measure until
+ // IBE is set up. See v.io/i/1345.
+ updates := make(chan discovery.Update)
+ go func() {
+ defer close(updates)
+ for {
+ var u discovery.Update
+ select {
+ case <-ctx.Done():
+ return
+ case u = <-nhUpdates:
+ }
+ if u == nil {
+ continue
+ }
+ patterns := splitPatterns(u.Attribute(visibilityKey))
+ if len(patterns) > 0 && !matchesPatterns(ctx, patterns) {
+ continue
+ }
+ updates <- update{u}
+ }
+ }()
+
+ return updates, nil
+}
+
+// Advertise implements v.io/v23/discovery/T.Advertise.
+func (d *Discovery) Advertise(ctx *context.T, ad *discovery.Advertisement, visibility []security.BlessingPattern) (<-chan struct{}, error) {
+ // Currently setting visibility on the neighborhood discovery
+ // service turns IBE encryption on. We currently don't have the
+ // infrastructure support for IBE, so that would make our advertisements
+ // unreadable by everyone.
+ // Instead we add the visibility list to the attributes of the advertisement
+ // and filter on the client side. This is a temporary measure until
+ // IBE is set up. See v.io/i/1345.
+ adCopy := *ad
+ if len(visibility) > 0 {
+ adCopy.Attributes = make(discovery.Attributes, len(ad.Attributes)+1)
+ for k, v := range ad.Attributes {
+ adCopy.Attributes[k] = v
+ }
+ patterns := joinPatterns(visibility)
+ adCopy.Attributes[visibilityKey] = patterns
+ }
+ ch, err := d.nhDiscovery.Advertise(ctx, &adCopy, nil)
+ ad.Id = adCopy.Id
+ return ch, err
+}
+
+func matchesPatterns(ctx *context.T, patterns []security.BlessingPattern) bool {
+ p := v23.GetPrincipal(ctx)
+ blessings := p.BlessingStore().PeerBlessings()
+ for _, b := range blessings {
+ names := security.BlessingNames(p, b)
+ for _, pattern := range patterns {
+ if pattern.MatchedBy(names...) {
+ return true
+ }
+ }
+ }
+ return false
+}
+
+// update wraps the discovery.Update to remove the visibility attribute which we add.
+type update struct {
+ discovery.Update
+}
+
+func (u update) Attribute(name string) string {
+ if name == visibilityKey {
+ return ""
+ }
+ return u.Update.Attribute(name)
+}
+
+func (u update) Advertisement() discovery.Advertisement {
+ cp := u.Update.Advertisement()
+ orig := cp.Attributes
+ cp.Attributes = make(discovery.Attributes, len(orig))
+ for k, v := range orig {
+ if k != visibilityKey {
+ cp.Attributes[k] = v
+ }
+ }
+ return cp
+}
+
+// blessingSeparator is used to join multiple blessings into a
+// single string.
+// Note that comma cannot appear in blessings, see:
+// v.io/v23/security/certificate.go
+const blessingsSeparator = ','
+
+// joinPatterns concatenates the elements of a to create a single string.
+// The string can be split again with SplitPatterns.
+func joinPatterns(a []security.BlessingPattern) string {
+ if len(a) == 0 {
+ return ""
+ }
+ if len(a) == 1 {
+ return string(a[0])
+ }
+ n := (len(a) - 1)
+ for i := 0; i < len(a); i++ {
+ n += len(a[i])
+ }
+
+ b := make([]byte, n)
+ bp := copy(b, a[0])
+ for _, s := range a[1:] {
+ b[bp] = blessingsSeparator
+ bp++
+ bp += copy(b[bp:], s)
+ }
+ return string(b)
+}
+
+// splitPatterns splits BlessingPatterns that were joined with
+// JoinBlessingPattern.
+func splitPatterns(patterns string) []security.BlessingPattern {
+ if patterns == "" {
+ return nil
+ }
+ n := strings.Count(patterns, string(blessingsSeparator)) + 1
+ out := make([]security.BlessingPattern, n)
+ last, start := 0, 0
+ for i, r := range patterns {
+ if r == blessingsSeparator {
+ out[last] = security.BlessingPattern(patterns[start:i])
+ last++
+ start = i + 1
+ }
+ }
+ out[last] = security.BlessingPattern(patterns[start:])
+ return out
+}
+
+var state struct {
+ mu nsync.Mu
+ scans map[security.Principal]*scanState
+}
+
+type scanState struct {
+ dbs map[wire.Id]*inviteQueue
+ listeners int
+ cancel context.CancelFunc
+}
+
+func newScan(ctx *context.T) (*scanState, error) {
+ ctx, cancel := context.WithRootCancel(ctx)
+ scan := &scanState{
+ dbs: make(map[wire.Id]*inviteQueue),
+ cancel: cancel,
+ }
+ d, err := NewDiscovery(ctx)
+ if err != nil {
+ scan.cancel()
+ return nil, err
+ }
+ query := fmt.Sprintf("v.InterfaceName=\"%s/%s\"",
+ interfaces.SyncDesc.PkgPath, interfaces.SyncDesc.Name)
+ updates, err := d.Scan(ctx, query)
+ if err != nil {
+ scan.cancel()
+ return nil, err
+ }
+ go func() {
+ for u := range updates {
+ if invite, db, ok := makeInvite(u.Advertisement()); ok {
+ state.mu.Lock()
+ q := scan.dbs[db]
+ if u.IsLost() {
+ // TODO(mattr): Removing like this can result in resurfacing already
+ // retrieved invites. For example if the ACL of a syncgroup is
+ // changed to add a new memeber, then we might see a remove and then
+ // later an add. In this case we would end up moving the invite
+ // to the end of the queue. One way to fix this would be to
+ // keep removed invites for some time.
+ if q != nil {
+ q.remove(invite)
+ if q.empty() {
+ delete(scan.dbs, db)
+ }
+ }
+ } else {
+ if q == nil {
+ q = newInviteQueue()
+ scan.dbs[db] = q
+ }
+ q.add(invite)
+ q.cond.Broadcast()
+ }
+ state.mu.Unlock()
+ }
+ }
+ }()
+ return scan, nil
+}
+
+// Invite represents an invitation to join a syncgroup as found via Discovery.
+type Invite struct {
+ Syncgroup wire.Id //Syncgroup is the Id of the syncgroup you've been invited to.
+ Addresses []string //Addresses are the list of addresses of the inviting server.
+ key string
+}
+
+// ListenForInvites listens via Discovery for syncgroup invitations for the given
+// database and sends the invites to the provided channel. We stop listening when
+// the given context is canceled. When that happens we close the given channel.
+func ListenForInvites(ctx *context.T, db wire.Id, ch chan<- Invite) error {
+ defer state.mu.Unlock()
+ state.mu.Lock()
+
+ p := v23.GetPrincipal(ctx)
+ scan := state.scans[p]
+ if scan == nil {
+ var err error
+ if scan, err = newScan(ctx); err != nil {
+ return err
+ }
+ if state.scans == nil {
+ state.scans = make(map[security.Principal]*scanState)
+ }
+ state.scans[p] = scan
+ }
+ scan.listeners++
+
+ q := scan.dbs[db]
+ if q == nil {
+ q = newInviteQueue()
+ scan.dbs[db] = q
+ }
+
+ go func() {
+ c := q.scan()
+ for {
+ next, ok := q.next(ctx, c)
+ if !ok {
+ break
+ }
+ select {
+ case ch <- next.copy():
+ case <-ctx.Done():
+ break
+ }
+ }
+ close(ch)
+
+ defer state.mu.Unlock()
+ state.mu.Lock()
+
+ if q.empty() {
+ delete(scan.dbs, db)
+ }
+ scan.listeners--
+ if scan.listeners == 0 {
+ scan.cancel()
+ delete(state.scans, v23.GetPrincipal(ctx))
+ }
+ }()
+
+ return nil
+}
+
+// inviteQueue is a linked list based queue. As we get new invitations we
+// add them to the queue, and when advertisements are lost we remove elements.
+// Each call to scan creates a cursor on the queue that will iterate until
+// the Listen is canceled. We wait when we hit the end of the queue via the
+// condition variable cond.
+type inviteQueue struct {
+ mu nsync.Mu
+ cond nsync.CV
+
+ elems map[string]*ielement
+ sentinel ielement
+ cursors map[int]*ielement
+ nextCursorId int
+}
+
+type ielement struct {
+ invite Invite
+ prev, next *ielement
+}
+
+func newInviteQueue() *inviteQueue {
+ iq := &inviteQueue{
+ elems: make(map[string]*ielement),
+ cursors: make(map[int]*ielement),
+ }
+ iq.sentinel.next, iq.sentinel.prev = &iq.sentinel, &iq.sentinel
+ return iq
+}
+
+func (q *inviteQueue) empty() bool {
+ defer q.mu.Unlock()
+ q.mu.Lock()
+ return len(q.elems) == 0 && len(q.cursors) == 0
+}
+
+func (q *inviteQueue) size() int {
+ defer q.mu.Unlock()
+ q.mu.Lock()
+ return len(q.elems)
+}
+
+func (q *inviteQueue) remove(i Invite) {
+ defer q.mu.Unlock()
+ q.mu.Lock()
+
+ el, ok := q.elems[i.key]
+ if !ok {
+ return
+ }
+ delete(q.elems, i.key)
+ el.next.prev, el.prev.next = el.prev, el.next
+ for id, c := range q.cursors {
+ if c == el {
+ q.cursors[id] = c.prev
+ }
+ }
+}
+
+func (q *inviteQueue) add(i Invite) {
+ defer q.mu.Unlock()
+ q.mu.Lock()
+
+ if _, ok := q.elems[i.key]; ok {
+ return
+ }
+ el := &ielement{i, q.sentinel.prev, &q.sentinel}
+ q.sentinel.prev, q.sentinel.prev.next = el, el
+ q.elems[i.key] = el
+ q.cond.Broadcast()
+}
+
+func (q *inviteQueue) next(ctx *context.T, cursor int) (Invite, bool) {
+ defer q.mu.Unlock()
+ q.mu.Lock()
+ c, exists := q.cursors[cursor]
+ if !exists {
+ return Invite{}, false
+ }
+ for c.next == &q.sentinel {
+ if q.cond.WaitWithDeadline(&q.mu, nsync.NoDeadline, ctx.Done()) != nsync.OK {
+ delete(q.cursors, cursor)
+ return Invite{}, false
+ }
+ }
+ c = c.next
+ q.cursors[cursor] = c
+ return c.invite, true
+}
+
+func (q *inviteQueue) scan() int {
+ defer q.mu.Unlock()
+ q.mu.Lock()
+ id := q.nextCursorId
+ q.nextCursorId++
+ q.cursors[id] = &q.sentinel
+ return id
+}
+
+func makeInvite(ad discovery.Advertisement) (Invite, wire.Id, bool) {
+ var dbId, sgId wire.Id
+ var ok bool
+ if dbId.Name, ok = ad.Attributes[wire.DiscoveryAttrDatabaseName]; !ok {
+ return Invite{}, dbId, false
+ }
+ if dbId.Blessing, ok = ad.Attributes[wire.DiscoveryAttrDatabaseBlessing]; !ok {
+ return Invite{}, dbId, false
+ }
+ if sgId.Name, ok = ad.Attributes[wire.DiscoveryAttrSyncgroupName]; !ok {
+ return Invite{}, dbId, false
+ }
+ if sgId.Blessing, ok = ad.Attributes[wire.DiscoveryAttrSyncgroupBlessing]; !ok {
+ return Invite{}, dbId, false
+ }
+ i := Invite{
+ Addresses: append([]string{}, ad.Addresses...),
+ Syncgroup: sgId,
+ }
+ sort.Strings(i.Addresses)
+ i.key = fmt.Sprintf("%v", i)
+ return i, dbId, true
+}
+
+func (i Invite) copy() Invite {
+ cp := i
+ cp.Addresses = append([]string{}, i.Addresses...)
+ return cp
+}
diff --git a/services/syncbase/discovery/discovery_internal_test.go b/services/syncbase/discovery/discovery_internal_test.go
new file mode 100644
index 0000000..834267d
--- /dev/null
+++ b/services/syncbase/discovery/discovery_internal_test.go
@@ -0,0 +1,159 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package syncbase
+
+import (
+ "bytes"
+ "fmt"
+ "os"
+ "reflect"
+ "testing"
+
+ "v.io/v23/context"
+ "v.io/v23/logging"
+ "v.io/v23/security"
+ wire "v.io/v23/services/syncbase"
+)
+
+func TestJoinSplitPatterns(t *testing.T) {
+ cases := []struct {
+ patterns []security.BlessingPattern
+ joined string
+ }{
+ {nil, ""},
+ {[]security.BlessingPattern{"a", "b"}, "a,b"},
+ {[]security.BlessingPattern{"a:b:c", "d:e:f"}, "a:b:c,d:e:f"},
+ {[]security.BlessingPattern{"alpha:one", "alpha:two", "alpha:three"}, "alpha:one,alpha:two,alpha:three"},
+ }
+ for _, c := range cases {
+ if got := joinPatterns(c.patterns); got != c.joined {
+ t.Errorf("%#v, got %q, wanted %q", c.patterns, got, c.joined)
+ }
+ if got := splitPatterns(c.joined); !reflect.DeepEqual(got, c.patterns) {
+ t.Errorf("%q, got %#v, wanted %#v", c.joined, got, c.patterns)
+ }
+ }
+ // Special case, Joining an empty non-nil list results in empty string.
+ if got := joinPatterns([]security.BlessingPattern{}); got != "" {
+ t.Errorf("Joining empty list: got %q, want %q", got, "")
+ }
+}
+
+type logger struct {
+ logging.Logger
+}
+
+func (l logger) InfoDepth(depth int, args ...interface{}) {
+ fmt.Fprintln(os.Stdout, args...)
+}
+
+func TestInviteQueue(t *testing.T) {
+ ctx, cancel := context.RootContext()
+ defer cancel()
+
+ ctx = context.WithLogger(ctx, logger{})
+
+ q := newInviteQueue()
+ if q.size() != 0 {
+ t.Errorf("got %d, want 0", q.size())
+ }
+
+ want := []string{"0", "1", "2", "3", "4"}
+
+ // Test inserts during a long scan.
+ ch := make(chan struct{})
+ go func() {
+ scan(ctx, q, want)
+ close(ch)
+ }()
+
+ // Add a bunch of entries.
+ for i, w := range want {
+ q.add(Invite{Syncgroup: wire.Id{Name: w}, key: w})
+ if q.size() != i+1 {
+ t.Errorf("got %d, want %d", q.size(), i+1)
+ }
+ if err := scan(ctx, q, want[:i+1]); err != nil {
+ t.Error(err)
+ }
+ }
+
+ // Make sure long term scan finished.
+ <-ch
+
+ // Start another long scan that will proceed across deletes
+ steps := make(chan struct{})
+ go func() {
+ ctx, cancel := context.WithCancel(ctx)
+ c := q.scan()
+
+ if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[0] {
+ t.Error("next should have suceeded.")
+ }
+ if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[1] {
+ t.Error("next should have suceeded.")
+ }
+
+ steps <- struct{}{}
+ <-steps
+
+ if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[3] {
+ t.Error("next should have suceeded.")
+ }
+ if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[4] {
+ t.Error("next should have suceeded.")
+ }
+ cancel()
+
+ steps <- struct{}{}
+ }()
+
+ // Wait for the scan to read the first two values.
+ <-steps
+
+ // Remove a bunch of entries.
+ for i, w := range want {
+ q.remove(Invite{Syncgroup: wire.Id{Name: w}, key: w})
+ if i == 2 {
+ // Tell the scan to read the next two values
+ steps <- struct{}{}
+ // Wait for it to finish.
+ <-steps
+ }
+ if q.size() != len(want)-i-1 {
+ t.Errorf("got %d, want %d", q.size(), len(want)-i-1)
+ }
+ if err := scan(ctx, q, want[i+1:]); err != nil {
+ t.Error(err)
+ }
+ }
+}
+
+func logList(ctx *context.T, q *inviteQueue) {
+ buf := &bytes.Buffer{}
+ for e := q.sentinel.next; e != &q.sentinel; e = e.next {
+ fmt.Fprintf(buf, "%p ", e)
+ }
+ ctx.Info("list", buf.String())
+}
+
+func scan(ctx *context.T, q *inviteQueue, want []string) error {
+ ctx, cancel := context.WithCancel(ctx)
+ c := q.scan()
+ for i, w := range want {
+ inv, ok := q.next(ctx, c)
+ if !ok {
+ return fmt.Errorf("scan ended after %d entries, wanted %d", i, len(want))
+ }
+ if got := inv.Syncgroup.Name; got != w {
+ return fmt.Errorf("got %s, want %s", got, w)
+ }
+ if i == len(want)-1 {
+ // Calling cancel should allow next to fail, exiting the loop.
+ cancel()
+ }
+ }
+ return nil
+}
diff --git a/services/syncbase/discovery/discovery_test.go b/services/syncbase/discovery/discovery_test.go
new file mode 100644
index 0000000..5928c6a
--- /dev/null
+++ b/services/syncbase/discovery/discovery_test.go
@@ -0,0 +1,255 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package syncbase_test
+
+import (
+ "fmt"
+ "reflect"
+ "testing"
+ "time"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/discovery"
+ "v.io/v23/security"
+ "v.io/v23/security/access"
+ wire "v.io/v23/services/syncbase"
+ "v.io/v23/syncbase"
+ "v.io/v23/verror"
+ _ "v.io/x/ref/runtime/factories/roaming"
+ syncdis "v.io/x/ref/services/syncbase/discovery"
+ tu "v.io/x/ref/services/syncbase/testutil"
+ "v.io/x/ref/test/testutil"
+)
+
+func TestSyncgroupDiscovery(t *testing.T) {
+ _, ctx, sName, rootp, cleanup := tu.SetupOrDieCustom("o:app1:client1", "server",
+ tu.DefaultPerms(access.AllTypicalTags(), "root:o:app1:client1"))
+ defer cleanup()
+ d := tu.CreateDatabase(t, ctx, syncbase.NewService(sName), "d")
+ collection1 := tu.CreateCollection(t, ctx, d, "c1")
+ collection2 := tu.CreateCollection(t, ctx, d, "c2")
+
+ c1Updates, err := scanAs(ctx, rootp, "o:app1:client1")
+ if err != nil {
+ panic(err)
+ }
+ c2Updates, err := scanAs(ctx, rootp, "o:app1:client2")
+ if err != nil {
+ panic(err)
+ }
+
+ sgId := d.Syncgroup(ctx, "sg1").Id()
+ spec := wire.SyncgroupSpec{
+ Description: "test syncgroup sg1",
+ Perms: tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1"),
+ Collections: []wire.Id{collection1.Id()},
+ }
+ createSyncgroup(t, ctx, d, sgId, spec, verror.ID(""))
+
+ // First update is for syncbase, and not a specific syncgroup.
+ u := <-c1Updates
+ attrs := u.Advertisement().Attributes
+ peer := attrs[wire.DiscoveryAttrPeer]
+ if peer == "" || len(attrs) != 1 {
+ t.Errorf("Got %v, expected only a peer name.", attrs)
+ }
+ // Client2 should see the same.
+ if err := expect(c2Updates, &discovery.AdId{}, find, discovery.Attributes{wire.DiscoveryAttrPeer: peer}); err != nil {
+ t.Error(err)
+ }
+
+ sg1Attrs := discovery.Attributes{
+ wire.DiscoveryAttrDatabaseName: "d",
+ wire.DiscoveryAttrDatabaseBlessing: "root:o:app1",
+ wire.DiscoveryAttrSyncgroupName: "sg1",
+ wire.DiscoveryAttrSyncgroupBlessing: "root:o:app1:client1",
+ }
+ sg2Attrs := discovery.Attributes{
+ wire.DiscoveryAttrDatabaseName: "d",
+ wire.DiscoveryAttrDatabaseBlessing: "root:o:app1",
+ wire.DiscoveryAttrSyncgroupName: "sg2",
+ wire.DiscoveryAttrSyncgroupBlessing: "root:o:app1:client1",
+ }
+
+ // Then we should see an update for the created syncgroup.
+ var sg1AdId discovery.AdId
+ if err := expect(c1Updates, &sg1AdId, find, sg1Attrs); err != nil {
+ t.Error(err)
+ }
+
+ // Now update the spec to add client2 to the permissions.
+ spec.Perms = tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1", "root:o:app1:client2")
+ if err := d.SyncgroupForId(sgId).SetSpec(ctx, spec, ""); err != nil {
+ t.Fatalf("sg.SetSpec failed: %v", err)
+ }
+
+ // Client1 should see a lost and a found message.
+ if err := expect(c1Updates, &sg1AdId, both, sg1Attrs); err != nil {
+ t.Error(err)
+ }
+ // Client2 should just now see the found message.
+ if err := expect(c2Updates, &sg1AdId, find, sg1Attrs); err != nil {
+ t.Error(err)
+ }
+
+ // Now create a second syncgroup.
+ sg2Id := d.Syncgroup(ctx, "sg2").Id()
+ spec2 := wire.SyncgroupSpec{
+ Description: "test syncgroup sg2",
+ Perms: tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1", "root:o:app1:client2"),
+ Collections: []wire.Id{collection2.Id()},
+ }
+ createSyncgroup(t, ctx, d, sg2Id, spec2, verror.ID(""))
+
+ // Both clients should see the new syncgroup.
+ var sg2AdId discovery.AdId
+ if err := expect(c1Updates, &sg2AdId, find, sg2Attrs); err != nil {
+ t.Error(err)
+ }
+ if err := expect(c2Updates, &sg2AdId, find, sg2Attrs); err != nil {
+ t.Error(err)
+ }
+
+ spec2.Perms = tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1")
+ if err := d.SyncgroupForId(sg2Id).SetSpec(ctx, spec2, ""); err != nil {
+ t.Fatalf("sg.SetSpec failed: %v", err)
+ }
+ if err := expect(c2Updates, &sg2AdId, lose, sg2Attrs); err != nil {
+ t.Error(err)
+ }
+}
+
+func scanAs(ctx *context.T, rootp security.Principal, as string) (<-chan discovery.Update, error) {
+ idp := testutil.IDProviderFromPrincipal(rootp)
+ p := testutil.NewPrincipal()
+ if err := idp.Bless(p, as); err != nil {
+ return nil, err
+ }
+ ctx, err := v23.WithPrincipal(ctx, p)
+ if err != nil {
+ return nil, err
+ }
+ dis, err := syncdis.NewDiscovery(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return dis.Scan(ctx, `v.InterfaceName="v.io/x/ref/services/syncbase/server/interfaces/Sync"`)
+}
+
+const (
+ lose = "lose"
+ find = "find"
+ both = "both"
+)
+
+func expect(ch <-chan discovery.Update, id *discovery.AdId, typ string, want discovery.Attributes) error {
+ select {
+ case u := <-ch:
+ if (u.IsLost() && typ == find) || (!u.IsLost() && typ == lose) {
+ return fmt.Errorf("IsLost mismatch. Got %v, wanted %v", u, typ)
+ }
+ ad := u.Advertisement()
+ got := ad.Attributes
+ if id.IsValid() {
+ if *id != ad.Id {
+ return fmt.Errorf("mismatched id, got %v, want %v", ad.Id, id)
+ }
+ } else {
+ *id = ad.Id
+ }
+ if !reflect.DeepEqual(got, want) {
+ return fmt.Errorf("got %v, want %v", got, want)
+ }
+ if typ == both {
+ typ = lose
+ if u.IsLost() {
+ typ = find
+ }
+ return expect(ch, id, typ, want)
+ }
+ return nil
+ case <-time.After(2 * time.Second):
+ return fmt.Errorf("timed out")
+ }
+}
+
+func TestListenForInvites(t *testing.T) {
+ _, ctx, sName, rootp, cleanup := tu.SetupOrDieCustom(
+ "o:app1:client1",
+ "server",
+ tu.DefaultPerms(access.AllTypicalTags(), "root:o:app1:client1"))
+ defer cleanup()
+ service := syncbase.NewService(sName)
+ d1 := tu.CreateDatabase(t, ctx, service, "d1")
+ d2 := tu.CreateDatabase(t, ctx, service, "d2")
+
+ collections := []wire.Id{}
+ for _, d := range []syncbase.Database{d1, d2} {
+ for _, n := range []string{"c1", "c2"} {
+ c := tu.CreateCollection(t, ctx, d, d.Id().Name+n)
+ collections = append(collections, c.Id())
+ }
+ }
+
+ d1invites, err := listenAs(ctx, rootp, "o:app1:client1", d1.Id())
+ if err != nil {
+ panic(err)
+ }
+ d2invites, err := listenAs(ctx, rootp, "o:app1:client1", d2.Id())
+ if err != nil {
+ panic(err)
+ }
+
+ if err := advertiseAndFindSyncgroup(t, ctx, d1, collections[0], d1invites); err != nil {
+ t.Error(err)
+ }
+ if err := advertiseAndFindSyncgroup(t, ctx, d2, collections[2], d2invites); err != nil {
+ t.Error(err)
+ }
+ if err := advertiseAndFindSyncgroup(t, ctx, d1, collections[1], d1invites); err != nil {
+ t.Error(err)
+ }
+ if err := advertiseAndFindSyncgroup(t, ctx, d2, collections[3], d2invites); err != nil {
+ t.Error(err)
+ }
+}
+
+func listenAs(ctx *context.T, rootp security.Principal, as string, db wire.Id) (<-chan syncdis.Invite, error) {
+ ch := make(chan syncdis.Invite)
+ return ch, syncdis.ListenForInvites(tu.NewCtx(ctx, rootp, as), db, ch)
+}
+
+func advertiseAndFindSyncgroup(
+ t *testing.T,
+ ctx *context.T,
+ d syncbase.Database,
+ collection wire.Id,
+ invites <-chan syncdis.Invite) error {
+ sgId := wire.Id{Name: collection.Name + "sg", Blessing: collection.Blessing}
+ ctx.Infof("creating %v", sgId)
+ spec := wire.SyncgroupSpec{
+ Description: "test syncgroup",
+ Perms: tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1"),
+ Collections: []wire.Id{collection},
+ }
+ createSyncgroup(t, ctx, d, sgId, spec, verror.ID(""))
+
+ // We should see an invite for sg1.
+ inv := <-invites
+ if inv.Syncgroup != sgId {
+ return fmt.Errorf("got %v, want %v", inv.Syncgroup, sgId)
+ }
+ return nil
+}
+
+func createSyncgroup(t *testing.T, ctx *context.T, d syncbase.Database, sgId wire.Id, spec wire.SyncgroupSpec, errID verror.ID) syncbase.Syncgroup {
+ sg := d.SyncgroupForId(sgId)
+ info := wire.SyncgroupMemberInfo{SyncPriority: 8}
+ if err := sg.Create(ctx, spec, info); verror.ErrorID(err) != errID {
+ tu.Fatalf(t, "Create SG %+v failed: %v", sgId, err)
+ }
+ return sg
+}
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 66eac7b..d9ff29f 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -26,11 +26,11 @@
"v.io/v23/rpc"
"v.io/v23/security/access"
wire "v.io/v23/services/syncbase"
- "v.io/v23/syncbase"
"v.io/v23/verror"
"v.io/x/lib/vlog"
idiscovery "v.io/x/ref/lib/discovery"
"v.io/x/ref/services/syncbase/common"
+ syncdis "v.io/x/ref/services/syncbase/discovery"
blob "v.io/x/ref/services/syncbase/localblobstore"
fsblob "v.io/x/ref/services/syncbase/localblobstore/fs_cablobstore"
"v.io/x/ref/services/syncbase/server/interfaces"
@@ -159,7 +159,7 @@
// that the syncer can pick from. In addition, the sync module responds to
// incoming RPCs from remote sync modules and local clients.
func New(ctx *context.T, sv interfaces.Service, blobStEngine, blobRootDir string, cl *vclock.VClock, publishInNh bool) (*syncService, error) {
- discovery, err := syncbase.NewDiscovery(v23.WithListenSpec(ctx, rpc.ListenSpec{}))
+ discovery, err := syncdis.NewDiscovery(v23.WithListenSpec(ctx, rpc.ListenSpec{}))
if err != nil {
return nil, err
}