discovery: support large advertisements
Support large advertisements by
- discovery service brings up its own directory server so that
we can fetch large advertisements through vanadium RPC without
being restricted by the underlying protocol.
- if the advertisement is too large for each plugin to advertise,
it will try to send only mandatary fields like interface name,
hash, and the directory server address.
- during scanning, if the discovered advertisement does not have
an enough information to query against it, it will be fetched
from the directory server directly.
- attachments are optional. so directory server will not send
if they are too big. In this case, each large attachment will
be fetched through additional RPCs when it is requested.
* plugins will be updated in the following CL.
Change-Id: If9d615ecb54a8cc3da2ccbd3610e67090d2a7340
diff --git a/lib/discovery/advertise.go b/lib/discovery/advertise.go
index ce923e2..ebf3d5f 100644
--- a/lib/discovery/advertise.go
+++ b/lib/discovery/advertise.go
@@ -5,6 +5,8 @@
package discovery
import (
+ "sync"
+
"v.io/v23/context"
"v.io/v23/discovery"
"v.io/v23/security"
@@ -25,47 +27,130 @@
if err := encrypt(ctx, adinfo, visibility); err != nil {
return nil, err
}
- HashAd(adinfo)
+ hashAd(adinfo)
ctx, cancel, err := d.addTask(ctx)
if err != nil {
return nil, err
}
- if !d.addAd(session, adinfo) {
+ id := adinfo.Ad.Id
+ if !d.addAd(id, session) {
cancel()
d.removeTask(ctx)
- return nil, NewErrAlreadyBeingAdvertised(ctx, adinfo.Ad.Id)
+ return nil, NewErrAlreadyBeingAdvertised(ctx, id)
}
+ subtask := &adSubtask{parent: ctx}
+ d.adMu.Lock()
+ d.adSubtasks[id] = subtask
+ d.adMu.Unlock()
+
done := make(chan struct{})
- barrier := NewBarrier(func() {
- d.removeAd(adinfo)
+ stop := func() {
+ d.stopAdvertising(id)
+ d.dirServer.unpublish(id)
+ d.removeAd(id)
d.removeTask(ctx)
close(done)
- })
- for _, plugin := range d.plugins {
- if err := plugin.Advertise(ctx, adinfo, barrier.Add()); err != nil {
- cancel()
- return nil, err
- }
}
+
+ // Lock the subtask to prevent any update from directory server endpoint changes while
+ // the advertising is being started to not lose any endpoint change during starting.
+ subtask.mu.Lock()
+ d.dirServer.publish(adinfo)
+ subtask.stop, err = d.startAdvertising(ctx, adinfo)
+ subtask.mu.Unlock()
+ if err != nil {
+ cancel()
+ stop()
+ return nil, err
+ }
+ d.adStopTrigger.Add(stop, ctx.Done())
return done, nil
}
-func (d *idiscovery) addAd(session sessionId, adinfo *AdInfo) bool {
- d.mu.Lock()
- if _, exist := d.ads[adinfo.Ad.Id]; exist {
- d.mu.Unlock()
+func (d *idiscovery) addAd(id discovery.AdId, session sessionId) bool {
+ d.adMu.Lock()
+ if _, exist := d.adSessions[id]; exist {
+ d.adMu.Unlock()
return false
}
- d.ads[adinfo.Ad.Id] = session
- d.mu.Unlock()
+ d.adSessions[id] = session
+ d.adMu.Unlock()
return true
}
-func (d *idiscovery) removeAd(adinfo *AdInfo) {
- d.mu.Lock()
- delete(d.ads, adinfo.Ad.Id)
- d.mu.Unlock()
+func (d *idiscovery) removeAd(id discovery.AdId) {
+ d.adMu.Lock()
+ delete(d.adSessions, id)
+ d.adMu.Unlock()
+}
+
+func (d *idiscovery) getAdSession(id discovery.AdId) sessionId {
+ d.adMu.Lock()
+ session := d.adSessions[id]
+ d.adMu.Unlock()
+ return session
+}
+
+func (d *idiscovery) startAdvertising(ctx *context.T, adinfo *AdInfo) (func(), error) {
+ ctx, cancel := context.WithCancel(ctx)
+ var wg sync.WaitGroup
+ for _, plugin := range d.plugins {
+ wg.Add(1)
+ if err := plugin.Advertise(ctx, adinfo, wg.Done); err != nil {
+ return nil, err
+ }
+ }
+
+ stop := func() {
+ cancel()
+ wg.Wait()
+ }
+ return stop, nil
+}
+
+func (d *idiscovery) stopAdvertising(id discovery.AdId) {
+ d.adMu.Lock()
+ subtask := d.adSubtasks[id]
+ delete(d.adSubtasks, id)
+ d.adMu.Unlock()
+ if subtask == nil {
+ return
+ }
+
+ subtask.mu.Lock()
+ if subtask.stop != nil {
+ subtask.stop()
+ subtask.stop = nil
+ }
+ subtask.mu.Unlock()
+}
+
+func (d *idiscovery) updateAdvertising(adinfo *AdInfo) {
+ d.adMu.Lock()
+ subtask := d.adSubtasks[adinfo.Ad.Id]
+ if subtask == nil {
+ d.adMu.Unlock()
+ return
+ }
+ d.adMu.Unlock()
+
+ subtask.mu.Lock()
+ defer subtask.mu.Unlock()
+
+ if subtask.stop == nil {
+ return
+ }
+ subtask.stop()
+
+ ctx := subtask.parent
+
+ var err error
+ subtask.stop, err = d.startAdvertising(ctx, adinfo)
+ if err != nil {
+ ctx.Error(err)
+ d.cancelTask(ctx)
+ }
}
diff --git a/lib/discovery/advertise_server.go b/lib/discovery/advertise_server.go
index 790219f..098955e 100644
--- a/lib/discovery/advertise_server.go
+++ b/lib/discovery/advertise_server.go
@@ -33,39 +33,48 @@
}
}
- eps, updateCh := getEndpoints(server)
- stop, err := advertiseServer(ctx, d, ad, eps, suffix, visibility)
+ status := server.Status()
+ curAddrs := sortedNames(status.Endpoints)
+ stop, err := advertiseServer(ctx, d, ad, curAddrs, suffix, visibility)
if err != nil {
return nil, err
}
done := make(chan struct{})
go func() {
+ defer close(done)
+
for {
select {
- case <-updateCh:
- if stop != nil {
- stop() // Stop the previous advertisement.
+ case <-status.Dirty:
+ status = server.Status()
+ if status.State != rpc.ServerActive {
+ return
}
- eps, updateCh = getEndpoints(server)
- stop, err = advertiseServer(ctx, d, ad, eps, suffix, visibility)
+ newAddrs := sortedNames(status.Endpoints)
+ if sortedStringsEqual(curAddrs, newAddrs) {
+ continue
+ }
+
+ stop() // Stop the previous advertisement.
+ stop, err = advertiseServer(ctx, d, ad, newAddrs, suffix, visibility)
if err != nil {
ctx.Error(err)
+ return
}
+ curAddrs = newAddrs
case <-ctx.Done():
- close(done)
return
}
}
}()
-
return done, nil
}
-func advertiseServer(ctx *context.T, d discovery.T, ad *discovery.Advertisement, eps []naming.Endpoint, suffix string, visibility []security.BlessingPattern) (func(), error) {
- ad.Addresses = make([]string, len(eps))
- for i, ep := range eps {
- ad.Addresses[i] = naming.JoinAddressName(ep.Name(), suffix)
+func advertiseServer(ctx *context.T, d discovery.T, ad *discovery.Advertisement, addrs []string, suffix string, visibility []security.BlessingPattern) (func(), error) {
+ ad.Addresses = make([]string, len(addrs))
+ for i, addr := range addrs {
+ ad.Addresses[i] = naming.JoinAddressName(addr, suffix)
}
ctx, cancel := context.WithCancel(ctx)
done, err := d.Advertise(ctx, ad, visibility)
@@ -79,8 +88,3 @@
}
return stop, nil
}
-
-func getEndpoints(server rpc.Server) ([]naming.Endpoint, <-chan struct{}) {
- status := server.Status()
- return status.Endpoints, status.Dirty
-}
diff --git a/lib/discovery/advertise_server_test.go b/lib/discovery/advertise_server_test.go
index 9ea0a34..6069227 100644
--- a/lib/discovery/advertise_server_test.go
+++ b/lib/discovery/advertise_server_test.go
@@ -5,13 +5,11 @@
package discovery_test
import (
- "sync"
"testing"
"v.io/v23"
"v.io/v23/discovery"
"v.io/v23/naming"
- "v.io/v23/rpc"
idiscovery "v.io/x/ref/lib/discovery"
fdiscovery "v.io/x/ref/lib/discovery/factory"
@@ -21,47 +19,6 @@
"v.io/x/ref/test"
)
-type mockServer struct {
- mu sync.Mutex
- eps []naming.Endpoint
- dirty chan struct{}
-}
-
-func (s *mockServer) AddName(string) error { return nil }
-func (s *mockServer) RemoveName(string) {}
-func (s *mockServer) Closed() <-chan struct{} { return nil }
-func (s *mockServer) Status() rpc.ServerStatus {
- defer s.mu.Unlock()
- s.mu.Lock()
- return rpc.ServerStatus{
- Endpoints: s.eps,
- Dirty: s.dirty,
- }
-}
-
-func (s *mockServer) updateNetwork(eps []naming.Endpoint) {
- defer s.mu.Unlock()
- s.mu.Lock()
- s.eps = eps
- close(s.dirty)
- s.dirty = make(chan struct{})
-}
-
-func newMockServer(eps []naming.Endpoint) *mockServer {
- return &mockServer{
- eps: eps,
- dirty: make(chan struct{}),
- }
-}
-
-func newEndpoints(addrs ...string) []naming.Endpoint {
- eps := make([]naming.Endpoint, len(addrs))
- for i, a := range addrs {
- eps[i], _ = v23.NewEndpoint(a)
- }
- return eps
-}
-
func withNewAddresses(ad *discovery.Advertisement, eps []naming.Endpoint, suffix string) discovery.Advertisement {
newAd := *ad
newAd.Addresses = make([]string, len(eps))
@@ -80,15 +37,15 @@
const suffix = "test"
- eps := newEndpoints("addr1:123")
- mock := newMockServer(eps)
+ eps := testutil.ToEndpoints("addr1:123")
+ mockServer := testutil.NewMockServer(eps)
ad := discovery.Advertisement{
InterfaceName: "v.io/v23/a",
Attributes: map[string]string{"a1": "v1"},
}
- _, err := idiscovery.AdvertiseServer(ctx, nil, mock, suffix, &ad, nil)
+ _, err := idiscovery.AdvertiseServer(ctx, nil, mockServer, suffix, &ad, nil)
if err != nil {
t.Fatal(err)
}
@@ -101,12 +58,12 @@
}
tests := [][]naming.Endpoint{
- newEndpoints("addr2:123", "addr3:456"),
- newEndpoints("addr4:123"),
- newEndpoints("addr5:123", "addr6:456"),
+ testutil.ToEndpoints("addr2:123", "addr3:456"),
+ testutil.ToEndpoints("addr4:123"),
+ testutil.ToEndpoints("addr5:123", "addr6:456"),
}
for _, eps := range tests {
- mock.updateNetwork(eps)
+ mockServer.UpdateNetwork(eps)
newAd = withNewAddresses(&ad, eps, suffix)
if err := testutil.ScanAndMatch(ctx, d, "", newAd); err != nil {
diff --git a/lib/discovery/barrier.go b/lib/discovery/barrier.go
index 3b99e0f..814b11b 100644
--- a/lib/discovery/barrier.go
+++ b/lib/discovery/barrier.go
@@ -17,13 +17,13 @@
}
// Add increments the barrier. Each closure returned by Add() should eventually
-// be run once, otherwise 'done' will never be run. It returns nil if the done
-// closure has been already called.
+// be run once, otherwise 'done' will never be run. It returns no-op function
+// if the done closure has been already called.
func (b *Barrier) Add() func() {
b.mu.Lock()
if b.done == nil {
b.mu.Unlock()
- return nil
+ return func() {}
}
b.n++
b.mu.Unlock()
diff --git a/lib/discovery/barrier_test.go b/lib/discovery/barrier_test.go
index d229129..d3b3b73 100644
--- a/lib/discovery/barrier_test.go
+++ b/lib/discovery/barrier_test.go
@@ -2,11 +2,13 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package discovery
+package discovery_test
import (
"testing"
"time"
+
+ "v.io/x/ref/lib/discovery"
)
func TestBarrier(t *testing.T) {
@@ -16,7 +18,7 @@
done := func() { ch <- struct{}{} }
// A new barrier; Shouldn't call done.
- br := NewBarrier(done)
+ br := discovery.NewBarrier(done)
if waitDone(ch, timeout) {
t.Error("unexpected done call")
}
@@ -29,12 +31,13 @@
}
// Try to add a sub-closure, but done has been already called.
cb = br.Add()
- if cb != nil {
- t.Error("expect nil closure, but got non-nil")
+ cb()
+ if waitDone(ch, timeout) {
+ t.Error("unexpected done call")
}
// Make sure the barrier works with multiple sub-closures.
- br = NewBarrier(done)
+ br = discovery.NewBarrier(done)
cb1 := br.Add()
cb2 := br.Add()
cb1()
diff --git a/lib/discovery/directory.go b/lib/discovery/directory.go
new file mode 100644
index 0000000..399e36e
--- /dev/null
+++ b/lib/discovery/directory.go
@@ -0,0 +1,176 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package discovery
+
+import (
+ "sync"
+
+ "v.io/v23"
+ "v.io/v23/context"
+ "v.io/v23/discovery"
+ "v.io/v23/naming"
+ "v.io/v23/options"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+)
+
+const maxTotalAttachmentSize = 4096
+
+type dirServer struct {
+ d *idiscovery
+
+ mu sync.Mutex
+ adMap map[discovery.AdId]*AdInfo // GUARDED_BY(mu)
+ dirAddrs []string // GUARDED_BY(mu)
+
+ cancel func()
+}
+
+func (s *dirServer) Lookup(ctx *context.T, _ rpc.ServerCall, id discovery.AdId) (AdInfo, error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ adinfo := s.adMap[id]
+ if adinfo == nil {
+ return AdInfo{}, NewErrAdvertisementNotFound(ctx, id)
+ }
+
+ copied := *adinfo
+ copied.Ad.Attachments = make(discovery.Attachments)
+
+ sizeSum := 0
+ for k, v := range adinfo.Ad.Attachments {
+ if sizeSum+len(v) > maxTotalAttachmentSize {
+ copied.Status = AdPartiallyReady
+ continue
+ }
+ sizeSum += len(v)
+ copied.Ad.Attachments[k] = v
+ }
+ return copied, nil
+}
+
+func (s *dirServer) GetAttachment(ctx *context.T, _ rpc.ServerCall, id discovery.AdId, name string) ([]byte, error) {
+ s.mu.Lock()
+ defer s.mu.Unlock()
+
+ adinfo := s.adMap[id]
+ if adinfo == nil {
+ return nil, NewErrAdvertisementNotFound(ctx, id)
+ }
+ return adinfo.Ad.Attachments[name], nil
+}
+
+func (s *dirServer) publish(adinfo *AdInfo) {
+ s.mu.Lock()
+ adinfo.DirAddrs = s.dirAddrs
+ s.adMap[adinfo.Ad.Id] = adinfo
+ s.mu.Unlock()
+}
+
+func (s *dirServer) unpublish(id discovery.AdId) {
+ s.mu.Lock()
+ delete(s.adMap, id)
+ s.mu.Unlock()
+}
+
+func (s *dirServer) shutdown() {
+ s.cancel()
+}
+
+func (s *dirServer) updateDirAddrs(addrs []string) {
+ s.mu.Lock()
+ s.dirAddrs = addrs
+ newAdinfos := make([]*AdInfo, 0, len(s.adMap))
+ for id, adinfo := range s.adMap {
+ newAdinfo := *adinfo
+ newAdinfo.DirAddrs = s.dirAddrs
+ s.adMap[id] = &newAdinfo
+ newAdinfos = append(newAdinfos, &newAdinfo)
+ }
+ s.mu.Unlock()
+
+ for _, adinfo := range newAdinfos {
+ s.d.updateAdvertising(adinfo)
+ }
+}
+
+func newDirServer(ctx *context.T, d *idiscovery, opts ...rpc.ServerOpt) (*dirServer, error) {
+ ctx, cancel := context.WithCancel(ctx)
+ s := &dirServer{d: d, adMap: make(map[discovery.AdId]*AdInfo), cancel: cancel}
+ ctx, server, err := v23.WithNewServer(ctx, "", DirectoryServer(s), security.AllowEveryone(), opts...)
+ if err != nil {
+ return nil, err
+ }
+
+ status := server.Status()
+ curAddrs := sortedNames(status.Endpoints)
+ s.updateDirAddrs(curAddrs)
+
+ go func() {
+ for {
+ select {
+ case <-status.Dirty:
+ status = server.Status()
+ if status.State != rpc.ServerActive {
+ return
+ }
+ newAddrs := sortedNames(status.Endpoints)
+ if sortedStringsEqual(curAddrs, newAddrs) {
+ continue
+ }
+
+ s.updateDirAddrs(newAddrs)
+ curAddrs = newAddrs
+ case <-ctx.Done():
+ return
+ }
+ }
+ }()
+ return s, nil
+}
+
+type dirClient struct {
+ dirAddrs []string
+}
+
+func (c *dirClient) Lookup(ctx *context.T, id discovery.AdId) (*AdInfo, error) {
+ resolved, err := c.resolveDirAddrs(ctx)
+ if err != nil {
+ return nil, err
+ }
+ adinfo, err := DirectoryClient("").Lookup(ctx, id, resolved)
+ if err != nil {
+ return nil, err
+ }
+ return &adinfo, nil
+}
+
+func (c *dirClient) GetAttachment(ctx *context.T, id discovery.AdId, name string) ([]byte, error) {
+ resolved, err := c.resolveDirAddrs(ctx)
+ if err != nil {
+ return nil, err
+ }
+ return DirectoryClient("").GetAttachment(ctx, id, name, resolved)
+}
+
+func (c *dirClient) resolveDirAddrs(ctx *context.T) (options.Preresolved, error) {
+ // We pre-resolve the addresses so that we can utilize the RPC system's handling
+ // of connecting to the same server by trying multiple endpoints.
+ ns := v23.GetNamespace(ctx)
+ resolved := naming.MountEntry{IsLeaf: true}
+ for _, addr := range c.dirAddrs {
+ entry, err := ns.Resolve(ctx, addr)
+ if err != nil {
+ return options.Preresolved{}, err
+ }
+ resolved.Servers = append(resolved.Servers, entry.Servers...)
+ }
+ return options.Preresolved{&resolved}, nil
+}
+
+func newDirClient(dirAddrs []string) *dirClient {
+ return &dirClient{dirAddrs}
+}
diff --git a/lib/discovery/directory.vdl b/lib/discovery/directory.vdl
index 64059fa..0cc9b57 100644
--- a/lib/discovery/directory.vdl
+++ b/lib/discovery/directory.vdl
@@ -12,5 +12,13 @@
// Directory is the interface for advertisement directory service.
type Directory interface {
// Lookup returns the advertisement of the given service instance.
+ //
+ // The returned advertisement may not include all attachments.
Lookup(id discovery.AdId) (AdInfo | error) {access.Read}
+
+ // GetAttachment returns the named attachment. Accessing non-existent attachment
+ // is not an error - nil data is returned if not found.
+ //
+ // TODO(jhahn): Consider to return an error if not found.
+ GetAttachment(id discovery.AdId, name string) ([]byte | error) {access.Read}
}
diff --git a/lib/discovery/discovery.go b/lib/discovery/discovery.go
index 13a20b9..c9f4b2e 100644
--- a/lib/discovery/discovery.go
+++ b/lib/discovery/discovery.go
@@ -11,8 +11,6 @@
"v.io/v23/discovery"
)
-type sessionId uint64
-
type idiscovery struct {
plugins []Plugin
@@ -21,7 +19,21 @@
tasks map[*context.T]func() // GUARDED_BY(mu)
wg sync.WaitGroup
- ads map[discovery.AdId]sessionId // GUARDED_BY(mu)
+ adMu sync.Mutex
+ adSessions map[discovery.AdId]sessionId // GUARDED_BY(adMu)
+ adSubtasks map[discovery.AdId]*adSubtask // GUARDED_BY(adMu)
+ adStopTrigger *Trigger
+
+ dirServer *dirServer
+}
+
+type sessionId uint64
+
+type adSubtask struct {
+ parent *context.T
+
+ mu sync.Mutex
+ stop func() // GUARDED_BY(mu)
}
func (d *idiscovery) shutdown() {
@@ -30,6 +42,7 @@
d.mu.Unlock()
return
}
+ d.dirServer.shutdown()
for _, cancel := range d.tasks {
cancel()
}
@@ -64,15 +77,33 @@
d.mu.Unlock()
}
+func (d *idiscovery) cancelTask(ctx *context.T) {
+ d.mu.Lock()
+ cancel := d.tasks[ctx]
+ d.mu.Unlock()
+ if cancel != nil {
+ cancel()
+ }
+}
+
func newDiscovery(ctx *context.T, plugins []Plugin) (*idiscovery, error) {
if len(plugins) == 0 {
return nil, NewErrNoDiscoveryPlugin(ctx)
}
d := &idiscovery{
- plugins: make([]Plugin, len(plugins)),
- tasks: make(map[*context.T]func()),
- ads: make(map[discovery.AdId]sessionId),
+ plugins: make([]Plugin, len(plugins)),
+ tasks: make(map[*context.T]func()),
+ adSessions: make(map[discovery.AdId]sessionId),
+ adSubtasks: make(map[discovery.AdId]*adSubtask),
+ adStopTrigger: NewTrigger(),
}
copy(d.plugins, plugins)
+
+ // TODO(jhahn): Consider to start a directory server when it is required.
+ // For example, scan-only applications would not need it.
+ var err error
+ if d.dirServer, err = newDirServer(ctx, d); err != nil {
+ return nil, err
+ }
return d, nil
}
diff --git a/lib/discovery/discovery.vdl.go b/lib/discovery/discovery.vdl.go
index 0f314d6..e6a802a 100644
--- a/lib/discovery/discovery.vdl.go
+++ b/lib/discovery/discovery.vdl.go
@@ -25,6 +25,48 @@
//////////////////////////////////////////////////
// Type definitions
+type Uuid []byte
+
+func (Uuid) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/lib/discovery.Uuid"`
+}) {
+}
+
+func (m *Uuid) FillVDLTarget(t vdl.Target, tt *vdl.Type) error {
+ if err := t.FromBytes([]byte((*m)), tt); err != nil {
+ return err
+ }
+ return nil
+}
+
+func (m *Uuid) MakeVDLTarget() vdl.Target {
+ return &UuidTarget{Value: m}
+}
+
+type UuidTarget struct {
+ Value *Uuid
+ vdl.TargetBase
+}
+
+func (t *UuidTarget) FromBytes(src []byte, tt *vdl.Type) error {
+
+ if ttWant := vdl.TypeOf((*Uuid)(nil)); !vdl.Compatible(tt, ttWant) {
+ return fmt.Errorf("type %v incompatible with %v", tt, ttWant)
+ }
+ if len(src) == 0 {
+ *t.Value = nil
+ } else {
+ *t.Value = make([]byte, len(src))
+ copy(*t.Value, src)
+ }
+
+ return nil
+}
+func (t *UuidTarget) FromZero(tt *vdl.Type) error {
+ *t.Value = Uuid(nil)
+ return nil
+}
+
type EncryptionAlgorithm int32
func (EncryptionAlgorithm) __VDLReflect(struct {
@@ -125,45 +167,61 @@
return nil
}
-type Uuid []byte
+type AdStatus byte
-func (Uuid) __VDLReflect(struct {
- Name string `vdl:"v.io/x/ref/lib/discovery.Uuid"`
+func (AdStatus) __VDLReflect(struct {
+ Name string `vdl:"v.io/x/ref/lib/discovery.AdStatus"`
}) {
}
-func (m *Uuid) FillVDLTarget(t vdl.Target, tt *vdl.Type) error {
- if err := t.FromBytes([]byte((*m)), tt); err != nil {
+func (m *AdStatus) FillVDLTarget(t vdl.Target, tt *vdl.Type) error {
+ if err := t.FromUint(uint64((*m)), tt); err != nil {
return err
}
return nil
}
-func (m *Uuid) MakeVDLTarget() vdl.Target {
- return &UuidTarget{Value: m}
+func (m *AdStatus) MakeVDLTarget() vdl.Target {
+ return &AdStatusTarget{Value: m}
}
-type UuidTarget struct {
- Value *Uuid
+type AdStatusTarget struct {
+ Value *AdStatus
vdl.TargetBase
}
-func (t *UuidTarget) FromBytes(src []byte, tt *vdl.Type) error {
+func (t *AdStatusTarget) FromUint(src uint64, tt *vdl.Type) error {
- if ttWant := vdl.TypeOf((*Uuid)(nil)); !vdl.Compatible(tt, ttWant) {
- return fmt.Errorf("type %v incompatible with %v", tt, ttWant)
+ val, err := vdlconv.Uint64ToUint8(src)
+ if err != nil {
+ return err
}
- if len(src) == 0 {
- *t.Value = nil
- } else {
- *t.Value = make([]byte, len(src))
- copy(*t.Value, src)
- }
+ *t.Value = AdStatus(val)
return nil
}
-func (t *UuidTarget) FromZero(tt *vdl.Type) error {
- *t.Value = Uuid(nil)
+func (t *AdStatusTarget) FromInt(src int64, tt *vdl.Type) error {
+
+ val, err := vdlconv.Int64ToUint8(src)
+ if err != nil {
+ return err
+ }
+ *t.Value = AdStatus(val)
+
+ return nil
+}
+func (t *AdStatusTarget) FromFloat(src float64, tt *vdl.Type) error {
+
+ val, err := vdlconv.Float64ToUint8(src)
+ if err != nil {
+ return err
+ }
+ *t.Value = AdStatus(val)
+
+ return nil
+}
+func (t *AdStatusTarget) FromZero(tt *vdl.Type) error {
+ *t.Value = AdStatus(0)
return nil
}
@@ -214,11 +272,13 @@
// If the advertisement is encrypted, then the data required to
// decrypt it. The format of this data is a function of the algorithm.
EncryptionKeys []EncryptionKey
- // Hash of the current advertisement.
+ // Hash of the current advertisement. This does not include the fields below.
Hash AdHash
// The addresses (vanadium object names) that the advertisement directory service
// is served on. See directory.vdl.
DirAddrs []string
+ // Status of the current advertisement. Valid for scanned advertisements.
+ Status AdStatus
// TODO(jhahn): Add proximity.
// TODO(jhahn): Use proximity for Lost.
Lost bool
@@ -397,19 +457,20 @@
return err
}
}
- keyTarget28, fieldTarget29, err := fieldsTarget1.StartField("Lost")
+ keyTarget28, fieldTarget29, err := fieldsTarget1.StartField("Status")
if err != vdl.ErrFieldNoExist && err != nil {
return err
}
if err != vdl.ErrFieldNoExist {
- var30 := (m.Lost == false)
+ var30 := (m.Status == AdStatus(0))
if var30 {
if err := fieldTarget29.FromZero(tt.NonOptional().Field(5).Type); err != nil {
return err
}
} else {
- if err := fieldTarget29.FromBool(bool(m.Lost), tt.NonOptional().Field(5).Type); err != nil {
+
+ if err := m.Status.FillVDLTarget(fieldTarget29, tt.NonOptional().Field(5).Type); err != nil {
return err
}
}
@@ -417,6 +478,26 @@
return err
}
}
+ keyTarget31, fieldTarget32, err := fieldsTarget1.StartField("Lost")
+ if err != vdl.ErrFieldNoExist && err != nil {
+ return err
+ }
+ if err != vdl.ErrFieldNoExist {
+
+ var33 := (m.Lost == false)
+ if var33 {
+ if err := fieldTarget32.FromZero(tt.NonOptional().Field(6).Type); err != nil {
+ return err
+ }
+ } else {
+ if err := fieldTarget32.FromBool(bool(m.Lost), tt.NonOptional().Field(6).Type); err != nil {
+ return err
+ }
+ }
+ if err := fieldsTarget1.FinishField(keyTarget31, fieldTarget32); err != nil {
+ return err
+ }
+ }
if err := t.FinishFields(fieldsTarget1); err != nil {
return err
}
@@ -434,6 +515,7 @@
encryptionKeysTarget __VDLTarget1_list
hashTarget AdHashTarget
dirAddrsTarget vdl.StringSliceTarget
+ statusTarget AdStatusTarget
lostTarget vdl.BoolTarget
vdl.TargetBase
vdl.FieldsTargetBase
@@ -468,6 +550,10 @@
t.dirAddrsTarget.Value = &t.Value.DirAddrs
target, err := &t.dirAddrsTarget, error(nil)
return nil, target, err
+ case "Status":
+ t.statusTarget.Value = &t.Value.Status
+ target, err := &t.statusTarget, error(nil)
+ return nil, target, err
case "Lost":
t.lostTarget.Value = &t.Value.Lost
target, err := &t.lostTarget, error(nil)
@@ -531,10 +617,14 @@
const NoEncryption = EncryptionAlgorithm(0)
const TestEncryption = EncryptionAlgorithm(1)
const IbeEncryption = EncryptionAlgorithm(2)
+const AdReady = AdStatus(0) // All information is available
+const AdNotReady = AdStatus(1) // Not all information is available for querying against it
+const AdPartiallyReady = AdStatus(2) // All information except attachments is available
//////////////////////////////////////////////////
// Error definitions
var (
+ ErrAdvertisementNotFound = verror.Register("v.io/x/ref/lib/discovery.AdvertisementNotFound", verror.NoRetry, "{1:}{2:} advertisement not found: {3}")
ErrAlreadyBeingAdvertised = verror.Register("v.io/x/ref/lib/discovery.AlreadyBeingAdvertised", verror.NoRetry, "{1:}{2:} already being advertised: {3}")
ErrBadAdvertisement = verror.Register("v.io/x/ref/lib/discovery.BadAdvertisement", verror.NoRetry, "{1:}{2:} invalid advertisement: {3}")
ErrBadQuery = verror.Register("v.io/x/ref/lib/discovery.BadQuery", verror.NoRetry, "{1:}{2:} invalid query: {3}")
@@ -542,6 +632,11 @@
ErrNoDiscoveryPlugin = verror.Register("v.io/x/ref/lib/discovery.NoDiscoveryPlugin", verror.NoRetry, "{1:}{2:} no discovery plugin")
)
+// NewErrAdvertisementNotFound returns an error with the ErrAdvertisementNotFound ID.
+func NewErrAdvertisementNotFound(ctx *context.T, id discovery.AdId) error {
+ return verror.New(ErrAdvertisementNotFound, ctx, id)
+}
+
// NewErrAlreadyBeingAdvertised returns an error with the ErrAlreadyBeingAdvertised ID.
func NewErrAlreadyBeingAdvertised(ctx *context.T, id discovery.AdId) error {
return verror.New(ErrAlreadyBeingAdvertised, ctx, id)
@@ -576,7 +671,14 @@
// Directory is the interface for advertisement directory service.
type DirectoryClientMethods interface {
// Lookup returns the advertisement of the given service instance.
+ //
+ // The returned advertisement may not include all attachments.
Lookup(_ *context.T, id discovery.AdId, _ ...rpc.CallOpt) (AdInfo, error)
+ // GetAttachment returns the named attachment. Accessing non-existent attachment
+ // is not an error - nil data is returned if not found.
+ //
+ // TODO(jhahn): Consider to return an error if not found.
+ GetAttachment(_ *context.T, id discovery.AdId, name string, _ ...rpc.CallOpt) ([]byte, error)
}
// DirectoryClientStub adds universal methods to DirectoryClientMethods.
@@ -599,13 +701,25 @@
return
}
+func (c implDirectoryClientStub) GetAttachment(ctx *context.T, i0 discovery.AdId, i1 string, opts ...rpc.CallOpt) (o0 []byte, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "GetAttachment", []interface{}{i0, i1}, []interface{}{&o0}, opts...)
+ return
+}
+
// DirectoryServerMethods is the interface a server writer
// implements for Directory.
//
// Directory is the interface for advertisement directory service.
type DirectoryServerMethods interface {
// Lookup returns the advertisement of the given service instance.
+ //
+ // The returned advertisement may not include all attachments.
Lookup(_ *context.T, _ rpc.ServerCall, id discovery.AdId) (AdInfo, error)
+ // GetAttachment returns the named attachment. Accessing non-existent attachment
+ // is not an error - nil data is returned if not found.
+ //
+ // TODO(jhahn): Consider to return an error if not found.
+ GetAttachment(_ *context.T, _ rpc.ServerCall, id discovery.AdId, name string) ([]byte, error)
}
// DirectoryServerStubMethods is the server interface containing
@@ -647,6 +761,10 @@
return s.impl.Lookup(ctx, call, i0)
}
+func (s implDirectoryServerStub) GetAttachment(ctx *context.T, call rpc.ServerCall, i0 discovery.AdId, i1 string) ([]byte, error) {
+ return s.impl.GetAttachment(ctx, call, i0, i1)
+}
+
func (s implDirectoryServerStub) Globber() *rpc.GlobState {
return s.gs
}
@@ -666,7 +784,7 @@
Methods: []rpc.MethodDesc{
{
Name: "Lookup",
- Doc: "// Lookup returns the advertisement of the given service instance.",
+ Doc: "// Lookup returns the advertisement of the given service instance.\n//\n// The returned advertisement may not include all attachments.",
InArgs: []rpc.ArgDesc{
{"id", ``}, // discovery.AdId
},
@@ -675,6 +793,18 @@
},
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
},
+ {
+ Name: "GetAttachment",
+ Doc: "// GetAttachment returns the named attachment. Accessing non-existent attachment\n// is not an error - nil data is returned if not found.\n//\n// TODO(jhahn): Consider to return an error if not found.",
+ InArgs: []rpc.ArgDesc{
+ {"id", ``}, // discovery.AdId
+ {"name", ``}, // string
+ },
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // []byte
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+ },
},
}
@@ -700,13 +830,15 @@
__VDLInitCalled = true
// Register types.
+ vdl.Register((*Uuid)(nil))
vdl.Register((*EncryptionAlgorithm)(nil))
vdl.Register((*EncryptionKey)(nil))
- vdl.Register((*Uuid)(nil))
+ vdl.Register((*AdStatus)(nil))
vdl.Register((*AdHash)(nil))
vdl.Register((*AdInfo)(nil))
// Set error format strings.
+ i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAdvertisementNotFound.ID), "{1:}{2:} advertisement not found: {3}")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrAlreadyBeingAdvertised.ID), "{1:}{2:} already being advertised: {3}")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBadAdvertisement.ID), "{1:}{2:} invalid advertisement: {3}")
i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBadQuery.ID), "{1:}{2:} invalid query: {3}")
diff --git a/lib/discovery/discovery_test.go b/lib/discovery/discovery_test.go
index 782bd2b..69103b8 100644
--- a/lib/discovery/discovery_test.go
+++ b/lib/discovery/discovery_test.go
@@ -5,6 +5,8 @@
package discovery_test
import (
+ "bytes"
+ "reflect"
"testing"
"time"
@@ -264,6 +266,116 @@
}
}
+func TestLargeAdvertisement(t *testing.T) {
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
+ df, err := idiscovery.NewFactory(ctx, mock.NewWithAdStatus(idiscovery.AdNotReady))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer df.Shutdown()
+
+ ads := []discovery.Advertisement{
+ {
+ InterfaceName: "v.io/a",
+ Addresses: []string{"/h1:123/x"},
+ Attributes: discovery.Attributes{"a": "v"},
+ },
+ {
+ InterfaceName: "v.io/b",
+ Addresses: []string{"/h1:123/y"},
+ Attachments: discovery.Attachments{
+ "a1": bytes.Repeat([]byte{1}, 2048),
+ "a2": bytes.Repeat([]byte{2}, 4096),
+ "a3": bytes.Repeat([]byte{3}, 1024),
+ "a4": bytes.Repeat([]byte{4}, 2048),
+ },
+ },
+ }
+
+ d1, err := df.New(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ for i, _ := range ads {
+ stop, err := testutil.Advertise(ctx, d1, nil, &ads[i])
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer stop()
+ }
+
+ d2, err := df.New(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if err := testutil.ScanAndMatch(ctx, d2, `v.InterfaceName="v.io/a"`, ads[0]); err != nil {
+ t.Error(err)
+ }
+ if err := testutil.ScanAndMatch(ctx, d2, `v.Attributes["a"]="v"`, ads[0]); err != nil {
+ t.Error(err)
+ }
+
+ scanCh, scanStop, err := testutil.Scan(ctx, d2, `v.InterfaceName="v.io/b"`)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer scanStop()
+ update := <-scanCh
+
+ // Make sure that the directory server does not return all of the attachments if they are too large.
+ if reflect.DeepEqual(update.Advertisement().Attachments, ads[1].Attachments) {
+ t.Errorf("did not expect all of attachments, but got all: %v", update.Advertisement())
+ }
+ // But we should be able to fetch them lazily.
+ if !testutil.UpdateEqual(ctx, update, ads[1]) {
+ t.Errorf("Match failed; got %v, but wanted %v", update, ads[1])
+ }
+}
+
+func TestLargeAttachments(t *testing.T) {
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
+ df, err := idiscovery.NewFactory(ctx, mock.NewWithAdStatus(idiscovery.AdPartiallyReady))
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer df.Shutdown()
+
+ ad := discovery.Advertisement{
+ InterfaceName: "v.io/a",
+ Addresses: []string{"/h1:123/x", "/h2:123/y"},
+ Attachments: discovery.Attachments{
+ "a1": []byte{1, 2, 3},
+ "a2": []byte{4, 5, 6},
+ },
+ }
+
+ d1, err := df.New(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ stop, err := testutil.Advertise(ctx, d1, nil, &ad)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer stop()
+
+ d2, err := df.New(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ if err := testutil.ScanAndMatch(ctx, d2, ``, ad); err != nil {
+ t.Error(err)
+ }
+}
+
func TestShutdown(t *testing.T) {
ctx, shutdown := test.V23Init()
defer shutdown()
diff --git a/lib/discovery/errors.vdl b/lib/discovery/errors.vdl
index 5c5bbed..c6592a5 100644
--- a/lib/discovery/errors.vdl
+++ b/lib/discovery/errors.vdl
@@ -9,6 +9,10 @@
)
error (
+ AdvertisementNotFound(id discovery.AdId) {
+ "en": "advertisement not found: {id}",
+ }
+
AlreadyBeingAdvertised(id discovery.AdId) {
"en": "already being advertised: {id}",
}
diff --git a/lib/discovery/plugins/ble/testdata/testdata.vdl.go b/lib/discovery/plugins/ble/testdata/testdata.vdl.go
index a48166a..8fc11b4 100644
--- a/lib/discovery/plugins/ble/testdata/testdata.vdl.go
+++ b/lib/discovery/plugins/ble/testdata/testdata.vdl.go
@@ -79,8 +79,10 @@
var14 = true
}
var4 = var4 && var14
- var15 := (m.AdInfo.Lost == false)
+ var15 := (m.AdInfo.Status == discovery_2.AdStatus(0))
var4 = var4 && var15
+ var16 := (m.AdInfo.Lost == false)
+ var4 = var4 && var16
if var4 {
if err := fieldTarget3.FromZero(tt.NonOptional().Field(0).Type); err != nil {
return err
@@ -95,51 +97,51 @@
return err
}
}
- keyTarget16, fieldTarget17, err := fieldsTarget1.StartField("GattAttrs")
+ keyTarget17, fieldTarget18, err := fieldsTarget1.StartField("GattAttrs")
if err != vdl.ErrFieldNoExist && err != nil {
return err
}
if err != vdl.ErrFieldNoExist {
- var var18 bool
+ var var19 bool
if len(m.GattAttrs) == 0 {
- var18 = true
+ var19 = true
}
- if var18 {
- if err := fieldTarget17.FromZero(tt.NonOptional().Field(1).Type); err != nil {
+ if var19 {
+ if err := fieldTarget18.FromZero(tt.NonOptional().Field(1).Type); err != nil {
return err
}
} else {
- mapTarget19, err := fieldTarget17.StartMap(tt.NonOptional().Field(1).Type, len(m.GattAttrs))
+ mapTarget20, err := fieldTarget18.StartMap(tt.NonOptional().Field(1).Type, len(m.GattAttrs))
if err != nil {
return err
}
- for key21, value23 := range m.GattAttrs {
- keyTarget20, err := mapTarget19.StartKey()
+ for key22, value24 := range m.GattAttrs {
+ keyTarget21, err := mapTarget20.StartKey()
if err != nil {
return err
}
- if err := keyTarget20.FromString(string(key21), tt.NonOptional().Field(1).Type.Key()); err != nil {
+ if err := keyTarget21.FromString(string(key22), tt.NonOptional().Field(1).Type.Key()); err != nil {
return err
}
- valueTarget22, err := mapTarget19.FinishKeyStartField(keyTarget20)
+ valueTarget23, err := mapTarget20.FinishKeyStartField(keyTarget21)
if err != nil {
return err
}
- if err := valueTarget22.FromBytes([]byte(value23), tt.NonOptional().Field(1).Type.Elem()); err != nil {
+ if err := valueTarget23.FromBytes([]byte(value24), tt.NonOptional().Field(1).Type.Elem()); err != nil {
return err
}
- if err := mapTarget19.FinishField(keyTarget20, valueTarget22); err != nil {
+ if err := mapTarget20.FinishField(keyTarget21, valueTarget23); err != nil {
return err
}
}
- if err := fieldTarget17.FinishMap(mapTarget19); err != nil {
+ if err := fieldTarget18.FinishMap(mapTarget20); err != nil {
return err
}
}
- if err := fieldsTarget1.FinishField(keyTarget16, fieldTarget17); err != nil {
+ if err := fieldsTarget1.FinishField(keyTarget17, fieldTarget18); err != nil {
return err
}
}
diff --git a/lib/discovery/plugins/mock/mock.go b/lib/discovery/plugins/mock/mock.go
index 89680aa..dd92aa7 100644
--- a/lib/discovery/plugins/mock/mock.go
+++ b/lib/discovery/plugins/mock/mock.go
@@ -20,6 +20,7 @@
updated *sync.Cond
updateSeq int // GUARDED_BY(mu)
+ adStatus idiscovery.AdStatus
adinfoMap map[string]map[discovery.AdId]*idiscovery.AdInfo // GUARDED_BY(mu)
}
@@ -66,7 +67,7 @@
continue
}
for id, adinfo := range adinfos {
- current[id] = *adinfo
+ current[id] = copyAd(adinfo, p.adStatus)
}
}
p.mu.Unlock()
@@ -80,6 +81,7 @@
}
for id, adinfo := range seen {
if _, ok := current[id]; !ok {
+ adinfo.Status = idiscovery.AdReady
adinfo.Lost = true
changed = append(changed, adinfo)
}
@@ -109,6 +111,20 @@
func (p *plugin) Close() {}
+func copyAd(adinfo *idiscovery.AdInfo, status idiscovery.AdStatus) idiscovery.AdInfo {
+ copied := *adinfo
+ copied.Status = status
+ switch copied.Status {
+ case idiscovery.AdReady:
+ copied.DirAddrs = nil
+ case idiscovery.AdNotReady:
+ copied.Ad = discovery.Advertisement{Id: adinfo.Ad.Id}
+ case idiscovery.AdPartiallyReady:
+ copied.Ad.Attachments = nil
+ }
+ return copied
+}
+
// RegisterAd registers an advertisement to the plugin. If there is already an
// advertisement with the same id, it will be updated with the given advertisement.
func (p *plugin) RegisterAd(adinfo *idiscovery.AdInfo) {
@@ -138,7 +154,11 @@
}
func New() *plugin {
- p := &plugin{adinfoMap: make(map[string]map[discovery.AdId]*idiscovery.AdInfo)}
+ return NewWithAdStatus(idiscovery.AdReady)
+}
+
+func NewWithAdStatus(status idiscovery.AdStatus) *plugin {
+ p := &plugin{adStatus: status, adinfoMap: make(map[string]map[discovery.AdId]*idiscovery.AdInfo)}
p.updated = sync.NewCond(&p.mu)
return p
}
diff --git a/lib/discovery/plugins/testutil/util.go b/lib/discovery/plugins/testutil/util.go
index e6bcaab..cce9d60 100644
--- a/lib/discovery/plugins/testutil/util.go
+++ b/lib/discovery/plugins/testutil/util.go
@@ -64,25 +64,24 @@
return fmt.Errorf("Match failed; got %v, but wanted %v", adinfos, wants)
}
-func doScan(ctx *context.T, p idiscovery.Plugin, interfaceName string, expectedAds int) ([]idiscovery.AdInfo, error) {
+func doScan(ctx *context.T, p idiscovery.Plugin, interfaceName string, expectedAdInfos int) ([]idiscovery.AdInfo, error) {
scanCh, stop, err := Scan(ctx, p, interfaceName)
if err != nil {
return nil, err
}
defer stop()
- adinfos := make([]idiscovery.AdInfo, 0, expectedAds)
+ adinfos := make([]idiscovery.AdInfo, 0, expectedAdInfos)
for {
- timeout := 5 * time.Millisecond
- if len(adinfos) < expectedAds {
- // Increase the timeout if we do not receive enough updates
- // to avoid flakiness in unit tests.
- timeout = 5 * time.Second
+ var timer <-chan time.Time
+ if len(adinfos) >= expectedAdInfos {
+ timer = time.After(5 * time.Millisecond)
}
+
select {
case adinfo := <-scanCh:
adinfos = append(adinfos, *adinfo)
- case <-time.After(timeout):
+ case <-timer:
return adinfos, nil
}
}
diff --git a/lib/discovery/scan.go b/lib/discovery/scan.go
index fbe9fef..bbcd631 100644
--- a/lib/discovery/scan.go
+++ b/lib/discovery/scan.go
@@ -5,6 +5,9 @@
package discovery
import (
+ "sort"
+ "sync"
+
"v.io/v23/context"
"v.io/v23/discovery"
)
@@ -24,8 +27,11 @@
// TODO(jhahn): Revisit the buffer size.
scanCh := make(chan *AdInfo, 10)
+ updateCh := make(chan discovery.Update, 10)
+
barrier := NewBarrier(func() {
close(scanCh)
+ close(updateCh)
d.removeTask(ctx)
})
for _, plugin := range d.plugins {
@@ -34,26 +40,57 @@
return nil, err
}
}
- // TODO(jhahn): Revisit the buffer size.
- updateCh := make(chan discovery.Update, 10)
- go d.doScan(ctx, session, matcher, scanCh, updateCh)
+ go d.doScan(ctx, session, matcher, scanCh, updateCh, barrier.Add())
return updateCh, nil
}
-func (d *idiscovery) doScan(ctx *context.T, session sessionId, matcher Matcher, scanCh <-chan *AdInfo, updateCh chan<- discovery.Update) {
- defer close(updateCh)
-
+func (d *idiscovery) doScan(ctx *context.T, session sessionId, matcher Matcher, scanCh chan *AdInfo, updateCh chan<- discovery.Update, done func()) {
// Some plugins may not return a full advertisement information when it is lost.
// So we keep the advertisements that we've seen so that we can provide the
// full advertisement information when it is lost. Note that plugins will not
// include attachments unless they're tiny enough.
seen := make(map[discovery.AdId]*AdInfo)
+
+ var wg sync.WaitGroup
+ defer func() {
+ wg.Wait()
+ done()
+ }()
+
for {
select {
- case adinfo, ok := <-scanCh:
- if !ok {
- return
+ case adinfo := <-scanCh:
+ if !adinfo.Lost {
+ // Filter out advertisements from the same session.
+ if d.getAdSession(adinfo.Ad.Id) == session {
+ continue
+ }
+ // Filter out already seen advertisements.
+ if prev := seen[adinfo.Ad.Id]; prev != nil && prev.Status == AdReady && prev.Hash == adinfo.Hash {
+ continue
+ }
+
+ if adinfo.Status == AdReady {
+ // Clear the unnecessary directory addresses.
+ adinfo.DirAddrs = nil
+ } else {
+ if len(adinfo.DirAddrs) == 0 {
+ ctx.Errorf("no directory address available for partial advertisement %v - ignored", adinfo.Ad.Id)
+ continue
+ }
+
+ // Fetch not-ready-to-serve advertisements from the directory server.
+ if adinfo.Status == AdNotReady {
+ wg.Add(1)
+ go fetchAd(ctx, adinfo.DirAddrs, adinfo.Ad.Id, scanCh, wg.Done)
+ continue
+ }
+
+ // Sort the directory addresses to make it easy to compare.
+ sort.Strings(adinfo.DirAddrs)
+ }
}
+
if err := decrypt(ctx, adinfo); err != nil {
// Couldn't decrypt it. Ignore it.
if err != errNoPermission {
@@ -61,13 +98,9 @@
}
continue
}
- // Filter out advertisements from the same session.
- if d.getAdSession(adinfo) == session {
- continue
- }
- // Note that 'Lost' advertisement may not have full service information.
- // Thus we do not match the query against it. newUpdates() will ignore it
- // if it has not been scanned.
+
+ // Note that 'Lost' advertisement may not have full information. Thus we do not match
+ // the query against it. newUpdates() will ignore it if it has not been scanned.
if !adinfo.Lost {
matched, err := matcher.Match(&adinfo.Ad)
if err != nil {
@@ -78,6 +111,7 @@
continue
}
}
+
for _, update := range newUpdates(seen, adinfo) {
select {
case updateCh <- update:
@@ -85,17 +119,30 @@
return
}
}
+
case <-ctx.Done():
return
}
}
}
-func (d *idiscovery) getAdSession(adinfo *AdInfo) sessionId {
- d.mu.Lock()
- session := d.ads[adinfo.Ad.Id]
- d.mu.Unlock()
- return session
+func fetchAd(ctx *context.T, dirAddrs []string, id discovery.AdId, scanCh chan<- *AdInfo, done func()) {
+ defer done()
+
+ dir := newDirClient(dirAddrs)
+ adinfo, err := dir.Lookup(ctx, id)
+ if err != nil {
+ select {
+ case <-ctx.Done():
+ default:
+ ctx.Error(err)
+ }
+ return
+ }
+ select {
+ case scanCh <- adinfo:
+ case <-ctx.Done():
+ }
}
func newUpdates(seen map[discovery.AdId]*AdInfo, adinfo *AdInfo) []discovery.Update {
@@ -118,7 +165,7 @@
case prev == nil:
updates = []discovery.Update{NewUpdate(adinfo)}
seen[adinfo.Ad.Id] = adinfo
- case prev.Hash != adinfo.Hash:
+ case prev.Hash != adinfo.Hash || !sortedStringsEqual(prev.DirAddrs, adinfo.DirAddrs):
prev.Lost = true
updates = []discovery.Update{NewUpdate(prev), NewUpdate(adinfo)}
seen[adinfo.Ad.Id] = adinfo
diff --git a/lib/discovery/test/directory_test.go b/lib/discovery/test/directory_test.go
new file mode 100644
index 0000000..2aced6a
--- /dev/null
+++ b/lib/discovery/test/directory_test.go
@@ -0,0 +1,189 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package test
+
+import (
+ "bytes"
+ "reflect"
+ "testing"
+
+ "v.io/v23/context"
+ "v.io/v23/discovery"
+ "v.io/v23/rpc"
+ "v.io/v23/security"
+
+ idiscovery "v.io/x/ref/lib/discovery"
+ "v.io/x/ref/lib/discovery/plugins/mock"
+ "v.io/x/ref/lib/discovery/testutil"
+ "v.io/x/ref/runtime/factories/fake"
+ "v.io/x/ref/test"
+)
+
+func TestDirectoryBasic(t *testing.T) {
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
+ var dirServer idiscovery.DirectoryServerStub
+
+ mockServer := testutil.NewMockServer(testutil.ToEndpoints("addr:123"))
+ ctx = fake.SetServerFactory(ctx, func(_ *context.T, _ string, impl interface{}, _ security.Authorizer, _ ...rpc.ServerOpt) (*context.T, rpc.Server) {
+ dirServer = impl.(idiscovery.DirectoryServerStub)
+ return ctx, mockServer
+ })
+
+ df, err := idiscovery.NewFactory(ctx, mock.New())
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer df.Shutdown()
+
+ ad := discovery.Advertisement{
+ InterfaceName: "v.io/v23/a",
+ Addresses: []string{"/h1:123/x"},
+ Attachments: discovery.Attachments{"a": []byte{1, 2, 3}},
+ }
+
+ d1, err := df.New(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ stop, err := testutil.Advertise(ctx, d1, nil, &ad)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // The directory server should serve the advertisement.
+ fetched, err := dirServer.Lookup(ctx, nil, ad.Id)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !reflect.DeepEqual(fetched.Ad, ad) {
+ t.Errorf("got %v, but wanted %v", fetched.Ad, ad)
+ }
+
+ attachment, err := dirServer.GetAttachment(ctx, nil, ad.Id, "a")
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !bytes.Equal(attachment, ad.Attachments["a"]) {
+ t.Errorf("got %v, but wanted %v", attachment, ad.Attachments["a"])
+ }
+
+ attachment, err = dirServer.GetAttachment(ctx, nil, ad.Id, "b")
+ if err != nil {
+ t.Fatal(err)
+ }
+ if attachment != nil {
+ t.Errorf("got %v, but wanted %v", attachment, nil)
+ }
+
+ // Stop advertising; Shouldn't be served by the directory server.
+ stop()
+
+ if _, err = dirServer.Lookup(ctx, nil, ad.Id); err == nil {
+ t.Error("expected an error, but got none")
+ }
+ if _, err = dirServer.GetAttachment(ctx, nil, ad.Id, "a"); err == nil {
+ t.Error("expected an error, but got none")
+ }
+}
+
+func TestDirectoryRoaming(t *testing.T) {
+ ctx, shutdown := test.V23Init()
+ defer shutdown()
+
+ var dirServer idiscovery.DirectoryServerStub
+
+ eps := testutil.ToEndpoints("addr:123")
+ mockServer := testutil.NewMockServer(eps)
+ ctx = fake.SetServerFactory(ctx, func(_ *context.T, _ string, impl interface{}, _ security.Authorizer, _ ...rpc.ServerOpt) (*context.T, rpc.Server) {
+ dirServer = impl.(idiscovery.DirectoryServerStub)
+ return ctx, mockServer
+ })
+
+ mockPlugin := mock.NewWithAdStatus(idiscovery.AdPartiallyReady)
+ df, err := idiscovery.NewFactory(ctx, mockPlugin)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer df.Shutdown()
+
+ ad := discovery.Advertisement{
+ InterfaceName: "v.io/v23/a",
+ Addresses: []string{"/h1:123/x"},
+ }
+
+ d1, err := df.New(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ stop, err := testutil.Advertise(ctx, d1, nil, &ad)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer stop()
+
+ d2, err := df.New(ctx)
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ // Open a new scan channel and consume expected advertisements first.
+ scanCh, scanStop, err := testutil.Scan(ctx, d2, ``)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer scanStop()
+
+ update := <-scanCh
+ if !testutil.MatchFound(ctx, []discovery.Update{update}, ad) {
+ t.Errorf("unexpected scan: %v", update)
+ }
+
+ fetched, err := dirServer.Lookup(ctx, nil, ad.Id)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !reflect.DeepEqual(testutil.ToEndpoints(fetched.DirAddrs...), eps) {
+ t.Errorf("got %v, but wanted %v", testutil.ToEndpoints(fetched.DirAddrs...), eps)
+ }
+
+ updateCh := make(chan *idiscovery.AdInfo)
+ if err = mockPlugin.Scan(ctx, "", updateCh, func() {}); err != nil {
+ t.Fatal(err)
+ }
+ <-updateCh
+
+ eps = testutil.ToEndpoints("addr:456")
+ mockServer.UpdateNetwork(eps)
+
+ // Wait until the address change is applied
+ for {
+ adinfo := <-updateCh
+ if reflect.DeepEqual(testutil.ToEndpoints(adinfo.DirAddrs...), eps) {
+ break
+ }
+ }
+
+ // Make sure that a new advertisement is published.
+ update = <-scanCh
+ if !testutil.MatchLost(ctx, []discovery.Update{update}, ad) {
+ t.Errorf("unexpected scan: %v", update)
+ }
+ update = <-scanCh
+ if !testutil.MatchFound(ctx, []discovery.Update{update}, ad) {
+ t.Errorf("unexpected scan: %v", update)
+ }
+
+ fetched, err = dirServer.Lookup(ctx, nil, ad.Id)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if !reflect.DeepEqual(testutil.ToEndpoints(fetched.DirAddrs...), eps) {
+ t.Errorf("got %v, but wanted %v", testutil.ToEndpoints(fetched.DirAddrs...), eps)
+ }
+}
diff --git a/lib/discovery/test/doc.go b/lib/discovery/test/doc.go
new file mode 100644
index 0000000..2fc346a
--- /dev/null
+++ b/lib/discovery/test/doc.go
@@ -0,0 +1,9 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// package test contains tests that rely on a fake runtime.
+//
+// TODO(jhahn): Move tests to the 'discovery' directory once we support a dynamic
+// runtime initialization.
+package test
diff --git a/lib/discovery/testutil/mock_server.go b/lib/discovery/testutil/mock_server.go
new file mode 100644
index 0000000..efc5807
--- /dev/null
+++ b/lib/discovery/testutil/mock_server.go
@@ -0,0 +1,57 @@
+// Copyright 2016 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package testutil
+
+import (
+ "strings"
+ "sync"
+
+ "v.io/v23"
+ "v.io/v23/naming"
+ "v.io/v23/rpc"
+)
+
+type MockServer struct {
+ mu sync.Mutex
+
+ eps []naming.Endpoint // GUARDED_BY(mu)
+ dirty chan struct{} // GUARDED_BY(mu)
+}
+
+func (s *MockServer) AddName(string) error { return nil }
+func (s *MockServer) RemoveName(string) {}
+func (s *MockServer) Closed() <-chan struct{} { return nil }
+func (s *MockServer) Status() rpc.ServerStatus {
+ defer s.mu.Unlock()
+ s.mu.Lock()
+ return rpc.ServerStatus{
+ Endpoints: s.eps,
+ Dirty: s.dirty,
+ }
+}
+
+func (s *MockServer) UpdateNetwork(eps []naming.Endpoint) {
+ defer s.mu.Unlock()
+ s.mu.Lock()
+ s.eps = eps
+ close(s.dirty)
+ s.dirty = make(chan struct{})
+}
+
+func NewMockServer(eps []naming.Endpoint) *MockServer {
+ return &MockServer{
+ eps: eps,
+ dirty: make(chan struct{}),
+ }
+}
+
+func ToEndpoints(addrs ...string) []naming.Endpoint {
+ eps := make([]naming.Endpoint, len(addrs))
+ for i, addr := range addrs {
+ addr = strings.TrimPrefix(addr, "/")
+ eps[i], _ = v23.NewEndpoint(addr)
+ }
+ return eps
+}
diff --git a/lib/discovery/testutil/util.go b/lib/discovery/testutil/util.go
index dc00663..142ac9d 100644
--- a/lib/discovery/testutil/util.go
+++ b/lib/discovery/testutil/util.go
@@ -149,7 +149,7 @@
if update.IsLost() != lost {
return false
}
- if !updateEqual(ctx, update, want) {
+ if !UpdateEqual(ctx, update, want) {
return false
}
delete(updateMap, update.Id())
@@ -157,7 +157,7 @@
return len(updateMap) == 0
}
-func updateEqual(ctx *context.T, update discovery.Update, ad discovery.Advertisement) bool {
+func UpdateEqual(ctx *context.T, update discovery.Update, ad discovery.Advertisement) bool {
if update.Id() != ad.Id {
return false
}
@@ -178,7 +178,10 @@
}
}
for k, v := range ad.Attachments {
- r := <-update.Attachment(ctx, k)
+ r, ok := <-update.Attachment(ctx, k)
+ if !ok {
+ return false
+ }
if r.Error != nil {
return false
}
diff --git a/lib/discovery/types.vdl b/lib/discovery/types.vdl
index 37634a0..a5ff813 100644
--- a/lib/discovery/types.vdl
+++ b/lib/discovery/types.vdl
@@ -8,6 +8,8 @@
"v.io/v23/discovery"
)
+type Uuid []byte
+
type EncryptionAlgorithm int32
type EncryptionKey []byte
@@ -17,7 +19,13 @@
IbeEncryption = EncryptionAlgorithm(2)
)
-type Uuid []byte
+type AdStatus byte
+
+const (
+ AdReady = AdStatus(0) // All information is available
+ AdNotReady = AdStatus(1) // Not all information is available for querying against it
+ AdPartiallyReady = AdStatus(2) // All information except attachments is available
+)
// AdInfo represents advertisement information for discovery.
type AdInfo struct {
@@ -30,13 +38,16 @@
// decrypt it. The format of this data is a function of the algorithm.
EncryptionKeys []EncryptionKey
- // Hash of the current advertisement.
+ // Hash of the current advertisement. This does not include the fields below.
Hash AdHash
// The addresses (vanadium object names) that the advertisement directory service
// is served on. See directory.vdl.
DirAddrs []string
+ // Status of the current advertisement. Valid for scanned advertisements.
+ Status AdStatus
+
// TODO(jhahn): Add proximity.
// TODO(jhahn): Use proximity for Lost.
Lost bool
diff --git a/lib/discovery/update.go b/lib/discovery/update.go
index 76e3350..2014f88 100644
--- a/lib/discovery/update.go
+++ b/lib/discovery/update.go
@@ -16,6 +16,7 @@
ad discovery.Advertisement
hash AdHash
dirAddrs []string
+ status AdStatus
lost bool
}
@@ -32,18 +33,37 @@
func (u *update) Attribute(name string) string { return u.ad.Attributes[name] }
func (u *update) Attachment(ctx *context.T, name string) <-chan discovery.DataOrError {
- // TODO(jhahn): Handle lazy fetching.
- var r discovery.DataOrError
- if data := u.ad.Attachments[name]; len(data) > 0 {
+ ch := make(chan discovery.DataOrError, 1)
+ if data, ok := u.ad.Attachments[name]; ok {
+ var r discovery.DataOrError
r.Data = make([]byte, len(data))
copy(r.Data, data)
+ ch <- r
+ close(ch)
+ } else if u.status == AdPartiallyReady {
+ go u.fetchAttachment(ctx, name, ch)
+ } else {
+ close(ch)
}
- ch := make(chan discovery.DataOrError, 1)
- ch <- r
- close(ch)
return ch
}
+func (u *update) fetchAttachment(ctx *context.T, name string, ch chan<- discovery.DataOrError) {
+ defer close(ch)
+
+ dir := newDirClient(u.dirAddrs)
+ data, err := dir.GetAttachment(ctx, u.ad.Id, name)
+ if err != nil {
+ select {
+ case <-ctx.Done():
+ default:
+ ctx.Error(err)
+ }
+ return
+ }
+ ch <- discovery.DataOrError{Data: data}
+}
+
func (u *update) Advertisement() discovery.Advertisement {
ad := discovery.Advertisement{
Id: u.ad.Id,
@@ -76,6 +96,7 @@
ad: adinfo.Ad,
hash: adinfo.Hash,
dirAddrs: adinfo.DirAddrs,
+ status: adinfo.Status,
lost: adinfo.Lost,
}
}
diff --git a/lib/discovery/util.go b/lib/discovery/util.go
index df33f95..61c8eef 100644
--- a/lib/discovery/util.go
+++ b/lib/discovery/util.go
@@ -10,10 +10,12 @@
"encoding/binary"
"io"
"sort"
+
+ "v.io/v23/naming"
)
-// Hash hashes the advertisement.
-func HashAd(adinfo *AdInfo) {
+// hashAd hashes the advertisement.
+func hashAd(adinfo *AdInfo) {
w := func(w io.Writer, data []byte) {
sum := sha256.Sum256(data)
w.Write(sum[:])
@@ -67,12 +69,28 @@
}
hasher.Write(field.Sum(nil))
- field.Reset()
- for _, addr := range adinfo.DirAddrs {
- w(field, []byte(addr))
- }
- hasher.Write(field.Sum(nil))
-
// We use the first 8 bytes to reduce the advertise packet size.
copy(adinfo.Hash[:], hasher.Sum(nil))
}
+
+func sortedNames(eps []naming.Endpoint) []string {
+ names := make([]string, len(eps))
+ for i, ep := range eps {
+ names[i] = ep.Name()
+ }
+ sort.Strings(names)
+ return names
+}
+
+func sortedStringsEqual(a, b []string) bool {
+ // We want to make a nil and an empty slices equal to avoid unnecessary inequality by that.
+ if len(a) != len(b) {
+ return false
+ }
+ for i, v := range a {
+ if v != b[i] {
+ return false
+ }
+ }
+ return true
+}
diff --git a/lib/discovery/util_test.go b/lib/discovery/util_test.go
index 35afb02..d763c89 100644
--- a/lib/discovery/util_test.go
+++ b/lib/discovery/util_test.go
@@ -29,27 +29,27 @@
// Shouldn't be changed by hashing.
a2 := a1
- HashAd(&a2)
+ hashAd(&a2)
a2.Hash = AdHash{}
if !reflect.DeepEqual(a1, a2) {
t.Errorf("shouldn't be changed by hash: %v, %v", a1, a2)
}
// Should have the same hash for the same advertisements.
- HashAd(&a1)
- HashAd(&a2)
+ hashAd(&a1)
+ hashAd(&a2)
if a1.Hash != a2.Hash {
t.Errorf("expected same hash, but got different: %v, %v", a1.Hash, a2.Hash)
}
// Should be idempotent.
- HashAd(&a2)
+ hashAd(&a2)
if a1.Hash != a2.Hash {
t.Errorf("expected same hash, but got different: %v, %v", a1.Hash, a2.Hash)
}
a2.Ad.Id = discovery.AdId{4, 5, 6}
- HashAd(&a2)
+ hashAd(&a2)
if a1.Hash == a2.Hash {
t.Errorf("expected different hashes, but got same: %v, %v", a1.Hash, a2.Hash)
}
@@ -61,7 +61,7 @@
for k, v := range a1.Ad.Attributes {
a2.Ad.Attachments[k] = []byte(v)
}
- HashAd(&a2)
+ hashAd(&a2)
if a1.Hash == a2.Hash {
t.Errorf("expected different hashes, but got same: %v, %v", a1.Hash, a2.Hash)
}
@@ -76,7 +76,7 @@
for i := len(keys) - 1; i >= 0; i-- {
a2.Ad.Attributes[keys[i]] = a1.Ad.Attributes[keys[i]]
}
- HashAd(&a2)
+ hashAd(&a2)
if a1.Hash != a2.Hash {
t.Errorf("expected same hash, but got different: %v, %v", a1.Hash, a2.Hash)
}
@@ -84,7 +84,7 @@
// Shouldn't distinguish between nil and empty.
a2 = a1
a2.Ad.Attachments = make(discovery.Attachments)
- HashAd(&a2)
+ hashAd(&a2)
if a1.Hash != a2.Hash {
t.Errorf("expected same hash, but got different: %v, %v", a1.Hash, a2.Hash)
}
@@ -114,7 +114,7 @@
// Ensure that every single field of advertisement is hashed.
ad := AdInfo{}
- HashAd(&ad)
+ hashAd(&ad)
for ty, i := reflect.TypeOf(ad.Ad), 0; i < ty.NumField(); i++ {
oldAd := ad
@@ -122,7 +122,7 @@
fieldName := reflect.TypeOf(ad.Ad).Field(i).Name
gen(reflect.ValueOf(&ad.Ad).Elem().FieldByName(fieldName))
- HashAd(&ad)
+ hashAd(&ad)
if oldAd.Hash == ad.Hash {
t.Errorf("Ad.%s: expected different hashes, but got same: %v, %v", fieldName, oldAd.Hash, ad.Hash)
@@ -135,12 +135,12 @@
fieldName := reflect.TypeOf(ad).Field(i).Name
switch fieldName {
- case "Ad", "Hash", "Lost":
+ case "Ad", "Hash", "DirAddrs", "Status", "Lost":
continue
}
gen(reflect.ValueOf(&ad).Elem().FieldByName(fieldName))
- HashAd(&ad)
+ hashAd(&ad)
if oldAd.Hash == ad.Hash {
t.Errorf("AdInfo.%s: expected different hashes, but got same: %v, %v", fieldName, oldAd.Hash, ad.Hash)
diff --git a/runtime/factories/fake/rpc.go b/runtime/factories/fake/rpc.go
index 6c99454..54f8630 100644
--- a/runtime/factories/fake/rpc.go
+++ b/runtime/factories/fake/rpc.go
@@ -66,9 +66,21 @@
return factory(ctx, channelTimeout), nil
}
+// SetServerFactory can be used to inject a mock Server implementation into the context.
+// When v23.WithNewServer is called the passed function will be invoked.
+func SetServerFactory(ctx *context.T, factory func(*context.T, string, interface{}, security.Authorizer, ...rpc.ServerOpt) (*context.T, rpc.Server)) *context.T {
+ return context.WithValue(ctx, serverFactoryKey, factory)
+}
+
func (r *Runtime) WithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
- panic("unimplemented")
+
+ factory, ok := ctx.Value(serverFactoryKey).(func(*context.T, string, interface{}, security.Authorizer, ...rpc.ServerOpt) (*context.T, rpc.Server))
+ if !ok {
+ panic("Calling WithNewServer on the fake runtime, but no factory has been set.")
+ }
+ ctx, server := factory(ctx, name, object, auth, opts...)
+ return ctx, server, nil
}
func (r *Runtime) WithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
diff --git a/runtime/factories/fake/runtime.go b/runtime/factories/fake/runtime.go
index 86f87a6..1efe7dc 100644
--- a/runtime/factories/fake/runtime.go
+++ b/runtime/factories/fake/runtime.go
@@ -28,6 +28,7 @@
clientFactoryKey
flowFactoryKey
+ serverFactoryKey
)
type Runtime struct {