x/ref: Implement ListenForPeers
This CL does the following things.
1. modify the underlying queue implementation
- It now handles both Peer and Invite through the copyable interface.
- copyable introduces the concept of a "lost" invite or peer.
- When the copyable is removed from the queue, a "lost" version is
inserted in its stead.
- "lost" elements are only visible to cursors that have seen the
corresponding found elements.
- "lost" elements remain within the queue, and thus affect its "size".
- Garbage collection of the "lost" elements occurs once everybody
qualified to see the element has seen it via "next" OR has canceled
and unregistered themselves from the queue.
- Tests adjusted to take into account "lost" updates.
New tests added to take into account "lost" update garbage collection.
2. ListenForPeers is updated.
- Tests remain the same; it ignores the "lost" Invites.
3. ListenForInvites was added.
- Tests added by mocking out discovery.
Change-Id: I2ba36a59621a7e13d16dc722610c34d2f2a835b8
diff --git a/services/syncbase/discovery/discovery.go b/services/syncbase/discovery/discovery.go
index fa0d09c..9e2abc8 100644
--- a/services/syncbase/discovery/discovery.go
+++ b/services/syncbase/discovery/discovery.go
@@ -354,7 +354,8 @@
}
type scanState struct {
- dbs map[wire.Id]*inviteQueue
+ peers *copyableQueue
+ dbs map[wire.Id]*copyableQueue
listeners int
cancel context.CancelFunc
}
@@ -362,7 +363,8 @@
func newScan(ctx *context.T) (*scanState, error) {
ctx, cancel := context.WithRootCancel(ctx)
scan := &scanState{
- dbs: make(map[wire.Id]*inviteQueue),
+ peers: nil,
+ dbs: make(map[wire.Id]*copyableQueue),
cancel: cancel,
}
// TODO(suharshs): Add globalDiscoveryPath.
@@ -386,7 +388,7 @@
if u.IsLost() {
// TODO(mattr): Removing like this can result in resurfacing already
// retrieved invites. For example if the ACL of a syncgroup is
- // changed to add a new memeber, then we might see a remove and then
+ // changed to add a new member, then we might see a remove and then
// later an add. In this case we would end up moving the invite
// to the end of the queue. One way to fix this would be to
// keep removed invites for some time.
@@ -398,26 +400,38 @@
}
} else {
if q == nil {
- q = newInviteQueue()
+ q = newCopyableQueue()
scan.dbs[db] = q
}
q.add(invite)
q.cond.Broadcast()
}
state.mu.Unlock()
+ } else if peer, ok := makePeer(u.Advertisement()); ok {
+ state.mu.Lock()
+ q := scan.peers
+ if u.IsLost() {
+ if q != nil {
+ q.remove(peer)
+ if q.empty() {
+ scan.peers = nil
+ }
+ }
+ } else {
+ if q == nil {
+ q = newCopyableQueue()
+ scan.peers = q
+ }
+ q.add(peer)
+ q.cond.Broadcast()
+ }
+ state.mu.Unlock()
}
}
}()
return scan, nil
}
-// Invite represents an invitation to join a syncgroup as found via Discovery.
-type Invite struct {
- Syncgroup wire.Id //Syncgroup is the Id of the syncgroup you've been invited to.
- Addresses []string //Addresses are the list of addresses of the inviting server.
- key string
-}
-
// ListenForInvites listens via Discovery for syncgroup invitations for the given
// database and sends the invites to the provided channel. We stop listening when
// the given context is canceled. When that happens we close the given channel.
@@ -425,12 +439,87 @@
defer state.mu.Unlock()
state.mu.Lock()
+ scan, err := prepareScannerByPrincipal(ctx)
+ if err != nil {
+ return err
+ }
+
+ q := scan.dbs[db]
+ if q == nil {
+ q = newCopyableQueue()
+ scan.dbs[db] = q
+ }
+
+ // Send the copyables into a temporary channel.
+ cp_ch := make(chan copyable)
+ go consumeCopyableQueue(ctx, q, scan, cp_ch, func() {
+ delete(scan.dbs, db)
+ })
+
+ // Convert these copyables into Invites.
+ go func() {
+ for {
+ v, ok := <-cp_ch
+ if !ok {
+ close(ch)
+ break
+ }
+ invite := v.(Invite)
+ if !invite.isLost() {
+ ch <- invite
+ }
+ }
+ }()
+
+ return nil
+}
+
+// ListenForPeers listens via Discovery for syncgroup peers and sends them to
+// the provided channel. We stop listening when the context is canceled. When
+// that happens we close the given channel.
+func ListenForPeers(ctx *context.T, ch chan<- Peer) error {
+ defer state.mu.Unlock()
+ state.mu.Lock()
+
+ scan, err := prepareScannerByPrincipal(ctx)
+ if err != nil {
+ return err
+ }
+
+ q := scan.peers
+ if q == nil {
+ q = newCopyableQueue()
+ scan.peers = q
+ }
+
+ // Send the copyables into a temporary channel.
+ cp_ch := make(chan copyable)
+ go consumeCopyableQueue(ctx, q, scan, cp_ch, func() {
+ scan.peers = nil
+ })
+
+ // Convert these copyables into Peers.
+ go func() {
+ for {
+ v, ok := <-cp_ch
+ if !ok {
+ close(ch)
+ break
+ }
+ ch <- v.(Peer)
+ }
+ }()
+
+ return nil
+}
+
+func prepareScannerByPrincipal(ctx *context.T) (*scanState, error) {
p := v23.GetPrincipal(ctx)
scan := state.scans[p]
if scan == nil {
var err error
if scan, err = newScan(ctx); err != nil {
- return err
+ return nil, err
}
if state.scans == nil {
state.scans = make(map[security.Principal]*scanState)
@@ -438,50 +527,50 @@
state.scans[p] = scan
}
scan.listeners++
-
- q := scan.dbs[db]
- if q == nil {
- q = newInviteQueue()
- scan.dbs[db] = q
- }
-
- go func() {
- c := q.scan()
- for {
- next, ok := q.next(ctx, c)
- if !ok {
- break
- }
- select {
- case ch <- next.copy():
- case <-ctx.Done():
- break
- }
- }
- close(ch)
-
- defer state.mu.Unlock()
- state.mu.Lock()
-
- if q.empty() {
- delete(scan.dbs, db)
- }
- scan.listeners--
- if scan.listeners == 0 {
- scan.cancel()
- delete(state.scans, v23.GetPrincipal(ctx))
- }
- }()
-
- return nil
+ return scan, nil
}
-// inviteQueue is a linked list based queue. As we get new invitations we
-// add them to the queue, and when advertisements are lost we remove elements.
+type isQueueEmptyCallback func()
+
+// Should be called in a goroutine. Pairs with prepareScannerByPrincipal.
+func consumeCopyableQueue(ctx *context.T, q *copyableQueue, scan *scanState, ch chan<- copyable, cb isQueueEmptyCallback) {
+ c := q.scan()
+ for {
+ next, ok := q.next(ctx, c)
+ if !ok {
+ break
+ }
+ select {
+ case ch <- next.copy():
+ case <-ctx.Done():
+ q.stopScan(c)
+ break
+ }
+ }
+ close(ch)
+
+ defer state.mu.Unlock()
+ state.mu.Lock()
+
+ if q.empty() {
+ cb()
+ }
+ scan.listeners--
+ if scan.listeners == 0 {
+ scan.cancel()
+ delete(state.scans, v23.GetPrincipal(ctx))
+ }
+}
+
+// copyableQueue is a linked list based queue. As we get new invitations/peers,
+// we add them to the queue, and when advertisements are lost we remove instead.
// Each call to scan creates a cursor on the queue that will iterate until
// the Listen is canceled. We wait when we hit the end of the queue via the
// condition variable cond.
-type inviteQueue struct {
+// Note: The queue contains both "found" and "lost" elements. A removed "found"
+// element is replaced with a "lost" element, which all cursors will see.
+// See ielement for the garbage collection strategy for the "lost" elements.
+type copyableQueue struct {
mu nsync.Mu
cond nsync.CV
@@ -491,7 +580,7 @@
nextCursorId int
}
-func (q *inviteQueue) debugLocked() string {
+func (q *copyableQueue) debugLocked() string {
buf := &bytes.Buffer{}
fmt.Fprintf(buf, "*%p", &q.sentinel)
for c := q.sentinel.next; c != &q.sentinel; c = c.next {
@@ -500,13 +589,55 @@
return buf.String()
}
-type ielement struct {
- invite Invite
- prev, next *ielement
+// An interface used to represent Invite and Peer that reduces code duplication.
+// Typecasting is cheap, so this seems better than maintaining duplicate
+// implementations of copyableQueue.
+type copyable interface {
+ copy() copyable
+ id() string
+ isLost() bool
+ copyAsLost() copyable
}
-func newInviteQueue() *inviteQueue {
- iq := &inviteQueue{
+// Keeps track of a copyable element and the next/prev ielements in a queue.
+// ielements can represent both "found" and "lost" elements. Those of the "lost"
+// type also set "numCanSee" and "whoCanSee" to indicate that only certain
+// cursors can see the element contained in this ielement. Once all the cursors
+// have seen a "lost" ielement it can be garbage collected safely.
+// See "canBeSeenByCursor" and "consume".
+type ielement struct {
+ elem copyable
+ prev, next *ielement
+ numCanSee int // If >0, this decrements. The element is removed upon hitting 0.
+ whoCanSee []int // The cursors that can see this ielement. Set only once.
+}
+
+// A cursor can see the element in this ielement if it is a "found" element or
+// a "lost" element with the correct visibility.
+func (i *ielement) canBeSeenByCursor(cursor int) bool {
+ if i.numCanSee == 0 { // Special: 0 is visible by everyone.
+ return true
+ }
+ for _, who := range i.whoCanSee {
+ if cursor == who {
+ return true
+ }
+ }
+ return false
+}
+
+// Reduce "numCanSee" and indicate whether or not this element needs to be removed
+// from the queue.
+func (i *ielement) consume() bool {
+ if i.numCanSee > 0 {
+ i.numCanSee--
+ return i.numCanSee == 0
+ }
+ return false
+}
+
+func newCopyableQueue() *copyableQueue {
+ iq := &copyableQueue{
elems: make(map[string]*ielement),
cursors: make(map[int]*ielement),
}
@@ -514,68 +645,146 @@
return iq
}
-func (q *inviteQueue) empty() bool {
+func (q *copyableQueue) empty() bool {
defer q.mu.Unlock()
q.mu.Lock()
return len(q.elems) == 0 && len(q.cursors) == 0
}
-func (q *inviteQueue) size() int {
+func (q *copyableQueue) size() int {
defer q.mu.Unlock()
q.mu.Lock()
return len(q.elems)
}
-func (q *inviteQueue) remove(i Invite) {
+// Removes the given copyable from the queue. No-op if not present or "lost".
+// Note: As described in copyableQueue, if the copyable is a "found" element,
+// then a corresponding "lost" element will be appended to the queue. Only the
+// cursors who have seen the "found" element can see the "lost" one.
+func (q *copyableQueue) remove(i copyable) {
defer q.mu.Unlock()
q.mu.Lock()
- el, ok := q.elems[i.key]
- if !ok {
+ el, ok := q.elems[i.id()]
+ if !ok || el.elem.isLost() {
return
}
- delete(q.elems, i.key)
+
+ // Make a list of all the users who have seen this element.
+ whoCanSee := []int{}
+ for cursor, cel := range q.cursors {
+ // Adjust the cursor if it is pointing to the removed element.
+ if cel == el {
+ q.cursors[cursor] = cel.prev
+ whoCanSee = append(whoCanSee, cursor)
+ continue
+ }
+
+ // Check if this cursor has gone past the removed element.
+ // Note: This implementation assumes a small queue and a small number of
+ // cursors. Otherwise, using a table is recommended over list traversal.
+ for ; cel != &q.sentinel; cel = cel.prev {
+ if cel == el {
+ whoCanSee = append(whoCanSee, cursor)
+ break
+ }
+ }
+ }
+
+ // "Remove" el from the queue.
el.next.prev, el.prev.next = el.prev, el.next
+
+ // el is a visible lost item that needs to be added to the queue.
+ if len(whoCanSee) > 0 {
+ // Adjust the lost ielement el to include the lost data and who can see it.
+ el.elem = el.elem.copyAsLost()
+ el.numCanSee = len(whoCanSee)
+ el.whoCanSee = whoCanSee
+
+ // Insert the lost element before/after the sentinel and broadcast.
+ el.prev, el.next = q.sentinel.prev, &q.sentinel
+ q.sentinel.prev, q.sentinel.prev.next = el, el
+ q.cond.Broadcast()
+ } else {
+ delete(q.elems, i.id())
+ }
+}
+
+func (q *copyableQueue) add(i copyable) {
+ defer q.mu.Unlock()
+ q.mu.Lock()
+
+ if _, ok := q.elems[i.id()]; ok {
+ return
+ }
+ el := &ielement{i, q.sentinel.prev, &q.sentinel, 0, []int{}}
+ q.sentinel.prev, q.sentinel.prev.next = el, el
+ q.elems[i.id()] = el
+ q.cond.Broadcast()
+}
+
+func (q *copyableQueue) next(ctx *context.T, cursor int) (copyable, bool) {
+ defer q.mu.Unlock()
+ q.mu.Lock()
+ c, exists := q.cursors[cursor]
+ if !exists {
+ return nil, false
+ }
+
+ // Find the next available element that this cursor can see. Will block until
+ // somebody writes to the queue or the given context is canceled.
+ // Note: Some "lost" elements in the queue will not be visible to this cursor;
+ // these are skipped.
+ for {
+ for c.next == &q.sentinel {
+ if q.cond.WaitWithDeadline(&q.mu, nsync.NoDeadline, ctx.Done()) != nsync.OK {
+ q.removeCursorLocked(cursor)
+ return nil, false
+ }
+ c = q.cursors[cursor]
+ }
+ c = c.next
+ q.cursors[cursor] = c
+ if c.canBeSeenByCursor(cursor) {
+ if c.consume() {
+ q.removeLostIElemLocked(c)
+
+ }
+ return c.elem, true
+ }
+ }
+}
+
+func (q *copyableQueue) removeCursorLocked(cursor int) {
+ c := q.cursors[cursor]
+ delete(q.cursors, cursor)
+
+ // Garbage collection: Step through each remaining element and remove
+ // unneeded lost ielements.
+ for c.next != &q.sentinel {
+ c = c.next // step
+ if c.canBeSeenByCursor(cursor) {
+ if c.consume() {
+ q.removeLostIElemLocked(c)
+ }
+ }
+ }
+}
+
+func (q *copyableQueue) removeLostIElemLocked(ielem *ielement) {
+ // Remove the lost ielem since everyone has seen it.
+ delete(q.elems, ielem.elem.id())
+ ielem.next.prev, ielem.prev.next = ielem.prev, ielem.next
+
+ // Adjust the cursors if their elements were on the removed element.
for id, c := range q.cursors {
- if c == el {
+ if c == ielem {
q.cursors[id] = c.prev
}
}
}
-func (q *inviteQueue) add(i Invite) {
- defer q.mu.Unlock()
- q.mu.Lock()
-
- if _, ok := q.elems[i.key]; ok {
- return
- }
- el := &ielement{i, q.sentinel.prev, &q.sentinel}
- q.sentinel.prev, q.sentinel.prev.next = el, el
- q.elems[i.key] = el
- q.cond.Broadcast()
-}
-
-func (q *inviteQueue) next(ctx *context.T, cursor int) (Invite, bool) {
- defer q.mu.Unlock()
- q.mu.Lock()
- c, exists := q.cursors[cursor]
- if !exists {
- return Invite{}, false
- }
- for c.next == &q.sentinel {
- if q.cond.WaitWithDeadline(&q.mu, nsync.NoDeadline, ctx.Done()) != nsync.OK {
- delete(q.cursors, cursor)
- return Invite{}, false
- }
- c = q.cursors[cursor]
- }
- c = c.next
- q.cursors[cursor] = c
- return c.invite, true
-}
-
-func (q *inviteQueue) scan() int {
+func (q *copyableQueue) scan() int {
defer q.mu.Unlock()
q.mu.Lock()
id := q.nextCursorId
@@ -584,6 +793,23 @@
return id
}
+func (q *copyableQueue) stopScan(cursor int) {
+ defer q.mu.Unlock()
+ q.mu.Lock()
+
+ if _, exists := q.cursors[cursor]; exists {
+ q.removeCursorLocked(cursor)
+ }
+}
+
+// Invite represents an invitation to join a syncgroup as found via Discovery.
+type Invite struct {
+ Syncgroup wire.Id // Syncgroup is the Id of the syncgroup you've been invited to.
+ Addresses []string // Addresses are the list of addresses of the inviting server.
+ Lost bool // If this invite is a lost invite or not.
+ key string // Unexported. The implementation uses this key to de-dupe ads.
+}
+
func makeInvite(ad discovery.Advertisement) (Invite, wire.Id, bool) {
var dbId, sgId wire.Id
var ok bool
@@ -608,8 +834,68 @@
return i, dbId, true
}
-func (i Invite) copy() Invite {
+func (i Invite) copy() copyable {
cp := i
cp.Addresses = append([]string(nil), i.Addresses...)
return cp
}
+
+func (i Invite) id() string {
+ return i.key
+}
+
+func (i Invite) isLost() bool {
+ return i.Lost
+}
+
+func (i Invite) copyAsLost() copyable {
+ cp := i
+ cp.Addresses = append([]string(nil), i.Addresses...)
+ cp.Lost = true
+ return cp
+}
+
+// Peer represents a Syncbase peer found via Discovery.
+type Peer struct {
+ Name string // Name is the name of the Syncbase peer's sync service.
+ Addresses []string // Addresses are the list of addresses of the peer's server.
+ Lost bool // If this peer is a lost peer or not.
+ key string // Unexported. The implementation uses this key to de-dupe ads.
+}
+
+// Attempts to make a Peer using its discovery attributes.
+// Will return an empty Peer struct if it fails.
+func makePeer(ad discovery.Advertisement) (Peer, bool) {
+ peerName, ok := ad.Attributes[wire.DiscoveryAttrPeer]
+ if !ok {
+ return Peer{}, false
+ }
+ p := Peer{
+ Name: peerName,
+ Addresses: append([]string{}, ad.Addresses...),
+ }
+ sort.Strings(p.Addresses)
+ p.key = fmt.Sprintf("%v", p)
+ return p, true
+}
+
+func (p Peer) copy() copyable {
+ cp := p
+ cp.Addresses = append([]string(nil), p.Addresses...)
+ return cp
+}
+
+func (p Peer) id() string {
+ return p.key
+}
+
+func (p Peer) isLost() bool {
+ return p.Lost
+}
+
+func (p Peer) copyAsLost() copyable {
+ cp := p
+ cp.Addresses = append([]string(nil), p.Addresses...)
+ cp.Lost = true
+ return cp
+}
diff --git a/services/syncbase/discovery/discovery_internal_test.go b/services/syncbase/discovery/discovery_internal_test.go
index 1ed80e1..513d190 100644
--- a/services/syncbase/discovery/discovery_internal_test.go
+++ b/services/syncbase/discovery/discovery_internal_test.go
@@ -55,7 +55,7 @@
ctx = context.WithLogger(ctx, logger{})
- q := newInviteQueue()
+ q := newCopyableQueue()
if q.size() != 0 {
t.Errorf("got %d, want 0", q.size())
}
@@ -65,7 +65,7 @@
// Test inserts during a long scan.
ch := make(chan struct{})
go func() {
- scan(ctx, q, want)
+ scanForFoundOnly(ctx, q, want)
close(ch)
}()
@@ -75,7 +75,7 @@
if q.size() != i+1 {
t.Errorf("got %d, want %d", q.size(), i+1)
}
- if err := scan(ctx, q, want[:i+1]); err != nil {
+ if err := scanForFoundOnly(ctx, q, want[:i+1]); err != nil {
t.Error(err)
}
}
@@ -84,30 +84,60 @@
<-ch
// Start another long scan that will proceed across deletes
+ // Start a similar scan that will see 0, 1, 3, 4 and deletes for 0, 1, 3, 4.
+ // Start a final scan that will see 0, 3 and deletes for 0, 3.
steps := make(chan struct{})
go func() {
- ctx, cancel := context.WithCancel(ctx)
+ ctx1, cancel := context.WithCancel(ctx)
c := q.scan()
+ ctx2, cancel2 := context.WithCancel(ctx)
+ c2 := q.scan()
+ ctx3, cancel3 := context.WithCancel(ctx)
+ c3 := q.scan()
- if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[0] {
- t.Error("next should have suceeded.")
- }
- if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[1] {
- t.Error("next should have suceeded.")
- }
+ advance(t, ctx1, c, q, want[0], false)
+ advance(t, ctx1, c, q, want[1], false)
+ advance(t, ctx2, c2, q, want[0], false)
+ advance(t, ctx2, c2, q, want[1], false)
+ advance(t, ctx3, c3, q, want[0], false)
steps <- struct{}{}
<-steps
- if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[3] {
- t.Error("next should have suceeded.")
+ // Scanner c should just see 3 and 4.
+ advance(t, ctx1, c, q, want[3], false)
+ advance(t, ctx1, c, q, want[4], false)
+ if err := trueCancel(ctx1, cancel, q, c); err != nil {
+ t.Error(err)
}
- if inv, ok := q.next(ctx, c); !ok || inv.Syncgroup.Name != want[4] {
- t.Error("next should have suceeded.")
- }
- cancel()
+
+ // Scanner c2 should see 3, 4, lost 0, lost 1.
+ advance(t, ctx2, c2, q, want[3], false)
+ advance(t, ctx2, c2, q, want[4], false)
+ advance(t, ctx2, c2, q, want[0], true)
+ advance(t, ctx2, c2, q, want[1], true)
+
+ // Scanner c3 will just take a look at 3.
+ advance(t, ctx3, c3, q, want[3], false)
steps <- struct{}{}
+ <-steps
+
+ // After this point, 3 and 4 were removed, so c2 should see those too.
+ advance(t, ctx2, c2, q, want[3], true)
+ advance(t, ctx2, c2, q, want[4], true)
+ if err := trueCancel(ctx2, cancel2, q, c2); err != nil {
+ t.Error(err)
+ }
+
+ // Since c3 only looked at 0 and 3, it'll only see losses for those two.
+ advance(t, ctx3, c3, q, want[0], true)
+ advance(t, ctx3, c3, q, want[3], true)
+ if err := trueCancel(ctx3, cancel3, q, c3); err != nil {
+ t.Error(err)
+ }
+
+ steps <- struct{}{} // Done. Every cursor should be canceled now.
}()
// Wait for the scan to read the first two values.
@@ -122,16 +152,44 @@
// Wait for it to finish.
<-steps
}
- if q.size() != len(want)-i-1 {
- t.Errorf("got %d, want %d", q.size(), len(want)-i-1)
+ // A lost element replaces the removed found element.
+ if i < 2 && q.size() != 5 {
+ t.Errorf("got %d, want %d", q.size(), 5)
}
- if err := scan(ctx, q, want[i+1:]); err != nil {
- t.Error(err)
+ // #1 was garbage collected (c1, c2 canceled. c3 only saw #0, but not #1).
+ // #2 was garbage collected (nobody saw it)
+ // Thus, we need to see 3 items (3, 4, and lost 0)
+ if i >= 2 && q.size() != 3 {
+ t.Errorf("got %d, want %d", q.size(), 3)
}
+ if err := scanForFoundOnly(ctx, q, want[i+1:]); err != nil {
+ t.Errorf("on iteration %d, scan for found only failed: %v", i, err)
+ }
+ }
+
+ steps <- struct{}{} // return control to verify the scan for c2 and c3.
+ <-steps
+
+ // Verify that the queue is now empty.
+ // Garbage collection of all lost elements should have occurred.
+ if q.size() != 0 {
+ t.Errorf("got %d, want 0", q.size())
}
}
-func logList(ctx *context.T, q *inviteQueue) {
+func advance(t *testing.T, ctx *context.T, c int, q *copyableQueue, want string, isLost bool) {
+ if inv, ok := q.next(ctx, c); !ok {
+ t.Error("next should have suceeded.")
+ } else if inv.(Invite).Syncgroup.Name != want {
+ t.Errorf("next should be %s, but got: %s", want, inv.(Invite).Syncgroup.Name)
+ } else if inv.(Invite).Lost && !isLost {
+ t.Error("next should have been found")
+ } else if !inv.(Invite).Lost && isLost {
+ t.Error("next should have been lost")
+ }
+}
+
+func logList(ctx *context.T, q *copyableQueue) {
buf := &bytes.Buffer{}
for e := q.sentinel.next; e != &q.sentinel; e = e.next {
fmt.Fprintf(buf, "%p ", e)
@@ -139,21 +197,39 @@
ctx.Info("list", buf.String())
}
-func scan(ctx *context.T, q *inviteQueue, want []string) error {
+func scanForFoundOnly(ctx *context.T, q *copyableQueue, want []string) error {
ctx, cancel := context.WithCancel(ctx)
c := q.scan()
for i, w := range want {
inv, ok := q.next(ctx, c)
+
+ // Skip the lost for this test.
+ for ok && inv.isLost() {
+ inv, ok = q.next(ctx, c)
+ }
+
if !ok {
return fmt.Errorf("scan ended after %d entries, wanted %d", i, len(want))
}
- if got := inv.Syncgroup.Name; got != w {
+ if got := inv.(Invite).Syncgroup.Name; got != w {
return fmt.Errorf("got %s, want %s", got, w)
}
- if i == len(want)-1 {
- // Calling cancel should allow next to fail, exiting the loop.
- cancel()
+ if inv.(Invite).Lost {
+ return fmt.Errorf("invite %v was lost", inv)
}
}
+ if err := trueCancel(ctx, cancel, q, c); err != nil {
+ return err
+ }
+
+ return nil
+}
+
+func trueCancel(ctx *context.T, cancel context.CancelFunc, q *copyableQueue, c int) error {
+ cancel()
+ q.stopScan(c)
+ if unwanted, ok := q.next(ctx, c); ok {
+ return fmt.Errorf("next returned %v after scan canceled", unwanted)
+ }
return nil
}
diff --git a/services/syncbase/discovery/discovery_test.go b/services/syncbase/discovery/discovery_test.go
index c33614f..804b653 100644
--- a/services/syncbase/discovery/discovery_test.go
+++ b/services/syncbase/discovery/discovery_test.go
@@ -22,8 +22,10 @@
fdiscovery "v.io/x/ref/lib/discovery/factory"
"v.io/x/ref/lib/discovery/global"
"v.io/x/ref/lib/discovery/plugins/mock"
+ udiscovery "v.io/x/ref/lib/discovery/testutil"
_ "v.io/x/ref/runtime/factories/roaming"
syncdis "v.io/x/ref/services/syncbase/discovery"
+ "v.io/x/ref/services/syncbase/server/interfaces"
tu "v.io/x/ref/services/syncbase/testutil"
"v.io/x/ref/test"
"v.io/x/ref/test/testutil"
@@ -293,11 +295,11 @@
}
}
- d1invites, err := listenAs(ctx, rootp, "o:app1:client1", d1.Id())
+ d1invites, err := listenForInvitesAs(ctx, rootp, "o:app1:client1", d1.Id())
if err != nil {
panic(err)
}
- d2invites, err := listenAs(ctx, rootp, "o:app1:client1", d2.Id())
+ d2invites, err := listenForInvitesAs(ctx, rootp, "o:app1:client1", d2.Id())
if err != nil {
panic(err)
}
@@ -316,7 +318,7 @@
}
}
-func listenAs(ctx *context.T, rootp security.Principal, as string, db wire.Id) (<-chan syncdis.Invite, error) {
+func listenForInvitesAs(ctx *context.T, rootp security.Principal, as string, db wire.Id) (<-chan syncdis.Invite, error) {
ch := make(chan syncdis.Invite)
return ch, syncdis.ListenForInvites(tu.NewCtx(ctx, rootp, as), db, ch)
}
@@ -352,3 +354,193 @@
}
return sg
}
+
+func raisePeer(t *testing.T, label string, ctx *context.T) <-chan struct{} {
+ ad := discovery.Advertisement{
+ InterfaceName: interfaces.SyncDesc.PkgPath + "/" + interfaces.SyncDesc.Name,
+ Attributes: map[string]string{wire.DiscoveryAttrPeer: label},
+ }
+ suffix := ""
+ eps := udiscovery.ToEndpoints("addr1:123")
+ mockServer := udiscovery.NewMockServer(eps)
+ done, err := idiscovery.AdvertiseServer(ctx, nil, mockServer, suffix, &ad, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ return done
+}
+
+// Do basic tests to check that listen for peers functions as expected.
+// 3 peers are raised in this test and multiple scanners are used.
+// Note: All scanners will see the next found peer even if they haven't
+// consumed the channel for it yet. The effective buffer size is chan size + 1.
+func TestListenForPeers(t *testing.T) {
+ // Setup a base context that is given a rooted principal.
+ baseCtx, shutdown := test.V23Init()
+ defer shutdown()
+
+ rootp := testutil.NewPrincipal("root")
+ ctx := tu.NewCtx(baseCtx, rootp, "o:app")
+
+ // Discovery is also mocked out for future "fake" ads.
+ df, _ := idiscovery.NewFactory(ctx, mock.New())
+ fdiscovery.InjectFactory(df)
+
+ // Raise Peer 1.
+ ctx1, cancel1 := context.WithCancel(ctx)
+ defer cancel1()
+ done1 := raisePeer(t, "peer1", ctx1)
+
+ // 1a: Peer 1 can only see themselves.
+ ctx1a, cancel1a := context.WithCancel(ctx1)
+ peerChan1a, err := listenForPeersAs(ctx1a, rootp, "client1")
+ if err != nil {
+ panic(err)
+ }
+ checkExpectedPeers(t, peerChan1a, 1, 0)
+ cancel1a()
+ confirmCleanupPeer(t, peerChan1a)
+
+ // Raise Peer 2.
+ ctx2, cancel2 := context.WithCancel(ctx)
+ defer cancel2()
+ raisePeer(t, "peer2", ctx2)
+
+ // 2a is special and will run until the end of the test. It reads 0 peers now.
+ ctx2a, cancel2a := context.WithCancel(ctx2)
+ peerChan2a, err := listenForPeersAs(ctx2a, rootp, "client2")
+ if err != nil {
+ panic(err)
+ }
+
+ // 2e is special and will run until the end of the test. It reads 2 peers now.
+ ctx2e, cancel2e := context.WithCancel(ctx2)
+ peerChan2e, err := listenForPeersAs(ctx2e, rootp, "client2")
+ if err != nil {
+ panic(err)
+ }
+ checkExpectedPeers(t, peerChan2e, 2, 0)
+
+ // 1b and 2b: Peer 1 and Peer 2 can see each other.
+ ctx1b, cancel1b := context.WithCancel(ctx1)
+ peerChan1b, err := listenForPeersAs(ctx1b, rootp, "client1")
+ if err != nil {
+ panic(err)
+ }
+ checkExpectedPeers(t, peerChan1b, 2, 0)
+ cancel1b()
+ confirmCleanupPeer(t, peerChan1b)
+
+ ctx2b, cancel2b := context.WithCancel(ctx2)
+ peerChan2b, err := listenForPeersAs(ctx2b, rootp, "client2")
+ if err != nil {
+ panic(err)
+ }
+ checkExpectedPeers(t, peerChan2b, 2, 0)
+ cancel2b()
+ confirmCleanupPeer(t, peerChan2b)
+
+ // Raise Peer 3.
+ ctx3, cancel3 := context.WithCancel(ctx)
+ defer cancel3()
+ done3 := raisePeer(t, "peer3", ctx3)
+
+ // 2c is special and will run until the end of the test. It reads 3 peers now.
+ ctx2c, cancel2c := context.WithCancel(ctx2)
+ peerChan2c, err := listenForPeersAs(ctx2c, rootp, "client2")
+ if err != nil {
+ panic(err)
+ }
+ checkExpectedPeers(t, peerChan2c, 3, 0)
+
+ // Stop Peer 1 by canceling its context and waiting for the ad to disappear.
+ cancel1()
+ <-done1
+
+ // The "remove" notification goroutines might race, so wait a little bit.
+ // It is okay to wait since this is purely for test simplification purposes.
+ <-time.After(time.Millisecond * 10)
+
+ // 2c also sees lost 1.
+ checkExpectedPeers(t, peerChan2c, 0, 1)
+
+ // 2d and 3d: Peer 2 and Peer 3 see each other and nobody else.
+ ctx2d, cancel2d := context.WithCancel(ctx2)
+ peerChan2d, err := listenForPeersAs(ctx2d, rootp, "client2")
+ if err != nil {
+ panic(err)
+ }
+ checkExpectedPeers(t, peerChan2d, 2, 0)
+ cancel2d()
+ confirmCleanupPeer(t, peerChan2d)
+
+ ctx3d, cancel3d := context.WithCancel(ctx3)
+ peerChan3d, err := listenForPeersAs(ctx3d, rootp, "client3")
+ if err != nil {
+ panic(err)
+ }
+ checkExpectedPeers(t, peerChan3d, 2, 0)
+ cancel3d()
+ confirmCleanupPeer(t, peerChan3d)
+
+ // Stop Peer 3 by canceling its context and waiting for the ad to disappear.
+ cancel3()
+ <-done3
+
+ // The "remove" notification goroutines might race, so wait a little bit.
+ // It is okay to wait since this is purely for test simplification purposes.
+ <-time.After(time.Millisecond * 10)
+
+ // 2c also sees lost 3.
+ checkExpectedPeers(t, peerChan2c, 0, 1)
+
+ // We're done with 2c, who saw 3 updates (1, 2, 3) and 2 losses (1 and 3)
+ cancel2c()
+ confirmCleanupPeer(t, peerChan2c)
+
+ // 2a should see 1, 2, and lose 1. It won't notice #3 since the channel won't buffer it.
+ checkExpectedPeers(t, peerChan2a, 2, 1)
+ cancel2a()
+ confirmCleanupPeer(t, peerChan2a)
+
+ // 2e has seen 2 found updates earlier, so peer 3 is buffered in the channel.
+ // We should see peer 3 and two loss events.
+ checkExpectedPeers(t, peerChan2e, 1, 2)
+ cancel2e()
+ confirmCleanupPeer(t, peerChan2e)
+}
+
+func listenForPeersAs(ctx *context.T, rootp security.Principal, as string) (<-chan syncdis.Peer, error) {
+ ch := make(chan syncdis.Peer)
+ return ch, syncdis.ListenForPeers(tu.NewCtx(ctx, rootp, as), ch)
+}
+
+func checkExpectedPeers(t *testing.T, ch <-chan syncdis.Peer, found int, lost int) {
+ counter := 0
+ for i := 0; i < found+lost; i++ {
+ select {
+ case peer, ok := <-ch:
+ if !ok {
+ t.Error("peer channel shouldn't be closed yet")
+ } else {
+ if !peer.Lost {
+ counter++
+ }
+ }
+ case <-time.After(time.Second * 1):
+ t.Errorf("timed out without seeing enough peers. Found %d in %d updates, but needed %d in %d", counter, i, found, found+lost)
+ return
+ }
+ }
+
+ if counter != found {
+ t.Errorf("Found %d peers, expected %d", counter, found)
+ }
+}
+
+func confirmCleanupPeer(t *testing.T, ch <-chan syncdis.Peer) {
+ unwanted, ok := <-ch
+ if ok {
+ t.Errorf("found unwanted peer update %v", unwanted)
+ }
+}