Add CacheCtl to naming.Namespace.

Change-Id: Id7f17c95cd02208fe8613b91d706b0af7a01b82c
diff --git a/runtimes/google/ipc/full_test.go b/runtimes/google/ipc/full_test.go
index d62f1cc..72859c4 100644
--- a/runtimes/google/ipc/full_test.go
+++ b/runtimes/google/ipc/full_test.go
@@ -246,6 +246,10 @@
 	return false
 }
 
+func (ns *namespace) CacheCtl(ctls ...naming.CacheCtl) []naming.CacheCtl {
+	return nil
+}
+
 func (ns *namespace) Glob(ctx context.T, pattern string) (chan naming.MountEntry, error) {
 	panic("Glob not implemented")
 	return nil, nil
diff --git a/runtimes/google/naming/namespace/cache.go b/runtimes/google/naming/namespace/cache.go
index f4ed5a9..1229ee4 100644
--- a/runtimes/google/naming/namespace/cache.go
+++ b/runtimes/google/naming/namespace/cache.go
@@ -17,15 +17,23 @@
 // cacheHisteresisSize is how much we back off to if the cache gets filled up.
 const cacheHisteresisSize = (3 * maxCacheEntries) / 4
 
-type cache struct {
+// cache is a generic interface to the resolution cache.
+type cache interface {
+	remember(prefix string, servers []mountedServer)
+	forget(names []string)
+	lookup(name string) ([]mountedServer, string)
+}
+
+// ttlCache is an instance of cache that obeys ttl from the mount points.
+type ttlCache struct {
 	sync.Mutex
 	epochStart time.Time
 	entries    map[string][]mountedServer
 }
 
-// newCache creates a empty cache.
-func newCache() *cache {
-	return &cache{epochStart: time.Now(), entries: make(map[string][]mountedServer)}
+// newTTLCache creates an empty ttlCache.
+func newTTLCache() cache {
+	return &ttlCache{epochStart: time.Now(), entries: make(map[string][]mountedServer)}
 }
 
 func isStale(now uint32, servers []mountedServer) bool {
@@ -37,13 +45,22 @@
 	return false
 }
 
+// normalize removes any single trailing slash.  Added for idiots who seem to
+// like adding trailing slashes.
+func normalize(name string) string {
+	if strings.HasSuffix(name, "//") {
+		return name
+	}
+	return strings.TrimSuffix(name, "/")
+}
+
 // esecs returns seconds since start of this cache's epoch.
-func (c *cache) esecs() uint32 {
+func (c *ttlCache) esecs() uint32 {
 	return uint32(time.Since(c.epochStart).Seconds())
 }
 
 // randomDrop randomly removes one cache entry.  Assumes we've already locked the cache.
-func (c *cache) randomDrop() {
+func (c *ttlCache) randomDrop() {
 	n := rand.Intn(len(c.entries))
 	for k := range c.entries {
 		if n == 0 {
@@ -55,7 +72,7 @@
 }
 
 // cleaner reduces the number of entries.  Assumes we've already locked the cache.
-func (c *cache) cleaner() {
+func (c *ttlCache) cleaner() {
 	// First dump any stale entries.
 	now := c.esecs()
 	for k, v := range c.entries {
@@ -74,7 +91,8 @@
 }
 
 // remember the servers associated with name with suffix removed.
-func (c *cache) remember(prefix string, servers []mountedServer) {
+func (c *ttlCache) remember(prefix string, servers []mountedServer) {
+	prefix = normalize(prefix)
 	for i := range servers {
 		// Remember when this cached entry times out relative to our epoch.
 		servers[i].TTL += c.esecs()
@@ -92,11 +110,12 @@
 
 // forget cache entries whose index begins with an element of names.  If names is nil
 // forget all cached entries.
-func (c *cache) forget(names []string) {
+func (c *ttlCache) forget(names []string) {
 	c.Lock()
 	defer c.Unlock()
 	for key := range c.entries {
 		for _, n := range names {
+			n = normalize(n)
 			if strings.HasPrefix(key, n) {
 				delete(c.entries, key)
 				break
@@ -108,7 +127,8 @@
 // lookup searches the cache for a maximal prefix of name and returns the associated servers,
 // prefix, and suffix.  If any of the associated servers is past its TTL, don't return anything
 // since that would reduce availability.
-func (c *cache) lookup(name string) ([]mountedServer, string) {
+func (c *ttlCache) lookup(name string) ([]mountedServer, string) {
+	name = normalize(name)
 	c.Lock()
 	defer c.Unlock()
 	now := c.esecs()
@@ -146,3 +166,11 @@
 	}
 	return "", naming.Join(prefix, suffix)
 }
+
+// nullCache is an instance of cache that does nothing.
+type nullCache int
+
+func newNullCache() cache                                         { return nullCache(1) }
+func (nullCache) remember(prefix string, servers []mountedServer) {}
+func (nullCache) forget(names []string)                           {}
+func (nullCache) lookup(name string) ([]mountedServer, string)    { return nil, "" }
diff --git a/runtimes/google/naming/namespace/cache_test.go b/runtimes/google/naming/namespace/cache_test.go
index a78c1c7..3a2df2f 100644
--- a/runtimes/google/naming/namespace/cache_test.go
+++ b/runtimes/google/naming/namespace/cache_test.go
@@ -25,7 +25,7 @@
 		{"/h2//c/d", "d", "/h3"},
 		{"/h3//d", "", "/h4:1234"},
 	}
-	c := newCache()
+	c := newTTLCache()
 	for _, p := range preload {
 		c.remember(naming.TrimSuffix(p.name, p.suffix), []mountedServer{mountedServer{Server: p.server, TTL: 30}})
 	}
@@ -51,7 +51,7 @@
 }
 
 func TestCacheLimit(t *testing.T) {
-	c := newCache()
+	c := newTTLCache().(*ttlCache)
 	servers := []mountedServer{mountedServer{Server: "the rain in spain", TTL: 3000}}
 	for i := 0; i < maxCacheEntries; i++ {
 		c.remember(fmt.Sprintf("%d", i), servers)
@@ -67,7 +67,7 @@
 }
 
 func TestCacheTTL(t *testing.T) {
-	c := newCache()
+	c := newTTLCache().(*ttlCache)
 	// Fill cache.
 	servers := []mountedServer{mountedServer{Server: "the rain in spain", TTL: 3000}}
 	for i := 0; i < maxCacheEntries; i++ {
@@ -99,7 +99,7 @@
 		{"/h3//d", "/h4:1234"},
 	}
 	ns, _ := New(nil)
-	c := ns.resolutionCache
+	c := ns.resolutionCache.(*ttlCache)
 	for _, p := range preload {
 		c.remember(p.name, []mountedServer{mountedServer{Server: p.server, TTL: 3000}})
 	}
@@ -129,3 +129,50 @@
 		t.Errorf("%s flushed too many entries", toflush)
 	}
 }
+
+func disabled(ctls []naming.CacheCtl) bool {
+	for _, c := range ctls {
+		switch v := c.(type) {
+		case naming.DisableCache:
+			if v {
+				return true
+			}
+		}
+	}
+	return false
+}
+
+func TestCacheDisableEnable(t *testing.T) {
+	ns, _ := New(nil)
+
+	// Default should be working resolution cache.
+	name := "/h1//a"
+	serverName := "/h2//"
+	c := ns.resolutionCache.(*ttlCache)
+	c.remember(name, []mountedServer{mountedServer{Server: serverName, TTL: 3000}})
+	if servers, _ := c.lookup(name); servers == nil || servers[0].Server != serverName {
+		t.Errorf("should have found the server in the cache")
+	}
+
+	// Turn off the resolution cache.
+	ctls := ns.CacheCtl(naming.DisableCache(true))
+	if !disabled(ctls) {
+		t.Errorf("caching not disabled")
+	}
+	nc := ns.resolutionCache.(nullCache)
+	nc.remember(name, []mountedServer{mountedServer{Server: serverName, TTL: 3000}})
+	if servers, _ := nc.lookup(name); servers != nil {
+		t.Errorf("should not have found the server in the cache")
+	}
+
+	// Turn on the resolution cache.
+	ctls = ns.CacheCtl(naming.DisableCache(false))
+	if disabled(ctls) {
+		t.Errorf("caching disabled")
+	}
+	c = ns.resolutionCache.(*ttlCache)
+	c.remember(name, []mountedServer{mountedServer{Server: serverName, TTL: 3000}})
+	if servers, _ := c.lookup(name); servers == nil || servers[0].Server != serverName {
+		t.Errorf("should have found the server in the cache")
+	}
+}
diff --git a/runtimes/google/naming/namespace/namespace.go b/runtimes/google/naming/namespace/namespace.go
index d3603b9..92bff25 100644
--- a/runtimes/google/naming/namespace/namespace.go
+++ b/runtimes/google/naming/namespace/namespace.go
@@ -12,7 +12,7 @@
 const defaultMaxResolveDepth = 32
 const defaultMaxRecursiveGlobDepth = 10
 
-// namespace is an implementation of naming.MountTable.
+// namespace is an implementation of naming.Namespace.
 type namespace struct {
 	sync.RWMutex
 	rt veyron2.Runtime
@@ -25,7 +25,7 @@
 	maxRecursiveGlobDepth int
 
 	// cache for name resolutions
-	resolutionCache *cache
+	resolutionCache cache
 }
 
 func rooted(names []string) bool {
@@ -52,11 +52,11 @@
 		roots:                 roots,
 		maxResolveDepth:       defaultMaxResolveDepth,
 		maxRecursiveGlobDepth: defaultMaxRecursiveGlobDepth,
-		resolutionCache:       newCache(),
+		resolutionCache:       newTTLCache(),
 	}, nil
 }
 
-// SetRoots implements naming.MountTable.SetRoots
+// SetRoots implements naming.Namespace.SetRoots
 func (ns *namespace) SetRoots(roots ...string) error {
 	if !rooted(roots) {
 		return badRoots(roots)
@@ -78,7 +78,7 @@
 	}
 }
 
-// Roots implements naming.MountTable.Roots
+// Roots implements naming.Namespace.Roots
 func (ns *namespace) Roots() []string {
 	ns.RLock()
 	defer ns.RUnlock()
@@ -124,3 +124,29 @@
 // all operations against the mount table service use this fixed timeout for the
 // time being.
 const callTimeout = 10 * time.Second
+
+// CacheCtl implements naming.Namespace.CacheCtl
+func (ns *namespace) CacheCtl(ctls ...naming.CacheCtl) []naming.CacheCtl {
+	for _, c := range ctls {
+		switch v := c.(type) {
+		case naming.DisableCache:
+			ns.Lock()
+			if _, isDisabled := ns.resolutionCache.(nullCache); isDisabled {
+				if !v {
+					ns.resolutionCache = newTTLCache()
+				}
+			} else {
+				if v {
+					ns.resolutionCache = newNullCache()
+				}
+			}
+			ns.Unlock()
+		}
+	}
+	ns.RLock()
+	defer ns.RUnlock()
+	if _, isDisabled := ns.resolutionCache.(nullCache); isDisabled {
+		return []naming.CacheCtl{naming.DisableCache(true)}
+	}
+	return nil
+}
diff --git a/services/mounttable/lib/mounttable_test.go b/services/mounttable/lib/mounttable_test.go
index 220f25e..85cd332 100644
--- a/services/mounttable/lib/mounttable_test.go
+++ b/services/mounttable/lib/mounttable_test.go
@@ -100,6 +100,10 @@
 	return false
 }
 
+func (stupidNS) CacheCtl(ctls ...naming.CacheCtl) []naming.CacheCtl {
+	return nil
+}
+
 // Glob implements naming.MountTable.Glob.
 func (stupidNS) Glob(ctx context.T, pattern string) (chan naming.MountEntry, error) {
 	return nil, errors.New("Glob is not implemented in this MountTable")