Merge "namespace: parallelize Glob."
diff --git a/runtimes/google/naming/namespace/glob.go b/runtimes/google/naming/namespace/glob.go
index 0b85a21..d79f099 100644
--- a/runtimes/google/naming/namespace/glob.go
+++ b/runtimes/google/naming/namespace/glob.go
@@ -1,7 +1,6 @@
 package namespace
 
 import (
-	"container/list"
 	"io"
 	"strings"
 
@@ -16,106 +15,197 @@
 	"v.io/core/veyron2/vlog"
 )
 
-const mountTableGlobReplyStreamLength = 100
-
-type queuedEntry struct {
-	me    *naming.MountEntry
-	depth int // number of mount tables traversed recursively
+// task is a sub-glob that has to be performed against a mount table.  Tasks are
+// done in parallel to speed up the glob.
+type task struct {
+	pattern *glob.Glob         // pattern to match
+	me      *naming.MountEntry // server to match at
+	depth   int                // number of mount tables traversed recursively
 }
 
-// globAtServer performs a Glob at a single server and adds any results to the list.  Paramters are:
-//   server    the server to perform the glob at.  This may include multiple names for different
-//             instances of the same server.
-//   pelems    the pattern to match relative to the mounted subtree.
-//   l         the list to add results to.
-//   recursive true to continue below the matched pattern
-func (ns *namespace) globAtServer(ctx *context.T, qe *queuedEntry, pattern *glob.Glob, l *list.List) error {
-	server := qe.me
+// globAtServer performs a Glob on the servers at a mount point.  It cycles through the set of
+// servers until it finds one that replies.
+func (ns *namespace) globAtServer(ctx *context.T, t *task, replies chan *task) {
+	defer func() {
+		if t.me.Error == nil {
+			replies <- nil
+		} else {
+			replies <- t
+		}
+	}()
 	client := veyron2.GetClient(ctx)
-	pstr := pattern.String()
-	vlog.VI(2).Infof("globAtServer(%v, %v)", *server, pstr)
+	pstr := t.pattern.String()
+	vlog.VI(2).Infof("globAtServer(%v, %v)", *t.me, pstr)
 
+	// We collect errors trying to connect to servers so that we have something to
+	// return if we go through them all and no one answers.
 	var lastErr error
+
 	// Trying each instance till we get one that works.
-	for _, s := range server.Servers {
-		// If the pattern is finished (so we're only querying about the root on the
-		// remote server) and the server is not another MT, then we needn't send the
-		// query on since we know the server will not supply a new address for the
-		// current name.
-		if pattern.Finished() {
-			if !server.ServesMountTable() {
-				return nil
-			}
-		}
-
-		// If this is restricted recursive and not a mount table, don't
-		// descend into it.
-		if pattern.Restricted() && !server.ServesMountTable() && pattern.Len() == 0 {
-			return nil
-		}
-
+	for _, s := range t.me.Servers {
 		// Don't further resolve s.Server.
 		callCtx, _ := context.WithTimeout(ctx, callTimeout)
-		// Make sure that we turn the s.Server address or name into a rooted name.
-		serverName := naming.JoinAddressName(s.Server, "")
-		call, err := client.StartCall(callCtx, serverName, ipc.GlobMethod, []interface{}{pstr}, options.NoResolve{})
+		call, err := client.StartCall(callCtx, naming.JoinAddressName(s.Server, ""), ipc.GlobMethod, []interface{}{pstr}, options.NoResolve{})
 		if err != nil {
 			lastErr = err
 			continue // try another instance
 		}
 
-		// At this point we're commited to a server since it answered tha call.
+		// At this point we're committed to a server since it answered the call.   Cycle
+		// through all replies from that server.
 		for {
+			// If the mount table returns an error, we're done.  Send the task to the channel
+			// including the error.  This terminates the task.
 			var e naming.VDLMountEntry
 			err := call.Recv(&e)
 			if err == io.EOF {
 				break
 			}
 			if err != nil {
-				return err
+				t.me.Error = err
+				return
 			}
 
-			// Prefix the results with the path of the mount point.
-			e.Name = naming.Join(server.Name, e.Name)
-
 			// Convert to the ever so slightly different name.MountTable version of a MountEntry
 			// and add it to the list.
-			x := &queuedEntry{
+			x := &task{
 				me: &naming.MountEntry{
-					Name:    e.Name,
+					Name:    naming.Join(t.me.Name, e.Name),
 					Servers: convertServers(e.Servers),
 				},
-				depth: qe.depth,
+				depth: t.depth + 1,
 			}
 			x.me.SetServesMountTable(e.MT)
-			// x.depth is the number of severs we've walked through since we've gone
-			// recursive (i.e. with pattern length of 0).
-			if pattern.Len() == 0 {
-				if x.depth++; x.depth > ns.maxRecursiveGlobDepth {
+
+			// x.depth is the number of servers we've walked through since we've gone
+			// recursive (i.e. with pattern length of 0).  Limit the depth of globs.
+			// TODO(p): return an error?
+			if t.pattern.Len() == 0 {
+				if x.depth > ns.maxRecursiveGlobDepth {
 					continue
 				}
 			}
-			l.PushBack(x)
+			replies <- x
 		}
 
 		var globerr error
 		if err := call.Finish(&globerr); err != nil {
-			return err
+			globerr = err
 		}
-		return globerr
+		t.me.Error = globerr
+		return
 	}
 
-	return lastErr
+	// Just soak up the last error (if any).
+	t.me.Error = lastErr
+}
+
+// depth returns the directory depth of a given name.  It is used to pick off the unsatisfied part of the pattern.
+func depth(name string) int {
+	name = strings.Trim(naming.Clean(name), "/")
+	if name == "" {
+		return 0
+	}
+	return strings.Count(name, "/") + 1
+}
+
+// globLoop fires off a goroutine for each server and reads back replies.
+func (ns *namespace) globLoop(ctx *context.T, e *naming.MountEntry, prefix string, pattern *glob.Glob, reply chan naming.MountEntry) {
+	defer close(reply)
+
+	// Provide enough buffers to avoid too much switching between the readers and the writers.
+	// This size is just a guess.
+	replies := make(chan *task, 100)
+	defer close(replies)
+
+	// Push the first task into the channel to start the ball rolling.  This task has the
+	// root of the search and the full pattern.  It will be the first task fired off in the for
+	// loop that follows.
+	replies <- &task{me: e, pattern: pattern}
+	inFlight := 0
+
+	// Perform a parallel search of the name graph.  Each task will send what it learns
+	// on the replies channel.  If the reply is a mount point and the pattern is not completely
+	// fulfilled, a new task will be fired off to handle it.
+	for {
+		select {
+		case t := <-replies:
+			// A nil reply represents a successfully terminated task.
+			// If no tasks are running, return.
+			if t == nil {
+				if inFlight--; inFlight <= 0 {
+					return
+				}
+				continue
+			}
+
+			// We want to output this entry if there was a real error other than
+			// "not a mount table".
+			// TODO(p): return errors on a different reply channel?
+			//
+			// An error reply is also a terminated task.
+			// If no tasks are running, return.
+			if t.me.Error != nil {
+				if !notAnMT(t.me.Error) {
+					t.me.Name = naming.Join(prefix, t.me.Name)
+					reply <- *t.me
+				}
+				if inFlight--; inFlight <= 0 {
+					return
+				}
+				continue
+			}
+
+			// Get the pattern elements below the current path.
+			suffix := pattern.Split(depth(t.me.Name))
+
+			// If we've satisfied the request and this isn't the root,
+			// reply to the caller.
+			if suffix.Len() == 0 && t.depth != 0 {
+				x := *t.me
+				x.Name = naming.Join(prefix, x.Name)
+				reply <- x
+			}
+
+			// If the pattern is finished (so we're only querying about the root on the
+			// remote server) and the server is not another MT, then we needn't send the
+			// query on since we know the server will not supply a new address for the
+			// current name.
+			if suffix.Finished() {
+				if !t.me.ServesMountTable() {
+					continue
+				}
+			}
+
+			// If this is restricted recursive and not a mount table, don't descend into it.
+			if suffix.Restricted() && suffix.Len() == 0 && !t.me.ServesMountTable() {
+				continue
+			}
+
+			// Perform a glob at the next server.
+			inFlight++
+			t.pattern = suffix
+			go ns.globAtServer(ctx, t, replies)
+		}
+	}
 }
 
 // Glob implements naming.MountTable.Glob.
 func (ns *namespace) Glob(ctx *context.T, pattern string) (chan naming.MountEntry, error) {
 	defer vlog.LogCall()()
+
+	// Root the pattern.  If we have no servers to query, give up.
 	e, patternWasRooted := ns.rootMountEntry(pattern)
 	if len(e.Servers) == 0 {
 		return nil, verror.Make(naming.ErrNoMountTable, ctx)
 	}
 
+	// If the name doesn't parse, give up.
+	g, err := glob.Parse(e.Name)
+	if err != nil {
+		return nil, err
+	}
+
 	// If pattern was already rooted, make sure we tack that root
 	// onto all returned names.  Otherwise, just return the relative
 	// name.
@@ -123,62 +213,8 @@
 	if patternWasRooted {
 		prefix = e.Servers[0].Server
 	}
-	g, err := glob.Parse(e.Name)
-	if err != nil {
-		return nil, err
-	}
 	e.Name = ""
 	reply := make(chan naming.MountEntry, 100)
 	go ns.globLoop(ctx, e, prefix, g, reply)
 	return reply, nil
 }
-
-// depth returns the directory depth of a given name.
-func depth(name string) int {
-	name = strings.Trim(naming.Clean(name), "/")
-	if name == "" {
-		return 0
-	}
-	return strings.Count(name, "/") + 1
-}
-
-func (ns *namespace) globLoop(ctx *context.T, e *naming.MountEntry, prefix string, pattern *glob.Glob, reply chan naming.MountEntry) {
-	defer close(reply)
-
-	// As we encounter new mount tables while traversing the Glob, we add them to the list 'l'.  The loop below
-	// traverses this list removing a mount table each time and calling globAtServer to perform a glob at that
-	// server.  globAtServer will send on 'reply' any terminal entries that match the glob and add any new mount
-	// tables to be traversed to the list 'l'.
-	l := list.New()
-	l.PushBack(&queuedEntry{me: e})
-	atRoot := true
-
-	// Perform a breadth first search of the name graph.
-	for le := l.Front(); le != nil; le = l.Front() {
-		l.Remove(le)
-		e := le.Value.(*queuedEntry)
-
-		// Get the pattern elements below the current path.
-		suffix := pattern.Split(depth(e.me.Name))
-
-		// Perform a glob at the server.
-		err := ns.globAtServer(ctx, e, suffix, l)
-
-		// We want to output this entry if:
-		// 1. There was a real error, we return whatever name gave us the error.
-		if err != nil && !notAnMT(err) {
-			x := *e.me
-			x.Name = naming.Join(prefix, x.Name)
-			x.Error = err
-			reply <- x
-		}
-
-		// 2. The current name fullfills the pattern.
-		if suffix.Len() == 0 && !atRoot {
-			x := *e.me
-			x.Name = naming.Join(prefix, x.Name)
-			reply <- x
-		}
-		atRoot = false
-	}
-}