syncbase/vsync: Handle CreateSyncGroup RPC from client-syncbase.
Change-Id: I24dbc2fb4b556001124ba4e9da00ad763413d619
diff --git a/services/syncbase/server/app.go b/services/syncbase/server/app.go
index 463c498..6e1812f 100644
--- a/services/syncbase/server/app.go
+++ b/services/syncbase/server/app.go
@@ -76,6 +76,10 @@
////////////////////////////////////////
// interfaces.App methods
+// Service returns the service handle for this app (the app's s field).
+// It is part of the interfaces.App implementation.
+func (a *app) Service() interfaces.Service {
+ return a.s
+}
+
func (a *app) NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (interfaces.Database, error) {
// TODO(sadovsky): Record storage engine config (e.g. LevelDB directory) in
// dbInfo, and add API for opening and closing storage engines.
@@ -181,7 +185,7 @@
}
// 1. Check databaseData perms.
- if err := d.CheckPermsInternal(ctx, call); err != nil {
+ if err := d.CheckPermsInternal(ctx, call, d.St()); err != nil {
return err
}
diff --git a/services/syncbase/server/interfaces/app.go b/services/syncbase/server/interfaces/app.go
index f9d8dab..8ac9990 100644
--- a/services/syncbase/server/interfaces/app.go
+++ b/services/syncbase/server/interfaces/app.go
@@ -5,6 +5,8 @@
package interfaces
import (
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security/access"
@@ -13,6 +15,9 @@
// App is an internal interface to the app layer.
// All methods return VDL-compatible errors.
type App interface {
+ // Service returns the service handle for this app.
+ Service() Service
+
// NoSQLDatabase returns the Database for the specified NoSQL database.
NoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) (Database, error)
@@ -27,4 +32,6 @@
// SetDatabasePerms sets the perms for the specified database.
SetDatabasePerms(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions, version string) error
+
+ util.Layer
}
diff --git a/services/syncbase/server/interfaces/database.go b/services/syncbase/server/interfaces/database.go
index 4f8bda8..832a940 100644
--- a/services/syncbase/server/interfaces/database.go
+++ b/services/syncbase/server/interfaces/database.go
@@ -5,6 +5,7 @@
package interfaces
import (
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
"v.io/v23/context"
"v.io/v23/rpc"
@@ -17,12 +18,17 @@
// St returns the storage engine instance for this database.
St() store.Store
+ // App returns the app handle for this database.
+ App() App
+
// CheckPermsInternal checks whether the given RPC (ctx, call) is allowed per
// the database perms.
// Designed for use from within App.DeleteNoSQLDatabase.
- CheckPermsInternal(ctx *context.T, call rpc.ServerCall) error
+ CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error
// SetPermsInternal updates the database perms.
// Designed for use from within App.SetDatabasePerms.
SetPermsInternal(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error
+
+ util.Layer
}
diff --git a/services/syncbase/server/interfaces/service.go b/services/syncbase/server/interfaces/service.go
index 4d7ff60..4505dce 100644
--- a/services/syncbase/server/interfaces/service.go
+++ b/services/syncbase/server/interfaces/service.go
@@ -16,6 +16,9 @@
// St returns the storage engine instance for this service.
St() store.Store
+ // Sync returns the sync instance for this service.
+ Sync() SyncServerMethods
+
// App returns the App with the specified name.
App(ctx *context.T, call rpc.ServerCall, appName string) (App, error)
diff --git a/services/syncbase/vsync/sync.vdl b/services/syncbase/server/interfaces/sync.vdl
similarity index 77%
rename from services/syncbase/vsync/sync.vdl
rename to services/syncbase/server/interfaces/sync.vdl
index b922376..7421b48 100644
--- a/services/syncbase/vsync/sync.vdl
+++ b/services/syncbase/server/interfaces/sync.vdl
@@ -2,7 +2,7 @@
// Use of this source code is governed by a BSD-style
// license that can be found in the LICENSE file.
-package vsync
+package interfaces
// Sync defines methods for data exchange between Syncbases.
// TODO(hpucha): Flesh this out further.
@@ -13,11 +13,10 @@
GetDeltas() error
// SyncGroup-related methods.
- // TODO(hpucha): Think of better names for these methods.
- // CreateSyncGroup is typically invoked on a "central" peer to
- // request the creation of a SyncGroup.
- CreateSyncGroup() error
+ // PublishSyncGroup is typically invoked on a "central" peer
+ // to publish the SyncGroup.
+ PublishSyncGroup(sg SyncGroup) error
// JoinSyncGroup is invoked on a SyncGroup Admin and checks if
// the requestor can join the SyncGroup.
diff --git a/services/syncbase/vsync/sync.vdl.go b/services/syncbase/server/interfaces/sync.vdl.go
similarity index 83%
rename from services/syncbase/vsync/sync.vdl.go
rename to services/syncbase/server/interfaces/sync.vdl.go
index e481e98..37c9a1e 100644
--- a/services/syncbase/vsync/sync.vdl.go
+++ b/services/syncbase/server/interfaces/sync.vdl.go
@@ -5,7 +5,7 @@
// This file was auto-generated by the vanadium vdl tool.
// Source: sync.vdl
-package vsync
+package interfaces
import (
// VDL system imports
@@ -24,9 +24,9 @@
// and all the missing log records when compared to the
// initiator's generation vector.
GetDeltas(*context.T, ...rpc.CallOpt) error
- // CreateSyncGroup is typically invoked on a "central" peer to
- // request the creation of a SyncGroup.
- CreateSyncGroup(*context.T, ...rpc.CallOpt) error
+ // PublishSyncGroup is typically invoked on a "central" peer
+ // to publish the SyncGroup.
+ PublishSyncGroup(ctx *context.T, sg SyncGroup, opts ...rpc.CallOpt) error
// JoinSyncGroup is invoked on a SyncGroup Admin and checks if
// the requestor can join the SyncGroup.
JoinSyncGroup(*context.T, ...rpc.CallOpt) error
@@ -55,8 +55,8 @@
return
}
-func (c implSyncClientStub) CreateSyncGroup(ctx *context.T, opts ...rpc.CallOpt) (err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "CreateSyncGroup", nil, nil, opts...)
+func (c implSyncClientStub) PublishSyncGroup(ctx *context.T, i0 SyncGroup, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "PublishSyncGroup", []interface{}{i0}, nil, opts...)
return
}
@@ -80,9 +80,9 @@
// and all the missing log records when compared to the
// initiator's generation vector.
GetDeltas(*context.T, rpc.ServerCall) error
- // CreateSyncGroup is typically invoked on a "central" peer to
- // request the creation of a SyncGroup.
- CreateSyncGroup(*context.T, rpc.ServerCall) error
+ // PublishSyncGroup is typically invoked on a "central" peer
+ // to publish the SyncGroup.
+ PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg SyncGroup) error
// JoinSyncGroup is invoked on a SyncGroup Admin and checks if
// the requestor can join the SyncGroup.
JoinSyncGroup(*context.T, rpc.ServerCall) error
@@ -130,8 +130,8 @@
return s.impl.GetDeltas(ctx, call)
}
-func (s implSyncServerStub) CreateSyncGroup(ctx *context.T, call rpc.ServerCall) error {
- return s.impl.CreateSyncGroup(ctx, call)
+func (s implSyncServerStub) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, i0 SyncGroup) error {
+ return s.impl.PublishSyncGroup(ctx, call, i0)
}
func (s implSyncServerStub) JoinSyncGroup(ctx *context.T, call rpc.ServerCall) error {
@@ -156,7 +156,7 @@
// descSync hides the desc to keep godoc clean.
var descSync = rpc.InterfaceDesc{
Name: "Sync",
- PkgPath: "v.io/syncbase/x/ref/services/syncbase/vsync",
+ PkgPath: "v.io/syncbase/x/ref/services/syncbase/server/interfaces",
Doc: "// Sync defines methods for data exchange between Syncbases.\n// TODO(hpucha): Flesh this out further.",
Methods: []rpc.MethodDesc{
{
@@ -164,8 +164,11 @@
Doc: "// GetDeltas returns the responder's current generation vector\n// and all the missing log records when compared to the\n// initiator's generation vector.",
},
{
- Name: "CreateSyncGroup",
- Doc: "// CreateSyncGroup is typically invoked on a \"central\" peer to\n// request the creation of a SyncGroup.",
+ Name: "PublishSyncGroup",
+ Doc: "// PublishSyncGroup is typically invoked on a \"central\" peer\n// to publish the SyncGroup.",
+ InArgs: []rpc.ArgDesc{
+ {"sg", ``}, // SyncGroup
+ },
},
{
Name: "JoinSyncGroup",
diff --git a/services/syncbase/server/interfaces/sync_types.vdl b/services/syncbase/server/interfaces/sync_types.vdl
new file mode 100644
index 0000000..ffb2b40
--- /dev/null
+++ b/services/syncbase/server/interfaces/sync_types.vdl
@@ -0,0 +1,43 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package interfaces
+
+import (
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+)
+
+const (
+ NoGroupId = GroupId(0)
+)
+
+// GroupId is a globally unique SyncGroup ID.
+type GroupId uint64
+
+// Possible states for a SyncGroup.
+type SyncGroupStatus enum {
+ // Indicates that a SyncGroup is operational, but publishing to the
+ // remote server is pending.
+ PublishPending
+
+ // Indicates that the SyncGroup is operational, but the publishing
+ // failed.
+ PublishRejected
+
+ // Indicates that the SyncGroup is operational and published.
+ Running
+}
+
+// SyncGroup contains the state of a SyncGroup object.
+type SyncGroup struct {
+ Id GroupId // globally unique identifier generated by Syncbase
+ Name string // globally unique Vanadium name chosen by app
+ SpecVersion string // version on SyncGroup spec for concurrency control
+ Spec wire.SyncGroupSpec // app-given specification
+ Creator string // Creator's Vanadium name
+ AppName string // Globally unique App name
+ DbName string // Database name within the App
+ Status SyncGroupStatus // Status of the SyncGroup
+ Joiners map[string]wire.SyncGroupMemberInfo // map of joiners to their metadata
+}
diff --git a/services/syncbase/server/interfaces/sync_types.vdl.go b/services/syncbase/server/interfaces/sync_types.vdl.go
new file mode 100644
index 0000000..3c7e2dd
--- /dev/null
+++ b/services/syncbase/server/interfaces/sync_types.vdl.go
@@ -0,0 +1,105 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+// Source: sync_types.vdl
+
+package interfaces
+
+import (
+ // VDL system imports
+ "fmt"
+ "v.io/v23/vdl"
+
+ // VDL user imports
+ "v.io/syncbase/v23/services/syncbase/nosql"
+)
+
+// GroupId is a globally unique SyncGroup ID.
+type GroupId uint64
+
+func (GroupId) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.GroupId"`
+}) {
+}
+
+// Possible states for a SyncGroup.
+type SyncGroupStatus int
+
+const (
+ SyncGroupStatusPublishPending SyncGroupStatus = iota
+ SyncGroupStatusPublishRejected
+ SyncGroupStatusRunning
+)
+
+// SyncGroupStatusAll holds all labels for SyncGroupStatus.
+var SyncGroupStatusAll = [...]SyncGroupStatus{SyncGroupStatusPublishPending, SyncGroupStatusPublishRejected, SyncGroupStatusRunning}
+
+// SyncGroupStatusFromString creates a SyncGroupStatus from a string label.
+func SyncGroupStatusFromString(label string) (x SyncGroupStatus, err error) {
+ err = x.Set(label)
+ return
+}
+
+// Set assigns label to x.
+func (x *SyncGroupStatus) Set(label string) error {
+ switch label {
+ case "PublishPending", "publishpending":
+ *x = SyncGroupStatusPublishPending
+ return nil
+ case "PublishRejected", "publishrejected":
+ *x = SyncGroupStatusPublishRejected
+ return nil
+ case "Running", "running":
+ *x = SyncGroupStatusRunning
+ return nil
+ }
+ *x = -1
+ return fmt.Errorf("unknown label %q in interfaces.SyncGroupStatus", label)
+}
+
+// String returns the string label of x.
+func (x SyncGroupStatus) String() string {
+ switch x {
+ case SyncGroupStatusPublishPending:
+ return "PublishPending"
+ case SyncGroupStatusPublishRejected:
+ return "PublishRejected"
+ case SyncGroupStatusRunning:
+ return "Running"
+ }
+ return ""
+}
+
+func (SyncGroupStatus) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.SyncGroupStatus"`
+ Enum struct{ PublishPending, PublishRejected, Running string }
+}) {
+}
+
+// SyncGroup contains the state of a SyncGroup object.
+type SyncGroup struct {
+ Id GroupId // globally unique identifier generated by Syncbase
+ Name string // globally unique Vanadium name chosen by app
+ SpecVersion string // version on SyncGroup spec for concurrency control
+ Spec nosql.SyncGroupSpec // app-given specification
+ Creator string // Creator's Vanadium name
+ AppName string // Globally unique App name
+ DbName string // Database name within the App
+ Status SyncGroupStatus // Status of the SyncGroup
+ Joiners map[string]nosql.SyncGroupMemberInfo // map of joiners to their metadata
+}
+
+func (SyncGroup) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/server/interfaces.SyncGroup"`
+}) {
+}
+
+func init() {
+ vdl.Register((*GroupId)(nil))
+ vdl.Register((*SyncGroupStatus)(nil))
+ vdl.Register((*SyncGroup)(nil))
+}
+
+const NoGroupId = GroupId(0)
diff --git a/services/syncbase/server/nosql/database.go b/services/syncbase/server/nosql/database.go
index c042cbe..40cc85b 100644
--- a/services/syncbase/server/nosql/database.go
+++ b/services/syncbase/server/nosql/database.go
@@ -119,11 +119,12 @@
return d.st
}
-func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall) error {
- if d.st == nil {
- vlog.Fatalf("database %q does not exist", d.name)
- }
- return util.Get(ctx, call, d.st, d, &databaseData{})
+// App returns the app handle for this database (the database's a field).
+func (d *database) App() interfaces.App {
+ return d.a
+}
+
+// CheckPermsInternal checks whether the given RPC (ctx, call) is allowed per
+// the database perms. It does so by fetching this database's databaseData
+// through st with util.Get; the fetched value is discarded and only the
+// resulting error is returned.
+//
+// st is the store handle to read through, supplied by the caller rather than
+// taken from the database itself.
+func (d *database) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
+ return util.Get(ctx, call, st, d, &databaseData{})
+}
func (d *database) SetPermsInternal(ctx *context.T, call rpc.ServerCall, perms access.Permissions, version string) error {
diff --git a/services/syncbase/server/nosql/database_sgm.go b/services/syncbase/server/nosql/database_sgm.go
index c19c4f4..768b478 100644
--- a/services/syncbase/server/nosql/database_sgm.go
+++ b/services/syncbase/server/nosql/database_sgm.go
@@ -5,6 +5,8 @@
package nosql
import (
+ "v.io/syncbase/x/ref/services/syncbase/vsync"
+
wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/v23/context"
"v.io/v23/rpc"
@@ -19,7 +21,8 @@
}
func (d *database) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
- return verror.NewErrNotImplemented(ctx)
+ sd := vsync.NewSyncDatabase(d)
+ return sd.CreateSyncGroup(ctx, call, sgName, spec, myInfo)
}
func (d *database) JoinSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, myInfo wire.SyncGroupMemberInfo) (wire.SyncGroupSpec, error) {
diff --git a/services/syncbase/server/service.go b/services/syncbase/server/service.go
index c2a283d..c5f6be4 100644
--- a/services/syncbase/server/service.go
+++ b/services/syncbase/server/service.go
@@ -21,7 +21,7 @@
type service struct {
st store.Store // keeps track of which apps and databases exist, etc.
- sync vsync.SyncServerMethods
+ sync interfaces.SyncServerMethods
// Guards the fields below. Held during app Create, Delete, and
// SetPermissions.
mu sync.Mutex
@@ -104,6 +104,10 @@
return s.st
}
+// Sync returns the sync instance for this service (the service's sync field).
+func (s *service) Sync() interfaces.SyncServerMethods {
+ return s.sync
+}
+
func (s *service) App(ctx *context.T, call rpc.ServerCall, appName string) (interfaces.App, error) {
s.mu.Lock()
defer s.mu.Unlock()
diff --git a/services/syncbase/server/util/key_util.go b/services/syncbase/server/util/key_util.go
index 5c729e6..b469f9e 100644
--- a/services/syncbase/server/util/key_util.go
+++ b/services/syncbase/server/util/key_util.go
@@ -12,7 +12,7 @@
// JoinKeyParts builds keys for accessing data in the storage engine.
func JoinKeyParts(parts ...string) string {
- // TODO(sadovsky): Figure out which delimeter makes the most sense.
+ // TODO(sadovsky): Figure out which delimiter makes the most sense.
return strings.Join(parts, ":")
}
diff --git a/services/syncbase/vsync/sync.go b/services/syncbase/vsync/sync.go
index 078024d..b3f396b 100644
--- a/services/syncbase/vsync/sync.go
+++ b/services/syncbase/vsync/sync.go
@@ -27,25 +27,39 @@
// syncService contains the metadata for the sync module.
type syncService struct {
+ // TODO(hpucha): see if uniqueid is a better fit. It is 128 bits.
id int64 // globally unique id for this instance of Syncbase
sv interfaces.Service
+
// State to coordinate shutdown of spawned goroutines.
pending sync.WaitGroup
closed chan struct{}
+ // TODO(hpucha): Other global names to advertise to enable Syncbase
+ // discovery. For example, every Syncbase must be reachable under
+ // <mttable>/<syncbaseid> for p2p sync. This is the name advertised
+ // during SyncGroup join. In addition, a Syncbase might also be
+ // accepting "publish SyncGroup requests", and might use a more
+ // human-readable name such as <mttable>/<idp>/<sgserver>. All these
+ // names must be advertised in the appropriate mount tables.
+
// In-memory sync membership info aggregated across databases.
allMembers *memberView
}
-var (
- rng *rand.Rand
- _ util.Layer = (*syncService)(nil)
-)
-
-func init() {
- rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+// syncDatabase contains the metadata for syncing a database. This struct is
+// used as a receiver to hand off the app-initiated SyncGroup calls that arrive
+// against a nosql.Database to the sync module.
+type syncDatabase struct {
+ // db is the database on whose behalf the SyncGroup calls are handled.
+ db interfaces.Database
+}
+var (
+ // rng is the package-wide random number generator, seeded from the
+ // wall-clock time (UTC, nanoseconds) when the package is initialized.
+ // NOTE(review): a *rand.Rand created with rand.New/rand.NewSource is not
+ // safe for concurrent use by multiple goroutines; confirm that all users
+ // of rng are serialized, or guard it with a mutex.
+ rng = rand.New(rand.NewSource(time.Now().UTC().UnixNano()))
+ // Compile-time checks that *syncService implements the interfaces below.
+ _ interfaces.SyncServerMethods = (*syncService)(nil)
+ _ util.Layer = (*syncService)(nil)
+)
+
// New creates a new sync module.
//
// Concurrency: sync initializes two goroutines at startup: a "watcher" and an
@@ -85,6 +99,10 @@
return s, nil
}
+// NewSyncDatabase returns a new syncDatabase that wraps db. The returned
+// value holds no state other than the reference to db.
+func NewSyncDatabase(db interfaces.Database) *syncDatabase {
+ return &syncDatabase{db: db}
+}
+
////////////////////////////////////////
// Core sync method.
diff --git a/services/syncbase/vsync/syncgroup.go b/services/syncbase/vsync/syncgroup.go
index 847fcb5..f7e5a04 100644
--- a/services/syncbase/vsync/syncgroup.go
+++ b/services/syncbase/vsync/syncgroup.go
@@ -12,11 +12,15 @@
import (
"fmt"
+ "strings"
"time"
- "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/store"
+
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/verror"
@@ -47,7 +51,7 @@
// memberInfo holds the member metadata for each SyncGroup this member belongs
// to.
type memberInfo struct {
- gid2info map[GroupId]nosql.SyncGroupMemberInfo
+ gid2info map[interfaces.GroupId]wire.SyncGroupMemberInfo
}
// newSyncGroupVersion generates a random SyncGroup version ("etag").
@@ -55,20 +59,32 @@
return fmt.Sprintf("%x", rng.Int63())
}
-// addSyncGroup adds a new SyncGroup given its information.
-func addSyncGroup(ctx *context.T, tx store.StoreReadWriter, sg *SyncGroup) error {
- _ = tx.(store.Transaction)
+// newSyncGroupId generates a random SyncGroup ID.
+//
+// The ID is drawn from rng.Int63(), so it is a non-negative 63-bit value.
+// Int63 may return 0, which is interfaces.NoGroupId ("no ID specified") and
+// would make the resulting SyncGroup fail verification, so such a draw is
+// discarded and the ID is drawn again.
+func newSyncGroupId() interfaces.GroupId {
+	id := interfaces.NoGroupId
+	for id == interfaces.NoGroupId {
+		id = interfaces.GroupId(rng.Int63())
+	}
+	return id
+}
+// verifySyncGroup verifies if a SyncGroup struct is well-formed.
+func verifySyncGroup(ctx *context.T, sg *interfaces.SyncGroup) error {
if sg == nil {
return verror.New(verror.ErrBadArg, ctx, "group information not specified")
}
if sg.Name == "" {
return verror.New(verror.ErrBadArg, ctx, "group name not specified")
}
- if sg.Id == NoGroupId {
- return verror.New(verror.ErrBadArg, ctx, "group ID not specified")
+ if sg.AppName == "" {
+ return verror.New(verror.ErrBadArg, ctx, "app name not specified")
}
- if sg.Version == "" {
+ if sg.DbName == "" {
+ return verror.New(verror.ErrBadArg, ctx, "db name not specified")
+ }
+ if sg.Creator == "" {
+ return verror.New(verror.ErrBadArg, ctx, "creator id not specified")
+ }
+ if sg.Id == interfaces.NoGroupId {
+ return verror.New(verror.ErrBadArg, ctx, "group id not specified")
+ }
+ if sg.SpecVersion == "" {
return verror.New(verror.ErrBadArg, ctx, "group version not specified")
}
if len(sg.Joiners) == 0 {
@@ -77,12 +93,24 @@
if len(sg.Spec.Prefixes) == 0 {
return verror.New(verror.ErrBadArg, ctx, "group has no prefixes specified")
}
+ return nil
+}
+
+// addSyncGroup adds a new SyncGroup given its information.
+func addSyncGroup(ctx *context.T, tx store.StoreReadWriter, sg *interfaces.SyncGroup) error {
+ _ = tx.(store.Transaction)
+
+ // Verify SyncGroup before storing it since it may have been received
+ // from a remote peer.
+ if err := verifySyncGroup(ctx, sg); err != nil {
+ return err
+ }
if hasSGDataEntry(tx, sg.Id) {
- return verror.New(verror.ErrBadArg, ctx, "group id already exists")
+ return verror.New(verror.ErrExist, ctx, "group id already exists")
}
if hasSGNameEntry(tx, sg.Name) {
- return verror.New(verror.ErrBadArg, ctx, "group name already exists")
+ return verror.New(verror.ErrExist, ctx, "group name already exists")
}
// Add the group name and data entries.
@@ -97,12 +125,12 @@
}
// getSyncGroupId retrieves the SyncGroup ID given its name.
-func getSyncGroupId(ctx *context.T, st store.StoreReader, name string) (GroupId, error) {
+func getSyncGroupId(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
return getSGNameEntry(ctx, st, name)
}
// getSyncGroupName retrieves the SyncGroup name given its ID.
-func getSyncGroupName(ctx *context.T, st store.StoreReader, gid GroupId) (string, error) {
+func getSyncGroupName(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (string, error) {
sg, err := getSyncGroupById(ctx, st, gid)
if err != nil {
return "", err
@@ -111,12 +139,12 @@
}
// getSyncGroupById retrieves the SyncGroup given its ID.
-func getSyncGroupById(ctx *context.T, st store.StoreReader, gid GroupId) (*SyncGroup, error) {
+func getSyncGroupById(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
return getSGDataEntry(ctx, st, gid)
}
// getSyncGroupByName retrieves the SyncGroup given its name.
-func getSyncGroupByName(ctx *context.T, st store.StoreReader, name string) (*SyncGroup, error) {
+func getSyncGroupByName(ctx *context.T, st store.StoreReader, name string) (*interfaces.SyncGroup, error) {
gid, err := getSyncGroupId(ctx, st, name)
if err != nil {
return nil, err
@@ -125,7 +153,7 @@
}
// delSyncGroupById deletes the SyncGroup given its ID.
-func delSyncGroupById(ctx *context.T, tx store.StoreReadWriter, gid GroupId) error {
+func delSyncGroupById(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId) error {
_ = tx.(store.Transaction)
sg, err := getSyncGroupById(ctx, tx, gid)
@@ -178,7 +206,7 @@
stream := sn.Scan(scanStart, scanLimit)
for stream.Advance() {
- var sg SyncGroup
+ var sg interfaces.SyncGroup
if vom.Decode(stream.Value(nil), &sg) != nil {
vlog.Errorf("invalid SyncGroup value for key %s", string(stream.Key(nil)))
continue
@@ -189,7 +217,7 @@
for member, info := range sg.Joiners {
if _, ok := newMembers[member]; !ok {
newMembers[member] = &memberInfo{
- gid2info: make(map[GroupId]nosql.SyncGroupMemberInfo),
+ gid2info: make(map[interfaces.GroupId]wire.SyncGroupMemberInfo),
}
}
newMembers[member].gid2info[sg.Id] = info
@@ -225,7 +253,7 @@
}
// sgDataKey returns the key used to access the SyncGroup data entry.
-func sgDataKey(gid GroupId) string {
+func sgDataKey(gid interfaces.GroupId) string {
return util.JoinKeyParts(util.SyncPrefix, "sg", "d", fmt.Sprintf("%d", gid))
}
@@ -235,9 +263,9 @@
}
// hasSGDataEntry returns true if the SyncGroup data entry exists.
-func hasSGDataEntry(st store.StoreReader, gid GroupId) bool {
+func hasSGDataEntry(st store.StoreReader, gid interfaces.GroupId) bool {
// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
- var sg SyncGroup
+ var sg interfaces.SyncGroup
if err := util.GetObject(st, sgDataKey(gid), &sg); err != nil {
return false
}
@@ -247,7 +275,7 @@
// hasSGNameEntry returns true if the SyncGroup name entry exists.
func hasSGNameEntry(st store.StoreReader, name string) bool {
// TODO(rdaoud): optimize to avoid the unneeded fetch/decode of the data.
- var gid GroupId
+ var gid interfaces.GroupId
if err := util.GetObject(st, sgNameKey(name), &gid); err != nil {
return false
}
@@ -255,7 +283,7 @@
}
// setSGDataEntry stores the SyncGroup data entry.
-func setSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid GroupId, sg *SyncGroup) error {
+func setSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId, sg *interfaces.SyncGroup) error {
_ = tx.(store.Transaction)
if err := util.PutObject(tx, sgDataKey(gid), sg); err != nil {
@@ -265,7 +293,7 @@
}
// setSGNameEntry stores the SyncGroup name entry.
-func setSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string, gid GroupId) error {
+func setSGNameEntry(ctx *context.T, tx store.StoreReadWriter, name string, gid interfaces.GroupId) error {
_ = tx.(store.Transaction)
if err := util.PutObject(tx, sgNameKey(name), gid); err != nil {
@@ -275,8 +303,8 @@
}
// getSGDataEntry retrieves the SyncGroup data for a given group ID.
-func getSGDataEntry(ctx *context.T, st store.StoreReader, gid GroupId) (*SyncGroup, error) {
- var sg SyncGroup
+func getSGDataEntry(ctx *context.T, st store.StoreReader, gid interfaces.GroupId) (*interfaces.SyncGroup, error) {
+ var sg interfaces.SyncGroup
if err := util.GetObject(st, sgDataKey(gid), &sg); err != nil {
return nil, verror.New(verror.ErrInternal, ctx, err)
}
@@ -284,8 +312,8 @@
}
// getSGNameEntry retrieves the SyncGroup name to ID mapping.
-func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (GroupId, error) {
- var gid GroupId
+func getSGNameEntry(ctx *context.T, st store.StoreReader, name string) (interfaces.GroupId, error) {
+ var gid interfaces.GroupId
if err := util.GetObject(st, sgNameKey(name), &gid); err != nil {
return gid, verror.New(verror.ErrInternal, ctx, err)
}
@@ -293,7 +321,7 @@
}
// delSGDataEntry deletes the SyncGroup data entry.
-func delSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid GroupId) error {
+func delSGDataEntry(ctx *context.T, tx store.StoreReadWriter, gid interfaces.GroupId) error {
_ = tx.(store.Transaction)
if err := tx.Delete([]byte(sgDataKey(gid))); err != nil {
@@ -315,11 +343,204 @@
////////////////////////////////////////////////////////////
// SyncGroup methods between Client and Syncbase.
+// CreateSyncGroup creates the SyncGroup sgName with the given spec in this
+// database and adds this Syncbase (with metadata myInfo) as its first joiner.
+//
+// The local creation runs in a single store transaction: it checks the
+// database permissions for the call, builds the SyncGroup record with a fresh
+// ID and spec version in the PublishPending state, stores it, and scans the
+// data under the spec's prefixes to bootstrap the group. Any error from these
+// steps is returned and nothing is published.
+//
+// Once the local creation has succeeded, publishing the SyncGroup at the
+// remote server and in the mount tables is best-effort: a failure there is
+// logged but does not fail the call, because the SyncGroup already exists
+// locally.
+//
+// TODO(hpucha): Pass blessings along.
+func (sd *syncDatabase) CreateSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string, spec wire.SyncGroupSpec, myInfo wire.SyncGroupMemberInfo) error {
+	err := store.RunInTransaction(sd.db.St(), func(st store.StoreReadWriter) error {
+		// Check permissions on Database.
+		if err := sd.db.CheckPermsInternal(ctx, call, st); err != nil {
+			return err
+		}
+
+		// TODO(hpucha): Check prefix ACLs on all SG prefixes.
+		// This may need another method on util.Database interface.
+
+		// TODO(hpucha): Do some SGACL checking. Check creator
+		// has Admin privilege.
+
+		// Get this Syncbase's id. The sync handle is exposed as an
+		// interface; report an unexpected implementation as an error
+		// instead of panicking inside the RPC handler.
+		ss, ok := sd.db.App().Service().Sync().(*syncService)
+		if !ok {
+			return verror.New(verror.ErrInternal, ctx, "sync service has unexpected type")
+		}
+		name := fmt.Sprintf("%x", ss.id)
+
+		// Instantiate sg. Add self as joiner.
+		sg := &interfaces.SyncGroup{
+			Id:          newSyncGroupId(),
+			Name:        sgName,
+			SpecVersion: newSyncGroupVersion(),
+			Spec:        spec,
+			Creator:     name,
+			AppName:     sd.db.App().Name(),
+			DbName:      sd.db.Name(),
+			Status:      interfaces.SyncGroupStatusPublishPending,
+			Joiners:     map[string]wire.SyncGroupMemberInfo{name: myInfo},
+		}
+
+		if err := addSyncGroup(ctx, st, sg); err != nil {
+			return err
+		}
+
+		// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG metadata.
+
+		// Take a snapshot of the data to bootstrap the SyncGroup.
+		if err := bootstrapSyncGroup(st, spec.Prefixes); err != nil {
+			return err
+		}
+
+		// TODO(hpucha): Add watch notification to signal SG creation.
+
+		return nil
+	})
+	if err != nil {
+		return err
+	}
+
+	// Local SG create succeeded. Publish the SG at the chosen server.
+	// A publish failure must not fail the create, so it is only logged.
+	if err := sd.publishSyncGroup(ctx, call, sgName); err != nil {
+		vlog.Errorf("CreateSyncGroup: publishing SyncGroup %s failed: %v", sgName, err)
+	}
+
+	// Publish at the chosen mount table and in the neighborhood.
+	if err := sd.publishInMountTables(ctx, call, spec); err != nil {
+		vlog.Errorf("CreateSyncGroup: publishing SyncGroup %s in mount tables failed: %v", sgName, err)
+	}
+
+	return nil
+}
+
+//////////////////////////////
+// Helper functions
+
+// publishSyncGroup sends the locally stored SyncGroup sgName to the remote
+// peer addressed by sgName, provided the group is still in the PublishPending
+// state; in any other state it does nothing and returns nil.
+//
+// Outcomes of the remote call:
+//   - success: nil is returned and the local record is left unchanged;
+//   - an error other than verror.ErrExist: the error is returned so that the
+//     publish can be retried later;
+//   - verror.ErrExist: the publish was rejected, and the local record is
+//     marked PublishRejected in a new transaction.
+//
+// TODO(hpucha): Call this periodically until we are able to contact the remote peer.
+func (sd *syncDatabase) publishSyncGroup(ctx *context.T, call rpc.ServerCall, sgName string) error {
+	pending, err := getSyncGroupByName(ctx, sd.db.St(), sgName)
+	if err != nil {
+		return err
+	}
+	if pending.Status != interfaces.SyncGroupStatusPublishPending {
+		return nil
+	}
+
+	pubErr := interfaces.SyncClient(sgName).PublishSyncGroup(ctx, *pending)
+	switch {
+	case pubErr == nil:
+		// Publish succeeded.
+		// TODO(hpucha): Get SG Deltas from publisher. Obtaining the
+		// new version from the publisher prevents SG conflicts.
+		return nil
+	case verror.ErrorID(pubErr) != verror.ErrExist.ID:
+		// Publish failed temporarily. Retry later.
+		// TODO(hpucha): Is there an RPC error that we can check here?
+		return pubErr
+	}
+
+	// Publish rejected. Persist that to avoid retrying in the
+	// future and to remember the split universe scenario.
+	return store.RunInTransaction(sd.db.St(), func(tx store.StoreReadWriter) error {
+		// Ensure SG still exists.
+		current, lookupErr := getSyncGroupByName(ctx, tx, sgName)
+		if lookupErr != nil {
+			return lookupErr
+		}
+
+		current.Status = interfaces.SyncGroupStatusPublishRejected
+		return setSGDataEntry(ctx, tx, current.Id, current)
+	})
+}
+
+// splitPrefix cuts name at its first "/" and returns the text before and
+// after that separator. When name contains no "/", the whole of name is
+// returned as the first result and the second result is empty.
+//
+// TODO(hpucha): Should this be generalized?
+func splitPrefix(name string) (string, string) {
+	if sep := strings.Index(name, "/"); sep >= 0 {
+		return name[:sep], name[sep+1:]
+	}
+	return name, ""
+}
+
+// bootstrapSyncGroup scans, within tx, the stored entries that fall under each
+// of the given SyncGroup prefixes. Each prefix is split by splitPrefix into a
+// table part and a row part, which are turned into a scan range with
+// util.ScanRangeArgs.
+//
+// The scanned entries are only read, not processed yet (see the TODOs in the
+// loop). The first error reported by a scan stream is returned; otherwise the
+// result is nil.
+//
+// tx must be a store.Transaction: the unchecked type assertion on the first
+// line panics if it is not.
+func bootstrapSyncGroup(tx store.StoreReadWriter, prefixes []string) error {
+ _ = tx.(store.Transaction)
+
+ for _, p := range prefixes {
+ table, row := splitPrefix(p)
+ it := tx.Scan(util.ScanRangeArgs(table, row, ""))
+ // key and value are passed back into Key/Value on every iteration.
+ // NOTE(review): presumably so the stream can reuse them as buffers;
+ // confirm against the store stream API.
+ key, value := []byte{}, []byte{}
+ for it.Advance() {
+ key, value = it.Key(key), it.Value(value)
+
+ // TODO(hpucha): Ensure prefix ACLs show up in the scan
+ // stream.
+
+ // TODO(hpucha): Process this object.
+ }
+ // Advance returning false ends the loop; check the stream for an error.
+ if err := it.Err(); err != nil {
+ return err
+ }
+ }
+ return nil
+}
+
+// publishInMountTables is a placeholder for publishing a SyncGroup, described
+// by spec, in mount tables. It is not implemented yet: it ignores all of its
+// arguments and always returns nil.
+func (sd *syncDatabase) publishInMountTables(ctx *context.T, call rpc.ServerCall, spec wire.SyncGroupSpec) error {
+ // TODO(hpucha): To be implemented.
+ // Pass server to Service in store.
+ // server.ServeDispatcher(*name, service, authorizer)
+ return nil
+}
+
////////////////////////////////////////////////////////////
// Methods for SyncGroup create/join between Syncbases.
-func (s *syncService) CreateSyncGroup(ctx *context.T, call rpc.ServerCall) error {
- return verror.NewErrNotImplemented(ctx)
+// PublishSyncGroup handles a request to publish the SyncGroup sg at this
+// Syncbase. It looks up the database named by sg.AppName and sg.DbName and
+// then, in a single transaction on that database's store:
+//
+//   - returns verror.ErrExist if a SyncGroup with the same name but a
+//     different ID is already stored;
+//   - if the same SyncGroup (same name and ID) is already stored, moves it
+//     from PublishPending to Running when needed and returns;
+//   - otherwise adds this Syncbase as a joiner, marks sg as Running and
+//     stores it with addSyncGroup.
+//
+// sg comes from a remote peer, so its contents are not trusted: in
+// particular its Joiners map may be nil and is allocated here before it is
+// written to.
+func (s *syncService) PublishSyncGroup(ctx *context.T, call rpc.ServerCall, sg interfaces.SyncGroup) error {
+	// Find the database store for this SyncGroup.
+	app, err := s.sv.App(ctx, call, sg.AppName)
+	if err != nil {
+		return err
+	}
+	db, err := app.NoSQLDatabase(ctx, call, sg.DbName)
+	if err != nil {
+		return err
+	}
+
+	return store.RunInTransaction(db.St(), func(st store.StoreReadWriter) error {
+		localSG, err := getSyncGroupByName(ctx, st, sg.Name)
+
+		// NOTE(review): the SyncGroup lookup helpers in this file wrap
+		// every lookup failure in verror.ErrInternal. Confirm that a
+		// missing SyncGroup really surfaces here as verror.ErrNoExist;
+		// otherwise a first-time publish always fails at this check.
+		if err != nil && verror.ErrorID(err) != verror.ErrNoExist.ID {
+			return err
+		}
+
+		// TODO(hpucha): Bootstrap DAG/Genvector etc for syncing the SG
+		// metadata if needed.
+		//
+		// TODO(hpucha): Catch up on SG versions so far.
+
+		if err == nil {
+			// SG name already claimed.
+			if localSG.Id != sg.Id {
+				return verror.New(verror.ErrExist, ctx, sg.Name)
+			}
+
+			// SG already published. Update if needed.
+			if localSG.Status == interfaces.SyncGroupStatusPublishPending {
+				localSG.Status = interfaces.SyncGroupStatusRunning
+				return setSGDataEntry(ctx, st, localSG.Id, localSG)
+			}
+			return nil
+		}
+
+		// Publish the SyncGroup.
+
+		// TODO(hpucha): Use some ACL check to allow/deny publishing.
+		// TODO(hpucha): Ensure node is on Admin ACL.
+
+		// Writing to a nil map panics, and the map comes straight from
+		// the remote peer, so allocate it if it is missing.
+		if sg.Joiners == nil {
+			sg.Joiners = make(map[string]wire.SyncGroupMemberInfo)
+		}
+
+		name := fmt.Sprintf("%x", s.id)
+		// TODO(hpucha): Default priority?
+		sg.Joiners[name] = wire.SyncGroupMemberInfo{}
+		sg.Status = interfaces.SyncGroupStatusRunning
+		return addSyncGroup(ctx, st, &sg)
+	})
+}
func (s *syncService) JoinSyncGroup(ctx *context.T, call rpc.ServerCall) error {
diff --git a/services/syncbase/vsync/syncgroup_test.go b/services/syncbase/vsync/syncgroup_test.go
index aa49455..1b89adb 100644
--- a/services/syncbase/vsync/syncgroup_test.go
+++ b/services/syncbase/vsync/syncgroup_test.go
@@ -11,6 +11,7 @@
"testing"
"v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/store"
)
@@ -24,7 +25,7 @@
t.Errorf("num-members (%s): got %v instead of %v", which, num, numMembers)
}
- sgids := make(map[GroupId]struct{})
+ sgids := make(map[interfaces.GroupId]struct{})
for _, info := range view.members {
for gid := range info.gid2info {
sgids[gid] = struct{}{}
@@ -46,12 +47,15 @@
// Add a SyncGroup.
sgName := "foobar"
- sgId := GroupId(1234)
+ sgId := interfaces.GroupId(1234)
- sg := &SyncGroup{
- Name: sgName,
- Id: sgId,
- Version: "etag-0",
+ sg := &interfaces.SyncGroup{
+ Name: sgName,
+ Id: sgId,
+ AppName: "mockApp",
+ DbName: "mockDB",
+ Creator: "mockCreator",
+ SpecVersion: "etag-0",
Spec: nosql.SyncGroupSpec{
Prefixes: []string{"foo", "bar"},
},
@@ -135,7 +139,7 @@
tx.Abort()
sg.Name = sgName
- sg.Id = GroupId(5555)
+ sg.Id = interfaces.GroupId(5555)
tx = st.NewTransaction()
if err = addSyncGroup(nil, tx, sg); err == nil {
@@ -148,7 +152,7 @@
// Fetch a non-existing SyncGroup by ID or name should fail.
badName := "not-available"
- badId := GroupId(999)
+ badId := interfaces.GroupId(999)
if id, err := getSyncGroupId(nil, st, badName); err == nil {
t.Errorf("found non-existing SyncGroup %s: got ID %d", badName, id)
}
@@ -168,7 +172,7 @@
svc := createService(t)
st := svc.St()
- checkBadAddSyncGroup := func(t *testing.T, st store.Store, sg *SyncGroup, msg string) {
+ checkBadAddSyncGroup := func(t *testing.T, st store.Store, sg *interfaces.SyncGroup, msg string) {
tx := st.NewTransaction()
if err := addSyncGroup(nil, tx, sg); err == nil {
t.Errorf("checkBadAddSyncGroup: adding bad SyncGroup (%s) did not fail", msg)
@@ -178,16 +182,16 @@
checkBadAddSyncGroup(t, st, nil, "nil SG")
- sg := &SyncGroup{Id: 1234}
+ sg := &interfaces.SyncGroup{Id: 1234}
checkBadAddSyncGroup(t, st, sg, "SG w/o name")
- sg = &SyncGroup{Name: "foobar"}
+ sg = &interfaces.SyncGroup{Name: "foobar"}
checkBadAddSyncGroup(t, st, sg, "SG w/o Id")
sg.Id = 1234
checkBadAddSyncGroup(t, st, sg, "SG w/o Version")
- sg.Version = "v1"
+ sg.SpecVersion = "v1"
checkBadAddSyncGroup(t, st, sg, "SG w/o Joiners")
sg.Joiners = map[string]nosql.SyncGroupMemberInfo{
@@ -202,7 +206,7 @@
st := svc.St()
sgName := "foobar"
- sgId := GroupId(1234)
+ sgId := interfaces.GroupId(1234)
// Delete non-existing SyncGroups.
@@ -219,10 +223,13 @@
// Create the SyncGroup to delete later.
- sg := &SyncGroup{
- Name: sgName,
- Id: sgId,
- Version: "etag-0",
+ sg := &interfaces.SyncGroup{
+ Name: sgName,
+ Id: sgId,
+ AppName: "mockApp",
+ DbName: "mockDB",
+ Creator: "mockCreator",
+ SpecVersion: "etag-0",
Spec: nosql.SyncGroupSpec{
Prefixes: []string{"foo", "bar"},
},
@@ -284,14 +291,17 @@
st := svc.St()
sgName1, sgName2 := "foo", "bar"
- sgId1, sgId2 := GroupId(1234), GroupId(8888)
+ sgId1, sgId2 := interfaces.GroupId(1234), interfaces.GroupId(8888)
// Add two SyncGroups.
- sg1 := &SyncGroup{
- Name: sgName1,
- Id: sgId1,
- Version: "etag-1",
+ sg1 := &interfaces.SyncGroup{
+ Name: sgName1,
+ Id: sgId1,
+ AppName: "mockApp",
+ DbName: "mockDB",
+ Creator: "mockCreator",
+ SpecVersion: "etag-1",
Spec: nosql.SyncGroupSpec{
Prefixes: []string{"foo"},
},
@@ -301,10 +311,13 @@
"cloud": nosql.SyncGroupMemberInfo{SyncPriority: 1},
},
}
- sg2 := &SyncGroup{
- Name: sgName2,
- Id: sgId2,
- Version: "etag-2",
+ sg2 := &interfaces.SyncGroup{
+ Name: sgName2,
+ Id: sgId2,
+ AppName: "mockApp",
+ DbName: "mockDB",
+ Creator: "mockCreator",
+ SpecVersion: "etag-2",
Spec: nosql.SyncGroupSpec{
Prefixes: []string{"bar"},
},
@@ -346,28 +359,28 @@
expMemberInfo := map[string]*memberInfo{
"phone": &memberInfo{
- gid2info: map[GroupId]nosql.SyncGroupMemberInfo{
+ gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
sgId1: sg1.Joiners["phone"],
},
},
"tablet": &memberInfo{
- gid2info: map[GroupId]nosql.SyncGroupMemberInfo{
+ gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
sgId1: sg1.Joiners["tablet"],
sgId2: sg2.Joiners["tablet"],
},
},
"cloud": &memberInfo{
- gid2info: map[GroupId]nosql.SyncGroupMemberInfo{
+ gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
sgId1: sg1.Joiners["cloud"],
},
},
"door": &memberInfo{
- gid2info: map[GroupId]nosql.SyncGroupMemberInfo{
+ gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
sgId2: sg2.Joiners["door"],
},
},
"lamp": &memberInfo{
- gid2info: map[GroupId]nosql.SyncGroupMemberInfo{
+ gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
sgId2: sg2.Joiners["lamp"],
},
},
@@ -408,17 +421,17 @@
expMemberInfo = map[string]*memberInfo{
"tablet": &memberInfo{
- gid2info: map[GroupId]nosql.SyncGroupMemberInfo{
+ gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
sgId2: sg2.Joiners["tablet"],
},
},
"door": &memberInfo{
- gid2info: map[GroupId]nosql.SyncGroupMemberInfo{
+ gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
sgId2: sg2.Joiners["door"],
},
},
"lamp": &memberInfo{
- gid2info: map[GroupId]nosql.SyncGroupMemberInfo{
+ gid2info: map[interfaces.GroupId]nosql.SyncGroupMemberInfo{
sgId2: sg2.Joiners["lamp"],
},
},
diff --git a/services/syncbase/vsync/types.vdl b/services/syncbase/vsync/types.vdl
index 1bde00c..2434200 100644
--- a/services/syncbase/vsync/types.vdl
+++ b/services/syncbase/vsync/types.vdl
@@ -4,27 +4,7 @@
package vsync
-import (
- "v.io/syncbase/v23/services/syncbase/nosql"
-)
-
-const (
- NoGroupId = GroupId(0)
-)
-
// syncData represents the persistent state of the sync module.
type syncData struct {
Id int64
}
-
-// GroupId is a globally unique SyncGroup ID.
-type GroupId uint64
-
-// SyncGroup contains the state of a SyncGroup object.
-type SyncGroup struct {
- Id GroupId // globally unique identifier generated by Syncbase
- Name string // globally unique Vanadium name chosen by app
- Version string // "etag" for concurrency control
- Spec nosql.SyncGroupSpec // app-given specification
- Joiners map[string]nosql.SyncGroupMemberInfo // map of joiners to their metadata
-}
diff --git a/services/syncbase/vsync/types.vdl.go b/services/syncbase/vsync/types.vdl.go
index 9a9737c..cc15abe 100644
--- a/services/syncbase/vsync/types.vdl.go
+++ b/services/syncbase/vsync/types.vdl.go
@@ -10,9 +10,6 @@
import (
// VDL system imports
"v.io/v23/vdl"
-
- // VDL user imports
- "v.io/syncbase/v23/services/syncbase/nosql"
)
// syncData represents the persistent state of the sync module.
@@ -25,32 +22,6 @@
}) {
}
-// GroupId is a globally unique SyncGroup ID.
-type GroupId uint64
-
-func (GroupId) __VDLReflect(struct {
- Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/vsync.GroupId"`
-}) {
-}
-
-// SyncGroup contains the state of a SyncGroup object.
-type SyncGroup struct {
- Id GroupId // globally unique identifier generated by Syncbase
- Name string // globally unique Vanadium name chosen by app
- Version string // "etag" for concurrency control
- Spec nosql.SyncGroupSpec // app-given specification
- Joiners map[string]nosql.SyncGroupMemberInfo // map of joiners to their metadata
-}
-
-func (SyncGroup) __VDLReflect(struct {
- Name string `vdl:"v.io/syncbase/x/ref/services/syncbase/vsync.SyncGroup"`
-}) {
-}
-
func init() {
vdl.Register((*syncData)(nil))
- vdl.Register((*GroupId)(nil))
- vdl.Register((*SyncGroup)(nil))
}
-
-const NoGroupId = GroupId(0)
diff --git a/services/syncbase/vsync/util_test.go b/services/syncbase/vsync/util_test.go
index 7db1552..986132f 100644
--- a/services/syncbase/vsync/util_test.go
+++ b/services/syncbase/vsync/util_test.go
@@ -29,6 +29,10 @@
return s.st
}
+// Sync returns the sync handle stored in the mock service.
+func (s *mockService) Sync() interfaces.SyncServerMethods {
+ return s.sync
+}
+
func (s *mockService) App(ctx *context.T, call rpc.ServerCall, appName string) (interfaces.App, error) {
return &mockApp{st: s.st}, nil
}
@@ -62,6 +66,18 @@
return verror.NewErrNotImplemented(ctx)
}
+// Service always returns nil in this mock.
+func (a *mockApp) Service() interfaces.Service {
+ return nil
+}
+
+// Name returns the fixed name "mockapp".
+func (a *mockApp) Name() string {
+ return "mockapp"
+}
+
+// StKey always returns the empty string in this mock.
+func (a *mockApp) StKey() string {
+ return ""
+}
+
// mockDatabase emulates a Syncbase Database. It is used to test sync functionality.
type mockDatabase struct {
st store.Store
@@ -71,7 +87,7 @@
return d.st
}
-func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall) error {
+// CheckPermsInternal is not implemented by the mock: it ignores st and always
+// returns a not-implemented error.
+func (d *mockDatabase) CheckPermsInternal(ctx *context.T, call rpc.ServerCall, st store.StoreReadWriter) error {
return verror.NewErrNotImplemented(ctx)
}
@@ -79,6 +95,18 @@
return verror.NewErrNotImplemented(ctx)
}
+// Name returns the fixed name "mockdb".
+func (d *mockDatabase) Name() string {
+ return "mockdb"
+}
+
+// StKey always returns the empty string in this mock.
+func (d *mockDatabase) StKey() string {
+ return ""
+}
+
+// App always returns nil in this mock.
+func (d *mockDatabase) App() interfaces.App {
+ return nil
+}
+
// createService creates a mock Syncbase service used for testing sync functionality.
// At present it relies on the underlying Memstore engine.
func createService(t *testing.T) *mockService {