discovery/ble: wait until all active tasks finish in shutdown

  Wait until all active tasks finish when shutting down the plugin

  * Also make loopback plugin New() API same as others.

Change-Id: Icb3e7e957e2c7177cf1581aae2403bc83d9c5a5b
diff --git a/lib/discovery/plugins/ble/scanner.go b/lib/discovery/plugins/ble/scanner.go
index da8e017..e9eb10f 100644
--- a/lib/discovery/plugins/ble/scanner.go
+++ b/lib/discovery/plugins/ble/scanner.go
@@ -20,6 +20,7 @@
 
 	rescan chan struct{}
 	done   chan struct{}
+	wg     sync.WaitGroup
 
 	mu          sync.Mutex
 	listeners   map[string][]chan<- *idiscovery.AdInfo // GUARDED_BY(mu)
@@ -81,10 +82,14 @@
 
 func (s *scanner) shutdown() {
 	close(s.done)
+	s.wg.Wait()
 }
 
 func (s *scanner) scanLoop() {
-	refreshInterval := s.ttl / 2
+	defer s.wg.Done()
+
+	var gcWg sync.WaitGroup
+	defer gcWg.Wait()
 
 	isScanning := false
 	stopScan := func() {
@@ -95,6 +100,7 @@
 	}
 	defer stopScan()
 
+	refreshInterval := s.ttl / 2
 	var refresh <-chan time.Time
 	for {
 		select {
@@ -131,7 +137,8 @@
 			isScanning = true
 		}
 
-		go s.gc()
+		gcWg.Add(1)
+		go s.gc(&gcWg)
 		refresh = time.After(refreshInterval)
 	}
 }
@@ -149,14 +156,23 @@
 		s.mu.Lock()
 		s.scanRecords[adinfo.Ad.Id] = &scanRecord{adinfo.Ad.InterfaceName, time.Now().Add(s.ttl)}
 		for _, ch := range append(s.listeners[adinfo.Ad.InterfaceName], s.listeners[""]...) {
-			ch <- adinfo
+			select {
+			case ch <- adinfo:
+			case <-s.done:
+				s.mu.Unlock()
+				return
+			}
 		}
 		s.mu.Unlock()
 	}
 }
 
-func (s *scanner) gc() {
+func (s *scanner) gc(wg *sync.WaitGroup) {
+	defer wg.Done()
+
 	s.mu.Lock()
+	defer s.mu.Unlock()
+
 	now := time.Now()
 	for id, rec := range s.scanRecords {
 		if rec.expiry.After(now) {
@@ -166,10 +182,13 @@
 		delete(s.scanRecords, id)
 		adinfo := &idiscovery.AdInfo{Ad: discovery.Advertisement{Id: id}, Lost: true}
 		for _, ch := range append(s.listeners[rec.interfaceName], s.listeners[""]...) {
-			ch <- adinfo
+			select {
+			case ch <- adinfo:
+			case <-s.done:
+				return
+			}
 		}
 	}
-	s.mu.Unlock()
 }
 
 func newScanner(ctx *context.T, driver Driver, ttl time.Duration) *scanner {
@@ -182,6 +201,7 @@
 		scanRecords: make(map[discovery.AdId]*scanRecord),
 		ttl:         ttl,
 	}
+	s.wg.Add(1)
 	go s.scanLoop()
 	return s
 }
diff --git a/lib/discovery/plugins/loopback/loopback.go b/lib/discovery/plugins/loopback/loopback.go
index 402bb70..d2e7968 100644
--- a/lib/discovery/plugins/loopback/loopback.go
+++ b/lib/discovery/plugins/loopback/loopback.go
@@ -151,11 +151,11 @@
 }
 
 // New returns a new loopback plugin instance.
-func New() idiscovery.Plugin {
+func New(ctx *context.T, host string) (idiscovery.Plugin, error) {
 	p := &plugin{
 		adinfoMap: make(map[string]map[discovery.AdId]*idiscovery.AdInfo),
 		adStopper: idiscovery.NewTrigger(),
 	}
 	p.updated = sync.NewCond(&p.mu)
-	return p
+	return p, nil
 }
diff --git a/lib/discovery/plugins/loopback/loopback_test.go b/lib/discovery/plugins/loopback/loopback_test.go
index 20d206b..b941ac5 100644
--- a/lib/discovery/plugins/loopback/loopback_test.go
+++ b/lib/discovery/plugins/loopback/loopback_test.go
@@ -48,7 +48,10 @@
 		},
 	}
 
-	p := loopback.New()
+	p, err := loopback.New(ctx, "h1")
+	if err != nil {
+		t.Fatal(err)
+	}
 
 	var stops []func()
 	for i, _ := range adinfos {