veyron/runtimes/google/naming/namespace: Run globAtServer in parallel
With this change, globAtServer() initiates a StartCall to all
servers in parallel. As soon as one returns successfully, it cancels the
others and continues the request with that server.
The previous behavior was to try one server at a time until it found one
that worked, which meant that globAtServer could take up to N * callTimeout
to complete.
Also, increase the callTimeout to 30 seconds to deal with our very slow ipc
stack.
Change-Id: Idebf889f30d8ee6666f7cb211ea25f9fcb66ade4
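
For reference, the parallel pattern in this change reduces to the following
minimal, self-contained sketch using only the Go standard library. Everything
here (dialFirst, the dial callback, the string stand-in for a call handle) is
hypothetical and illustrative, not the veyron API; it only shows the same
first-success-wins-then-cancel-the-rest shape under those assumptions.

package main

import (
	"context"
	"fmt"
	"time"
)

type result struct {
	index int
	conn  string // stand-in for the call handle
	err   error
}

// dialFirst starts one attempt per server in parallel and returns the first
// successful connection, canceling all the other attempts. dial is a
// hypothetical stand-in for client.StartCall.
func dialFirst(ctx context.Context, servers []string, timeout time.Duration,
	dial func(context.Context, string) (string, error)) (string, error) {

	cancels := make([]context.CancelFunc, len(servers))
	// Buffered so that losing goroutines can send without blocking.
	ch := make(chan result, len(servers))

	for i, s := range servers {
		callCtx, cancel := context.WithTimeout(ctx, timeout)
		cancels[i] = cancel
		go func(ctx context.Context, i int, s string) {
			conn, err := dial(ctx, s)
			ch <- result{i, conn, err}
		}(callCtx, i, s)
	}

	winner := -1
	var conn string
	var lastErr error
	for range servers {
		r := <-ch
		if r.err != nil {
			lastErr = r.err
			continue
		}
		winner, conn = r.index, r.conn
		break
	}
	// Cancel every attempt except the winner; the winner's context stays
	// live (its timeout still bounds the rest of the call), mirroring the
	// diff below.
	for i, cancel := range cancels {
		if i != winner {
			cancel()
		}
	}
	if winner < 0 {
		return "", lastErr // no one answered
	}
	return conn, nil
}

func main() {
	// Fake dial: server "b" answers quickly, the others hang until their
	// contexts are canceled.
	dial := func(ctx context.Context, s string) (string, error) {
		delay := time.Hour
		if s == "b" {
			delay = 10 * time.Millisecond
		}
		select {
		case <-time.After(delay):
			return "conn to " + s, nil
		case <-ctx.Done():
			return "", ctx.Err()
		}
	}
	conn, err := dialFirst(context.Background(), []string{"a", "b", "c"}, time.Second, dial)
	fmt.Println(conn, err)
}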
diff --git a/runtimes/google/naming/namespace/glob.go b/runtimes/google/naming/namespace/glob.go
index d79f099..8b78731 100644
--- a/runtimes/google/naming/namespace/glob.go
+++ b/runtimes/google/naming/namespace/glob.go
@@ -41,63 +41,93 @@
// return if we go through them all and no one answers.
var lastErr error
- // Trying each instance till we get one that works.
- for _, s := range t.me.Servers {
- // Don't further resolve s.Server.
- callCtx, _ := context.WithTimeout(ctx, callTimeout)
- call, err := client.StartCall(callCtx, naming.JoinAddressName(s.Server, ""), ipc.GlobMethod, []interface{}{pstr}, options.NoResolve{})
- if err != nil {
- lastErr = err
- continue // try another instance
+ type tryResult struct {
+ index int
+ call ipc.Call
+ err error
+ }
+ cancels := make([]func(), len(t.me.Servers))
+ ch := make(chan tryResult, len(t.me.Servers))
+
+ for i, s := range t.me.Servers {
+ callCtx, cancel := context.WithTimeout(ctx, callTimeout)
+ cancels[i] = cancel
+
+ vlog.VI(2).Infof("globAtServer: Trying %d %q", i, s.Server)
+ go func(callCtx *context.T, i int, s naming.MountedServer) {
+ call, err := client.StartCall(callCtx, naming.JoinAddressName(s.Server, ""), ipc.GlobMethod, []interface{}{pstr}, options.NoResolve{})
+ ch <- tryResult{i, call, err}
+ }(callCtx, i, s)
+ }
+ var call ipc.Call
+ // Wait for the first successful StartCall.
+ for range t.me.Servers {
+ result := <-ch
+ if result.err != nil {
+ lastErr = result.err
+ continue
}
-
- // At this point we're commited to a server since it answered the call. Cycle
- // through all replies from that server.
- for {
- // If the mount table returns an error, we're done. Send the task to the channel
- // including the error. This terminates the task.
- var e naming.VDLMountEntry
- err := call.Recv(&e)
- if err == io.EOF {
- break
- }
- if err != nil {
- t.me.Error = err
- return
- }
-
- // Convert to the ever so slightly different name.MountTable version of a MountEntry
- // and add it to the list.
- x := &task{
- me: &naming.MountEntry{
- Name: naming.Join(t.me.Name, e.Name),
- Servers: convertServers(e.Servers),
- },
- depth: t.depth + 1,
- }
- x.me.SetServesMountTable(e.MT)
-
- // x.depth is the number of servers we've walked through since we've gone
- // recursive (i.e. with pattern length of 0). Limit the depth of globs.
- // TODO(p): return an error?
- if t.pattern.Len() == 0 {
- if x.depth > ns.maxRecursiveGlobDepth {
- continue
- }
- }
- replies <- x
+ vlog.VI(2).Infof("globAtServer: Got successful call from %d %q", result.index, t.me.Servers[result.index].Server)
+ cancels[result.index] = nil
+ call = result.call
+ break
+ }
+ // Cancel all the other StartCalls
+ for i, cancel := range cancels {
+ if cancel != nil {
+ vlog.VI(2).Infof("globAtServer: Canceling call to %d %q", i, t.me.Servers[i].Server)
+ cancel()
}
-
- var globerr error
- if err := call.Finish(&globerr); err != nil {
- globerr = err
- }
- t.me.Error = globerr
+ }
+ if call == nil {
+ // No one answered.
+ t.me.Error = lastErr
return
}
- // Just soak up the last error (if any).
- t.me.Error = lastErr
+ // At this point we're committed to the server that answered the call
+ // first. Cycle through all replies from that server.
+ for {
+ // If the mount table returns an error, we're done. Send the task to the channel
+ // including the error. This terminates the task.
+ var e naming.VDLMountEntry
+ err := call.Recv(&e)
+ if err == io.EOF {
+ break
+ }
+ if err != nil {
+ t.me.Error = err
+ return
+ }
+
+ // Convert to the ever so slightly different name.MountTable version of a MountEntry
+ // and add it to the list.
+ x := &task{
+ me: &naming.MountEntry{
+ Name: naming.Join(t.me.Name, e.Name),
+ Servers: convertServers(e.Servers),
+ },
+ depth: t.depth + 1,
+ }
+ x.me.SetServesMountTable(e.MT)
+
+ // x.depth is the number of servers we've walked through since we've gone
+ // recursive (i.e. with pattern length of 0). Limit the depth of globs.
+ // TODO(p): return an error?
+ if t.pattern.Len() == 0 {
+ if x.depth > ns.maxRecursiveGlobDepth {
+ continue
+ }
+ }
+ replies <- x
+ }
+
+ var globerr error
+ if err := call.Finish(&globerr); err != nil {
+ globerr = err
+ }
+ t.me.Error = globerr
+ return
}
// depth returns the directory depth of a given name. It is used to pick off the unsatisfied part of the pattern.
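
The Recv-until-EOF-then-Finish loop above is the standard shape for draining
a streaming call. A minimal sketch, assuming a hypothetical stream interface
in place of ipc.Call (entry, drain, and fakeStream are illustrative names,
not veyron APIs):

package main

import (
	"fmt"
	"io"
)

type entry struct{ Name string }

// stream is a hypothetical stand-in for the subset of the call API used above.
type stream interface {
	Recv(e *entry) error
	Finish(globerr *error) error
}

// drain receives entries until the server closes the stream (io.EOF), then
// calls Finish to pick up the application-level error, mirroring the loop
// at the end of globAtServer.
func drain(call stream, replies chan<- entry) error {
	for {
		var e entry
		err := call.Recv(&e)
		if err == io.EOF {
			break // normal end of stream
		}
		if err != nil {
			return err // transport failure mid-stream
		}
		replies <- e
	}
	var globerr error
	if err := call.Finish(&globerr); err != nil {
		globerr = err
	}
	return globerr
}

// fakeStream serves a fixed list of entries, then EOF.
type fakeStream struct {
	entries []entry
	i       int
}

func (f *fakeStream) Recv(e *entry) error {
	if f.i >= len(f.entries) {
		return io.EOF
	}
	*e = f.entries[f.i]
	f.i++
	return nil
}

func (f *fakeStream) Finish(globerr *error) error { return nil }

func main() {
	replies := make(chan entry, 4)
	err := drain(&fakeStream{entries: []entry{{"a"}, {"b"}}}, replies)
	close(replies)
	for e := range replies {
		fmt.Println(e.Name)
	}
	fmt.Println("err:", err)
}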
diff --git a/runtimes/google/naming/namespace/namespace.go b/runtimes/google/naming/namespace/namespace.go
index 915b36d..21de030 100644
--- a/runtimes/google/naming/namespace/namespace.go
+++ b/runtimes/google/naming/namespace/namespace.go
@@ -160,7 +160,7 @@
// all operations against the mount table service use this fixed timeout for the
// time being.
-const callTimeout = 10 * time.Second
+const callTimeout = 30 * time.Second
// CacheCtl implements naming.Namespace.CacheCtl
func (ns *namespace) CacheCtl(ctls ...naming.CacheCtl) []naming.CacheCtl {