// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

package leveldb

// #include "leveldb/c.h"
// #include "syncbase_leveldb.h"
import "C"
import (
	"bytes"
	"sync"

	"v.io/syncbase/x/ref/services/syncbase/store"
	"v.io/v23/verror"
)

// stream is a wrapper around LevelDB iterator that implements
// the store.Stream interface.
type stream struct {
	// mu protects the state of the stream.
	//
	// node is the resource node created for this stream in newStream; it is
	// closed when the C iterator is destroyed.
	// cIter is the underlying C iterator; it is set to nil once destroyed.
	// limit bounds the stream: Advance only stages keys that compare strictly
	// less than limit. An empty limit means the stream is not bounded.
	mu    sync.Mutex
	node  *store.ResourceNode
	cIter *C.syncbase_leveldb_iterator_t
	limit []byte

	// hasAdvanced is set by the first call to Advance, which must not move the
	// C iterator because it starts out pointing at the first value.
	// err is recorded when the C iterator is destroyed: Advance stores the
	// result of goError on the iterator's error state, Cancel stores a
	// canceled error.
	hasAdvanced bool
	err         error

	// hasValue is true iff a value has been staged. If hasValue is true,
	// key and value point to the staged key/value pair. The underlying buffers
	// of key and value are allocated on the C heap until Cancel is called,
	// at which point they are copied to the Go heap.
	hasValue bool
	key      []byte
	value    []byte
}

// Compile-time check that *stream implements store.Stream.
var _ store.Stream = (*stream)(nil)

// newStream creates a stream backed by a fresh C iterator on d, created with
// the given read options and start key. Keys are staged by Advance only while
// they compare strictly less than limit; an empty limit means no upper bound.
//
// The stream's resource node is registered as a child of parent with Cancel
// as its callback (presumably invoked when parent is closed; see
// store.ResourceNode).
func newStream(d *db, parent *store.ResourceNode, start, limit []byte, cOpts *C.leveldb_readoptions_t) *stream {
	startPtr, startLen := cSlice(start)
	s := &stream{
		cIter: C.syncbase_leveldb_create_iterator(d.cDb, cOpts, startPtr, startLen),
		limit: limit,
	}
	s.node = store.NewResourceNode()
	parent.AddChild(s.node, s.Cancel)
	return s
}

// destroyLeveldbIter closes the stream's resource node and destroys the
// underlying C iterator, leaving cIter nil.
//
// A nil cIter is never handed to the C destructor, so calling this again
// after the iterator is already gone does not pass a nil pointer into C.
// Assumes mu is held.
func (s *stream) destroyLeveldbIter() {
	s.node.Close()
	if s.cIter == nil {
		// Already destroyed; there is nothing left to free on the C side.
		return
	}
	C.syncbase_leveldb_iter_destroy(s.cIter)
	s.cIter = nil
}

// Advance implements the store.Stream interface.
//
// It stages the next key/value pair and returns true. It returns false when
// the C iterator is no longer valid, when the current key is not strictly
// less than limit, or when the stream has already finished, failed or been
// canceled. On the first false result the C iterator's error state is
// recorded in s.err and the iterator is destroyed; every later call returns
// false without touching the C layer.
func (s *stream) Advance() bool {
	s.mu.Lock()
	defer s.mu.Unlock()
	s.hasValue = false
	// destroyLeveldbIter leaves cIter nil. That happens not only on error or
	// cancellation but also when the stream simply ran out of entries, in
	// which case s.err may still be nil. Checking cIter as well keeps a
	// repeated Advance from handing a nil iterator to the C layer.
	if s.err != nil || s.cIter == nil {
		return false
	}
	// The C iterator starts out initialized, pointing at the first value; we
	// shouldn't move it during the first Advance() call.
	if !s.hasAdvanced {
		s.hasAdvanced = true
	} else {
		C.syncbase_leveldb_iter_next(s.cIter)
	}
	if s.cIter.is_valid != 0 {
		key := s.cKey()
		if len(s.limit) == 0 || bytes.Compare(key, s.limit) < 0 {
			s.hasValue = true
			s.key = key
			s.value = s.cVal()
			return true
		}
	}

	// Nothing more to stage: record the iterator's error state (if any) and
	// release the C iterator.
	var cError *C.char
	C.syncbase_leveldb_iter_get_error(s.cIter, &cError)
	s.err = goError(cError)
	s.destroyLeveldbIter()
	return false
}

// Key implements the store.Stream interface.
//
// It returns the result of store.CopyBytes(keybuf, staged key). It panics
// with "nothing staged" if no key/value pair is currently staged.
func (s *stream) Key(keybuf []byte) []byte {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.hasValue {
		return store.CopyBytes(keybuf, s.key)
	}
	panic("nothing staged")
}

// Value implements the store.Stream interface.
//
// It returns the result of store.CopyBytes(valbuf, staged value). It panics
// with "nothing staged" if no key/value pair is currently staged.
func (s *stream) Value(valbuf []byte) []byte {
	s.mu.Lock()
	defer s.mu.Unlock()
	if s.hasValue {
		return store.CopyBytes(valbuf, s.value)
	}
	panic("nothing staged")
}

// Err implements the store.Stream interface.
//
// It reads the recorded stream error under mu and returns it after passing
// it through convertError (defined elsewhere in this package).
func (s *stream) Err() error {
	s.mu.Lock()
	defer s.mu.Unlock()
	recorded := s.err
	return convertError(recorded)
}

// Cancel implements the store.Stream interface.
//
// It records a canceled error and destroys the C iterator. If a key/value
// pair is staged, it is first copied to the Go heap so that Key and Value
// keep working after the C iterator is gone. Cancel is a no-op once the
// stream has already failed, been canceled, or run to completion.
// TODO(rogulenko): make Cancel non-blocking.
func (s *stream) Cancel() {
	s.mu.Lock()
	defer s.mu.Unlock()
	// A nil cIter means Advance already destroyed the iterator, possibly with
	// a nil error when the stream simply ended. Without this check a finished
	// stream would be marked as canceled and the iterator destroyed twice.
	if s.err != nil || s.cIter == nil {
		return
	}
	// s.hasValue will be false if Advance has never been called.
	if s.hasValue {
		// We copy the key and the value from the C heap to the Go heap before
		// deallocating the C iterator.
		s.key = store.CopyBytes(nil, s.cKey())
		s.value = store.CopyBytes(nil, s.cVal())
	}
	s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgCanceledStream)
	s.destroyLeveldbIter()
}

// cKey returns the current key.
// The returned []byte points to a buffer allocated on the C heap. This buffer
// is valid until the next call to Advance or Cancel.
// The caller must ensure s.cIter is non-nil.
func (s *stream) cKey() []byte {
	return goBytes(s.cIter.key, s.cIter.key_len)
}

// cVal returns the current value.
// The returned []byte points to a buffer allocated on the C heap. This buffer
// is valid until the next call to Advance or Cancel.
// The caller must ensure s.cIter is non-nil.
func (s *stream) cVal() []byte {
	return goBytes(s.cIter.val, s.cIter.val_len)
}
