discovery: support Lost

  Add "Lost" update handling.

Change-Id: I69d9d0d4360f77e304dd1a2b55ba7ddaeb8fc3bd
diff --git a/lib/discovery/discovery.go b/lib/discovery/discovery.go
index b502095..10b7377 100644
--- a/lib/discovery/discovery.go
+++ b/lib/discovery/discovery.go
@@ -25,6 +25,8 @@
 	ServiceUuid uuid.UUID
 
 	// TODO(jhahn): Add proximity.
+	// TODO(jhahn): Use proximity for Lost.
+	Lost bool
 }
 
 // TODO(jhahn): Need a better API.
diff --git a/lib/discovery/discovery_test.go b/lib/discovery/discovery_test.go
index 89ac563..df10892 100644
--- a/lib/discovery/discovery_test.go
+++ b/lib/discovery/discovery_test.go
@@ -41,6 +41,7 @@
 		stops = append(stops, stop)
 	}
 
+	// Make sure all advertisements are discovered.
 	if err := scanAndMatch(ds, "v.io/v23/a", services[0]); err != nil {
 		t.Error(err)
 	}
@@ -54,16 +55,31 @@
 		t.Error(err)
 	}
 
-	// Stop advertising the first service. Shouldn't affect the other.
-	stops[0]()
-	if err := scanAndMatch(ds, "v.io/v23/a"); err != nil {
+	// Open a new scan channel and consume expected advertisements first.
+	scan, scanStop, err := startScan(ds, "v.io/v23/a")
+	if err != nil {
 		t.Error(err)
 	}
+	defer scanStop()
+	update := <-scan
+	if !matchFound([]discovery.Update{update}, services[0]) {
+		t.Errorf("Unexpected scan: %v", update)
+	}
+
+	// Make sure scan returns the lost advertisement when advertising is stopped.
+	stops[0]()
+
+	update = <-scan
+	if !matchLost([]discovery.Update{update}, services[0]) {
+		t.Errorf("Unexpected scan: %v", update)
+	}
+
+	// Also it shouldn't affect the other.
 	if err := scanAndMatch(ds, "v.io/v23/b", services[1]); err != nil {
 		t.Error(err)
 	}
 
-	// Stop advertising the other. Now shouldn't discover any service.
+	// Stop advertising the remaining one; Shouldn't discover any service.
 	stops[1]()
 	if err := scanAndMatch(ds, ""); err != nil {
 		t.Error(err)
@@ -80,16 +96,26 @@
 	return cancel, nil
 }
 
-func scan(ds discovery.Scanner, query string) ([]discovery.Update, error) {
-	ctx, _ := context.RootContext()
-	updateCh, err := ds.Scan(ctx, query)
+func startScan(ds discovery.Scanner, query string) (<-chan discovery.Update, func(), error) {
+	ctx, stop := context.RootContext()
+	scan, err := ds.Scan(ctx, query)
 	if err != nil {
-		return nil, fmt.Errorf("Scan failed: %v", err)
+		return nil, nil, fmt.Errorf("Scan failed: %v", err)
 	}
+	return scan, stop, err
+}
+
+func scan(ds discovery.Scanner, query string) ([]discovery.Update, error) {
+	scan, stop, err := startScan(ds, query)
+	if err != nil {
+		return nil, err
+	}
+	defer stop()
+
 	var updates []discovery.Update
 	for {
 		select {
-		case update := <-updateCh:
+		case update := <-scan:
 			updates = append(updates, update)
 		case <-time.After(5 * time.Millisecond):
 			return updates, nil
@@ -97,15 +123,22 @@
 	}
 }
 
-func match(updates []discovery.Update, wants ...discovery.Service) bool {
+func match(updates []discovery.Update, lost bool, wants ...discovery.Service) bool {
 	for _, want := range wants {
 		matched := false
 		for i, update := range updates {
-			found, ok := update.(discovery.UpdateFound)
-			if !ok {
-				continue
+			var service discovery.Service
+			switch u := update.(type) {
+			case discovery.UpdateFound:
+				if !lost {
+					service = u.Value.Service
+				}
+			case discovery.UpdateLost:
+				if lost {
+					service = u.Value.Service
+				}
 			}
-			matched = reflect.DeepEqual(found.Value.Service, want)
+			matched = reflect.DeepEqual(service, want)
 			if matched {
 				updates = append(updates[:i], updates[i+1:]...)
 				break
@@ -118,6 +151,14 @@
 	return len(updates) == 0
 }
 
+func matchFound(updates []discovery.Update, wants ...discovery.Service) bool {
+	return match(updates, false, wants...)
+}
+
+func matchLost(updates []discovery.Update, wants ...discovery.Service) bool {
+	return match(updates, true, wants...)
+}
+
 func scanAndMatch(ds discovery.Scanner, query string, wants ...discovery.Service) error {
 	const timeout = 3 * time.Second
 
@@ -130,7 +171,7 @@
 		if err != nil {
 			return err
 		}
-		if match(updates, wants...) {
+		if matchFound(updates, wants...) {
 			return nil
 		}
 	}
diff --git a/lib/discovery/plugins/mdns/mdns.go b/lib/discovery/plugins/mdns/mdns.go
index a02c322..baa7339 100644
--- a/lib/discovery/plugins/mdns/mdns.go
+++ b/lib/discovery/plugins/mdns/mdns.go
@@ -191,8 +191,9 @@
 			InstanceUuid: instanceUuid,
 			Attrs:        make(discovery.Attributes),
 		},
+		Lost: len(service.SrvRRs) == 0 && len(service.TxtRRs) == 0,
 	}
-	// TODO(jhahn): Handle lost service.
+
 	for _, rr := range service.TxtRRs {
 		for _, txt := range rr.Txt {
 			kv := strings.SplitN(txt, "=", 2)
diff --git a/lib/discovery/plugins/mdns/mdns_test.go b/lib/discovery/plugins/mdns/mdns_test.go
index 0356d7f..b6beeae 100644
--- a/lib/discovery/plugins/mdns/mdns_test.go
+++ b/lib/discovery/plugins/mdns/mdns_test.go
@@ -75,6 +75,7 @@
 		stops = append(stops, stop)
 	}
 
+	// Make sure all advertisements are discovered.
 	if err := scanAndMatch(p2, "v.io/x", services[0], services[1]); err != nil {
 		t.Error(err)
 	}
@@ -88,6 +89,7 @@
 		t.Error(err)
 	}
 
+	// Make sure it is not discovered when advertising is stopped.
 	stops[0]()
 	if err := scanAndMatch(p2, "v.io/x", services[1]); err != nil {
 		t.Error(err)
@@ -95,8 +97,28 @@
 	if err := scanAndMatch(p2, "", services[1], services[2]); err != nil {
 		t.Error(err)
 	}
-	stops[1]()
+
+	// Open a new scan channel and consume expected advertisements first.
+	scan, scanStop, err := startScan(p2, "v.io/y")
+	if err != nil {
+		t.Error(err)
+	}
+	defer scanStop()
+	ad := *<-scan
+	if !matchFound([]ldiscovery.Advertisement{ad}, services[2]) {
+		t.Errorf("Unexpected scan: %v", ad)
+	}
+
+	// Make sure scan returns the lost advertisement when advertising is stopped.
 	stops[2]()
+
+	ad = *<-scan
+	if !matchLost([]ldiscovery.Advertisement{ad}, services[2]) {
+		t.Errorf("Unexpected scan: %v", ad)
+	}
+
+	// Stop advertising the remaining one; Shouldn't discover anything.
+	stops[1]()
 	if err := scanAndMatch(p2, ""); err != nil {
 		t.Error(err)
 	}
@@ -114,20 +136,30 @@
 	return cancel, nil
 }
 
-func scan(p ldiscovery.Plugin, interfaceName string) ([]ldiscovery.Advertisement, error) {
-	ctx, _ := context.RootContext()
-	scanCh := make(chan *ldiscovery.Advertisement)
+func startScan(p ldiscovery.Plugin, interfaceName string) (<-chan *ldiscovery.Advertisement, func(), error) {
+	ctx, stop := context.RootContext()
+	scan := make(chan *ldiscovery.Advertisement)
 	var serviceUuid uuid.UUID
 	if len(interfaceName) > 0 {
 		serviceUuid = ldiscovery.NewServiceUUID(interfaceName)
 	}
-	if err := p.Scan(ctx, serviceUuid, scanCh); err != nil {
-		return nil, fmt.Errorf("Scan failed: %v", err)
+	if err := p.Scan(ctx, serviceUuid, scan); err != nil {
+		return nil, nil, fmt.Errorf("Scan failed: %v", err)
 	}
+	return scan, stop, nil
+}
+
+func scan(p ldiscovery.Plugin, interfaceName string) ([]ldiscovery.Advertisement, error) {
+	scan, stop, err := startScan(p, interfaceName)
+	if err != nil {
+		return nil, err
+	}
+	defer stop()
+
 	var ads []ldiscovery.Advertisement
 	for {
 		select {
-		case ad := <-scanCh:
+		case ad := <-scan:
 			ads = append(ads, *ad)
 		case <-time.After(10 * time.Millisecond):
 			return ads, nil
@@ -135,14 +167,18 @@
 	}
 }
 
-func match(ads []ldiscovery.Advertisement, wants ...discovery.Service) bool {
+func match(ads []ldiscovery.Advertisement, lost bool, wants ...discovery.Service) bool {
 	for _, want := range wants {
 		matched := false
 		for i, ad := range ads {
 			if !uuid.Equal(ad.ServiceUuid, ldiscovery.NewServiceUUID(want.InterfaceName)) {
 				continue
 			}
-			matched = reflect.DeepEqual(ad.Service, want)
+			if lost {
+				matched = ad.Lost
+			} else {
+				matched = lost || reflect.DeepEqual(ad.Service, want)
+			}
 			if matched {
 				ads = append(ads[:i], ads[i+1:]...)
 				break
@@ -155,6 +191,14 @@
 	return len(ads) == 0
 }
 
+func matchFound(ads []ldiscovery.Advertisement, wants ...discovery.Service) bool {
+	return match(ads, false, wants...)
+}
+
+func matchLost(ads []ldiscovery.Advertisement, wants ...discovery.Service) bool {
+	return match(ads, true, wants...)
+}
+
 func scanAndMatch(p ldiscovery.Plugin, interfaceName string, wants ...discovery.Service) error {
 	const timeout = 1 * time.Second
 
@@ -167,7 +211,7 @@
 		if err != nil {
 			return err
 		}
-		if match(ads, wants...) {
+		if matchFound(ads, wants...) {
 			return nil
 		}
 	}
diff --git a/lib/discovery/plugins/mock/mock.go b/lib/discovery/plugins/mock/mock.go
index cf202cb..a15fc8a 100644
--- a/lib/discovery/plugins/mock/mock.go
+++ b/lib/discovery/plugins/mock/mock.go
@@ -5,6 +5,7 @@
 package mock
 
 import (
+	"reflect"
 	"sync"
 
 	"github.com/pborman/uuid"
@@ -17,18 +18,22 @@
 type plugin struct {
 	mu       sync.Mutex
 	services map[string][]*discovery.Advertisement // GUARDED_BY(mu)
+
+	updated *sync.Cond
 }
 
 func (p *plugin) Advertise(ctx *context.T, ad *discovery.Advertisement) error {
 	p.mu.Lock()
-	defer p.mu.Unlock()
 	key := string(ad.ServiceUuid)
 	ads := p.services[key]
 	p.services[key] = append(ads, ad)
+	p.mu.Unlock()
+	p.updated.Broadcast()
+
 	go func() {
 		<-ctx.Done()
+
 		p.mu.Lock()
-		defer p.mu.Unlock()
 		ads := p.services[key]
 		for i, a := range ads {
 			if uuid.Equal(a.InstanceUuid, ad.InstanceUuid) {
@@ -41,30 +46,82 @@
 		} else {
 			delete(p.services, key)
 		}
+		p.mu.Unlock()
+		p.updated.Broadcast()
 	}()
 	return nil
 }
 
 func (p *plugin) Scan(ctx *context.T, serviceUuid uuid.UUID, scanCh chan<- *discovery.Advertisement) error {
+	rescan := make(chan struct{})
 	go func() {
-		p.mu.Lock()
-		defer p.mu.Unlock()
-		for key, service := range p.services {
-			if len(serviceUuid) > 0 && key != string(serviceUuid) {
-				continue
+		for {
+			p.updated.L.Lock()
+			p.updated.Wait()
+			p.updated.L.Unlock()
+			select {
+			case rescan <- struct{}{}:
+			case <-ctx.Done():
+				return
 			}
-			for _, ad := range service {
+		}
+	}()
+
+	go func() {
+		scanned := make(map[string]*discovery.Advertisement)
+
+		for {
+			current := make(map[string]*discovery.Advertisement)
+			p.mu.Lock()
+			for key, ads := range p.services {
+				if len(serviceUuid) > 0 && key != string(serviceUuid) {
+					continue
+				}
+				for _, ad := range ads {
+					current[string(ad.InstanceUuid)] = ad
+				}
+			}
+			p.mu.Unlock()
+
+			changed := make([]*discovery.Advertisement, 0, len(current))
+			for key, ad := range current {
+				old, ok := scanned[key]
+				if !ok || !reflect.DeepEqual(old, ad) {
+					changed = append(changed, ad)
+				}
+			}
+			for key, ad := range scanned {
+				if _, ok := current[key]; !ok {
+					ad.Lost = true
+					changed = append(changed, ad)
+				}
+			}
+
+			// Push new changes.
+			for _, ad := range changed {
 				select {
 				case scanCh <- ad:
 				case <-ctx.Done():
 					return
 				}
 			}
+
+			scanned = current
+
+			// Wait the next update.
+			select {
+			case <-rescan:
+			case <-ctx.Done():
+				return
+			}
 		}
 	}()
 	return nil
 }
 
 func New() discovery.Plugin {
-	return &plugin{services: make(map[string][]*discovery.Advertisement)}
+	return &plugin{
+		services: make(map[string][]*discovery.Advertisement),
+		updated:  sync.NewCond(&sync.Mutex{}),
+	}
 }
diff --git a/lib/discovery/scan.go b/lib/discovery/scan.go
index 9425201..634e8df 100644
--- a/lib/discovery/scan.go
+++ b/lib/discovery/scan.go
@@ -40,9 +40,11 @@
 		select {
 		case ad := <-scanCh:
 			// TODO(jhahn): Merge scanData based on InstanceUuid.
-			// TODO(jhahn): Handle "Lost" case.
-			update := discovery.UpdateFound{
-				Value: discovery.Found{Service: ad.Service},
+			var update discovery.Update
+			if ad.Lost {
+				update = discovery.UpdateLost{discovery.Lost{Service: ad.Service}}
+			} else {
+				update = discovery.UpdateFound{discovery.Found{Service: ad.Service}}
 			}
 			updateCh <- update
 		case <-ctx.Done():