namespace: dedup glob replies.
Caveats:
1) the set of visited server/pattern pairs could eat up space
2) if the server names change significantly in the middle of the glob, we could
miss some dedups.
Change-Id: I200bb8312ea7d98722ee7c67d93f1899ec961786
diff --git a/profiles/internal/naming/namespace/all_test.go b/profiles/internal/naming/namespace/all_test.go
index 7ff78dc..c8981c9 100644
--- a/profiles/internal/naming/namespace/all_test.go
+++ b/profiles/internal/naming/namespace/all_test.go
@@ -574,8 +574,12 @@
boom(t, "Failed to detect cycle")
}
- // Perform the glob with a response length limit.
- doGlob(t, c, ns, "c1/...", 1000)
+	// Perform the glob with a response length limit and dup suppression. The dup suppression
+	// should win.
+	r := doGlob(t, c, ns, "c1/...", 1000)
+	if len(r) != 6 {
+		t.Fatalf("expected 6 replies, got %d: %v", len(r), r)
+	}
}
// TestGoroutineLeaks tests for leaking goroutines - we have many:-(
diff --git a/profiles/internal/naming/namespace/glob.go b/profiles/internal/naming/namespace/glob.go
index 0af01c3..16cdc11 100644
--- a/profiles/internal/naming/namespace/glob.go
+++ b/profiles/internal/naming/namespace/glob.go
@@ -7,6 +7,7 @@
import (
"io"
"strings"
+ "sync"
"v.io/x/ref/lib/glob"
@@ -18,6 +19,25 @@
"v.io/x/lib/vlog"
)
+type tracks struct { // tracks remembers which server/pattern pairs beenThereDoneThat has already been asked about.
+ m sync.Mutex // held by beenThereDoneThat for the whole check-and-record pass over places
+ places map[string]struct{} // set of keys of the form server + "!" + pattern; must be non-nil before use
+}
+
+// beenThereDoneThat records every (server, pstr) pair as visited and reports whether any of them had been recorded before.
+func (tr *tracks) beenThereDoneThat(servers []string, pstr string) bool {
+	tr.m.Lock()
+	defer tr.m.Unlock()
+	visited := false
+	for _, server := range servers {
+		key := server + "!" + pstr
+		_, dup := tr.places[key]
+		visited = visited || dup // no early exit: the remaining servers must still be recorded
+		tr.places[key] = struct{}{}
+	}
+	return visited
+}
+
// task is a sub-glob that has to be performed against a mount table. Tasks are
// done in parrallel to speed up the glob.
type task struct {
@@ -30,7 +50,7 @@
// globAtServer performs a Glob on the servers at a mount point. It cycles through the set of
// servers until it finds one that replies.
-func (ns *namespace) globAtServer(ctx *context.T, t *task, replies chan *task) {
+func (ns *namespace) globAtServer(ctx *context.T, t *task, replies chan *task, tr *tracks) {
defer func() {
if t.error == nil {
replies <- nil
@@ -53,6 +73,13 @@
t.error = nil
return
}
+
+ // If we've been there before with the same request, give up.
+ if tr.beenThereDoneThat(servers, pstr) {
+ t.error = nil
+ return
+ }
+
call, err := ns.parallelStartCall(ctx, client, servers, rpc.GlobMethod, []interface{}{pstr})
if err != nil {
t.error = err
@@ -119,7 +146,7 @@
}
// globLoop fires off a go routine for each server and read backs replies.
-func (ns *namespace) globLoop(ctx *context.T, e *naming.MountEntry, prefix string, pattern *glob.Glob, reply chan interface{}) {
+func (ns *namespace) globLoop(ctx *context.T, e *naming.MountEntry, prefix string, pattern *glob.Glob, reply chan interface{}, tr *tracks) {
defer close(reply)
// Provide enough buffers to avoid too much switching between the readers and the writers.
@@ -201,7 +228,7 @@
// Perform a glob at the next server.
inFlight++
t.pattern = suffix
- go ns.globAtServer(ctx, t, replies)
+ go ns.globAtServer(ctx, t, replies, tr)
}
}
}
@@ -221,6 +248,8 @@
return nil, err
}
+ tr := &tracks{places: make(map[string]struct{})}
+
// If pattern was already rooted, make sure we tack that root
// onto all returned names. Otherwise, just return the relative
// name.
@@ -230,6 +259,6 @@
}
e.Name = ""
reply := make(chan interface{}, 100)
- go ns.globLoop(ctx, e, prefix, g, reply)
+ go ns.globLoop(ctx, e, prefix, g, reply, tr)
return reply, nil
}