// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
| 4 | |
| 5 | package memstore |
| 6 | |
| 7 | import ( |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame^] | 8 | "errors" |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 9 | "sort" |
| 10 | "sync" |
| 11 | |
| 12 | "v.io/syncbase/x/ref/services/syncbase/store" |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 13 | ) |
| 14 | |
| 15 | type stream struct { |
| 16 | mu sync.Mutex |
| 17 | sn *snapshot |
| 18 | keys []string |
| 19 | currIndex int |
| 20 | currKey *string |
| 21 | err error |
| 22 | } |
| 23 | |
| 24 | var _ store.Stream = (*stream)(nil) |
| 25 | |
| 26 | func newStream(sn *snapshot, start, end []byte) *stream { |
| 27 | keys := []string{} |
| 28 | for k := range sn.data { |
Adam Sadovsky | 4926119 | 2015-05-19 17:39:59 -0700 | [diff] [blame] | 29 | if k >= string(start) && (len(end) == 0 || k < string(end)) { |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 30 | keys = append(keys, k) |
| 31 | } |
| 32 | } |
| 33 | sort.Strings(keys) |
| 34 | return &stream{ |
| 35 | sn: sn, |
| 36 | keys: keys, |
| 37 | currIndex: -1, |
| 38 | } |
| 39 | } |
| 40 | |
| 41 | // Advance implements the store.Stream interface. |
| 42 | func (s *stream) Advance() bool { |
| 43 | // TODO(sadovsky): Advance should return false and Err should return a non-nil |
| 44 | // error if the Store was closed, or if the Snapshot was closed, or if the |
| 45 | // Transaction was committed or aborted (or timed out). |
| 46 | s.mu.Lock() |
| 47 | defer s.mu.Unlock() |
| 48 | if s.err != nil { |
| 49 | s.currKey = nil |
| 50 | } else { |
| 51 | s.currIndex++ |
| 52 | if s.currIndex < len(s.keys) { |
| 53 | s.currKey = &s.keys[s.currIndex] |
| 54 | } else { |
| 55 | s.currKey = nil |
| 56 | } |
| 57 | } |
| 58 | return s.currKey != nil |
| 59 | } |
| 60 | |
| 61 | // Key implements the store.Stream interface. |
| 62 | func (s *stream) Key(keybuf []byte) []byte { |
| 63 | s.mu.Lock() |
| 64 | defer s.mu.Unlock() |
| 65 | if s.currKey == nil { |
| 66 | panic("nothing staged") |
| 67 | } |
| 68 | return store.CopyBytes(keybuf, []byte(*s.currKey)) |
| 69 | } |
| 70 | |
| 71 | // Value implements the store.Stream interface. |
| 72 | func (s *stream) Value(valbuf []byte) []byte { |
| 73 | s.mu.Lock() |
| 74 | defer s.mu.Unlock() |
| 75 | if s.currKey == nil { |
| 76 | panic("nothing staged") |
| 77 | } |
| 78 | return store.CopyBytes(valbuf, s.sn.data[*s.currKey]) |
| 79 | } |
| 80 | |
| 81 | // Err implements the store.Stream interface. |
| 82 | func (s *stream) Err() error { |
| 83 | s.mu.Lock() |
| 84 | defer s.mu.Unlock() |
| 85 | return s.err |
| 86 | } |
| 87 | |
| 88 | // Cancel implements the store.Stream interface. |
| 89 | func (s *stream) Cancel() { |
| 90 | s.mu.Lock() |
| 91 | defer s.mu.Unlock() |
Sergey Rogulenko | 0dbfe07 | 2015-05-19 20:10:18 -0700 | [diff] [blame^] | 92 | s.err = errors.New("canceled stream") |
Adam Sadovsky | c18c8ca | 2015-05-08 18:05:46 -0700 | [diff] [blame] | 93 | } |