Cosmos Nicolaou | 174a44e | 2014-05-16 13:59:30 -0700 | [diff] [blame] | 1 | package ipc |
| 2 | |
| 3 | import ( |
| 4 | "container/heap" |
| 5 | "sync" |
| 6 | ) |
| 7 | |
| 8 | const ( |
| 9 | // TODO(cnicolaou): what are good initial values? Large servers want |
| 10 | // large values, most won't. |
| 11 | initialResults = 1000 |
| 12 | initialOutOfOrderResults = 100 |
| 13 | ) |
| 14 | |
| 15 | type results []interface{} |
| 16 | |
| 17 | // Implement heap.Interface to maintain an ordered min-heap of uint64s. |
| 18 | type intHeap []uint64 |
| 19 | |
| 20 | func (h intHeap) Len() int { return len(h) } |
| 21 | func (h intHeap) Less(i, j int) bool { return h[i] < h[j] } |
| 22 | func (h intHeap) Swap(i, j int) { h[i], h[j] = h[j], h[i] } |
| 23 | |
| 24 | func (h *intHeap) Push(x interface{}) { |
| 25 | // Push and Pop use pointer receivers because they modify the slice's length, |
| 26 | // not just its contents. |
| 27 | *h = append(*h, x.(uint64)) |
| 28 | } |
| 29 | |
| 30 | func (h *intHeap) Pop() interface{} { |
| 31 | old := *h |
| 32 | n := len(old) |
| 33 | x := old[n-1] |
| 34 | *h = old[0 : n-1] |
| 35 | return x |
| 36 | } |
| 37 | |
| 38 | // resultStore is used to store the results of previously exited RPCs |
| 39 | // until the client indicates that it has received those results and hence |
| 40 | // the server no longer needs to store them. Store entries are added |
| 41 | // one at a time, but the client indicates that it has received entries up to |
| 42 | // given value and that all entries with key lower than that can be deleted. |
| 43 | // Retrieving values is complicated by the fact that requests may arrive |
| 44 | // out of order and hence one RPC may have to wait for another to complete |
| 45 | // in order to access its stored results. A separate map of channels is |
| 46 | // used to implement this synchronization. |
| 47 | // TODO(cnicolaou): Servers protect themselves from badly behaved clients by |
| 48 | // refusing to allocate beyond a certain number of results. |
| 49 | type resultsStore struct { |
| 50 | sync.Mutex |
| 51 | store map[uint64]results |
| 52 | chans map[uint64]chan struct{} |
| 53 | keys intHeap |
Asim Shankar | c14225c | 2014-05-23 17:18:36 -0700 | [diff] [blame] | 54 | // TODO(cnicolaou): Should addEntry/waitForEntry return an error when |
| 55 | // the calls do not match the frontier? |
| 56 | frontier uint64 // results with index less than this have been removed. |
Cosmos Nicolaou | 174a44e | 2014-05-16 13:59:30 -0700 | [diff] [blame] | 57 | } |
| 58 | |
| 59 | func newStore() *resultsStore { |
| 60 | r := &resultsStore{ |
| 61 | store: make(map[uint64]results, initialResults), |
| 62 | chans: make(map[uint64]chan struct{}, initialOutOfOrderResults), |
| 63 | } |
| 64 | heap.Init(&r.keys) |
| 65 | return r |
| 66 | } |
| 67 | |
| 68 | func (rs *resultsStore) addEntry(key uint64, data results) { |
| 69 | rs.Lock() |
Asim Shankar | c14225c | 2014-05-23 17:18:36 -0700 | [diff] [blame] | 70 | if _, present := rs.store[key]; !present && rs.frontier <= key { |
Cosmos Nicolaou | 174a44e | 2014-05-16 13:59:30 -0700 | [diff] [blame] | 71 | rs.store[key] = data |
| 72 | heap.Push(&rs.keys, key) |
| 73 | } |
| 74 | if ch, present := rs.chans[key]; present { |
| 75 | close(ch) |
| 76 | delete(rs.chans, key) |
| 77 | } |
| 78 | rs.Unlock() |
| 79 | } |
| 80 | |
| 81 | func (rs *resultsStore) removeEntriesTo(to uint64) { |
| 82 | rs.Lock() |
Asim Shankar | c14225c | 2014-05-23 17:18:36 -0700 | [diff] [blame] | 83 | if rs.frontier > to { |
| 84 | rs.Unlock() |
| 85 | return |
| 86 | } |
| 87 | rs.frontier = to + 1 |
Cosmos Nicolaou | 174a44e | 2014-05-16 13:59:30 -0700 | [diff] [blame] | 88 | for rs.keys.Len() > 0 && to >= rs.keys[0] { |
| 89 | k := heap.Pop(&rs.keys).(uint64) |
| 90 | delete(rs.store, k) |
| 91 | if ch, present := rs.chans[k]; present { |
| 92 | close(ch) |
| 93 | delete(rs.chans, k) |
| 94 | } |
| 95 | } |
| 96 | rs.Unlock() |
| 97 | } |
| 98 | |
| 99 | func (rs *resultsStore) waitForEntry(key uint64) results { |
| 100 | rs.Lock() |
| 101 | if r, present := rs.store[key]; present { |
| 102 | rs.Unlock() |
| 103 | return r |
| 104 | } |
Asim Shankar | c14225c | 2014-05-23 17:18:36 -0700 | [diff] [blame] | 105 | if key < rs.frontier { |
| 106 | rs.Unlock() |
| 107 | return nil |
| 108 | } |
Cosmos Nicolaou | 174a44e | 2014-05-16 13:59:30 -0700 | [diff] [blame] | 109 | // entry is not present, need to wait for it. |
| 110 | ch, present := rs.chans[key] |
| 111 | if !present { |
| 112 | heap.Push(&rs.keys, key) |
| 113 | ch = make(chan struct{}, 1) |
| 114 | rs.chans[key] = ch |
| 115 | } |
| 116 | rs.Unlock() |
| 117 | <-ch |
| 118 | rs.Lock() |
Cosmos Nicolaou | 174a44e | 2014-05-16 13:59:30 -0700 | [diff] [blame] | 119 | defer rs.Unlock() |
Asim Shankar | c14225c | 2014-05-23 17:18:36 -0700 | [diff] [blame] | 120 | delete(rs.chans, key) // Allow the channel to be GC'ed. |
| 121 | return rs.store[key] // This may be nil if the entry has been removed |
Cosmos Nicolaou | 174a44e | 2014-05-16 13:59:30 -0700 | [diff] [blame] | 122 | } |