veyron/runtimes/google/ipc: add support for selecting an 'appropriate' IP address to publish on.
In many cases, the Listen call uses an address that does not
specify which network interface/address to use for communicating
with the service - e.g. ":0". In these cases, the address returned
by Listen is 'unspecified' - e.g. 0.0.0.0 or the [::] equivalent for
IPv6. This change provides a mechanism for choosing an address to
use from the set of available ones. The default, built-in behaviour
is to prefer any IPv4 address over IPv6 and public/routable over
private/non-routable addresses. This means that a private IPv4
trumps a public IPv6; this is debatable at best.
This default can be overridden by providing an option to the Server
factory that specifies an alternative function to use for choosing
the preferred address. Such functions will be provided by the Profiles.
This is the first CL required to add support for DHCP roaming.
Change-Id: If95a91ea6d95c28f85edfb3312d1207e3b7e1ce1
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 4fa5f62..5b5b36d 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -3,11 +3,13 @@
import (
"fmt"
"io"
+ "net"
"reflect"
"strings"
"sync"
"time"
+ "veyron/runtimes/google/lib/netconfig"
"veyron/runtimes/google/lib/publisher"
inaming "veyron/runtimes/google/naming"
isecurity "veyron/runtimes/google/security"
@@ -44,27 +46,27 @@
stopped bool // whether the server has been stopped.
stoppedChan chan struct{} // closed when the server has been stopped.
ns naming.Namespace
- publishOpt veyron2.ServerPublishOpt // which endpoints to publish
- publishing bool // is some name being published?
+ preferredAddress func(network string) (net.Addr, error)
servesMountTable bool
}
func InternalNewServer(ctx context.T, streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ServerOpt) (ipc.Server, error) {
s := &server{
- ctx: ctx,
- streamMgr: streamMgr,
- publisher: publisher.New(ctx, ns, publishPeriod),
- listeners: make(map[stream.Listener]bool),
- stoppedChan: make(chan struct{}),
- ns: ns,
+ ctx: ctx,
+ streamMgr: streamMgr,
+ publisher: publisher.New(ctx, ns, publishPeriod),
+ listeners: make(map[stream.Listener]bool),
+ stoppedChan: make(chan struct{}),
+ preferredAddress: preferredIPAddress,
+ ns: ns,
}
for _, opt := range opts {
switch opt := opt.(type) {
+ case veyron2.PreferredAddressOpt:
+ s.preferredAddress = opt
case stream.ListenerOpt:
// Collect all ServerOpts that are also ListenerOpts.
s.listenerOpts = append(s.listenerOpts, opt)
- case veyron2.ServerPublishOpt:
- s.publishOpt = opt
case veyron2.ServesMountTableOpt:
s.servesMountTable = bool(opt)
}
@@ -108,6 +110,64 @@
return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
}
+// preferredIPAddress scans the addresses of every network interface and
+// returns the one judged most suitable for the given network.
+//
+// network selects the address families considered: "tcp4" is IPv4 only,
+// "tcp6" is IPv6 only and "tcp" is both; any other value matches nothing.
+// Unspecified, loopback, multicast and link-local addresses are never
+// returned. The preference order is: an IPv4 address accepted by
+// netconfig.IsGloballyRoutable, then any other IPv4 address, then an IPv6
+// address accepted by netconfig.IsGloballyRoutable, then any other IPv6
+// address. Within each category the first address encountered wins.
+//
+// An error is returned if net.Interfaces fails or if no candidate address
+// is found.
+func preferredIPAddress(network string) (net.Addr, error) {
+	interfaces, err := net.Interfaces()
+	if err != nil {
+		return nil, err
+	}
+	wantIP4 := network == "tcp" || network == "tcp4"
+	wantIP6 := network == "tcp" || network == "tcp6"
+	var anyIP4, anyIP6, pubIP4, pubIP6 net.Addr
+	for _, ifc := range interfaces {
+		addrs, err := ifc.Addrs()
+		if err != nil {
+			// Best effort: skip interfaces whose addresses cannot be read.
+			continue
+		}
+		for _, addr := range addrs {
+			ipn, ok := addr.(*net.IPNet)
+			if !ok {
+				continue
+			}
+			ip := ipn.IP
+			if ip == nil || ip.IsUnspecified() || ip.IsLoopback() || ip.IsMulticast() || ip.IsLinkLocalUnicast() || ip.IsLinkLocalMulticast() {
+				continue
+			}
+			// To16 is non-nil for IPv4 addresses as well, so the family
+			// must be decided by To4; otherwise an IPv4 address could be
+			// recorded (and returned) as an IPv6 candidate.
+			ip4 := ip.To4()
+			switch {
+			case ip4 != nil:
+				if !wantIP4 {
+					continue
+				}
+				if anyIP4 == nil {
+					anyIP4 = addr
+				}
+				if pubIP4 == nil && netconfig.IsGloballyRoutable(ip4) {
+					pubIP4 = addr
+				}
+			case wantIP6:
+				ip6 := ip.To16()
+				if ip6 == nil {
+					continue
+				}
+				if anyIP6 == nil {
+					anyIP6 = addr
+				}
+				if pubIP6 == nil && netconfig.IsGloballyRoutable(ip6) {
+					pubIP6 = addr
+				}
+			}
+		}
+	}
+	switch {
+	case pubIP4 != nil:
+		return pubIP4, nil
+	case anyIP4 != nil:
+		return anyIP4, nil
+	case pubIP6 != nil:
+		return pubIP6, nil
+	case anyIP6 != nil:
+		return anyIP6, nil
+	}
+	return nil, fmt.Errorf("failed to find any usable address for %q", network)
+}
+
func (s *server) Listen(protocol, address string) (naming.Endpoint, error) {
s.Lock()
// Shortcut if the server is stopped, to avoid needlessly creating a
@@ -130,6 +190,30 @@
vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
return nil, err
}
+ iep, ok := ep.(*inaming.Endpoint)
+ if !ok {
+ return nil, fmt.Errorf("ipc: Listen on %v %v failed translating internal endpoint data types", protocol, address)
+ }
+
+ // We know the endpoint format, so we crack it open...
+ switch iep.Protocol {
+ case "tcp", "tcp4", "tcp6":
+ host, port, err := net.SplitHostPort(iep.Address)
+ if err != nil {
+ return nil, err
+ }
+ ip := net.ParseIP(host)
+ if ip == nil {
+ return nil, fmt.Errorf("ipc: Listen(%q, %q) failed to parse IP address from address", protocol, address)
+ }
+ if ip.IsUnspecified() && s.preferredAddress != nil {
+ // Need to find a usable IP address.
+ if a, err := s.preferredAddress(protocol); err == nil {
+ iep.Address = net.JoinHostPort(a.String(), port)
+ }
+ }
+ }
+
s.Lock()
if s.stopped {
s.Unlock()
@@ -137,17 +221,15 @@
ln.Close()
return nil, errServerStopped
}
- publish := s.publishOpt == veyron2.PublishAll || !s.publishing
- s.publishing = true
s.listeners[ln] = true
- // We have a single goroutine per listener to accept new flows. Each flow is
- // served from its own goroutine.
+ // We have a single goroutine per listener to accept new flows.
+ // Each flow is served from its own goroutine.
s.active.Add(1)
if protocol == inaming.Network {
- go func(ln stream.Listener, ep naming.Endpoint, proxy string, publish bool) {
- s.proxyListenLoop(ln, ep, proxy, publish)
+ go func(ln stream.Listener, ep naming.Endpoint, proxy string) {
+ s.proxyListenLoop(ln, ep, proxy)
s.active.Done()
- }(ln, ep, proxyName, publish)
+ }(ln, ep, proxyName)
} else {
go func(ln stream.Listener, ep naming.Endpoint) {
s.listenLoop(ln, ep)
@@ -155,9 +237,7 @@
}(ln, ep)
}
s.Unlock()
- if publish {
- s.publisher.AddServer(s.publishEP(ep))
- }
+ s.publisher.AddServer(s.publishEP(ep))
return ep, nil
}
@@ -171,7 +251,7 @@
return naming.JoinAddressName(ep.String(), name)
}
-func (s *server) proxyListenLoop(ln stream.Listener, ep naming.Endpoint, proxy string, publish bool) {
+func (s *server) proxyListenLoop(ln stream.Listener, ep naming.Endpoint, proxy string) {
const (
min = 5 * time.Millisecond
max = 5 * time.Minute
@@ -180,9 +260,7 @@
s.listenLoop(ln, ep)
// The listener is done, so:
// (1) Unpublish its name
- if publish {
- s.publisher.RemoveServer(s.publishEP(ep))
- }
+ s.publisher.RemoveServer(s.publishEP(ep))
// (2) Reconnect to the proxy unless the server has been stopped
backoff := min
ln = nil
@@ -208,9 +286,7 @@
}
}
// (3) reconnected, publish new address
- if publish {
- s.publisher.AddServer(s.publishEP(ep))
- }
+ s.publisher.AddServer(s.publishEP(ep))
s.Lock()
s.listeners[ln] = true
s.Unlock()