veyron/runtimes/google/ipc: refactor Listen loop code - back from the rollback...
- deleted commented out old code.
- cleaned up Listen implementation, the only behavioural difference
is that we now publish both local+proxy addresses for objects that
are being proxied.
- cleaned up the proxy retry loop.
- made it possible to use the proxy without a local listener.
- Dial takes a timeout argument.
Change-Id: Ia7598de824593b937d9d562f0597c7dc4100f88e
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index e828b51..8451c3e 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -247,9 +247,11 @@
return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err)
}
}
+
// Try all servers, and if none of them are authorized for the call then return the error of the last server
// that was tried.
var lastErr verror.E
+ // TODO(cnicolaou): sort servers by sensible metric.
for _, server := range servers {
flow, suffix, err := c.connectFlow(server)
if err != nil {
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 0cb0709..d9209f4 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -5,7 +5,9 @@
"fmt"
"io"
"net"
+ "path/filepath"
"reflect"
+ "runtime"
"strings"
"sync"
"testing"
@@ -189,15 +191,23 @@
return ep, server
}
-func verifyMount(t *testing.T, ns naming.Namespace, name string) {
- if _, err := ns.Resolve(testContext(), name); err != nil {
- t.Errorf("%s not found in mounttable", name)
+func loc(d int) string {
+ _, file, line, _ := runtime.Caller(d + 1)
+ return fmt.Sprintf("%s:%d", filepath.Base(file), line)
+}
+
+func verifyMount(t *testing.T, ns naming.Namespace, name string) []string {
+ addrs, err := ns.Resolve(testContext(), name)
+ if err != nil {
+ t.Errorf("%s: %s not found in mounttable", loc(1), name)
+ return nil
}
+ return addrs
}
func verifyMountMissing(t *testing.T, ns naming.Namespace, name string) {
if servers, err := ns.Resolve(testContext(), name); err == nil {
- t.Errorf("%s not supposed to be found in mounttable; got %d servers instead", name, len(servers))
+ t.Errorf("%s: %s not supposed to be found in mounttable; got %d servers instead", loc(1), name, len(servers))
}
}
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 454ca88..c5c9d02 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -58,10 +58,11 @@
type dhcpListener struct {
sync.Mutex
- publisher *config.Publisher // publisher used to fork the stream
- name string // name of the publisher stream
- ep *inaming.Endpoint // endpoint returned after listening and choosing an address to be published
- port string
+ publisher *config.Publisher // publisher used to fork the stream
+ name string // name of the publisher stream
+ ep *inaming.Endpoint // endpoint returned after listening
+ pubAddrs []ipc.Address // addresses to publish
+ pubPort string // port to use with the publish addresses
ch chan config.Setting // channel to receive settings over
}
@@ -128,161 +129,74 @@
return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
}
-/*
-// ipAddressChooser returns the preferred IP address, which is,
-// a public IPv4 address, then any non-loopback IPv4, then a public
-// IPv6 address and finally any non-loopback/link-local IPv6
-// It is replicated here to avoid a circular dependency and will, in any case,
-// go away when we transition away from Listen to the ListenX API.
-func ipAddressChooser(network string, addrs []ipc.Address) ([]ipc.Address, error) {
- if !netstate.IsIPProtocol(network) {
- return nil, fmt.Errorf("can't support network protocol %q", network)
+func addrFromIP(ip net.IP) ipc.Address {
+ return &netstate.AddrIfc{
+ Addr: &net.IPAddr{IP: ip},
}
- accessible := netstate.AddrList(addrs)
-
- // Try and find an address on a interface with a default route.
- predicates := []netstate.AddressPredicate{netstate.IsPublicUnicastIPv4,
- netstate.IsUnicastIPv4, netstate.IsPublicUnicastIPv6}
- for _, predicate := range predicates {
- if addrs := accessible.Filter(predicate); len(addrs) > 0 {
- onDefaultRoutes := addrs.Filter(netstate.IsOnDefaultRoute)
- if len(onDefaultRoutes) > 0 {
- return onDefaultRoutes, nil
- }
- }
- }
-
- // We failed to find any addresses with default routes, try again
- // but without the default route requirement.
- for _, predicate := range predicates {
- if addrs := accessible.Filter(predicate); len(addrs) > 0 {
- return addrs, nil
- }
- }
-
- return nil, fmt.Errorf("failed to find any usable address for %q", network)
}
-func (s *server) Listen(protocol, address string) (naming.Endpoint, error) {
- defer vlog.LogCall()()
- s.Lock()
- // Shortcut if the server is stopped, to avoid needlessly creating a
- // listener.
- if s.stopped {
- s.Unlock()
- return nil, errServerStopped
- }
- s.Unlock()
- var proxyName string
- if protocol == inaming.Network {
- proxyName = address
- var err error
- if address, err = s.resolveToAddress(address); err != nil {
- return nil, err
- }
- }
- // TODO(cnicolaou): pass options.ServesMountTable to streamMgr.Listen so that
- // it can more cleanly set the IsMountTable bit in the endpoint.
- ln, ep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
+// getIPRoamingAddrs finds an appropriate set of addresss to publish
+// externally and also determines if it's sensible to allow roaming.
+// It returns the host address of the first suitable address that
+// can be used and the port number that can be used with all addresses.
+// The host is required to allow the caller to construct an endpoint
+// that can be returned to the caller of Listen.
+func (s *server) getIPRoamingAddrs(chooser ipc.AddressChooser, iep *inaming.Endpoint) (addresses []ipc.Address, host string, port string, roaming bool, err error) {
+ host, port, err = net.SplitHostPort(iep.Address)
if err != nil {
- vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
- return nil, err
+ return nil, "", "", false, err
}
+ ip := net.ParseIP(host)
+ if ip == nil {
+ return nil, "", "", false, fmt.Errorf("failed to parse %q as an IP host", host)
+ }
+ if ip.IsUnspecified() && chooser != nil {
+ // Need to find a usable IP address since the call to listen
+ // didn't specify one.
+ if addrs, err := netstate.GetAccessibleIPs(); err == nil {
+ if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
+ phost := a[0].Address().String()
+ iep.Address = net.JoinHostPort(phost, port)
+ return a, phost, port, true, nil
+ }
+ }
+ return []ipc.Address{addrFromIP(ip)}, host, port, true, nil
+ }
+ // Listen used a fixed IP address, which we take to mean that
+ // roaming is not desired.
+ return []ipc.Address{addrFromIP(ip)}, host, port, false, nil
+}
+
+// configureEPAndRoaming configures the endpoint and roaming. In particular,
+// it fills in the Address portion of the endpoint with the appropriately
+// selected network address and creates a dhcpListener struct if roaming
+// is enabled.
+func (s *server) configureEPAndRoaming(spec ipc.ListenSpec, ep naming.Endpoint) (*dhcpListener, *inaming.Endpoint, error) {
iep, ok := ep.(*inaming.Endpoint)
if !ok {
- return nil, fmt.Errorf("ipc: Listen on %v %v failed translating internal endpoint data types", protocol, address)
+ return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
}
-
- if protocol != inaming.Network {
- // We know the endpoint format, so we crack it open...
- switch iep.Protocol {
- case "tcp", "tcp4", "tcp6":
- host, port, err := net.SplitHostPort(iep.Address)
- if err != nil {
- return nil, err
- }
- ip := net.ParseIP(host)
- if ip == nil {
- return nil, fmt.Errorf("ipc: Listen(%q, %q) failed to parse IP address from address", protocol, address)
- }
- if ip.IsUnspecified() {
- addrs, err := netstate.GetAccessibleIPs()
- if err == nil {
- if a, err := ipAddressChooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
- iep.Address = net.JoinHostPort(a[0].Address().String(), port)
- }
- }
- }
+ if !strings.HasPrefix(spec.Protocol, "tcp") {
+ return nil, iep, nil
+ }
+ pubAddrs, pubHost, pubPort, roaming, err := s.getIPRoamingAddrs(spec.AddressChooser, iep)
+ if err != nil {
+ return nil, iep, err
+ }
+ iep.Address = net.JoinHostPort(pubHost, pubPort)
+ if !roaming {
+ vlog.VI(2).Infof("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", spec.Address)
+ }
+ publisher := spec.StreamPublisher
+ if roaming && publisher != nil {
+ streamName := spec.StreamName
+ ch := make(chan config.Setting)
+ if _, err := publisher.ForkStream(streamName, ch); err != nil {
+ return nil, iep, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
}
+ return &dhcpListener{ep: iep, pubAddrs: pubAddrs, pubPort: pubPort, ch: ch, name: streamName, publisher: publisher}, iep, nil
}
-
- s.Lock()
- if s.stopped {
- s.Unlock()
- // Ignore error return since we can't really do much about it.
- ln.Close()
- return nil, errServerStopped
- }
- s.listeners[ln] = nil
- // We have a single goroutine per listener to accept new flows.
- // Each flow is served from its own goroutine.
- s.active.Add(1)
- if protocol == inaming.Network {
- go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
- s.proxyListenLoop(ln, ep, proxy)
- s.active.Done()
- }(ln, iep, proxyName)
- } else {
- go func(ln stream.Listener, ep naming.Endpoint) {
- s.listenLoop(ln, ep)
- s.active.Done()
- }(ln, iep)
- }
- s.Unlock()
- s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
- return ep, nil
-}
-*/
-
-// externalEndpoint examines the endpoint returned by the stream listen call
-// and fills in the address to publish to the mount table. It also returns the
-// IP host address that it selected for publishing to the mount table.
-func (s *server) externalEndpoint(chooser ipc.AddressChooser, lep naming.Endpoint) (*inaming.Endpoint, *net.IPAddr, error) {
- // We know the endpoint format, so we crack it open...
- iep, ok := lep.(*inaming.Endpoint)
- if !ok {
- return nil, nil, fmt.Errorf("failed translating internal endpoint data types")
- }
- switch iep.Protocol {
- case "tcp", "tcp4", "tcp6":
- host, port, err := net.SplitHostPort(iep.Address)
- if err != nil {
- return nil, nil, err
- }
- ip := net.ParseIP(host)
- if ip == nil {
- return nil, nil, fmt.Errorf("failed to parse %q as an IP host", host)
- }
- if ip.IsUnspecified() && chooser != nil {
- // Need to find a usable IP address since the call to listen
- // didn't specify one.
- addrs, err := netstate.GetAccessibleIPs()
- if err == nil {
- // TODO(cnicolaou): we could return multiple addresses here,
- // all of which can be exported to the mount table. Look at
- // this after we transition fully to ListenX.
- if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
- iep.Address = net.JoinHostPort(a[0].Address().String(), port)
- return iep, a[0].Address().(*net.IPAddr), nil
- }
- }
- } else {
- // Listen used a fixed IP address, which essentially disables
- // roaming.
- return iep, nil, nil
- }
- }
- return iep, nil, nil
+ return nil, iep, nil
}
func (s *server) Listen(listenSpec ipc.ListenSpec) (naming.Endpoint, error) {
@@ -296,100 +210,67 @@
}
s.Unlock()
- protocol := listenSpec.Protocol
- address := listenSpec.Address
- proxyAddress := ""
- if len(listenSpec.Proxy) > 0 {
- if address, err := s.resolveToAddress(listenSpec.Proxy); err != nil {
+ var iep *inaming.Endpoint
+ var dhcpl *dhcpListener
+ var ln stream.Listener
+
+ if len(listenSpec.Address) > 0 {
+ // Listen if we have a local address to listen on. Some situations
+ // just need a proxy (e.g. a browser extension).
+ tmpln, lep, err := s.streamMgr.Listen(listenSpec.Protocol, listenSpec.Address, s.listenerOpts...)
+ if err != nil {
+ vlog.Errorf("ipc: Listen on %s failed: %s", listenSpec, err)
+ return nil, err
+ }
+ ln = tmpln
+ if tmpdhcpl, tmpiep, err := s.configureEPAndRoaming(listenSpec, lep); err != nil {
+ ln.Close()
return nil, err
} else {
- proxyAddress = address
+ dhcpl = tmpdhcpl
+ iep = tmpiep
}
}
- ln, lep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
- if err != nil {
- vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
- return nil, err
- }
- ep, ipaddr, err := s.externalEndpoint(listenSpec.AddressChooser, lep)
- if err != nil {
- ln.Close()
- return nil, err
- }
-
s.Lock()
+ defer s.Unlock()
if s.stopped {
- s.Unlock()
- // Ignore error return since we can't really do much about it.
ln.Close()
return nil, errServerStopped
}
- var ip net.IP
- if ipaddr != nil {
- ip = net.ParseIP(ipaddr.String())
- } else {
- vlog.VI(2).Infof("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", address)
- }
- publisher := listenSpec.StreamPublisher
- if ip != nil && !ip.IsLoopback() && publisher != nil {
- streamName := listenSpec.StreamName
- ch := make(chan config.Setting)
- _, err := publisher.ForkStream(streamName, ch)
- if err != nil {
- return nil, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
- }
- _, port, _ := net.SplitHostPort(ep.Address)
- dhcpl := &dhcpListener{ep: ep, port: port, ch: ch, name: streamName, publisher: publisher}
-
+ if dhcpl != nil {
// We have a goroutine to listen for dhcp changes.
- s.active.Add(1)
- // goroutine to listen for address changes.
- go func(dl *dhcpListener) {
- s.dhcpLoop(dl)
+ go func() {
+ s.active.Add(1)
+ s.dhcpLoop(dhcpl)
s.active.Done()
- }(dhcpl)
+ }()
s.listeners[ln] = dhcpl
- } else {
+ } else if ln != nil {
s.listeners[ln] = nil
}
- // We have a goroutine per listener to accept new flows.
- // Each flow is served from its own goroutine.
- s.active.Add(1)
-
- // goroutine to listen for connections
- go func(ln stream.Listener, ep naming.Endpoint) {
- s.listenLoop(ln, ep)
- s.active.Done()
- }(ln, lep)
-
- if len(proxyAddress) > 0 {
- pln, pep, err := s.streamMgr.Listen(inaming.Network, proxyAddress, s.listenerOpts...)
- if err != nil {
- vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
- return nil, err
- }
- ipep, ok := pep.(*inaming.Endpoint)
- if !ok {
- return nil, fmt.Errorf("failed translating internal endpoint data types")
- }
- // We have a goroutine for listening on proxy connections.
- s.active.Add(1)
- go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
- s.proxyListenLoop(ln, ep, proxy)
+ if iep != nil {
+ // We have a goroutine per listener to accept new flows.
+ // Each flow is served from its own goroutine.
+ go func() {
+ s.active.Add(1)
+ s.listenLoop(ln, iep)
s.active.Done()
- }(pln, ipep, listenSpec.Proxy)
- s.listeners[pln] = nil
- // TODO(cnicolaou,p): AddServer no longer needs to take the
- // servesMountTable bool since it can be extracted from the endpoint.
- s.publisher.AddServer(s.publishEP(ipep, s.servesMountTable), s.servesMountTable)
- } else {
- s.publisher.AddServer(s.publishEP(ep, s.servesMountTable), s.servesMountTable)
+ }()
+ s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
}
- s.Unlock()
- return ep, nil
+
+ if len(listenSpec.Proxy) > 0 {
+ // We have a goroutine for listening on proxy connections.
+ go func() {
+ s.active.Add(1)
+ s.proxyListenLoop(listenSpec.Proxy)
+ s.active.Done()
+ }()
+ }
+ return iep, nil
}
// TODO(cnicolaou): Take this out or make the ServesMountTable bit work in the endpoint.
@@ -399,63 +280,83 @@
return naming.JoinAddressName(ep.String(), name)
}
-func (s *server) proxyListenLoop(ln stream.Listener, iep *inaming.Endpoint, proxy string) {
+func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
+ resolved, err := s.resolveToAddress(proxy)
+ if err != nil {
+ return nil, nil, fmt.Errorf("Failed to resolve proxy %q (%v)", proxy, err)
+ }
+ ln, ep, err := s.streamMgr.Listen(inaming.Network, resolved, s.listenerOpts...)
+ if err != nil {
+ return nil, nil, fmt.Errorf("failed to listen on %q: %s", resolved, err)
+ }
+ iep, ok := ep.(*inaming.Endpoint)
+ if !ok {
+ ln.Close()
+ return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
+ }
+ s.Lock()
+ s.listeners[ln] = nil
+ s.Unlock()
+ s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
+ return iep, ln, nil
+}
+
+func (s *server) proxyListenLoop(proxy string) {
const (
min = 5 * time.Millisecond
max = 5 * time.Minute
)
+
+ iep, ln, err := s.reconnectAndPublishProxy(proxy)
+ if err != nil {
+ vlog.VI(1).Infof("Failed to connect to proxy: %s", err)
+ }
+ // the initial connection maybe have failed, but we enter the retry
+ // loop anyway so that we will continue to try and connect to the
+ // proxy.
+
+ s.Lock()
+ if s.stopped {
+ s.Unlock()
+ return
+ }
+ s.Unlock()
+
for {
- s.listenLoop(ln, iep)
- // The listener is done, so:
- // (1) Unpublish its name
- s.publisher.RemoveServer(s.publishEP(iep, s.servesMountTable))
+ if ln != nil && iep != nil {
+ s.listenLoop(ln, iep)
+ // The listener is done, so:
+ // (1) Unpublish its name
+ s.publisher.RemoveServer(s.publishEP(iep, s.servesMountTable))
+ }
+
+ s.Lock()
+ if s.stopped {
+ s.Unlock()
+ return
+ }
+ s.Unlock()
+
// (2) Reconnect to the proxy unless the server has been stopped
backoff := min
ln = nil
- // TODO(ashankar,cnicolaou): this code is way too confusing and should
- // be cleaned up.
- for ln == nil {
+ for {
select {
case <-time.After(backoff):
- resolved, err := s.resolveToAddress(proxy)
- if err != nil {
- vlog.VI(1).Infof("Failed to resolve proxy %q (%v), will retry in %v", proxy, err, backoff)
- if backoff = backoff * 2; backoff > max {
- backoff = max
- }
- break
- }
- var ep naming.Endpoint
- ln, ep, err = s.streamMgr.Listen(inaming.Network, resolved, s.listenerOpts...)
- if err == nil {
- var ok bool
- iep, ok = ep.(*inaming.Endpoint)
- if !ok {
- vlog.Errorf("failed translating internal endpoint data types")
- ln = nil
- continue
- }
- vlog.VI(1).Infof("Reconnected to proxy at %q listener: (%v, %v)", proxy, ln, iep)
- break
- }
if backoff = backoff * 2; backoff > max {
backoff = max
}
- vlog.VI(1).Infof("Proxy reconnection failed, will retry in %v", backoff)
case <-s.stoppedChan:
return
}
+ // (3) reconnect, publish new address
+ if iep, ln, err = s.reconnectAndPublishProxy(proxy); err != nil {
+ vlog.VI(1).Infof("Failed to reconnect to proxy %q: %s", proxy, err)
+ } else {
+ vlog.VI(1).Infof("Reconnected to proxy %q, %s", proxy, iep)
+ break
+ }
}
- // TODO(cnicolaou,ashankar): this won't work when we are both
- // proxying and publishing locally, which is the common case.
- // listenLoop, dhcpLoop and the original publish are all publishing
- // addresses to the same name, but the client is not smart enough
- // to choose sensibly between them.
- // (3) reconnected, publish new address
- s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
- s.Lock()
- s.listeners[ln] = nil
- s.Unlock()
}
}
@@ -491,7 +392,7 @@
defer dhcpl.Unlock()
for _, a := range addrs {
if ip := netstate.AsIP(a); ip != nil {
- dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.port)
+ dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.pubPort)
fn(s.publishEP(dhcpl.ep, s.servesMountTable))
}
}
@@ -500,6 +401,14 @@
func (s *server) dhcpLoop(dhcpl *dhcpListener) {
defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes on %v", dhcpl.ep)
vlog.VI(2).Infof("ipc: dhcp loop")
+
+ ep := *dhcpl.ep
+ // Publish all of the addresses
+ for _, pubAddr := range dhcpl.pubAddrs {
+ ep.Address = net.JoinHostPort(pubAddr.Address().String(), dhcpl.pubPort)
+ s.publisher.AddServer(s.publishEP(&ep, s.servesMountTable), s.servesMountTable)
+ }
+
for setting := range dhcpl.ch {
if setting == nil {
return
@@ -513,11 +422,6 @@
s.Unlock()
return
}
- // TODO(cnicolaou,ashankar): this won't work when we are both
- // proxying and publishing locally, which is the common case.
- // listenLoop, dhcpLoop and the original publish are all publishing
- // addresses to the same name, but the client is not smart enough
- // to choose sensibly between them.
publisher := s.publisher
s.Unlock()
switch setting.Name() {
@@ -528,7 +432,6 @@
vlog.Infof("Removed some addresses: %q", v)
s.applyChange(dhcpl, v, publisher.RemoveServer)
}
-
}
}
}
@@ -626,6 +529,7 @@
s.publisher.WaitForStop()
s.Lock()
+
// Close all listeners. No new flows will be accepted, while in-flight
// flows will continue until they terminate naturally.
nListeners := len(s.listeners)
@@ -642,6 +546,7 @@
dhcpl.Unlock()
}
}
+
s.Unlock()
var firstErr error
for i := 0; i < nListeners; i++ {
@@ -654,6 +559,7 @@
// Wait for the publisher and active listener + flows to finish.
s.active.Wait()
+
s.Lock()
s.disp = nil
s.Unlock()
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
index f4e5a80..02088f5 100644
--- a/runtimes/google/ipc/server_test.go
+++ b/runtimes/google/ipc/server_test.go
@@ -4,19 +4,21 @@
"fmt"
"io"
"os"
+ "reflect"
+ "sort"
"strings"
"testing"
"time"
+ "veyron.io/veyron/veyron2/ipc"
"veyron.io/veyron/veyron2/naming"
"veyron.io/veyron/veyron/lib/expect"
- "veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
-
"veyron.io/veyron/veyron/lib/modules"
imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/sectest"
+ "veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
inaming "veyron.io/veyron/veyron/runtimes/google/naming"
tnaming "veyron.io/veyron/veyron/runtimes/google/testing/mocks/naming"
)
@@ -113,7 +115,22 @@
return h.ns.Unmount(testContext(), "proxy", h.mount)
}
+func TestProxyOnly(t *testing.T) {
+ listenSpec := ipc.ListenSpec{Proxy: "proxy"}
+ testProxy(t, listenSpec)
+}
+
func TestProxy(t *testing.T) {
+ listenSpec := ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0", Proxy: "proxy"}
+ testProxy(t, listenSpec)
+}
+
+func addrOnly(name string) string {
+ addr, _ := naming.SplitAddressName(name)
+ return addr
+}
+
+func testProxy(t *testing.T, spec ipc.ListenSpec) {
sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
ns := tnaming.NewSimpleNamespace()
client, err := InternalNewClient(sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("client")})
@@ -127,6 +144,10 @@
}
defer server.Stop()
+ // If no address is specified then we'll only 'listen' via
+ // the proxy.
+ hasLocalListener := len(spec.Address) != 0
+
name := "mountpoint/server/suffix"
makeCall := func() (string, error) {
ctx, _ := testContext().WithDeadline(time.Now().Add(5 * time.Second))
@@ -148,15 +169,69 @@
t.Fatal(err)
}
defer proxy.Stop()
- spec := listenSpec
- spec.Proxy = "proxy"
- if _, err := server.Listen(spec); err != nil {
+ addrs := verifyMount(t, ns, "proxy")
+ if len(addrs) != 1 {
+ t.Fatalf("failed to lookup proxy")
+ }
+ proxyEP := addrOnly(addrs[0])
+
+ ep, err := server.Listen(spec)
+ if err != nil {
t.Fatal(err)
}
if err := server.ServeDispatcher("mountpoint/server", testServerDisp{&testServer{}}); err != nil {
t.Fatal(err)
}
- verifyMount(t, ns, name)
+
+ ch := make(chan struct{})
+ // Proxy connections are started asynchronously, so we need to wait..
+ waitfor := func(expect int) {
+ for {
+ addrs, _ := ns.Resolve(testContext(), name)
+ if len(addrs) == expect {
+ close(ch)
+ return
+ }
+ time.Sleep(100 * time.Millisecond)
+ }
+ }
+
+ proxiedEP, err := inaming.NewEndpoint(proxyEP)
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ proxiedEP.RID = naming.FixedRoutingID(0x555555555)
+ expectedEndpoints := []string{proxiedEP.String()}
+ if hasLocalListener {
+ expectedEndpoints = append(expectedEndpoints, ep.String())
+ }
+
+ // Proxy connetions are created asynchronously, so we wait for the
+ // expected number of endpoints to appear for the specified service name.
+ go waitfor(len(expectedEndpoints))
+ select {
+ case <-time.After(time.Minute):
+ t.Fatalf("timedout waiting for two entries in the mount table")
+ case <-ch:
+ }
+
+ got := []string{}
+ for _, s := range verifyMount(t, ns, name) {
+ got = append(got, addrOnly(s))
+ }
+ sort.Strings(got)
+ sort.Strings(expectedEndpoints)
+ if !reflect.DeepEqual(got, expectedEndpoints) {
+ t.Errorf("got %v, want %v", got, expectedEndpoints)
+ }
+
+ if hasLocalListener {
+ // Listen will publish both the local and proxied endpoint with the
+ // mount table, given that we're trying to test the proxy, we remove
+ // the local endpoint from the mount table entry!
+ ns.Unmount(testContext(), "mountpoint/server", naming.JoinAddressName(ep.String(), ""))
+ }
+
// Proxied endpoint should be published and RPC should succeed (through proxy)
const expected = `method:"Echo",suffix:"suffix",arg:"batman"`
if result, err := makeCall(); result != expected || err != nil {
diff --git a/runtimes/google/ipc/stream/manager/listener.go b/runtimes/google/ipc/stream/manager/listener.go
index 0d67c12..a0f2789 100644
--- a/runtimes/google/ipc/stream/manager/listener.go
+++ b/runtimes/google/ipc/stream/manager/listener.go
@@ -6,6 +6,7 @@
"net"
"strings"
"sync"
+ "time"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vif"
@@ -154,7 +155,9 @@
func (ln *proxyListener) connect() (*vif.VIF, naming.Endpoint, error) {
vlog.VI(1).Infof("Connecting to proxy at %v", ln.proxyEP)
- vf, err := ln.manager.FindOrDialVIF(ln.proxyEP.Addr())
+ // TODO(cnicolaou, ashankar): probably want to set a timeout here.
+ var timeout time.Duration
+ vf, err := ln.manager.FindOrDialVIF(ln.proxyEP.Addr(), timeout)
if err != nil {
return nil, nil, err
}
diff --git a/runtimes/google/ipc/stream/manager/manager.go b/runtimes/google/ipc/stream/manager/manager.go
index bc8284e..cfb4ad0 100644
--- a/runtimes/google/ipc/stream/manager/manager.go
+++ b/runtimes/google/ipc/stream/manager/manager.go
@@ -7,6 +7,7 @@
"net"
"strings"
"sync"
+ "time"
"veyron.io/veyron/veyron/lib/stats"
"veyron.io/veyron/veyron/runtimes/google/ipc/stream/crypto"
@@ -54,23 +55,28 @@
var _ stream.Manager = (*manager)(nil)
-func dial(network, address string) (net.Conn, error) {
+type DialTimeout struct{ time.Duration }
+
+func (DialTimeout) IPCStreamVCOpt() {}
+func (DialTimeout) IPCClientOpt() {}
+
+func dial(network, address string, timeout time.Duration) (net.Conn, error) {
if d, _ := stream.RegisteredProtocol(network); d != nil {
return d(address)
}
- return net.Dial(network, address)
+ return net.DialTimeout(network, address, timeout)
}
// FindOrDialVIF returns the network connection (VIF) to the provided address
// from the cache in the manager. If not already present in the cache, a new
// connection will be created using net.Dial.
-func (m *manager) FindOrDialVIF(addr net.Addr) (*vif.VIF, error) {
+func (m *manager) FindOrDialVIF(addr net.Addr, timeout time.Duration) (*vif.VIF, error) {
network, address := addr.Network(), addr.String()
if vf := m.vifs.Find(network, address); vf != nil {
return vf, nil
}
vlog.VI(1).Infof("(%q, %q) not in VIF cache. Dialing", network, address)
- conn, err := dial(network, address)
+ conn, err := dial(network, address, timeout)
if err != nil {
return nil, fmt.Errorf("net.Dial(%q, %q) failed: %v", network, address, err)
}
@@ -104,10 +110,17 @@
}
func (m *manager) Dial(remote naming.Endpoint, opts ...stream.VCOpt) (stream.VC, error) {
+ var timeout time.Duration
+ for _, o := range opts {
+ switch v := o.(type) {
+ case *DialTimeout:
+ timeout = v.Duration
+ }
+ }
// If vif.Dial fails because the cached network connection was broken, remove from
// the cache and try once more.
for retry := true; true; retry = false {
- vf, err := m.FindOrDialVIF(remote.Addr())
+ vf, err := m.FindOrDialVIF(remote.Addr(), timeout)
if err != nil {
return nil, err
}
diff --git a/runtimes/google/ipc/stream/manager/manager_test.go b/runtimes/google/ipc/stream/manager/manager_test.go
index 873d882..a4a7b25 100644
--- a/runtimes/google/ipc/stream/manager/manager_test.go
+++ b/runtimes/google/ipc/stream/manager/manager_test.go
@@ -124,6 +124,27 @@
}
}
+func TestConnectionTimeout(t *testing.T) {
+ client := InternalNew(naming.FixedRoutingID(0xcccccccc))
+
+ ch := make(chan error)
+ go func() {
+ // 203.0.113.0 is TEST-NET-3 from RFC5737
+ ep, _ := inaming.NewEndpoint(naming.FormatEndpoint("tcp", "203.0.113.10:80"))
+ _, err := client.Dial(ep, &DialTimeout{time.Second})
+ ch <- err
+ }()
+
+ select {
+ case err := <-ch:
+ if err == nil {
+ t.Fatalf("expected an error")
+ }
+ case <-time.After(time.Minute):
+ t.Fatalf("timedout")
+ }
+}
+
func TestAuthenticatedByDefault(t *testing.T) {
var (
server = InternalNew(naming.FixedRoutingID(0x55555555))
@@ -473,7 +494,7 @@
}
}
-// Required by blackbox framework
+// Needed by modules framework
func TestHelperProcess(t *testing.T) {
modules.DispatchInTest()
}
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index 6f3de3b..d5abc7a 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -3,6 +3,7 @@
import (
"fmt"
"math/rand"
+ "time"
iipc "veyron.io/veyron/veyron/runtimes/google/ipc"
imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
@@ -36,7 +37,11 @@
}
}
// Add the option that provides the runtime's principal to the client.
- otherOpts = append(otherOpts, vc.LocalPrincipal{rt.principal})
+ // Set a low timeout for now, until we get parallel connections
+ // going.
+ // TODO(cnicolaou): extend the timeout when parallel connections are
+ // going.
+ otherOpts = append(otherOpts, vc.LocalPrincipal{rt.principal}, &imanager.DialTimeout{5 * time.Second})
return iipc.InternalNewClient(sm, ns, otherOpts...)
}