veyron2/ipc: add RoamingListen support.
This CL adds support for 'roaming' whereby a server may dynamically
change its IP address in response to network changes. It adds a new
API method, RoamingListen, that listens on a config.Publisher channel
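for a prescribed set of messages that encode the initial and changing
network settings (protocol, listen spec, and added/removed addresses).
The default publisher and stream name used by RoamingListen can
be changed with an option to NewServer and this usage is only expected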
to be required for unit tests and other special cases.
The 'roaming' profile, veyron/profiles/roaming, is required to enable
this support. This also brings in support for GCE's 1:1 NAT configuration.
This CL is preliminary and not fully tested; in particular, more
unit tests are required for the core RoamingListen call and for
the code that adds and removes servers in response to messages
from the config.Publisher stream.
Change-Id: Ic09bfe0edc6391702e42468e88ae5f4d56bc3f4a
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 8f3f127..2ac1be0 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -455,18 +455,18 @@
nameErr = "does not match the provided pattern"
)
var (
- // TODO(ataly, ashankar): Uncomment the following once server authorization
- // is enabled.
- // now = time.Now()
- // cavOnlyV1 = caveat.UniversalCaveat(caveat.PeerIdentity{"client/v1"})
- // cavExpired = security.ServiceCaveat{
- // Service: security.AllPrincipals,
- // Caveat: &caveat.Expiry{IssueTime: now, ExpiryTime: now},
- // }
- // clientV1ID = derive(clientID, "v1")
- // clientV2ID = derive(clientID, "v2")
- // serverV1ID = derive(serverID, "v1", cavOnlyV1)
- // serverExpiredID = derive(serverID, "expired", cavExpired)
+ // TODO(ataly, ashankar): Uncomment the following once server authorization
+ // is enabled.
+ // now = time.Now()
+ // cavOnlyV1 = caveat.UniversalCaveat(caveat.PeerIdentity{"client/v1"})
+ // cavExpired = security.ServiceCaveat{
+ // Service: security.AllPrincipals,
+ // Caveat: &caveat.Expiry{IssueTime: now, ExpiryTime: now},
+ // }
+ // clientV1ID = derive(clientID, "v1")
+ // clientV2ID = derive(clientID, "v2")
+ // serverV1ID = derive(serverID, "v1", cavOnlyV1)
+ // serverExpiredID = derive(serverID, "expired", cavExpired)
)
tests := []struct {
@@ -1106,7 +1106,7 @@
a.IP = net.ParseIP("1.1.1.1")
return a, nil
}
- server, err := InternalNewServer(testContext(), sm, ns, vc.FixedLocalID(serverID), veyron2.PreferredAddressOpt(pa))
+ server, err := InternalNewServer(testContext(), sm, ns, vc.FixedLocalID(serverID), &veyron2.AddressChooserOpt{pa})
if err != nil {
t.Errorf("InternalNewServer failed: %v", err)
}
@@ -1139,7 +1139,7 @@
paerr := func(string, []net.Addr) (net.Addr, error) {
return nil, fmt.Errorf("oops")
}
- server, err := InternalNewServer(testContext(), sm, ns, vc.FixedLocalID(serverID), veyron2.PreferredAddressOpt(paerr))
+ server, err := InternalNewServer(testContext(), sm, ns, vc.FixedLocalID(serverID), &veyron2.AddressChooserOpt{paerr})
if err != nil {
t.Errorf("InternalNewServer failed: %v", err)
}
diff --git a/runtimes/google/ipc/roaming_test.go b/runtimes/google/ipc/roaming_test.go
new file mode 100644
index 0000000..57437bd
--- /dev/null
+++ b/runtimes/google/ipc/roaming_test.go
@@ -0,0 +1,25 @@
+package ipc_test
+
+/*
+func startRoamingServer() {
+ mgr := imanager.InternalNew(naming.FixedRoutingID(0x1111111))
+ ns := newNamespace()
+}
+
+func TestRoamingListen(t *testing.T) {
+ r, err := rt.New()
+ if err != nil {
+ t.Errorf("unexpected error: %s", err)
+ }
+ publisher := config.CreateStream("test")
+
+ server, err := r.NewServer()
+ defer server.Stop()
+
+ if err != nil {
+ t.Errorf("unexpected error: %s", err)
+ }
+ server.RoamingListen(r.Publisher())
+ server.Serve(nil)
+}
+*/
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 3457328..cb2cece 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -10,13 +10,13 @@
"time"
"veyron/lib/netstate"
-
"veyron/runtimes/google/lib/publisher"
inaming "veyron/runtimes/google/naming"
isecurity "veyron/runtimes/google/security"
vsecurity "veyron/security"
"veyron2"
+ "veyron2/config"
"veyron2/context"
"veyron2/ipc"
"veyron2/ipc/stream"
@@ -37,36 +37,48 @@
type server struct {
sync.Mutex
- ctx context.T // context used by the server to make internal RPCs.
- streamMgr stream.Manager // stream manager to listen for new flows.
- publisher publisher.Publisher // publisher to publish mounttable mounts.
- listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
- listeners map[stream.Listener]bool // listeners created by Listen.
- disp ipc.Dispatcher // dispatcher to serve RPCs
- active sync.WaitGroup // active goroutines we've spawned.
- stopped bool // whether the server has been stopped.
- stoppedChan chan struct{} // closed when the server has been stopped.
+ ctx context.T // context used by the server to make internal RPCs.
+ streamMgr stream.Manager // stream manager to listen for new flows.
+ publisher publisher.Publisher // publisher to publish mounttable mounts.
+ listenerOpts []stream.ListenerOpt // listener opts passed to Listen.
+ listeners map[stream.Listener]*dhcpListener // listeners created by Listen.
+ disp ipc.Dispatcher // dispatcher to serve RPCs
+ active sync.WaitGroup // active goroutines we've spawned.
+ stopped bool // whether the server has been stopped.
+ stoppedChan chan struct{} // closed when the server has been stopped.
ns naming.Namespace
- preferredAddress func(network string, addrs []net.Addr) (net.Addr, error)
+ addressChooser veyron2.AddressChooser
servesMountTable bool
- stats *ipcStats // stats for this server.
+ roamingOpt veyron2.RoamingPublisherOpt
+ // TODO(cnicolaou): add roaming stats to ipcStats
+ stats *ipcStats // stats for this server.
+}
+
+type dhcpListener struct {
+ sync.Mutex
+ publisher *config.Publisher // publisher used to fork the stream
+ name string // name of the publisher stream
+ ep *inaming.Endpoint // endpoint returned after listening and choosing an address to be published
+ port string
+ ch chan config.Setting // channel to receive settings over
}
func InternalNewServer(ctx context.T, streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ServerOpt) (ipc.Server, error) {
s := &server{
- ctx: ctx,
- streamMgr: streamMgr,
- publisher: publisher.New(ctx, ns, publishPeriod),
- listeners: make(map[stream.Listener]bool),
- stoppedChan: make(chan struct{}),
- preferredAddress: preferredIPAddress,
- ns: ns,
- stats: newIPCStats(naming.Join("ipc", "server", streamMgr.RoutingID().String())),
+ ctx: ctx,
+ streamMgr: streamMgr,
+ publisher: publisher.New(ctx, ns, publishPeriod),
+ listeners: make(map[stream.Listener]*dhcpListener),
+ stoppedChan: make(chan struct{}),
+ ns: ns,
+ stats: newIPCStats(naming.Join("ipc", "server", streamMgr.RoutingID().String())),
}
for _, opt := range opts {
switch opt := opt.(type) {
- case veyron2.PreferredAddressOpt:
- s.preferredAddress = opt
+ case *veyron2.AddressChooserOpt:
+ s.addressChooser = opt.AddressChooser
+ case *veyron2.RoamingPublisherOpt:
+ s.roamingOpt = *opt
case stream.ListenerOpt:
// Collect all ServerOpts that are also ListenerOpts.
s.listenerOpts = append(s.listenerOpts, opt)
@@ -113,23 +125,6 @@
return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
}
-// preferredIPAddress returns the preferred IP address, which is,
-// a public IPv4 address, then any non-loopback IPv4, then a public
-// IPv6 address and finally any non-loopback/link-local IPv6
-func preferredIPAddress(network string, addrs []net.Addr) (net.Addr, error) {
- if !netstate.IsIPNetwork(network) {
- return nil, fmt.Errorf("can't support network %q", network)
- }
- al := netstate.AddrList(addrs)
- for _, predicate := range []netstate.Predicate{netstate.IsPublicUnicastIPv4,
- netstate.IsUnicastIPv4, netstate.IsPublicUnicastIPv6} {
- if a := al.First(predicate); a != nil {
- return a, nil
- }
- }
- return nil, fmt.Errorf("failed to find any usable address for %q", network)
-}
-
func (s *server) Listen(protocol, address string) (naming.Endpoint, error) {
s.Lock()
// Shortcut if the server is stopped, to avoid needlessly creating a
@@ -169,18 +164,21 @@
if ip == nil {
return nil, fmt.Errorf("ipc: Listen(%q, %q) failed to parse IP address from address", protocol, address)
}
- if ip.IsUnspecified() && s.preferredAddress != nil {
- // Need to find a usable IP address.
- addrs, err := netstate.GetAccessibleIPs()
- if err == nil {
- if a, err := s.preferredAddress(protocol, addrs); err == nil {
- if ip := netstate.AsIP(a); ip != nil {
- // a may be an IPNet or an IPAddr under the covers,
- // but we really want the IP portion without any
- // netmask so we use AsIP to ensure that.
- iep.Address = net.JoinHostPort(ip.String(), port)
+ if ip.IsUnspecified() {
+ if s.addressChooser != nil {
+ // Need to find a usable IP address.
+ if addrs, err := netstate.GetAccessibleIPs(); err == nil {
+ if a, err := s.addressChooser(protocol, addrs); err == nil {
+ if ip := netstate.AsIP(a); ip != nil {
+ // a may be an IPNet or an IPAddr under the covers,
+ // but we really want the IP portion without any
+ // netmask so we use AsIP to ensure that.
+ iep.Address = net.JoinHostPort(ip.String(), port)
+ }
}
}
+ } else {
+ vlog.Errorf("no address chooser specified")
}
}
}
@@ -193,7 +191,7 @@
ln.Close()
return nil, errServerStopped
}
- s.listeners[ln] = true
+ s.listeners[ln] = nil
// We have a single goroutine per listener to accept new flows.
// Each flow is served from its own goroutine.
s.active.Add(1)
@@ -213,6 +211,143 @@
return ep, nil
}
+// externalEndpoint examines the endpoint returned by the stream listen call
+// and fills in the address to publish to the mount table. It also returns the
+// IP host address that it selected for publishing to the mount table.
+func (s *server) externalEndpoint(lep naming.Endpoint) (*inaming.Endpoint, *net.IPAddr, error) {
+ // We know the endpoint format, so we crack it open...
+ iep, ok := lep.(*inaming.Endpoint)
+ if !ok {
+ return nil, nil, fmt.Errorf("failed translating internal endpoint data types")
+ }
+
+ switch iep.Protocol {
+ case "tcp", "tcp4", "tcp6":
+ host, port, err := net.SplitHostPort(iep.Address)
+ if err != nil {
+ return nil, nil, err
+ }
+ ip := net.ParseIP(host)
+ if ip == nil {
+ return nil, nil, fmt.Errorf("failed to parse %q as an IP host", host)
+ }
+ if ip.IsUnspecified() && s.addressChooser != nil {
+ // Need to find a usable IP address since the call to listen
+ // didn't specify one.
+ addrs, err := netstate.GetAccessibleIPs()
+ if err == nil {
+ if a, err := s.addressChooser(iep.Protocol, addrs); err == nil {
+ iep.Address = net.JoinHostPort(a.String(), port)
+ return iep, a.(*net.IPAddr), nil
+ }
+ }
+ } else {
+ // Listen used a fixed IP address, which essentially disables
+ // roaming.
+ return iep, nil, nil
+ }
+ }
+ return iep, nil, nil
+}
+
+func (s *server) RoamingListen() (naming.Endpoint, error) {
+ s.Lock()
+ // Shortcut if the server is stopped, to avoid needlessly creating a
+ // listener.
+ if s.stopped {
+ s.Unlock()
+ return nil, errServerStopped
+ }
+ s.Unlock()
+
+ publisher := s.roamingOpt.Publisher
+ streamName := s.roamingOpt.StreamName
+
+ ch := make(chan config.Setting)
+ configStream, err := publisher.ForkStream(streamName, ch)
+ if err != nil {
+ return nil, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
+ }
+ setting := configStream.Latest[ipc.ProtocolSetting]
+ if setting == nil {
+ return nil, fmt.Errorf("protocol setting has not be sent")
+ }
+ protocol, ok := setting.Value().(string)
+ if !ok {
+ return nil, fmt.Errorf("protocol setting is of the wrong type %T", setting.Value())
+ }
+
+ setting = configStream.Latest[ipc.ListenSpecSetting]
+ if setting == nil {
+ return nil, fmt.Errorf("listen spec setting has not be sent")
+ }
+ listenSpec, ok := setting.Value().(config.IPHostPortFlag)
+ if !ok {
+ return nil, fmt.Errorf("listen spec setting is of the wrong type %T", setting.Value())
+ }
+
+ address := listenSpec.String()
+
+ isNotLoopback := func(a net.Addr) bool {
+ if ip := netstate.AsIP(a); ip != nil {
+ return !ip.IsLoopback()
+ }
+ return true
+ }
+
+ al := netstate.FromIPAddr(listenSpec.IP)
+ if len(al) > 0 && al.First(isNotLoopback) == nil {
+ // All our addresses are loopback addresses
+ // TODO(cnicolaou): use Listen for now, but should refactor more completely.
+ return s.Listen(protocol, listenSpec.String())
+ }
+
+ ln, lep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
+ if err != nil {
+ vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
+ return nil, err
+ }
+ ep, ipaddr, err := s.externalEndpoint(lep)
+ if ipaddr == nil || err != nil {
+ ln.Close()
+ if ipaddr == nil {
+ return nil, fmt.Errorf("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", address)
+ }
+ return nil, err
+ }
+
+ s.Lock()
+ if s.stopped {
+ s.Unlock()
+ // Ignore error return since we can't really do much about it.
+ ln.Close()
+ return nil, errServerStopped
+ }
+ _, port, _ := net.SplitHostPort(ep.Address)
+ dhcpl := &dhcpListener{ep: ep, port: port, ch: ch, name: streamName, publisher: publisher}
+ s.listeners[ln] = dhcpl
+ // We have a goroutine per listener to accept new flows and
+ // a goroutine to listen for address changes.
+ // Each flow is served from its own goroutine.
+ s.active.Add(2)
+
+ // goroutine to listen for connections
+ go func(ln stream.Listener, ep naming.Endpoint) {
+ s.listenLoop(ln, ep)
+ s.active.Done()
+ }(ln, ep)
+
+ // goroutine to listen for address changes.
+ go func(dl *dhcpListener) {
+ s.dhcpLoop(dl)
+ s.active.Done()
+ }(dhcpl)
+
+ s.Unlock()
+ s.publisher.AddServer(s.publishEP(ep))
+ return ep, nil
+}
+
func (s *server) publishEP(ep naming.Endpoint) string {
var name string
if !s.servesMountTable {
@@ -260,7 +395,7 @@
// (3) reconnected, publish new address
s.publisher.AddServer(s.publishEP(ep))
s.Lock()
- s.listeners[ln] = true
+ s.listeners[ln] = nil
s.Unlock()
}
}
@@ -292,6 +427,48 @@
}
}
+func (s *server) applyChange(dhcpl *dhcpListener, addrs []net.Addr, fn func(string)) {
+ dhcpl.Lock()
+ defer dhcpl.Unlock()
+ for _, a := range addrs {
+ if ip := netstate.AsIP(a); ip != nil {
+ dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.port)
+ fn(s.publishEP(dhcpl.ep))
+ }
+ }
+}
+
+func (s *server) dhcpLoop(dhcpl *dhcpListener) {
+ defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes on %v", dhcpl.ep)
+ vlog.VI(2).Infof("ipc: dhcp loop")
+ for setting := range dhcpl.ch {
+ if setting == nil {
+ return
+ }
+ switch v := setting.Value().(type) {
+ case bool:
+ return
+ case []net.Addr:
+ s.Lock()
+ if s.stopped {
+ s.Unlock()
+ return
+ }
+ publisher := s.publisher
+ s.Unlock()
+ switch setting.Name() {
+ case ipc.NewAddrsSetting:
+ vlog.Infof("Added some addresses: %q", v)
+ s.applyChange(dhcpl, v, publisher.AddServer)
+ case ipc.RmAddrsSetting:
+ vlog.Infof("Removed some addresses: %q", v)
+ s.applyChange(dhcpl, v, publisher.RemoveServer)
+ }
+
+ }
+ }
+}
+
func (s *server) Serve(name string, disp ipc.Dispatcher) error {
s.Lock()
defer s.Unlock()
@@ -339,10 +516,16 @@
// flows will continue until they terminate naturally.
nListeners := len(s.listeners)
errCh := make(chan error, nListeners)
- for ln, _ := range s.listeners {
+ for ln, dhcpl := range s.listeners {
go func(ln stream.Listener) {
errCh <- ln.Close()
}(ln)
+ if dhcpl != nil {
+ dhcpl.Lock()
+ dhcpl.publisher.CloseFork(dhcpl.name, dhcpl.ch)
+ dhcpl.ch <- config.NewBool("EOF", "stop", true)
+ dhcpl.Unlock()
+ }
}
s.Unlock()
var firstErr error
diff --git a/runtimes/google/lib/publisher/publisher.go b/runtimes/google/lib/publisher/publisher.go
index 77eab43..97e2176 100644
--- a/runtimes/google/lib/publisher/publisher.go
+++ b/runtimes/google/lib/publisher/publisher.go
@@ -137,7 +137,7 @@
}
func (p *publisher) runLoop(ctx context.T, ns naming.Namespace, period time.Duration) {
- vlog.VI(1).Info("ipc pub: start runLoop")
+ vlog.VI(2).Info("ipc pub: start runLoop")
state := newPubState(ctx, ns, period)
for {
select {
@@ -146,7 +146,7 @@
// Closing the cmdchan signals us to break out of the loop. Unmount
// everything and signal that we're done by closing the donechan.
state.unmountAll()
- vlog.VI(1).Info("ipc pub: exit runLoop")
+ vlog.VI(2).Info("ipc pub: exit runLoop")
close(p.donechan)
return
}
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index ccd1319..f3884f0 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -127,20 +127,30 @@
ns := rt.ns
var id security.PublicID
var otherOpts []ipc.ServerOpt
+ addressChooserOpt := &veyron2.AddressChooserOpt{rt.profile.AddressChooser()}
+ roamingOpt := &veyron2.RoamingPublisherOpt{rt.publisher, "roaming"}
for _, opt := range opts {
switch topt := opt.(type) {
case veyron2.NamespaceOpt:
ns = topt
case veyron2.LocalIDOpt:
id = topt.PublicID
+ case *veyron2.AddressChooserOpt:
+ addressChooserOpt = topt
+ case *veyron2.RoamingPublisherOpt:
+ roamingOpt = topt
default:
otherOpts = append(otherOpts, opt)
}
}
// Add the option that provides the local identity to the server.
otherOpts = append(otherOpts, rt.newLocalID(id))
+ // Add the preferredAddr and roaming opts
+ otherOpts = append(otherOpts, addressChooserOpt)
+ otherOpts = append(otherOpts, roamingOpt)
ctx := rt.NewContext()
+
return iipc.InternalNewServer(ctx, sm, ns, otherOpts...)
}