v23: break dependency on lib/pubsub
MultiPart: 2/2
Change-Id: Ib989867e978a27a7a892b9f922b76cb9a3a442a0
diff --git a/profiles/roaming/roaminginit.go b/profiles/roaming/roaminginit.go
index 8ec4482..c6b37f6 100644
--- a/profiles/roaming/roaminginit.go
+++ b/profiles/roaming/roaminginit.go
@@ -31,6 +31,7 @@
"v.io/x/ref/profiles/internal"
"v.io/x/ref/profiles/internal/lib/appcycle"
"v.io/x/ref/profiles/internal/lib/websocket"
+ irpc "v.io/x/ref/profiles/internal/rpc"
_ "v.io/x/ref/profiles/internal/rpc/protocols/tcp"
_ "v.io/x/ref/profiles/internal/rpc/protocols/ws"
_ "v.io/x/ref/profiles/internal/rpc/protocols/wsh"
@@ -40,6 +41,7 @@
const (
SettingsStreamName = "roaming"
+ SettingsStreamDesc = "pubsub stream used by the roaming profile"
)
var commonFlags *flags.Flags
@@ -74,7 +76,7 @@
// flag to configure both the protocol and address.
return []net.Addr{netstate.NewNetAddr("wsh", addr.String())}, nil
}
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, nil, "", commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
return nil, nil, shutdown, err
}
@@ -91,7 +93,8 @@
// Create stream in Init function to avoid a race between any
// goroutines started here and consumers started after Init returns.
ch := make(chan pubsub.Setting)
- stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamName, ch)
+ // TODO(cnicolaou): use stop to shut down this stream when the profile shuts down.
+ stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamDesc, ch)
if err != nil {
ac.Shutdown()
return nil, nil, nil, err
@@ -113,11 +116,9 @@
cleanupCh := make(chan struct{})
watcherCh := make(chan struct{})
- listenSpec.StreamPublisher = publisher
- listenSpec.StreamName = SettingsStreamName
listenSpec.AddressChooser = internal.IPAddressChooser
- runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
+ runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
if err != nil {
return nil, nil, shutdown, err
}
@@ -170,12 +171,12 @@
}
if len(removed) > 0 {
vlog.VI(2).Infof("Sending removed: %s", removed)
- ch <- rpc.NewRmAddrsSetting(removed.AsNetAddrs())
+ ch <- irpc.NewRmAddrsSetting(removed.AsNetAddrs())
}
// We will always send the best currently available address
if chosen, err := listenSpec.AddressChooser(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
vlog.VI(2).Infof("Sending added and chosen: %s", chosen)
- ch <- rpc.NewAddAddrsSetting(chosen)
+ ch <- irpc.NewAddAddrsSetting(chosen)
} else {
vlog.VI(2).Infof("Ignoring added %s", added)
}