Add schema to syncbase
1) Add client APIs related to managing db schema
- adds schema arg to NoSQLDatabase() bind
- adds db.UpgradeIfOutdated() method
- adds db.GetSchemaManager() which returns an object with {Get,Set}SchemaMetadata
- adds Schema{metadata, upgrader} struct and SchemaUpgrader interface
2) Add vdl changes
- {Get,Set}SchemaMetadata methods
- optional SchemaMetadata arg to Database.Create

MultiPart: 1/2
Change-Id: Ia920bbc55c40145cbacf8245c4d1682dcb02496d
diff --git a/v23/services/syncbase/nosql/service.vdl b/v23/services/syncbase/nosql/service.vdl
index 9bdf954..b56275b 100644
--- a/v23/services/syncbase/nosql/service.vdl
+++ b/v23/services/syncbase/nosql/service.vdl
@@ -19,7 +19,7 @@
 	// Create creates this Database.
 	// If perms is nil, we inherit (copy) the App perms.
 	// Create requires the caller to have Write permission at the App.
-	Create(perms access.Permissions) error {access.Write}
+	Create(perms access.Permissions, metadata ?SchemaMetadata) error {access.Write}
 
 	// Delete deletes this Database.
 	Delete() error {access.Write}
@@ -34,6 +34,7 @@
 	// Database handle bound to this batch. If this Database is already bound to a
 	// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics
 	// are documented in model.go.
+	// TODO(sadovsky): make BatchOptions optional
 	BeginBatch(bo BatchOptions) (string | error) {access.Read}
 
 	// Commit persists the pending changes to the database.
@@ -65,6 +66,10 @@
 	// BlobManager implements the API for managing blobs attached to rows in
 	// a Database.
 	BlobManager
+
+	// SchemaManager implements the API for managing schema metadata attached
+	// to a Database.
+	SchemaManager
 }
 
 // Table represents a collection of Rows.
@@ -207,6 +212,20 @@
 	// aggressively", "sync once per day".
 }
 
+// SchemaManager implements the API for managing schema metadata attached
+// to a Database.
+type SchemaManager interface {
+	// GetSchemaMetadata retrieves schema metadata for this database.
+	//
+	// Requires: Client must have at least Read access on the Database.
+	GetSchemaMetadata() (SchemaMetadata | error) {access.Read}
+
+	// SetSchemaMetadata stores schema metadata for this database.
+	//
+	// Requires: Client must have at least Write access on the Database.
+	SetSchemaMetadata(metadata SchemaMetadata) error {access.Write}
+}
+
 // BlobManager is the interface for blob operations.
 type BlobManager interface {
 	// API for resumable blob creation (append-only). After commit, a blob
diff --git a/v23/services/syncbase/nosql/service.vdl.go b/v23/services/syncbase/nosql/service.vdl.go
index 56c50fa..447bd7d 100644
--- a/v23/services/syncbase/nosql/service.vdl.go
+++ b/v23/services/syncbase/nosql/service.vdl.go
@@ -1421,6 +1421,142 @@
 	return s.s.Send(item)
 }
 
+// SchemaManagerClientMethods is the client interface
+// containing SchemaManager methods.
+//
+// SchemaManager implements the API for managing schema metadata attached
+// to a Database.
+type SchemaManagerClientMethods interface {
+	// GetSchemaMetadata retrieves schema metadata for this database.
+	//
+	// Requires: Client must have at least Read access on the Database.
+	GetSchemaMetadata(*context.T, ...rpc.CallOpt) (SchemaMetadata, error)
+	// SetSchemaMetadata stores schema metadata for this database.
+	//
+	// Requires: Client must have at least Write access on the Database.
+	SetSchemaMetadata(ctx *context.T, metadata SchemaMetadata, opts ...rpc.CallOpt) error
+}
+
+// SchemaManagerClientStub adds universal methods to SchemaManagerClientMethods.
+type SchemaManagerClientStub interface {
+	SchemaManagerClientMethods
+	rpc.UniversalServiceMethods
+}
+
+// SchemaManagerClient returns a client stub for SchemaManager.
+func SchemaManagerClient(name string) SchemaManagerClientStub {
+	return implSchemaManagerClientStub{name}
+}
+
+type implSchemaManagerClientStub struct {
+	name string
+}
+
+func (c implSchemaManagerClientStub) GetSchemaMetadata(ctx *context.T, opts ...rpc.CallOpt) (o0 SchemaMetadata, err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "GetSchemaMetadata", nil, []interface{}{&o0}, opts...)
+	return
+}
+
+func (c implSchemaManagerClientStub) SetSchemaMetadata(ctx *context.T, i0 SchemaMetadata, opts ...rpc.CallOpt) (err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "SetSchemaMetadata", []interface{}{i0}, nil, opts...)
+	return
+}
+
+// SchemaManagerServerMethods is the interface a server writer
+// implements for SchemaManager.
+//
+// SchemaManager implements the API for managing schema metadata attached
+// to a Database.
+type SchemaManagerServerMethods interface {
+	// GetSchemaMetadata retrieves schema metadata for this database.
+	//
+	// Requires: Client must have at least Read access on the Database.
+	GetSchemaMetadata(*context.T, rpc.ServerCall) (SchemaMetadata, error)
+	// SetSchemaMetadata stores schema metadata for this database.
+	//
+	// Requires: Client must have at least Write access on the Database.
+	SetSchemaMetadata(ctx *context.T, call rpc.ServerCall, metadata SchemaMetadata) error
+}
+
+// SchemaManagerServerStubMethods is the server interface containing
+// SchemaManager methods, as expected by rpc.Server.
+// There is no difference between this interface and SchemaManagerServerMethods
+// since there are no streaming methods.
+type SchemaManagerServerStubMethods SchemaManagerServerMethods
+
+// SchemaManagerServerStub adds universal methods to SchemaManagerServerStubMethods.
+type SchemaManagerServerStub interface {
+	SchemaManagerServerStubMethods
+	// Describe the SchemaManager interfaces.
+	Describe__() []rpc.InterfaceDesc
+}
+
+// SchemaManagerServer returns a server stub for SchemaManager.
+// It converts an implementation of SchemaManagerServerMethods into
+// an object that may be used by rpc.Server.
+func SchemaManagerServer(impl SchemaManagerServerMethods) SchemaManagerServerStub {
+	stub := implSchemaManagerServerStub{
+		impl: impl,
+	}
+	// Initialize GlobState; always check the stub itself first, to handle the
+	// case where the user has the Glob method defined in their VDL source.
+	if gs := rpc.NewGlobState(stub); gs != nil {
+		stub.gs = gs
+	} else if gs := rpc.NewGlobState(impl); gs != nil {
+		stub.gs = gs
+	}
+	return stub
+}
+
+type implSchemaManagerServerStub struct {
+	impl SchemaManagerServerMethods
+	gs   *rpc.GlobState
+}
+
+func (s implSchemaManagerServerStub) GetSchemaMetadata(ctx *context.T, call rpc.ServerCall) (SchemaMetadata, error) {
+	return s.impl.GetSchemaMetadata(ctx, call)
+}
+
+func (s implSchemaManagerServerStub) SetSchemaMetadata(ctx *context.T, call rpc.ServerCall, i0 SchemaMetadata) error {
+	return s.impl.SetSchemaMetadata(ctx, call, i0)
+}
+
+func (s implSchemaManagerServerStub) Globber() *rpc.GlobState {
+	return s.gs
+}
+
+func (s implSchemaManagerServerStub) Describe__() []rpc.InterfaceDesc {
+	return []rpc.InterfaceDesc{SchemaManagerDesc}
+}
+
+// SchemaManagerDesc describes the SchemaManager interface.
+var SchemaManagerDesc rpc.InterfaceDesc = descSchemaManager
+
+// descSchemaManager hides the desc to keep godoc clean.
+var descSchemaManager = rpc.InterfaceDesc{
+	Name:    "SchemaManager",
+	PkgPath: "v.io/syncbase/v23/services/syncbase/nosql",
+	Doc:     "// SchemaManager implements the API for managing schema metadata attached\n// to a Database.",
+	Methods: []rpc.MethodDesc{
+		{
+			Name: "GetSchemaMetadata",
+			Doc:  "// GetSchemaMetadata retrieves schema metadata for this database.\n//\n// Requires: Client must have at least Read access on the Database.",
+			OutArgs: []rpc.ArgDesc{
+				{"", ``}, // SchemaMetadata
+			},
+			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Read"))},
+		},
+		{
+			Name: "SetSchemaMetadata",
+			Doc:  "// SetSchemaMetadata stores schema metadata for this database.\n//\n// Requires: Client must have at least Write access on the Database.",
+			InArgs: []rpc.ArgDesc{
+				{"metadata", ``}, // SchemaMetadata
+			},
+			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
+		},
+	},
+}
+
 // DatabaseClientMethods is the client interface
 // containing Database methods.
 //
@@ -1505,10 +1641,13 @@
 	SyncGroupManagerClientMethods
 	// BlobManager is the interface for blob operations.
 	BlobManagerClientMethods
+	// SchemaManager implements the API for managing schema metadata attached
+	// to a Database.
+	SchemaManagerClientMethods
 	// Create creates this Database.
 	// If perms is nil, we inherit (copy) the App perms.
 	// Create requires the caller to have Write permission at the App.
-	Create(ctx *context.T, perms access.Permissions, opts ...rpc.CallOpt) error
+	Create(ctx *context.T, perms access.Permissions, metadata *SchemaMetadata, opts ...rpc.CallOpt) error
 	// Delete deletes this Database.
 	Delete(*context.T, ...rpc.CallOpt) error
 	// Exists returns true only if this Database exists. Insufficient permissions
@@ -1520,6 +1659,7 @@
 	// Database handle bound to this batch. If this Database is already bound to a
 	// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics
 	// are documented in model.go.
+	// TODO(sadovsky): make BatchOptions optional
 	BeginBatch(ctx *context.T, bo BatchOptions, opts ...rpc.CallOpt) (string, error)
 	// Commit persists the pending changes to the database.
 	// If this Database is not bound to a batch, Commit() will fail with
@@ -1544,7 +1684,7 @@
 
 // DatabaseClient returns a client stub for Database.
 func DatabaseClient(name string) DatabaseClientStub {
-	return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name), BlobManagerClient(name)}
+	return implDatabaseClientStub{name, permissions.ObjectClient(name), DatabaseWatcherClient(name), SyncGroupManagerClient(name), BlobManagerClient(name), SchemaManagerClient(name)}
 }
 
 type implDatabaseClientStub struct {
@@ -1554,10 +1694,11 @@
 	DatabaseWatcherClientStub
 	SyncGroupManagerClientStub
 	BlobManagerClientStub
+	SchemaManagerClientStub
 }
 
-func (c implDatabaseClientStub) Create(ctx *context.T, i0 access.Permissions, opts ...rpc.CallOpt) (err error) {
-	err = v23.GetClient(ctx).Call(ctx, c.name, "Create", []interface{}{i0}, nil, opts...)
+func (c implDatabaseClientStub) Create(ctx *context.T, i0 access.Permissions, i1 *SchemaMetadata, opts ...rpc.CallOpt) (err error) {
+	err = v23.GetClient(ctx).Call(ctx, c.name, "Create", []interface{}{i0, i1}, nil, opts...)
 	return
 }
 
@@ -1747,10 +1888,13 @@
 	SyncGroupManagerServerMethods
 	// BlobManager is the interface for blob operations.
 	BlobManagerServerMethods
+	// SchemaManager implements the API for managing schema metadata attached
+	// to a Database.
+	SchemaManagerServerMethods
 	// Create creates this Database.
 	// If perms is nil, we inherit (copy) the App perms.
 	// Create requires the caller to have Write permission at the App.
-	Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error
+	Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions, metadata *SchemaMetadata) error
 	// Delete deletes this Database.
 	Delete(*context.T, rpc.ServerCall) error
 	// Exists returns true only if this Database exists. Insufficient permissions
@@ -1762,6 +1906,7 @@
 	// Database handle bound to this batch. If this Database is already bound to a
 	// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics
 	// are documented in model.go.
+	// TODO(sadovsky): make BatchOptions optional
 	BeginBatch(ctx *context.T, call rpc.ServerCall, bo BatchOptions) (string, error)
 	// Commit persists the pending changes to the database.
 	// If this Database is not bound to a batch, Commit() will fail with
@@ -1858,10 +2003,13 @@
 	SyncGroupManagerServerStubMethods
 	// BlobManager is the interface for blob operations.
 	BlobManagerServerStubMethods
+	// SchemaManager implements the API for managing schema metadata attached
+	// to a Database.
+	SchemaManagerServerStubMethods
 	// Create creates this Database.
 	// If perms is nil, we inherit (copy) the App perms.
 	// Create requires the caller to have Write permission at the App.
-	Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error
+	Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions, metadata *SchemaMetadata) error
 	// Delete deletes this Database.
 	Delete(*context.T, rpc.ServerCall) error
 	// Exists returns true only if this Database exists. Insufficient permissions
@@ -1873,6 +2021,7 @@
 	// Database handle bound to this batch. If this Database is already bound to a
 	// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics
 	// are documented in model.go.
+	// TODO(sadovsky): make BatchOptions optional
 	BeginBatch(ctx *context.T, call rpc.ServerCall, bo BatchOptions) (string, error)
 	// Commit persists the pending changes to the database.
 	// If this Database is not bound to a batch, Commit() will fail with
@@ -1906,6 +2055,7 @@
 		DatabaseWatcherServerStub:  DatabaseWatcherServer(impl),
 		SyncGroupManagerServerStub: SyncGroupManagerServer(impl),
 		BlobManagerServerStub:      BlobManagerServer(impl),
+		SchemaManagerServerStub:    SchemaManagerServer(impl),
 	}
 	// Initialize GlobState; always check the stub itself first, to handle the
 	// case where the user has the Glob method defined in their VDL source.
@@ -1923,11 +2073,12 @@
 	DatabaseWatcherServerStub
 	SyncGroupManagerServerStub
 	BlobManagerServerStub
+	SchemaManagerServerStub
 	gs *rpc.GlobState
 }
 
-func (s implDatabaseServerStub) Create(ctx *context.T, call rpc.ServerCall, i0 access.Permissions) error {
-	return s.impl.Create(ctx, call, i0)
+func (s implDatabaseServerStub) Create(ctx *context.T, call rpc.ServerCall, i0 access.Permissions, i1 *SchemaMetadata) error {
+	return s.impl.Create(ctx, call, i0, i1)
 }
 
 func (s implDatabaseServerStub) Delete(ctx *context.T, call rpc.ServerCall) error {
@@ -1959,7 +2110,7 @@
 }
 
 func (s implDatabaseServerStub) Describe__() []rpc.InterfaceDesc {
-	return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, SyncGroupManagerDesc, BlobManagerDesc}
+	return []rpc.InterfaceDesc{DatabaseDesc, permissions.ObjectDesc, DatabaseWatcherDesc, SyncGroupManagerDesc, BlobManagerDesc, SchemaManagerDesc}
 }
 
 // DatabaseDesc describes the Database interface.
@@ -1975,13 +2126,15 @@
 		{"DatabaseWatcher", "v.io/syncbase/v23/services/syncbase/nosql", "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log and a set of (table, row prefix) pairs. Updates with rows not\n// covered by the set are excluded from the result stream. The result stream\n// consists of a never-ending sequence of Change messages (until the call fails\n// or is canceled). Each Change message contains an optional continued bit.\n// A sub-sequence of Change messages with continued=true followed by a Change\n// message with continued=false forms a batch. If the client has no access to\n// a row specified in a change, that change is excluded from the result stream.\n//\n// See \"v.io/v23/services/watch\".GlobWatcher for more detailed explanation of\n// the general behavior.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."},
 		{"SyncGroupManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SyncGroupManager is the interface for SyncGroup operations.\n// TODO(hpucha): Add blessings to create/join and add a refresh method."},
 		{"BlobManager", "v.io/syncbase/v23/services/syncbase/nosql", "// BlobManager is the interface for blob operations."},
+		{"SchemaManager", "v.io/syncbase/v23/services/syncbase/nosql", "// SchemaManager implements the API for managing schema metadata attached\n// to a Database."},
 	},
 	Methods: []rpc.MethodDesc{
 		{
 			Name: "Create",
 			Doc:  "// Create creates this Database.\n// If perms is nil, we inherit (copy) the App perms.\n// Create requires the caller to have Write permission at the App.",
 			InArgs: []rpc.ArgDesc{
-				{"perms", ``}, // access.Permissions
+				{"perms", ``},    // access.Permissions
+				{"metadata", ``}, // *SchemaMetadata
 			},
 			Tags: []*vdl.Value{vdl.ValueOf(access.Tag("Write"))},
 		},
@@ -2000,7 +2153,7 @@
 		},
 		{
 			Name: "BeginBatch",
-			Doc:  "// BeginBatch creates a new batch. It returns an App-relative name for a\n// Database handle bound to this batch. If this Database is already bound to a\n// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics\n// are documented in model.go.",
+			Doc:  "// BeginBatch creates a new batch. It returns an App-relative name for a\n// Database handle bound to this batch. If this Database is already bound to a\n// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics\n// are documented in model.go.\n// TODO(sadovsky): make BatchOptions optional",
 			InArgs: []rpc.ArgDesc{
 				{"bo", ``}, // BatchOptions
 			},
diff --git a/v23/services/syncbase/nosql/types.vdl b/v23/services/syncbase/nosql/types.vdl
index 1d03be6..e6c0415 100644
--- a/v23/services/syncbase/nosql/types.vdl
+++ b/v23/services/syncbase/nosql/types.vdl
@@ -74,6 +74,81 @@
 	SyncPriority byte
 }
 
+// ResolverType defines the possible conflict resolution policies.
+// A Conflict is defined as presence of two independent sets of updates
+// originating from the same version of an object. Syncbase
+// uses version vectors to determine sequence of changes to a given row. Hence
+// if device A updates a row with key "foo" from version V3 to V4, then syncs
+// with device B which further updates the same row from version V4 to V5 and
+// then V5 is synced back to device A, device A will see V5 as a forward
+// progression of "foo" and not a conflict with V3 of "foo". But in the
+// meantime if device A had already updated "foo" again from version V4 to
+// version V6 then there is a conflict between V5 and V6 with V4 being the
+// common ancestor.
+type ResolverType enum {
+	// LastWins is a policy which resolves a conflict between two writes by
+	// choosing the version which has the greatest timestamp.
+	// For example, if device A created V6 at time T1 and device B created V5 at
+	// time T2 where timestamp T2 > T1, then V5 will be accepted as the
+	// resolution of the conflict.
+	// Syncbase maintains an internal clock for creating write timestamps. This
+	// clock is kept up to date via regular synchronization with NTP and with
+	// other syncbases. As long as the syncbase has access to an NTP server
+	// directly or indirectly via another syncbase, the internal clock will
+	// track NTP time and correct any skews suffered by the local system clock.
+	LastWins
+
+	// AppResolves is a policy which allows an App to handle resolution of a
+	// conflict on its own. In order to receive the conflicts, the app needs to
+	// register a conflict resolution stream using
+	// Database.StartConflictResolver().
+	AppResolves
+
+	// Defer is a policy that allows an instance of an App to outsource its
+	// conflict resolution to some other instance, typically a more capable
+	// instance (in terms of resources, knowledge or permissions) such as a
+	// cloud or admin instance.
+	Defer
+}
+
+// SchemaMetadata maintains metadata related to the schema of a given database.
+// There is one SchemaMetadata per database.
+type SchemaMetadata struct {
+	// Non negative Schema version number. Should be increased with every schema change
+	// (e.g. adding fields to structs) that cannot be handled by previous
+	// versions of the app.
+	Version int64
+	Policy  CrPolicy
+}
+
+// For a given row with a conflict, all rules are matched against the row.
+// If no rules match the row, we default to "LastWins". If multiple
+// rules match the row, ties are broken as follows:
+//  1. If one match has a longer prefix than the other, take that one.
+//  2. Else, if only one match specifies a type, take that one.
+//  3. Else, the two matches are identical; take the last one in the Rules array.
+type CrPolicy struct {
+	Rules []CrRule
+}
+
+// CrRule provides a filter and the type of resolution to perform for a row
+// under conflict that passes the filter.
+type CrRule struct {
+	// TableName is the name of the table that this rule applies to.
+	TableName	string
+
+	// KeyPrefix represents the set of keys within the given table for which
+	// this policy applies. TableName must not be empty if this field is set.
+	KeyPrefix	string
+
+	// Type includes the full package path for the value type for which this
+	// policy applies.
+	Type 		string
+
+	// Policy for resolving conflict.
+	Resolver 	ResolverType
+}
+
 // BlobRef is a reference to a blob.
 type BlobRef string
 
diff --git a/v23/services/syncbase/nosql/types.vdl.go b/v23/services/syncbase/nosql/types.vdl.go
index 040e253..0f05cca 100644
--- a/v23/services/syncbase/nosql/types.vdl.go
+++ b/v23/services/syncbase/nosql/types.vdl.go
@@ -102,6 +102,120 @@
 }) {
 }
 
+// ResolverType defines the possible conflict resolution policies.
+// A Conflict is defined as presence of two independent sets of updates
+// originating from the same version of an object. Syncbase
+// uses version vectors to determine sequence of changes to a given row. Hence
+// if device A updates a row with key "foo" from version V3 to V4, then syncs
+// with device B which further updates the same row from version V4 to V5 and
+// then V5 is synced back to device A, device A will see V5 as a forward
+// progression of "foo" and not a conflict with V3 of "foo". But in the
+// meantime if device A had already updated "foo" again from version V4 to
+// version V6 then there is a conflict between V5 and V6 with V4 being the
+// common ancestor.
+type ResolverType int
+
+const (
+	ResolverTypeLastWins ResolverType = iota
+	ResolverTypeAppResolves
+	ResolverTypeDefer
+)
+
+// ResolverTypeAll holds all labels for ResolverType.
+var ResolverTypeAll = [...]ResolverType{ResolverTypeLastWins, ResolverTypeAppResolves, ResolverTypeDefer}
+
+// ResolverTypeFromString creates a ResolverType from a string label.
+func ResolverTypeFromString(label string) (x ResolverType, err error) {
+	err = x.Set(label)
+	return
+}
+
+// Set assigns label to x.
+func (x *ResolverType) Set(label string) error {
+	switch label {
+	case "LastWins", "lastwins":
+		*x = ResolverTypeLastWins
+		return nil
+	case "AppResolves", "appresolves":
+		*x = ResolverTypeAppResolves
+		return nil
+	case "Defer", "defer":
+		*x = ResolverTypeDefer
+		return nil
+	}
+	*x = -1
+	return fmt.Errorf("unknown label %q in nosql.ResolverType", label)
+}
+
+// String returns the string label of x.
+func (x ResolverType) String() string {
+	switch x {
+	case ResolverTypeLastWins:
+		return "LastWins"
+	case ResolverTypeAppResolves:
+		return "AppResolves"
+	case ResolverTypeDefer:
+		return "Defer"
+	}
+	return ""
+}
+
+func (ResolverType) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.ResolverType"`
+	Enum struct{ LastWins, AppResolves, Defer string }
+}) {
+}
+
+// SchemaMetadata maintains metadata related to the schema of a given database.
+// There is one SchemaMetadata per database.
+type SchemaMetadata struct {
+	// Non negative Schema version number. Should be increased with every schema change
+	// (e.g. adding fields to structs) that cannot be handled by previous
+	// versions of the app.
+	Version int64
+	Policy  CrPolicy
+}
+
+func (SchemaMetadata) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.SchemaMetadata"`
+}) {
+}
+
+// For a given row with a conflict, all rules are matched against the row.
+// If no rules match the row, we default to "LastWins". If multiple
+// rules match the row, ties are broken as follows:
+//  1. If one match has a longer prefix than the other, take that one.
+//  2. Else, if only one match specifies a type, take that one.
+//  3. Else, the two matches are identical; take the last one in the Rules array.
+type CrPolicy struct {
+	Rules []CrRule
+}
+
+func (CrPolicy) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.CrPolicy"`
+}) {
+}
+
+// CrRule provides a filter and the type of resolution to perform for a row
+// under conflict that passes the filter.
+type CrRule struct {
+	// TableName is the name of the table that this rule applies to.
+	TableName string
+	// KeyPrefix represents the set of keys within the given table for which
+	// this policy applies. TableName must not be empty if this field is set.
+	KeyPrefix string
+	// Type includes the full package path for the value type for which this
+	// policy applies.
+	Type string
+	// Policy for resolving conflict.
+	Resolver ResolverType
+}
+
+func (CrRule) __VDLReflect(struct {
+	Name string `vdl:"v.io/syncbase/v23/services/syncbase/nosql.CrRule"`
+}) {
+}
+
 // BlobRef is a reference to a blob.
 type BlobRef string
 
@@ -305,6 +419,10 @@
 	vdl.Register((*KeyValue)(nil))
 	vdl.Register((*SyncGroupSpec)(nil))
 	vdl.Register((*SyncGroupMemberInfo)(nil))
+	vdl.Register((*ResolverType)(nil))
+	vdl.Register((*SchemaMetadata)(nil))
+	vdl.Register((*CrPolicy)(nil))
+	vdl.Register((*CrRule)(nil))
 	vdl.Register((*BlobRef)(nil))
 	vdl.Register((*FetchState)(nil))
 	vdl.Register((*FetchStatus)(nil))
diff --git a/v23/syncbase/app.go b/v23/syncbase/app.go
index e9ec228..85142db 100644
--- a/v23/syncbase/app.go
+++ b/v23/syncbase/app.go
@@ -48,8 +48,8 @@
 }
 
 // NoSQLDatabase implements App.NoSQLDatabase.
-func (a *app) NoSQLDatabase(relativeName string) nosql.Database {
-	return nosql.NewDatabase(a.fullName, relativeName)
+func (a *app) NoSQLDatabase(relativeName string, schema *nosql.Schema) nosql.Database {
+	return nosql.NewDatabase(a.fullName, relativeName, schema)
 }
 
 // ListDatabases implements App.ListDatabases.
diff --git a/v23/syncbase/client_v23_test.go b/v23/syncbase/client_v23_test.go
index eaadc2c..ef71c65 100644
--- a/v23/syncbase/client_v23_test.go
+++ b/v23/syncbase/client_v23_test.go
@@ -52,7 +52,7 @@
 	if err := a.Create(ctx, nil); err != nil {
 		return fmt.Errorf("unable to create an app: %v", err)
 	}
-	d := a.NoSQLDatabase("d")
+	d := a.NoSQLDatabase("d", nil)
 	if err := d.Create(ctx, nil); err != nil {
 		return fmt.Errorf("unable to create a database: %v", err)
 	}
@@ -123,7 +123,7 @@
 		if err := a.Create(ctx, nil); err != nil {
 			return fmt.Errorf("a.Create() failed: %v", err)
 		}
-		for _, d := range []nosql.Database{a.NoSQLDatabase("d1"), a.NoSQLDatabase("d2")} {
+		for _, d := range []nosql.Database{a.NoSQLDatabase("d1", nil), a.NoSQLDatabase("d2", nil)} {
 			if err := d.Create(ctx, nil); err != nil {
 				return fmt.Errorf("d.Create() failed: %v", err)
 			}
@@ -166,7 +166,7 @@
 			return fmt.Errorf("Databases do not match: got %v, want %v", got, want)
 		}
 		for _, dName := range want {
-			d := a.NoSQLDatabase(dName)
+			d := a.NoSQLDatabase(dName, nil)
 			if got, err = d.ListTables(ctx); err != nil {
 				return fmt.Errorf("d.ListTables() failed: %v", err)
 			}
diff --git a/v23/syncbase/model.go b/v23/syncbase/model.go
index 184bd8c..dff6f76 100644
--- a/v23/syncbase/model.go
+++ b/v23/syncbase/model.go
@@ -55,7 +55,9 @@
 
 	// NoSQLDatabase returns the nosql.Database with the given name.
 	// relativeName must not contain slashes.
-	NoSQLDatabase(relativeName string) nosql.Database
+	// schema can be nil only if schema was never set for the database in the
+	// first place. See nosql.Schema for more details.
+	NoSQLDatabase(relativeName string, schema *nosql.Schema) nosql.Database
 
 	// ListDatabases returns a list of all Database names.
 	// TODO(kash): Include the database type (NoSQL vs. SQL).
diff --git a/v23/syncbase/nosql/batch_test.go b/v23/syncbase/nosql/batch_test.go
index be137ac..bbee3af 100644
--- a/v23/syncbase/nosql/batch_test.go
+++ b/v23/syncbase/nosql/batch_test.go
@@ -510,7 +510,7 @@
 	// ErrBoundToBatch specifically since in practice bc.Create() will return
 	// either ErrExist or "invalid name" depending on whether the database and
 	// batch exist.
-	if err := bc.Create(ctx, nil); err == nil {
+	if err := bc.Create(ctx, nil, nil); err == nil {
 		t.Fatalf("bc.Create() should have failed: %v", err)
 	}
 	if err := bc.Delete(ctx); verror.ErrorID(err) != wire.ErrBoundToBatch.ID {
@@ -545,7 +545,7 @@
 	ctx, sName, cleanup := tu.SetupOrDie(nil)
 	defer cleanup()
 	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
-	d := a.NoSQLDatabase("d")
+	d := a.NoSQLDatabase("d", nil)
 	if _, err := d.BeginBatch(ctx, wire.BatchOptions{}); verror.ErrorID(err) != verror.ErrNoExist.ID {
 		t.Fatalf("d.BeginBatch() should have failed: %v", err)
 	}
diff --git a/v23/syncbase/nosql/client_test.go b/v23/syncbase/nosql/client_test.go
index 731b689..bca7a84 100644
--- a/v23/syncbase/nosql/client_test.go
+++ b/v23/syncbase/nosql/client_test.go
@@ -23,7 +23,7 @@
 
 // Tests various Name, FullName, and Key methods.
 func TestNameAndKey(t *testing.T) {
-	d := syncbase.NewService("s").App("a").NoSQLDatabase("d")
+	d := syncbase.NewService("s").App("a").NoSQLDatabase("d", nil)
 	tb := d.Table("tb")
 	r := tb.Row("r")
 
diff --git a/v23/syncbase/nosql/database.go b/v23/syncbase/nosql/database.go
index 030a44b..31aeb19 100644
--- a/v23/syncbase/nosql/database.go
+++ b/v23/syncbase/nosql/database.go
@@ -10,15 +10,18 @@
 	"v.io/v23/context"
 	"v.io/v23/naming"
 	"v.io/v23/security/access"
+	"v.io/v23/verror"
+	"v.io/x/lib/vlog"
 )
 
-func NewDatabase(parentFullName, relativeName string) *database {
+func NewDatabase(parentFullName, relativeName string, schema *Schema) *database {
 	fullName := naming.Join(parentFullName, relativeName)
 	return &database{
 		c:              wire.DatabaseClient(fullName),
 		parentFullName: parentFullName,
 		fullName:       fullName,
 		name:           relativeName,
+		schema:         schema,
 	}
 }
 
@@ -27,6 +30,7 @@
 	parentFullName string
 	fullName       string
 	name           string
+	schema         *Schema
 }
 
 var _ Database = (*database)(nil)
@@ -60,7 +64,11 @@
 
 // Create implements Database.Create.
 func (d *database) Create(ctx *context.T, perms access.Permissions) error {
-	return d.c.Create(ctx, perms)
+	var schemaMetadata *wire.SchemaMetadata = nil
+	if d.schema != nil {
+		schemaMetadata = &d.schema.Metadata
+	}
+	return d.c.Create(ctx, perms, schemaMetadata)
 }
 
 // Delete implements Database.Delete.
@@ -108,7 +116,7 @@
 	if err != nil {
 		return nil, err
 	}
-	return &batch{database: *NewDatabase(d.parentFullName, relativeName)}, nil
+	return &batch{database: *NewDatabase(d.parentFullName, relativeName, d.schema)}, nil
 }
 
 // SetPermissions implements Database.SetPermissions.
@@ -130,3 +138,61 @@
 func (d *database) GetSyncGroupNames(ctx *context.T) ([]string, error) {
 	return d.c.GetSyncGroupNames(ctx)
 }
+
+// UpgradeIfOutdated implements Database.UpgradeIfOutdated.
+func (d *database) UpgradeIfOutdated(ctx *context.T) (bool, error) {
+	var schema *Schema = d.schema
+	if schema == nil {
+		return false, verror.New(verror.ErrBadState, ctx, "Schema or SchemaMetadata cannot be nil. A valid Schema needs to be used when creating DB handle.")
+	}
+
+	if schema.Metadata.Version < 0 {
+		return false, verror.New(verror.ErrBadState, ctx, "Schema version cannot be less than zero.")
+	}
+
+	schemaMgr := d.getSchemaManager()
+	currMeta, err := schemaMgr.getSchemaMetadata(ctx)
+	if err != nil {
+		// If the client app did not set a schema as part of create db
+		// getSchemaMetadata() will return ErrNoExist. If so we set the schema
+		// here.
+		if verror.ErrorID(err) == verror.ErrNoExist.ID {
+			err := schemaMgr.setSchemaMetadata(ctx, schema.Metadata)
+			// The database may not yet exist. If so above call will return
+			// ErrNoExist and we return db without error. If the error
+			// is different then return the error to the caller.
+			if (err != nil) && (verror.ErrorID(err) != verror.ErrNoExist.ID) {
+				return false, err
+			}
+			return false, nil
+		}
+		return false, err
+	}
+
+	if currMeta.Version >= schema.Metadata.Version {
+		return false, nil
+	}
+	// Call the Upgrader provided by the app to upgrade the schema.
+	//
+	// TODO(jlodhia): disable sync before running Upgrader and reenable
+	// once Upgrader is finished.
+	//
+	// TODO(jlodhia): prevent other processes (local/remote) from accessing
+	// the database while upgrade is in progress.
+	upgradeErr := schema.Upgrader.Run(d, currMeta.Version, schema.Metadata.Version)
+	if upgradeErr != nil {
+		vlog.Error(upgradeErr)
+		return false, upgradeErr
+	}
+	// Update the schema metadata in db to the latest version.
+	metadataErr := schemaMgr.setSchemaMetadata(ctx, schema.Metadata)
+	if metadataErr != nil {
+		vlog.Error(metadataErr)
+		return false, metadataErr
+	}
+	return true, nil
+}
+
+func (d *database) getSchemaManager() schemaManagerImpl {
+	return newSchemaManager(d.fullName)
+}
diff --git a/v23/syncbase/nosql/model.go b/v23/syncbase/nosql/model.go
index cb74e01..dab14ee 100644
--- a/v23/syncbase/nosql/model.go
+++ b/v23/syncbase/nosql/model.go
@@ -104,6 +104,15 @@
 	// GetSyncGroupNames returns the global names of all SyncGroups attached to
 	// this database.
 	GetSyncGroupNames(ctx *context.T) ([]string, error)
+
+	// This method compares the current schema version of the database with the
+	// schema version provided while creating this database handle. If the
+	// current database schema version is lower, then the SchemaUpdater is
+	// called. If SchemaUpdater is successful this method stores the new schema
+	// metadata in database.
+	// Note: schema can be nil, in which case this method skips schema check
+	// and the caller is responsible for maintaining schema sanity.
+	UpgradeIfOutdated(ctx *context.T) (bool, error)
 }
 
 // BatchDatabase is a handle to a set of reads and writes to the database that
@@ -342,3 +351,26 @@
 	// SyncGroup ACL.
 	GetMembers(ctx *context.T) (map[string]wire.SyncGroupMemberInfo, error)
 }
+
+// SchemaUpgrader interface must be implemented by the App in order to upgrade
+// the database schema from a lower version to a higher version.
+type SchemaUpgrader interface {
+	// Takes an instance of database and upgrades data from old
+	// schema to new schema. This method must be idempotent.
+	Run(db Database, oldVersion, newVersion int64) error
+}
+
+// Each database has a Schema associated with it which defines the current
+// version of the database. When a new version of app wishes to change
+// its data in a way that it is not compatible with the old app's data,
+// the app must change the schema version and provide relevant upgrade logic
+// in the Upgrader. The conflict resolution rules are also associated with the
+// schema version. Hence if the conflict resolution rules change then the schema
+// version also must be bumped.
+//
+// Schema provides metadata and a SchemaUpgrader for a given database.
+// SchemaUpgrader is purely local and not persisted.
+type Schema struct {
+	Metadata wire.SchemaMetadata
+	Upgrader SchemaUpgrader
+}
diff --git a/v23/syncbase/nosql/schema.go b/v23/syncbase/nosql/schema.go
new file mode 100644
index 0000000..9f52a1d
--- /dev/null
+++ b/v23/syncbase/nosql/schema.go
@@ -0,0 +1,45 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/v23/context"
+)
+
+// NewSchema creates a new Schema object.
+func NewSchema(metadata wire.SchemaMetadata, upgrader SchemaUpgrader) *Schema {
+	return &Schema{
+		Metadata: metadata,
+		Upgrader: upgrader,
+	}
+}
+
+//////////////////////////////////////////
+// Implementation of SchemaManager (Not part of public client API)
+
+type schemaManagerImpl struct {
+	dbName string
+	c      wire.DatabaseClientMethods
+}
+
+func newSchemaManager(dbName string) schemaManagerImpl {
+	return schemaManagerImpl{
+		dbName: dbName,
+		c:      wire.DatabaseClient(dbName),
+	}
+}
+
+// GetSchemaMetadata retrieves the schema metadata for the database it is
+// derived from.
+func (sm *schemaManagerImpl) getSchemaMetadata(ctx *context.T) (wire.SchemaMetadata, error) {
+	return sm.c.GetSchemaMetadata(ctx)
+}
+
+// SetSchemaMetadata stores the schema metadata for the database it is
+// derived from.
+func (sm *schemaManagerImpl) setSchemaMetadata(ctx *context.T, metadata wire.SchemaMetadata) error {
+	return sm.c.SetSchemaMetadata(ctx, metadata)
+}
diff --git a/v23/syncbase/nosql/schema_test.go b/v23/syncbase/nosql/schema_test.go
new file mode 100644
index 0000000..bcafcb7
--- /dev/null
+++ b/v23/syncbase/nosql/schema_test.go
@@ -0,0 +1,104 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql_test
+
+import (
+	"testing"
+
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/v23/syncbase"
+	tu "v.io/syncbase/v23/syncbase/testutil"
+	"v.io/v23/context"
+)
+
+// Tests schema checking logic within App.NoSQLDatabase() method.
+// This test as following steps:
+// 1) Call NoSQLDatabase() for a non existent db.
+// 2) Create the database, and verify if Schema got stored properly.
+// 3) Call UpgradeIfOutdated() to make sure that the method is no-op and is
+//    able to read the schema from db.
+// 4) Call NoSQLDatabase() on the same db to create a new handle with an
+//    upgraded schema, call UpgradeIfOutdated() and check if SchemaUpgrader
+//    is called and if the new schema is stored appropriately.
+func TestSchemaCheck(t *testing.T) {
+	ctx, sName, cleanup := tu.SetupOrDie(nil)
+	defer cleanup()
+	a := tu.CreateApp(t, ctx, syncbase.NewService(sName), "a")
+	schema := tu.DefaultSchema()
+	mockUpgrader := schema.Upgrader.(*tu.MockSchemaUpgrader)
+
+	db1 := a.NoSQLDatabase("db1", schema)
+
+	// Verify that calling Upgrade on a non existing database does not throw
+	// errors.
+	_, err := db1.UpgradeIfOutdated(ctx)
+	if err != nil {
+		t.Fatalf("db1.UpgradeIfOutdated() failed: %v", err)
+	}
+	if mockUpgrader.CallCount > 0 {
+		t.Fatal("Call to upgrader was not expected.")
+	}
+
+	// Create db1, this step also stores the schema provided above
+	if err := db1.Create(ctx, nil); err != nil {
+		t.Fatalf("db1.Create() failed: %v", err)
+	}
+	// verify if schema was stored as part of create
+	if _, err := getSchemaMetadata(ctx, db1.FullName()); err != nil {
+		t.Fatalf("Failed to lookup schema after create: %v", err)
+	}
+
+	// Make redundant call to Upgrade to verify that it is a no-op
+	result, err1 := db1.UpgradeIfOutdated(ctx)
+	if result {
+		t.Fatalf("db1.UpgradeIfOutdated() should not return true")
+	}
+	if err1 != nil {
+		t.Fatalf("db1.UpgradeIfOutdated() failed: %v", err1)
+	}
+	if mockUpgrader.CallCount > 0 {
+		t.Fatal("Call to upgrader was not expected.")
+	}
+
+	// try to make a new database object for the same database but this time
+	// with a new schema version
+	schema.Metadata.Version = 1
+	rule := wire.CrRule{"table1", "foo", "", wire.ResolverTypeLastWins}
+	policy := wire.CrPolicy{
+		Rules: []wire.CrRule{rule},
+	}
+	schema.Metadata.Policy = policy
+	otherdb1 := a.NoSQLDatabase("db1", schema)
+	otherresult, othererr := otherdb1.UpgradeIfOutdated(ctx)
+
+	if !otherresult {
+		t.Fatalf("otherdb1.UpgradeIfOutdated() expected to return true")
+	}
+	if othererr != nil {
+		t.Fatalf("otherdb1.UpgradeIfOutdated() failed: %v", othererr)
+	}
+	if mockUpgrader.CallCount != 1 {
+		t.Fatalf("Unexpected number of calls to upgrader. Expected: %d, Actual: %d.", 1, mockUpgrader.CallCount)
+	}
+
+	// check if the contents of SchemaMetadata are correctly stored in the db.
+	metadata, err3 := getSchemaMetadata(ctx, otherdb1.FullName())
+	if err3 != nil {
+		t.Fatalf("GetSchemaMetadata failed: %v", err3)
+	}
+	if metadata.Version != 1 {
+		t.Fatalf("Unexpected version number: %d", metadata.Version)
+	}
+	if len(metadata.Policy.Rules) != 1 {
+		t.Fatalf("Unexpected number of rules: %d", len(metadata.Policy.Rules))
+	}
+	if metadata.Policy.Rules[0] != rule {
+		t.Fatalf("Unexpected number of rules: %d", len(metadata.Policy.Rules))
+	}
+}
+
+func getSchemaMetadata(ctx *context.T, dbName string) (wire.SchemaMetadata, error) {
+	return wire.DatabaseClient(dbName).GetSchemaMetadata(ctx)
+}
diff --git a/v23/syncbase/nosql/syncgroup_test.go b/v23/syncbase/nosql/syncgroup_test.go
index 3b2da73..73e0550 100644
--- a/v23/syncbase/nosql/syncgroup_test.go
+++ b/v23/syncbase/nosql/syncgroup_test.go
@@ -101,7 +101,7 @@
 	// Create client2.
 	ctx2 := tu.NewCtx(ctx, rootp, "client2")
 	a2 := syncbase.NewService(sName).App("a")
-	d2 := a2.NoSQLDatabase("d")
+	d2 := a2.NoSQLDatabase("d", nil)
 
 	// Check that client2's join fails if the perms disallow access.
 	joinSyncGroup(t, ctx2, d2, sgNameA, verror.ErrNoAccess.ID)
diff --git a/v23/syncbase/nosql/syncgroup_v23_test.go b/v23/syncbase/nosql/syncgroup_v23_test.go
index 182141a..adb0058 100644
--- a/v23/syncbase/nosql/syncgroup_v23_test.go
+++ b/v23/syncbase/nosql/syncgroup_v23_test.go
@@ -280,7 +280,7 @@
 
 	a := syncbase.NewService(args[0]).App("a")
 	a.Create(ctx, nil)
-	d := a.NoSQLDatabase("d")
+	d := a.NoSQLDatabase("d", nil)
 	d.Create(ctx, nil)
 	d.CreateTable(ctx, "tb", nil)
 
@@ -292,7 +292,7 @@
 	defer shutdown()
 
 	a := syncbase.NewService(args[0]).App("a")
-	d := a.NoSQLDatabase("d")
+	d := a.NoSQLDatabase("d", nil)
 
 	mtName := env.Vars[ref.EnvNamespacePrefix]
 	spec := wire.SyncGroupSpec{
@@ -315,7 +315,7 @@
 	defer shutdown()
 
 	a := syncbase.NewService(args[0]).App("a")
-	d := a.NoSQLDatabase("d")
+	d := a.NoSQLDatabase("d", nil)
 
 	sg := d.SyncGroup(args[1])
 	info := wire.SyncGroupMemberInfo{10}
@@ -330,7 +330,7 @@
 	defer shutdown()
 
 	a := syncbase.NewService(args[0]).App("a")
-	d := a.NoSQLDatabase("d")
+	d := a.NoSQLDatabase("d", nil)
 
 	// Do Puts.
 	tb := d.Table("tb")
@@ -351,7 +351,7 @@
 	defer shutdown()
 
 	a := syncbase.NewService(args[0]).App("a")
-	d := a.NoSQLDatabase("d")
+	d := a.NoSQLDatabase("d", nil)
 
 	// Do Puts.
 	tb := d.Table("tb")
@@ -373,7 +373,7 @@
 	defer shutdown()
 
 	a := syncbase.NewService(args[0]).App("a")
-	d := a.NoSQLDatabase("d")
+	d := a.NoSQLDatabase("d", nil)
 
 	// Wait for a bit (up to 4 sec) until the last key appears.
 	tb := d.Table("tb")
@@ -433,7 +433,7 @@
 	defer shutdown()
 
 	a := syncbase.NewService(args[0]).App("a")
-	d := a.NoSQLDatabase("d")
+	d := a.NoSQLDatabase("d", nil)
 	tb := d.Table("tb")
 
 	// Verify through a scan that none of that data exists.
@@ -458,7 +458,7 @@
 	defer shutdown()
 
 	a := syncbase.NewService(args[0]).App("a")
-	d := a.NoSQLDatabase("d")
+	d := a.NoSQLDatabase("d", nil)
 	tb := d.Table("tb")
 
 	// Wait for a bit (up to 4 sec) until the last key appears.
@@ -504,7 +504,7 @@
 	defer shutdown()
 
 	a := syncbase.NewService(args[0]).App("a")
-	d := a.NoSQLDatabase("d")
+	d := a.NoSQLDatabase("d", nil)
 
 	// Wait for a bit (up to 8 sec) until the last key appears. This chosen
 	// time interval is dependent on how fast the membership view is
@@ -560,7 +560,7 @@
 
 		for j := 0; j < numDbs; j++ {
 			dbName := fmt.Sprintf("d%d", j)
-			d := a.NoSQLDatabase(dbName)
+			d := a.NoSQLDatabase(dbName, nil)
 			d.Create(ctx, nil)
 
 			for k := 0; k < numTbs; k++ {
@@ -594,7 +594,7 @@
 		// For each database...
 		for j := 0; j < numDbs; j++ {
 			dbName := fmt.Sprintf("d%d", j)
-			d := a.NoSQLDatabase(dbName)
+			d := a.NoSQLDatabase(dbName, nil)
 
 			// For each table, pre-populate entries on each prefix.
 			// Also determine the SyncGroup prefixes.
@@ -653,7 +653,7 @@
 
 		for j := 0; j < numDbs; j++ {
 			dbName := fmt.Sprintf("d%d", j)
-			d := a.NoSQLDatabase(dbName)
+			d := a.NoSQLDatabase(dbName, nil)
 
 			sgName := naming.Join(sgNamePrefix, appName, dbName)
 			sg := d.SyncGroup(sgName)
@@ -685,7 +685,7 @@
 
 		for j := 0; j < numDbs; j++ {
 			dbName := fmt.Sprintf("d%d", j)
-			d := a.NoSQLDatabase(dbName)
+			d := a.NoSQLDatabase(dbName, nil)
 
 			for k := 0; k < numTbs; k++ {
 				tbName := fmt.Sprintf("tb%d", k)
diff --git a/v23/syncbase/testutil/layer.go b/v23/syncbase/testutil/layer.go
index 372ca3c..cf2c4cb 100644
--- a/v23/syncbase/testutil/layer.go
+++ b/v23/syncbase/testutil/layer.go
@@ -348,7 +348,7 @@
 	return a.ListDatabases(ctx)
 }
 func (a *app) Child(childName string) layer {
-	return makeLayer(a.NoSQLDatabase(childName))
+	return makeLayer(a.NoSQLDatabase(childName, nil))
 }
 
 type database struct {
diff --git a/v23/syncbase/testutil/util.go b/v23/syncbase/testutil/util.go
index 9509c4c..f7c38b1 100644
--- a/v23/syncbase/testutil/util.go
+++ b/v23/syncbase/testutil/util.go
@@ -13,6 +13,7 @@
 	"runtime/debug"
 	"testing"
 
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/v23/syncbase"
 	"v.io/syncbase/v23/syncbase/nosql"
 	"v.io/syncbase/v23/syncbase/util"
@@ -48,7 +49,7 @@
 }
 
 func CreateNoSQLDatabase(t *testing.T, ctx *context.T, a syncbase.App, name string) nosql.Database {
-	d := a.NoSQLDatabase(name)
+	d := a.NoSQLDatabase(name, nil)
 	if err := d.Create(ctx, nil); err != nil {
 		Fatalf(t, "d.Create() failed: %v", err)
 	}
@@ -180,6 +181,26 @@
 	}
 }
 
+type MockSchemaUpgrader struct {
+	CallCount int
+}
+
+func (msu *MockSchemaUpgrader) Run(db nosql.Database, oldVersion, newVersion int64) error {
+	msu.CallCount++
+	return nil
+}
+
+var _ nosql.SchemaUpgrader = (*MockSchemaUpgrader)(nil)
+
+func DefaultSchema() *nosql.Schema {
+	return &nosql.Schema{
+		Metadata: wire.SchemaMetadata{
+			Version: 0,
+		},
+		Upgrader: nosql.SchemaUpgrader(&MockSchemaUpgrader{}),
+	}
+}
+
 ////////////////////////////////////////
 // Internal helpers
 
diff --git a/x/ref/services/syncbase/server/app.go b/x/ref/services/syncbase/server/app.go
index 0c8958f..8862f57 100644
--- a/x/ref/services/syncbase/server/app.go
+++ b/x/ref/services/syncbase/server/app.go
@@ -9,6 +9,7 @@
 	"sync"
 
 	wire "v.io/syncbase/v23/services/syncbase"
+	nosqlwire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
@@ -132,7 +133,7 @@
 	return dbNames, nil
 }
 
-func (a *app) CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions) error {
+func (a *app) CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions, metadata *nosqlwire.SchemaMetadata) error {
 	if !a.exists {
 		vlog.Fatalf("app %q does not exist", a.name)
 	}
@@ -182,7 +183,7 @@
 	if perms == nil {
 		perms = aData.Perms
 	}
-	d, err := nosql.NewDatabase(ctx, a, dbName, nosql.DatabaseOptions{
+	d, err := nosql.NewDatabase(ctx, a, dbName, metadata, nosql.DatabaseOptions{
 		Perms:   perms,
 		RootDir: rootDir,
 		Engine:  engine,
diff --git a/x/ref/services/syncbase/server/interfaces/app.go b/x/ref/services/syncbase/server/interfaces/app.go
index b1d5d4f..e990b29 100644
--- a/x/ref/services/syncbase/server/interfaces/app.go
+++ b/x/ref/services/syncbase/server/interfaces/app.go
@@ -5,6 +5,7 @@
 package interfaces
 
 import (
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/v23/context"
 	"v.io/v23/rpc"
 	"v.io/v23/security/access"
@@ -22,7 +23,7 @@
 	NoSQLDatabaseNames(ctx *context.T, call rpc.ServerCall) ([]string, error)
 
 	// CreateNoSQLDatabase creates the specified NoSQL database.
-	CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions) error
+	CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions, metadata *wire.SchemaMetadata) error
 
 	// DeleteNoSQLDatabase deletes the specified NoSQL database.
 	DeleteNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string) error
diff --git a/x/ref/services/syncbase/server/nosql/database.go b/x/ref/services/syncbase/server/nosql/database.go
index 2da519e..bfe419a 100644
--- a/x/ref/services/syncbase/server/nosql/database.go
+++ b/x/ref/services/syncbase/server/nosql/database.go
@@ -99,7 +99,7 @@
 
 // NewDatabase creates a new database instance and returns it.
 // Designed for use from within App.CreateNoSQLDatabase.
-func NewDatabase(ctx *context.T, a interfaces.App, name string, opts DatabaseOptions) (*database, error) {
+func NewDatabase(ctx *context.T, a interfaces.App, name string, metadata *wire.SchemaMetadata, opts DatabaseOptions) (*database, error) {
 	if opts.Perms == nil {
 		return nil, verror.New(verror.ErrInternal, ctx, "perms must be specified")
 	}
@@ -108,8 +108,9 @@
 		return nil, err
 	}
 	data := &databaseData{
-		Name:  d.name,
-		Perms: opts.Perms,
+		Name:           d.name,
+		Perms:          opts.Perms,
+		SchemaMetadata: metadata,
 	}
 	if err := util.Put(ctx, d.st, d.stKey(), data); err != nil {
 		return nil, err
@@ -120,7 +121,7 @@
 ////////////////////////////////////////
 // RPC methods
 
-func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions) error {
+func (d *databaseReq) Create(ctx *context.T, call rpc.ServerCall, perms access.Permissions, metadata *wire.SchemaMetadata) error {
 	if d.exists {
 		return verror.New(verror.ErrExist, ctx, d.name)
 	}
@@ -130,7 +131,7 @@
 	// This database does not yet exist; d is just an ephemeral handle that holds
 	// {name string, a *app}. d.a.CreateNoSQLDatabase will create a new database
 	// handle and store it in d.a.dbs[d.name].
-	return d.a.CreateNoSQLDatabase(ctx, call, d.name, perms)
+	return d.a.CreateNoSQLDatabase(ctx, call, d.name, perms, metadata)
 }
 
 func (d *databaseReq) Delete(ctx *context.T, call rpc.ServerCall) error {
diff --git a/x/ref/services/syncbase/server/nosql/database_sm.go b/x/ref/services/syncbase/server/nosql/database_sm.go
new file mode 100644
index 0000000..5f69585
--- /dev/null
+++ b/x/ref/services/syncbase/server/nosql/database_sm.go
@@ -0,0 +1,53 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+package nosql
+
+import (
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
+	"v.io/syncbase/x/ref/services/syncbase/server/util"
+	"v.io/syncbase/x/ref/services/syncbase/store"
+	"v.io/v23/context"
+	"v.io/v23/rpc"
+	"v.io/v23/verror"
+)
+
+////////////////////////////////////////
+// SchemaManager RPC methods
+
+func (d *databaseReq) GetSchemaMetadata(ctx *context.T, call rpc.ServerCall) (wire.SchemaMetadata, error) {
+	metadata := wire.SchemaMetadata{}
+
+	if !d.exists {
+		return metadata, verror.New(verror.ErrNoExist, ctx, d.Name())
+	}
+
+	// Check permissions on Database and retreve schema metadata.
+	dbData := databaseData{}
+	if err := util.GetWithAuth(ctx, call, d.st, d.stKey(), &dbData); err != nil {
+		return metadata, err
+	}
+	if dbData.SchemaMetadata == nil {
+		return metadata, verror.New(verror.ErrNoExist, ctx, "Schema does not exist for the db")
+	}
+	return *dbData.SchemaMetadata, nil
+}
+
+func (d *databaseReq) SetSchemaMetadata(ctx *context.T, call rpc.ServerCall, metadata wire.SchemaMetadata) error {
+	// Check if database exists
+	if !d.exists {
+		return verror.New(verror.ErrNoExist, ctx, "database: "+d.Name())
+	}
+
+	// Check permissions on Database and store schema metadata.
+	return store.RunInTransaction(d.st, func(st store.StoreReadWriter) error {
+		dbData := databaseData{}
+		return util.UpdateWithAuth(ctx, call, st, d.stKey(), &dbData, func() error {
+			// NOTE: For now we expect the client to not issue multiple
+			// concurrent SetSchemaMetadata calls.
+			dbData.SchemaMetadata = &metadata
+			return nil
+		})
+	})
+}
diff --git a/x/ref/services/syncbase/server/nosql/types.vdl b/x/ref/services/syncbase/server/nosql/types.vdl
index 33d3883..8ede239 100644
--- a/x/ref/services/syncbase/server/nosql/types.vdl
+++ b/x/ref/services/syncbase/server/nosql/types.vdl
@@ -6,13 +6,15 @@
 
 import (
 	"v.io/v23/security/access"
+	"v.io/syncbase/v23/services/syncbase/nosql"
 )
 
 // databaseData represents the persistent state of a Database.
 type databaseData struct {
-	Name    string
-	Version uint64 // covers the fields below
-	Perms   access.Permissions
+	Name           string
+	Version        uint64 // covers the Perms field below
+	Perms          access.Permissions
+	SchemaMetadata ?nosql.SchemaMetadata
 }
 
 // tableData represents the persistent state of a Table.
diff --git a/x/ref/services/syncbase/server/nosql/types.vdl.go b/x/ref/services/syncbase/server/nosql/types.vdl.go
index 021cef6..bf5f346 100644
--- a/x/ref/services/syncbase/server/nosql/types.vdl.go
+++ b/x/ref/services/syncbase/server/nosql/types.vdl.go
@@ -12,14 +12,16 @@
 	"v.io/v23/vdl"
 
 	// VDL user imports
+	"v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/v23/security/access"
 )
 
 // databaseData represents the persistent state of a Database.
 type databaseData struct {
-	Name    string
-	Version uint64 // covers the fields below
-	Perms   access.Permissions
+	Name           string
+	Version        uint64 // covers the Perms field below
+	Perms          access.Permissions
+	SchemaMetadata *nosql.SchemaMetadata
 }
 
 func (databaseData) __VDLReflect(struct {
diff --git a/x/ref/services/syncbase/vsync/test_util.go b/x/ref/services/syncbase/vsync/test_util.go
index 556b2d7..9dd4e3e 100644
--- a/x/ref/services/syncbase/vsync/test_util.go
+++ b/x/ref/services/syncbase/vsync/test_util.go
@@ -12,6 +12,7 @@
 	"testing"
 	"time"
 
+	wire "v.io/syncbase/v23/services/syncbase/nosql"
 	"v.io/syncbase/x/ref/services/syncbase/server/interfaces"
 	"v.io/syncbase/x/ref/services/syncbase/server/util"
 	"v.io/syncbase/x/ref/services/syncbase/server/watchable"
@@ -62,7 +63,7 @@
 	return []string{"mockdb"}, nil
 }
 
-func (a *mockApp) CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions) error {
+func (a *mockApp) CreateNoSQLDatabase(ctx *context.T, call rpc.ServerCall, dbName string, perms access.Permissions, metadata *wire.SchemaMetadata) error {
 	return verror.NewErrNotImplemented(ctx)
 }
 
diff --git a/x/ref/syncbase/sb51/shell.go b/x/ref/syncbase/sb51/shell.go
index a41d245..b5f18c0 100644
--- a/x/ref/syncbase/sb51/shell.go
+++ b/x/ref/syncbase/sb51/shell.go
@@ -154,7 +154,7 @@
 			return nil, err
 		}
 	}
-	d := app.NoSQLDatabase(dbName)
+	d := app.NoSQLDatabase(dbName, nil)
 	if exists, err := d.Exists(ctx); err != nil {
 		return nil, fmt.Errorf("failed checking for db %q: %v", d.FullName(), err)
 	} else if !exists {