Merge "device manager test"
diff --git a/v23/syncbase/nosql/exec_test/exec_test.go b/v23/syncbase/nosql/exec_test/exec_test.go
index fdd63ca..e188585 100644
--- a/v23/syncbase/nosql/exec_test/exec_test.go
+++ b/v23/syncbase/nosql/exec_test/exec_test.go
@@ -922,7 +922,7 @@
{
"select v from Unknown",
// The following error text is dependent on the implementation of the query_db.Database interface.
- syncql.NewErrTableCantAccess(ctx, 14, "Unknown", errors.New("exec_test.test:\"a/db\".Exec: Does not exist: Unknown")),
+ syncql.NewErrTableCantAccess(ctx, 14, "Unknown", errors.New("exec_test.test:\"a/db\".Exec: Does not exist: $table:Unknown")),
},
{
"select v from Customer offset -1",
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index ff4b5cc..849f352 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -6,18 +6,24 @@
import (
"fmt"
+ "time"
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/v23/syncbase"
+ "v.io/syncbase/v23/syncbase/nosql"
tu "v.io/syncbase/v23/syncbase/testutil"
constants "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/v23"
"v.io/v23/naming"
+ "v.io/x/ref"
_ "v.io/x/ref/runtime/factories/generic"
"v.io/x/ref/test/modules"
"v.io/x/ref/test/v23tests"
)
+// TODO(rdaoud): change the credentials of servers s0 and s1 to independent
+// principals derived from the same root.
+
//go:generate v23 test generate
func V23TestSyncbasedJoinSyncGroup(t *v23tests.T) {
@@ -47,10 +53,13 @@
d := a.NoSQLDatabase("d")
d.Create(ctx, nil)
+ mtName := env.Vars[ref.EnvNamespacePrefix]
+
spec := wire.SyncGroupSpec{
Description: "test syncgroup sg",
Perms: perms("root/s0", "root/s0/s1"),
- Prefixes: []string{"t1/foo"},
+ Prefixes: []string{"t1:foo"},
+ MountTables: []string{mtName},
}
sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
sg := d.SyncGroup(sgName)
@@ -74,7 +83,133 @@
sg := d.SyncGroup(sgName)
info := wire.SyncGroupMemberInfo{10}
if _, err := sg.Join(ctx, info); err != nil {
- return fmt.Errorf("Join SG %v failed: %v", sgName, err)
+ return fmt.Errorf("Join SG %q failed: %v", sgName, err)
}
return nil
}, "runJoinSyncGroup")
+
+// V23TestSyncbasedGetDeltas tests the exchange of deltas between two Syncbase
+// instances and their clients. The 1st client creates a SyncGroup and puts
+// some database entries in it. The 2nd client joins that SyncGroup and reads
+// the database entries. This verifies the end-to-end synchronization of data
+// along the path: client1--Syncbase1--Syncbase2--client2.
+//
+// Any failure to mint child credentials aborts the test immediately, so that
+// it is not misreported later as a Syncbase startup or sync failure.
+func V23TestSyncbasedGetDeltas(t *v23tests.T) {
+	v23tests.RunRootMT(t, "--v23.tcp.address=127.0.0.1:0")
+
+	// First Syncbase instance ("sync0") and the only client allowed to
+	// read/write it.
+	server0Creds, err := t.Shell().NewChildCredentials("s0")
+	if err != nil {
+		t.Fatalf("NewChildCredentials(%q) failed: %v", "s0", err)
+	}
+	client0Creds, err := t.Shell().NewChildCredentials("s0/c0")
+	if err != nil {
+		t.Fatalf("NewChildCredentials(%q) failed: %v", "s0/c0", err)
+	}
+	cleanSync0 := tu.StartSyncbased(t, server0Creds, "sync0", "",
+		`{"Read": {"In":["root/s0/c0"]}, "Write": {"In":["root/s0/c0"]}}`)
+	defer cleanSync0()
+
+	// Second Syncbase instance ("sync1") and its client.
+	server1Creds, err := t.Shell().NewChildCredentials("s0/s1")
+	if err != nil {
+		t.Fatalf("NewChildCredentials(%q) failed: %v", "s0/s1", err)
+	}
+	client1Creds, err := t.Shell().NewChildCredentials("s0/s1/c1")
+	if err != nil {
+		t.Fatalf("NewChildCredentials(%q) failed: %v", "s0/s1/c1", err)
+	}
+	cleanSync1 := tu.StartSyncbased(t, server1Creds, "sync1", "",
+		`{"Read": {"In":["root/s0/s1/c1"]}, "Write": {"In":["root/s0/s1/c1"]}}`)
+	defer cleanSync1()
+
+	// The creator must run to completion before the joiner starts: the
+	// joiner expects the SyncGroup to exist already.
+	tu.RunClient(t, client0Creds, runCreateAndPopulateSyncGroup)
+	tu.RunClient(t, client1Creds, runJoinSyncGroupAndFetchData)
+}
+
+// runCreateAndPopulateSyncGroup is the client module run against "sync0". It
+// sets up app "a", database "d" and table "tb", creates the SyncGroup "foo"
+// covering the prefix "tb:foo", and then writes rows foo0..foo9 whose values
+// are "testkey" followed by the row key.
+var runCreateAndPopulateSyncGroup = modules.Register(func(env *modules.Env, args ...string) error {
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	// Best-effort setup of the app/database/table hierarchy.
+	app := syncbase.NewService("sync0").App("a")
+	app.Create(ctx, nil)
+	db := app.NoSQLDatabase("d")
+	db.Create(ctx, nil)
+	db.CreateTable(ctx, "tb", nil)
+
+	// Create the SyncGroup, using the mount table taken from the module's
+	// environment.
+	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
+	sgSpec := wire.SyncGroupSpec{
+		Description: "test syncgroup sg",
+		Perms:       perms("root/s0", "root/s0/s1"),
+		Prefixes:    []string{"tb:foo"},
+		MountTables: []string{env.Vars[ref.EnvNamespacePrefix]},
+	}
+	if err := db.SyncGroup(sgName).Create(ctx, sgSpec, wire.SyncGroupMemberInfo{8}); err != nil {
+		return fmt.Errorf("Create SG %q failed: %v", sgName, err)
+	}
+
+	// Populate the table with the rows that fall under the synced prefix.
+	table := db.Table("tb")
+	for n := 0; n < 10; n++ {
+		rowKey := fmt.Sprintf("foo%d", n)
+		if err := table.Row(rowKey).Put(ctx, "testkey"+rowKey); err != nil {
+			return fmt.Errorf("r.Put() failed: %v", err)
+		}
+	}
+
+	return nil
+}, "runCreateAndPopulateSyncGroup")
+
+// runJoinSyncGroupAndFetchData is the client module run against "sync1". It
+// sets up app "a", database "d" and table "tb" locally, joins the SyncGroup
+// "foo" hosted on "sync0", waits for the synced rows to arrive, and then
+// verifies rows foo0..foo9 both by individual Gets and by a prefix scan.
+//
+// Returned errors carry no trailing newline, matching the other client
+// modules in this file.
+var runJoinSyncGroupAndFetchData = modules.Register(func(env *modules.Env, args ...string) error {
+	// Number of rows the creator is expected to have written (foo0..foo9).
+	const numKeys = 10
+
+	ctx, shutdown := v23.Init()
+	defer shutdown()
+
+	a := syncbase.NewService("sync1").App("a")
+	a.Create(ctx, nil)
+	d := a.NoSQLDatabase("d")
+	d.Create(ctx, nil)
+	d.CreateTable(ctx, "tb", nil)
+
+	sgName := naming.Join("sync0", constants.SyncbaseSuffix, "foo")
+	sg := d.SyncGroup(sgName)
+	info := wire.SyncGroupMemberInfo{10}
+	if _, err := sg.Join(ctx, info); err != nil {
+		return fmt.Errorf("Join SG %q failed: %v", sgName, err)
+	}
+
+	// Wait for a bit (up to 4 sec) until the last key appears.
+	tb := d.Table("tb")
+	r := tb.Row(fmt.Sprintf("foo%d", numKeys-1))
+	for i := 0; i < 8; i++ {
+		time.Sleep(500 * time.Millisecond)
+		var value string
+		if err := r.Get(ctx, &value); err == nil {
+			break
+		}
+	}
+
+	// Verify that all keys and values made it correctly. If the wait above
+	// timed out, the first missing key is reported here.
+	for i := 0; i < numKeys; i++ {
+		key := fmt.Sprintf("foo%d", i)
+		r := tb.Row(key)
+		var got string
+		if err := r.Get(ctx, &got); err != nil {
+			return fmt.Errorf("r.Get() failed: %v", err)
+		}
+		want := "testkey" + key
+		if got != want {
+			return fmt.Errorf("unexpected value: got %q, want %q", got, want)
+		}
+	}
+
+	// Re-verify using a scan operation. The number of rows seen is tracked
+	// so that an empty or truncated scan is reported instead of passing.
+	stream := tb.Scan(ctx, nosql.Prefix("foo"))
+	scanned := 0
+	for ; stream.Advance(); scanned++ {
+		want := fmt.Sprintf("foo%d", scanned)
+		got := stream.Key()
+		if got != want {
+			return fmt.Errorf("unexpected key in scan: got %q, want %q", got, want)
+		}
+		want = "testkey" + want
+		if err := stream.Value(&got); err != nil {
+			return fmt.Errorf("cannot fetch value in scan: %v", err)
+		}
+		if got != want {
+			return fmt.Errorf("unexpected value in scan: got %q, want %q", got, want)
+		}
+	}
+	if err := stream.Err(); err != nil {
+		return fmt.Errorf("scan stream error: %v", err)
+	}
+	if scanned != numKeys {
+		return fmt.Errorf("unexpected number of rows in scan: got %d, want %d", scanned, numKeys)
+	}
+
+	return nil
+}, "runJoinSyncGroupAndFetchData")
diff --git a/v23/syncbase/nosql/v23_test.go b/v23/syncbase/nosql/v23_test.go
index efbdd39..a14dbe0 100644
--- a/v23/syncbase/nosql/v23_test.go
+++ b/v23/syncbase/nosql/v23_test.go
@@ -28,3 +28,7 @@
func TestV23SyncbasedJoinSyncGroup(t *testing.T) {
v23tests.RunTest(t, V23TestSyncbasedJoinSyncGroup)
}
+
+// TestV23SyncbasedGetDeltas runs V23TestSyncbasedGetDeltas through
+// v23tests.RunTest so that it is picked up by "go test".
+func TestV23SyncbasedGetDeltas(t *testing.T) {
+ v23tests.RunTest(t, V23TestSyncbasedGetDeltas)
+}
diff --git a/v23/syncbase/util/glob.go b/v23/syncbase/util/glob.go
index 1f12149..5303521 100644
--- a/v23/syncbase/util/glob.go
+++ b/v23/syncbase/util/glob.go
@@ -13,8 +13,7 @@
"v.io/v23/naming"
)
-// List does namespace.Glob("name/*") and returns a sorted slice of results or
-// a VDL-compatible error.
+// List does namespace.Glob("name/*") and returns a sorted slice of results.
func List(ctx *context.T, name string) ([]string, error) {
// TODO(sadovsky): Why can't Glob be a method on the stub, just like every
// other streaming method? Even if we encourage clients to use the namespace
diff --git a/x/ref/services/syncbase/server/app.go b/x/ref/services/syncbase/server/app.go
index a2b081a..48338f5 100644
--- a/x/ref/services/syncbase/server/app.go
+++ b/x/ref/services/syncbase/server/app.go
@@ -35,7 +35,6 @@
var (
_ wire.AppServerMethods = (*app)(nil)
_ interfaces.App = (*app)(nil)
- _ util.Layer = (*app)(nil)
)
////////////////////////////////////////
@@ -69,7 +68,7 @@
return nil, "", verror.New(verror.ErrNoExist, ctx, a.name)
}
data := &appData{}
- if err := util.Get(ctx, call, a.s.st, a, data); err != nil {
+ if err := util.GetWithAuth(ctx, call, a.s.st, a.stKey(), data); err != nil {
return nil, "", err
}
return data.Perms, util.FormatVersion(data.Version), nil
@@ -84,7 +83,7 @@
closeSnapshot := func() error {
return sn.Close()
}
- if err := util.Get(ctx, call, sn, a, &appData{}); err != nil {
+ if err := util.GetWithAuth(ctx, call, sn, a.stKey(), &appData{}); err != nil {
closeSnapshot()
return nil, err
}
@@ -150,7 +149,7 @@
aData := &appData{}
if err := store.RunInTransaction(a.s.st, func(st store.StoreReadWriter) error {
// Check appData perms.
- if err := util.Get(ctx, call, st, a, aData); err != nil {
+ if err := util.GetWithAuth(ctx, call, st, a.stKey(), aData); err != nil {
return err
}
// Check for "database already exists".
@@ -267,20 +266,17 @@
return d.SetPermsInternal(ctx, call, perms, version)
}
-////////////////////////////////////////
-// util.Layer methods
-
func (a *app) Name() string {
return a.name
}
-func (a *app) StKey() string {
- return util.JoinKeyParts(util.AppPrefix, a.stKeyPart())
-}
-
////////////////////////////////////////
// Internal helpers
+// stKey returns the storage engine key under which this app's appData record
+// is stored: util.AppPrefix joined with the app's key part.
+func (a *app) stKey() string {
+ return util.JoinKeyParts(util.AppPrefix, a.stKeyPart())
+}
+
func (a *app) stKeyPart() string {
return a.name
}
diff --git a/x/ref/services/syncbase/server/db_info.go b/x/ref/services/syncbase/server/db_info.go
index cbfcebd..39ffdc8 100644
--- a/x/ref/services/syncbase/server/db_info.go
+++ b/x/ref/services/syncbase/server/db_info.go
@@ -19,58 +19,30 @@
"v.io/v23/context"
)
-type dbInfoLayer struct {
- name string
- a *app
-}
-
-var (
- _ util.Layer = (*dbInfoLayer)(nil)
-)
-
-////////////////////////////////////////
-// dbInfoLayer util.Layer methods
-
-func (d *dbInfoLayer) Name() string {
- return d.name
-}
-
-func (d *dbInfoLayer) StKey() string {
- return util.JoinKeyParts(util.DbInfoPrefix, d.stKeyPart())
-}
-
-////////////////////////////////////////
-// Internal helpers
-
-func (d *dbInfoLayer) stKeyPart() string {
- return util.JoinKeyParts(d.a.stKeyPart(), d.name)
+// dbInfoStKey returns the storage engine key for the dbInfo record of the
+// database named dbName within app a, e.g. "$dbInfo:app1:db1" for app "app1"
+// and database "db1".
+func dbInfoStKey(a *app, dbName string) string {
+ return util.JoinKeyParts(util.DbInfoPrefix, a.stKeyPart(), dbName)
}
// getDbInfo reads data from the storage engine.
-// Returns a VDL-compatible error.
func (a *app) getDbInfo(ctx *context.T, st store.StoreReader, dbName string) (*dbInfo, error) {
info := &dbInfo{}
- if err := util.GetWithoutAuth(ctx, st, &dbInfoLayer{dbName, a}, info); err != nil {
+ if err := util.Get(ctx, st, dbInfoStKey(a, dbName), info); err != nil {
return nil, err
}
return info, nil
}
// putDbInfo writes data to the storage engine.
-// Returns a VDL-compatible error.
func (a *app) putDbInfo(ctx *context.T, st store.StoreWriter, dbName string, info *dbInfo) error {
- return util.Put(ctx, st, &dbInfoLayer{dbName, a}, info)
+ return util.Put(ctx, st, dbInfoStKey(a, dbName), info)
}
// delDbInfo deletes data from the storage engine.
-// Returns a VDL-compatible error.
func (a *app) delDbInfo(ctx *context.T, st store.StoreWriter, dbName string) error {
- return util.Delete(ctx, st, &dbInfoLayer{dbName, a})
+ return util.Delete(ctx, st, dbInfoStKey(a, dbName))
}
-// updateDbInfo performs a read-modify-write.
-// fn should "modify" v, and should return a VDL-compatible error.
-// Returns a VDL-compatible error.
+// updateDbInfo performs a read-modify-write. fn should "modify" v.
func (a *app) updateDbInfo(ctx *context.T, st store.StoreReadWriter, dbName string, fn func(info *dbInfo) error) error {
_ = st.(store.Transaction) // panics on failure, as desired
info, err := a.getDbInfo(ctx, st, dbName)
diff --git a/x/ref/services/syncbase/server/db_info_test.go b/x/ref/services/syncbase/server/db_info_test.go
index 47db158..7bc0870 100644
--- a/x/ref/services/syncbase/server/db_info_test.go
+++ b/x/ref/services/syncbase/server/db_info_test.go
@@ -8,23 +8,18 @@
"testing"
)
-type stk struct {
- appName string
- dbName string
- stkey string
-}
-
-var stTestKey stk = stk{"app1", "db1", "$dbInfo:app1:db1"}
-
-var dbinfo *dbInfoLayer = &dbInfoLayer{
- name: stTestKey.dbName,
- a: &app{
- name: stTestKey.appName,
- },
-}
-
func TestStKey(t *testing.T) {
- if stTestKey.stkey != dbinfo.StKey() {
- t.Errorf("dbInfoLayer stkey expected to be %q but found to be %q", stTestKey.stkey, dbinfo.StKey())
+ tests := []struct {
+ appName string
+ dbName string
+ stKey string
+ }{
+ {"app1", "db1", "$dbInfo:app1:db1"},
+ }
+ for _, test := range tests {
+ got, want := dbInfoStKey(&app{name: test.appName}, test.dbName), test.stKey
+ if got != want {
+ t.Errorf("wrong stKey: got %q, want %q", got, want)
+ }
}
}
diff --git a/x/ref/services/syncbase/server/interfaces/app.go b/x/ref/services/syncbase/server/interfaces/app.go
index 8ac9990..b1d5d4f 100644
--- a/x/ref/services/syncbase/server/interfaces/app.go
+++ b/x/ref/services/syncbase/server/interfaces/app.go
@@ -5,15 +5,12 @@
package interfaces
import (
- "v.io/syncbase/x/ref/services/syncbase/server/util"
-
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security/access"
)
// App is an internal interface to the app layer.
-// All methods return VDL-compatible errors.
type App interface {
// Service returns the service handle for this app.
Service() Service
@@ -33,5 +30,6 @@
// SetDatabasePerms sets the perms for the specified database.
SetDatabasePerms(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions, version string) error
- util.Layer
+ // Name returns the name of this app.
+ Name() string
}
diff --git a/x/ref/services/syncbase/server/interfaces/database.go b/x/ref/services/syncbase/server/interfaces/database.go
index 832a940..bf518a4 100644
--- a/x/ref/services/syncbase/server/interfaces/database.go
+++ b/x/ref/services/syncbase/server/interfaces/database.go
@@ -5,7 +5,6 @@
package interfaces
import (
- "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
@@ -13,7 +12,6 @@
)
// Database is an internal interface to the database layer.
-// All methods return VDL-compatible errors.
type Database interface {
// St returns the storage engine instance for this database.
St() store.Store
@@ -30,5 +28,6 @@
// Designed for use from within App.SetDatabasePerms.
SetPermsInternal(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error
- util.Layer
+ // Name returns the name of this database.
+ Name() string
}
diff --git a/x/ref/services/syncbase/server/interfaces/service.go b/x/ref/services/syncbase/server/interfaces/service.go
index 4505dce..ce665e2 100644
--- a/x/ref/services/syncbase/server/interfaces/service.go
+++ b/x/ref/services/syncbase/server/interfaces/service.go
@@ -11,7 +11,6 @@
)
// Service is an internal interface to the service layer.
-// All methods return VDL-compatible errors.
type Service interface {
// St returns the storage engine instance for this service.
St() store.Store
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index f12d437..62ad371 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -62,7 +62,6 @@
var (
_ wire.DatabaseServerMethods = (*databaseReq)(nil)
_ interfaces.Database = (*database)(nil)
- _ util.Layer = (*database)(nil)
)
// DatabaseOptions configures a database.
@@ -99,7 +98,6 @@
}
// NewDatabase creates a new database instance and returns it.
-// Returns a VDL-compatible error.
// Designed for use from within App.CreateNoSQLDatabase.
func NewDatabase(ctx *context.T, a interfaces.App, name string, opts DatabaseOptions) (*database, error) {
if opts.Perms == nil {
@@ -113,7 +111,7 @@
Name: d.name,
Perms: opts.Perms,
}
- if err := util.Put(ctx, d.st, d, data); err != nil {
+ if err := util.Put(ctx, d.st, d.stKey(), data); err != nil {
return nil, err
}
return d, nil
@@ -275,7 +273,7 @@
return nil, "", wire.NewErrBoundToBatch(ctx)
}
data := &databaseData{}
- if err := util.Get(ctx, call, d.st, d, data); err != nil {
+ if err := util.GetWithAuth(ctx, call, d.st, d.stKey(), data); err != nil {
return nil, "", err
}
return data.Perms, util.FormatVersion(data.Version), nil
@@ -293,7 +291,7 @@
closeSnapshot := func() error {
return sn.Close()
}
- if err := util.Get(ctx, call, sn, d, &databaseData{}); err != nil {
+ if err := util.GetWithAuth(ctx, call, sn, d.stKey(), &databaseData{}); err != nil {
closeSnapshot()
return nil, err
}
@@ -348,7 +346,7 @@
if !d.exists {
vlog.Fatalf("database %q does not exist", d.name)
}
- return util.Get(ctx, call, st, d, &databaseData{})
+ return util.GetWithAuth(ctx, call, st, d.stKey(), &databaseData{})
}
func (d *database) SetPermsInternal(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
@@ -357,7 +355,7 @@
}
return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
data := &databaseData{}
- return util.Update(ctx, call, st, d, data, func() error {
+ return util.UpdateWithAuth(ctx, call, st, d.stKey(), data, func() error {
if err := util.CheckVersion(ctx, version, data.Version); err != nil {
return err
}
@@ -368,19 +366,12 @@
})
}
-////////////////////////////////////////
-// util.Layer methods
-
func (d *database) Name() string {
return d.name
}
-func (d *database) StKey() string {
- return util.DatabasePrefix
-}
-
////////////////////////////////////////
-// query_db implementations
+// query_db implementation
// Implement query_db's Database, Table and KeyValueStream interfaces.
type queryDb struct {
@@ -403,7 +394,7 @@
},
}
// Now that we have a table, we need to check permissions.
- if err := util.Get(db.ctx, db.call, db.st, tDb.req, &tableData{}); err != nil {
+ if err := util.GetWithAuth(db.ctx, db.call, db.st, tDb.req.stKey(), &tableData{}); err != nil {
return nil, err
}
return tDb, nil
@@ -505,6 +496,10 @@
////////////////////////////////////////
// Internal helpers
+// stKey returns the storage engine key under which this database's
+// databaseData record is stored. It is the bare util.DatabasePrefix, with no
+// name component appended.
+func (d *database) stKey() string {
+ return util.DatabasePrefix
+}
+
func (d *databaseReq) batchReader() store.StoreReader {
if d.batchId == nil {
return nil
diff --git a/x/ref/services/syncbase/server/nosql/row.go b/x/ref/services/syncbase/server/nosql/row.go
index 542224a..88c08c7 100644
--- a/x/ref/services/syncbase/server/nosql/row.go
+++ b/x/ref/services/syncbase/server/nosql/row.go
@@ -21,7 +21,6 @@
var (
_ wire.RowServerMethods = (*rowReq)(nil)
- _ util.Layer = (*rowReq)(nil)
)
////////////////////////////////////////
@@ -73,41 +72,32 @@
}
////////////////////////////////////////
-// util.Layer methods
+// Internal helpers
-func (r *rowReq) Name() string {
- return r.key
-}
-
-func (r *rowReq) StKey() string {
+func (r *rowReq) stKey() string {
return util.JoinKeyParts(util.RowPrefix, r.stKeyPart())
}
-////////////////////////////////////////
-// Internal helpers
-
func (r *rowReq) stKeyPart() string {
return util.JoinKeyParts(r.t.stKeyPart(), r.key)
}
// checkAccess checks that this row's table exists in the database, and performs
// an authorization check.
-// Returns a VDL-compatible error.
func (r *rowReq) checkAccess(ctx *context.T, call rpc.ServerCall, st store.StoreReader) error {
return r.t.checkAccess(ctx, call, st, r.key)
}
// get reads data from the storage engine.
// Performs authorization check.
-// Returns a VDL-compatible error.
func (r *rowReq) get(ctx *context.T, call rpc.ServerCall, st store.StoreReader) ([]byte, error) {
if err := r.checkAccess(ctx, call, st); err != nil {
return nil, err
}
- value, err := st.Get([]byte(r.StKey()), nil)
+ value, err := st.Get([]byte(r.stKey()), nil)
if err != nil {
if verror.ErrorID(err) == store.ErrUnknownKey.ID {
- return nil, verror.New(verror.ErrNoExist, ctx, r.Name())
+ return nil, verror.New(verror.ErrNoExist, ctx, r.stKey())
}
return nil, verror.New(verror.ErrInternal, ctx, err)
}
@@ -116,12 +106,11 @@
// put writes data to the storage engine.
// Performs authorization check.
-// Returns a VDL-compatible error.
func (r *rowReq) put(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, value []byte) error {
if err := r.checkAccess(ctx, call, st); err != nil {
return err
}
- if err := st.Put([]byte(r.StKey()), value); err != nil {
+ if err := st.Put([]byte(r.stKey()), value); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
@@ -129,12 +118,11 @@
// delete deletes data from the storage engine.
// Performs authorization check.
-// Returns a VDL-compatible error.
func (r *rowReq) delete(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
if err := r.checkAccess(ctx, call, st); err != nil {
return err
}
- if err := st.Delete([]byte(r.StKey())); err != nil {
+ if err := st.Delete([]byte(r.stKey())); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
diff --git a/x/ref/services/syncbase/server/nosql/table.go b/x/ref/services/syncbase/server/nosql/table.go
index 82a0cab..37e1fbe 100644
--- a/x/ref/services/syncbase/server/nosql/table.go
+++ b/x/ref/services/syncbase/server/nosql/table.go
@@ -25,7 +25,6 @@
var (
_ wire.TableServerMethods = (*tableReq)(nil)
- _ util.Layer = (*tableReq)(nil)
)
////////////////////////////////////////
@@ -38,11 +37,11 @@
return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
// Check databaseData perms.
dData := &databaseData{}
- if err := util.Get(ctx, call, st, t.d, dData); err != nil {
+ if err := util.GetWithAuth(ctx, call, st, t.d.stKey(), dData); err != nil {
return err
}
// Check for "table already exists".
- if err := util.GetWithoutAuth(ctx, st, t, &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err := util.Get(ctx, st, t.stKey(), &tableData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -57,7 +56,7 @@
Name: t.name,
Perms: perms,
}
- return util.Put(ctx, st, t, data)
+ return util.Put(ctx, st, t.stKey(), data)
})
}
@@ -67,14 +66,14 @@
}
return store.RunInTransaction(t.d.st, func(st store.StoreReadWriter) error {
// Read-check-delete tableData.
- if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ if err := util.GetWithAuth(ctx, call, st, t.stKey(), &tableData{}); err != nil {
if verror.ErrorID(err) == verror.ErrNoExist.ID {
return nil // delete is idempotent
}
return err
}
// TODO(sadovsky): Delete all rows in this table.
- return util.Delete(ctx, st, t)
+ return util.Delete(ctx, st, t.stKey())
})
}
@@ -199,7 +198,7 @@
}
if prefix == "" {
data := &tableData{}
- return util.Update(ctx, call, st, t, data, func() error {
+ return util.UpdateWithAuth(ctx, call, st, t.stKey(), data, func() error {
data.Perms = perms
return nil
})
@@ -224,10 +223,10 @@
stPrefixLimit := stPrefix + util.PrefixRangeLimitSuffix
prefixPerms = stPrefixPerms{Parent: parent, Perms: perms}
// Put the (prefix, perms) pair to the database.
- if err := util.PutObject(st, stPrefix, prefixPerms); err != nil {
+ if err := util.Put(ctx, st, stPrefix, prefixPerms); err != nil {
return err
}
- return util.PutObject(st, stPrefixLimit, prefixPerms)
+ return util.Put(ctx, st, stPrefixLimit, prefixPerms)
}
if t.d.batchId != nil {
if st, err := t.d.batchReadWriter(); err != nil {
@@ -315,19 +314,12 @@
}
////////////////////////////////////////
-// util.Layer methods
+// Internal helpers
-func (t *tableReq) Name() string {
- return t.name
-}
-
-func (t *tableReq) StKey() string {
+func (t *tableReq) stKey() string {
return util.JoinKeyParts(util.TablePrefix, t.stKeyPart())
}
-////////////////////////////////////////
-// Internal helpers
-
func (t *tableReq) stKeyPart() string {
return t.name
}
@@ -348,7 +340,7 @@
return verror.New(verror.ErrInternal, ctx, err)
}
prefixPerms.Parent = newParent
- if err := util.PutObject(st, string(key), prefixPerms); err != nil {
+ if err := util.Put(ctx, st, string(key), prefixPerms); err != nil {
it.Cancel()
return err
}
@@ -359,31 +351,29 @@
return nil
}
-// lock invalidates all concurrent transactions with ErrConcurrentTransaction
-// that have accessed this table.
-// Returns a VDL-compatible error.
+// lock invalidates all in-flight transactions that have touched this table,
+// such that any subsequent tx.Commit() will return ErrConcurrentTransaction.
//
-// It is necessary to call lock() every time prefix permissions are updated,
-// so snapshots inside all transactions reflect up-to-date permissions. Since
+// It is necessary to call lock() every time prefix permissions are updated so
+// that snapshots inside all transactions reflect up-to-date permissions. Since
// every public function that touches this table has to read the table-level
-// permissions object, it is enough to add the key of table-level permissions
-// to the write set of the current transaction.
+// permissions object, it suffices to add the key of this object to the write
+// set of the current transaction.
//
// TODO(rogulenko): Revisit this behavior to provide more granularity.
-// A possible option would be to add prefix and its parent to the write set
-// of the current transaction when permissions object for a prefix is updated.
+// One option is to add a prefix and its parent to the write set of the current
+// transaction when the permissions object for that prefix is updated.
func (t *tableReq) lock(ctx *context.T, st store.StoreReadWriter) error {
var data tableData
- if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+ if err := util.Get(ctx, st, t.stKey(), &data); err != nil {
return err
}
- return util.Put(ctx, st, t, data)
+ return util.Put(ctx, st, t.stKey(), data)
}
// checkAccess checks that this table exists in the database, and performs
// an authorization check. The access is checked at table level and at the
// level of the most specific prefix for the given key.
-// Returns a VDL-compatible error.
// TODO(rogulenko): Revisit this behavior. Eventually we'll want the table-level
// access check to be a check for "Resolve", i.e. also check access to
// service, app and database.
@@ -393,7 +383,7 @@
return err
}
if prefix != "" {
- if err := util.Get(ctx, call, st, t, &tableData{}); err != nil {
+ if err := util.GetWithAuth(ctx, call, st, t.stKey(), &tableData{}); err != nil {
return err
}
}
@@ -405,15 +395,14 @@
}
// permsForKey returns the longest prefix of the given key that has
-// associated permissions with its permissions object.
+// associated permissions, along with its permissions object.
// permsForKey doesn't perform an authorization check.
-// Returns a VDL-compatible error.
//
-// Virtually we represent all prefixes as a forest T, where each vertex maps to
-// a prefix. A parent for a string is the maximum proper prefix of it that
+// Effectively, we represent all prefixes as a forest T, where each vertex maps
+// to a prefix. A parent for a string is the maximum proper prefix of it that
// belongs to T. Each prefix P from T is represented as a pair of entries with
-// keys P and P~ with values of type stPrefixPerms (parent + perms).
-// High level of how this function works:
+// keys P and P~ with values of type stPrefixPerms (parent + perms). High level
+// explanation of how this function works:
// 1 iter = db.Scan(K, "")
// Here last character of iter.Key() is removed automatically if it is '~'
// 2 if hasPrefix(K, iter.Key()) return iter.Value()
@@ -452,17 +441,16 @@
// permsForPrefix returns the permissions object associated with the
// provided prefix.
-// Returns a VDL-compatible error.
func (t *tableReq) permsForPrefix(ctx *context.T, st store.StoreReader, prefix string) (stPrefixPerms, error) {
if prefix == "" {
var data tableData
- if err := util.GetWithoutAuth(ctx, st, t, &data); err != nil {
+ if err := util.Get(ctx, st, t.stKey(), &data); err != nil {
return stPrefixPerms{}, err
}
return stPrefixPerms{Perms: data.Perms}, nil
}
var prefixPerms stPrefixPerms
- if err := util.GetObject(st, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
+ if err := util.Get(ctx, st, t.prefixPermsKey(prefix), &prefixPerms); err != nil {
return stPrefixPerms{}, verror.New(verror.ErrInternal, ctx, err)
}
return prefixPerms, nil
diff --git a/x/ref/services/syncbase/server/service.go b/x/ref/services/syncbase/server/service.go
index 9f9e5b3..8ed1480 100644
--- a/x/ref/services/syncbase/server/service.go
+++ b/x/ref/services/syncbase/server/service.go
@@ -39,7 +39,6 @@
var (
_ wire.ServiceServerMethods = (*service)(nil)
_ interfaces.Service = (*service)(nil)
- _ util.Layer = (*service)(nil)
)
// ServiceOptions configures a service.
@@ -56,7 +55,6 @@
}
// NewService creates a new service instance and returns it.
-// Returns a VDL-compatible error.
// TODO(sadovsky): If possible, close all stores when the server is stopped.
func NewService(ctx *context.T, call rpc.ServerCall, opts ServiceOptions) (*service, error) {
if opts.Perms == nil {
@@ -74,7 +72,7 @@
data := &serviceData{
Perms: opts.Perms,
}
- if err := util.GetWithoutAuth(ctx, st, s, &serviceData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err := util.Get(ctx, st, s.stKey(), &serviceData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return nil, err
}
@@ -125,7 +123,7 @@
}
} else {
// Service does not exist.
- if err := util.Put(ctx, st, s, data); err != nil {
+ if err := util.Put(ctx, st, s.stKey(), data); err != nil {
return nil, err
}
}
@@ -143,7 +141,7 @@
func (s *service) SetPermissions(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
return store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
data := &serviceData{}
- return util.Update(ctx, call, st, s, data, func() error {
+ return util.UpdateWithAuth(ctx, call, st, s.stKey(), data, func() error {
if err := util.CheckVersion(ctx, version, data.Version); err != nil {
return err
}
@@ -156,7 +154,7 @@
func (s *service) GetPermissions(ctx *context.T, call rpc.ServerCall) (perms access.Permissions, version string, err error) {
data := &serviceData{}
- if err := util.Get(ctx, call, s.st, s, data); err != nil {
+ if err := util.GetWithAuth(ctx, call, s.st, s.stKey(), data); err != nil {
return nil, "", err
}
return data.Perms, util.FormatVersion(data.Version), nil
@@ -168,7 +166,7 @@
closeSnapshot := func() error {
return sn.Close()
}
- if err := util.Get(ctx, call, sn, s, &serviceData{}); err != nil {
+ if err := util.GetWithAuth(ctx, call, sn, s.stKey(), &serviceData{}); err != nil {
closeSnapshot()
return nil, err
}
@@ -230,11 +228,11 @@
if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
// Check serviceData perms.
sData := &serviceData{}
- if err := util.Get(ctx, call, st, s, sData); err != nil {
+ if err := util.GetWithAuth(ctx, call, st, s.stKey(), sData); err != nil {
return err
}
// Check for "app already exists".
- if err := util.GetWithoutAuth(ctx, st, a, &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
+ if err := util.Get(ctx, st, a.stKey(), &appData{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
if err != nil {
return err
}
@@ -248,7 +246,7 @@
Name: appName,
Perms: perms,
}
- return util.Put(ctx, st, a, data)
+ return util.Put(ctx, st, a.stKey(), data)
}); err != nil {
return err
}
@@ -267,14 +265,14 @@
if err := store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
// Read-check-delete appData.
- if err := util.Get(ctx, call, st, a, &appData{}); err != nil {
+ if err := util.GetWithAuth(ctx, call, st, a.stKey(), &appData{}); err != nil {
if verror.ErrorID(err) == verror.ErrNoExist.ID {
return nil // delete is idempotent
}
return err
}
// TODO(sadovsky): Delete all databases in this app.
- return util.Delete(ctx, st, a)
+ return util.Delete(ctx, st, a.stKey())
}); err != nil {
return err
}
@@ -292,7 +290,7 @@
}
return store.RunInTransaction(s.st, func(st store.StoreReadWriter) error {
data := &appData{}
- return util.Update(ctx, call, st, a, data, func() error {
+ return util.UpdateWithAuth(ctx, call, st, a.stKey(), data, func() error {
if err := util.CheckVersion(ctx, version, data.Version); err != nil {
return err
}
@@ -304,12 +302,8 @@
}
////////////////////////////////////////
-// util.Layer methods
+// Other internal helpers
-func (s *service) Name() string {
- return "service"
-}
-
-func (s *service) StKey() string {
+func (s *service) stKey() string {
return util.ServicePrefix
}
diff --git a/x/ref/services/syncbase/server/util/store_util.go b/x/ref/services/syncbase/server/util/store_util.go
index a30f487..d62c222 100644
--- a/x/ref/services/syncbase/server/util/store_util.go
+++ b/x/ref/services/syncbase/server/util/store_util.go
@@ -29,100 +29,73 @@
return nil
}
-////////////////////////////////////////////////////////////
-// RPC-aware, higher-level get/put
-
-type Layer interface {
- // Name returns the name of this instance, e.g. "fooapp" or "bardb".
- Name() string
- // StKey returns the storage engine key to use for metadata about this layer,
- // e.g. "$table:baztable".
- StKey() string
-}
+// TODO(sadovsky): Perhaps these functions should strip key prefixes such as
+// "$table:" from the error messages they return.
type Permser interface {
// GetPerms returns the Permissions for this Layer.
GetPerms() access.Permissions
}
-// GetWithoutAuth does st.Get(l.StKey(), v), populating v.
-// Returns a VDL-compatible error.
-func GetWithoutAuth(ctx *context.T, st store.StoreReader, l Layer, v interface{}) error {
- if err := GetObject(st, l.StKey(), v); err != nil {
+// Get does st.Get(k, v) and wraps the returned error.
+func Get(ctx *context.T, st store.StoreReader, k string, v interface{}) error {
+ bytes, err := st.Get([]byte(k), nil)
+ if err != nil {
if verror.ErrorID(err) == store.ErrUnknownKey.ID {
- return verror.New(verror.ErrNoExist, ctx, l.Name())
+ return verror.New(verror.ErrNoExist, ctx, k)
}
return verror.New(verror.ErrInternal, ctx, err)
}
+ if err = vom.Decode(bytes, v); err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
return nil
}
-// Get does GetWithoutAuth followed by an auth check.
-// Returns a VDL-compatible error.
-func Get(ctx *context.T, call rpc.ServerCall, st store.StoreReader, l Layer, v Permser) error {
- if err := GetWithoutAuth(ctx, st, l, v); err != nil {
+// GetWithAuth does Get followed by an auth check.
+func GetWithAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReader, k string, v Permser) error {
+ if err := Get(ctx, st, k, v); err != nil {
return err
}
auth, _ := access.PermissionsAuthorizer(v.GetPerms(), access.TypicalTagType())
if err := auth.Authorize(ctx, call.Security()); err != nil {
- return verror.New(verror.ErrNoAccess, ctx, l.Name())
+ return verror.New(verror.ErrNoAccess, ctx, err)
}
return nil
}
-// Put does st.Put(l.StKey(), v).
-// Returns a VDL-compatible error.
-// If you need to perform an authorization check, use Update().
-func Put(ctx *context.T, st store.StoreWriter, l Layer, v interface{}) error {
- if err := PutObject(st, l.StKey(), v); err != nil {
+// Put does st.Put(k, v) and wraps the returned error.
+func Put(ctx *context.T, st store.StoreWriter, k string, v interface{}) error {
+ bytes, err := vom.Encode(v)
+ if err != nil {
+ return verror.New(verror.ErrInternal, ctx, err)
+ }
+ if err = st.Put([]byte(k), bytes); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
}
-// Delete does st.Delete(l.StKey()).
-// Returns a VDL-compatible error.
-// If you need to perform an authorization check, call Get() first.
-func Delete(ctx *context.T, st store.StoreWriter, l Layer) error {
- if err := st.Delete([]byte(l.StKey())); err != nil {
+// Delete does st.Delete(k) and wraps the returned error.
+func Delete(ctx *context.T, st store.StoreWriter, k string) error {
+ if err := st.Delete([]byte(k)); err != nil {
return verror.New(verror.ErrInternal, ctx, err)
}
return nil
}
-// Update performs a read-modify-write.
-// Input v is populated by the "read" step. fn should "modify" v, and should
-// return a VDL-compatible error.
+// UpdateWithAuth performs a read-modify-write.
+// Input v is populated by the "read" step. fn should "modify" v.
// Performs an auth check as part of the "read" step.
-// Returns a VDL-compatible error.
-func Update(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, l Layer, v Permser, fn func() error) error {
+func UpdateWithAuth(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter, k string, v Permser, fn func() error) error {
_ = st.(store.Transaction) // panics on failure, as desired
- if err := Get(ctx, call, st, l, v); err != nil {
+ if err := GetWithAuth(ctx, call, st, k, v); err != nil {
return err
}
if err := fn(); err != nil {
return err
}
- return Put(ctx, st, l, v)
-}
-
-////////////////////////////////////////////////////////////
-// RPC-oblivious, lower-level helpers
-
-func GetObject(st store.StoreReader, k string, v interface{}) error {
- bytes, err := st.Get([]byte(k), nil)
- if err != nil {
- return err
- }
- return vom.Decode(bytes, v)
-}
-
-func PutObject(st store.StoreWriter, k string, v interface{}) error {
- bytes, err := vom.Encode(v)
- if err != nil {
- return err
- }
- return st.Put([]byte(k), bytes)
+ return Put(ctx, st, k, v)
}
type OpenOptions struct {
diff --git a/x/ref/services/syncbase/server/watchable/transaction.go b/x/ref/services/syncbase/server/watchable/transaction.go
index d5206b1..9890b6a 100644
--- a/x/ref/services/syncbase/server/watchable/transaction.go
+++ b/x/ref/services/syncbase/server/watchable/transaction.go
@@ -81,7 +81,6 @@
if !tx.st.managesKey(key) {
return tx.itx.Put(key, value)
}
-
version, err := putVersioned(tx.itx, key, value)
if err != nil {
return err
@@ -99,12 +98,14 @@
}
var err error
if !tx.st.managesKey(key) {
- err = tx.itx.Delete(key)
- } else {
- err = deleteVersioned(tx.itx, key)
- tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
+ return tx.itx.Delete(key)
}
- return err
+ err = deleteVersioned(tx.itx, key)
+ if err != nil {
+ return err
+ }
+ tx.ops = append(tx.ops, &OpDelete{DeleteOp{Key: key}})
+ return nil
}
// Commit implements the store.Transaction interface.
@@ -132,7 +133,7 @@
FromSync: tx.fromSync,
Continued: i < len(tx.ops)-1,
}
- if err := util.PutObject(tx.itx, key, value); err != nil {
+ if err := util.Put(nil, tx.itx, key, value); err != nil {
return err
}
seq++
diff --git a/x/ref/services/syncbase/vsync/dag.go b/x/ref/services/syncbase/vsync/dag.go
index 72e8d91..c0d8efa 100644
--- a/x/ref/services/syncbase/vsync/dag.go
+++ b/x/ref/services/syncbase/vsync/dag.go
@@ -713,10 +713,7 @@
return verror.New(verror.ErrInternal, ctx, "invalid version", version)
}
- if err := util.PutObject(tx, nodeKey(oid, version), node); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Put(ctx, tx, nodeKey(oid, version), node)
}
// getNode retrieves the DAG node entry for the given (oid, version).
@@ -727,8 +724,8 @@
var node dagNode
key := nodeKey(oid, version)
- if err := util.GetObject(st, key, &node); err != nil {
- return nil, translateError(ctx, err, key)
+ if err := util.Get(ctx, st, key, &node); err != nil {
+ return nil, err
}
return &node, nil
}
@@ -741,10 +738,7 @@
return verror.New(verror.ErrInternal, ctx, "invalid version", version)
}
- if err := tx.Delete([]byte(nodeKey(oid, version))); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Delete(ctx, tx, nodeKey(oid, version))
}
// hasNode returns true if the node (oid, version) exists in the DAG.
@@ -769,18 +763,15 @@
return verror.New(verror.ErrInternal, ctx, fmt.Errorf("invalid version: %s", version))
}
- if err := util.PutObject(tx, headKey(oid), version); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Put(ctx, tx, headKey(oid), version)
}
// getHead retrieves the DAG object head.
func getHead(ctx *context.T, st store.StoreReader, oid string) (string, error) {
var version string
key := headKey(oid)
- if err := util.GetObject(st, key, &version); err != nil {
- return NoVersion, translateError(ctx, err, key)
+ if err := util.Get(ctx, st, key, &version); err != nil {
+ return NoVersion, err
}
return version, nil
}
@@ -788,11 +779,7 @@
// delHead deletes the DAG object head.
func delHead(ctx *context.T, tx store.StoreReadWriter, oid string) error {
_ = tx.(store.Transaction)
-
- if err := tx.Delete([]byte(headKey(oid))); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Delete(ctx, tx, headKey(oid))
}
// batchKey returns the key used to access the DAG batch info.
@@ -808,10 +795,7 @@
return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
}
- if err := util.PutObject(tx, batchKey(btid), info); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Put(ctx, tx, batchKey(btid), info)
}
// getBatch retrieves the DAG batch entry.
@@ -822,8 +806,8 @@
var info batchInfo
key := batchKey(btid)
- if err := util.GetObject(st, key, &info); err != nil {
- return nil, translateError(ctx, err, key)
+ if err := util.Get(ctx, st, key, &info); err != nil {
+ return nil, err
}
return &info, nil
}
@@ -836,10 +820,7 @@
return verror.New(verror.ErrInternal, ctx, "invalid batch id", btid)
}
- if err := tx.Delete([]byte(batchKey(btid))); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Delete(ctx, tx, batchKey(btid))
}
// getParentMap is a testing and debug helper function that returns for an
diff --git a/x/ref/services/syncbase/vsync/initiator.go b/x/ref/services/syncbase/vsync/initiator.go
index 1624301..1d447c5 100644
--- a/x/ref/services/syncbase/vsync/initiator.go
+++ b/x/ref/services/syncbase/vsync/initiator.go
@@ -10,6 +10,7 @@
"time"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
@@ -124,7 +125,13 @@
}
iSt.stream = stream
- req := interfaces.DeltaReq{SgIds: iSt.sgIds, InitVec: iSt.local}
+ req := interfaces.DeltaReq{
+ AppName: iSt.appName,
+ DbName: iSt.dbName,
+ SgIds: iSt.sgIds,
+ InitVec: iSt.local,
+ }
+
sender := iSt.stream.SendStream()
sender.Send(req)
@@ -264,7 +271,8 @@
return nil, false
}
for mt := range iSt.mtTables {
- c := interfaces.SyncClient(naming.Join(mt, iSt.peer))
+ absName := naming.Join(mt, iSt.peer, util.SyncbaseSuffix)
+ c := interfaces.SyncClient(absName)
stream, err := c.GetDeltas(ctx)
if err == nil {
return stream, true
@@ -382,7 +390,6 @@
return verror.New(verror.ErrInternal, ctx, "received finish followed by finish in delta response stream")
}
finish = true
- break
case interfaces.DeltaRespRespVec:
iSt.remote = v.Value
@@ -411,6 +418,11 @@
// Mark object dirty.
iSt.updObjects[rec.Metadata.ObjId] = &objConflictState{}
}
+
+ // Exit the loop once the finish marker has been received.
+ if finish {
+ break
+ }
}
if !(start && finish) {
@@ -514,6 +526,7 @@
for {
iSt.tx = iSt.st.NewTransaction()
+ watchable.SetTransactionFromSync(iSt.tx) // for echo-suppression
if err := iSt.detectConflicts(ctx); err != nil {
return err
diff --git a/x/ref/services/syncbase/vsync/initiator_test.go b/x/ref/services/syncbase/vsync/initiator_test.go
index bfcb250..d3c55d4 100644
--- a/x/ref/services/syncbase/vsync/initiator_test.go
+++ b/x/ref/services/syncbase/vsync/initiator_test.go
@@ -256,7 +256,7 @@
testIfMapArrEqual(t, iSt.sgPfxs, sg1.Spec.Prefixes)
testIfMapArrEqual(t, iSt.mtTables, sg1.Spec.MountTables)
- s.syncState[gdb] = &dbSyncStateInMem{}
+ s.initDbSyncStateInMem(nil, "mockapp", "mockdb")
// Create local genvec so that it contains knowledge only about common prefixes.
if err := iSt.createLocalGenVec(nil); err != nil {
diff --git a/x/ref/services/syncbase/vsync/responder.go b/x/ref/services/syncbase/vsync/responder.go
index 19b1366..d096aa8 100644
--- a/x/ref/services/syncbase/vsync/responder.go
+++ b/x/ref/services/syncbase/vsync/responder.go
@@ -10,15 +10,17 @@
"strings"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/verror"
+ "v.io/x/lib/vlog"
)
// GetDeltas implements the responder side of the GetDeltas RPC.
func (s *syncService) GetDeltas(ctx *context.T, call interfaces.SyncGetDeltasServerCall) error {
recvr := call.RecvStream()
-
for recvr.Advance() {
req := recvr.Value()
// Ignoring errors since if one Database fails for any reason,
@@ -281,10 +283,11 @@
if !filterLogRec(rec, rSt.req.InitVec, initPfxs) {
// Send on the wire.
- wireRec := interfaces.LogRec{Metadata: rec.Metadata}
- // TODO(hpucha): Hash out this fake stream stuff when
- // defining the RPC and the rest of the responder.
- sender.Send(interfaces.DeltaRespRec{wireRec})
+ wireRec, err := makeWireLogRec(ctx, rSt.st, rec)
+ if err != nil {
+ return err
+ }
+ sender.Send(interfaces.DeltaRespRec{*wireRec})
}
// Add a new record from the same device if not done.
@@ -387,11 +390,19 @@
// Note: initPfxs is sorted.
func filterLogRec(rec *localLogRec, initVec interfaces.GenVector, initPfxs []string) bool {
- filter := true
+ // The key starts with one of the store's reserved prefixes for managed
+ // namespaces (e.g. $row, $perms). Remove that prefix before comparing
+ // it with the SyncGroup prefixes which are defined by the application.
+ parts := util.SplitKeyParts(rec.Metadata.ObjId)
+ if len(parts) < 2 {
+ vlog.Fatalf("filterLogRec: invalid entry key %s", rec.Metadata.ObjId)
+ }
+ key := util.JoinKeyParts(parts[1:]...)
+ filter := true
var maxGen uint64
for _, p := range initPfxs {
- if strings.HasPrefix(rec.Metadata.ObjId, p) {
+ if strings.HasPrefix(key, p) {
// Do not filter. Initiator is interested in this
// prefix.
filter = false
@@ -406,12 +417,31 @@
// Filter this record if the initiator already has it.
if maxGen >= rec.Metadata.Gen {
- return true
+ filter = true
}
return filter
}
+// makeWireLogRec creates a sync log record to send on the wire from a given
+// local sync record.
+func makeWireLogRec(ctx *context.T, st store.Store, rec *localLogRec) (*interfaces.LogRec, error) {
+ // Get the object value at the required version. Note: GetAtVersion()
+ // requires a transaction to read the data, so create and abort one.
+ // TODO(hpucha): remove the fake Tx after the change in GetAtVersion().
+ tx := st.NewTransaction()
+ defer tx.Abort()
+
+ key, version := rec.Metadata.ObjId, rec.Metadata.CurVers
+ value, err := watchable.GetAtVersion(ctx, tx, []byte(key), nil, []byte(version))
+ if err != nil {
+ return nil, err
+ }
+
+ wireRec := &interfaces.LogRec{Metadata: rec.Metadata, Value: value}
+ return wireRec, nil
+}
+
// A minHeap implements heap.Interface and holds local log records.
type minHeap []*localLogRec
diff --git a/x/ref/services/syncbase/vsync/responder_test.go b/x/ref/services/syncbase/vsync/responder_test.go
index 82c0d94..b452c07 100644
--- a/x/ref/services/syncbase/vsync/responder_test.go
+++ b/x/ref/services/syncbase/vsync/responder_test.go
@@ -12,6 +12,7 @@
"time"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/v23/naming"
"v.io/v23/rpc"
"v.io/v23/security"
@@ -378,7 +379,7 @@
if opfx == "" {
continue
}
- okey := fmt.Sprintf("%s~%x", opfx, tRng.Int())
+ okey := makeRowKey(fmt.Sprintf("%s~%x", opfx, tRng.Int()))
vers := fmt.Sprintf("%x", tRng.Int())
rec := &localLogRec{
Metadata: interfaces.LogRecMetadata{Id: id, Gen: k, ObjId: okey, CurVers: vers, UpdTime: time.Now().UTC()},
@@ -387,6 +388,10 @@
if err := putLogRec(nil, tx, rec); err != nil {
t.Fatalf("putLogRec(%d:%d) failed rec %v err %v", id, k, rec, err)
}
+ value := fmt.Sprintf("value_%s", okey)
+ if err := watchable.PutAtVersion(nil, tx, []byte(okey), []byte(value), []byte(vers)); err != nil {
+ t.Fatalf("PutAtVersion(%d:%d) failed rec %v value %s: err %v", id, k, rec, value, err)
+ }
initPfxs := extractAndSortPrefixes(test.initVec)
if !filterLogRec(rec, test.initVec, initPfxs) {
diff --git a/x/ref/services/syncbase/vsync/sync.go b/x/ref/services/syncbase/vsync/sync.go
index 6d787e1..d268843 100644
--- a/x/ref/services/syncbase/vsync/sync.go
+++ b/x/ref/services/syncbase/vsync/sync.go
@@ -19,7 +19,6 @@
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
- "v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
@@ -73,7 +72,6 @@
rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
rngLock sync.Mutex
_ interfaces.SyncServerMethods = (*syncService)(nil)
- _ util.Layer = (*syncService)(nil)
)
// rand64 generates an unsigned 64-bit pseudo-random number.
@@ -105,15 +103,15 @@
}
data := &syncData{}
- if err := util.GetObject(sv.St(), s.StKey(), data); err != nil {
- if verror.ErrorID(err) != store.ErrUnknownKey.ID {
- return nil, verror.New(verror.ErrInternal, ctx, err)
+ if err := util.Get(ctx, sv.St(), s.stKey(), data); err != nil {
+ if verror.ErrorID(err) != verror.ErrNoExist.ID {
+ return nil, err
}
// First invocation of vsync.New().
// TODO(sadovsky): Maybe move guid generation and storage to serviceData.
data.Id = rand64()
- if err := util.PutObject(sv.St(), s.StKey(), data); err != nil {
- return nil, verror.New(verror.ErrInternal, ctx, err)
+ if err := util.Put(ctx, sv.St(), s.stKey(), data); err != nil {
+ return nil, err
}
}
@@ -150,13 +148,6 @@
return &syncDatabase{db: db}
}
-////////////////////////////////////////
-// util.Layer methods.
-
-func (s *syncService) Name() string {
- return "sync"
-}
-
-func (s *syncService) StKey() string {
+func (s *syncService) stKey() string {
return util.SyncPrefix
}
diff --git a/x/ref/services/syncbase/vsync/sync_state.go b/x/ref/services/syncbase/vsync/sync_state.go
index 49d9ff5..9f8bf17 100644
--- a/x/ref/services/syncbase/vsync/sync_state.go
+++ b/x/ref/services/syncbase/vsync/sync_state.go
@@ -155,10 +155,23 @@
return verror.New(verror.ErrInternal, ctx, "db state not found", name)
}
- ds.ckPtGen = ds.gen
+ // The frozen generation is the last generation number used, i.e. one
+ // below the next available one to use.
+ ds.ckPtGen = ds.gen - 1
return nil
}
+// initDbSyncStateInMem initializes the in memory sync state of the Database if needed.
+func (s *syncService) initDbSyncStateInMem(ctx *context.T, appName, dbName string) {
+ s.syncStateLock.Lock()
+ defer s.syncStateLock.Unlock()
+
+ name := appDbName(appName, dbName)
+ if s.syncState[name] == nil {
+ s.syncState[name] = &dbSyncStateInMem{gen: 1}
+ }
+}
+
// getDbSyncStateInMem returns a copy of the current in memory sync state of the Database.
func (s *syncService) getDbSyncStateInMem(ctx *context.T, appName, dbName string) (*dbSyncStateInMem, error) {
s.syncStateLock.Lock()
@@ -261,17 +274,14 @@
// putDbSyncState persists the sync state object for a given Database.
func putDbSyncState(ctx *context.T, tx store.StoreReadWriter, ds *dbSyncState) error {
_ = tx.(store.Transaction)
- if err := util.PutObject(tx, dbSyncStateKey(), ds); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Put(ctx, tx, dbSyncStateKey(), ds)
}
// getDbSyncState retrieves the sync state object for a given Database.
func getDbSyncState(ctx *context.T, st store.StoreReader) (*dbSyncState, error) {
var ds dbSyncState
- if err := util.GetObject(st, dbSyncStateKey(), &ds); err != nil {
- return nil, translateError(ctx, err, dbSyncStateKey())
+ if err := util.Get(ctx, st, dbSyncStateKey(), &ds); err != nil {
+ return nil, err
}
return &ds, nil
}
@@ -311,7 +321,9 @@
func hasLogRec(st store.StoreReader, id, gen uint64) bool {
// TODO(hpucha): optimize to avoid the unneeded fetch/decode of the data.
var rec localLogRec
- if err := util.GetObject(st, logRecKey(id, gen), &rec); err != nil {
+ // NOTE(sadovsky): This implementation doesn't explicitly handle
+ // non-ErrNoExist errors. Is that intentional?
+ if err := util.Get(nil, st, logRecKey(id, gen), &rec); err != nil {
return false
}
return true
@@ -320,17 +332,14 @@
// putLogRec stores the log record.
func putLogRec(ctx *context.T, tx store.StoreReadWriter, rec *localLogRec) error {
_ = tx.(store.Transaction)
- if err := util.PutObject(tx, logRecKey(rec.Metadata.Id, rec.Metadata.Gen), rec); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Put(ctx, tx, logRecKey(rec.Metadata.Id, rec.Metadata.Gen), rec)
}
// getLogRec retrieves the log record for a given (devid, gen).
func getLogRec(ctx *context.T, st store.StoreReader, id, gen uint64) (*localLogRec, error) {
var rec localLogRec
- if err := util.GetObject(st, logRecKey(id, gen), &rec); err != nil {
- return nil, translateError(ctx, err, logRecKey(id, gen))
+ if err := util.Get(ctx, st, logRecKey(id, gen), &rec); err != nil {
+ return nil, err
}
return &rec, nil
}
@@ -338,9 +347,5 @@
// delLogRec deletes the log record for a given (devid, gen).
func delLogRec(ctx *context.T, tx store.StoreReadWriter, id, gen uint64) error {
_ = tx.(store.Transaction)
-
- if err := tx.Delete([]byte(logRecKey(id, gen))); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Delete(ctx, tx, logRecKey(id, gen))
}
diff --git a/x/ref/services/syncbase/vsync/syncgroup.go b/x/ref/services/syncbase/vsync/syncgroup.go
index 31eaff0..4b6512c 100644
--- a/x/ref/services/syncbase/vsync/syncgroup.go
+++ b/x/ref/services/syncbase/vsync/syncgroup.go
@@ -21,6 +21,7 @@
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/server/watchable"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/naming"
@@ -301,7 +302,9 @@
func hasSGDataEntry(st store.StoreReader, gid interfaces.GroupId) bool {
// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
var sg interfaces.SyncGroup
- if err := util.GetObject(st, sgDataKey(gid), &sg); err != nil {
+ // NOTE(sadovsky): This implementation doesn't explicitly handle
+ // non-ErrNoExist errors. Is that intentional?
+ if err := util.Get(nil, st, sgDataKey(gid), &sg); err != nil {
return false
}
return true
@@ -311,7 +314,9 @@
func hasSGNameEntry(st store.StoreReader, name string) bool {
// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
var gid interfaces.GroupId
- if err := util.GetObject(st, sgNameKey(name), &gid); err != nil {
+ // NOTE(sadovsky): This implementation doesn't explicitly handle
+ // non-ErrNoExist errors. Is that intentional?
+ if err := util.Get(nil, st, sgNameKey(name), &gid); err != nil {
return false
}
return true
@@ -320,28 +325,20 @@
// setSGDataEntry stores the SyncGroup data entry.
func setSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
_ = tx.(store.Transaction)
-
- if err := util.PutObject(tx, sgDataKey(gid), sg); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Put(ctx, tx, sgDataKey(gid), sg)
}
// setSGNameEntry stores the SyncGroup name entry.
func setSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string, gid interfaces.GroupId) error {
_ = tx.(store.Transaction)
-
- if err := util.PutObject(tx, sgNameKey(name), gid); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Put(ctx, tx, sgNameKey(name), gid)
}
// getSGDataEntry retrieves the SyncGroup data for a given group ID.
func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
var sg interfaces.SyncGroup
- if err := util.GetObject(st, sgDataKey(gid), &sg); err != nil {
- return nil, verror.New(verror.ErrInternal, ctx, err)
+ if err := util.Get(ctx, st, sgDataKey(gid), &sg); err != nil {
+ return nil, err
}
return &sg, nil
}
@@ -349,8 +346,8 @@
// getSGNameEntry retrieves the SyncGroup name to ID mapping.
func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
var gid interfaces.GroupId
- if err := util.GetObject(st, sgNameKey(name), &gid); err != nil {
- return gid, verror.New(verror.ErrNoExist, ctx, err)
+ if err := util.Get(ctx, st, sgNameKey(name), &gid); err != nil {
+ return gid, err
}
return gid, nil
}
@@ -358,21 +355,13 @@
// delSGDataEntry deletes the SyncGroup data entry.
func delSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId) error {
_ = tx.(store.Transaction)
-
- if err := tx.Delete([]byte(sgDataKey(gid))); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Delete(ctx, tx, sgDataKey(gid))
}
// delSGNameEntry deletes the SyncGroup name to ID mapping.
func delSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string) error {
_ = tx.(store.Transaction)
-
- if err := tx.Delete([]byte(sgNameKey(name))); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Delete(ctx, tx, sgNameKey(name))
}
////////////////////////////////////////////////////////////
@@ -381,7 +370,6 @@
// TODO(hpucha): Pass blessings along.
func (sd *syncDatabase) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
err := store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
-
// Check permissions on Database.
if err := sd.db.CheckPermsInternal(ctx, call, tx); err != nil {
return err
@@ -420,9 +408,8 @@
return err
}
- // TODO(hpucha): Add watch notification to signal SG creation.
-
- return nil
+ tx1 := tx.(store.Transaction)
+ return watchable.AddSyncGroupOp(ctx, tx1, spec.Prefixes, false)
})
if err != nil {
@@ -516,9 +503,8 @@
return err
}
- // TODO(hpucha): Add a watch notification to signal new SG.
-
- return nil
+ tx1 := tx.(store.Transaction)
+ return watchable.AddSyncGroupOp(ctx, tx1, sg.Spec.Prefixes, false)
})
if err != nil {
diff --git a/x/ref/services/syncbase/vsync/util.go b/x/ref/services/syncbase/vsync/util.go
index 65d47eb..3622672 100644
--- a/x/ref/services/syncbase/vsync/util.go
+++ b/x/ref/services/syncbase/vsync/util.go
@@ -12,7 +12,6 @@
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
- "v.io/v23/verror"
"v.io/x/lib/vlog"
)
@@ -75,14 +74,6 @@
return db.St(), nil
}
-// translateError translates store errors.
-func translateError(ctx *context.T, err error, key string) error {
- if verror.ErrorID(err) == store.ErrUnknownKey.ID {
- return verror.New(verror.ErrNoExist, ctx, key)
- }
- return verror.New(verror.ErrInternal, ctx, key, err)
-}
-
// unixNanoToTime converts a Unix timestamp in nanoseconds to a Time object.
func unixNanoToTime(timestamp int64) time.Time {
if timestamp < 0 {
diff --git a/x/ref/services/syncbase/vsync/watcher.go b/x/ref/services/syncbase/vsync/watcher.go
index 804907c..f21dfa6 100644
--- a/x/ref/services/syncbase/vsync/watcher.go
+++ b/x/ref/services/syncbase/vsync/watcher.go
@@ -93,6 +93,9 @@
resMark = ""
}
+ // Initialize Database sync state if needed.
+ s.initDbSyncStateInMem(ctx, appName, dbName)
+
// Get a batch of watch log entries, if any, after this resume marker.
if logs, nextResmark := getWatchLogBatch(ctx, appName, dbName, st, resMark); logs != nil {
s.processWatchLogBatch(ctx, appName, dbName, st, logs, nextResmark)
@@ -327,22 +330,22 @@
timestamp := logEnt.CommitTimestamp
switch op := logEnt.Op.(type) {
- case *watchable.OpGet:
+ case watchable.OpGet:
// TODO(rdaoud): save read-set in sync.
- case *watchable.OpScan:
+ case watchable.OpScan:
// TODO(rdaoud): save scan-set in sync.
- case *watchable.OpPut:
+ case watchable.OpPut:
rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
- case *watchable.OpSyncSnapshot:
+ case watchable.OpSyncSnapshot:
rec = newLocalLogRec(ctx, tx, op.Value.Key, op.Value.Version, false, timestamp)
- case *watchable.OpDelete:
+ case watchable.OpDelete:
rec = newLocalLogRec(ctx, tx, op.Value.Key, watchable.NewVersion(), true, timestamp)
- case *watchable.OpSyncGroup:
+ case watchable.OpSyncGroup:
vlog.Errorf("watch LogEntry for SyncGroup should not be converted: %v", logEnt)
default:
@@ -377,7 +380,7 @@
// Otherwise it returns false with no other changes.
func processSyncGroupLogRecord(appName, dbName string, logEnt *watchable.LogEntry) bool {
switch op := logEnt.Op.(type) {
- case *watchable.OpSyncGroup:
+ case watchable.OpSyncGroup:
remove := op.Value.Remove
for _, prefix := range op.Value.Prefixes {
if remove {
@@ -399,11 +402,11 @@
func syncable(appdb string, logEnt *watchable.LogEntry) bool {
var key string
switch op := logEnt.Op.(type) {
- case *watchable.OpPut:
+ case watchable.OpPut:
key = string(op.Value.Key)
- case *watchable.OpDelete:
+ case watchable.OpDelete:
key = string(op.Value.Key)
- case *watchable.OpSyncSnapshot:
+ case watchable.OpSyncSnapshot:
key = string(op.Value.Key)
default:
return false
@@ -434,19 +437,15 @@
// setResMark stores the watcher resume marker for a database.
func setResMark(ctx *context.T, tx store.StoreReadWriter, resMark string) error {
_ = tx.(store.Transaction)
-
- if err := util.PutObject(tx, resMarkKey(), resMark); err != nil {
- return verror.New(verror.ErrInternal, ctx, err)
- }
- return nil
+ return util.Put(ctx, tx, resMarkKey(), resMark)
}
// getResMark retrieves the watcher resume marker for a database.
func getResMark(ctx *context.T, st store.StoreReader) (string, error) {
var resMark string
key := resMarkKey()
- if err := util.GetObject(st, key, &resMark); err != nil {
- return NoVersion, translateError(ctx, err, key)
+ if err := util.Get(ctx, st, key, &resMark); err != nil {
+ return NoVersion, err
}
return resMark, nil
}
diff --git a/x/ref/services/syncbase/vsync/watcher_test.go b/x/ref/services/syncbase/vsync/watcher_test.go
index aa7a72f..bf4fb95 100644
--- a/x/ref/services/syncbase/vsync/watcher_test.go
+++ b/x/ref/services/syncbase/vsync/watcher_test.go
@@ -116,7 +116,7 @@
for _, test := range checkSyncableTests {
log := &watchable.LogEntry{
- Op: &watchable.OpPut{
+ Op: watchable.OpPut{
watchable.PutOp{Key: []byte(makeRowKey(test.key))},
},
}
@@ -133,9 +133,9 @@
k, v := []byte(key), []byte(version)
log := &watchable.LogEntry{}
if delete {
- log.Op = &watchable.OpDelete{watchable.DeleteOp{Key: k}}
+ log.Op = watchable.OpDelete{watchable.DeleteOp{Key: k}}
} else {
- log.Op = &watchable.OpPut{watchable.PutOp{Key: k, Version: v}}
+ log.Op = watchable.OpPut{watchable.PutOp{Key: k, Version: v}}
}
return log
}
@@ -143,7 +143,7 @@
// newSGLog creates a SyncGroup watch log entry.
func newSGLog(prefixes []string, remove bool) *watchable.LogEntry {
return &watchable.LogEntry{
- Op: &watchable.OpSyncGroup{
+ Op: watchable.OpSyncGroup{
watchable.SyncGroupOp{Prefixes: prefixes, Remove: remove},
},
}