blob: d7d8ba6e77852d28344303bd132ca1d2ecc9e3ba [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package memstore
import (
"sync"
"v.io/v23/verror"
"v.io/x/ref/services/syncbase/store"
"v.io/x/ref/services/syncbase/store/ptrie"
)
type stream struct {
mu sync.Mutex
node *store.ResourceNode
pstream *ptrie.Stream
err error
done bool
}
var _ store.Stream = (*stream)(nil)
func newStream(data *ptrie.T, parent *store.ResourceNode, start, limit []byte) *stream {
s := &stream{
node: store.NewResourceNode(),
pstream: data.Scan(start, limit),
}
parent.AddChild(s.node, func() {
s.Cancel()
})
return s
}
// Advance implements the store.Stream interface.
func (s *stream) Advance() bool {
s.mu.Lock()
defer s.mu.Unlock()
if s.done {
return false
}
if s.done = !s.pstream.Advance(); s.done {
s.node.Close()
}
return !s.done
}
// Key implements the store.Stream interface.
func (s *stream) Key(keybuf []byte) []byte {
s.mu.Lock()
defer s.mu.Unlock()
return s.pstream.Key(keybuf)
}
// Value implements the store.Stream interface.
func (s *stream) Value(valbuf []byte) []byte {
s.mu.Lock()
defer s.mu.Unlock()
return store.CopyBytes(valbuf, s.pstream.Value().([]byte))
}
// Err implements the store.Stream interface.
func (s *stream) Err() error {
s.mu.Lock()
defer s.mu.Unlock()
return store.ConvertError(s.err)
}
// Cancel implements the store.Stream interface.
func (s *stream) Cancel() {
s.mu.Lock()
if !s.done {
s.done = true
s.node.Close()
s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgCanceledStream)
}
s.mu.Unlock()
}