blob: 9c8c3923a7dfde2ceb5ddd10581451a439a04823 [file] [log] [blame]
// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
package watchable
import (
"sync"
"v.io/syncbase/x/ref/services/syncbase/store"
)
// stream streams keys and values for versioned records.
type stream struct {
iit store.Stream
st store.StoreReader
mu sync.Mutex
err error
hasValue bool
key []byte
value []byte
}
var _ store.Stream = (*stream)(nil)
// newStreamVersioned creates a new stream. It assumes all records in range
// [start, limit) are managed, i.e. versioned.
func newStreamVersioned(st store.StoreReader, start, limit []byte) *stream {
checkTransactionOrSnapshot(st)
return &stream{
iit: st.Scan(makeVersionKey(start), makeVersionKey(limit)),
st: st,
}
}
// Advance implements the store.Stream interface.
func (s *stream) Advance() bool {
s.mu.Lock()
defer s.mu.Unlock()
s.hasValue = false
if s.err != nil {
return false
}
if advanced := s.iit.Advance(); !advanced {
return false
}
versionKey, version := s.iit.Key(nil), s.iit.Value(nil)
s.key = []byte(join(split(string(versionKey))[1:]...)) // drop "$version" prefix
s.value, s.err = s.st.Get(makeAtVersionKey(s.key, version), nil)
if s.err != nil {
return false
}
s.hasValue = true
return true
}
// Key implements the store.Stream interface.
func (s *stream) Key(keybuf []byte) []byte {
s.mu.Lock()
defer s.mu.Unlock()
if !s.hasValue {
panic("nothing staged")
}
return store.CopyBytes(keybuf, s.key)
}
// Value implements the store.Stream interface.
func (s *stream) Value(valbuf []byte) []byte {
s.mu.Lock()
defer s.mu.Unlock()
if !s.hasValue {
panic("nothing staged")
}
return store.CopyBytes(valbuf, s.value)
}
// Err implements the store.Stream interface.
func (s *stream) Err() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.err != nil {
return convertError(s.err)
}
return s.iit.Err()
}
// Cancel implements the store.Stream interface.
func (s *stream) Cancel() {
s.mu.Lock()
defer s.mu.Unlock()
if s.err != nil {
return
}
s.iit.Cancel()
}