veyron/runtimes/google/ipc: implement a store for the results of previous RPCs

This is the first in a series of CLs to implement support for allowing a
dependent RPC call to access the results of a dependee RPC call without having
to roundtrip the results back to the client. Thus, a sequence of the form:
x := A(); B(x), can be executed as B(A()) on the server.
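
As an illustrative sketch (not part of this CL), the intended server-side flow
might look roughly like the following, assuming a hypothetical request type that
carries the sequence number of the dependee call whose results should be reused;
only resultsStore and its methods below actually exist in this change:

    // Hypothetical dispatch path; request, seq, dependsOn and handleCall are
    // placeholders used only to illustrate how the store is meant to be used.
    type request struct {
    	seq       uint64 // sequence number of this call
    	dependsOn uint64 // sequence number of the dependee call, 0 if none
    	method    string
    	args      []interface{}
    }

    func processRequest(store *resultsStore, req request,
    	handleCall func(method string, args []interface{}) []interface{}) {
    	args := req.args
    	if req.dependsOn != 0 {
    		// Block until the dependee's results have been stored, rather
    		// than round-tripping them back through the client.
    		args = store.waitForEntry(req.dependsOn)
    	}
    	res := handleCall(req.method, args)
    	store.addEntry(req.seq, res) // make these results available to later calls
    }

Once the client acknowledges receipt of results up to a given sequence number,
the server calls removeEntriesTo to reclaim the stored entries.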

Change-Id: Ie69717924a1d951ae0431b78a8b7084db6d2d926
diff --git a/runtimes/google/ipc/results_store.go b/runtimes/google/ipc/results_store.go
new file mode 100644
index 0000000..343cdd1
--- /dev/null
+++ b/runtimes/google/ipc/results_store.go
@@ -0,0 +1,110 @@
+package ipc
+
+import (
+	"container/heap"
+	"sync"
+)
+
+const (
+	// TODO(cnicolaou): what are good initial values? Large servers want
+	// large values, most won't.
+	initialResults           = 1000
+	initialOutOfOrderResults = 100
+)
+
+type results []interface{}
+
+// Implement heap.Interface to maintain an ordered min-heap of uint64s.
+type intHeap []uint64
+
+func (h intHeap) Len() int           { return len(h) }
+func (h intHeap) Less(i, j int) bool { return h[i] < h[j] }
+func (h intHeap) Swap(i, j int)      { h[i], h[j] = h[j], h[i] }
+
+func (h *intHeap) Push(x interface{}) {
+	// Push and Pop use pointer receivers because they modify the slice's length,
+	// not just its contents.
+	*h = append(*h, x.(uint64))
+}
+
+func (h *intHeap) Pop() interface{} {
+	old := *h
+	n := len(old)
+	x := old[n-1]
+	*h = old[0 : n-1]
+	return x
+}
+
+// resultsStore is used to store the results of previously completed RPCs
+// until the client indicates that it has received those results and hence
+// the server no longer needs to store them. Store entries are added
+// one at a time, but the client indicates that it has received entries up to
+// a given value, at which point all entries with a key at or below that value
+// can be deleted. Retrieving values is complicated by the fact that requests
+// may arrive out of order and hence one RPC may have to wait for another to
+// complete in order to access its stored results. A separate map of channels
+// is used to implement this synchronization.
+// TODO(cnicolaou): Servers should protect themselves from badly behaved
+// clients by refusing to allocate beyond a certain number of results.
+type resultsStore struct {
+	sync.Mutex
+	store map[uint64]results
+	chans map[uint64]chan struct{}
+	keys  intHeap
+}
+
+func newStore() *resultsStore {
+	r := &resultsStore{
+		store: make(map[uint64]results, initialResults),
+		chans: make(map[uint64]chan struct{}, initialOutOfOrderResults),
+	}
+	heap.Init(&r.keys)
+	return r
+}
+
+func (rs *resultsStore) addEntry(key uint64, data results) {
+	rs.Lock()
+	if _, present := rs.store[key]; !present {
+		rs.store[key] = data
+		heap.Push(&rs.keys, key)
+	}
+	if ch, present := rs.chans[key]; present {
+		close(ch)
+		delete(rs.chans, key)
+	}
+	rs.Unlock()
+}
+
+func (rs *resultsStore) removeEntriesTo(to uint64) {
+	rs.Lock()
+	for rs.keys.Len() > 0 && to >= rs.keys[0] {
+		k := heap.Pop(&rs.keys).(uint64)
+		delete(rs.store, k)
+		if ch, present := rs.chans[k]; present {
+			close(ch)
+			delete(rs.chans, k)
+		}
+	}
+	rs.Unlock()
+}
+
+func (rs *resultsStore) waitForEntry(key uint64) results {
+	rs.Lock()
+	if r, present := rs.store[key]; present {
+		rs.Unlock()
+		return r
+	}
+	// entry is not present, need to wait for it.
+	ch, present := rs.chans[key]
+	if !present {
+		heap.Push(&rs.keys, key)
+		ch = make(chan struct{}, 1)
+		rs.chans[key] = ch
+	}
+	rs.Unlock()
+	<-ch
+	rs.Lock()
+	delete(rs.chans, key) // Allow the channel to be GC'ed.
+	defer rs.Unlock()
+	return rs.store[key] // This may be nil if the entry has been removed
+}
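
For reference, a minimal usage sketch (not part of the CL) of the blocking
behaviour the resultsStore comment describes; it assumes package ipc and an
fmt import:

    func exampleResultsStoreUsage() {
    	store := newStore()

    	done := make(chan struct{})
    	go func() {
    		// The dependent call arrives first and blocks until key 7 is
    		// either added to or removed from the store.
    		r := store.waitForEntry(7)
    		fmt.Println(r[0]) // "A's results"
    		close(done)
    	}()

    	store.addEntry(7, []interface{}{"A's results"}) // unblocks the waiter
    	<-done
    	store.removeEntriesTo(7) // client has acknowledged up to 7; reclaim the entry
    }
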
diff --git a/runtimes/google/ipc/results_store_test.go b/runtimes/google/ipc/results_store_test.go
new file mode 100644
index 0000000..d5fa409
--- /dev/null
+++ b/runtimes/google/ipc/results_store_test.go
@@ -0,0 +1,132 @@
+package ipc
+
+import (
+	"math/rand"
+	"sort"
+	"sync"
+	"testing"
+)
+
+func randomKeys() []uint64 {
+	n := (rand.Intn(256*10) / 10) + 256
+	k := make([]uint64, n)
+	for i := 0; i < n; i++ {
+		k[i] = uint64(rand.Int63())
+	}
+	return k
+}
+
+type keySlice []uint64
+
+func (p keySlice) Len() int           { return len(p) }
+func (p keySlice) Less(i, j int) bool { return p[i] < p[j] }
+func (p keySlice) Swap(i, j int)      { p[i], p[j] = p[j], p[i] }
+func (p keySlice) Sort()              { sort.Sort(p) }
+
+func TestStoreRandom(t *testing.T) {
+	store := newStore()
+	keys := randomKeys()
+
+	for i := 0; i < len(keys); i++ {
+		r := []interface{}{i}
+		store.addEntry(keys[i], r)
+	}
+	if len(store.store) != len(keys) {
+		t.Errorf("num stored entries: got %d, want %d", len(store.store), len(keys))
+	}
+	for i := 0; i < len(keys); i++ {
+		// Each call to removeEntriesTo will remove an unknown number of
+		// entries, depending on the random values of the keys.
+		store.removeEntriesTo(keys[i])
+	}
+	if len(store.store) != 0 {
+		t.Errorf("store is not empty: %d", len(store.store))
+	}
+}
+
+func TestStoreOrdered(t *testing.T) {
+	store := newStore()
+	keys := randomKeys()
+
+	for i := 0; i < len(keys); i++ {
+		r := []interface{}{i}
+		store.addEntry(keys[i], r)
+	}
+	if len(store.store) != len(keys) {
+		t.Errorf("num stored entries: got %d, want %d", len(store.store), len(keys))
+	}
+
+	(keySlice(keys)).Sort()
+	l := len(keys)
+	for i := 0; i < len(keys); i++ {
+		store.removeEntriesTo(keys[i])
+		l--
+		if len(store.store) != l {
+			t.Errorf("failed to remove a single item(%d): %d != %d", keys[i], len(store.store), l)
+		}
+	}
+	if len(store.store) != 0 {
+		t.Errorf("store is not empty: %d", len(store.store))
+	}
+}
+
+func TestStoreWaitForEntry(t *testing.T) {
+	store := newStore()
+	store.addEntry(1, []interface{}{"1"})
+	r := store.waitForEntry(1)
+	if r[0].(string) != "1" {
+		t.Errorf("Got: %q, Want: %q", r[0], "1")
+	}
+	ch := make(chan string)
+	go func(ch chan string) {
+		r := store.waitForEntry(2)
+		ch <- r[0].(string)
+	}(ch)
+	store.addEntry(2, []interface{}{"2"})
+	if result := <-ch; result != "2" {
+		t.Errorf("Got: %q, Want: %q", result, "2")
+	}
+}
+
+func TestStoreWaitForEntryRandom(t *testing.T) {
+	store := newStore()
+	keys := randomKeys()
+	var wg sync.WaitGroup
+	for _, k := range keys {
+		wg.Add(1)
+		go func(t *testing.T, id uint64) {
+			r := store.waitForEntry(id)
+			if r[0].(uint64) != id {
+				t.Errorf("Got: %d, Want: %d", r[0].(uint64), id)
+			}
+			wg.Done()
+		}(t, k)
+	}
+	(keySlice(keys)).Sort()
+	for _, k := range keys {
+		store.addEntry(k, []interface{}{k})
+	}
+	wg.Wait()
+}
+
+// Test removing an entry that's being waited for.
+func TestStoreWaitForRemovedEntry(t *testing.T) {
+	store := newStore()
+	keys := randomKeys()
+	var wg sync.WaitGroup
+	for _, k := range keys {
+		wg.Add(1)
+		go func(t *testing.T, id uint64) {
+			r := store.waitForEntry(id)
+			if r != nil {
+				t.Errorf("Got: %d, Want: nil", r[0].(uint64))
+			}
+			wg.Done()
+		}(t, k)
+	}
+	(keySlice(keys)).Sort()
+	for _, k := range keys {
+		store.removeEntriesTo(k)
+	}
+	wg.Wait()
+}