discovery: support Lost
Add "Lost" update handling.
Change-Id: I69d9d0d4360f77e304dd1a2b55ba7ddaeb8fc3bd
diff --git a/lib/discovery/discovery.go b/lib/discovery/discovery.go
index b502095..10b7377 100644
--- a/lib/discovery/discovery.go
+++ b/lib/discovery/discovery.go
@@ -25,6 +25,8 @@
ServiceUuid uuid.UUID
// TODO(jhahn): Add proximity.
+ // TODO(jhahn): Use proximity for Lost.
+ Lost bool
}
// TODO(jhahn): Need a better API.
diff --git a/lib/discovery/discovery_test.go b/lib/discovery/discovery_test.go
index 89ac563..df10892 100644
--- a/lib/discovery/discovery_test.go
+++ b/lib/discovery/discovery_test.go
@@ -41,6 +41,7 @@
stops = append(stops, stop)
}
+ // Make sure all advertisements are discovered.
if err := scanAndMatch(ds, "v.io/v23/a", services[0]); err != nil {
t.Error(err)
}
@@ -54,16 +55,31 @@
t.Error(err)
}
- // Stop advertising the first service. Shouldn't affect the other.
- stops[0]()
- if err := scanAndMatch(ds, "v.io/v23/a"); err != nil {
+ // Open a new scan channel and consume expected advertisements first.
+ scan, scanStop, err := startScan(ds, "v.io/v23/a")
+ if err != nil {
t.Error(err)
}
+ defer scanStop()
+ update := <-scan
+ if !matchFound([]discovery.Update{update}, services[0]) {
+ t.Errorf("Unexpected scan: %v", update)
+ }
+
+ // Make sure scan returns the lost advertisement when advertising is stopped.
+ stops[0]()
+
+ update = <-scan
+ if !matchLost([]discovery.Update{update}, services[0]) {
+ t.Errorf("Unexpected scan: %v", update)
+ }
+
+ // Also it shouldn't affect the other.
if err := scanAndMatch(ds, "v.io/v23/b", services[1]); err != nil {
t.Error(err)
}
- // Stop advertising the other. Now shouldn't discover any service.
+ // Stop advertising the remaining one; Shouldn't discover any service.
stops[1]()
if err := scanAndMatch(ds, ""); err != nil {
t.Error(err)
@@ -80,16 +96,26 @@
return cancel, nil
}
-func scan(ds discovery.Scanner, query string) ([]discovery.Update, error) {
- ctx, _ := context.RootContext()
- updateCh, err := ds.Scan(ctx, query)
+func startScan(ds discovery.Scanner, query string) (<-chan discovery.Update, func(), error) {
+ ctx, stop := context.RootContext()
+ scan, err := ds.Scan(ctx, query)
if err != nil {
- return nil, fmt.Errorf("Scan failed: %v", err)
+ return nil, nil, fmt.Errorf("Scan failed: %v", err)
}
+ return scan, stop, err
+}
+
+func scan(ds discovery.Scanner, query string) ([]discovery.Update, error) {
+ scan, stop, err := startScan(ds, query)
+ if err != nil {
+ return nil, err
+ }
+ defer stop()
+
var updates []discovery.Update
for {
select {
- case update := <-updateCh:
+ case update := <-scan:
updates = append(updates, update)
case <-time.After(5 * time.Millisecond):
return updates, nil
@@ -97,15 +123,22 @@
}
}
-func match(updates []discovery.Update, wants ...discovery.Service) bool {
+func match(updates []discovery.Update, lost bool, wants ...discovery.Service) bool {
for _, want := range wants {
matched := false
for i, update := range updates {
- found, ok := update.(discovery.UpdateFound)
- if !ok {
- continue
+ var service discovery.Service
+ switch u := update.(type) {
+ case discovery.UpdateFound:
+ if !lost {
+ service = u.Value.Service
+ }
+ case discovery.UpdateLost:
+ if lost {
+ service = u.Value.Service
+ }
}
- matched = reflect.DeepEqual(found.Value.Service, want)
+ matched = reflect.DeepEqual(service, want)
if matched {
updates = append(updates[:i], updates[i+1:]...)
break
@@ -118,6 +151,14 @@
return len(updates) == 0
}
+func matchFound(updates []discovery.Update, wants ...discovery.Service) bool {
+ return match(updates, false, wants...)
+}
+
+func matchLost(updates []discovery.Update, wants ...discovery.Service) bool {
+ return match(updates, true, wants...)
+}
+
func scanAndMatch(ds discovery.Scanner, query string, wants ...discovery.Service) error {
const timeout = 3 * time.Second
@@ -130,7 +171,7 @@
if err != nil {
return err
}
- if match(updates, wants...) {
+ if matchFound(updates, wants...) {
return nil
}
}
diff --git a/lib/discovery/plugins/mdns/mdns.go b/lib/discovery/plugins/mdns/mdns.go
index a02c322..baa7339 100644
--- a/lib/discovery/plugins/mdns/mdns.go
+++ b/lib/discovery/plugins/mdns/mdns.go
@@ -191,8 +191,9 @@
InstanceUuid: instanceUuid,
Attrs: make(discovery.Attributes),
},
+ Lost: len(service.SrvRRs) == 0 && len(service.TxtRRs) == 0,
}
- // TODO(jhahn): Handle lost service.
+
for _, rr := range service.TxtRRs {
for _, txt := range rr.Txt {
kv := strings.SplitN(txt, "=", 2)
diff --git a/lib/discovery/plugins/mdns/mdns_test.go b/lib/discovery/plugins/mdns/mdns_test.go
index 0356d7f..b6beeae 100644
--- a/lib/discovery/plugins/mdns/mdns_test.go
+++ b/lib/discovery/plugins/mdns/mdns_test.go
@@ -75,6 +75,7 @@
stops = append(stops, stop)
}
+ // Make sure all advertisements are discovered.
if err := scanAndMatch(p2, "v.io/x", services[0], services[1]); err != nil {
t.Error(err)
}
@@ -88,6 +89,7 @@
t.Error(err)
}
+ // Make sure it is not discovered when advertising is stopped.
stops[0]()
if err := scanAndMatch(p2, "v.io/x", services[1]); err != nil {
t.Error(err)
@@ -95,8 +97,28 @@
if err := scanAndMatch(p2, "", services[1], services[2]); err != nil {
t.Error(err)
}
- stops[1]()
+
+ // Open a new scan channel and consume expected advertisements first.
+ scan, scanStop, err := startScan(p2, "v.io/y")
+ if err != nil {
+ t.Error(err)
+ }
+ defer scanStop()
+ ad := *<-scan
+ if !matchFound([]ldiscovery.Advertisement{ad}, services[2]) {
+ t.Errorf("Unexpected scan: %v", ad)
+ }
+
+ // Make sure scan returns the lost advertisement when advertising is stopped.
stops[2]()
+
+ ad = *<-scan
+ if !matchLost([]ldiscovery.Advertisement{ad}, services[2]) {
+ t.Errorf("Unexpected scan: %v", ad)
+ }
+
+ // Stop advertising the remaining one; Shouldn't discover anything.
+ stops[1]()
if err := scanAndMatch(p2, ""); err != nil {
t.Error(err)
}
@@ -114,20 +136,30 @@
return cancel, nil
}
-func scan(p ldiscovery.Plugin, interfaceName string) ([]ldiscovery.Advertisement, error) {
- ctx, _ := context.RootContext()
- scanCh := make(chan *ldiscovery.Advertisement)
+func startScan(p ldiscovery.Plugin, interfaceName string) (<-chan *ldiscovery.Advertisement, func(), error) {
+ ctx, stop := context.RootContext()
+ scan := make(chan *ldiscovery.Advertisement)
var serviceUuid uuid.UUID
if len(interfaceName) > 0 {
serviceUuid = ldiscovery.NewServiceUUID(interfaceName)
}
- if err := p.Scan(ctx, serviceUuid, scanCh); err != nil {
- return nil, fmt.Errorf("Scan failed: %v", err)
+ if err := p.Scan(ctx, serviceUuid, scan); err != nil {
+ return nil, nil, fmt.Errorf("Scan failed: %v", err)
}
+ return scan, stop, nil
+}
+
+func scan(p ldiscovery.Plugin, interfaceName string) ([]ldiscovery.Advertisement, error) {
+ scan, stop, err := startScan(p, interfaceName)
+ if err != nil {
+ return nil, err
+ }
+ defer stop()
+
var ads []ldiscovery.Advertisement
for {
select {
- case ad := <-scanCh:
+ case ad := <-scan:
ads = append(ads, *ad)
case <-time.After(10 * time.Millisecond):
return ads, nil
@@ -135,14 +167,18 @@
}
}
-func match(ads []ldiscovery.Advertisement, wants ...discovery.Service) bool {
+func match(ads []ldiscovery.Advertisement, lost bool, wants ...discovery.Service) bool {
for _, want := range wants {
matched := false
for i, ad := range ads {
if !uuid.Equal(ad.ServiceUuid, ldiscovery.NewServiceUUID(want.InterfaceName)) {
continue
}
- matched = reflect.DeepEqual(ad.Service, want)
+ if lost {
+ matched = ad.Lost
+ } else {
+ matched = lost || reflect.DeepEqual(ad.Service, want)
+ }
if matched {
ads = append(ads[:i], ads[i+1:]...)
break
@@ -155,6 +191,14 @@
return len(ads) == 0
}
+func matchFound(ads []ldiscovery.Advertisement, wants ...discovery.Service) bool {
+ return match(ads, false, wants...)
+}
+
+func matchLost(ads []ldiscovery.Advertisement, wants ...discovery.Service) bool {
+ return match(ads, true, wants...)
+}
+
func scanAndMatch(p ldiscovery.Plugin, interfaceName string, wants ...discovery.Service) error {
const timeout = 1 * time.Second
@@ -167,7 +211,7 @@
if err != nil {
return err
}
- if match(ads, wants...) {
+ if matchFound(ads, wants...) {
return nil
}
}
diff --git a/lib/discovery/plugins/mock/mock.go b/lib/discovery/plugins/mock/mock.go
index cf202cb..a15fc8a 100644
--- a/lib/discovery/plugins/mock/mock.go
+++ b/lib/discovery/plugins/mock/mock.go
@@ -5,6 +5,7 @@
package mock
import (
+ "reflect"
"sync"
"github.com/pborman/uuid"
@@ -17,18 +18,22 @@
type plugin struct {
mu sync.Mutex
services map[string][]*discovery.Advertisement // GUARDED_BY(mu)
+
+ updated *sync.Cond
}
func (p *plugin) Advertise(ctx *context.T, ad *discovery.Advertisement) error {
p.mu.Lock()
- defer p.mu.Unlock()
key := string(ad.ServiceUuid)
ads := p.services[key]
p.services[key] = append(ads, ad)
+ p.mu.Unlock()
+ p.updated.Broadcast()
+
go func() {
<-ctx.Done()
+
p.mu.Lock()
- defer p.mu.Unlock()
ads := p.services[key]
for i, a := range ads {
if uuid.Equal(a.InstanceUuid, ad.InstanceUuid) {
@@ -41,30 +46,82 @@
} else {
delete(p.services, key)
}
+ p.mu.Unlock()
+ p.updated.Broadcast()
}()
return nil
}
func (p *plugin) Scan(ctx *context.T, serviceUuid uuid.UUID, scanCh chan<- *discovery.Advertisement) error {
+ rescan := make(chan struct{})
go func() {
- p.mu.Lock()
- defer p.mu.Unlock()
- for key, service := range p.services {
- if len(serviceUuid) > 0 && key != string(serviceUuid) {
- continue
+ for {
+ p.updated.L.Lock()
+ p.updated.Wait()
+ p.updated.L.Unlock()
+ select {
+ case rescan <- struct{}{}:
+ case <-ctx.Done():
+ return
}
- for _, ad := range service {
+ }
+ }()
+
+ go func() {
+ scanned := make(map[string]*discovery.Advertisement)
+
+ for {
+ current := make(map[string]*discovery.Advertisement)
+ p.mu.Lock()
+ for key, ads := range p.services {
+ if len(serviceUuid) > 0 && key != string(serviceUuid) {
+ continue
+ }
+ for _, ad := range ads {
+ current[string(ad.InstanceUuid)] = ad
+ }
+ }
+ p.mu.Unlock()
+
+ changed := make([]*discovery.Advertisement, 0, len(current))
+ for key, ad := range current {
+ old, ok := scanned[key]
+ if !ok || !reflect.DeepEqual(old, ad) {
+ changed = append(changed, ad)
+ }
+ }
+ for key, ad := range scanned {
+ if _, ok := current[key]; !ok {
+ ad.Lost = true
+ changed = append(changed, ad)
+ }
+ }
+
+ // Push new changes.
+ for _, ad := range changed {
select {
case scanCh <- ad:
case <-ctx.Done():
return
}
}
+
+ scanned = current
+
+ // Wait the next update.
+ select {
+ case <-rescan:
+ case <-ctx.Done():
+ return
+ }
}
}()
return nil
}
func New() discovery.Plugin {
- return &plugin{services: make(map[string][]*discovery.Advertisement)}
+ return &plugin{
+ services: make(map[string][]*discovery.Advertisement),
+ updated: sync.NewCond(&sync.Mutex{}),
+ }
}
diff --git a/lib/discovery/scan.go b/lib/discovery/scan.go
index 9425201..634e8df 100644
--- a/lib/discovery/scan.go
+++ b/lib/discovery/scan.go
@@ -40,9 +40,11 @@
select {
case ad := <-scanCh:
// TODO(jhahn): Merge scanData based on InstanceUuid.
- // TODO(jhahn): Handle "Lost" case.
- update := discovery.UpdateFound{
- Value: discovery.Found{Service: ad.Service},
+ var update discovery.Update
+ if ad.Lost {
+ update = discovery.UpdateLost{discovery.Lost{Service: ad.Service}}
+ } else {
+ update = discovery.UpdateFound{discovery.Found{Service: ad.Service}}
}
updateCh <- update
case <-ctx.Done():