runtimes/google/ipc: client side RPC improvements:

- implement 'ordering' of multiple endpoints.
- implement parallel connection setup

Change-Id: I9e2ba5441b681cd1bd0ef81d0718b61d5d237e35
diff --git a/runtimes/google/ipc/sort_endpoints.go b/runtimes/google/ipc/sort_endpoints.go
new file mode 100644
index 0000000..ab9f730
--- /dev/null
+++ b/runtimes/google/ipc/sort_endpoints.go
@@ -0,0 +1,227 @@
+package ipc
+
+import (
+	"fmt"
+	"net"
+	"strings"
+
+	"veyron.io/veyron/veyron2/naming"
+	"veyron.io/veyron/veyron2/vlog"
+
+	"veyron.io/veyron/veyron/lib/netstate"
+	"veyron.io/veyron/veyron/runtimes/google/ipc/version"
+	inaming "veyron.io/veyron/veyron/runtimes/google/naming"
+)
+
+type errorAccumulator struct {
+	errs []error
+}
+
+func (e *errorAccumulator) add(err error) {
+	e.errs = append(e.errs, err)
+}
+
+func (e *errorAccumulator) failed() bool {
+	return len(e.errs) > 0
+}
+
+func (e *errorAccumulator) String() string {
+	r := ""
+	for _, err := range e.errs {
+		r += fmt.Sprintf("(%s)", err)
+	}
+	return r
+}
+
+// TODO(cnicolaou): simplify this code, especially the use of maps+slices
+// and special cases.
+
+func newErrorAccumulator() *errorAccumulator {
+	return &errorAccumulator{errs: make([]error, 0, 4)}
+}
+
+type serverEndpoint struct {
+	iep    *inaming.Endpoint
+	suffix string
+}
+
+func (se *serverEndpoint) String() string {
+	return fmt.Sprintf("(%s, %q)", se.iep, se.suffix)
+}
+
+func filterCompatibleEndpoints(errs *errorAccumulator, servers []string) []*serverEndpoint {
+	se := make([]*serverEndpoint, 0, len(servers))
+	for _, server := range servers {
+		name := server
+		address, suffix := naming.SplitAddressName(name)
+		if len(address) == 0 {
+			errs.add(fmt.Errorf("%q is not a rooted name", name))
+			continue
+		}
+		iep, err := inaming.NewEndpoint(address)
+		if err != nil {
+			errs.add(fmt.Errorf("%q: %s", name, err))
+			continue
+		}
+		if err = version.CheckCompatibility(iep); err != nil {
+			errs.add(fmt.Errorf("%q: %s", name, err))
+			continue
+		}
+		sep := &serverEndpoint{iep, suffix}
+		se = append(se, sep)
+	}
+	return se
+}
+
+func sortByProtocol(eps []*serverEndpoint) map[string][]*serverEndpoint {
+	byProtocol := make(map[string][]*serverEndpoint)
+	for _, ep := range eps {
+		p := ep.iep.Protocol
+		byProtocol[p] = append(byProtocol[p], ep)
+	}
+	return byProtocol
+}
+
+func unmatchedProtocols(hashed map[string][]*serverEndpoint, protocols []string) []*serverEndpoint {
+	unmatched := make([]*serverEndpoint, 0, 10)
+	for p, eps := range hashed {
+		found := false
+		for _, preferred := range protocols {
+			if p == preferred {
+				found = true
+				break
+			}
+		}
+		if !found {
+			unmatched = append(unmatched, eps...)
+		}
+	}
+	return unmatched
+}
+
+func orderByLocality(ifcs netstate.AddrList, eps []*serverEndpoint) []*serverEndpoint {
+	if len(ifcs) <= 1 {
+		return append([]*serverEndpoint{}, eps...)
+	}
+	ipnets := make([]*net.IPNet, 0, len(ifcs))
+	for _, a := range ifcs {
+		// Try IP
+		_, ipnet, err := net.ParseCIDR(a.Address().String())
+		if err != nil {
+			continue
+		}
+		ipnets = append(ipnets, ipnet)
+	}
+	if len(ipnets) == 0 {
+		return eps
+	}
+	// TODO(cnicolaou): this can obviously be made more efficient...
+	local := make([]*serverEndpoint, 0, len(eps))
+	remote := make([]*serverEndpoint, 0, len(eps))
+	notip := make([]*serverEndpoint, 0, len(eps))
+	for _, ep := range eps {
+		if strings.HasPrefix(ep.iep.Protocol, "tcp") || strings.HasPrefix(ep.iep.Protocol, "ws") {
+			// Take care to use the Address directly, since the network
+			// may be marked as a 'websocket'. This throws out any thought
+			// of dealing with IPv6 etc and web sockets.
+			host, _, err := net.SplitHostPort(ep.iep.Address)
+			if err != nil {
+				host = ep.iep.Address
+			}
+			ip := net.ParseIP(host)
+			if ip == nil {
+				notip = append(notip, ep)
+				continue
+			}
+			found := false
+			for _, ipnet := range ipnets {
+				if ipnet.Contains(ip) {
+					local = append(local, ep)
+					found = true
+					break
+				}
+			}
+			if !found {
+				remote = append(remote, ep)
+			}
+		} else {
+			notip = append(notip, ep)
+		}
+	}
+	return append(local, append(remote, notip...)...)
+}
+
+func slice(eps []*serverEndpoint) []string {
+	r := make([]string, len(eps))
+	for i, a := range eps {
+		r[i] = naming.JoinAddressName(a.iep.String(), a.suffix)
+	}
+	return r
+}
+
+func sliceByProtocol(eps map[string][]*serverEndpoint, protocols []string) []string {
+	r := make([]string, 0, 10)
+	for _, p := range protocols {
+		r = append(r, slice(eps[p])...)
+	}
+	return r
+}
+
+var defaultPreferredProtocolOrder = []string{"unixfd", "tcp4", "tcp", "tcp6"}
+
+func filterAndOrderServers(servers []string, protocols []string) ([]string, error) {
+	errs := newErrorAccumulator()
+	vlog.VI(3).Infof("Candidates[%v]: %v", protocols, servers)
+	compatible := filterCompatibleEndpoints(errs, servers)
+	if len(compatible) == 0 {
+		return nil, fmt.Errorf("failed to find any compatible servers: %s", errs)
+	}
+	vlog.VI(3).Infof("Version Compatible: %v", compatible)
+
+	// put the server endpoints into per-protocol lists
+	byProtocol := sortByProtocol(compatible)
+
+	if len(protocols) > 0 {
+		found := 0
+		for _, p := range protocols {
+			found += len(byProtocol[p])
+		}
+		if found == 0 {
+			return nil, fmt.Errorf("failed to find any servers compatible with %v from %s", protocols, servers)
+		}
+	}
+
+	// If a set of protocols is specified, then we will order
+	// and return endpoints that contain those protocols alone.
+	// However, if no protocols are supplied we'll order by
+	// a default ordering but append any endpoints that don't belong
+	// to that default ordering set to the returned endpoints.
+	remaining := []*serverEndpoint{}
+	preferredProtocolOrder := defaultPreferredProtocolOrder
+	if len(protocols) > 0 {
+		preferredProtocolOrder = protocols
+	} else {
+		remaining = unmatchedProtocols(byProtocol, preferredProtocolOrder)
+	}
+
+	vlog.VI(3).Infof("Have Protocols(%v): %v", protocols, byProtocol)
+
+	networks, err := netstate.GetAll()
+	if err != nil {
+		r := sliceByProtocol(byProtocol, preferredProtocolOrder)
+		r = append(r, slice(remaining)...)
+		return r, nil
+	}
+
+	ordered := make([]*serverEndpoint, 0, len(byProtocol))
+	for _, protocol := range preferredProtocolOrder {
+		o := orderByLocality(networks, byProtocol[protocol])
+		ordered = append(ordered, o...)
+	}
+
+	if len(protocols) == 0 {
+		ordered = append(ordered, remaining...)
+	}
+	vlog.VI(2).Infof("Ordered By Locality: %v", ordered)
+	return slice(ordered), nil
+}