blob: 6e92191437cec04fdeed909aa6fda2fb53952325 [file] [log] [blame]
package gkvlite
import (
"encoding/json"
"errors"
"fmt"
"math/rand"
"sync"
"sync/atomic"
"unsafe"
)
// User-supplied key comparison func should return 0 if a == b,
// -1 if a < b, and +1 if a > b. For example: bytes.Compare()
type KeyCompare func(a, b []byte) int
// A persistable collection of ordered key-values (Item's).
type Collection struct {
name string // May be "" for a private collection.
store *Store
compare KeyCompare
rootLock *sync.Mutex
root *rootNodeLoc // Protected by rootLock.
allocStats AllocStats // User must serialize access (e.g., see locks in alloc.go).
AppData unsafe.Pointer // For app-specific data; atomic CAS recommended.
}
type rootNodeLoc struct {
// The rootNodeLoc fields are protected by Collection.rootLock.
refs int64 // Reference counter.
root *nodeLoc
next *rootNodeLoc // For free-list tracking.
reclaimMark node // Address is used as a sentinel.
// We might own a reference count on another Collection/rootNodeLoc.
// When our reference drops to 0 and we're free'd, then also release
// our reference count on the next guy in the chain.
chainedCollection *Collection
chainedRootNodeLoc *rootNodeLoc
// More nodes to maybe reclaim when our reference count goes to 0.
// But they might be repeated, so we scan for them during reclaimation.
reclaimLater [3]*node
}
func (t *Collection) Name() string {
return t.name
}
func (t *Collection) closeCollection() { // Just "close" is a keyword.
if t == nil {
return
}
t.rootLock.Lock()
r := t.root
t.root = nil
t.rootLock.Unlock()
t.reclaimMarkUpdate(r.root, nil, &r.reclaimMark)
if r != nil {
t.rootDecRef(r)
}
}
// Retrieve an item by its key. Use withValue of false if you don't
// need the item's value (Item.Val may be nil), which might be able
// to save on I/O and memory resources, especially for large values.
// The returned Item should be treated as immutable.
func (t *Collection) GetItem(key []byte, withValue bool) (i *Item, err error) {
rnl := t.rootAddRef()
defer t.rootDecRef(rnl)
n := rnl.root
for {
nNode, err := n.read(t.store)
if err != nil || n.isEmpty() || nNode == nil {
return nil, err
}
i := &nNode.item
iItem, err := i.read(t, false)
if err != nil {
return nil, err
}
if iItem == nil || iItem.Key == nil {
return nil, errors.New("missing item after item.read() in GetItem()")
}
c := t.compare(key, iItem.Key)
if c < 0 {
n = &nNode.left
} else if c > 0 {
n = &nNode.right
} else {
if withValue {
iItem, err = i.read(t, withValue)
if err != nil {
return nil, err
}
}
t.store.ItemAddRef(t, iItem)
return iItem, nil
}
}
}
// Retrieve a value by its key. Returns nil if the item is not in the
// collection. The returned value should be treated as immutable.
func (t *Collection) Get(key []byte) (val []byte, err error) {
i, err := t.GetItem(key, true)
if err != nil {
return nil, err
}
if i != nil {
return i.Val, nil
}
return nil, nil
}
// Replace or insert an item of a given key.
// A random item Priority (e.g., rand.Int31()) will usually work well,
// but advanced users may consider using non-random item priorities
// at the risk of unbalancing the lookup tree. The input Item instance
// should be considered immutable and owned by the Collection.
func (t *Collection) SetItem(item *Item) (err error) {
if t.store.readOnly {
return errors.New("store is read only")
}
if item.Key == nil || len(item.Key) > 0xffff || len(item.Key) == 0 ||
item.Val == nil {
return errors.New("Item.Key/Val missing or too long")
}
if item.Priority < 0 {
return errors.New("Item.Priority must be non-negative")
}
rnl := t.rootAddRef()
defer t.rootDecRef(rnl)
root := rnl.root
n := t.mkNode(nil, nil, nil, 1, uint64(len(item.Key))+uint64(item.NumValBytes(t)))
t.store.ItemAddRef(t, item)
n.item.item = unsafe.Pointer(item) // Avoid garbage via separate init.
nloc := t.mkNodeLoc(n)
defer t.freeNodeLoc(nloc)
r, err := t.store.union(t, root, nloc, &rnl.reclaimMark)
if err != nil {
return err
}
rnlNew := t.mkRootNodeLoc(r)
// Can't reclaim n right now because r might point to n.
rnlNew.reclaimLater[0] = t.reclaimMarkUpdate(nloc,
&rnl.reclaimMark, &rnlNew.reclaimMark)
if !t.rootCAS(rnl, rnlNew) {
return errors.New("concurrent mutation attempted")
}
t.rootDecRef(rnl)
return nil
}
// Replace or insert an item of a given key.
func (t *Collection) Set(key []byte, val []byte) error {
return t.SetItem(&Item{Key: key, Val: val, Priority: rand.Int31()})
}
// Deletes an item of a given key.
func (t *Collection) Delete(key []byte) (wasDeleted bool, err error) {
if t.store.readOnly {
return false, errors.New("store is read only")
}
rnl := t.rootAddRef()
defer t.rootDecRef(rnl)
root := rnl.root
i, err := t.GetItem(key, false)
if err != nil || i == nil {
return false, err
}
t.store.ItemDecRef(t, i)
left, middle, right, err := t.store.split(t, root, key, &rnl.reclaimMark)
if err != nil {
return false, err
}
defer t.freeNodeLoc(left)
defer t.freeNodeLoc(right)
defer t.freeNodeLoc(middle)
if middle.isEmpty() {
return false, fmt.Errorf("concurrent delete, key: %v", key)
}
r, err := t.store.join(t, left, right, &rnl.reclaimMark)
if err != nil {
return false, err
}
rnlNew := t.mkRootNodeLoc(r)
// Can't reclaim immediately due to readers.
rnlNew.reclaimLater[0] = t.reclaimMarkUpdate(left,
&rnl.reclaimMark, &rnlNew.reclaimMark)
rnlNew.reclaimLater[1] = t.reclaimMarkUpdate(right,
&rnl.reclaimMark, &rnlNew.reclaimMark)
rnlNew.reclaimLater[2] = t.reclaimMarkUpdate(middle,
&rnl.reclaimMark, &rnlNew.reclaimMark)
t.markReclaimable(rnlNew.reclaimLater[2], &rnlNew.reclaimMark)
if !t.rootCAS(rnl, rnlNew) {
return false, errors.New("concurrent mutation attempted")
}
t.rootDecRef(rnl)
return true, nil
}
// Retrieves the item with the "smallest" key.
// The returned item should be treated as immutable.
func (t *Collection) MinItem(withValue bool) (*Item, error) {
return t.store.walk(t, withValue,
func(n *node) (*nodeLoc, bool) { return &n.left, true })
}
// Retrieves the item with the "largest" key.
// The returned item should be treated as immutable.
func (t *Collection) MaxItem(withValue bool) (*Item, error) {
return t.store.walk(t, withValue,
func(n *node) (*nodeLoc, bool) { return &n.right, true })
}
// Evict some clean items found by randomly walking a tree branch.
// For concurrent users, only the single mutator thread should call
// EvictSomeItems(), making it serialized with mutations.
func (t *Collection) EvictSomeItems() (numEvicted uint64) {
if t.store.readOnly {
return 0
}
i, err := t.store.walk(t, false, func(n *node) (*nodeLoc, bool) {
if !n.item.Loc().isEmpty() {
i := n.item.Item()
if i != nil && atomic.CompareAndSwapPointer(&n.item.item,
unsafe.Pointer(i), unsafe.Pointer(nil)) {
t.store.ItemDecRef(t, i)
numEvicted++
}
}
next := &n.left
if (rand.Int() & 0x01) == 0x01 {
next = &n.right
}
if next.isEmpty() {
return nil, false
}
return next, true
})
if i != nil && err != nil {
t.store.ItemDecRef(t, i)
}
return numEvicted
}
type ItemVisitor func(i *Item) bool
type ItemVisitorEx func(i *Item, depth uint64) bool
// Visit items greater-than-or-equal to the target key in ascending order.
func (t *Collection) VisitItemsAscend(target []byte, withValue bool, v ItemVisitor) error {
return t.VisitItemsAscendEx(target, withValue,
func(i *Item, depth uint64) bool { return v(i) })
}
// Visit items less-than the target key in descending order.
func (t *Collection) VisitItemsDescend(target []byte, withValue bool, v ItemVisitor) error {
return t.VisitItemsDescendEx(target, withValue,
func(i *Item, depth uint64) bool { return v(i) })
}
// Visit items greater-than-or-equal to the target key in ascending order; with depth info.
func (t *Collection) VisitItemsAscendEx(target []byte, withValue bool,
visitor ItemVisitorEx) error {
rnl := t.rootAddRef()
defer t.rootDecRef(rnl)
var prevVisitItem *Item
var errCheckedVisitor error
checkedVisitor := func(i *Item, depth uint64) bool {
if prevVisitItem != nil && t.compare(prevVisitItem.Key, i.Key) > 0 {
errCheckedVisitor = fmt.Errorf("corrupted / out-of-order index"+
", key: %s vs %s, coll: %p, collName: %s, store: %p, storeFile: %v",
string(prevVisitItem.Key), string(i.Key), t, t.name, t.store, t.store.file)
return false
}
prevVisitItem = i
return visitor(i, depth)
}
_, err := t.store.visitNodes(t, rnl.root,
target, withValue, checkedVisitor, 0, ascendChoice)
if errCheckedVisitor != nil {
return errCheckedVisitor
}
return err
}
// Visit items less-than the target key in descending order; with depth info.
func (t *Collection) VisitItemsDescendEx(target []byte, withValue bool,
visitor ItemVisitorEx) error {
rnl := t.rootAddRef()
defer t.rootDecRef(rnl)
_, err := t.store.visitNodes(t, rnl.root,
target, withValue, visitor, 0, descendChoice)
return err
}
func ascendChoice(cmp int, n *node) (bool, *nodeLoc, *nodeLoc) {
return cmp <= 0, &n.left, &n.right
}
func descendChoice(cmp int, n *node) (bool, *nodeLoc, *nodeLoc) {
return cmp > 0, &n.right, &n.left
}
// Returns total number of items and total key bytes plus value bytes.
func (t *Collection) GetTotals() (numItems uint64, numBytes uint64, err error) {
rnl := t.rootAddRef()
defer t.rootDecRef(rnl)
n := rnl.root
nNode, err := n.read(t.store)
if err != nil || n.isEmpty() || nNode == nil {
return 0, 0, err
}
return nNode.numNodes, nNode.numBytes, nil
}
// Returns JSON representation of root node file location.
func (t *Collection) MarshalJSON() ([]byte, error) {
rnl := t.rootAddRef()
defer t.rootDecRef(rnl)
return rnl.MarshalJSON()
}
// Returns JSON representation of root node file location.
func (rnl *rootNodeLoc) MarshalJSON() ([]byte, error) {
loc := rnl.root.Loc()
if loc.isEmpty() {
return json.Marshal(ploc_empty)
}
return json.Marshal(loc)
}
// Unmarshals JSON representation of root node file location.
func (t *Collection) UnmarshalJSON(d []byte) error {
p := ploc{}
if err := json.Unmarshal(d, &p); err != nil {
return err
}
if t.rootLock == nil {
t.rootLock = &sync.Mutex{}
}
nloc := t.mkNodeLoc(nil)
nloc.loc = unsafe.Pointer(&p)
if !t.rootCAS(nil, t.mkRootNodeLoc(nloc)) {
return errors.New("concurrent mutation during UnmarshalJSON().")
}
return nil
}
func (t *Collection) AllocStats() (res AllocStats) {
withAllocLocks(func() { res = t.allocStats })
return res
}
// Writes dirty items of a collection BUT (WARNING) does NOT write new
// root records. Use Store.Flush() to write root records, which would
// make these writes visible to the next file re-opening/re-loading.
func (t *Collection) Write() error {
if t.store.readOnly {
return errors.New("store is read only")
}
rnl := t.rootAddRef()
defer t.rootDecRef(rnl)
return t.write(rnl.root)
}
func (t *Collection) write(nloc *nodeLoc) error {
if err := t.writeItems(nloc); err != nil {
return err
}
if err := t.writeNodes(nloc); err != nil {
return err
}
return nil
}
func (t *Collection) writeItems(nloc *nodeLoc) (err error) {
if nloc == nil || !nloc.Loc().isEmpty() {
return nil // Write only unpersisted items of non-empty, unpersisted nodes.
}
node := nloc.Node()
if node == nil {
return nil
}
if err = t.writeItems(&node.left); err != nil {
return err
}
if err = node.item.write(t); err != nil { // Write items in key order.
return err
}
return t.writeItems(&node.right)
}
func (t *Collection) writeNodes(nloc *nodeLoc) (err error) {
if nloc == nil || !nloc.Loc().isEmpty() {
return nil // Write only non-empty, unpersisted nodes.
}
node := nloc.Node()
if node == nil {
return nil
}
if err = t.writeNodes(&node.left); err != nil {
return err
}
if err = t.writeNodes(&node.right); err != nil {
return err
}
return nloc.write(t.store) // Write nodes in children-first order.
}
func (t *Collection) rootCAS(prev, next *rootNodeLoc) bool {
t.rootLock.Lock()
defer t.rootLock.Unlock()
if t.root != prev {
return false // TODO: Callers need to release resources.
}
t.root = next
if prev != nil && prev.refs > 2 {
// Since the prev is in-use, hook up its chain to disallow
// next's nodes from being reclaimed until prev is done.
if prev.chainedCollection != nil ||
prev.chainedRootNodeLoc != nil {
panic(fmt.Sprintf("chain already taken, coll: %v", t.Name()))
}
prev.chainedCollection = t
prev.chainedRootNodeLoc = t.root
t.root.refs++ // This ref is owned by prev.
}
return true
}
func (t *Collection) rootAddRef() *rootNodeLoc {
t.rootLock.Lock()
defer t.rootLock.Unlock()
t.root.refs++
return t.root
}
func (t *Collection) rootDecRef(r *rootNodeLoc) {
t.rootLock.Lock()
freeNodeLock.Lock()
t.rootDecRef_unlocked(r)
freeNodeLock.Unlock()
t.rootLock.Unlock()
}
func (t *Collection) rootDecRef_unlocked(r *rootNodeLoc) {
r.refs--
if r.refs > 0 {
return
}
if r.chainedCollection != nil && r.chainedRootNodeLoc != nil {
r.chainedCollection.rootDecRef_unlocked(r.chainedRootNodeLoc)
}
t.reclaimNodes_unlocked(r.root.Node(), &r.reclaimLater, &r.reclaimMark)
for i := 0; i < len(r.reclaimLater); i++ {
if r.reclaimLater[i] != nil {
t.reclaimNodes_unlocked(r.reclaimLater[i], nil, &r.reclaimMark)
r.reclaimLater[i] = nil
}
}
t.freeNodeLoc(r.root)
t.freeRootNodeLoc(r)
}