x/ref: Implement ListenForPeers

This CL does the following things.
1. modify the underlying queue implementation
- It now handles both Peer and Invite through the copyable interface.
- copyable introduces the concept of a "lost" invite or peer.
- When the copyable is removed from the queue, a "lost" version is
  inserted in its stead.
- "lost" elements are only visible to cursors that have seen the
  corresponding found elements.
- "lost" elements remain within the queue, and thus affect its "size".
- Garbage collection of the "lost" elements occurs once everybody
  qualified to see the element has seen it via "next" OR has canceled
  and unregistered themselves from the queue.
- Tests adjusted to take into account "lost" updates.
  New tests added to take into account "lost" update garbage collection.
2. ListenForInvites is updated.
- Tests remain the same; it ignores the "lost" Invites.
3. ListenForPeers was added.
- Tests added by mocking out discovery.

Change-Id: I2ba36a59621a7e13d16dc722610c34d2f2a835b8
diff --git a/services/syncbase/discovery/discovery.go b/services/syncbase/discovery/discovery.go
index fa0d09c..9e2abc8 100644
--- a/services/syncbase/discovery/discovery.go
+++ b/services/syncbase/discovery/discovery.go
@@ -354,7 +354,8 @@
 }
 
 type scanState struct {
-	dbs       map[wire.Id]*inviteQueue
+	peers     *copyableQueue
+	dbs       map[wire.Id]*copyableQueue
 	listeners int
 	cancel    context.CancelFunc
 }
@@ -362,7 +363,8 @@
 func newScan(ctx *context.T) (*scanState, error) {
 	ctx, cancel := context.WithRootCancel(ctx)
 	scan := &scanState{
-		dbs:    make(map[wire.Id]*inviteQueue),
+		peers:  nil,
+		dbs:    make(map[wire.Id]*copyableQueue),
 		cancel: cancel,
 	}
 	// TODO(suharshs): Add globalDiscoveryPath.
@@ -386,7 +388,7 @@
 				if u.IsLost() {
 					// TODO(mattr): Removing like this can result in resurfacing already
 					// retrieved invites.  For example if the ACL of a syncgroup is
-					// changed to add a new memeber, then we might see a remove and then
+					// changed to add a new member, then we might see a remove and then
 					// later an add.  In this case we would end up moving the invite
 					// to the end of the queue.  One way to fix this would be to
 					// keep removed invites for some time.
@@ -398,26 +400,38 @@
 					}
 				} else {
 					if q == nil {
-						q = newInviteQueue()
+						q = newCopyableQueue()
 						scan.dbs[db] = q
 					}
 					q.add(invite)
 					q.cond.Broadcast()
 				}
 				state.mu.Unlock()
+			} else if peer, ok := makePeer(u.Advertisement()); ok {
+				state.mu.Lock()
+				q := scan.peers
+				if u.IsLost() {
+					if q != nil {
+						q.remove(peer)
+						if q.empty() {
+							scan.peers = nil
+						}
+					}
+				} else {
+					if q == nil {
+						q = newCopyableQueue()
+						scan.peers = q
+					}
+					q.add(peer)
+					q.cond.Broadcast()
+				}
+				state.mu.Unlock()
 			}
 		}
 	}()
 	return scan, nil
 }
 
-// Invite represents an invitation to join a syncgroup as found via Discovery.
-type Invite struct {
-	Syncgroup wire.Id  //Syncgroup is the Id of the syncgroup you've been invited to.
-	Addresses []string //Addresses are the list of addresses of the inviting server.
-	key       string
-}
-
 // ListenForInvites listens via Discovery for syncgroup invitations for the given
 // database and sends the invites to the provided channel.  We stop listening when
 // the given context is canceled.  When that happens we close the given channel.
@@ -425,12 +439,87 @@
 	defer state.mu.Unlock()
 	state.mu.Lock()
 
+	scan, err := prepareScannerByPrincipal(ctx)
+	if err != nil {
+		return err
+	}
+
+	q := scan.dbs[db]
+	if q == nil {
+		q = newCopyableQueue()
+		scan.dbs[db] = q
+	}
+
+	// Send the copyables into a temporary channel.
+	cp_ch := make(chan copyable)
+	go consumeCopyableQueue(ctx, q, scan, cp_ch, func() {
+		delete(scan.dbs, db)
+	})
+
+	// Convert these copyables into Invites.
+	go func() {
+		for {
+			v, ok := <-cp_ch
+			if !ok {
+				close(ch)
+				break
+			}
+			invite := v.(Invite)
+			if !invite.isLost() {
+				ch <- invite
+			}
+		}
+	}()
+
+	return nil
+}
+
+// ListenForPeers listens via Discovery for syncgroup peers and sends them to
+// the provided channel.  We stop listening when the context is canceled.  When
+// that happens we close the given channel.
+func ListenForPeers(ctx *context.T, ch chan<- Peer) error {
+	defer state.mu.Unlock()
+	state.mu.Lock()
+
+	scan, err := prepareScannerByPrincipal(ctx)
+	if err != nil {
+		return err
+	}
+
+	q := scan.peers
+	if q == nil {
+		q = newCopyableQueue()
+		scan.peers = q
+	}
+
+	// Send the copyables into a temporary channel.
+	cp_ch := make(chan copyable)
+	go consumeCopyableQueue(ctx, q, scan, cp_ch, func() {
+		scan.peers = nil
+	})
+
+	// Convert these copyables into Peers.
+	go func() {
+		for {
+			v, ok := <-cp_ch
+			if !ok {
+				close(ch)
+				break
+			}
+			ch <- v.(Peer)
+		}
+	}()
+
+	return nil
+}
+
+func prepareScannerByPrincipal(ctx *context.T) (*scanState, error) {
 	p := v23.GetPrincipal(ctx)
 	scan := state.scans[p]
 	if scan == nil {
 		var err error
 		if scan, err = newScan(ctx); err != nil {
-			return err
+			return nil, err
 		}
 		if state.scans == nil {
 			state.scans = make(map[security.Principal]*scanState)
@@ -438,50 +527,50 @@
 		state.scans[p] = scan
 	}
 	scan.listeners++
-
-	q := scan.dbs[db]
-	if q == nil {
-		q = newInviteQueue()
-		scan.dbs[db] = q
-	}
-
-	go func() {
-		c := q.scan()
-		for {
-			next, ok := q.next(ctx, c)
-			if !ok {
-				break
-			}
-			select {
-			case ch <- next.copy():
-			case <-ctx.Done():
-				break
-			}
-		}
-		close(ch)
-
-		defer state.mu.Unlock()
-		state.mu.Lock()
-
-		if q.empty() {
-			delete(scan.dbs, db)
-		}
-		scan.listeners--
-		if scan.listeners == 0 {
-			scan.cancel()
-			delete(state.scans, v23.GetPrincipal(ctx))
-		}
-	}()
-
-	return nil
+	return scan, nil
 }
 
-// inviteQueue is a linked list based queue.  As we get new invitations we
-// add them to the queue, and when advertisements are lost we remove elements.
+type isQueueEmptyCallback func()
+
+// Should be called in a goroutine. Pairs with prepareScannerByPrincipal.
+func consumeCopyableQueue(ctx *context.T, q *copyableQueue, scan *scanState, ch chan<- copyable, cb isQueueEmptyCallback) {
+	c := q.scan()
+	for {
+		next, ok := q.next(ctx, c)
+		if !ok {
+			break
+		}
+		select {
+		case ch <- next.copy():
+		case <-ctx.Done():
+			q.stopScan(c)
+			break
+		}
+	}
+	close(ch)
+
+	defer state.mu.Unlock()
+	state.mu.Lock()
+
+	if q.empty() {
+		cb()
+	}
+	scan.listeners--
+	if scan.listeners == 0 {
+		scan.cancel()
+		delete(state.scans, v23.GetPrincipal(ctx))
+	}
+}
+
+// copyableQueue is a linked list based queue. As we get new invitations/peers,
+// we add them to the queue, and when advertisements are lost we remove instead.
 // Each call to scan creates a cursor on the queue that will iterate until
 // the Listen is canceled.  We wait when we hit the end of the queue via the
 // condition variable cond.
-type inviteQueue struct {
+// Note: The queue contains both "found" and "lost" elements. A removed "found"
+// element is replaced with a "lost" element, which all cursors will see.
+// See ielement for the garbage collection strategy for the "lost" elements.
+type copyableQueue struct {
 	mu   nsync.Mu
 	cond nsync.CV
 
@@ -491,7 +580,7 @@
 	nextCursorId int
 }
 
-func (q *inviteQueue) debugLocked() string {
+func (q *copyableQueue) debugLocked() string {
 	buf := &bytes.Buffer{}
 	fmt.Fprintf(buf, "*%p", &q.sentinel)
 	for c := q.sentinel.next; c != &q.sentinel; c = c.next {
@@ -500,13 +589,55 @@
 	return buf.String()
 }
 
-type ielement struct {
-	invite     Invite
-	prev, next *ielement
+// An interface used to represent Invite and Peer that reduces code duplication.
+// Typecasting is cheap, so this seems better than maintaining duplicate
+// implementations of copyableQueue.
+type copyable interface {
+	copy() copyable
+	id() string
+	isLost() bool
+	copyAsLost() copyable
 }
 
-func newInviteQueue() *inviteQueue {
-	iq := &inviteQueue{
+// Keeps track of a copyable element and the next/prev ielements in a queue.
+// ielements can represent both "found" and "lost" elements. Those of the "lost"
+// type also set "numCanSee" and "whoCanSee" to indicate that only certain
+// cursors can see the element contained in this ielement. Once all the cursors
+// have seen a "lost" ielement it can be garbage collected safely.
+// See "canBeSeenByCursor" and "consume".
+type ielement struct {
+	elem       copyable
+	prev, next *ielement
+	numCanSee  int   // If >0, this decrements. The element is removed upon hitting 0.
+	whoCanSee  []int // The cursors that can see this ielement. Set only once.
+}
+
+// A cursor can see the element in this ielement if it is a "found" element or
+// a "lost" element with the correct visibility.
+func (i *ielement) canBeSeenByCursor(cursor int) bool {
+	if i.numCanSee == 0 { // Special: 0 is visible by everyone.
+		return true
+	}
+	for _, who := range i.whoCanSee {
+		if cursor == who {
+			return true
+		}
+	}
+	return false
+}
+
+// Reduce "numCanSee" and indicate whether or not this element needs to be removed
+// from the queue.
+func (i *ielement) consume() bool {
+	if i.numCanSee > 0 {
+		i.numCanSee--
+		return i.numCanSee == 0
+	}
+	return false
+}
+
+func newCopyableQueue() *copyableQueue {
+	iq := &copyableQueue{
 		elems:   make(map[string]*ielement),
 		cursors: make(map[int]*ielement),
 	}
@@ -514,68 +645,146 @@
 	return iq
 }
 
-func (q *inviteQueue) empty() bool {
+func (q *copyableQueue) empty() bool {
 	defer q.mu.Unlock()
 	q.mu.Lock()
 	return len(q.elems) == 0 && len(q.cursors) == 0
 }
 
-func (q *inviteQueue) size() int {
+func (q *copyableQueue) size() int {
 	defer q.mu.Unlock()
 	q.mu.Lock()
 	return len(q.elems)
 }
 
-func (q *inviteQueue) remove(i Invite) {
+// Removes the given copyable from the queue. No-op if not present or "lost".
+// Note: As described in copyableQueue, if the copyable is a "found" element,
+// then a corresponding "lost" element will be appended to the queue. Only the
+// cursors who have seen the "found" element can see the "lost" one.
+func (q *copyableQueue) remove(i copyable) {
 	defer q.mu.Unlock()
 	q.mu.Lock()
 
-	el, ok := q.elems[i.key]
-	if !ok {
+	el, ok := q.elems[i.id()]
+	if !ok || el.elem.isLost() {
 		return
 	}
-	delete(q.elems, i.key)
+
+	// Make a list of all the users who have seen this element.
+	whoCanSee := []int{}
+	for cursor, cel := range q.cursors {
+		// Adjust the cursor if it is pointing to the removed element.
+		if cel == el {
+			q.cursors[cursor] = cel.prev
+			whoCanSee = append(whoCanSee, cursor)
+			continue
+		}
+
+		// Check if this cursor has gone past the removed element.
+		// Note: This implementation assumes a small queue and a small number of
+		// cursors. Otherwise, using a table is recommended over list traversal.
+		for ; cel != &q.sentinel; cel = cel.prev {
+			if cel == el {
+				whoCanSee = append(whoCanSee, cursor)
+				break
+			}
+		}
+	}
+
+	// "Remove" el from the queue.
 	el.next.prev, el.prev.next = el.prev, el.next
+
+	// el is a visible lost item that needs to be added to the queue.
+	if len(whoCanSee) > 0 {
+		// Adjust the lost ielement el to include the lost data and who can see it.
+		el.elem = el.elem.copyAsLost()
+		el.numCanSee = len(whoCanSee)
+		el.whoCanSee = whoCanSee
+
+		// Insert the lost element at the tail (just before the sentinel) and broadcast.
+		el.prev, el.next = q.sentinel.prev, &q.sentinel
+		q.sentinel.prev, q.sentinel.prev.next = el, el
+		q.cond.Broadcast()
+	} else {
+		delete(q.elems, i.id())
+	}
+}
+
+func (q *copyableQueue) add(i copyable) {
+	defer q.mu.Unlock()
+	q.mu.Lock()
+
+	if _, ok := q.elems[i.id()]; ok {
+		return
+	}
+	el := &ielement{i, q.sentinel.prev, &q.sentinel, 0, []int{}}
+	q.sentinel.prev, q.sentinel.prev.next = el, el
+	q.elems[i.id()] = el
+	q.cond.Broadcast()
+}
+
+func (q *copyableQueue) next(ctx *context.T, cursor int) (copyable, bool) {
+	defer q.mu.Unlock()
+	q.mu.Lock()
+	c, exists := q.cursors[cursor]
+	if !exists {
+		return nil, false
+	}
+
+	// Find the next available element that this cursor can see. Will block until
+	// somebody writes to the queue or the given context is canceled.
+	// Note: Some "lost" elements in the queue will not be visible to this cursor;
+	// these are skipped.
+	for {
+		for c.next == &q.sentinel {
+			if q.cond.WaitWithDeadline(&q.mu, nsync.NoDeadline, ctx.Done()) != nsync.OK {
+				q.removeCursorLocked(cursor)
+				return nil, false
+			}
+			c = q.cursors[cursor]
+		}
+		c = c.next
+		q.cursors[cursor] = c
+		if c.canBeSeenByCursor(cursor) {
+			if c.consume() {
+				q.removeLostIElemLocked(c)
+
+			}
+			return c.elem, true
+		}
+	}
+}
+
+func (q *copyableQueue) removeCursorLocked(cursor int) {
+	c := q.cursors[cursor]
+	delete(q.cursors, cursor)
+
+	// Garbage collection: Step through each remaining element and remove
+	// unneeded lost ielements.
+	for c.next != &q.sentinel {
+		c = c.next // step
+		if c.canBeSeenByCursor(cursor) {
+			if c.consume() {
+				q.removeLostIElemLocked(c)
+			}
+		}
+	}
+}
+
+func (q *copyableQueue) removeLostIElemLocked(ielem *ielement) {
+	// Remove the lost ielem since everyone has seen it.
+	delete(q.elems, ielem.elem.id())
+	ielem.next.prev, ielem.prev.next = ielem.prev, ielem.next
+
+	// Adjust the cursors if their elements were on the removed element.
 	for id, c := range q.cursors {
-		if c == el {
+		if c == ielem {
 			q.cursors[id] = c.prev
 		}
 	}
 }
 
-func (q *inviteQueue) add(i Invite) {
-	defer q.mu.Unlock()
-	q.mu.Lock()
-
-	if _, ok := q.elems[i.key]; ok {
-		return
-	}
-	el := &ielement{i, q.sentinel.prev, &q.sentinel}
-	q.sentinel.prev, q.sentinel.prev.next = el, el
-	q.elems[i.key] = el
-	q.cond.Broadcast()
-}
-
-func (q *inviteQueue) next(ctx *context.T, cursor int) (Invite, bool) {
-	defer q.mu.Unlock()
-	q.mu.Lock()
-	c, exists := q.cursors[cursor]
-	if !exists {
-		return Invite{}, false
-	}
-	for c.next == &q.sentinel {
-		if q.cond.WaitWithDeadline(&q.mu, nsync.NoDeadline, ctx.Done()) != nsync.OK {
-			delete(q.cursors, cursor)
-			return Invite{}, false
-		}
-		c = q.cursors[cursor]
-	}
-	c = c.next
-	q.cursors[cursor] = c
-	return c.invite, true
-}
-
-func (q *inviteQueue) scan() int {
+func (q *copyableQueue) scan() int {
 	defer q.mu.Unlock()
 	q.mu.Lock()
 	id := q.nextCursorId
@@ -584,6 +793,23 @@
 	return id
 }
 
+func (q *copyableQueue) stopScan(cursor int) {
+	defer q.mu.Unlock()
+	q.mu.Lock()
+
+	if _, exists := q.cursors[cursor]; exists {
+		q.removeCursorLocked(cursor)
+	}
+}
+
+// Invite represents an invitation to join a syncgroup as found via Discovery.
+type Invite struct {
+	Syncgroup wire.Id  // Syncgroup is the Id of the syncgroup you've been invited to.
+	Addresses []string // Addresses are the list of addresses of the inviting server.
+	Lost      bool     // If this invite is a lost invite or not.
+	key       string   // Unexported. The implementation uses this key to de-dupe ads.
+}
+
 func makeInvite(ad discovery.Advertisement) (Invite, wire.Id, bool) {
 	var dbId, sgId wire.Id
 	var ok bool
@@ -608,8 +834,68 @@
 	return i, dbId, true
 }
 
-func (i Invite) copy() Invite {
+func (i Invite) copy() copyable {
 	cp := i
 	cp.Addresses = append([]string(nil), i.Addresses...)
 	return cp
 }
+
+func (i Invite) id() string {
+	return i.key
+}
+
+func (i Invite) isLost() bool {
+	return i.Lost
+}
+
+func (i Invite) copyAsLost() copyable {
+	cp := i
+	cp.Addresses = append([]string(nil), i.Addresses...)
+	cp.Lost = true
+	return cp
+}
+
+// Peer represents a Syncbase peer found via Discovery.
+type Peer struct {
+	Name      string   // Name is the name of the Syncbase peer's sync service.
+	Addresses []string // Addresses are the list of addresses of the peer's server.
+	Lost      bool     // If this peer is a lost peer or not.
+	key       string   // Unexported. The implementation uses this key to de-dupe ads.
+}
+
+// Attempts to make a Peer using its discovery attributes.
+// Will return an empty Peer struct if it fails.
+func makePeer(ad discovery.Advertisement) (Peer, bool) {
+	peerName, ok := ad.Attributes[wire.DiscoveryAttrPeer]
+	if !ok {
+		return Peer{}, false
+	}
+	p := Peer{
+		Name:      peerName,
+		Addresses: append([]string{}, ad.Addresses...),
+	}
+	sort.Strings(p.Addresses)
+	p.key = fmt.Sprintf("%v", p)
+	return p, true
+}
+
+func (p Peer) copy() copyable {
+	cp := p
+	cp.Addresses = append([]string(nil), p.Addresses...)
+	return cp
+}
+
+func (p Peer) id() string {
+	return p.key
+}
+
+func (p Peer) isLost() bool {
+	return p.Lost
+}
+
+func (p Peer) copyAsLost() copyable {
+	cp := p
+	cp.Addresses = append([]string(nil), p.Addresses...)
+	cp.Lost = true
+	return cp
+}
diff --git a/services/syncbase/discovery/discovery_internal_test.go b/services/syncbase/discovery/discovery_internal_test.go
index 1ed80e1..513d190 100644
--- a/services/syncbase/discovery/discovery_internal_test.go
+++ b/services/syncbase/discovery/discovery_internal_test.go
@@ -55,7 +55,7 @@
 
 	ctx = context.WithLogger(ctx, logger{})
 
-	q := newInviteQueue()
+	q := newCopyableQueue()
 	if q.size() != 0 {
 		t.Errorf("got %d, want 0", q.size())
 	}
@@ -65,7 +65,7 @@
 	// Test inserts during a long scan.
 	ch := make(chan struct{})
 	go func() {
-		scan(ctx, q, want)
+		scanForFoundOnly(ctx, q, want)
 		close(ch)
 	}()
 
@@ -75,7 +75,7 @@
 		if q.size() != i+1 {
 			t.Errorf("got %d, want %d", q.size(), i+1)
 		}
-		if err := scan(ctx, q, want[:i+1]); err != nil {
+		if err := scanForFoundOnly(ctx, q, want[:i+1]); err != nil {
 			t.Error(err)
 		}
 	}
@@ -84,30 +84,60 @@
 	<-ch
 
 	// Start another long scan that will proceed across deletes
+	// Start a similar scan that will see 0, 1, 3, 4 and deletes for 0, 1, 3, 4.
+	// Start a final scan that will see 0, 3 and deletes for 0, 3.
 	steps := make(chan struct{})
 	go func() {
-		ctx, cancel := context.WithCancel(ctx)
+		ctx1, cancel := context.WithCancel(ctx)
 		c := q.scan()
+		ctx2, cancel2 := context.WithCancel(ctx)
+		c2 := q.scan()
+		ctx3, cancel3 := context.WithCancel(ctx)
+		c3 := q.scan()
 
-		if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[0] {
-			t.Error("next should have suceeded.")
-		}
-		if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[1] {
-			t.Error("next should have suceeded.")
-		}
+		advance(t, ctx1, c, q, want[0], false)
+		advance(t, ctx1, c, q, want[1], false)
+		advance(t, ctx2, c2, q, want[0], false)
+		advance(t, ctx2, c2, q, want[1], false)
+		advance(t, ctx3, c3, q, want[0], false)
 
 		steps <- struct{}{}
 		<-steps
 
-		if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[3] {
-			t.Error("next should have suceeded.")
+		// Scanner c should just see 3 and 4.
+		advance(t, ctx1, c, q, want[3], false)
+		advance(t, ctx1, c, q, want[4], false)
+		if err := trueCancel(ctx1, cancel, q, c); err != nil {
+			t.Error(err)
 		}
-		if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[4] {
-			t.Error("next should have suceeded.")
-		}
-		cancel()
+
+		// Scanner c2 should see 3, 4, lost 0, lost 1.
+		advance(t, ctx2, c2, q, want[3], false)
+		advance(t, ctx2, c2, q, want[4], false)
+		advance(t, ctx2, c2, q, want[0], true)
+		advance(t, ctx2, c2, q, want[1], true)
+
+		// Scanner c3 will just take a look at 3.
+		advance(t, ctx3, c3, q, want[3], false)
 
 		steps <- struct{}{}
+		<-steps
+
+		// After this point, 3 and 4 were removed, so c2 should see those too.
+		advance(t, ctx2, c2, q, want[3], true)
+		advance(t, ctx2, c2, q, want[4], true)
+		if err := trueCancel(ctx2, cancel2, q, c2); err != nil {
+			t.Error(err)
+		}
+
+		// Since c3 only looked at 0 and 3, it'll only see losses for those two.
+		advance(t, ctx3, c3, q, want[0], true)
+		advance(t, ctx3, c3, q, want[3], true)
+		if err := trueCancel(ctx3, cancel3, q, c3); err != nil {
+			t.Error(err)
+		}
+
+		steps <- struct{}{} // Done. Every cursor should be canceled now.
 	}()
 
 	// Wait for the scan to read the first two values.
@@ -122,16 +152,44 @@
 			// Wait for it to finish.
 			<-steps
 		}
-		if q.size() != len(want)-i-1 {
-			t.Errorf("got %d, want %d", q.size(), len(want)-i-1)
+		// A lost element replaces the removed found element.
+		if i < 2 && q.size() != 5 {
+			t.Errorf("got %d, want %d", q.size(), 5)
 		}
-		if err := scan(ctx, q, want[i+1:]); err != nil {
-			t.Error(err)
+		// #1 was garbage collected (c1, c2 canceled. c3 only saw #0, but not #1).
+		// #2 was garbage collected (nobody saw it)
+		// Thus, we need to see 3 items (3, 4, and lost 0)
+		if i >= 2 && q.size() != 3 {
+			t.Errorf("got %d, want %d", q.size(), 3)
 		}
+		if err := scanForFoundOnly(ctx, q, want[i+1:]); err != nil {
+			t.Errorf("on iteration %d, scan for found only failed: %v", i, err)
+		}
+	}
+
+	steps <- struct{}{} // return control to verify the scan for c2 and c3.
+	<-steps
+
+	// Verify that the queue is now empty.
+	// Garbage collection of all lost elements should have occurred.
+	if q.size() != 0 {
+		t.Errorf("got %d, want 0", q.size())
 	}
 }
 
-func logList(ctx *context.T, q *inviteQueue) {
+func advance(t *testing.T, ctx *context.T, c int, q *copyableQueue, want string, isLost bool) {
+	if inv, ok := q.next(ctx, c); !ok {
+		t.Error("next should have succeeded.")
+	} else if inv.(Invite).Syncgroup.Name != want {
+		t.Errorf("next should be %s, but got: %s", want, inv.(Invite).Syncgroup.Name)
+	} else if inv.(Invite).Lost && !isLost {
+		t.Error("next should have been found")
+	} else if !inv.(Invite).Lost && isLost {
+		t.Error("next should have been lost")
+	}
+}
+
+func logList(ctx *context.T, q *copyableQueue) {
 	buf := &bytes.Buffer{}
 	for e := q.sentinel.next; e != &q.sentinel; e = e.next {
 		fmt.Fprintf(buf, "%p ", e)
@@ -139,21 +197,39 @@
 	ctx.Info("list", buf.String())
 }
 
-func scan(ctx *context.T, q *inviteQueue, want []string) error {
+func scanForFoundOnly(ctx *context.T, q *copyableQueue, want []string) error {
 	ctx, cancel := context.WithCancel(ctx)
 	c := q.scan()
 	for i, w := range want {
 		inv, ok := q.next(ctx, c)
+
+		// Skip the lost for this test.
+		for inv.isLost() {
+			inv, ok = q.next(ctx, c)
+		}
+
 		if !ok {
 			return fmt.Errorf("scan ended after %d entries, wanted %d", i, len(want))
 		}
-		if got := inv.Syncgroup.Name; got != w {
+		if got := inv.(Invite).Syncgroup.Name; got != w {
 			return fmt.Errorf("got %s, want %s", got, w)
 		}
-		if i == len(want)-1 {
-			// Calling cancel should allow next to fail, exiting the loop.
-			cancel()
+		if inv.(Invite).Lost {
+			return fmt.Errorf("invite %v was lost", inv)
 		}
 	}
+	if err := trueCancel(ctx, cancel, q, c); err != nil {
+		return err
+	}
+
+	return nil
+}
+
+func trueCancel(ctx *context.T, cancel context.CancelFunc, q *copyableQueue, c int) error {
+	cancel()
+	q.stopScan(c)
+	if unwanted, ok := q.next(ctx, c); ok {
+		return fmt.Errorf("next returned %v after scan canceled", unwanted)
+	}
 	return nil
 }
diff --git a/services/syncbase/discovery/discovery_test.go b/services/syncbase/discovery/discovery_test.go
index c33614f..804b653 100644
--- a/services/syncbase/discovery/discovery_test.go
+++ b/services/syncbase/discovery/discovery_test.go
@@ -22,8 +22,10 @@
 	fdiscovery "v.io/x/ref/lib/discovery/factory"
 	"v.io/x/ref/lib/discovery/global"
 	"v.io/x/ref/lib/discovery/plugins/mock"
+	udiscovery "v.io/x/ref/lib/discovery/testutil"
 	_ "v.io/x/ref/runtime/factories/roaming"
 	syncdis "v.io/x/ref/services/syncbase/discovery"
+	"v.io/x/ref/services/syncbase/server/interfaces"
 	tu "v.io/x/ref/services/syncbase/testutil"
 	"v.io/x/ref/test"
 	"v.io/x/ref/test/testutil"
@@ -293,11 +295,11 @@
 		}
 	}
 
-	d1invites, err := listenAs(ctx, rootp, "o:app1:client1", d1.Id())
+	d1invites, err := listenForInvitesAs(ctx, rootp, "o:app1:client1", d1.Id())
 	if err != nil {
 		panic(err)
 	}
-	d2invites, err := listenAs(ctx, rootp, "o:app1:client1", d2.Id())
+	d2invites, err := listenForInvitesAs(ctx, rootp, "o:app1:client1", d2.Id())
 	if err != nil {
 		panic(err)
 	}
@@ -316,7 +318,7 @@
 	}
 }
 
-func listenAs(ctx *context.T, rootp security.Principal, as string, db wire.Id) (<-chan syncdis.Invite, error) {
+func listenForInvitesAs(ctx *context.T, rootp security.Principal, as string, db wire.Id) (<-chan syncdis.Invite, error) {
 	ch := make(chan syncdis.Invite)
 	return ch, syncdis.ListenForInvites(tu.NewCtx(ctx, rootp, as), db, ch)
 }
@@ -352,3 +354,193 @@
 	}
 	return sg
 }
+
+func raisePeer(t *testing.T, label string, ctx *context.T) <-chan struct{} {
+	ad := discovery.Advertisement{
+		InterfaceName: interfaces.SyncDesc.PkgPath + "/" + interfaces.SyncDesc.Name,
+		Attributes:    map[string]string{wire.DiscoveryAttrPeer: label},
+	}
+	suffix := ""
+	eps := udiscovery.ToEndpoints("addr1:123")
+	mockServer := udiscovery.NewMockServer(eps)
+	done, err := idiscovery.AdvertiseServer(ctx, nil, mockServer, suffix, &ad, nil)
+	if err != nil {
+		t.Fatal(err)
+	}
+	return done
+}
+
+// Do basic tests to check that listen for peers functions as expected.
+// 3 peers are raised in this test and multiple scanners are used.
+// Note: All scanners will see the next found peer even if they haven't
+// consumed the channel for it yet. The effective buffer size is chan size + 1.
+func TestListenForPeers(t *testing.T) {
+	// Setup a base context that is given a rooted principal.
+	baseCtx, shutdown := test.V23Init()
+	defer shutdown()
+
+	rootp := testutil.NewPrincipal("root")
+	ctx := tu.NewCtx(baseCtx, rootp, "o:app")
+
+	// Discovery is also mocked out for future "fake" ads.
+	df, _ := idiscovery.NewFactory(ctx, mock.New())
+	fdiscovery.InjectFactory(df)
+
+	// Raise Peer 1.
+	ctx1, cancel1 := context.WithCancel(ctx)
+	defer cancel1()
+	done1 := raisePeer(t, "peer1", ctx1)
+
+	// 1a: Peer 1 can only see themselves.
+	ctx1a, cancel1a := context.WithCancel(ctx1)
+	peerChan1a, err := listenForPeersAs(ctx1a, rootp, "client1")
+	if err != nil {
+		panic(err)
+	}
+	checkExpectedPeers(t, peerChan1a, 1, 0)
+	cancel1a()
+	confirmCleanupPeer(t, peerChan1a)
+
+	// Raise Peer 2.
+	ctx2, cancel2 := context.WithCancel(ctx)
+	defer cancel2()
+	raisePeer(t, "peer2", ctx2)
+
+	// 2a is special and will run until the end of the test. It reads 0 peers now.
+	ctx2a, cancel2a := context.WithCancel(ctx2)
+	peerChan2a, err := listenForPeersAs(ctx2a, rootp, "client2")
+	if err != nil {
+		panic(err)
+	}
+
+	// 2e is special and will run until the end of the test. It reads 2 peers now.
+	ctx2e, cancel2e := context.WithCancel(ctx2)
+	peerChan2e, err := listenForPeersAs(ctx2e, rootp, "client2")
+	if err != nil {
+		panic(err)
+	}
+	checkExpectedPeers(t, peerChan2e, 2, 0)
+
+	// 1b and 2b: Peer 1 and Peer 2 can see each other.
+	ctx1b, cancel1b := context.WithCancel(ctx1)
+	peerChan1b, err := listenForPeersAs(ctx1b, rootp, "client1")
+	if err != nil {
+		panic(err)
+	}
+	checkExpectedPeers(t, peerChan1b, 2, 0)
+	cancel1b()
+	confirmCleanupPeer(t, peerChan1b)
+
+	ctx2b, cancel2b := context.WithCancel(ctx2)
+	peerChan2b, err := listenForPeersAs(ctx2b, rootp, "client2")
+	if err != nil {
+		panic(err)
+	}
+	checkExpectedPeers(t, peerChan2b, 2, 0)
+	cancel2b()
+	confirmCleanupPeer(t, peerChan2b)
+
+	// Raise Peer 3.
+	ctx3, cancel3 := context.WithCancel(ctx)
+	defer cancel3()
+	done3 := raisePeer(t, "peer3", ctx3)
+
+	// 2c is special and will run until the end of the test. It reads 3 peers now.
+	ctx2c, cancel2c := context.WithCancel(ctx2)
+	peerChan2c, err := listenForPeersAs(ctx2c, rootp, "client2")
+	if err != nil {
+		panic(err)
+	}
+	checkExpectedPeers(t, peerChan2c, 3, 0)
+
+	// Stop Peer 1 by canceling its context and waiting for the ad to disappear.
+	cancel1()
+	<-done1
+
+	// The "remove" notification goroutines might race, so wait a little bit.
+	// It is okay to wait since this is purely for test simplification purposes.
+	<-time.After(time.Millisecond * 10)
+
+	// 2c also sees lost 1.
+	checkExpectedPeers(t, peerChan2c, 0, 1)
+
+	// 2d and 3d: Peer 2 and Peer 3 see each other and nobody else.
+	ctx2d, cancel2d := context.WithCancel(ctx2)
+	peerChan2d, err := listenForPeersAs(ctx2d, rootp, "client2")
+	if err != nil {
+		panic(err)
+	}
+	checkExpectedPeers(t, peerChan2d, 2, 0)
+	cancel2d()
+	confirmCleanupPeer(t, peerChan2d)
+
+	ctx3d, cancel3d := context.WithCancel(ctx3)
+	peerChan3d, err := listenForPeersAs(ctx3d, rootp, "client3")
+	if err != nil {
+		panic(err)
+	}
+	checkExpectedPeers(t, peerChan3d, 2, 0)
+	cancel3d()
+	confirmCleanupPeer(t, peerChan3d)
+
+	// Stop Peer 3 by canceling its context and waiting for the ad to disappear.
+	cancel3()
+	<-done3
+
+	// The "remove" notification goroutines might race, so wait a little bit.
+	// It is okay to wait since this is purely for test simplification purposes.
+	<-time.After(time.Millisecond * 10)
+
+	// 2c also sees lost 3.
+	checkExpectedPeers(t, peerChan2c, 0, 1)
+
+	// We're done with 2c, who saw 3 updates (1, 2, 3) and 2 losses (1 and 3)
+	cancel2c()
+	confirmCleanupPeer(t, peerChan2c)
+
+	// 2a should see 1, 2, and lose 1. It won't notice #3 since the channel won't buffer it.
+	checkExpectedPeers(t, peerChan2a, 2, 1)
+	cancel2a()
+	confirmCleanupPeer(t, peerChan2a)
+
+	// 2e has seen 2 found updates earlier, so peer 3 is buffered in the channel.
+	// We should see peer 3 and two loss events.
+	checkExpectedPeers(t, peerChan2e, 1, 2)
+	cancel2e()
+	confirmCleanupPeer(t, peerChan2e)
+}
+
+func listenForPeersAs(ctx *context.T, rootp security.Principal, as string) (<-chan syncdis.Peer, error) {
+	ch := make(chan syncdis.Peer)
+	return ch, syncdis.ListenForPeers(tu.NewCtx(ctx, rootp, as), ch)
+}
+
+func checkExpectedPeers(t *testing.T, ch <-chan syncdis.Peer, found int, lost int) {
+	counter := 0
+	for i := 0; i < found+lost; i++ {
+		select {
+		case peer, ok := <-ch:
+			if !ok {
+				t.Error("peer channel shouldn't be closed yet")
+			} else {
+				if !peer.Lost {
+					counter++
+				}
+			}
+		case <-time.After(time.Second * 1):
+			t.Errorf("timed out without seeing enough peers. Found %d in %d updates, but needed %d in %d", counter, i, found, found+lost)
+			return
+		}
+	}
+
+	if counter != found {
+		t.Errorf("Found %d peers, expected %d", counter, found)
+	}
+}
+
+func confirmCleanupPeer(t *testing.T, ch <-chan syncdis.Peer) {
+	unwanted, ok := <-ch
+	if ok {
+		t.Errorf("found unwanted peer update %v", unwanted)
+	}
+}