third_party/csrc/leveldb:  fix problems in leveldb's caching code

This change has been submitted upstream as CL 123247237.

Background:

LevelDB uses a cache (util/cache.h, util/cache.cc) of (key,value)
pairs for two purposes:
- a cache of (table, file handle) pairs
- a cache of blocks

The cache places the (key,value) pairs in a reference-counted
wrapper.  When it returns a value, it returns a reference to this
wrapper.  When the client has finished using the reference and
its enclosed (key,value), it calls Release() to decrement the
reference count.

Each (key,value) pair has an associated resource usage.  The
cache maintains the sum of the usages of the elements it holds,
and removes values as needed to keep the sum below a capacity
threshold.  It maintains an LRU list so that it will remove the
least-recently used elements first.

The max_open_files option to LevelDB sets the size of the cache
of (table, file handle) pairs.  The option is not used in any
other way.

The observed behaviour:

If LevelDB at any time used more file handles concurrently than
the cache size set via max_open_files, it attempted to reduce the
number by evicting entries from the table cache.  This could
happen most easily during compaction, and if max_open_files was
low.  Because the handles were in use, their reference count did
not drop to zero, and so the usage sum in the cache was not
modified by the evictions.  Subsequent Insert() calls returned
valid handles, but their entries were immediately evicted from
the cache, which though empty still acted as though full.  As a
result, there was effectively no caching, and the number of open
file handles rose rapidly until it hit system-imposed limits and
the process died.

If one set max_open_files lower, the cache was more likely to
exhibit this beahviour, and cause the process to run out of file
descriptors.  That is, max_open_files acted in almost exactly the
opposite manner from what was intended.

The problems:

1. The cache kept all elements on its LRU list eligible for capacity
   eviction---even those with outstanding references from clients.  This was
   ineffective in reducing resource consumption because there was an
   outstanding reference, guaranteeing that the items remained.  A secondary
   issue was that there is no guarantee that these in-use items will be the
   last things reached in the LRU chain, which actually recorded
   "least-recently requested" rather than "least-recently used".

2. The sum of usages was decremented not when a (key,value) was evicted from
   the cache, but when its reference count went to zero.  Thus, when things
   were removed from the cache, either by garbage collection or via Erase(),
   the usage sum was not necessarily decreased.  This allowed the cache to act
   as though full when it was in fact not, reducing caching effectiveness, and
   leading to more resources being consumed---the opposite of what the
   evictions were intended to achieve.

3. (minor) The cache's clients insert items into it by first looking up the
   key, and inserting only if no value is found.  Although the cache has an
   internal lock, the clients use no locking to ensure atomicity of the
   Lookup/Insert pair.  (see table/table.cc:  block_cache->Insert() and
   db/table_cache.cc:  cache_->Insert()).  Thus, if two threads Insert() at
   about the same time, they can both Lookup(), find nothing, and both
   Insert().  The second Insert() would evict the first value, leaving each
   thread with a handle on its own version of the data, and with the second
   version in the cache.  It would be better if both threads ended up with a
   handle on the same (key,value) pair, which implies it must be the first item
   inserted.  This suggests that Insert() should not replace an existing value.

   This can be made safe with current usage inside LeveDB itself, but this is
   not easy to change first because Cache is a public interface, so to change
   the semantics of an existing call might break things, second because Cache
   is an abstract virtual class, so adding a new abstract virtual method may
   break other implementations, and third, the new method "insert without
   replacing" cannot be implemented in terms of the existing methods, so cannot
   be implemented with a non-abstract default.   But fortunately, the effects
   of this issue are minor, so this issue is not fixed by this change.

The changes:

The assumption in the fixes is that it is always better to cache
entries unless removal from the cache would lead to deallocation.

Cache entries now have an "in_cache" boolean indicating whether
the cache has a reference on the entry.  The only ways that this can
become false without the entry being passed to its "deleter" are via
Erase(), via Insert() when an element with a duplicate key is inserted,
or on destruction of the cache.

The cache now keeps two linked lists instead of one.  All items
in the cache are in one list or the other, and never both.  Items
still referenced by clients but erased from the cache are in
neither list.  The lists are:
- in-use:  contains the items currently referenced by clients, in no particular
  order.  (This list is used for invariant checking.  If we removed the check,
  elements that would otherwise be on this list could be left as disconnected
  singleton lists.)
- LRU:  contains the items not currently referenced by clients, in LRU order

A new internal Ref() method increments the reference count.  If
incrementing from 1 to 2 for an item in the cache, it is moved
from the LRU list to the in-use list.

The Unref() call now moves things from the in-use list to the LRU
list if the reference count falls to 1, and the item is in the
cache.  It no longer adjusts the usage sum.  The usage sum now
reflects only what is in the cache, rather than including
still-referenced items that have been evicted.

The LRU_Append() now takes a "list" parameter so that it can be
used to append either to the LRU list or the in-use list.

Lookup() is modified to use the new Ref() call, rather than
adjusting the reference count and LRU chain directly.

Insert() eviction code is also modified to adjust the usage sum and the
in_cache boolean of the evicted elements.  Some LevelDB tests assume that there
will be no caching whatsoever if the cache size is set to zero, so this is
handled as a special case.

A new private method FinishErase() is factored out
with the common code from where items are removed from the cache.

Erase() is modified to adjust the usage sum and the in_cache
boolean of the erased elements, and to use FinishErase().

Prune() is modified to use FinishErase() also, and to make use of the fact that
the lru_ list now contains only items with reference count 1.

- EvictionPolicy is modified to test that an entry with an
outstanding handle is not evicted.  This test fails with the old cache.cc.

- A new test case UseExceedsCacheSize verifies that even when the
cache is overfull of entries with outstanding handles, none are
evicted.  This test fails with the old cache.cc, and is the key
issue that causes file descriptors to run out when the cache
size is set too small.

Change-Id: I58a1b27a0b07d338e24b07089d1579faeb68daad
diff --git a/csrc/leveldb/util/cache.cc b/csrc/leveldb/util/cache.cc
index 8b197bc..5b26772 100644
--- a/csrc/leveldb/util/cache.cc
+++ b/csrc/leveldb/util/cache.cc
@@ -19,6 +19,23 @@
 namespace {
 
 // LRU cache implementation
+//
+// Cache entries have an "in_cache" boolean indicating whether the cache has a
+// reference on the entry.  The only ways that this can become false without the
+// entry being passed to its "deleter" are via Erase(), via Insert() when
+// an element with a duplicate key is inserted, or on destruction of the cache.
+//
+// The cache keeps two linked lists of items in the cache.  All items in the
+// cache are in one list or the other, and never both.  Items still referenced
+// by clients but erased from the cache are in neither list.  The lists are:
+// - in-use:  contains the items currently referenced by clients, in no
+//   particular order.  (This list is used for invariant checking.  If we
+//   removed the check, elements that would otherwise be on this list could be
+//   left as disconnected singleton lists.)
+// - LRU:  contains the items not currently referenced by clients, in LRU order
+// Elements are moved between these lists by the Ref() and Unref() methods,
+// when they detect an element in the cache acquiring or losing its only
+// external reference.
 
 // An entry is a variable length heap-allocated structure.  Entries
 // are kept in a circular doubly linked list ordered by access time.
@@ -30,7 +47,8 @@
   LRUHandle* prev;
   size_t charge;      // TODO(opt): Only allow uint32_t?
   size_t key_length;
-  uint32_t refs;
+  bool in_cache;      // Whether entry is in the cache.
+  uint32_t refs;      // References, including cache reference, if present.
   uint32_t hash;      // Hash of key(); used for fast sharding and comparisons
   char key_data[1];   // Beginning of key
 
@@ -150,8 +168,10 @@
 
  private:
   void LRU_Remove(LRUHandle* e);
-  void LRU_Append(LRUHandle* e);
+  void LRU_Append(LRUHandle*list, LRUHandle* e);
+  void Ref(LRUHandle* e);
   void Unref(LRUHandle* e);
+  bool FinishErase(LRUHandle* e);
 
   // Initialized before use.
   size_t capacity_;
@@ -162,34 +182,55 @@
 
   // Dummy head of LRU list.
   // lru.prev is newest entry, lru.next is oldest entry.
+  // Entries have refs==1 and in_cache==true.
   LRUHandle lru_;
 
+  // Dummy head of in-use list.
+  // Entries are in use by clients, and have refs >= 2 and in_cache==true.
+  LRUHandle in_use_;
+
   HandleTable table_;
 };
 
 LRUCache::LRUCache()
     : usage_(0) {
-  // Make empty circular linked list
+  // Make empty circular linked lists.
   lru_.next = &lru_;
   lru_.prev = &lru_;
+  in_use_.next = &in_use_;
+  in_use_.prev = &in_use_;
 }
 
 LRUCache::~LRUCache() {
+  assert(in_use_.next == &in_use_);  // Error if caller has an unreleased handle
   for (LRUHandle* e = lru_.next; e != &lru_; ) {
     LRUHandle* next = e->next;
-    assert(e->refs == 1);  // Error if caller has an unreleased handle
+    assert(e->in_cache);
+    e->in_cache = false;
+    assert(e->refs == 1);  // Invariant of lru_ list.
     Unref(e);
     e = next;
   }
 }
 
+void LRUCache::Ref(LRUHandle* e) {
+  if (e->refs == 1 && e->in_cache) {  // If on lru_ list, move to in_use_ list.
+    LRU_Remove(e);
+    LRU_Append(&in_use_, e);
+  }
+  e->refs++;
+}
+
 void LRUCache::Unref(LRUHandle* e) {
   assert(e->refs > 0);
   e->refs--;
-  if (e->refs <= 0) {
-    usage_ -= e->charge;
+  if (e->refs == 0) { // Deallocate.
+    assert(!e->in_cache);
     (*e->deleter)(e->key(), e->value);
     free(e);
+  } else if (e->in_cache && e->refs == 1) {  // No longer in use; move to lru_ list.
+    LRU_Remove(e);
+    LRU_Append(&lru_, e);
   }
 }
 
@@ -198,10 +239,10 @@
   e->prev->next = e->next;
 }
 
-void LRUCache::LRU_Append(LRUHandle* e) {
-  // Make "e" newest entry by inserting just before lru_
-  e->next = &lru_;
-  e->prev = lru_.prev;
+void LRUCache::LRU_Append(LRUHandle* list, LRUHandle* e) {
+  // Make "e" newest entry by inserting just before *list
+  e->next = list;
+  e->prev = list->prev;
   e->prev->next = e;
   e->next->prev = e;
 }
@@ -210,9 +251,7 @@
   MutexLock l(&mutex_);
   LRUHandle* e = table_.Lookup(key, hash);
   if (e != NULL) {
-    e->refs++;
-    LRU_Remove(e);
-    LRU_Append(e);
+    Ref(e);
   }
   return reinterpret_cast<Cache::Handle*>(e);
 }
@@ -234,34 +273,46 @@
   e->charge = charge;
   e->key_length = key.size();
   e->hash = hash;
-  e->refs = 2;  // One from LRUCache, one for the returned handle
+  e->in_cache = false;
+  e->refs = 1;  // for the returned handle.
   memcpy(e->key_data, key.data(), key.size());
-  LRU_Append(e);
-  usage_ += charge;
 
-  LRUHandle* old = table_.Insert(e);
-  if (old != NULL) {
-    LRU_Remove(old);
-    Unref(old);
-  }
+  if (capacity_ > 0) {
+    e->refs++;  // for the cache's reference.
+    e->in_cache = true;
+    LRU_Append(&in_use_, e);
+    usage_ += charge;
+    FinishErase(table_.Insert(e));
+  } // else don't cache.  (Tests use capacity_==0 to turn off caching.)
 
   while (usage_ > capacity_ && lru_.next != &lru_) {
     LRUHandle* old = lru_.next;
-    LRU_Remove(old);
-    table_.Remove(old->key(), old->hash);
-    Unref(old);
+    assert(old->refs == 1);
+    bool erased = FinishErase(table_.Remove(old->key(), old->hash));
+    if (!erased) {  // to avoid unused variable when compiled NDEBUG
+      assert(erased);
+    }
   }
 
   return reinterpret_cast<Cache::Handle*>(e);
 }
 
-void LRUCache::Erase(const Slice& key, uint32_t hash) {
-  MutexLock l(&mutex_);
-  LRUHandle* e = table_.Remove(key, hash);
+// If e != NULL, finish removing *e from the cache; it has already been removed
+// from the hash table.  Return whether e != NULL.  Requires mutex_ held.
+bool LRUCache::FinishErase(LRUHandle* e) {
   if (e != NULL) {
+    assert(e->in_cache);
     LRU_Remove(e);
+    e->in_cache = false;
+    usage_ -= e->charge;
     Unref(e);
   }
+  return e != NULL;
+}
+
+void LRUCache::Erase(const Slice& key, uint32_t hash) {
+  MutexLock l(&mutex_);
+  FinishErase(table_.Remove(key, hash));
 }
 
 static const int kNumShardBits = 4;
diff --git a/csrc/leveldb/util/cache_test.cc b/csrc/leveldb/util/cache_test.cc
index 4371671..b9bbce7 100644
--- a/csrc/leveldb/util/cache_test.cc
+++ b/csrc/leveldb/util/cache_test.cc
@@ -59,6 +59,11 @@
                                    &CacheTest::Deleter));
   }
 
+  Cache::Handle* InsertAndReturnHandle(int key, int value, int charge = 1) {
+    return cache_->Insert(EncodeKey(key), EncodeValue(value), charge,
+                          &CacheTest::Deleter);
+  }
+
   void Erase(int key) {
     cache_->Erase(EncodeKey(key));
   }
@@ -135,8 +140,11 @@
 TEST(CacheTest, EvictionPolicy) {
   Insert(100, 101);
   Insert(200, 201);
+  Insert(300, 301);
+  Cache::Handle* h = cache_->Lookup(EncodeKey(300));
 
-  // Frequently used entry must be kept around
+  // Frequently used entry must be kept around,
+  // as must things that are still in use.
   for (int i = 0; i < kCacheSize + 100; i++) {
     Insert(1000+i, 2000+i);
     ASSERT_EQ(2000+i, Lookup(1000+i));
@@ -144,6 +152,25 @@
   }
   ASSERT_EQ(101, Lookup(100));
   ASSERT_EQ(-1, Lookup(200));
+  ASSERT_EQ(301, Lookup(300));
+  cache_->Release(h);
+}
+
+TEST(CacheTest, UseExceedsCacheSize) {
+  // Overfill the cache, keeping handles on all inserted entries.
+  std::vector<Cache::Handle*> h;
+  for (int i = 0; i < kCacheSize + 100; i++) {
+    h.push_back(InsertAndReturnHandle(1000+i, 2000+i));
+  }
+
+  // Check that all the entries can be found in the cache.
+  for (int i = 0; i < h.size(); i++) {
+    ASSERT_EQ(2000+i, Lookup(1000+i));
+  }
+
+  for (int i = 0; i < h.size(); i++) {
+    cache_->Release(h[i]);
+  }
 }
 
 TEST(CacheTest, HeavyEntries) {