Merge "internal storage engine: improve compile-time typing"
diff --git a/x/ref/services/syncbase/server/app.go b/x/ref/services/syncbase/server/app.go
index 5dd9cad..957f4ed 100644
--- a/x/ref/services/syncbase/server/app.go
+++ b/x/ref/services/syncbase/server/app.go
@@ -88,14 +88,11 @@
}
// Check perms.
sn := a.s.st.NewSnapshot()
- closeSnapshot := func() error {
- return sn.Close()
- }
if err := util.GetWithAuth(ctx, call, sn, a.stKey(), &appData{}); err != nil {
- closeSnapshot()
+ sn.Abort()
return nil, err
}
- return util.Glob(ctx, call, "*", sn, closeSnapshot, util.JoinKeyParts(util.DbInfoPrefix, a.name))
+ return util.Glob(ctx, call, "*", sn, sn.Abort, util.JoinKeyParts(util.DbInfoPrefix, a.name))
}
////////////////////////////////////////
@@ -155,13 +152,13 @@
// 1. Check appData perms, create dbInfo record.
rootDir, engine := a.rootDirForDb(dbName), a.s.opts.Engine
aData := &appData{}
- if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
+ if err := store.RunInTransaction(a.s.st, func(tx store.Transaction) error {
// Check appData perms.
- if err := util.GetWithAuth(ctx, call, st, a.stKey(), aData); err != nil {
+ if err := util.GetWithAuth(ctx, call, tx, a.stKey(), aData); err != nil {
return err
}
// Check for "database already exists".
- if _, err := a.getDbInfo(ctx, st, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if _, err := a.getDbInfo(ctx, tx, dbName); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -174,7 +171,7 @@
RootDir: rootDir,
Engine: engine,
}
- return a.putDbInfo(ctx, st, dbName, info)
+ return a.putDbInfo(ctx, tx, dbName, info)
}); err != nil {
return err
}
@@ -193,8 +190,8 @@
}
// 3. Flip dbInfo.Initialized to true.
- if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
- return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
+ if err := store.RunInTransaction(a.s.st, func(tx store.Transaction) error {
+ return a.updateDbInfo(ctx, tx, dbName, func(info *dbInfo) error {
info.Initialized = true
return nil
})
@@ -235,8 +232,8 @@
}
// 2. Flip dbInfo.Deleted to true.
- if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
- return a.updateDbInfo(ctx, st, dbName, func(info *dbInfo) error {
+ if err := store.RunInTransaction(a.s.st, func(tx store.Transaction) error {
+ return a.updateDbInfo(ctx, tx, dbName, func(info *dbInfo) error {
info.Deleted = true
return nil
})
diff --git a/x/ref/services/syncbase/server/db_info.go b/x/ref/services/syncbase/server/db_info.go
index 39ffdc8..f750d57 100644
--- a/x/ref/services/syncbase/server/db_info.go
+++ b/x/ref/services/syncbase/server/db_info.go
@@ -24,33 +24,32 @@
}
// getDbInfo reads data from the storage engine.
-func (a *app) getDbInfo(ctx *context.T, st store.StoreReader, dbName string) (*dbInfo, error) {
+func (a *app) getDbInfo(ctx *context.T, sntx store.SnapshotOrTransaction, dbName string) (*dbInfo, error) {
info := &dbInfo{}
- if err := util.Get(ctx, st, dbInfoStKey(a, dbName), info); err != nil {
+ if err := util.Get(ctx, sntx, dbInfoStKey(a, dbName), info); err != nil {
return nil, err
}
return info, nil
}
// putDbInfo writes data to the storage engine.
-func (a *app) putDbInfo(ctx *context.T, st store.StoreWriter, dbName string, info *dbInfo) error {
- return util.Put(ctx, st, dbInfoStKey(a, dbName), info)
+func (a *app) putDbInfo(ctx *context.T, tx store.Transaction, dbName string, info *dbInfo) error {
+ return util.Put(ctx, tx, dbInfoStKey(a, dbName), info)
}
// delDbInfo deletes data from the storage engine.
-func (a *app) delDbInfo(ctx *context.T, st store.StoreWriter, dbName string) error {
- return util.Delete(ctx, st, dbInfoStKey(a, dbName))
+func (a *app) delDbInfo(ctx *context.T, stw store.StoreWriter, dbName string) error {
+ return util.Delete(ctx, stw, dbInfoStKey(a, dbName))
}
// updateDbInfo performs a read-modify-write. fn should "modify" v.
-func (a *app) updateDbInfo(ctx *context.T, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
- _ = st.(store.Transaction) // panics on failure, as desired
- info, err := a.getDbInfo(ctx, st, dbName)
+func (a *app) updateDbInfo(ctx *context.T, tx store.Transaction, dbName string, fn func(info *dbInfo) error) error {
+ info, err := a.getDbInfo(ctx, tx, dbName)
if err != nil {
return err
}
if err := fn(info); err != nil {
return err
}
- return a.putDbInfo(ctx, st, dbName, info)
+ return a.putDbInfo(ctx, tx, dbName, info)
}
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 3fe1b96..d4875c0 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -245,7 +245,7 @@
d.mu.Unlock()
}
} else {
- if err = d.sn.Close(); err == nil {
+ if err = d.sn.Abort(); err == nil {
d.mu.Lock()
delete(d.sns, *d.batchId)
d.mu.Unlock()
@@ -276,13 +276,12 @@
}
return rs.Err()
}
- var st store.StoreReader
+ var sntx store.SnapshotOrTransaction
if d.batchId != nil {
- st = d.batchReader()
+ sntx = d.batchReader()
} else {
- sn := d.st.NewSnapshot()
- st = sn
- defer sn.Close()
+ sntx = d.st.NewSnapshot()
+ defer sntx.Abort()
}
// queryDb implements query_db.Database
// which is needed by the query package's
@@ -291,7 +290,7 @@
ctx: ctx,
call: call,
req: d,
- st: st,
+ sntx: sntx,
}
return impl(query_exec.Exec(db, q))
@@ -349,14 +348,11 @@
}
// Check perms.
sn := d.st.NewSnapshot()
- closeSnapshot := func() error {
- return sn.Close()
- }
if err := util.GetWithAuth(ctx, call, sn, d.stKey(), &databaseData{}); err != nil {
- closeSnapshot()
+ sn.Abort()
return nil, err
}
- return util.Glob(ctx, call, "*", sn, closeSnapshot, util.TablePrefix)
+ return util.Glob(ctx, call, "*", sn, sn.Abort, util.TablePrefix)
}
////////////////////////////////////////
@@ -421,9 +417,9 @@
if !d.exists {
vlog.Fatalf("database %q does not exist", d.name)
}
- return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(d.st, func(tx store.Transaction) error {
data := &databaseData{}
- return util.UpdateWithAuth(ctx, call, st, d.stKey(), data, func() error {
+ return util.UpdateWithAuth(ctx, call, tx, d.stKey(), data, func() error {
if err := util.CheckVersion(ctx, version, data.Version); err != nil {
return err
}
@@ -446,7 +442,7 @@
ctx *context.T
call wire.DatabaseExecServerCall
req *databaseReq
- st store.StoreReader
+ sntx store.SnapshotOrTransaction
}
func (db *queryDb) GetContext() *context.T {
@@ -462,7 +458,7 @@
},
}
// Now that we have a table, we need to check permissions.
- if err := util.GetWithAuth(db.ctx, db.call, db.st, tDb.req.stKey(), &tableData{}); err != nil {
+ if err := util.GetWithAuth(db.ctx, db.call, db.sntx, tDb.req.stKey(), &tableData{}); err != nil {
return nil, err
}
return tDb, nil
@@ -478,7 +474,7 @@
for _, keyRange := range keyRanges {
// TODO(jkline): For now, acquire all of the streams at once to minimize the race condition.
// Need a way to Scan multiple ranges at the same state of uncommitted changes.
- streams = append(streams, t.qdb.st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.req.name), keyRange.Start, keyRange.Limit)))
+ streams = append(streams, t.qdb.sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.req.name), keyRange.Start, keyRange.Limit)))
}
return &kvs{
t: t,
@@ -568,7 +564,7 @@
return util.DatabasePrefix
}
-func (d *databaseReq) batchReader() store.StoreReader {
+func (d *databaseReq) batchReader() store.SnapshotOrTransaction {
if d.batchId == nil {
return nil
} else if d.sn != nil {
@@ -578,7 +574,7 @@
}
}
-func (d *databaseReq) batchReadWriter() (store.StoreReadWriter, error) {
+func (d *databaseReq) batchTransaction() (store.Transaction, error) {
if d.batchId == nil {
return nil, nil
} else if d.tx != nil {
diff --git a/x/ref/services/syncbase/server/nosql/database_sm.go b/x/ref/services/syncbase/server/nosql/database_sm.go
index 5d1239e..3c87a6b 100644
--- a/x/ref/services/syncbase/server/nosql/database_sm.go
+++ b/x/ref/services/syncbase/server/nosql/database_sm.go
@@ -41,9 +41,9 @@
}
// Check permissions on Database and store schema metadata.
- return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(d.st, func(tx store.Transaction) error {
dbData := databaseData{}
- return util.UpdateWithAuth(ctx, call, st, d.stKey(), &dbData, func() error {
+ return util.UpdateWithAuth(ctx, call, tx, d.stKey(), &dbData, func() error {
// NOTE: For now we expect the client to not issue multiple
// concurrent SetSchemaMetadata calls.
dbData.SchemaMetadata = &metadata
diff --git a/x/ref/services/syncbase/server/nosql/row.go b/x/ref/services/syncbase/server/nosql/row.go
index 62a5f3a..a6fbf9d 100644
--- a/x/ref/services/syncbase/server/nosql/row.go
+++ b/x/ref/services/syncbase/server/nosql/row.go
@@ -32,35 +32,33 @@
}
func (r *rowReq) Get(ctx *context.T, call rpc.ServerCall, schemaVersion int32) ([]byte, error) {
- impl := func(st store.StoreReader) ([]byte, error) {
+ impl := func(sntx store.SnapshotOrTransaction) ([]byte, error) {
if err := r.t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return []byte{}, err
}
- return r.get(ctx, call, st)
+ return r.get(ctx, call, sntx)
}
- var st store.StoreReader
if r.t.d.batchId != nil {
- st = r.t.d.batchReader()
+ return impl(r.t.d.batchReader())
} else {
sn := r.t.d.st.NewSnapshot()
- st = sn
- defer sn.Close()
+ defer sn.Abort()
+ return impl(sn)
}
- return impl(st)
}
func (r *rowReq) Put(ctx *context.T, call rpc.ServerCall, schemaVersion int32, value []byte) error {
- impl := func(st store.StoreReadWriter) error {
+ impl := func(tx store.Transaction) error {
if err := r.t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- return r.put(ctx, call, st, value)
+ return r.put(ctx, call, tx, value)
}
if r.t.d.batchId != nil {
- if st, err := r.t.d.batchReadWriter(); err != nil {
+ if tx, err := r.t.d.batchTransaction(); err != nil {
return err
} else {
- return impl(st)
+ return impl(tx)
}
} else {
return store.RunInTransaction(r.t.d.st, impl)
@@ -68,17 +66,17 @@
}
func (r *rowReq) Delete(ctx *context.T, call rpc.ServerCall, schemaVersion int32) error {
- impl := func(st store.StoreReadWriter) error {
+ impl := func(tx store.Transaction) error {
if err := r.t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- return r.delete(ctx, call, st)
+ return r.delete(ctx, call, tx)
}
if r.t.d.batchId != nil {
- if st, err := r.t.d.batchReadWriter(); err != nil {
+ if tx, err := r.t.d.batchTransaction(); err != nil {
return err
} else {
- return impl(st)
+ return impl(tx)
}
} else {
return store.RunInTransaction(r.t.d.st, impl)
@@ -98,17 +96,17 @@
// checkAccess checks that this row's table exists in the database, and performs
// an authorization check.
-func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
- return r.t.checkAccess(ctx, call, st, r.key)
+func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction) error {
+ return r.t.checkAccess(ctx, call, sntx, r.key)
}
// get reads data from the storage engine.
// Performs authorization check.
-func (r *rowReq) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
- if err := r.checkAccess(ctx, call, st); err != nil {
+func (r *rowReq) get(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction) ([]byte, error) {
+ if err := r.checkAccess(ctx, call, sntx); err != nil {
return nil, err
}
- value, err := st.Get([]byte(r.stKey()), nil)
+ value, err := sntx.Get([]byte(r.stKey()), nil)
if err != nil {
if verror.ErrorID(err) == store.ErrUnknownKey.ID {
return nil, verror.New(verror.ErrNoExist, ctx, r.stKey())
@@ -120,11 +118,11 @@
// put writes data to the storage engine.
// Performs authorization check.
-func (r *rowReq) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
- if err := r.checkAccess(ctx, call, st); err != nil {
+func (r *rowReq) put(ctx *context.T, call rpc.ServerCall, tx store.Transaction, value []byte) error {
+ if err := r.checkAccess(ctx, call, tx); err != nil {
return err
}
- if err := st.Put([]byte(r.stKey()), value); err != nil {
+ if err := tx.Put([]byte(r.stKey()), value); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
@@ -132,11 +130,11 @@
// delete deletes data from the storage engine.
// Performs authorization check.
-func (r *rowReq) delete(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
- if err := r.checkAccess(ctx, call, st); err != nil {
+func (r *rowReq) delete(ctx *context.T, call rpc.ServerCall, tx store.Transaction) error {
+ if err := r.checkAccess(ctx, call, tx); err != nil {
return err
}
- if err := st.Delete([]byte(r.stKey())); err != nil {
+ if err := tx.Delete([]byte(r.stKey())); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
diff --git a/x/ref/services/syncbase/server/nosql/table.go b/x/ref/services/syncbase/server/nosql/table.go
index 7b371e0..558a1c1 100644
--- a/x/ref/services/syncbase/server/nosql/table.go
+++ b/x/ref/services/syncbase/server/nosql/table.go
@@ -37,14 +37,14 @@
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(t.d.st, func(tx store.Transaction) error {
// Check databaseData perms.
dData := &databaseData{}
- if err := util.GetWithAuth(ctx, call, st, t.d.stKey(), dData); err != nil {
+ if err := util.GetWithAuth(ctx, call, tx, t.d.stKey(), dData); err != nil {
return err
}
// Check for "table already exists".
- if err := util.Get(ctx, st, t.stKey(), &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err := util.Get(ctx, tx, t.stKey(), &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -59,7 +59,7 @@
Name: t.name,
Perms: perms,
}
- return util.Put(ctx, st, t.stKey(), data)
+ return util.Put(ctx, tx, t.stKey(), data)
})
}
@@ -70,16 +70,16 @@
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(t.d.st, func(tx store.Transaction) error {
// Read-check-delete tableData.
- if err := util.GetWithAuth(ctx, call, st, t.stKey(), &tableData{}); err != nil {
+ if err := util.GetWithAuth(ctx, call, tx, t.stKey(), &tableData{}); err != nil {
if verror.ErrorID(err) == verror.ErrNoExist.ID {
return nil // delete is idempotent
}
return err
}
// TODO(sadovsky): Delete all rows in this table.
- return util.Delete(ctx, st, t.stKey())
+ return util.Delete(ctx, tx, t.stKey())
})
}
@@ -91,9 +91,9 @@
}
func (t *tableReq) DeleteRowRange(ctx *context.T, call rpc.ServerCall, schemaVersion int32, start, limit []byte) error {
- impl := func(st store.StoreReadWriter) error {
+ impl := func(tx store.Transaction) error {
// Check for table-level access before doing a scan.
- if err := t.checkAccess(ctx, call, st, ""); err != nil {
+ if err := t.checkAccess(ctx, call, tx, ""); err != nil {
return err
}
// Check if the db schema version and the version provided by client
@@ -101,21 +101,21 @@
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
+ it := tx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
key := []byte{}
for it.Advance() {
key = it.Key(key)
// Check perms.
parts := util.SplitKeyParts(string(key))
externalKey := parts[len(parts)-1]
- if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+ if err := t.checkAccess(ctx, call, tx, externalKey); err != nil {
// TODO(rogulenko): Revisit this behavior. Probably we should
// delete all rows that we have access to.
it.Cancel()
return err
}
// Delete the key-value pair.
- if err := st.Delete(key); err != nil {
+ if err := tx.Delete(key); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
}
@@ -125,10 +125,10 @@
return nil
}
if t.d.batchId != nil {
- if st, err := t.d.batchReadWriter(); err != nil {
+ if tx, err := t.d.batchTransaction(); err != nil {
return err
} else {
- return impl(st)
+ return impl(tx)
}
} else {
return store.RunInTransaction(t.d.st, impl)
@@ -136,15 +136,15 @@
}
func (t *tableReq) Scan(ctx *context.T, call wire.TableScanServerCall, schemaVersion int32, start, limit []byte) error {
- impl := func(st store.StoreReader) error {
+ impl := func(sntx store.SnapshotOrTransaction) error {
// Check for table-level access before doing a scan.
- if err := t.checkAccess(ctx, call, st, ""); err != nil {
+ if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
return err
}
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return err
}
- it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
+ it := sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.RowPrefix, t.name), string(start), string(limit)))
sender := call.SendStream()
key, value := []byte{}, []byte{}
for it.Advance() {
@@ -152,7 +152,7 @@
// Check perms.
parts := util.SplitKeyParts(string(key))
externalKey := parts[len(parts)-1]
- if err := t.checkAccess(ctx, call, st, externalKey); err != nil {
+ if err := t.checkAccess(ctx, call, sntx, externalKey); err != nil {
it.Cancel()
return err
}
@@ -163,28 +163,26 @@
}
return nil
}
- var st store.StoreReader
if t.d.batchId != nil {
- st = t.d.batchReader()
+ return impl(t.d.batchReader())
} else {
- sn := t.d.st.NewSnapshot()
- st = sn
- defer sn.Close()
+ sntx := t.d.st.NewSnapshot()
+ defer sntx.Abort()
+ return impl(sntx)
}
- return impl(st)
}
func (t *tableReq) GetPermissions(ctx *context.T, call rpc.ServerCall, schemaVersion int32, key string) ([]wire.PrefixPermissions, error) {
- impl := func(st store.StoreReader) ([]wire.PrefixPermissions, error) {
+ impl := func(sntx store.SnapshotOrTransaction) ([]wire.PrefixPermissions, error) {
// Check permissions only at table level.
- if err := t.checkAccess(ctx, call, st, ""); err != nil {
+ if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
return nil, err
}
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
return nil, err
}
// Get the most specific permissions object.
- prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+ prefix, prefixPerms, err := t.permsForKey(ctx, sntx, key)
if err != nil {
return nil, err
}
@@ -192,27 +190,25 @@
// Collect all parent permissions objects all the way up to the table level.
for prefix != "" {
prefix = prefixPerms.Parent
- if prefixPerms, err = t.permsForPrefix(ctx, st, prefixPerms.Parent); err != nil {
+ if prefixPerms, err = t.permsForPrefix(ctx, sntx, prefixPerms.Parent); err != nil {
return nil, err
}
result = append(result, wire.PrefixPermissions{Prefix: prefix, Perms: prefixPerms.Perms})
}
return result, nil
}
- var st store.StoreReader
if t.d.batchId != nil {
- st = t.d.batchReader()
+ return impl(t.d.batchReader())
} else {
- sn := t.d.st.NewSnapshot()
- st = sn
- defer sn.Close()
+ sntx := t.d.st.NewSnapshot()
+ defer sntx.Abort()
+ return impl(sntx)
}
- return impl(st)
}
func (t *tableReq) SetPermissions(ctx *context.T, call rpc.ServerCall, schemaVersion int32, prefix string, perms access.Permissions) error {
- impl := func(st store.StoreReadWriter) error {
- if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+ impl := func(tx store.Transaction) error {
+ if err := t.checkAccess(ctx, call, tx, prefix); err != nil {
return err
}
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
@@ -220,18 +216,18 @@
}
// Concurrent transactions that touch this table should fail with
// ErrConcurrentTransaction when this transaction commits.
- if err := t.lock(ctx, st); err != nil {
+ if err := t.lock(ctx, tx); err != nil {
return err
}
if prefix == "" {
data := &tableData{}
- return util.UpdateWithAuth(ctx, call, st, t.stKey(), data, func() error {
+ return util.UpdateWithAuth(ctx, call, tx, t.stKey(), data, func() error {
data.Perms = perms
return nil
})
}
// Get the most specific permissions object.
- parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+ parent, prefixPerms, err := t.permsForKey(ctx, tx, prefix)
if err != nil {
return err
}
@@ -240,7 +236,7 @@
// parents for all children of the prefix to the node corresponding to
// the prefix.
if parent != prefix {
- if err := t.updateParentRefs(ctx, st, prefix, prefix); err != nil {
+ if err := t.updateParentRefs(ctx, tx, prefix, prefix); err != nil {
return err
}
} else {
@@ -250,16 +246,16 @@
stPrefixLimit := stPrefix + util.PrefixRangeLimitSuffix
prefixPerms = stPrefixPerms{Parent: parent, Perms: perms}
// Put the (prefix, perms) pair to the database.
- if err := util.Put(ctx, st, stPrefix, prefixPerms); err != nil {
+ if err := util.Put(ctx, tx, stPrefix, prefixPerms); err != nil {
return err
}
- return util.Put(ctx, st, stPrefixLimit, prefixPerms)
+ return util.Put(ctx, tx, stPrefixLimit, prefixPerms)
}
if t.d.batchId != nil {
- if st, err := t.d.batchReadWriter(); err != nil {
+ if tx, err := t.d.batchTransaction(); err != nil {
return err
} else {
- return impl(st)
+ return impl(tx)
}
} else {
return store.RunInTransaction(t.d.st, impl)
@@ -270,8 +266,8 @@
if prefix == "" {
return verror.New(verror.ErrBadArg, ctx, prefix)
}
- impl := func(st store.StoreReadWriter) error {
- if err := t.checkAccess(ctx, call, st, prefix); err != nil {
+ impl := func(tx store.Transaction) error {
+ if err := t.checkAccess(ctx, call, tx, prefix); err != nil {
return err
}
if err := t.d.checkSchemaVersion(ctx, schemaVersion); err != nil {
@@ -279,11 +275,11 @@
}
// Concurrent transactions that touch this table should fail with
// ErrConcurrentTransaction when this transaction commits.
- if err := t.lock(ctx, st); err != nil {
+ if err := t.lock(ctx, tx); err != nil {
return err
}
// Get the most specific permissions object.
- parent, prefixPerms, err := t.permsForKey(ctx, st, prefix)
+ parent, prefixPerms, err := t.permsForKey(ctx, tx, prefix)
if err != nil {
return err
}
@@ -295,21 +291,21 @@
// We need to delete the node corresponding to the prefix from the prefix
// permissions tree. We do it by updating parents for all children of the
// prefix to the parent of the node corresponding to the prefix.
- if err := t.updateParentRefs(ctx, st, prefix, prefixPerms.Parent); err != nil {
+ if err := t.updateParentRefs(ctx, tx, prefix, prefixPerms.Parent); err != nil {
return err
}
stPrefix := []byte(t.prefixPermsKey(prefix))
stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
- if err := st.Delete(stPrefix); err != nil {
+ if err := tx.Delete(stPrefix); err != nil {
return err
}
- return st.Delete(stPrefixLimit)
+ return tx.Delete(stPrefixLimit)
}
if t.d.batchId != nil {
- if st, err := t.d.batchReadWriter(); err != nil {
+ if tx, err := t.d.batchTransaction(); err != nil {
return err
} else {
- return impl(st)
+ return impl(tx)
}
} else {
return store.RunInTransaction(t.d.st, impl)
@@ -317,30 +313,23 @@
}
func (t *tableReq) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
- impl := func(st store.StoreReader, closeStoreReader func() error) (<-chan string, error) {
+ impl := func(sntx store.SnapshotOrTransaction, closeSntx func() error) (<-chan string, error) {
// Check perms.
- if err := t.checkAccess(ctx, call, st, ""); err != nil {
- closeStoreReader()
+ if err := t.checkAccess(ctx, call, sntx, ""); err != nil {
+ closeSntx()
return nil, err
}
// TODO(rogulenko): Check prefix permissions for children.
- return util.Glob(ctx, call, "*", st, closeStoreReader, util.JoinKeyParts(util.RowPrefix, t.name))
+ return util.Glob(ctx, call, "*", sntx, closeSntx, util.JoinKeyParts(util.RowPrefix, t.name))
}
- var st store.StoreReader
- var closeStoreReader func() error
if t.d.batchId != nil {
- st = t.d.batchReader()
- closeStoreReader = func() error {
+ return impl(t.d.batchReader(), func() error {
return nil
- }
+ })
} else {
sn := t.d.st.NewSnapshot()
- st = sn
- closeStoreReader = func() error {
- return sn.Close()
- }
+ return impl(sn, sn.Abort)
}
- return impl(st, closeStoreReader)
}
////////////////////////////////////////
@@ -356,11 +345,11 @@
// updateParentRefs updates the parent for all children of the given
// prefix to newParent.
-func (t *tableReq) updateParentRefs(ctx *context.T, st store.StoreReadWriter, prefix, newParent string) error {
+func (t *tableReq) updateParentRefs(ctx *context.T, tx store.Transaction, prefix, newParent string) error {
stPrefix := []byte(t.prefixPermsKey(prefix))
stPrefixStart := append(stPrefix, 0)
stPrefixLimit := append(stPrefix, util.PrefixRangeLimitSuffix...)
- it := st.Scan(stPrefixStart, stPrefixLimit)
+ it := tx.Scan(stPrefixStart, stPrefixLimit)
var key, value []byte
for it.Advance() {
key, value = it.Key(key), it.Value(value)
@@ -370,7 +359,7 @@
return verror.New(verror.ErrInternal, ctx, err)
}
prefixPerms.Parent = newParent
- if err := util.Put(ctx, st, string(key), prefixPerms); err != nil {
+ if err := util.Put(ctx, tx, string(key), prefixPerms); err != nil {
it.Cancel()
return err
}
@@ -393,12 +382,12 @@
// TODO(rogulenko): Revisit this behavior to provide more granularity.
// One option is to add a prefix and its parent to the write set of the current
// transaction when the permissions object for that prefix is updated.
-func (t *tableReq) lock(ctx *context.T, st store.StoreReadWriter) error {
+func (t *tableReq) lock(ctx *context.T, tx store.Transaction) error {
var data tableData
- if err := util.Get(ctx, st, t.stKey(), &data); err != nil {
+ if err := util.Get(ctx, tx, t.stKey(), &data); err != nil {
return err
}
- return util.Put(ctx, st, t.stKey(), data)
+ return util.Put(ctx, tx, t.stKey(), data)
}
// checkAccess checks that this table exists in the database, and performs
@@ -407,13 +396,13 @@
// TODO(rogulenko): Revisit this behavior. Eventually we'll want the table-level
// access check to be a check for "Resolve", i.e. also check access to
// service, app and database.
-func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader, key string) error {
- prefix, prefixPerms, err := t.permsForKey(ctx, st, key)
+func (t *tableReq) checkAccess(ctx *context.T, call rpc.ServerCall, sntx store.SnapshotOrTransaction, key string) error {
+ prefix, prefixPerms, err := t.permsForKey(ctx, sntx, key)
if err != nil {
return err
}
if prefix != "" {
- if err := util.GetWithAuth(ctx, call, st, t.stKey(), &tableData{}); err != nil {
+ if err := util.GetWithAuth(ctx, call, sntx, t.stKey(), &tableData{}); err != nil {
return err
}
}
@@ -448,10 +437,10 @@
// proper prefixes of K are less than K; parent(t) is a prefix of K, otherwise
// K < parent(t) < t; parent(t) is the largest prefix of K, otherwise t is a
// prefix of K; in this case line 3 returns correct result.
-func (t *tableReq) permsForKey(ctx *context.T, st store.StoreReader, key string) (string, stPrefixPerms, error) {
- it := st.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
+func (t *tableReq) permsForKey(ctx *context.T, sntx store.SnapshotOrTransaction, key string) (string, stPrefixPerms, error) {
+ it := sntx.Scan(util.ScanRangeArgs(util.JoinKeyParts(util.PermsPrefix, t.name), key, ""))
if !it.Advance() {
- prefixPerms, err := t.permsForPrefix(ctx, st, "")
+ prefixPerms, err := t.permsForPrefix(ctx, sntx, "")
return "", prefixPerms, err
}
defer it.Cancel()
@@ -465,22 +454,22 @@
if strings.HasPrefix(key, prefix) {
return prefix, prefixPerms, nil
}
- prefixPerms, err := t.permsForPrefix(ctx, st, prefixPerms.Parent)
+ prefixPerms, err := t.permsForPrefix(ctx, sntx, prefixPerms.Parent)
return prefixPerms.Parent, prefixPerms, err
}
// permsForPrefix returns the permissions object associated with the
// provided prefix.
-func (t *tableReq) permsForPrefix(ctx *context.T, st store.StoreReader, prefix string) (stPrefixPerms, error) {
+func (t *tableReq) permsForPrefix(ctx *context.T, sntx store.SnapshotOrTransaction, prefix string) (stPrefixPerms, error) {
if prefix == "" {
var data tableData
- if err := util.Get(ctx, st, t.stKey(), &data); err != nil {
+ if err := util.Get(ctx, sntx, t.stKey(), &data); err != nil {
return stPrefixPerms{}, err
}
return stPrefixPerms{Perms: data.Perms}, nil
}
var prefixPerms stPrefixPerms
- if err := util.Get(ctx, st, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
+ if err := util.Get(ctx, sntx, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
return stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
}
return prefixPerms, nil
diff --git a/x/ref/services/syncbase/server/service.go b/x/ref/services/syncbase/server/service.go
index 8ed1480..5b87252 100644
--- a/x/ref/services/syncbase/server/service.go
+++ b/x/ref/services/syncbase/server/service.go
@@ -139,9 +139,9 @@
// RPC methods
func (s *service) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
- return store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(s.st, func(tx store.Transaction) error {
data := &serviceData{}
- return util.UpdateWithAuth(ctx, call, st, s.stKey(), data, func() error {
+ return util.UpdateWithAuth(ctx, call, tx, s.stKey(), data, func() error {
if err := util.CheckVersion(ctx, version, data.Version); err != nil {
return err
}
@@ -163,14 +163,11 @@
func (s *service) GlobChildren__(ctx *context.T, call rpc.ServerCall) (<-chan string, error) {
// Check perms.
sn := s.st.NewSnapshot()
- closeSnapshot := func() error {
- return sn.Close()
- }
if err := util.GetWithAuth(ctx, call, sn, s.stKey(), &serviceData{}); err != nil {
- closeSnapshot()
+ sn.Abort()
return nil, err
}
- return util.Glob(ctx, call, "*", sn, closeSnapshot, util.AppPrefix)
+ return util.Glob(ctx, call, "*", sn, sn.Abort, util.AppPrefix)
}
////////////////////////////////////////
@@ -225,14 +222,14 @@
dbs: make(map[string]interfaces.Database),
}
- if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+ if err := store.RunInTransaction(s.st, func(tx store.Transaction) error {
// Check serviceData perms.
sData := &serviceData{}
- if err := util.GetWithAuth(ctx, call, st, s.stKey(), sData); err != nil {
+ if err := util.GetWithAuth(ctx, call, tx, s.stKey(), sData); err != nil {
return err
}
// Check for "app already exists".
- if err := util.Get(ctx, st, a.stKey(), &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err := util.Get(ctx, tx, a.stKey(), &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -246,7 +243,7 @@
Name: appName,
Perms: perms,
}
- return util.Put(ctx, st, a.stKey(), data)
+ return util.Put(ctx, tx, a.stKey(), data)
}); err != nil {
return err
}
@@ -263,16 +260,16 @@
return nil // delete is idempotent
}
- if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+ if err := store.RunInTransaction(s.st, func(tx store.Transaction) error {
// Read-check-delete appData.
- if err := util.GetWithAuth(ctx, call, st, a.stKey(), &appData{}); err != nil {
+ if err := util.GetWithAuth(ctx, call, tx, a.stKey(), &appData{}); err != nil {
if verror.ErrorID(err) == verror.ErrNoExist.ID {
return nil // delete is idempotent
}
return err
}
// TODO(sadovsky): Delete all databases in this app.
- return util.Delete(ctx, st, a.stKey())
+ return util.Delete(ctx, tx, a.stKey())
}); err != nil {
return err
}
@@ -288,9 +285,9 @@
if !ok {
return verror.New(verror.ErrNoExist, ctx, appName)
}
- return store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
+ return store.RunInTransaction(s.st, func(tx store.Transaction) error {
data := &appData{}
- return util.UpdateWithAuth(ctx, call, st, a.stKey(), data, func() error {
+ return util.UpdateWithAuth(ctx, call, tx, a.stKey(), data, func() error {
if err := util.CheckVersion(ctx, version, data.Version); err != nil {
return err
}
diff --git a/x/ref/services/syncbase/server/util/glob.go b/x/ref/services/syncbase/server/util/glob.go
index acdd74f..2cbc3f9 100644
--- a/x/ref/services/syncbase/server/util/glob.go
+++ b/x/ref/services/syncbase/server/util/glob.go
@@ -53,29 +53,29 @@
return res, nil
}
-// Glob performs a glob. It calls closeStoreReader to close st.
+// Glob performs a glob. It calls closeSntx to close sntx.
// TODO(sadovsky): Why do we make developers implement Glob differently from
// other streaming RPCs? It's confusing that Glob must return immediately and
// write its results to a channel, while other streaming RPC handlers must block
// and write their results to the output stream. See nlacasse's TODO below, too.
-func Glob(ctx *context.T, call rpc.ServerCall, pattern string, st store.StoreReader, closeStoreReader func() error, stKeyPrefix string) (<-chan string, error) {
+func Glob(ctx *context.T, call rpc.ServerCall, pattern string, sntx store.SnapshotOrTransaction, closeSntx func() error, stKeyPrefix string) (<-chan string, error) {
// TODO(sadovsky): Support glob with non-prefix pattern.
if _, err := glob.Parse(pattern); err != nil {
- closeStoreReader()
+ closeSntx()
return nil, verror.New(verror.ErrBadArg, ctx, err)
}
prefix, err := globPatternToPrefix(pattern)
if err != nil {
- closeStoreReader()
+ closeSntx()
if verror.ErrorID(err) == verror.ErrBadArg.ID {
return nil, verror.NewErrNotImplemented(ctx)
}
return nil, verror.New(verror.ErrInternal, ctx, err)
}
- it := st.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
+ it := sntx.Scan(ScanPrefixArgs(stKeyPrefix, prefix))
ch := make(chan string)
go func() {
- defer closeStoreReader()
+ defer closeSntx()
defer close(ch)
key := []byte{}
for it.Advance() {
diff --git a/x/ref/services/syncbase/server/util/store_util.go b/x/ref/services/syncbase/server/util/store_util.go
index 38fa8ce..b8f1905 100644
--- a/x/ref/services/syncbase/server/util/store_util.go
+++ b/x/ref/services/syncbase/server/util/store_util.go
@@ -64,21 +64,21 @@
return nil
}
-// Put does st.Put(k, v) and wraps the returned error.
-func Put(ctx *context.T, st store.StoreWriter, k string, v interface{}) error {
+// Put does stw.Put(k, v) and wraps the returned error.
+func Put(ctx *context.T, stw store.StoreWriter, k string, v interface{}) error {
bytes, err := vom.Encode(v)
if err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
- if err = st.Put([]byte(k), bytes); err != nil {
+ if err = stw.Put([]byte(k), bytes); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
}
-// Delete does st.Delete(k, v) and wraps the returned error.
-func Delete(ctx *context.T, st store.StoreWriter, k string) error {
- if err := st.Delete([]byte(k)); err != nil {
+// Delete does stw.Delete(k, v) and wraps the returned error.
+func Delete(ctx *context.T, stw store.StoreWriter, k string) error {
+ if err := stw.Delete([]byte(k)); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
@@ -87,15 +87,14 @@
// UpdateWithAuth performs a read-modify-write.
// Input v is populated by the "read" step. fn should "modify" v.
// Performs an auth check as part of the "read" step.
-func UpdateWithAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, k string, v Permser, fn func() error) error {
- _ = st.(store.Transaction) // panics on failure, as desired
- if err := GetWithAuth(ctx, call, st, k, v); err != nil {
+func UpdateWithAuth(ctx *context.T, call rpc.ServerCall, tx store.Transaction, k string, v Permser, fn func() error) error {
+ if err := GetWithAuth(ctx, call, tx, k, v); err != nil {
return err
}
if err := fn(); err != nil {
return err
}
- return Put(ctx, st, k, v)
+ return Put(ctx, tx, k, v)
}
// Wraps a call to Get and returns true if Get found the object, false
diff --git a/x/ref/services/syncbase/server/watchable/snapshot.go b/x/ref/services/syncbase/server/watchable/snapshot.go
index ac3694f..37af4e1 100644
--- a/x/ref/services/syncbase/server/watchable/snapshot.go
+++ b/x/ref/services/syncbase/server/watchable/snapshot.go
@@ -9,6 +9,7 @@
)
type snapshot struct {
+ store.SnapshotSpecImpl
isn store.Snapshot
st *wstore
}
@@ -22,9 +23,9 @@
}
}
-// Close implements the store.Snapshot interface.
-func (s *snapshot) Close() error {
- return s.isn.Close()
+// Abort implements the store.Snapshot interface.
+func (s *snapshot) Abort() error {
+ return s.isn.Abort()
}
// Get implements the store.StoreReader interface.
diff --git a/x/ref/services/syncbase/server/watchable/store.go b/x/ref/services/syncbase/server/watchable/store.go
index 2b0037a..010643d 100644
--- a/x/ref/services/syncbase/server/watchable/store.go
+++ b/x/ref/services/syncbase/server/watchable/store.go
@@ -79,7 +79,7 @@
return st.ist.Get(key, valbuf)
}
sn := newSnapshot(st)
- defer sn.Close()
+ defer sn.Abort()
return sn.Get(key, valbuf)
}
@@ -95,16 +95,16 @@
// Put implements the store.StoreWriter interface.
func (st *wstore) Put(key, value []byte) error {
// Use watchable.Store transaction so this op gets logged.
- return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- return st.Put(key, value)
+ return store.RunInTransaction(st, func(tx store.Transaction) error {
+ return tx.Put(key, value)
})
}
// Delete implements the store.StoreWriter interface.
func (st *wstore) Delete(key []byte) error {
// Use watchable.Store transaction so this op gets logged.
- return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- return st.Delete(key)
+ return store.RunInTransaction(st, func(tx store.Transaction) error {
+ return tx.Delete(key)
})
}
diff --git a/x/ref/services/syncbase/server/watchable/stream.go b/x/ref/services/syncbase/server/watchable/stream.go
index 9c8c392..26502e1 100644
--- a/x/ref/services/syncbase/server/watchable/stream.go
+++ b/x/ref/services/syncbase/server/watchable/stream.go
@@ -13,7 +13,7 @@
// stream streams keys and values for versioned records.
type stream struct {
iit store.Stream
- st store.StoreReader
+ sntx store.SnapshotOrTransaction
mu sync.Mutex
err error
hasValue bool
@@ -25,11 +25,10 @@
// newStreamVersioned creates a new stream. It assumes all records in range
// [start, limit) are managed, i.e. versioned.
-func newStreamVersioned(st store.StoreReader, start, limit []byte) *stream {
- checkTransactionOrSnapshot(st)
+func newStreamVersioned(sntx store.SnapshotOrTransaction, start, limit []byte) *stream {
return &stream{
- iit: st.Scan(makeVersionKey(start), makeVersionKey(limit)),
- st: st,
+ iit: sntx.Scan(makeVersionKey(start), makeVersionKey(limit)),
+ sntx: sntx,
}
}
@@ -46,7 +45,7 @@
}
versionKey, version := s.iit.Key(nil), s.iit.Value(nil)
s.key = []byte(join(split(string(versionKey))[1:]...)) // drop "$version" prefix
- s.value, s.err = s.st.Get(makeAtVersionKey(s.key, version), nil)
+ s.value, s.err = s.sntx.Get(makeAtVersionKey(s.key, version), nil)
if s.err != nil {
return false
}
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index 93b6b33..fb82f72 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -177,7 +177,7 @@
// operations (create, join, leave, destroy) to notify the sync watcher of the
// change at its proper position in the timeline (the transaction commit).
// Note: this is an internal function used by sync, not part of the interface.
-func AddSyncGroupOp(ctx *context.T, tx store.StoreReadWriter, prefixes []string, remove bool) error {
+func AddSyncGroupOp(ctx *context.T, tx store.Transaction, prefixes []string, remove bool) error {
wtx := tx.(*transaction)
wtx.mu.Lock()
defer wtx.mu.Unlock()
@@ -195,7 +195,7 @@
// current keys and their versions to use when initializing the sync metadata
// at the point in the timeline when these keys become syncable (at commit).
// Note: this is an internal function used by sync, not part of the interface.
-func AddSyncSnapshotOp(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+func AddSyncSnapshotOp(ctx *context.T, tx store.Transaction, key, version []byte) error {
wtx := tx.(*transaction)
wtx.mu.Lock()
defer wtx.mu.Unlock()
@@ -227,10 +227,10 @@
// GetVersion returns the current version of a managed key. This method is used
// by the Sync module when the initiator is attempting to add new versions of
// objects. Reading the version key is used for optimistic concurrency
-// control. At minimum, an object implementing the StoreReader interface is
+// control. At minimum, an object implementing the Transaction interface is
// required since this is a Get operation.
-func GetVersion(ctx *context.T, st store.StoreReader, key []byte) ([]byte, error) {
- switch w := st.(type) {
+func GetVersion(ctx *context.T, tx store.Transaction, key []byte) ([]byte, error) {
+ switch w := tx.(type) {
case *transaction:
w.mu.Lock()
defer w.mu.Unlock()
@@ -238,8 +238,6 @@
return nil, convertError(w.err)
}
return getVersion(w.itx, key)
- case *wstore:
- return getVersion(w.ist, key)
}
return nil, verror.New(verror.ErrInternal, ctx, "unsupported store type")
}
@@ -266,8 +264,8 @@
// PutAtVersion puts a value for the managed key at the requested version. This
// method is used by the Sync module exclusively when the initiator adds objects
// with versions created on other Syncbases. At minimum, an object implementing
-// the StoreReadWriter interface is required since this is a Put operation.
-func PutAtVersion(ctx *context.T, tx store.StoreReadWriter, key, valbuf, version []byte) error {
+// the Transaction interface is required since this is a Put operation.
+func PutAtVersion(ctx *context.T, tx store.Transaction, key, valbuf, version []byte) error {
wtx := tx.(*transaction)
wtx.mu.Lock()
@@ -285,8 +283,8 @@
// version. This method is used by the Sync module exclusively when the
// initiator selects which of the already stored versions (via PutAtVersion
// calls) becomes the current version. At minimum, an object implementing
-// the StoreReadWriter interface is required since this is a Put operation.
-func PutVersion(ctx *context.T, tx store.StoreReadWriter, key, version []byte) error {
+// the Transaction interface is required since this is a Put operation.
+func PutVersion(ctx *context.T, tx store.Transaction, key, version []byte) error {
wtx := tx.(*transaction)
wtx.mu.Lock()
diff --git a/x/ref/services/syncbase/server/watchable/transaction_test.go b/x/ref/services/syncbase/server/watchable/transaction_test.go
index 43b7b94..8b95536 100644
--- a/x/ref/services/syncbase/server/watchable/transaction_test.go
+++ b/x/ref/services/syncbase/server/watchable/transaction_test.go
@@ -33,17 +33,17 @@
updateVal: "val-b2",
}
-func checkAndUpdate(st store.StoreReadWriter, data testData) error {
+func checkAndUpdate(tx store.Transaction, data testData) error {
// check and update data1
keyBytes := []byte(data.key)
- val, err := st.Get(keyBytes, nil)
+ val, err := tx.Get(keyBytes, nil)
if err != nil {
return fmt.Errorf("can't get key %q: %v", data.key, err)
}
if !bytes.Equal(val, []byte(data.createVal)) {
return fmt.Errorf("Unexpected value for key %q: %q", data.key, string(val))
}
- if err := st.Put(keyBytes, []byte(data.updateVal)); err != nil {
+ if err := tx.Put(keyBytes, []byte(data.updateVal)); err != nil {
return fmt.Errorf("can't put {%q: %v}: %v", data.key, data.updateVal, err)
}
return nil
@@ -79,13 +79,13 @@
setMockSystemClock(wst1, mockClock)
// Create data in store
- if err := store.RunInTransaction(wst1, func(st store.StoreReadWriter) error {
+ if err := store.RunInTransaction(wst1, func(tx store.Transaction) error {
// add data1
- if err := st.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
+ if err := tx.Put([]byte(data1.key), []byte(data1.createVal)); err != nil {
return fmt.Errorf("can't put {%q: %v}: %v", data1.key, data1.createVal, err)
}
// add data2
- if err := st.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
+ if err := tx.Put([]byte(data2.key), []byte(data2.createVal)); err != nil {
return fmt.Errorf("can't put {%q: %v}: %v", data2.key, data2.createVal, err)
}
return nil
@@ -110,11 +110,11 @@
t.Errorf("unexpected sequence number for update. seq for create: %d, seq for update: %d", seqForCreate, seqForUpdate)
}
- if err := store.RunInTransaction(wst2, func(st store.StoreReadWriter) error {
- if err := checkAndUpdate(st, data1); err != nil {
+ if err := store.RunInTransaction(wst2, func(tx store.Transaction) error {
+ if err := checkAndUpdate(tx, data1); err != nil {
return err
}
- if err := checkAndUpdate(st, data2); err != nil {
+ if err := checkAndUpdate(tx, data2); err != nil {
return err
}
return nil
@@ -144,33 +144,33 @@
t.Fatalf("Wrap failed: %v", err)
}
- if err := store.RunInTransaction(wst, func(st store.StoreReadWriter) error {
+ if err := store.RunInTransaction(wst, func(tx store.Transaction) error {
putKey, putVal := []byte("foo"), []byte("bar")
- if err := st.Put(putKey, putVal); err != nil {
+ if err := tx.Put(putKey, putVal); err != nil {
return err
}
getKey := []byte("foo")
- if getVal, err := st.Get(getKey, nil); err != nil {
+ if getVal, err := tx.Get(getKey, nil); err != nil {
return err
} else {
eq(t, getVal, putVal)
}
start, limit := []byte("aaa"), []byte("bbb")
- st.Scan(start, limit)
+ tx.Scan(start, limit)
delKey := []byte("foo")
- if err := st.Delete(delKey); err != nil {
+ if err := tx.Delete(delKey); err != nil {
return err
}
sgPrefixes := []string{"sga", "sgb"}
- if err := AddSyncGroupOp(nil, st, sgPrefixes, false); err != nil {
+ if err := AddSyncGroupOp(nil, tx, sgPrefixes, false); err != nil {
return err
}
snKey, snVersion := []byte("aa"), []byte("123")
- if err := AddSyncSnapshotOp(nil, st, snKey, snVersion); err != nil {
+ if err := AddSyncSnapshotOp(nil, tx, snKey, snVersion); err != nil {
return err
}
pvKey, pvVersion := []byte("pv"), []byte("456")
- if err := PutVersion(nil, st, pvKey, pvVersion); err != nil {
+ if err := PutVersion(nil, tx, pvKey, pvVersion); err != nil {
return err
}
for _, buf := range [][]byte{putKey, putVal, getKey, start, limit, delKey, snKey, snVersion, pvKey, pvVersion} {
diff --git a/x/ref/services/syncbase/server/watchable/util.go b/x/ref/services/syncbase/server/watchable/util.go
index 3d08129..64f8eeb 100644
--- a/x/ref/services/syncbase/server/watchable/util.go
+++ b/x/ref/services/syncbase/server/watchable/util.go
@@ -50,21 +50,20 @@
return []byte(join(string(key), string(version)))
}
-func getVersion(st store.StoreReader, key []byte) ([]byte, error) {
- return st.Get(makeVersionKey(key), nil)
+func getVersion(sntx store.SnapshotOrTransaction, key []byte) ([]byte, error) {
+ return sntx.Get(makeVersionKey(key), nil)
}
func getAtVersion(st store.StoreReader, key, valbuf, version []byte) ([]byte, error) {
return st.Get(makeAtVersionKey(key, version), valbuf)
}
-func getVersioned(st store.StoreReader, key, valbuf []byte) ([]byte, error) {
- checkTransactionOrSnapshot(st)
- version, err := getVersion(st, key)
+func getVersioned(sntx store.SnapshotOrTransaction, key, valbuf []byte) ([]byte, error) {
+ version, err := getVersion(sntx, key)
if err != nil {
return valbuf, err
}
- return getAtVersion(st, key, valbuf, version)
+ return getAtVersion(sntx, key, valbuf, version)
}
func putVersioned(tx store.Transaction, key, value []byte) ([]byte, error) {
@@ -82,14 +81,6 @@
return tx.Delete(makeVersionKey(key))
}
-func checkTransactionOrSnapshot(st store.StoreReader) {
- _, isTransaction := st.(store.Transaction)
- _, isSnapshot := st.(store.Snapshot)
- if !isTransaction && !isSnapshot {
- panic("neither a Transaction nor a Snapshot")
- }
-}
-
func getLogEntryKey(seq uint64) string {
// Note: MaxUint64 is 0xffffffffffffffff.
// TODO(sadovsky): Use a more space-efficient lexicographic number encoding.
diff --git a/x/ref/services/syncbase/store/constants.go b/x/ref/services/syncbase/store/constants.go
index f97f2fd..26551fa 100644
--- a/x/ref/services/syncbase/store/constants.go
+++ b/x/ref/services/syncbase/store/constants.go
@@ -6,10 +6,10 @@
// TODO(sadovsky): Maybe define verrors for these.
const (
- ErrMsgClosedStore = "closed store"
- ErrMsgClosedSnapshot = "closed snapshot"
- ErrMsgCanceledStream = "canceled stream"
- ErrMsgCommittedTxn = "already called commit"
- ErrMsgAbortedTxn = "already called abort"
- ErrMsgExpiredTxn = "expired transaction"
+ ErrMsgClosedStore = "closed store"
+ ErrMsgAbortedSnapshot = "aborted snapshot"
+ ErrMsgCanceledStream = "canceled stream"
+ ErrMsgCommittedTxn = "already called commit"
+ ErrMsgAbortedTxn = "already called abort"
+ ErrMsgExpiredTxn = "expired transaction"
)
diff --git a/x/ref/services/syncbase/store/invalid_types.go b/x/ref/services/syncbase/store/invalid_types.go
index bb008e2..a230684 100644
--- a/x/ref/services/syncbase/store/invalid_types.go
+++ b/x/ref/services/syncbase/store/invalid_types.go
@@ -10,6 +10,7 @@
// InvalidSnapshot is a Snapshot for which all methods return errors.
type InvalidSnapshot struct {
+ SnapshotSpecImpl
Error error // returned by all methods
}
@@ -32,8 +33,8 @@
////////////////////////////////////////////////////////////
// InvalidSnapshot
-// Close implements the store.Snapshot interface.
-func (s *InvalidSnapshot) Close() error {
+// Abort implements the store.Snapshot interface.
+func (s *InvalidSnapshot) Abort() error {
return convertError(s.Error)
}
diff --git a/x/ref/services/syncbase/store/leveldb/db.go b/x/ref/services/syncbase/store/leveldb/db.go
index 765103f..4f1ba6c 100644
--- a/x/ref/services/syncbase/store/leveldb/db.go
+++ b/x/ref/services/syncbase/store/leveldb/db.go
@@ -166,7 +166,7 @@
d.mu.RLock()
defer d.mu.RUnlock()
if d.err != nil {
- return &store.InvalidSnapshot{d.err}
+ return &store.InvalidSnapshot{Error: d.err}
}
return newSnapshot(d, d.node)
}
diff --git a/x/ref/services/syncbase/store/leveldb/snapshot.go b/x/ref/services/syncbase/store/leveldb/snapshot.go
index e43d67f..a07be02 100644
--- a/x/ref/services/syncbase/store/leveldb/snapshot.go
+++ b/x/ref/services/syncbase/store/leveldb/snapshot.go
@@ -16,6 +16,7 @@
// snapshot is a wrapper around LevelDB snapshot that implements
// the store.Snapshot interface.
type snapshot struct {
+ store.SnapshotSpecImpl
// mu protects the state of the snapshot.
mu sync.RWMutex
node *store.ResourceNode
@@ -39,13 +40,13 @@
cOpts: cOpts,
}
parent.AddChild(s.node, func() {
- s.Close()
+ s.Abort()
})
return s
}
-// Close implements the store.Snapshot interface.
-func (s *snapshot) Close() error {
+// Abort implements the store.Snapshot interface.
+func (s *snapshot) Abort() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.err != nil {
@@ -56,7 +57,7 @@
s.cOpts = nil
C.leveldb_release_snapshot(s.d.cDb, s.cSnapshot)
s.cSnapshot = nil
- s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedSnapshot)
+ s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedSnapshot)
return nil
}
diff --git a/x/ref/services/syncbase/store/memstore/snapshot.go b/x/ref/services/syncbase/store/memstore/snapshot.go
index 43beffd..128d127 100644
--- a/x/ref/services/syncbase/store/memstore/snapshot.go
+++ b/x/ref/services/syncbase/store/memstore/snapshot.go
@@ -12,6 +12,7 @@
)
type snapshot struct {
+ store.SnapshotSpecImpl
mu sync.Mutex
node *store.ResourceNode
data map[string][]byte
@@ -31,20 +32,20 @@
data: dataCopy,
}
parent.AddChild(s.node, func() {
- s.Close()
+ s.Abort()
})
return s
}
-// Close implements the store.Snapshot interface.
-func (s *snapshot) Close() error {
+// Abort implements the store.Snapshot interface.
+func (s *snapshot) Abort() error {
s.mu.Lock()
defer s.mu.Unlock()
if s.err != nil {
return convertError(s.err)
}
s.node.Close()
- s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgClosedSnapshot)
+ s.err = verror.New(verror.ErrCanceled, nil, store.ErrMsgAbortedSnapshot)
return nil
}
diff --git a/x/ref/services/syncbase/store/memstore/store.go b/x/ref/services/syncbase/store/memstore/store.go
index 7e3f816..f4ef644 100644
--- a/x/ref/services/syncbase/store/memstore/store.go
+++ b/x/ref/services/syncbase/store/memstore/store.go
@@ -73,15 +73,15 @@
// Put implements the store.StoreWriter interface.
func (st *memstore) Put(key, value []byte) error {
- return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- return st.Put(key, value)
+ return store.RunInTransaction(st, func(tx store.Transaction) error {
+ return tx.Put(key, value)
})
}
// Delete implements the store.StoreWriter interface.
func (st *memstore) Delete(key []byte) error {
- return store.RunInTransaction(st, func(st store.StoreReadWriter) error {
- return st.Delete(key)
+ return store.RunInTransaction(st, func(tx store.Transaction) error {
+ return tx.Delete(key)
})
}
@@ -101,7 +101,7 @@
st.mu.Lock()
defer st.mu.Unlock()
if st.err != nil {
- return &store.InvalidSnapshot{st.err}
+ return &store.InvalidSnapshot{Error: st.err}
}
return newSnapshot(st, st.node)
}
diff --git a/x/ref/services/syncbase/store/model.go b/x/ref/services/syncbase/store/model.go
index a2c48f0..be7265d 100644
--- a/x/ref/services/syncbase/store/model.go
+++ b/x/ref/services/syncbase/store/model.go
@@ -37,15 +37,15 @@
Delete(key []byte) error
}
-// StoreReadWriter combines StoreReader and StoreWriter.
-type StoreReadWriter interface {
+// storeReadWriter combines StoreReader and StoreWriter.
+type storeReadWriter interface {
StoreReader
StoreWriter
}
// Store is a CRUD-capable storage engine that supports transactions.
type Store interface {
- StoreReadWriter
+ storeReadWriter
// Close closes the store.
Close() error
@@ -59,6 +59,29 @@
NewSnapshot() Snapshot
}
+// SnapshotOrTransaction represents a Snapshot or a Transaction.
+type SnapshotOrTransaction interface {
+ StoreReader
+
+ // Abort closes the snapshot or transaction.
+ // Any subsequent method calls will fail.
+ // NOTE: this method is also used to distinguish between StoreReader and
+ // SnapshotOrTransaction.
+ Abort() error
+}
+
+// Snapshot is a handle to particular state in time of a Store.
+//
+// All read operations are executed against a consistent view of Store commit
+// history. Snapshots don't acquire locks and thus don't block transactions.
+type Snapshot interface {
+ SnapshotOrTransaction
+
+ // __snapshotSpec is a utility method to distinguish between Snapshot and
+ // SnapshotOrTransaction. This is a no-op.
+ __snapshotSpec()
+}
+
// Transaction provides a mechanism for atomic reads and writes. Instead of
// calling this function directly, clients are encouraged to use the
// RunInTransaction() helper function, which detects "concurrent transaction"
@@ -77,27 +100,13 @@
// Once a transaction has been committed or aborted, subsequent method calls
// will fail with no effect.
type Transaction interface {
- StoreReadWriter
+ SnapshotOrTransaction
+ StoreWriter
// Commit commits the transaction.
// Fails if writes from outside this transaction conflict with reads from
// within this transaction.
Commit() error
-
- // Abort aborts the transaction.
- Abort() error
-}
-
-// Snapshot is a handle to particular state in time of a Store.
-//
-// All read operations are executed against a consistent view of Store commit
-// history. Snapshots don't acquire locks and thus don't block transactions.
-type Snapshot interface {
- StoreReader
-
- // Close closes the snapshot.
- // Any subsequent method calls will fail.
- Close() error
}
// Stream is an interface for iterating through a collection of key-value pairs.
diff --git a/x/ref/services/syncbase/store/test/snapshot.go b/x/ref/services/syncbase/store/test/snapshot.go
index 4dbee5f..04dee18 100644
--- a/x/ref/services/syncbase/store/test/snapshot.go
+++ b/x/ref/services/syncbase/store/test/snapshot.go
@@ -26,12 +26,12 @@
verifyAdvance(t, s, key1, value1)
verifyAdvance(t, s, nil, nil)
- // Test functions after Close.
- if err := snapshot.Close(); err != nil {
- t.Fatalf("can't close the snapshot: %v", err)
+ // Test functions after Abort.
+ if err := snapshot.Abort(); err != nil {
+ t.Fatalf("can't abort the snapshot: %v", err)
}
- expectedErrMsg := store.ErrMsgClosedSnapshot
- verifyError(t, snapshot.Close(), verror.ErrCanceled.ID, expectedErrMsg)
+ expectedErrMsg := store.ErrMsgAbortedSnapshot
+ verifyError(t, snapshot.Abort(), verror.ErrCanceled.ID, expectedErrMsg)
_, err := snapshot.Get(key1, nil)
verifyError(t, err, verror.ErrCanceled.ID, expectedErrMsg)
diff --git a/x/ref/services/syncbase/store/test/store.go b/x/ref/services/syncbase/store/test/store.go
index 2d6d9a1..48022f9 100644
--- a/x/ref/services/syncbase/store/test/store.go
+++ b/x/ref/services/syncbase/store/test/store.go
@@ -137,7 +137,7 @@
s.verify(t, st)
for i := 0; i < len(states); i++ {
states[i].verify(t, snapshots[i])
- snapshots[i].Close()
+ snapshots[i].Abort()
}
}
@@ -230,7 +230,7 @@
}
for _, snapshot := range snapshots {
_, err := snapshot.Get(key1, nil)
- verifyError(t, err, verror.ErrCanceled.ID, store.ErrMsgClosedSnapshot)
+ verifyError(t, err, verror.ErrCanceled.ID, store.ErrMsgAbortedSnapshot)
}
for _, tx := range transactions {
_, err := tx.Get(key1, nil)
diff --git a/x/ref/services/syncbase/store/test/transaction.go b/x/ref/services/syncbase/store/test/transaction.go
index 3e1039b..6cf26e8 100644
--- a/x/ref/services/syncbase/store/test/transaction.go
+++ b/x/ref/services/syncbase/store/test/transaction.go
@@ -155,7 +155,7 @@
go func(idx int) {
rnd := rand.New(rand.NewSource(239017 * int64(idx)))
perm := rnd.Perm(n)
- if err := store.RunInTransaction(st, func(st store.StoreReadWriter) error {
+ if err := store.RunInTransaction(st, func(tx store.Transaction) error {
for j := 0; j <= m; j++ {
var keystr string
if j < m {
@@ -164,7 +164,7 @@
keystr = fmt.Sprintf("%05d", n)
}
key := []byte(keystr)
- val, err := st.Get(key, nil)
+ val, err := tx.Get(key, nil)
if err != nil {
return fmt.Errorf("can't get key %q: %v", key, err)
}
@@ -178,7 +178,7 @@
} else {
newValue = intValue + int64(m)
}
- if err := st.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
+ if err := tx.Put(key, []byte(fmt.Sprintf("%d", newValue))); err != nil {
return fmt.Errorf("can't put {%q: %v}: %v", key, newValue, err)
}
}
diff --git a/x/ref/services/syncbase/store/util.go b/x/ref/services/syncbase/store/util.go
index aac934b..72c4beb 100644
--- a/x/ref/services/syncbase/store/util.go
+++ b/x/ref/services/syncbase/store/util.go
@@ -8,9 +8,13 @@
"v.io/v23/verror"
)
+type SnapshotSpecImpl struct{}
+
+func (s *SnapshotSpecImpl) __snapshotSpec() {}
+
// RunInTransaction runs the given fn in a transaction, managing retries and
// commit/abort.
-func RunInTransaction(st Store, fn func(st StoreReadWriter) error) error {
+func RunInTransaction(st Store, fn func(tx Transaction) error) error {
// TODO(rogulenko): Make the number of attempts configurable.
// TODO(rogulenko): Change the default number of attempts to 3. Currently,
// some storage engine tests fail when the number of attempts is that low.
diff --git a/x/ref/services/syncbase/vsync/dag.go b/x/ref/services/syncbase/vsync/dag.go
index 3850d5b..121f594 100644
--- a/x/ref/services/syncbase/vsync/dag.go
+++ b/x/ref/services/syncbase/vsync/dag.go
@@ -222,9 +222,7 @@
// endBatch marks the end of a given batch. The batch information is persisted
// to the store and removed from the temporary in-memory entry.
-func (s *syncService) endBatch(ctx *context.T, tx store.StoreReadWriter, btid, count uint64) error {
- _ = tx.(store.Transaction)
-
+func (s *syncService) endBatch(ctx *context.T, tx store.Transaction, btid, count uint64) error {
s.batchesLock.Lock()
defer s.batchesLock.Unlock()
@@ -271,9 +269,7 @@
//
// The grafting structure is not needed when nodes are being added locally by
// the Watcher, passing a nil grafting structure.
-func (s *syncService) addNode(ctx *context.T, tx store.StoreReadWriter, oid, version, logrec string, deleted bool, parents []string, btid uint64, graft graftMap) error {
- _ = tx.(store.Transaction)
-
+func (s *syncService) addNode(ctx *context.T, tx store.Transaction, oid, version, logrec string, deleted bool, parents []string, btid uint64, graft graftMap) error {
if parents != nil {
if len(parents) > 2 {
return verror.New(verror.ErrInternal, ctx, "cannot have more than 2 parents")
@@ -380,9 +376,7 @@
// to track DAG attachements during a sync operation. It is not needed if the
// parent linkage is due to a local change (from conflict resolution selecting
// an existing version).
-func (s *syncService) addParent(ctx *context.T, tx store.StoreReadWriter, oid, version, parent string, graft graftMap) error {
- _ = tx.(store.Transaction)
-
+func (s *syncService) addParent(ctx *context.T, tx store.Transaction, oid, version, parent string, graft graftMap) error {
if version == parent {
return verror.New(verror.ErrInternal, ctx, "object", oid, version, "cannot be its own parent")
}
@@ -447,9 +441,7 @@
}
// moveHead moves the object head node in the DAG.
-func moveHead(ctx *context.T, tx store.StoreReadWriter, oid, head string) error {
- _ = tx.(store.Transaction)
-
+func moveHead(ctx *context.T, tx store.Transaction, oid, head string) error {
// Verify that the node exists.
if ok, err := hasNode(ctx, tx, oid, head); err != nil {
return err
@@ -565,7 +557,7 @@
// getObjectGraft returns the graftInfo for an object ID. If the graftMap is
// nil, a nil graftInfo is returned because grafting is not being tracked.
-func getObjectGraftInfo(ctx *context.T, st store.StoreReader, graft graftMap, oid string) *graftInfo {
+func getObjectGraftInfo(ctx *context.T, sntx store.SnapshotOrTransaction, graft graftMap, oid string) *graftInfo {
if graft == nil {
return nil
}
@@ -580,7 +572,7 @@
}
// If the object has a head node, include it in the set of new heads.
- if head, err := getHead(ctx, st, oid); err == nil {
+ if head, err := getHead(ctx, sntx, oid); err == nil {
info.newHeads[head] = true
info.oldHeadSnap = head
}
@@ -636,9 +628,7 @@
// The batch set passed is used to track batches affected by the deletion of DAG
// objects across multiple calls to prune(). It is later given to pruneDone()
// to do GC on these batches.
-func prune(ctx *context.T, tx store.StoreReadWriter, oid, version string, batches batchSet, delLogRec func(ctx *context.T, tx store.StoreReadWriter, logrec string) error) error {
- _ = tx.(store.Transaction)
-
+func prune(ctx *context.T, tx store.Transaction, oid, version string, batches batchSet, delLogRec func(ctx *context.T, tx store.Transaction, logrec string) error) error {
if batches == nil {
return verror.New(verror.ErrInternal, ctx, "missing batch set")
}
@@ -690,9 +680,7 @@
// pruneDone is called when object pruning is finished within a single pass of
// the sync garbage collector. It updates the batch sets affected by objects
// deleted by prune().
-func pruneDone(ctx *context.T, tx store.StoreReadWriter, batches batchSet) error {
- _ = tx.(store.Transaction)
-
+func pruneDone(ctx *context.T, tx store.Transaction, batches batchSet) error {
// Update batch sets by removing the pruned objects from them.
for btid, pruneInfo := range batches {
info, err := getBatch(ctx, tx, btid)
@@ -734,9 +722,7 @@
}
// setNode stores the DAG node entry.
-func setNode(ctx *context.T, tx store.StoreReadWriter, oid, version string, node *dagNode) error {
- _ = tx.(store.Transaction)
-
+func setNode(ctx *context.T, tx store.Transaction, oid, version string, node *dagNode) error {
if version == NoVersion {
return verror.New(verror.ErrInternal, ctx, "invalid version", version)
}
@@ -759,9 +745,7 @@
}
// delNode deletes the DAG node entry.
-func delNode(ctx *context.T, tx store.StoreReadWriter, oid, version string) error {
- _ = tx.(store.Transaction)
-
+func delNode(ctx *context.T, tx store.Transaction, oid, version string) error {
if version == NoVersion {
return verror.New(verror.ErrInternal, ctx, "invalid version", version)
}
@@ -787,9 +771,7 @@
}
// setHead stores version as the DAG object head.
-func setHead(ctx *context.T, tx store.StoreReadWriter, oid, version string) error {
- _ = tx.(store.Transaction)
-
+func setHead(ctx *context.T, tx store.Transaction, oid, version string) error {
if version == NoVersion {
return verror.New(verror.ErrInternal, ctx, fmt.Errorf("invalid version: %s", version))
}
@@ -808,8 +790,7 @@
}
// delHead deletes the DAG object head.
-func delHead(ctx *context.T, tx store.StoreReadWriter, oid string) error {
- _ = tx.(store.Transaction)
+func delHead(ctx *context.T, tx store.Transaction, oid string) error {
return util.Delete(ctx, tx, headKey(oid))
}
@@ -819,9 +800,7 @@
}
// setBatch stores the DAG batch entry.
-func setBatch(ctx *context.T, tx store.StoreReadWriter, btid uint64, info *batchInfo) error {
- _ = tx.(store.Transaction)
-
+func setBatch(ctx *context.T, tx store.Transaction, btid uint64, info *batchInfo) error {
if btid == NoBatchId {
return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
}
@@ -844,9 +823,7 @@
}
// delBatch deletes the DAG batch entry.
-func delBatch(ctx *context.T, tx store.StoreReadWriter, btid uint64) error {
- _ = tx.(store.Transaction)
-
+func delBatch(ctx *context.T, tx store.Transaction, btid uint64) error {
if btid == NoBatchId {
return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
}
diff --git a/x/ref/services/syncbase/vsync/dag_test.go b/x/ref/services/syncbase/vsync/dag_test.go
index 3728635..9ef43f7 100644
--- a/x/ref/services/syncbase/vsync/dag_test.go
+++ b/x/ref/services/syncbase/vsync/dag_test.go
@@ -790,7 +790,7 @@
tx := st.NewTransaction()
del := 0
err := prune(nil, tx, oid, version, batches,
- func(ctx *context.T, tx store.StoreReadWriter, lr string) error {
+ func(ctx *context.T, tx store.Transaction, lr string) error {
del++
return nil
})
@@ -872,7 +872,7 @@
batches := newBatchPruning()
tx := st.NewTransaction()
err := prune(nil, tx, oid, version, batches,
- func(ctx *context.T, tx store.StoreReadWriter, lr string) error {
+ func(ctx *context.T, tx store.Transaction, lr string) error {
del++
if lr == "logrec-03" {
return fmt.Errorf("refuse to delete %s", lr)
@@ -1574,7 +1574,7 @@
t.Errorf("cannot getHead() on object %s: %v", oid, err)
}
err = prune(nil, tx, oid, head, batches,
- func(ctx *context.T, itx store.StoreReadWriter, lr string) error {
+ func(ctx *context.T, itx store.Transaction, lr string) error {
return nil
})
if err != nil {
@@ -1614,7 +1614,7 @@
batches = newBatchPruning()
err = prune(nil, tx, oid_c, "3", batches,
- func(ctx *context.T, itx store.StoreReadWriter, lr string) error {
+ func(ctx *context.T, itx store.Transaction, lr string) error {
return nil
})
if err != nil {
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index 6b4fad1..8ba9cd9 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -427,9 +427,9 @@
// TODO(hpucha): This can be optimized using a scan instead of "gets" in a for
// loop.
-func getNextLogRec(ctx *context.T, sn store.StoreReader, dev uint64, r *genRange) (*localLogRec, error) {
+func getNextLogRec(ctx *context.T, st store.Store, dev uint64, r *genRange) (*localLogRec, error) {
for i := r.cur; i <= r.max; i++ {
- rec, err := getLogRec(ctx, sn, dev, i)
+ rec, err := getLogRec(ctx, st, dev, i)
if err == nil {
r.cur = i + 1
return rec, nil
diff --git a/x/ref/services/syncbase/vsync/sync.go b/x/ref/services/syncbase/vsync/sync.go
index 6b017ee..ebb55f6 100644
--- a/x/ref/services/syncbase/vsync/sync.go
+++ b/x/ref/services/syncbase/vsync/sync.go
@@ -19,7 +19,7 @@
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
-
+ "v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/verror"
@@ -119,16 +119,19 @@
}
data := &syncData{}
- if err := util.Get(ctx, sv.St(), s.stKey(), data); err != nil {
- if verror.ErrorID(err) != verror.ErrNoExist.ID {
- return nil, err
+ if err := store.RunInTransaction(sv.St(), func(tx store.Transaction) error {
+ if err := util.Get(ctx, sv.St(), s.stKey(), data); err != nil {
+ if verror.ErrorID(err) != verror.ErrNoExist.ID {
+ return err
+ }
+ // First invocation of vsync.New().
+ // TODO(sadovsky): Maybe move guid generation and storage to serviceData.
+ data.Id = rand64()
+ return util.Put(ctx, tx, s.stKey(), data)
}
- // First invocation of vsync.New().
- // TODO(sadovsky): Maybe move guid generation and storage to serviceData.
- data.Id = rand64()
- if err := util.Put(ctx, sv.St(), s.stKey(), data); err != nil {
- return nil, err
- }
+ return nil
+ }); err != nil {
+ return nil, err
}
// data.Id is now guaranteed to be initialized.
diff --git a/x/ref/services/syncbase/vsync/sync_state.go b/x/ref/services/syncbase/vsync/sync_state.go
index 0c1adce..8195559 100644
--- a/x/ref/services/syncbase/vsync/sync_state.go
+++ b/x/ref/services/syncbase/vsync/sync_state.go
@@ -269,8 +269,7 @@
}
// putDbSyncState persists the sync state object for a given Database.
-func putDbSyncState(ctx *context.T, tx store.StoreReadWriter, ds *dbSyncState) error {
- _ = tx.(store.Transaction)
+func putDbSyncState(ctx *context.T, tx store.Transaction, ds *dbSyncState) error {
return util.Put(ctx, tx, dbSyncStateKey(), ds)
}
@@ -331,8 +330,7 @@
}
// putLogRec stores the log record.
-func putLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
- _ = tx.(store.Transaction)
+func putLogRec(ctx *context.T, tx store.Transaction, rec *localLogRec) error {
return util.Put(ctx, tx, logRecKey(rec.Metadata.Id, rec.Metadata.Gen), rec)
}
@@ -346,7 +344,6 @@
}
// delLogRec deletes the log record for a given (devid, gen).
-func delLogRec(ctx *context.T, tx store.StoreReadWriter, id, gen uint64) error {
- _ = tx.(store.Transaction)
+func delLogRec(ctx *context.T, tx store.Transaction, id, gen uint64) error {
return util.Delete(ctx, tx, logRecKey(id, gen))
}
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 961121c..bf4e3aa 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -110,9 +110,7 @@
}
// addSyncGroup adds a new SyncGroup given its information.
-func addSyncGroup(ctx *context.T, tx store.StoreReadWriter, sg *interfaces.SyncGroup) error {
- _ = tx.(store.Transaction)
-
+func addSyncGroup(ctx *context.T, tx store.Transaction, sg *interfaces.SyncGroup) error {
// Verify SyncGroup before storing it since it may have been received
// from a remote peer.
if err := verifySyncGroup(ctx, sg); err != nil {
@@ -170,9 +168,7 @@
}
// delSyncGroupById deletes the SyncGroup given its ID.
-func delSyncGroupById(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId) error {
- _ = tx.(store.Transaction)
-
+func delSyncGroupById(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
sg, err := getSyncGroupById(ctx, tx, gid)
if err != nil {
return err
@@ -184,9 +180,7 @@
}
// delSyncGroupByName deletes the SyncGroup given its name.
-func delSyncGroupByName(ctx *context.T, tx store.StoreReadWriter, name string) error {
- _ = tx.(store.Transaction)
-
+func delSyncGroupByName(ctx *context.T, tx store.Transaction, name string) error {
gid, err := getSyncGroupId(ctx, tx, name)
if err != nil {
return err
@@ -218,7 +212,7 @@
// For each database, fetch its SyncGroup data entries by scanning their
// prefix range. Use a database snapshot for the scan.
sn := st.NewSnapshot()
- defer sn.Close()
+ defer sn.Abort()
name := appDbName(appName, dbName)
forEachSyncGroup(sn, func(sg *interfaces.SyncGroup) bool {
@@ -317,10 +311,10 @@
}
// hasSGDataEntry returns true if the SyncGroup data entry exists.
-func hasSGDataEntry(st store.StoreReader, gid interfaces.GroupId) (bool, error) {
+func hasSGDataEntry(sntx store.SnapshotOrTransaction, gid interfaces.GroupId) (bool, error) {
// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
var sg interfaces.SyncGroup
- if err := util.Get(nil, st, sgDataKey(gid), &sg); err != nil {
+ if err := util.Get(nil, sntx, sgDataKey(gid), &sg); err != nil {
if verror.ErrorID(err) == verror.ErrNoExist.ID {
err = nil
}
@@ -330,10 +324,10 @@
}
// hasSGNameEntry returns true if the SyncGroup name entry exists.
-func hasSGNameEntry(st store.StoreReader, name string) (bool, error) {
+func hasSGNameEntry(sntx store.SnapshotOrTransaction, name string) (bool, error) {
// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
var gid interfaces.GroupId
- if err := util.Get(nil, st, sgNameKey(name), &gid); err != nil {
+ if err := util.Get(nil, sntx, sgNameKey(name), &gid); err != nil {
if verror.ErrorID(err) == verror.ErrNoExist.ID {
err = nil
}
@@ -343,14 +337,12 @@
}
// setSGDataEntry stores the SyncGroup data entry.
-func setSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
- _ = tx.(store.Transaction)
+func setSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
return util.Put(ctx, tx, sgDataKey(gid), sg)
}
// setSGNameEntry stores the SyncGroup name entry.
-func setSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string, gid interfaces.GroupId) error {
- _ = tx.(store.Transaction)
+func setSGNameEntry(ctx *context.T, tx store.Transaction, name string, gid interfaces.GroupId) error {
return util.Put(ctx, tx, sgNameKey(name), gid)
}
@@ -373,14 +365,12 @@
}
// delSGDataEntry deletes the SyncGroup data entry.
-func delSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId) error {
- _ = tx.(store.Transaction)
+func delSGDataEntry(ctx *context.T, tx store.Transaction, gid interfaces.GroupId) error {
return util.Delete(ctx, tx, sgDataKey(gid))
}
// delSGNameEntry deletes the SyncGroup name to ID mapping.
-func delSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string) error {
- _ = tx.(store.Transaction)
+func delSGNameEntry(ctx *context.T, tx store.Transaction, name string) error {
return util.Delete(ctx, tx, sgNameKey(name))
}
@@ -392,7 +382,7 @@
vlog.VI(2).Infof("sync: CreateSyncGroup: begin: %s", sgName)
defer vlog.VI(2).Infof("sync: CreateSyncGroup: end: %s", sgName)
- err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+ err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
return err
@@ -452,7 +442,7 @@
var sg *interfaces.SyncGroup
nullSpec := wire.SyncGroupSpec{}
- err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+ err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
return err
@@ -509,7 +499,7 @@
return nullSpec, verror.New(verror.ErrBadArg, ctx, "bad app/db with syncgroup")
}
- err = store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+ err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
@@ -540,7 +530,7 @@
defer vlog.VI(2).Infof("sync: GetSyncGroupNames: end: %v", sgNames)
sn := sd.db.St().NewSnapshot()
- defer sn.Close()
+ defer sn.Abort()
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
@@ -573,7 +563,7 @@
defer vlog.VI(2).Infof("sync: GetSyncGroupSpec: end: %s spec %v", sgName, spec)
sn := sd.db.St().NewSnapshot()
- defer sn.Close()
+ defer sn.Abort()
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
@@ -598,7 +588,7 @@
defer vlog.VI(2).Infof("sync: GetSyncGroupMembers: end: %s members %v", sgName, members)
sn := sd.db.St().NewSnapshot()
- defer sn.Close()
+ defer sn.Abort()
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, sn); err != nil {
@@ -622,7 +612,7 @@
vlog.VI(2).Infof("sync: SetSyncGroupSpec: begin %s %v %s", sgName, spec, version)
defer vlog.VI(2).Infof("sync: SetSyncGroupSpec: end: %s", sgName)
- err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+ err := store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
return err
@@ -673,7 +663,7 @@
// Publish rejected. Persist that to avoid retrying in the
// future and to remember the split universe scenario.
- err = store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+ err = store.RunInTransaction(sd.db.St(), func(tx store.Transaction) error {
// Ensure SG still exists.
sg, err := getSyncGroupByName(ctx, tx, sgName)
if err != nil {
@@ -694,7 +684,7 @@
// be time consuming. Consider doing it asynchronously and letting the server
// reply to the client earlier. However it must happen within the scope of this
// transaction (and its snapshot view).
-func (sd *syncDatabase) bootstrapSyncGroup(ctx *context.T, tx store.StoreReadWriter, prefixes []string) error {
+func (sd *syncDatabase) bootstrapSyncGroup(ctx *context.T, tx store.Transaction, prefixes []string) error {
if len(prefixes) == 0 {
return verror.New(verror.ErrInternal, ctx, "no prefixes specified")
}
@@ -789,7 +779,7 @@
return err
}
- err = store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
+ err = store.RunInTransaction(st, func(tx store.Transaction) error {
localSG, err := getSyncGroupByName(ctx, tx, sg.Name)
if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
@@ -856,7 +846,7 @@
}
var sg *interfaces.SyncGroup
- err = store.RunInTransaction(dbSt, func(tx store.StoreReadWriter) error {
+ err = store.RunInTransaction(dbSt, func(tx store.Transaction) error {
var err error
sg, err = getSyncGroupById(ctx, tx, gid)
if err != nil {
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 8f5f60c..6d80317 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -169,7 +169,7 @@
// Transactional processing of the batch: convert these syncable log
// records to a batch of sync log records, filling their parent versions
// from the DAG head nodes.
- err := store.RunInTransaction(st, func(tx store.StoreReadWriter) error {
+ err := store.RunInTransaction(st, func(tx store.Transaction) error {
batch := make([]*localLogRec, 0, len(logs))
for _, entry := range logs {
if rec, err := convertLogRecord(ctx, tx, entry); err != nil {
@@ -193,9 +193,7 @@
// processBatch applies a single batch of changes (object mutations) received
// from watching a particular Database.
-func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, appBatch bool, totalCount uint64, tx store.StoreReadWriter) error {
- _ = tx.(store.Transaction)
-
+func (s *syncService) processBatch(ctx *context.T, appName, dbName string, batch []*localLogRec, appBatch bool, totalCount uint64, tx store.Transaction) error {
count := uint64(len(batch))
if count == 0 {
return nil
@@ -247,7 +245,7 @@
// processLocalLogRec processes a local log record by adding to the Database and
// suitably updating the DAG metadata.
-func (s *syncService) processLocalLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
+func (s *syncService) processLocalLogRec(ctx *context.T, tx store.Transaction, rec *localLogRec) error {
// Insert the new log record into the log.
if err := putLogRec(ctx, tx, rec); err != nil {
return err
@@ -361,8 +359,7 @@
// simplify the store-to-sync interaction. A deleted key would still have a
// version and its value entry would encode the "deleted" flag, either in the
// key or probably in a value wrapper that would contain other metadata.
-func convertLogRecord(ctx *context.T, tx store.StoreReadWriter, logEnt *watchable.LogEntry) (*localLogRec, error) {
- _ = tx.(store.Transaction)
+func convertLogRecord(ctx *context.T, tx store.Transaction, logEnt *watchable.LogEntry) (*localLogRec, error) {
var rec *localLogRec
timestamp := logEnt.CommitTimestamp
@@ -404,9 +401,7 @@
// newLocalLogRec creates a local sync log record given its information: key,
// version, deletion flag, and timestamp. It retrieves the current DAG head
// for the key (if one exists) to use as its parent (previous) version.
-func newLocalLogRec(ctx *context.T, tx store.StoreReadWriter, key, version []byte, deleted bool, timestamp int64) *localLogRec {
- _ = tx.(store.Transaction)
-
+func newLocalLogRec(ctx *context.T, tx store.Transaction, key, version []byte, deleted bool, timestamp int64) *localLogRec {
rec := localLogRec{}
oid := string(key)
@@ -484,8 +479,7 @@
}
// setResMark stores the watcher resume marker for a database.
-func setResMark(ctx *context.T, tx store.StoreReadWriter, resMark string) error {
- _ = tx.(store.Transaction)
+func setResMark(ctx *context.T, tx store.Transaction, resMark string) error {
return util.Put(ctx, tx, resMarkKey(), resMark)
}