discovery/ble: wait until all active tasks finish in shutdown
Wait until all active tasks finish when shutting down the plugin
* Also make loopback plugin New() API same as others.
Change-Id: Icb3e7e957e2c7177cf1581aae2403bc83d9c5a5b
diff --git a/lib/discovery/plugins/ble/scanner.go b/lib/discovery/plugins/ble/scanner.go
index da8e017..e9eb10f 100644
--- a/lib/discovery/plugins/ble/scanner.go
+++ b/lib/discovery/plugins/ble/scanner.go
@@ -20,6 +20,7 @@
rescan chan struct{}
done chan struct{}
+ wg sync.WaitGroup
mu sync.Mutex
listeners map[string][]chan<- *idiscovery.AdInfo // GUARDED_BY(mu)
@@ -81,10 +82,14 @@
func (s *scanner) shutdown() {
close(s.done)
+ s.wg.Wait()
}
func (s *scanner) scanLoop() {
- refreshInterval := s.ttl / 2
+ defer s.wg.Done()
+
+ var gcWg sync.WaitGroup
+ defer gcWg.Wait()
isScanning := false
stopScan := func() {
@@ -95,6 +100,7 @@
}
defer stopScan()
+ refreshInterval := s.ttl / 2
var refresh <-chan time.Time
for {
select {
@@ -131,7 +137,8 @@
isScanning = true
}
- go s.gc()
+ gcWg.Add(1)
+ go s.gc(&gcWg)
refresh = time.After(refreshInterval)
}
}
@@ -149,14 +156,23 @@
s.mu.Lock()
s.scanRecords[adinfo.Ad.Id] = &scanRecord{adinfo.Ad.InterfaceName, time.Now().Add(s.ttl)}
for _, ch := range append(s.listeners[adinfo.Ad.InterfaceName], s.listeners[""]...) {
- ch <- adinfo
+ select {
+ case ch <- adinfo:
+ case <-s.done:
+ s.mu.Unlock()
+ return
+ }
}
s.mu.Unlock()
}
}
-func (s *scanner) gc() {
+func (s *scanner) gc(wg *sync.WaitGroup) {
+ defer wg.Done()
+
s.mu.Lock()
+ defer s.mu.Unlock()
+
now := time.Now()
for id, rec := range s.scanRecords {
if rec.expiry.After(now) {
@@ -166,10 +182,13 @@
delete(s.scanRecords, id)
adinfo := &idiscovery.AdInfo{Ad: discovery.Advertisement{Id: id}, Lost: true}
for _, ch := range append(s.listeners[rec.interfaceName], s.listeners[""]...) {
- ch <- adinfo
+ select {
+ case ch <- adinfo:
+ case <-s.done:
+ return
+ }
}
}
- s.mu.Unlock()
}
func newScanner(ctx *context.T, driver Driver, ttl time.Duration) *scanner {
@@ -182,6 +201,7 @@
scanRecords: make(map[discovery.AdId]*scanRecord),
ttl: ttl,
}
+ s.wg.Add(1)
go s.scanLoop()
return s
}
diff --git a/lib/discovery/plugins/loopback/loopback.go b/lib/discovery/plugins/loopback/loopback.go
index 402bb70..d2e7968 100644
--- a/lib/discovery/plugins/loopback/loopback.go
+++ b/lib/discovery/plugins/loopback/loopback.go
@@ -151,11 +151,11 @@
}
// New returns a new loopback plugin instance.
-func New() idiscovery.Plugin {
+func New(ctx *context.T, host string) (idiscovery.Plugin, error) {
p := &plugin{
adinfoMap: make(map[string]map[discovery.AdId]*idiscovery.AdInfo),
adStopper: idiscovery.NewTrigger(),
}
p.updated = sync.NewCond(&p.mu)
- return p
+ return p, nil
}
diff --git a/lib/discovery/plugins/loopback/loopback_test.go b/lib/discovery/plugins/loopback/loopback_test.go
index 20d206b..b941ac5 100644
--- a/lib/discovery/plugins/loopback/loopback_test.go
+++ b/lib/discovery/plugins/loopback/loopback_test.go
@@ -48,7 +48,10 @@
},
}
- p := loopback.New()
+ p, err := loopback.New(ctx, "h1")
+ if err != nil {
+ t.Fatal(err)
+ }
var stops []func()
for i, _ := range adinfos {