syncbase: implement client batches
Change-Id: I9556f926c32b6e37772d276975285c6bffb3a9fc
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 1f3cbd3..82a366d 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -5,7 +5,12 @@
package nosql
import (
+ "math/rand"
"path"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
@@ -19,19 +24,41 @@
"v.io/x/lib/vlog"
)
+// database is a per-database singleton (i.e. not per-request). It does not
+// directly handle RPCs.
+// Note: If a database does not exist at the time of a database RPC, the
+// dispatcher creates a short-lived database object to service that particular
+// request.
type database struct {
name string
a interfaces.App
// The fields below are initialized iff this database exists.
st store.Store // stores all data for a single database
+
+ // Active snapshots and transactions corresponding to client batches.
+ // TODO(sadovsky): Add timeouts and GC.
+ mu sync.Mutex // protects the fields below
+ sns map[uint64]store.Snapshot
+ txs map[uint64]store.Transaction
+}
+
+// databaseReq is a per-request object that handles Database RPCs.
+// It embeds database and tracks request-specific batch state.
+type databaseReq struct {
+ *database
+ // If non-nil, sn or tx will be non-nil.
+ batchId *uint64
+ sn store.Snapshot
+ tx store.Transaction
}
var (
- _ wire.DatabaseServerMethods = (*database)(nil)
+ _ wire.DatabaseServerMethods = (*databaseReq)(nil)
_ interfaces.Database = (*database)(nil)
_ util.Layer = (*database)(nil)
)
+// DatabaseOptions configures a database.
type DatabaseOptions struct {
// Database-level permissions.
Perms access.Permissions
@@ -62,6 +89,8 @@
name: name,
a: a,
st: st,
+ sns: make(map[uint64]store.Snapshot),
+ txs: make(map[uint64]store.Transaction),
}
data := &databaseData{
Name: d.name,
@@ -76,34 +105,100 @@
////////////////////////////////////////
// RPC methods
-func (d *database) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
// This database does not yet exist; d is just an ephemeral handle that holds
// {name string, a *app}. d.a.CreateNoSQLDatabase will create a new database
// handle and store it in d.a.dbs[d.name].
return d.a.CreateNoSQLDatabase(ctx, call, d.name, perms)
}
-func (d *database) Delete(ctx *context.T, call rpc.ServerCall) error {
+func (d *databaseReq) Delete(ctx *context.T, call rpc.ServerCall) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return d.a.DeleteNoSQLDatabase(ctx, call, d.name)
}
-func (d *database) BeginBatch(ctx *context.T, call rpc.ServerCall, bo wire.BatchOptions) (string, error) {
- return "", verror.NewErrNotImplemented(ctx)
+var rng *rand.Rand = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+
+func (d *databaseReq) BeginBatch(ctx *context.T, call rpc.ServerCall, bo wire.BatchOptions) (string, error) {
+ if d.batchId != nil {
+ return "", wire.NewErrBoundToBatch(ctx)
+ }
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ var id uint64
+ var batchType string
+ for {
+ id = uint64(rng.Int63())
+ if bo.ReadOnly {
+ if _, ok := d.sns[id]; !ok {
+ d.sns[id] = d.st.NewSnapshot()
+ batchType = "sn"
+ break
+ }
+ } else {
+ if _, ok := d.txs[id]; !ok {
+ d.txs[id] = d.st.NewTransaction()
+ batchType = "tx"
+ break
+ }
+ }
+ }
+ return strings.Join([]string{d.name, batchType, strconv.FormatUint(id, 10)}, util.BatchSep), nil
}
-func (d *database) Commit(ctx *context.T, call rpc.ServerCall) error {
- return verror.NewErrNotImplemented(ctx)
+func (d *databaseReq) Commit(ctx *context.T, call rpc.ServerCall) error {
+ if d.batchId == nil {
+ return wire.NewErrNotBoundToBatch(ctx)
+ }
+ if d.tx == nil {
+ return wire.NewErrReadOnlyBatch(ctx)
+ }
+ var err error
+ if err = d.tx.Commit(); err == nil {
+ d.mu.Lock()
+ delete(d.txs, *d.batchId)
+ d.mu.Unlock()
+ }
+ return err
}
-func (d *database) Abort(ctx *context.T, call rpc.ServerCall) error {
- return verror.NewErrNotImplemented(ctx)
+func (d *databaseReq) Abort(ctx *context.T, call rpc.ServerCall) error {
+ if d.batchId == nil {
+ return wire.NewErrNotBoundToBatch(ctx)
+ }
+ var err error
+ if d.tx != nil {
+ if err = d.tx.Abort(); err == nil {
+ d.mu.Lock()
+ delete(d.txs, *d.batchId)
+ d.mu.Unlock()
+ }
+ } else {
+ if err = d.sn.Close(); err == nil {
+ d.mu.Lock()
+ delete(d.sns, *d.batchId)
+ d.mu.Unlock()
+ }
+ }
+ return err
}
-func (d *database) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+func (d *databaseReq) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return d.a.SetDatabasePerms(ctx, call, d.name, perms, version)
}
-func (d *database) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+func (d *databaseReq) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+ if d.batchId != nil {
+ return nil, "", wire.NewErrBoundToBatch(ctx)
+ }
data := &databaseData{}
if err := util.Get(ctx, call, d.st, d, data); err != nil {
return nil, "", err
@@ -111,15 +206,20 @@
return data.Perms, util.FormatVersion(data.Version), nil
}
-func (d *database) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+ if d.batchId != nil {
+ return nil, wire.NewErrBoundToBatch(ctx)
+ }
// Check perms.
sn := d.st.NewSnapshot()
+ closeSnapshot := func() error {
+ return sn.Close()
+ }
if err := util.Get(ctx, call, sn, d, &databaseData{}); err != nil {
- sn.Close()
+ closeSnapshot()
return nil, err
}
- pattern := "*"
- return util.Glob(ctx, call, pattern, sn, util.TablePrefix)
+ return util.Glob(ctx, call, "*", sn, closeSnapshot, util.TablePrefix)
}
////////////////////////////////////////
@@ -167,3 +267,26 @@
func (d *database) StKey() string {
return util.DatabasePrefix
}
+
+////////////////////////////////////////
+// Internal helpers
+
+func (d *databaseReq) batchReader() store.StoreReader {
+ if d.batchId == nil {
+ return nil
+ } else if d.sn != nil {
+ return d.sn
+ } else {
+ return d.tx
+ }
+}
+
+func (d *databaseReq) batchReadWriter() (store.StoreReadWriter, error) {
+ if d.batchId == nil {
+ return nil, nil
+ } else if d.tx != nil {
+ return d.tx, nil
+ } else {
+ return nil, wire.NewErrReadOnlyBatch(nil)
+ }
+}