ipc/server: reimplement roaming.
Change-Id: I75dc91f5ca5ba7f39878eff2c081ad0b03fa05bb
diff --git a/profiles/roaming/roaming_server.go b/profiles/roaming/roaming_server.go
index 30874be..e6b6276 100644
--- a/profiles/roaming/roaming_server.go
+++ b/profiles/roaming/roaming_server.go
@@ -4,12 +4,13 @@
import (
"fmt"
+ "log"
"v.io/core/veyron2"
"v.io/core/veyron2/ipc"
"v.io/core/veyron2/vlog"
- "v.io/core/veyron/profiles/roaming"
+ _ "v.io/core/veyron/profiles/roaming"
)
func main() {
@@ -22,25 +23,36 @@
}
listenSpec := veyron2.GetListenSpec(ctx)
-
fmt.Printf("listen spec: %v\n", listenSpec)
- ep, err := server.Listen(listenSpec)
+
+ _, err = server.Listen(listenSpec)
if err != nil {
vlog.Fatalf("unexpected error: %q", err)
}
- if ep != nil {
- fmt.Println(ep)
+ err = server.Serve("roamer", &dummy{}, nil)
+ if err != nil {
+ log.Fatalf("unexpected error: %q", err)
}
- if err := server.Serve("roamer", &receiver{}, nil); err != nil {
- vlog.Fatalf("unexpected error: %q", err)
- }
+ watcher := make(chan ipc.NetworkChange, 1)
+ server.WatchNetwork(watcher)
- done := make(chan struct{})
- <-done
+ for {
+ status := server.Status()
+ fmt.Printf("Endpoints: %d created:\n", len(status.Endpoints))
+ for _, ep := range status.Endpoints {
+ fmt.Printf(" %s\n", ep)
+ }
+ fmt.Printf("Mounts: %d mounts:\n", len(status.Mounts))
+ for _, ms := range status.Mounts {
+ fmt.Printf(" %s\n", ms)
+ }
+ change := <-watcher
+ fmt.Printf("Network change: %s", change.DebugString())
+ }
}
-type receiver struct{}
+type dummy struct{}
-func (d *receiver) Echo(call ipc.ServerContext, arg string) (string, error) {
+func (d *dummy) Echo(call ipc.ServerContext, arg string) (string, error) {
return arg, nil
}
diff --git a/profiles/roaming/roaminginit.go b/profiles/roaming/roaminginit.go
index 45a5f56..ae37ee9 100644
--- a/profiles/roaming/roaminginit.go
+++ b/profiles/roaming/roaminginit.go
@@ -168,7 +168,10 @@
}
// We will always send the best currently available address
if chosen, err := listenSpec.AddressChooser(listenSpec.Addrs[0].Protocol, cur); err == nil && chosen != nil {
+ vlog.VI(2).Infof("Sending added and chosen: %s", chosen)
ch <- ipc.NewAddAddrsSetting(chosen)
+ } else {
+ vlog.VI(2).Infof("Ignoring added %s", added)
}
prev = cur
case <-cleanup:
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 7fc7546..11802cd 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -36,21 +36,53 @@
// for communicating from server to client.
)
+// state for each requested listen address
+type listenState struct {
+ protocol, address string
+ ln stream.Listener
+ lep naming.Endpoint
+ lnerr, eperr error
+ roaming bool
+ // We keep track of all of the endpoints, the port and a copy of
+ // the original listen endpoint for use with roaming network changes.
+ ieps []*inaming.Endpoint // list of currently active eps
+ port string // port to use for creating new eps
+ protoIEP inaming.Endpoint // endpoint to use as template for new eps (includes rid, versions etc)
+}
+
+// state for each requested proxy
+type proxyState struct {
+ endpoint naming.Endpoint
+ err verror.E
+}
+
+type dhcpState struct {
+ name string
+ publisher *config.Publisher
+ stream *config.Stream
+ ch chan config.Setting // channel to receive dhcp settings over
+ err error // error status.
+ watchers map[chan<- ipc.NetworkChange]struct{}
+}
+
type server struct {
sync.Mutex
- state serverState // track state of the server.
- ctx *context.T // context used by the server to make internal RPCs.
+ // context used by the server to make internal RPCs, error messages etc.
+ ctx *context.T
cancel context.CancelFunc // function to cancel the above context.
+ state serverState // track state of the server.
streamMgr stream.Manager // stream manager to listen for new flows.
publisher publisher.Publisher // publisher to publish mounttable mounts.
listenerOpts []stream.ListenerOpt // listener opts for Listen.
+ dhcpState *dhcpState // dhcpState, nil if not using dhcp
- // listeners created by Listen for servers and proxies
- listeners map[stream.Listener]struct{}
- // dhcpListeners created by Listen.
- dhcpListeners map[*dhcpListener]struct{}
+ // maps that contain state on listeners.
+ listenState map[*listenState]struct{}
+ listeners map[stream.Listener]struct{}
+
// state of proxies keyed by the name of the proxy
proxies map[string]proxyState
+
// all endpoints generated and returned by this server
endpoints []naming.Endpoint
@@ -65,6 +97,7 @@
ipNets []*net.IPNet
ns naming.Namespace
servesMountTable bool
+
// TODO(cnicolaou): add roaming stats to ipcStats
stats *ipcStats // stats for this server.
}
@@ -118,21 +151,6 @@
var _ ipc.Server = (*server)(nil)
-type dhcpListener struct {
- sync.Mutex
- publisher *config.Publisher // publisher used to fork the stream
- name string // name of the publisher stream
- eps []*inaming.Endpoint // endpoint returned after listening
- pubAddrs []ipc.Address // addresses to publish
- pubPort string // port to use with the publish addresses
- ch chan config.Setting // channel to receive settings over
-}
-
-type proxyState struct {
- endpoint naming.Endpoint
- err verror.E
-}
-
// This option is used to sort and filter the endpoints when resolving the
// proxy name from a mounttable.
type PreferredServerResolveProtocols []string
@@ -152,17 +170,18 @@
ctx, _ = vtrace.SetNewSpan(ctx, "NewServer")
statsPrefix := naming.Join("ipc", "server", "routing-id", streamMgr.RoutingID().String())
s := &server{
- ctx: ctx,
- cancel: cancel,
- streamMgr: streamMgr,
- publisher: publisher.New(ctx, ns, publishPeriod),
- listeners: make(map[stream.Listener]struct{}),
- proxies: make(map[string]proxyState),
- dhcpListeners: make(map[*dhcpListener]struct{}),
- stoppedChan: make(chan struct{}),
- ipNets: ipNetworks(),
- ns: ns,
- stats: newIPCStats(statsPrefix),
+
+ ctx: ctx,
+ cancel: cancel,
+ streamMgr: streamMgr,
+ publisher: publisher.New(ctx, ns, publishPeriod),
+ listenState: make(map[*listenState]struct{}),
+ listeners: make(map[stream.Listener]struct{}),
+ proxies: make(map[string]proxyState),
+ stoppedChan: make(chan struct{}),
+ ipNets: ipNetworks(),
+ ns: ns,
+ stats: newIPCStats(statsPrefix),
}
var (
principal security.Principal
@@ -210,8 +229,18 @@
status.State = externalStates[s.state]
status.ServesMountTable = s.servesMountTable
status.Mounts = s.publisher.Status()
- status.Endpoints = make([]naming.Endpoint, len(s.endpoints))
- copy(status.Endpoints, s.endpoints)
+ status.Endpoints = []naming.Endpoint{}
+ for ls, _ := range s.listenState {
+ if ls.eperr != nil {
+ status.Errors = append(status.Errors, ls.eperr)
+ }
+ if ls.lnerr != nil {
+ status.Errors = append(status.Errors, ls.lnerr)
+ }
+ for _, iep := range ls.ieps {
+ status.Endpoints = append(status.Endpoints, iep)
+ }
+ }
status.Proxies = make([]ipc.ProxyStatus, 0, len(s.proxies))
for k, v := range s.proxies {
status.Proxies = append(status.Proxies, ipc.ProxyStatus{k, v.endpoint, v.err})
@@ -219,6 +248,24 @@
return status
}
+func (s *server) WatchNetwork(ch chan<- ipc.NetworkChange) {
+ defer vlog.LogCall()()
+ s.Lock()
+ defer s.Unlock()
+ if s.dhcpState != nil {
+ s.dhcpState.watchers[ch] = struct{}{}
+ }
+}
+
+func (s *server) UnwatchNetwork(ch chan<- ipc.NetworkChange) {
+ defer vlog.LogCall()()
+ s.Lock()
+ defer s.Unlock()
+ if s.dhcpState != nil {
+ delete(s.dhcpState.watchers, ch)
+ }
+}
+
// resolveToEndpoint resolves an object name or address to an endpoint.
func (s *server) resolveToEndpoint(address string) (string, error) {
var resolved *naming.MountEntry
@@ -249,56 +296,24 @@
return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
}
-func addrFromIP(ip net.IP) ipc.Address {
- return &netstate.AddrIfc{
- Addr: &net.IPAddr{IP: ip},
- }
-}
-
-/*
-// getIPRoamingAddrs finds an appropriate set of addresss to publish
-// externally and also determines if it's sensible to allow roaming.
-// It returns the host address of the first suitable address that
-// can be used and the port number that can be used with all addresses.
-// The host is required to allow the caller to construct an endpoint
-// that can be returned to the caller of Listen.
-func (s *server) getIPRoamingAddrs(chooser ipc.AddressChooser, iep *inaming.Endpoint) (addresses []ipc.Address, host string, port string, roaming bool, err error) {
- host, port, err = net.SplitHostPort(iep.Address)
- if err != nil {
- return nil, "", "", false, err
- }
- ip := net.ParseIP(host)
- if ip == nil {
- return nil, "", "", false, fmt.Errorf("failed to parse %q as an IP host", host)
- }
- if ip.IsUnspecified() && chooser != nil {
- // Need to find a usable IP address since the call to listen
- // didn't specify one.
- if addrs, err := netstate.GetAccessibleIPs(); err == nil {
- if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
- phost := a[0].Address().String()
- iep.Address = net.JoinHostPort(phost, port)
- return a, phost, port, true, nil
- }
- }
- return []ipc.Address{addrFromIP(ip)}, host, port, true, nil
- }
- // Listen used a fixed IP address, which we take to mean that
- // roaming is not desired.
- return []ipc.Address{addrFromIP(ip)}, host, port, false, nil
-}
-*/
-
// getPossbileAddrs returns an appropriate set of addresses that could be used
// to contact the supplied protocol, host, port parameters using the supplied
// chooser function. It returns an indication of whether the supplied address
// was fully specified or not, returning false if the address was fully
// specified, and true if it was not.
func getPossibleAddrs(protocol, host, port string, chooser ipc.AddressChooser) ([]ipc.Address, bool, error) {
+
ip := net.ParseIP(host)
if ip == nil {
return nil, false, fmt.Errorf("failed to parse %q as an IP host", host)
}
+
+ addrFromIP := func(ip net.IP) ipc.Address {
+ return &netstate.AddrIfc{
+ Addr: &net.IPAddr{IP: ip},
+ }
+ }
+
if ip.IsUnspecified() {
if chooser != nil {
// Need to find a usable IP address since the call to listen
@@ -318,128 +333,57 @@
}
// createEndpoints creates appropriate inaming.Endpoint instances for
-// all of the externally accessible networrk addresses that can be used
+// all of the externally accessible network addresses that can be used
// to reach this server.
-func (s *server) createEndpoints(lep naming.Endpoint, chooser ipc.AddressChooser) ([]*inaming.Endpoint, bool, error) {
+func (s *server) createEndpoints(lep naming.Endpoint, chooser ipc.AddressChooser) ([]*inaming.Endpoint, string, bool, error) {
iep, ok := lep.(*inaming.Endpoint)
if !ok {
- return nil, false, fmt.Errorf("internal type conversion error for %T", lep)
+ return nil, "", false, fmt.Errorf("internal type conversion error for %T", lep)
}
if !strings.HasPrefix(iep.Protocol, "tcp") &&
!strings.HasPrefix(iep.Protocol, "ws") {
- // If not tcp or ws, just return the endpoint we were given.
- return []*inaming.Endpoint{iep}, false, nil
+ // If not tcp, ws, or wsh, just return the endpoint we were given.
+ return []*inaming.Endpoint{iep}, "", false, nil
}
host, port, err := net.SplitHostPort(iep.Address)
if err != nil {
- return nil, false, err
+ return nil, "", false, err
}
addrs, unspecified, err := getPossibleAddrs(iep.Protocol, host, port, chooser)
if err != nil {
- return nil, false, err
+ return nil, port, false, err
}
ieps := make([]*inaming.Endpoint, 0, len(addrs))
for _, addr := range addrs {
n, err := inaming.NewEndpoint(lep.String())
if err != nil {
- return nil, false, err
+ return nil, port, false, err
}
n.IsMountTable = s.servesMountTable
- //n.Protocol = addr.Address().Network()
n.Address = net.JoinHostPort(addr.Address().String(), port)
ieps = append(ieps, n)
}
- return ieps, unspecified, nil
-}
-
-/*
-// configureEPAndRoaming configures the endpoint by filling in its Address
-// portion with the appropriately selected network address, it also
-// returns an indication of whether this endpoint is appropriate for
-// roaming and the set of addresses that should be published.
-func (s *server) configureEPAndRoaming(spec ipc.ListenSpec, ep naming.Endpoint) (bool, []ipc.Address, *inaming.Endpoint, error) {
- iep, ok := ep.(*inaming.Endpoint)
- if !ok {
- return false, nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
- }
- if !strings.HasPrefix(spec.Addrs[0].Protocol, "tcp") &&
- !strings.HasPrefix(spec.Addrs[0].Protocol, "ws") {
- return false, nil, iep, nil
- }
- pubAddrs, pubHost, pubPort, roaming, err := s.getIPRoamingAddrs(spec.AddressChooser, iep)
- if err != nil {
- return false, nil, iep, err
- }
- iep.Address = net.JoinHostPort(pubHost, pubPort)
- return roaming, pubAddrs, iep, nil
-}
-*/
-
-// TODO(cnicolaou): get rid of this in a subsequent CL - it's not used
-// and it's not clear it's needed.
-type listenError struct {
- err verror.E
- errors map[struct{ Protocol, Address string }]error
-}
-
-func newError() *listenError {
- return &listenError{errors: make(map[struct{ Protocol, Address string }]error)}
-}
-
-func ErrorDetails(le *listenError) map[struct{ Protocol, Address string }]error {
- return le.errors
-}
-
-// Implements error
-func (le *listenError) Error() string {
- s := le.err.Error()
- for k, v := range le.errors {
- s += fmt.Sprintf("(%s,%s:%s) ", k.Protocol, k.Address, v)
- }
- return strings.TrimRight(s, " ")
-}
-
-func (le *listenError) ErrorID() old_verror.ID {
- return le.err.ErrorID()
-}
-
-func (le *listenError) Action() verror.ActionCode {
- return le.err.Action()
-}
-
-func (le *listenError) Params() []interface{} {
- return le.err.Params()
-}
-
-func (le *listenError) HasMessage() bool {
- return le.err.HasMessage()
-}
-
-func (le *listenError) Stack() verror.PCs {
- return le.err.Stack()
-}
-
-func (s *server) newBadState(m string) *listenError {
- return &listenError{err: verror.Make(verror.BadState, s.ctx, m)}
-}
-
-func (s *server) newBadArg(m string) *listenError {
- return &listenError{err: verror.Make(verror.BadArg, s.ctx, m)}
+ return ieps, port, unspecified, nil
}
func (s *server) Listen(listenSpec ipc.ListenSpec) ([]naming.Endpoint, error) {
defer vlog.LogCall()()
+ useProxy := len(listenSpec.Proxy) > 0
+ if !useProxy && len(listenSpec.Addrs) == 0 {
+ return nil, verror.Make(verror.BadArg, s.ctx, "ListenSpec contains no proxy or addresses to listen on")
+ }
+
s.Lock()
defer s.Unlock()
+
if err := s.allowed(listening, "Listen"); err != nil {
return nil, err
}
- useProxy := len(listenSpec.Proxy) > 0
-
- // Start the proxy as early as possible.
- if useProxy {
+ // Start the proxy as early as possible, ignore duplicate requests
+ // for the same proxy.
+ if _, inuse := s.proxies[listenSpec.Proxy]; useProxy && !inuse {
// We have a goroutine for listening on proxy connections.
s.active.Add(1)
go func() {
@@ -448,96 +392,78 @@
}()
}
- var ieps []*inaming.Endpoint
-
- type lnInfo struct {
- ln stream.Listener
- ep naming.Endpoint
- }
- linfo := []lnInfo{}
- closeAll := func(lni []lnInfo) {
- for _, li := range lni {
- li.ln.Close()
- }
- }
-
roaming := false
+ lnState := make([]*listenState, 0, len(listenSpec.Addrs))
for _, addr := range listenSpec.Addrs {
if len(addr.Address) > 0 {
- // Listen if we have a local address to listen on. Some situations
- // just need a proxy (e.g. a browser extension).
- tmpln, lep, err := s.streamMgr.Listen(addr.Protocol, addr.Address, s.listenerOpts...)
- if err != nil {
- closeAll(linfo)
- vlog.Errorf("ipc: Listen on %s failed: %s", addr, err)
- return nil, err
+ // Listen if we have a local address to listen on.
+ ls := &listenState{
+ protocol: addr.Protocol,
+ address: addr.Address,
}
- linfo = append(linfo, lnInfo{tmpln, lep})
- tmpieps, tmpRoaming, err := s.createEndpoints(lep, listenSpec.AddressChooser)
- if err != nil {
- closeAll(linfo)
- return nil, err
+ ls.ln, ls.lep, ls.lnerr = s.streamMgr.Listen(addr.Protocol, addr.Address, s.listenerOpts...)
+ lnState = append(lnState, ls)
+ if ls.lnerr != nil {
+ continue
}
- ieps = append(ieps, tmpieps...)
- if tmpRoaming {
+ ls.ieps, ls.port, ls.roaming, ls.eperr = s.createEndpoints(ls.lep, listenSpec.AddressChooser)
+ if ls.roaming && ls.eperr == nil {
+ ls.protoIEP = *ls.lep.(*inaming.Endpoint)
roaming = true
}
}
}
- // TODO(cnicolaou): write a test for all of these error cases.
- if len(ieps) == 0 {
- if useProxy {
- return nil, nil
+ found := false
+ for _, ls := range lnState {
+ if ls.ln != nil {
+ found = true
+ break
}
- // no proxy.
- if len(listenSpec.Addrs) > 0 {
- // TODO(cnicolaou): should be verror2
- return nil, fmt.Errorf("no endpoints")
- }
- // TODO(cnicolaou): should be verror2
- return nil, fmt.Errorf("no proxy and no addresses requested")
+ }
+ if !found && !useProxy {
+ return nil, verror.Make(verror.BadArg, s.ctx, "failed to create any listeners")
}
- // TODO(cnicolaou): return all of the eps and their errors, then again,
- // it's not clear we need to....
-
- if roaming && listenSpec.StreamPublisher != nil {
- // TODO(cnicolaou): renable roaming in a followup CL.
- /*
- var dhcpl *dhcpListener
- streamName := listenSpec.StreamName
- ch := make(chan config.Setting)
- if _, err := publisher.ForkStream(streamName, ch); err != nil {
- return ieps[0], fmt.Errorf("failed to fork stream %q: %s", streamName, err)
- }
- dhcpl = &dhcpListener{eps: ieps, pubAddrs: pubAddrs, ch: ch, name: streamName, publisher: publisher}, iep, nil
+ if roaming && s.dhcpState == nil && listenSpec.StreamPublisher != nil {
+ // Create a dhcp listener if we haven't already done so.
+ dhcp := &dhcpState{
+ name: listenSpec.StreamName,
+ publisher: listenSpec.StreamPublisher,
+ watchers: make(map[chan<- ipc.NetworkChange]struct{}),
+ }
+ s.dhcpState = dhcp
+ dhcp.ch = make(chan config.Setting, 10)
+ dhcp.stream, dhcp.err = dhcp.publisher.ForkStream(dhcp.name, dhcp.ch)
+ if dhcp.err == nil {
// We have a goroutine to listen for dhcp changes.
s.active.Add(1)
go func() {
- s.dhcpLoop(dhcpl)
+ s.dhcpLoop(dhcp.ch)
s.active.Done()
}()
- s.dhcpListeners[dhcpl] = struct{}{}
- */
+ }
}
- for _, li := range linfo {
- s.listeners[li.ln] = struct{}{}
- // We have a goroutine per listener to accept new flows.
- // Each flow is served from its own goroutine.
- s.active.Add(1)
- go func(ln stream.Listener, ep naming.Endpoint) {
- s.listenLoop(ln, ep)
- s.active.Done()
- }(li.ln, li.ep)
+ eps := make([]naming.Endpoint, 0, 10)
+ for _, ls := range lnState {
+ s.listenState[ls] = struct{}{}
+ if ls.ln != nil {
+ // We have a goroutine per listener to accept new flows.
+ // Each flow is served from its own goroutine.
+ s.active.Add(1)
+ go func(ln stream.Listener, ep naming.Endpoint) {
+ s.listenLoop(ln, ep)
+ s.active.Done()
+ }(ls.ln, ls.lep)
+ }
+
+ for _, iep := range ls.ieps {
+ s.publisher.AddServer(iep.String(), s.servesMountTable)
+ eps = append(eps, iep)
+ }
}
- eps := make([]naming.Endpoint, len(ieps))
- for i, iep := range ieps {
- s.publisher.AddServer(iep.String(), s.servesMountTable)
- eps[i] = iep
- s.endpoints = append(s.endpoints, iep)
- }
+
return eps, nil
}
@@ -556,7 +482,6 @@
return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
}
s.Lock()
- s.listeners[ln] = struct{}{}
s.proxies[proxy] = proxyState{iep, nil}
s.Unlock()
s.publisher.AddServer(iep.String(), s.servesMountTable)
@@ -590,7 +515,14 @@
// (1) Unpublish its name
s.publisher.RemoveServer(iep.String())
s.Lock()
- s.proxies[proxy] = proxyState{iep, verror.Make(verror.NoServers, nil, err)}
+ if err != nil {
+ s.proxies[proxy] = proxyState{iep, verror.Make(verror.NoServers, s.ctx, err)}
+ } else {
+ // err will be nill if we're stopping.
+ s.proxies[proxy] = proxyState{iep, nil}
+ s.Unlock()
+ return
+ }
s.Unlock()
}
@@ -624,14 +556,44 @@
}
}
+// addListener adds the supplied listener taking care to
+// check to see if we're already stopping. It returns true
+// if the listener was added.
+func (s *server) addListener(ln stream.Listener) bool {
+ s.Lock()
+ defer s.Unlock()
+ if s.isStopState() {
+ return false
+ }
+ s.listeners[ln] = struct{}{}
+ return true
+}
+
+// rmListener removes the supplied listener taking care to
+// check if we're already stopping. It returns true if the
+// listener was removed.
+func (s *server) rmListener(ln stream.Listener) bool {
+ s.Lock()
+ defer s.Unlock()
+ if s.isStopState() {
+ return false
+ }
+ delete(s.listeners, ln)
+ return true
+}
+
func (s *server) listenLoop(ln stream.Listener, ep naming.Endpoint) error {
defer vlog.VI(1).Infof("ipc: Stopped listening on %s", ep)
var calls sync.WaitGroup
+
+ if !s.addListener(ln) {
+ // We're stopping.
+ return nil
+ }
+
defer func() {
calls.Wait()
- s.Lock()
- delete(s.listeners, ln)
- s.Unlock()
+ s.rmListener(ln)
}()
for {
flow, err := ln.Accept()
@@ -658,56 +620,117 @@
}
}
-/*
-func (s *server) applyChange(dhcpl *dhcpListener, addrs []net.Addr, fn func(string)) {
- dhcpl.Lock()
- defer dhcpl.Unlock()
- for _, a := range addrs {
- if ip := netstate.AsIP(a); ip != nil {
- dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.pubPort)
- fn(dhcpl.ep.String())
- }
- }
-}
-
-func (s *server) dhcpLoop(dhcpl *dhcpListener) {
- defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes on %v", dhcpl.ep)
+func (s *server) dhcpLoop(ch chan config.Setting) {
+ defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes")
vlog.VI(2).Infof("ipc: dhcp loop")
-
- ep := *dhcpl.ep
- // Publish all of the addresses
- for _, pubAddr := range dhcpl.pubAddrs {
- ep.Address = net.JoinHostPort(pubAddr.Address().String(), dhcpl.pubPort)
- s.publisher.AddServer(ep.String(), s.servesMountTable)
- }
-
- for setting := range dhcpl.ch {
+ for setting := range ch {
if setting == nil {
return
}
switch v := setting.Value().(type) {
- case bool:
- return
- case []net.Addr:
+ case []ipc.Address:
s.Lock()
- if s.stopped {
+ if s.isStopState() {
s.Unlock()
return
}
- publisher := s.publisher
- s.Unlock()
+ var err error
+ var changed []naming.Endpoint
switch setting.Name() {
case ipc.NewAddrsSetting:
- vlog.Infof("Added some addresses: %q", v)
- s.applyChange(dhcpl, v, func(name string) { publisher.AddServer(name, s.servesMountTable) })
+ changed = s.addAddresses(v)
case ipc.RmAddrsSetting:
- vlog.Infof("Removed some addresses: %q", v)
- s.applyChange(dhcpl, v, publisher.RemoveServer)
+ changed, err = s.removeAddresses(v)
}
+ change := ipc.NetworkChange{
+ Time: time.Now(),
+ State: externalStates[s.state],
+ Setting: setting,
+ Changed: changed,
+ Error: err,
+ }
+ vlog.VI(2).Infof("ipc: dhcp: change %v", change)
+ for ch, _ := range s.dhcpState.watchers {
+ select {
+ case ch <- change:
+ default:
+ }
+ }
+ s.Unlock()
+ default:
+ vlog.Errorf("ipc: dhcpLoop: unhandled setting type %T", v)
}
}
}
-*/
+
+func getHost(address ipc.Address) string {
+ host, _, err := net.SplitHostPort(address.Address().String())
+ if err == nil {
+ return host
+ }
+ return address.Address().String()
+
+}
+
+// Remove all endpoints that have the same host address as the supplied
+// address parameter.
+func (s *server) removeAddresses(addresses []ipc.Address) ([]naming.Endpoint, error) {
+ var removed []naming.Endpoint
+ for _, address := range addresses {
+ host := getHost(address)
+ for ls, _ := range s.listenState {
+ if ls != nil && ls.roaming && len(ls.ieps) > 0 {
+ remaining := make([]*inaming.Endpoint, 0, len(ls.ieps))
+ for _, iep := range ls.ieps {
+ lnHost, _, err := net.SplitHostPort(iep.Address)
+ if err != nil {
+ lnHost = iep.Address
+ }
+ if lnHost == host {
+ vlog.VI(2).Infof("ipc: dhcp removing: %s", iep)
+ removed = append(removed, iep)
+ s.publisher.RemoveServer(iep.String())
+ continue
+ }
+ remaining = append(remaining, iep)
+ }
+ ls.ieps = remaining
+ }
+ }
+ }
+ return removed, nil
+}
+
+// Add new endpoints for the new address. There is no way to know with
+// 100% confidence which new endpoints to publish without shutting down
+// all network connections and reinitializing everything from scratch.
+// Instead, we find all roaming listeners with at least one endpoint
+// and create a new endpoint with the same port as the existing ones
+// but with the new address supplied to us to by the dhcp code. As
+// an additional safeguard we reject the new address if it is not
+// externally accessible.
+// This places the onus on the dhcp/roaming code that sends us addresses
+// to ensure that those addresses are externally reachable.
+func (s *server) addAddresses(addresses []ipc.Address) []naming.Endpoint {
+ var added []naming.Endpoint
+ for _, address := range addresses {
+ if !netstate.IsAccessibleIP(address) {
+ return added
+ }
+ host := getHost(address)
+ for ls, _ := range s.listenState {
+ if ls != nil && ls.roaming {
+ niep := ls.protoIEP
+ niep.Address = net.JoinHostPort(host, ls.port)
+ ls.ieps = append(ls.ieps, &niep)
+ vlog.VI(2).Infof("ipc: dhcp adding: %s", niep)
+ s.publisher.AddServer(niep.String(), s.servesMountTable)
+ added = append(added, &niep)
+ }
+ }
+ }
+ return added
+}
type leafDispatcher struct {
invoker ipc.Invoker
@@ -724,11 +747,11 @@
func (s *server) Serve(name string, obj interface{}, authorizer security.Authorizer) error {
defer vlog.LogCall()()
if obj == nil {
- return s.newBadArg("nil object")
+ return verror.Make(verror.BadArg, s.ctx, "nil object")
}
invoker, err := objectToInvoker(obj)
if err != nil {
- return s.newBadArg(fmt.Sprintf("bad object: %v", err))
+ return verror.Make(verror.BadArg, s.ctx, fmt.Sprintf("bad object: %v", err))
}
return s.ServeDispatcher(name, &leafDispatcher{invoker, authorizer})
}
@@ -736,7 +759,7 @@
func (s *server) ServeDispatcher(name string, disp ipc.Dispatcher) error {
defer vlog.LogCall()()
if disp == nil {
- return s.newBadArg("nil dispatcher")
+ return verror.Make(verror.BadArg, s.ctx, "nil dispatcher")
}
s.Lock()
defer s.Unlock()
@@ -754,7 +777,7 @@
func (s *server) AddName(name string) error {
defer vlog.LogCall()()
if len(name) == 0 {
- return s.newBadArg("name is empty")
+ return verror.Make(verror.BadArg, s.ctx, "name is empty")
}
s.Lock()
defer s.Unlock()
@@ -818,14 +841,27 @@
}(ln)
}
- for dhcpl, _ := range s.dhcpListeners {
- dhcpl.Lock()
- dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
- dhcpl.ch <- config.NewBool("EOF", "stop", true)
- dhcpl.Unlock()
+ drain := func(ch chan config.Setting) {
+ for {
+ select {
+ case v := <-ch:
+ if v == nil {
+ return
+ }
+ default:
+ close(ch)
+ return
+ }
+ }
+ }
+
+ if dhcp := s.dhcpState; dhcp != nil {
+ dhcp.publisher.CloseFork(dhcp.name, dhcp.ch)
+ drain(dhcp.ch)
}
s.Unlock()
+
var firstErr error
for i := 0; i < nListeners; i++ {
if err := <-errCh; err != nil && firstErr == nil {
@@ -836,7 +872,22 @@
// accepted.
// Wait for the publisher and active listener + flows to finish.
- s.active.Wait()
+ done := make(chan struct{}, 1)
+ go func() { s.active.Wait(); done <- struct{}{} }()
+
+ select {
+ case <-done:
+ case <-time.After(5 * time.Minute):
+ vlog.Errorf("Listener Close Error: %v", firstErr)
+ vlog.Errorf("Timedout waiting for goroutines to stop: listeners: %d", nListeners, len(s.listeners))
+ for ln, _ := range s.listeners {
+ vlog.Errorf("Listener: %p", ln)
+ }
+ for ls, _ := range s.listenState {
+ vlog.Errorf("ListenState: %v", ls)
+ }
+ <-done
+ }
s.Lock()
defer s.Unlock()
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
index 9caf0d3..d83367a 100644
--- a/runtimes/google/ipc/server_test.go
+++ b/runtimes/google/ipc/server_test.go
@@ -2,24 +2,26 @@
import (
"fmt"
+ "net"
"os"
- "path/filepath"
"reflect"
- "runtime"
"sort"
"strings"
"testing"
"time"
+ "v.io/core/veyron2/config"
"v.io/core/veyron2/context"
"v.io/core/veyron2/ipc"
"v.io/core/veyron2/naming"
"v.io/core/veyron2/security"
verror "v.io/core/veyron2/verror2"
+ "v.io/core/veyron2/vlog"
"v.io/core/veyron/lib/expect"
"v.io/core/veyron/lib/modules"
"v.io/core/veyron/lib/modules/core"
+ "v.io/core/veyron/lib/netstate"
tsecurity "v.io/core/veyron/lib/testutil/security"
imanager "v.io/core/veyron/runtimes/google/ipc/stream/manager"
"v.io/core/veyron/runtimes/google/ipc/stream/vc"
@@ -46,14 +48,15 @@
// particular, it doesn't panic).
func TestBadObject(t *testing.T) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+ defer sm.Shutdown()
ns := tnaming.NewSimpleNamespace()
-
ctx := testContext()
server, err := testInternalNewServer(ctx, sm, ns)
if err != nil {
t.Fatal(err)
}
defer server.Stop()
+
if _, err := server.Listen(listenSpec); err != nil {
t.Fatalf("Listen failed: %v", err)
}
@@ -102,7 +105,6 @@
h.sh = sh
p, err := sh.Start(core.ProxyServerCommand, nil, args...)
if err != nil {
- p.Shutdown(os.Stderr, os.Stderr)
t.Fatalf("unexpected error: %s", err)
}
h.proxy = p
@@ -146,6 +148,7 @@
func testProxy(t *testing.T, spec ipc.ListenSpec, args ...string) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+ defer sm.Shutdown()
ns := tnaming.NewSimpleNamespace()
client, err := InternalNewClient(sm, ns, vc.LocalPrincipal{tsecurity.NewPrincipal("client")})
if err != nil {
@@ -333,6 +336,45 @@
}
}
+func TestServerArgs(t *testing.T) {
+ sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+ defer sm.Shutdown()
+ ns := tnaming.NewSimpleNamespace()
+ server, err := InternalNewServer(testContext(), sm, ns, nil)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer server.Stop()
+ _, err = server.Listen(ipc.ListenSpec{})
+ if !verror.Is(err, verror.BadArg.ID) {
+ t.Fatalf("expected a BadArg error: got %v", err)
+ }
+ _, err = server.Listen(ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "*:0"}}})
+ if !verror.Is(err, verror.BadArg.ID) {
+ t.Fatalf("expected a BadArg error: got %v", err)
+ }
+ _, err = server.Listen(ipc.ListenSpec{
+ Addrs: ipc.ListenAddrs{
+ {"tcp", "*:0"},
+ {"tcp", "127.0.0.1:0"},
+ }})
+ if verror.Is(err, verror.BadArg.ID) {
+ t.Fatalf("expected a BadArg error: got %v", err)
+ }
+ status := server.Status()
+ if got, want := len(status.Errors), 1; got != want {
+ t.Fatalf("got %s, want %s", got, want)
+ }
+ _, err = server.Listen(ipc.ListenSpec{Addrs: ipc.ListenAddrs{{"tcp", "*:0"}}})
+ if !verror.Is(err, verror.BadArg.ID) {
+ t.Fatalf("expected a BadArg error: got %v", err)
+ }
+ status = server.Status()
+ if got, want := len(status.Errors), 1; got != want {
+ t.Fatalf("got %s, want %s", got, want)
+ }
+}
+
type statusServer struct{ ch chan struct{} }
func (s *statusServer) Hang(ctx ipc.ServerContext) {
@@ -341,12 +383,15 @@
func TestServerStatus(t *testing.T) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+ defer sm.Shutdown()
ns := tnaming.NewSimpleNamespace()
principal := vc.LocalPrincipal{tsecurity.NewPrincipal("testServerStatus")}
server, err := testInternalNewServer(testContext(), sm, ns, principal)
if err != nil {
t.Fatal(err)
}
+ defer server.Stop()
+
status := server.Status()
if got, want := status.State, ipc.ServerInit; got != want {
t.Fatalf("got %s, want %s", got, want)
@@ -426,31 +471,28 @@
func TestServerStates(t *testing.T) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+ defer sm.Shutdown()
ns := tnaming.NewSimpleNamespace()
- loc := func() string {
- _, file, line, _ := runtime.Caller(2)
- return fmt.Sprintf("%s:%d", filepath.Base(file), line)
- }
-
expectBadState := func(err error) {
if !verror.Is(err, verror.BadState.ID) {
- t.Fatalf("%s: unexpected error: %v", loc(), err)
+ t.Fatalf("%s: unexpected error: %v", loc(1), err)
}
}
expectNoError := func(err error) {
if err != nil {
- t.Fatalf("%s: unexpected error: %v", loc(), err)
+ t.Fatalf("%s: unexpected error: %v", loc(1), err)
}
}
server, err := testInternalNewServer(testContext(), sm, ns)
expectNoError(err)
+ defer server.Stop()
expectState := func(s ipc.ServerState) {
if got, want := server.Status().State, s; got != want {
- t.Fatalf("%s: got %s, want %s", loc(), got, want)
+ t.Fatalf("%s: got %s, want %s", loc(1), got, want)
}
}
@@ -493,6 +535,345 @@
expectBadState(err)
}
+func TestMountStatus(t *testing.T) {
+ sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+ defer sm.Shutdown()
+ ns := tnaming.NewSimpleNamespace()
+ server, err := testInternalNewServer(testContext(), sm, ns)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer server.Stop()
+
+ eps, err := server.Listen(ipc.ListenSpec{
+ Addrs: ipc.ListenAddrs{
+ {"tcp", "127.0.0.1:0"},
+ {"tcp", "127.0.0.1:0"},
+ }})
+ if err != nil {
+ t.Fatal(err)
+ }
+ if got, want := len(eps), 2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+ if err = server.Serve("foo", &testServer{}, nil); err != nil {
+ t.Fatal(err)
+ }
+ status := server.Status()
+ if got, want := len(status.Mounts), 2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+ servers := status.Mounts.Servers()
+ if got, want := len(servers), 2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+ if got, want := servers, endpointToStrings(eps); !reflect.DeepEqual(got, want) {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+
+ // Add a second name and we should now see 4 mounts, 2 for each name.
+ if err := server.AddName("bar"); err != nil {
+ t.Fatal(err)
+ }
+ status = server.Status()
+ if got, want := len(status.Mounts), 4; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+ servers = status.Mounts.Servers()
+ if got, want := len(servers), 2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+ if got, want := servers, endpointToStrings(eps); !reflect.DeepEqual(got, want) {
+ t.Fatalf("got %v, want %v", got, want)
+ }
+ names := status.Mounts.Names()
+ if got, want := len(names), 2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+ serversPerName := map[string][]string{}
+ for _, ms := range status.Mounts {
+ serversPerName[ms.Name] = append(serversPerName[ms.Name], ms.Server)
+ }
+ if got, want := len(serversPerName), 2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+ for _, name := range []string{"foo", "bar"} {
+ if got, want := len(serversPerName[name]), 2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+ }
+}
+
+func updateHost(ep naming.Endpoint, address string) naming.Endpoint {
+ niep := *(ep).(*inaming.Endpoint)
+ niep.Address = address
+ return &niep
+}
+
+func getIPAddrs(eps []naming.Endpoint) []ipc.Address {
+ hosts := map[string]struct{}{}
+ for _, ep := range eps {
+ iep := (ep).(*inaming.Endpoint)
+ h, _, _ := net.SplitHostPort(iep.Address)
+ if len(h) > 0 {
+ hosts[h] = struct{}{}
+ }
+ }
+ addrs := []ipc.Address{}
+ for h, _ := range hosts {
+ a := &netstate.AddrIfc{Addr: &net.IPAddr{IP: net.ParseIP(h)}}
+ addrs = append(addrs, a)
+ }
+ return addrs
+}
+
+func endpointToStrings(eps []naming.Endpoint) []string {
+ r := []string{}
+ for _, ep := range eps {
+ r = append(r, ep.String())
+ }
+ sort.Strings(r)
+ return r
+}
+
+func cmpEndpoints(got, want []naming.Endpoint) bool {
+ if len(got) != len(want) {
+ return false
+ }
+ return reflect.DeepEqual(endpointToStrings(got), endpointToStrings(want))
+}
+
+func getUniqPorts(eps []naming.Endpoint) []string {
+ ports := map[string]struct{}{}
+ for _, ep := range eps {
+ iep := ep.(*inaming.Endpoint)
+ _, p, _ := net.SplitHostPort(iep.Address)
+ ports[p] = struct{}{}
+ }
+ r := []string{}
+ for p, _ := range ports {
+ r = append(r, p)
+ }
+ return r
+}
+
+func TestRoaming(t *testing.T) {
+ sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+ defer sm.Shutdown()
+ ns := tnaming.NewSimpleNamespace()
+ server, err := testInternalNewServer(testContext(), sm, ns)
+ defer server.Stop()
+
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ publisher := config.NewPublisher()
+ roaming := make(chan config.Setting)
+ stop, err := publisher.CreateStream("roaming", "roaming", roaming)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() { publisher.Shutdown(); <-stop }()
+
+ ipv4And6 := func(network string, addrs []ipc.Address) ([]ipc.Address, error) {
+ accessible := netstate.AddrList(addrs)
+ ipv4 := accessible.Filter(netstate.IsUnicastIPv4)
+ ipv6 := accessible.Filter(netstate.IsUnicastIPv6)
+ return append(ipv4, ipv6...), nil
+ }
+ spec := ipc.ListenSpec{
+ Addrs: ipc.ListenAddrs{
+ {"tcp", "*:0"},
+ {"tcp", ":0"},
+ {"tcp", ":0"},
+ },
+ StreamName: "roaming",
+ StreamPublisher: publisher,
+ AddressChooser: ipv4And6,
+ }
+
+ eps, err := server.Listen(spec)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if len(eps) == 0 {
+ t.Fatal(err)
+ }
+
+ if err = server.Serve("foo", &testServer{}, nil); err != nil {
+ t.Fatal(err)
+ }
+ if err = server.AddName("bar"); err != nil {
+ t.Fatal(err)
+ }
+
+ status := server.Status()
+ if got, want := status.Endpoints, eps; !cmpEndpoints(got, want) {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+
+ if got, want := len(status.Mounts), len(eps)*2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+
+ n1 := &netstate.AddrIfc{Addr: &net.IPAddr{IP: net.ParseIP("1.1.1.1")}}
+ n2 := &netstate.AddrIfc{Addr: &net.IPAddr{IP: net.ParseIP("2.2.2.2")}}
+
+ watcher := make(chan ipc.NetworkChange, 10)
+ server.WatchNetwork(watcher)
+ defer close(watcher)
+
+ roaming <- ipc.NewAddAddrsSetting([]ipc.Address{n1, n2})
+
+ waitForChange := func() *ipc.NetworkChange {
+ vlog.Infof("Waiting on %p", watcher)
+ select {
+ case c := <-watcher:
+ return &c
+ case <-time.After(time.Minute):
+ t.Fatalf("timedout: %s", loc(1))
+ }
+ return nil
+ }
+
+ // We expect 4 changes, one for each IP per usable listen spec addr.
+ change := waitForChange()
+ if got, want := len(change.Changed), 4; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+
+ nepsA := make([]naming.Endpoint, len(eps))
+ copy(nepsA, eps)
+ for _, p := range getUniqPorts(eps) {
+ nep1 := updateHost(eps[0], net.JoinHostPort("1.1.1.1", p))
+ nep2 := updateHost(eps[0], net.JoinHostPort("2.2.2.2", p))
+ nepsA = append(nepsA, []naming.Endpoint{nep1, nep2}...)
+ }
+
+ status = server.Status()
+ if got, want := status.Endpoints, nepsA; !cmpEndpoints(got, want) {
+ t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
+ }
+
+ if got, want := len(status.Mounts), len(nepsA)*2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+ if got, want := len(status.Mounts.Servers()), len(nepsA); got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+
+ roaming <- ipc.NewRmAddrsSetting([]ipc.Address{n1})
+
+ // We expect 2 changes, one for each usable listen spec addr.
+ change = waitForChange()
+ if got, want := len(change.Changed), 2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+
+ nepsR := make([]naming.Endpoint, len(eps))
+ copy(nepsR, eps)
+ for _, p := range getUniqPorts(eps) {
+ nep2 := updateHost(eps[0], net.JoinHostPort("2.2.2.2", p))
+ nepsR = append(nepsR, nep2)
+ }
+
+ status = server.Status()
+ if got, want := status.Endpoints, nepsR; !cmpEndpoints(got, want) {
+ t.Fatalf("got %v, want %v [%d, %d]", got, want, len(got), len(want))
+ }
+
+ // Remove all addresses to mimic losing all connectivity.
+ roaming <- ipc.NewRmAddrsSetting(getIPAddrs(nepsR))
+
+ // We expect changes for all of the current endpoints
+ change = waitForChange()
+ if got, want := len(change.Changed), len(nepsR); got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+
+ status = server.Status()
+ if got, want := len(status.Mounts), 0; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+
+ roaming <- ipc.NewAddAddrsSetting([]ipc.Address{n1})
+ // We expect 2 changes, one for each usable listen spec addr.
+ change = waitForChange()
+ if got, want := len(change.Changed), 2; got != want {
+ t.Fatalf("got %d, want %d", got, want)
+ }
+}
+
+func TestWatcherDeadlock(t *testing.T) {
+ sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
+ defer sm.Shutdown()
+ ns := tnaming.NewSimpleNamespace()
+ server, err := testInternalNewServer(testContext(), sm, ns)
+ defer server.Stop()
+
+ if err != nil {
+ t.Fatal(err)
+ }
+
+ publisher := config.NewPublisher()
+ roaming := make(chan config.Setting)
+ stop, err := publisher.CreateStream("roaming", "roaming", roaming)
+ if err != nil {
+ t.Fatal(err)
+ }
+ defer func() { publisher.Shutdown(); <-stop }()
+
+ spec := ipc.ListenSpec{
+ Addrs: ipc.ListenAddrs{
+ {"tcp", ":0"},
+ },
+ StreamName: "roaming",
+ StreamPublisher: publisher,
+ }
+ eps, err := server.Listen(spec)
+ if err != nil {
+ t.Fatal(err)
+ }
+ if err = server.Serve("foo", &testServer{}, nil); err != nil {
+ t.Fatal(err)
+ }
+
+ // Set a watcher that we never read from - the intent is to make sure
+ // that the listener still listens to changes even though there is no
+ // goroutine to read from the watcher channel.
+ watcher := make(chan ipc.NetworkChange, 0)
+ server.WatchNetwork(watcher)
+ defer close(watcher)
+
+ // Remove all addresses to mimic losing all connectivity.
+ roaming <- ipc.NewRmAddrsSetting(getIPAddrs(eps))
+
+ // Add in two new addresses
+ n1 := &netstate.AddrIfc{Addr: &net.IPAddr{IP: net.ParseIP("1.1.1.1")}}
+ n2 := &netstate.AddrIfc{Addr: &net.IPAddr{IP: net.ParseIP("2.2.2.2")}}
+ roaming <- ipc.NewAddAddrsSetting([]ipc.Address{n1, n2})
+
+ neps := make([]naming.Endpoint, 0, len(eps))
+ for _, p := range getUniqPorts(eps) {
+ nep1 := updateHost(eps[0], net.JoinHostPort("1.1.1.1", p))
+ nep2 := updateHost(eps[0], net.JoinHostPort("2.2.2.2", p))
+ neps = append(neps, []naming.Endpoint{nep1, nep2}...)
+ }
+ then := time.Now()
+ for {
+ status := server.Status()
+ if got, want := status.Endpoints, neps; cmpEndpoints(got, want) {
+ break
+ }
+ time.Sleep(100 * time.Millisecond)
+ if time.Now().Sub(then) > time.Minute {
+ t.Fatalf("timed out waiting for changes to take effect")
+ }
+ }
+
+}
+
// Required by modules framework.
func TestHelperProcess(t *testing.T) {
modules.DispatchInTest()