ref: Implement roaming.

The flow manager now handles roaming.
Changes:
(1) The server is notified to requery ListeningEndpoints when a channel
returned from ListeningEndpoints is closed.
(2) Roaming logic has been refactored into a library.
(3) WatchNetwork and UnwatchNetwork have been deprecated and replaced
by status.Valid, a channel that is closed when the Status needs to be
requeried by callers.

MultiPart: 2/2

Change-Id: I5e34876205e1017ae8be00ca5bb731fa0a94a5a2
diff --git a/runtime/factories/roaming/roaming.go b/runtime/factories/roaming/roaming.go
index a015e18..7a2703d 100644
--- a/runtime/factories/roaming/roaming.go
+++ b/runtime/factories/roaming/roaming.go
@@ -16,9 +16,6 @@
 import (
 	"flag"
 
-	"v.io/x/lib/netconfig"
-	"v.io/x/lib/netstate"
-
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/flow"
@@ -34,9 +31,9 @@
 	_ "v.io/x/ref/runtime/internal/flow/protocols/ws"
 	_ "v.io/x/ref/runtime/internal/flow/protocols/wsh"
 	"v.io/x/ref/runtime/internal/lib/appcycle"
+	"v.io/x/ref/runtime/internal/lib/roaming"
 	"v.io/x/ref/runtime/internal/lib/websocket"
 	"v.io/x/ref/runtime/internal/lib/xwebsocket"
-	irpc "v.io/x/ref/runtime/internal/rpc"
 	"v.io/x/ref/runtime/internal/rt"
 	"v.io/x/ref/services/debug/debuglib"
 
@@ -46,11 +43,6 @@
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
 )
 
-const (
-	SettingsStreamName = "roaming"
-	SettingsStreamDesc = "pubsub stream used by the roaming RuntimeFactory"
-)
-
 var commonFlags *flags.Flags
 
 func init() {
@@ -87,102 +79,22 @@
 
 	publisher := pubsub.NewPublisher()
 
-	// Create stream in Init function to avoid a race between any
-	// goroutines started here and consumers started after Init returns.
-	ch := make(chan pubsub.Setting)
-	// TODO(cnicolaou): use stop to shutdown this stream when the RuntimeFactory shutdowns.
-	stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamDesc, ch)
+	// TODO(suharshs): We can remove the SettingName argument after the transition to new RPC.
+	runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, roaming.RoamingSetting, commonFlags.RuntimeFlags(), reservedDispatcher)
 	if err != nil {
 		ishutdown()
 		return nil, nil, nil, err
 	}
 
-	prev, err := netstate.GetAccessibleIPs()
+	stopRoaming, err := roaming.CreateRoamingStream(ctx, publisher, listenSpec)
 	if err != nil {
-		ishutdown()
 		return nil, nil, nil, err
 	}
 
-	// Start the dhcp watcher.
-	watcher, err := netconfig.NewNetConfigWatcher()
-	if err != nil {
-		ishutdown()
-		return nil, nil, nil, err
-	}
-
-	cleanupCh := make(chan struct{})
-	watcherCh := make(chan struct{})
-
-	runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
-	if err != nil {
-		ishutdown()
-		return nil, nil, nil, err
-	}
-
-	go monitorNetworkSettingsX(runtime, ctx, watcher, prev, stop, cleanupCh, watcherCh, ch)
 	runtimeFactoryShutdown := func() {
-		close(cleanupCh)
 		ishutdown()
 		shutdown()
-		<-watcherCh
+		stopRoaming()
 	}
 	return runtime, ctx, runtimeFactoryShutdown, nil
 }
-
-// monitorNetworkSettings will monitor network configuration changes and
-// publish subsequent Settings to reflect any changes detected.
-func monitorNetworkSettingsX(
-	runtime *rt.Runtime,
-	ctx *context.T,
-	watcher netconfig.NetConfigWatcher,
-	prev netstate.AddrList,
-	pubStop, cleanup <-chan struct{},
-	watcherLoop chan<- struct{},
-	ch chan<- pubsub.Setting) {
-	defer close(ch)
-
-	listenSpec := runtime.GetListenSpec(ctx)
-
-	// TODO(cnicolaou): add support for listening on multiple network addresses.
-
-done:
-	for {
-		select {
-		case <-watcher.Channel():
-			netstate.InvalidateCache()
-			cur, err := netstate.GetAccessibleIPs()
-			if err != nil {
-				ctx.Errorf("failed to read network state: %s", err)
-				continue
-			}
-			removed := netstate.FindRemoved(prev, cur)
-			added := netstate.FindAdded(prev, cur)
-			ctx.VI(2).Infof("Previous: %d: %s", len(prev), prev)
-			ctx.VI(2).Infof("Current : %d: %s", len(cur), cur)
-			ctx.VI(2).Infof("Added   : %d: %s", len(added), added)
-			ctx.VI(2).Infof("Removed : %d: %s", len(removed), removed)
-			if len(removed) == 0 && len(added) == 0 {
-				ctx.VI(2).Infof("Network event that lead to no address changes since our last 'baseline'")
-				continue
-			}
-			if len(removed) > 0 {
-				ctx.VI(2).Infof("Sending removed: %s", removed)
-				ch <- irpc.NewRmAddrsSetting(removed.AsNetAddrs())
-			}
-			// We will always send the best currently available address
-			if chosen, err := listenSpec.AddressChooser.ChooseAddresses(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
-				ctx.VI(2).Infof("Sending added and chosen: %s", chosen)
-				ch <- irpc.NewNewAddrsSetting(chosen)
-			} else {
-				ctx.VI(2).Infof("Ignoring added %s", added)
-			}
-			prev = cur
-		case <-cleanup:
-			break done
-		case <-pubStop:
-			goto done
-		}
-	}
-	watcher.Stop()
-	close(watcherLoop)
-}