blob: 9ad89a64808cf9b764ee5fa92634af6ed000d6ec [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
4
5package memstore
6
7import (
8 "sort"
9 "sync"
10
11 "v.io/syncbase/x/ref/services/syncbase/store"
Sergey Rogulenkoa53e60f2015-05-22 11:05:01 -070012 "v.io/v23/verror"
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070013)
14
15type stream struct {
16 mu sync.Mutex
Sergey Rogulenko95baa662015-05-22 15:07:06 -070017 node *store.ResourceNode
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070018 sn *snapshot
19 keys []string
20 currIndex int
21 currKey *string
22 err error
23}
24
25var _ store.Stream = (*stream)(nil)
26
Sergey Rogulenko95baa662015-05-22 15:07:06 -070027func newStream(sn *snapshot, parent *store.ResourceNode, start, limit []byte) *stream {
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070028 keys := []string{}
29 for k := range sn.data {
Adam Sadovskyf437f332015-05-19 23:03:22 -070030 if k >= string(start) && (len(limit) == 0 || k < string(limit)) {
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070031 keys = append(keys, k)
32 }
33 }
34 sort.Strings(keys)
Sergey Rogulenko95baa662015-05-22 15:07:06 -070035 s := &stream{
36 node: store.NewResourceNode(),
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070037 sn: sn,
38 keys: keys,
39 currIndex: -1,
40 }
Sergey Rogulenko95baa662015-05-22 15:07:06 -070041 parent.AddChild(s.node, func() {
42 s.Cancel()
43 })
44 return s
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070045}
46
47// Advance implements the store.Stream interface.
48func (s *stream) Advance() bool {
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070049 s.mu.Lock()
50 defer s.mu.Unlock()
51 if s.err != nil {
52 s.currKey = nil
53 } else {
54 s.currIndex++
55 if s.currIndex < len(s.keys) {
56 s.currKey = &s.keys[s.currIndex]
57 } else {
58 s.currKey = nil
59 }
60 }
61 return s.currKey != nil
62}
63
64// Key implements the store.Stream interface.
65func (s *stream) Key(keybuf []byte) []byte {
66 s.mu.Lock()
67 defer s.mu.Unlock()
68 if s.currKey == nil {
69 panic("nothing staged")
70 }
71 return store.CopyBytes(keybuf, []byte(*s.currKey))
72}
73
74// Value implements the store.Stream interface.
75func (s *stream) Value(valbuf []byte) []byte {
76 s.mu.Lock()
77 defer s.mu.Unlock()
78 if s.currKey == nil {
79 panic("nothing staged")
80 }
81 return store.CopyBytes(valbuf, s.sn.data[*s.currKey])
82}
83
84// Err implements the store.Stream interface.
85func (s *stream) Err() error {
86 s.mu.Lock()
87 defer s.mu.Unlock()
Adam Sadovskya3fc33c2015-06-02 18:44:46 -070088 return convertError(s.err)
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -070089}
90
91// Cancel implements the store.Stream interface.
92func (s *stream) Cancel() {
93 s.mu.Lock()
94 defer s.mu.Unlock()
Sergey Rogulenkoa53e60f2015-05-22 11:05:01 -070095 if s.err != nil {
96 return
97 }
Sergey Rogulenko95baa662015-05-22 15:07:06 -070098 s.node.Close()
Adam Sadovsky8db74432015-05-29 17:37:32 -070099 s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgCanceledStream)
Adam Sadovskyc18c8ca2015-05-08 18:05:46 -0700100}