Merge "discovery: refresh mdns subscription if there is no update for a while"
diff --git a/lib/discovery/plugins/mdns/mdns.go b/lib/discovery/plugins/mdns/mdns.go
index b048cd2..4cc5e6a 100644
--- a/lib/discovery/plugins/mdns/mdns.go
+++ b/lib/discovery/plugins/mdns/mdns.go
@@ -115,39 +115,20 @@
go func() {
defer done()
- // Subscribe to the service if not subscribed yet or if we haven't refreshed in a while.
- p.subscriptionMu.Lock()
- sub := p.subscription[serviceName]
- sub.count++
- if time.Since(sub.lastSubscription) > p.subscriptionRefreshTime {
- p.mdns.SubscribeToService(serviceName)
- // Wait a bit to learn about neighborhood.
- time.Sleep(p.subscriptionWaitTime)
- sub.lastSubscription = time.Now()
- }
- p.subscription[serviceName] = sub
- p.subscriptionMu.Unlock()
-
- // Watch the membership changes.
+ p.subscribeToService(serviceName)
watcher, stopWatcher := p.mdns.ServiceMemberWatch(serviceName)
defer func() {
stopWatcher()
- p.subscriptionMu.Lock()
- sub := p.subscription[serviceName]
- sub.count--
- if sub.count == 0 {
- delete(p.subscription, serviceName)
- p.mdns.UnsubscribeFromService(serviceName)
- } else {
- p.subscription[serviceName] = sub
- }
- p.subscriptionMu.Unlock()
+ p.unsubscribeFromService(serviceName)
}()
for {
var service mdns.ServiceInstance
select {
case service = <-watcher:
+ case <-time.After(p.subscriptionRefreshTime):
+ p.refreshSubscription(serviceName)
+ continue
case <-ctx.Done():
return
}
@@ -166,6 +147,46 @@
return nil
}
+// subscribeToService records one more user of serviceName by incrementing its
+// reference count in p.subscription, and then calls refreshSubscription for
+// that service. The count is updated under subscriptionMu; the lock is
+// released before refreshSubscription is invoked.
+func (p *plugin) subscribeToService(serviceName string) {
+	func() {
+		p.subscriptionMu.Lock()
+		defer p.subscriptionMu.Unlock()
+		entry := p.subscription[serviceName]
+		entry.count++
+		p.subscription[serviceName] = entry
+	}()
+	p.refreshSubscription(serviceName)
+}
+
+// unsubscribeFromService drops one user of serviceName. When the reference
+// count reaches zero, the bookkeeping entry is deleted and the mdns
+// subscription is cancelled; otherwise the decremented count is stored back.
+// The whole operation runs under subscriptionMu.
+func (p *plugin) unsubscribeFromService(serviceName string) {
+	p.subscriptionMu.Lock()
+	defer p.subscriptionMu.Unlock()
+
+	entry := p.subscription[serviceName]
+	entry.count--
+	if entry.count != 0 {
+		// Other users remain; just persist the new count.
+		p.subscription[serviceName] = entry
+		return
+	}
+	// Last user is gone: forget the service and unsubscribe from it.
+	delete(p.subscription, serviceName)
+	p.mdns.UnsubscribeFromService(serviceName)
+}
+
+// refreshSubscription subscribes to serviceName again when the last recorded
+// subscription is older than p.subscriptionRefreshTime. It does nothing if
+// the service has no entry in p.subscription.
+//
+// subscriptionMu is held for the entire call, including the
+// p.subscriptionWaitTime sleep that follows a re-subscription.
+func (p *plugin) refreshSubscription(serviceName string) {
+	p.subscriptionMu.Lock()
+	defer p.subscriptionMu.Unlock()
+
+	entry, found := p.subscription[serviceName]
+	if !found {
+		return
+	}
+	if stale := time.Since(entry.lastSubscription) > p.subscriptionRefreshTime; stale {
+		p.mdns.SubscribeToService(serviceName)
+		// Pause briefly so the neighborhood can be learned before the
+		// refresh time is recorded.
+		time.Sleep(p.subscriptionWaitTime)
+		entry.lastSubscription = time.Now()
+	}
+	p.subscription[serviceName] = entry
+}
+
func createTxtRecords(ad *idiscovery.Advertisement) ([]string, error) {
// Prepare a txt record with attributes and addresses to announce.
txt := appendTxtRecord(nil, attrInterface, ad.InterfaceName)
@@ -287,7 +308,7 @@
mdns: m,
adStopper: idiscovery.NewTrigger(),
// TODO(jhahn): Figure out a good subscription refresh time.
- subscriptionRefreshTime: 10 * time.Second,
+ subscriptionRefreshTime: 15 * time.Second,
subscription: make(map[string]subscription),
}
if loopback {