Merge branch 'master' into discovery

Change-Id: Ic9c22b988ef0b87d19fa9842ee57826a4ee5457b
diff --git a/lib/discovery/util/advertise.go b/lib/discovery/util/advertise.go
index 5835552..93d6a40 100644
--- a/lib/discovery/util/advertise.go
+++ b/lib/discovery/util/advertise.go
@@ -27,13 +27,9 @@
 		service.InstanceUuid = idiscovery.NewInstanceUUID()
 	}
 
-	watcher := make(chan rpc.NetworkChange, 3)
-	server.WatchNetwork(watcher)
-
-	stop, err := advertise(ctx, service, getEndpoints(server), suffix, visibility)
+	eps, valid := getEndpoints(server)
+	stop, err := advertise(ctx, service, eps, suffix, visibility)
 	if err != nil {
-		server.UnwatchNetwork(watcher)
-		close(watcher)
 		return nil, err
 	}
 
@@ -41,17 +37,16 @@
 	go func() {
 		for {
 			select {
-			case <-watcher:
+			case <-valid:
 				if stop != nil {
 					stop() // Stop the previous advertisement.
 				}
-				stop, err = advertise(ctx, service, getEndpoints(server), suffix, visibility)
+				eps, valid = getEndpoints(server)
+				stop, err = advertise(ctx, service, eps, suffix, visibility)
 				if err != nil {
 					ctx.Error(err)
 				}
 			case <-ctx.Done():
-				server.UnwatchNetwork(watcher)
-				close(watcher)
 				close(done)
 				return
 			}
@@ -81,11 +76,11 @@
 }
 
 // TODO(suharshs): Use server.Status().Endpoints only when migrating to a new server.
-func getEndpoints(server rpc.Server) []naming.Endpoint {
+func getEndpoints(server rpc.Server) ([]naming.Endpoint, <-chan struct{}) {
 	status := server.Status()
 	eps := status.Endpoints
 	for _, p := range status.Proxies {
 		eps = append(eps, p.Endpoint)
 	}
-	return eps
+	return eps, status.Valid
 }
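
For orientation, the change above replaces the WatchNetwork callback with a polling handshake: the server status carries a Valid channel that is closed whenever the endpoint set changes. A minimal consumer sketch under that assumption (watchEndpoints and onChange are hypothetical names, not part of this change):

```go
package sketch

import (
	"v.io/v23/context"
	"v.io/v23/naming"
	"v.io/v23/rpc"
)

// watchEndpoints mirrors the loop in AdvertiseServer above: read a status
// snapshot, act on its endpoints, then block until Valid is closed (meaning
// the snapshot is stale) and read again.
func watchEndpoints(ctx *context.T, server rpc.Server, onChange func([]naming.Endpoint)) {
	for {
		status := server.Status()
		onChange(status.Endpoints)
		select {
		case <-status.Valid: // closed by the server on any endpoint change
		case <-ctx.Done():
			return
		}
	}
}
```
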
diff --git a/lib/discovery/util/advertise_test.go b/lib/discovery/util/advertise_test.go
index b3ff757..96b55de 100644
--- a/lib/discovery/util/advertise_test.go
+++ b/lib/discovery/util/advertise_test.go
@@ -7,6 +7,7 @@
 import (
 	"fmt"
 	"reflect"
+	"sync"
 	"testing"
 	"time"
 
@@ -25,40 +26,37 @@
 )
 
 type mockServer struct {
-	eps             []naming.Endpoint
-	watcher         chan<- rpc.NetworkChange
-	watcherClosedCh chan struct{}
+	mu    sync.Mutex
+	eps   []naming.Endpoint
+	valid chan struct{}
 }
 
-func (s *mockServer) AddName(string) error     { return nil }
-func (s *mockServer) RemoveName(string)        {}
-func (s *mockServer) Stop() error              { return nil }
-func (s *mockServer) Closed() <-chan struct{}  { return nil }
-func (s *mockServer) Status() rpc.ServerStatus { return rpc.ServerStatus{Endpoints: s.eps} }
-
-func (s *mockServer) WatchNetwork(ch chan<- rpc.NetworkChange) {
-	s.watcher = ch
-	s.watcherClosedCh = make(chan struct{})
-}
-
-func (s *mockServer) UnwatchNetwork(ch chan<- rpc.NetworkChange) {
-	s.watcher = nil
-	close(s.watcherClosedCh)
-}
-
-func (s *mockServer) updateNetwork(eps []naming.Endpoint) {
-	s.eps = eps
-	if s.watcher != nil {
-		s.watcher <- rpc.NetworkChange{Changed: eps}
+func (s *mockServer) AddName(string) error    { return nil }
+func (s *mockServer) RemoveName(string)       {}
+func (s *mockServer) Stop() error             { return nil }
+func (s *mockServer) Closed() <-chan struct{} { return nil }
+func (s *mockServer) Status() rpc.ServerStatus {
+	defer s.mu.Unlock()
+	s.mu.Lock()
+	return rpc.ServerStatus{
+		Endpoints: s.eps,
+		Valid:     s.valid,
 	}
 }
 
-func (s *mockServer) watcherClosed() <-chan struct{} {
-	return s.watcherClosedCh
+func (s *mockServer) updateNetwork(eps []naming.Endpoint) {
+	defer s.mu.Unlock()
+	s.mu.Lock()
+	s.eps = eps
+	close(s.valid)
+	s.valid = make(chan struct{})
 }
 
 func newMockServer(eps []naming.Endpoint) *mockServer {
-	return &mockServer{eps: eps}
+	return &mockServer{
+		eps:   eps,
+		valid: make(chan struct{}),
+	}
 }
 
 func newEndpoints(addrs ...string) []naming.Endpoint {
@@ -84,7 +82,6 @@
 	eps := newEndpoints("addr1:123")
 	mock := newMockServer(eps)
 
-	ctx, cancel := context.WithCancel(ctx)
 	util.AdvertiseServer(ctx, mock, suffix, service, nil)
 	if err := scanAndMatch(ctx, service, eps, suffix); err != nil {
 		t.Error(err)
@@ -101,16 +98,6 @@
 			t.Error(err)
 		}
 	}
-
-	// Make sure that the network watcher is unregistered when the context
-	// is canceled.
-	cancel()
-
-	select {
-	case <-mock.watcherClosed():
-	case <-time.After(3 * time.Second):
-		t.Error("watcher not closed")
-	}
 }
 
 func TestNetworkChangeInstanceUuid(t *testing.T) {
diff --git a/runtime/factories/android/android.go b/runtime/factories/android/android.go
index 8704541..759c752 100644
--- a/runtime/factories/android/android.go
+++ b/runtime/factories/android/android.go
@@ -16,9 +16,6 @@
 	"flag"
 	"os"
 
-	"v.io/x/lib/netconfig"
-	"v.io/x/lib/netstate"
-
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/discovery"
@@ -37,9 +34,9 @@
 	_ "v.io/x/ref/runtime/internal/flow/protocols/ws"
 	_ "v.io/x/ref/runtime/internal/flow/protocols/wsh"
 	"v.io/x/ref/runtime/internal/lib/appcycle"
+	"v.io/x/ref/runtime/internal/lib/roaming"
 	"v.io/x/ref/runtime/internal/lib/websocket"
 	"v.io/x/ref/runtime/internal/lib/xwebsocket"
-	irpc "v.io/x/ref/runtime/internal/rpc"
 	"v.io/x/ref/runtime/internal/rt"
 	"v.io/x/ref/services/debug/debuglib"
 
@@ -50,11 +47,6 @@
 	"v.io/x/ref/lib/discovery/plugins/mdns"
 )
 
-const (
-	SettingsStreamName = "roaming"
-	SettingsStreamDesc = "pubsub stream used by the roaming RuntimeFactory"
-)
-
 var commonFlags *flags.Flags
 
 var bleCreator func(string) (idiscovery.Plugin, error)
@@ -93,107 +85,26 @@
 
 	publisher := pubsub.NewPublisher()
 
-	// Create stream in Init function to avoid a race between any
-	// goroutines started here and consumers started after Init returns.
-	ch := make(chan pubsub.Setting)
-	// TODO(cnicolaou): use stop to shutdown this stream when the RuntimeFactory shutdowns.
-	stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamDesc, ch)
+	// TODO(suharshs): We can remove the SettingName argument after the transition to new RPC.
+	runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, roaming.RoamingSetting, commonFlags.RuntimeFlags(), reservedDispatcher)
 	if err != nil {
 		ishutdown()
 		return nil, nil, nil, err
 	}
 
-	prev, err := netstate.GetAccessibleIPs()
+	stopRoaming, err := roaming.CreateRoamingStream(ctx, publisher, listenSpec)
 	if err != nil {
 		ishutdown()
 		return nil, nil, nil, err
 	}
 
-	// Start the dhcp watcher.
-	watcher, err := netconfig.NewNetConfigWatcher()
-	if err != nil {
-		ishutdown()
-		return nil, nil, nil, err
-	}
-
-	cleanupCh := make(chan struct{})
-	watcherCh := make(chan struct{})
-
-	runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
-	if err != nil {
-		ishutdown()
-		return nil, nil, nil, err
-	}
-
-	go monitorNetworkSettingsX(runtime, ctx, watcher, prev, stop, cleanupCh, watcherCh, ch)
 	runtimeFactoryShutdown := func() {
-		close(cleanupCh)
 		ishutdown()
 		shutdown()
-		<-watcherCh
+		stopRoaming()
 	}
 	return runtime, ctx, runtimeFactoryShutdown, nil
 }
 
-// monitorNetworkSettings will monitor network configuration changes and
-// publish subsequent Settings to reflect any changes detected.
-func monitorNetworkSettingsX(
-	runtime *rt.Runtime,
-	ctx *context.T,
-	watcher netconfig.NetConfigWatcher,
-	prev netstate.AddrList,
-	pubStop, cleanup <-chan struct{},
-	watcherLoop chan<- struct{},
-	ch chan<- pubsub.Setting) {
-	defer close(ch)
-
-	listenSpec := runtime.GetListenSpec(ctx)
-
-	// TODO(cnicolaou): add support for listening on multiple network addresses.
-
-done:
-	for {
-		select {
-		case <-watcher.Channel():
-			netstate.InvalidateCache()
-			cur, err := netstate.GetAccessibleIPs()
-			if err != nil {
-				ctx.Errorf("failed to read network state: %s", err)
-				continue
-			}
-			removed := netstate.FindRemoved(prev, cur)
-			added := netstate.FindAdded(prev, cur)
-			ctx.VI(2).Infof("Previous: %d: %s", len(prev), prev)
-			ctx.VI(2).Infof("Current : %d: %s", len(cur), cur)
-			ctx.VI(2).Infof("Added   : %d: %s", len(added), added)
-			ctx.VI(2).Infof("Removed : %d: %s", len(removed), removed)
-			if len(removed) == 0 && len(added) == 0 {
-				ctx.VI(2).Infof("Network event that lead to no address changes since our last 'baseline'")
-				continue
-			}
-			if len(removed) > 0 {
-				ctx.VI(2).Infof("Sending removed: %s", removed)
-				ch <- irpc.NewRmAddrsSetting(removed.AsNetAddrs())
-			}
-			// We will always send the best currently available address
-			if chosen, err := listenSpec.AddressChooser.ChooseAddresses(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
-				ctx.VI(2).Infof("Sending added and chosen: %s", chosen)
-				ch <- irpc.NewNewAddrsSetting(chosen)
-			} else {
-				ctx.VI(2).Infof("Ignoring added %s", added)
-			}
-			prev = cur
-		case <-cleanup:
-			break done
-		case <-pubStop:
-			goto done
-		}
-	}
-	watcher.Stop()
-	close(watcherLoop)
-}
-
-
 func createDiscovery() (discovery.T, error) {
 	plugins := make([]idiscovery.Plugin, 2)
 	host, _ := os.Hostname()
diff --git a/runtime/factories/roaming/roaming.go b/runtime/factories/roaming/roaming.go
index a015e18..7a2703d 100644
--- a/runtime/factories/roaming/roaming.go
+++ b/runtime/factories/roaming/roaming.go
@@ -16,9 +16,6 @@
 import (
 	"flag"
 
-	"v.io/x/lib/netconfig"
-	"v.io/x/lib/netstate"
-
 	"v.io/v23"
 	"v.io/v23/context"
 	"v.io/v23/flow"
@@ -34,9 +31,9 @@
 	_ "v.io/x/ref/runtime/internal/flow/protocols/ws"
 	_ "v.io/x/ref/runtime/internal/flow/protocols/wsh"
 	"v.io/x/ref/runtime/internal/lib/appcycle"
+	"v.io/x/ref/runtime/internal/lib/roaming"
 	"v.io/x/ref/runtime/internal/lib/websocket"
 	"v.io/x/ref/runtime/internal/lib/xwebsocket"
-	irpc "v.io/x/ref/runtime/internal/rpc"
 	"v.io/x/ref/runtime/internal/rt"
 	"v.io/x/ref/services/debug/debuglib"
 
@@ -46,11 +43,6 @@
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
 )
 
-const (
-	SettingsStreamName = "roaming"
-	SettingsStreamDesc = "pubsub stream used by the roaming RuntimeFactory"
-)
-
 var commonFlags *flags.Flags
 
 func init() {
@@ -87,102 +79,22 @@
 
 	publisher := pubsub.NewPublisher()
 
-	// Create stream in Init function to avoid a race between any
-	// goroutines started here and consumers started after Init returns.
-	ch := make(chan pubsub.Setting)
-	// TODO(cnicolaou): use stop to shutdown this stream when the RuntimeFactory shutdowns.
-	stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamDesc, ch)
+	// TODO(suharshs): We can remove the SettingName argument after the transition to new RPC.
+	runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, roaming.RoamingSetting, commonFlags.RuntimeFlags(), reservedDispatcher)
 	if err != nil {
 		ishutdown()
 		return nil, nil, nil, err
 	}
 
-	prev, err := netstate.GetAccessibleIPs()
+	stopRoaming, err := roaming.CreateRoamingStream(ctx, publisher, listenSpec)
 	if err != nil {
 		ishutdown()
 		return nil, nil, nil, err
 	}
 
-	// Start the dhcp watcher.
-	watcher, err := netconfig.NewNetConfigWatcher()
-	if err != nil {
-		ishutdown()
-		return nil, nil, nil, err
-	}
-
-	cleanupCh := make(chan struct{})
-	watcherCh := make(chan struct{})
-
-	runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
-	if err != nil {
-		ishutdown()
-		return nil, nil, nil, err
-	}
-
-	go monitorNetworkSettingsX(runtime, ctx, watcher, prev, stop, cleanupCh, watcherCh, ch)
 	runtimeFactoryShutdown := func() {
-		close(cleanupCh)
 		ishutdown()
 		shutdown()
-		<-watcherCh
+		stopRoaming()
 	}
 	return runtime, ctx, runtimeFactoryShutdown, nil
 }
-
-// monitorNetworkSettings will monitor network configuration changes and
-// publish subsequent Settings to reflect any changes detected.
-func monitorNetworkSettingsX(
-	runtime *rt.Runtime,
-	ctx *context.T,
-	watcher netconfig.NetConfigWatcher,
-	prev netstate.AddrList,
-	pubStop, cleanup <-chan struct{},
-	watcherLoop chan<- struct{},
-	ch chan<- pubsub.Setting) {
-	defer close(ch)
-
-	listenSpec := runtime.GetListenSpec(ctx)
-
-	// TODO(cnicolaou): add support for listening on multiple network addresses.
-
-done:
-	for {
-		select {
-		case <-watcher.Channel():
-			netstate.InvalidateCache()
-			cur, err := netstate.GetAccessibleIPs()
-			if err != nil {
-				ctx.Errorf("failed to read network state: %s", err)
-				continue
-			}
-			removed := netstate.FindRemoved(prev, cur)
-			added := netstate.FindAdded(prev, cur)
-			ctx.VI(2).Infof("Previous: %d: %s", len(prev), prev)
-			ctx.VI(2).Infof("Current : %d: %s", len(cur), cur)
-			ctx.VI(2).Infof("Added   : %d: %s", len(added), added)
-			ctx.VI(2).Infof("Removed : %d: %s", len(removed), removed)
-			if len(removed) == 0 && len(added) == 0 {
-				ctx.VI(2).Infof("Network event that lead to no address changes since our last 'baseline'")
-				continue
-			}
-			if len(removed) > 0 {
-				ctx.VI(2).Infof("Sending removed: %s", removed)
-				ch <- irpc.NewRmAddrsSetting(removed.AsNetAddrs())
-			}
-			// We will always send the best currently available address
-			if chosen, err := listenSpec.AddressChooser.ChooseAddresses(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
-				ctx.VI(2).Infof("Sending added and chosen: %s", chosen)
-				ch <- irpc.NewNewAddrsSetting(chosen)
-			} else {
-				ctx.VI(2).Infof("Ignoring added %s", added)
-			}
-			prev = cur
-		case <-cleanup:
-			break done
-		case <-pubStop:
-			goto done
-		}
-	}
-	watcher.Stop()
-	close(watcherLoop)
-}
diff --git a/runtime/factories/roaming/roaming_server.go b/runtime/factories/roaming/roaming_server.go
deleted file mode 100644
index b8e5170..0000000
--- a/runtime/factories/roaming/roaming_server.go
+++ /dev/null
@@ -1,47 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-// +build ignore
-
-package main
-
-import (
-	"fmt"
-
-	"v.io/v23"
-	"v.io/v23/rpc"
-	_ "v.io/x/ref/runtime/factories/roaming"
-)
-
-func main() {
-	ctx, shutdown := v23.Init()
-	defer shutdown()
-
-	ctx, server, err := v23.WithNewServer(ctx, "roamer", &dummy{}, nil)
-	if err != nil {
-		ctx.Fatalf("unexpected error: %q", err)
-	}
-	watcher := make(chan rpc.NetworkChange, 1)
-	server.WatchNetwork(watcher)
-
-	for {
-		status := server.Status()
-		fmt.Printf("Endpoints: %d created:\n", len(status.Endpoints))
-		for _, ep := range status.Endpoints {
-			fmt.Printf("  %s\n", ep)
-		}
-		fmt.Printf("Mounts: %d mounts:\n", len(status.Mounts))
-		for _, ms := range status.Mounts {
-			fmt.Printf("  %s\n", ms)
-		}
-		change := <-watcher
-		fmt.Printf("Network change: %s", change.DebugString())
-	}
-}
-
-type dummy struct{}
-
-func (d *dummy) Echo(call rpc.ServerCall, arg string) (string, error) {
-	return arg, nil
-}
diff --git a/runtime/internal/flow/manager/manager.go b/runtime/internal/flow/manager/manager.go
index 01ebcc8..aa41575 100644
--- a/runtime/internal/flow/manager/manager.go
+++ b/runtime/internal/flow/manager/manager.go
@@ -19,9 +19,12 @@
 	"v.io/v23/naming"
 	"v.io/v23/security"
 
+	"v.io/x/lib/netstate"
+	"v.io/x/ref/lib/pubsub"
 	iflow "v.io/x/ref/runtime/internal/flow"
 	"v.io/x/ref/runtime/internal/flow/conn"
 	"v.io/x/ref/runtime/internal/flow/protocols/bidi"
+	"v.io/x/ref/runtime/internal/lib/roaming"
 	"v.io/x/ref/runtime/internal/lib/upcqueue"
 	inaming "v.io/x/ref/runtime/internal/naming"
 	"v.io/x/ref/runtime/internal/rpc/version"
@@ -43,7 +46,27 @@
 	serverNames     []string
 }
 
-func NewWithBlessings(ctx *context.T, serverBlessings security.Blessings, rid naming.RoutingID) flow.Manager {
+type listenState struct {
+	q             *upcqueue.T
+	listenLoops   sync.WaitGroup
+	dhcpPublisher *pubsub.Publisher
+
+	mu             sync.Mutex
+	stopProxy      chan struct{}
+	listeners      []flow.Listener
+	endpoints      []*endpointState
+	proxyEndpoints []naming.Endpoint
+	notifyWatchers chan struct{}
+	roaming        bool
+}
+
+type endpointState struct {
+	leps         []*inaming.Endpoint // the list of currently active endpoints.
+	tmplEndpoint *inaming.Endpoint   // endpoint used as a template for creating new endpoints from the network interfaces provided from roaming.
+	roaming      bool
+}
+
+func NewWithBlessings(ctx *context.T, serverBlessings security.Blessings, rid naming.RoutingID, dhcpPublisher *pubsub.Publisher) flow.Manager {
 	m := &manager{
 		rid:    rid,
 		closed: make(chan struct{}),
@@ -54,9 +77,11 @@
 		m.serverBlessings = serverBlessings
 		m.serverNames = security.BlessingNames(v23.GetPrincipal(ctx), serverBlessings)
 		m.ls = &listenState{
-			q:         upcqueue.New(),
-			listeners: []flow.Listener{},
-			stopProxy: make(chan struct{}),
+			q:              upcqueue.New(),
+			listeners:      []flow.Listener{},
+			stopProxy:      make(chan struct{}),
+			notifyWatchers: make(chan struct{}),
+			dhcpPublisher:  dhcpPublisher,
 		}
 	}
 	go func() {
@@ -78,22 +103,12 @@
 	return m
 }
 
-func New(ctx *context.T, rid naming.RoutingID) flow.Manager {
+func New(ctx *context.T, rid naming.RoutingID, dhcpPublisher *pubsub.Publisher) flow.Manager {
 	var serverBlessings security.Blessings
 	if rid != naming.NullRoutingID {
 		serverBlessings = v23.GetPrincipal(ctx).BlessingStore().Default()
 	}
-	return NewWithBlessings(ctx, serverBlessings, rid)
-}
-
-type listenState struct {
-	q           *upcqueue.T
-	listenLoops sync.WaitGroup
-
-	mu        sync.Mutex
-	stopProxy chan struct{}
-	listeners []flow.Listener
-	endpoints []naming.Endpoint
+	return NewWithBlessings(ctx, serverBlessings, rid, dhcpPublisher)
 }
 
 func (m *manager) stopListening() {
@@ -108,6 +123,10 @@
 		close(m.ls.stopProxy)
 		m.ls.stopProxy = nil
 	}
+	if m.ls.notifyWatchers != nil {
+		close(m.ls.notifyWatchers)
+		m.ls.notifyWatchers = nil
+	}
 	m.ls.mu.Unlock()
 	for _, ln := range listeners {
 		ln.Close()
@@ -143,6 +162,7 @@
 		RID:       m.rid,
 		Blessings: m.serverNames,
 	}
+	defer m.ls.mu.Unlock()
 	m.ls.mu.Lock()
 	if m.ls.listeners == nil {
-		m.ls.mu.Unlock()
@@ -150,29 +170,143 @@
 		return flow.NewErrBadState(ctx, NewErrManagerClosed(ctx))
 	}
 	m.ls.listeners = append(m.ls.listeners, ln)
-	m.ls.endpoints = append(m.ls.endpoints, local)
-	m.ls.mu.Unlock()
+	leps, roam, err := m.createEndpoints(ctx, local)
+	if err != nil {
+		return iflow.MaybeWrapError(flow.ErrBadArg, ctx, err)
+	}
+	m.ls.endpoints = append(m.ls.endpoints, &endpointState{
+		leps:         leps,
+		tmplEndpoint: local,
+		roaming:      roam,
+	})
+	if !m.ls.roaming && m.ls.dhcpPublisher != nil && roam {
+		m.ls.roaming = true
+		m.ls.listenLoops.Add(1)
+		go func() {
+			roaming.ReadRoamingStream(ctx, m.ls.dhcpPublisher, m.rmAddrs, m.addAddrs)
+			m.ls.listenLoops.Done()
+		}()
+	}
 
 	m.ls.listenLoops.Add(1)
 	go m.lnAcceptLoop(ctx, ln, local)
 	return nil
 }
 
+func (m *manager) createEndpoints(ctx *context.T, lep naming.Endpoint) ([]*inaming.Endpoint, bool, error) {
+	iep := lep.(*inaming.Endpoint)
+	if !strings.HasPrefix(iep.Protocol, "tcp") &&
+		!strings.HasPrefix(iep.Protocol, "ws") {
+		// If not tcp, ws, or wsh, just return the endpoint we were given.
+		return []*inaming.Endpoint{iep}, false, nil
+	}
+	host, port, err := net.SplitHostPort(iep.Address)
+	if err != nil {
+		return nil, false, err
+	}
+	chooser := v23.GetListenSpec(ctx).AddressChooser
+	addrs, unspecified, err := netstate.PossibleAddresses(iep.Protocol, host, chooser)
+	if err != nil {
+		return nil, false, err
+	}
+	ieps := make([]*inaming.Endpoint, 0, len(addrs))
+	for _, addr := range addrs {
+		n, err := inaming.NewEndpoint(lep.String())
+		if err != nil {
+			return nil, false, err
+		}
+		n.Address = net.JoinHostPort(addr.String(), port)
+		ieps = append(ieps, n)
+	}
+	return ieps, unspecified, nil
+}
+
+func (m *manager) addAddrs(addrs []net.Addr) {
+	defer m.ls.mu.Unlock()
+	m.ls.mu.Lock()
+	changed := false
+	for _, addr := range netstate.ConvertToAddresses(addrs) {
+		if !netstate.IsAccessibleIP(addr) {
+			continue
+		}
+		host, _ := getHostPort(addr)
+		for _, epState := range m.ls.endpoints {
+			if !epState.roaming {
+				continue
+			}
+			tmplEndpoint := epState.tmplEndpoint
+			_, port := getHostPort(tmplEndpoint.Addr())
+			if i := findEndpoint(epState, host); i < 0 {
+				nep := *tmplEndpoint
+				nep.Address = net.JoinHostPort(host, port)
+				epState.leps = append(epState.leps, &nep)
+				changed = true
+			}
+		}
+	}
+	if changed && m.ls.notifyWatchers != nil {
+		close(m.ls.notifyWatchers)
+		m.ls.notifyWatchers = make(chan struct{})
+	}
+}
+
+func findEndpoint(epState *endpointState, host string) int {
+	for i, ep := range epState.leps {
+		epHost, _ := getHostPort(ep.Addr())
+		if epHost == host {
+			return i
+		}
+	}
+	return -1
+}
+
+func (m *manager) rmAddrs(addrs []net.Addr) {
+	defer m.ls.mu.Unlock()
+	m.ls.mu.Lock()
+	changed := false
+	for _, addr := range netstate.ConvertToAddresses(addrs) {
+		host, _ := getHostPort(addr)
+		for _, epState := range m.ls.endpoints {
+			if !epState.roaming {
+				continue
+			}
+			if i := findEndpoint(epState, host); i >= 0 {
+				n := len(epState.leps) - 1
+				epState.leps[i], epState.leps[n] = epState.leps[n], nil
+				epState.leps = epState.leps[:n]
+				changed = true
+			}
+		}
+	}
+	if changed && m.ls.notifyWatchers != nil {
+		close(m.ls.notifyWatchers)
+		m.ls.notifyWatchers = make(chan struct{})
+	}
+}
+
+func getHostPort(address net.Addr) (string, string) {
+	host, port, err := net.SplitHostPort(address.String())
+	if err == nil {
+		return host, port
+	}
+	return address.String(), ""
+}
+
 // ProxyListen causes the Manager to accept flows from the specified endpoint.
 // The endpoint must correspond to a vanadium proxy.
-//
-// update gets passed the complete set of endpoints for the proxy every time it
-// is called.
-func (m *manager) ProxyListen(ctx *context.T, ep naming.Endpoint, update func([]naming.Endpoint)) error {
+func (m *manager) ProxyListen(ctx *context.T, ep naming.Endpoint) error {
 	if m.ls == nil {
 		return NewErrListeningWithNullRid(ctx)
 	}
 	m.ls.listenLoops.Add(1)
-	go m.connectToProxy(ctx, ep, update)
+	go m.connectToProxy(ctx, ep)
 	return nil
 }
 
-func (m *manager) connectToProxy(ctx *context.T, ep naming.Endpoint, update func([]naming.Endpoint)) {
+func (m *manager) connectToProxy(ctx *context.T, ep naming.Endpoint) {
 	defer m.ls.listenLoops.Done()
 	var eps []naming.Endpoint
 	for delay := reconnectDelay; ; delay *= 2 {
@@ -207,19 +341,52 @@
 		for i := range eps {
 			eps[i].(*inaming.Endpoint).Blessings = m.serverNames
 		}
-		update(eps)
+		m.updateProxyEndpoints(eps)
 		select {
 		case <-ctx.Done():
 			return
 		case <-m.ls.stopProxy:
 			return
 		case <-f.Closed():
-			update(nil)
+			m.updateProxyEndpoints(nil)
 			delay = reconnectDelay
 		}
 	}
 }
 
+func (m *manager) updateProxyEndpoints(eps []naming.Endpoint) {
+	defer m.ls.mu.Unlock()
+	m.ls.mu.Lock()
+	if endpointsEqual(m.ls.proxyEndpoints, eps) {
+		return
+	}
+	m.ls.proxyEndpoints = eps
+	// The proxy endpoints have changed so we need to notify any watchers to
+	// requery ListeningEndpoints.
+	if m.ls.notifyWatchers != nil {
+		close(m.ls.notifyWatchers)
+		m.ls.notifyWatchers = make(chan struct{})
+	}
+}
+
+func endpointsEqual(a, b []naming.Endpoint) bool {
+	if len(a) != len(b) {
+		return false
+	}
+	m := make(map[string]struct{})
+	for _, ep := range a {
+		m[ep.String()] = struct{}{}
+	}
+	for _, ep := range b {
+		key := ep.String()
+		if _, ok := m[key]; !ok {
+			return false
+		}
+		delete(m, key)
+	}
+	return len(m) == 0
+}
+
 func (m *manager) readProxyResponse(ctx *context.T, f flow.Flow) ([]naming.Endpoint, error) {
 	b, err := f.ReadMsg()
 	if err != nil {
@@ -361,18 +528,24 @@
 // If the Manager is not listening on any endpoints, an endpoint with the
 // Manager's RoutingID will be returned for use in bidirectional RPC.
 // Returned endpoints all have the Manager's unique RoutingID.
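+// The returned 'changed' channel is closed once the returned set of
+// endpoints becomes stale; callers should then re-query ListeningEndpoints.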
-func (m *manager) ListeningEndpoints() (out []naming.Endpoint) {
+func (m *manager) ListeningEndpoints() (out []naming.Endpoint, changed <-chan struct{}) {
 	if m.ls == nil {
-		return nil
+		return nil, nil
 	}
 	m.ls.mu.Lock()
-	out = make([]naming.Endpoint, len(m.ls.endpoints))
-	copy(out, m.ls.endpoints)
+	out = make([]naming.Endpoint, len(m.ls.proxyEndpoints))
+	copy(out, m.ls.proxyEndpoints)
+	for _, epState := range m.ls.endpoints {
+		for _, ep := range epState.leps {
+			out = append(out, ep)
+		}
+	}
+	changed = m.ls.notifyWatchers
 	m.ls.mu.Unlock()
 	if len(out) == 0 {
 		out = append(out, &inaming.Endpoint{Protocol: bidi.Name, RID: m.rid})
 	}
-	return out
+	return out, changed
 }
 
 // Accept blocks until a new Flow has been initiated by a remote process.
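
The notifyWatchers field above implements a close-and-recreate broadcast: readers capture the current channel under the lock, and every mutation closes it (waking all readers) and installs a fresh one. A standalone sketch of the idiom, with hypothetical names:

```go
package sketch

import "sync"

// notifier broadcasts "something changed" to any number of waiters.
type notifier struct {
	mu sync.Mutex
	ch chan struct{}
}

func newNotifier() *notifier { return &notifier{ch: make(chan struct{})} }

// wait returns the channel that will be closed on the next change.
func (n *notifier) wait() <-chan struct{} {
	n.mu.Lock()
	defer n.mu.Unlock()
	return n.ch
}

// notify wakes every goroutine blocked on a channel returned by wait,
// then installs a fresh channel for the next round of waiters.
func (n *notifier) notify() {
	n.mu.Lock()
	defer n.mu.Unlock()
	close(n.ch)
	n.ch = make(chan struct{})
}
```
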
diff --git a/runtime/internal/flow/manager/manager_test.go b/runtime/internal/flow/manager/manager_test.go
index 6a949f8..60f9a2a 100644
--- a/runtime/internal/flow/manager/manager_test.go
+++ b/runtime/internal/flow/manager/manager_test.go
@@ -31,11 +31,11 @@
 	defer goroutines.NoLeaks(t, leakWaitTime)()
 	ctx, shutdown := v23.Init()
 
-	am := New(ctx, naming.FixedRoutingID(0x5555))
+	am := New(ctx, naming.FixedRoutingID(0x5555), nil)
 	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
-	dm := New(ctx, naming.FixedRoutingID(0x1111))
+	dm := New(ctx, naming.FixedRoutingID(0x1111), nil)
 
 	testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
 
@@ -48,12 +48,12 @@
 	defer goroutines.NoLeaks(t, leakWaitTime)()
 	ctx, shutdown := v23.Init()
 
-	am := New(ctx, naming.FixedRoutingID(0x5555))
+	am := New(ctx, naming.FixedRoutingID(0x5555), nil)
 	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
 
-	dm := New(ctx, naming.FixedRoutingID(0x1111))
+	dm := New(ctx, naming.FixedRoutingID(0x1111), nil)
 	// At first the cache should be empty.
 	if got, want := len(dm.(*manager).cache.addrCache), 0; got != want {
 		t.Fatalf("got cache size %v, want %v", got, want)
@@ -83,12 +83,12 @@
 	defer goroutines.NoLeaks(t, leakWaitTime)()
 	ctx, shutdown := v23.Init()
 
-	am := New(ctx, naming.FixedRoutingID(0x5555))
+	am := New(ctx, naming.FixedRoutingID(0x5555), nil)
 	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
 
-	dm := New(ctx, naming.FixedRoutingID(0x1111))
+	dm := New(ctx, naming.FixedRoutingID(0x1111), nil)
 	testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
 	// Now am should be able to make a flow to dm even though dm is not listening.
 	testFlows(t, ctx, am, dm, flowtest.AllowAllPeersAuthorizer{})
@@ -102,18 +102,18 @@
 	defer goroutines.NoLeaks(t, leakWaitTime)()
 	ctx, shutdown := v23.Init()
 
-	am := New(ctx, naming.FixedRoutingID(0x5555))
+	am := New(ctx, naming.FixedRoutingID(0x5555), nil)
 	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
-	nulldm := New(ctx, naming.NullRoutingID)
+	nulldm := New(ctx, naming.NullRoutingID, nil)
 	_, af := testFlows(t, ctx, nulldm, am, flowtest.AllowAllPeersAuthorizer{})
 	// Ensure that the remote blessings of the underlying conn of the accepted blessings
 	// only has the public key of the client and no certificates.
 	if rBlessings := af.Conn().(*conn.Conn).RemoteBlessings(); len(rBlessings.String()) > 0 || rBlessings.PublicKey() == nil {
 		t.Errorf("got %v, want no-cert blessings", rBlessings)
 	}
-	dm := New(ctx, naming.FixedRoutingID(0x1111))
+	dm := New(ctx, naming.FixedRoutingID(0x1111), nil)
 	_, af = testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
 	// Ensure that the remote blessings of the underlying conn of the accepted flow are
 	// non-zero if we did specify a RoutingID.
@@ -131,16 +131,18 @@
 	defer goroutines.NoLeaks(t, leakWaitTime)()
 	ctx, shutdown := v23.Init()
 
-	am := New(ctx, naming.FixedRoutingID(0x5555))
+	am := New(ctx, naming.FixedRoutingID(0x5555), nil)
 	if err := am.Listen(ctx, "tcp", "127.0.0.1:0"); err != nil {
 		t.Fatal(err)
 	}
-	dm := New(ctx, naming.FixedRoutingID(0x1111))
+	dm := New(ctx, naming.FixedRoutingID(0x1111), nil)
 	testFlows(t, ctx, dm, am, flowtest.AllowAllPeersAuthorizer{})
 
+	leps, _ := am.ListeningEndpoints()
+	lameEP := leps[0]
 	am.StopListening(ctx)
 
-	if f, err := dm.Dial(ctx, am.ListeningEndpoints()[0], flowtest.AllowAllPeersAuthorizer{}); err == nil {
+	if f, err := dm.Dial(ctx, lameEP, flowtest.AllowAllPeersAuthorizer{}); err == nil {
 		t.Errorf("dialing a lame duck should fail, but didn't %#v.", f)
 	}
 
@@ -150,7 +152,8 @@
 }
 
 func testFlows(t *testing.T, ctx *context.T, dm, am flow.Manager, auth flow.PeerAuthorizer) (df, af flow.Flow) {
-	ep := am.ListeningEndpoints()[0]
+	eps, _ := am.ListeningEndpoints()
+	ep := eps[0]
 	var err error
 	df, err = dm.Dial(ctx, ep, auth)
 	if err != nil {
diff --git a/runtime/internal/lib/roaming/roaming.go b/runtime/internal/lib/roaming/roaming.go
new file mode 100644
index 0000000..c455332
--- /dev/null
+++ b/runtime/internal/lib/roaming/roaming.go
@@ -0,0 +1,145 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package roaming
+
+import (
+	"net"
+
+	"v.io/x/lib/netconfig"
+	"v.io/x/lib/netstate"
+	"v.io/x/ref/lib/pubsub"
+
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+)
+
+const (
+	RoamingSetting         = "roaming"
+	RoamingSettingDesc     = "pubsub stream used by the roaming RuntimeFactory"
+	UpdateAddrsSetting     = "NewAddrs"
+	UpdateAddrsSettingDesc = "Addresses that have been available since last change"
+	RmAddrsSetting         = "RmAddrs"
+	RmAddrsSettingDesc     = "Addresses that have been removed since last change"
+)
+
+// CreateRoamingStream creates a stream that monitors network configuration
+// changes and publishes subsequent Settings to reflect any changes detected.
+// A synchronous shutdown function is returned.
+func CreateRoamingStream(ctx *context.T, publisher *pubsub.Publisher, listenSpec rpc.ListenSpec) (func(), error) {
+	ch := make(chan pubsub.Setting)
+	stop, err := publisher.CreateStream(RoamingSetting, RoamingSettingDesc, ch)
+	if err != nil {
+		return nil, err
+	}
+	prev, err := netstate.GetAccessibleIPs()
+	if err != nil {
+		return nil, err
+	}
+	watcher, err := netconfig.NewNetConfigWatcher()
+	if err != nil {
+		return nil, err
+	}
+	done := make(chan struct{})
+	go func() {
+		defer close(ch)
+		for {
+			select {
+			case <-ctx.Done():
+				return
+			case <-done:
+				return
+			case <-stop:
+				return
+			case <-watcher.Channel():
+				netstate.InvalidateCache()
+				cur, err := netstate.GetAccessibleIPs()
+				if err != nil {
+					ctx.Errorf("failed to read network state: %v", err)
+					continue
+				}
+				removed := netstate.FindRemoved(prev, cur)
+				added := netstate.FindAdded(prev, cur)
+				ctx.VI(2).Infof("Previous: %d: %s", len(prev), prev)
+				ctx.VI(2).Infof("Current : %d: %s", len(cur), cur)
+				ctx.VI(2).Infof("Added   : %d: %s", len(added), added)
+				ctx.VI(2).Infof("Removed : %d: %s", len(removed), removed)
+				if len(removed) > 0 {
+					ctx.VI(2).Infof("Sending removed: %s", removed)
+					ch <- NewRmAddrsSetting(removed.AsNetAddrs())
+				}
+				if chosen, err := listenSpec.AddressChooser.ChooseAddresses(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && len(chosen) > 0 {
+					ctx.VI(2).Infof("Sending added and chosen: %s", chosen)
+					ch <- NewUpdateAddrsSetting(chosen)
+				}
+				prev = cur
+			}
+		}
+	}()
+	shutdown := func() {
+		close(done)
+		watcher.Stop()
+	}
+	return shutdown, nil
+}
+
+// ReadRoamingStream reads address updates from the roaming stream and
+// dispatches them to the callbacks below.
+// 'remove' gets passed the net.Addrs that are no longer being listened on.
+// 'add' gets passed the net.Addrs that are newly being listened on.
+func ReadRoamingStream(ctx *context.T, pub *pubsub.Publisher, remove, add func([]net.Addr)) {
+	ch := make(chan pubsub.Setting, 10)
+	_, err := pub.ForkStream(RoamingSetting, ch)
+	if err != nil {
+		ctx.Errorf("error forking stream:", err)
+		return
+	}
+	for {
+		select {
+		case <-ctx.Done():
+			if err := pub.CloseFork(RoamingSetting, ch); err == nil {
+				drain(ch)
+			}
+			return
+		case setting := <-ch:
+			if setting == nil {
+				continue
+			}
+			addrs, ok := setting.Value().([]net.Addr)
+			if !ok {
+				ctx.Errorf("unhandled setting type %T", addrs)
+				continue
+			}
+			switch setting.Name() {
+			case UpdateAddrsSetting:
+				add(addrs)
+			case RmAddrsSetting:
+				remove(addrs)
+			}
+		}
+	}
+}
+
+func drain(ch chan pubsub.Setting) {
+	for {
+		select {
+		case <-ch:
+		default:
+			close(ch)
+			return
+		}
+	}
+}
+
+// NewUpdateAddrsSetting creates the Setting to be sent to Listen to inform
+// it of all addresses that have become available since the last change.
+func NewUpdateAddrsSetting(a []net.Addr) pubsub.Setting {
+	return pubsub.NewAny(UpdateAddrsSetting, UpdateAddrsSettingDesc, a)
+}
+
+// NewRmAddrsSetting creates the Setting to be sent to Listen to inform
+// it of addresses that are no longer available.
+func NewRmAddrsSetting(a []net.Addr) pubsub.Setting {
+	return pubsub.NewAny(RmAddrsSetting, RmAddrsSettingDesc, a)
+}
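
Taken together, a runtime factory creates the stream and the flow manager forks it. A hypothetical end-to-end wiring (setupRoaming and the callback bodies are illustrative only, not part of this change):

```go
package sketch

import (
	"net"

	"v.io/v23/context"
	"v.io/v23/rpc"
	"v.io/x/ref/lib/pubsub"
	"v.io/x/ref/runtime/internal/lib/roaming"
)

// setupRoaming publishes network changes on a pubsub stream and consumes
// them in a background goroutine, returning the stream's stop function.
func setupRoaming(ctx *context.T, spec rpc.ListenSpec) (func(), error) {
	publisher := pubsub.NewPublisher()
	stop, err := roaming.CreateRoamingStream(ctx, publisher, spec)
	if err != nil {
		return nil, err
	}
	go roaming.ReadRoamingStream(ctx, publisher,
		func(removed []net.Addr) { /* drop endpoints on removed addrs */ },
		func(added []net.Addr) { /* mint endpoints for added addrs */ })
	return stop, nil
}
```
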
diff --git a/runtime/internal/rpc/benchmark/simple/main.go b/runtime/internal/rpc/benchmark/simple/main.go
index fb30f0a..1b450ac 100644
--- a/runtime/internal/rpc/benchmark/simple/main.go
+++ b/runtime/internal/rpc/benchmark/simple/main.go
@@ -56,7 +56,7 @@
 	b.ResetTimer()
 	for i := 0; i < b.N; i++ {
 		if ref.RPCTransitionState() >= ref.XServers {
-			m := fmanager.New(nctx, naming.FixedRoutingID(0xc))
+			m := fmanager.New(nctx, naming.FixedRoutingID(0xc), nil)
 			b.StartTimer()
 			_, err := m.Dial(nctx, serverEP, flowtest.AllowAllPeersAuthorizer{})
 			if err != nil {
diff --git a/runtime/internal/rpc/pubsub.go b/runtime/internal/rpc/pubsub.go
deleted file mode 100644
index 03f1e67..0000000
--- a/runtime/internal/rpc/pubsub.go
+++ /dev/null
@@ -1,30 +0,0 @@
-// Copyright 2015 The Vanadium Authors. All rights reserved.
-// Use of this source code is governed by a BSD-style
-// license that can be found in the LICENSE file.
-
-package rpc
-
-import (
-	"net"
-
-	"v.io/x/ref/lib/pubsub"
-)
-
-// NewNewAddrsSetting creates the Setting to be sent to Listen to inform
-// it of all addresses that habe become available since the last change.
-func NewNewAddrsSetting(a []net.Addr) pubsub.Setting {
-	return pubsub.NewAny(NewAddrsSetting, NewAddrsSettingDesc, a)
-}
-
-// NewRmAddrsSetting creates the Setting to be sent to Listen to inform
-// it of addresses that are no longer available.
-func NewRmAddrsSetting(a []net.Addr) pubsub.Setting {
-	return pubsub.NewAny(RmAddrsSetting, RmAddrsSettingDesc, a)
-}
-
-const (
-	NewAddrsSetting     = "NewAddrs"
-	NewAddrsSettingDesc = "Addresses that have been available since last change"
-	RmAddrsSetting      = "RmAddrs"
-	RmAddrsSettingDesc  = "Addresses that have been removed since last change"
-)
diff --git a/runtime/internal/rpc/roaming_test.go b/runtime/internal/rpc/roaming_test.go
index d192ec9..e96ac7a 100644
--- a/runtime/internal/rpc/roaming_test.go
+++ b/runtime/internal/rpc/roaming_test.go
@@ -8,15 +8,20 @@
 	"net"
 	"reflect"
 	"sort"
+	"strings"
 	"testing"
 	"time"
 
 	"v.io/v23"
+	"v.io/v23/context"
 	"v.io/v23/naming"
 	"v.io/v23/rpc"
 	"v.io/x/lib/netstate"
 	"v.io/x/lib/set"
+	"v.io/x/ref"
 	"v.io/x/ref/lib/pubsub"
+	"v.io/x/ref/runtime/factories/fake"
+	"v.io/x/ref/runtime/internal/lib/roaming"
 	inaming "v.io/x/ref/runtime/internal/naming"
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/tcp"
 	_ "v.io/x/ref/runtime/internal/rpc/protocols/ws"
@@ -26,7 +31,114 @@
 	"v.io/x/ref/test/testutil"
 )
 
-// TODO(mattr): Transition this to using public API.
+func TestRoamingNew(t *testing.T) {
+	if ref.RPCTransitionState() < ref.XServers {
+		t.Skip("This test only runs under the new rpc system.")
+	}
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	ctx = fake.SetClientFactory(ctx, func(ctx *context.T, opts ...rpc.ClientOpt) rpc.Client {
+		return NewXClient(ctx, v23.GetNamespace(ctx), opts...)
+	})
+
+	publisher := pubsub.NewPublisher()
+	ch := make(chan pubsub.Setting)
+	stop, err := publisher.CreateStream(roaming.RoamingSetting, roaming.RoamingSettingDesc, ch)
+	if err != nil {
+		t.Fatal(err)
+	}
+	defer func() { publisher.Shutdown(); <-stop }()
+
+	ipv4And6 := netstate.AddressChooserFunc(func(network string, addrs []net.Addr) ([]net.Addr, error) {
+		accessible := netstate.ConvertToAddresses(addrs)
+		ipv4 := accessible.Filter(netstate.IsUnicastIPv4)
+		ipv6 := accessible.Filter(netstate.IsUnicastIPv6)
+		return append(ipv4.AsNetAddrs(), ipv6.AsNetAddrs()...), nil
+	})
+	spec := rpc.ListenSpec{
+		Addrs: rpc.ListenAddrs{
+			{"tcp", "*:0"}, // an invalid address.
+			{"tcp", ":0"},
+			{"tcp", ":0"},
+		},
+		AddressChooser: ipv4And6,
+	}
+	sctx := v23.WithListenSpec(ctx, spec)
+	sctx, _ = v23.WithPrincipal(sctx, testutil.NewPrincipal("test"))
+	sctx, server, err := WithNewServer(sctx, "foo", &testServer{}, nil, publisher)
+	if err != nil {
+		t.Fatal(err)
+	}
+	status := server.Status()
+	prevEps := status.Endpoints
+
+	n1 := netstate.NewNetAddr("ip", "1.1.1.1")
+	n2 := netstate.NewNetAddr("ip", "2.2.2.2")
+
+	change := status.Valid
+
+	ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1, n2})
+	// We should be notified of a network change.
+	<-change
+	status = server.Status()
+	eps := status.Endpoints
+	change = status.Valid
+	// We expect 4 new endpoints, 2 for each valid listen call.
+	if got, want := len(eps), len(prevEps)+4; got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+	// We expect the added networks to be in the new endpoints.
+	if got, want := len(filterEndpointsByHost(eps, "1.1.1.1")), 2; got != want {
+		t.Errorf("got %v, wanted %v endpoints with host 1.1.1.1")
+	}
+	if got, want := len(filterEndpointsByHost(eps, "2.2.2.2")), 2; got != want {
+		t.Errorf("got %v, wanted %v endpoints with host 2.2.2.2")
+	}
+	prevEps = eps
+
+	// Now remove a network.
+	ch <- roaming.NewRmAddrsSetting([]net.Addr{n1})
+	<-change
+	status = server.Status()
+	eps = status.Endpoints
+	change = status.Valid
+	// We expect 2 endpoints to be missing.
+	if got, want := len(eps), len(prevEps)-2; got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+	// We expect the removed network to not be in the new endpoints.
+	if got, want := len(filterEndpointsByHost(eps, "1.1.1.1")), 0; got != want {
+		t.Errorf("got %v, wanted %v endpoints with host 1.1.1.1")
+	}
+	prevEps = eps
+
+	// Now remove everything, essentially "disconnected from the network"
+	ch <- roaming.NewRmAddrsSetting(getIPAddrs(prevEps))
+	<-change
+	status = server.Status()
+	eps = status.Endpoints
+	change = status.Valid
+	// We expect there to be only the bidi endpoint.
+	if got, want := len(eps), 1; got != want && eps[0].Addr().Network() != "bidi" {
+		t.Errorf("got %v, want %v", got, want)
+	}
+
+	// Now if we reconnect to a network it should show up.
+	ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1})
+	<-change
+	status = server.Status()
+	eps = status.Endpoints
+	// We expect 2 endpoints to be added
+	if got, want := len(eps), 2; got != want {
+		t.Errorf("got %v, want %v", got, want)
+	}
+	// We expect the re-added network to be in the new endpoints.
+	if got, want := len(filterEndpointsByHost(eps, "1.1.1.1")), 2; got != want {
+		t.Errorf("got %v, wanted %v endpoints with host 1.1.1.1", got, want)
+	}
+}
+
 func TestRoaming(t *testing.T) {
 	ctx, shutdown := initForTest()
 	defer shutdown()
@@ -35,19 +147,19 @@
 	ns := tnaming.NewSimpleNamespace()
 
 	publisher := pubsub.NewPublisher()
-	roaming := make(chan pubsub.Setting)
-	stop, err := publisher.CreateStream("TestRoaming", "TestRoaming", roaming)
+	ch := make(chan pubsub.Setting)
+	stop, err := publisher.CreateStream("TestRoaming", "TestRoaming", ch)
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer func() { publisher.Shutdown(); <-stop }()
 
 	nctx, _ := v23.WithPrincipal(ctx, testutil.NewPrincipal("test"))
-	server, err := testInternalNewServerWithPubsub(nctx, sm, ns, publisher, "TestRoaming")
+	s, err := testInternalNewServerWithPubsub(nctx, sm, ns, publisher, "TestRoaming")
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer server.Stop()
+	defer s.Stop()
 
 	ipv4And6 := netstate.AddressChooserFunc(func(network string, addrs []net.Addr) ([]net.Addr, error) {
 		accessible := netstate.ConvertToAddresses(addrs)
@@ -64,7 +176,7 @@
 		AddressChooser: ipv4And6,
 	}
 
-	eps, err := server.Listen(spec)
+	eps, err := s.Listen(spec)
 	if err != nil {
 		t.Fatal(err)
 	}
@@ -72,15 +184,16 @@
 		t.Fatal("no endpoints listened on.")
 	}
 
-	if err = server.Serve("foo", &testServer{}, nil); err != nil {
+	if err = s.Serve("foo", &testServer{}, nil); err != nil {
 		t.Fatal(err)
 	}
 	setLeafEndpoints(eps)
-	if err = server.AddName("bar"); err != nil {
+	if err = s.AddName("bar"); err != nil {
 		t.Fatal(err)
 	}
 
-	status := server.Status()
+	status := s.Status()
+	netChange := status.Valid
 	if got, want := status.Endpoints, eps; !cmpEndpoints(got, want) {
 		t.Fatalf("got %v, want %v", got, want)
 	}
@@ -89,11 +202,11 @@
 		t.Fatalf("got %d, want %d", got, want)
 	}
 
-	watcher := make(chan rpc.NetworkChange, 10)
-	server.WatchNetwork(watcher)
+	watcher := make(chan NetworkChange, 10)
+	s.(*server).WatchNetwork(watcher)
 	defer close(watcher)
 
-	waitForChange := func() *rpc.NetworkChange {
+	waitForChange := func() *NetworkChange {
 		ctx.Infof("Waiting on %p", watcher)
 		select {
 		case c := <-watcher:
@@ -108,7 +221,8 @@
 	n2 := netstate.NewNetAddr("ip", "2.2.2.2")
 	n3 := netstate.NewNetAddr("ip", "3.3.3.3")
 
-	roaming <- NewNewAddrsSetting([]net.Addr{n1, n2})
+	ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1, n2})
+	<-netChange
 
 	// We expect 2 added addrs and 4 changes, one for each IP per usable listen spec addr.
 	change := waitForChange()
@@ -127,7 +241,8 @@
 		nepsA = append(nepsA, []naming.Endpoint{nep1, nep2}...)
 	}
 
-	status = server.Status()
+	status = s.Status()
+	netChange = status.Valid
 	if got, want := status.Endpoints, nepsA; !cmpEndpoints(got, want) {
 		t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
 	}
@@ -140,9 +255,10 @@
 	}
 
 	// Mimic that n2 has been changed to n3. The network monitor will publish
-	// two AddrsSettings - RmAddrsSetting(n2) and NewNewAddrsSetting(n1, n3).
+	// two AddrsSettings - RmAddrsSetting(n2) and NewUpdateAddrsSetting(n1, n3).
 
-	roaming <- NewRmAddrsSetting([]net.Addr{n2})
+	ch <- roaming.NewRmAddrsSetting([]net.Addr{n2})
+	<-netChange
 
 	// We expect 1 removed addr and 2 changes, one for each usable listen spec addr.
 	change = waitForChange()
@@ -160,12 +276,14 @@
 		nepsR = append(nepsR, nep2)
 	}
 
-	status = server.Status()
+	status = s.Status()
+	netChange = status.Valid
 	if got, want := status.Endpoints, nepsR; !cmpEndpoints(got, want) {
 		t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
 	}
 
-	roaming <- NewNewAddrsSetting([]net.Addr{n1, n3})
+	ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1, n3})
+	<-netChange
 
 	// We expect 1 added addr and 2 changes, one for the new IP per usable listen spec addr.
 	change = waitForChange()
@@ -184,7 +302,8 @@
 		nepsC = append(nepsC, nep1, nep2)
 	}
 
-	status = server.Status()
+	status = s.Status()
+	netChange = status.Valid
 	if got, want := status.Endpoints, nepsC; !cmpEndpoints(got, want) {
 		t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
 	}
@@ -197,7 +316,8 @@
 	}
 
 	// Remove all addresses to mimic losing all connectivity.
-	roaming <- NewRmAddrsSetting(getIPAddrs(nepsC))
+	ch <- roaming.NewRmAddrsSetting(getIPAddrs(nepsC))
+	<-netChange
 
 	// We expect changes for all of the current endpoints
 	change = waitForChange()
@@ -205,12 +325,14 @@
 		t.Fatalf("got %d, want %d", got, want)
 	}
 
-	status = server.Status()
+	status = s.Status()
+	netChange = status.Valid
 	if got, want := len(status.Mounts), 0; got != want {
 		t.Fatalf("got %d, want %d: %v", got, want, status.Mounts)
 	}
 
-	roaming <- NewNewAddrsSetting([]net.Addr{n1})
+	ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1})
+	<-netChange
 	// We expect 2 changes, one for each usable listen spec addr.
 	change = waitForChange()
 	if got, want := len(change.Changed), 2; got != want {
@@ -226,30 +348,30 @@
 	ns := tnaming.NewSimpleNamespace()
 
 	publisher := pubsub.NewPublisher()
-	roaming := make(chan pubsub.Setting)
-	stop, err := publisher.CreateStream("TestWatcherDeadlock", "TestWatcherDeadlock", roaming)
+	ch := make(chan pubsub.Setting)
+	stop, err := publisher.CreateStream("TestWatcherDeadlock", "TestWatcherDeadlock", ch)
 	if err != nil {
 		t.Fatal(err)
 	}
 	defer func() { publisher.Shutdown(); <-stop }()
 
 	nctx, _ := v23.WithPrincipal(ctx, testutil.NewPrincipal("test"))
-	server, err := testInternalNewServerWithPubsub(nctx, sm, ns, publisher, "TestWatcherDeadlock")
+	s, err := testInternalNewServerWithPubsub(nctx, sm, ns, publisher, "TestWatcherDeadlock")
 	if err != nil {
 		t.Fatal(err)
 	}
-	defer server.Stop()
+	defer s.Stop()
 
 	spec := rpc.ListenSpec{
 		Addrs: rpc.ListenAddrs{
 			{"tcp", ":0"},
 		},
 	}
-	eps, err := server.Listen(spec)
+	eps, err := s.Listen(spec)
 	if err != nil {
 		t.Fatal(err)
 	}
-	if err = server.Serve("foo", &testServer{}, nil); err != nil {
+	if err = s.Serve("foo", &testServer{}, nil); err != nil {
 		t.Fatal(err)
 	}
 	setLeafEndpoints(eps)
@@ -257,17 +379,17 @@
 	// Set a watcher that we never read from - the intent is to make sure
 	// that the listener still listens to changes even though there is no
 	// goroutine to read from the watcher channel.
-	watcher := make(chan rpc.NetworkChange, 0)
-	server.WatchNetwork(watcher)
+	watcher := make(chan NetworkChange, 0)
+	s.(*server).WatchNetwork(watcher)
 	defer close(watcher)
 
 	// Remove all addresses to mimic losing all connectivity.
-	roaming <- NewRmAddrsSetting(getIPAddrs(eps))
+	ch <- roaming.NewRmAddrsSetting(getIPAddrs(eps))
 
 	// Add in two new addresses
 	n1 := netstate.NewNetAddr("ip", "1.1.1.1")
 	n2 := netstate.NewNetAddr("ip", "2.2.2.2")
-	roaming <- NewNewAddrsSetting([]net.Addr{n1, n2})
+	ch <- roaming.NewUpdateAddrsSetting([]net.Addr{n1, n2})
 
 	neps := make([]naming.Endpoint, 0, len(eps))
 	for _, p := range getUniqPorts(eps) {
@@ -277,7 +399,7 @@
 	}
 	then := time.Now()
 	for {
-		status := server.Status()
+		status := s.Status()
 		if got, want := status.Endpoints, neps; cmpEndpoints(got, want) {
 			break
 		}
@@ -310,6 +432,16 @@
 	return addrs
 }
 
+func filterEndpointsByHost(eps []naming.Endpoint, host string) []naming.Endpoint {
+	var filtered []naming.Endpoint
+	for _, ep := range eps {
+		if strings.Contains(ep.Addr().String(), host) {
+			filtered = append(filtered, ep)
+		}
+	}
+	return filtered
+}
+
 func cmpEndpoints(got, want []naming.Endpoint) bool {
 	if len(got) != len(want) {
 		return false
diff --git a/runtime/internal/rpc/server.go b/runtime/internal/rpc/server.go
index 80fa2bf..7cb149d 100644
--- a/runtime/internal/rpc/server.go
+++ b/runtime/internal/rpc/server.go
@@ -34,6 +34,7 @@
 	"v.io/x/ref/lib/pubsub"
 	"v.io/x/ref/lib/stats"
 	"v.io/x/ref/runtime/internal/lib/publisher"
+	"v.io/x/ref/runtime/internal/lib/roaming"
 	inaming "v.io/x/ref/runtime/internal/naming"
 	"v.io/x/ref/runtime/internal/rpc/stream"
 	"v.io/x/ref/runtime/internal/rpc/stream/manager"
@@ -151,7 +152,17 @@
 	stream    *pubsub.Stream
 	ch        chan pubsub.Setting // channel to receive dhcp settings over
 	err       error               // error status.
-	watchers  map[chan<- rpc.NetworkChange]struct{}
+	watchers  map[chan<- NetworkChange]struct{}
+	change    chan struct{}
+}
+
+type NetworkChange struct {
+	Time         time.Time         // Time of last change.
+	State        rpc.ServerState   // The current state of the server.
+	AddedAddrs   []net.Addr        // Addresses added since the last change.
+	RemovedAddrs []net.Addr        // Addresses removed since the last change.
+	Changed      []naming.Endpoint // The set of endpoints added/removed as a result of this change.
+	Error        error             // Any error encountered.
 }
 
 type server struct {
@@ -340,6 +351,9 @@
 	status.ServesMountTable = s.servesMountTable
 	status.Mounts = s.publisher.Status()
 	status.Endpoints = []naming.Endpoint{}
+	if s.dhcpState != nil {
+		status.Valid = s.dhcpState.change
+	}
 	for ls, _ := range s.listenState {
 		if ls.eperr != nil {
 			status.Errors = append(status.Errors, ls.eperr)
@@ -362,7 +376,7 @@
 	return status
 }
 
-func (s *server) WatchNetwork(ch chan<- rpc.NetworkChange) {
+func (s *server) WatchNetwork(ch chan<- NetworkChange) {
 	defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	s.Lock()
 	defer s.Unlock()
@@ -371,7 +385,7 @@
 	}
 }
 
-func (s *server) UnwatchNetwork(ch chan<- rpc.NetworkChange) {
+func (s *server) UnwatchNetwork(ch chan<- NetworkChange) {
 	defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	s.Lock()
 	defer s.Unlock()
@@ -519,7 +533,8 @@
 		dhcp := &dhcpState{
 			name:      s.settingsName,
 			publisher: s.settingsPublisher,
-			watchers:  make(map[chan<- rpc.NetworkChange]struct{}),
+			watchers:  make(map[chan<- NetworkChange]struct{}),
+			change:    make(chan struct{}),
 		}
 		s.dhcpState = dhcp
 		dhcp.ch = make(chan pubsub.Setting, 10)
@@ -727,14 +742,14 @@
 				s.Unlock()
 				return
 			}
-			change := rpc.NetworkChange{
+			change := NetworkChange{
 				Time:  time.Now(),
 				State: externalStates[s.state],
 			}
 			switch setting.Name() {
-			case NewAddrsSetting:
+			case roaming.UpdateAddrsSetting:
 				change.AddedAddrs, change.Changed = s.updateAddresses(v)
-			case RmAddrsSetting:
+			case roaming.RmAddrsSetting:
 				change.RemovedAddrs, change.Changed = s.removeAddresses(v)
 			}
 			s.ctx.VI(2).Infof("rpc: dhcp: change %v", change)
@@ -744,6 +759,10 @@
 				default:
 				}
 			}
+			if s.dhcpState.change != nil {
+				close(s.dhcpState.change)
+				s.dhcpState.change = make(chan struct{})
+			}
 			s.Unlock()
 		default:
 			s.ctx.Errorf("rpc: dhcpLoop: unhandled setting type %T", v)
@@ -757,7 +776,6 @@
 		return host
 	}
 	return address.String()
-
 }
 
 // findEndpoint returns the index of the first endpoint in ieps with the given network address.
@@ -994,6 +1012,10 @@
 		if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
 			drain(dhcp.ch)
 		}
+		if dhcp.change != nil {
+			close(dhcp.change)
+			dhcp.change = nil
+		}
 	}
 
 	s.Unlock()
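
The roaming tests above block on status.Valid directly; where a bounded wait is preferable, a helper along these lines works (waitForStatusChange is hypothetical, assuming the Valid field this change adds to the server status):

```go
package sketch

import (
	"fmt"
	"time"

	"v.io/v23/rpc"
)

// waitForStatusChange returns a fresh status snapshot once the server
// invalidates the current one, or an error after the timeout.
func waitForStatusChange(s rpc.Server, timeout time.Duration) (rpc.ServerStatus, error) {
	status := s.Status()
	select {
	case <-status.Valid:
		return s.Status(), nil
	case <-time.After(timeout):
		return status, fmt.Errorf("no status change within %v", timeout)
	}
}
```
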
diff --git a/runtime/internal/rpc/test/full_test.go b/runtime/internal/rpc/test/full_test.go
index 5af791a..1bee64e 100644
--- a/runtime/internal/rpc/test/full_test.go
+++ b/runtime/internal/rpc/test/full_test.go
@@ -13,6 +13,7 @@
 	"io"
 	"net"
 	"reflect"
+	"strings"
 	"testing"
 	"time"
 
@@ -289,13 +290,19 @@
 		t.Fatal(err)
 	}
 	status := server.Status()
-	if got, want := len(status.Endpoints), 0; got != want {
-		t.Errorf("got %d, want %d", got, want)
+	if ref.RPCTransitionState() >= ref.XServers {
+		if got, want := len(status.Endpoints), 1; got != want {
+			t.Errorf("got %d, want %d", got, want)
+		}
+	} else {
+		if got, want := len(status.Endpoints), 0; got != want {
+			t.Errorf("got %d, want %d", got, want)
+		}
 	}
 	if got, want := len(status.Errors), 1; got != want {
 		t.Errorf("got %d, want %d", got, want)
 	}
-	if got, want := status.Errors[0].Error(), "oops"; got != want {
+	if got, want := status.Errors[0].Error(), "oops"; !strings.Contains(got, want) {
 		t.Errorf("got %d, want %d", got, want)
 	}
 }
diff --git a/runtime/internal/rpc/x_test.go b/runtime/internal/rpc/x_test.go
index 5568371..0697ae0 100644
--- a/runtime/internal/rpc/x_test.go
+++ b/runtime/internal/rpc/x_test.go
@@ -30,7 +30,7 @@
 	var i uint64 = 1
 	ctx = fake.SetFlowManagerFactory(ctx, func(ctx *context.T) flow.Manager {
 		i++
-		return manager.New(ctx, naming.FixedRoutingID(i))
+		return manager.New(ctx, naming.FixedRoutingID(i), nil)
 	})
 	ctx = fake.SetClientFactory(ctx, func(ctx *context.T, o ...rpc.ClientOpt) rpc.Client {
 		return NewXClient(ctx, v23.GetNamespace(ctx), o...)
@@ -39,7 +39,7 @@
 	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{
 		Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
 	})
-	_, _, err := WithNewServer(ctx, "server", &testService{}, nil, nil, "")
+	_, _, err := WithNewServer(ctx, "server", &testService{}, nil, nil)
 	if err != nil {
 		t.Fatal(verror.DebugString(err))
 	}
@@ -64,7 +64,7 @@
 	var i uint64 = 1
 	ctx = fake.SetFlowManagerFactory(ctx, func(ctx *context.T) flow.Manager {
 		i++
-		return manager.New(ctx, naming.FixedRoutingID(i))
+		return manager.New(ctx, naming.FixedRoutingID(i), nil)
 	})
 	ctx = fake.SetClientFactory(ctx, func(ctx *context.T, o ...rpc.ClientOpt) rpc.Client {
 		return NewXClient(ctx, v23.GetNamespace(ctx), o...)
@@ -73,7 +73,7 @@
 	ctx = v23.WithListenSpec(ctx, rpc.ListenSpec{
 		Addrs: rpc.ListenAddrs{{Protocol: "tcp", Address: "127.0.0.1:0"}},
 	})
-	_, _, err := WithNewDispatchingServer(ctx, "server", &testDispatcher{}, nil, "")
+	_, _, err := WithNewDispatchingServer(ctx, "server", &testDispatcher{}, nil)
 	if err != nil {
 		t.Fatal(verror.DebugString(err))
 	}
diff --git a/runtime/internal/rpc/xclient.go b/runtime/internal/rpc/xclient.go
index 0c78353..875e6fe 100644
--- a/runtime/internal/rpc/xclient.go
+++ b/runtime/internal/rpc/xclient.go
@@ -80,7 +80,7 @@
 		}
 	}
 	if c.flowMgr == nil {
-		c.flowMgr = manager.New(ctx, naming.NullRoutingID)
+		c.flowMgr = manager.New(ctx, naming.NullRoutingID, nil)
 	}
 
 	go func() {
diff --git a/runtime/internal/rpc/xserver.go b/runtime/internal/rpc/xserver.go
index 5b8a798..42ef6c4 100644
--- a/runtime/internal/rpc/xserver.go
+++ b/runtime/internal/rpc/xserver.go
@@ -7,7 +7,6 @@
 import (
 	"fmt"
 	"io"
-	"net"
 	"reflect"
 	"strings"
 	"sync"
@@ -26,7 +25,6 @@
 	"v.io/v23/verror"
 	"v.io/v23/vom"
 	"v.io/v23/vtrace"
-	"v.io/x/lib/netstate"
 	"v.io/x/ref/lib/apilog"
 	"v.io/x/ref/lib/pubsub"
 	"v.io/x/ref/lib/stats"
@@ -36,7 +34,6 @@
 )
 
 // TODO(mattr): add/removeAddresses
-// TODO(mattr): dhcpLoop
 
 type xserver struct {
 	sync.Mutex
@@ -49,17 +46,13 @@
 	flowMgr           flow.Manager
 	publisher         publisher.Publisher // publisher to publish mounttable mounts.
 	settingsPublisher *pubsub.Publisher   // pubsub publisher for dhcp
-	settingsName      string              // pubwsub stream name for dhcp
-	dhcpState         *dhcpState          // dhcpState, nil if not using dhcp
+	valid             chan struct{}
 	blessings         security.Blessings
 	typeCache         *typeCache
-	addressChooser    rpc.AddressChooser
 	state             rpc.ServerState // the current state of the server.
 
-	chosenEndpoints map[string]*inaming.Endpoint            // endpoints chosen by the addressChooser for publishing.
-	protoEndpoints  map[string]*inaming.Endpoint            // endpoints that act as "template" endpoints.
-	proxyEndpoints  map[string]map[string]*inaming.Endpoint // keyed by ep.String()
-	lnErrors        map[string]error                        // erros from listening, keyed by ep.String()
+	endpoints map[string]*inaming.Endpoint // endpoints that the server is listening on.
+	lnErrors  []error                      // errors from listening
 
 	disp               rpc.Dispatcher // dispatcher to serve RPCs
 	dispReserved       rpc.Dispatcher // dispatcher for reserved methods
@@ -68,13 +61,12 @@
 	servesMountTable   bool
 	isLeaf             bool
 
-	// TODO(cnicolaou): add roaming stats to rpcStats
 	stats *rpcStats // stats for this server.
 }
 
 func WithNewServer(ctx *context.T,
 	name string, object interface{}, authorizer security.Authorizer,
-	settingsPublisher *pubsub.Publisher, settingsName string,
+	settingsPublisher *pubsub.Publisher,
 	opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
 	if object == nil {
 		return ctx, nil, verror.New(verror.ErrBadArg, ctx, "nil object")
@@ -85,12 +77,12 @@
 	}
 	d := &leafDispatcher{invoker, authorizer}
 	opts = append([]rpc.ServerOpt{options.IsLeaf(true)}, opts...)
-	return WithNewDispatchingServer(ctx, name, d, settingsPublisher, settingsName, opts...)
+	return WithNewDispatchingServer(ctx, name, d, settingsPublisher, opts...)
 }
 
 func WithNewDispatchingServer(ctx *context.T,
 	name string, dispatcher rpc.Dispatcher,
-	settingsPublisher *pubsub.Publisher, settingsName string,
+	settingsPublisher *pubsub.Publisher,
 	opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
 	if dispatcher == nil {
 		return ctx, nil, verror.New(verror.ErrBadArg, ctx, "nil dispatcher")
@@ -110,11 +102,11 @@
 		blessings:         v23.GetPrincipal(rootCtx).BlessingStore().Default(),
 		stats:             newRPCStats(statsPrefix),
 		settingsPublisher: settingsPublisher,
-		settingsName:      settingsName,
+		valid:             make(chan struct{}),
 		disp:              dispatcher,
 		typeCache:         newTypeCache(),
-		proxyEndpoints:    make(map[string]map[string]*inaming.Endpoint),
 		state:             rpc.ServerActive,
+		endpoints:         make(map[string]*inaming.Endpoint),
 	}
 	for _, opt := range opts {
 		switch opt := opt.(type) {
@@ -136,7 +128,7 @@
 		}
 	}
 
-	s.flowMgr = manager.NewWithBlessings(rootCtx, s.blessings, rid)
+	s.flowMgr = manager.NewWithBlessings(rootCtx, s.blessings, rid, settingsPublisher)
 	rootCtx, _, err = v23.WithNewClient(rootCtx,
 		clientFlowManagerOpt{s.flowMgr},
 		PreferredProtocols(s.preferredProtocols))
@@ -157,7 +149,7 @@
 		// TODO(mattr): We only call AddServer here, but if someone calls AddName
 		// later there will be no servers?
 		s.Lock()
-		for k, _ := range s.chosenEndpoints {
+		for k := range s.endpoints {
 			s.publisher.AddServer(k)
 		}
 		s.Unlock()
@@ -187,30 +179,6 @@
 			close(done)
 		}()
 
-		s.Lock()
-		// TODO(mattr): I don't understand what this is.
-		if dhcp := s.dhcpState; dhcp != nil {
-			// TODO(cnicolaou,caprita): investigate not having to close and drain
-			// the channel here. It's a little awkward right now since we have to
-			// be careful to not close the channel in two places, i.e. here and
-			// and from the publisher's Shutdown method.
-			if err := dhcp.publisher.CloseFork(dhcp.name, dhcp.ch); err == nil {
-			drain:
-				for {
-					select {
-					case v := <-dhcp.ch:
-						if v == nil {
-							break drain
-						}
-					default:
-						close(dhcp.ch)
-						break drain
-					}
-				}
-			}
-		}
-		s.Unlock()
-
 		select {
 		case <-done:
 		case <-time.After(5 * time.Second): // TODO(mattr): This should be configurable.
@@ -222,6 +190,8 @@
 		// operations to a close.
 		s.cancel()
 		<-s.flowMgr.Closed()
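+		// Wake any Status() watchers one final time; setting valid to nil
+		// keeps updateEndpointsLoop from closing the channel a second time.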
+		close(s.valid)
+		s.valid = nil
 		// Now we really will wait forever.  If this doesn't exit, there's something
 		// wrong with the users code.
 		<-done
@@ -237,35 +207,16 @@
 	status.ServesMountTable = s.servesMountTable
 	status.Mounts = s.publisher.Status()
 	s.Lock()
+	status.Valid = s.valid
 	status.State = s.state
-	for _, e := range s.chosenEndpoints {
+	for _, e := range s.endpoints {
 		status.Endpoints = append(status.Endpoints, e)
 	}
-	for _, err := range s.lnErrors {
-		status.Errors = append(status.Errors, err)
-	}
+	status.Errors = s.lnErrors
 	s.Unlock()
 	return status
 }
 
-func (s *xserver) WatchNetwork(ch chan<- rpc.NetworkChange) {
-	defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	s.Lock()
-	defer s.Unlock()
-	if s.dhcpState != nil {
-		s.dhcpState.watchers[ch] = struct{}{}
-	}
-}
-
-func (s *xserver) UnwatchNetwork(ch chan<- rpc.NetworkChange) {
-	defer apilog.LogCallf(nil, "ch=")(nil, "") // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
-	s.Lock()
-	defer s.Unlock()
-	if s.dhcpState != nil {
-		delete(s.dhcpState.watchers, ch)
-	}
-}
-
 // resolveToEndpoint resolves an object name or address to an endpoint.
 func (s *xserver) resolveToEndpoint(ctx *context.T, address string) (naming.Endpoint, error) {
 	var resolved *naming.MountEntry
@@ -297,75 +248,89 @@
 	return nil, verror.New(errFailedToResolveToEndpoint, s.ctx, address)
 }
 
-// createEndpoints creates appropriate inaming.Endpoint instances for
-// all of the externally accessible network addresses that can be used
-// to reach this server.
-func (s *xserver) createEndpoints(lep naming.Endpoint, chooser netstate.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
-	iep, ok := lep.(*inaming.Endpoint)
-	if !ok {
-		return nil, "", false, verror.New(errInternalTypeConversion, nil, fmt.Sprintf("%T", lep))
-	}
-	if !strings.HasPrefix(iep.Protocol, "tcp") &&
-		!strings.HasPrefix(iep.Protocol, "ws") {
-		// If not tcp, ws, or wsh, just return the endpoint we were given.
-		return []*inaming.Endpoint{iep}, "", false, nil
-	}
-	host, port, err := net.SplitHostPort(iep.Address)
-	if err != nil {
-		return nil, "", false, err
-	}
-	addrs, unspecified, err := netstate.PossibleAddresses(iep.Protocol, host, chooser)
-	if err != nil {
-		return nil, port, false, err
-	}
-
-	ieps := make([]*inaming.Endpoint, 0, len(addrs))
-	for _, addr := range addrs {
-		n, err := inaming.NewEndpoint(lep.String())
-		if err != nil {
-			return nil, port, false, err
-		}
-		n.IsMountTable = s.servesMountTable
-		n.IsLeaf = s.isLeaf
-		n.Address = net.JoinHostPort(addr.String(), port)
-		ieps = append(ieps, n)
-	}
-	return ieps, port, unspecified, nil
+// createEndpoint returns a copy of the flow manager's endpoint lep, annotated
+// with the server's publishing information (mounttable/leaf flags).
+func (s *xserver) createEndpoint(lep naming.Endpoint) *inaming.Endpoint {
+	n := *(lep.(*inaming.Endpoint))
+	n.IsMountTable = s.servesMountTable
+	n.IsLeaf = s.isLeaf
+	return &n
 }
 
-func (s *xserver) update(pep naming.Endpoint) func([]naming.Endpoint) {
-	return func(leps []naming.Endpoint) {
-		chosenEps := make(map[string]*inaming.Endpoint)
-		pkey := pep.String()
-		for _, ep := range leps {
-			eps, _, _, _ := s.createEndpoints(ep, s.addressChooser)
-			for _, cep := range eps {
-				chosenEps[cep.String()] = cep
+func (s *xserver) listen(ctx *context.T, listenSpec rpc.ListenSpec) {
+	s.Lock()
+	defer s.Unlock()
+	if len(listenSpec.Proxy) > 0 {
+		ep, err := s.resolveToEndpoint(ctx, listenSpec.Proxy)
+		if err != nil {
+			s.ctx.VI(2).Infof("resolveToEndpoint(%q) failed: %v", listenSpec.Proxy, err)
+		} else {
+			err = s.flowMgr.ProxyListen(ctx, ep)
+			if err != nil {
+				s.ctx.VI(2).Infof("ProxyListen(%q) failed: %v", ep, err)
 			}
-			// TODO(suharshs): do protoEndpoints need to be handled here?
 		}
-
-		// Endpoints to add and remove.
-		s.Lock()
-		oldEps := s.proxyEndpoints[pkey]
-		s.proxyEndpoints[pkey] = chosenEps
-		rmEps := setDiff(oldEps, chosenEps)
-		addEps := setDiff(chosenEps, oldEps)
-		for k := range rmEps {
-			delete(s.chosenEndpoints, k)
-		}
-		for k, ep := range addEps {
-			s.chosenEndpoints[k] = ep
-		}
-		s.Unlock()
-
-		for k := range rmEps {
-			s.publisher.RemoveServer(k)
-		}
-		for k := range addEps {
-			s.publisher.AddServer(k)
+		if err != nil {
+			s.lnErrors = append(s.lnErrors, err)
 		}
 	}
+	for _, addr := range listenSpec.Addrs {
+		if len(addr.Address) > 0 {
+			err := s.flowMgr.Listen(ctx, addr.Protocol, addr.Address)
+			if err != nil {
+				s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, err)
+				s.lnErrors = append(s.lnErrors, err)
+			}
+		}
+	}
+
+	// Call updateEndpointsLocked synchronously once so that the server status
+	// reports at least one endpoint as soon as listen returns.
+	leps, _ := s.flowMgr.ListeningEndpoints()
+	s.updateEndpointsLocked(leps)
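+	// active tracks the two long-lived goroutines below so shutdown can wait for them.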
+	s.active.Add(2)
+	go s.updateEndpointsLoop()
+	go s.acceptLoop(ctx)
+}
+
+func (s *xserver) updateEndpointsLoop() {
+	defer s.active.Done()
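+	// ListeningEndpoints returns the current endpoints together with a channel
+	// that is closed when they next change; blocking on it wakes this loop only
+	// when there is something new to publish. A nil channel ends the loop
+	// (presumably once the flow manager has shut down).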
+	for leps, changed := s.flowMgr.ListeningEndpoints(); changed != nil; {
+		s.Lock()
+		s.updateEndpointsLocked(leps)
+		if s.valid != nil {
+			close(s.valid)
+			s.valid = make(chan struct{})
+		}
+		s.Unlock()
+		<-changed
+		leps, changed = s.flowMgr.ListeningEndpoints()
+	}
+}
+
+func (s *xserver) updateEndpointsLocked(leps []naming.Endpoint) {
+	endpoints := make(map[string]*inaming.Endpoint)
+	for _, ep := range leps {
+		sep := s.createEndpoint(ep)
+		endpoints[sep.String()] = sep
+	}
+	// Endpoints to add and remove.
+	rmEps := setDiff(s.endpoints, endpoints)
+	addEps := setDiff(endpoints, s.endpoints)
+	for k := range rmEps {
+		delete(s.endpoints, k)
+	}
+	for k, ep := range addEps {
+		s.endpoints[k] = ep
+	}
+
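+	// The publisher calls below may block, so drop the lock around them and
+	// reacquire it before returning: callers expect to still hold the lock.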
+	s.Unlock()
+	for k := range rmEps {
+		s.publisher.RemoveServer(k)
+	}
+	for k := range addEps {
+		s.publisher.AddServer(k)
+	}
+	s.Lock()
 }
 
 // setDiff returns the endpoints in a that are not in b.
@@ -379,62 +344,6 @@
 	return ret
 }
 
-func (s *xserver) listen(ctx *context.T, listenSpec rpc.ListenSpec) {
-	s.Lock()
-	defer s.Unlock()
-	var lastErr error
-	var ep naming.Endpoint
-	if len(listenSpec.Proxy) > 0 {
-		ep, lastErr = s.resolveToEndpoint(ctx, listenSpec.Proxy)
-		if lastErr != nil {
-			s.ctx.VI(2).Infof("resolveToEndpoint(%q) failed: %v", listenSpec.Proxy, lastErr)
-		} else {
-			lastErr = s.flowMgr.ProxyListen(ctx, ep, s.update(ep))
-			if lastErr != nil {
-				s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", inaming.Network, ep, lastErr)
-			}
-		}
-	}
-	for _, addr := range listenSpec.Addrs {
-		if len(addr.Address) > 0 {
-			lastErr = s.flowMgr.Listen(ctx, addr.Protocol, addr.Address)
-			if lastErr != nil {
-				s.ctx.VI(2).Infof("Listen(%q, %q, ...) failed: %v", addr.Protocol, addr.Address, lastErr)
-			}
-		}
-	}
-
-	leps := s.flowMgr.ListeningEndpoints()
-	s.addressChooser = listenSpec.AddressChooser
-	roaming := false
-	chosenEps := make(map[string]*inaming.Endpoint)
-	protoEps := make(map[string]*inaming.Endpoint)
-	lnErrs := make(map[string]error)
-	for _, ep := range leps {
-		eps, _, eproaming, eperr := s.createEndpoints(ep, listenSpec.AddressChooser)
-		for _, cep := range eps {
-			chosenEps[cep.String()] = cep
-		}
-		if eproaming && eperr == nil {
-			protoEps[ep.String()] = ep.(*inaming.Endpoint)
-			roaming = true
-		}
-		if eperr != nil {
-			lnErrs[ep.String()] = eperr
-		}
-	}
-	s.chosenEndpoints = chosenEps
-	s.protoEndpoints = protoEps
-	s.lnErrors = lnErrs
-
-	if roaming && s.dhcpState == nil && s.settingsPublisher != nil {
-		// TODO(mattr): Support roaming.
-	}
-
-	s.active.Add(1)
-	go s.acceptLoop(ctx)
-}
-
 func (s *xserver) acceptLoop(ctx *context.T) error {
 	var calls sync.WaitGroup
 	defer func() {
diff --git a/runtime/internal/rt/runtime.go b/runtime/internal/rt/runtime.go
index 9dd8a22..78c4bdb 100644
--- a/runtime/internal/rt/runtime.go
+++ b/runtime/internal/rt/runtime.go
@@ -512,10 +512,11 @@
 	if err != nil {
 		return nil, err
 	}
-	return manager.New(ctx, rid), nil
+	id, _ := ctx.Value(initKey).(*initData)
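+	// id.settingsPublisher may be nil; manager.New tolerates a nil publisher,
+	// as the client-side call in xclient.go shows.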
+	return manager.New(ctx, rid, id.settingsPublisher), nil
 }
 
-func (r *Runtime) commonServerInit(ctx *context.T, opts ...rpc.ServerOpt) (*pubsub.Publisher, string, []rpc.ServerOpt, error) {
+func (r *Runtime) commonServerInit(ctx *context.T, opts ...rpc.ServerOpt) (*pubsub.Publisher, []rpc.ServerOpt, error) {
 	otherOpts := append([]rpc.ServerOpt{}, opts...)
 	if reservedDispatcher := r.GetReservedNameDispatcher(ctx); reservedDispatcher != nil {
 		otherOpts = append(otherOpts, irpc.ReservedNameDispatcher{
@@ -526,17 +527,17 @@
 	if id.protocols != nil {
 		otherOpts = append(otherOpts, irpc.PreferredServerResolveProtocols(id.protocols))
 	}
-	return id.settingsPublisher, id.settingsName, otherOpts, nil
+	return id.settingsPublisher, otherOpts, nil
 }
 
 func (r *Runtime) WithNewServer(ctx *context.T, name string, object interface{}, auth security.Authorizer, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	if ref.RPCTransitionState() >= ref.XServers {
-		spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+		spub, opts, err := r.commonServerInit(ctx, opts...)
 		if err != nil {
 			return ctx, nil, err
 		}
-		newctx, s, err := irpc.WithNewServer(ctx, name, object, auth, spub, sname, opts...)
+		newctx, s, err := irpc.WithNewServer(ctx, name, object, auth, spub, opts...)
 		if err != nil {
 			return ctx, nil, err
 		}
@@ -563,11 +564,11 @@
 func (r *Runtime) WithNewDispatchingServer(ctx *context.T, name string, disp rpc.Dispatcher, opts ...rpc.ServerOpt) (*context.T, rpc.Server, error) {
 	defer apilog.LogCall(ctx)(ctx) // gologcop: DO NOT EDIT, MUST BE FIRST STATEMENT
 	if ref.RPCTransitionState() >= ref.XServers {
-		spub, sname, opts, err := r.commonServerInit(ctx, opts...)
+		spub, opts, err := r.commonServerInit(ctx, opts...)
 		if err != nil {
 			return ctx, nil, err
 		}
-		newctx, s, err := irpc.WithNewDispatchingServer(ctx, name, disp, spub, sname, opts...)
+		newctx, s, err := irpc.WithNewDispatchingServer(ctx, name, disp, spub, opts...)
 		if err != nil {
 			return ctx, nil, err
 		}
diff --git a/runtime/internal/rt/security.go b/runtime/internal/rt/security.go
index 35d9427..c637ee1 100644
--- a/runtime/internal/rt/security.go
+++ b/runtime/internal/rt/security.go
@@ -80,7 +80,13 @@
 	} else {
 		name = "anonymous"
 	}
-	if host, _ := os.Hostname(); len(host) > 0 {
+	host, _ := os.Hostname()
+	if host == "(none)" {
+		// (none) is a common default hostname and contains parentheses,
+		// which are invalid blessings characters.
+		host = "anonymous"
+	}
+	if len(host) > 0 {
 		name = name + "@" + host
 	}
 	return fmt.Sprintf("%s-%d", name, os.Getpid())
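
With this fix, a process run by user "alice" (a hypothetical example) on a
machine whose hostname is the literal "(none)" gets a client name such as
"alice@anonymous-4242" (name@host-pid) rather than one containing '(' and ')',
which are invalid in blessing names.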
diff --git a/services/xproxyd/proxy_test.go b/services/xproxyd/proxy_test.go
index b1eaa78..a6314d2 100644
--- a/services/xproxyd/proxy_test.go
+++ b/services/xproxyd/proxy_test.go
@@ -58,12 +58,12 @@
 	}
 	// Wait for the server to finish listening through the proxy.
 	eps := s.Status().Endpoints
-	for ; len(eps) < 2 || eps[1].Addr().Network() == bidiProtocol; eps = s.Status().Endpoints {
+	for ; len(eps) == 0 || eps[0].Addr().Network() == bidiProtocol; eps = s.Status().Endpoints {
 		time.Sleep(pollTime)
 	}
 
 	var got string
-	if err := v23.GetClient(ctx).Call(ctx, eps[1].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
+	if err := v23.GetClient(ctx).Call(ctx, eps[0].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
 		t.Fatal(err)
 	}
 	if want := "response:hello"; got != want {
@@ -147,13 +147,13 @@
 	}
 	// Wait for the server to finish listening through the proxy.
 	eps := s.Status().Endpoints
-	for ; len(eps) < 2 || eps[1].Addr().Network() == bidiProtocol; eps = s.Status().Endpoints {
+	for ; len(eps) == 0 || eps[0].Addr().Network() == bidiProtocol; eps = s.Status().Endpoints {
 		time.Sleep(pollTime)
 	}
 	// The call should succeed which means that the client did not try to authorize
 	// the proxy's blessings.
 	var got string
-	if err := v23.GetClient(ctx).Call(ctx, eps[1].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
+	if err := v23.GetClient(ctx).Call(ctx, eps[0].Name(), "Echo", []interface{}{"hello"}, []interface{}{&got}); err != nil {
 		t.Fatal(err)
 	}
 	if want := "response:hello"; got != want {
@@ -179,19 +179,19 @@
 
 	pep := startProxy(t, ctx, address{"kill", "127.0.0.1:0"})
 
-	done := make(chan struct{})
-	update := func(eps []naming.Endpoint) {
-		if len(eps) > 0 {
+	if err := am.ProxyListen(ctx, pep); err != nil {
+		t.Fatal(err)
+	}
+	for {
+		eps, changed := am.ListeningEndpoints()
+		if len(eps) > 0 && eps[0].Addr().Network() != bidiProtocol {
 			if err := testEndToEndConnection(t, ctx, dm, am, eps[0]); err != nil {
 				t.Error(err)
 			}
-			close(done)
+			return
 		}
+		<-changed
 	}
-	if err := am.ProxyListen(ctx, pep, update); err != nil {
-		t.Fatal(err)
-	}
-	<-done
 }
 
 func TestMultipleProxies(t *testing.T) {
@@ -215,27 +215,26 @@
 
 	p3ep := startProxy(t, ctx, address{"v23", p2ep.String()}, address{"kill", "127.0.0.1:0"})
 
-	done := make(chan struct{})
-	update := func(eps []naming.Endpoint) {
+	if err := am.ProxyListen(ctx, p3ep); err != nil {
+		t.Fatal(err)
+	}
+
+	for {
+		eps, changed := am.ListeningEndpoints()
 		// TODO(suharshs): Fix this test once we have the proxy send update messages
 		// to the server when it reconnects to a proxy. This test currently only
 		// exercises the first connection, because connections are cached; to test
 		// reconnection we would need to kill connections and wait for them to be
 		// reestablished, which requires the proxies to communicate their new
 		// endpoints to each other and to the server. For now we at least check a
 		// random endpoint, so the test will fail over many runs if something is wrong.
-		if len(eps) > 0 {
+		if len(eps) >= 3 && eps[0].Addr().Network() != bidiProtocol {
 			if err := testEndToEndConnection(t, ctx, dm, am, eps[rand.Int()%3]); err != nil {
 				t.Error(err)
 			}
-			close(done)
+			return
 		}
+		<-changed
 	}
-
-	if err := am.ProxyListen(ctx, p3ep, update); err != nil {
-		t.Fatal(err)
-	}
-
-	<-done
 }
 
 func testEndToEndConnection(t *testing.T, ctx *context.T, dm, am flow.Manager, aep naming.Endpoint) error {
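
The tests above now share a single wait-for-change idiom in place of the old
update callback; a condensed sketch (waitForNonBidi is a hypothetical helper,
not part of the test file):

	func waitForNonBidi(m flow.Manager) naming.Endpoint {
		for {
			eps, changed := m.ListeningEndpoints()
			if len(eps) > 0 && eps[0].Addr().Network() != bidiProtocol {
				return eps[0]
			}
			<-changed // closed when the set of listening endpoints changes
		}
	}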
diff --git a/services/xproxyd/proxyd.go b/services/xproxyd/proxyd.go
index d14b2f2..ebf4703 100644
--- a/services/xproxyd/proxyd.go
+++ b/services/xproxyd/proxyd.go
@@ -49,7 +49,9 @@
 }
 
 func (p *proxy) ListeningEndpoints() []naming.Endpoint {
-	return p.m.ListeningEndpoints()
+	// TODO(suharshs): Return changed channel here as well.
+	eps, _ := p.m.ListeningEndpoints()
+	return eps
 }
 
 func (p *proxy) MultipleProxyEndpoints() []naming.Endpoint {
@@ -165,7 +167,7 @@
 
 func (p *proxy) returnEndpoints(ctx *context.T, rid naming.RoutingID, route string) ([]naming.Endpoint, error) {
 	p.mu.Lock()
-	eps := p.m.ListeningEndpoints()
+	eps, _ := p.m.ListeningEndpoints()
 	for _, peps := range p.proxyEndpoints {
 		eps = append(eps, peps...)
 	}