ipc/server: reimplement roaming.
Change-Id: I75dc91f5ca5ba7f39878eff2c081ad0b03fa05bb
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 7fc7546..11802cd 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -36,21 +36,53 @@
// for communicating from server to client.
)
+// state for each requested listen address
+type listenState struct {
+ protocol, address string
+ ln stream.Listener
+ lep naming.Endpoint
+ lnerr, eperr error
+ roaming bool
+ // We keep track of all of the endpoints, the port and a copy of
+ // the original listen endpoint for use with roaming network changes.
+ ieps []*inaming.Endpoint // list of currently active eps
+ port string // port to use for creating new eps
+ protoIEP inaming.Endpoint // endpoint to use as template for new eps (includes rid, versions etc)
+}
+
+// state for each requested proxy
+type proxyState struct {
+ endpoint naming.Endpoint
+ err verror.E
+}
+
+type dhcpState struct {
+ name string
+ publisher *config.Publisher
+ stream *config.Stream
+ ch chan config.Setting // channel to receive dhcp settings over
+ err error // error status.
+ watchers map[chan<- ipc.NetworkChange]struct{}
+}
+
type server struct {
sync.Mutex
- state serverState // track state of the server.
- ctx *context.T // context used by the server to make internal RPCs.
+ // context used by the server to make internal RPCs, error messages etc.
+ ctx *context.T
cancel context.CancelFunc // function to cancel the above context.
+ state serverState // track state of the server.
streamMgr stream.Manager // stream manager to listen for new flows.
publisher publisher.Publisher // publisher to publish mounttable mounts.
listenerOpts []stream.ListenerOpt // listener opts for Listen.
+ dhcpState *dhcpState // dhcpState, nil if not using dhcp
- // listeners created by Listen for servers and proxies
- listeners map[stream.Listener]struct{}
- // dhcpListeners created by Listen.
- dhcpListeners map[*dhcpListener]struct{}
+ // maps that contain state on listeners.
+ listenState map[*listenState]struct{}
+ listeners map[stream.Listener]struct{}
+
// state of proxies keyed by the name of the proxy
proxies map[string]proxyState
+
// all endpoints generated and returned by this server
endpoints []naming.Endpoint
@@ -65,6 +97,7 @@
ipNets []*net.IPNet
ns naming.Namespace
servesMountTable bool
+
// TODO(cnicolaou): add roaming stats to ipcStats
stats *ipcStats // stats for this server.
}
@@ -118,21 +151,6 @@
var _ ipc.Server = (*server)(nil)
-type dhcpListener struct {
- sync.Mutex
- publisher *config.Publisher // publisher used to fork the stream
- name string // name of the publisher stream
- eps []*inaming.Endpoint // endpoint returned after listening
- pubAddrs []ipc.Address // addresses to publish
- pubPort string // port to use with the publish addresses
- ch chan config.Setting // channel to receive settings over
-}
-
-type proxyState struct {
- endpoint naming.Endpoint
- err verror.E
-}
-
// This option is used to sort and filter the endpoints when resolving the
// proxy name from a mounttable.
type PreferredServerResolveProtocols []string
@@ -152,17 +170,18 @@
ctx, _ = vtrace.SetNewSpan(ctx, "NewServer")
statsPrefix := naming.Join("ipc", "server", "routing-id", streamMgr.RoutingID().String())
s := &server{
- ctx: ctx,
- cancel: cancel,
- streamMgr: streamMgr,
- publisher: publisher.New(ctx, ns, publishPeriod),
- listeners: make(map[stream.Listener]struct{}),
- proxies: make(map[string]proxyState),
- dhcpListeners: make(map[*dhcpListener]struct{}),
- stoppedChan: make(chan struct{}),
- ipNets: ipNetworks(),
- ns: ns,
- stats: newIPCStats(statsPrefix),
+
+ ctx: ctx,
+ cancel: cancel,
+ streamMgr: streamMgr,
+ publisher: publisher.New(ctx, ns, publishPeriod),
+ listenState: make(map[*listenState]struct{}),
+ listeners: make(map[stream.Listener]struct{}),
+ proxies: make(map[string]proxyState),
+ stoppedChan: make(chan struct{}),
+ ipNets: ipNetworks(),
+ ns: ns,
+ stats: newIPCStats(statsPrefix),
}
var (
principal security.Principal
@@ -210,8 +229,18 @@
status.State = externalStates[s.state]
status.ServesMountTable = s.servesMountTable
status.Mounts = s.publisher.Status()
- status.Endpoints = make([]naming.Endpoint, len(s.endpoints))
- copy(status.Endpoints, s.endpoints)
+ status.Endpoints = []naming.Endpoint{}
+ for ls, _ := range s.listenState {
+ if ls.eperr != nil {
+ status.Errors = append(status.Errors, ls.eperr)
+ }
+ if ls.lnerr != nil {
+ status.Errors = append(status.Errors, ls.lnerr)
+ }
+ for _, iep := range ls.ieps {
+ status.Endpoints = append(status.Endpoints, iep)
+ }
+ }
status.Proxies = make([]ipc.ProxyStatus, 0, len(s.proxies))
for k, v := range s.proxies {
status.Proxies = append(status.Proxies, ipc.ProxyStatus{k, v.endpoint, v.err})
@@ -219,6 +248,24 @@
return status
}
+func (s *server) WatchNetwork(ch chan<- ipc.NetworkChange) {
+ defer vlog.LogCall()()
+ s.Lock()
+ defer s.Unlock()
+ if s.dhcpState != nil {
+ s.dhcpState.watchers[ch] = struct{}{}
+ }
+}
+
+func (s *server) UnwatchNetwork(ch chan<- ipc.NetworkChange) {
+ defer vlog.LogCall()()
+ s.Lock()
+ defer s.Unlock()
+ if s.dhcpState != nil {
+ delete(s.dhcpState.watchers, ch)
+ }
+}
+
// resolveToEndpoint resolves an object name or address to an endpoint.
func (s *server) resolveToEndpoint(address string) (string, error) {
var resolved *naming.MountEntry
@@ -249,56 +296,24 @@
return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
}
-func addrFromIP(ip net.IP) ipc.Address {
- return &netstate.AddrIfc{
- Addr: &net.IPAddr{IP: ip},
- }
-}
-
-/*
-// getIPRoamingAddrs finds an appropriate set of addresss to publish
-// externally and also determines if it's sensible to allow roaming.
-// It returns the host address of the first suitable address that
-// can be used and the port number that can be used with all addresses.
-// The host is required to allow the caller to construct an endpoint
-// that can be returned to the caller of Listen.
-func (s *server) getIPRoamingAddrs(chooser ipc.AddressChooser, iep *inaming.Endpoint) (addresses []ipc.Address, host string, port string, roaming bool, err error) {
- host, port, err = net.SplitHostPort(iep.Address)
- if err != nil {
- return nil, "", "", false, err
- }
- ip := net.ParseIP(host)
- if ip == nil {
- return nil, "", "", false, fmt.Errorf("failed to parse %q as an IP host", host)
- }
- if ip.IsUnspecified() && chooser != nil {
- // Need to find a usable IP address since the call to listen
- // didn't specify one.
- if addrs, err := netstate.GetAccessibleIPs(); err == nil {
- if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
- phost := a[0].Address().String()
- iep.Address = net.JoinHostPort(phost, port)
- return a, phost, port, true, nil
- }
- }
- return []ipc.Address{addrFromIP(ip)}, host, port, true, nil
- }
- // Listen used a fixed IP address, which we take to mean that
- // roaming is not desired.
- return []ipc.Address{addrFromIP(ip)}, host, port, false, nil
-}
-*/
-
// getPossbileAddrs returns an appropriate set of addresses that could be used
// to contact the supplied protocol, host, port parameters using the supplied
// chooser function. It returns an indication of whether the supplied address
// was fully specified or not, returning false if the address was fully
// specified, and true if it was not.
func getPossibleAddrs(protocol, host, port string, chooser ipc.AddressChooser) ([]ipc.Address, bool, error) {
+
ip := net.ParseIP(host)
if ip == nil {
return nil, false, fmt.Errorf("failed to parse %q as an IP host", host)
}
+
+ addrFromIP := func(ip net.IP) ipc.Address {
+ return &netstate.AddrIfc{
+ Addr: &net.IPAddr{IP: ip},
+ }
+ }
+
if ip.IsUnspecified() {
if chooser != nil {
// Need to find a usable IP address since the call to listen
@@ -318,128 +333,57 @@
}
// createEndpoints creates appropriate inaming.Endpoint instances for
-// all of the externally accessible networrk addresses that can be used
+// all of the externally accessible network addresses that can be used
// to reach this server.
-func (s *server) createEndpoints(lep naming.Endpoint, chooser ipc.AddressChooser) ([]*inaming.Endpoint, bool, error) {
+func (s *server) createEndpoints(lep naming.Endpoint, chooser ipc.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
iep, ok := lep.(*inaming.Endpoint)
if !ok {
- return nil, false, fmt.Errorf("internal type conversion error for %T", lep)
+ return nil, "", false, fmt.Errorf("internal type conversion error for %T", lep)
}
if !strings.HasPrefix(iep.Protocol, "tcp") &&
!strings.HasPrefix(iep.Protocol, "ws") {
- // If not tcp or ws, just return the endpoint we were given.
- return []*inaming.Endpoint{iep}, false, nil
+ // If not tcp, ws, or wsh, just return the endpoint we were given.
+ return []*inaming.Endpoint{iep}, "", false, nil
}
host, port, err := net.SplitHostPort(iep.Address)
if err != nil {
- return nil, false, err
+ return nil, "", false, err
}
addrs, unspecified, err := getPossibleAddrs(iep.Protocol, host, port, chooser)
if err != nil {
- return nil, false, err
+ return nil, port, false, err
}
ieps := make([]*inaming.Endpoint, 0, len(addrs))
for _, addr := range addrs {
n, err := inaming.NewEndpoint(lep.String())
if err != nil {
- return nil, false, err
+ return nil, port, false, err
}
n.IsMountTable = s.servesMountTable
- //n.Protocol = addr.Address().Network()
n.Address = net.JoinHostPort(addr.Address().String(), port)
ieps = append(ieps, n)
}
- return ieps, unspecified, nil
-}
-
-/*
-// configureEPAndRoaming configures the endpoint by filling in its Address
-// portion with the appropriately selected network address, it also
-// returns an indication of whether this endpoint is appropriate for
-// roaming and the set of addresses that should be published.
-func (s *server) configureEPAndRoaming(spec ipc.ListenSpec, ep naming.Endpoint) (bool, []ipc.Address, *inaming.Endpoint, error) {
- iep, ok := ep.(*inaming.Endpoint)
- if !ok {
- return false, nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
- }
- if !strings.HasPrefix(spec.Addrs[0].Protocol, "tcp") &&
- !strings.HasPrefix(spec.Addrs[0].Protocol, "ws") {
- return false, nil, iep, nil
- }
- pubAddrs, pubHost, pubPort, roaming, err := s.getIPRoamingAddrs(spec.AddressChooser, iep)
- if err != nil {
- return false, nil, iep, err
- }
- iep.Address = net.JoinHostPort(pubHost, pubPort)
- return roaming, pubAddrs, iep, nil
-}
-*/
-
-// TODO(cnicolaou): get rid of this in a subsequent CL - it's not used
-// and it's not clear it's needed.
-type listenError struct {
- err verror.E
- errors map[struct{ Protocol, Address string }]error
-}
-
-func newError() *listenError {
- return &listenError{errors: make(map[struct{ Protocol, Address string }]error)}
-}
-
-func ErrorDetails(le *listenError) map[struct{ Protocol, Address string }]error {
- return le.errors
-}
-
-// Implements error
-func (le *listenError) Error() string {
- s := le.err.Error()
- for k, v := range le.errors {
- s += fmt.Sprintf("(%s,%s:%s) ", k.Protocol, k.Address, v)
- }
- return strings.TrimRight(s, " ")
-}
-
-func (le *listenError) ErrorID() old_verror.ID {
- return le.err.ErrorID()
-}
-
-func (le *listenError) Action() verror.ActionCode {
- return le.err.Action()
-}
-
-func (le *listenError) Params() []interface{} {
- return le.err.Params()
-}
-
-func (le *listenError) HasMessage() bool {
- return le.err.HasMessage()
-}
-
-func (le *listenError) Stack() verror.PCs {
- return le.err.Stack()
-}
-
-func (s *server) newBadState(m string) *listenError {
- return &listenError{err: verror.Make(verror.BadState, s.ctx, m)}
-}
-
-func (s *server) newBadArg(m string) *listenError {
- return &listenError{err: verror.Make(verror.BadArg, s.ctx, m)}
+ return ieps, port, unspecified, nil
}
func (s *server) Listen(listenSpec ipc.ListenSpec) ([]naming.Endpoint, error) {
defer vlog.LogCall()()
+ useProxy := len(listenSpec.Proxy) > 0
+ if !useProxy && len(listenSpec.Addrs) == 0 {
+ return nil, verror.Make(verror.BadArg, s.ctx, "ListenSpec contains no proxy or addresses to listen on")
+ }
+
s.Lock()
defer s.Unlock()
+
if err := s.allowed(listening, "Listen"); err != nil {
return nil, err
}
- useProxy := len(listenSpec.Proxy) > 0
-
- // Start the proxy as early as possible.
- if useProxy {
+ // Start the proxy as early as possible, ignore duplicate requests
+ // for the same proxy.
+ if _, inuse := s.proxies[listenSpec.Proxy]; useProxy && !inuse {
// We have a goroutine for listening on proxy connections.
s.active.Add(1)
go func() {
@@ -448,96 +392,78 @@
}()
}
- var ieps []*inaming.Endpoint
-
- type lnInfo struct {
- ln stream.Listener
- ep naming.Endpoint
- }
- linfo := []lnInfo{}
- closeAll := func(lni []lnInfo) {
- for _, li := range lni {
- li.ln.Close()
- }
- }
-
roaming := false
+ lnState := make([]*listenState, 0, len(listenSpec.Addrs))
for _, addr := range listenSpec.Addrs {
if len(addr.Address) > 0 {
- // Listen if we have a local address to listen on. Some situations
- // just need a proxy (e.g. a browser extension).
- tmpln, lep, err := s.streamMgr.Listen(addr.Protocol, addr.Address, s.listenerOpts...)
- if err != nil {
- closeAll(linfo)
- vlog.Errorf("ipc: Listen on %s failed: %s", addr, err)
- return nil, err
+ // Listen if we have a local address to listen on.
+ ls := &listenState{
+ protocol: addr.Protocol,
+ address: addr.Address,
}
- linfo = append(linfo, lnInfo{tmpln, lep})
- tmpieps, tmpRoaming, err := s.createEndpoints(lep, listenSpec.AddressChooser)
- if err != nil {
- closeAll(linfo)
- return nil, err
+ ls.ln, ls.lep, ls.lnerr = s.streamMgr.Listen(addr.Protocol, addr.Address, s.listenerOpts...)
+ lnState = append(lnState, ls)
+ if ls.lnerr != nil {
+ continue
}
- ieps = append(ieps, tmpieps...)
- if tmpRoaming {
+ ls.ieps, ls.port, ls.roaming, ls.eperr = s.createEndpoints(ls.lep, listenSpec.AddressChooser)
+ if ls.roaming && ls.eperr == nil {
+ ls.protoIEP = *ls.lep.(*inaming.Endpoint)
roaming = true
}
}
}
- // TODO(cnicolaou): write a test for all of these error cases.
- if len(ieps) == 0 {
- if useProxy {
- return nil, nil
+ found := false
+ for _, ls := range lnState {
+ if ls.ln != nil {
+ found = true
+ break
}
- // no proxy.
- if len(listenSpec.Addrs) > 0 {
- // TODO(cnicolaou): should be verror2
- return nil, fmt.Errorf("no endpoints")
- }
- // TODO(cnicolaou): should be verror2
- return nil, fmt.Errorf("no proxy and no addresses requested")
+ }
+ if !found && !useProxy {
+ return nil, verror.Make(verror.BadArg, s.ctx, "failed to create any listeners")
}
- // TODO(cnicolaou): return all of the eps and their errors, then again,
- // it's not clear we need to....
-
- if roaming && listenSpec.StreamPublisher != nil {
- // TODO(cnicolaou): renable roaming in a followup CL.
- /*
- var dhcpl *dhcpListener
- streamName := listenSpec.StreamName
- ch := make(chan config.Setting)
- if _, err := publisher.ForkStream(streamName, ch); err != nil {
- return ieps[0], fmt.Errorf("failed to fork stream %q: %s", streamName, err)
- }
- dhcpl = &dhcpListener{eps: ieps, pubAddrs: pubAddrs, ch: ch, name: streamName, publisher: publisher}, iep, nil
+ if roaming && s.dhcpState == nil && listenSpec.StreamPublisher != nil {
+ // Create a dhcp listener if we haven't already done so.
+ dhcp := &dhcpState{
+ name: listenSpec.StreamName,
+ publisher: listenSpec.StreamPublisher,
+ watchers: make(map[chan<- ipc.NetworkChange]struct{}),
+ }
+ s.dhcpState = dhcp
+ dhcp.ch = make(chan config.Setting, 10)
+ dhcp.stream, dhcp.err = dhcp.publisher.ForkStream(dhcp.name, dhcp.ch)
+ if dhcp.err == nil {
// We have a goroutine to listen for dhcp changes.
s.active.Add(1)
go func() {
- s.dhcpLoop(dhcpl)
+ s.dhcpLoop(dhcp.ch)
s.active.Done()
}()
- s.dhcpListeners[dhcpl] = struct{}{}
- */
+ }
}
- for _, li := range linfo {
- s.listeners[li.ln] = struct{}{}
- // We have a goroutine per listener to accept new flows.
- // Each flow is served from its own goroutine.
- s.active.Add(1)
- go func(ln stream.Listener, ep naming.Endpoint) {
- s.listenLoop(ln, ep)
- s.active.Done()
- }(li.ln, li.ep)
+ eps := make([]naming.Endpoint, 0, 10)
+ for _, ls := range lnState {
+ s.listenState[ls] = struct{}{}
+ if ls.ln != nil {
+ // We have a goroutine per listener to accept new flows.
+ // Each flow is served from its own goroutine.
+ s.active.Add(1)
+ go func(ln stream.Listener, ep naming.Endpoint) {
+ s.listenLoop(ln, ep)
+ s.active.Done()
+ }(ls.ln, ls.lep)
+ }
+
+ for _, iep := range ls.ieps {
+ s.publisher.AddServer(iep.String(), s.servesMountTable)
+ eps = append(eps, iep)
+ }
}
- eps := make([]naming.Endpoint, len(ieps))
- for i, iep := range ieps {
- s.publisher.AddServer(iep.String(), s.servesMountTable)
- eps[i] = iep
- s.endpoints = append(s.endpoints, iep)
- }
+
return eps, nil
}
@@ -556,7 +482,6 @@
return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
}
s.Lock()
- s.listeners[ln] = struct{}{}
s.proxies[proxy] = proxyState{iep, nil}
s.Unlock()
s.publisher.AddServer(iep.String(), s.servesMountTable)
@@ -590,7 +515,14 @@
// (1) Unpublish its name
s.publisher.RemoveServer(iep.String())
s.Lock()
- s.proxies[proxy] = proxyState{iep, verror.Make(verror.NoServers, nil, err)}
+ if err != nil {
+ s.proxies[proxy] = proxyState{iep, verror.Make(verror.NoServers, s.ctx, err)}
+ } else {
+ // err will be nil if we're stopping.
+ s.proxies[proxy] = proxyState{iep, nil}
+ s.Unlock()
+ return
+ }
s.Unlock()
}
@@ -624,14 +556,44 @@
}
}
+// addListener adds the supplied listener taking care to
+// check to see if we're already stopping. It returns true
+// if the listener was added.
+func (s *server) addListener(ln stream.Listener) bool {
+ s.Lock()
+ defer s.Unlock()
+ if s.isStopState() {
+ return false
+ }
+ s.listeners[ln] = struct{}{}
+ return true
+}
+
+// rmListener removes the supplied listener taking care to
+// check if we're already stopping. It returns true if the
+// listener was removed.
+func (s *server) rmListener(ln stream.Listener) bool {
+ s.Lock()
+ defer s.Unlock()
+ if s.isStopState() {
+ return false
+ }
+ delete(s.listeners, ln)
+ return true
+}
+
func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) error {
defer vlog.VI(1).Infof("ipc: Stopped listening on %s", ep)
var calls sync.WaitGroup
+
+ if !s.addListener(ln) {
+ // We're stopping.
+ return nil
+ }
+
defer func() {
calls.Wait()
- s.Lock()
- delete(s.listeners, ln)
- s.Unlock()
+ s.rmListener(ln)
}()
for {
flow, err := ln.Accept()
@@ -658,56 +620,117 @@
}
}
-/*
-func (s *server) applyChange(dhcpl *dhcpListener, addrs []net.Addr, fn func(string)) {
- dhcpl.Lock()
- defer dhcpl.Unlock()
- for _, a := range addrs {
- if ip := netstate.AsIP(a); ip != nil {
- dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.pubPort)
- fn(dhcpl.ep.String())
- }
- }
-}
-
-func (s *server) dhcpLoop(dhcpl *dhcpListener) {
- defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes on %v", dhcpl.ep)
+func (s *server) dhcpLoop(ch chan config.Setting) {
+ defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes")
vlog.VI(2).Infof("ipc: dhcp loop")
-
- ep := *dhcpl.ep
- // Publish all of the addresses
- for _, pubAddr := range dhcpl.pubAddrs {
- ep.Address = net.JoinHostPort(pubAddr.Address().String(), dhcpl.pubPort)
- s.publisher.AddServer(ep.String(), s.servesMountTable)
- }
-
- for setting := range dhcpl.ch {
+ for setting := range ch {
if setting == nil {
return
}
switch v := setting.Value().(type) {
- case bool:
- return
- case []net.Addr:
+ case []ipc.Address:
s.Lock()
- if s.stopped {
+ if s.isStopState() {
s.Unlock()
return
}
- publisher := s.publisher
- s.Unlock()
+ var err error
+ var changed []naming.Endpoint
switch setting.Name() {
case ipc.NewAddrsSetting:
- vlog.Infof("Added some addresses: %q", v)
- s.applyChange(dhcpl, v, func(name string) { publisher.AddServer(name, s.servesMountTable) })
+ changed = s.addAddresses(v)
case ipc.RmAddrsSetting:
- vlog.Infof("Removed some addresses: %q", v)
- s.applyChange(dhcpl, v, publisher.RemoveServer)
+ changed, err = s.removeAddresses(v)
}
+ change := ipc.NetworkChange{
+ Time: time.Now(),
+ State: externalStates[s.state],
+ Setting: setting,
+ Changed: changed,
+ Error: err,
+ }
+ vlog.VI(2).Infof("ipc: dhcp: change %v", change)
+ for ch, _ := range s.dhcpState.watchers {
+ select {
+ case ch <- change:
+ default:
+ }
+ }
+ s.Unlock()
+ default:
+ vlog.Errorf("ipc: dhcpLoop: unhandled setting type %T", v)
}
}
}
-*/
+
+func getHost(address ipc.Address) string {
+ host, _, err := net.SplitHostPort(address.Address().String())
+ if err == nil {
+ return host
+ }
+ return address.Address().String()
+
+}
+
+// Remove all endpoints that have the same host address as the supplied
+// address parameter.
+func (s *server) removeAddresses(addresses []ipc.Address) ([]naming.Endpoint, error) {
+ var removed []naming.Endpoint
+ for _, address := range addresses {
+ host := getHost(address)
+ for ls, _ := range s.listenState {
+ if ls != nil && ls.roaming && len(ls.ieps) > 0 {
+ remaining := make([]*inaming.Endpoint, 0, len(ls.ieps))
+ for _, iep := range ls.ieps {
+ lnHost, _, err := net.SplitHostPort(iep.Address)
+ if err != nil {
+ lnHost = iep.Address
+ }
+ if lnHost == host {
+ vlog.VI(2).Infof("ipc: dhcp removing: %s", iep)
+ removed = append(removed, iep)
+ s.publisher.RemoveServer(iep.String())
+ continue
+ }
+ remaining = append(remaining, iep)
+ }
+ ls.ieps = remaining
+ }
+ }
+ }
+ return removed, nil
+}
+
+// Add new endpoints for the new address. There is no way to know with
+// 100% confidence which new endpoints to publish without shutting down
+// all network connections and reinitializing everything from scratch.
+// Instead, we find all roaming listeners with at least one endpoint
+// and create a new endpoint with the same port as the existing ones
+// but with the new address supplied to us by the dhcp code. As
+// an additional safeguard we reject the new address if it is not
+// externally accessible.
+// This places the onus on the dhcp/roaming code that sends us addresses
+// to ensure that those addresses are externally reachable.
+func (s *server) addAddresses(addresses []ipc.Address) []naming.Endpoint {
+ var added []naming.Endpoint
+ for _, address := range addresses {
+ if !netstate.IsAccessibleIP(address) {
+ return added
+ }
+ host := getHost(address)
+ for ls, _ := range s.listenState {
+ if ls != nil && ls.roaming {
+ niep := ls.protoIEP
+ niep.Address = net.JoinHostPort(host, ls.port)
+ ls.ieps = append(ls.ieps, &niep)
+ vlog.VI(2).Infof("ipc: dhcp adding: %s", niep)
+ s.publisher.AddServer(niep.String(), s.servesMountTable)
+ added = append(added, &niep)
+ }
+ }
+ }
+ return added
+}
type leafDispatcher struct {
invoker ipc.Invoker
@@ -724,11 +747,11 @@
func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
defer vlog.LogCall()()
if obj == nil {
- return s.newBadArg("nil object")
+ return verror.Make(verror.BadArg, s.ctx, "nil object")
}
invoker, err := objectToInvoker(obj)
if err != nil {
- return s.newBadArg(fmt.Sprintf("bad object: %v", err))
+ return verror.Make(verror.BadArg, s.ctx, fmt.Sprintf("bad object: %v", err))
}
return s.ServeDispatcher(name, &leafDispatcher{invoker, authorizer})
}
@@ -736,7 +759,7 @@
func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
defer vlog.LogCall()()
if disp == nil {
- return s.newBadArg("nil dispatcher")
+ return verror.Make(verror.BadArg, s.ctx, "nil dispatcher")
}
s.Lock()
defer s.Unlock()
@@ -754,7 +777,7 @@
func (s *server) AddName(name string) error {
defer vlog.LogCall()()
if len(name) == 0 {
- return s.newBadArg("name is empty")
+ return verror.Make(verror.BadArg, s.ctx, "name is empty")
}
s.Lock()
defer s.Unlock()
@@ -818,14 +841,27 @@
}(ln)
}
- for dhcpl, _ := range s.dhcpListeners {
- dhcpl.Lock()
- dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
- dhcpl.ch <- config.NewBool("EOF", "stop", true)
- dhcpl.Unlock()
+ drain := func(ch chan config.Setting) {
+ for {
+ select {
+ case v := <-ch:
+ if v == nil {
+ return
+ }
+ default:
+ close(ch)
+ return
+ }
+ }
+ }
+
+ if dhcp := s.dhcpState; dhcp != nil {
+ dhcp.publisher.CloseFork(dhcp.name, dhcp.ch)
+ drain(dhcp.ch)
}
s.Unlock()
+
var firstErr error
for i := 0; i < nListeners; i++ {
if err := <-errCh; err != nil && firstErr == nil {
@@ -836,7 +872,22 @@
// accepted.
// Wait for the publisher and active listener + flows to finish.
- s.active.Wait()
+ done := make(chan struct{}, 1)
+ go func() { s.active.Wait(); done <- struct{}{} }()
+
+ select {
+ case <-done:
+ case <-time.After(5 * time.Minute):
+ vlog.Errorf("Listener Close Error: %v", firstErr)
+ vlog.Errorf("Timedout waiting for goroutines to stop: listeners: %d", nListeners, len(s.listeners))
+ for ln, _ := range s.listeners {
+ vlog.Errorf("Listener: %p", ln)
+ }
+ for ls, _ := range s.listenState {
+ vlog.Errorf("ListenState: %v", ls)
+ }
+ <-done
+ }
s.Lock()
defer s.Unlock()