syncbase: implement client batches
Change-Id: I9556f926c32b6e37772d276975285c6bffb3a9fc
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index aa4bfb3..1e1fe1e 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -19,6 +19,7 @@
"v.io/v23/verror"
)
+// app is a per-app singleton (i.e. not per-request) that handles App RPCs.
type app struct {
name string
s *service
@@ -66,12 +67,14 @@
func (a *app) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
// Check perms.
sn := a.s.st.NewSnapshot()
+ closeSnapshot := func() error {
+ return sn.Close()
+ }
if err := util.Get(ctx, call, sn, a, &appData{}); err != nil {
- sn.Close()
+ closeSnapshot()
return nil, err
}
- pattern := "*"
- return util.Glob(ctx, call, pattern, sn, util.JoinKeyParts(util.DbInfoPrefix, a.name))
+ return util.Glob(ctx, call, "*", sn, closeSnapshot, util.JoinKeyParts(util.DbInfoPrefix, a.name))
}
////////////////////////////////////////
diff --git a/services/syncbase/server/dispatcher.go b/services/syncbase/server/dispatcher.go
index f6a4dde..17fb042 100644
--- a/services/syncbase/server/dispatcher.go
+++ b/services/syncbase/server/dispatcher.go
@@ -48,8 +48,8 @@
aExists := false
var a *app
- if aint, err := disp.s.App(nil, nil, appName); err == nil {
- a = aint.(*app) // panics on failure, as desired
+ if aInt, err := disp.s.App(nil, nil, appName); err == nil {
+ a = aInt.(*app) // panics on failure, as desired
aExists = true
} else {
if verror.ErrorID(err) != verror.ErrNoExist.ID {
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 1f3cbd3..82a366d 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -5,7 +5,12 @@
package nosql
import (
+ "math/rand"
"path"
+ "strconv"
+ "strings"
+ "sync"
+ "time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
@@ -19,19 +24,41 @@
"v.io/x/lib/vlog"
)
+// database is a per-database singleton (i.e. not per-request). It does not
+// directly handle RPCs.
+// Note: If a database does not exist at the time of a database RPC, the
+// dispatcher creates a short-lived database object to service that particular
+// request.
type database struct {
name string
a interfaces.App
// The fields below are initialized iff this database exists.
st store.Store // stores all data for a single database
+
+ // Active snapshots and transactions corresponding to client batches.
+ // TODO(sadovsky): Add timeouts and GC.
+ mu sync.Mutex // protects the fields below
+ sns map[uint64]store.Snapshot
+ txs map[uint64]store.Transaction
+}
+
+// databaseReq is a per-request object that handles Database RPCs.
+// It embeds database and tracks request-specific batch state.
+type databaseReq struct {
+ *database
+ // batchId is nil unless the database name in the request carried batch info.
+ // When batchId is non-nil, sn (for an "sn" batch) or tx (for a "tx" batch)
+ // holds the matching entry looked up from the embedded database's sns/txs.
+ batchId *uint64
+ sn store.Snapshot
+ tx store.Transaction
}
var (
- _ wire.DatabaseServerMethods = (*database)(nil)
+ _ wire.DatabaseServerMethods = (*databaseReq)(nil)
_ interfaces.Database = (*database)(nil)
_ util.Layer = (*database)(nil)
)
+// DatabaseOptions configures a database.
type DatabaseOptions struct {
// Database-level permissions.
Perms access.Permissions
@@ -62,6 +89,8 @@
name: name,
a: a,
st: st,
+ sns: make(map[uint64]store.Snapshot),
+ txs: make(map[uint64]store.Transaction),
}
data := &databaseData{
Name: d.name,
@@ -76,34 +105,100 @@
////////////////////////////////////////
// RPC methods
-func (d *database) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
// This database does not yet exist; d is just an ephemeral handle that holds
// {name string, a *app}. d.a.CreateNoSQLDatabase will create a new database
// handle and store it in d.a.dbs[d.name].
return d.a.CreateNoSQLDatabase(ctx, call, d.name, perms)
}
-func (d *database) Delete(ctx *context.T, call rpc.ServerCall) error {
+func (d *databaseReq) Delete(ctx *context.T, call rpc.ServerCall) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return d.a.DeleteNoSQLDatabase(ctx, call, d.name)
}
-func (d *database) BeginBatch(ctx *context.T, call rpc.ServerCall, bo wire.BatchOptions) (string, error) {
- return "", verror.NewErrNotImplemented(ctx)
+// rng generates batch ids. A *rand.Rand created with rand.New is not safe for
+// concurrent use, and d.mu is per-database, so BeginBatch calls on different
+// databases could otherwise race on it. Every use of rng must hold rngMu.
+var (
+ rngMu sync.Mutex
+ rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+)
+
+// BeginBatch opens a snapshot (if bo.ReadOnly) or a transaction on the
+// database's store, registers it under a fresh random id, and returns the name
+// clients must use to address the batch: the database name, the batch type
+// ("sn" or "tx") and the base-10 id, joined with util.BatchSep.
+// It fails with ErrBoundToBatch if this request is itself bound to a batch,
+// and with ErrNoExist if the database does not exist.
+func (d *databaseReq) BeginBatch(ctx *context.T, call rpc.ServerCall, bo wire.BatchOptions) (string, error) {
+ if d.batchId != nil {
+ return "", wire.NewErrBoundToBatch(ctx)
+ }
+ // The handle for a database that does not exist carries only its name and
+ // app: st is nil and the batch maps are unallocated, so opening a batch on
+ // it would panic.
+ if d.st == nil {
+ return "", verror.NewErrNoExist(ctx)
+ }
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ var id uint64
+ var batchType string
+ // Retry until we draw an id that is not already in use for this batch type.
+ for {
+ rngMu.Lock()
+ id = uint64(rng.Int63())
+ rngMu.Unlock()
+ if bo.ReadOnly {
+ if _, ok := d.sns[id]; !ok {
+ d.sns[id] = d.st.NewSnapshot()
+ batchType = "sn"
+ break
+ }
+ } else {
+ if _, ok := d.txs[id]; !ok {
+ d.txs[id] = d.st.NewTransaction()
+ batchType = "tx"
+ break
+ }
+ }
+ }
+ return strings.Join([]string{d.name, batchType, strconv.FormatUint(id, 10)}, util.BatchSep), nil
}
-func (d *database) Commit(ctx *context.T, call rpc.ServerCall) error {
- return verror.NewErrNotImplemented(ctx)
+// Commit commits the transaction this request is bound to and, on success,
+// unregisters it from the database. It fails with ErrNotBoundToBatch if the
+// request is not bound to a batch and with ErrReadOnlyBatch if the batch is a
+// snapshot. If the store's Commit fails, its error is returned and the
+// transaction stays registered.
+func (d *databaseReq) Commit(ctx *context.T, call rpc.ServerCall) error {
+ switch {
+ case d.batchId == nil:
+ return wire.NewErrNotBoundToBatch(ctx)
+ case d.tx == nil:
+ return wire.NewErrReadOnlyBatch(ctx)
+ }
+ if err := d.tx.Commit(); err != nil {
+ return err
+ }
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ delete(d.txs, *d.batchId)
+ return nil
}
-func (d *database) Abort(ctx *context.T, call rpc.ServerCall) error {
- return verror.NewErrNotImplemented(ctx)
+// Abort ends the batch this request is bound to: it aborts the transaction, or
+// closes the snapshot for a read-only batch, and on success unregisters it
+// from the database. It fails with ErrNotBoundToBatch if the request is not
+// bound to a batch. If the store call fails, its error is returned and the
+// batch stays registered.
+func (d *databaseReq) Abort(ctx *context.T, call rpc.ServerCall) error {
+ if d.batchId == nil {
+ return wire.NewErrNotBoundToBatch(ctx)
+ }
+ if d.tx != nil {
+ if err := d.tx.Abort(); err != nil {
+ return err
+ }
+ d.mu.Lock()
+ delete(d.txs, *d.batchId)
+ d.mu.Unlock()
+ return nil
+ }
+ if err := d.sn.Close(); err != nil {
+ return err
+ }
+ d.mu.Lock()
+ delete(d.sns, *d.batchId)
+ d.mu.Unlock()
+ return nil
}
-func (d *database) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+func (d *databaseReq) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return d.a.SetDatabasePerms(ctx, call, d.name, perms, version)
}
-func (d *database) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+func (d *databaseReq) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
+ if d.batchId != nil {
+ return nil, "", wire.NewErrBoundToBatch(ctx)
+ }
data := &databaseData{}
if err := util.Get(ctx, call, d.st, d, data); err != nil {
return nil, "", err
@@ -111,15 +206,20 @@
return data.Perms, util.FormatVersion(data.Version), nil
}
-func (d *database) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+func (d *databaseReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+ if d.batchId != nil {
+ return nil, wire.NewErrBoundToBatch(ctx)
+ }
// Check perms.
sn := d.st.NewSnapshot()
+ closeSnapshot := func() error {
+ return sn.Close()
+ }
if err := util.Get(ctx, call, sn, d, &databaseData{}); err != nil {
- sn.Close()
+ closeSnapshot()
return nil, err
}
- pattern := "*"
- return util.Glob(ctx, call, pattern, sn, util.TablePrefix)
+ return util.Glob(ctx, call, "*", sn, closeSnapshot, util.TablePrefix)
}
////////////////////////////////////////
@@ -167,3 +267,26 @@
func (d *database) StKey() string {
return util.DatabasePrefix
}
+
+////////////////////////////////////////
+// Internal helpers
+
+// batchReader returns the reader for the batch this request is bound to: the
+// snapshot if one is set, otherwise the transaction. It returns nil if the
+// request is not bound to a batch.
+func (d *databaseReq) batchReader() store.StoreReader {
+ switch {
+ case d.batchId == nil:
+ return nil
+ case d.sn != nil:
+ return d.sn
+ default:
+ return d.tx
+ }
+}
+
+// batchReadWriter returns the transaction for the batch this request is bound
+// to. It returns (nil, nil) if the request is not bound to a batch, and an
+// ErrReadOnlyBatch error if the batch has no transaction.
+func (d *databaseReq) batchReadWriter() (store.StoreReadWriter, error) {
+ switch {
+ case d.batchId == nil:
+ return nil, nil
+ case d.tx != nil:
+ return d.tx, nil
+ default:
+ return nil, wire.NewErrReadOnlyBatch(nil)
+ }
+}
diff --git a/services/syncbase/server/nosql/database_sgm.go b/services/syncbase/server/nosql/database_sgm.go
index 768b478..b5797b0 100644
--- a/services/syncbase/server/nosql/database_sgm.go
+++ b/services/syncbase/server/nosql/database_sgm.go
@@ -16,39 +16,66 @@
////////////////////////////////////////
// SyncGroup RPC methods
-func (d *database) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+func (d *databaseReq) GetSyncGroupNames(ctx *context.T, call rpc.ServerCall) ([]string, error) {
+ if d.batchId != nil {
+ return nil, wire.NewErrBoundToBatch(ctx)
+ }
return nil, verror.NewErrNotImplemented(ctx)
}
-func (d *database) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
+func (d *databaseReq) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
sd := vsync.NewSyncDatabase(d)
return sd.CreateSyncGroup(ctx, call, sgName, spec, myInfo)
}
-func (d *database) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+func (d *databaseReq) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
+ if d.batchId != nil {
+ return wire.SyncGroupSpec{}, wire.NewErrBoundToBatch(ctx)
+ }
return wire.SyncGroupSpec{}, verror.NewErrNotImplemented(ctx)
}
-func (d *database) LeaveSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+func (d *databaseReq) LeaveSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return verror.NewErrNotImplemented(ctx)
}
-func (d *database) DestroySyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+func (d *databaseReq) DestroySyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return verror.NewErrNotImplemented(ctx)
}
-func (d *database) EjectFromSyncGroup(ctx *context.T, call rpc.ServerCall, sgName, member string) error {
+func (d *databaseReq) EjectFromSyncGroup(ctx *context.T, call rpc.ServerCall, sgName, member string) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return verror.NewErrNotImplemented(ctx)
}
-func (d *database) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+func (d *databaseReq) GetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string) (wire.SyncGroupSpec, string, error) {
+ if d.batchId != nil {
+ return wire.SyncGroupSpec{}, "", wire.NewErrBoundToBatch(ctx)
+ }
return wire.SyncGroupSpec{}, "", verror.NewErrNotImplemented(ctx)
}
-func (d *database) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+func (d *databaseReq) SetSyncGroupSpec(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, version string) error {
+ if d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return verror.NewErrNotImplemented(ctx)
}
-func (d *database) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+func (d *databaseReq) GetSyncGroupMembers(ctx *context.T, call rpc.ServerCall, sgName string) (map[string]wire.SyncGroupMemberInfo, error) {
+ if d.batchId != nil {
+ return nil, wire.NewErrBoundToBatch(ctx)
+ }
return nil, verror.NewErrNotImplemented(ctx)
}
diff --git a/services/syncbase/server/nosql/dispatcher.go b/services/syncbase/server/nosql/dispatcher.go
index d8fe2f0..3bbea26 100644
--- a/services/syncbase/server/nosql/dispatcher.go
+++ b/services/syncbase/server/nosql/dispatcher.go
@@ -5,12 +5,14 @@
package nosql
import (
+ "strconv"
"strings"
wire "v.io/syncbase/v23/services/syncbase"
nosqlWire "v.io/syncbase/v23/services/syncbase/nosql"
pubutil "v.io/syncbase/v23/syncbase/util"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/v23/rpc"
"v.io/v23/security"
"v.io/v23/verror"
@@ -35,9 +37,15 @@
vlog.Fatal("invalid nosql.dispatcher Lookup")
}
+ dParts := strings.Split(parts[0], util.BatchSep)
+ dName := dParts[0]
+
// Validate all key atoms up front, so that we can avoid doing so in all our
// method implementations.
- for _, s := range parts {
+ if !pubutil.ValidName(dName) {
+ return nil, nil, wire.NewErrInvalidName(nil, suffix)
+ }
+ for _, s := range parts[1:] {
if !pubutil.ValidName(s) {
return nil, nil, wire.NewErrInvalidName(nil, suffix)
}
@@ -45,22 +53,28 @@
dExists := false
var d *database
- if dint, err := disp.a.NoSQLDatabase(nil, nil, parts[0]); err == nil {
- d = dint.(*database) // panics on failure, as desired
+ if dInt, err := disp.a.NoSQLDatabase(nil, nil, dName); err == nil {
+ d = dInt.(*database) // panics on failure, as desired
dExists = true
} else {
if verror.ErrorID(err) != verror.ErrNoExist.ID {
return nil, nil, err
} else {
+ // Database does not exist. Create a short-lived database object to
+ // service this request.
d = &database{
- name: parts[0],
+ name: dName,
a: disp.a,
}
}
}
+ dReq := &databaseReq{database: d}
+ if !setBatchFields(dReq, dParts) {
+ return nil, nil, wire.NewErrInvalidName(nil, suffix)
+ }
if len(parts) == 1 {
- return nosqlWire.DatabaseServer(d), nil, nil
+ return nosqlWire.DatabaseServer(dReq), nil, nil
}
// All table and row methods require the database to exist. If it doesn't,
@@ -73,21 +87,50 @@
// downstream handling of this request. Depending on the order in which things
// execute, the client may not get an error, but in any case ultimately the
// store will end up in a consistent state.
- t := &table{
+ tReq := &tableReq{
name: parts[1],
- d: d,
+ d: dReq,
}
if len(parts) == 2 {
- return nosqlWire.TableServer(t), nil, nil
+ return nosqlWire.TableServer(tReq), nil, nil
}
- r := &row{
+ rReq := &rowReq{
key: parts[2],
- t: t,
+ t: tReq,
}
if len(parts) == 3 {
- return nosqlWire.RowServer(r), nil, nil
+ return nosqlWire.RowServer(rReq), nil, nil
}
return nil, nil, verror.NewErrNoExist(nil)
}
+
+// setBatchFields sets the batch-related fields in databaseReq based on the
+// value of dParts, the parts of the database name component. A single part
+// means the request is not bound to a batch. Otherwise dParts must have the
+// form {name, "sn"|"tx", id}, where id is the base-10 batch id produced by
+// BeginBatch and refers to a batch currently registered on the database.
+// It returns false if dParts is malformed or names an unknown batch.
+func setBatchFields(d *databaseReq, dParts []string) bool {
+ if len(dParts) == 1 {
+ return true
+ }
+ if len(dParts) != 3 {
+ return false
+ }
+ // Parse strictly as base 10, the encoding BeginBatch emits. Base 0 would
+ // also accept prefixed hex/octal/binary spellings of the same id.
+ batchId, err := strconv.ParseUint(dParts[2], 10, 64)
+ if err != nil {
+ return false
+ }
+ d.batchId = &batchId
+ d.mu.Lock()
+ defer d.mu.Unlock()
+ var ok bool
+ switch dParts[1] {
+ case "sn":
+ d.sn, ok = d.sns[batchId]
+ case "tx":
+ d.tx, ok = d.txs[batchId]
+ default:
+ return false
+ }
+ return ok
+}
diff --git a/services/syncbase/server/nosql/row.go b/services/syncbase/server/nosql/row.go
index 057dc10..3397fd1 100644
--- a/services/syncbase/server/nosql/row.go
+++ b/services/syncbase/server/nosql/row.go
@@ -13,67 +13,95 @@
"v.io/v23/verror"
)
-// TODO(sadovsky): Extend data layout to support version tracking for sync.
-// See go/vanadium-local-structured-store.
-
-// TODO(sadovsky): Handle the case where we're in a batch.
-
-type row struct {
+// rowReq is a per-request object that handles Row RPCs.
+type rowReq struct {
key string
- t *table
+ t *tableReq
}
var (
- _ wire.RowServerMethods = (*row)(nil)
- _ util.Layer = (*row)(nil)
+ _ wire.RowServerMethods = (*rowReq)(nil)
+ _ util.Layer = (*rowReq)(nil)
)
////////////////////////////////////////
// RPC methods
-func (r *row) Get(ctx *context.T, call rpc.ServerCall) ([]byte, error) {
- return r.get(ctx, call, r.t.d.st)
+// Get reads this row through the bound batch's reader if the request is bound
+// to a batch; otherwise through a fresh snapshot that is closed on return.
+func (r *rowReq) Get(ctx *context.T, call rpc.ServerCall) ([]byte, error) {
+ if r.t.d.batchId != nil {
+ return r.get(ctx, call, r.t.d.batchReader())
+ }
+ sn := r.t.d.st.NewSnapshot()
+ defer sn.Close()
+ return r.get(ctx, call, sn)
}
-func (r *row) Put(ctx *context.T, call rpc.ServerCall, value []byte) error {
- return r.put(ctx, call, r.t.d.st, value)
+// Put writes value to this row. A request bound to a batch writes through the
+// batch's transaction (failing if the batch is read-only); otherwise the write
+// runs in its own transaction via store.RunInTransaction.
+func (r *rowReq) Put(ctx *context.T, call rpc.ServerCall, value []byte) error {
+ write := func(st store.StoreReadWriter) error {
+ return r.put(ctx, call, st, value)
+ }
+ if r.t.d.batchId == nil {
+ return store.RunInTransaction(r.t.d.st, write)
+ }
+ st, err := r.t.d.batchReadWriter()
+ if err != nil {
+ return err
+ }
+ return write(st)
}
-func (r *row) Delete(ctx *context.T, call rpc.ServerCall) error {
- return r.del(ctx, call, r.t.d.st)
+// Delete removes this row. A request bound to a batch deletes through the
+// batch's transaction (failing if the batch is read-only); otherwise the
+// delete runs in its own transaction via store.RunInTransaction.
+func (r *rowReq) Delete(ctx *context.T, call rpc.ServerCall) error {
+ remove := func(st store.StoreReadWriter) error {
+ return r.delete(ctx, call, st)
+ }
+ if r.t.d.batchId == nil {
+ return store.RunInTransaction(r.t.d.st, remove)
+ }
+ st, err := r.t.d.batchReadWriter()
+ if err != nil {
+ return err
+ }
+ return remove(st)
}
////////////////////////////////////////
// util.Layer methods
-func (r *row) Name() string {
+func (r *rowReq) Name() string {
return r.key
}
-func (r *row) StKey() string {
+func (r *rowReq) StKey() string {
return util.JoinKeyParts(util.RowPrefix, r.stKeyPart())
}
////////////////////////////////////////
// Internal helpers
-func (r *row) stKeyPart() string {
+func (r *rowReq) stKeyPart() string {
return util.JoinKeyParts(r.t.stKeyPart(), r.key)
}
-// TODO(sadovsky): Update access checks to use prefix permissions.
-
-// checkAccess checks that this row's table exists in the database and performs
+// checkAccess checks that this row's table exists in the database, and performs
// an authorization check (currently against the table perms).
// Returns a VDL-compatible error.
-func (r *row) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
+// TODO(sadovsky): Use prefix permissions.
+func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
return util.Get(ctx, call, st, r.t, &tableData{})
}
// get reads data from the storage engine.
// Performs authorization check.
// Returns a VDL-compatible error.
-func (r *row) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
+func (r *rowReq) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
if err := r.checkAccess(ctx, call, st); err != nil {
return nil, err
}
@@ -90,7 +118,7 @@
// put writes data to the storage engine.
// Performs authorization check.
// Returns a VDL-compatible error.
-func (r *row) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
+func (r *rowReq) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
if err := r.checkAccess(ctx, call, st); err != nil {
return err
}
@@ -100,10 +128,10 @@
return nil
}
-// del deletes data from the storage engine.
+// delete deletes data from the storage engine.
// Performs authorization check.
// Returns a VDL-compatible error.
-func (r *row) del(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+func (r *rowReq) delete(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
if err := r.checkAccess(ctx, call, st); err != nil {
return err
}
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index e73afd2..7619fc4 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -14,22 +14,24 @@
"v.io/v23/verror"
)
-// TODO(sadovsky): Handle the case where we're in a batch.
-
-type table struct {
+// tableReq is a per-request object that handles Table RPCs.
+type tableReq struct {
name string
- d *database
+ d *databaseReq
}
var (
- _ wire.TableServerMethods = (*table)(nil)
- _ util.Layer = (*table)(nil)
+ _ wire.TableServerMethods = (*tableReq)(nil)
+ _ util.Layer = (*tableReq)(nil)
)
////////////////////////////////////////
// RPC methods
-func (t *table) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+func (t *tableReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+ if t.d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
// Check databaseData perms.
dData := &databaseData{}
@@ -56,7 +58,10 @@
})
}
-func (t *table) Delete(ctx *context.T, call rpc.ServerCall) error {
+func (t *tableReq) Delete(ctx *context.T, call rpc.ServerCall) error {
+ if t.d.batchId != nil {
+ return wire.NewErrBoundToBatch(ctx)
+ }
return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
// Read-check-delete tableData.
if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
@@ -70,8 +75,8 @@
})
}
-func (t *table) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
- return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, start, limit []byte) error {
+ impl := func(st store.StoreReadWriter) error {
// Check perms.
if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
return err
@@ -88,83 +93,136 @@
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
- })
+ }
+ if t.d.batchId != nil {
+ if st, err := t.d.batchReadWriter(); err != nil {
+ return err
+ } else {
+ return impl(st)
+ }
+ } else {
+ return store.RunInTransaction(t.d.st, impl)
+ }
}
-func (t *table) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
- sn := t.d.st.NewSnapshot()
- defer sn.Close()
- // Check perms.
- if err := util.Get(ctx, call, sn, t, &tableData{}); err != nil {
- return err
+func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, start, limit []byte) error {
+ impl := func(st store.StoreReader) error {
+ // Check perms.
+ if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ return err
+ }
+ it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
+ sender := call.SendStream()
+ key, value := []byte{}, []byte{}
+ for it.Advance() {
+ key, value = it.Key(key), it.Value(value)
+ parts := util.SplitKeyParts(string(key))
+ sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+ }
+ if err := it.Err(); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ return nil
}
- it := sn.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
- sender := call.SendStream()
- key, value := []byte{}, []byte{}
- for it.Advance() {
- key, value = it.Key(key), it.Value(value)
- parts := util.SplitKeyParts(string(key))
- sender.Send(wire.KeyValue{Key: parts[len(parts)-1], Value: value})
+ var st store.StoreReader
+ if t.d.batchId != nil {
+ st = t.d.batchReader()
+ } else {
+ sn := t.d.st.NewSnapshot()
+ st = sn
+ defer sn.Close()
}
- if err := it.Err(); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return impl(st)
}
-func (t *table) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
+func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, prefix string, perms access.Permissions) error {
if prefix != "" {
return verror.NewErrNotImplemented(ctx)
}
- return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+ impl := func(st store.StoreReadWriter) error {
data := &tableData{}
return util.Update(ctx, call, st, t, data, func() error {
data.Perms = perms
return nil
})
- })
+ }
+ if t.d.batchId != nil {
+ if st, err := t.d.batchReadWriter(); err != nil {
+ return err
+ } else {
+ return impl(st)
+ }
+ } else {
+ return store.RunInTransaction(t.d.st, impl)
+ }
}
-func (t *table) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
+// GetPermissions returns the table-level permissions as a single entry with an
+// empty prefix. Only key == "" is implemented. The table data is read through
+// the bound batch's reader if the request is bound to a batch, otherwise
+// through a fresh snapshot that is closed on return.
+func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, key string) ([]wire.PrefixPermissions, error) {
if key != "" {
return nil, verror.NewErrNotImplemented(ctx)
}
- data := &tableData{}
- if err := util.Get(ctx, call, t.d.st, t, data); err != nil {
- return nil, err
+ impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
+ data := &tableData{}
+ // Read via st, not t.d.st, so the call sees the batch's (or the
+ // snapshot's) view instead of going straight to the underlying store.
+ if err := util.Get(ctx, call, st, t, data); err != nil {
+ return nil, err
+ }
+ return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
}
- return []wire.PrefixPermissions{{Prefix: "", Perms: data.Perms}}, nil
+ var st store.StoreReader
+ if t.d.batchId != nil {
+ st = t.d.batchReader()
+ } else {
+ sn := t.d.st.NewSnapshot()
+ st = sn
+ defer sn.Close()
+ }
+ return impl(st)
}
-func (t *table) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
+func (t *tableReq) DeletePermissions(ctx *context.T, call rpc.ServerCall, prefix string) error {
return verror.NewErrNotImplemented(ctx)
}
-func (t *table) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
- sn := t.d.st.NewSnapshot()
- // Check perms.
- if err := util.Get(ctx, call, sn, t, &tableData{}); err != nil {
- sn.Close()
- return nil, err
+func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
+ impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
+ // Check perms.
+ if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ closeStoreReader()
+ return nil, err
+ }
+ return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
}
- pattern := "*"
- return util.Glob(ctx, call, pattern, sn, util.JoinKeyParts(util.RowPrefix, t.name))
+ var st store.StoreReader
+ var closeStoreReader func() error
+ if t.d.batchId != nil {
+ st = t.d.batchReader()
+ closeStoreReader = func() error {
+ return nil
+ }
+ } else {
+ sn := t.d.st.NewSnapshot()
+ st = sn
+ closeStoreReader = func() error {
+ return sn.Close()
+ }
+ }
+ return impl(st, closeStoreReader)
}
////////////////////////////////////////
// util.Layer methods
-func (t *table) Name() string {
+func (t *tableReq) Name() string {
return t.name
}
-func (t *table) StKey() string {
+func (t *tableReq) StKey() string {
return util.JoinKeyParts(util.TablePrefix, t.stKeyPart())
}
////////////////////////////////////////
// Internal helpers
-func (t *table) stKeyPart() string {
+func (t *tableReq) stKeyPart() string {
return t.name
}
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index fd3e3b8..86a2a37 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -23,15 +23,7 @@
"v.io/v23/verror"
)
-type ServiceOptions struct {
- // Service-level permissions.
- Perms access.Permissions
- // Root dir for data storage.
- RootDir string
- // Storage engine to use (for service and per-database engines).
- Engine string
-}
-
+// service is a singleton (i.e. not per-request) that handles Service RPCs.
type service struct {
st store.Store // keeps track of which apps and databases exist, etc.
sync interfaces.SyncServerMethods
@@ -48,6 +40,16 @@
_ util.Layer = (*service)(nil)
)
+// ServiceOptions configures a service.
+type ServiceOptions struct {
+ // Service-level permissions.
+ Perms access.Permissions
+ // Root dir for data storage.
+ RootDir string
+ // Storage engine to use (for service and per-database engines).
+ Engine string
+}
+
// NewService creates a new service instance and returns it.
// Returns a VDL-compatible error.
func NewService(ctx *context.T, call rpc.ServerCall, opts ServiceOptions) (*service, error) {
@@ -103,12 +105,14 @@
func (s *service) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
// Check perms.
sn := s.st.NewSnapshot()
+ closeSnapshot := func() error {
+ return sn.Close()
+ }
if err := util.Get(ctx, call, sn, s, &serviceData{}); err != nil {
- sn.Close()
+ closeSnapshot()
return nil, err
}
- pattern := "*"
- return util.Glob(ctx, call, pattern, sn, util.AppPrefix)
+ return util.Glob(ctx, call, "*", sn, closeSnapshot, util.AppPrefix)
}
////////////////////////////////////////
diff --git a/services/syncbase/server/util/constants.go b/services/syncbase/server/util/constants.go
index 88e1115..44d0ff6 100644
--- a/services/syncbase/server/util/constants.go
+++ b/services/syncbase/server/util/constants.go
@@ -23,4 +23,8 @@
const (
// Service object name suffix for Syncbase-to-Syncbase RPCs.
SyncbaseSuffix = "$internal"
+ // Separator for batch info in database names.
+ BatchSep = ":"
+ // Separator for parts of storage engine keys.
+ KeyPartSep = ":"
)
diff --git a/services/syncbase/server/util/glob.go b/services/syncbase/server/util/glob.go
index 87a204e..acdd74f 100644
--- a/services/syncbase/server/util/glob.go
+++ b/services/syncbase/server/util/glob.go
@@ -53,29 +53,29 @@
return res, nil
}
-// Takes ownership of sn.
+// Glob performs a glob. It calls closeStoreReader to close st.
// TODO(sadovsky): Why do we make developers implement Glob differently from
// other streaming RPCs? It's confusing that Glob must return immediately and
// write its results to a channel, while other streaming RPC handlers must block
// and write their results to the output stream. See nlacasse's TODO below, too.
-func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sn store.Snapshot, stKeyPrefix string) (<-chan string, error) {
+func Glob(ctx *context.T, call rpc.ServerCall, pattern string, st store.StoreReader, closeStoreReader func() error, stKeyPrefix string) (<-chan string, error) {
// TODO(sadovsky): Support glob with non-prefix pattern.
if _, err := glob.Parse(pattern); err != nil {
- sn.Close()
+ closeStoreReader()
return nil, verror.New(verror.ErrBadArg, ctx, err)
}
prefix, err := globPatternToPrefix(pattern)
if err != nil {
- sn.Close()
+ closeStoreReader()
if verror.ErrorID(err) == verror.ErrBadArg.ID {
return nil, verror.NewErrNotImplemented(ctx)
}
return nil, verror.New(verror.ErrInternal, ctx, err)
}
- it := sn.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
+ it := st.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
ch := make(chan string)
go func() {
- defer sn.Close()
+ defer closeStoreReader()
defer close(ch)
key := []byte{}
for it.Advance() {
diff --git a/services/syncbase/server/util/key_util.go b/services/syncbase/server/util/key_util.go
index b469f9e..80a8a6d 100644
--- a/services/syncbase/server/util/key_util.go
+++ b/services/syncbase/server/util/key_util.go
@@ -13,12 +13,12 @@
// JoinKeyParts builds keys for accessing data in the storage engine.
func JoinKeyParts(parts ...string) string {
// TODO(sadovsky): Figure out which delimiter makes the most sense.
- return strings.Join(parts, ":")
+ return strings.Join(parts, KeyPartSep)
}
// SplitKeyParts is the inverse of JoinKeyParts.
func SplitKeyParts(key string) []string {
- return strings.Split(key, ":")
+ return strings.Split(key, KeyPartSep)
}
// ScanPrefixArgs returns args for sn.Scan() for the specified prefix.