runtimes/google/ipc: client side RPC improvements:

- implement 'ordering' of multiple endpoints.
- implement parallel connection setup

Change-Id: I9e2ba5441b681cd1bd0ef81d0718b61d5d237e35
diff --git a/lib/netstate/netstate.go b/lib/netstate/netstate.go
index ff276b8..e9a61a5 100644
--- a/lib/netstate/netstate.go
+++ b/lib/netstate/netstate.go
@@ -225,7 +225,6 @@
 		return true
 	default:
 		return false
-
 	}
 }
 
@@ -378,7 +377,6 @@
 	if err != nil {
 		return false, err
 	}
-
 	ips := make(map[string]struct{})
 	for _, a := range addrs {
 		ip, _, err := net.ParseCIDR(a.Address().String())
@@ -392,7 +390,6 @@
 	if err != nil {
 		return false, err
 	}
-
 	_, islocal := ips[client]
 	return islocal, nil
 }
diff --git a/profiles/chrome/chrome.go b/profiles/chrome/chrome.go
new file mode 100644
index 0000000..22b5beb
--- /dev/null
+++ b/profiles/chrome/chrome.go
@@ -0,0 +1,49 @@
+// Package chrome implements a profile for use within Chrome, in particular
+// for use by Chrome extensions.
+package chrome
+
+import (
+	"veyron.io/veyron/veyron2"
+	"veyron.io/veyron/veyron2/config"
+	"veyron.io/veyron/veyron2/ipc"
+	"veyron.io/veyron/veyron2/options"
+	"veyron.io/veyron/veyron2/rt"
+
+	"veyron.io/veyron/veyron/profiles"
+)
+
+var ListenSpec = ipc.ListenSpec{}
+
+type chrome struct{}
+
+func init() {
+	rt.RegisterProfile(New())
+}
+
+// New returns a new instance of a Profile for use within chrome, in particular
+// chrome extensions etc should use.
+func New() veyron2.Profile {
+	return &chrome{}
+}
+
+func (*chrome) Name() string {
+	return "chrome"
+}
+
+func (*chrome) Runtime() (string, []veyron2.ROpt) {
+	return veyron2.GoogleRuntimeName, []veyron2.ROpt{options.PreferredProtocols{"ws"}}
+}
+
+func (*chrome) Platform() *veyron2.Platform {
+	p, _ := profiles.Platform()
+	return p
+}
+
+func (g *chrome) Init(rt veyron2.Runtime, _ *config.Publisher) error {
+	rt.Logger().VI(1).Infof("%s", g)
+	return nil
+}
+
+func (g *chrome) String() string {
+	return "chrome profile on " + g.Platform().String()
+}
diff --git a/profiles/gce/init.go b/profiles/gce/init.go
index 4102b78..b5afa26 100644
--- a/profiles/gce/init.go
+++ b/profiles/gce/init.go
@@ -41,8 +41,8 @@
 	return "GCE"
 }
 
-func (p *profile) Runtime() string {
-	return ""
+func (p *profile) Runtime() (string, []veyron2.ROpt) {
+	return "", nil
 }
 
 func (p *profile) Platform() *veyron2.Platform {
diff --git a/profiles/generic.go b/profiles/generic.go
index 0d9dda2..9aadc25 100644
--- a/profiles/generic.go
+++ b/profiles/generic.go
@@ -35,8 +35,8 @@
 	return "generic"
 }
 
-func (*generic) Runtime() string {
-	return veyron2.GoogleRuntimeName
+func (*generic) Runtime() (string, []veyron2.ROpt) {
+	return veyron2.GoogleRuntimeName, nil
 }
 
 func (*generic) Platform() *veyron2.Platform {
diff --git a/profiles/roaming/roaming.go b/profiles/roaming/roaming.go
index 33a1c20..d4c0262 100644
--- a/profiles/roaming/roaming.go
+++ b/profiles/roaming/roaming.go
@@ -61,8 +61,8 @@
 	return "roaming" + p.gce
 }
 
-func (p *profile) Runtime() string {
-	return veyron2.GoogleRuntimeName
+func (p *profile) Runtime() (string, []veyron2.ROpt) {
+	return veyron2.GoogleRuntimeName, nil
 }
 
 func (p *profile) String() string {
diff --git a/profiles/static/static.go b/profiles/static/static.go
index 7ff75b1..377874a 100644
--- a/profiles/static/static.go
+++ b/profiles/static/static.go
@@ -46,8 +46,8 @@
 	return "static" + p.gce
 }
 
-func (p *static) Runtime() string {
-	return "google"
+func (p *static) Runtime() (string, []veyron2.ROpt) {
+	return "google", nil
 }
 
 func (*static) Platform() *veyron2.Platform {
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index ce6e65c..1b72b7a 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -29,6 +29,7 @@
 
 var (
 	errNoServers              = verror.NoExistf("ipc: no servers")
+	errNoAccess               = verror.NoAccessf("ipc: client unwilling to access to server")
 	errFlowClosed             = verror.Abortedf("ipc: flow closed")
 	errRemainingStreamResults = verror.BadProtocolf("ipc: Finish called with remaining streaming results")
 	errNonRootedName          = verror.BadArgf("ipc: cannot connect to a non-rooted name")
@@ -40,9 +41,10 @@
 const enableSecureServerAuth = false
 
 type client struct {
-	streamMgr stream.Manager
-	ns        naming.Namespace
-	vcOpts    []stream.VCOpt // vc opts passed to dial
+	streamMgr          stream.Manager
+	ns                 naming.Namespace
+	vcOpts             []stream.VCOpt // vc opts passed to dial
+	preferredProtocols []string
 
 	// We support concurrent calls to StartCall and Close, so we must protect the
 	// vcMap.  Everything else is initialized upon client construction, and safe
@@ -63,6 +65,7 @@
 }
 
 func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) {
+
 	c := &client{
 		streamMgr: streamMgr,
 		ns:        ns,
@@ -73,16 +76,23 @@
 			c.dc = dc
 		}
 		// Collect all client opts that are also vc opts.
-		if vcOpt, ok := opt.(stream.VCOpt); ok {
-			c.vcOpts = append(c.vcOpts, vcOpt)
+		switch v := opt.(type) {
+		case stream.VCOpt:
+			c.vcOpts = append(c.vcOpts, v)
+		case options.PreferredProtocols:
+			c.preferredProtocols = v
 		}
 	}
+
 	return c, nil
 }
 
 func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) {
 	c.vcMapMu.Lock()
 	defer c.vcMapMu.Unlock()
+	if c.vcMap == nil {
+		return nil, fmt.Errorf("client has been closed")
+	}
 	if vcinfo := c.vcMap[ep.String()]; vcinfo != nil {
 		if flow, err := vcinfo.vc.Connect(); err == nil {
 			return flow, nil
@@ -95,12 +105,17 @@
 		// before removing the vc from the map?
 		delete(c.vcMap, ep.String())
 	}
+	sm := c.streamMgr
 	c.vcMapMu.Unlock()
-	vc, err := c.streamMgr.Dial(ep, c.vcOpts...)
+	vc, err := sm.Dial(ep, c.vcOpts...)
 	c.vcMapMu.Lock()
 	if err != nil {
 		return nil, err
 	}
+	if c.vcMap == nil {
+		sm.ShutdownEndpoint(ep)
+		return nil, fmt.Errorf("client has been closed")
+	}
 	if othervc, exists := c.vcMap[ep.String()]; exists {
 		vc = othervc.vc
 		// TODO(ashankar,toddw): Figure out how to close up the VC that
@@ -253,79 +268,211 @@
 	return nil, lastErr
 }
 
-// tryCall makes a single attempt at a call.
+type serverStatus struct {
+	index     int
+	suffix    string
+	flow      stream.Flow
+	processed bool
+	err       verror.E
+}
+
+func (c *client) tryServer(index int, server string, ch chan<- *serverStatus, done <-chan struct{}) {
+	select {
+	case <-done:
+		return
+	default:
+	}
+	status := &serverStatus{index: index}
+	flow, suffix, err := c.connectFlow(server)
+	if err != nil {
+		vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err)
+		status.err = verror.NoExistf("ipc: %q: %s", server, err)
+		ch <- status
+		return
+	}
+	status.suffix = suffix
+	status.flow = flow
+	select {
+	case <-done:
+		flow.Close()
+	default:
+		ch <- status
+	}
+}
+
+// tryCall makes a single attempt at a call, against possibly multiple servers.
 func (c *client) tryCall(ctx context.T, name, method string, args []interface{}, opts []ipc.CallOpt) (ipc.Call, verror.E) {
 	ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("<client>\"%s\".%s", name, method))
-
 	_, serverPattern, name := splitObjectName(name)
-
 	// Resolve name unless told not to.
 	var servers []string
 	if getNoResolveOpt(opts) {
 		servers = []string{name}
 	} else {
-		var err error
-		if servers, err = c.ns.Resolve(ctx, name); err != nil {
+		if resolved, err := c.ns.Resolve(ctx, name); err != nil {
 			return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err)
+		} else {
+			// An empty set of protocols means all protocols...
+			ordered, err := filterAndOrderServers(resolved, c.preferredProtocols)
+			if len(ordered) == 0 {
+				return nil, verror.NoExistf("ipc: %q: %s", name, err)
+			}
+			servers = ordered
 		}
 	}
+	// servers is now orderd by the priority heurestic implemented in
+	// filterAndOrderServers.
+	attempts := len(servers)
+	if attempts == 0 {
+		return nil, errNoServers
+	}
 
-	// Try all servers, and if none of them are authorized for the call then return the error of the last server
-	// that was tried.
-	var lastErr verror.E
-	// TODO(cnicolaou): sort servers by sensible metric.
-	for _, server := range servers {
-		flow, suffix, err := c.connectFlow(server)
-		if err != nil {
-			lastErr = verror.NoExistf("ipc: couldn't connect to server %v: %v", server, err)
-			vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err)
-			continue // Try the next server.
-		}
-		flow.SetDeadline(ctx.Done())
-		var (
-			serverB  []string
-			grantedB security.Blessings
-		)
-		// LocalPrincipal is nil means that the client wanted to avoid authentication,
-		// and thus wanted to skip authorization as well.
-		if flow.LocalPrincipal() != nil {
-			// Validate caveats on the server's identity for the context associated with this call.
-			if serverB, grantedB, err = c.authorizeServer(flow, name, method, serverPattern, opts); err != nil {
-				lastErr = verror.NoAccessf("ipc: client unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
-				flow.Close()
-				continue
+	// Try to connect to all servers in parallel.
+	responses := make([]*serverStatus, attempts)
+
+	// Provide sufficient buffering for all of the connections to finish
+	// instantaneously. This is important because we want to process
+	// the responses in priority order; that order is indicated by the
+	// order of entries in servers. So, if two respones come in at the
+	// same 'instant', we prefer the first in the slice.
+	ch := make(chan *serverStatus, attempts)
+
+	// Read as many responses as we can before we would block.
+	gatherResponses := func() {
+		for {
+			select {
+			default:
+				return
+			case s := <-ch:
+				responses[s.index] = s
 			}
 		}
-
-		lastErr = nil
-		fc := newFlowClient(ctx, serverB, flow, c.dc)
-
-		if doneChan := ctx.Done(); doneChan != nil {
-			go func() {
-				select {
-				case <-ctx.Done():
-					fc.Cancel()
-				case <-fc.flow.Closed():
-				}
-			}()
-		}
-
-		timeout := time.Duration(ipc.NoTimeout)
-		if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
-			timeout = deadline.Sub(time.Now())
-		}
-		if verr := fc.start(suffix, method, args, timeout, grantedB); verr != nil {
-			return nil, verr
-		}
-		return fc, nil
 	}
-	if lastErr != nil {
-		// If there was any problem starting the call, flush the cache entry under the
-		// assumption that it was caused by stale data.
+
+	delay := time.Duration(ipc.NoTimeout)
+	if dl, set := ctx.Deadline(); set {
+		delay = dl.Sub(time.Now())
+	}
+	timeoutChan := time.After(delay)
+
+	// We'll close this channel when an RPC has been started and we've
+	// irrevocably selected a server.
+	done := make(chan struct{})
+	// Try all of the servers in parallel.
+	for i, server := range servers {
+		go c.tryServer(i, server, ch, done)
+	}
+
+	select {
+	case <-timeoutChan:
+		// All calls failed if we get here.
+		close(done)
 		c.ns.FlushCacheEntry(name)
-		return nil, lastErr
+		return nil, verror.NoExistf("ipc: couldn't connect to server %v", name)
+	case s := <-ch:
+		responses[s.index] = s
+		gatherResponses()
 	}
-	return nil, errNoServers
+
+	accessErrs := []error{}
+	connErrs := []error{}
+	for {
+
+		for _, r := range responses {
+			if r == nil || r.err != nil {
+				if r != nil && r.err != nil && !r.processed {
+					connErrs = append(connErrs, r.err)
+					r.processed = true
+				}
+				continue
+			}
+
+			flow := r.flow
+			suffix := r.suffix
+			flow.SetDeadline(ctx.Done())
+
+			var (
+				serverB  []string
+				grantedB security.Blessings
+			)
+
+			// LocalPrincipal is nil means that the client wanted to avoid
+			// authentication, and thus wanted to skip authorization as well.
+			if flow.LocalPrincipal() != nil {
+				// Validate caveats on the server's identity for the context associated with this call.
+				var err error
+				if serverB, grantedB, err = c.authorizeServer(flow, name, method, serverPattern, opts); err != nil {
+					vlog.VI(2).Infof("ipc: client unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
+					if !r.processed {
+						accessErrs = append(accessErrs, err)
+						r.err = verror.NoAccessf("ipc: unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
+						r.processed = true
+					}
+					flow.Close()
+					continue
+				}
+			}
+
+			// This is the 'point of no return', so we tell the tryServer
+			// goroutines to not bother sending us any more flows.
+			// Once the RPC is started (fc.start below) we can't be sure
+			// if it makes it to the server or not so, this code will
+			// never call fc.start more than once to ensure that we
+			// provide 'at-most-once' rpc semantics at this level. Retrying
+			// the network connections (i.e. creating flows) is fine since
+			// we can cleanup that state if we abort a call (i.e. close the
+			// flow).
+			close(done)
+
+			fc := newFlowClient(ctx, serverB, flow, c.dc)
+
+			if doneChan := ctx.Done(); doneChan != nil {
+				go func() {
+					select {
+					case <-ctx.Done():
+						fc.Cancel()
+					case <-fc.flow.Closed():
+					}
+				}()
+			}
+
+			timeout := time.Duration(ipc.NoTimeout)
+			if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
+				timeout = deadline.Sub(time.Now())
+			}
+			if verr := fc.start(suffix, method, args, timeout, grantedB); verr != nil {
+				return nil, verr
+			}
+			return fc, nil
+		}
+
+		// Quit if we've seen an error from all parallel connection attempts
+		handled := 0
+		for _, r := range responses {
+			if r != nil && r.err != nil {
+				handled++
+			}
+		}
+		if handled == len(responses) {
+			break
+		}
+
+		select {
+		case <-timeoutChan:
+			// All remaining calls failed if we get here.
+			vlog.VI(2).Infof("ipc: couldn't connect to server %v", name)
+			goto quit
+		case s := <-ch:
+			responses[s.index] = s
+			gatherResponses()
+		}
+	}
+quit:
+	close(done)
+	c.ns.FlushCacheEntry(name)
+	// TODO(cnicolaou): introduce a third error code here for mixed
+	// conn/access errors.
+	return nil, verror.NoExistf("ipc: client failed to invoke  %q.%q: on %v", name, method, servers, append(connErrs, accessErrs...))
 }
 
 // authorizeServer validates that the server (remote end of flow) has the credentials to serve
diff --git a/runtimes/google/ipc/sort_endpoints.go b/runtimes/google/ipc/sort_endpoints.go
new file mode 100644
index 0000000..ab9f730
--- /dev/null
+++ b/runtimes/google/ipc/sort_endpoints.go
@@ -0,0 +1,227 @@
+package ipc
+
+import (
+	"fmt"
+	"net"
+	"strings"
+
+	"veyron.io/veyron/veyron2/naming"
+	"veyron.io/veyron/veyron2/vlog"
+
+	"veyron.io/veyron/veyron/lib/netstate"
+	"veyron.io/veyron/veyron/runtimes/google/ipc/version"
+	inaming "veyron.io/veyron/veyron/runtimes/google/naming"
+)
+
+type errorAccumulator struct {
+	errs []error
+}
+
+func (e *errorAccumulator) add(err error) {
+	e.errs = append(e.errs, err)
+}
+
+func (e *errorAccumulator) failed() bool {
+	return len(e.errs) > 0
+}
+
+func (e *errorAccumulator) String() string {
+	r := ""
+	for _, err := range e.errs {
+		r += fmt.Sprintf("(%s)", err)
+	}
+	return r
+}
+
+// TODO(cnicolaou): simplify this code, especially the use of maps+slices
+// and special cases.
+
+func newErrorAccumulator() *errorAccumulator {
+	return &errorAccumulator{errs: make([]error, 0, 4)}
+}
+
+type serverEndpoint struct {
+	iep    *inaming.Endpoint
+	suffix string
+}
+
+func (se *serverEndpoint) String() string {
+	return fmt.Sprintf("(%s, %q)", se.iep, se.suffix)
+}
+
+func filterCompatibleEndpoints(errs *errorAccumulator, servers []string) []*serverEndpoint {
+	se := make([]*serverEndpoint, 0, len(servers))
+	for _, server := range servers {
+		name := server
+		address, suffix := naming.SplitAddressName(name)
+		if len(address) == 0 {
+			errs.add(fmt.Errorf("%q is not a rooted name", name))
+			continue
+		}
+		iep, err := inaming.NewEndpoint(address)
+		if err != nil {
+			errs.add(fmt.Errorf("%q: %s", name, err))
+			continue
+		}
+		if err = version.CheckCompatibility(iep); err != nil {
+			errs.add(fmt.Errorf("%q: %s", name, err))
+			continue
+		}
+		sep := &serverEndpoint{iep, suffix}
+		se = append(se, sep)
+	}
+	return se
+}
+
+func sortByProtocol(eps []*serverEndpoint) map[string][]*serverEndpoint {
+	byProtocol := make(map[string][]*serverEndpoint)
+	for _, ep := range eps {
+		p := ep.iep.Protocol
+		byProtocol[p] = append(byProtocol[p], ep)
+	}
+	return byProtocol
+}
+
+func unmatchedProtocols(hashed map[string][]*serverEndpoint, protocols []string) []*serverEndpoint {
+	unmatched := make([]*serverEndpoint, 0, 10)
+	for p, eps := range hashed {
+		found := false
+		for _, preferred := range protocols {
+			if p == preferred {
+				found = true
+				break
+			}
+		}
+		if !found {
+			unmatched = append(unmatched, eps...)
+		}
+	}
+	return unmatched
+}
+
+func orderByLocality(ifcs netstate.AddrList, eps []*serverEndpoint) []*serverEndpoint {
+	if len(ifcs) <= 1 {
+		return append([]*serverEndpoint{}, eps...)
+	}
+	ipnets := make([]*net.IPNet, 0, len(ifcs))
+	for _, a := range ifcs {
+		// Try IP
+		_, ipnet, err := net.ParseCIDR(a.Address().String())
+		if err != nil {
+			continue
+		}
+		ipnets = append(ipnets, ipnet)
+	}
+	if len(ipnets) == 0 {
+		return eps
+	}
+	// TODO(cnicolaou): this can obviously be made more efficient...
+	local := make([]*serverEndpoint, 0, len(eps))
+	remote := make([]*serverEndpoint, 0, len(eps))
+	notip := make([]*serverEndpoint, 0, len(eps))
+	for _, ep := range eps {
+		if strings.HasPrefix(ep.iep.Protocol, "tcp") || strings.HasPrefix(ep.iep.Protocol, "ws") {
+			// Take care to use the Address directly, since the network
+			// may be marked as a 'websocket'. This throws out any thought
+			// of dealing with IPv6 etc and web sockets.
+			host, _, err := net.SplitHostPort(ep.iep.Address)
+			if err != nil {
+				host = ep.iep.Address
+			}
+			ip := net.ParseIP(host)
+			if ip == nil {
+				notip = append(notip, ep)
+				continue
+			}
+			found := false
+			for _, ipnet := range ipnets {
+				if ipnet.Contains(ip) {
+					local = append(local, ep)
+					found = true
+					break
+				}
+			}
+			if !found {
+				remote = append(remote, ep)
+			}
+		} else {
+			notip = append(notip, ep)
+		}
+	}
+	return append(local, append(remote, notip...)...)
+}
+
+func slice(eps []*serverEndpoint) []string {
+	r := make([]string, len(eps))
+	for i, a := range eps {
+		r[i] = naming.JoinAddressName(a.iep.String(), a.suffix)
+	}
+	return r
+}
+
+func sliceByProtocol(eps map[string][]*serverEndpoint, protocols []string) []string {
+	r := make([]string, 0, 10)
+	for _, p := range protocols {
+		r = append(r, slice(eps[p])...)
+	}
+	return r
+}
+
+var defaultPreferredProtocolOrder = []string{"unixfd", "tcp4", "tcp", "tcp6"}
+
+func filterAndOrderServers(servers []string, protocols []string) ([]string, error) {
+	errs := newErrorAccumulator()
+	vlog.VI(3).Infof("Candidates[%v]: %v", protocols, servers)
+	compatible := filterCompatibleEndpoints(errs, servers)
+	if len(compatible) == 0 {
+		return nil, fmt.Errorf("failed to find any compatible servers: %s", errs)
+	}
+	vlog.VI(3).Infof("Version Compatible: %v", compatible)
+
+	// put the server endpoints into per-protocol lists
+	byProtocol := sortByProtocol(compatible)
+
+	if len(protocols) > 0 {
+		found := 0
+		for _, p := range protocols {
+			found += len(byProtocol[p])
+		}
+		if found == 0 {
+			return nil, fmt.Errorf("failed to find any servers compatible with %v from %s", protocols, servers)
+		}
+	}
+
+	// If a set of protocols is specified, then we will order
+	// and return endpoints that contain those protocols alone.
+	// However, if no protocols are supplied we'll order by
+	// a default ordering but append any endpoints that don't belong
+	// to that default ordering set to the returned endpoints.
+	remaining := []*serverEndpoint{}
+	preferredProtocolOrder := defaultPreferredProtocolOrder
+	if len(protocols) > 0 {
+		preferredProtocolOrder = protocols
+	} else {
+		remaining = unmatchedProtocols(byProtocol, preferredProtocolOrder)
+	}
+
+	vlog.VI(3).Infof("Have Protocols(%v): %v", protocols, byProtocol)
+
+	networks, err := netstate.GetAll()
+	if err != nil {
+		r := sliceByProtocol(byProtocol, preferredProtocolOrder)
+		r = append(r, slice(remaining)...)
+		return r, nil
+	}
+
+	ordered := make([]*serverEndpoint, 0, len(byProtocol))
+	for _, protocol := range preferredProtocolOrder {
+		o := orderByLocality(networks, byProtocol[protocol])
+		ordered = append(ordered, o...)
+	}
+
+	if len(protocols) == 0 {
+		ordered = append(ordered, remaining...)
+	}
+	vlog.VI(2).Infof("Ordered By Locality: %v", ordered)
+	return slice(ordered), nil
+}
diff --git a/runtimes/google/ipc/sort_internal_test.go b/runtimes/google/ipc/sort_internal_test.go
new file mode 100644
index 0000000..5b9b4cf
--- /dev/null
+++ b/runtimes/google/ipc/sort_internal_test.go
@@ -0,0 +1,169 @@
+package ipc
+
+import (
+	"reflect"
+	"strings"
+	"testing"
+
+	"veyron.io/veyron/veyron2/ipc/version"
+	"veyron.io/veyron/veyron2/naming"
+	"veyron.io/veyron/veyron2/vlog"
+)
+
+func TestIncompatible(t *testing.T) {
+	servers := []string{}
+
+	_, err := filterAndOrderServers(servers, []string{"tcp"})
+	if err == nil || err.Error() != "failed to find any compatible servers: " {
+		t.Errorf("expected a different error: %v", err)
+	}
+
+	for _, a := range []string{"127.0.0.1", "127.0.0.2"} {
+		addr := naming.FormatEndpoint("tcp", a, version.IPCVersionRange{100, 200})
+		name := naming.JoinAddressName(addr, "")
+		servers = append(servers, name)
+	}
+
+	_, err = filterAndOrderServers(servers, []string{"tcp"})
+	if err == nil || (!strings.HasPrefix(err.Error(), "failed to find any compatible servers:") && !strings.Contains(err.Error(), "No compatible IPC versions available")) {
+		vlog.Infof("A: %t . %t", strings.HasPrefix(err.Error(), "failed to find any compatible servers:"), !strings.Contains(err.Error(), "No compatible IPC versions available"))
+		t.Errorf("expected a different error to: %v", err)
+	}
+
+	for _, a := range []string{"127.0.0.3", "127.0.0.4"} {
+		name := naming.JoinAddressName(naming.FormatEndpoint("tcp", a), "")
+		servers = append(servers, name)
+	}
+
+	_, err = filterAndOrderServers(servers, []string{"foobar"})
+	if err == nil || !strings.HasPrefix(err.Error(), "failed to find any servers compatible with [foobar] ") {
+		t.Errorf("expected a different error to: %v", err)
+	}
+
+}
+
+func TestOrderingByProtocol(t *testing.T) {
+	servers := []string{}
+	for _, a := range []string{"127.0.0.3", "127.0.0.4"} {
+		name := naming.JoinAddressName(naming.FormatEndpoint("tcp", a), "")
+		servers = append(servers, name)
+	}
+	for _, a := range []string{"127.0.0.1", "127.0.0.2"} {
+		name := naming.JoinAddressName(naming.FormatEndpoint("tcp4", a), "")
+		servers = append(servers, name)
+	}
+	for _, a := range []string{"127.0.0.10", "127.0.0.11"} {
+		name := naming.JoinAddressName(naming.FormatEndpoint("foobar", a), "")
+		servers = append(servers, name)
+	}
+
+	got, err := filterAndOrderServers(servers, []string{"foobar", "tcp"})
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+
+	// Just foobar and tcp
+	want := []string{
+		"/@2@foobar@127.0.0.10@00000000000000000000000000000000@@@@",
+		"/@2@foobar@127.0.0.11@00000000000000000000000000000000@@@@",
+		"/@2@tcp@127.0.0.3@00000000000000000000000000000000@@@@",
+		"/@2@tcp@127.0.0.4@00000000000000000000000000000000@@@@",
+	}
+	if !reflect.DeepEqual(got, want) {
+		t.Errorf("got: %v, want %v", got, want)
+	}
+
+	// Everything, since we didn't specify a protocol
+	got, err = filterAndOrderServers(servers, []string{})
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	// The order will be the default preferred order for protocols, the
+	// original ordering within each protocol, with protocols that
+	// are not in the default ordering list at the end.
+	want = []string{
+		"/@2@tcp4@127.0.0.1@00000000000000000000000000000000@@@@",
+		"/@2@tcp4@127.0.0.2@00000000000000000000000000000000@@@@",
+		"/@2@tcp@127.0.0.3@00000000000000000000000000000000@@@@",
+		"/@2@tcp@127.0.0.4@00000000000000000000000000000000@@@@",
+		"/@2@foobar@127.0.0.10@00000000000000000000000000000000@@@@",
+		"/@2@foobar@127.0.0.11@00000000000000000000000000000000@@@@",
+	}
+	if !reflect.DeepEqual(got, want) {
+		t.Errorf("got: %v, want %v", got, want)
+	}
+
+	// Ask for all protocols, with no ordering, except for locality
+	servers = []string{}
+	for _, a := range []string{"74.125.69.139", "127.0.0.3", "127.0.0.1", "192.168.1.10", "74.125.142.83"} {
+		name := naming.JoinAddressName(naming.FormatEndpoint("tcp", a), "")
+		servers = append(servers, name)
+	}
+	for _, a := range []string{"127.0.0.10", "127.0.0.11"} {
+		name := naming.JoinAddressName(naming.FormatEndpoint("foobar", a), "")
+		servers = append(servers, name)
+	}
+	// Everything, since we didn't specify a protocol
+	got, err = filterAndOrderServers(servers, []string{})
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	want = []string{
+		"/@2@tcp@127.0.0.3@00000000000000000000000000000000@@@@",
+		"/@2@tcp@127.0.0.1@00000000000000000000000000000000@@@@",
+		"/@2@tcp@74.125.69.139@00000000000000000000000000000000@@@@",
+		"/@2@tcp@192.168.1.10@00000000000000000000000000000000@@@@",
+		"/@2@tcp@74.125.142.83@00000000000000000000000000000000@@@@",
+		"/@2@foobar@127.0.0.10@00000000000000000000000000000000@@@@",
+		"/@2@foobar@127.0.0.11@00000000000000000000000000000000@@@@",
+	}
+	if !reflect.DeepEqual(got, want) {
+		t.Errorf("got: %v, want %v", got, want)
+	}
+}
+
+func TestOrderByNetwork(t *testing.T) {
+	servers := []string{}
+	for _, a := range []string{"74.125.69.139", "127.0.0.3", "127.0.0.1", "192.168.1.10", "74.125.142.83"} {
+		name := naming.JoinAddressName(naming.FormatEndpoint("tcp", a), "")
+		servers = append(servers, name)
+	}
+	got, err := filterAndOrderServers(servers, []string{"tcp"})
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	want := []string{
+		"/@2@tcp@127.0.0.3@00000000000000000000000000000000@@@@",
+		"/@2@tcp@127.0.0.1@00000000000000000000000000000000@@@@",
+		"/@2@tcp@74.125.69.139@00000000000000000000000000000000@@@@",
+		"/@2@tcp@192.168.1.10@00000000000000000000000000000000@@@@",
+		"/@2@tcp@74.125.142.83@00000000000000000000000000000000@@@@",
+	}
+	if !reflect.DeepEqual(got, want) {
+		t.Errorf("got: %v, want %v", got, want)
+	}
+	for _, a := range []string{"74.125.69.139", "127.0.0.3:123", "127.0.0.1", "192.168.1.10", "74.125.142.83"} {
+		name := naming.JoinAddressName(naming.FormatEndpoint("ws", a), "")
+		servers = append(servers, name)
+	}
+
+	got, err = filterAndOrderServers(servers, []string{"ws", "tcp"})
+	if err != nil {
+		t.Fatalf("unexpected error: %s", err)
+	}
+	want = []string{
+		"/@2@ws@127.0.0.3:123@00000000000000000000000000000000@@@@",
+		"/@2@ws@127.0.0.1@00000000000000000000000000000000@@@@",
+		"/@2@ws@74.125.69.139@00000000000000000000000000000000@@@@",
+		"/@2@ws@192.168.1.10@00000000000000000000000000000000@@@@",
+		"/@2@ws@74.125.142.83@00000000000000000000000000000000@@@@",
+		"/@2@tcp@127.0.0.3@00000000000000000000000000000000@@@@",
+		"/@2@tcp@127.0.0.1@00000000000000000000000000000000@@@@",
+		"/@2@tcp@74.125.69.139@00000000000000000000000000000000@@@@",
+		"/@2@tcp@192.168.1.10@00000000000000000000000000000000@@@@",
+		"/@2@tcp@74.125.142.83@00000000000000000000000000000000@@@@",
+	}
+	if !reflect.DeepEqual(got, want) {
+		t.Errorf("got: %v, want %v", got, want)
+	}
+}
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index bbccfd4..9214839 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -36,12 +36,7 @@
 			otherOpts = append(otherOpts, opt)
 		}
 	}
-	// Add the option that provides the runtime's principal to the client.
-	// Set a low timeout for now, until we get parallel connections
-	// going.
-	// TODO(cnicolaou): extend the timeout when parallel connections are
-	// going.
-	otherOpts = append(otherOpts, vc.LocalPrincipal{rt.principal}, &imanager.DialTimeout{5 * time.Second})
+	otherOpts = append(otherOpts, vc.LocalPrincipal{rt.principal}, &imanager.DialTimeout{5 * time.Minute}, rt.preferredProtocols)
 
 	dc, err := iipc.InternalNewDischargeClient(sm, ns, rt.NewContext(), otherOpts...)
 	if err != nil {
diff --git a/runtimes/google/rt/rt.go b/runtimes/google/rt/rt.go
index 4b02edf..80b6e61 100644
--- a/runtimes/google/rt/rt.go
+++ b/runtimes/google/rt/rt.go
@@ -32,19 +32,20 @@
 type vrt struct {
 	mu sync.Mutex
 
-	profile      veyron2.Profile
-	publisher    *config.Publisher
-	sm           []stream.Manager // GUARDED_BY(mu)
-	ns           naming.Namespace
-	signals      chan os.Signal
-	principal    security.Principal
-	client       ipc.Client
-	mgmt         *mgmtImpl
-	flags        flags.RuntimeFlags
-	reservedDisp ipc.Dispatcher
-	reservedOpts []ipc.ServerOpt
-	nServers     int  // GUARDED_BY(mu)
-	cleaningUp   bool // GUARDED_BY(mu)
+	profile            veyron2.Profile
+	publisher          *config.Publisher
+	sm                 []stream.Manager // GUARDED_BY(mu)
+	ns                 naming.Namespace
+	signals            chan os.Signal
+	principal          security.Principal
+	client             ipc.Client
+	mgmt               *mgmtImpl
+	flags              flags.RuntimeFlags
+	preferredProtocols options.PreferredProtocols
+	reservedDisp       ipc.Dispatcher
+	reservedOpts       []ipc.ServerOpt
+	nServers           int  // GUARDED_BY(mu)
+	cleaningUp         bool // GUARDED_BY(mu)
 
 	lang       i18n.LangID    // Language, from environment variables.
 	program    string         // Program name, from os.Args[0].
@@ -81,6 +82,8 @@
 			rt.principal = v.Principal
 		case options.Profile:
 			rt.profile = v.Profile
+		case options.PreferredProtocols:
+			rt.preferredProtocols = v
 		default:
 			return nil, fmt.Errorf("option has wrong type %T", o)
 		}
diff --git a/runtimes/google/rt/signal_test.go b/runtimes/google/rt/signal_test.go
index e698e77..f478d6d 100644
--- a/runtimes/google/rt/signal_test.go
+++ b/runtimes/google/rt/signal_test.go
@@ -30,8 +30,8 @@
 	return "test"
 }
 
-func (mp *myprofile) Runtime() string {
-	return "google"
+func (mp *myprofile) Runtime() (string, []veyron2.ROpt) {
+	return "google", nil
 }
 
 func (mp *myprofile) Platform() *veyron2.Platform {