internal storage engine: improve compile-time typing
This change makes the following internal storage engine API changes.
First, renaming Snapshot.Close() -> Snapshot.Abort().
Before this change, the Store interface implemented the
Snapshot interface; now it doesn't.
Second, adding the SnapshotOrTransaction interface.
Only Snapshot and Transaction implement this interface.
Transaction doesn't implement Snapshot.
Third, the StoreReadWriter becomes private. The StoreReadWriter
was replaced by the Transaction. This was done because in lots
of places the StoreReadWriter was checked to be the Transaction
at runtime.
This change also updates the code to the new API and removes
extra runtime type checking.
MultiPart: 1/2
Change-Id: Ic213cc0479a6cd8604bc62c29edaf7adb95b260e
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index 5dd9cad..957f4ed 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -88,14 +88,11 @@
}
// Check perms.
sn := a.s.st.NewSnapshot()
- closeSnapshot := func() error {
- return sn.Close()
- }
if err := util.GetWithAuth(ctx, call, sn, a.stKey(), &appData{}); err != nil {
- closeSnapshot()
+ sn.Abort()
return nil, err
}
- return util.Glob(ctx, call, "*", sn, closeSnapshot, util.JoinKeyParts(util.DbInfoPrefix, a.name))
+ return util.Glob(ctx, call, "*", sn, sn.Abort, util.JoinKeyParts(util.DbInfoPrefix, a.name))
}
////////////////////////////////////////
@@ -155,13 +152,13 @@
// 1. Check appData perms, create dbInfo record.
rootDir, engine := a.rootDirForDb(dbName), a.s.opts.Engine
aData := &appData{}
- if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
+ if err := store.RunInTransaction(a.s.st, func(tx store.Transaction) error {
// Check appData perms.
- if err := util.GetWithAuth(ctx, call, st, a.stKey(), aData); err != nil {
+ if err := util.GetWithAuth(ctx, call, tx, a.stKey(), aData); err != nil {
return err
}
// Check for "database already exists".
- if _, err := a.getDbInfo(ctx, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if _, err := a.getDbInfo(ctx, tx, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -174,7 +171,7 @@
RootDir: rootDir,
Engine: engine,
}
- return a.putDbInfo(ctx, st, dbName, info)
+ return a.putDbInfo(ctx, tx, dbName, info)
}); err != nil {
return err
}
@@ -193,8 +190,8 @@
}
// 3. Flip dbInfo.Initialized to true.
- if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
- return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
+ if err := store.RunInTransaction(a.s.st, func(tx store.Transaction) error {
+ return a.updateDbInfo(ctx, tx, dbName, func(info *dbInfo) error {
info.Initialized = true
return nil
})
@@ -235,8 +232,8 @@
}
// 2. Flip dbInfo.Deleted to true.
- if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
- return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
+ if err := store.RunInTransaction(a.s.st, func(tx store.Transaction) error {
+ return a.updateDbInfo(ctx, tx, dbName, func(info *dbInfo) error {
info.Deleted = true
return nil
})
diff --git a/services/syncbase/server/db_info.go b/services/syncbase/server/db_info.go
index 39ffdc8..f750d57 100644
--- a/services/syncbase/server/db_info.go
+++ b/services/syncbase/server/db_info.go
@@ -24,33 +24,32 @@
}
// getDbInfo reads data from the storage engine.
-func (a *app) getDbInfo(ctx *context.T, st store.StoreReader, dbName string) (*dbInfo, error) {
+func (a *app) getDbInfo(ctx *context.T, sntx store.SnapshotOrTransaction, dbName string) (*dbInfo, error) {
info := &dbInfo{}
- if err := util.Get(ctx, st, dbInfoStKey(a, dbName), info); err != nil {
+ if err := util.Get(ctx, sntx, dbInfoStKey(a, dbName), info); err != nil {
return nil, err
}
return info, nil
}
// putDbInfo writes data to the storage engine.
-func (a *app) putDbInfo(ctx *context.T, st store.StoreWriter, dbName string, info *dbInfo) error {
- return util.Put(ctx, st, dbInfoStKey(a, dbName), info)
+func (a *app) putDbInfo(ctx *context.T, tx store.Transaction, dbName string, info *dbInfo) error {
+ return util.Put(ctx, tx, dbInfoStKey(a, dbName), info)
}
// delDbInfo deletes data from the storage engine.
-func (a *app) delDbInfo(ctx *context.T, st store.StoreWriter, dbName string) error {
- return util.Delete(ctx, st, dbInfoStKey(a, dbName))
+func (a *app) delDbInfo(ctx *context.T, stw store.StoreWriter, dbName string) error {
+ return util.Delete(ctx, stw, dbInfoStKey(a, dbName))
}
// updateDbInfo performs a read-modify-write. fn should "modify" v.
-func (a *app) updateDbInfo(ctx *context.T, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
- _ = st.(store.Transaction) // panics on failure, as desired
- info, err := a.getDbInfo(ctx, st, dbName)
+func (a *app) updateDbInfo(ctx *context.T, tx store.Transaction, dbName string, fn func(info *dbInfo) error) error {
+ info, err := a.getDbInfo(ctx, tx, dbName)
if err != nil {
return err
}
if err := fn(info); err != nil {
return err
}
- return a.putDbInfo(ctx, st, dbName, info)
+ return a.putDbInfo(ctx, tx, dbName, info)
}
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index 3fe1b96..d4875c0 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -245,7 +245,7 @@
d.mu.Unlock()
}
} else {
- if err = d.sn.Close(); err == nil {
+ if err = d.sn.Abort(); err == nil {
d.mu.Lock()
delete(d.sns, *d.batchId)
d.mu.Unlock()
@@ -276,13 +276,12 @@
}
return rs.Err()
}
- var st store.StoreReader
+ var sntx store.SnapshotOrTransaction
if d.batchId != nil {
- st = d.batchReader()
+ sntx = d.batchReader()
} else {
- sn := d.st.NewSnapshot()
- st = sn
- defer sn.Close()
+ sntx = d.st.NewSnapshot()
+ defer sntx.Abort()
}
// queryDb implements query_db.Database
// which is needed by the query package's
@@ -291,7 +290,7 @@
ctx: ctx,
call: call,
req: d,
- st: st,
+ sntx: sntx,
}
return impl(query_exec.Exec(db, q))
@@ -349,14 +348,11 @@
}
// Check perms.
sn := d.st.NewSnapshot()
- closeSnapshot := func() error {
- return sn.Close()
- }
if err := util.GetWithAuth(ctx, call, sn, d.stKey(), &databaseData{}); err != nil {
- closeSnapshot()
+ sn.Abort()
return nil, err
}
- return util.Glob(ctx, call, "*", sn, closeSnapshot, util.TablePrefix)
+ return util.Glob(ctx, call, "*", sn, sn.Abort, util.TablePrefix)
}
////////////////////////////////////////
@@ -421,9 +417,9 @@
if !d.exists {
vlog.Fatalf("database %q does not exist", d.name)
}
- return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(d.st, func(tx store.Transaction) error {
data := &databaseData{}
- return util.UpdateWithAuth(ctx, call, st, d.stKey(), data, func() error {
+ return util.UpdateWithAuth(ctx, call, tx, d.stKey(), data, func() error {
if err := util.CheckVersion(ctx, version, data.Version); err != nil {
return err
}
@@ -446,7 +442,7 @@
ctx *context.T
call wire.DatabaseExecServerCall
req *databaseReq
- st store.StoreReader
+ sntx store.SnapshotOrTransaction
}
func (db *queryDb) GetContext() *context.T {
@@ -462,7 +458,7 @@
},
}
// Now that we have a table, we need to check permissions.
- if err := util.GetWithAuth(db.ctx, db.call, db.st, tDb.req.stKey(), &tableData{}); err != nil {
+ if err := util.GetWithAuth(db.ctx, db.call, db.sntx, tDb.req.stKey(), &tableData{}); err != nil {
return nil, err
}
return tDb, nil
@@ -478,7 +474,7 @@
for _, keyRange := range keyRanges {
// TODO(jkline): For now, acquire all of the streams at once to minimize the race condition.
// Need a way to Scan multiple ranges at the same state of uncommitted changes.
- streams = append(streams, t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.req.name), keyRange.Start, keyRange.Limit)))
+ streams = append(streams, t.qdb.sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.req.name), keyRange.Start, keyRange.Limit)))
}
return &kvs{
t: t,
@@ -568,7 +564,7 @@
return util.DatabasePrefix
}
-func (d *databaseReq) batchReader() store.StoreReader {
+func (d *databaseReq) batchReader() store.SnapshotOrTransaction {
if d.batchId == nil {
return nil
} else if d.sn != nil {
@@ -578,7 +574,7 @@
}
}
-func (d *databaseReq) batchReadWriter() (store.StoreReadWriter, error) {
+func (d *databaseReq) batchTransaction() (store.Transaction, error) {
if d.batchId == nil {
return nil, nil
} else if d.tx != nil {
diff --git a/services/syncbase/server/nosql/database_sm.go b/services/syncbase/server/nosql/database_sm.go
index 5d1239e..3c87a6b 100644
--- a/services/syncbase/server/nosql/database_sm.go
+++ b/services/syncbase/server/nosql/database_sm.go
@@ -41,9 +41,9 @@
}
// Check permissions on Database and store schema metadata.
- return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(d.st, func(tx store.Transaction) error {
dbData := databaseData{}
- return util.UpdateWithAuth(ctx, call, st, d.stKey(), &dbData, func() error {
+ return util.UpdateWithAuth(ctx, call, tx, d.stKey(), &dbData, func() error {
// NOTE: For now we expect the client to not issue multiple
// concurrent SetSchemaMetadata calls.
dbData.SchemaMetadata = &metadata
diff --git a/services/syncbase/server/nosql/row.go b/services/syncbase/server/nosql/row.go
index 62a5f3a..a6fbf9d 100644
--- a/services/syncbase/server/nosql/row.go
+++ b/services/syncbase/server/nosql/row.go
@@ -32,35 +32,33 @@
}
func (r *rowReq) Get(ctx *context.T, call rpc.ServerCall, schemaVersion int32) ([]byte, error) {
- impl := func(st store.StoreReader) ([]byte, error) {
+ impl := func(sntx store.SnapshotOrTransaction) ([]byte, error) {
if err := r.t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return []byte{}, err
}
- return r.get(ctx, call, st)
+ return r.get(ctx, call, sntx)
}
- var st store.StoreReader
if r.t.d.batchId != nil {
- st = r.t.d.batchReader()
+ return impl(r.t.d.batchReader())
} else {
sn := r.t.d.st.NewSnapshot()
- st = sn
- defer sn.Close()
+ defer sn.Abort()
+ return impl(sn)
}
- return impl(st)
}
func (r *rowReq) Put(ctx *context.T, call rpc.ServerCall, schemaVersion int32, value []byte) error {
- impl := func(st store.StoreReadWriter) error {
+ impl := func(tx store.Transaction) error {
if err := r.t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- return r.put(ctx, call, st, value)
+ return r.put(ctx, call, tx, value)
}
if r.t.d.batchId != nil {
- if st, err := r.t.d.batchReadWriter(); err != nil {
+ if tx, err := r.t.d.batchTransaction(); err != nil {
return err
} else {
- return impl(st)
+ return impl(tx)
}
} else {
return store.RunInTransaction(r.t.d.st, impl)
@@ -68,17 +66,17 @@
}
func (r *rowReq) Delete(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
- impl := func(st store.StoreReadWriter) error {
+ impl := func(tx store.Transaction) error {
if err := r.t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- return r.delete(ctx, call, st)
+ return r.delete(ctx, call, tx)
}
if r.t.d.batchId != nil {
- if st, err := r.t.d.batchReadWriter(); err != nil {
+ if tx, err := r.t.d.batchTransaction(); err != nil {
return err
} else {
- return impl(st)
+ return impl(tx)
}
} else {
return store.RunInTransaction(r.t.d.st, impl)
@@ -98,17 +96,17 @@
// checkAccess checks that this row's table exists in the database, and performs
// an authorization check.
-func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
- return r.t.checkAccess(ctx, call, st, r.key)
+func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction) error {
+ return r.t.checkAccess(ctx, call, sntx, r.key)
}
// get reads data from the storage engine.
// Performs authorization check.
-func (r *rowReq) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
- if err := r.checkAccess(ctx, call, st); err != nil {
+func (r *rowReq) get(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction) ([]byte, error) {
+ if err := r.checkAccess(ctx, call, sntx); err != nil {
return nil, err
}
- value, err := st.Get([]byte(r.stKey()), nil)
+ value, err := sntx.Get([]byte(r.stKey()), nil)
if err != nil {
if verror.ErrorID(err) == store.ErrUnknownKey.ID {
return nil, verror.New(verror.ErrNoExist, ctx, r.stKey())
@@ -120,11 +118,11 @@
// put writes data to the storage engine.
// Performs authorization check.
-func (r *rowReq) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
- if err := r.checkAccess(ctx, call, st); err != nil {
+func (r *rowReq) put(ctx *context.T, call rpc.ServerCall, tx store.Transaction, value []byte) error {
+ if err := r.checkAccess(ctx, call, tx); err != nil {
return err
}
- if err := st.Put([]byte(r.stKey()), value); err != nil {
+ if err := tx.Put([]byte(r.stKey()), value); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
@@ -132,11 +130,11 @@
// delete deletes data from the storage engine.
// Performs authorization check.
-func (r *rowReq) delete(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
- if err := r.checkAccess(ctx, call, st); err != nil {
+func (r *rowReq) delete(ctx *context.T, call rpc.ServerCall, tx store.Transaction) error {
+ if err := r.checkAccess(ctx, call, tx); err != nil {
return err
}
- if err := st.Delete([]byte(r.stKey())); err != nil {
+ if err := tx.Delete([]byte(r.stKey())); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
diff --git a/services/syncbase/server/nosql/table.go b/services/syncbase/server/nosql/table.go
index 7b371e0..558a1c1 100644
--- a/services/syncbase/server/nosql/table.go
+++ b/services/syncbase/server/nosql/table.go
@@ -37,14 +37,14 @@
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(t.d.st, func(tx store.Transaction) error {
// Check databaseData perms.
dData := &databaseData{}
- if err := util.GetWithAuth(ctx, call, st, t.d.stKey(), dData); err != nil {
+ if err := util.GetWithAuth(ctx, call, tx, t.d.stKey(), dData); err != nil {
return err
}
// Check for "table already exists".
- if err := util.Get(ctx, st, t.stKey(), &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err := util.Get(ctx, tx, t.stKey(), &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -59,7 +59,7 @@
Name: t.name,
Perms: perms,
}
- return util.Put(ctx, st, t.stKey(), data)
+ return util.Put(ctx, tx, t.stKey(), data)
})
}
@@ -70,16 +70,16 @@
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(t.d.st, func(tx store.Transaction) error {
// Read-check-delete tableData.
- if err := util.GetWithAuth(ctx, call, st, t.stKey(), &tableData{}); err != nil {
+ if err := util.GetWithAuth(ctx, call, tx, t.stKey(), &tableData{}); err != nil {
if verror.ErrorID(err) == verror.ErrNoExist.ID {
return nil // delete is idempotent
}
return err
}
// TODO(sadovsky): Delete all rows in this table.
- return util.Delete(ctx, st, t.stKey())
+ return util.Delete(ctx, tx, t.stKey())
})
}
@@ -91,9 +91,9 @@
}
func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, schemaVersion int32, start, limit []byte) error {
- impl := func(st store.StoreReadWriter) error {
+ impl := func(tx store.Transaction) error {
// Check for table-level access before doing a scan.
- if err := t.checkAccess(ctx, call, st, ""); err != nil {
+ if err := t.checkAccess(ctx, call, tx, ""); err != nil {
return err
}
// Check if the db schema version and the version provided by client
@@ -101,21 +101,21 @@
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
+ it := tx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
key := []byte{}
for it.Advance() {
key = it.Key(key)
// Check perms.
parts := util.SplitKeyParts(string(key))
externalKey := parts[len(parts)-1]
- if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+ if err := t.checkAccess(ctx, call, tx, externalKey); err != nil {
// TODO(rogulenko): Revisit this behavior. Probably we should
// delete all rows that we have access to.
it.Cancel()
return err
}
// Delete the key-value pair.
- if err := st.Delete(key); err != nil {
+ if err := tx.Delete(key); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
}
@@ -125,10 +125,10 @@
return nil
}
if t.d.batchId != nil {
- if st, err := t.d.batchReadWriter(); err != nil {
+ if tx, err := t.d.batchTransaction(); err != nil {
return err
} else {
- return impl(st)
+ return impl(tx)
}
} else {
return store.RunInTransaction(t.d.st, impl)
@@ -136,15 +136,15 @@
}
func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, schemaVersion int32, start, limit []byte) error {
- impl := func(st store.StoreReader) error {
+ impl := func(sntx store.SnapshotOrTransaction) error {
// Check for table-level access before doing a scan.
- if err := t.checkAccess(ctx, call, st, ""); err != nil {
+ if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
return err
}
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
+ it := sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
sender := call.SendStream()
key, value := []byte{}, []byte{}
for it.Advance() {
@@ -152,7 +152,7 @@
// Check perms.
parts := util.SplitKeyParts(string(key))
externalKey := parts[len(parts)-1]
- if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+ if err := t.checkAccess(ctx, call, sntx, externalKey); err != nil {
it.Cancel()
return err
}
@@ -163,28 +163,26 @@
}
return nil
}
- var st store.StoreReader
if t.d.batchId != nil {
- st = t.d.batchReader()
+ return impl(t.d.batchReader())
} else {
- sn := t.d.st.NewSnapshot()
- st = sn
- defer sn.Close()
+ sntx := t.d.st.NewSnapshot()
+ defer sntx.Abort()
+ return impl(sntx)
}
- return impl(st)
}
func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, schemaVersion int32, key string) ([]wire.PrefixPermissions, error) {
- impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
+ impl := func(sntx store.SnapshotOrTransaction) ([]wire.PrefixPermissions, error) {
// Check permissions only at table level.
- if err := t.checkAccess(ctx, call, st, ""); err != nil {
+ if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
return nil, err
}
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return nil, err
}
// Get the most specific permissions object.
- prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+ prefix, prefixPerms, err := t.permsForKey(ctx, sntx, key)
if err != nil {
return nil, err
}
@@ -192,27 +190,25 @@
// Collect all parent permissions objects all the way up to the table level.
for prefix != "" {
prefix = prefixPerms.Parent
- if prefixPerms, err = t.permsForPrefix(ctx, st, prefixPerms.Parent); err != nil {
+ if prefixPerms, err = t.permsForPrefix(ctx, sntx, prefixPerms.Parent); err != nil {
return nil, err
}
result = append(result, wire.PrefixPermissions{Prefix: prefix, Perms: prefixPerms.Perms})
}
return result, nil
}
- var st store.StoreReader
if t.d.batchId != nil {
- st = t.d.batchReader()
+ return impl(t.d.batchReader())
} else {
- sn := t.d.st.NewSnapshot()
- st = sn
- defer sn.Close()
+ sntx := t.d.st.NewSnapshot()
+ defer sntx.Abort()
+ return impl(sntx)
}
- return impl(st)
}
func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, schemaVersion int32, prefix string, perms access.Permissions) error {
- impl := func(st store.StoreReadWriter) error {
- if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+ impl := func(tx store.Transaction) error {
+ if err := t.checkAccess(ctx, call, tx, prefix); err != nil {
return err
}
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
@@ -220,18 +216,18 @@
}
// Concurrent transactions that touch this table should fail with
// ErrConcurrentTransaction when this transaction commits.
- if err := t.lock(ctx, st); err != nil {
+ if err := t.lock(ctx, tx); err != nil {
return err
}
if prefix == "" {
data := &tableData{}
- return util.UpdateWithAuth(ctx, call, st, t.stKey(), data, func() error {
+ return util.UpdateWithAuth(ctx, call, tx, t.stKey(), data, func() error {
data.Perms = perms
return nil
})
}
// Get the most specific permissions object.
- parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+ parent, prefixPerms, err := t.permsForKey(ctx, tx, prefix)
if err != nil {
return err
}
@@ -240,7 +236,7 @@
// parents for all children of the prefix to the node corresponding to
// the prefix.
if parent != prefix {
- if err := t.updateParentRefs(ctx, st, prefix, prefix); err != nil {
+ if err := t.updateParentRefs(ctx, tx, prefix, prefix); err != nil {
return err
}
} else {
@@ -250,16 +246,16 @@
stPrefixLimit := stPrefix + util.PrefixRangeLimitSuffix
prefixPerms = stPrefixPerms{Parent: parent, Perms: perms}
// Put the (prefix, perms) pair to the database.
- if err := util.Put(ctx, st, stPrefix, prefixPerms); err != nil {
+ if err := util.Put(ctx, tx, stPrefix, prefixPerms); err != nil {
return err
}
- return util.Put(ctx, st, stPrefixLimit, prefixPerms)
+ return util.Put(ctx, tx, stPrefixLimit, prefixPerms)
}
if t.d.batchId != nil {
- if st, err := t.d.batchReadWriter(); err != nil {
+ if tx, err := t.d.batchTransaction(); err != nil {
return err
} else {
- return impl(st)
+ return impl(tx)
}
} else {
return store.RunInTransaction(t.d.st, impl)
@@ -270,8 +266,8 @@
if prefix == "" {
return verror.New(verror.ErrBadArg, ctx, prefix)
}
- impl := func(st store.StoreReadWriter) error {
- if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+ impl := func(tx store.Transaction) error {
+ if err := t.checkAccess(ctx, call, tx, prefix); err != nil {
return err
}
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
@@ -279,11 +275,11 @@
}
// Concurrent transactions that touch this table should fail with
// ErrConcurrentTransaction when this transaction commits.
- if err := t.lock(ctx, st); err != nil {
+ if err := t.lock(ctx, tx); err != nil {
return err
}
// Get the most specific permissions object.
- parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+ parent, prefixPerms, err := t.permsForKey(ctx, tx, prefix)
if err != nil {
return err
}
@@ -295,21 +291,21 @@
// We need to delete the node corresponding to the prefix from the prefix
// permissions tree. We do it by updating parents for all children of the
// prefix to the parent of the node corresponding to the prefix.
- if err := t.updateParentRefs(ctx, st, prefix, prefixPerms.Parent); err != nil {
+ if err := t.updateParentRefs(ctx, tx, prefix, prefixPerms.Parent); err != nil {
return err
}
stPrefix := []byte(t.prefixPermsKey(prefix))
stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
- if err := st.Delete(stPrefix); err != nil {
+ if err := tx.Delete(stPrefix); err != nil {
return err
}
- return st.Delete(stPrefixLimit)
+ return tx.Delete(stPrefixLimit)
}
if t.d.batchId != nil {
- if st, err := t.d.batchReadWriter(); err != nil {
+ if tx, err := t.d.batchTransaction(); err != nil {
return err
} else {
- return impl(st)
+ return impl(tx)
}
} else {
return store.RunInTransaction(t.d.st, impl)
@@ -317,30 +313,23 @@
}
func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
- impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
+ impl := func(sntx store.SnapshotOrTransaction, closeSntx func() error) (<-chan string, error) {
// Check perms.
- if err := t.checkAccess(ctx, call, st, ""); err != nil {
- closeStoreReader()
+ if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
+ closeSntx()
return nil, err
}
// TODO(rogulenko): Check prefix permissions for children.
- return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
+ return util.Glob(ctx, call, "*", sntx, closeSntx, util.JoinKeyParts(util.RowPrefix, t.name))
}
- var st store.StoreReader
- var closeStoreReader func() error
if t.d.batchId != nil {
- st = t.d.batchReader()
- closeStoreReader = func() error {
+ return impl(t.d.batchReader(), func() error {
return nil
- }
+ })
} else {
sn := t.d.st.NewSnapshot()
- st = sn
- closeStoreReader = func() error {
- return sn.Close()
- }
+ return impl(sn, sn.Abort)
}
- return impl(st, closeStoreReader)
}
////////////////////////////////////////
@@ -356,11 +345,11 @@
// updateParentRefs updates the parent for all children of the given
// prefix to newParent.
-func (t *tableReq) updateParentRefs(ctx *context.T, st store.StoreReadWriter, prefix, newParent string) error {
+func (t *tableReq) updateParentRefs(ctx *context.T, tx store.Transaction, prefix, newParent string) error {
stPrefix := []byte(t.prefixPermsKey(prefix))
stPrefixStart := append(stPrefix, 0)
stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
- it := st.Scan(stPrefixStart, stPrefixLimit)
+ it := tx.Scan(stPrefixStart, stPrefixLimit)
var key, value []byte
for it.Advance() {
key, value = it.Key(key), it.Value(value)
@@ -370,7 +359,7 @@
return verror.New(verror.ErrInternal, ctx, err)
}
prefixPerms.Parent = newParent
- if err := util.Put(ctx, st, string(key), prefixPerms); err != nil {
+ if err := util.Put(ctx, tx, string(key), prefixPerms); err != nil {
it.Cancel()
return err
}
@@ -393,12 +382,12 @@
// TODO(rogulenko): Revisit this behavior to provide more granularity.
// One option is to add a prefix and its parent to the write set of the current
// transaction when the permissions object for that prefix is updated.
-func (t *tableReq) lock(ctx *context.T, st store.StoreReadWriter) error {
+func (t *tableReq) lock(ctx *context.T, tx store.Transaction) error {
var data tableData
- if err := util.Get(ctx, st, t.stKey(), &data); err != nil {
+ if err := util.Get(ctx, tx, t.stKey(), &data); err != nil {
return err
}
- return util.Put(ctx, st, t.stKey(), data)
+ return util.Put(ctx, tx, t.stKey(), data)
}
// checkAccess checks that this table exists in the database, and performs
@@ -407,13 +396,13 @@
// TODO(rogulenko): Revisit this behavior. Eventually we'll want the table-level
// access check to be a check for "Resolve", i.e. also check access to
// service, app and database.
-func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader, key string) error {
- prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction, key string) error {
+ prefix, prefixPerms, err := t.permsForKey(ctx, sntx, key)
if err != nil {
return err
}
if prefix != "" {
- if err := util.GetWithAuth(ctx, call, st, t.stKey(), &tableData{}); err != nil {
+ if err := util.GetWithAuth(ctx, call, sntx, t.stKey(), &tableData{}); err != nil {
return err
}
}
@@ -448,10 +437,10 @@
// proper prefixes of K are less than K; parent(t) is a prefix of K, otherwise
// K < parent(t) < t; parent(t) is the largest prefix of K, otherwise t is a
// prefix of K; in this case line 3 returns correct result.
-func (t *tableReq) permsForKey(ctx *context.T, st store.StoreReader, key string) (string, stPrefixPerms, error) {
- it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
+func (t *tableReq) permsForKey(ctx *context.T, sntx store.SnapshotOrTransaction, key string) (string, stPrefixPerms, error) {
+ it := sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
if !it.Advance() {
- prefixPerms, err := t.permsForPrefix(ctx, st, "")
+ prefixPerms, err := t.permsForPrefix(ctx, sntx, "")
return "", prefixPerms, err
}
defer it.Cancel()
@@ -465,22 +454,22 @@
if strings.HasPrefix(key, prefix) {
return prefix, prefixPerms, nil
}
- prefixPerms, err := t.permsForPrefix(ctx, st, prefixPerms.Parent)
+ prefixPerms, err := t.permsForPrefix(ctx, sntx, prefixPerms.Parent)
return prefixPerms.Parent, prefixPerms, err
}
// permsForPrefix returns the permissions object associated with the
// provided prefix.
-func (t *tableReq) permsForPrefix(ctx *context.T, st store.StoreReader, prefix string) (stPrefixPerms, error) {
+func (t *tableReq) permsForPrefix(ctx *context.T, sntx store.SnapshotOrTransaction, prefix string) (stPrefixPerms, error) {
if prefix == "" {
var data tableData
- if err := util.Get(ctx, st, t.stKey(), &data); err != nil {
+ if err := util.Get(ctx, sntx, t.stKey(), &data); err != nil {
return stPrefixPerms{}, err
}
return stPrefixPerms{Perms: data.Perms}, nil
}
var prefixPerms stPrefixPerms
- if err := util.Get(ctx, st, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
+ if err := util.Get(ctx, sntx, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
return stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
}
return prefixPerms, nil
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index 8ed1480..5b87252 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -139,9 +139,9 @@
// RPC methods
func (s *service) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
- return store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(s.st, func(tx store.Transaction) error {
data := &serviceData{}
- return util.UpdateWithAuth(ctx, call, st, s.stKey(), data, func() error {
+ return util.UpdateWithAuth(ctx, call, tx, s.stKey(), data, func() error {
if err := util.CheckVersion(ctx, version, data.Version); err != nil {
return err
}
@@ -163,14 +163,11 @@
func (s *service) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
// Check perms.
sn := s.st.NewSnapshot()
- closeSnapshot := func() error {
- return sn.Close()
- }
if err := util.GetWithAuth(ctx, call, sn, s.stKey(), &serviceData{}); err != nil {
- closeSnapshot()
+ sn.Abort()
return nil, err
}
- return util.Glob(ctx, call, "*", sn, closeSnapshot, util.AppPrefix)
+ return util.Glob(ctx, call, "*", sn, sn.Abort, util.AppPrefix)
}
////////////////////////////////////////
@@ -225,14 +222,14 @@
dbs: make(map[string]interfaces.Database),
}
- if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+ if err := store.RunInTransaction(s.st, func(tx store.Transaction) error {
// Check serviceData perms.
sData := &serviceData{}
- if err := util.GetWithAuth(ctx, call, st, s.stKey(), sData); err != nil {
+ if err := util.GetWithAuth(ctx, call, tx, s.stKey(), sData); err != nil {
return err
}
// Check for "app already exists".
- if err := util.Get(ctx, st, a.stKey(), &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err := util.Get(ctx, tx, a.stKey(), &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -246,7 +243,7 @@
Name: appName,
Perms: perms,
}
- return util.Put(ctx, st, a.stKey(), data)
+ return util.Put(ctx, tx, a.stKey(), data)
}); err != nil {
return err
}
@@ -263,16 +260,16 @@
return nil // delete is idempotent
}
- if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+ if err := store.RunInTransaction(s.st, func(tx store.Transaction) error {
// Read-check-delete appData.
- if err := util.GetWithAuth(ctx, call, st, a.stKey(), &appData{}); err != nil {
+ if err := util.GetWithAuth(ctx, call, tx, a.stKey(), &appData{}); err != nil {
if verror.ErrorID(err) == verror.ErrNoExist.ID {
return nil // delete is idempotent
}
return err
}
// TODO(sadovsky): Delete all databases in this app.
- return util.Delete(ctx, st, a.stKey())
+ return util.Delete(ctx, tx, a.stKey())
}); err != nil {
return err
}
@@ -288,9 +285,9 @@
if !ok {
return verror.New(verror.ErrNoExist, ctx, appName)
}
- return store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(s.st, func(tx store.Transaction) error {
data := &appData{}
- return util.UpdateWithAuth(ctx, call, st, a.stKey(), data, func() error {
+ return util.UpdateWithAuth(ctx, call, tx, a.stKey(), data, func() error {
if err := util.CheckVersion(ctx, version, data.Version); err != nil {
return err
}
diff --git a/services/syncbase/server/util/glob.go b/services/syncbase/server/util/glob.go
index acdd74f..2cbc3f9 100644
--- a/services/syncbase/server/util/glob.go
+++ b/services/syncbase/server/util/glob.go
@@ -53,29 +53,29 @@
return res, nil
}
-// Glob performs a glob. It calls closeStoreReader to close st.
+// Glob performs a glob. It calls closeSntx to close sntx.
// TODO(sadovsky): Why do we make developers implement Glob differently from
// other streaming RPCs? It's confusing that Glob must return immediately and
// write its results to a channel, while other streaming RPC handlers must block
// and write their results to the output stream. See nlacasse's TODO below, too.
-func Glob(ctx *context.T, call rpc.ServerCall, pattern string, st store.StoreReader, closeStoreReader func() error, stKeyPrefix string) (<-chan string, error) {
+func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sntx store.SnapshotOrTransaction, closeSntx func() error, stKeyPrefix string) (<-chan string, error) {
// TODO(sadovsky): Support glob with non-prefix pattern.
if _, err := glob.Parse(pattern); err != nil {
- closeStoreReader()
+ closeSntx()
return nil, verror.New(verror.ErrBadArg, ctx, err)
}
prefix, err := globPatternToPrefix(pattern)
if err != nil {
- closeStoreReader()
+ closeSntx()
if verror.ErrorID(err) == verror.ErrBadArg.ID {
return nil, verror.NewErrNotImplemented(ctx)
}
return nil, verror.New(verror.ErrInternal, ctx, err)
}
- it := st.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
+ it := sntx.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
ch := make(chan string)
go func() {
- defer closeStoreReader()
+ defer closeSntx()
defer close(ch)
key := []byte{}
for it.Advance() {
diff --git a/services/syncbase/server/util/store_util.go b/services/syncbase/server/util/store_util.go
index 38fa8ce..b8f1905 100644
--- a/services/syncbase/server/util/store_util.go
+++ b/services/syncbase/server/util/store_util.go
@@ -64,21 +64,21 @@
return nil
}
-// Put does st.Put(k, v) and wraps the returned error.
-func Put(ctx *context.T, st store.StoreWriter, k string, v interface{}) error {
+// Put does stw.Put(k, v) and wraps the returned error.
+func Put(ctx *context.T, stw store.StoreWriter, k string, v interface{}) error {
bytes, err := vom.Encode(v)
if err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
- if err = st.Put([]byte(k), bytes); err != nil {
+ if err = stw.Put([]byte(k), bytes); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
}
-// Delete does st.Delete(k, v) and wraps the returned error.
-func Delete(ctx *context.T, st store.StoreWriter, k string) error {
- if err := st.Delete([]byte(k)); err != nil {
+// Delete does stw.Delete(k) and wraps the returned error.
+func Delete(ctx *context.T, stw store.StoreWriter, k string) error {
+ if err := stw.Delete([]byte(k)); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
@@ -87,15 +87,14 @@
// UpdateWithAuth performs a read-modify-write.
// Input v is populated by the "read" step. fn should "modify" v.
// Performs an auth check as part of the "read" step.
-func UpdateWithAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, k string, v Permser, fn func() error) error {
- _ = st.(store.Transaction) // panics on failure, as desired
- if err := GetWithAuth(ctx, call, st, k, v); err != nil {
+func UpdateWithAuth(ctx *context.T, call rpc.ServerCall, tx store.Transaction, k string, v Permser, fn func() error) error {
+ if err := GetWithAuth(ctx, call, tx, k, v); err != nil {
return err
}
if err := fn(); err != nil {
return err
}
- return Put(ctx, st, k, v)
+ return Put(ctx, tx, k, v)
}
// Wraps a call to Get and returns true if Get found the object, false
diff --git a/services/syncbase/server/watchable/snapshot.go b/services/syncbase/server/watchable/snapshot.go
index ac3694f..37af4e1 100644
--- a/services/syncbase/server/watchable/snapshot.go
+++ b/services/syncbase/server/watchable/snapshot.go
@@ -9,6 +9,7 @@
)
type snapshot struct {
+ store.SnapshotSpecImpl
isn store.Snapshot
st *wstore
}
@@ -22,9 +23,9 @@
}
}
-// Close implements the store.Snapshot interface.
-func (s *snapshot) Close() error {
- return s.isn.Close()
+// Abort implements the store.Snapshot interface.
+func (s *snapshot) Abort() error {
+ return s.isn.Abort()
}
// Get implements the store.StoreReader interface.
diff --git a/services/syncbase/server/watchable/store.go b/services/syncbase/server/watchable/store.go
index 2b0037a..010643d 100644
--- a/services/syncbase/server/watchable/store.go
+++ b/services/syncbase/server/watchable/store.go
@@ -79,7 +79,7 @@
return st.ist.Get(key, valbuf)
}
sn := newSnapshot(st)
- defer sn.Close()
+ defer sn.Abort()
return sn.Get(key, valbuf)
}
@@ -95,16 +95,16 @@
// Put implements the store.StoreWriter interface.
func (st *wstore) Put(key, value []byte) error {
// Use watchable.Store transaction so this op gets logged.
- return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- return st.Put(key, value)
+ return store.RunInTransaction(st, func(tx store.Transaction) error {
+ return tx.Put(key, value)
})
}
// Delete implements the store.StoreWriter interface.
func (st *wstore) Delete(key []byte) error {
// Use watchable.Store transaction so this op gets logged.
- return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- return st.Delete(key)
+ return store.RunInTransaction(st, func(tx store.Transaction) error {
+ return tx.Delete(key)
})
}
diff --git a/services/syncbase/server/watchable/stream.go b/services/syncbase/server/watchable/stream.go
index 9c8c392..26502e1 100644
--- a/services/syncbase/server/watchable/stream.go
+++ b/services/syncbase/server/watchable/stream.go
@@ -13,7 +13,7 @@
// stream streams keys and values for versioned records.
type stream struct {
iit store.Stream
- st store.StoreReader
+ sntx store.SnapshotOrTransaction
mu sync.Mutex
err error
hasValue bool
@@ -25,11 +25,10 @@
// newStreamVersioned creates a new stream. It assumes all records in range
// [start, limit) are managed, i.e. versioned.
-func newStreamVersioned(st store.StoreReader, start, limit []byte) *stream {
- checkTransactionOrSnapshot(st)
+func newStreamVersioned(sntx store.SnapshotOrTransaction, start, limit []byte) *stream {
return &stream{
- iit: st.Scan(makeVersionKey(start), makeVersionKey(limit)),
- st: st,
+ iit: sntx.Scan(makeVersionKey(start), makeVersionKey(limit)),
+ sntx: sntx,
}
}
@@ -46,7 +45,7 @@
}
versionKey, version := s.iit.Key(nil), s.iit.Value(nil)
s.key = []byte(join(split(string(versionKey))[1:]...)) // drop "$version" prefix
- s.value, s.err = s.st.Get(makeAtVersionKey(s.key, version), nil)
+ s.value, s.err = s.sntx.Get(makeAtVersionKey(s.key, version), nil)
if s.err != nil {
return false
}
diff --git a/services/syncbase/server/watchable/transaction.go b/services/syncbase/server/watchable/transaction.go
index 93b6b33..fb82f72 100644
--- a/services/syncbase/server/watchable/transaction.go
+++ b/services/syncbase/server/watchable/transaction.go
@@ -177,7 +177,7 @@
// operations (create, join, leave, destroy) to notify the sync watcher of the
// change at its proper position in the timeline (the transaction commit).
// Note: this is an internal function used by sync, not part of the interface.
-func AddSyncGroupOp(ctx *context.T, tx store.StoreReadWriter, prefixes []string, remove bool) error {
+func AddSyncGroupOp(ctx *context.T, tx store.Transaction, prefixes []string, remove bool) error {
wtx := tx.(*transaction)
wtx.mu.Lock()
defer wtx.mu.Unlock()
@@ -195,7 +195,7 @@
// current keys and their versions to use when initializing the sync metadata
// at the point in the timeline when these keys become syncable (at commit).
// Note: this is an internal function used by sync, not part of the interface.
-func AddSyncSnapshotOp(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+func AddSyncSnapshotOp(ctx *context.T, tx store.Transaction, key, version []byte) error {
wtx := tx.(*transaction)
wtx.mu.Lock()
defer wtx.mu.Unlock()
@@ -227,10 +227,10 @@
// GetVersion returns the current version of a managed key. This method is used
// by the Sync module when the initiator is attempting to add new versions of
// objects. Reading the version key is used for optimistic concurrency
-// control. At minimum, an object implementing the StoreReader interface is
+// control. At minimum, an object implementing the Transaction interface is
// required since this is a Get operation.
-func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error) {
- switch w := st.(type) {
+func GetVersion(ctx *context.T, tx store.Transaction, key []byte) ([]byte, error) {
+ switch w := tx.(type) {
case *transaction:
w.mu.Lock()
defer w.mu.Unlock()
@@ -238,8 +238,6 @@
return nil, convertError(w.err)
}
return getVersion(w.itx, key)
- case *wstore:
- return getVersion(w.ist, key)
}
return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
}
@@ -266,8 +264,8 @@
// PutAtVersion puts a value for the managed key at the requested version. This
// method is used by the Sync module exclusively when the initiator adds objects
// with versions created on other Syncbases. At minimum, an object implementing
-// the StoreReadWriter interface is required since this is a Put operation.
-func PutAtVersion(ctx *context.T, tx store.StoreReadWriter, key, valbuf, version []byte) error {
+// the Transaction interface is required since this is a Put operation.
+func PutAtVersion(ctx *context.T, tx store.Transaction, key, valbuf, version []byte) error {
wtx := tx.(*transaction)
wtx.mu.Lock()
@@ -285,8 +283,8 @@
// version. This method is used by the Sync module exclusively when the
// initiator selects which of the already stored versions (via PutAtVersion
// calls) becomes the current version. At minimum, an object implementing
-// the StoreReadWriter interface is required since this is a Put operation.
-func PutVersion(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+// the Transaction interface is required since this is a Put operation.
+func PutVersion(ctx *context.T, tx store.Transaction, key, version []byte) error {
wtx := tx.(*transaction)
wtx.mu.Lock()
diff --git a/services/syncbase/server/watchable/transaction_test.go b/services/syncbase/server/watchable/transaction_test.go
index 43b7b94..8b95536 100644
--- a/services/syncbase/server/watchable/transaction_test.go
+++ b/services/syncbase/server/watchable/transaction_test.go
@@ -33,17 +33,17 @@
updateVal: "val-b2",
}
-func checkAndUpdate(st store.StoreReadWriter, data testData) error {
+func checkAndUpdate(tx store.Transaction, data testData) error {
// check and update data1
keyBytes := []byte(data.key)
- val, err := st.Get(keyBytes, nil)
+ val, err := tx.Get(keyBytes, nil)
if err != nil {
return fmt.Errorf("can't get key %q: %v", data.key, err)
}
if !bytes.Equal(val, []byte(data.createVal)) {
return fmt.Errorf("Unexpected value for key %q: %q", data.key, string(val))
}
- if err := st.Put(keyBytes, []byte(data.updateVal)); err != nil {
+ if err := tx.Put(keyBytes, []byte(data.updateVal)); err != nil {
return fmt.Errorf("can't put {%q: %v}: %v", data.key, data.updateVal, err)
}
return nil
@@ -79,13 +79,13 @@
setMockSystemClock(wst1, mockClock)
// Create data in store
- if err := store.RunInTransaction(wst1, func(st store.StoreReadWriter) error {
+ if err := store.RunInTransaction(wst1, func(tx store.Transaction) error {
// add data1
- if err := st.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
+ if err := tx.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
return fmt.Errorf("can't put {%q: %v}: %v", data1.key, data1.createVal, err)
}
// add data2
- if err := st.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
+ if err := tx.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
return fmt.Errorf("can't put {%q: %v}: %v", data2.key, data2.createVal, err)
}
return nil
@@ -110,11 +110,11 @@
t.Errorf("unexpected sequence number for update. seq for create: %d, seq for update: %d", seqForCreate, seqForUpdate)
}
- if err := store.RunInTransaction(wst2, func(st store.StoreReadWriter) error {
- if err := checkAndUpdate(st, data1); err != nil {
+ if err := store.RunInTransaction(wst2, func(tx store.Transaction) error {
+ if err := checkAndUpdate(tx, data1); err != nil {
return err
}
- if err := checkAndUpdate(st, data2); err != nil {
+ if err := checkAndUpdate(tx, data2); err != nil {
return err
}
return nil
@@ -144,33 +144,33 @@
t.Fatalf("Wrap failed: %v", err)
}
- if err := store.RunInTransaction(wst, func(st store.StoreReadWriter) error {
+ if err := store.RunInTransaction(wst, func(tx store.Transaction) error {
putKey, putVal := []byte("foo"), []byte("bar")
- if err := st.Put(putKey, putVal); err != nil {
+ if err := tx.Put(putKey, putVal); err != nil {
return err
}
getKey := []byte("foo")
- if getVal, err := st.Get(getKey, nil); err != nil {
+ if getVal, err := tx.Get(getKey, nil); err != nil {
return err
} else {
eq(t, getVal, putVal)
}
start, limit := []byte("aaa"), []byte("bbb")
- st.Scan(start, limit)
+ tx.Scan(start, limit)
delKey := []byte("foo")
- if err := st.Delete(delKey); err != nil {
+ if err := tx.Delete(delKey); err != nil {
return err
}
sgPrefixes := []string{"sga", "sgb"}
- if err := AddSyncGroupOp(nil, st, sgPrefixes, false); err != nil {
+ if err := AddSyncGroupOp(nil, tx, sgPrefixes, false); err != nil {
return err
}
snKey, snVersion := []byte("aa"), []byte("123")
- if err := AddSyncSnapshotOp(nil, st, snKey, snVersion); err != nil {
+ if err := AddSyncSnapshotOp(nil, tx, snKey, snVersion); err != nil {
return err
}
pvKey, pvVersion := []byte("pv"), []byte("456")
- if err := PutVersion(nil, st, pvKey, pvVersion); err != nil {
+ if err := PutVersion(nil, tx, pvKey, pvVersion); err != nil {
return err
}
for _, buf := range [][]byte{putKey, putVal, getKey, start, limit, delKey, snKey, snVersion, pvKey, pvVersion} {
diff --git a/services/syncbase/server/watchable/util.go b/services/syncbase/server/watchable/util.go
index 3d08129..64f8eeb 100644
--- a/services/syncbase/server/watchable/util.go
+++ b/services/syncbase/server/watchable/util.go
@@ -50,21 +50,20 @@
return []byte(join(string(key), string(version)))
}
-func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
- return st.Get(makeVersionKey(key), nil)
+func getVersion(sntx store.SnapshotOrTransaction, key []byte) ([]byte, error) {
+ return sntx.Get(makeVersionKey(key), nil)
}
func getAtVersion(st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
return st.Get(makeAtVersionKey(key, version), valbuf)
}
-func getVersioned(st store.StoreReader, key, valbuf []byte) ([]byte, error) {
- checkTransactionOrSnapshot(st)
- version, err := getVersion(st, key)
+func getVersioned(sntx store.SnapshotOrTransaction, key, valbuf []byte) ([]byte, error) {
+ version, err := getVersion(sntx, key)
if err != nil {
return valbuf, err
}
- return getAtVersion(st, key, valbuf, version)
+ return getAtVersion(sntx, key, valbuf, version)
}
func putVersioned(tx store.Transaction, key, value []byte) ([]byte, error) {
@@ -82,14 +81,6 @@
return tx.Delete(makeVersionKey(key))
}
-func checkTransactionOrSnapshot(st store.StoreReader) {
- _, isTransaction := st.(store.Transaction)
- _, isSnapshot := st.(store.Snapshot)
- if !isTransaction && !isSnapshot {
- panic("neither a Transaction nor a Snapshot")
- }
-}
-
func getLogEntryKey(seq uint64) string {
// Note: MaxUint64 is 0xffffffffffffffff.
// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.