blob: d029306b9825706377976f4dd8b313b710e6c4d5 [file] [log] [blame]
Cosmos Nicolaou174a44e2014-05-16 13:59:30 -07001package ipc
2
3import (
4 "container/heap"
5 "sync"
6)
7
8const (
9 // TODO(cnicolaou): what are good initial values? Large servers want
10 // large values, most won't.
11 initialResults = 1000
12 initialOutOfOrderResults = 100
13)
14
15type results []interface{}
16
17// Implement heap.Interface to maintain an ordered min-heap of uint64s.
18type intHeap []uint64
19
20func (h intHeap) Len() int { return len(h) }
21func (h intHeap) Less(i, j int) bool { return h[i] < h[j] }
22func (h intHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] }
23
24func (h *intHeap) Push(x interface{}) {
25 // Push and Pop use pointer receivers because they modify the slice's length,
26 // not just its contents.
27 *h = append(*h, x.(uint64))
28}
29
30func (h *intHeap) Pop() interface{} {
31 old := *h
32 n := len(old)
33 x := old[n-1]
34 *h = old[0 : n-1]
35 return x
36}
37
38// resultStore is used to store the results of previously exited RPCs
39// until the client indicates that it has received those results and hence
40// the server no longer needs to store them. Store entries are added
41// one at a time, but the client indicates that it has received entries up to
42// given value and that all entries with key lower than that can be deleted.
43// Retrieving values is complicated by the fact that requests may arrive
44// out of order and hence one RPC may have to wait for another to complete
45// in order to access its stored results. A separate map of channels is
46// used to implement this synchronization.
47// TODO(cnicolaou): Servers protect themselves from badly behaved clients by
48// refusing to allocate beyond a certain number of results.
49type resultsStore struct {
50 sync.Mutex
51 store map[uint64]results
52 chans map[uint64]chan struct{}
53 keys intHeap
Asim Shankarc14225c2014-05-23 17:18:36 -070054 // TODO(cnicolaou): Should addEntry/waitForEntry return an error when
55 // the calls do not match the frontier?
56 frontier uint64 // results with index less than this have been removed.
Cosmos Nicolaou174a44e2014-05-16 13:59:30 -070057}
58
59func newStore() *resultsStore {
60 r := &resultsStore{
61 store: make(map[uint64]results, initialResults),
62 chans: make(map[uint64]chan struct{}, initialOutOfOrderResults),
63 }
64 heap.Init(&r.keys)
65 return r
66}
67
68func (rs *resultsStore) addEntry(key uint64, data results) {
69 rs.Lock()
Asim Shankarc14225c2014-05-23 17:18:36 -070070 if _, present := rs.store[key]; !present && rs.frontier <= key {
Cosmos Nicolaou174a44e2014-05-16 13:59:30 -070071 rs.store[key] = data
72 heap.Push(&rs.keys, key)
73 }
74 if ch, present := rs.chans[key]; present {
75 close(ch)
76 delete(rs.chans, key)
77 }
78 rs.Unlock()
79}
80
81func (rs *resultsStore) removeEntriesTo(to uint64) {
82 rs.Lock()
Asim Shankarc14225c2014-05-23 17:18:36 -070083 if rs.frontier > to {
84 rs.Unlock()
85 return
86 }
87 rs.frontier = to + 1
Cosmos Nicolaou174a44e2014-05-16 13:59:30 -070088 for rs.keys.Len() > 0 && to >= rs.keys[0] {
89 k := heap.Pop(&rs.keys).(uint64)
90 delete(rs.store, k)
91 if ch, present := rs.chans[k]; present {
92 close(ch)
93 delete(rs.chans, k)
94 }
95 }
96 rs.Unlock()
97}
98
99func (rs *resultsStore) waitForEntry(key uint64) results {
100 rs.Lock()
101 if r, present := rs.store[key]; present {
102 rs.Unlock()
103 return r
104 }
Asim Shankarc14225c2014-05-23 17:18:36 -0700105 if key < rs.frontier {
106 rs.Unlock()
107 return nil
108 }
Cosmos Nicolaou174a44e2014-05-16 13:59:30 -0700109 // entry is not present, need to wait for it.
110 ch, present := rs.chans[key]
111 if !present {
112 heap.Push(&rs.keys, key)
113 ch = make(chan struct{}, 1)
114 rs.chans[key] = ch
115 }
116 rs.Unlock()
117 <-ch
118 rs.Lock()
Cosmos Nicolaou174a44e2014-05-16 13:59:30 -0700119 defer rs.Unlock()
Asim Shankarc14225c2014-05-23 17:18:36 -0700120 delete(rs.chans, key) // Allow the channel to be GC'ed.
121 return rs.store[key] // This may be nil if the entry has been removed
Cosmos Nicolaou174a44e2014-05-16 13:59:30 -0700122}