ref: Implement roaming.
The flow manager now handles roaming.
Changes:
(1) The server is notified to requery ListeningEndpoints when a chan
returned from ListeningEndpoints is closed.
(2) Roaming logic has been refactored into a library.
(3) WatchNetwork and UnwatchNetwork have been deprecated and replaced
by status.Valid that is a channel that is closed when the
Status needs to be requeried by callers.
MultiPart: 2/2
Change-Id: I5e34876205e1017ae8be00ca5bb731fa0a94a5a2
diff --git a/runtime/factories/roaming/roaming.go b/runtime/factories/roaming/roaming.go
index a015e18..7a2703d 100644
--- a/runtime/factories/roaming/roaming.go
+++ b/runtime/factories/roaming/roaming.go
@@ -16,9 +16,6 @@
import (
"flag"
- "v.io/x/lib/netconfig"
- "v.io/x/lib/netstate"
-
"v.io/v23"
"v.io/v23/context"
"v.io/v23/flow"
@@ -34,9 +31,9 @@
_ "v.io/x/ref/runtime/internal/flow/protocols/ws"
_ "v.io/x/ref/runtime/internal/flow/protocols/wsh"
"v.io/x/ref/runtime/internal/lib/appcycle"
+ "v.io/x/ref/runtime/internal/lib/roaming"
"v.io/x/ref/runtime/internal/lib/websocket"
"v.io/x/ref/runtime/internal/lib/xwebsocket"
- irpc "v.io/x/ref/runtime/internal/rpc"
"v.io/x/ref/runtime/internal/rt"
"v.io/x/ref/services/debug/debuglib"
@@ -46,11 +43,6 @@
_ "v.io/x/ref/runtime/internal/rpc/protocols/wsh"
)
-const (
- SettingsStreamName = "roaming"
- SettingsStreamDesc = "pubsub stream used by the roaming RuntimeFactory"
-)
-
var commonFlags *flags.Flags
func init() {
@@ -87,102 +79,22 @@
publisher := pubsub.NewPublisher()
- // Create stream in Init function to avoid a race between any
- // goroutines started here and consumers started after Init returns.
- ch := make(chan pubsub.Setting)
- // TODO(cnicolaou): use stop to shutdown this stream when the RuntimeFactory shutdowns.
- stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamDesc, ch)
+ // TODO(suharshs): We can remove the SettingName argument after the transition to new RPC.
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, roaming.RoamingSetting, commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
ishutdown()
return nil, nil, nil, err
}
- prev, err := netstate.GetAccessibleIPs()
+ stopRoaming, err := roaming.CreateRoamingStream(ctx, publisher, listenSpec)
if err != nil {
- ishutdown()
return nil, nil, nil, err
}
- // Start the dhcp watcher.
- watcher, err := netconfig.NewNetConfigWatcher()
- if err != nil {
- ishutdown()
- return nil, nil, nil, err
- }
-
- cleanupCh := make(chan struct{})
- watcherCh := make(chan struct{})
-
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, discovery, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
- if err != nil {
- ishutdown()
- return nil, nil, nil, err
- }
-
- go monitorNetworkSettingsX(runtime, ctx, watcher, prev, stop, cleanupCh, watcherCh, ch)
runtimeFactoryShutdown := func() {
- close(cleanupCh)
ishutdown()
shutdown()
- <-watcherCh
+ stopRoaming()
}
return runtime, ctx, runtimeFactoryShutdown, nil
}
-
-// monitorNetworkSettings will monitor network configuration changes and
-// publish subsequent Settings to reflect any changes detected.
-func monitorNetworkSettingsX(
- runtime *rt.Runtime,
- ctx *context.T,
- watcher netconfig.NetConfigWatcher,
- prev netstate.AddrList,
- pubStop, cleanup <-chan struct{},
- watcherLoop chan<- struct{},
- ch chan<- pubsub.Setting) {
- defer close(ch)
-
- listenSpec := runtime.GetListenSpec(ctx)
-
- // TODO(cnicolaou): add support for listening on multiple network addresses.
-
-done:
- for {
- select {
- case <-watcher.Channel():
- netstate.InvalidateCache()
- cur, err := netstate.GetAccessibleIPs()
- if err != nil {
- ctx.Errorf("failed to read network state: %s", err)
- continue
- }
- removed := netstate.FindRemoved(prev, cur)
- added := netstate.FindAdded(prev, cur)
- ctx.VI(2).Infof("Previous: %d: %s", len(prev), prev)
- ctx.VI(2).Infof("Current : %d: %s", len(cur), cur)
- ctx.VI(2).Infof("Added : %d: %s", len(added), added)
- ctx.VI(2).Infof("Removed : %d: %s", len(removed), removed)
- if len(removed) == 0 && len(added) == 0 {
- ctx.VI(2).Infof("Network event that lead to no address changes since our last 'baseline'")
- continue
- }
- if len(removed) > 0 {
- ctx.VI(2).Infof("Sending removed: %s", removed)
- ch <- irpc.NewRmAddrsSetting(removed.AsNetAddrs())
- }
- // We will always send the best currently available address
- if chosen, err := listenSpec.AddressChooser.ChooseAddresses(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
- ctx.VI(2).Infof("Sending added and chosen: %s", chosen)
- ch <- irpc.NewNewAddrsSetting(chosen)
- } else {
- ctx.VI(2).Infof("Ignoring added %s", added)
- }
- prev = cur
- case <-cleanup:
- break done
- case <-pubStop:
- goto done
- }
- }
- watcher.Stop()
- close(watcherLoop)
-}