Move syncbase discovery to the ref repo.

This change also includes the change in:
https://vanadium-review.googlesource.com/#/c/22812/

Please see that change for details.

MultiPart: 2/2
Change-Id: If85c95523b08986ddd67a12c4af97d666a8b4fe7
diff --git a/services/syncbase/discovery/discovery.go b/services/syncbase/discovery/discovery.go
new file mode 100644
index 0000000..baeaff4
--- /dev/null
+++ b/services/syncbase/discovery/discovery.go
@@ -0,0 +1,444 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package syncbase
+
+import (
+	"fmt"
+	"sort"
+	"strings"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/discovery"
+	"v.io/v23/security"
+	wire "v.io/v23/services/syncbase"
+	"v.io/x/lib/nsync"
+	"v.io/x/ref/services/syncbase/server/interfaces"
+)
+
+// visibilityKey is the advertisement attribute under which Advertise
+// stores the joined visibility patterns (client-side filtering workaround).
+const visibilityKey = "vis"
+
+// Discovery implements v.io/v23/discovery.T for syncbase based
+// applications.
+// TODO(mattr): Actually this is not syncbase specific.  At some
+// point we should just replace the result of v23.NewDiscovery
+// with this.
+type Discovery struct {
+	// nhDiscovery performs the underlying neighborhood discovery.
+	nhDiscovery discovery.T
+	// TODO(mattr): Add global discovery.
+}
+
+// NewDiscovery creates a new syncbase discovery object.
+// It wraps the runtime's default discovery (see v23.NewDiscovery) and
+// layers the visibility-attribute filtering on top of it.
+func NewDiscovery(ctx *context.T) (discovery.T, error) {
+	nhDiscovery, err := v23.NewDiscovery(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return &Discovery{nhDiscovery: nhDiscovery}, nil
+}
+
+// Scan implements v.io/v23/discovery/T.Scan.
+// Updates whose visibility attribute does not match the caller's
+// blessings are filtered out before being forwarded.
+func (d *Discovery) Scan(ctx *context.T, query string) (<-chan discovery.Update, error) {
+	nhUpdates, err := d.nhDiscovery.Scan(ctx, query)
+	if err != nil {
+		return nil, err
+	}
+
+	// Currently setting visibility on the neighborhood discovery
+	// service turns IBE encryption on.  We currently don't have the
+	// infrastructure support for IBE, so that would make our advertisements
+	// unreadable by everyone.
+	// Instead we add the visibility list to the attributes of the advertisement
+	// and filter on the client side.  This is a temporary measure until
+	// IBE is set up.  See v.io/i/1345.
+	updates := make(chan discovery.Update)
+	go func() {
+		defer close(updates)
+		for {
+			var u discovery.Update
+			var ok bool
+			select {
+			case <-ctx.Done():
+				return
+			case u, ok = <-nhUpdates:
+				// Stop when the underlying scan terminates; otherwise a
+				// closed channel would yield nil forever and spin this loop.
+				if !ok {
+					return
+				}
+			}
+			if u == nil {
+				continue
+			}
+			patterns := splitPatterns(u.Attribute(visibilityKey))
+			if len(patterns) > 0 && !matchesPatterns(ctx, patterns) {
+				continue
+			}
+			// Guard the send with ctx.Done so we don't leak this goroutine
+			// if the consumer stops reading after cancellation.
+			select {
+			case updates <- update{u}:
+			case <-ctx.Done():
+				return
+			}
+		}
+	}()
+
+	return updates, nil
+}
+
+// Advertise implements v.io/v23/discovery/T.Advertise.
+// The visibility list is stored as an advertisement attribute rather than
+// passed to the underlying discovery (see the IBE note below).
+func (d *Discovery) Advertise(ctx *context.T, ad *discovery.Advertisement, visibility []security.BlessingPattern) (<-chan struct{}, error) {
+	// Currently setting visibility on the neighborhood discovery
+	// service turns IBE encryption on.  We currently don't have the
+	// infrastructure support for IBE, so that would make our advertisements
+	// unreadable by everyone.
+	// Instead we add the visibility list to the attributes of the advertisement
+	// and filter on the client side.  This is a temporary measure until
+	// IBE is set up.  See v.io/i/1345.
+	adCopy := *ad
+	if len(visibility) > 0 {
+		// Copy the attribute map so the caller's advertisement is not mutated.
+		adCopy.Attributes = make(discovery.Attributes, len(ad.Attributes)+1)
+		for k, v := range ad.Attributes {
+			adCopy.Attributes[k] = v
+		}
+		patterns := joinPatterns(visibility)
+		adCopy.Attributes[visibilityKey] = patterns
+	}
+	// Note: nil visibility is passed to the underlying discovery on purpose.
+	ch, err := d.nhDiscovery.Advertise(ctx, &adCopy, nil)
+	// Propagate the Id assigned by the underlying discovery back to the caller.
+	ad.Id = adCopy.Id
+	return ch, err
+}
+
+// matchesPatterns reports whether any blessing name presentable by the
+// principal in ctx (across all of its peer blessings) matches at least one
+// of the given patterns.
+func matchesPatterns(ctx *context.T, patterns []security.BlessingPattern) bool {
+	p := v23.GetPrincipal(ctx)
+	blessings := p.BlessingStore().PeerBlessings()
+	for _, b := range blessings {
+		names := security.BlessingNames(p, b)
+		for _, pattern := range patterns {
+			if pattern.MatchedBy(names...) {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+// update wraps the discovery.Update to remove the visibility attribute which we add.
+type update struct {
+	discovery.Update
+}
+
+// Attribute hides the synthetic visibility attribute; every other lookup is
+// delegated to the wrapped update.
+func (u update) Attribute(name string) string {
+	if name == visibilityKey {
+		return ""
+	}
+	return u.Update.Attribute(name)
+}
+
+// Advertisement returns a copy of the wrapped advertisement whose attribute
+// map has the synthetic visibility attribute stripped out.
+func (u update) Advertisement() discovery.Advertisement {
+	cp := u.Update.Advertisement()
+	orig := cp.Attributes
+	cp.Attributes = make(discovery.Attributes, len(orig))
+	for k, v := range orig {
+		if k != visibilityKey {
+			cp.Attributes[k] = v
+		}
+	}
+	return cp
+}
+
+// blessingsSeparator is used to join multiple blessings into a
+// single string.
+// Note that comma cannot appear in blessings, see:
+// v.io/v23/security/certificate.go
+const blessingsSeparator = ','
+
+// joinPatterns concatenates the patterns in a, separated by
+// blessingsSeparator, into a single string.  The result can be decoded
+// again with splitPatterns.
+func joinPatterns(a []security.BlessingPattern) string {
+	if len(a) == 0 {
+		return ""
+	}
+	parts := make([]string, len(a))
+	for i, p := range a {
+		parts[i] = string(p)
+	}
+	return strings.Join(parts, string(blessingsSeparator))
+}
+
+// splitPatterns decodes a string produced by joinPatterns back into the
+// list of BlessingPatterns.  An empty string yields a nil slice.
+func splitPatterns(patterns string) []security.BlessingPattern {
+	if patterns == "" {
+		return nil
+	}
+	fields := strings.Split(patterns, string(blessingsSeparator))
+	out := make([]security.BlessingPattern, len(fields))
+	for i, f := range fields {
+		out[i] = security.BlessingPattern(f)
+	}
+	return out
+}
+
+// state holds the process-wide set of active discovery scans, keyed by the
+// principal performing the scan.  All access is guarded by state.mu.
+var state struct {
+	mu    nsync.Mu
+	scans map[security.Principal]*scanState
+}
+
+// scanState tracks one shared discovery scan and the per-database queues of
+// invites it has collected.  Fields are guarded by state.mu.
+type scanState struct {
+	// dbs maps a database Id to the queue of syncgroup invites found for it.
+	dbs       map[wire.Id]*inviteQueue
+	// listeners counts the active ListenForInvites calls sharing this scan.
+	listeners int
+	// cancel stops the underlying discovery scan.
+	cancel    context.CancelFunc
+}
+
+// newScan starts a discovery scan for syncbase Sync advertisements on behalf
+// of the principal in ctx, routing found/lost invites into per-database
+// queues.  The scan runs until scan.cancel is called.
+func newScan(ctx *context.T) (*scanState, error) {
+	ctx, cancel := context.WithRootCancel(ctx)
+	scan := &scanState{
+		dbs:    make(map[wire.Id]*inviteQueue),
+		cancel: cancel,
+	}
+	d, err := NewDiscovery(ctx)
+	if err != nil {
+		scan.cancel()
+		return nil, err
+	}
+	// Restrict the scan to advertisements of the syncbase Sync interface.
+	query := fmt.Sprintf("v.InterfaceName=\"%s/%s\"",
+		interfaces.SyncDesc.PkgPath, interfaces.SyncDesc.Name)
+	updates, err := d.Scan(ctx, query)
+	if err != nil {
+		scan.cancel()
+		return nil, err
+	}
+	go func() {
+		for u := range updates {
+			if invite, db, ok := makeInvite(u.Advertisement()); ok {
+				state.mu.Lock()
+				q := scan.dbs[db]
+				if u.IsLost() {
+					// TODO(mattr): Removing like this can result in resurfacing already
+					// retrieved invites.  For example if the ACL of a syncgroup is
+					// changed to add a new member, then we might see a remove and then
+					// later an add.  In this case we would end up moving the invite
+					// to the end of the queue.  One way to fix this would be to
+					// keep removed invites for some time.
+					if q != nil {
+						q.remove(invite)
+						if q.empty() {
+							delete(scan.dbs, db)
+						}
+					}
+				} else {
+					// Lazily create the queue for this database on first invite.
+					if q == nil {
+						q = newInviteQueue()
+						scan.dbs[db] = q
+					}
+					q.add(invite)
+					q.cond.Broadcast()
+				}
+				state.mu.Unlock()
+			}
+		}
+	}()
+	return scan, nil
+}
+
+// Invite represents an invitation to join a syncgroup as found via Discovery.
+type Invite struct {
+	Syncgroup wire.Id  // Syncgroup is the Id of the syncgroup you've been invited to.
+	Addresses []string // Addresses are the list of addresses of the inviting server.
+	// key uniquely identifies this invite within an inviteQueue (see makeInvite).
+	key       string
+}
+
+// ListenForInvites listens via Discovery for syncgroup invitations for the given
+// database and sends the invites to the provided channel.  We stop listening when
+// the given context is canceled.  When that happens we close the given channel.
+func ListenForInvites(ctx *context.T, db wire.Id, ch chan<- Invite) error {
+	defer state.mu.Unlock()
+	state.mu.Lock()
+
+	// One scan is shared by all listeners running with the same principal.
+	p := v23.GetPrincipal(ctx)
+	scan := state.scans[p]
+	if scan == nil {
+		var err error
+		if scan, err = newScan(ctx); err != nil {
+			return err
+		}
+		if state.scans == nil {
+			state.scans = make(map[security.Principal]*scanState)
+		}
+		state.scans[p] = scan
+	}
+	scan.listeners++
+
+	q := scan.dbs[db]
+	if q == nil {
+		q = newInviteQueue()
+		scan.dbs[db] = q
+	}
+
+	go func() {
+		c := q.scan()
+	loop:
+		for {
+			next, ok := q.next(ctx, c)
+			if !ok {
+				break
+			}
+			select {
+			case ch <- next.copy():
+			case <-ctx.Done():
+				// A bare break here would only exit the select, not the
+				// for loop, so use a labeled break to terminate promptly.
+				break loop
+			}
+		}
+		close(ch)
+
+		defer state.mu.Unlock()
+		state.mu.Lock()
+
+		if q.empty() {
+			delete(scan.dbs, db)
+		}
+		scan.listeners--
+		// Tear down the shared scan once the last listener leaves.
+		if scan.listeners == 0 {
+			scan.cancel()
+			delete(state.scans, p)
+		}
+	}()
+
+	return nil
+}
+
+// inviteQueue is a linked list based queue.  As we get new invitations we
+// add them to the queue, and when advertisements are lost we remove elements.
+// Each call to scan creates a cursor on the queue that will iterate until
+// the Listen is canceled.  We wait when we hit the end of the queue via the
+// condition variable cond.
+type inviteQueue struct {
+	mu   nsync.Mu
+	cond nsync.CV
+
+	// elems indexes list elements by invite key for O(1) add/remove.
+	elems        map[string]*ielement
+	// sentinel anchors the circular doubly-linked list (head and tail).
+	sentinel     ielement
+	// cursors maps a cursor id to the element that cursor last returned.
+	cursors      map[int]*ielement
+	nextCursorId int
+}
+
+// ielement is a node in inviteQueue's circular doubly-linked list.
+type ielement struct {
+	invite     Invite
+	prev, next *ielement
+}
+
+// newInviteQueue returns an empty queue whose sentinel points at itself,
+// forming the empty circular list.
+func newInviteQueue() *inviteQueue {
+	q := &inviteQueue{
+		elems:   map[string]*ielement{},
+		cursors: map[int]*ielement{},
+	}
+	q.sentinel.next = &q.sentinel
+	q.sentinel.prev = &q.sentinel
+	return q
+}
+
+// empty reports whether the queue has neither invites nor active cursors.
+func (q *inviteQueue) empty() bool {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	return len(q.elems) == 0 && len(q.cursors) == 0
+}
+
+// size returns the number of invites currently in the queue.
+func (q *inviteQueue) size() int {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	return len(q.elems)
+}
+
+// remove deletes the invite with the same key from the queue, splices it out
+// of the linked list, and rewinds any cursor parked on the removed element
+// so that cursor's next call resumes at the correct position.
+func (q *inviteQueue) remove(i Invite) {
+	defer q.mu.Unlock()
+	q.mu.Lock()
+
+	el, ok := q.elems[i.key]
+	if !ok {
+		return
+	}
+	delete(q.elems, i.key)
+	el.next.prev, el.prev.next = el.prev, el.next
+	// Step cursors pointing at the removed element back to its predecessor.
+	for id, c := range q.cursors {
+		if c == el {
+			q.cursors[id] = c.prev
+		}
+	}
+}
+
+// add appends the invite at the tail of the queue unless an invite with the
+// same key is already present, then wakes any cursors waiting in next.
+func (q *inviteQueue) add(i Invite) {
+	defer q.mu.Unlock()
+	q.mu.Lock()
+
+	if _, ok := q.elems[i.key]; ok {
+		return
+	}
+	el := &ielement{i, q.sentinel.prev, &q.sentinel}
+	// Tuple assignment: the old tail (sentinel.prev) is read before either
+	// write, so this links el in as the new tail.
+	q.sentinel.prev, q.sentinel.prev.next = el, el
+	q.elems[i.key] = el
+	q.cond.Broadcast()
+}
+
+// next advances the given cursor and returns the following invite, blocking
+// while the cursor is at the tail.  It returns false if the cursor is
+// unknown, or if ctx is canceled while waiting (in which case the cursor is
+// also deleted).
+func (q *inviteQueue) next(ctx *context.T, cursor int) (Invite, bool) {
+	defer q.mu.Unlock()
+	q.mu.Lock()
+	c, exists := q.cursors[cursor]
+	if !exists {
+		return Invite{}, false
+	}
+	// Wait until an element exists past the cursor's current position.
+	for c.next == &q.sentinel {
+		if q.cond.WaitWithDeadline(&q.mu, nsync.NoDeadline, ctx.Done()) != nsync.OK {
+			delete(q.cursors, cursor)
+			return Invite{}, false
+		}
+	}
+	c = c.next
+	q.cursors[cursor] = c
+	return c.invite, true
+}
+
+// scan registers a fresh cursor positioned just before the first element
+// (at the sentinel) and returns its id for use with next.
+func (q *inviteQueue) scan() int {
+	q.mu.Lock()
+	defer q.mu.Unlock()
+	cursor := q.nextCursorId
+	q.nextCursorId++
+	q.cursors[cursor] = &q.sentinel
+	return cursor
+}
+
+// makeInvite extracts a syncgroup Invite and the owning database Id from an
+// advertisement's attributes.  It returns false if any required database or
+// syncgroup attribute is missing.
+func makeInvite(ad discovery.Advertisement) (Invite, wire.Id, bool) {
+	var dbId, sgId wire.Id
+	var ok bool
+	if dbId.Name, ok = ad.Attributes[wire.DiscoveryAttrDatabaseName]; !ok {
+		return Invite{}, dbId, false
+	}
+	if dbId.Blessing, ok = ad.Attributes[wire.DiscoveryAttrDatabaseBlessing]; !ok {
+		return Invite{}, dbId, false
+	}
+	if sgId.Name, ok = ad.Attributes[wire.DiscoveryAttrSyncgroupName]; !ok {
+		return Invite{}, dbId, false
+	}
+	if sgId.Blessing, ok = ad.Attributes[wire.DiscoveryAttrSyncgroupBlessing]; !ok {
+		return Invite{}, dbId, false
+	}
+	i := Invite{
+		Addresses: append([]string{}, ad.Addresses...),
+		Syncgroup: sgId,
+	}
+	// Addresses are sorted so that the formatted key below is canonical for
+	// equal invites regardless of advertisement address order.
+	sort.Strings(i.Addresses)
+	i.key = fmt.Sprintf("%v", i)
+	return i, dbId, true
+}
+
+// copy returns a copy of the invite with its own Addresses slice so callers
+// cannot alias the queue's backing storage.
+func (i Invite) copy() Invite {
+	addrs := make([]string, len(i.Addresses))
+	copy(addrs, i.Addresses)
+	return Invite{
+		Syncgroup: i.Syncgroup,
+		Addresses: addrs,
+		key:       i.key,
+	}
+}
diff --git a/services/syncbase/discovery/discovery_internal_test.go b/services/syncbase/discovery/discovery_internal_test.go
new file mode 100644
index 0000000..834267d
--- /dev/null
+++ b/services/syncbase/discovery/discovery_internal_test.go
@@ -0,0 +1,159 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package syncbase
+
+import (
+	"bytes"
+	"fmt"
+	"os"
+	"reflect"
+	"testing"
+
+	"v.io/v23/context"
+	"v.io/v23/logging"
+	"v.io/v23/security"
+	wire "v.io/v23/services/syncbase"
+)
+
+// TestJoinSplitPatterns verifies that joinPatterns and splitPatterns are
+// inverses of each other for representative pattern lists.
+func TestJoinSplitPatterns(t *testing.T) {
+	cases := []struct {
+		patterns []security.BlessingPattern
+		joined   string
+	}{
+		{nil, ""},
+		{[]security.BlessingPattern{"a", "b"}, "a,b"},
+		{[]security.BlessingPattern{"a:b:c", "d:e:f"}, "a:b:c,d:e:f"},
+		{[]security.BlessingPattern{"alpha:one", "alpha:two", "alpha:three"}, "alpha:one,alpha:two,alpha:three"},
+	}
+	for _, c := range cases {
+		if got := joinPatterns(c.patterns); got != c.joined {
+			t.Errorf("%#v, got %q, wanted %q", c.patterns, got, c.joined)
+		}
+		if got := splitPatterns(c.joined); !reflect.DeepEqual(got, c.patterns) {
+			t.Errorf("%q, got %#v, wanted %#v", c.joined, got, c.patterns)
+		}
+	}
+	// Special case, Joining an empty non-nil list results in empty string.
+	if got := joinPatterns([]security.BlessingPattern{}); got != "" {
+		t.Errorf("Joining empty list: got %q, want %q", got, "")
+	}
+}
+
+// logger overrides InfoDepth so test logging goes to stdout; all other
+// logging.Logger methods are inherited from the embedded interface.
+type logger struct {
+	logging.Logger
+}
+
+// InfoDepth ignores the depth argument and prints args to stdout.
+func (l logger) InfoDepth(depth int, args ...interface{}) {
+	fmt.Fprintln(os.Stdout, args...)
+}
+
+// TestInviteQueue exercises inviteQueue's add/remove/size and cursor
+// semantics: a scan started before any adds sees all entries, cursors
+// survive concurrent adds, and cursors skip over removed elements.
+func TestInviteQueue(t *testing.T) {
+	ctx, cancel := context.RootContext()
+	defer cancel()
+
+	ctx = context.WithLogger(ctx, logger{})
+
+	q := newInviteQueue()
+	if q.size() != 0 {
+		t.Errorf("got %d, want 0", q.size())
+	}
+
+	want := []string{"0", "1", "2", "3", "4"}
+
+	// Test inserts during a long scan.
+	ch := make(chan struct{})
+	go func() {
+		scan(ctx, q, want)
+		close(ch)
+	}()
+
+	// Add a bunch of entries.
+	for i, w := range want {
+		q.add(Invite{Syncgroup: wire.Id{Name: w}, key: w})
+		if q.size() != i+1 {
+			t.Errorf("got %d, want %d", q.size(), i+1)
+		}
+		if err := scan(ctx, q, want[:i+1]); err != nil {
+			t.Error(err)
+		}
+	}
+
+	// Make sure long term scan finished.
+	<-ch
+
+	// Start another long scan that will proceed across deletes
+	steps := make(chan struct{})
+	go func() {
+		ctx, cancel := context.WithCancel(ctx)
+		c := q.scan()
+
+		if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[0] {
+			t.Error("next should have suceeded.")
+		}
+		if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[1] {
+			t.Error("next should have suceeded.")
+		}
+
+		steps <- struct{}{}
+		<-steps
+
+		// Entry want[2] was removed while we waited, so the cursor should
+		// resume at want[3].
+		if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[3] {
+			t.Error("next should have suceeded.")
+		}
+		if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[4] {
+			t.Error("next should have suceeded.")
+		}
+		cancel()
+
+		steps <- struct{}{}
+	}()
+
+	// Wait for the scan to read the first two values.
+	<-steps
+
+	// Remove a bunch of entries.
+	for i, w := range want {
+		q.remove(Invite{Syncgroup: wire.Id{Name: w}, key: w})
+		if i == 2 {
+			// Tell the scan to read the next two values
+			steps <- struct{}{}
+			// Wait for it to finish.
+			<-steps
+		}
+		if q.size() != len(want)-i-1 {
+			t.Errorf("got %d, want %d", q.size(), len(want)-i-1)
+		}
+		if err := scan(ctx, q, want[i+1:]); err != nil {
+			t.Error(err)
+		}
+	}
+}
+
+// logList logs the pointer of each element in q's list, for debugging the
+// linked-list structure.
+func logList(ctx *context.T, q *inviteQueue) {
+	buf := &bytes.Buffer{}
+	for e := q.sentinel.next; e != &q.sentinel; e = e.next {
+		fmt.Fprintf(buf, "%p ", e)
+	}
+	ctx.Info("list", buf.String())
+}
+
+// scan walks a fresh cursor over q and verifies it yields exactly the names
+// in want, in order.  The context is canceled after the last expected
+// element so the final blocking next call terminates the cursor.
+func scan(ctx *context.T, q *inviteQueue, want []string) error {
+	ctx, cancel := context.WithCancel(ctx)
+	c := q.scan()
+	for i, w := range want {
+		inv, ok := q.next(ctx, c)
+		if !ok {
+			return fmt.Errorf("scan ended after %d entries, wanted %d", i, len(want))
+		}
+		if got := inv.Syncgroup.Name; got != w {
+			return fmt.Errorf("got %s, want %s", got, w)
+		}
+		if i == len(want)-1 {
+			// Calling cancel should allow next to fail, exiting the loop.
+			cancel()
+		}
+	}
+	return nil
+}
diff --git a/services/syncbase/discovery/discovery_test.go b/services/syncbase/discovery/discovery_test.go
new file mode 100644
index 0000000..5928c6a
--- /dev/null
+++ b/services/syncbase/discovery/discovery_test.go
@@ -0,0 +1,255 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package syncbase_test
+
+import (
+	"fmt"
+	"reflect"
+	"testing"
+	"time"
+
+	"v.io/v23"
+	"v.io/v23/context"
+	"v.io/v23/discovery"
+	"v.io/v23/security"
+	"v.io/v23/security/access"
+	wire "v.io/v23/services/syncbase"
+	"v.io/v23/syncbase"
+	"v.io/v23/verror"
+	_ "v.io/x/ref/runtime/factories/roaming"
+	syncdis "v.io/x/ref/services/syncbase/discovery"
+	tu "v.io/x/ref/services/syncbase/testutil"
+	"v.io/x/ref/test/testutil"
+)
+
+// TestSyncgroupDiscovery verifies that syncgroup creation and SetSpec
+// changes surface as found/lost discovery updates, and that visibility
+// filtering shows each update only to clients named in the syncgroup perms.
+func TestSyncgroupDiscovery(t *testing.T) {
+	_, ctx, sName, rootp, cleanup := tu.SetupOrDieCustom("o:app1:client1", "server",
+		tu.DefaultPerms(access.AllTypicalTags(), "root:o:app1:client1"))
+	defer cleanup()
+	d := tu.CreateDatabase(t, ctx, syncbase.NewService(sName), "d")
+	collection1 := tu.CreateCollection(t, ctx, d, "c1")
+	collection2 := tu.CreateCollection(t, ctx, d, "c2")
+
+	c1Updates, err := scanAs(ctx, rootp, "o:app1:client1")
+	if err != nil {
+		panic(err)
+	}
+	c2Updates, err := scanAs(ctx, rootp, "o:app1:client2")
+	if err != nil {
+		panic(err)
+	}
+
+	sgId := d.Syncgroup(ctx, "sg1").Id()
+	spec := wire.SyncgroupSpec{
+		Description: "test syncgroup sg1",
+		Perms:       tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1"),
+		Collections: []wire.Id{collection1.Id()},
+	}
+	createSyncgroup(t, ctx, d, sgId, spec, verror.ID(""))
+
+	// First update is for syncbase, and not a specific syncgroup.
+	u := <-c1Updates
+	attrs := u.Advertisement().Attributes
+	peer := attrs[wire.DiscoveryAttrPeer]
+	if peer == "" || len(attrs) != 1 {
+		t.Errorf("Got %v, expected only a peer name.", attrs)
+	}
+	// Client2 should see the same.
+	if err := expect(c2Updates, &discovery.AdId{}, find, discovery.Attributes{wire.DiscoveryAttrPeer: peer}); err != nil {
+		t.Error(err)
+	}
+
+	sg1Attrs := discovery.Attributes{
+		wire.DiscoveryAttrDatabaseName:      "d",
+		wire.DiscoveryAttrDatabaseBlessing:  "root:o:app1",
+		wire.DiscoveryAttrSyncgroupName:     "sg1",
+		wire.DiscoveryAttrSyncgroupBlessing: "root:o:app1:client1",
+	}
+	sg2Attrs := discovery.Attributes{
+		wire.DiscoveryAttrDatabaseName:      "d",
+		wire.DiscoveryAttrDatabaseBlessing:  "root:o:app1",
+		wire.DiscoveryAttrSyncgroupName:     "sg2",
+		wire.DiscoveryAttrSyncgroupBlessing: "root:o:app1:client1",
+	}
+
+	// Then we should see an update for the created syncgroup.
+	var sg1AdId discovery.AdId
+	if err := expect(c1Updates, &sg1AdId, find, sg1Attrs); err != nil {
+		t.Error(err)
+	}
+
+	// Now update the spec to add client2 to the permissions.
+	spec.Perms = tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1", "root:o:app1:client2")
+	if err := d.SyncgroupForId(sgId).SetSpec(ctx, spec, ""); err != nil {
+		t.Fatalf("sg.SetSpec failed: %v", err)
+	}
+
+	// Client1 should see a lost and a found message.
+	if err := expect(c1Updates, &sg1AdId, both, sg1Attrs); err != nil {
+		t.Error(err)
+	}
+	// Client2 should just now see the found message.
+	if err := expect(c2Updates, &sg1AdId, find, sg1Attrs); err != nil {
+		t.Error(err)
+	}
+
+	// Now create a second syncgroup.
+	sg2Id := d.Syncgroup(ctx, "sg2").Id()
+	spec2 := wire.SyncgroupSpec{
+		Description: "test syncgroup sg2",
+		Perms:       tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1", "root:o:app1:client2"),
+		Collections: []wire.Id{collection2.Id()},
+	}
+	createSyncgroup(t, ctx, d, sg2Id, spec2, verror.ID(""))
+
+	// Both clients should see the new syncgroup.
+	var sg2AdId discovery.AdId
+	if err := expect(c1Updates, &sg2AdId, find, sg2Attrs); err != nil {
+		t.Error(err)
+	}
+	if err := expect(c2Updates, &sg2AdId, find, sg2Attrs); err != nil {
+		t.Error(err)
+	}
+
+	// Removing client2 from the perms should surface as a lost update to it.
+	spec2.Perms = tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1")
+	if err := d.SyncgroupForId(sg2Id).SetSpec(ctx, spec2, ""); err != nil {
+		t.Fatalf("sg.SetSpec failed: %v", err)
+	}
+	if err := expect(c2Updates, &sg2AdId, lose, sg2Attrs); err != nil {
+		t.Error(err)
+	}
+}
+
+// scanAs starts a syncbase discovery scan for Sync advertisements using a
+// fresh principal blessed by rootp with the name in as.
+func scanAs(ctx *context.T, rootp security.Principal, as string) (<-chan discovery.Update, error) {
+	idp := testutil.IDProviderFromPrincipal(rootp)
+	p := testutil.NewPrincipal()
+	if err := idp.Bless(p, as); err != nil {
+		return nil, err
+	}
+	ctx, err := v23.WithPrincipal(ctx, p)
+	if err != nil {
+		return nil, err
+	}
+	dis, err := syncdis.NewDiscovery(ctx)
+	if err != nil {
+		return nil, err
+	}
+	return dis.Scan(ctx, `v.InterfaceName="v.io/x/ref/services/syncbase/server/interfaces/Sync"`)
+}
+
+// Update kinds accepted by expect: a lost update, a found update, or a
+// lost/found pair in either order.
+const (
+	lose = "lose"
+	find = "find"
+	both = "both"
+)
+
+// expect reads one update from ch (timing out after 2s) and checks its
+// lost/found kind and attributes against want.  If *id is valid it must
+// match the update's Id, otherwise *id is set from the update.  For typ
+// both, a matching lost and found pair is consumed in either order.
+func expect(ch <-chan discovery.Update, id *discovery.AdId, typ string, want discovery.Attributes) error {
+	select {
+	case u := <-ch:
+		if (u.IsLost() && typ == find) || (!u.IsLost() && typ == lose) {
+			return fmt.Errorf("IsLost mismatch.  Got %v, wanted %v", u, typ)
+		}
+		ad := u.Advertisement()
+		got := ad.Attributes
+		if id.IsValid() {
+			if *id != ad.Id {
+				return fmt.Errorf("mismatched id, got %v, want %v", ad.Id, id)
+			}
+		} else {
+			*id = ad.Id
+		}
+		if !reflect.DeepEqual(got, want) {
+			return fmt.Errorf("got %v, want %v", got, want)
+		}
+		if typ == both {
+			// Recurse once for the complementary kind of the pair.
+			typ = lose
+			if u.IsLost() {
+				typ = find
+			}
+			return expect(ch, id, typ, want)
+		}
+		return nil
+	case <-time.After(2 * time.Second):
+		return fmt.Errorf("timed out")
+	}
+}
+
+// TestListenForInvites verifies that ListenForInvites routes syncgroup
+// invites to the listener registered for the matching database, across two
+// databases sharing one underlying scan.
+func TestListenForInvites(t *testing.T) {
+	_, ctx, sName, rootp, cleanup := tu.SetupOrDieCustom(
+		"o:app1:client1",
+		"server",
+		tu.DefaultPerms(access.AllTypicalTags(), "root:o:app1:client1"))
+	defer cleanup()
+	service := syncbase.NewService(sName)
+	d1 := tu.CreateDatabase(t, ctx, service, "d1")
+	d2 := tu.CreateDatabase(t, ctx, service, "d2")
+
+	// Two collections per database; collections[0..1] are in d1, [2..3] in d2.
+	collections := []wire.Id{}
+	for _, d := range []syncbase.Database{d1, d2} {
+		for _, n := range []string{"c1", "c2"} {
+			c := tu.CreateCollection(t, ctx, d, d.Id().Name+n)
+			collections = append(collections, c.Id())
+		}
+	}
+
+	d1invites, err := listenAs(ctx, rootp, "o:app1:client1", d1.Id())
+	if err != nil {
+		panic(err)
+	}
+	d2invites, err := listenAs(ctx, rootp, "o:app1:client1", d2.Id())
+	if err != nil {
+		panic(err)
+	}
+
+	// Interleave advertisements across the databases to check routing.
+	if err := advertiseAndFindSyncgroup(t, ctx, d1, collections[0], d1invites); err != nil {
+		t.Error(err)
+	}
+	if err := advertiseAndFindSyncgroup(t, ctx, d2, collections[2], d2invites); err != nil {
+		t.Error(err)
+	}
+	if err := advertiseAndFindSyncgroup(t, ctx, d1, collections[1], d1invites); err != nil {
+		t.Error(err)
+	}
+	if err := advertiseAndFindSyncgroup(t, ctx, d2, collections[3], d2invites); err != nil {
+		t.Error(err)
+	}
+}
+
+// listenAs starts ListenForInvites on db with a principal blessed by rootp
+// as the given name, returning the invite channel.
+func listenAs(ctx *context.T, rootp security.Principal, as string, db wire.Id) (<-chan syncdis.Invite, error) {
+	ch := make(chan syncdis.Invite)
+	return ch, syncdis.ListenForInvites(tu.NewCtx(ctx, rootp, as), db, ch)
+}
+
+// advertiseAndFindSyncgroup creates a syncgroup named after the collection
+// in d and blocks until the corresponding invite arrives on invites,
+// returning an error if the received invite names a different syncgroup.
+func advertiseAndFindSyncgroup(
+	t *testing.T,
+	ctx *context.T,
+	d syncbase.Database,
+	collection wire.Id,
+	invites <-chan syncdis.Invite) error {
+	sgId := wire.Id{Name: collection.Name + "sg", Blessing: collection.Blessing}
+	ctx.Infof("creating %v", sgId)
+	spec := wire.SyncgroupSpec{
+		Description: "test syncgroup",
+		Perms:       tu.DefaultPerms(wire.AllSyncgroupTags, "root:server", "root:o:app1:client1"),
+		Collections: []wire.Id{collection},
+	}
+	createSyncgroup(t, ctx, d, sgId, spec, verror.ID(""))
+
+	// We should see an invite for the syncgroup we just created.
+	inv := <-invites
+	if inv.Syncgroup != sgId {
+		return fmt.Errorf("got %v, want %v", inv.Syncgroup, sgId)
+	}
+	return nil
+}
+
+// createSyncgroup creates sgId in d with the given spec and fails the test
+// unless Create returns an error with the expected errID (empty for success).
+func createSyncgroup(t *testing.T, ctx *context.T, d syncbase.Database, sgId wire.Id, spec wire.SyncgroupSpec, errID verror.ID) syncbase.Syncgroup {
+	sg := d.SyncgroupForId(sgId)
+	info := wire.SyncgroupMemberInfo{SyncPriority: 8}
+	if err := sg.Create(ctx, spec, info); verror.ErrorID(err) != errID {
+		tu.Fatalf(t, "Create SG %+v failed: %v", sgId, err)
+	}
+	return sg
+}
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 66eac7b..d9ff29f 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -26,11 +26,11 @@
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
 	wire "v.io/v23/services/syncbase"
-	"v.io/v23/syncbase"
 	"v.io/v23/verror"
 	"v.io/x/lib/vlog"
 	idiscovery "v.io/x/ref/lib/discovery"
 	"v.io/x/ref/services/syncbase/common"
+	syncdis "v.io/x/ref/services/syncbase/discovery"
 	blob "v.io/x/ref/services/syncbase/localblobstore"
 	fsblob "v.io/x/ref/services/syncbase/localblobstore/fs_cablobstore"
 	"v.io/x/ref/services/syncbase/server/interfaces"
@@ -159,7 +159,7 @@
 // that the syncer can pick from. In addition, the sync module responds to
 // incoming RPCs from remote sync modules and local clients.
 func New(ctx *context.T, sv interfaces.Service, blobStEngine, blobRootDir string, cl *vclock.VClock, publishInNh bool) (*syncService, error) {
-	discovery, err := syncbase.NewDiscovery(v23.WithListenSpec(ctx, rpc.ListenSpec{}))
+	discovery, err := syncdis.NewDiscovery(v23.WithListenSpec(ctx, rpc.ListenSpec{}))
 	if err != nil {
 		return nil, err
 	}