Merge "namespace: parallelize Glob."
diff --git a/runtimes/google/naming/namespace/glob.go b/runtimes/google/naming/namespace/glob.go
index 0b85a21..d79f099 100644
--- a/runtimes/google/naming/namespace/glob.go
+++ b/runtimes/google/naming/namespace/glob.go
@@ -1,7 +1,6 @@
package namespace
import (
- "container/list"
"io"
"strings"
@@ -16,106 +15,197 @@
"v.io/core/veyron2/vlog"
)
-const mountTableGlobReplyStreamLength = 100
-
-type queuedEntry struct {
- me *naming.MountEntry
- depth int // number of mount tables traversed recursively
+// task is a sub-glob that has to be performed against a mount table. Tasks are
+// done in parallel to speed up the glob.
+type task struct {
+ pattern *glob.Glob // pattern to match
+ me *naming.MountEntry // server to match at
+ depth int // number of mount tables traversed recursively
}
-// globAtServer performs a Glob at a single server and adds any results to the list. Paramters are:
-// server the server to perform the glob at. This may include multiple names for different
-// instances of the same server.
-// pelems the pattern to match relative to the mounted subtree.
-// l the list to add results to.
-// recursive true to continue below the matched pattern
-func (ns *namespace) globAtServer(ctx *context.T, qe *queuedEntry, pattern *glob.Glob, l *list.List) error {
- server := qe.me
+// globAtServer performs a Glob on the servers at a mount point. It cycles through the set of
+// servers until it finds one that replies.
+func (ns *namespace) globAtServer(ctx *context.T, t *task, replies chan *task) {
+ defer func() {
+ if t.me.Error == nil {
+ replies <- nil
+ } else {
+ replies <- t
+ }
+ }()
client := veyron2.GetClient(ctx)
- pstr := pattern.String()
- vlog.VI(2).Infof("globAtServer(%v, %v)", *server, pstr)
+ pstr := t.pattern.String()
+ vlog.VI(2).Infof("globAtServer(%v, %v)", *t.me, pstr)
+ // We collect errors trying to connect to servers so that we have something to
+ // return if we go through them all and no one answers.
var lastErr error
+
// Trying each instance till we get one that works.
- for _, s := range server.Servers {
- // If the pattern is finished (so we're only querying about the root on the
- // remote server) and the server is not another MT, then we needn't send the
- // query on since we know the server will not supply a new address for the
- // current name.
- if pattern.Finished() {
- if !server.ServesMountTable() {
- return nil
- }
- }
-
- // If this is restricted recursive and not a mount table, don't
- // descend into it.
- if pattern.Restricted() && !server.ServesMountTable() && pattern.Len() == 0 {
- return nil
- }
-
+ for _, s := range t.me.Servers {
// Don't further resolve s.Server.
callCtx, _ := context.WithTimeout(ctx, callTimeout)
- // Make sure that we turn the s.Server address or name into a rooted name.
- serverName := naming.JoinAddressName(s.Server, "")
- call, err := client.StartCall(callCtx, serverName, ipc.GlobMethod, []interface{}{pstr}, options.NoResolve{})
+ call, err := client.StartCall(callCtx, naming.JoinAddressName(s.Server, ""), ipc.GlobMethod, []interface{}{pstr}, options.NoResolve{})
if err != nil {
lastErr = err
continue // try another instance
}
- // At this point we're commited to a server since it answered tha call.
+ // At this point we're committed to a server since it answered the call. Cycle
+ // through all replies from that server.
for {
+ // If the mount table returns an error, we're done. Send the task to the channel
+ // including the error. This terminates the task.
var e naming.VDLMountEntry
err := call.Recv(&e)
if err == io.EOF {
break
}
if err != nil {
- return err
+ t.me.Error = err
+ return
}
- // Prefix the results with the path of the mount point.
- e.Name = naming.Join(server.Name, e.Name)
-
// Convert to the ever so slightly different name.MountTable version of a MountEntry
// and add it to the list.
- x := &queuedEntry{
+ x := &task{
me: &naming.MountEntry{
- Name: e.Name,
+ Name: naming.Join(t.me.Name, e.Name),
Servers: convertServers(e.Servers),
},
- depth: qe.depth,
+ depth: t.depth + 1,
}
x.me.SetServesMountTable(e.MT)
- // x.depth is the number of severs we've walked through since we've gone
- // recursive (i.e. with pattern length of 0).
- if pattern.Len() == 0 {
- if x.depth++; x.depth > ns.maxRecursiveGlobDepth {
+
+ // x.depth is the number of servers we've walked through since we've gone
+ // recursive (i.e. with pattern length of 0). Limit the depth of globs.
+ // TODO(p): return an error?
+ if t.pattern.Len() == 0 {
+ if x.depth > ns.maxRecursiveGlobDepth {
continue
}
}
- l.PushBack(x)
+ replies <- x
}
var globerr error
if err := call.Finish(&globerr); err != nil {
- return err
+ globerr = err
}
- return globerr
+ t.me.Error = globerr
+ return
}
- return lastErr
+ // Just soak up the last error (if any).
+ t.me.Error = lastErr
+}
+
+// depth returns the directory depth of a given name. It is used to pick off the unsatisfied part of the pattern.
+func depth(name string) int {
+ name = strings.Trim(naming.Clean(name), "/")
+ if name == "" {
+ return 0
+ }
+ return strings.Count(name, "/") + 1
+}
+
+// globLoop fires off a goroutine for each server and reads back replies.
+func (ns *namespace) globLoop(ctx *context.T, e *naming.MountEntry, prefix string, pattern *glob.Glob, reply chan naming.MountEntry) {
+ defer close(reply)
+
+ // Provide enough buffers to avoid too much switching between the readers and the writers.
+ // This size is just a guess.
+ replies := make(chan *task, 100)
+ defer close(replies)
+
+ // Push the first task into the channel to start the ball rolling. This task has the
+ // root of the search and the full pattern. It will be the first task fired off in the for
+ // loop that follows.
+ replies <- &task{me: e, pattern: pattern}
+ inFlight := 0
+
+ // Perform a parallel search of the name graph. Each task will send what it learns
+ // on the replies channel. If the reply is a mount point and the pattern is not completely
+ // fulfilled, a new task will be fired off to handle it.
+ for {
+ select {
+ case t := <-replies:
+ // A nil reply represents a successfully terminated task.
+ // If no tasks are running, return.
+ if t == nil {
+ if inFlight--; inFlight <= 0 {
+ return
+ }
+ continue
+ }
+
+ // We want to output this entry if there was a real error other than
+ // "not a mount table".
+ // TODO(p): return errors on a different reply channel?
+ //
+ // An error reply is also a terminated task.
+ // If no tasks are running, return.
+ if t.me.Error != nil {
+ if !notAnMT(t.me.Error) {
+ t.me.Name = naming.Join(prefix, t.me.Name)
+ reply <- *t.me
+ }
+ if inFlight--; inFlight <= 0 {
+ return
+ }
+ continue
+ }
+
+ // Get the pattern elements below the current path.
+ suffix := pattern.Split(depth(t.me.Name))
+
+ // If we've satisfied the request and this isn't the root,
+ // reply to the caller.
+ if suffix.Len() == 0 && t.depth != 0 {
+ x := *t.me
+ x.Name = naming.Join(prefix, x.Name)
+ reply <- x
+ }
+
+ // If the pattern is finished (so we're only querying about the root on the
+ // remote server) and the server is not another MT, then we needn't send the
+ // query on since we know the server will not supply a new address for the
+ // current name.
+ if suffix.Finished() {
+ if !t.me.ServesMountTable() {
+ continue
+ }
+ }
+
+ // If this is restricted recursive and not a mount table, don't descend into it.
+ if suffix.Restricted() && suffix.Len() == 0 && !t.me.ServesMountTable() {
+ continue
+ }
+
+ // Perform a glob at the next server.
+ inFlight++
+ t.pattern = suffix
+ go ns.globAtServer(ctx, t, replies)
+ }
+ }
}
// Glob implements naming.MountTable.Glob.
func (ns *namespace) Glob(ctx *context.T, pattern string) (chan naming.MountEntry, error) {
defer vlog.LogCall()()
+
+ // Root the pattern. If we have no servers to query, give up.
e, patternWasRooted := ns.rootMountEntry(pattern)
if len(e.Servers) == 0 {
return nil, verror.Make(naming.ErrNoMountTable, ctx)
}
+ // If the name doesn't parse, give up.
+ g, err := glob.Parse(e.Name)
+ if err != nil {
+ return nil, err
+ }
+
// If pattern was already rooted, make sure we tack that root
// onto all returned names. Otherwise, just return the relative
// name.
@@ -123,62 +213,8 @@
if patternWasRooted {
prefix = e.Servers[0].Server
}
- g, err := glob.Parse(e.Name)
- if err != nil {
- return nil, err
- }
e.Name = ""
reply := make(chan naming.MountEntry, 100)
go ns.globLoop(ctx, e, prefix, g, reply)
return reply, nil
}
-
-// depth returns the directory depth of a given name.
-func depth(name string) int {
- name = strings.Trim(naming.Clean(name), "/")
- if name == "" {
- return 0
- }
- return strings.Count(name, "/") + 1
-}
-
-func (ns *namespace) globLoop(ctx *context.T, e *naming.MountEntry, prefix string, pattern *glob.Glob, reply chan naming.MountEntry) {
- defer close(reply)
-
- // As we encounter new mount tables while traversing the Glob, we add them to the list 'l'. The loop below
- // traverses this list removing a mount table each time and calling globAtServer to perform a glob at that
- // server. globAtServer will send on 'reply' any terminal entries that match the glob and add any new mount
- // tables to be traversed to the list 'l'.
- l := list.New()
- l.PushBack(&queuedEntry{me: e})
- atRoot := true
-
- // Perform a breadth first search of the name graph.
- for le := l.Front(); le != nil; le = l.Front() {
- l.Remove(le)
- e := le.Value.(*queuedEntry)
-
- // Get the pattern elements below the current path.
- suffix := pattern.Split(depth(e.me.Name))
-
- // Perform a glob at the server.
- err := ns.globAtServer(ctx, e, suffix, l)
-
- // We want to output this entry if:
- // 1. There was a real error, we return whatever name gave us the error.
- if err != nil && !notAnMT(err) {
- x := *e.me
- x.Name = naming.Join(prefix, x.Name)
- x.Error = err
- reply <- x
- }
-
- // 2. The current name fullfills the pattern.
- if suffix.Len() == 0 && !atRoot {
- x := *e.me
- x.Name = naming.Join(prefix, x.Name)
- reply <- x
- }
- atRoot = false
- }
-}