ref: Implement roaming.

The flow manager now handles roaming.
Changes:
(1) The server is notified to requery ListeningEndpoints when a channel
returned from ListeningEndpoints is closed.
(2) Roaming logic has been refactored into a library.
(3) WatchNetwork and UnwatchNetwork have been deprecated and replaced
by status.Valid, a channel that is closed when callers need to
requery the Status.

MultiPart: 2/2

Change-Id: I5e34876205e1017ae8be00ca5bb731fa0a94a5a2
diff --git a/runtime/factories/roaming/roaming.go b/runtime/factories/roaming/roaming.go
index a015e18..7a2703d 100644
--- a/runtime/factories/roaming/roaming.go
+++ b/runtime/factories/roaming/roaming.go
@@ -16,9 +16,6 @@
 import (
 	"flag"
 
-	"v.io/x/lib/netconfig"
-	"v.io/x/lib/netstate"
-
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/flow"
@@ -34,9 +31,9 @@
 	_ "v.io/x/ref/runtime/internal/flow/protocols/ws"
 	_ "v.io/x/ref/runtime/internal/flow/protocols/wsh"
 	"v.io/x/ref/runtime/internal/lib/appcycle"
+	"v.io/x/ref/runtime/internal/lib/roaming"
 	"v.io/x/ref/runtime/internal/lib/websocket"
 	"v.io/x/ref/runtime/internal/lib/xwebsocket"
-	irpc "v.io/x/ref/runtime/internal/rpc"
 	"v.io/x/ref/runtime/internal/rt"
 	"v.io/x/ref/services/debug/debuglib"
 
@@ -46,11 +43,6 @@
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
 )
 
-const (
-	SettingsStreamName = "roaming"
-	SettingsStreamDesc = "pubsub stream used by the roaming RuntimeFactory"
-)
-
 var commonFlags *flags.Flags
 
 func init() {
@@ -87,102 +79,22 @@
 
 	publisher := pubsub.NewPublisher()
 
-	// Create stream in Init function to avoid a race between any
-	// goroutines started here and consumers started after Init returns.
-	ch := make(chan pubsub.Setting)
-	// TODO(cnicolaou): use stop to shutdown this stream when the RuntimeFactory shutdowns.
-	stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamDesc, ch)
+	// TODO(suharshs): We can remove the SettingName argument after the transition to new RPC.
+	runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, roaming.RoamingSetting, commonFlags.RuntimeFlags(), reservedDispatcher)
 	if err != nil {
 		ishutdown()
 		return nil, nil, nil, err
 	}
 
-	prev, err := netstate.GetAccessibleIPs()
+	stopRoaming, err := roaming.CreateRoamingStream(ctx, publisher, listenSpec)
 	if err != nil {
-		ishutdown()
 		return nil, nil, nil, err
 	}
 
-	// Start the dhcp watcher.
-	watcher, err := netconfig.NewNetConfigWatcher()
-	if err != nil {
-		ishutdown()
-		return nil, nil, nil, err
-	}
-
-	cleanupCh := make(chan struct{})
-	watcherCh := make(chan struct{})
-
-	runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
-	if err != nil {
-		ishutdown()
-		return nil, nil, nil, err
-	}
-
-	go monitorNetworkSettingsX(runtime, ctx, watcher, prev, stop, cleanupCh, watcherCh, ch)
 	runtimeFactoryShutdown := func() {
-		close(cleanupCh)
 		ishutdown()
 		shutdown()
-		<-watcherCh
+		stopRoaming()
 	}
 	return runtime, ctx, runtimeFactoryShutdown, nil
 }
-
-// monitorNetworkSettings will monitor network configuration changes and
-// publish subsequent Settings to reflect any changes detected.
-func monitorNetworkSettingsX(
-	runtime *rt.Runtime,
-	ctx *context.T,
-	watcher netconfig.NetConfigWatcher,
-	prev netstate.AddrList,
-	pubStop, cleanup <-chan struct{},
-	watcherLoop chan<- struct{},
-	ch chan<- pubsub.Setting) {
-	defer close(ch)
-
-	listenSpec := runtime.GetListenSpec(ctx)
-
-	// TODO(cnicolaou): add support for listening on multiple network addresses.
-
-done:
-	for {
-		select {
-		case <-watcher.Channel():
-			netstate.InvalidateCache()
-			cur, err := netstate.GetAccessibleIPs()
-			if err != nil {
-				ctx.Errorf("failed to read network state: %s", err)
-				continue
-			}
-			removed := netstate.FindRemoved(prev, cur)
-			added := netstate.FindAdded(prev, cur)
-			ctx.VI(2).Infof("Previous: %d: %s", len(prev), prev)
-			ctx.VI(2).Infof("Current : %d: %s", len(cur), cur)
-			ctx.VI(2).Infof("Added   : %d: %s", len(added), added)
-			ctx.VI(2).Infof("Removed : %d: %s", len(removed), removed)
-			if len(removed) == 0 && len(added) == 0 {
-				ctx.VI(2).Infof("Network event that lead to no address changes since our last 'baseline'")
-				continue
-			}
-			if len(removed) > 0 {
-				ctx.VI(2).Infof("Sending removed: %s", removed)
-				ch <- irpc.NewRmAddrsSetting(removed.AsNetAddrs())
-			}
-			// We will always send the best currently available address
-			if chosen, err := listenSpec.AddressChooser.ChooseAddresses(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
-				ctx.VI(2).Infof("Sending added and chosen: %s", chosen)
-				ch <- irpc.NewNewAddrsSetting(chosen)
-			} else {
-				ctx.VI(2).Infof("Ignoring added %s", added)
-			}
-			prev = cur
-		case <-cleanup:
-			break done
-		case <-pubStop:
-			goto done
-		}
-	}
-	watcher.Stop()
-	close(watcherLoop)
-}
diff --git a/runtime/factories/roaming/roaming_server.go b/runtime/factories/roaming/roaming_server.go
deleted file mode 100644
index b8e5170..0000000
--- a/runtime/factories/roaming/roaming_server.go
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build ignore
-
-package main
-
-import (
-	"fmt"
-
-	"v.io/v23"
-	"v.io/v23/rpc"
-	_ "v.io/x/ref/runtime/factories/roaming"
-)
-
-func main() {
-	ctx, shutdown := v23.Init()
-	defer shutdown()
-
-	ctx, server, err := v23.WithNewServer(ctx, "roamer", &dummy{}, nil)
-	if err != nil {
-		ctx.Fatalf("unexpected error: %q", err)
-	}
-	watcher := make(chan rpc.NetworkChange, 1)
-	server.WatchNetwork(watcher)
-
-	for {
-		status := server.Status()
-		fmt.Printf("Endpoints: %d created:\n", len(status.Endpoints))
-		for _, ep := range status.Endpoints {
-			fmt.Printf("  %s\n", ep)
-		}
-		fmt.Printf("Mounts: %d mounts:\n", len(status.Mounts))
-		for _, ms := range status.Mounts {
-			fmt.Printf("  %s\n", ms)
-		}
-		change := <-watcher
-		fmt.Printf("Network change: %s", change.DebugString())
-	}
-}
-
-type dummy struct{}
-
-func (d *dummy) Echo(call rpc.ServerCall, arg string) (string, error) {
-	return arg, nil
-}