syncbase/discovery: Introduce global discovery.
The syncbase and invites code doesn't enable global discovery yet.
We can add it when desired (or now if its desired immediately).
Change-Id: I3925edbdc63ea3acccfb7817dc4b020b0faeac00
diff --git a/services/syncbase/discovery/discovery.go b/services/syncbase/discovery/discovery.go
index 2688d27..fa0d09c 100644
--- a/services/syncbase/discovery/discovery.go
+++ b/services/syncbase/discovery/discovery.go
@@ -9,6 +9,7 @@
"fmt"
"sort"
"strings"
+ "time"
"v.io/v23"
"v.io/v23/context"
@@ -16,10 +17,16 @@
"v.io/v23/security"
wire "v.io/v23/services/syncbase"
"v.io/x/lib/nsync"
+ "v.io/x/ref/lib/discovery/global"
"v.io/x/ref/services/syncbase/server/interfaces"
)
-const visibilityKey = "vis"
+const (
+ visibilityKey = "vis"
+
+ nhDiscoveryKey = iota
+ globalDiscoveryKey
+)
// Discovery implements v.io/v23/discovery.T for syncbase based
// applications.
@@ -27,25 +34,46 @@
// point we should just replace the result of v23.NewDiscovery
// with this.
type Discovery struct {
- nhDiscovery discovery.T
- // TODO(mattr): Add global discovery.
+ nhDiscovery discovery.T
+ globalDiscovery discovery.T
}
// NewDiscovery creates a new syncbase discovery object.
-func NewDiscovery(ctx *context.T) (discovery.T, error) {
- nhDiscovery, err := v23.NewDiscovery(ctx)
- if err != nil {
+// globalDiscoveryPath is the path in the namespace where global disovery
+// advertisements will be mounted.
+// If globalDiscoveryPath is empty, no global discovery service will be created.
+// globalScanInterval is the interval at which global discovery will be refreshed.
+// If globalScanInterval is 0, the defaultScanInterval of global discovery will
+// be used.
+func NewDiscovery(ctx *context.T, globalDiscoveryPath string, globalScanInterval time.Duration) (discovery.T, error) {
+ d := &Discovery{}
+ var err error
+ if d.nhDiscovery, err = v23.NewDiscovery(ctx); err != nil {
return nil, err
}
- return &Discovery{nhDiscovery: nhDiscovery}, nil
+ if globalDiscoveryPath != "" {
+ if d.globalDiscovery, err = global.NewWithTTL(ctx, globalDiscoveryPath, 0, globalScanInterval); err != nil {
+ return nil, err
+ }
+ }
+ return d, nil
}
// Scan implements v.io/v23/discovery/T.Scan.
func (d *Discovery) Scan(ctx *context.T, query string) (<-chan discovery.Update, error) {
- nhUpdates, err := d.nhDiscovery.Scan(ctx, query)
+ nhCtx, nhCancel := context.WithCancel(ctx)
+ nhUpdates, err := d.nhDiscovery.Scan(nhCtx, query)
if err != nil {
+ nhCancel()
return nil, err
}
+ var globalUpdates <-chan discovery.Update
+ if d.globalDiscovery != nil {
+ if globalUpdates, err = d.globalDiscovery.Scan(ctx, query); err != nil {
+ nhCancel()
+ return nil, err
+ }
+ }
// Currently setting visibility on the neighborhood discovery
// service turns IBE encryption on. We currently don't have the
@@ -57,27 +85,72 @@
updates := make(chan discovery.Update)
go func() {
defer close(updates)
+ seen := make(map[discovery.AdId]*updateRef)
for {
var u discovery.Update
+ var src uint // key of the source discovery service where the update came from
select {
case <-ctx.Done():
return
case u = <-nhUpdates:
+ src = nhDiscoveryKey
+ case u = <-globalUpdates:
+ src = globalDiscoveryKey
}
- if u == nil {
- continue
- }
- patterns := splitPatterns(u.Attribute(visibilityKey))
- if len(patterns) > 0 && !matchesPatterns(ctx, patterns) {
- continue
- }
- updates <- update{u}
+ d.handleUpdate(ctx, u, src, seen, updates)
}
}()
return updates, nil
}
+func (d *Discovery) handleUpdate(ctx *context.T, u discovery.Update, src uint, seen map[discovery.AdId]*updateRef, updates chan discovery.Update) {
+ if u == nil {
+ return
+ }
+ patterns := splitPatterns(u.Attribute(visibilityKey))
+ if len(patterns) > 0 && !matchesPatterns(ctx, patterns) {
+ return
+ }
+
+ id := u.Id()
+ prev := seen[id]
+ if u.IsLost() {
+ // Only send the lost noitification if a found event was previously seen,
+ // and all discovery services that found it have lost it.
+ if prev == nil || !prev.unset(src) {
+ return
+ }
+ delete(seen, id)
+ updates <- update{Update: u, lost: true}
+ return
+ }
+
+ if prev == nil {
+ // Always send updates for updates that we have never seen before.
+ ref := &updateRef{update: u}
+ ref.set(src)
+ seen[id] = ref
+ updates <- update{Update: u}
+ return
+ }
+
+ if differ := updatesDiffer(prev.update, u); (differ && u.Timestamp().After(prev.update.Timestamp())) ||
+ (!differ && src == nhDiscoveryKey && len(u.Advertisement().Attachments) > 0) {
+ // If the updates differ and the newly found update has a later time than
+ // previously found one, lose prev and find new.
+ // Or, if the update doesn't differ, but is from neighborhood discovery, it
+ // could have more information since we don't yet encode attachements in
+ // global discovery.
+ updates <- update{Update: prev.update, lost: true}
+ ref := &updateRef{update: u}
+ ref.set(src)
+ seen[id] = ref
+ updates <- update{Update: u}
+ return
+ }
+}
+
// Advertise implements v.io/v23/discovery/T.Advertise.
func (d *Discovery) Advertise(ctx *context.T, ad *discovery.Advertisement, visibility []security.BlessingPattern) (<-chan struct{}, error) {
// Currently setting visibility on the neighborhood discovery
@@ -96,9 +169,66 @@
patterns := joinPatterns(visibility)
adCopy.Attributes[visibilityKey] = patterns
}
- ch, err := d.nhDiscovery.Advertise(ctx, &adCopy, nil)
+
+ stopped := make(chan struct{})
+ nhCtx, nhCancel := context.WithCancel(ctx)
+ nhStopped, err := d.nhDiscovery.Advertise(nhCtx, &adCopy, nil)
+ if err != nil {
+ nhCancel()
+ return nil, err
+ }
+ var globalStopped <-chan struct{}
+ if d.globalDiscovery != nil {
+ if globalStopped, err = d.globalDiscovery.Advertise(ctx, &adCopy, nil); err != nil {
+ nhCancel()
+ <-nhStopped
+ return nil, err
+ }
+ }
+ go func() {
+ <-nhStopped
+ if d.globalDiscovery != nil {
+ <-globalStopped
+ }
+ close(stopped)
+ }()
ad.Id = adCopy.Id
- return ch, err
+ return stopped, nil
+}
+
+func updatesDiffer(a, b discovery.Update) bool {
+ if !sortedStringsEqual(a.Addresses(), b.Addresses()) {
+ return true
+ }
+ if !mapsEqual(a.Advertisement().Attributes, b.Advertisement().Attributes) {
+ return true
+ }
+ return false
+}
+
+func mapsEqual(a, b map[string]string) bool {
+ if len(a) != len(b) {
+ return false
+ }
+ for ka, va := range a {
+ if vb, ok := b[ka]; !ok || va != vb {
+ return false
+ }
+ }
+ return true
+}
+
+func sortedStringsEqual(a, b []string) bool {
+ // We want to make a nil and an empty slices equal to avoid unnecessary inequality by that.
+ if len(a) != len(b) {
+ return false
+ }
+ for i, v := range a {
+ if v != b[i] {
+ return false
+ }
+ }
+ return true
}
func matchesPatterns(ctx *context.T, patterns []security.BlessingPattern) bool {
@@ -115,11 +245,40 @@
return false
}
-// update wraps the discovery.Update to remove the visibility attribute which we add.
+type updateRef struct {
+ update discovery.Update
+ nhRef bool
+ globalRef bool
+}
+
+func (r *updateRef) set(d uint) {
+ switch d {
+ case nhDiscoveryKey:
+ r.nhRef = true
+ case globalDiscoveryKey:
+ r.globalRef = true
+ }
+}
+
+func (r *updateRef) unset(d uint) bool {
+ switch d {
+ case nhDiscoveryKey:
+ r.nhRef = false
+ case globalDiscoveryKey:
+ r.globalRef = false
+ }
+ return !r.nhRef && !r.globalRef
+}
+
+// update wraps the discovery.Update to remove the visibility attribute which we add
+// and allows us to mark the update as lost.
type update struct {
discovery.Update
+ lost bool
}
+func (u update) IsLost() bool { return u.lost }
+
func (u update) Attribute(name string) string {
if name == visibilityKey {
return ""
@@ -206,7 +365,8 @@
dbs: make(map[wire.Id]*inviteQueue),
cancel: cancel,
}
- d, err := NewDiscovery(ctx)
+ // TODO(suharshs): Add globalDiscoveryPath.
+ d, err := NewDiscovery(ctx, "", 0)
if err != nil {
scan.cancel()
return nil, err
diff --git a/services/syncbase/discovery/discovery_test.go b/services/syncbase/discovery/discovery_test.go
index 85bbb3a..c33614f 100644
--- a/services/syncbase/discovery/discovery_test.go
+++ b/services/syncbase/discovery/discovery_test.go
@@ -18,9 +18,14 @@
wire "v.io/v23/services/syncbase"
"v.io/v23/syncbase"
"v.io/v23/verror"
+ idiscovery "v.io/x/ref/lib/discovery"
+ fdiscovery "v.io/x/ref/lib/discovery/factory"
+ "v.io/x/ref/lib/discovery/global"
+ "v.io/x/ref/lib/discovery/plugins/mock"
_ "v.io/x/ref/runtime/factories/roaming"
syncdis "v.io/x/ref/services/syncbase/discovery"
tu "v.io/x/ref/services/syncbase/testutil"
+ "v.io/x/ref/test"
"v.io/x/ref/test/testutil"
)
@@ -132,7 +137,7 @@
if err != nil {
return nil, err
}
- dis, err := syncdis.NewDiscovery(ctx)
+ dis, err := syncdis.NewDiscovery(ctx, "", 0)
if err != nil {
return nil, err
}
@@ -176,6 +181,100 @@
}
}
+func TestGlobalSyncbaseDiscovery(t *testing.T) {
+ ctx, shutdown := test.V23InitWithMounttable()
+ defer shutdown()
+ // Create syncbase discovery service with mock neighborhood discovery.
+ p := mock.New()
+ df, _ := idiscovery.NewFactory(ctx, p)
+ defer df.Shutdown()
+ fdiscovery.InjectFactory(df)
+ const globalDiscoveryPath = "syncdis"
+ gdis, err := global.New(ctx, globalDiscoveryPath)
+ if err != nil {
+ t.Fatal(err)
+ }
+ dis, err := syncdis.NewDiscovery(ctx, globalDiscoveryPath, 100*time.Millisecond)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // Start the syncbase discovery scan.
+ scanCh, err := dis.Scan(ctx, ``)
+ if err != nil {
+ t.Fatal(err)
+ }
+ // The ids of all the ads match.
+ ad := &discovery.Advertisement{
+ Id: discovery.AdId{1, 2, 3},
+ InterfaceName: "v.io/a",
+ Addresses: []string{"/h1:123/x", "/h2:123/y"},
+ Attributes: discovery.Attributes{"a": "v"},
+ }
+ newad := &discovery.Advertisement{
+ Id: discovery.AdId{1, 2, 3},
+ InterfaceName: "v.io/a",
+ Addresses: []string{"/h1:123/x", "/h3:123/z"},
+ Attributes: discovery.Attributes{"a": "v"},
+ }
+ newadattach := &discovery.Advertisement{
+ Id: discovery.AdId{1, 2, 3},
+ InterfaceName: "v.io/a",
+ Addresses: []string{"/h1:123/x", "/h3:123/z"},
+ Attributes: discovery.Attributes{"a": "v"},
+ Attachments: discovery.Attachments{"k": []byte("v")},
+ }
+ adinfo := &idiscovery.AdInfo{
+ Ad: *ad,
+ Hash: idiscovery.AdHash{1, 2, 3},
+ TimestampNs: time.Now().UnixNano(),
+ }
+ newadattachinfo := &idiscovery.AdInfo{
+ Ad: *newadattach,
+ Hash: idiscovery.AdHash{3, 2, 1},
+ TimestampNs: time.Now().UnixNano(),
+ }
+ // Advertise ad via both services, we should get a found update.
+ adCtx, adCancel := context.WithCancel(ctx)
+ p.RegisterAd(adinfo)
+ adStopped, err := gdis.Advertise(adCtx, ad, nil)
+ if err != nil {
+ t.Error(err)
+ }
+ if err := expect(scanCh, &ad.Id, find, ad.Attributes); err != nil {
+ t.Error(err)
+ }
+ // Lost via both services, we should get a lost update.
+ p.UnregisterAd(adinfo)
+ adCancel()
+ <-adStopped
+ if err := expect(scanCh, &ad.Id, lose, ad.Attributes); err != nil {
+ t.Error(err)
+ }
+ // Advertise via mock plugin, we should get a found update.
+ p.RegisterAd(adinfo)
+ if err := expect(scanCh, &ad.Id, find, ad.Attributes); err != nil {
+ t.Error(err)
+ }
+ // Advertise via global discovery with a newer changed ad, we should get a lost ad
+ // update and found ad update.
+ adCtx, adCancel = context.WithCancel(ctx)
+ adStopped, err = gdis.Advertise(adCtx, newad, nil)
+ if err != nil {
+ t.Error(err)
+ }
+ if err := expect(scanCh, &newad.Id, both, newad.Attributes); err != nil {
+ t.Error(err)
+ }
+ // If we advertise an ad from neighborhood discovery that is identical to the
+ // ad found from global discovery, but has attachments, we should prefer it.
+ p.RegisterAd(newadattachinfo)
+ if err := expect(scanCh, &newad.Id, both, newad.Attributes); err != nil {
+ t.Error(err)
+ }
+ adCancel()
+ <-adStopped
+}
+
func TestListenForInvites(t *testing.T) {
_, ctx, sName, rootp, cleanup := tu.SetupOrDieCustom(
"o:app1:client1",
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 4b45d4c..20eabc5 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -163,7 +163,8 @@
// that the syncer can pick from. In addition, the sync module responds to
// incoming RPCs from remote sync modules and local clients.
func New(ctx *context.T, sv interfaces.Service, blobStEngine, blobRootDir string, cl *vclock.VClock, publishInNh bool) (*syncService, error) {
- discovery, err := syncdis.NewDiscovery(v23.WithListenSpec(ctx, rpc.ListenSpec{}))
+ // TODO(suharshs): Enable global discovery.
+ discovery, err := syncdis.NewDiscovery(v23.WithListenSpec(ctx, rpc.ListenSpec{}), "", 0)
if err != nil {
return nil, err
}