runtimes/google/ipc: client side RPC improvements:
- implement 'ordering' of multiple endpoints.
- implement parallel connection setup
Change-Id: I9e2ba5441b681cd1bd0ef81d0718b61d5d237e35
diff --git a/lib/netstate/netstate.go b/lib/netstate/netstate.go
index ff276b8..e9a61a5 100644
--- a/lib/netstate/netstate.go
+++ b/lib/netstate/netstate.go
@@ -225,7 +225,6 @@
return true
default:
return false
-
}
}
@@ -378,7 +377,6 @@
if err != nil {
return false, err
}
-
ips := make(map[string]struct{})
for _, a := range addrs {
ip, _, err := net.ParseCIDR(a.Address().String())
@@ -392,7 +390,6 @@
if err != nil {
return false, err
}
-
_, islocal := ips[client]
return islocal, nil
}
diff --git a/profiles/chrome/chrome.go b/profiles/chrome/chrome.go
new file mode 100644
index 0000000..22b5beb
--- /dev/null
+++ b/profiles/chrome/chrome.go
@@ -0,0 +1,49 @@
+// Package chrome implements a profile for use within Chrome, in particular
+// for use by Chrome extensions.
+package chrome
+
+import (
+ "veyron.io/veyron/veyron2"
+ "veyron.io/veyron/veyron2/config"
+ "veyron.io/veyron/veyron2/ipc"
+ "veyron.io/veyron/veyron2/options"
+ "veyron.io/veyron/veyron2/rt"
+
+ "veyron.io/veyron/veyron/profiles"
+)
+
+var ListenSpec = ipc.ListenSpec{}
+
+type chrome struct{}
+
+func init() {
+ rt.RegisterProfile(New())
+}
+
+// New returns a new instance of a Profile for use within chrome, in particular
+// chrome extensions etc should use.
+func New() veyron2.Profile {
+ return &chrome{}
+}
+
+func (*chrome) Name() string {
+ return "chrome"
+}
+
+func (*chrome) Runtime() (string, []veyron2.ROpt) {
+ return veyron2.GoogleRuntimeName, []veyron2.ROpt{options.PreferredProtocols{"ws"}}
+}
+
+func (*chrome) Platform() *veyron2.Platform {
+ p, _ := profiles.Platform()
+ return p
+}
+
+func (g *chrome) Init(rt veyron2.Runtime, _ *config.Publisher) error {
+ rt.Logger().VI(1).Infof("%s", g)
+ return nil
+}
+
+func (g *chrome) String() string {
+ return "chrome profile on " + g.Platform().String()
+}
diff --git a/profiles/gce/init.go b/profiles/gce/init.go
index 4102b78..b5afa26 100644
--- a/profiles/gce/init.go
+++ b/profiles/gce/init.go
@@ -41,8 +41,8 @@
return "GCE"
}
-func (p *profile) Runtime() string {
- return ""
+func (p *profile) Runtime() (string, []veyron2.ROpt) {
+ return "", nil
}
func (p *profile) Platform() *veyron2.Platform {
diff --git a/profiles/generic.go b/profiles/generic.go
index 0d9dda2..9aadc25 100644
--- a/profiles/generic.go
+++ b/profiles/generic.go
@@ -35,8 +35,8 @@
return "generic"
}
-func (*generic) Runtime() string {
- return veyron2.GoogleRuntimeName
+func (*generic) Runtime() (string, []veyron2.ROpt) {
+ return veyron2.GoogleRuntimeName, nil
}
func (*generic) Platform() *veyron2.Platform {
diff --git a/profiles/roaming/roaming.go b/profiles/roaming/roaming.go
index 33a1c20..d4c0262 100644
--- a/profiles/roaming/roaming.go
+++ b/profiles/roaming/roaming.go
@@ -61,8 +61,8 @@
return "roaming" + p.gce
}
-func (p *profile) Runtime() string {
- return veyron2.GoogleRuntimeName
+func (p *profile) Runtime() (string, []veyron2.ROpt) {
+ return veyron2.GoogleRuntimeName, nil
}
func (p *profile) String() string {
diff --git a/profiles/static/static.go b/profiles/static/static.go
index 7ff75b1..377874a 100644
--- a/profiles/static/static.go
+++ b/profiles/static/static.go
@@ -46,8 +46,8 @@
return "static" + p.gce
}
-func (p *static) Runtime() string {
- return "google"
+func (p *static) Runtime() (string, []veyron2.ROpt) {
+ return "google", nil
}
func (*static) Platform() *veyron2.Platform {
diff --git a/runtimes/google/ipc/client.go b/runtimes/google/ipc/client.go
index ce6e65c..1b72b7a 100644
--- a/runtimes/google/ipc/client.go
+++ b/runtimes/google/ipc/client.go
@@ -29,6 +29,7 @@
var (
errNoServers = verror.NoExistf("ipc: no servers")
+ errNoAccess = verror.NoAccessf("ipc: client unwilling to access to server")
errFlowClosed = verror.Abortedf("ipc: flow closed")
errRemainingStreamResults = verror.BadProtocolf("ipc: Finish called with remaining streaming results")
errNonRootedName = verror.BadArgf("ipc: cannot connect to a non-rooted name")
@@ -40,9 +41,10 @@
const enableSecureServerAuth = false
type client struct {
- streamMgr stream.Manager
- ns naming.Namespace
- vcOpts []stream.VCOpt // vc opts passed to dial
+ streamMgr stream.Manager
+ ns naming.Namespace
+ vcOpts []stream.VCOpt // vc opts passed to dial
+ preferredProtocols []string
// We support concurrent calls to StartCall and Close, so we must protect the
// vcMap. Everything else is initialized upon client construction, and safe
@@ -63,6 +65,7 @@
}
func InternalNewClient(streamMgr stream.Manager, ns naming.Namespace, opts ...ipc.ClientOpt) (ipc.Client, error) {
+
c := &client{
streamMgr: streamMgr,
ns: ns,
@@ -73,16 +76,23 @@
c.dc = dc
}
// Collect all client opts that are also vc opts.
- if vcOpt, ok := opt.(stream.VCOpt); ok {
- c.vcOpts = append(c.vcOpts, vcOpt)
+ switch v := opt.(type) {
+ case stream.VCOpt:
+ c.vcOpts = append(c.vcOpts, v)
+ case options.PreferredProtocols:
+ c.preferredProtocols = v
}
}
+
return c, nil
}
func (c *client) createFlow(ep naming.Endpoint) (stream.Flow, error) {
c.vcMapMu.Lock()
defer c.vcMapMu.Unlock()
+ if c.vcMap == nil {
+ return nil, fmt.Errorf("client has been closed")
+ }
if vcinfo := c.vcMap[ep.String()]; vcinfo != nil {
if flow, err := vcinfo.vc.Connect(); err == nil {
return flow, nil
@@ -95,12 +105,17 @@
// before removing the vc from the map?
delete(c.vcMap, ep.String())
}
+ sm := c.streamMgr
c.vcMapMu.Unlock()
- vc, err := c.streamMgr.Dial(ep, c.vcOpts...)
+ vc, err := sm.Dial(ep, c.vcOpts...)
c.vcMapMu.Lock()
if err != nil {
return nil, err
}
+ if c.vcMap == nil {
+ sm.ShutdownEndpoint(ep)
+ return nil, fmt.Errorf("client has been closed")
+ }
if othervc, exists := c.vcMap[ep.String()]; exists {
vc = othervc.vc
// TODO(ashankar,toddw): Figure out how to close up the VC that
@@ -253,79 +268,211 @@
return nil, lastErr
}
-// tryCall makes a single attempt at a call.
+type serverStatus struct {
+ index int
+ suffix string
+ flow stream.Flow
+ processed bool
+ err verror.E
+}
+
+func (c *client) tryServer(index int, server string, ch chan<- *serverStatus, done <-chan struct{}) {
+ select {
+ case <-done:
+ return
+ default:
+ }
+ status := &serverStatus{index: index}
+ flow, suffix, err := c.connectFlow(server)
+ if err != nil {
+ vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err)
+ status.err = verror.NoExistf("ipc: %q: %s", server, err)
+ ch <- status
+ return
+ }
+ status.suffix = suffix
+ status.flow = flow
+ select {
+ case <-done:
+ flow.Close()
+ default:
+ ch <- status
+ }
+}
+
+// tryCall makes a single attempt at a call, against possibly multiple servers.
func (c *client) tryCall(ctx context.T, name, method string, args []interface{}, opts []ipc.CallOpt) (ipc.Call, verror.E) {
ctx, _ = vtrace.WithNewSpan(ctx, fmt.Sprintf("<client>\"%s\".%s", name, method))
-
_, serverPattern, name := splitObjectName(name)
-
// Resolve name unless told not to.
var servers []string
if getNoResolveOpt(opts) {
servers = []string{name}
} else {
- var err error
- if servers, err = c.ns.Resolve(ctx, name); err != nil {
+ if resolved, err := c.ns.Resolve(ctx, name); err != nil {
return nil, verror.NoExistf("ipc: Resolve(%q) failed: %v", name, err)
+ } else {
+ // An empty set of protocols means all protocols...
+ ordered, err := filterAndOrderServers(resolved, c.preferredProtocols)
+ if len(ordered) == 0 {
+ return nil, verror.NoExistf("ipc: %q: %s", name, err)
+ }
+ servers = ordered
}
}
+ // servers is now orderd by the priority heurestic implemented in
+ // filterAndOrderServers.
+ attempts := len(servers)
+ if attempts == 0 {
+ return nil, errNoServers
+ }
- // Try all servers, and if none of them are authorized for the call then return the error of the last server
- // that was tried.
- var lastErr verror.E
- // TODO(cnicolaou): sort servers by sensible metric.
- for _, server := range servers {
- flow, suffix, err := c.connectFlow(server)
- if err != nil {
- lastErr = verror.NoExistf("ipc: couldn't connect to server %v: %v", server, err)
- vlog.VI(2).Infof("ipc: couldn't connect to server %v: %v", server, err)
- continue // Try the next server.
- }
- flow.SetDeadline(ctx.Done())
- var (
- serverB []string
- grantedB security.Blessings
- )
- // LocalPrincipal is nil means that the client wanted to avoid authentication,
- // and thus wanted to skip authorization as well.
- if flow.LocalPrincipal() != nil {
- // Validate caveats on the server's identity for the context associated with this call.
- if serverB, grantedB, err = c.authorizeServer(flow, name, method, serverPattern, opts); err != nil {
- lastErr = verror.NoAccessf("ipc: client unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
- flow.Close()
- continue
+ // Try to connect to all servers in parallel.
+ responses := make([]*serverStatus, attempts)
+
+ // Provide sufficient buffering for all of the connections to finish
+ // instantaneously. This is important because we want to process
+ // the responses in priority order; that order is indicated by the
+ // order of entries in servers. So, if two respones come in at the
+ // same 'instant', we prefer the first in the slice.
+ ch := make(chan *serverStatus, attempts)
+
+ // Read as many responses as we can before we would block.
+ gatherResponses := func() {
+ for {
+ select {
+ default:
+ return
+ case s := <-ch:
+ responses[s.index] = s
}
}
-
- lastErr = nil
- fc := newFlowClient(ctx, serverB, flow, c.dc)
-
- if doneChan := ctx.Done(); doneChan != nil {
- go func() {
- select {
- case <-ctx.Done():
- fc.Cancel()
- case <-fc.flow.Closed():
- }
- }()
- }
-
- timeout := time.Duration(ipc.NoTimeout)
- if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
- timeout = deadline.Sub(time.Now())
- }
- if verr := fc.start(suffix, method, args, timeout, grantedB); verr != nil {
- return nil, verr
- }
- return fc, nil
}
- if lastErr != nil {
- // If there was any problem starting the call, flush the cache entry under the
- // assumption that it was caused by stale data.
+
+ delay := time.Duration(ipc.NoTimeout)
+ if dl, set := ctx.Deadline(); set {
+ delay = dl.Sub(time.Now())
+ }
+ timeoutChan := time.After(delay)
+
+ // We'll close this channel when an RPC has been started and we've
+ // irrevocably selected a server.
+ done := make(chan struct{})
+ // Try all of the servers in parallel.
+ for i, server := range servers {
+ go c.tryServer(i, server, ch, done)
+ }
+
+ select {
+ case <-timeoutChan:
+ // All calls failed if we get here.
+ close(done)
c.ns.FlushCacheEntry(name)
- return nil, lastErr
+ return nil, verror.NoExistf("ipc: couldn't connect to server %v", name)
+ case s := <-ch:
+ responses[s.index] = s
+ gatherResponses()
}
- return nil, errNoServers
+
+ accessErrs := []error{}
+ connErrs := []error{}
+ for {
+
+ for _, r := range responses {
+ if r == nil || r.err != nil {
+ if r != nil && r.err != nil && !r.processed {
+ connErrs = append(connErrs, r.err)
+ r.processed = true
+ }
+ continue
+ }
+
+ flow := r.flow
+ suffix := r.suffix
+ flow.SetDeadline(ctx.Done())
+
+ var (
+ serverB []string
+ grantedB security.Blessings
+ )
+
+ // LocalPrincipal is nil means that the client wanted to avoid
+ // authentication, and thus wanted to skip authorization as well.
+ if flow.LocalPrincipal() != nil {
+ // Validate caveats on the server's identity for the context associated with this call.
+ var err error
+ if serverB, grantedB, err = c.authorizeServer(flow, name, method, serverPattern, opts); err != nil {
+ vlog.VI(2).Infof("ipc: client unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
+ if !r.processed {
+ accessErrs = append(accessErrs, err)
+ r.err = verror.NoAccessf("ipc: unwilling to invoke %q.%q on server %v: %v", name, method, flow.RemoteBlessings(), err)
+ r.processed = true
+ }
+ flow.Close()
+ continue
+ }
+ }
+
+ // This is the 'point of no return', so we tell the tryServer
+ // goroutines to not bother sending us any more flows.
+ // Once the RPC is started (fc.start below) we can't be sure
+ // if it makes it to the server or not so, this code will
+ // never call fc.start more than once to ensure that we
+ // provide 'at-most-once' rpc semantics at this level. Retrying
+ // the network connections (i.e. creating flows) is fine since
+ // we can cleanup that state if we abort a call (i.e. close the
+ // flow).
+ close(done)
+
+ fc := newFlowClient(ctx, serverB, flow, c.dc)
+
+ if doneChan := ctx.Done(); doneChan != nil {
+ go func() {
+ select {
+ case <-ctx.Done():
+ fc.Cancel()
+ case <-fc.flow.Closed():
+ }
+ }()
+ }
+
+ timeout := time.Duration(ipc.NoTimeout)
+ if deadline, hasDeadline := ctx.Deadline(); hasDeadline {
+ timeout = deadline.Sub(time.Now())
+ }
+ if verr := fc.start(suffix, method, args, timeout, grantedB); verr != nil {
+ return nil, verr
+ }
+ return fc, nil
+ }
+
+ // Quit if we've seen an error from all parallel connection attempts
+ handled := 0
+ for _, r := range responses {
+ if r != nil && r.err != nil {
+ handled++
+ }
+ }
+ if handled == len(responses) {
+ break
+ }
+
+ select {
+ case <-timeoutChan:
+ // All remaining calls failed if we get here.
+ vlog.VI(2).Infof("ipc: couldn't connect to server %v", name)
+ goto quit
+ case s := <-ch:
+ responses[s.index] = s
+ gatherResponses()
+ }
+ }
+quit:
+ close(done)
+ c.ns.FlushCacheEntry(name)
+ // TODO(cnicolaou): introduce a third error code here for mixed
+ // conn/access errors.
+ return nil, verror.NoExistf("ipc: client failed to invoke %q.%q: on %v", name, method, servers, append(connErrs, accessErrs...))
}
// authorizeServer validates that the server (remote end of flow) has the credentials to serve
diff --git a/runtimes/google/ipc/sort_endpoints.go b/runtimes/google/ipc/sort_endpoints.go
new file mode 100644
index 0000000..ab9f730
--- /dev/null
+++ b/runtimes/google/ipc/sort_endpoints.go
@@ -0,0 +1,227 @@
+package ipc
+
+import (
+ "fmt"
+ "net"
+ "strings"
+
+ "veyron.io/veyron/veyron2/naming"
+ "veyron.io/veyron/veyron2/vlog"
+
+ "veyron.io/veyron/veyron/lib/netstate"
+ "veyron.io/veyron/veyron/runtimes/google/ipc/version"
+ inaming "veyron.io/veyron/veyron/runtimes/google/naming"
+)
+
+type errorAccumulator struct {
+ errs []error
+}
+
+func (e *errorAccumulator) add(err error) {
+ e.errs = append(e.errs, err)
+}
+
+func (e *errorAccumulator) failed() bool {
+ return len(e.errs) > 0
+}
+
+func (e *errorAccumulator) String() string {
+ r := ""
+ for _, err := range e.errs {
+ r += fmt.Sprintf("(%s)", err)
+ }
+ return r
+}
+
+// TODO(cnicolaou): simplify this code, especially the use of maps+slices
+// and special cases.
+
+func newErrorAccumulator() *errorAccumulator {
+ return &errorAccumulator{errs: make([]error, 0, 4)}
+}
+
+type serverEndpoint struct {
+ iep *inaming.Endpoint
+ suffix string
+}
+
+func (se *serverEndpoint) String() string {
+ return fmt.Sprintf("(%s, %q)", se.iep, se.suffix)
+}
+
+func filterCompatibleEndpoints(errs *errorAccumulator, servers []string) []*serverEndpoint {
+ se := make([]*serverEndpoint, 0, len(servers))
+ for _, server := range servers {
+ name := server
+ address, suffix := naming.SplitAddressName(name)
+ if len(address) == 0 {
+ errs.add(fmt.Errorf("%q is not a rooted name", name))
+ continue
+ }
+ iep, err := inaming.NewEndpoint(address)
+ if err != nil {
+ errs.add(fmt.Errorf("%q: %s", name, err))
+ continue
+ }
+ if err = version.CheckCompatibility(iep); err != nil {
+ errs.add(fmt.Errorf("%q: %s", name, err))
+ continue
+ }
+ sep := &serverEndpoint{iep, suffix}
+ se = append(se, sep)
+ }
+ return se
+}
+
+func sortByProtocol(eps []*serverEndpoint) map[string][]*serverEndpoint {
+ byProtocol := make(map[string][]*serverEndpoint)
+ for _, ep := range eps {
+ p := ep.iep.Protocol
+ byProtocol[p] = append(byProtocol[p], ep)
+ }
+ return byProtocol
+}
+
+func unmatchedProtocols(hashed map[string][]*serverEndpoint, protocols []string) []*serverEndpoint {
+ unmatched := make([]*serverEndpoint, 0, 10)
+ for p, eps := range hashed {
+ found := false
+ for _, preferred := range protocols {
+ if p == preferred {
+ found = true
+ break
+ }
+ }
+ if !found {
+ unmatched = append(unmatched, eps...)
+ }
+ }
+ return unmatched
+}
+
+func orderByLocality(ifcs netstate.AddrList, eps []*serverEndpoint) []*serverEndpoint {
+ if len(ifcs) <= 1 {
+ return append([]*serverEndpoint{}, eps...)
+ }
+ ipnets := make([]*net.IPNet, 0, len(ifcs))
+ for _, a := range ifcs {
+ // Try IP
+ _, ipnet, err := net.ParseCIDR(a.Address().String())
+ if err != nil {
+ continue
+ }
+ ipnets = append(ipnets, ipnet)
+ }
+ if len(ipnets) == 0 {
+ return eps
+ }
+ // TODO(cnicolaou): this can obviously be made more efficient...
+ local := make([]*serverEndpoint, 0, len(eps))
+ remote := make([]*serverEndpoint, 0, len(eps))
+ notip := make([]*serverEndpoint, 0, len(eps))
+ for _, ep := range eps {
+ if strings.HasPrefix(ep.iep.Protocol, "tcp") || strings.HasPrefix(ep.iep.Protocol, "ws") {
+ // Take care to use the Address directly, since the network
+ // may be marked as a 'websocket'. This throws out any thought
+ // of dealing with IPv6 etc and web sockets.
+ host, _, err := net.SplitHostPort(ep.iep.Address)
+ if err != nil {
+ host = ep.iep.Address
+ }
+ ip := net.ParseIP(host)
+ if ip == nil {
+ notip = append(notip, ep)
+ continue
+ }
+ found := false
+ for _, ipnet := range ipnets {
+ if ipnet.Contains(ip) {
+ local = append(local, ep)
+ found = true
+ break
+ }
+ }
+ if !found {
+ remote = append(remote, ep)
+ }
+ } else {
+ notip = append(notip, ep)
+ }
+ }
+ return append(local, append(remote, notip...)...)
+}
+
+func slice(eps []*serverEndpoint) []string {
+ r := make([]string, len(eps))
+ for i, a := range eps {
+ r[i] = naming.JoinAddressName(a.iep.String(), a.suffix)
+ }
+ return r
+}
+
+func sliceByProtocol(eps map[string][]*serverEndpoint, protocols []string) []string {
+ r := make([]string, 0, 10)
+ for _, p := range protocols {
+ r = append(r, slice(eps[p])...)
+ }
+ return r
+}
+
+var defaultPreferredProtocolOrder = []string{"unixfd", "tcp4", "tcp", "tcp6"}
+
+func filterAndOrderServers(servers []string, protocols []string) ([]string, error) {
+ errs := newErrorAccumulator()
+ vlog.VI(3).Infof("Candidates[%v]: %v", protocols, servers)
+ compatible := filterCompatibleEndpoints(errs, servers)
+ if len(compatible) == 0 {
+ return nil, fmt.Errorf("failed to find any compatible servers: %s", errs)
+ }
+ vlog.VI(3).Infof("Version Compatible: %v", compatible)
+
+ // put the server endpoints into per-protocol lists
+ byProtocol := sortByProtocol(compatible)
+
+ if len(protocols) > 0 {
+ found := 0
+ for _, p := range protocols {
+ found += len(byProtocol[p])
+ }
+ if found == 0 {
+ return nil, fmt.Errorf("failed to find any servers compatible with %v from %s", protocols, servers)
+ }
+ }
+
+ // If a set of protocols is specified, then we will order
+ // and return endpoints that contain those protocols alone.
+ // However, if no protocols are supplied we'll order by
+ // a default ordering but append any endpoints that don't belong
+ // to that default ordering set to the returned endpoints.
+ remaining := []*serverEndpoint{}
+ preferredProtocolOrder := defaultPreferredProtocolOrder
+ if len(protocols) > 0 {
+ preferredProtocolOrder = protocols
+ } else {
+ remaining = unmatchedProtocols(byProtocol, preferredProtocolOrder)
+ }
+
+ vlog.VI(3).Infof("Have Protocols(%v): %v", protocols, byProtocol)
+
+ networks, err := netstate.GetAll()
+ if err != nil {
+ r := sliceByProtocol(byProtocol, preferredProtocolOrder)
+ r = append(r, slice(remaining)...)
+ return r, nil
+ }
+
+ ordered := make([]*serverEndpoint, 0, len(byProtocol))
+ for _, protocol := range preferredProtocolOrder {
+ o := orderByLocality(networks, byProtocol[protocol])
+ ordered = append(ordered, o...)
+ }
+
+ if len(protocols) == 0 {
+ ordered = append(ordered, remaining...)
+ }
+ vlog.VI(2).Infof("Ordered By Locality: %v", ordered)
+ return slice(ordered), nil
+}
diff --git a/runtimes/google/ipc/sort_internal_test.go b/runtimes/google/ipc/sort_internal_test.go
new file mode 100644
index 0000000..5b9b4cf
--- /dev/null
+++ b/runtimes/google/ipc/sort_internal_test.go
@@ -0,0 +1,169 @@
+package ipc
+
+import (
+ "reflect"
+ "strings"
+ "testing"
+
+ "veyron.io/veyron/veyron2/ipc/version"
+ "veyron.io/veyron/veyron2/naming"
+ "veyron.io/veyron/veyron2/vlog"
+)
+
+func TestIncompatible(t *testing.T) {
+ servers := []string{}
+
+ _, err := filterAndOrderServers(servers, []string{"tcp"})
+ if err == nil || err.Error() != "failed to find any compatible servers: " {
+ t.Errorf("expected a different error: %v", err)
+ }
+
+ for _, a := range []string{"127.0.0.1", "127.0.0.2"} {
+ addr := naming.FormatEndpoint("tcp", a, version.IPCVersionRange{100, 200})
+ name := naming.JoinAddressName(addr, "")
+ servers = append(servers, name)
+ }
+
+ _, err = filterAndOrderServers(servers, []string{"tcp"})
+ if err == nil || (!strings.HasPrefix(err.Error(), "failed to find any compatible servers:") && !strings.Contains(err.Error(), "No compatible IPC versions available")) {
+ vlog.Infof("A: %t . %t", strings.HasPrefix(err.Error(), "failed to find any compatible servers:"), !strings.Contains(err.Error(), "No compatible IPC versions available"))
+ t.Errorf("expected a different error to: %v", err)
+ }
+
+ for _, a := range []string{"127.0.0.3", "127.0.0.4"} {
+ name := naming.JoinAddressName(naming.FormatEndpoint("tcp", a), "")
+ servers = append(servers, name)
+ }
+
+ _, err = filterAndOrderServers(servers, []string{"foobar"})
+ if err == nil || !strings.HasPrefix(err.Error(), "failed to find any servers compatible with [foobar] ") {
+ t.Errorf("expected a different error to: %v", err)
+ }
+
+}
+
+func TestOrderingByProtocol(t *testing.T) {
+ servers := []string{}
+ for _, a := range []string{"127.0.0.3", "127.0.0.4"} {
+ name := naming.JoinAddressName(naming.FormatEndpoint("tcp", a), "")
+ servers = append(servers, name)
+ }
+ for _, a := range []string{"127.0.0.1", "127.0.0.2"} {
+ name := naming.JoinAddressName(naming.FormatEndpoint("tcp4", a), "")
+ servers = append(servers, name)
+ }
+ for _, a := range []string{"127.0.0.10", "127.0.0.11"} {
+ name := naming.JoinAddressName(naming.FormatEndpoint("foobar", a), "")
+ servers = append(servers, name)
+ }
+
+ got, err := filterAndOrderServers(servers, []string{"foobar", "tcp"})
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+
+ // Just foobar and tcp
+ want := []string{
+ "/@2@foobar@127.0.0.10@00000000000000000000000000000000@@@@",
+ "/@2@foobar@127.0.0.11@00000000000000000000000000000000@@@@",
+ "/@2@tcp@127.0.0.3@00000000000000000000000000000000@@@@",
+ "/@2@tcp@127.0.0.4@00000000000000000000000000000000@@@@",
+ }
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got: %v, want %v", got, want)
+ }
+
+ // Everything, since we didn't specify a protocol
+ got, err = filterAndOrderServers(servers, []string{})
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ // The order will be the default preferred order for protocols, the
+ // original ordering within each protocol, with protocols that
+ // are not in the default ordering list at the end.
+ want = []string{
+ "/@2@tcp4@127.0.0.1@00000000000000000000000000000000@@@@",
+ "/@2@tcp4@127.0.0.2@00000000000000000000000000000000@@@@",
+ "/@2@tcp@127.0.0.3@00000000000000000000000000000000@@@@",
+ "/@2@tcp@127.0.0.4@00000000000000000000000000000000@@@@",
+ "/@2@foobar@127.0.0.10@00000000000000000000000000000000@@@@",
+ "/@2@foobar@127.0.0.11@00000000000000000000000000000000@@@@",
+ }
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got: %v, want %v", got, want)
+ }
+
+ // Ask for all protocols, with no ordering, except for locality
+ servers = []string{}
+ for _, a := range []string{"74.125.69.139", "127.0.0.3", "127.0.0.1", "192.168.1.10", "74.125.142.83"} {
+ name := naming.JoinAddressName(naming.FormatEndpoint("tcp", a), "")
+ servers = append(servers, name)
+ }
+ for _, a := range []string{"127.0.0.10", "127.0.0.11"} {
+ name := naming.JoinAddressName(naming.FormatEndpoint("foobar", a), "")
+ servers = append(servers, name)
+ }
+ // Everything, since we didn't specify a protocol
+ got, err = filterAndOrderServers(servers, []string{})
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ want = []string{
+ "/@2@tcp@127.0.0.3@00000000000000000000000000000000@@@@",
+ "/@2@tcp@127.0.0.1@00000000000000000000000000000000@@@@",
+ "/@2@tcp@74.125.69.139@00000000000000000000000000000000@@@@",
+ "/@2@tcp@192.168.1.10@00000000000000000000000000000000@@@@",
+ "/@2@tcp@74.125.142.83@00000000000000000000000000000000@@@@",
+ "/@2@foobar@127.0.0.10@00000000000000000000000000000000@@@@",
+ "/@2@foobar@127.0.0.11@00000000000000000000000000000000@@@@",
+ }
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got: %v, want %v", got, want)
+ }
+}
+
+func TestOrderByNetwork(t *testing.T) {
+ servers := []string{}
+ for _, a := range []string{"74.125.69.139", "127.0.0.3", "127.0.0.1", "192.168.1.10", "74.125.142.83"} {
+ name := naming.JoinAddressName(naming.FormatEndpoint("tcp", a), "")
+ servers = append(servers, name)
+ }
+ got, err := filterAndOrderServers(servers, []string{"tcp"})
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ want := []string{
+ "/@2@tcp@127.0.0.3@00000000000000000000000000000000@@@@",
+ "/@2@tcp@127.0.0.1@00000000000000000000000000000000@@@@",
+ "/@2@tcp@74.125.69.139@00000000000000000000000000000000@@@@",
+ "/@2@tcp@192.168.1.10@00000000000000000000000000000000@@@@",
+ "/@2@tcp@74.125.142.83@00000000000000000000000000000000@@@@",
+ }
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got: %v, want %v", got, want)
+ }
+ for _, a := range []string{"74.125.69.139", "127.0.0.3:123", "127.0.0.1", "192.168.1.10", "74.125.142.83"} {
+ name := naming.JoinAddressName(naming.FormatEndpoint("ws", a), "")
+ servers = append(servers, name)
+ }
+
+ got, err = filterAndOrderServers(servers, []string{"ws", "tcp"})
+ if err != nil {
+ t.Fatalf("unexpected error: %s", err)
+ }
+ want = []string{
+ "/@2@ws@127.0.0.3:123@00000000000000000000000000000000@@@@",
+ "/@2@ws@127.0.0.1@00000000000000000000000000000000@@@@",
+ "/@2@ws@74.125.69.139@00000000000000000000000000000000@@@@",
+ "/@2@ws@192.168.1.10@00000000000000000000000000000000@@@@",
+ "/@2@ws@74.125.142.83@00000000000000000000000000000000@@@@",
+ "/@2@tcp@127.0.0.3@00000000000000000000000000000000@@@@",
+ "/@2@tcp@127.0.0.1@00000000000000000000000000000000@@@@",
+ "/@2@tcp@74.125.69.139@00000000000000000000000000000000@@@@",
+ "/@2@tcp@192.168.1.10@00000000000000000000000000000000@@@@",
+ "/@2@tcp@74.125.142.83@00000000000000000000000000000000@@@@",
+ }
+ if !reflect.DeepEqual(got, want) {
+ t.Errorf("got: %v, want %v", got, want)
+ }
+}
diff --git a/runtimes/google/rt/ipc.go b/runtimes/google/rt/ipc.go
index bbccfd4..9214839 100644
--- a/runtimes/google/rt/ipc.go
+++ b/runtimes/google/rt/ipc.go
@@ -36,12 +36,7 @@
otherOpts = append(otherOpts, opt)
}
}
- // Add the option that provides the runtime's principal to the client.
- // Set a low timeout for now, until we get parallel connections
- // going.
- // TODO(cnicolaou): extend the timeout when parallel connections are
- // going.
- otherOpts = append(otherOpts, vc.LocalPrincipal{rt.principal}, &imanager.DialTimeout{5 * time.Second})
+ otherOpts = append(otherOpts, vc.LocalPrincipal{rt.principal}, &imanager.DialTimeout{5 * time.Minute}, rt.preferredProtocols)
dc, err := iipc.InternalNewDischargeClient(sm, ns, rt.NewContext(), otherOpts...)
if err != nil {
diff --git a/runtimes/google/rt/rt.go b/runtimes/google/rt/rt.go
index 4b02edf..80b6e61 100644
--- a/runtimes/google/rt/rt.go
+++ b/runtimes/google/rt/rt.go
@@ -32,19 +32,20 @@
type vrt struct {
mu sync.Mutex
- profile veyron2.Profile
- publisher *config.Publisher
- sm []stream.Manager // GUARDED_BY(mu)
- ns naming.Namespace
- signals chan os.Signal
- principal security.Principal
- client ipc.Client
- mgmt *mgmtImpl
- flags flags.RuntimeFlags
- reservedDisp ipc.Dispatcher
- reservedOpts []ipc.ServerOpt
- nServers int // GUARDED_BY(mu)
- cleaningUp bool // GUARDED_BY(mu)
+ profile veyron2.Profile
+ publisher *config.Publisher
+ sm []stream.Manager // GUARDED_BY(mu)
+ ns naming.Namespace
+ signals chan os.Signal
+ principal security.Principal
+ client ipc.Client
+ mgmt *mgmtImpl
+ flags flags.RuntimeFlags
+ preferredProtocols options.PreferredProtocols
+ reservedDisp ipc.Dispatcher
+ reservedOpts []ipc.ServerOpt
+ nServers int // GUARDED_BY(mu)
+ cleaningUp bool // GUARDED_BY(mu)
lang i18n.LangID // Language, from environment variables.
program string // Program name, from os.Args[0].
@@ -81,6 +82,8 @@
rt.principal = v.Principal
case options.Profile:
rt.profile = v.Profile
+ case options.PreferredProtocols:
+ rt.preferredProtocols = v
default:
return nil, fmt.Errorf("option has wrong type %T", o)
}
diff --git a/runtimes/google/rt/signal_test.go b/runtimes/google/rt/signal_test.go
index e698e77..f478d6d 100644
--- a/runtimes/google/rt/signal_test.go
+++ b/runtimes/google/rt/signal_test.go
@@ -30,8 +30,8 @@
return "test"
}
-func (mp *myprofile) Runtime() string {
- return "google"
+func (mp *myprofile) Runtime() (string, []veyron2.ROpt) {
+ return "google", nil
}
func (mp *myprofile) Platform() *veyron2.Platform {