veyron/runtimes/google/ipc: implement a store for the results of previous RP
This is the first in a series of CLs to implement support for allowing a
dependent RPC call to access the results of a dependee RPC call without havi
to roundtrip the results back to the client. Thus, a sequence of the form:
x := A(); B(x), can be executed as B(A()) on the server.
Change-Id: Ie69717924a1d951ae0431b78a8b7084db6d2d926
diff --git a/runtimes/google/ipc/results_store.go b/runtimes/google/ipc/results_store.go
new file mode 100644
index 0000000..343cdd1
--- /dev/null
+++ b/runtimes/google/ipc/results_store.go
@@ -0,0 +1,110 @@
+package ipc
+
+import (
+ "container/heap"
+ "sync"
+)
+
+const (
+ // TODO(cnicolaou): what are good initial values? Large servers want
+ // large values, most won't.
+ initialResults = 1000
+ initialOutOfOrderResults = 100
+)
+
+type results []interface{}
+
+// Implement heap.Interface to maintain an ordered min-heap of uint64s.
+type intHeap []uint64
+
+func (h intHeap) Len() int { return len(h) }
+func (h intHeap) Less(i, j int) bool { return h[i] < h[j] }
+func (h intHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
+
+func (h *intHeap) Push(x interface{}) {
+ // Push and Pop use pointer receivers because they modify the slice's length,
+ // not just its contents.
+ *h = append(*h, x.(uint64))
+}
+
+func (h *intHeap) Pop() interface{} {
+ old := *h
+ n := len(old)
+ x := old[n-1]
+ *h = old[0 : n-1]
+ return x
+}
+
+// resultStore is used to store the results of previously exited RPCs
+// until the client indicates that it has received those results and hence
+// the server no longer needs to store them. Store entries are added
+// one at a time, but the client indicates that it has received entries up to
+// given value and that all entries with key lower than that can be deleted.
+// Retrieving values is complicated by the fact that requests may arrive
+// out of order and hence one RPC may have to wait for another to complete
+// in order to access its stored results. A separate map of channels is
+// used to implement this synchronization.
+// TODO(cnicolaou): Servers protect themselves from badly behaved clients by
+// refusing to allocate beyond a certain number of results.
+type resultsStore struct {
+ sync.Mutex
+ store map[uint64]results
+ chans map[uint64]chan struct{}
+ keys intHeap
+}
+
+func newStore() *resultsStore {
+ r := &resultsStore{
+ store: make(map[uint64]results, initialResults),
+ chans: make(map[uint64]chan struct{}, initialOutOfOrderResults),
+ }
+ heap.Init(&r.keys)
+ return r
+}
+
+func (rs *resultsStore) addEntry(key uint64, data results) {
+ rs.Lock()
+ if _, present := rs.store[key]; !present {
+ rs.store[key] = data
+ heap.Push(&rs.keys, key)
+ }
+ if ch, present := rs.chans[key]; present {
+ close(ch)
+ delete(rs.chans, key)
+ }
+ rs.Unlock()
+}
+
+func (rs *resultsStore) removeEntriesTo(to uint64) {
+ rs.Lock()
+ for rs.keys.Len() > 0 && to >= rs.keys[0] {
+ k := heap.Pop(&rs.keys).(uint64)
+ delete(rs.store, k)
+ if ch, present := rs.chans[k]; present {
+ close(ch)
+ delete(rs.chans, k)
+ }
+ }
+ rs.Unlock()
+}
+
+func (rs *resultsStore) waitForEntry(key uint64) results {
+ rs.Lock()
+ if r, present := rs.store[key]; present {
+ rs.Unlock()
+ return r
+ }
+ // entry is not present, need to wait for it.
+ ch, present := rs.chans[key]
+ if !present {
+ heap.Push(&rs.keys, key)
+ ch = make(chan struct{}, 1)
+ rs.chans[key] = ch
+ }
+ rs.Unlock()
+ <-ch
+ rs.Lock()
+ delete(rs.chans, key) // Allow the channel to be GC'ed.
+ defer rs.Unlock()
+ return rs.store[key] // This may be nil if the entry has been removed
+}
diff --git a/runtimes/google/ipc/results_store_test.go b/runtimes/google/ipc/results_store_test.go
new file mode 100644
index 0000000..d5fa409
--- /dev/null
+++ b/runtimes/google/ipc/results_store_test.go
@@ -0,0 +1,132 @@
+package ipc
+
+import (
+ "math/rand"
+ "sort"
+ "sync"
+ "testing"
+)
+
+func randomKeys() []uint64 {
+ n := (rand.Intn(256*10) / 10) + 256
+ k := make([]uint64, n)
+ for i := 0; i < n; i++ {
+ k[i] = uint64(rand.Int63())
+ }
+ return k
+}
+
+type keySlice []uint64
+
+func (p keySlice) Len() int { return len(p) }
+func (p keySlice) Less(i, j int) bool { return p[i] < p[j] }
+func (p keySlice) Swap(i, j int) { p[i], p[j] = p[j], p[i] }
+func (p keySlice) Sort() { sort.Sort(p) }
+
+func TestStoreRandom(t *testing.T) {
+ store := newStore()
+ keys := randomKeys()
+
+ for i := 0; i < len(keys); i++ {
+ r := []interface{}{i}
+ store.addEntry(keys[i], r)
+ }
+ if len(store.store) != len(keys) {
+ t.Errorf("num stored entries: got %d, want %d", len(store.store), len(keys))
+ }
+ for i := 0; i < len(keys); i++ {
+ // Each call to removeEntries will remove an unknown number of entries
+ // depending on the original randomised value of the ints.
+ store.removeEntriesTo(keys[i])
+ }
+ if len(store.store) != 0 {
+ t.Errorf("store is not empty: %d", len(store.store))
+ }
+}
+
+func TestStoreOrdered(t *testing.T) {
+ store := newStore()
+ keys := randomKeys()
+
+ for i := 0; i < len(keys); i++ {
+ r := []interface{}{i}
+ store.addEntry(keys[i], r)
+ }
+ if len(store.store) != len(keys) {
+ t.Errorf("num stored entries: got %d, want %d", len(store.store), len(keys))
+ }
+
+ (keySlice(keys)).Sort()
+ l := len(keys)
+ for i := 0; i < len(keys); i++ {
+ store.removeEntriesTo(keys[i])
+ l--
+ if len(store.store) != l {
+ t.Errorf("failed to remove a single item(%d): %d != %d", keys[i], len(store.store), l)
+ }
+ }
+ if len(store.store) != 0 {
+ t.Errorf("store is not empty: %d", len(store.store))
+ }
+}
+
+func TestStoreWaitForEntry(t *testing.T) {
+ store := newStore()
+ store.addEntry(1, []interface{}{"1"})
+ r := store.waitForEntry(1)
+ if r[0].(string) != "1" {
+ t.Errorf("Got: %q, Want: %q", r[0], "1")
+ }
+ ch := make(chan string)
+ go func(ch chan string) {
+ r := store.waitForEntry(2)
+ ch <- r[0].(string)
+ }(ch)
+ store.addEntry(2, []interface{}{"2"})
+ if result := <-ch; result != "2" {
+ t.Errorf("Got: %q, Want: %q", r[0], "2")
+ }
+}
+
+func TestStoreWaitForEntryRandom(t *testing.T) {
+ store := newStore()
+ keys := randomKeys()
+ var wg sync.WaitGroup
+ for _, k := range keys {
+ wg.Add(1)
+ go func(t *testing.T, id uint64) {
+ r := store.waitForEntry(id)
+ if r[0].(uint64) != id {
+ t.Errorf("Got: %d, Want: %d", r[0].(uint64), id)
+ }
+ wg.Done()
+ }(t, k)
+ }
+ (keySlice(keys)).Sort()
+ for _, k := range keys {
+ store.addEntry(k, []interface{}{k})
+ }
+ wg.Wait()
+}
+
+// Test removing an entry that's being waited for.
+func TestStoreWaitForRemovedEntry(t *testing.T) {
+ store := newStore()
+ keys := randomKeys()
+ var wg sync.WaitGroup
+ for _, k := range keys {
+ wg.Add(1)
+ go func(t *testing.T, id uint64) {
+ r := store.waitForEntry(id)
+ if r != nil {
+ t.Errorf("Got: %d, Want: nil", r[0].(uint64))
+ }
+ wg.Done()
+ }(t, k)
+ }
+ (keySlice(keys)).Sort()
+ for _, k := range keys {
+ store.removeEntriesTo(k)
+ }
+ wg.Wait()
+}