v23: break dependency on lib/pubsub

MultiPart: 2/2
Change-Id: Ib989867e978a27a7a892b9f922b76cb9a3a442a0
diff --git a/profiles/roaming/roaminginit.go b/profiles/roaming/roaminginit.go
index 8ec4482..c6b37f6 100644
--- a/profiles/roaming/roaminginit.go
+++ b/profiles/roaming/roaminginit.go
@@ -31,6 +31,7 @@
 	"v.io/x/ref/profiles/internal"
 	"v.io/x/ref/profiles/internal/lib/appcycle"
 	"v.io/x/ref/profiles/internal/lib/websocket"
+	irpc "v.io/x/ref/profiles/internal/rpc"
 	_ "v.io/x/ref/profiles/internal/rpc/protocols/tcp"
 	_ "v.io/x/ref/profiles/internal/rpc/protocols/ws"
 	_ "v.io/x/ref/profiles/internal/rpc/protocols/wsh"
@@ -40,6 +41,7 @@
 
 const (
 	SettingsStreamName = "roaming"
+	SettingsStreamDesc = "pubsub stream used by the roaming profile"
 )
 
 var commonFlags *flags.Flags
@@ -74,7 +76,7 @@
 				// flag to configure both the protocol and address.
 				return []net.Addr{netstate.NewNetAddr("wsh", addr.String())}, nil
 			}
-			runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
+			runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, nil, "", commonFlags.RuntimeFlags(), reservedDispatcher)
 			if err != nil {
 				return nil, nil, shutdown, err
 			}
@@ -91,7 +93,8 @@
 	// Create stream in Init function to avoid a race between any
 	// goroutines started here and consumers started after Init returns.
 	ch := make(chan pubsub.Setting)
-	stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamName, ch)
+	// TODO(cnicolaou): use stop to shutdown this stream when the profile shutdowns.
+	stop, err := publisher.CreateStream(SettingsStreamName, SettingsStreamDesc, ch)
 	if err != nil {
 		ac.Shutdown()
 		return nil, nil, nil, err
@@ -113,11 +116,9 @@
 	cleanupCh := make(chan struct{})
 	watcherCh := make(chan struct{})
 
-	listenSpec.StreamPublisher = publisher
-	listenSpec.StreamName = SettingsStreamName
 	listenSpec.AddressChooser = internal.IPAddressChooser
 
-	runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, commonFlags.RuntimeFlags(), reservedDispatcher)
+	runtime, ctx, shutdown, err := rt.Init(ctx, ac, nil, &listenSpec, publisher, SettingsStreamName, commonFlags.RuntimeFlags(), reservedDispatcher)
 	if err != nil {
 		return nil, nil, shutdown, err
 	}
@@ -170,12 +171,12 @@
 			}
 			if len(removed) > 0 {
 				vlog.VI(2).Infof("Sending removed: %s", removed)
-				ch <- rpc.NewRmAddrsSetting(removed.AsNetAddrs())
+				ch <- irpc.NewRmAddrsSetting(removed.AsNetAddrs())
 			}
 			// We will always send the best currently available address
 			if chosen, err := listenSpec.AddressChooser(listenSpec.Addrs[0].Protocol, cur.AsNetAddrs()); err == nil && chosen != nil {
 				vlog.VI(2).Infof("Sending added and chosen: %s", chosen)
-				ch <- rpc.NewAddAddrsSetting(chosen)
+				ch <- irpc.NewAddAddrsSetting(chosen)
 			} else {
 				vlog.VI(2).Infof("Ignoring added %s", added)
 			}