syncbase: watchable store layer

Beginnings of lightweight "watchable store" layer, enabling
client-side watch impl as well as sync module watching of
store commits.

Change-Id: If4de271f9ea0593ec8fb840f93fa22c6206f10b5
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index fcc2817..94664fb 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -10,6 +10,7 @@
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/store"
 	"v.io/syncbase/x/ref/services/syncbase/store/memstore"
+	"v.io/syncbase/x/ref/services/syncbase/store/watchable"
 	"v.io/v23/context"
 	"v.io/v23/naming"
 	"v.io/v23/rpc"
@@ -39,10 +40,14 @@
 		return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
 	}
 	// TODO(sadovsky): Make storage engine pluggable.
+	st, err := watchable.Wrap(memstore.New())
+	if err != nil {
+		return nil, err
+	}
 	d := &database{
 		name: name,
 		a:    a,
-		st:   memstore.New(),
+		st:   st,
 	}
 	data := &databaseData{
 		Name:  d.name,
diff --git a/services/syncbase/store/watchable/store.go b/services/syncbase/store/watchable/store.go
new file mode 100644
index 0000000..e4fd0ee
--- /dev/null
+++ b/services/syncbase/store/watchable/store.go
@@ -0,0 +1,197 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package watchable provides a store.Store that maintains a commit log. In
+// Syncbase, this log forms the basis for the implementation of client-facing
+// watch as well as the sync module's watching of store commits.
+//
+// Log entries are keyed in reverse chronological order. More specifically, the
+// LogEntry key format is "$log:<seq>:<txSeq>", where <seq> is (MaxUint64-seq)
+// and <txSeq> is (MaxUint16-txSeq). All numbers are zero-padded to ensure that
+// the lexicographic order matches the numeric order. Thus, clients implementing
+// ResumeMarkers (i.e. implementing the watch API) should use
+// fmt.Sprintf("%020d", MaxUint64-marker) to convert external markers to
+// internal LogEntry key prefixes.
+package watchable
+
+// TODO(sadovsky): Write unit tests. (As of 2015-05-26 we're still iterating on
+// the design for how to expose a "watch" API from the storage engine, and we
+// don't want to write lots of tests prematurely.)
+// TODO(sadovsky): Expose helper functions for constructing LogEntry keys.
+// TODO(sadovsky): Allow clients to subscribe via Go channel.
+
+import (
+	"fmt"
+	"strconv"
+	"strings"
+	"sync"
+
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/verror"
+	"v.io/v23/vom"
+)
+
+const (
+	LogPrefix        = "$log"
+	MaxUint16 uint64 = 1<<16 - 1 // 5 digits
+	MaxUint64 uint64 = 1<<64 - 1 // 20 digits
+)
+
+// Store is a store.Store that maintains a commit log.
+type Store store.Store
+
+// Wrap returns a watchable.Store that wraps the given store.Store.
+func Wrap(st store.Store) (Store, error) {
+	it := st.Scan([]byte(LogPrefix), []byte(""))
+	var seq uint64 = 0
+	for it.Advance() {
+		key := string(it.Key(nil))
+		parts := split(key)
+		if len(parts) != 3 {
+			panic("wrong number of parts: " + key)
+		}
+		invSeq, err := strconv.ParseUint(parts[1], 10, 64)
+		if err != nil {
+			panic("failed to parse invSeq: " + key)
+		}
+		seq = MaxUint64 - invSeq
+		it.Cancel()
+	}
+	if err := it.Err(); err != nil && verror.ErrorID(err) != verror.ErrCanceled.ID {
+		return nil, err
+	}
+	return &wstore{Store: st, seq: seq}, nil
+}
+
+type wstore struct {
+	store.Store
+	mu  sync.Mutex // held during transaction commits; protects seq
+	seq uint64     // sequence number, for commits
+}
+
+type transaction struct {
+	store.Transaction
+	st  *wstore
+	mu  sync.Mutex // protects the fields below
+	ops []Op
+}
+
+var (
+	_ Store             = (*wstore)(nil)
+	_ store.Transaction = (*transaction)(nil)
+)
+
+// TODO(sadovsky): Decide whether to copy []bytes vs. requiring clients not to
+// modify passed-in []bytes.
+
+func (st *wstore) Put(key, value []byte) error {
+	// Use watchable.Store transaction so this op gets logged.
+	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+		return st.Put(key, value)
+	})
+}
+
+func (st *wstore) Delete(key []byte) error {
+	// Use watchable.Store transaction so this op gets logged.
+	return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+		return st.Delete(key)
+	})
+}
+
+func (st *wstore) NewTransaction() store.Transaction {
+	return &transaction{Transaction: st.Store.NewTransaction(), st: st}
+}
+
+func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	valbuf, err := tx.Transaction.Get(key, valbuf)
+	if err == nil || verror.ErrorID(err) == store.ErrUnknownKey.ID {
+		tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}})
+	}
+	return valbuf, err
+}
+
+func (tx *transaction) Scan(start, limit []byte) store.Stream {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	it := tx.Transaction.Scan(start, limit)
+	if it.Err() == nil {
+		tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}})
+	}
+	return it
+}
+
+func (tx *transaction) Put(key, value []byte) error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	err := tx.Transaction.Put(key, value)
+	if err == nil {
+		tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
+	}
+	return err
+}
+
+func (tx *transaction) Delete(key []byte) error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	err := tx.Transaction.Delete(key)
+	if err == nil {
+		tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
+	}
+	return err
+}
+
+func (tx *transaction) Commit() error {
+	tx.mu.Lock()
+	defer tx.mu.Unlock()
+	tx.st.mu.Lock()
+	defer tx.st.mu.Unlock()
+	// Check sequence numbers.
+	if uint64(len(tx.ops)) > MaxUint16 {
+		return verror.New(verror.ErrInternal, nil, "too many ops")
+	}
+	if tx.st.seq == MaxUint64 {
+		return verror.New(verror.ErrInternal, nil, "seq maxed out")
+	}
+	// Write LogEntry records.
+	// TODO(sadovsky): Use a more efficient lexicographic number encoding.
+	keyPrefix := join(LogPrefix, fmt.Sprintf("%020d", MaxUint64-tx.st.seq))
+	for txSeq, op := range tx.ops {
+		key := join(keyPrefix, fmt.Sprintf("%05d", MaxUint16-uint64(txSeq)))
+		value := &LogEntry{
+			Op: op,
+			// TODO(sadovsky): This information is also captured in LogEntry keys.
+			// Optimize to avoid redundancy.
+			Continued: txSeq < len(tx.ops)-1,
+		}
+		if err := put(tx.Transaction, key, value); err != nil {
+			return err
+		}
+	}
+	if err := tx.Transaction.Commit(); err != nil {
+		return err
+	}
+	tx.st.seq++
+	return nil
+}
+
+////////////////////////////////////////
+// Internal helpers
+
+func join(parts ...string) string {
+	return strings.Join(parts, ":")
+}
+
+func split(key string) []string {
+	return strings.Split(key, ":")
+}
+
+func put(st store.StoreWriter, k string, v interface{}) error {
+	bytes, err := vom.Encode(v)
+	if err != nil {
+		return err
+	}
+	return st.Put([]byte(k), bytes)
+}
diff --git a/services/syncbase/store/watchable/types.vdl b/services/syncbase/store/watchable/types.vdl
new file mode 100644
index 0000000..7a8f629
--- /dev/null
+++ b/services/syncbase/store/watchable/types.vdl
@@ -0,0 +1,48 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+// GetOp represents a store get operation.
+type GetOp struct {
+	Key []byte
+}
+
+// ScanOp represents a store scan operation.
+type ScanOp struct {
+	Start []byte
+	Limit []byte
+}
+
+// PutOp represents a store put operation.
+type PutOp struct {
+	Key   []byte
+	Value []byte
+}
+
+// DeleteOp represents a store delete operation.
+type DeleteOp struct {
+	Key []byte
+}
+
+// Op represents a store operation.
+type Op union {
+	Get    GetOp
+	Scan   ScanOp
+	Put    PutOp
+	Delete DeleteOp
+}
+
+// LogEntry represents a single store operation. This operation may have been
+// part of a transaction, as signified by the Continued boolean. Read-only
+// operations (and read-only transactions) are not logged.
+// TODO(sadovsky): Log commit time and maybe some other things.
+type LogEntry struct {
+	// The store operation that was performed.
+	Op Op
+
+	// If true, this entry is followed by more entries that belong to the same
+	// commit as this entry.
+	Continued bool
+}
diff --git a/services/syncbase/store/watchable/types.vdl.go b/services/syncbase/store/watchable/types.vdl.go
new file mode 100644
index 0000000..3b6da63
--- /dev/null
+++ b/services/syncbase/store/watchable/types.vdl.go
@@ -0,0 +1,136 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: types.vdl
+
+package watchable
+
+import (
+	// VDL system imports
+	"v.io/v23/vdl"
+)
+
+// GetOp represents a store get operation.
+type GetOp struct {
+	Key []byte
+}
+
+func (GetOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.GetOp"`
+}) {
+}
+
+// ScanOp represents a store scan operation.
+type ScanOp struct {
+	Start []byte
+	Limit []byte
+}
+
+func (ScanOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.ScanOp"`
+}) {
+}
+
+// PutOp represents a store put operation.
+type PutOp struct {
+	Key   []byte
+	Value []byte
+}
+
+func (PutOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.PutOp"`
+}) {
+}
+
+// DeleteOp represents a store delete operation.
+type DeleteOp struct {
+	Key []byte
+}
+
+func (DeleteOp) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.DeleteOp"`
+}) {
+}
+
+type (
+	// Op represents any single field of the Op union type.
+	//
+	// Op represents a store operation.
+	Op interface {
+		// Index returns the field index.
+		Index() int
+		// Interface returns the field value as an interface.
+		Interface() interface{}
+		// Name returns the field name.
+		Name() string
+		// __VDLReflect describes the Op union type.
+		__VDLReflect(__OpReflect)
+	}
+	// OpGet represents field Get of the Op union type.
+	OpGet struct{ Value GetOp }
+	// OpScan represents field Scan of the Op union type.
+	OpScan struct{ Value ScanOp }
+	// OpPut represents field Put of the Op union type.
+	OpPut struct{ Value PutOp }
+	// OpDelete represents field Delete of the Op union type.
+	OpDelete struct{ Value DeleteOp }
+	// __OpReflect describes the Op union type.
+	__OpReflect struct {
+		Name  string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.Op"`
+		Type  Op
+		Union struct {
+			Get    OpGet
+			Scan   OpScan
+			Put    OpPut
+			Delete OpDelete
+		}
+	}
+)
+
+func (x OpGet) Index() int               { return 0 }
+func (x OpGet) Interface() interface{}   { return x.Value }
+func (x OpGet) Name() string             { return "Get" }
+func (x OpGet) __VDLReflect(__OpReflect) {}
+
+func (x OpScan) Index() int               { return 1 }
+func (x OpScan) Interface() interface{}   { return x.Value }
+func (x OpScan) Name() string             { return "Scan" }
+func (x OpScan) __VDLReflect(__OpReflect) {}
+
+func (x OpPut) Index() int               { return 2 }
+func (x OpPut) Interface() interface{}   { return x.Value }
+func (x OpPut) Name() string             { return "Put" }
+func (x OpPut) __VDLReflect(__OpReflect) {}
+
+func (x OpDelete) Index() int               { return 3 }
+func (x OpDelete) Interface() interface{}   { return x.Value }
+func (x OpDelete) Name() string             { return "Delete" }
+func (x OpDelete) __VDLReflect(__OpReflect) {}
+
+// LogEntry represents a single store operation. This operation may have been
+// part of a transaction, as signified by the Continued boolean. Read-only
+// operations (and read-only transactions) are not logged.
+// TODO(sadovsky): Log commit time and maybe some other things.
+type LogEntry struct {
+	// The store operation that was performed.
+	Op Op
+	// If true, this entry is followed by more entries that belong to the same
+	// commit as this entry.
+	Continued bool
+}
+
+func (LogEntry) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.LogEntry"`
+}) {
+}
+
+func init() {
+	vdl.Register((*GetOp)(nil))
+	vdl.Register((*ScanOp)(nil))
+	vdl.Register((*PutOp)(nil))
+	vdl.Register((*DeleteOp)(nil))
+	vdl.Register((*Op)(nil))
+	vdl.Register((*LogEntry)(nil))
+}