netconfig/roaming: Fix bug and simplify.

This patch began as a fix to a bug in handling of the socket used to
monitoring network configuration changes. Without this patch,
it is possible for one thread to be in a loop that repeatedly
reads from a file descriptor (recvfrom() in rtnetlinkWatcher.watcher in
Linux, read() in bsdNetConfigWatcher.watcher in BSD) while another
thread concurrently closes the same file descriptor
(NetconfigWatcher.Stop implementation in both).

This is fraught with peril. Quoting the 'manpage' of 'close':
"It  is  probably  unwise to close file descriptors while they may be in use by
system calls in other threads in the same process.  Since a file descriptor may
be reused, there are some obscure race conditions that may cause unintended
side effects."

Indeed, these side effects were causing flaky tests in some unrelated changes I
was working on.

In order to fix this, there were some options:
- Call "close" only from the goroutine executing "recvfrom", and use
  "shutdown" in Stop() to unblock any threads stuck in the kernel waiting
  on "recvfrom". Unfortunately, it seems that AF_NETLINK sockets in linux
  do not support "shutdown"
- Use some other scheme (such as poll/epoll/select on Linux and poll/kqueue on
  BSD) to be able to close the file descriptor in the same goroutine that
  executes reads. This was a lot of OS-specific code, which seemed
  to be more complicated than it was worth (see
  https://vanadium-review.googlesource.com/#/c/21811/ for example), OR
- Don't bother about doing a clean shutdown at all: If a process starts
  roaming network servers, then live with one file descriptor that will
  never be closed and one goroutine that might never exit.

This patch takes that last approach. In the process, layers of abstraction
are reduced and overall the code becomes tighter. For example, instead of
netconfig.NetConfigWatcher --> pubub.Publisher --> roaming.ReadRoamingStream
--> flow/manager.{add,rm}Addrs,

we have:

netconfig.NotifyChange --> flow/manager.updateAddresses

MultiPart: 1/2
Change-Id: I9182927025ab8879b7fd22fdd4bb2979ae5c2aa7
diff --git a/netconfig/.api b/netconfig/.api
index b7db259..541bb14 100644
--- a/netconfig/.api
+++ b/netconfig/.api
@@ -3,12 +3,9 @@
 pkg netconfig, func IsDefaultIPRoute(*IPRoute) bool
 pkg netconfig, func IsDefaultIPv4Route(*IPRoute) bool
 pkg netconfig, func IsDefaultIPv6Route(*IPRoute) bool
-pkg netconfig, func NewNetConfigWatcher() (NetConfigWatcher, error)
+pkg netconfig, func NotifyChange() (<-chan struct{}, error)
 pkg netconfig, type IPRoute struct
 pkg netconfig, type IPRoute struct, Gateway net.IP
 pkg netconfig, type IPRoute struct, IfcIndex int
 pkg netconfig, type IPRoute struct, Net net.IPNet
 pkg netconfig, type IPRoute struct, PreferredSource net.IP
-pkg netconfig, type NetConfigWatcher interface { Channel, Stop }
-pkg netconfig, type NetConfigWatcher interface, Channel() <-chan struct{}
-pkg netconfig, type NetConfigWatcher interface, Stop()
diff --git a/netconfig/example_test.go b/netconfig/example_test.go
index a3b822c..b84d631 100644
--- a/netconfig/example_test.go
+++ b/netconfig/example_test.go
@@ -12,13 +12,13 @@
 )
 
 func ExampleNetConfigWatcher() {
-	w, err := netconfig.NewNetConfigWatcher()
-	if err != nil {
-		log.Fatalf("oops: %s", err)
-	}
-	fmt.Println("Do something to your network. You should see one or more dings.")
 	for {
-		<-w.Channel()
-		fmt.Println("ding")
+		ch, err := netconfig.NotifyChange()
+		if err != nil {
+			log.Fatalf("oops: %s", err)
+		}
+		fmt.Println("Do something to your network.")
+		<-ch
+		fmt.Println("Network configuration changed.")
 	}
 }
diff --git a/netconfig/ipaux_bsd.go b/netconfig/ipaux_bsd.go
index d8bae90..f3cc79a 100644
--- a/netconfig/ipaux_bsd.go
+++ b/netconfig/ipaux_bsd.go
@@ -14,10 +14,9 @@
 
 import (
 	"errors"
+	"fmt"
 	"net"
-	"sync"
 	"syscall"
-	"time"
 
 	"v.io/x/lib/vlog"
 )
@@ -27,82 +26,29 @@
 */
 import "C"
 
-type bsdNetConfigWatcher struct {
-	sync.Mutex
-	t       *time.Timer
-	c       chan struct{}
-	s       int
-	stopped bool
-}
-
-func (w *bsdNetConfigWatcher) Stop() {
-	w.Lock()
-	defer w.Unlock()
-	if w.stopped {
-		return
-	}
-	w.stopped = true
-	if w.t != nil {
-		w.t.Stop()
-		w.t = nil
-	}
-	syscall.Close(w.s)
-}
-
-func (w *bsdNetConfigWatcher) Channel() <-chan struct{} {
-	return w.c
-}
-
-// NewNetConfigWatcher returns a watcher that sends a message
-// on the Channel() whenever the config changes.
-func NewNetConfigWatcher() (NetConfigWatcher, error) {
+func (n *notifier) initLocked() error {
 	s, err := syscall.Socket(C.PF_ROUTE, syscall.SOCK_RAW, syscall.AF_UNSPEC)
 	if err != nil {
-		vlog.Infof("socket failed: %s", err)
-		return nil, err
+		return fmt.Errorf("socket(PF_ROUTE, SOCK_RAW, AF_UNSPEC) failed: %v", err)
 	}
-	w := &bsdNetConfigWatcher{c: make(chan struct{}, 1), s: s}
-	go w.watcher()
-	return w, nil
+	go watcher(n, s)
+	return nil
 }
 
-func (w *bsdNetConfigWatcher) ding() {
-	w.Lock()
-	defer w.Unlock()
-	w.t = nil
-	if w.stopped {
-		return
-	}
-	// Don't let us hang in the lock.  The default is safe because the requirement
-	// is that the client get a message after the last config change.  Since this is
-	// a queued chan, we really don't have to stuff anything in it if there's already
-	// something there.
-	select {
-	case w.c <- struct{}{}:
-	default:
-	}
-}
-
-func (w *bsdNetConfigWatcher) watcher() {
-	defer func() {
-		w.Stop()
-		close(w.c)
-	}()
-
-	// Loop waiting for messages.
+func watcher(n *notifier, sock int) {
+	defer syscall.Close(sock)
+	buf := make([]byte, 4096)
 	for {
-		b := make([]byte, 4096)
-		nr, err := syscall.Read(w.s, b)
+		nr, err := syscall.Read(sock, buf)
 		if err != nil {
+			vlog.Infof("read(%d) on an PF_ROUTE socket failed: %v", sock, err)
 			return
 		}
-		msgs, err := syscall.ParseRoutingMessage(b[:nr])
+		msgs, err := syscall.ParseRoutingMessage(buf[:nr])
 		if err != nil {
-			vlog.Infof("Couldn't parse: %s", err)
+			vlog.Infof("ParseRoutingMessage failed: %s", err)
 			continue
 		}
-
-		// Decode the addresses.
 		for _, m := range msgs {
 			switch m.(type) {
 			case *syscall.InterfaceMessage:
@@ -111,17 +57,8 @@
 			default:
 				continue
 			}
-			// Changing networks usually spans many seconds and involves
-			// multiple network config changes.  We add histeresis by
-			// setting an alarm when the first change is detected and
-			// not informing the client till the alarm goes off.
-			// NOTE(p): I chose 3 seconds because that covers all the
-			// events involved in moving from one wifi network to another.
-			w.Lock()
-			if w.t == nil && !w.stopped {
-				w.t = time.AfterFunc(3*time.Second, w.ding)
-			}
-			w.Unlock()
+			n.ding()
+			break
 		}
 	}
 }
diff --git a/netconfig/ipaux_linux.go b/netconfig/ipaux_linux.go
index c2c3d5a..26a422d 100644
--- a/netconfig/ipaux_linux.go
+++ b/netconfig/ipaux_linux.go
@@ -14,9 +14,7 @@
 	"errors"
 	"fmt"
 	"net"
-	"sync"
 	"syscall"
-	"time"
 	"unsafe"
 
 	"v.io/x/lib/vlog"
@@ -255,87 +253,36 @@
 	return m, nil
 }
 
-type rtnetlinkWatcher struct {
-	sync.Mutex
-	t       *time.Timer
-	c       chan struct{}
-	s       int
-	stopped bool
-}
-
-func (w *rtnetlinkWatcher) Stop() {
-	w.Lock()
-	defer w.Unlock()
-	if w.stopped {
-		return
-	}
-	if w.t != nil {
-		w.t.Stop()
-		w.t = nil
-	}
-	w.stopped = true
-	syscall.Close(w.s)
-}
-
-func (w *rtnetlinkWatcher) Channel() <-chan struct{} {
-	return w.c
-}
-
 const (
 	GROUPS = C.RTMGRP_LINK | C.RTMGRP_IPV4_IFADDR | C.RTMGRP_IPV4_MROUTE | C.RTMGRP_IPV4_ROUTE | C.RTMGRP_IPV6_IFADDR | C.RTMGRP_IPV6_MROUTE | C.RTMGRP_IPV6_ROUTE | C.RTMGRP_NOTIFY
 )
 
-// NewNetConfigWatcher returns a watcher that wakes up anyone
-// calling the Wait routine whenever the configuration changes.
-func NewNetConfigWatcher() (NetConfigWatcher, error) {
+func (n *notifier) initLocked() error {
 	s, err := syscall.Socket(syscall.AF_NETLINK, syscall.SOCK_RAW, syscall.NETLINK_ROUTE)
 	if err != nil {
-		vlog.Infof("netconfig socket failed: %s", err)
-		return nil, err
+		return fmt.Errorf("socket(AF_NETLINK, SOCK_RAW, NETLINK_ROUTE) failed: %v", err)
 	}
 
 	lsa := &syscall.SockaddrNetlink{Family: syscall.AF_NETLINK, Groups: GROUPS}
 	if err := syscall.Bind(s, lsa); err != nil {
-		vlog.Infof("netconfig bind failed: %s", err)
-		return nil, err
+		syscall.Close(s)
+		return fmt.Errorf("bind(%d, {AF_NETLINK, 0x%x}) failed: %v", s, lsa.Groups)
 	}
-
-	w := &rtnetlinkWatcher{c: make(chan struct{}, 1), s: s}
-	go w.watcher()
-	return w, nil
+	go watcher(n, s)
+	return nil
 }
 
-func (w *rtnetlinkWatcher) ding() {
-	w.Lock()
-	defer w.Unlock()
-	w.t = nil
-	if w.stopped {
-		return
-	}
-	// Don't let us hang in the lock.  The default is safe because the requirement
-	// is that the client get a message after the last config change.  Since this is
-	// a queued chan, we really don't have to stuff anything in it if there's already
-	// something there.
-	select {
-	case w.c <- struct{}{}:
-	default:
-	}
-}
-
-func (w *rtnetlinkWatcher) watcher() {
-	defer func() {
-		w.Stop()
-		close(w.c)
-	}()
+func watcher(n *notifier, sock int) {
+	defer syscall.Close(sock)
 	var newAddrs []net.IP
+	buf := make([]byte, 4096)
 	for {
-		rb := make([]byte, 4096)
-		nr, _, err := syscall.Recvfrom(w.s, rb, 0)
+		nr, _, err := syscall.Recvfrom(sock, buf, 0)
 		if err != nil {
-			break
+			vlog.Infof("recvfrom(%d) on an AF_NETLINK socket failed: %v", sock, err)
+			return
 		}
-		rb = rb[:nr]
-		msgs, err := syscall.ParseNetlinkMessage(rb)
+		msgs, err := syscall.ParseNetlinkMessage(buf[:nr])
 		if err != nil {
 			vlog.Infof("ParseNetlinkMessage failed: %s", err)
 			continue
@@ -362,17 +309,7 @@
 			} else {
 				continue
 			}
-			// Changing networks usually spans many seconds and involves
-			// multiple network config changes.  We add histeresis by
-			// setting an alarm when the first change is detected and
-			// not informing the client till the alarm goes off.
-			// NOTE(p): I chose 3 seconds because that covers all the
-			// events involved in moving from one wifi network to another.
-			w.Lock()
-			if w.t == nil && !w.stopped {
-				w.t = time.AfterFunc(3*time.Second, w.ding)
-			}
-			w.Unlock()
+			n.ding()
 		}
 	}
 }
diff --git a/netconfig/ipaux_other.go b/netconfig/ipaux_other.go
index 2d549e5..460b5aa 100644
--- a/netconfig/ipaux_other.go
+++ b/netconfig/ipaux_other.go
@@ -15,40 +15,14 @@
 	"time"
 )
 
-type timerNetConfigWatcher struct {
-	c    chan struct{} // channel to signal confg changes
-	stop chan struct{} // channel to tell the watcher to stop
-}
-
-func (w *timerNetConfigWatcher) Stop() {
-	w.stop <- struct{}{}
-}
-
-func (w *timerNetConfigWatcher) Channel() <-chan struct{} {
-	return w.c
-}
-
-func (w *timerNetConfigWatcher) watcher() {
-	for {
-		select {
-		case <-w.stop:
-			close(w.c)
-			return
-		case <-time.NewTimer(2 * time.Minute).C:
-			select {
-			case w.c <- struct{}{}:
-			default:
-			}
+func (n *notifier) initLocked() error {
+	go func() {
+		ticker := time.Tick(2 * time.Minute)
+		for range ticker {
+			n.ding()
 		}
-	}
-}
-
-func NewNetConfigWatcher() (NetConfigWatcher, error) {
-	w := &timerNetConfigWatcher{}
-	w.c = make(chan struct{})
-	w.stop = make(chan struct{}, 1)
-	go w.watcher()
-	return w, nil
+	}()
+	return nil
 }
 
 func GetIPRoutes(defaultOnly bool) []*IPRoute {
diff --git a/netconfig/ipaux_test.go b/netconfig/ipaux_test.go
index e57cacd..fa63472 100644
--- a/netconfig/ipaux_test.go
+++ b/netconfig/ipaux_test.go
@@ -8,14 +8,12 @@
 	"testing"
 )
 
-func TestNetConfigWatcherStop(t *testing.T) {
-	w, err := NewNetConfigWatcher()
+func TestNotifyChange(t *testing.T) {
+	ch, err := NotifyChange()
 	if err != nil {
 		t.Fatal(err)
 	}
-	w.Stop()
-	// The channel should eventually be closed when the watcher exits.
-	// If it doesn't close, then this test will run into a timeout.
-	for range w.Channel() {
+	if ch == nil {
+		t.Fatalf("Expected non-nil channel")
 	}
 }
diff --git a/netconfig/model.go b/netconfig/model.go
index cbd0f8f..55a4ed3 100644
--- a/netconfig/model.go
+++ b/netconfig/model.go
@@ -10,20 +10,20 @@
 
 import (
 	"net"
+	"sync"
+	"time"
 )
 
-// NetConfigWatcher sends on channel whenever an interface or interface address
-// is added or deleted.
-type NetConfigWatcher interface {
-	// Stop watching.
-	Stop()
+var globalNotifier notifier
 
-	// A channel that returns an item whenever the network addresses or
-	// interfaces have changed. It is up to the caller to reread the
-	// network configuration in such cases.
-	//
-	// The channel will be closed when the watcher exits.
-	Channel() <-chan struct{}
+// NotifyChange returns a channel that will be closed when the network
+// configuration changes from the time this function was invoked.
+//
+// This may provide false positivies, i.e., a network change
+// will cause the channel to be closed but a channel closure
+// may not imply a network change.
+func NotifyChange() (<-chan struct{}, error) {
+	return globalNotifier.add()
 }
 
 // IPRoute represents a route in the kernel's routing table.
@@ -34,3 +34,48 @@
 	PreferredSource net.IP
 	IfcIndex        int
 }
+
+type notifier struct {
+	sync.Mutex
+	ch    chan struct{}
+	timer *time.Timer
+
+	initErr error
+	inited  bool
+}
+
+func (n *notifier) add() (<-chan struct{}, error) {
+	n.Lock()
+	defer n.Unlock()
+	if !n.inited {
+		n.ch = make(chan struct{})
+		n.initErr = n.initLocked()
+		n.inited = true
+	}
+	if n.initErr != nil {
+		return nil, n.initErr
+	}
+	return n.ch, nil
+}
+
+func (n *notifier) ding() {
+	// Changing networks usually spans many seconds and involves
+	// multiple network config changes.  We add histeresis by
+	// setting an alarm when the first change is detected and
+	// not informing the client till the alarm goes off.
+	// NOTE(p): I chose 3 seconds because that covers all the
+	// events involved in moving from one wifi network to another.
+	n.Lock()
+	if n.timer == nil {
+		n.timer = time.AfterFunc(3*time.Second, n.resetChan)
+	}
+	n.Unlock()
+}
+
+func (n *notifier) resetChan() {
+	n.Lock()
+	close(n.ch)
+	n.ch = make(chan struct{})
+	n.timer = nil
+	n.Unlock()
+}