v23: break dependency on lib/pubsub

MultiPart: 2/2
Change-Id: Ib989867e978a27a7a892b9f922b76cb9a3a442a0
diff --git a/profiles/internal/rpc/server.go b/profiles/internal/rpc/server.go
index 3516796..102f82a 100644
--- a/profiles/internal/rpc/server.go
+++ b/profiles/internal/rpc/server.go
@@ -83,15 +83,17 @@
 type server struct {
 	sync.Mutex
 	// context used by the server to make internal RPCs, error messages etc.
-	ctx          *context.T
-	cancel       context.CancelFunc   // function to cancel the above context.
-	state        serverState          // track state of the server.
-	streamMgr    stream.Manager       // stream manager to listen for new flows.
-	publisher    publisher.Publisher  // publisher to publish mounttable mounts.
-	listenerOpts []stream.ListenerOpt // listener opts for Listen.
-	dhcpState    *dhcpState           // dhcpState, nil if not using dhcp
-	principal    security.Principal
-	blessings    security.Blessings
+	ctx               *context.T
+	cancel            context.CancelFunc   // function to cancel the above context.
+	state             serverState          // track state of the server.
+	streamMgr         stream.Manager       // stream manager to listen for new flows.
+	publisher         publisher.Publisher  // publisher to publish mounttable mounts.
+	listenerOpts      []stream.ListenerOpt // listener opts for Listen.
+	settingsPublisher *pubsub.Publisher    // pubsub publisher for dhcp
+	settingsName      string               // pubsub stream name for dhcp
+	dhcpState         *dhcpState           // dhcpState, nil if not using dhcp
+	principal         security.Principal
+	blessings         security.Blessings
 
 	// maps that contain state on listeners.
 	listenState map[*listenState]struct{}
@@ -173,6 +175,8 @@
 	ctx *context.T,
 	streamMgr stream.Manager,
 	ns namespace.T,
+	settingsPublisher *pubsub.Publisher,
+	settingsName string,
 	client rpc.Client,
 	principal security.Principal,
 	opts ...rpc.ServerOpt) (rpc.Server, error) {
@@ -180,18 +184,20 @@
 	ctx, _ = vtrace.WithNewSpan(ctx, "NewServer")
 	statsPrefix := naming.Join("rpc", "server", "routing-id", streamMgr.RoutingID().String())
 	s := &server{
-		ctx:         ctx,
-		cancel:      cancel,
-		streamMgr:   streamMgr,
-		principal:   principal,
-		publisher:   publisher.New(ctx, ns, publishPeriod),
-		listenState: make(map[*listenState]struct{}),
-		listeners:   make(map[stream.Listener]struct{}),
-		proxies:     make(map[string]proxyState),
-		stoppedChan: make(chan struct{}),
-		ipNets:      ipNetworks(),
-		ns:          ns,
-		stats:       newRPCStats(statsPrefix),
+		ctx:               ctx,
+		cancel:            cancel,
+		streamMgr:         streamMgr,
+		principal:         principal,
+		publisher:         publisher.New(ctx, ns, publishPeriod),
+		listenState:       make(map[*listenState]struct{}),
+		listeners:         make(map[stream.Listener]struct{}),
+		proxies:           make(map[string]proxyState),
+		stoppedChan:       make(chan struct{}),
+		ipNets:            ipNetworks(),
+		ns:                ns,
+		stats:             newRPCStats(statsPrefix),
+		settingsPublisher: settingsPublisher,
+		settingsName:      settingsName,
 	}
 	var (
 		dischargeExpiryBuffer = vc.DefaultServerDischargeExpiryBuffer
@@ -412,11 +418,11 @@
 		return nil, verror.New(verror.ErrBadArg, s.ctx, "failed to create any listeners")
 	}
 
-	if roaming && s.dhcpState == nil && listenSpec.StreamPublisher != nil {
+	if roaming && s.dhcpState == nil && s.settingsPublisher != nil {
 		// Create a dhcp listener if we haven't already done so.
 		dhcp := &dhcpState{
-			name:      listenSpec.StreamName,
-			publisher: listenSpec.StreamPublisher,
+			name:      s.settingsName,
+			publisher: s.settingsPublisher,
 			watchers:  make(map[chan<- rpc.NetworkChange]struct{}),
 		}
 		s.dhcpState = dhcp
@@ -624,20 +630,17 @@
 				s.Unlock()
 				return
 			}
-			var err error
-			var changed []naming.Endpoint
-			switch setting.Name() {
-			case rpc.NewAddrsSetting:
-				changed = s.addAddresses(v)
-			case rpc.RmAddrsSetting:
-				changed, err = s.removeAddresses(v)
-			}
 			change := rpc.NetworkChange{
-				Time:    time.Now(),
-				State:   externalStates[s.state],
-				Setting: setting,
-				Changed: changed,
-				Error:   err,
+				Time:  time.Now(),
+				State: externalStates[s.state],
+			}
+			switch setting.Name() {
+			case NewAddrsSetting:
+				change.Changed = s.addAddresses(v)
+				change.AddedAddrs = v
+			case RmAddrsSetting:
+				change.Changed, change.Error = s.removeAddresses(v)
+				change.RemovedAddrs = v
 			}
 			vlog.VI(2).Infof("rpc: dhcp: change %v", change)
 			for ch, _ := range s.dhcpState.watchers {
@@ -703,14 +706,11 @@
 // to ensure that those addresses are externally reachable.
 func (s *server) addAddresses(addrs []net.Addr) []naming.Endpoint {
 	var added []naming.Endpoint
-	vlog.Infof("HERE WITH %v -> %v", addrs, netstate.ConvertToAddresses(addrs))
 	for _, address := range netstate.ConvertToAddresses(addrs) {
 		if !netstate.IsAccessibleIP(address) {
-			vlog.Infof("RETURN A %v", added)
 			return added
 		}
 		host := getHost(address)
-		vlog.Infof("LISTEN ST: %v", s.listenState)
 		for ls, _ := range s.listenState {
 			if ls != nil && ls.roaming {
 				niep := ls.protoIEP
@@ -724,7 +724,6 @@
 			}
 		}
 	}
-	vlog.Infof("RETURN B %v", added)
 	return added
 }