Add expected database schema version to RPCs.
- Add schema version to client code with minimum disruption to
  API surface.
- This cl is based on following cl
  https://vanadium-review.googlesource.com/#/c/13734/
- TODO to port definiton of Schema from go to js.

MultiPart: 2/2
Change-Id: I19ac4764307d65fa5811663b802a95ea94827986
diff --git a/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js b/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
index 1406445..1e5f77d 100644
--- a/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
+++ b/src/gen-vdl/v.io/syncbase/v23/services/syncbase/nosql/index.js
@@ -14,6 +14,7 @@
 
 var access = require('./../../../../../v23/security/access');
 var permissions = require('./../../../../../v23/services/permissions');
+var watch = require('./../../../../../v23/services/watch');
 
 module.exports = {};
 
@@ -27,11 +28,8 @@
 var _type5 = new vdl.Type();
 var _type6 = new vdl.Type();
 var _type7 = new vdl.Type();
-var _type8 = new vdl.Type();
 var _typeBatchOptions = new vdl.Type();
 var _typeBlobRef = new vdl.Type();
-var _typeChange = new vdl.Type();
-var _typeChangeType = new vdl.Type();
 var _typeCrPolicy = new vdl.Type();
 var _typeCrRule = new vdl.Type();
 var _typeFetchState = new vdl.Type();
@@ -39,51 +37,40 @@
 var _typeKeyValue = new vdl.Type();
 var _typePrefixPermissions = new vdl.Type();
 var _typeResolverType = new vdl.Type();
-var _typeResumeMarker = new vdl.Type();
 var _typeSchemaMetadata = new vdl.Type();
+var _typeStoreChange = new vdl.Type();
 var _typeSyncGroupMemberInfo = new vdl.Type();
 var _typeSyncGroupSpec = new vdl.Type();
-var _typeTablePrefixRange = new vdl.Type();
-var _typeWatchRequest = new vdl.Type();
 _type1.kind = vdl.kind.LIST;
 _type1.name = "";
-_type1.elem = _typeTablePrefixRange;
-_type2.kind = vdl.kind.LIST;
+_type1.elem = vdl.types.STRING;
+_type2.kind = vdl.kind.MAP;
 _type2.name = "";
-_type2.elem = vdl.types.BYTE;
+_type2.elem = _typeSyncGroupMemberInfo;
+_type2.key = vdl.types.STRING;
 _type3.kind = vdl.kind.LIST;
 _type3.name = "";
-_type3.elem = vdl.types.STRING;
-_type4.kind = vdl.kind.MAP;
+_type3.elem = vdl.types.BYTE;
+_type4.kind = vdl.kind.LIST;
 _type4.name = "";
-_type4.elem = _typeSyncGroupMemberInfo;
-_type4.key = vdl.types.STRING;
-_type5.kind = vdl.kind.LIST;
+_type4.elem = _typeCrRule;
+_type5.kind = vdl.kind.OPTIONAL;
 _type5.name = "";
-_type5.elem = _typeCrRule;
-_type6.kind = vdl.kind.OPTIONAL;
+_type5.elem = _typeSchemaMetadata;
+_type6.kind = vdl.kind.LIST;
 _type6.name = "";
-_type6.elem = _typeSchemaMetadata;
+_type6.elem = vdl.types.ANY;
 _type7.kind = vdl.kind.LIST;
 _type7.name = "";
-_type7.elem = vdl.types.ANY;
-_type8.kind = vdl.kind.LIST;
-_type8.name = "";
-_type8.elem = _typePrefixPermissions;
+_type7.elem = _typePrefixPermissions;
 _typeBatchOptions.kind = vdl.kind.STRUCT;
 _typeBatchOptions.name = "v.io/syncbase/v23/services/syncbase/nosql.BatchOptions";
 _typeBatchOptions.fields = [{name: "Hint", type: vdl.types.STRING}, {name: "ReadOnly", type: vdl.types.BOOL}];
 _typeBlobRef.kind = vdl.kind.STRING;
 _typeBlobRef.name = "v.io/syncbase/v23/services/syncbase/nosql.BlobRef";
-_typeChange.kind = vdl.kind.STRUCT;
-_typeChange.name = "v.io/syncbase/v23/services/syncbase/nosql.Change";
-_typeChange.fields = [{name: "Table", type: vdl.types.STRING}, {name: "Row", type: vdl.types.STRING}, {name: "ChangeType", type: _typeChangeType}, {name: "Value", type: _type2}, {name: "ResumeMarker", type: _typeResumeMarker}, {name: "FromSync", type: vdl.types.BOOL}, {name: "Continued", type: vdl.types.BOOL}];
-_typeChangeType.kind = vdl.kind.ENUM;
-_typeChangeType.name = "v.io/syncbase/v23/services/syncbase/nosql.ChangeType";
-_typeChangeType.labels = ["Put", "Delete"];
 _typeCrPolicy.kind = vdl.kind.STRUCT;
 _typeCrPolicy.name = "v.io/syncbase/v23/services/syncbase/nosql.CrPolicy";
-_typeCrPolicy.fields = [{name: "Rules", type: _type5}];
+_typeCrPolicy.fields = [{name: "Rules", type: _type4}];
 _typeCrRule.kind = vdl.kind.STRUCT;
 _typeCrRule.name = "v.io/syncbase/v23/services/syncbase/nosql.CrRule";
 _typeCrRule.fields = [{name: "TableName", type: vdl.types.STRING}, {name: "KeyPrefix", type: vdl.types.STRING}, {name: "Type", type: vdl.types.STRING}, {name: "Resolver", type: _typeResolverType}];
@@ -95,30 +82,25 @@
 _typeFetchStatus.fields = [{name: "State", type: _typeFetchState}, {name: "Received", type: vdl.types.UINT64}, {name: "Total", type: vdl.types.UINT64}];
 _typeKeyValue.kind = vdl.kind.STRUCT;
 _typeKeyValue.name = "v.io/syncbase/v23/services/syncbase/nosql.KeyValue";
-_typeKeyValue.fields = [{name: "Key", type: vdl.types.STRING}, {name: "Value", type: _type2}];
+_typeKeyValue.fields = [{name: "Key", type: vdl.types.STRING}, {name: "Value", type: _type3}];
 _typePrefixPermissions.kind = vdl.kind.STRUCT;
 _typePrefixPermissions.name = "v.io/syncbase/v23/services/syncbase/nosql.PrefixPermissions";
 _typePrefixPermissions.fields = [{name: "Prefix", type: vdl.types.STRING}, {name: "Perms", type: new access.Permissions()._type}];
 _typeResolverType.kind = vdl.kind.ENUM;
 _typeResolverType.name = "v.io/syncbase/v23/services/syncbase/nosql.ResolverType";
 _typeResolverType.labels = ["LastWins", "AppResolves", "Defer"];
-_typeResumeMarker.kind = vdl.kind.STRING;
-_typeResumeMarker.name = "v.io/syncbase/v23/services/syncbase/nosql.ResumeMarker";
 _typeSchemaMetadata.kind = vdl.kind.STRUCT;
 _typeSchemaMetadata.name = "v.io/syncbase/v23/services/syncbase/nosql.SchemaMetadata";
-_typeSchemaMetadata.fields = [{name: "Version", type: vdl.types.INT64}, {name: "Policy", type: _typeCrPolicy}];
+_typeSchemaMetadata.fields = [{name: "Version", type: vdl.types.INT32}, {name: "Policy", type: _typeCrPolicy}];
+_typeStoreChange.kind = vdl.kind.STRUCT;
+_typeStoreChange.name = "v.io/syncbase/v23/services/syncbase/nosql.StoreChange";
+_typeStoreChange.fields = [{name: "Value", type: _type3}, {name: "FromSync", type: vdl.types.BOOL}];
 _typeSyncGroupMemberInfo.kind = vdl.kind.STRUCT;
 _typeSyncGroupMemberInfo.name = "v.io/syncbase/v23/services/syncbase/nosql.SyncGroupMemberInfo";
 _typeSyncGroupMemberInfo.fields = [{name: "SyncPriority", type: vdl.types.BYTE}];
 _typeSyncGroupSpec.kind = vdl.kind.STRUCT;
 _typeSyncGroupSpec.name = "v.io/syncbase/v23/services/syncbase/nosql.SyncGroupSpec";
-_typeSyncGroupSpec.fields = [{name: "Description", type: vdl.types.STRING}, {name: "Perms", type: new access.Permissions()._type}, {name: "Prefixes", type: _type3}, {name: "MountTables", type: _type3}, {name: "IsPrivate", type: vdl.types.BOOL}];
-_typeTablePrefixRange.kind = vdl.kind.STRUCT;
-_typeTablePrefixRange.name = "v.io/syncbase/v23/services/syncbase/nosql.TablePrefixRange";
-_typeTablePrefixRange.fields = [{name: "Table", type: vdl.types.STRING}, {name: "Prefix", type: vdl.types.STRING}];
-_typeWatchRequest.kind = vdl.kind.STRUCT;
-_typeWatchRequest.name = "v.io/syncbase/v23/services/syncbase/nosql.WatchRequest";
-_typeWatchRequest.fields = [{name: "Ranges", type: _type1}, {name: "ResumeMarker", type: _typeResumeMarker}];
+_typeSyncGroupSpec.fields = [{name: "Description", type: vdl.types.STRING}, {name: "Perms", type: new access.Permissions()._type}, {name: "Prefixes", type: _type1}, {name: "MountTables", type: _type1}, {name: "IsPrivate", type: vdl.types.BOOL}];
 _type1.freeze();
 _type2.freeze();
 _type3.freeze();
@@ -126,11 +108,8 @@
 _type5.freeze();
 _type6.freeze();
 _type7.freeze();
-_type8.freeze();
 _typeBatchOptions.freeze();
 _typeBlobRef.freeze();
-_typeChange.freeze();
-_typeChangeType.freeze();
 _typeCrPolicy.freeze();
 _typeCrRule.freeze();
 _typeFetchState.freeze();
@@ -138,19 +117,12 @@
 _typeKeyValue.freeze();
 _typePrefixPermissions.freeze();
 _typeResolverType.freeze();
-_typeResumeMarker.freeze();
 _typeSchemaMetadata.freeze();
+_typeStoreChange.freeze();
 _typeSyncGroupMemberInfo.freeze();
 _typeSyncGroupSpec.freeze();
-_typeTablePrefixRange.freeze();
-_typeWatchRequest.freeze();
 module.exports.BatchOptions = (vdl.registry.lookupOrCreateConstructor(_typeBatchOptions));
 module.exports.BlobRef = (vdl.registry.lookupOrCreateConstructor(_typeBlobRef));
-module.exports.Change = (vdl.registry.lookupOrCreateConstructor(_typeChange));
-module.exports.ChangeType = {
-  PUT: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeChangeType))('Put', true), _typeChangeType),
-  DELETE: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeChangeType))('Delete', true), _typeChangeType),
-};
 module.exports.CrPolicy = (vdl.registry.lookupOrCreateConstructor(_typeCrPolicy));
 module.exports.CrRule = (vdl.registry.lookupOrCreateConstructor(_typeCrRule));
 module.exports.FetchState = {
@@ -167,12 +139,10 @@
   APP_RESOLVES: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeResolverType))('AppResolves', true), _typeResolverType),
   DEFER: canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(_typeResolverType))('Defer', true), _typeResolverType),
 };
-module.exports.ResumeMarker = (vdl.registry.lookupOrCreateConstructor(_typeResumeMarker));
 module.exports.SchemaMetadata = (vdl.registry.lookupOrCreateConstructor(_typeSchemaMetadata));
+module.exports.StoreChange = (vdl.registry.lookupOrCreateConstructor(_typeStoreChange));
 module.exports.SyncGroupMemberInfo = (vdl.registry.lookupOrCreateConstructor(_typeSyncGroupMemberInfo));
 module.exports.SyncGroupSpec = (vdl.registry.lookupOrCreateConstructor(_typeSyncGroupSpec));
-module.exports.TablePrefixRange = (vdl.registry.lookupOrCreateConstructor(_typeTablePrefixRange));
-module.exports.WatchRequest = (vdl.registry.lookupOrCreateConstructor(_typeWatchRequest));
 
 
 
@@ -203,6 +173,18 @@
 ]);
 
 
+module.exports.ConcurrentBatchError = makeError('v.io/syncbase/v23/services/syncbase/nosql.ConcurrentBatch', actions.NO_RETRY, {
+  'en': '{1:}{2:} concurrent batch',
+}, [
+]);
+
+
+module.exports.SchemaVersionMismatchError = makeError('v.io/syncbase/v23/services/syncbase/nosql.SchemaVersionMismatch', actions.NO_RETRY, {
+  'en': '{1:}{2:} actual schema version does not match the provided one',
+}, [
+]);
+
+
 
 
 // Services:
@@ -214,13 +196,13 @@
 
     
       
-DatabaseWatcher.prototype.watch = function(ctx, serverCall, req) {
-  throw new Error('Method Watch not implemented');
+DatabaseWatcher.prototype.getResumeMarker = function(ctx, serverCall) {
+  throw new Error('Method GetResumeMarker not implemented');
 };
     
       
-DatabaseWatcher.prototype.getResumeMarker = function(ctx, serverCall) {
-  throw new Error('Method GetResumeMarker not implemented');
+DatabaseWatcher.prototype.watchGlob = function(ctx, serverCall, req) {
+  throw new Error('Method WatchGlob not implemented');
 };
      
 
@@ -228,18 +210,39 @@
 DatabaseWatcher.prototype._serviceDescription = {
   name: 'DatabaseWatcher',
   pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
-  doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log and a set of (table, row prefix) pairs. Updates with rows not\n// covered by the set are excluded from the result stream. The result stream\n// consists of a never-ending sequence of Change messages (until the call fails\n// or is canceled). Each Change message contains an optional continued bit.\n// A sub-sequence of Change messages with continued=true followed by a Change\n// message with continued=false forms a batch. If the client has no access to\n// a row specified in a change, that change is excluded from the result stream.\n//\n// See \"v.io/v23/services/watch\".GlobWatcher for more detailed explanation of\n// the general behavior.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
-  embeds: [],
+  doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes.",
+  embeds: [{
+      name: 'GlobWatcher',
+      pkgPath: 'v.io/v23/services/watch',
+      doc: "// GlobWatcher allows a client to receive updates for changes to objects\n// that match a pattern.  See the package comments for details."
+    },
+    ],
   methods: [
     
       
     {
-    name: 'Watch',
-    doc: "// Watch returns a stream of changes for a given watch request. If this\n// Database is bound to a batch, Watch() will fail with ErrBoundToBatch.\n// If the provided ResumeMarker is invalid or outdated, Watch() will fail\n// with \"v.io/v23/services/watch\".ErrUnknownResumeMarker.",
+    name: 'GetResumeMarker',
+    doc: "// GetResumeMarker returns the ResumeMarker that points to the current end\n// of the event log. GetResumeMarker() can be called on a batch.",
+    inArgs: [],
+    outArgs: [{
+      name: '',
+      doc: "",
+      type: new watch.ResumeMarker()._type
+    },
+    ],
+    inStream: null,
+    outStream: null,
+    tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
+  },
+    
+      
+    {
+    name: 'WatchGlob',
+    doc: "// WatchGlob returns a stream of changes that match a pattern.",
     inArgs: [{
       name: 'req',
       doc: "",
-      type: _typeWatchRequest
+      type: new watch.GlobRequest()._type
     },
     ],
     outArgs: [],
@@ -247,25 +250,9 @@
     outStream: {
       name: '',
       doc: '',
-      type: _typeChange
+      type: new watch.Change()._type
     },
-    tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
-  },
-    
-      
-    {
-    name: 'GetResumeMarker',
-    doc: "// GetResumeMarker returns the ResumeMarker that points to the current end\n// of the current event log. GetResumeMarker() can be called on a batch.",
-    inArgs: [],
-    outArgs: [{
-      name: '',
-      doc: "",
-      type: _typeResumeMarker
-    },
-    ],
-    inStream: null,
-    outStream: null,
-    tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
+    tags: [canonicalize.reduce(new access.Tag("Resolve", true), new access.Tag()._type), ]
   },
      
   ]
@@ -339,7 +326,7 @@
     outArgs: [{
       name: '',
       doc: "",
-      type: _type3
+      type: _type1
     },
     ],
     inStream: null,
@@ -517,7 +504,7 @@
     outArgs: [{
       name: 'members',
       doc: "",
-      type: _type4
+      type: _type2
     },
     ],
     inStream: null,
@@ -623,7 +610,7 @@
     inStream: {
       name: '',
       doc: '',
-      type: _type2
+      type: _type3
     },
     outStream: null,
     tags: [canonicalize.reduce(new access.Tag("Write", true), new access.Tag()._type), ]
@@ -658,7 +645,7 @@
     outArgs: [{
       name: '',
       doc: "",
-      type: vdl.types.UINT64
+      type: vdl.types.INT64
     },
     ],
     inStream: null,
@@ -694,7 +681,7 @@
     {
       name: 'offset',
       doc: "",
-      type: vdl.types.UINT64
+      type: vdl.types.INT64
     },
     ],
     outArgs: [],
@@ -702,7 +689,7 @@
     outStream: {
       name: '',
       doc: '',
-      type: _type2
+      type: _type3
     },
     tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
   },
@@ -855,37 +842,37 @@
 
     
       
-Database.prototype.create = function(ctx, serverCall, perms, metadata) {
+Database.prototype.create = function(ctx, serverCall, metadata, perms) {
   throw new Error('Method Create not implemented');
 };
     
       
-Database.prototype.delete = function(ctx, serverCall) {
+Database.prototype.delete = function(ctx, serverCall, schemaVersion) {
   throw new Error('Method Delete not implemented');
 };
     
       
-Database.prototype.exists = function(ctx, serverCall) {
+Database.prototype.exists = function(ctx, serverCall, schemaVersion) {
   throw new Error('Method Exists not implemented');
 };
     
       
-Database.prototype.beginBatch = function(ctx, serverCall, bo) {
+Database.prototype.beginBatch = function(ctx, serverCall, schemaVersion, bo) {
   throw new Error('Method BeginBatch not implemented');
 };
     
       
-Database.prototype.commit = function(ctx, serverCall) {
+Database.prototype.commit = function(ctx, serverCall, schemaVersion) {
   throw new Error('Method Commit not implemented');
 };
     
       
-Database.prototype.exec = function(ctx, serverCall, query) {
+Database.prototype.exec = function(ctx, serverCall, schemaVersion, query) {
   throw new Error('Method Exec not implemented');
 };
     
       
-Database.prototype.abort = function(ctx, serverCall) {
+Database.prototype.abort = function(ctx, serverCall, schemaVersion) {
   throw new Error('Method Abort not implemented');
 };
     
@@ -900,13 +887,13 @@
 };
     
       
-Database.prototype.watch = function(ctx, serverCall, req) {
-  throw new Error('Method Watch not implemented');
+Database.prototype.getResumeMarker = function(ctx, serverCall) {
+  throw new Error('Method GetResumeMarker not implemented');
 };
     
       
-Database.prototype.getResumeMarker = function(ctx, serverCall) {
-  throw new Error('Method GetResumeMarker not implemented');
+Database.prototype.watchGlob = function(ctx, serverCall, req) {
+  throw new Error('Method WatchGlob not implemented');
 };
     
       
@@ -1019,7 +1006,7 @@
 Database.prototype._serviceDescription = {
   name: 'Database',
   pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
-  doc: "// Database represents a collection of Tables. Batches, queries, sync, watch,\n// etc. all operate at the Database level.\n// Database.Glob operates over Table names.\n//\n// TODO(sadovsky): Add Watch method.",
+  doc: "// Database represents a collection of Tables. Batches, queries, sync, watch,\n// etc. all operate at the Database level.\n// Database.Glob operates over Table names.\n// Param schemaVersion is the version number that the client expects the database\n// to be at. To disable schema version checking, pass -1.\n//\n// TODO(sadovsky): Add Watch method.",
   embeds: [{
       name: 'Object',
       pkgPath: 'v.io/v23/services/permissions',
@@ -1028,7 +1015,7 @@
     {
       name: 'DatabaseWatcher',
       pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
-      doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log and a set of (table, row prefix) pairs. Updates with rows not\n// covered by the set are excluded from the result stream. The result stream\n// consists of a never-ending sequence of Change messages (until the call fails\n// or is canceled). Each Change message contains an optional continued bit.\n// A sub-sequence of Change messages with continued=true followed by a Change\n// message with continued=false forms a batch. If the client has no access to\n// a row specified in a change, that change is excluded from the result stream.\n//\n// See \"v.io/v23/services/watch\".GlobWatcher for more detailed explanation of\n// the general behavior.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."
+      doc: "// Watch allows a client to watch for updates in the database. For each watched\n// request, the client will receive a reliable stream of watch events without\n// re-ordering. See watch.GlobWatcher for a detailed explanation of the\n// behavior.\n//\n// The watching is done by starting a streaming RPC. The argument to the RPC\n// contains the ResumeMarker that points to a particular place in the database\n// event log. The result stream consists of a never-ending sequence of Change\n// messages (until the call fails or is canceled). Each Change contains the\n// Name field in the form \"<tableName>/<rowKey>\" and the Value field of the\n// StoreChange type. If the client has no access to a row specified in a change,\n// that change is excluded from the result stream.\n//\n// The DatabaseWatcher is designed to be used in the following way:\n// 1) begin a read-only batch\n// 2) read all information your app needs\n// 3) read the ResumeMarker\n// 4) abort the batch\n// 5) start watching changes to the data using the ResumeMarker\n// In this configuration the client doesn't miss any changes."
     },
     {
       name: 'SyncGroupManager',
@@ -1053,15 +1040,15 @@
     name: 'Create',
     doc: "// Create creates this Database.\n// If perms is nil, we inherit (copy) the App perms.\n// Create requires the caller to have Write permission at the App.",
     inArgs: [{
+      name: 'metadata',
+      doc: "",
+      type: _type5
+    },
+    {
       name: 'perms',
       doc: "",
       type: new access.Permissions()._type
     },
-    {
-      name: 'metadata',
-      doc: "",
-      type: _type6
-    },
     ],
     outArgs: [],
     inStream: null,
@@ -1073,7 +1060,12 @@
     {
     name: 'Delete',
     doc: "// Delete deletes this Database.",
-    inArgs: [],
+    inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    ],
     outArgs: [],
     inStream: null,
     outStream: null,
@@ -1084,7 +1076,12 @@
     {
     name: 'Exists',
     doc: "// Exists returns true only if this Database exists. Insufficient permissions\n// cause Exists to return false instead of an error.\n// TODO(ivanpi): Exists may fail with an error if higher levels of hierarchy\n// do not exist.",
-    inArgs: [],
+    inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    ],
     outArgs: [{
       name: '',
       doc: "",
@@ -1101,6 +1098,11 @@
     name: 'BeginBatch',
     doc: "// BeginBatch creates a new batch. It returns an App-relative name for a\n// Database handle bound to this batch. If this Database is already bound to a\n// batch, BeginBatch() will fail with ErrBoundToBatch. Concurrency semantics\n// are documented in model.go.\n// TODO(sadovsky): make BatchOptions optional",
     inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    {
       name: 'bo',
       doc: "",
       type: _typeBatchOptions
@@ -1121,7 +1123,12 @@
     {
     name: 'Commit',
     doc: "// Commit persists the pending changes to the database.\n// If this Database is not bound to a batch, Commit() will fail with\n// ErrNotBoundToBatch.",
-    inArgs: [],
+    inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    ],
     outArgs: [],
     inStream: null,
     outStream: null,
@@ -1133,6 +1140,11 @@
     name: 'Exec',
     doc: "// Exec executes a syncQL query and returns all results as specified by in the\n// query's select clause. Concurrency semantics are documented in model.go.",
     inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    {
       name: 'query',
       doc: "",
       type: vdl.types.STRING
@@ -1143,7 +1155,7 @@
     outStream: {
       name: '',
       doc: '',
-      type: _type7
+      type: _type6
     },
     tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
   },
@@ -1152,7 +1164,12 @@
     {
     name: 'Abort',
     doc: "// Abort notifies the server that any pending changes can be discarded.\n// It is not strictly required, but it may allow the server to release locks\n// or other resources sooner than if it was not called.\n// If this Database is not bound to a batch, Abort() will fail with\n// ErrNotBoundToBatch.",
-    inArgs: [],
+    inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    ],
     outArgs: [],
     inStream: null,
     outStream: null,
@@ -1203,12 +1220,28 @@
     
       
     {
-    name: 'Watch',
-    doc: "// Watch returns a stream of changes for a given watch request. If this\n// Database is bound to a batch, Watch() will fail with ErrBoundToBatch.\n// If the provided ResumeMarker is invalid or outdated, Watch() will fail\n// with \"v.io/v23/services/watch\".ErrUnknownResumeMarker.",
+    name: 'GetResumeMarker',
+    doc: "// GetResumeMarker returns the ResumeMarker that points to the current end\n// of the event log. GetResumeMarker() can be called on a batch.",
+    inArgs: [],
+    outArgs: [{
+      name: '',
+      doc: "",
+      type: new watch.ResumeMarker()._type
+    },
+    ],
+    inStream: null,
+    outStream: null,
+    tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
+  },
+    
+      
+    {
+    name: 'WatchGlob',
+    doc: "// WatchGlob returns a stream of changes that match a pattern.",
     inArgs: [{
       name: 'req',
       doc: "",
-      type: _typeWatchRequest
+      type: new watch.GlobRequest()._type
     },
     ],
     outArgs: [],
@@ -1216,25 +1249,9 @@
     outStream: {
       name: '',
       doc: '',
-      type: _typeChange
+      type: new watch.Change()._type
     },
-    tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
-  },
-    
-      
-    {
-    name: 'GetResumeMarker',
-    doc: "// GetResumeMarker returns the ResumeMarker that points to the current end\n// of the current event log. GetResumeMarker() can be called on a batch.",
-    inArgs: [],
-    outArgs: [{
-      name: '',
-      doc: "",
-      type: _typeResumeMarker
-    },
-    ],
-    inStream: null,
-    outStream: null,
-    tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
+    tags: [canonicalize.reduce(new access.Tag("Resolve", true), new access.Tag()._type), ]
   },
     
       
@@ -1245,7 +1262,7 @@
     outArgs: [{
       name: '',
       doc: "",
-      type: _type3
+      type: _type1
     },
     ],
     inStream: null,
@@ -1423,7 +1440,7 @@
     outArgs: [{
       name: 'members',
       doc: "",
-      type: _type4
+      type: _type2
     },
     ],
     inStream: null,
@@ -1461,7 +1478,7 @@
     inStream: {
       name: '',
       doc: '',
-      type: _type2
+      type: _type3
     },
     outStream: null,
     tags: [canonicalize.reduce(new access.Tag("Write", true), new access.Tag()._type), ]
@@ -1496,7 +1513,7 @@
     outArgs: [{
       name: '',
       doc: "",
-      type: vdl.types.UINT64
+      type: vdl.types.INT64
     },
     ],
     inStream: null,
@@ -1532,7 +1549,7 @@
     {
       name: 'offset',
       doc: "",
-      type: vdl.types.UINT64
+      type: vdl.types.INT64
     },
     ],
     outArgs: [],
@@ -1540,7 +1557,7 @@
     outStream: {
       name: '',
       doc: '',
-      type: _type2
+      type: _type3
     },
     tags: [canonicalize.reduce(new access.Tag("Read", true), new access.Tag()._type), ]
   },
@@ -1665,42 +1682,42 @@
 
     
       
-Table.prototype.create = function(ctx, serverCall, perms) {
+Table.prototype.create = function(ctx, serverCall, schemaVersion, perms) {
   throw new Error('Method Create not implemented');
 };
     
       
-Table.prototype.delete = function(ctx, serverCall) {
+Table.prototype.delete = function(ctx, serverCall, schemaVersion) {
   throw new Error('Method Delete not implemented');
 };
     
       
-Table.prototype.exists = function(ctx, serverCall) {
+Table.prototype.exists = function(ctx, serverCall, schemaVersion) {
   throw new Error('Method Exists not implemented');
 };
     
       
-Table.prototype.deleteRowRange = function(ctx, serverCall, start, limit) {
+Table.prototype.deleteRowRange = function(ctx, serverCall, schemaVersion, start, limit) {
   throw new Error('Method DeleteRowRange not implemented');
 };
     
       
-Table.prototype.scan = function(ctx, serverCall, start, limit) {
+Table.prototype.scan = function(ctx, serverCall, schemaVersion, start, limit) {
   throw new Error('Method Scan not implemented');
 };
     
       
-Table.prototype.getPermissions = function(ctx, serverCall, key) {
+Table.prototype.getPermissions = function(ctx, serverCall, schemaVersion, key) {
   throw new Error('Method GetPermissions not implemented');
 };
     
       
-Table.prototype.setPermissions = function(ctx, serverCall, prefix, perms) {
+Table.prototype.setPermissions = function(ctx, serverCall, schemaVersion, prefix, perms) {
   throw new Error('Method SetPermissions not implemented');
 };
     
       
-Table.prototype.deletePermissions = function(ctx, serverCall, prefix) {
+Table.prototype.deletePermissions = function(ctx, serverCall, schemaVersion, prefix) {
   throw new Error('Method DeletePermissions not implemented');
 };
      
@@ -1709,7 +1726,7 @@
 Table.prototype._serviceDescription = {
   name: 'Table',
   pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
-  doc: "// Table represents a collection of Rows.\n// Table.Glob operates over the primary keys of Rows in the Table.",
+  doc: "// Table represents a collection of Rows.\n// Table.Glob operates over the primary keys of Rows in the Table.\n// SchemaVersion is the version number that the client expects the database\n// to be at. To disable schema version checking, pass -1.",
   embeds: [],
   methods: [
     
@@ -1718,6 +1735,11 @@
     name: 'Create',
     doc: "// Create creates this Table.\n// If perms is nil, we inherit (copy) the Database perms.",
     inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    {
       name: 'perms',
       doc: "",
       type: new access.Permissions()._type
@@ -1733,7 +1755,12 @@
     {
     name: 'Delete',
     doc: "// Delete deletes this Table.",
-    inArgs: [],
+    inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    ],
     outArgs: [],
     inStream: null,
     outStream: null,
@@ -1744,7 +1771,12 @@
     {
     name: 'Exists',
     doc: "// Exists returns true only if this Table exists. Insufficient permissions\n// cause Exists to return false instead of an error.\n// TODO(ivanpi): Exists may fail with an error if higher levels of hierarchy\n// do not exist.",
-    inArgs: [],
+    inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    ],
     outArgs: [{
       name: '',
       doc: "",
@@ -1761,14 +1793,19 @@
     name: 'DeleteRowRange',
     doc: "// Delete deletes all rows in the given half-open range [start, limit). If\n// limit is \"\", all rows with keys >= start are included.",
     inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    {
       name: 'start',
       doc: "",
-      type: _type2
+      type: _type3
     },
     {
       name: 'limit',
       doc: "",
-      type: _type2
+      type: _type3
     },
     ],
     outArgs: [],
@@ -1782,14 +1819,19 @@
     name: 'Scan',
     doc: "// Scan returns all rows in the given half-open range [start, limit). If limit\n// is \"\", all rows with keys >= start are included. Concurrency semantics are\n// documented in model.go.",
     inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    {
       name: 'start',
       doc: "",
-      type: _type2
+      type: _type3
     },
     {
       name: 'limit',
       doc: "",
-      type: _type2
+      type: _type3
     },
     ],
     outArgs: [],
@@ -1807,6 +1849,11 @@
     name: 'GetPermissions',
     doc: "// GetPermissions returns an array of (prefix, perms) pairs. The array is\n// sorted from longest prefix to shortest, so element zero is the one that\n// applies to the row with the given key. The last element is always the\n// prefix \"\" which represents the table's permissions -- the array will always\n// have at least one element.",
     inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    {
       name: 'key',
       doc: "",
       type: vdl.types.STRING
@@ -1815,7 +1862,7 @@
     outArgs: [{
       name: '',
       doc: "",
-      type: _type8
+      type: _type7
     },
     ],
     inStream: null,
@@ -1828,6 +1875,11 @@
     name: 'SetPermissions',
     doc: "// SetPermissions sets the permissions for all current and future rows with\n// the given prefix. If the prefix overlaps with an existing prefix, the\n// longest prefix that matches a row applies. For example:\n//     SetPermissions(ctx, Prefix(\"a/b\"), perms1)\n//     SetPermissions(ctx, Prefix(\"a/b/c\"), perms2)\n// The permissions for row \"a/b/1\" are perms1, and the permissions for row\n// \"a/b/c/1\" are perms2.",
     inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    {
       name: 'prefix',
       doc: "",
       type: vdl.types.STRING
@@ -1849,6 +1901,11 @@
     name: 'DeletePermissions',
     doc: "// DeletePermissions deletes the permissions for the specified prefix. Any\n// rows covered by this prefix will use the next longest prefix's permissions\n// (see the array returned by GetPermissions).",
     inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    {
       name: 'prefix',
       doc: "",
       type: vdl.types.STRING
@@ -1870,22 +1927,22 @@
 
     
       
-Row.prototype.exists = function(ctx, serverCall) {
+Row.prototype.exists = function(ctx, serverCall, schemaVersion) {
   throw new Error('Method Exists not implemented');
 };
     
       
-Row.prototype.get = function(ctx, serverCall) {
+Row.prototype.get = function(ctx, serverCall, schemaVersion) {
   throw new Error('Method Get not implemented');
 };
     
       
-Row.prototype.put = function(ctx, serverCall, value) {
+Row.prototype.put = function(ctx, serverCall, schemaVersion, value) {
   throw new Error('Method Put not implemented');
 };
     
       
-Row.prototype.delete = function(ctx, serverCall) {
+Row.prototype.delete = function(ctx, serverCall, schemaVersion) {
   throw new Error('Method Delete not implemented');
 };
      
@@ -1894,7 +1951,7 @@
 Row.prototype._serviceDescription = {
   name: 'Row',
   pkgPath: 'v.io/syncbase/v23/services/syncbase/nosql',
-  doc: "// Row represents a single row in a Table.\n// All access checks are performed against the most specific matching prefix\n// permissions in the Table.\n// NOTE(sadovsky): Currently we send []byte values over the wire for Get, Put,\n// and Scan. If there's a way to avoid encoding/decoding on the server side, we\n// can use vdl.Value everywhere without sacrificing performance.",
+  doc: "// Row represents a single row in a Table.\n// All access checks are performed against the most specific matching prefix\n// permissions in the Table.\n// SchemaVersion is the version number that the client expects the database\n// to be at. To disable schema version checking, pass -1.\n// NOTE(sadovsky): Currently we send []byte values over the wire for Get, Put,\n// and Scan. If there's a way to avoid encoding/decoding on the server side, we\n// can use vdl.Value everywhere without sacrificing performance.",
   embeds: [],
   methods: [
     
@@ -1902,7 +1959,12 @@
     {
     name: 'Exists',
     doc: "// Exists returns true only if this Row exists. Insufficient permissions\n// cause Exists to return false instead of an error.\n// TODO(ivanpi): Exists may fail with an error if higher levels of hierarchy\n// do not exist.",
-    inArgs: [],
+    inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    ],
     outArgs: [{
       name: '',
       doc: "",
@@ -1918,11 +1980,16 @@
     {
     name: 'Get',
     doc: "// Get returns the value for this Row.",
-    inArgs: [],
+    inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    ],
     outArgs: [{
       name: '',
       doc: "",
-      type: _type2
+      type: _type3
     },
     ],
     inStream: null,
@@ -1935,9 +2002,14 @@
     name: 'Put',
     doc: "// Put writes the given value for this Row.",
     inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    {
       name: 'value',
       doc: "",
-      type: _type2
+      type: _type3
     },
     ],
     outArgs: [],
@@ -1950,7 +2022,12 @@
     {
     name: 'Delete',
     doc: "// Delete deletes this Row.",
-    inArgs: [],
+    inArgs: [{
+      name: 'schemaVersion',
+      doc: "",
+      type: vdl.types.INT32
+    },
+    ],
     outArgs: [],
     inStream: null,
     outStream: null,
diff --git a/src/gen-vdl/v.io/v23/security/index.js b/src/gen-vdl/v.io/v23/security/index.js
index 4142766..5221584 100644
--- a/src/gen-vdl/v.io/v23/security/index.js
+++ b/src/gen-vdl/v.io/v23/security/index.js
@@ -279,6 +279,12 @@
 
   module.exports.SignatureForDischarge = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("D", true), vdl.types.STRING);
 
+  module.exports.SignatureForMessageSigningV1 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("S1", true), vdl.types.STRING);
+
+  module.exports.SignatureForBlessingCertificatesV1 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("B1", true), vdl.types.STRING);
+
+  module.exports.SignatureForDischargeV1 = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.STRING))("D1", true), vdl.types.STRING);
+
 
 
 // Errors:
@@ -369,6 +375,13 @@
 ]);
 
 
+module.exports.InvalidSigningBlessingCaveatError = makeError('v.io/v23/security.InvalidSigningBlessingCaveat', actions.NO_RETRY, {
+  'en': '{1:}{2:} blessing has caveat with UUID {3} which makes it unsuitable for signing -- please use blessings with just Expiry caveats',
+}, [
+  new uniqueid.Id()._type,
+]);
+
+
 
 
 // Services:
diff --git a/src/gen-vdl/v.io/v23/services/watch/index.js b/src/gen-vdl/v.io/v23/services/watch/index.js
new file mode 100644
index 0000000..426a80c
--- /dev/null
+++ b/src/gen-vdl/v.io/v23/services/watch/index.js
@@ -0,0 +1,112 @@
+// Copyright 2015 The Vanadium Authors. All rights reserved.
+// Use of this source code is governed by a BSD-style
+// license that can be found in the LICENSE file.
+
+// This file was auto-generated by the vanadium vdl tool.
+var vdl = require('vanadium').vdl;
+var makeError = require('vanadium').verror.makeError;
+var actions = require('vanadium').verror.actions;
+var canonicalize = require('vanadium').vdl.canonicalize;
+
+
+
+
+
+var access = require('./../../security/access');
+
+module.exports = {};
+
+
+
+// Types:
+var _typeChange = new vdl.Type();
+var _typeGlobRequest = new vdl.Type();
+var _typeResumeMarker = new vdl.Type();
+_typeChange.kind = vdl.kind.STRUCT;
+_typeChange.name = "v.io/v23/services/watch.Change";
+_typeChange.fields = [{name: "Name", type: vdl.types.STRING}, {name: "State", type: vdl.types.INT32}, {name: "Value", type: vdl.types.ANY}, {name: "ResumeMarker", type: _typeResumeMarker}, {name: "Continued", type: vdl.types.BOOL}];
+_typeGlobRequest.kind = vdl.kind.STRUCT;
+_typeGlobRequest.name = "v.io/v23/services/watch.GlobRequest";
+_typeGlobRequest.fields = [{name: "Pattern", type: vdl.types.STRING}, {name: "ResumeMarker", type: _typeResumeMarker}];
+_typeResumeMarker.kind = vdl.kind.LIST;
+_typeResumeMarker.name = "v.io/v23/services/watch.ResumeMarker";
+_typeResumeMarker.elem = vdl.types.BYTE;
+_typeChange.freeze();
+_typeGlobRequest.freeze();
+_typeResumeMarker.freeze();
+module.exports.Change = (vdl.registry.lookupOrCreateConstructor(_typeChange));
+module.exports.GlobRequest = (vdl.registry.lookupOrCreateConstructor(_typeGlobRequest));
+module.exports.ResumeMarker = (vdl.registry.lookupOrCreateConstructor(_typeResumeMarker));
+
+
+
+
+// Consts:
+
+  module.exports.Exists = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.INT32))(0, true), vdl.types.INT32);
+
+  module.exports.DoesNotExist = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.INT32))(1, true), vdl.types.INT32);
+
+  module.exports.InitialStateSkipped = canonicalize.reduce(new (vdl.registry.lookupOrCreateConstructor(vdl.types.INT32))(2, true), vdl.types.INT32);
+
+
+
+// Errors:
+
+module.exports.UnknownResumeMarkerError = makeError('v.io/v23/services/watch.UnknownResumeMarker', actions.NO_RETRY, {
+  'en': '{1:}{2:} unknown resume marker {_}',
+}, [
+]);
+
+
+
+
+// Services:
+
+  
+    
+function GlobWatcher(){}
+module.exports.GlobWatcher = GlobWatcher;
+
+    
+      
+GlobWatcher.prototype.watchGlob = function(ctx, serverCall, req) {
+  throw new Error('Method WatchGlob not implemented');
+};
+     
+
+    
+GlobWatcher.prototype._serviceDescription = {
+  name: 'GlobWatcher',
+  pkgPath: 'v.io/v23/services/watch',
+  doc: "// GlobWatcher allows a client to receive updates for changes to objects\n// that match a pattern.  See the package comments for details.",
+  embeds: [],
+  methods: [
+    
+      
+    {
+    name: 'WatchGlob',
+    doc: "// WatchGlob returns a stream of changes that match a pattern.",
+    inArgs: [{
+      name: 'req',
+      doc: "",
+      type: _typeGlobRequest
+    },
+    ],
+    outArgs: [],
+    inStream: null,
+    outStream: {
+      name: '',
+      doc: '',
+      type: _typeChange
+    },
+    tags: [canonicalize.reduce(new access.Tag("Resolve", true), new access.Tag()._type), ]
+  },
+     
+  ]
+};
+
+   
+ 
+
+
diff --git a/src/nosql/batch-database.js b/src/nosql/batch-database.js
index 7fac2d6..2329dde 100644
--- a/src/nosql/batch-database.js
+++ b/src/nosql/batch-database.js
@@ -14,12 +14,14 @@
  * @constructor
  * @inner
  * @param {module:syncbase.database.Database} db Database.
+ * @param {number} schemaVersion Database schema version expected by client.
  */
-function BatchDatabase(db) {
+function BatchDatabase(db, schemaVersion) {
   if (!(this instanceof BatchDatabase)) {
-    return new BatchDatabase(db);
+    return new BatchDatabase(db, schemaVersion);
   }
 
+  this.schemaVersion = schemaVersion;
   Object.defineProperty(this, '_db', {
     enumerable: false,
     value: db,
@@ -51,7 +53,7 @@
  * @param {function} cb Callback.
  */
 BatchDatabase.prototype.commit = function(ctx, cb) {
-  this._db._wire(ctx).commit(ctx, cb);
+  this._db._wire(ctx).commit(ctx, this.schemaVersion, cb);
 };
 
 /**
@@ -62,7 +64,7 @@
  * @param {function} cb Callback.
  */
 BatchDatabase.prototype.abort = function(ctx, cb) {
-  this._db._wire(ctx).abort(ctx, cb);
+  this._db._wire(ctx).abort(ctx, this.schemaVersion, cb);
 };
 
 /**
diff --git a/src/nosql/batch.js b/src/nosql/batch.js
index cd5fa36..157a21d 100644
--- a/src/nosql/batch.js
+++ b/src/nosql/batch.js
@@ -8,7 +8,7 @@
  * @summary
  * runInBatch runs a function with a newly created batch. If the function
  * errors, the batch is aborted. If the function succeeds, the batch is
- * committed. If an error occurs during commit, then the batch is aborted.
+ * committed.
  *
  * @param {module:vanadium.context.Context} ctx Vanadium context.
  * @param {module:syncbase.database.Database} db Database.
@@ -17,41 +17,39 @@
  * batch.
  * @param {module:vanadium~voidCb} cb Callback that will be called after the
  * batch has been committed or aborted.
- *
- * TODO(nlacasse): Add retry loop.
  */
 function runInBatch(ctx, db, opts, fn, cb) {
-  db.beginBatch(ctx, opts, function(err, batchDb) {
-    if (err) {
-      return cb(err);
-    }
-
-    function onError(err) {
-      batchDb.abort(ctx, function() {
-        cb(err);
-      });
-    }
-
-    function onSuccess() {
-      // TODO(nlacasse): Commit() can fail for a number of reasons, e.g. RPC
-      // failure or ErrConcurrentTransaction. Depending on the cause of
-      // failure, it may be desirable to retry the Commit() and/or to call
-      // Abort(). For now, we always abort on a failed commit.
-      batchDb.commit(ctx, function(commitErr) {
-        if (commitErr) {
-          return onError(commitErr);
-        }
-        return cb(null);
-      });
-    }
-
-    fn(batchDb, function(err) {
+  function attempt(cb) {
+    db.beginBatch(ctx, opts, function(err, batchDb) {
       if (err) {
-        return onError(err);
+        return cb(err);
       }
-      onSuccess();
+      fn(batchDb, function(err) {
+        if (err) {
+          return batchDb.abort(ctx, function() {
+            return cb(err);  // return fn error, not abort error
+          });
+        }
+        // TODO(sadovsky): commit() can fail for a number of reasons, e.g. RPC
+        // failure or ErrConcurrentTransaction. Depending on the cause of
+        // failure, it may be desirable to retry the commit() and/or to call
+        // abort().
+        batchDb.commit(ctx, cb);
+      });
     });
-  });
+  }
+
+  function retryLoop(i) {
+    attempt(function(err) {
+      if (err && i < 2) {
+        retryLoop(i + 1);
+      } else {
+        cb(err);
+      }
+    });
+  }
+
+  retryLoop(0);
 }
 
 /**
diff --git a/src/nosql/database.js b/src/nosql/database.js
index 41f5384..4575ac1 100644
--- a/src/nosql/database.js
+++ b/src/nosql/database.js
@@ -24,14 +24,18 @@
  * Database.
  * @param {string} relativeName Relative name of this Database.  Must not
  * contain slashes.
+ * @param {module:syncbase.schema.Schema} schema Schema for the database.
+ * TODO(nlacasse): port definition of Schema from go to javascript
  */
-function Database(parentFullName, relativeName) {
+function Database(parentFullName, relativeName, schema) {
   if (!(this instanceof Database)) {
     return new Database(parentFullName, relativeName);
   }
 
   util.addNameProperties(this, parentFullName, relativeName);
 
+  this.schema = null;  // TODO(nlacasse): use schema from params
+  this.schemaVersion = -1;  // TODO(nlacasse): derive this from schema
   /**
    * Caches the database wire object.
    * @private
@@ -68,7 +72,8 @@
  *    https://vanadium-review.googlesource.com/#/c/13040/ .
  */
 Database.prototype.create = function(ctx, perms, cb) {
-  this._wire(ctx).create(ctx, perms, null, cb);
+  //TODO(nlacasse): pass schema.metadata below instead of null
+  this._wire(ctx).create(ctx, null, perms, cb);
 };
 
 /**
@@ -77,7 +82,7 @@
  * @param {function} cb Callback.
  */
 Database.prototype.delete = function(ctx, cb) {
-  this._wire(ctx).delete(ctx, cb);
+  this._wire(ctx).delete(ctx, this.schemaVersion, cb);
 };
 
 /**
@@ -89,7 +94,7 @@
  * @param {function} cb Callback.
  */
 Database.prototype.exists = function(ctx, cb) {
-  this._wire(ctx).exists(ctx, cb);
+  this._wire(ctx).exists(ctx, this.schemaVersion, cb);
 };
 
 /**
@@ -130,7 +135,7 @@
     return cb(null, res.map(unwrap));
   });
 
-  var stream = this._wire(ctx).exec(ctx, query, cb).stream;
+  var stream = this._wire(ctx).exec(ctx, this.schemaVersion, query, cb).stream;
 
   var decodedStream = stream.pipe(streamUnwrapper);
   stream.on('error', function(err) {
@@ -146,7 +151,7 @@
  * @return {module:syncbase.table.Table} Table object.
  */
 Database.prototype.table = function(relativeName) {
-  return new Table(this.fullName, relativeName);
+  return new Table(this.fullName, relativeName, this.schemaVersion);
 };
 
 /**
@@ -194,7 +199,7 @@
  * @param {function} cb Callback.
  */
 Database.prototype.createTable = function(ctx, relativeName, perms, cb) {
-  this._tableWire(ctx, relativeName).create(ctx, perms, cb);
+  this._tableWire(ctx, relativeName).create(ctx, this.schemaVersion, perms, cb);
 };
 
 /**
@@ -205,7 +210,7 @@
  * @param {function} cb Callback.
  */
 Database.prototype.deleteTable = function(ctx, relativeName, cb) {
-  this._tableWire(ctx, relativeName).delete(ctx, cb);
+  this._tableWire(ctx, relativeName).delete(ctx, this.schemaVersion, cb);
 };
 
 /**
@@ -256,18 +261,19 @@
  */
 Database.prototype.beginBatch = function(ctx, opts, cb) {
   var self = this;
-  this._wire(ctx).beginBatch(ctx, opts, function(err, relativeName) {
-    if (err) {
-      return cb(err);
-    }
+  this._wire(ctx).beginBatch(ctx, this.schemaVersion, opts,
+    function(err, relativeName) {
+      if (err) {
+        return cb(err);
+      }
 
-    // The relativeName returned from the beginBatch() call above is different
-    // than the relativeName of the current database. We must create a new
-    // Database with this new relativeName, and then create a BatchDatabase
-    // from that new Database.
-    var db = new Database(self._parentFullName, relativeName);
-    return cb(null, new BatchDatabase(db));
-  });
+      // The relativeName returned from the beginBatch() call above is different
+      // than the relativeName of the current database. We must create a new
+      // Database with this new relativeName, and then create a BatchDatabase
+      // from that new Database.
+      var db = new Database(self._parentFullName, relativeName);
+      return cb(null, new BatchDatabase(db));
+    });
 };
 
 /**
diff --git a/src/nosql/row.js b/src/nosql/row.js
index ba9f633..ae679b1 100644
--- a/src/nosql/row.js
+++ b/src/nosql/row.js
@@ -12,13 +12,14 @@
  * @summary
  * Represents a single row in a Table.
  * Private constructor, use table.row() to get an instance.
+ * @param {number} schemaVersion Database schema version expected by client.
  * @inner
  * @constructor
  * @memberof module:syncbase.nosql
  */
-function Row(parentFullName, key) {
+function Row(parentFullName, key, schemaVersion) {
   if (!(this instanceof Row)) {
-    return new Row(parentFullName, key);
+    return new Row(parentFullName, key, schemaVersion);
   }
 
   // TODO(aghassemi) We may need to escape the key. Align with Go implementation
@@ -27,6 +28,7 @@
   // Should they all behave the same or is row key really different?
   var fullName = vanadium.naming.join(parentFullName, key);
 
+  this.schemaVersion = schemaVersion;
   /**
    * The key of this Row.
    * @property name
@@ -82,7 +84,7 @@
  * @param {function} cb Callback.
  */
 Row.prototype.exists = function(ctx, cb) {
-  this._wire(ctx).exists(ctx, cb);
+  this._wire(ctx).exists(ctx, this.schemaVersion, cb);
 };
 
 /**
@@ -91,7 +93,7 @@
  * @param {function} cb Callback.
  */
 Row.prototype.get = function(ctx, cb) {
-  this._wire(ctx).get(ctx, function(err, value) {
+  this._wire(ctx).get(ctx, this.schemaVersion, function(err, value) {
     if (err) {
       return cb(err);
     }
@@ -121,7 +123,7 @@
   } catch (e) {
     return cb(e);
   }
-  this._wire(ctx).put(ctx, encodedVal, cb);
+  this._wire(ctx).put(ctx, this.schemaVersion, encodedVal, cb);
 };
 
 /**
@@ -130,5 +132,5 @@
  * @param {function} cb Callback.
  */
 Row.prototype.delete = function(ctx, cb) {
-  this._wire(ctx).delete(ctx, cb);
+  this._wire(ctx).delete(ctx, this.schemaVersion, cb);
 };
diff --git a/src/nosql/table.js b/src/nosql/table.js
index 824f816..96a6e98 100644
--- a/src/nosql/table.js
+++ b/src/nosql/table.js
@@ -21,17 +21,20 @@
  * Table.
  * @param {string} relativeName Relative name of this Table.  Must not
  * contain slashes.
+ * @param {number} schemaVersion Database schema version expected by client.
  * @constructor
  * @inner
  * @memberof {module:syncbase.nosql}
  */
-function Table(parentFullName, relativeName) {
+function Table(parentFullName, relativeName, schemaVersion) {
   if (!(this instanceof Table)) {
-    return new Table(parentFullName, relativeName);
+    return new Table(parentFullName, relativeName, schemaVersion);
   }
 
   util.addNameProperties(this, parentFullName, relativeName);
 
+  this.schemaVersion = schemaVersion;
+
   /**
    * Caches the table wire object.
    * @private
@@ -66,7 +69,7 @@
  * @param {function} cb Callback.
  */
 Table.prototype.exists = function(ctx, cb) {
-  this._wire(ctx).exists(ctx, cb);
+  this._wire(ctx).exists(ctx, this.schemaVersion, cb);
 };
 
 /**
@@ -75,7 +78,7 @@
  * @return {module:syncbase.row.Row} Row object.
  */
 Table.prototype.row = function(key) {
-  return new Row(this.fullName, key);
+  return new Row(this.fullName, key, this.schemaVersion);
 };
 
 /**
@@ -116,7 +119,8 @@
  * @param {function} cb Callback.
  */
 Table.prototype.delete = function(ctx, range, cb) {
-  this._wire(ctx).deleteRowRange(ctx, range.start, range.limit, cb);
+  this._wire(ctx).deleteRowRange(
+        ctx, this.schemaVersion, range.start, range.limit, cb);
 };
 
 /**
@@ -143,7 +147,8 @@
     });
   });
 
-  var stream = this._wire(ctx).scan(ctx, range.start, range.limit, cb).stream;
+  var stream = this._wire(ctx)
+        .scan(ctx, this.schemaVersion, range.start, range.limit, cb).stream;
   var decodedStream = stream.pipe(vomStreamDecoder);
   stream.on('error', function(err) {
     decodedStream.emit('error', err);
@@ -171,7 +176,8 @@
  * @param {function} cb Callback.
  */
 Table.prototype.setPermissions = function(ctx, prefix, perms, cb) {
-  this._wire(ctx).setPermissions(ctx, stringifyPrefix(prefix), perms, cb);
+  this._wire(ctx).setPermissions(
+        ctx, this.schemaVersion, stringifyPrefix(prefix), perms, cb);
 };
 
 
@@ -190,20 +196,22 @@
   // There are two PrefixPermission types, one is the wire type which has
   // Prefix as a string and then there is the client type where prefix is a
   // PrefixRange, therefore we convert between the wire and client types.
-  this._wire(ctx).getPermissions(ctx, key, function(err, wirePerms) {
-    if (err) {
-      return cb(err);
-    }
+  this._wire(ctx).getPermissions(ctx, this.schemaVersion, key,
+      function(err, wirePerms) {
+        if (err) {
+          return cb(err);
+        }
 
-    var perms = wirePerms.map(function(v) {
-      return new PrefixPermissions(
-        prefix(v.prefix),
-        v.perms
-      );
-    });
+        var perms = wirePerms.map(function(v) {
+          return new PrefixPermissions(
+            prefix(v.prefix),
+            v.perms
+          );
+        });
 
-    cb(null, perms);
-  });
+        cb(null, perms);
+      }
+  );
 };
 
 /**
@@ -216,7 +224,8 @@
  */
 Table.prototype.deletePermissions = function(ctx, prefix, cb) {
   //TODO(aghassemi): Why is prefix a PrefixRange in Go?
-  this._wire(ctx).deletePermissions(ctx, stringifyPrefix(prefix), cb);
+  this._wire(ctx).deletePermissions(
+        ctx, this.schemaVersion, stringifyPrefix(prefix), cb);
 };
 
 function stringifyPrefix(prefix) {
diff --git a/test/integration/test-batch.js b/test/integration/test-batch.js
index eb2f2cb..2b200d5 100644
--- a/test/integration/test-batch.js
+++ b/test/integration/test-batch.js
@@ -142,8 +142,10 @@
         return batch.table(table.name);
       });
 
+      // Put to the same key in each batch.
+      var key = uniqueName('key');
       async.mapSeries(batchTables, function(batchTable, cb) {
-        var key = uniqueName('key');
+        // Put different value in each batch.
         var value = uniqueName('value');
         batchTable.put(ctx, key, value, function(err) {
           if (err) {
@@ -339,19 +341,33 @@
     var db = o.database;
     var table = o.table;
 
-    db.beginBatch(ctx, {}, putTable);
+    db.beginBatch(ctx, {}, readBatchTable);
 
     var key = uniqueName('key');
     var value = uniqueName('value');
 
     var batch;
+    var batchTable;
 
-    function putTable(err, _batch) {
+    function readBatchTable(err, _batch) {
       if (err) {
-        return end(err);
+        return t.end(err);
       }
 
       batch = _batch;
+      batchTable = batch.table(table.name);
+
+      batchTable.get(ctx, key, function(err) {
+        // Should error because the key does not exist yet.
+        t.ok(err, 'get should error when key does not exist');
+        putTable();
+      });
+    }
+
+    function putTable(err) {
+      if (err) {
+        return end(err);
+      }
 
       // Put on the table directly, not the batch table.  This will conflict
       // with future batchTable.put() call.
@@ -365,7 +381,6 @@
 
       var newValue = uniqueName('value');
 
-      var batchTable = batch.table(table.name);
       batchTable.put(ctx, key, newValue, commit);
     }
 
diff --git a/test/integration/test-run-in-batch.js b/test/integration/test-run-in-batch.js
index 84011f3..cc29d5a 100644
--- a/test/integration/test-run-in-batch.js
+++ b/test/integration/test-run-in-batch.js
@@ -77,7 +77,7 @@
   });
 });
 
-test('runInBatch aborts if commit fails', function(t) {
+test('runInBatch does not abort if commit fails', function(t) {
   var ctx = {};
   var db = new MockDb(true);
 
@@ -90,7 +90,7 @@
 
     t.ok(db.batchDb, 'batch db is created');
     t.ok(db.batchDb.commitCalled, 'batchDb.commit() was called');
-    t.ok(db.batchDb.abortCalled, 'batchDb.abort() was called');
+    t.notok(db.batchDb.abortCalled, 'batchDb.abort() was not called');
 
     t.end();
   });
diff --git a/test/start-syncbased.sh b/test/start-syncbased.sh
index 6cf9f24..bedc838 100755
--- a/test/start-syncbased.sh
+++ b/test/start-syncbased.sh
@@ -9,7 +9,17 @@
 # does not allow flags or arguments to the executables it starts.  We should
 # fix service-runner to allow flags/arguments, and then have it start syncbased
 # directly with the appropriate flags.  Then we can delete this file.
-# TODO(rdaoud): how to cleanup the tmp test dir; "rm" here doesn't do it.
 
-testdir="$(mktemp -d "${TMPDIR:-/tmp}"/sbtest.XXXXXXXX)"
-syncbased -v=1 --name test/syncbased --engine memstore --root-dir "${testdir}" --v23.tcp.address 127.0.0.1:0
+TESTDIR="$(mktemp -d "${TMPDIR:-/tmp}"/sbtest.XXXXXXXX)"
+# Delete TESTDIR and stop syncbased on exit.
+function cleanup {
+	rm -rf "${TESTDIR}"
+	kill -TERM "${CHILD}" 2>/dev/null
+	exit 0
+}
+trap cleanup SIGINT SIGTERM EXIT
+
+syncbased -v=3 --name test/syncbased --engine "${STORAGE_ENGINE:-leveldb}" --root-dir "${TESTDIR}" --v23.tcp.address 127.0.0.1:0 &
+
+CHILD=$!
+wait "${CHILD}"