syncbase/discovery: Introduce global discovery.

The syncbase and invites code doesn't enable global discovery yet.
We can add it when desired (or now if its desired immediately).

Change-Id: I3925edbdc63ea3acccfb7817dc4b020b0faeac00
diff --git a/services/syncbase/discovery/discovery.go b/services/syncbase/discovery/discovery.go
index 2688d27..fa0d09c 100644
--- a/services/syncbase/discovery/discovery.go
+++ b/services/syncbase/discovery/discovery.go
@@ -9,6 +9,7 @@
 	"fmt"
 	"sort"
 	"strings"
+	"time"
 
 	"v.io/v23"
 	"v.io/v23/context"
@@ -16,10 +17,16 @@
 	"v.io/v23/security"
 	wire "v.io/v23/services/syncbase"
 	"v.io/x/lib/nsync"
+	"v.io/x/ref/lib/discovery/global"
 	"v.io/x/ref/services/syncbase/server/interfaces"
 )
 
-const visibilityKey = "vis"
+const (
+	visibilityKey = "vis"
+
+	nhDiscoveryKey = iota
+	globalDiscoveryKey
+)
 
 // Discovery implements v.io/v23/discovery.T for syncbase based
 // applications.
@@ -27,25 +34,46 @@
 // point we should just replace the result of v23.NewDiscovery
 // with this.
 type Discovery struct {
-	nhDiscovery discovery.T
-	// TODO(mattr): Add global discovery.
+	nhDiscovery     discovery.T
+	globalDiscovery discovery.T
 }
 
 // NewDiscovery creates a new syncbase discovery object.
-func NewDiscovery(ctx *context.T) (discovery.T, error) {
-	nhDiscovery, err := v23.NewDiscovery(ctx)
-	if err != nil {
+// globalDiscoveryPath is the path in the namespace where global disovery
+// advertisements will be mounted.
+// If globalDiscoveryPath is empty, no global discovery service will be created.
+// globalScanInterval is the interval at which global discovery will be refreshed.
+// If globalScanInterval is 0, the defaultScanInterval of global discovery will
+// be used.
+func NewDiscovery(ctx *context.T, globalDiscoveryPath string, globalScanInterval time.Duration) (discovery.T, error) {
+	d := &Discovery{}
+	var err error
+	if d.nhDiscovery, err = v23.NewDiscovery(ctx); err != nil {
 		return nil, err
 	}
-	return &Discovery{nhDiscovery: nhDiscovery}, nil
+	if globalDiscoveryPath != "" {
+		if d.globalDiscovery, err = global.NewWithTTL(ctx, globalDiscoveryPath, 0, globalScanInterval); err != nil {
+			return nil, err
+		}
+	}
+	return d, nil
 }
 
 // Scan implements v.io/v23/discovery/T.Scan.
 func (d *Discovery) Scan(ctx *context.T, query string) (<-chan discovery.Update, error) {
-	nhUpdates, err := d.nhDiscovery.Scan(ctx, query)
+	nhCtx, nhCancel := context.WithCancel(ctx)
+	nhUpdates, err := d.nhDiscovery.Scan(nhCtx, query)
 	if err != nil {
+		nhCancel()
 		return nil, err
 	}
+	var globalUpdates <-chan discovery.Update
+	if d.globalDiscovery != nil {
+		if globalUpdates, err = d.globalDiscovery.Scan(ctx, query); err != nil {
+			nhCancel()
+			return nil, err
+		}
+	}
 
 	// Currently setting visibility on the neighborhood discovery
 	// service turns IBE encryption on.  We currently don't have the
@@ -57,27 +85,72 @@
 	updates := make(chan discovery.Update)
 	go func() {
 		defer close(updates)
+		seen := make(map[discovery.AdId]*updateRef)
 		for {
 			var u discovery.Update
+			var src uint // key of the source discovery service where the update came from
 			select {
 			case <-ctx.Done():
 				return
 			case u = <-nhUpdates:
+				src = nhDiscoveryKey
+			case u = <-globalUpdates:
+				src = globalDiscoveryKey
 			}
-			if u == nil {
-				continue
-			}
-			patterns := splitPatterns(u.Attribute(visibilityKey))
-			if len(patterns) > 0 && !matchesPatterns(ctx, patterns) {
-				continue
-			}
-			updates <- update{u}
+			d.handleUpdate(ctx, u, src, seen, updates)
 		}
 	}()
 
 	return updates, nil
 }
 
+func (d *Discovery) handleUpdate(ctx *context.T, u discovery.Update, src uint, seen map[discovery.AdId]*updateRef, updates chan discovery.Update) {
+	if u == nil {
+		return
+	}
+	patterns := splitPatterns(u.Attribute(visibilityKey))
+	if len(patterns) > 0 && !matchesPatterns(ctx, patterns) {
+		return
+	}
+
+	id := u.Id()
+	prev := seen[id]
+	if u.IsLost() {
+		// Only send the lost noitification if a found event was previously seen,
+		// and all discovery services that found it have lost it.
+		if prev == nil || !prev.unset(src) {
+			return
+		}
+		delete(seen, id)
+		updates <- update{Update: u, lost: true}
+		return
+	}
+
+	if prev == nil {
+		// Always send updates for updates that we have never seen before.
+		ref := &updateRef{update: u}
+		ref.set(src)
+		seen[id] = ref
+		updates <- update{Update: u}
+		return
+	}
+
+	if differ := updatesDiffer(prev.update, u); (differ && u.Timestamp().After(prev.update.Timestamp())) ||
+		(!differ && src == nhDiscoveryKey && len(u.Advertisement().Attachments) > 0) {
+		// If the updates differ and the newly found update has a later time than
+		// previously found one, lose prev and find new.
+		// Or, if the update doesn't differ, but is from neighborhood discovery, it
+		// could have more information since we don't yet encode attachements in
+		// global discovery.
+		updates <- update{Update: prev.update, lost: true}
+		ref := &updateRef{update: u}
+		ref.set(src)
+		seen[id] = ref
+		updates <- update{Update: u}
+		return
+	}
+}
+
 // Advertise implements v.io/v23/discovery/T.Advertise.
 func (d *Discovery) Advertise(ctx *context.T, ad *discovery.Advertisement, visibility []security.BlessingPattern) (<-chan struct{}, error) {
 	// Currently setting visibility on the neighborhood discovery
@@ -96,9 +169,66 @@
 		patterns := joinPatterns(visibility)
 		adCopy.Attributes[visibilityKey] = patterns
 	}
-	ch, err := d.nhDiscovery.Advertise(ctx, &adCopy, nil)
+
+	stopped := make(chan struct{})
+	nhCtx, nhCancel := context.WithCancel(ctx)
+	nhStopped, err := d.nhDiscovery.Advertise(nhCtx, &adCopy, nil)
+	if err != nil {
+		nhCancel()
+		return nil, err
+	}
+	var globalStopped <-chan struct{}
+	if d.globalDiscovery != nil {
+		if globalStopped, err = d.globalDiscovery.Advertise(ctx, &adCopy, nil); err != nil {
+			nhCancel()
+			<-nhStopped
+			return nil, err
+		}
+	}
+	go func() {
+		<-nhStopped
+		if d.globalDiscovery != nil {
+			<-globalStopped
+		}
+		close(stopped)
+	}()
 	ad.Id = adCopy.Id
-	return ch, err
+	return stopped, nil
+}
+
+func updatesDiffer(a, b discovery.Update) bool {
+	if !sortedStringsEqual(a.Addresses(), b.Addresses()) {
+		return true
+	}
+	if !mapsEqual(a.Advertisement().Attributes, b.Advertisement().Attributes) {
+		return true
+	}
+	return false
+}
+
+func mapsEqual(a, b map[string]string) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	for ka, va := range a {
+		if vb, ok := b[ka]; !ok || va != vb {
+			return false
+		}
+	}
+	return true
+}
+
+func sortedStringsEqual(a, b []string) bool {
+	// We want to make a nil and an empty slices equal to avoid unnecessary inequality by that.
+	if len(a) != len(b) {
+		return false
+	}
+	for i, v := range a {
+		if v != b[i] {
+			return false
+		}
+	}
+	return true
 }
 
 func matchesPatterns(ctx *context.T, patterns []security.BlessingPattern) bool {
@@ -115,11 +245,40 @@
 	return false
 }
 
-// update wraps the discovery.Update to remove the visibility attribute which we add.
+type updateRef struct {
+	update    discovery.Update
+	nhRef     bool
+	globalRef bool
+}
+
+func (r *updateRef) set(d uint) {
+	switch d {
+	case nhDiscoveryKey:
+		r.nhRef = true
+	case globalDiscoveryKey:
+		r.globalRef = true
+	}
+}
+
+func (r *updateRef) unset(d uint) bool {
+	switch d {
+	case nhDiscoveryKey:
+		r.nhRef = false
+	case globalDiscoveryKey:
+		r.globalRef = false
+	}
+	return !r.nhRef && !r.globalRef
+}
+
+// update wraps the discovery.Update to remove the visibility attribute which we add
+// and allows us to mark the update as lost.
 type update struct {
 	discovery.Update
+	lost bool
 }
 
+func (u update) IsLost() bool { return u.lost }
+
 func (u update) Attribute(name string) string {
 	if name == visibilityKey {
 		return ""
@@ -206,7 +365,8 @@
 		dbs:    make(map[wire.Id]*inviteQueue),
 		cancel: cancel,
 	}
-	d, err := NewDiscovery(ctx)
+	// TODO(suharshs): Add globalDiscoveryPath.
+	d, err := NewDiscovery(ctx, "", 0)
 	if err != nil {
 		scan.cancel()
 		return nil, err
diff --git a/services/syncbase/discovery/discovery_test.go b/services/syncbase/discovery/discovery_test.go
index 85bbb3a..c33614f 100644
--- a/services/syncbase/discovery/discovery_test.go
+++ b/services/syncbase/discovery/discovery_test.go
@@ -18,9 +18,14 @@
 	wire "v.io/v23/services/syncbase"
 	"v.io/v23/syncbase"
 	"v.io/v23/verror"
+	idiscovery "v.io/x/ref/lib/discovery"
+	fdiscovery "v.io/x/ref/lib/discovery/factory"
+	"v.io/x/ref/lib/discovery/global"
+	"v.io/x/ref/lib/discovery/plugins/mock"
 	_ "v.io/x/ref/runtime/factories/roaming"
 	syncdis "v.io/x/ref/services/syncbase/discovery"
 	tu "v.io/x/ref/services/syncbase/testutil"
+	"v.io/x/ref/test"
 	"v.io/x/ref/test/testutil"
 )
 
@@ -132,7 +137,7 @@
 	if err != nil {
 		return nil, err
 	}
-	dis, err := syncdis.NewDiscovery(ctx)
+	dis, err := syncdis.NewDiscovery(ctx, "", 0)
 	if err != nil {
 		return nil, err
 	}
@@ -176,6 +181,100 @@
 	}
 }
 
+func TestGlobalSyncbaseDiscovery(t *testing.T) {
+	ctx, shutdown := test.V23InitWithMounttable()
+	defer shutdown()
+	// Create syncbase discovery service with mock neighborhood discovery.
+	p := mock.New()
+	df, _ := idiscovery.NewFactory(ctx, p)
+	defer df.Shutdown()
+	fdiscovery.InjectFactory(df)
+	const globalDiscoveryPath = "syncdis"
+	gdis, err := global.New(ctx, globalDiscoveryPath)
+	if err != nil {
+		t.Fatal(err)
+	}
+	dis, err := syncdis.NewDiscovery(ctx, globalDiscoveryPath, 100*time.Millisecond)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// Start the syncbase discovery scan.
+	scanCh, err := dis.Scan(ctx, ``)
+	if err != nil {
+		t.Fatal(err)
+	}
+	// The ids of all the ads match.
+	ad := &discovery.Advertisement{
+		Id:            discovery.AdId{1, 2, 3},
+		InterfaceName: "v.io/a",
+		Addresses:     []string{"/h1:123/x", "/h2:123/y"},
+		Attributes:    discovery.Attributes{"a": "v"},
+	}
+	newad := &discovery.Advertisement{
+		Id:            discovery.AdId{1, 2, 3},
+		InterfaceName: "v.io/a",
+		Addresses:     []string{"/h1:123/x", "/h3:123/z"},
+		Attributes:    discovery.Attributes{"a": "v"},
+	}
+	newadattach := &discovery.Advertisement{
+		Id:            discovery.AdId{1, 2, 3},
+		InterfaceName: "v.io/a",
+		Addresses:     []string{"/h1:123/x", "/h3:123/z"},
+		Attributes:    discovery.Attributes{"a": "v"},
+		Attachments:   discovery.Attachments{"k": []byte("v")},
+	}
+	adinfo := &idiscovery.AdInfo{
+		Ad:          *ad,
+		Hash:        idiscovery.AdHash{1, 2, 3},
+		TimestampNs: time.Now().UnixNano(),
+	}
+	newadattachinfo := &idiscovery.AdInfo{
+		Ad:          *newadattach,
+		Hash:        idiscovery.AdHash{3, 2, 1},
+		TimestampNs: time.Now().UnixNano(),
+	}
+	// Advertise ad via both services, we should get a found update.
+	adCtx, adCancel := context.WithCancel(ctx)
+	p.RegisterAd(adinfo)
+	adStopped, err := gdis.Advertise(adCtx, ad, nil)
+	if err != nil {
+		t.Error(err)
+	}
+	if err := expect(scanCh, &ad.Id, find, ad.Attributes); err != nil {
+		t.Error(err)
+	}
+	// Lost via both services, we should get a lost update.
+	p.UnregisterAd(adinfo)
+	adCancel()
+	<-adStopped
+	if err := expect(scanCh, &ad.Id, lose, ad.Attributes); err != nil {
+		t.Error(err)
+	}
+	// Advertise via mock plugin, we should get a found update.
+	p.RegisterAd(adinfo)
+	if err := expect(scanCh, &ad.Id, find, ad.Attributes); err != nil {
+		t.Error(err)
+	}
+	// Advertise via global discovery with a newer changed ad, we should get a lost ad
+	// update and found ad update.
+	adCtx, adCancel = context.WithCancel(ctx)
+	adStopped, err = gdis.Advertise(adCtx, newad, nil)
+	if err != nil {
+		t.Error(err)
+	}
+	if err := expect(scanCh, &newad.Id, both, newad.Attributes); err != nil {
+		t.Error(err)
+	}
+	// If we advertise an ad from neighborhood discovery that is identical to the
+	// ad found from global discovery, but has attachments, we should prefer it.
+	p.RegisterAd(newadattachinfo)
+	if err := expect(scanCh, &newad.Id, both, newad.Attributes); err != nil {
+		t.Error(err)
+	}
+	adCancel()
+	<-adStopped
+}
+
 func TestListenForInvites(t *testing.T) {
 	_, ctx, sName, rootp, cleanup := tu.SetupOrDieCustom(
 		"o:app1:client1",
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 4b45d4c..20eabc5 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -163,7 +163,8 @@
 // that the syncer can pick from. In addition, the sync module responds to
 // incoming RPCs from remote sync modules and local clients.
 func New(ctx *context.T, sv interfaces.Service, blobStEngine, blobRootDir string, cl *vclock.VClock, publishInNh bool) (*syncService, error) {
-	discovery, err := syncdis.NewDiscovery(v23.WithListenSpec(ctx, rpc.ListenSpec{}))
+	// TODO(suharshs): Enable global discovery.
+	discovery, err := syncdis.NewDiscovery(v23.WithListenSpec(ctx, rpc.ListenSpec{}), "", 0)
 	if err != nil {
 		return nil, err
 	}