blob: f60e27ed35768415d3f43dbe5ddcb2a92a8d4f94 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package memstore
import (
"errors"
"sort"
"sync"
"v.io/syncbase/x/ref/services/syncbase/store"
)
type stream struct {
mu sync.Mutex
sn *snapshot
keys []string
currIndex int
currKey *string
err error
}
var _ store.Stream = (*stream)(nil)
func newStream(sn *snapshot, start, limit []byte) *stream {
keys := []string{}
for k := range sn.data {
if k >= string(start) && (len(limit) == 0 || k < string(limit)) {
keys = append(keys, k)
}
}
sort.Strings(keys)
return &stream{
sn: sn,
keys: keys,
currIndex: -1,
}
}
// Advance implements the store.Stream interface.
func (s *stream) Advance() bool {
// TODO(sadovsky): Advance should return false and Err should return a non-nil
// error if the Store was closed, or if the Snapshot was closed, or if the
// Transaction was committed or aborted (or timed out).
s.mu.Lock()
defer s.mu.Unlock()
if s.err != nil {
s.currKey = nil
} else {
s.currIndex++
if s.currIndex < len(s.keys) {
s.currKey = &s.keys[s.currIndex]
} else {
s.currKey = nil
}
}
return s.currKey != nil
}
// Key implements the store.Stream interface.
func (s *stream) Key(keybuf []byte) []byte {
s.mu.Lock()
defer s.mu.Unlock()
if s.currKey == nil {
panic("nothing staged")
}
return store.CopyBytes(keybuf, []byte(*s.currKey))
}
// Value implements the store.Stream interface.
func (s *stream) Value(valbuf []byte) []byte {
s.mu.Lock()
defer s.mu.Unlock()
if s.currKey == nil {
panic("nothing staged")
}
return store.CopyBytes(valbuf, s.sn.data[*s.currKey])
}
// Err implements the store.Stream interface.
func (s *stream) Err() error {
s.mu.Lock()
defer s.mu.Unlock()
return s.err
}
// Cancel implements the store.Stream interface.
func (s *stream) Cancel() {
s.mu.Lock()
defer s.mu.Unlock()
s.err = errors.New("canceled stream")
}