// Copyright 2015 The Vanadium Authors. All rights reserved.
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.

// Package chunkmap implements a map from chunk checksums to chunk locations
// and vice versa, using a store.Store (currently, one implemented with
// leveldb).
package chunkmap

import "encoding/binary"
import "sync"

import "v.io/syncbase/x/ref/services/syncbase/store"
import "v.io/syncbase/x/ref/services/syncbase/store/leveldb"
import "v.io/v23/context"
import "v.io/v23/verror"

const pkgPath = "v.io/syncbase/x/ref/services/syncbase/localblobstore/chunkmap"

// Errors reported by a ChunkMap.  Every call site passes the ChunkMap's
// directory as the first parameter after ctx, which the messages print as
// {3}; the length errors additionally receive the actual and expected
// lengths as {4} and {5}.
var (
	errBadBlobIDLen        = verror.Register(pkgPath+".errBadBlobIDLen", verror.NoRetry, "{1:}{2:} chunkmap {3}: bad blob length {4} should be {5}{:_}")
	errBadChunkHashLen     = verror.Register(pkgPath+".errBadChunkHashLen", verror.NoRetry, "{1:}{2:} chunkmap {3}: bad chunk hash length {4} should be {5}{:_}")
	errNoSuchBlob          = verror.Register(pkgPath+".errNoSuchBlob", verror.NoRetry, "{1:}{2:} chunkmap {3}: no such blob{:_}")
	errMalformedChunkEntry = verror.Register(pkgPath+".errMalformedChunkEntry", verror.NoRetry, "{1:}{2:} chunkmap {3}: malformed chunk entry{:_}")
	errNoSuchChunk         = verror.Register(pkgPath+".errNoSuchChunk", verror.NoRetry, "{1:}{2:} chunkmap {3}: no such chunk{:_}")
	errMalformedBlobEntry  = verror.Register(pkgPath+".errMalformedBlobEntry", verror.NoRetry, "{1:}{2:} chunkmap {3}: malformed blob entry{:_}")
)

// There are two tables: chunk-to-location, and blob-to-chunk.
// Each chunk is represented by one entry in each table.
// On deletion, the blob-to-chunk entry is used to find the chunk-to-location
// entry, so the blob-to-chunk entry is added first and deleted last.
//
// chunk-to-location:
//    Key:    1-byte containing chunkPrefix, 16-byte chunk hash, 16-byte blob ID
//    Value:  Varint offset, Varint length.
// The chunk with the specified 16-byte hash had the specified length, and is
// (or was) found at the specified offset in the blob.
//
// blob-to-chunk:
//    Key:    1-byte containing blobPrefix, 16-byte blob ID, 8-byte bigendian offset
//    Value:  16-byte chunk hash, Varint length.
//
// The varint-encoded fields are written with encoding/binary.PutVarint and
// read with encoding/binary.Varint.  The blob-to-chunk keys encode the offset
// as raw big-endian (encoding/binary.BigEndian.{PutUint64,Uint64}) so that
// keys sort in increasing offset order.

// Sizes of the fixed-length fields stored in keys and values, plus upper
// bounds on whole keys and values; the bounds size the fixed-size key/value
// buffers used throughout this package.
const (
	chunkHashLen = 16 // length of chunk hash
	blobIDLen    = 16 // length of blob ID
	offsetLen    = 8  // length of offset in blob-to-chunk key

	maxKeyLen = 64 // conservative maximum key length
	maxValLen = 64 // conservative maximum value length
)

// Single-byte key prefixes that keep the two tables apart in the store.
var (
	chunkPrefix = []byte{0} // key prefix for chunk-to-location
	blobPrefix  = []byte{1} // key prefix for blob-to-chunk
)

var (
	// offsetLimit is an offset that's greater than, and one byte longer
	// than, any real offset: offsetLen 0xff bytes plus one more.
	offsetLimit = []byte{
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0xff,
	}

	// blobLimit is a blobID that's greater than, and one byte longer than,
	// any real blob ID: blobIDLen 0xff bytes plus one more.
	blobLimit = []byte{
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff, 0xff,
		0xff,
	}
)

// A Location describes a chunk's location within a blob.
type Location struct {
	BlobID []byte // ID of blob; AssociateChunkWithLocation() requires blobIDLen bytes
	Offset int64  // byte offset of chunk within blob
	Size   int64  // size of chunk
}

// A ChunkMap maps chunk checksums to Locations, and vice versa.  Create one
// with New(), and release its resources with Close().
type ChunkMap struct {
	dir string      // the directory where the store is held; also included in error messages
	st  store.Store // private store that holds the mapping.
}

// New() returns a pointer to a ChunkMap, backed by a leveldb store in
// directory dir.  The store is created if it does not already exist.
// If the store cannot be opened, New() returns a nil *ChunkMap together with
// the error, rather than a ChunkMap with no usable store.
func New(ctx *context.T, dir string) (cm *ChunkMap, err error) {
	st, err := leveldb.Open(dir, leveldb.OpenOptions{CreateIfMissing: true, ErrorIfExists: false})
	if err != nil {
		return nil, err
	}
	return &ChunkMap{dir: dir, st: st}, nil
}

// Close() releases the files and other resources held by the store backing
// *cm, and returns whatever error closing that store reports.
// No other methods on cm may be called after Close().
func (cm *ChunkMap) Close() error {
	st := cm.st
	return st.Close()
}

// AssociateChunkWithLocation() records that the chunk whose hash is chunk can
// be found at loc.  chunk must be chunkHashLen bytes and loc.BlobID must be
// blobIDLen bytes; otherwise an error is returned and nothing is written.
//
// Two entries are written: first blob-to-chunk, then chunk-to-location.  If
// the second write fails, the first entry remains; DeleteBlob() on that blob
// removes it.
func (cm *ChunkMap) AssociateChunkWithLocation(ctx *context.T, chunk []byte, loc Location) (err error) {
	// Lengths are validated explicitly because this routine modifies the
	// database.
	if len(loc.BlobID) != blobIDLen {
		return verror.New(errBadBlobIDLen, ctx, cm.dir, len(loc.BlobID), blobIDLen)
	}
	if len(chunk) != chunkHashLen {
		return verror.New(errBadChunkHashLen, ctx, cm.dir, len(chunk), chunkHashLen)
	}

	var keyBuf [maxKeyLen]byte
	var valBuf [maxValLen]byte

	// blob-to-chunk: blobPrefix | blobID | big-endian offset -> hash | varint size.
	// Written first because DeleteBlob() uses it to find the other entry.
	k := copy(keyBuf[:], blobPrefix)
	k += copy(keyBuf[k:], loc.BlobID)
	binary.BigEndian.PutUint64(keyBuf[k:], uint64(loc.Offset))
	k += offsetLen

	v := copy(valBuf[:], chunk)
	v += binary.PutVarint(valBuf[v:], loc.Size)
	if err = cm.st.Put(keyBuf[:k], valBuf[:v]); err != nil {
		return err
	}

	// chunk-to-location: chunkPrefix | hash | blobID -> varint offset | varint size.
	k = copy(keyBuf[:], chunkPrefix)
	k += copy(keyBuf[k:], chunk)
	k += copy(keyBuf[k:], loc.BlobID)

	v = binary.PutVarint(valBuf[:], loc.Offset)
	v += binary.PutVarint(valBuf[v:], loc.Size)
	return cm.st.Put(keyBuf[:k], valBuf[:v])
}

// DeleteBlob() deletes all chunk associations previously recorded for the
// specified blob with AssociateChunkWithLocation(..., Location{BlobID: blob, ...}).
// blob must be blobIDLen bytes long.  For each of the blob's blob-to-chunk
// entries, the matching chunk-to-location entry is deleted first and the
// blob-to-chunk entry last, since the latter is needed to find the former.
// If the blob has no entries at all, errNoSuchBlob is returned.
func (cm *ChunkMap) DeleteBlob(ctx *context.T, blob []byte) (err error) {
	// Check the expected lengths explicitly in routines that modify the database.
	if len(blob) != blobIDLen {
		return verror.New(errBadBlobIDLen, ctx, cm.dir, len(blob), blobIDLen)
	}

	var start [maxKeyLen]byte
	var limit [maxKeyLen]byte

	// Scan range covering every blob-to-chunk key of this blob:
	// [blobPrefix|blob, blobPrefix|blob|offsetLimit).
	startLen := copy(start[:], blobPrefix)
	startLen += copy(start[startLen:], blob)

	limitLen := copy(limit[:], start[:startLen])
	limitLen += copy(limit[limitLen:], offsetLimit)

	var keyBuf [maxKeyLen]byte    // buffer for keys returned by stream
	var valBuf [maxValLen]byte    // buffer for values returned by stream
	var deleteKey [maxKeyLen]byte // buffer to construct chunk-to-location keys to delete

	deletePrefixLen := copy(deleteKey[:], chunkPrefix)

	seenAValue := false

	s := cm.st.Scan(start[:startLen], limit[:limitLen])
	// err is tested before Advance() so that no further element is fetched
	// once a deletion has failed.
	for err == nil && s.Advance() {
		seenAValue = true

		key := s.Key(keyBuf[:])
		value := s.Value(valBuf[:])

		// The value starts with the chunk hash; use it to rebuild the
		// chunk-to-location key (chunkPrefix | hash | blob).  A value too
		// short to hold a hash is malformed; only its blob-to-chunk entry
		// is removed.
		if len(value) >= chunkHashLen {
			deleteKeyLen := deletePrefixLen
			deleteKeyLen += copy(deleteKey[deleteKeyLen:], value[:chunkHashLen])
			deleteKeyLen += copy(deleteKey[deleteKeyLen:], blob)
			err = cm.st.Delete(deleteKey[:deleteKeyLen])
		}

		if err == nil {
			// Delete the blob-to-chunk entry last, as it's
			// used to find the chunk-to-location entry.
			err = cm.st.Delete(key)
		}
	}

	if err != nil {
		// The scan was abandoned early; release it.
		s.Cancel()
		return err
	}
	if err = s.Err(); err == nil && !seenAValue {
		err = verror.New(errNoSuchBlob, ctx, cm.dir, blob)
	}
	return err
}

// LookupChunk() returns a Location for the specified chunk.  Only one Location
// is returned, even if several are available in the database.  If the client
// finds that the Location is not available, perhaps because its blob has
// been deleted, the client should remove the blob from the ChunkMap using
// DeleteBlob(loc.BlobID), and try again.  (The client may also wish to
// arrange at some point to call GC() on the blob store.)
//
// Every stored chunk hash is chunkHashLen bytes long, so a chunkHash of any
// other length yields errNoSuchChunk.  The returned loc.BlobID is a freshly
// allocated slice.  On error, the zero Location is returned.
func (cm *ChunkMap) LookupChunk(ctx *context.T, chunkHash []byte) (loc Location, err error) {
	// The lookup below is a prefix scan, so a hash of the wrong length
	// would match (or straddle into) some other chunk's key and yield a
	// Location that does not belong to chunkHash.
	if len(chunkHash) != chunkHashLen {
		return Location{}, verror.New(errNoSuchChunk, ctx, cm.dir, chunkHash)
	}

	var start [maxKeyLen]byte
	var limit [maxKeyLen]byte

	// Scan range: [chunkPrefix|chunkHash, chunkPrefix|chunkHash|blobLimit).
	startLen := copy(start[:], chunkPrefix)
	startLen += copy(start[startLen:], chunkHash)

	limitLen := copy(limit[:], start[:startLen])
	limitLen += copy(limit[limitLen:], blobLimit)

	var keyBuf [maxKeyLen]byte // buffer for keys returned by stream
	var valBuf [maxValLen]byte // buffer for values returned by stream

	s := cm.st.Scan(start[:startLen], limit[:limitLen])
	if !s.Advance() {
		if err = s.Err(); err == nil {
			err = verror.New(errNoSuchChunk, ctx, cm.dir, chunkHash)
		}
		return Location{}, err
	}
	// Only the first matching entry is used; release the stream when done.
	defer s.Cancel()

	key := s.Key(keyBuf[:])
	value := s.Value(valBuf[:])

	// Key layout: chunkPrefix | chunk hash | blob ID.
	blobStart := len(chunkPrefix) + chunkHashLen
	if len(key) != blobStart+blobIDLen {
		return Location{}, verror.New(errMalformedChunkEntry, ctx, cm.dir, chunkHash, key, value)
	}

	// Value layout: varint offset | varint size.
	offset, n := binary.Varint(value)
	var size int64
	if n > 0 {
		size, n = binary.Varint(value[n:])
	}
	if n <= 0 {
		return Location{}, verror.New(errMalformedChunkEntry, ctx, cm.dir, chunkHash, key, value)
	}

	// Copy the blob ID out so the result does not alias the scan buffer.
	loc.BlobID = make([]byte, blobIDLen)
	copy(loc.BlobID, key[blobStart:])
	loc.Offset = offset
	loc.Size = size
	return loc, nil
}

// A ChunkStream allows the client to iterate over the chunks in a blob:
//	cs := cm.NewChunkStream(ctx, blob)
//	for cs.Advance() {
//		chunkHash := cs.Value(nil)
//		...process chunkHash...
//	}
//	if cs.Err() != nil {
//		...there was an error...
//	}
// A client that stops before Advance() returns false must call cs.Cancel().
type ChunkStream struct {
	cm     *ChunkMap    // ChunkMap being enumerated; cm.dir appears in error messages
	ctx    *context.T   // context passed to verror.New() when reporting errors
	stream store.Stream // scan over the blob's blob-to-chunk entries

	keyBuf [maxKeyLen]byte // buffer for keys
	valBuf [maxValLen]byte // buffer for values
	key    []byte          // key for current element
	value  []byte          // value of current element
	loc    Location        // location of current element
	err    error           // error encountered.
	more   bool            // whether stream may be consulted again
}

// NewChunkStream() returns a pointer to a new ChunkStream that enumerates the
// chunk hashes of the given blob, in increasing offset order (the order of
// the big-endian offsets in the blob-to-chunk keys).
func (cm *ChunkMap) NewChunkStream(ctx *context.T, blob []byte) *ChunkStream {
	// Scan range: [blobPrefix|blob, blobPrefix|blob|offsetLimit).
	var lo, hi [maxKeyLen]byte
	loLen := copy(lo[:], blobPrefix)
	loLen += copy(lo[loLen:], blob)
	hiLen := copy(hi[:], lo[:loLen])
	hiLen += copy(hi[hiLen:], offsetLimit)

	return &ChunkStream{
		cm:     cm,
		ctx:    ctx,
		stream: cm.st.Scan(lo[:loLen], hi[:hiLen]),
		more:   true,
	}
}

// Advance() stages the next chunk of the blob so the client can retrieve its
// hash with Value() and its Location with Location().  It returns true iff an
// element was staged; the client must call Advance() before calling Value()
// or Location().  The client must call Cancel() if it does not iterate
// through all elements (i.e. until Advance() returns false).  A malformed
// blob-to-chunk entry ends the iteration and is reported by Err().
// Advance() may block if an element is not immediately available.
func (cs *ChunkStream) Advance() (ok bool) {
	if !cs.more || cs.err != nil {
		return false
	}
	if !cs.stream.Advance() {
		// The underlying stream is finished, with or without an error.
		cs.err = cs.stream.Err()
		cs.more = false
		return false
	}

	cs.key = cs.stream.Key(cs.keyBuf[:])
	cs.value = cs.stream.Value(cs.valBuf[:])

	// Key layout: blobPrefix | blob ID | big-endian offset.
	blobStart := len(blobPrefix)
	offsetStart := blobStart + blobIDLen
	wellFormed := len(cs.key) == offsetStart+offsetLen && len(cs.value) >= chunkHashLen
	if wellFormed {
		cs.loc.BlobID = make([]byte, blobIDLen)
		copy(cs.loc.BlobID, cs.key[blobStart:offsetStart])
		cs.loc.Offset = int64(binary.BigEndian.Uint64(cs.key[offsetStart:]))
		// Value layout: chunk hash | varint size.
		var n int
		cs.loc.Size, n = binary.Varint(cs.value[chunkHashLen:])
		wellFormed = n > 0
	}
	if !wellFormed {
		cs.err = verror.New(errMalformedBlobEntry, cs.ctx, cs.cm.dir, cs.key, cs.value)
		cs.stream.Cancel()
	}
	return wellFormed
}

// Value() returns the content hash of the chunk staged by Advance().  When
// buf is at least chunkHashLen bytes long, the hash is copied into it and a
// sub-slice of buf is returned; otherwise a new slice is allocated.  buf may
// be nil.  Value() may panic if Advance() returned false or was not called at
// all.  Value() does not block.
func (cs *ChunkStream) Value(buf []byte) (result []byte) {
	result = buf
	if len(result) < chunkHashLen {
		result = make([]byte, chunkHashLen)
	}
	result = result[:chunkHashLen]
	copy(result, cs.value[:chunkHashLen])
	return result
}

// Location() returns the Location of the chunk most recently staged by
// Advance().  Location() may panic if Advance() returned false or was not
// called at all.  Location() does not block.
func (cs *ChunkStream) Location() Location {
	current := cs.loc
	return current
}

// Err() reports the error, if any, that the stream encountered; it is nil iff
// there was none.  Err() does not block.
func (cs *ChunkStream) Err() error {
	err := cs.err
	return err
}

// Cancel() tells the underlying store stream that no more elements are
// needed.  The client must call Cancel() if it does not iterate through all
// elements (i.e. until Advance() returns false).  Cancel() is idempotent and
// can be called concurrently with a goroutine that is iterating via Advance()
// and Value().  Cancel() causes Advance() to subsequently return false.
// Cancel() does not block.
func (cs *ChunkStream) Cancel() {
	underlying := cs.stream
	underlying.Cancel()
}

// A BlobStream allows the client to iterate over the blobs in a ChunkMap:
//	bs := cm.NewBlobStream(ctx)
//	for bs.Advance() {
//		blobID := bs.Value(nil)
//		...process blobID...
//	}
//	if bs.Err() != nil {
//		...there was an error...
//	}
// A client that stops before Advance() returns false must call bs.Cancel().
type BlobStream struct {
	cm  *ChunkMap  // ChunkMap being enumerated; cm.dir appears in error messages
	ctx *context.T // context passed to verror.New() when reporting errors

	key    []byte          // key for current element
	keyBuf [maxKeyLen]byte // buffer for keys
	err    error           // error encountered.
	mu     sync.Mutex      // protects "more", which may be written in Cancel()
	more   bool            // whether stream may be consulted again
}

// keyLimit is the exclusive upper bound passed to store.Scan() by
// BlobStream: blobPrefix followed by 0xff bytes out to maxKeyLen, which
// sorts after every blob-to-chunk key.
var keyLimit = func() []byte {
	limit := make([]byte, maxKeyLen)
	for i := copy(limit, blobPrefix); i < len(limit); i++ {
		limit[i] = 0xff
	}
	return limit
}()

// NewBlobStream() returns a pointer to a new BlobStream that enumerates the
// IDs of the blobs in the ChunkMap, in lexicographic order.
func (cm *ChunkMap) NewBlobStream(ctx *context.T) *BlobStream {
	return &BlobStream{cm: cm, ctx: ctx, more: true}
}

// Advance() stages an element so the client can retrieve the next blob ID with
// Value().  Advance() returns true iff there is an element to retrieve.  The
// client must call Advance() before calling Value().  The client must call
// Cancel if it does not iterate through all elements (i.e. until Advance()
// returns false).  Advance() may block if an element is not immediately
// available.
//
// Each call performs one store.Scan() starting just past the previous blob's
// ID and reads a single entry from it.
func (bs *BlobStream) Advance() (ok bool) {
	bs.mu.Lock()
	ok = bs.more
	bs.mu.Unlock()
	if ok {
		prefixAndKeyLen := len(blobPrefix) + blobIDLen
		// Compute the next key to search for.  After a successful call,
		// bs.keyBuf[:prefixAndKeyLen] holds blobPrefix followed by the blob
		// ID just returned.
		if len(bs.key) == 0 { // First time through: anything starting with blobPrefix.
			n := copy(bs.keyBuf[:], blobPrefix)
			bs.key = bs.keyBuf[:n]
		} else {
			// Increment the blob ID, treated as a big-endian number, to
			// form the smallest key that sorts after every entry of the
			// current blob.
			i := prefixAndKeyLen - 1
			for ; i != len(blobPrefix)-1 && bs.keyBuf[i] == 0xff; i-- {
				bs.keyBuf[i] = 0
			}
			if i == len(blobPrefix)-1 { // Blob ID was all 0xff: end of database.
				ok = false
			} else {
				bs.keyBuf[i]++
			}
			bs.key = bs.keyBuf[:prefixAndKeyLen]
		}
		if ok {
			stream := bs.cm.st.Scan(bs.key, keyLimit)
			if !stream.Advance() {
				bs.err = stream.Err()
				ok = false // no more stream, even if no error
			} else {
				bs.key = stream.Key(bs.keyBuf[:])
				if len(bs.key) < prefixAndKeyLen {
					bs.err = verror.New(errMalformedBlobEntry, bs.ctx, bs.cm.dir, bs.key, stream.Value(nil))
					ok = false
				} else {
					// Key() returns a newly allocated slice instead of a
					// sub-slice of keyBuf when the key does not fit; keep
					// keyBuf in sync so the next increment starts from this
					// blob ID rather than a stale one.
					copy(bs.keyBuf[:], bs.key[:prefixAndKeyLen])
				}
				stream.Cancel() // We get at most one element from each stream.
			}
		}
		if !ok {
			bs.mu.Lock()
			bs.more = false
			bs.mu.Unlock()
		}
	}
	return ok
}

// Value() returns the blob ID staged by Advance().  When buf is at least
// blobIDLen bytes long, the ID is copied into it and a sub-slice of buf is
// returned; otherwise a new slice is allocated.  buf may be nil.  Value()
// may panic if Advance() returned false or was not called at all.  Value()
// does not block.
func (bs *BlobStream) Value(buf []byte) (result []byte) {
	result = buf
	if len(result) < blobIDLen {
		result = make([]byte, blobIDLen)
	}
	result = result[:blobIDLen]
	idStart := len(blobPrefix)
	copy(result, bs.key[idStart:idStart+blobIDLen])
	return result
}

// Err() reports the error, if any, that the stream encountered; it is nil iff
// there was none.  Err() does not block.
func (bs *BlobStream) Err() error {
	err := bs.err
	return err
}

// Cancel() tells the stream that no more elements are needed by clearing
// its "more" flag under bs.mu.  The client must call Cancel() if it does not
// iterate through all elements (i.e. until Advance() returns false).
// Cancel() is idempotent and can be called concurrently with a goroutine that
// is iterating via Advance() and Value().  Cancel() causes Advance() to
// subsequently return false.  Cancel() does not block.
func (bs *BlobStream) Cancel() {
	bs.mu.Lock()
	defer bs.mu.Unlock()
	bs.more = false
}
