veyron/runtimes/google/ipc: refactor Listen loop code - back from the rollback...

- deleted commented-out old code.
- cleaned up the Listen implementation; the only behavioural difference
is that we now publish both the local and proxy addresses for objects
that are being proxied.
- cleaned up the proxy retry loop.
- made it possible to use the proxy without a local listener.
- Dial accepts a timeout via the new DialTimeout option; FindOrDialVIF takes a timeout argument.

Change-Id: Ia7598de824593b937d9d562f0597c7dc4100f88e
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index e828b51..8451c3e 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -247,9 +247,11 @@
 			return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err)
 		}
 	}
+
 	// Try all servers, and if none of them are authorized for the call then return the error of the last server
 	// that was tried.
 	var lastErr verror.E
+	// TODO(cnicolaou): sort servers by sensible metric.
 	for _, server := range servers {
 		flow, suffix, err := c.connectFlow(server)
 		if err != nil {
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index 0cb0709..d9209f4 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -5,7 +5,9 @@
 	"fmt"
 	"io"
 	"net"
+	"path/filepath"
 	"reflect"
+	"runtime"
 	"strings"
 	"sync"
 	"testing"
@@ -189,15 +191,23 @@
 	return ep, server
 }
 
-func verifyMount(t *testing.T, ns naming.Namespace, name string) {
-	if _, err := ns.Resolve(testContext(), name); err != nil {
-		t.Errorf("%s not found in mounttable", name)
+func loc(d int) string {
+	_, file, line, _ := runtime.Caller(d + 1)
+	return fmt.Sprintf("%s:%d", filepath.Base(file), line)
+}
+
+func verifyMount(t *testing.T, ns naming.Namespace, name string) []string {
+	addrs, err := ns.Resolve(testContext(), name)
+	if err != nil {
+		t.Errorf("%s: %s not found in mounttable", loc(1), name)
+		return nil
 	}
+	return addrs
 }
 
 func verifyMountMissing(t *testing.T, ns naming.Namespace, name string) {
 	if servers, err := ns.Resolve(testContext(), name); err == nil {
-		t.Errorf("%s not supposed to be found in mounttable; got %d servers instead", name, len(servers))
+		t.Errorf("%s: %s not supposed to be found in mounttable; got %d servers instead", loc(1), name, len(servers))
 	}
 }
 
diff --git a/runtimes/google/ipc/server.go b/runtimes/google/ipc/server.go
index 454ca88..c5c9d02 100644
--- a/runtimes/google/ipc/server.go
+++ b/runtimes/google/ipc/server.go
@@ -58,10 +58,11 @@
 
 type dhcpListener struct {
 	sync.Mutex
-	publisher *config.Publisher // publisher used to fork the stream
-	name      string            // name of the publisher stream
-	ep        *inaming.Endpoint // endpoint returned after listening and choosing an address to be published
-	port      string
+	publisher *config.Publisher   // publisher used to fork the stream
+	name      string              // name of the publisher stream
+	ep        *inaming.Endpoint   // endpoint returned after listening
+	pubAddrs  []ipc.Address       // addresses to publish
+	pubPort   string              // port to use with the publish addresses
 	ch        chan config.Setting // channel to receive settings over
 }
 
@@ -128,161 +129,74 @@
 	return "", fmt.Errorf("unable to resolve %q to an endpoint", address)
 }
 
-/*
-// ipAddressChooser returns the preferred IP address, which is,
-// a public IPv4 address, then any non-loopback IPv4, then a public
-// IPv6 address and finally any non-loopback/link-local IPv6
-// It is replicated here to avoid a circular dependency and will, in any case,
-// go away when we transition away from Listen to the ListenX API.
-func ipAddressChooser(network string, addrs []ipc.Address) ([]ipc.Address, error) {
-	if !netstate.IsIPProtocol(network) {
-		return nil, fmt.Errorf("can't support network protocol %q", network)
+func addrFromIP(ip net.IP) ipc.Address {
+	return &netstate.AddrIfc{
+		Addr: &net.IPAddr{IP: ip},
 	}
-	accessible := netstate.AddrList(addrs)
-
-	// Try and find an address on a interface with a default route.
-	predicates := []netstate.AddressPredicate{netstate.IsPublicUnicastIPv4,
-		netstate.IsUnicastIPv4, netstate.IsPublicUnicastIPv6}
-	for _, predicate := range predicates {
-		if addrs := accessible.Filter(predicate); len(addrs) > 0 {
-			onDefaultRoutes := addrs.Filter(netstate.IsOnDefaultRoute)
-			if len(onDefaultRoutes) > 0 {
-				return onDefaultRoutes, nil
-			}
-		}
-	}
-
-	// We failed to find any addresses with default routes, try again
-	// but without the default route requirement.
-	for _, predicate := range predicates {
-		if addrs := accessible.Filter(predicate); len(addrs) > 0 {
-			return addrs, nil
-		}
-	}
-
-	return nil, fmt.Errorf("failed to find any usable address for %q", network)
 }
 
-func (s *server) Listen(protocol, address string) (naming.Endpoint, error) {
-	defer vlog.LogCall()()
-	s.Lock()
-	// Shortcut if the server is stopped, to avoid needlessly creating a
-	// listener.
-	if s.stopped {
-		s.Unlock()
-		return nil, errServerStopped
-	}
-	s.Unlock()
-	var proxyName string
-	if protocol == inaming.Network {
-		proxyName = address
-		var err error
-		if address, err = s.resolveToAddress(address); err != nil {
-			return nil, err
-		}
-	}
-	// TODO(cnicolaou): pass options.ServesMountTable to streamMgr.Listen so that
-	// it can more cleanly set the IsMountTable bit in the endpoint.
-	ln, ep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
+// getIPRoamingAddrs finds an appropriate set of addresses to publish
+// externally and also determines if it's sensible to allow roaming.
+// It returns the host address of the first suitable address that
+// can be used and the port number that can be used with all addresses.
+// The host is required to allow the caller to construct an endpoint
+// that can be returned to the caller of Listen.
+func (s *server) getIPRoamingAddrs(chooser ipc.AddressChooser, iep *inaming.Endpoint) (addresses []ipc.Address, host string, port string, roaming bool, err error) {
+	host, port, err = net.SplitHostPort(iep.Address)
 	if err != nil {
-		vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
-		return nil, err
+		return nil, "", "", false, err
 	}
+	ip := net.ParseIP(host)
+	if ip == nil {
+		return nil, "", "", false, fmt.Errorf("failed to parse %q as an IP host", host)
+	}
+	if ip.IsUnspecified() && chooser != nil {
+		// Need to find a usable IP address since the call to listen
+		// didn't specify one.
+		if addrs, err := netstate.GetAccessibleIPs(); err == nil {
+			if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
+				phost := a[0].Address().String()
+				iep.Address = net.JoinHostPort(phost, port)
+				return a, phost, port, true, nil
+			}
+		}
+		return []ipc.Address{addrFromIP(ip)}, host, port, true, nil
+	}
+	// Listen used a fixed IP address, which we take to mean that
+	// roaming is not desired.
+	return []ipc.Address{addrFromIP(ip)}, host, port, false, nil
+}
+
+// configureEPAndRoaming configures the endpoint and roaming. In particular,
+// it fills in the Address portion of the endpoint with the appropriately
+// selected network address and creates a dhcpListener struct if roaming
+// is enabled.
+func (s *server) configureEPAndRoaming(spec ipc.ListenSpec, ep naming.Endpoint) (*dhcpListener, *inaming.Endpoint, error) {
 	iep, ok := ep.(*inaming.Endpoint)
 	if !ok {
-		return nil, fmt.Errorf("ipc: Listen on %v %v failed translating internal endpoint data types", protocol, address)
+		return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
 	}
-
-	if protocol != inaming.Network {
-		// We know the endpoint format, so we crack it open...
-		switch iep.Protocol {
-		case "tcp", "tcp4", "tcp6":
-			host, port, err := net.SplitHostPort(iep.Address)
-			if err != nil {
-				return nil, err
-			}
-			ip := net.ParseIP(host)
-			if ip == nil {
-				return nil, fmt.Errorf("ipc: Listen(%q, %q) failed to parse IP address from address", protocol, address)
-			}
-			if ip.IsUnspecified() {
-				addrs, err := netstate.GetAccessibleIPs()
-				if err == nil {
-					if a, err := ipAddressChooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
-						iep.Address = net.JoinHostPort(a[0].Address().String(), port)
-					}
-				}
-			}
+	if !strings.HasPrefix(spec.Protocol, "tcp") {
+		return nil, iep, nil
+	}
+	pubAddrs, pubHost, pubPort, roaming, err := s.getIPRoamingAddrs(spec.AddressChooser, iep)
+	if err != nil {
+		return nil, iep, err
+	}
+	iep.Address = net.JoinHostPort(pubHost, pubPort)
+	if !roaming {
+		vlog.VI(2).Infof("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", spec.Address)
+	}
+	publisher := spec.StreamPublisher
+	if roaming && publisher != nil {
+		streamName := spec.StreamName
+		ch := make(chan config.Setting)
+		if _, err := publisher.ForkStream(streamName, ch); err != nil {
+			return nil, iep, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
 		}
+		return &dhcpListener{ep: iep, pubAddrs: pubAddrs, pubPort: pubPort, ch: ch, name: streamName, publisher: publisher}, iep, nil
 	}
-
-	s.Lock()
-	if s.stopped {
-		s.Unlock()
-		// Ignore error return since we can't really do much about it.
-		ln.Close()
-		return nil, errServerStopped
-	}
-	s.listeners[ln] = nil
-	// We have a single goroutine per listener to accept new flows.
-	// Each flow is served from its own goroutine.
-	s.active.Add(1)
-	if protocol == inaming.Network {
-		go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
-			s.proxyListenLoop(ln, ep, proxy)
-			s.active.Done()
-		}(ln, iep, proxyName)
-	} else {
-		go func(ln stream.Listener, ep naming.Endpoint) {
-			s.listenLoop(ln, ep)
-			s.active.Done()
-		}(ln, iep)
-	}
-	s.Unlock()
-	s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
-	return ep, nil
-}
-*/
-
-// externalEndpoint examines the endpoint returned by the stream listen call
-// and fills in the address to publish to the mount table. It also returns the
-// IP host address that it selected for publishing to the mount table.
-func (s *server) externalEndpoint(chooser ipc.AddressChooser, lep naming.Endpoint) (*inaming.Endpoint, *net.IPAddr, error) {
-	// We know the endpoint format, so we crack it open...
-	iep, ok := lep.(*inaming.Endpoint)
-	if !ok {
-		return nil, nil, fmt.Errorf("failed translating internal endpoint data types")
-	}
-	switch iep.Protocol {
-	case "tcp", "tcp4", "tcp6":
-		host, port, err := net.SplitHostPort(iep.Address)
-		if err != nil {
-			return nil, nil, err
-		}
-		ip := net.ParseIP(host)
-		if ip == nil {
-			return nil, nil, fmt.Errorf("failed to parse %q as an IP host", host)
-		}
-		if ip.IsUnspecified() && chooser != nil {
-			// Need to find a usable IP address since the call to listen
-			// didn't specify one.
-			addrs, err := netstate.GetAccessibleIPs()
-			if err == nil {
-				// TODO(cnicolaou): we could return multiple addresses here,
-				// all of which can be exported to the mount table. Look at
-				// this after we transition fully to ListenX.
-				if a, err := chooser(iep.Protocol, addrs); err == nil && len(a) > 0 {
-					iep.Address = net.JoinHostPort(a[0].Address().String(), port)
-					return iep, a[0].Address().(*net.IPAddr), nil
-				}
-			}
-		} else {
-			// Listen used a fixed IP address, which essentially disables
-			// roaming.
-			return iep, nil, nil
-		}
-	}
-	return iep, nil, nil
+	return nil, iep, nil
 }
 
 func (s *server) Listen(listenSpec ipc.ListenSpec) (naming.Endpoint, error) {
@@ -296,100 +210,67 @@
 	}
 	s.Unlock()
 
-	protocol := listenSpec.Protocol
-	address := listenSpec.Address
-	proxyAddress := ""
-	if len(listenSpec.Proxy) > 0 {
-		if address, err := s.resolveToAddress(listenSpec.Proxy); err != nil {
+	var iep *inaming.Endpoint
+	var dhcpl *dhcpListener
+	var ln stream.Listener
+
+	if len(listenSpec.Address) > 0 {
+		// Listen if we have a local address to listen on. Some situations
+		// just need a proxy (e.g. a browser extension).
+		tmpln, lep, err := s.streamMgr.Listen(listenSpec.Protocol, listenSpec.Address, s.listenerOpts...)
+		if err != nil {
+			vlog.Errorf("ipc: Listen on %s failed: %s", listenSpec, err)
+			return nil, err
+		}
+		ln = tmpln
+		if tmpdhcpl, tmpiep, err := s.configureEPAndRoaming(listenSpec, lep); err != nil {
+			ln.Close()
 			return nil, err
 		} else {
-			proxyAddress = address
+			dhcpl = tmpdhcpl
+			iep = tmpiep
 		}
 	}
 
-	ln, lep, err := s.streamMgr.Listen(protocol, address, s.listenerOpts...)
-	if err != nil {
-		vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
-		return nil, err
-	}
-	ep, ipaddr, err := s.externalEndpoint(listenSpec.AddressChooser, lep)
-	if err != nil {
-		ln.Close()
-		return nil, err
-	}
-
 	s.Lock()
+	defer s.Unlock()
 	if s.stopped {
-		s.Unlock()
-		// Ignore error return since we can't really do much about it.
 		ln.Close()
 		return nil, errServerStopped
 	}
 
-	var ip net.IP
-	if ipaddr != nil {
-		ip = net.ParseIP(ipaddr.String())
-	} else {
-		vlog.VI(2).Infof("the address %q requested for listening contained a fixed IP address which disables roaming, use :0 instead", address)
-	}
-	publisher := listenSpec.StreamPublisher
-	if ip != nil && !ip.IsLoopback() && publisher != nil {
-		streamName := listenSpec.StreamName
-		ch := make(chan config.Setting)
-		_, err := publisher.ForkStream(streamName, ch)
-		if err != nil {
-			return nil, fmt.Errorf("failed to fork stream %q: %s", streamName, err)
-		}
-		_, port, _ := net.SplitHostPort(ep.Address)
-		dhcpl := &dhcpListener{ep: ep, port: port, ch: ch, name: streamName, publisher: publisher}
-
+	if dhcpl != nil {
 		// We have a goroutine to listen for dhcp changes.
-		s.active.Add(1)
-		// goroutine to listen for address changes.
-		go func(dl *dhcpListener) {
-			s.dhcpLoop(dl)
+		go func() {
+			s.active.Add(1)
+			s.dhcpLoop(dhcpl)
 			s.active.Done()
-		}(dhcpl)
+		}()
 		s.listeners[ln] = dhcpl
-	} else {
+	} else if ln != nil {
 		s.listeners[ln] = nil
 	}
 
-	// We have a goroutine per listener to accept new flows.
-	// Each flow is served from its own goroutine.
-	s.active.Add(1)
-
-	//  goroutine to listen for connections
-	go func(ln stream.Listener, ep naming.Endpoint) {
-		s.listenLoop(ln, ep)
-		s.active.Done()
-	}(ln, lep)
-
-	if len(proxyAddress) > 0 {
-		pln, pep, err := s.streamMgr.Listen(inaming.Network, proxyAddress, s.listenerOpts...)
-		if err != nil {
-			vlog.Errorf("ipc: Listen on %v %v failed: %v", protocol, address, err)
-			return nil, err
-		}
-		ipep, ok := pep.(*inaming.Endpoint)
-		if !ok {
-			return nil, fmt.Errorf("failed translating internal endpoint data types")
-		}
-		// We have a goroutine for listening on proxy connections.
-		s.active.Add(1)
-		go func(ln stream.Listener, ep *inaming.Endpoint, proxy string) {
-			s.proxyListenLoop(ln, ep, proxy)
+	if iep != nil {
+		// We have a goroutine per listener to accept new flows.
+		// Each flow is served from its own goroutine.
+		go func() {
+			s.active.Add(1)
+			s.listenLoop(ln, iep)
 			s.active.Done()
-		}(pln, ipep, listenSpec.Proxy)
-		s.listeners[pln] = nil
-		// TODO(cnicolaou,p): AddServer no longer needs to take the
-		// servesMountTable bool since it can be extracted from the endpoint.
-		s.publisher.AddServer(s.publishEP(ipep, s.servesMountTable), s.servesMountTable)
-	} else {
-		s.publisher.AddServer(s.publishEP(ep, s.servesMountTable), s.servesMountTable)
+		}()
+		s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
 	}
-	s.Unlock()
-	return ep, nil
+
+	if len(listenSpec.Proxy) > 0 {
+		// We have a goroutine for listening on proxy connections.
+		go func() {
+			s.active.Add(1)
+			s.proxyListenLoop(listenSpec.Proxy)
+			s.active.Done()
+		}()
+	}
+	return iep, nil
 }
 
 // TODO(cnicolaou): Take this out or make the ServesMountTable bit work in the endpoint.
@@ -399,63 +280,83 @@
 	return naming.JoinAddressName(ep.String(), name)
 }
 
-func (s *server) proxyListenLoop(ln stream.Listener, iep *inaming.Endpoint, proxy string) {
+func (s *server) reconnectAndPublishProxy(proxy string) (*inaming.Endpoint, stream.Listener, error) {
+	resolved, err := s.resolveToAddress(proxy)
+	if err != nil {
+		return nil, nil, fmt.Errorf("Failed to resolve proxy %q (%v)", proxy, err)
+	}
+	ln, ep, err := s.streamMgr.Listen(inaming.Network, resolved, s.listenerOpts...)
+	if err != nil {
+		return nil, nil, fmt.Errorf("failed to listen on %q: %s", resolved, err)
+	}
+	iep, ok := ep.(*inaming.Endpoint)
+	if !ok {
+		ln.Close()
+		return nil, nil, fmt.Errorf("internal type conversion error for %T", ep)
+	}
+	s.Lock()
+	s.listeners[ln] = nil
+	s.Unlock()
+	s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
+	return iep, ln, nil
+}
+
+func (s *server) proxyListenLoop(proxy string) {
 	const (
 		min = 5 * time.Millisecond
 		max = 5 * time.Minute
 	)
+
+	iep, ln, err := s.reconnectAndPublishProxy(proxy)
+	if err != nil {
+		vlog.VI(1).Infof("Failed to connect to proxy: %s", err)
+	}
+	// the initial connection may have failed, but we enter the retry
+	// loop anyway so that we will continue to try and connect to the
+	// proxy.
+
+	s.Lock()
+	if s.stopped {
+		s.Unlock()
+		return
+	}
+	s.Unlock()
+
 	for {
-		s.listenLoop(ln, iep)
-		// The listener is done, so:
-		// (1) Unpublish its name
-		s.publisher.RemoveServer(s.publishEP(iep, s.servesMountTable))
+		if ln != nil && iep != nil {
+			s.listenLoop(ln, iep)
+			// The listener is done, so:
+			// (1) Unpublish its name
+			s.publisher.RemoveServer(s.publishEP(iep, s.servesMountTable))
+		}
+
+		s.Lock()
+		if s.stopped {
+			s.Unlock()
+			return
+		}
+		s.Unlock()
+
 		// (2) Reconnect to the proxy unless the server has been stopped
 		backoff := min
 		ln = nil
-		// TODO(ashankar,cnicolaou): this code is way too confusing and should
-		// be cleaned up.
-		for ln == nil {
+		for {
 			select {
 			case <-time.After(backoff):
-				resolved, err := s.resolveToAddress(proxy)
-				if err != nil {
-					vlog.VI(1).Infof("Failed to resolve proxy %q (%v), will retry in %v", proxy, err, backoff)
-					if backoff = backoff * 2; backoff > max {
-						backoff = max
-					}
-					break
-				}
-				var ep naming.Endpoint
-				ln, ep, err = s.streamMgr.Listen(inaming.Network, resolved, s.listenerOpts...)
-				if err == nil {
-					var ok bool
-					iep, ok = ep.(*inaming.Endpoint)
-					if !ok {
-						vlog.Errorf("failed translating internal endpoint data types")
-						ln = nil
-						continue
-					}
-					vlog.VI(1).Infof("Reconnected to proxy at %q listener: (%v, %v)", proxy, ln, iep)
-					break
-				}
 				if backoff = backoff * 2; backoff > max {
 					backoff = max
 				}
-				vlog.VI(1).Infof("Proxy reconnection failed, will retry in %v", backoff)
 			case <-s.stoppedChan:
 				return
 			}
+			// (3) reconnect, publish new address
+			if iep, ln, err = s.reconnectAndPublishProxy(proxy); err != nil {
+				vlog.VI(1).Infof("Failed to reconnect to proxy %q: %s", proxy, err)
+			} else {
+				vlog.VI(1).Infof("Reconnected to proxy %q, %s", proxy, iep)
+				break
+			}
 		}
-		// TODO(cnicolaou,ashankar): this won't work when we are both
-		// proxying and publishing locally, which is the common case.
-		// listenLoop, dhcpLoop and the original publish are all publishing
-		// addresses to the same name, but the client is not smart enough
-		// to choose sensibly between them.
-		// (3) reconnected, publish new address
-		s.publisher.AddServer(s.publishEP(iep, s.servesMountTable), s.servesMountTable)
-		s.Lock()
-		s.listeners[ln] = nil
-		s.Unlock()
 	}
 }
 
@@ -491,7 +392,7 @@
 	defer dhcpl.Unlock()
 	for _, a := range addrs {
 		if ip := netstate.AsIP(a); ip != nil {
-			dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.port)
+			dhcpl.ep.Address = net.JoinHostPort(ip.String(), dhcpl.pubPort)
 			fn(s.publishEP(dhcpl.ep, s.servesMountTable))
 		}
 	}
@@ -500,6 +401,14 @@
 func (s *server) dhcpLoop(dhcpl *dhcpListener) {
 	defer vlog.VI(1).Infof("ipc: Stopped listen for dhcp changes on %v", dhcpl.ep)
 	vlog.VI(2).Infof("ipc: dhcp loop")
+
+	ep := *dhcpl.ep
+	// Publish all of the addresses
+	for _, pubAddr := range dhcpl.pubAddrs {
+		ep.Address = net.JoinHostPort(pubAddr.Address().String(), dhcpl.pubPort)
+		s.publisher.AddServer(s.publishEP(&ep, s.servesMountTable), s.servesMountTable)
+	}
+
 	for setting := range dhcpl.ch {
 		if setting == nil {
 			return
@@ -513,11 +422,6 @@
 				s.Unlock()
 				return
 			}
-			// TODO(cnicolaou,ashankar): this won't work when we are both
-			// proxying and publishing locally, which is the common case.
-			// listenLoop, dhcpLoop and the original publish are all publishing
-			// addresses to the same name, but the client is not smart enough
-			// to choose sensibly between them.
 			publisher := s.publisher
 			s.Unlock()
 			switch setting.Name() {
@@ -528,7 +432,6 @@
 				vlog.Infof("Removed some addresses: %q", v)
 				s.applyChange(dhcpl, v, publisher.RemoveServer)
 			}
-
 		}
 	}
 }
@@ -626,6 +529,7 @@
 	s.publisher.WaitForStop()
 
 	s.Lock()
+
 	// Close all listeners.  No new flows will be accepted, while in-flight
 	// flows will continue until they terminate naturally.
 	nListeners := len(s.listeners)
@@ -642,6 +546,7 @@
 			dhcpl.Unlock()
 		}
 	}
+
 	s.Unlock()
 	var firstErr error
 	for i := 0; i < nListeners; i++ {
@@ -654,6 +559,7 @@
 
 	// Wait for the publisher and active listener + flows to finish.
 	s.active.Wait()
+
 	s.Lock()
 	s.disp = nil
 	s.Unlock()
diff --git a/runtimes/google/ipc/server_test.go b/runtimes/google/ipc/server_test.go
index f4e5a80..02088f5 100644
--- a/runtimes/google/ipc/server_test.go
+++ b/runtimes/google/ipc/server_test.go
@@ -4,19 +4,21 @@
 	"fmt"
 	"io"
 	"os"
+	"reflect"
+	"sort"
 	"strings"
 	"testing"
 	"time"
 
+	"veyron.io/veyron/veyron2/ipc"
 	"veyron.io/veyron/veyron2/naming"
 
 	"veyron.io/veyron/veyron/lib/expect"
-	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
-
 	"veyron.io/veyron/veyron/lib/modules"
 	imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/sectest"
+	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vc"
 	inaming "veyron.io/veyron/veyron/runtimes/google/naming"
 	tnaming "veyron.io/veyron/veyron/runtimes/google/testing/mocks/naming"
 )
@@ -113,7 +115,22 @@
 	return h.ns.Unmount(testContext(), "proxy", h.mount)
 }
 
+func TestProxyOnly(t *testing.T) {
+	listenSpec := ipc.ListenSpec{Proxy: "proxy"}
+	testProxy(t, listenSpec)
+}
+
 func TestProxy(t *testing.T) {
+	listenSpec := ipc.ListenSpec{Protocol: "tcp", Address: "127.0.0.1:0", Proxy: "proxy"}
+	testProxy(t, listenSpec)
+}
+
+func addrOnly(name string) string {
+	addr, _ := naming.SplitAddressName(name)
+	return addr
+}
+
+func testProxy(t *testing.T, spec ipc.ListenSpec) {
 	sm := imanager.InternalNew(naming.FixedRoutingID(0x555555555))
 	ns := tnaming.NewSimpleNamespace()
 	client, err := InternalNewClient(sm, ns, vc.LocalPrincipal{sectest.NewPrincipal("client")})
@@ -127,6 +144,10 @@
 	}
 	defer server.Stop()
 
+	// If no address is specified then we'll only 'listen' via
+	// the proxy.
+	hasLocalListener := len(spec.Address) != 0
+
 	name := "mountpoint/server/suffix"
 	makeCall := func() (string, error) {
 		ctx, _ := testContext().WithDeadline(time.Now().Add(5 * time.Second))
@@ -148,15 +169,69 @@
 		t.Fatal(err)
 	}
 	defer proxy.Stop()
-	spec := listenSpec
-	spec.Proxy = "proxy"
-	if _, err := server.Listen(spec); err != nil {
+	addrs := verifyMount(t, ns, "proxy")
+	if len(addrs) != 1 {
+		t.Fatalf("failed to lookup proxy")
+	}
+	proxyEP := addrOnly(addrs[0])
+
+	ep, err := server.Listen(spec)
+	if err != nil {
 		t.Fatal(err)
 	}
 	if err := server.ServeDispatcher("mountpoint/server", testServerDisp{&testServer{}}); err != nil {
 		t.Fatal(err)
 	}
-	verifyMount(t, ns, name)
+
+	ch := make(chan struct{})
+	// Proxy connections are started asynchronously, so we need to wait..
+	waitfor := func(expect int) {
+		for {
+			addrs, _ := ns.Resolve(testContext(), name)
+			if len(addrs) == expect {
+				close(ch)
+				return
+			}
+			time.Sleep(100 * time.Millisecond)
+		}
+	}
+
+	proxiedEP, err := inaming.NewEndpoint(proxyEP)
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	proxiedEP.RID = naming.FixedRoutingID(0x555555555)
+	expectedEndpoints := []string{proxiedEP.String()}
+	if hasLocalListener {
+		expectedEndpoints = append(expectedEndpoints, ep.String())
+	}
+
+	// Proxy connections are created asynchronously, so we wait for the
+	// expected number of endpoints to appear for the specified service name.
+	go waitfor(len(expectedEndpoints))
+	select {
+	case <-time.After(time.Minute):
+		t.Fatalf("timedout waiting for two entries in the mount table")
+	case <-ch:
+	}
+
+	got := []string{}
+	for _, s := range verifyMount(t, ns, name) {
+		got = append(got, addrOnly(s))
+	}
+	sort.Strings(got)
+	sort.Strings(expectedEndpoints)
+	if !reflect.DeepEqual(got, expectedEndpoints) {
+		t.Errorf("got %v, want %v", got, expectedEndpoints)
+	}
+
+	if hasLocalListener {
+		// Listen will publish both the local and proxied endpoint with the
+		// mount table, given that we're trying to test the proxy, we remove
+		// the local endpoint from the mount table entry!
+		ns.Unmount(testContext(), "mountpoint/server", naming.JoinAddressName(ep.String(), ""))
+	}
+
 	// Proxied endpoint should be published and RPC should succeed (through proxy)
 	const expected = `method:"Echo",suffix:"suffix",arg:"batman"`
 	if result, err := makeCall(); result != expected || err != nil {
diff --git a/runtimes/google/ipc/stream/manager/listener.go b/runtimes/google/ipc/stream/manager/listener.go
index 0d67c12..a0f2789 100644
--- a/runtimes/google/ipc/stream/manager/listener.go
+++ b/runtimes/google/ipc/stream/manager/listener.go
@@ -6,6 +6,7 @@
 	"net"
 	"strings"
 	"sync"
+	"time"
 
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/proxy"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/vif"
@@ -154,7 +155,9 @@
 
 func (ln *proxyListener) connect() (*vif.VIF, naming.Endpoint, error) {
 	vlog.VI(1).Infof("Connecting to proxy at %v", ln.proxyEP)
-	vf, err := ln.manager.FindOrDialVIF(ln.proxyEP.Addr())
+	// TODO(cnicolaou, ashankar): probably want to set a timeout here.
+	var timeout time.Duration
+	vf, err := ln.manager.FindOrDialVIF(ln.proxyEP.Addr(), timeout)
 	if err != nil {
 		return nil, nil, err
 	}
diff --git a/runtimes/google/ipc/stream/manager/manager.go b/runtimes/google/ipc/stream/manager/manager.go
index bc8284e..cfb4ad0 100644
--- a/runtimes/google/ipc/stream/manager/manager.go
+++ b/runtimes/google/ipc/stream/manager/manager.go
@@ -7,6 +7,7 @@
 	"net"
 	"strings"
 	"sync"
+	"time"
 
 	"veyron.io/veyron/veyron/lib/stats"
 	"veyron.io/veyron/veyron/runtimes/google/ipc/stream/crypto"
@@ -54,23 +55,28 @@
 
 var _ stream.Manager = (*manager)(nil)
 
-func dial(network, address string) (net.Conn, error) {
+type DialTimeout struct{ time.Duration }
+
+func (DialTimeout) IPCStreamVCOpt() {}
+func (DialTimeout) IPCClientOpt()   {}
+
+func dial(network, address string, timeout time.Duration) (net.Conn, error) {
 	if d, _ := stream.RegisteredProtocol(network); d != nil {
 		return d(address)
 	}
-	return net.Dial(network, address)
+	return net.DialTimeout(network, address, timeout)
 }
 
 // FindOrDialVIF returns the network connection (VIF) to the provided address
 // from the cache in the manager. If not already present in the cache, a new
 // connection will be created using net.Dial.
-func (m *manager) FindOrDialVIF(addr net.Addr) (*vif.VIF, error) {
+func (m *manager) FindOrDialVIF(addr net.Addr, timeout time.Duration) (*vif.VIF, error) {
 	network, address := addr.Network(), addr.String()
 	if vf := m.vifs.Find(network, address); vf != nil {
 		return vf, nil
 	}
 	vlog.VI(1).Infof("(%q, %q) not in VIF cache. Dialing", network, address)
-	conn, err := dial(network, address)
+	conn, err := dial(network, address, timeout)
 	if err != nil {
 		return nil, fmt.Errorf("net.Dial(%q, %q) failed: %v", network, address, err)
 	}
@@ -104,10 +110,17 @@
 }
 
 func (m *manager) Dial(remote naming.Endpoint, opts ...stream.VCOpt) (stream.VC, error) {
+	var timeout time.Duration
+	for _, o := range opts {
+		switch v := o.(type) {
+		case *DialTimeout:
+			timeout = v.Duration
+		}
+	}
 	// If vif.Dial fails because the cached network connection was broken, remove from
 	// the cache and try once more.
 	for retry := true; true; retry = false {
-		vf, err := m.FindOrDialVIF(remote.Addr())
+		vf, err := m.FindOrDialVIF(remote.Addr(), timeout)
 		if err != nil {
 			return nil, err
 		}
diff --git a/runtimes/google/ipc/stream/manager/manager_test.go b/runtimes/google/ipc/stream/manager/manager_test.go
index 873d882..a4a7b25 100644
--- a/runtimes/google/ipc/stream/manager/manager_test.go
+++ b/runtimes/google/ipc/stream/manager/manager_test.go
@@ -124,6 +124,27 @@
 	}
 }
 
+func TestConnectionTimeout(t *testing.T) {
+	client := InternalNew(naming.FixedRoutingID(0xcccccccc))
+
+	ch := make(chan error)
+	go func() {
+		// 203.0.113.0/24 is TEST-NET-3 from RFC 5737
+		ep, _ := inaming.NewEndpoint(naming.FormatEndpoint("tcp", "203.0.113.10:80"))
+		_, err := client.Dial(ep, &DialTimeout{time.Second})
+		ch <- err
+	}()
+
+	select {
+	case err := <-ch:
+		if err == nil {
+			t.Fatalf("expected an error")
+		}
+	case <-time.After(time.Minute):
+		t.Fatalf("timedout")
+	}
+}
+
 func TestAuthenticatedByDefault(t *testing.T) {
 	var (
 		server = InternalNew(naming.FixedRoutingID(0x55555555))
@@ -473,7 +494,7 @@
 	}
 }
 
-// Required by blackbox framework
+// Needed by modules framework
 func TestHelperProcess(t *testing.T) {
 	modules.DispatchInTest()
 }
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index 6f3de3b..d5abc7a 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -3,6 +3,7 @@
 import (
 	"fmt"
 	"math/rand"
+	"time"
 
 	iipc "veyron.io/veyron/veyron/runtimes/google/ipc"
 	imanager "veyron.io/veyron/veyron/runtimes/google/ipc/stream/manager"
@@ -36,7 +37,11 @@
 		}
 	}
 	// Add the option that provides the runtime's principal to the client.
-	otherOpts = append(otherOpts, vc.LocalPrincipal{rt.principal})
+	// Set a low timeout for now, until we get parallel connections
+	// going.
+	// TODO(cnicolaou): extend the timeout when parallel connections are
+	// going.
+	otherOpts = append(otherOpts, vc.LocalPrincipal{rt.principal}, &imanager.DialTimeout{5 * time.Second})
 	return iipc.InternalNewClient(sm, ns, otherOpts...)
 }