syncbase: watchable store layer
Beginnings of lightweight "watchable store" layer, enabling
client-side watch impl as well as sync module watching of
store commits.
Change-Id: If4de271f9ea0593ec8fb840f93fa22c6206f10b5
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index fcc2817..94664fb 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -10,6 +10,7 @@
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/syncbase/x/ref/services/syncbase/store/memstore"
+ "v.io/syncbase/x/ref/services/syncbase/store/watchable"
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/rpc"
@@ -39,10 +40,14 @@
return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
}
// TODO(sadovsky): Make storage engine pluggable.
+ st, err := watchable.Wrap(memstore.New())
+ if err != nil {
+ return nil, err
+ }
d := &database{
name: name,
a: a,
- st: memstore.New(),
+ st: st,
}
data := &databaseData{
Name: d.name,
diff --git a/services/syncbase/store/watchable/store.go b/services/syncbase/store/watchable/store.go
new file mode 100644
index 0000000..e4fd0ee
--- /dev/null
+++ b/services/syncbase/store/watchable/store.go
@@ -0,0 +1,197 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// Package watchable provides a store.Store that maintains a commit log. In
+// Syncbase, this log forms the basis for the implementation of client-facing
+// watch as well as the sync module's watching of store commits.
+//
+// Log entries are keyed in reverse chronological order. More specifically, the
+// LogEntry key format is "$log:<seq>:<txSeq>", where <seq> is (MaxUint64-seq)
+// and <txSeq> is (MaxUint16-txSeq). All numbers are zero-padded to ensure that
+// the lexicographic order matches the numeric order. Thus, clients implementing
+// ResumeMarkers (i.e. implementing the watch API) should use
+// fmt.Sprintf("%020d", MaxUint64-marker) to convert external markers to
+// internal LogEntry key prefixes.
+package watchable
+
+// TODO(sadovsky): Write unit tests. (As of 2015-05-26 we're still iterating on
+// the design for how to expose a "watch" API from the storage engine, and we
+// don't want to write lots of tests prematurely.)
+// TODO(sadovsky): Expose helper functions for constructing LogEntry keys.
+// TODO(sadovsky): Allow clients to subscribe via Go channel.
+
+import (
+ "fmt"
+ "strconv"
+ "strings"
+ "sync"
+
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/verror"
+ "v.io/v23/vom"
+)
+
+const (
+ LogPrefix = "$log"
+ MaxUint16 uint64 = 1<<16 - 1 // 5 digits
+ MaxUint64 uint64 = 1<<64 - 1 // 20 digits
+)
+
+// Store is a store.Store that maintains a commit log.
+type Store store.Store
+
+// Wrap returns a watchable.Store that wraps the given store.Store.
+func Wrap(st store.Store) (Store, error) {
+ it := st.Scan([]byte(LogPrefix), []byte(""))
+ var seq uint64 = 0
+ for it.Advance() {
+ key := string(it.Key(nil))
+ parts := split(key)
+ if len(parts) != 3 {
+ panic("wrong number of parts: " + key)
+ }
+ invSeq, err := strconv.ParseUint(parts[1], 10, 64)
+ if err != nil {
+ panic("failed to parse invSeq: " + key)
+ }
+ seq = MaxUint64 - invSeq
+ it.Cancel()
+ }
+ if err := it.Err(); err != nil && verror.ErrorID(err) != verror.ErrCanceled.ID {
+ return nil, err
+ }
+ return &wstore{Store: st, seq: seq}, nil
+}
+
+type wstore struct {
+ store.Store
+ mu sync.Mutex // held during transaction commits; protects seq
+ seq uint64 // sequence number, for commits
+}
+
+type transaction struct {
+ store.Transaction
+ st *wstore
+ mu sync.Mutex // protects the fields below
+ ops []Op
+}
+
+var (
+ _ Store = (*wstore)(nil)
+ _ store.Transaction = (*transaction)(nil)
+)
+
+// TODO(sadovsky): Decide whether to copy []bytes vs. requiring clients not to
+// modify passed-in []bytes.
+
+func (st *wstore) Put(key, value []byte) error {
+ // Use watchable.Store transaction so this op gets logged.
+ return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+ return st.Put(key, value)
+ })
+}
+
+func (st *wstore) Delete(key []byte) error {
+ // Use watchable.Store transaction so this op gets logged.
+ return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+ return st.Delete(key)
+ })
+}
+
+func (st *wstore) NewTransaction() store.Transaction {
+ return &transaction{Transaction: st.Store.NewTransaction(), st: st}
+}
+
+func (tx *transaction) Get(key, valbuf []byte) ([]byte, error) {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ valbuf, err := tx.Transaction.Get(key, valbuf)
+ if err == nil || verror.ErrorID(err) == store.ErrUnknownKey.ID {
+ tx.ops = append(tx.ops, &OpGet{GetOp{Key: key}})
+ }
+ return valbuf, err
+}
+
+func (tx *transaction) Scan(start, limit []byte) store.Stream {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ it := tx.Transaction.Scan(start, limit)
+ if it.Err() == nil {
+ tx.ops = append(tx.ops, &OpScan{ScanOp{Start: start, Limit: limit}})
+ }
+ return it
+}
+
+func (tx *transaction) Put(key, value []byte) error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ err := tx.Transaction.Put(key, value)
+ if err == nil {
+ tx.ops = append(tx.ops, &OpPut{PutOp{Key: key, Value: value}})
+ }
+ return err
+}
+
+func (tx *transaction) Delete(key []byte) error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ err := tx.Transaction.Delete(key)
+ if err == nil {
+ tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
+ }
+ return err
+}
+
+func (tx *transaction) Commit() error {
+ tx.mu.Lock()
+ defer tx.mu.Unlock()
+ tx.st.mu.Lock()
+ defer tx.st.mu.Unlock()
+ // Check sequence numbers.
+ if uint64(len(tx.ops)) > MaxUint16 {
+ return verror.New(verror.ErrInternal, nil, "too many ops")
+ }
+ if tx.st.seq == MaxUint64 {
+ return verror.New(verror.ErrInternal, nil, "seq maxed out")
+ }
+ // Write LogEntry records.
+ // TODO(sadovsky): Use a more efficient lexicographic number encoding.
+ keyPrefix := join(LogPrefix, fmt.Sprintf("%020d", MaxUint64-tx.st.seq))
+ for txSeq, op := range tx.ops {
+ key := join(keyPrefix, fmt.Sprintf("%05d", MaxUint16-uint64(txSeq)))
+ value := &LogEntry{
+ Op: op,
+ // TODO(sadovsky): This information is also captured in LogEntry keys.
+ // Optimize to avoid redundancy.
+ Continued: txSeq < len(tx.ops)-1,
+ }
+ if err := put(tx.Transaction, key, value); err != nil {
+ return err
+ }
+ }
+ if err := tx.Transaction.Commit(); err != nil {
+ return err
+ }
+ tx.st.seq++
+ return nil
+}
+
+////////////////////////////////////////
+// Internal helpers
+
+func join(parts ...string) string {
+ return strings.Join(parts, ":")
+}
+
+func split(key string) []string {
+ return strings.Split(key, ":")
+}
+
+func put(st store.StoreWriter, k string, v interface{}) error {
+ bytes, err := vom.Encode(v)
+ if err != nil {
+ return err
+ }
+ return st.Put([]byte(k), bytes)
+}
diff --git a/services/syncbase/store/watchable/types.vdl b/services/syncbase/store/watchable/types.vdl
new file mode 100644
index 0000000..7a8f629
--- /dev/null
+++ b/services/syncbase/store/watchable/types.vdl
@@ -0,0 +1,48 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package watchable
+
+// GetOp represents a store get operation.
+type GetOp struct {
+ Key []byte
+}
+
+// ScanOp represents a store scan operation.
+type ScanOp struct {
+ Start []byte
+ Limit []byte
+}
+
+// PutOp represents a store put operation.
+type PutOp struct {
+ Key []byte
+ Value []byte
+}
+
+// DeleteOp represents a store delete operation.
+type DeleteOp struct {
+ Key []byte
+}
+
+// Op represents a store operation.
+type Op union {
+ Get GetOp
+ Scan ScanOp
+ Put PutOp
+ Delete DeleteOp
+}
+
+// LogEntry represents a single store operation. This operation may have been
+// part of a transaction, as signified by the Continued boolean. Read-only
+// operations (and read-only transactions) are not logged.
+// TODO(sadovsky): Log commit time and maybe some other things.
+type LogEntry struct {
+ // The store operation that was performed.
+ Op Op
+
+ // If true, this entry is followed by more entries that belong to the same
+ // commit as this entry.
+ Continued bool
+}
diff --git a/services/syncbase/store/watchable/types.vdl.go b/services/syncbase/store/watchable/types.vdl.go
new file mode 100644
index 0000000..3b6da63
--- /dev/null
+++ b/services/syncbase/store/watchable/types.vdl.go
@@ -0,0 +1,136 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: types.vdl
+
+package watchable
+
+import (
+ // VDL system imports
+ "v.io/v23/vdl"
+)
+
+// GetOp represents a store get operation.
+type GetOp struct {
+ Key []byte
+}
+
+func (GetOp) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.GetOp"`
+}) {
+}
+
+// ScanOp represents a store scan operation.
+type ScanOp struct {
+ Start []byte
+ Limit []byte
+}
+
+func (ScanOp) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.ScanOp"`
+}) {
+}
+
+// PutOp represents a store put operation.
+type PutOp struct {
+ Key []byte
+ Value []byte
+}
+
+func (PutOp) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.PutOp"`
+}) {
+}
+
+// DeleteOp represents a store delete operation.
+type DeleteOp struct {
+ Key []byte
+}
+
+func (DeleteOp) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.DeleteOp"`
+}) {
+}
+
+type (
+ // Op represents any single field of the Op union type.
+ //
+ // Op represents a store operation.
+ Op interface {
+ // Index returns the field index.
+ Index() int
+ // Interface returns the field value as an interface.
+ Interface() interface{}
+ // Name returns the field name.
+ Name() string
+ // __VDLReflect describes the Op union type.
+ __VDLReflect(__OpReflect)
+ }
+ // OpGet represents field Get of the Op union type.
+ OpGet struct{ Value GetOp }
+ // OpScan represents field Scan of the Op union type.
+ OpScan struct{ Value ScanOp }
+ // OpPut represents field Put of the Op union type.
+ OpPut struct{ Value PutOp }
+ // OpDelete represents field Delete of the Op union type.
+ OpDelete struct{ Value DeleteOp }
+ // __OpReflect describes the Op union type.
+ __OpReflect struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.Op"`
+ Type Op
+ Union struct {
+ Get OpGet
+ Scan OpScan
+ Put OpPut
+ Delete OpDelete
+ }
+ }
+)
+
+func (x OpGet) Index() int { return 0 }
+func (x OpGet) Interface() interface{} { return x.Value }
+func (x OpGet) Name() string { return "Get" }
+func (x OpGet) __VDLReflect(__OpReflect) {}
+
+func (x OpScan) Index() int { return 1 }
+func (x OpScan) Interface() interface{} { return x.Value }
+func (x OpScan) Name() string { return "Scan" }
+func (x OpScan) __VDLReflect(__OpReflect) {}
+
+func (x OpPut) Index() int { return 2 }
+func (x OpPut) Interface() interface{} { return x.Value }
+func (x OpPut) Name() string { return "Put" }
+func (x OpPut) __VDLReflect(__OpReflect) {}
+
+func (x OpDelete) Index() int { return 3 }
+func (x OpDelete) Interface() interface{} { return x.Value }
+func (x OpDelete) Name() string { return "Delete" }
+func (x OpDelete) __VDLReflect(__OpReflect) {}
+
+// LogEntry represents a single store operation. This operation may have been
+// part of a transaction, as signified by the Continued boolean. Read-only
+// operations (and read-only transactions) are not logged.
+// TODO(sadovsky): Log commit time and maybe some other things.
+type LogEntry struct {
+ // The store operation that was performed.
+ Op Op
+ // If true, this entry is followed by more entries that belong to the same
+ // commit as this entry.
+ Continued bool
+}
+
+func (LogEntry) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/store/watchable.LogEntry"`
+}) {
+}
+
+func init() {
+ vdl.Register((*GetOp)(nil))
+ vdl.Register((*ScanOp)(nil))
+ vdl.Register((*PutOp)(nil))
+ vdl.Register((*DeleteOp)(nil))
+ vdl.Register((*Op)(nil))
+ vdl.Register((*LogEntry)(nil))
+}