Add schema to syncbase
1) Add client APIs related to managing db schema
- adds schema arg to NoSQLDatabase() bind
- adds db.UpgradeIfOutdated() method
- adds db.GetSchemaManager() which returns an object with {Get,Set}SchemaMetadata
- adds Schema{metadata, upgrader} struct and SchemaUpgrader interface
2) Add vdl changes
- {Get,Set}SchemaMetadata methods
- optional SchemaMetadata arg to Database.Create
MultiPart: 1/2
Change-Id: Ia920bbc55c40145cbacf8245c4d1682dcb02496d
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index 9bdf954..b56275b 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -19,7 +19,7 @@
// Create creates this Database.
// If perms is nil, we inherit (copy) the App perms.
// Create requires the caller to have Write permission at the App.
- Create(perms access.Permissions) error {access.Write}
+ Create(perms access.Permissions, metadata ?SchemaMetadata) error {access.Write}
// Delete deletes this Database.
Delete() error {access.Write}
@@ -34,6 +34,7 @@
// Database handle bound to this batch. If this Database is already bound to a
// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics
// are documented in model.go.
+ // TODO(sadovsky): make BatchOptions optional
BeginBatch(bo BatchOptions) (string | error) {access.Read}
// Commit persists the pending changes to the database.
@@ -65,6 +66,10 @@
// BlobManager implements the API for managing blobs attached to rows in
// a Database.
BlobManager
+
+ // SchemaManager implements the API for managing schema metadata attached
+ // to a Database.
+ SchemaManager
}
// Table represents a collection of Rows.
@@ -207,6 +212,20 @@
// aggressively", "sync once per day".
}
+// SchemaManager implements the API for managing schema metadata attached
+// to a Database.
+type SchemaManager interface {
+ // GetSchemaMetadata retrieves schema metadata for this database.
+ //
+ // Requires: Client must have at least Read access on the Database.
+ GetSchemaMetadata() (SchemaMetadata | error) {access.Read}
+
+ // SetSchemaMetadata stores schema metadata for this database.
+ //
+ // Requires: Client must have at least Write access on the Database.
+ SetSchemaMetadata(metadata SchemaMetadata) error {access.Write}
+}
+
// BlobManager is the interface for blob operations.
type BlobManager interface {
// API for resumable blob creation (append-only). After commit, a blob
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 56c50fa..447bd7d 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -1421,6 +1421,142 @@
return s.s.Send(item)
}
+// SchemaManagerClientMethods is the client interface
+// containing SchemaManager methods.
+//
+// SchemaManager implements the API for managing schema metadata attached
+// to a Database.
+type SchemaManagerClientMethods interface {
+ // GetSchemaMetadata retrieves schema metadata for this database.
+ //
+ // Requires: Client must have at least Read access on the Database.
+ GetSchemaMetadata(*context.T, ...rpc.CallOpt) (SchemaMetadata, error)
+ // SetSchemaMetadata stores schema metadata for this database.
+ //
+ // Requires: Client must have at least Write access on the Database.
+ SetSchemaMetadata(ctx *context.T, metadata SchemaMetadata, opts ...rpc.CallOpt) error
+}
+
+// SchemaManagerClientStub adds universal methods to SchemaManagerClientMethods.
+type SchemaManagerClientStub interface {
+ SchemaManagerClientMethods
+ rpc.UniversalServiceMethods
+}
+
+// SchemaManagerClient returns a client stub for SchemaManager.
+func SchemaManagerClient(name string) SchemaManagerClientStub {
+ return implSchemaManagerClientStub{name}
+}
+
+type implSchemaManagerClientStub struct {
+ name string
+}
+
+func (c implSchemaManagerClientStub) GetSchemaMetadata(ctx *context.T, opts ...rpc.CallOpt) (o0 SchemaMetadata, err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "GetSchemaMetadata", nil, []interface{}{&o0}, opts...)
+ return
+}
+
+func (c implSchemaManagerClientStub) SetSchemaMetadata(ctx *context.T, i0 SchemaMetadata, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "SetSchemaMetadata", []interface{}{i0}, nil, opts...)
+ return
+}
+
+// SchemaManagerServerMethods is the interface a server writer
+// implements for SchemaManager.
+//
+// SchemaManager implements the API for managing schema metadata attached
+// to a Database.
+type SchemaManagerServerMethods interface {
+ // GetSchemaMetadata retrieves schema metadata for this database.
+ //
+ // Requires: Client must have at least Read access on the Database.
+ GetSchemaMetadata(*context.T, rpc.ServerCall) (SchemaMetadata, error)
+ // SetSchemaMetadata stores schema metadata for this database.
+ //
+ // Requires: Client must have at least Write access on the Database.
+ SetSchemaMetadata(ctx *context.T, call rpc.ServerCall, metadata SchemaMetadata) error
+}
+
+// SchemaManagerServerStubMethods is the server interface containing
+// SchemaManager methods, as expected by rpc.Server.
+// There is no difference between this interface and SchemaManagerServerMethods
+// since there are no streaming methods.
+type SchemaManagerServerStubMethods SchemaManagerServerMethods
+
+// SchemaManagerServerStub adds universal methods to SchemaManagerServerStubMethods.
+type SchemaManagerServerStub interface {
+ SchemaManagerServerStubMethods
+ // Describe the SchemaManager interfaces.
+ Describe__() []rpc.InterfaceDesc
+}
+
+// SchemaManagerServer returns a server stub for SchemaManager.
+// It converts an implementation of SchemaManagerServerMethods into
+// an object that may be used by rpc.Server.
+func SchemaManagerServer(impl SchemaManagerServerMethods) SchemaManagerServerStub {
+ stub := implSchemaManagerServerStub{
+ impl: impl,
+ }
+ // Initialize GlobState; always check the stub itself first, to handle the
+ // case where the user has the Glob method defined in their VDL source.
+ if gs := rpc.NewGlobState(stub); gs != nil {
+ stub.gs = gs
+ } else if gs := rpc.NewGlobState(impl); gs != nil {
+ stub.gs = gs
+ }
+ return stub
+}
+
+type implSchemaManagerServerStub struct {
+ impl SchemaManagerServerMethods
+ gs *rpc.GlobState
+}
+
+func (s implSchemaManagerServerStub) GetSchemaMetadata(ctx *context.T, call rpc.ServerCall) (SchemaMetadata, error) {
+ return s.impl.GetSchemaMetadata(ctx, call)
+}
+
+func (s implSchemaManagerServerStub) SetSchemaMetadata(ctx *context.T, call rpc.ServerCall, i0 SchemaMetadata) error {
+ return s.impl.SetSchemaMetadata(ctx, call, i0)
+}
+
+func (s implSchemaManagerServerStub) Globber() *rpc.GlobState {
+ return s.gs
+}
+
+func (s implSchemaManagerServerStub) Describe__() []rpc.InterfaceDesc {
+ return []rpc.InterfaceDesc{SchemaManagerDesc}
+}
+
+// SchemaManagerDesc describes the SchemaManager interface.
+var SchemaManagerDesc rpc.InterfaceDesc = descSchemaManager
+
+// descSchemaManager hides the desc to keep godoc clean.
+var descSchemaManager = rpc.InterfaceDesc{
+ Name: "SchemaManager",
+ PkgPath: "v.io/syncbase/v23/services/syncbase/nosql",
+ Doc: "// SchemaManager implements the API for managing schema metadata attached\n// to a Database.",
+ Methods: []rpc.MethodDesc{
+ {
+ Name: "GetSchemaMetadata",
+ Doc: "// GetSchemaMetadata retrieves schema metadata for this database.\n//\n// Requires: Client must have at least Read access on the Database.",
+ OutArgs: []rpc.ArgDesc{
+ {"", ``}, // SchemaMetadata
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+ },
+ {
+ Name: "SetSchemaMetadata",
+ Doc: "// SetSchemaMetadata stores schema metadata for this database.\n//\n// Requires: Client must have at least Write access on the Database.",
+ InArgs: []rpc.ArgDesc{
+ {"metadata", ``}, // SchemaMetadata
+ },
+ Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
+ },
+ },
+}
+
// DatabaseClientMethods is the client interface
// containing Database methods.
//
@@ -1505,10 +1641,13 @@
SyncGroupManagerClientMethods
// BlobManager is the interface for blob operations.
BlobManagerClientMethods
+ // SchemaManager implements the API for managing schema metadata attached
+ // to a Database.
+ SchemaManagerClientMethods
// Create creates this Database.
// If perms is nil, we inherit (copy) the App perms.
// Create requires the caller to have Write permission at the App.
- Create(ctx *context.T, perms access.Permissions, opts ...rpc.CallOpt) error
+ Create(ctx *context.T, perms access.Permissions, metadata *SchemaMetadata, opts ...rpc.CallOpt) error
// Delete deletes this Database.
Delete(*context.T, ...rpc.CallOpt) error
// Exists returns true only if this Database exists. Insufficient permissions
@@ -1520,6 +1659,7 @@
// Database handle bound to this batch. If this Database is already bound to a
// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics
// are documented in model.go.
+ // TODO(sadovsky): make BatchOptions optional
BeginBatch(ctx *context.T, bo BatchOptions, opts ...rpc.CallOpt) (string, error)
// Commit persists the pending changes to the database.
// If this Database is not bound to a batch, Commit() will fail with
@@ -1544,7 +1684,7 @@
// DatabaseClient returns a client stub for Database.
func DatabaseClient(name string) DatabaseClientStub {
- return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name), BlobManagerClient(name)}
+ return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name), BlobManagerClient(name), SchemaManagerClient(name)}
}
type implDatabaseClientStub struct {
@@ -1554,10 +1694,11 @@
DatabaseWatcherClientStub
SyncGroupManagerClientStub
BlobManagerClientStub
+ SchemaManagerClientStub
}
-func (c implDatabaseClientStub) Create(ctx *context.T, i0 access.Permissions, opts ...rpc.CallOpt) (err error) {
- err = v23.GetClient(ctx).Call(ctx, c.name, "Create", []interface{}{i0}, nil, opts...)
+func (c implDatabaseClientStub) Create(ctx *context.T, i0 access.Permissions, i1 *SchemaMetadata, opts ...rpc.CallOpt) (err error) {
+ err = v23.GetClient(ctx).Call(ctx, c.name, "Create", []interface{}{i0, i1}, nil, opts...)
return
}
@@ -1747,10 +1888,13 @@
SyncGroupManagerServerMethods
// BlobManager is the interface for blob operations.
BlobManagerServerMethods
+ // SchemaManager implements the API for managing schema metadata attached
+ // to a Database.
+ SchemaManagerServerMethods
// Create creates this Database.
// If perms is nil, we inherit (copy) the App perms.
// Create requires the caller to have Write permission at the App.
- Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error
+ Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions, metadata *SchemaMetadata) error
// Delete deletes this Database.
Delete(*context.T, rpc.ServerCall) error
// Exists returns true only if this Database exists. Insufficient permissions
@@ -1762,6 +1906,7 @@
// Database handle bound to this batch. If this Database is already bound to a
// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics
// are documented in model.go.
+ // TODO(sadovsky): make BatchOptions optional
BeginBatch(ctx *context.T, call rpc.ServerCall, bo BatchOptions) (string, error)
// Commit persists the pending changes to the database.
// If this Database is not bound to a batch, Commit() will fail with
@@ -1858,10 +2003,13 @@
SyncGroupManagerServerStubMethods
// BlobManager is the interface for blob operations.
BlobManagerServerStubMethods
+ // SchemaManager implements the API for managing schema metadata attached
+ // to a Database.
+ SchemaManagerServerStubMethods
// Create creates this Database.
// If perms is nil, we inherit (copy) the App perms.
// Create requires the caller to have Write permission at the App.
- Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error
+ Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions, metadata *SchemaMetadata) error
// Delete deletes this Database.
Delete(*context.T, rpc.ServerCall) error
// Exists returns true only if this Database exists. Insufficient permissions
@@ -1873,6 +2021,7 @@
// Database handle bound to this batch. If this Database is already bound to a
// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics
// are documented in model.go.
+ // TODO(sadovsky): make BatchOptions optional
BeginBatch(ctx *context.T, call rpc.ServerCall, bo BatchOptions) (string, error)
// Commit persists the pending changes to the database.
// If this Database is not bound to a batch, Commit() will fail with
@@ -1906,6 +2055,7 @@
DatabaseWatcherServerStub: DatabaseWatcherServer(impl),
SyncGroupManagerServerStub: SyncGroupManagerServer(impl),
BlobManagerServerStub: BlobManagerServer(impl),
+ SchemaManagerServerStub: SchemaManagerServer(impl),
}
// Initialize GlobState; always check the stub itself first, to handle the
// case where the user has the Glob method defined in their VDL source.
@@ -1923,11 +2073,12 @@
DatabaseWatcherServerStub
SyncGroupManagerServerStub
BlobManagerServerStub
+ SchemaManagerServerStub
gs *rpc.GlobState
}
-func (s implDatabaseServerStub) Create(ctx *context.T, call rpc.ServerCall, i0 access.Permissions) error {
- return s.impl.Create(ctx, call, i0)
+func (s implDatabaseServerStub) Create(ctx *context.T, call rpc.ServerCall, i0 access.Permissions, i1 *SchemaMetadata) error {
+ return s.impl.Create(ctx, call, i0, i1)
}
func (s implDatabaseServerStub) Delete(ctx *context.T, call rpc.ServerCall) error {
@@ -1959,7 +2110,7 @@
}
func (s implDatabaseServerStub) Describe__() []rpc.InterfaceDesc {
- return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, SyncGroupManagerDesc, BlobManagerDesc}
+ return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, SyncGroupManagerDesc, BlobManagerDesc, SchemaManagerDesc}
}
// DatabaseDesc describes the Database interface.
@@ -1975,13 +2126,15 @@
{"DatabaseWatcher", "v.io/syncbase/v23/services/syncbase/nosql", "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log and a set of (table, row prefix) pairs. Updates with rows not\n// covered by the set are excluded from the result stream. The result stream\n// consists of a never-ending sequence of Change messages (until the call fails\n// or is canceled). Each Change message contains an optional continued bit.\n// A sub-sequence of Change messages with continued=true followed by a Change\n// message with continued=false forms a batch. If the client has no access to\n// a row specified in a change, that change is excluded from the result stream.\n//\n// See \"v.io/v23/services/watch\".GlobWatcher for more detailed explanation of\n// the general behavior.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."},
{"SyncGroupManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SyncGroupManager is the interface for SyncGroup operations.\n// TODO(hpucha): Add blessings to create/join and add a refresh method."},
{"BlobManager", "v.io/syncbase/v23/services/syncbase/nosql", "// BlobManager is the interface for blob operations."},
+ {"SchemaManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SchemaManager implements the API for managing schema metadata attached\n// to a Database."},
},
Methods: []rpc.MethodDesc{
{
Name: "Create",
Doc: "// Create creates this Database.\n// If perms is nil, we inherit (copy) the App perms.\n// Create requires the caller to have Write permission at the App.",
InArgs: []rpc.ArgDesc{
- {"perms", ``}, // access.Permissions
+ {"perms", ``}, // access.Permissions
+ {"metadata", ``}, // *SchemaMetadata
},
Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
},
@@ -2000,7 +2153,7 @@
},
{
Name: "BeginBatch",
- Doc: "// BeginBatch creates a new batch. It returns an App-relative name for a\n// Database handle bound to this batch. If this Database is already bound to a\n// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics\n// are documented in model.go.",
+ Doc: "// BeginBatch creates a new batch. It returns an App-relative name for a\n// Database handle bound to this batch. If this Database is already bound to a\n// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics\n// are documented in model.go.\n// TODO(sadovsky): make BatchOptions optional",
InArgs: []rpc.ArgDesc{
{"bo", ``}, // BatchOptions
},
diff --git a/v23/services/syncbase/nosql/types.vdl b/v23/services/syncbase/nosql/types.vdl
index 1d03be6..e6c0415 100644
--- a/v23/services/syncbase/nosql/types.vdl
+++ b/v23/services/syncbase/nosql/types.vdl
@@ -74,6 +74,81 @@
SyncPriority byte
}
+// ResolverType defines the possible conflict resolution policies.
+// A Conflict is defined as presence of two independent sets of updates
+// originating from the same version of an object. Syncbase
+// uses version vectors to determine sequence of changes to a given row. Hence
+// if device A updates a row with key "foo" from version V3 to V4, then syncs
+// with device B which further updates the same row from version V4 to V5 and
+// then V5 is synced back to device A, device A will see V5 as a forward
+// progression of "foo" and not a conflict with V3 of "foo". But in the
+// meantime if device A had already updated "foo" again from version V4 to
+// version V6 then there is a conflict between V5 and V6 with V4 being the
+// common ancestor.
+type ResolverType enum {
+ // LastWins is a policy which resolves a conflict between two writes by
+ // choosing the version which has the greatest timestamp.
+ // For example, if device A created V6 at time T1 and device B created V5 at
+ // time T2 where timestamp T2 > T1, then V5 will be accepted as the
+ // resolution of the conflict.
+ // Syncbase maintains an internal clock for creating write timestamps. This
+ // clock is kept up to date via regular synchronization with NTP and with
+ // other syncbases. As long as the syncbase has access to an NTP server
+ // directly or indirectly via another syncbase, the internal clock will
+ // track NTP time and correct any skews suffered by the local system clock.
+ LastWins
+
+ // AppResolves is a policy which allows an App to handle resolution of a
+ // conflict on its own. In order to receive the conflicts, the app needs to
+ // register a conflict resolution stream using
+ // Database.StartConflictResolver().
+ AppResolves
+
+ // Defer is a policy that allows an instance of an App to outsource its
+ // conflict resolution to some other instance, typically a more capable
+ // instance (in terms of resources, knowledge or permissions) such as a
+ // cloud or admin instance.
+ Defer
+}
+
+// SchemaMetadata maintains metadata related to the schema of a given database.
+// There is one SchemaMetadata per database.
+type SchemaMetadata struct {
+ // Non negative Schema version number. Should be increased with every schema change
+ // (e.g. adding fields to structs) that cannot be handled by previous
+ // versions of the app.
+ Version int64
+ Policy CrPolicy
+}
+
+// For a given row with a conflict, all rules are matched against the row.
+// If no rules match the row, we default to "LastWins". If multiple
+// rules match the row, ties are broken as follows:
+// 1. If one match has a longer prefix than the other, take that one.
+// 2. Else, if only one match specifies a type, take that one.
+// 3. Else, the two matches are identical; take the last one in the Rules array.
+type CrPolicy struct {
+ Rules []CrRule
+}
+
+// CrRule provides a filter and the type of resolution to perform for a row
+// under conflict that passes the filter.
+type CrRule struct {
+ // TableName is the name of the table that this rule applies to.
+ TableName string
+
+ // KeyPrefix represents the set of keys within the given table for which
+ // this policy applies. TableName must not be empty if this field is set.
+ KeyPrefix string
+
+ // Type includes the full package path for the value type for which this
+ // policy applies.
+ Type string
+
+ // Policy for resolving conflict.
+ Resolver ResolverType
+}
+
// BlobRef is a reference to a blob.
type BlobRef string
diff --git a/v23/services/syncbase/nosql/types.vdl.go b/v23/services/syncbase/nosql/types.vdl.go
index 040e253..0f05cca 100644
--- a/v23/services/syncbase/nosql/types.vdl.go
+++ b/v23/services/syncbase/nosql/types.vdl.go
@@ -102,6 +102,120 @@
}) {
}
+// ResolverType defines the possible conflict resolution policies.
+// A Conflict is defined as presence of two independent sets of updates
+// originating from the same version of an object. Syncbase
+// uses version vectors to determine sequence of changes to a given row. Hence
+// if device A updates a row with key "foo" from version V3 to V4, then syncs
+// with device B which further updates the same row from version V4 to V5 and
+// then V5 is synced back to device A, device A will see V5 as a forward
+// progression of "foo" and not a conflict with V3 of "foo". But in the
+// meantime if device A had already updated "foo" again from version V4 to
+// version V6 then there is a conflict between V5 and V6 with V4 being the
+// common ancestor.
+type ResolverType int
+
+const (
+ ResolverTypeLastWins ResolverType = iota
+ ResolverTypeAppResolves
+ ResolverTypeDefer
+)
+
+// ResolverTypeAll holds all labels for ResolverType.
+var ResolverTypeAll = [...]ResolverType{ResolverTypeLastWins, ResolverTypeAppResolves, ResolverTypeDefer}
+
+// ResolverTypeFromString creates a ResolverType from a string label.
+func ResolverTypeFromString(label string) (x ResolverType, err error) {
+ err = x.Set(label)
+ return
+}
+
+// Set assigns label to x.
+func (x *ResolverType) Set(label string) error {
+ switch label {
+ case "LastWins", "lastwins":
+ *x = ResolverTypeLastWins
+ return nil
+ case "AppResolves", "appresolves":
+ *x = ResolverTypeAppResolves
+ return nil
+ case "Defer", "defer":
+ *x = ResolverTypeDefer
+ return nil
+ }
+ *x = -1
+ return fmt.Errorf("unknown label %q in nosql.ResolverType", label)
+}
+
+// String returns the string label of x.
+func (x ResolverType) String() string {
+ switch x {
+ case ResolverTypeLastWins:
+ return "LastWins"
+ case ResolverTypeAppResolves:
+ return "AppResolves"
+ case ResolverTypeDefer:
+ return "Defer"
+ }
+ return ""
+}
+
+func (ResolverType) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ResolverType"`
+ Enum struct{ LastWins, AppResolves, Defer string }
+}) {
+}
+
+// SchemaMetadata maintains metadata related to the schema of a given database.
+// There is one SchemaMetadata per database.
+type SchemaMetadata struct {
+ // Non negative Schema version number. Should be increased with every schema change
+ // (e.g. adding fields to structs) that cannot be handled by previous
+ // versions of the app.
+ Version int64
+ Policy CrPolicy
+}
+
+func (SchemaMetadata) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.SchemaMetadata"`
+}) {
+}
+
+// For a given row with a conflict, all rules are matched against the row.
+// If no rules match the row, we default to "LastWins". If multiple
+// rules match the row, ties are broken as follows:
+// 1. If one match has a longer prefix than the other, take that one.
+// 2. Else, if only one match specifies a type, take that one.
+// 3. Else, the two matches are identical; take the last one in the Rules array.
+type CrPolicy struct {
+ Rules []CrRule
+}
+
+func (CrPolicy) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.CrPolicy"`
+}) {
+}
+
+// CrRule provides a filter and the type of resolution to perform for a row
+// under conflict that passes the filter.
+type CrRule struct {
+ // TableName is the name of the table that this rule applies to.
+ TableName string
+ // KeyPrefix represents the set of keys within the given table for which
+ // this policy applies. TableName must not be empty if this field is set.
+ KeyPrefix string
+ // Type includes the full package path for the value type for which this
+ // policy applies.
+ Type string
+ // Policy for resolving conflict.
+ Resolver ResolverType
+}
+
+func (CrRule) __VDLReflect(struct {
+ Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.CrRule"`
+}) {
+}
+
// BlobRef is a reference to a blob.
type BlobRef string
@@ -305,6 +419,10 @@
vdl.Register((*KeyValue)(nil))
vdl.Register((*SyncGroupSpec)(nil))
vdl.Register((*SyncGroupMemberInfo)(nil))
+ vdl.Register((*ResolverType)(nil))
+ vdl.Register((*SchemaMetadata)(nil))
+ vdl.Register((*CrPolicy)(nil))
+ vdl.Register((*CrRule)(nil))
vdl.Register((*BlobRef)(nil))
vdl.Register((*FetchState)(nil))
vdl.Register((*FetchStatus)(nil))
diff --git a/v23/syncbase/app.go b/v23/syncbase/app.go
index e9ec228..85142db 100644
--- a/v23/syncbase/app.go
+++ b/v23/syncbase/app.go
@@ -48,8 +48,8 @@
}
// NoSQLDatabase implements App.NoSQLDatabase.
-func (a *app) NoSQLDatabase(relativeName string) nosql.Database {
- return nosql.NewDatabase(a.fullName, relativeName)
+func (a *app) NoSQLDatabase(relativeName string, schema *nosql.Schema) nosql.Database {
+ return nosql.NewDatabase(a.fullName, relativeName, schema)
}
// ListDatabases implements App.ListDatabases.
diff --git a/v23/syncbase/client_v23_test.go b/v23/syncbase/client_v23_test.go
index eaadc2c..ef71c65 100644
--- a/v23/syncbase/client_v23_test.go
+++ b/v23/syncbase/client_v23_test.go
@@ -52,7 +52,7 @@
if err := a.Create(ctx, nil); err != nil {
return fmt.Errorf("unable to create an app: %v", err)
}
- d := a.NoSQLDatabase("d")
+ d := a.NoSQLDatabase("d", nil)
if err := d.Create(ctx, nil); err != nil {
return fmt.Errorf("unable to create a database: %v", err)
}
@@ -123,7 +123,7 @@
if err := a.Create(ctx, nil); err != nil {
return fmt.Errorf("a.Create() failed: %v", err)
}
- for _, d := range []nosql.Database{a.NoSQLDatabase("d1"), a.NoSQLDatabase("d2")} {
+ for _, d := range []nosql.Database{a.NoSQLDatabase("d1", nil), a.NoSQLDatabase("d2", nil)} {
if err := d.Create(ctx, nil); err != nil {
return fmt.Errorf("d.Create() failed: %v", err)
}
@@ -166,7 +166,7 @@
return fmt.Errorf("Databases do not match: got %v, want %v", got, want)
}
for _, dName := range want {
- d := a.NoSQLDatabase(dName)
+ d := a.NoSQLDatabase(dName, nil)
if got, err = d.ListTables(ctx); err != nil {
return fmt.Errorf("d.ListTables() failed: %v", err)
}
diff --git a/v23/syncbase/model.go b/v23/syncbase/model.go
index 184bd8c..dff6f76 100644
--- a/v23/syncbase/model.go
+++ b/v23/syncbase/model.go
@@ -55,7 +55,9 @@
// NoSQLDatabase returns the nosql.Database with the given name.
// relativeName must not contain slashes.
- NoSQLDatabase(relativeName string) nosql.Database
+ // schema can be nil only if schema was never set for the database in the
+ // first place. See nosql.Schema for more details.
+ NoSQLDatabase(relativeName string, schema *nosql.Schema) nosql.Database
// ListDatabases returns a list of all Database names.
// TODO(kash): Include the database type (NoSQL vs. SQL).
diff --git a/v23/syncbase/nosql/batch_test.go b/v23/syncbase/nosql/batch_test.go
index be137ac..bbee3af 100644
--- a/v23/syncbase/nosql/batch_test.go
+++ b/v23/syncbase/nosql/batch_test.go
@@ -510,7 +510,7 @@
// ErrBoundToBatch specifically since in practice bc.Create() will return
// either ErrExist or "invalid name" depending on whether the database and
// batch exist.
- if err := bc.Create(ctx, nil); err == nil {
+ if err := bc.Create(ctx, nil, nil); err == nil {
t.Fatalf("bc.Create() should have failed: %v", err)
}
if err := bc.Delete(ctx); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
@@ -545,7 +545,7 @@
ctx, sName, cleanup := tu.SetupOrDie(nil)
defer cleanup()
a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
- d := a.NoSQLDatabase("d")
+ d := a.NoSQLDatabase("d", nil)
if _, err := d.BeginBatch(ctx, wire.BatchOptions{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
t.Fatalf("d.BeginBatch() should have failed: %v", err)
}
diff --git a/v23/syncbase/nosql/client_test.go b/v23/syncbase/nosql/client_test.go
index 731b689..bca7a84 100644
--- a/v23/syncbase/nosql/client_test.go
+++ b/v23/syncbase/nosql/client_test.go
@@ -23,7 +23,7 @@
// Tests various Name, FullName, and Key methods.
func TestNameAndKey(t *testing.T) {
- d := syncbase.NewService("s").App("a").NoSQLDatabase("d")
+ d := syncbase.NewService("s").App("a").NoSQLDatabase("d", nil)
tb := d.Table("tb")
r := tb.Row("r")
diff --git a/v23/syncbase/nosql/database.go b/v23/syncbase/nosql/database.go
index 030a44b..31aeb19 100644
--- a/v23/syncbase/nosql/database.go
+++ b/v23/syncbase/nosql/database.go
@@ -10,15 +10,18 @@
"v.io/v23/context"
"v.io/v23/naming"
"v.io/v23/security/access"
+ "v.io/v23/verror"
+ "v.io/x/lib/vlog"
)
-func NewDatabase(parentFullName, relativeName string) *database {
+func NewDatabase(parentFullName, relativeName string, schema *Schema) *database {
fullName := naming.Join(parentFullName, relativeName)
return &database{
c: wire.DatabaseClient(fullName),
parentFullName: parentFullName,
fullName: fullName,
name: relativeName,
+ schema: schema,
}
}
@@ -27,6 +30,7 @@
parentFullName string
fullName string
name string
+ schema *Schema
}
var _ Database = (*database)(nil)
@@ -60,7 +64,11 @@
// Create implements Database.Create.
func (d *database) Create(ctx *context.T, perms access.Permissions) error {
- return d.c.Create(ctx, perms)
+ var schemaMetadata *wire.SchemaMetadata = nil
+ if d.schema != nil {
+ schemaMetadata = &d.schema.Metadata
+ }
+ return d.c.Create(ctx, perms, schemaMetadata)
}
// Delete implements Database.Delete.
@@ -108,7 +116,7 @@
if err != nil {
return nil, err
}
- return &batch{database: *NewDatabase(d.parentFullName, relativeName)}, nil
+ return &batch{database: *NewDatabase(d.parentFullName, relativeName, d.schema)}, nil
}
// SetPermissions implements Database.SetPermissions.
@@ -130,3 +138,61 @@
func (d *database) GetSyncGroupNames(ctx *context.T) ([]string, error) {
return d.c.GetSyncGroupNames(ctx)
}
+
+// UpgradeIfOutdated implements Database.UpgradeIfOutdated.
+func (d *database) UpgradeIfOutdated(ctx *context.T) (bool, error) {
+ var schema *Schema = d.schema
+ if schema == nil {
+ return false, verror.New(verror.ErrBadState, ctx, "Schema or SchemaMetadata cannot be nil. A valid Schema needs to be used when creating DB handle.")
+ }
+
+ if schema.Metadata.Version < 0 {
+ return false, verror.New(verror.ErrBadState, ctx, "Schema version cannot be less than zero.")
+ }
+
+ schemaMgr := d.getSchemaManager()
+ currMeta, err := schemaMgr.getSchemaMetadata(ctx)
+ if err != nil {
+ // If the client app did not set a schema as part of create db
+ // getSchemaMetadata() will return ErrNoExist. If so we set the schema
+ // here.
+ if verror.ErrorID(err) == verror.ErrNoExist.ID {
+ err := schemaMgr.setSchemaMetadata(ctx, schema.Metadata)
+ // The database may not yet exist. If so above call will return
+ // ErrNoExist and we return db without error. If the error
+ // is different then return the error to the caller.
+ if (err != nil) && (verror.ErrorID(err) != verror.ErrNoExist.ID) {
+ return false, err
+ }
+ return false, nil
+ }
+ return false, err
+ }
+
+ if currMeta.Version >= schema.Metadata.Version {
+ return false, nil
+ }
+ // Call the Upgrader provided by the app to upgrade the schema.
+ //
+ // TODO(jlodhia): disable sync before running Upgrader and reenable
+ // once Upgrader is finished.
+ //
+ // TODO(jlodhia): prevent other processes (local/remote) from accessing
+ // the database while upgrade is in progress.
+ upgradeErr := schema.Upgrader.Run(d, currMeta.Version, schema.Metadata.Version)
+ if upgradeErr != nil {
+ vlog.Error(upgradeErr)
+ return false, upgradeErr
+ }
+ // Update the schema metadata in db to the latest version.
+ metadataErr := schemaMgr.setSchemaMetadata(ctx, schema.Metadata)
+ if metadataErr != nil {
+ vlog.Error(metadataErr)
+ return false, metadataErr
+ }
+ return true, nil
+}
+
+func (d *database) getSchemaManager() schemaManagerImpl {
+ return newSchemaManager(d.fullName)
+}
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index cb74e01..dab14ee 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -104,6 +104,15 @@
// GetSyncGroupNames returns the global names of all SyncGroups attached to
// this database.
GetSyncGroupNames(ctx *context.T) ([]string, error)
+
+ // This method compares the current schema version of the database with the
+ // schema version provided while creating this database handle. If the
+ // current database schema version is lower, then the SchemaUpdater is
+ // called. If SchemaUpdater is successful this method stores the new schema
+ // metadata in database.
+ // Note: schema can be nil, in which case this method skips schema check
+ // and the caller is responsible for maintaining schema sanity.
+ UpgradeIfOutdated(ctx *context.T) (bool, error)
}
// BatchDatabase is a handle to a set of reads and writes to the database that
@@ -342,3 +351,26 @@
// SyncGroup ACL.
GetMembers(ctx *context.T) (map[string]wire.SyncGroupMemberInfo, error)
}
+
+// SchemaUpgrader interface must be implemented by the App in order to upgrade
+// the database schema from a lower version to a higher version.
+type SchemaUpgrader interface {
+ // Takes an instance of database and upgrades data from old
+ // schema to new schema. This method must be idempotent.
+ Run(db Database, oldVersion, newVersion int64) error
+}
+
+// Each database has a Schema associated with it which defines the current
+// version of the database. When a new version of app wishes to change
+// its data in a way that it is not compatible with the old app's data,
+// the app must change the schema version and provide relevant upgrade logic
+// in the Upgrader. The conflict resolution rules are also associated with the
+// schema version. Hence if the conflict resolution rules change then the schema
+// version also must be bumped.
+//
+// Schema provides metadata and a SchemaUpgrader for a given database.
+// SchemaUpgrader is purely local and not persisted.
+type Schema struct {
+ Metadata wire.SchemaMetadata
+ Upgrader SchemaUpgrader
+}
diff --git a/v23/syncbase/nosql/schema.go b/v23/syncbase/nosql/schema.go
new file mode 100644
index 0000000..9f52a1d
--- /dev/null
+++ b/v23/syncbase/nosql/schema.go
@@ -0,0 +1,45 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/v23/context"
+)
+
+// NewSchema creates a new Schema object.
+func NewSchema(metadata wire.SchemaMetadata, upgrader SchemaUpgrader) *Schema {
+ return &Schema{
+ Metadata: metadata,
+ Upgrader: upgrader,
+ }
+}
+
+//////////////////////////////////////////
+// Implementation of SchemaManager (Not part of public client API)
+
+type schemaManagerImpl struct {
+ dbName string
+ c wire.DatabaseClientMethods
+}
+
+func newSchemaManager(dbName string) schemaManagerImpl {
+ return schemaManagerImpl{
+ dbName: dbName,
+ c: wire.DatabaseClient(dbName),
+ }
+}
+
+// GetSchemaMetadata retrieves the schema metadata for the database it is
+// derived from.
+func (sm *schemaManagerImpl) getSchemaMetadata(ctx *context.T) (wire.SchemaMetadata, error) {
+ return sm.c.GetSchemaMetadata(ctx)
+}
+
+// SetSchemaMetadata stores the schema metadata for the database it is
+// derived from.
+func (sm *schemaManagerImpl) setSchemaMetadata(ctx *context.T, metadata wire.SchemaMetadata) error {
+ return sm.c.SetSchemaMetadata(ctx, metadata)
+}
diff --git a/v23/syncbase/nosql/schema_test.go b/v23/syncbase/nosql/schema_test.go
new file mode 100644
index 0000000..bcafcb7
--- /dev/null
+++ b/v23/syncbase/nosql/schema_test.go
@@ -0,0 +1,104 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql_test
+
+import (
+ "testing"
+
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/v23/syncbase"
+ tu "v.io/syncbase/v23/syncbase/testutil"
+ "v.io/v23/context"
+)
+
+// Tests schema checking logic within App.NoSQLDatabase() method.
+// This test as following steps:
+// 1) Call NoSQLDatabase() for a non existent db.
+// 2) Create the database, and verify if Schema got stored properly.
+// 3) Call UpgradeIfOutdated() to make sure that the method is no-op and is
+// able to read the schema from db.
+// 4) Call NoSQLDatabase() on the same db to create a new handle with an
+// upgraded schema, call UpgradeIfOutdated() and check if SchemaUpgrader
+// is called and if the new schema is stored appropriately.
+func TestSchemaCheck(t *testing.T) {
+ ctx, sName, cleanup := tu.SetupOrDie(nil)
+ defer cleanup()
+ a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+ schema := tu.DefaultSchema()
+ mockUpgrader := schema.Upgrader.(*tu.MockSchemaUpgrader)
+
+ db1 := a.NoSQLDatabase("db1", schema)
+
+ // Verify that calling Upgrade on a non existing database does not throw
+ // errors.
+ _, err := db1.UpgradeIfOutdated(ctx)
+ if err != nil {
+ t.Fatalf("db1.UpgradeIfOutdated() failed: %v", err)
+ }
+ if mockUpgrader.CallCount > 0 {
+ t.Fatal("Call to upgrader was not expected.")
+ }
+
+ // Create db1, this step also stores the schema provided above
+ if err := db1.Create(ctx, nil); err != nil {
+ t.Fatalf("db1.Create() failed: %v", err)
+ }
+ // verify if schema was stored as part of create
+ if _, err := getSchemaMetadata(ctx, db1.FullName()); err != nil {
+ t.Fatalf("Failed to lookup schema after create: %v", err)
+ }
+
+ // Make redundant call to Upgrade to verify that it is a no-op
+ result, err1 := db1.UpgradeIfOutdated(ctx)
+ if result {
+ t.Fatalf("db1.UpgradeIfOutdated() should not return true")
+ }
+ if err1 != nil {
+ t.Fatalf("db1.UpgradeIfOutdated() failed: %v", err1)
+ }
+ if mockUpgrader.CallCount > 0 {
+ t.Fatal("Call to upgrader was not expected.")
+ }
+
+ // try to make a new database object for the same database but this time
+ // with a new schema version
+ schema.Metadata.Version = 1
+ rule := wire.CrRule{"table1", "foo", "", wire.ResolverTypeLastWins}
+ policy := wire.CrPolicy{
+ Rules: []wire.CrRule{rule},
+ }
+ schema.Metadata.Policy = policy
+ otherdb1 := a.NoSQLDatabase("db1", schema)
+ otherresult, othererr := otherdb1.UpgradeIfOutdated(ctx)
+
+ if !otherresult {
+ t.Fatalf("otherdb1.UpgradeIfOutdated() expected to return true")
+ }
+ if othererr != nil {
+ t.Fatalf("otherdb1.UpgradeIfOutdated() failed: %v", othererr)
+ }
+ if mockUpgrader.CallCount != 1 {
+ t.Fatalf("Unexpected number of calls to upgrader. Expected: %d, Actual: %d.", 1, mockUpgrader.CallCount)
+ }
+
+ // check if the contents of SchemaMetadata are correctly stored in the db.
+ metadata, err3 := getSchemaMetadata(ctx, otherdb1.FullName())
+ if err3 != nil {
+ t.Fatalf("GetSchemaMetadata failed: %v", err3)
+ }
+ if metadata.Version != 1 {
+ t.Fatalf("Unexpected version number: %d", metadata.Version)
+ }
+ if len(metadata.Policy.Rules) != 1 {
+ t.Fatalf("Unexpected number of rules: %d", len(metadata.Policy.Rules))
+ }
+ if metadata.Policy.Rules[0] != rule {
+ t.Fatalf("Unexpected number of rules: %d", len(metadata.Policy.Rules))
+ }
+}
+
+func getSchemaMetadata(ctx *context.T, dbName string) (wire.SchemaMetadata, error) {
+ return wire.DatabaseClient(dbName).GetSchemaMetadata(ctx)
+}
diff --git a/v23/syncbase/nosql/syncgroup_test.go b/v23/syncbase/nosql/syncgroup_test.go
index 3b2da73..73e0550 100644
--- a/v23/syncbase/nosql/syncgroup_test.go
+++ b/v23/syncbase/nosql/syncgroup_test.go
@@ -101,7 +101,7 @@
// Create client2.
ctx2 := tu.NewCtx(ctx, rootp, "client2")
a2 := syncbase.NewService(sName).App("a")
- d2 := a2.NoSQLDatabase("d")
+ d2 := a2.NoSQLDatabase("d", nil)
// Check that client2's join fails if the perms disallow access.
joinSyncGroup(t, ctx2, d2, sgNameA, verror.ErrNoAccess.ID)
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index 182141a..adb0058 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -280,7 +280,7 @@
a := syncbase.NewService(args[0]).App("a")
a.Create(ctx, nil)
- d := a.NoSQLDatabase("d")
+ d := a.NoSQLDatabase("d", nil)
d.Create(ctx, nil)
d.CreateTable(ctx, "tb", nil)
@@ -292,7 +292,7 @@
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
- d := a.NoSQLDatabase("d")
+ d := a.NoSQLDatabase("d", nil)
mtName := env.Vars[ref.EnvNamespacePrefix]
spec := wire.SyncGroupSpec{
@@ -315,7 +315,7 @@
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
- d := a.NoSQLDatabase("d")
+ d := a.NoSQLDatabase("d", nil)
sg := d.SyncGroup(args[1])
info := wire.SyncGroupMemberInfo{10}
@@ -330,7 +330,7 @@
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
- d := a.NoSQLDatabase("d")
+ d := a.NoSQLDatabase("d", nil)
// Do Puts.
tb := d.Table("tb")
@@ -351,7 +351,7 @@
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
- d := a.NoSQLDatabase("d")
+ d := a.NoSQLDatabase("d", nil)
// Do Puts.
tb := d.Table("tb")
@@ -373,7 +373,7 @@
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
- d := a.NoSQLDatabase("d")
+ d := a.NoSQLDatabase("d", nil)
// Wait for a bit (up to 4 sec) until the last key appears.
tb := d.Table("tb")
@@ -433,7 +433,7 @@
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
- d := a.NoSQLDatabase("d")
+ d := a.NoSQLDatabase("d", nil)
tb := d.Table("tb")
// Verify through a scan that none of that data exists.
@@ -458,7 +458,7 @@
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
- d := a.NoSQLDatabase("d")
+ d := a.NoSQLDatabase("d", nil)
tb := d.Table("tb")
// Wait for a bit (up to 4 sec) until the last key appears.
@@ -504,7 +504,7 @@
defer shutdown()
a := syncbase.NewService(args[0]).App("a")
- d := a.NoSQLDatabase("d")
+ d := a.NoSQLDatabase("d", nil)
// Wait for a bit (up to 8 sec) until the last key appears. This chosen
// time interval is dependent on how fast the membership view is
@@ -560,7 +560,7 @@
for j := 0; j < numDbs; j++ {
dbName := fmt.Sprintf("d%d", j)
- d := a.NoSQLDatabase(dbName)
+ d := a.NoSQLDatabase(dbName, nil)
d.Create(ctx, nil)
for k := 0; k < numTbs; k++ {
@@ -594,7 +594,7 @@
// For each database...
for j := 0; j < numDbs; j++ {
dbName := fmt.Sprintf("d%d", j)
- d := a.NoSQLDatabase(dbName)
+ d := a.NoSQLDatabase(dbName, nil)
// For each table, pre-populate entries on each prefix.
// Also determine the SyncGroup prefixes.
@@ -653,7 +653,7 @@
for j := 0; j < numDbs; j++ {
dbName := fmt.Sprintf("d%d", j)
- d := a.NoSQLDatabase(dbName)
+ d := a.NoSQLDatabase(dbName, nil)
sgName := naming.Join(sgNamePrefix, appName, dbName)
sg := d.SyncGroup(sgName)
@@ -685,7 +685,7 @@
for j := 0; j < numDbs; j++ {
dbName := fmt.Sprintf("d%d", j)
- d := a.NoSQLDatabase(dbName)
+ d := a.NoSQLDatabase(dbName, nil)
for k := 0; k < numTbs; k++ {
tbName := fmt.Sprintf("tb%d", k)
diff --git a/v23/syncbase/testutil/layer.go b/v23/syncbase/testutil/layer.go
index 372ca3c..cf2c4cb 100644
--- a/v23/syncbase/testutil/layer.go
+++ b/v23/syncbase/testutil/layer.go
@@ -348,7 +348,7 @@
return a.ListDatabases(ctx)
}
func (a *app) Child(childName string) layer {
- return makeLayer(a.NoSQLDatabase(childName))
+ return makeLayer(a.NoSQLDatabase(childName, nil))
}
type database struct {
diff --git a/v23/syncbase/testutil/util.go b/v23/syncbase/testutil/util.go
index 9509c4c..f7c38b1 100644
--- a/v23/syncbase/testutil/util.go
+++ b/v23/syncbase/testutil/util.go
@@ -13,6 +13,7 @@
"runtime/debug"
"testing"
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/v23/syncbase"
"v.io/syncbase/v23/syncbase/nosql"
"v.io/syncbase/v23/syncbase/util"
@@ -48,7 +49,7 @@
}
func CreateNoSQLDatabase(t *testing.T, ctx *context.T, a syncbase.App, name string) nosql.Database {
- d := a.NoSQLDatabase(name)
+ d := a.NoSQLDatabase(name, nil)
if err := d.Create(ctx, nil); err != nil {
Fatalf(t, "d.Create() failed: %v", err)
}
@@ -180,6 +181,26 @@
}
}
+type MockSchemaUpgrader struct {
+ CallCount int
+}
+
+func (msu *MockSchemaUpgrader) Run(db nosql.Database, oldVersion, newVersion int64) error {
+ msu.CallCount++
+ return nil
+}
+
+var _ nosql.SchemaUpgrader = (*MockSchemaUpgrader)(nil)
+
+func DefaultSchema() *nosql.Schema {
+ return &nosql.Schema{
+ Metadata: wire.SchemaMetadata{
+ Version: 0,
+ },
+ Upgrader: nosql.SchemaUpgrader(&MockSchemaUpgrader{}),
+ }
+}
+
////////////////////////////////////////
// Internal helpers
diff --git a/x/ref/services/syncbase/server/app.go b/x/ref/services/syncbase/server/app.go
index 0c8958f..8862f57 100644
--- a/x/ref/services/syncbase/server/app.go
+++ b/x/ref/services/syncbase/server/app.go
@@ -9,6 +9,7 @@
"sync"
wire "v.io/syncbase/v23/services/syncbase"
+ nosqlwire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/util"
@@ -132,7 +133,7 @@
return dbNames, nil
}
-func (a *app) CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions) error {
+func (a *app) CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions, metadata *nosqlwire.SchemaMetadata) error {
if !a.exists {
vlog.Fatalf("app %q does not exist", a.name)
}
@@ -182,7 +183,7 @@
if perms == nil {
perms = aData.Perms
}
- d, err := nosql.NewDatabase(ctx, a, dbName, nosql.DatabaseOptions{
+ d, err := nosql.NewDatabase(ctx, a, dbName, metadata, nosql.DatabaseOptions{
Perms: perms,
RootDir: rootDir,
Engine: engine,
diff --git a/x/ref/services/syncbase/server/interfaces/app.go b/x/ref/services/syncbase/server/interfaces/app.go
index b1d5d4f..e990b29 100644
--- a/x/ref/services/syncbase/server/interfaces/app.go
+++ b/x/ref/services/syncbase/server/interfaces/app.go
@@ -5,6 +5,7 @@
package interfaces
import (
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/v23/context"
"v.io/v23/rpc"
"v.io/v23/security/access"
@@ -22,7 +23,7 @@
NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error)
// CreateNoSQLDatabase creates the specified NoSQL database.
- CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions) error
+ CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions, metadata *wire.SchemaMetadata) error
// DeleteNoSQLDatabase deletes the specified NoSQL database.
DeleteNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) error
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 2da519e..bfe419a 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -99,7 +99,7 @@
// NewDatabase creates a new database instance and returns it.
// Designed for use from within App.CreateNoSQLDatabase.
-func NewDatabase(ctx *context.T, a interfaces.App, name string, opts DatabaseOptions) (*database, error) {
+func NewDatabase(ctx *context.T, a interfaces.App, name string, metadata *wire.SchemaMetadata, opts DatabaseOptions) (*database, error) {
if opts.Perms == nil {
return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
}
@@ -108,8 +108,9 @@
return nil, err
}
data := &databaseData{
- Name: d.name,
- Perms: opts.Perms,
+ Name: d.name,
+ Perms: opts.Perms,
+ SchemaMetadata: metadata,
}
if err := util.Put(ctx, d.st, d.stKey(), data); err != nil {
return nil, err
@@ -120,7 +121,7 @@
////////////////////////////////////////
// RPC methods
-func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions, metadata *wire.SchemaMetadata) error {
if d.exists {
return verror.New(verror.ErrExist, ctx, d.name)
}
@@ -130,7 +131,7 @@
// This database does not yet exist; d is just an ephemeral handle that holds
// {name string, a *app}. d.a.CreateNoSQLDatabase will create a new database
// handle and store it in d.a.dbs[d.name].
- return d.a.CreateNoSQLDatabase(ctx, call, d.name, perms)
+ return d.a.CreateNoSQLDatabase(ctx, call, d.name, perms, metadata)
}
func (d *databaseReq) Delete(ctx *context.T, call rpc.ServerCall) error {
diff --git a/x/ref/services/syncbase/server/nosql/database_sm.go b/x/ref/services/syncbase/server/nosql/database_sm.go
new file mode 100644
index 0000000..5f69585
--- /dev/null
+++ b/x/ref/services/syncbase/server/nosql/database_sm.go
@@ -0,0 +1,53 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
+ "v.io/syncbase/x/ref/services/syncbase/server/util"
+ "v.io/syncbase/x/ref/services/syncbase/store"
+ "v.io/v23/context"
+ "v.io/v23/rpc"
+ "v.io/v23/verror"
+)
+
+////////////////////////////////////////
+// SchemaManager RPC methods
+
+func (d *databaseReq) GetSchemaMetadata(ctx *context.T, call rpc.ServerCall) (wire.SchemaMetadata, error) {
+ metadata := wire.SchemaMetadata{}
+
+ if !d.exists {
+ return metadata, verror.New(verror.ErrNoExist, ctx, d.Name())
+ }
+
+ // Check permissions on Database and retreve schema metadata.
+ dbData := databaseData{}
+ if err := util.GetWithAuth(ctx, call, d.st, d.stKey(), &dbData); err != nil {
+ return metadata, err
+ }
+ if dbData.SchemaMetadata == nil {
+ return metadata, verror.New(verror.ErrNoExist, ctx, "Schema does not exist for the db")
+ }
+ return *dbData.SchemaMetadata, nil
+}
+
+func (d *databaseReq) SetSchemaMetadata(ctx *context.T, call rpc.ServerCall, metadata wire.SchemaMetadata) error {
+ // Check if database exists
+ if !d.exists {
+ return verror.New(verror.ErrNoExist, ctx, "database: "+d.Name())
+ }
+
+ // Check permissions on Database and store schema metadata.
+ return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
+ dbData := databaseData{}
+ return util.UpdateWithAuth(ctx, call, st, d.stKey(), &dbData, func() error {
+ // NOTE: For now we expect the client to not issue multiple
+ // concurrent SetSchemaMetadata calls.
+ dbData.SchemaMetadata = &metadata
+ return nil
+ })
+ })
+}
diff --git a/x/ref/services/syncbase/server/nosql/types.vdl b/x/ref/services/syncbase/server/nosql/types.vdl
index 33d3883..8ede239 100644
--- a/x/ref/services/syncbase/server/nosql/types.vdl
+++ b/x/ref/services/syncbase/server/nosql/types.vdl
@@ -6,13 +6,15 @@
import (
"v.io/v23/security/access"
+ "v.io/syncbase/v23/services/syncbase/nosql"
)
// databaseData represents the persistent state of a Database.
type databaseData struct {
- Name string
- Version uint64 // covers the fields below
- Perms access.Permissions
+ Name string
+ Version uint64 // covers the Perms field below
+ Perms access.Permissions
+ SchemaMetadata ?nosql.SchemaMetadata
}
// tableData represents the persistent state of a Table.
diff --git a/x/ref/services/syncbase/server/nosql/types.vdl.go b/x/ref/services/syncbase/server/nosql/types.vdl.go
index 021cef6..bf5f346 100644
--- a/x/ref/services/syncbase/server/nosql/types.vdl.go
+++ b/x/ref/services/syncbase/server/nosql/types.vdl.go
@@ -12,14 +12,16 @@
"v.io/v23/vdl"
// VDL user imports
+ "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/v23/security/access"
)
// databaseData represents the persistent state of a Database.
type databaseData struct {
- Name string
- Version uint64 // covers the fields below
- Perms access.Permissions
+ Name string
+ Version uint64 // covers the Perms field below
+ Perms access.Permissions
+ SchemaMetadata *nosql.SchemaMetadata
}
func (databaseData) __VDLReflect(struct {
diff --git a/x/ref/services/syncbase/vsync/test_util.go b/x/ref/services/syncbase/vsync/test_util.go
index 556b2d7..9dd4e3e 100644
--- a/x/ref/services/syncbase/vsync/test_util.go
+++ b/x/ref/services/syncbase/vsync/test_util.go
@@ -12,6 +12,7 @@
"testing"
"time"
+ wire "v.io/syncbase/v23/services/syncbase/nosql"
"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
"v.io/syncbase/x/ref/services/syncbase/server/util"
"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -62,7 +63,7 @@
return []string{"mockdb"}, nil
}
-func (a *mockApp) CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions) error {
+func (a *mockApp) CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions, metadata *wire.SchemaMetadata) error {
return verror.NewErrNotImplemented(ctx)
}
diff --git a/x/ref/syncbase/sb51/shell.go b/x/ref/syncbase/sb51/shell.go
index a41d245..b5f18c0 100644
--- a/x/ref/syncbase/sb51/shell.go
+++ b/x/ref/syncbase/sb51/shell.go
@@ -154,7 +154,7 @@
return nil, err
}
}
- d := app.NoSQLDatabase(dbName)
+ d := app.NoSQLDatabase(dbName, nil)
if exists, err := d.Exists(ctx); err != nil {
return nil, fmt.Errorf("failed checking for db %q: %v", d.FullName(), err)
} else if !exists {