v23: break dependency on lib/pubsub
MultiPart: 2/2
Change-Id: Ib989867e978a27a7a892b9f922b76cb9a3a442a0
diff --git a/profiles/internal/rpc/server.go b/profiles/internal/rpc/server.go
index 3516796..102f82a 100644
--- a/profiles/internal/rpc/server.go
+++ b/profiles/internal/rpc/server.go
@@ -83,15 +83,17 @@
type server struct {
sync.Mutex
// context used by the server to make internal RPCs, error messages etc.
- ctx *context.T
- cancel context.CancelFunc // function to cancel the above context.
- state serverState // track state of the server.
- streamMgr stream.Manager // stream manager to listen for new flows.
- publisher publisher.Publisher // publisher to publish mounttable mounts.
- listenerOpts []stream.ListenerOpt // listener opts for Listen.
- dhcpState *dhcpState // dhcpState, nil if not using dhcp
- principal security.Principal
- blessings security.Blessings
+ ctx *context.T
+ cancel context.CancelFunc // function to cancel the above context.
+ state serverState // track state of the server.
+ streamMgr stream.Manager // stream manager to listen for new flows.
+ publisher publisher.Publisher // publisher to publish mounttable mounts.
+ listenerOpts []stream.ListenerOpt // listener opts for Listen.
+ settingsPublisher *pubsub.Publisher // pubsub publisher for dhcp
+ settingsName string // pubsub stream name for dhcp
+ dhcpState *dhcpState // dhcpState, nil if not using dhcp
+ principal security.Principal
+ blessings security.Blessings
// maps that contain state on listeners.
listenState map[*listenState]struct{}
@@ -173,6 +175,8 @@
ctx *context.T,
streamMgr stream.Manager,
ns namespace.T,
+ settingsPublisher *pubsub.Publisher,
+ settingsName string,
client rpc.Client,
principal security.Principal,
opts ...rpc.ServerOpt) (rpc.Server, error) {
@@ -180,18 +184,20 @@
ctx, _ = vtrace.WithNewSpan(ctx, "NewServer")
statsPrefix := naming.Join("rpc", "server", "routing-id", streamMgr.RoutingID().String())
s := &server{
- ctx: ctx,
- cancel: cancel,
- streamMgr: streamMgr,
- principal: principal,
- publisher: publisher.New(ctx, ns, publishPeriod),
- listenState: make(map[*listenState]struct{}),
- listeners: make(map[stream.Listener]struct{}),
- proxies: make(map[string]proxyState),
- stoppedChan: make(chan struct{}),
- ipNets: ipNetworks(),
- ns: ns,
- stats: newRPCStats(statsPrefix),
+ ctx: ctx,
+ cancel: cancel,
+ streamMgr: streamMgr,
+ principal: principal,
+ publisher: publisher.New(ctx, ns, publishPeriod),
+ listenState: make(map[*listenState]struct{}),
+ listeners: make(map[stream.Listener]struct{}),
+ proxies: make(map[string]proxyState),
+ stoppedChan: make(chan struct{}),
+ ipNets: ipNetworks(),
+ ns: ns,
+ stats: newRPCStats(statsPrefix),
+ settingsPublisher: settingsPublisher,
+ settingsName: settingsName,
}
var (
dischargeExpiryBuffer = vc.DefaultServerDischargeExpiryBuffer
@@ -412,11 +418,11 @@
return nil, verror.New(verror.ErrBadArg, s.ctx, "failed to create any listeners")
}
- if roaming && s.dhcpState == nil && listenSpec.StreamPublisher != nil {
+ if roaming && s.dhcpState == nil && s.settingsPublisher != nil {
// Create a dhcp listener if we haven't already done so.
dhcp := &dhcpState{
- name: listenSpec.StreamName,
- publisher: listenSpec.StreamPublisher,
+ name: s.settingsName,
+ publisher: s.settingsPublisher,
watchers: make(map[chan<- rpc.NetworkChange]struct{}),
}
s.dhcpState = dhcp
@@ -624,20 +630,17 @@
s.Unlock()
return
}
- var err error
- var changed []naming.Endpoint
- switch setting.Name() {
- case rpc.NewAddrsSetting:
- changed = s.addAddresses(v)
- case rpc.RmAddrsSetting:
- changed, err = s.removeAddresses(v)
- }
change := rpc.NetworkChange{
- Time: time.Now(),
- State: externalStates[s.state],
- Setting: setting,
- Changed: changed,
- Error: err,
+ Time: time.Now(),
+ State: externalStates[s.state],
+ }
+ switch setting.Name() {
+ case NewAddrsSetting:
+ change.Changed = s.addAddresses(v)
+ change.AddedAddrs = v
+ case RmAddrsSetting:
+ change.Changed, change.Error = s.removeAddresses(v)
+ change.RemovedAddrs = v
}
vlog.VI(2).Infof("rpc: dhcp: change %v", change)
for ch, _ := range s.dhcpState.watchers {
@@ -703,14 +706,11 @@
// to ensure that those addresses are externally reachable.
func (s *server) addAddresses(addrs []net.Addr) []naming.Endpoint {
var added []naming.Endpoint
- vlog.Infof("HERE WITH %v -> %v", addrs, netstate.ConvertToAddresses(addrs))
for _, address := range netstate.ConvertToAddresses(addrs) {
if !netstate.IsAccessibleIP(address) {
- vlog.Infof("RETURN A %v", added)
return added
}
host := getHost(address)
- vlog.Infof("LISTEN ST: %v", s.listenState)
for ls, _ := range s.listenState {
if ls != nil && ls.roaming {
niep := ls.protoIEP
@@ -724,7 +724,6 @@
}
}
}
- vlog.Infof("RETURN B %v", added)
return added
}