lib/discovery: Prevent plugins from stomping on each other.

Prior to this commit, when multiple plugins are used for discovery then
events from one could stomp those from the other. For example, consider
two devices that have both mDNS and BLE connectivity to each other.
Services advertised will be picked up on both channels. If they move
out of BLE range (but are still on the same IP network), then the
"Lost" event from the BLE plugin would be delivered to the client which
might then mistakenly determine that the advertisement is no longer
valid (even though the mDNS plugin still sees it).

This commit fixes that by introducing reference counts per-plugin.
"Lost" events are delivered to the client only if *all* plugins that
previously saw an advertisement no longer see it.

The test added in discovery_test.go tests this behavior and fails
without the corresponding changes in scan.go

Change-Id: I455ba4c0b7c3211e2b8db84c53086a147e1d7587
diff --git a/lib/discovery/discovery.go b/lib/discovery/discovery.go
index 81a2192..54b8c6e 100644
--- a/lib/discovery/discovery.go
+++ b/lib/discovery/discovery.go
@@ -100,6 +100,11 @@
 	if len(plugins) == 0 {
 		return nil, NewErrNoDiscoveryPlugin(ctx)
 	}
+	if actual, limit := int32(len(plugins)), int32(32); actual > limit {
+		// Because adref used in scan.go uses a 32-bit bitmap to
+		// associate ads with the plugin that found them.
+		return nil, NewErrTooManyPlugins(ctx, actual, limit)
+	}
 	statsMu.Lock()
 	statsPrefix := naming.Join("discovery", fmt.Sprint(statsIdx))
 	statsIdx++
diff --git a/lib/discovery/discovery.vdl.go b/lib/discovery/discovery.vdl.go
index 652a9fb..1bd9fbb 100644
--- a/lib/discovery/discovery.vdl.go
+++ b/lib/discovery/discovery.vdl.go
@@ -445,6 +445,7 @@
 	ErrBadQuery               = verror.Register("v.io/x/ref/lib/discovery.BadQuery", verror.NoRetry, "{1:}{2:} invalid query: {3}")
 	ErrDiscoveryClosed        = verror.Register("v.io/x/ref/lib/discovery.DiscoveryClosed", verror.NoRetry, "{1:}{2:} discovery closed")
 	ErrNoDiscoveryPlugin      = verror.Register("v.io/x/ref/lib/discovery.NoDiscoveryPlugin", verror.NoRetry, "{1:}{2:} no discovery plugin")
+	ErrTooManyPlugins         = verror.Register("v.io/x/ref/lib/discovery.TooManyPlugins", verror.NoRetry, "{1:}{2:} too many plugins ({3}), support at most {4}")
 )
 
 // NewErrAdvertisementNotFound returns an error with the ErrAdvertisementNotFound ID.
@@ -477,6 +478,11 @@
 	return verror.New(ErrNoDiscoveryPlugin, ctx)
 }
 
+// NewErrTooManyPlugins returns an error with the ErrTooManyPlugins ID.
+func NewErrTooManyPlugins(ctx *context.T, actual int32, limit int32) error {
+	return verror.New(ErrTooManyPlugins, ctx, actual, limit)
+}
+
 //////////////////////////////////////////////////
 // Interface definitions
 
@@ -683,6 +689,7 @@
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrBadQuery.ID), "{1:}{2:} invalid query: {3}")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrDiscoveryClosed.ID), "{1:}{2:} discovery closed")
 	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrNoDiscoveryPlugin.ID), "{1:}{2:} no discovery plugin")
+	i18n.Cat().SetWithBase(i18n.LangID("en"), i18n.MsgID(ErrTooManyPlugins.ID), "{1:}{2:} too many plugins ({3}), support at most {4}")
 
 	return struct{}{}
 }
diff --git a/lib/discovery/discovery_test.go b/lib/discovery/discovery_test.go
index b5482ea..fb32e5a 100644
--- a/lib/discovery/discovery_test.go
+++ b/lib/discovery/discovery_test.go
@@ -6,6 +6,7 @@
 
 import (
 	"bytes"
+	"fmt"
 	"reflect"
 	"testing"
 	"time"
@@ -264,6 +265,99 @@
 		t.Errorf("unexpected scan: %v", update)
 	case <-time.After(5 * time.Millisecond):
 	}
+
+	// Lost in both, should see a lost event once.
+	p1.UnregisterAd(&newAdinfo)
+	p2.UnregisterAd(&newAdinfo)
+	update = <-scanCh
+	if !testutil.MatchLost(ctx, []discovery.Update{update}, newAdinfo.Ad) {
+		t.Errorf("unexpected scan: %v", update)
+	}
+	select {
+	case update = <-scanCh:
+		t.Errorf("unexpected scan: %v", update)
+	case <-time.After(5 * time.Millisecond):
+	}
+}
+
+func TestLostInOneButNotAllPlugins(t *testing.T) {
+	ctx, shutdown := test.V23Init()
+	defer shutdown()
+
+	p1, p2 := mock.New(), mock.New()
+	df, _ := idiscovery.NewFactory(ctx, p1, p2)
+	defer df.Shutdown()
+
+	ad := idiscovery.AdInfo{
+		Ad: discovery.Advertisement{
+			Id:            discovery.AdId{1, 2, 3},
+			InterfaceName: "v.io/v23/a",
+			Addresses:     []string{"/h1:123/x"},
+		},
+		Hash:        idiscovery.AdHash{1, 2, 3},
+		TimestampNs: time.Now().UnixNano(),
+	}
+
+	olderAd := ad
+	olderAd.TimestampNs -= 1000000000
+	olderAd.Hash = idiscovery.AdHash{4, 5, 6}
+
+	newerAd := ad
+	newerAd.TimestampNs += 1000000000
+	newerAd.Hash = idiscovery.AdHash{7, 8, 9}
+
+	d, _ := df.New(ctx)
+	scanCh, scanStop, err := testutil.Scan(ctx, d, "")
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer scanStop()
+
+	noevent := func() error {
+		select {
+		case update := <-scanCh:
+			return fmt.Errorf("unexpected scan: %v", update)
+		case <-time.After(5 * time.Millisecond):
+			return nil
+		}
+	}
+
+	p1.RegisterAd(&ad)
+	if update := <-scanCh; !testutil.MatchFound(ctx, []discovery.Update{update}, ad.Ad) {
+		t.Errorf("unexpected scan: %v", update)
+	}
+	// p2 sees the same ad, but no event should be delivered.
+	p2.RegisterAd(&ad)
+	if err := noevent(); err != nil {
+		t.Error(err)
+	}
+
+	// And if p1 loses ad, but p2 doesn't, then nothing should be delivered.
+	p1.UnregisterAd(&ad)
+	if err := noevent(); err != nil {
+		t.Error(err)
+	}
+
+	// An older ad should be ignored
+	p1.RegisterAd(&olderAd)
+	if err := noevent(); err != nil {
+		t.Error(err)
+	}
+
+	// But a newer one should be seen as a LOST + FOUND
+	p2.RegisterAd(&newerAd)
+	if update := <-scanCh; !testutil.MatchLost(ctx, []discovery.Update{update}, ad.Ad) {
+		t.Errorf("unexpected: %v", update)
+	}
+	if update := <-scanCh; !testutil.MatchFound(ctx, []discovery.Update{update}, newerAd.Ad) {
+		t.Errorf("unexpected: %v", update)
+	}
+
+	// Newer ad lost by p2 and never seen by p1, should be lost
+	p2.UnregisterAd(&newerAd)
+	if update := <-scanCh; !testutil.MatchLost(ctx, []discovery.Update{update}, ad.Ad) {
+		t.Errorf("unexpected: %v", update)
+	}
 }
 
 func TestTimestamp(t *testing.T) {
diff --git a/lib/discovery/errors.vdl b/lib/discovery/errors.vdl
index c6592a5..0d0cdc6 100644
--- a/lib/discovery/errors.vdl
+++ b/lib/discovery/errors.vdl
@@ -32,4 +32,8 @@
 	NoDiscoveryPlugin() {
 		"en": "no discovery plugin",
 	}
+
+	TooManyPlugins(actual, limit int32) {
+		"en": "too many plugins ({actual}), support at most {limit}",
+	}
 )
diff --git a/lib/discovery/plugin.go b/lib/discovery/plugin.go
index e7f9b2c..97a4f0b 100644
--- a/lib/discovery/plugin.go
+++ b/lib/discovery/plugin.go
@@ -25,16 +25,16 @@
 	Advertise(ctx *context.T, adinfo *AdInfo, done func()) error
 
 	// Scan scans advertisements that match the interface name and returns scanned
-	// advertisements to the channel.
+	// advertisements via the callback.
 	//
 	// An empty interface name means any advertisements.
 	//
-	// Advertisements that are returned through the channel can be changed.
-	// The plugin should not reuse the returned advertisement.
+	// The callback takes ownership of the provided AdInfo, and the plugin
+	// should not use the advertisement after invoking the callback.
 	//
 	// Scanning should continue until the context is canceled or exceeds its
 	// deadline. done should be called once when scanning is done or canceled.
-	Scan(ctx *context.T, interfaceName string, ch chan<- *AdInfo, done func()) error
+	Scan(ctx *context.T, interfaceName string, callback func(*AdInfo), done func()) error
 
 	// Close closes the plugin.
 	//
diff --git a/lib/discovery/plugins/ble/ble.go b/lib/discovery/plugins/ble/ble.go
index e08ecb7..edc24d6 100644
--- a/lib/discovery/plugins/ble/ble.go
+++ b/lib/discovery/plugins/ble/ble.go
@@ -49,7 +49,7 @@
 	return nil
 }
 
-func (p *plugin) Scan(ctx *context.T, interfaceName string, ch chan<- *idiscovery.AdInfo, done func()) error {
+func (p *plugin) Scan(ctx *context.T, interfaceName string, callback func(*idiscovery.AdInfo), done func()) error {
 	go func() {
 		defer done()
 
@@ -91,11 +91,7 @@
 					seenMu.Unlock()
 				}
 				copied := *adinfo
-				select {
-				case ch <- &copied:
-				case <-ctx.Done():
-					return
-				}
+				callback(&copied)
 			case <-ctx.Done():
 				return
 			}
diff --git a/lib/discovery/plugins/loopback/loopback.go b/lib/discovery/plugins/loopback/loopback.go
index d2e7968..30677bd 100644
--- a/lib/discovery/plugins/loopback/loopback.go
+++ b/lib/discovery/plugins/loopback/loopback.go
@@ -39,7 +39,7 @@
 	return nil
 }
 
-func (p *plugin) Scan(ctx *context.T, interfaceName string, ch chan<- *idiscovery.AdInfo, done func()) error {
+func (p *plugin) Scan(ctx *context.T, interfaceName string, callback func(*idiscovery.AdInfo), done func()) error {
 	updated := p.listenToUpdates(ctx)
 
 	go func() {
@@ -64,11 +64,7 @@
 
 			// Push new changes.
 			for i := range changed {
-				select {
-				case ch <- &changed[i]:
-				case <-ctx.Done():
-					return
-				}
+				callback(&changed[i])
 			}
 			seen = current
 
diff --git a/lib/discovery/plugins/mdns/mdns.go b/lib/discovery/plugins/mdns/mdns.go
index 0c72e0d..4fb4faf 100644
--- a/lib/discovery/plugins/mdns/mdns.go
+++ b/lib/discovery/plugins/mdns/mdns.go
@@ -122,7 +122,7 @@
 	return nil
 }
 
-func (p *plugin) Scan(ctx *context.T, interfaceName string, ch chan<- *idiscovery.AdInfo, done func()) error {
+func (p *plugin) Scan(ctx *context.T, interfaceName string, callback func(*idiscovery.AdInfo), done func()) error {
 	var serviceName string
 	if len(interfaceName) == 0 {
 		serviceName = v23ServiceName
@@ -155,11 +155,7 @@
 				ctx.Error(err)
 				continue
 			}
-			select {
-			case ch <- adinfo:
-			case <-ctx.Done():
-				return
-			}
+			callback(adinfo)
 		}
 	}()
 	return nil
diff --git a/lib/discovery/plugins/mock/mock.go b/lib/discovery/plugins/mock/mock.go
index dd92aa7..87a5059 100644
--- a/lib/discovery/plugins/mock/mock.go
+++ b/lib/discovery/plugins/mock/mock.go
@@ -34,7 +34,7 @@
 	return nil
 }
 
-func (p *plugin) Scan(ctx *context.T, interfaceName string, ch chan<- *idiscovery.AdInfo, done func()) error {
+func (p *plugin) Scan(ctx *context.T, interfaceName string, callback func(*idiscovery.AdInfo), done func()) error {
 	rescan := make(chan struct{})
 	go func() {
 		defer close(rescan)
@@ -89,11 +89,7 @@
 
 			// Push new changes.
 			for i := range changed {
-				select {
-				case ch <- &changed[i]:
-				case <-ctx.Done():
-					return
-				}
+				callback(&changed[i])
 			}
 
 			seen = current
diff --git a/lib/discovery/plugins/testutil/util.go b/lib/discovery/plugins/testutil/util.go
index 399745b..abf743b 100644
--- a/lib/discovery/plugins/testutil/util.go
+++ b/lib/discovery/plugins/testutil/util.go
@@ -35,9 +35,10 @@
 func Scan(ctx *context.T, p idiscovery.Plugin, interfaceName string) (<-chan *idiscovery.AdInfo, func(), error) {
 	ctx, cancel := context.WithCancel(ctx)
 	scanCh := make(chan *idiscovery.AdInfo)
+	callback := func(ad *idiscovery.AdInfo) { scanCh <- ad }
 	var wg sync.WaitGroup
 	wg.Add(1)
-	if err := p.Scan(ctx, interfaceName, scanCh, wg.Done); err != nil {
+	if err := p.Scan(ctx, interfaceName, callback, wg.Done); err != nil {
 		return nil, nil, fmt.Errorf("Scan failed: %v", err)
 	}
 	stop := func() {
diff --git a/lib/discovery/plugins/vine/vine.go b/lib/discovery/plugins/vine/vine.go
index 5a1deb2..4f71bc1 100644
--- a/lib/discovery/plugins/vine/vine.go
+++ b/lib/discovery/plugins/vine/vine.go
@@ -76,7 +76,7 @@
 	return nil
 }
 
-func (p *plugin) Scan(ctx *context.T, interfaceName string, ch chan<- *idiscovery.AdInfo, done func()) error {
+func (p *plugin) Scan(ctx *context.T, interfaceName string, callback func(*idiscovery.AdInfo), done func()) error {
 	updated := p.store.listenToUpdates(ctx)
 
 	go func() {
@@ -100,11 +100,7 @@
 			}
 			// Push new changes.
 			for i := range changed {
-				select {
-				case ch <- &changed[i]:
-				case <-ctx.Done():
-					return
-				}
+				callback(&changed[i])
 			}
 			seen = current
 
diff --git a/lib/discovery/scan.go b/lib/discovery/scan.go
index 5c8511c..b7511d5 100644
--- a/lib/discovery/scan.go
+++ b/lib/discovery/scan.go
@@ -12,6 +12,11 @@
 	"v.io/v23/discovery"
 )
 
+type scanChanElem struct {
+	src uint // index into idiscovery.plugins
+	val *AdInfo
+}
+
 func (d *idiscovery) scan(ctx *context.T, session sessionId, query string) (<-chan discovery.Update, error) {
 	// TODO(jhahn): Consider to use multiple target services so that the plugins
 	// can filter advertisements more efficiently if possible.
@@ -26,7 +31,7 @@
 	}
 
 	// TODO(jhahn): Revisit the buffer size.
-	scanCh := make(chan *AdInfo, 10)
+	scanCh := make(chan scanChanElem, 10)
 	updateCh := make(chan discovery.Update, 10)
 
 	barrier := NewBarrier(func() {
@@ -34,8 +39,15 @@
 		close(updateCh)
 		d.removeTask(ctx)
 	})
-	for _, plugin := range d.plugins {
-		if err := plugin.Scan(ctx, matcher.TargetInterfaceName(), scanCh, barrier.Add()); err != nil {
+	for idx, plugin := range d.plugins {
+		p := uint(idx) // https://golang.org/doc/faq#closures_and_goroutines
+		callback := func(ad *AdInfo) {
+			select {
+			case scanCh <- scanChanElem{p, ad}:
+			case <-ctx.Done():
+			}
+		}
+		if err := plugin.Scan(ctx, matcher.TargetInterfaceName(), callback, barrier.Add()); err != nil {
 			cancel()
 			return nil, err
 		}
@@ -44,12 +56,36 @@
 	return updateCh, nil
 }
 
-func (d *idiscovery) doScan(ctx *context.T, session sessionId, matcher Matcher, scanCh chan *AdInfo, updateCh chan<- discovery.Update, done func()) {
+type adref struct {
+	adinfo *AdInfo
+	refs   uint32 // Bitmap of plugin indices that saw the ad
+}
+
+func (a *adref) set(plugin uint) {
+	mask := uint32(1) << plugin
+	a.refs = a.refs | mask
+}
+
+func (a *adref) unset(plugin uint) bool {
+	mask := uint32(1) << plugin
+	a.refs = a.refs & (^mask)
+	return a.refs == 0
+}
+
+func (d *idiscovery) doScan(ctx *context.T, session sessionId, matcher Matcher, scanCh chan scanChanElem, updateCh chan<- discovery.Update, done func()) {
 	// Some plugins may not return a full advertisement information when it is lost.
 	// So we keep the advertisements that we've seen so that we can provide the
 	// full advertisement information when it is lost. Note that plugins will not
 	// include attachments unless they're tiny enough.
-	seen := make(map[discovery.AdId]*AdInfo)
+	seen := make(map[discovery.AdId]*adref)
+	send := func(u discovery.Update) bool {
+		select {
+		case updateCh <- u:
+			return true
+		case <-ctx.Done():
+			return false
+		}
+	}
 
 	var wg sync.WaitGroup
 	defer func() {
@@ -59,37 +95,52 @@
 
 	for {
 		select {
-		case adinfo := <-scanCh:
-			if !adinfo.Lost {
-				// Filter out advertisements from the same session.
-				if d.getAdSession(adinfo.Ad.Id) == session {
+		case <-ctx.Done():
+			return
+		case e := <-scanCh:
+			plugin, adinfo := e.src, e.val
+			id := adinfo.Ad.Id
+			prev := seen[adinfo.Ad.Id]
+			if adinfo.Lost {
+				// A 'Lost' advertisement may not have complete
+				// information.  Send the lost notification on
+				// updateCh only if a found event was
+				// previously sent, and all plugins that found
+				// it have lost it.
+				if prev == nil || !prev.unset(plugin) {
 					continue
 				}
-				// Filter out already seen advertisements.
-				if prev := seen[adinfo.Ad.Id]; prev != nil && prev.Status == AdReady && prev.Hash == adinfo.Hash {
-					continue
-				}
-
-				if adinfo.Status == AdReady {
-					// Clear the unnecessary directory addresses.
-					adinfo.DirAddrs = nil
-				} else {
-					if len(adinfo.DirAddrs) == 0 {
-						ctx.Errorf("no directory address available for partial advertisement %v - ignored", adinfo.Ad.Id)
-						continue
-					}
-
-					// Fetch not-ready-to-serve advertisements from the directory server.
-					if adinfo.Status == AdNotReady {
-						wg.Add(1)
-						go fetchAd(ctx, adinfo.DirAddrs, adinfo.Ad.Id, scanCh, wg.Done)
-						continue
-					}
-
-					// Sort the directory addresses to make it easy to compare.
-					sort.Strings(adinfo.DirAddrs)
+				delete(seen, id)
+				prev.adinfo.Lost = true
+				if !send(NewUpdate(prev.adinfo)) {
+					return
 				}
 			}
+			if d.getAdSession(id) == session {
+				// Ignore advertisements made within the same session.
+				continue
+			}
+			if prev != nil && prev.adinfo.Hash == adinfo.Hash {
+				prev.set(plugin)
+				if prev.adinfo.Status == AdReady {
+					continue
+				}
+			}
+			if adinfo.Status == AdReady {
+				// Clear the unnecessary directory addresses.
+				adinfo.DirAddrs = nil
+			} else if len(adinfo.DirAddrs) == 0 {
+				ctx.Errorf("no directory address available for partial advertisement %v - ignored", id)
+				continue
+			} else if adinfo.Status == AdNotReady {
+				// Fetch not-ready-to-serve advertisements from the directory server.
+				wg.Add(1)
+				go fetchAd(ctx, adinfo.DirAddrs, id, plugin, scanCh, wg.Done)
+				continue
+			}
+
+			// Sort the directory addresses to make it easy to compare.
+			sort.Strings(adinfo.DirAddrs)
 
 			if err := decrypt(ctx, adinfo); err != nil {
 				// Couldn't decrypt it. Ignore it.
@@ -99,34 +150,43 @@
 				continue
 			}
 
-			// Note that 'Lost' advertisement may not have full information. Thus we do not match
-			// the query against it. newUpdates() will ignore it if it has not been scanned.
-			if !adinfo.Lost {
-				matched, err := matcher.Match(&adinfo.Ad)
-				if err != nil {
-					ctx.Error(err)
-					continue
-				}
-				if !matched {
-					continue
-				}
+			if matched, err := matcher.Match(&adinfo.Ad); err != nil {
+				ctx.Error(err)
+				continue
+			} else if !matched {
+				continue
 			}
 
-			for _, update := range newUpdates(seen, adinfo) {
-				select {
-				case updateCh <- update:
-				case <-ctx.Done():
+			if prev == nil {
+				// Never seen before
+				ref := &adref{adinfo: adinfo}
+				ref.set(plugin)
+				seen[id] = ref
+				if !send(NewUpdate(adinfo)) {
+					return
+				}
+				continue
+			}
+			if prev.adinfo.TimestampNs > adinfo.TimestampNs {
+				// Ignore old ad.
+				continue
+			}
+			// TODO(jhahn): Compare proximity as well
+			if prev.adinfo.Hash != adinfo.Hash || (prev.adinfo.Status != AdReady && !sortedStringsEqual(prev.adinfo.DirAddrs, adinfo.DirAddrs)) {
+				// Changed contents of a previously seen ad. Treat it like a newly seen ad.
+				ref := &adref{adinfo: adinfo}
+				ref.set(plugin)
+				seen[id] = ref
+				prev.adinfo.Lost = true
+				if !send(NewUpdate(prev.adinfo)) || !send(NewUpdate(adinfo)) {
 					return
 				}
 			}
-
-		case <-ctx.Done():
-			return
 		}
 	}
 }
 
-func fetchAd(ctx *context.T, dirAddrs []string, id discovery.AdId, scanCh chan<- *AdInfo, done func()) {
+func fetchAd(ctx *context.T, dirAddrs []string, id discovery.AdId, plugin uint, scanCh chan<- scanChanElem, done func()) {
 	defer done()
 
 	dir := newDirClient(dirAddrs)
@@ -140,38 +200,7 @@
 		return
 	}
 	select {
-	case scanCh <- adinfo:
+	case scanCh <- scanChanElem{plugin, adinfo}:
 	case <-ctx.Done():
 	}
 }
-
-func newUpdates(seen map[discovery.AdId]*AdInfo, adinfo *AdInfo) []discovery.Update {
-	var updates []discovery.Update
-	// The multiple plugins may return the same advertisements. We ignores the update
-	// if it has been already sent through the update channel.
-	prev := seen[adinfo.Ad.Id]
-	if adinfo.Lost {
-		// TODO(jhahn): If some plugins return 'Lost' events for an advertisement update, we may
-		// generates multiple 'Lost' and 'Found' events for the same update. In order to minimize
-		// this flakiness, we may need to delay the handling of 'Lost'.
-		if prev != nil {
-			delete(seen, prev.Ad.Id)
-			prev.Lost = true
-			updates = []discovery.Update{NewUpdate(prev)}
-		}
-	} else {
-		// TODO(jhahn): Need to compare the proximity as well.
-		switch {
-		case prev == nil:
-			updates = []discovery.Update{NewUpdate(adinfo)}
-			seen[adinfo.Ad.Id] = adinfo
-		case prev.TimestampNs > adinfo.TimestampNs:
-			// Ignore the old advertisement.
-		case prev.Hash != adinfo.Hash || (prev.Status != AdReady && !sortedStringsEqual(prev.DirAddrs, adinfo.DirAddrs)):
-			prev.Lost = true
-			updates = []discovery.Update{NewUpdate(prev), NewUpdate(adinfo)}
-			seen[adinfo.Ad.Id] = adinfo
-		}
-	}
-	return updates
-}
diff --git a/lib/discovery/test/directory_test.go b/lib/discovery/test/directory_test.go
index 2aced6a..914923b 100644
--- a/lib/discovery/test/directory_test.go
+++ b/lib/discovery/test/directory_test.go
@@ -153,7 +153,7 @@
 	}
 
 	updateCh := make(chan *idiscovery.AdInfo)
-	if err = mockPlugin.Scan(ctx, "", updateCh, func() {}); err != nil {
+	if err = mockPlugin.Scan(ctx, "", func(ad *idiscovery.AdInfo) { updateCh <- ad }, func() {}); err != nil {
 		t.Fatal(err)
 	}
 	<-updateCh