Limit glob depth when doing a recursive glob.

Change-Id: Ie3b325015804ba85214bf4aaafc7392a762672df
diff --git a/runtimes/google/naming/namespace/all_test.go b/runtimes/google/naming/namespace/all_test.go
index e921c39..64d6743 100644
--- a/runtimes/google/naming/namespace/all_test.go
+++ b/runtimes/google/naming/namespace/all_test.go
@@ -36,7 +36,7 @@
 	}
 }
 
-func doGlob(t *testing.T, ctx context.T, ns naming.Namespace, pattern string) []string {
+func doGlob(t *testing.T, ctx context.T, ns naming.Namespace, pattern string, limit int) []string {
 	var replies []string
 	rc, err := ns.Glob(ctx, pattern)
 	if err != nil {
@@ -44,6 +44,9 @@
 	}
 	for s := range rc {
 		replies = append(replies, s.Name)
+		if limit > 0 && len(replies) > limit {
+			boom(t, "Glob returns too many results, perhaps not limiting recursion")
+		}
 	}
 	return replies
 }
@@ -381,10 +384,10 @@
 		{"*/f??/*z", []string{"mt4/foo/baz"}},
 	}
 	for _, test := range globTests {
-		out := doGlob(t, r, ns, test.pattern)
+		out := doGlob(t, r, ns, test.pattern, 0)
 		compare(t, "Glob", test.pattern, test.expected, out)
 		// Do the same with a full rooted name.
-		out = doGlob(t, r, ns, naming.JoinAddressName(root.name, test.pattern))
+		out = doGlob(t, r, ns, naming.JoinAddressName(root.name, test.pattern), 0)
 		var expectedWithRoot []string
 		for _, s := range test.expected {
 			expectedWithRoot = append(expectedWithRoot, naming.JoinAddressName(root.name, s))
@@ -394,8 +397,6 @@
 }
 
 func TestCycles(t *testing.T) {
-	t.Skip() // Remove when the bug is fixed.
-
 	sr := rt.Init()
 	r, _ := rt.New() // We use a different runtime for the client side.
 	defer r.Cleanup()
@@ -440,19 +441,8 @@
 		boom(t, "Failed to detect cycle")
 	}
 
-	// Remove the timeout when the bug is fixed, right now, this just
-	// finishes the test immediately. Add a comparison for the expected
-	// output from glob also.
-	ch := make(chan struct{})
-	go func() {
-		doGlob(t, r, ns, "c1/...")
-		close(ch)
-	}()
-	select {
-	case <-ch:
-	case <-time.After(time.Millisecond * 100):
-		t.Errorf("glob timedout")
-	}
+	// Perform the glob with a response length limit.
+	doGlob(t, r, ns, "c1/...", 1000)
 }
 
 func TestUnresolve(t *testing.T) {
@@ -469,7 +459,7 @@
 	ns := r.Namespace()
 	ns.SetRoots(root.name)
 
-	vlog.Infof("Glob: %v", doGlob(t, r, ns, "*"))
+	vlog.Infof("Glob: %v", doGlob(t, r, ns, "*", 0))
 	testResolve(t, r, ns, "joke1", jokes["joke1"].name)
 	testUnresolve(t, r, ns, "joke1", "")
 }
diff --git a/runtimes/google/naming/namespace/glob.go b/runtimes/google/naming/namespace/glob.go
index 2560bfb..68d491f 100644
--- a/runtimes/google/naming/namespace/glob.go
+++ b/runtimes/google/naming/namespace/glob.go
@@ -15,6 +15,11 @@
 
 const mountTableGlobReplyStreamLength = 100
 
+type queuedEntry struct {
+	me    *naming.MountEntry
+	depth int // number of servers walked through since the glob went recursive
+}
+
 // globAtServer performs a Glob at a single server and adds any results to the list.  Paramters are:
 //   server    the server to perform the glob at.  This may include multiple names for different
 //             instances of the same server.
@@ -22,7 +27,8 @@
 //   l         the list to add results to.
 //   recursive true to continue below the matched pattern
 // We return a bool foundRoot which indicates whether the empty name "" was found on a target server.
-func (ns *namespace) globAtServer(ctx context.T, server *naming.MountEntry, pattern *glob.Glob, l *list.List) (bool, error) {
+func (ns *namespace) globAtServer(ctx context.T, qe *queuedEntry, pattern *glob.Glob, l *list.List) (bool, error) {
+	server := qe.me
 	pstr := pattern.String()
 	foundRoot := false
 	vlog.VI(2).Infof("globAtServer(%v, %v)", *server, pstr)
@@ -62,10 +68,21 @@
 
 				// Convert to the ever so slightly different name.MountTable version of a MountEntry
 				// and add it to the list.
-				l.PushBack(&naming.MountEntry{
-					Name:    e.Name,
-					Servers: convertServers(e.Servers),
-				})
+				x := &queuedEntry{
+					me: &naming.MountEntry{
+						Name:    e.Name,
+						Servers: convertServers(e.Servers),
+					},
+					depth: qe.depth,
+				}
+				// x.depth is the number of servers we've walked through since we've gone
+				// recursive (i.e. with pattern length of 0).
+				if pattern.Len() == 0 {
+					if x.depth++; x.depth > ns.maxRecursiveGlobDepth {
+						continue
+					}
+				}
+				l.PushBack(x)
 			}
 
 			if err := call.Finish(); err != nil {
@@ -135,16 +152,20 @@
 func (ns *namespace) globLoop(ctx context.T, servers []string, prefix string, pattern *glob.Glob, reply chan naming.MountEntry) {
 	defer close(reply)
 
+	// As we encounter new mount tables while traversing the Glob, we add them to the list 'l'.  The loop below
+	// traverses this list removing a mount table each time and calling globAtServer to perform a glob at that
+	// server.  globAtServer adds the entries it finds to the list 'l'; this loop then sends on 'reply' any
+	// entry that fulfills the pattern or that returned a real error.
 	l := list.New()
-	l.PushBack(&naming.MountEntry{Name: "", Servers: convertStringsToServers(servers)})
+	l.PushBack(&queuedEntry{me: &naming.MountEntry{Name: "", Servers: convertStringsToServers(servers)}})
 
 	// Perform a breadth first search of the name graph.
 	for le := l.Front(); le != nil; le = l.Front() {
 		l.Remove(le)
-		e := le.Value.(*naming.MountEntry)
+		e := le.Value.(*queuedEntry)
 
 		// Get the pattern elements below the current path.
-		suffix := pattern.Split(depth(e.Name))
+		suffix := pattern.Split(depth(e.me.Name))
 
 		// Perform a glob at the server.
 		foundRoot, err := ns.globAtServer(ctx, e, suffix, l)
@@ -152,7 +173,7 @@
 		// We want to output this entry if:
 		// 1. There was a real error, we return whatever name gave us the error.
 		if err != nil && !notAnMT(err) {
-			x := *e
+			x := *e.me
 			x.Name = naming.Join(prefix, x.Name)
 			x.Error = err
 			reply <- x
@@ -161,7 +182,7 @@
 		// 2. The current name fullfills the pattern and further servers did not respond
 		//    with "".  That is, we want to prefer foo/ over foo.
 		if suffix.Len() == 0 && !foundRoot {
-			x := *e
+			x := *e.me
 			x.Name = naming.Join(prefix, x.Name)
 			reply <- x
 		}
diff --git a/runtimes/google/naming/namespace/namespace.go b/runtimes/google/naming/namespace/namespace.go
index 16b4fdd..65e0eb5 100644
--- a/runtimes/google/naming/namespace/namespace.go
+++ b/runtimes/google/naming/namespace/namespace.go
@@ -9,6 +9,9 @@
 	"veyron2/verror"
 )
 
+const defaultMaxResolveDepth = 32
+const defaultMaxRecursiveGlobDepth = 10
+
 // namespace is an implementation of naming.MountTable.
 type namespace struct {
 	sync.RWMutex
@@ -16,6 +19,10 @@
 
 	// the default root servers for resolutions in this namespace.
 	roots []string
+
+	// depth limits
+	maxResolveDepth       int
+	maxRecursiveGlobDepth int
 }
 
 func rooted(names []string) bool {
@@ -37,7 +44,12 @@
 		return nil, badRoots(roots)
 	}
 	// A namespace with no roots can still be used for lookups of rooted names.
-	return &namespace{rt: rt, roots: roots}, nil
+	return &namespace{
+		rt:                    rt,
+		roots:                 roots,
+		maxResolveDepth:       defaultMaxResolveDepth,
+		maxRecursiveGlobDepth: defaultMaxRecursiveGlobDepth,
+	}, nil
 }
 
 // SetRoots implements naming.MountTable.SetRoots
@@ -52,6 +64,16 @@
 	return nil
 }
 
+// SetDepthLimits overrides the default resolve and recursive glob depth limits; a negative argument leaves that limit unchanged.
+func (ns *namespace) SetDepthLimits(resolve, glob int) {
+	if resolve >= 0 {
+		ns.maxResolveDepth = resolve
+	}
+	if glob >= 0 {
+		ns.maxRecursiveGlobDepth = glob
+	}
+}
+
 // Roots implements naming.MountTable.Roots
 func (ns *namespace) Roots() []string {
 	ns.RLock()
diff --git a/runtimes/google/naming/namespace/resolve.go b/runtimes/google/naming/namespace/resolve.go
index 3043a31..4e1fdf4 100644
--- a/runtimes/google/naming/namespace/resolve.go
+++ b/runtimes/google/naming/namespace/resolve.go
@@ -11,8 +11,6 @@
 	"veyron2/vlog"
 )
 
-const maxDepth = 32
-
 func convertServersToStrings(servers []mountedServer, suffix string) (ret []string) {
 	for _, s := range servers {
 		ret = append(ret, naming.Join(s.Server, suffix))
@@ -85,7 +83,7 @@
 		return nil, naming.ErrNoMountTable
 	}
 	// Iterate walking through mount table servers.
-	for remaining := maxDepth; remaining > 0; remaining-- {
+	for remaining := ns.maxResolveDepth; remaining > 0; remaining-- {
 		vlog.VI(2).Infof("Resolve(%s) loop %s", name, names)
 		if terminal(names) {
 			vlog.VI(1).Infof("Resolve(%s) -> %s", name, names)
@@ -125,7 +123,7 @@
 		return nil, naming.ErrNoMountTable
 	}
 	last := names
-	for remaining := maxDepth; remaining > 0; remaining-- {
+	for remaining := ns.maxResolveDepth; remaining > 0; remaining-- {
 		vlog.VI(2).Infof("ResolveToMountTable(%s) loop %s", name, names)
 		var err error
 		curr := names
@@ -212,7 +210,7 @@
 	if err != nil {
 		return nil, err
 	}
-	for remaining := maxDepth; remaining > 0; remaining-- {
+	for remaining := ns.maxResolveDepth; remaining > 0; remaining-- {
 		vlog.VI(2).Infof("Unresolve loop %s", names)
 		curr := names
 		if names, err = unresolveAgainstServer(ctx, ns.rt.Client(), names); err != nil {