blob: 221d33c3ed3e2996d00d3478f593e11967b7e26b [file] [log] [blame]
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -07001// Copyright 2015 The Vanadium Authors. All rights reserved.
2// Use of this source code is governed by a BSD-style
3// license that can be found in the LICENSE file.
4
5package memstore
6
7import (
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -07008 "errors"
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -07009 "sort"
10 "sync"
11
12 "v.io/syncbase/x/ref/services/syncbase/store"
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070013)
14
15type stream struct {
16 mu sync.Mutex
17 sn *snapshot
18 keys []string
19 currIndex int
20 currKey *string
21 err error
22}
23
24var _ store.Stream = (*stream)(nil)
25
26func newStream(sn *snapshot, start, end []byte) *stream {
27 keys := []string{}
28 for k := range sn.data {
Adam Sadovsky49261192015-05-19 17:39:59 -070029 if k >= string(start) && (len(end) == 0 || k < string(end)) {
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070030 keys = append(keys, k)
31 }
32 }
33 sort.Strings(keys)
34 return &stream{
35 sn: sn,
36 keys: keys,
37 currIndex: -1,
38 }
39}
40
41// Advance implements the store.Stream interface.
42func (s *stream) Advance() bool {
43 // TODO(sadovsky): Advance should return false and Err should return a non-nil
44 // error if the Store was closed, or if the Snapshot was closed, or if the
45 // Transaction was committed or aborted (or timed out).
46 s.mu.Lock()
47 defer s.mu.Unlock()
48 if s.err != nil {
49 s.currKey = nil
50 } else {
51 s.currIndex++
52 if s.currIndex < len(s.keys) {
53 s.currKey = &s.keys[s.currIndex]
54 } else {
55 s.currKey = nil
56 }
57 }
58 return s.currKey != nil
59}
60
61// Key implements the store.Stream interface.
62func (s *stream) Key(keybuf []byte) []byte {
63 s.mu.Lock()
64 defer s.mu.Unlock()
65 if s.currKey == nil {
66 panic("nothing staged")
67 }
68 return store.CopyBytes(keybuf, []byte(*s.currKey))
69}
70
71// Value implements the store.Stream interface.
72func (s *stream) Value(valbuf []byte) []byte {
73 s.mu.Lock()
74 defer s.mu.Unlock()
75 if s.currKey == nil {
76 panic("nothing staged")
77 }
78 return store.CopyBytes(valbuf, s.sn.data[*s.currKey])
79}
80
81// Err implements the store.Stream interface.
82func (s *stream) Err() error {
83 s.mu.Lock()
84 defer s.mu.Unlock()
85 return s.err
86}
87
88// Cancel implements the store.Stream interface.
89func (s *stream) Cancel() {
90 s.mu.Lock()
91 defer s.mu.Unlock()
Sergey Rogulenko0dbfe072015-05-19 20:10:18 -070092 s.err = errors.New("canceled stream")
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070093}